From 5049ea9f5a7a80aa8121caa5afdf611b6d605a56 Mon Sep 17 00:00:00 2001 From: Dong Wang Date: Wed, 7 Jan 2026 01:33:31 +0000 Subject: [PATCH 1/7] mini training setup, add --use_array False to make head a worker and all workers using one allocatio --- matrix/cli.py | 17 ++-- matrix/client/endpoint_cache.py | 7 +- matrix/cluster/ray_cluster.py | 140 +++++++++++++++++++++---------- matrix/cluster/ray_head_job.py | 24 +++++- matrix/cluster/ray_worker_job.py | 21 ++--- matrix/common/cluster_info.py | 1 + 6 files changed, 143 insertions(+), 67 deletions(-) diff --git a/matrix/cli.py b/matrix/cli.py index 1a64906..2671397 100644 --- a/matrix/cli.py +++ b/matrix/cli.py @@ -81,25 +81,23 @@ def start_cluster( local: tp.Dict[str, tp.Union[str, int]] | None = None, enable_grafana: bool = False, force_new_head: bool = False, + use_array: bool = True, ) -> tp.Dict[str, tp.Any]: - """ - Starts the Ray cluster with additional keyword arguments. Only do this for new clusters. - - Args: - **kwargs: Arbitrary keyword arguments passed to the RayCluster's start_head method. - """ """ Starts the Ray cluster with the specified number of workers and additional configuration. - + Can add additional workers if the cluster already exists. - + Args: add_workers (int): Number of worker nodes to add in the cluster. slurm (dict, optional): resources for slurm cluster. local (dict, optional): resources for local cluster. enable_grafana (bool, optional): If True, enable prometheus and grafana dashboard. force_new_head (bool): force to remove head.json if haven't run 'matrix stop_cluster'. - + use_array (bool): If True, use Slurm job arrays for workers (default: True). + When False, workers are submitted as individual jobs and the head node + allocates GPU when starting a new cluster with workers. + Returns: None """ @@ -109,6 +107,7 @@ def start_cluster( local, enable_grafana=enable_grafana, force_new_head=force_new_head, + use_array=use_array, ) return convert_to_json_compatible(status) diff --git a/matrix/client/endpoint_cache.py b/matrix/client/endpoint_cache.py index ba115a7..2d566aa 100644 --- a/matrix/client/endpoint_cache.py +++ b/matrix/client/endpoint_cache.py @@ -45,7 +45,12 @@ async def __call__(self, force_update=False): if status is not None and status == 200: ray_query_result = json.loads(content) head_ip = ray_query_result["controller_info"]["node_ip"] - self.ips = set([y["node_ip"] for x, y in ray_query_result["proxies"].items() if y["status"] == "HEALTHY" and y["node_ip"] != head_ip]) # type: ignore[attr-defined] + self.ips = set([y["node_ip"] for x, y in ray_query_result["proxies"].items() if y["status"] == "HEALTHY"]) # type: ignore[attr-defined] + if ( + len(self.ips) > 1 + and not self.cluster_info.head_is_worker + ): + self.ips.discard(head_ip) else: raise Exception(f"status: {status}, {content}") else: diff --git a/matrix/cluster/ray_cluster.py b/matrix/cluster/ray_cluster.py index 23f6567..fb9c7c2 100644 --- a/matrix/cluster/ray_cluster.py +++ b/matrix/cluster/ray_cluster.py @@ -18,6 +18,8 @@ import typing as tp from pathlib import Path +import psutil + from matrix.common import JOB_MANAGER_STORE from matrix.common.cluster_info import ClusterInfo from matrix.utils.basics import convert_to_json_compatible @@ -138,6 +140,58 @@ def _parse_gpu_gres(gres_str): return None +def _apply_default_requirements( + requirements: dict[str, tp.Any], + executor: str, +) -> dict[str, tp.Any]: + """ + Apply default Slurm requirements and auto-detect resources from partition. 
+ + Args: + requirements: User-provided requirements + executor: "slurm" or "local" + + Returns: + Updated requirements with defaults and auto-detected values + """ + default_params: dict[str, tp.Any] = { + "ntasks_per_node": 1, + "timeout_min": 10080, + } + + partition = requirements.get("partition") + if partition: + default_params["partition"] = partition + + # Auto-detect resources from Slurm partition info + if executor == "slurm": + default_params = _get_slurm_default_requirements(default_params) + else: + num_cpus = max((os.cpu_count() or 0), 1) + mem_gb = psutil.virtual_memory().total // (1024**3) + num_gpus = len( + [s for s in os.environ.get("CUDA_VISIBLE_DEVICES", "").split(",") if s] + ) + + default_params["cpus_per_task"] = num_cpus + default_params["mem_gb"] = mem_gb + default_params["gpus_per_node"] = num_gpus + + # Merge defaults into requirements (user values take precedence) + result = requirements.copy() + result.update( + {key: value for key, value in default_params.items() if key not in requirements} + ) + if "mem_gb" not in requirements and "cpus_per_task" in requirements: + result["mem_gb"] = ( + default_params["mem_gb"] + * requirements["cpus_per_task"] + // default_params["cpus_per_task"] + ) # take a fraction based on cpu + + return result + + class RayCluster: """ Manages the lifecycle of a Ray cluster on a Slurm-based system. @@ -279,6 +333,7 @@ def start( local: tp.Dict[str, tp.Union[str, int]] | None, enable_grafana: bool = False, force_new_head: bool = False, + use_array: bool = True, ): """ Starts a Ray cluster on Slurm. @@ -296,6 +351,9 @@ def start( enable_grafana (bool): Whether to start Prometheus and Grafana for monitoring (default: True). force_new_head (bool): force to remove head.json if haven't run 'matrix stop_cluster'. + use_array (bool): If True, use Slurm job arrays for workers (default: True). + When False, workers are submitted as individual jobs and the head node + allocates GPU when starting a new cluster with workers. 
""" import submitit @@ -325,29 +383,33 @@ def start( if self._cluster_json.exists(): self._cluster_json.unlink() - if self._cluster_json.exists(): + # the host's full resources + requirements_updated = _apply_default_requirements(requirements, executor) + logical_resources = { + f"{key}-{value}": 1 + for key, value in requirements_updated.items() + if key in _SLURM_KEY_ALIASES.values() + } + + is_new_cluster = not self._cluster_json.exists() + + if not is_new_cluster: print(f"Adding workers to existing cluster:\n{self.cluster_info()}") - # todo: check the cluser is alive else: - # start the head node + head_params = requirements.copy() + if ( + use_array and add_workers > 0 + ): # cpu host, overwrite as the param are meant for workers + head_params["gpus_per_node"] = 0 + head_params["cpus_per_task"] = 20 + + head_params = _apply_default_requirements(head_params, executor) + + print(f"Head Slurm parameters: {head_params}") s_executor = submitit.AutoExecutor( folder=str(self._log_dir), cluster=executor, ) - head_default_params = {"timeout_min": 10080, "cpus_per_task": 20} - if add_workers == 0: - head_params = requirements - else: - head_params = { - k: v for k, v in requirements.items() if k in common_params - } - head_params.update( - { - key: value - for key, value in head_default_params.items() - if key not in head_params - } - ) s_executor.update_parameters( name=f"ray_head_{self.cluster_id}", **head_params, @@ -358,6 +420,9 @@ def start( self._cluster_json, worker_wait_timeout_seconds, executor, + head_params, + logical_resources, + not use_array, ) self._add_job(head_job) create_symlinks(self._log_dir, "head", head_job.paths) @@ -394,45 +459,35 @@ def start( self.start_grafana(force=True) # start the workers + if is_new_cluster and not use_array: + add_workers -= 1 # head becomes a worker too if add_workers > 0: s_executor = submitit.AutoExecutor( folder=str(self._log_dir), cluster=executor ) - default_params: dict[str, tp.Any] = { - "ntasks_per_node": 1, - "timeout_min": 10080, - } - partition = requirements.get("partition") - if partition: - default_params["partition"] = partition - if executor == "slurm": - default_params = _get_slurm_default_requirements(default_params) - requirements.update( - { - key: value - for key, value in default_params.items() - if key not in requirements - } - ) - print(requirements) + + worker_params = _apply_default_requirements(requirements, executor) + if not use_array: + worker_params["nodes"] = add_workers + num_jobs = 1 + else: + num_jobs = add_workers + print(f"Worker Slurm parameters: {worker_params}") + s_executor.update_parameters( name=f"ray_worker_{self.cluster_id}", - **requirements, + **worker_params, ) cluster_info = self.cluster_info() assert cluster_info is not None + + # submit job jobs = [] with ( s_executor.batch() ): # TODO set slurm array max parallelism here, because we really want all jobs to be scheduled at the same time - logical_resources = { - f"{key}-{value}": 1 - for key, value in requirements.items() - if key in _SLURM_KEY_ALIASES.values() - } - - for i in range(add_workers): + for i in range(num_jobs): jobs.append( s_executor.submit( RayWorkerJob( @@ -440,6 +495,7 @@ def start( worker_wait_timeout_seconds, start_wait_time_seconds, logical_resources, + worker_params, ) ) ) diff --git a/matrix/cluster/ray_head_job.py b/matrix/cluster/ray_head_job.py index 9521b20..0549409 100644 --- a/matrix/cluster/ray_head_job.py +++ b/matrix/cluster/ray_head_job.py @@ -31,6 +31,9 @@ def start_ray_head( cluster_json_path: Path, 
worker_wait_timeout_seconds: int, executor: str, + slurm_requirements: tp.Dict[str, tp.Any], + logical_resources: tp.Dict[str, tp.Any], + head_is_worker: bool = False, ): """Start the head node of the Ray cluster on slurm.""" hostname = socket.gethostname() @@ -63,6 +66,20 @@ def start_ray_head( ip_address = socket.gethostbyname(hostname) print(f"Host {hostname}:{port}, IP {ip_address}") + # Determine resource allocation for head + if head_is_worker: + # Head acts as worker - allocate actual resources using same logic as workers + num_cpus, num_gpus = ( + slurm_requirements["cpus_per_task"], + slurm_requirements["gpus_per_node"], + ) + logical_resources = json.dumps(logical_resources) + else: + # Head only - no compute resources + num_cpus = 0 + num_gpus = 0 + logical_resources = json.dumps({}) + with tempfile.TemporaryDirectory(dir="/tmp") as temp_dir: # Start Ray head process ray_process = subprocess.Popen( @@ -77,11 +94,13 @@ def start_ray_head( f"--metrics-export-port={metrics_port}", f"--temp-dir={temp_dir}", "--num-cpus", - "0", + str(num_cpus), "--num-gpus", - "0", + str(num_gpus), "--dashboard-host=0.0.0.0", f"--dashboard-agent-listen-port={dashboard_agent_listen_port}", + "--resources", + logical_resources, ], env=head_env, stdout=subprocess.PIPE, @@ -113,6 +132,7 @@ def start_ray_head( dashboard_agent_listen_port=int(dashboard_agent_listen_port), temp_dir=temp_dir, executor=executor, + head_is_worker=head_is_worker, ) with cluster_json_path.open("w") as f: json.dump(dataclasses.asdict(info), f) diff --git a/matrix/cluster/ray_worker_job.py b/matrix/cluster/ray_worker_job.py index eae16f3..3865863 100644 --- a/matrix/cluster/ray_worker_job.py +++ b/matrix/cluster/ray_worker_job.py @@ -22,25 +22,17 @@ class _RayWorkerConfiguration: """Configuration class for Ray worker job settings.""" cluster_info: Any + slurm_requirements: Dict[str, Any] worker_wait_timeout_seconds: int = 60 start_wait_time_seconds: int = 60 environment: Dict[str, str] = field(default_factory=dict) logical_resources: str = "{}" def _determine_resource_allocation(self) -> tuple: - """Dynamically determine CPU and GPU resources based on execution environment.""" - executor_type = os.environ.get("SUBMITIT_EXECUTOR", "slurm") - - if executor_type == "local": - num_cpus = max((os.cpu_count() or 0) // 2, 1) - num_gpus = len( - [s for s in os.environ.get("CUDA_VISIBLE_DEVICES", "").split(",") if s] - ) - else: - num_cpus = int(os.environ.get("SLURM_CPUS_ON_NODE", 1)) - num_gpus = int(os.environ.get("SLURM_GPUS_ON_NODE", 0)) - - return num_cpus, num_gpus + return ( + self.slurm_requirements["cpus_per_task"], + self.slurm_requirements["gpus_per_node"], + ) class _RayWorkerJobExecutor: @@ -120,6 +112,7 @@ def __init__( worker_wait_timeout_seconds: int, start_wait_time_seconds: int, # TODO pass this around properly logical_resources: dict[str, Any], + slurm_requirements: dict[str, Any], ): # Store the cluster information for later use self.cluster_info = cluster_info @@ -130,6 +123,7 @@ def __init__( # Initial wait time for Ray worker to start self.start_wait_time_seconds = 60 self.logical_resources = logical_resources + self.slurm_requirements = slurm_requirements def __call__( self, @@ -146,6 +140,7 @@ def __call__( worker_wait_timeout_seconds=self.worker_wait_timeout_seconds, start_wait_time_seconds=self.start_wait_time_seconds, logical_resources=json.dumps(self.logical_resources), + slurm_requirements=self.slurm_requirements, ) _RayWorkerJobExecutor.execute(config) diff --git a/matrix/common/cluster_info.py 
b/matrix/common/cluster_info.py index 459cc9d..10e037d 100644 --- a/matrix/common/cluster_info.py +++ b/matrix/common/cluster_info.py @@ -28,6 +28,7 @@ class ClusterInfo: dashboard_agent_listen_port: tp.Optional[int] = None temp_dir: tp.Optional[str] = None executor: tp.Optional[str] = None + head_is_worker: tp.Optional[bool] = None def get_head_http_host(cluster_info: ClusterInfo) -> str: From bf9788c04794210af71c2bd4e45de2bb1c94b49e Mon Sep 17 00:00:00 2001 From: Dong Wang Date: Wed, 7 Jan 2026 05:04:23 +0000 Subject: [PATCH 2/7] add test --- matrix/cluster/ray_cluster.py | 28 +- .../integration/cluster/test_no_array_mode.py | 304 ++++++++++++++++++ 2 files changed, 318 insertions(+), 14 deletions(-) create mode 100644 tests/integration/cluster/test_no_array_mode.py diff --git a/matrix/cluster/ray_cluster.py b/matrix/cluster/ray_cluster.py index fb9c7c2..a9ebe9c 100644 --- a/matrix/cluster/ray_cluster.py +++ b/matrix/cluster/ray_cluster.py @@ -383,16 +383,7 @@ def start( if self._cluster_json.exists(): self._cluster_json.unlink() - # the host's full resources - requirements_updated = _apply_default_requirements(requirements, executor) - logical_resources = { - f"{key}-{value}": 1 - for key, value in requirements_updated.items() - if key in _SLURM_KEY_ALIASES.values() - } - is_new_cluster = not self._cluster_json.exists() - if not is_new_cluster: print(f"Adding workers to existing cluster:\n{self.cluster_info()}") else: @@ -404,8 +395,13 @@ def start( head_params["cpus_per_task"] = 20 head_params = _apply_default_requirements(head_params, executor) - + logical_resources = { + f"{key}-{value}": 1 + for key, value in head_params.items() + if key in _SLURM_KEY_ALIASES.values() + } print(f"Head Slurm parameters: {head_params}") + s_executor = submitit.AutoExecutor( folder=str(self._log_dir), cluster=executor, @@ -462,18 +458,22 @@ def start( if is_new_cluster and not use_array: add_workers -= 1 # head becomes a worker too if add_workers > 0: - s_executor = submitit.AutoExecutor( - folder=str(self._log_dir), cluster=executor - ) - worker_params = _apply_default_requirements(requirements, executor) if not use_array: worker_params["nodes"] = add_workers num_jobs = 1 else: num_jobs = add_workers + logical_resources = { + f"{key}-{value}": 1 + for key, value in worker_params.items() + if key in _SLURM_KEY_ALIASES.values() + } print(f"Worker Slurm parameters: {worker_params}") + s_executor = submitit.AutoExecutor( + folder=str(self._log_dir), cluster=executor + ) s_executor.update_parameters( name=f"ray_worker_{self.cluster_id}", **worker_params, diff --git a/tests/integration/cluster/test_no_array_mode.py b/tests/integration/cluster/test_no_array_mode.py new file mode 100644 index 0000000..d136693 --- /dev/null +++ b/tests/integration/cluster/test_no_array_mode.py @@ -0,0 +1,304 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the BSD-style license found in the +# LICENSE file in the root directory of this source tree. 
+ +import tempfile +import time +import uuid +from typing import Any, Generator + +import pytest +import ray + +from matrix.cli import Cli +from matrix.utils.ray import status_is_pending, status_is_success + + +@pytest.fixture(scope="module") +def matrix_cluster_no_array() -> Generator[Any, Any, Any]: + """Start and stop Ray cluster with use_array=False for the duration of these tests.""" + with tempfile.TemporaryDirectory() as temp_dir: + cli = Cli(cluster_id=str(uuid.uuid4()), matrix_dir=temp_dir) + cli.start_cluster( + add_workers=1, + slurm=None, + local={"gpus_per_node": 0, "cpus_per_task": 2}, + enable_grafana=False, + use_array=False, + ) + with cli.cluster: + yield cli + + +@pytest.fixture(scope="module") +def matrix_cluster_no_array_multi_worker() -> Generator[Any, Any, Any]: + """Start and stop Ray cluster with use_array=False and multiple workers.""" + with tempfile.TemporaryDirectory() as temp_dir: + cli = Cli(cluster_id=str(uuid.uuid4()), matrix_dir=temp_dir) + cli.start_cluster( + add_workers=2, + slurm=None, + local={"gpus_per_node": 0, "cpus_per_task": 2}, + enable_grafana=False, + use_array=False, + ) + with cli.cluster: + yield cli + + +def test_cluster_starts_no_array(matrix_cluster_no_array: Cli) -> None: + """Test that cluster starts successfully with use_array=False.""" + cli = matrix_cluster_no_array + cluster_info = cli.cluster.cluster_info() + + assert cluster_info is not None, "Cluster info should exist" + assert cluster_info.hostname is not None, "Hostname should be set" + assert cluster_info.port > 0, "Port should be set" + + +def test_head_has_resources_no_array(matrix_cluster_no_array: Cli) -> None: + """Test that head node has CPU resources allocated when use_array=False.""" + cli = matrix_cluster_no_array + cluster_info = cli.cluster.cluster_info() + assert cluster_info is not None + + # Initialize ray connection + from matrix.utils.ray import init_ray_if_necessary + + init_ray_if_necessary(cluster_info) + + # Get cluster resources + resources = ray.cluster_resources() + + # Head should have CPU resources since it's acting as worker + assert ( + resources.get("CPU", 0) > 0 + ), "Head should have CPU resources when use_array=False" + + +def test_deploy_hello_no_array(matrix_cluster_no_array: Cli) -> None: + """Test that applications can be deployed with use_array=False.""" + cli = matrix_cluster_no_array + cli.deploy_applications(applications=[{"name": "hello", "app_type": "hello"}]) + + for _ in range(10): + status = cli.app.app_status("hello") + if not status_is_pending(status): + break + time.sleep(5) + + assert status_is_success(status), f"Bad status {status}" + assert cli.check_health("hello") + + +def test_cluster_multi_worker_no_array( + matrix_cluster_no_array_multi_worker: Cli, +) -> None: + """Test cluster with multiple workers and use_array=False.""" + cli = matrix_cluster_no_array_multi_worker + cluster_info = cli.cluster.cluster_info() + + assert cluster_info is not None, "Cluster info should exist" + + # Initialize ray connection + from matrix.utils.ray import init_ray_if_necessary + + init_ray_if_necessary(cluster_info) + + # Get cluster resources + resources = ray.cluster_resources() + + # Should have resources from head (acting as worker) + 1 additional worker + assert resources.get("CPU", 0) > 0, "Cluster should have CPU resources" + + # Wait for workers to join (local mode might be slower) + max_wait = 30 # seconds + start_time = time.time() + expected_nodes = 2 # head (acting as worker) + 1 additional worker + + while time.time() - 
start_time < max_wait: + nodes = ray.nodes() + alive_nodes = [n for n in nodes if n["Alive"]] + if len(alive_nodes) >= expected_nodes: + break + time.sleep(2) + + # Check final node count + nodes = ray.nodes() + alive_nodes = [n for n in nodes if n["Alive"]] + # In local mode, workers might not show as separate nodes, so check for at least 1 + assert ( + len(alive_nodes) >= 1 + ), f"Should have at least 1 alive node, got {len(alive_nodes)}" + + # More importantly, check that we have enough CPU resources + # With add_workers=2 and cpus_per_task=2, we should have at least 4 CPUs + # (2 from head + 2 from worker) + total_cpu = resources.get("CPU", 0) + assert total_cpu >= 2, f"Should have at least 2 CPUs, got {total_cpu}" + + +def test_deploy_on_multi_worker_no_array( + matrix_cluster_no_array_multi_worker: Cli, +) -> None: + """Test deployment on multi-worker cluster with use_array=False.""" + cli = matrix_cluster_no_array_multi_worker + cli.deploy_applications(applications=[{"name": "hello_multi", "app_type": "hello"}]) + + for _ in range(10): + status = cli.app.app_status("hello_multi") + if not status_is_pending(status): + break + time.sleep(5) + + assert status_is_success(status), f"Bad status {status}" + assert cli.check_health("hello_multi") + + +def test_incremental_scaling_no_array() -> None: + """Test that workers can be added incrementally with use_array=False.""" + with tempfile.TemporaryDirectory() as temp_dir: + cli = Cli(cluster_id=str(uuid.uuid4()), matrix_dir=temp_dir) + + # Start with 1 worker (head acts as worker) + cli.start_cluster( + add_workers=1, + slurm=None, + local={"gpus_per_node": 0, "cpus_per_task": 2}, + enable_grafana=False, + use_array=False, + ) + + cluster_info = cli.cluster.cluster_info() + assert cluster_info is not None + + from matrix.utils.ray import init_ray_if_necessary + + init_ray_if_necessary(cluster_info) + + # Check initial resources + initial_resources = ray.cluster_resources() + initial_cpu = initial_resources.get("CPU", 0) + assert initial_cpu > 0, "Should have CPU resources initially" + + # Add more workers + cli.start_cluster( + add_workers=1, + slurm=None, + local={"gpus_per_node": 0, "cpus_per_task": 2}, + enable_grafana=False, + use_array=False, + ) + + # Wait for new worker to join + max_wait = 30 # seconds + start_time = time.time() + new_cpu = initial_cpu + + while time.time() - start_time < max_wait: + time.sleep(2) + new_resources = ray.cluster_resources() + new_cpu = new_resources.get("CPU", 0) + if new_cpu > initial_cpu: + break + + # Check that resources increased (or at least stayed the same in local mode) + # In local mode, the worker might share the same host so CPU might not increase + assert ( + new_cpu >= initial_cpu + ), f"CPU should not decrease: {initial_cpu} -> {new_cpu}" + + # Cleanup + cli.cluster.stop() + + +def test_head_only_gets_gpu_resources() -> None: + """Test that head gets GPU resources when add_workers=0.""" + with tempfile.TemporaryDirectory() as temp_dir: + cli = Cli(cluster_id=str(uuid.uuid4()), matrix_dir=temp_dir) + + # Start cluster with only head (add_workers=0) + # In this case, head should get full requirements including GPU resources + cli.start_cluster( + add_workers=0, + slurm=None, + local={"gpus_per_node": 0, "cpus_per_task": 4}, + enable_grafana=False, + use_array=False, # use_array shouldn't matter when add_workers=0 + ) + + cluster_info = cli.cluster.cluster_info() + assert cluster_info is not None + + from matrix.utils.ray import init_ray_if_necessary + + init_ray_if_necessary(cluster_info) 
+ + # Check that head has CPU resources + resources = ray.cluster_resources() + cpu = resources.get("CPU", 0) + assert cpu > 0, f"Head-only cluster should have CPU resources, got {cpu}" + + # Cleanup + cli.cluster.stop() + + +def test_array_mode_comparison() -> None: + """Test that array mode and non-array mode allocate resources differently.""" + # Test 1: Array mode - head should have 0 resources when workers exist + with tempfile.TemporaryDirectory() as temp_dir: + cli_array = Cli(cluster_id=str(uuid.uuid4()), matrix_dir=temp_dir) + + cli_array.start_cluster( + add_workers=1, + slurm=None, + local={"gpus_per_node": 0, "cpus_per_task": 2}, + enable_grafana=False, + use_array=True, # Array mode + ) + + cluster_info_array = cli_array.cluster.cluster_info() + assert cluster_info_array is not None + + from matrix.utils.ray import init_ray_if_necessary + + init_ray_if_necessary(cluster_info_array) + + # In array mode, head gets 0 resources + resources_array = ray.cluster_resources() + # Head has 0 CPUs/GPUs, but worker has resources + # Total should still be > 0 from worker + assert ( + resources_array.get("CPU", 0) > 0 + ), "Array mode should have CPU from workers" + + cli_array.cluster.stop() + + # Test 2: Non-array mode - head should have resources when acting as worker + with tempfile.TemporaryDirectory() as temp_dir: + cli_no_array = Cli(cluster_id=str(uuid.uuid4()), matrix_dir=temp_dir) + + cli_no_array.start_cluster( + add_workers=1, + slurm=None, + local={"gpus_per_node": 0, "cpus_per_task": 2}, + enable_grafana=False, + use_array=False, # Non-array mode + ) + + cluster_info_no_array = cli_no_array.cluster.cluster_info() + assert cluster_info_no_array is not None + + from matrix.utils.ray import init_ray_if_necessary + + init_ray_if_necessary(cluster_info_no_array) + + # In non-array mode, head acts as worker and has resources + resources_no_array = ray.cluster_resources() + assert ( + resources_no_array.get("CPU", 0) > 0 + ), "Non-array mode head should have CPU resources" + + cli_no_array.cluster.stop() From 3fc0f1ca76b8f6ee1ab9377c32114a427defae3a Mon Sep 17 00:00:00 2001 From: Dong Wang Date: Wed, 7 Jan 2026 05:19:35 +0000 Subject: [PATCH 3/7] lint --- matrix/cluster/ray_head_job.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/matrix/cluster/ray_head_job.py b/matrix/cluster/ray_head_job.py index 0549409..864256c 100644 --- a/matrix/cluster/ray_head_job.py +++ b/matrix/cluster/ray_head_job.py @@ -73,12 +73,12 @@ def start_ray_head( slurm_requirements["cpus_per_task"], slurm_requirements["gpus_per_node"], ) - logical_resources = json.dumps(logical_resources) + logical_resources_str = json.dumps(logical_resources) else: # Head only - no compute resources num_cpus = 0 num_gpus = 0 - logical_resources = json.dumps({}) + logical_resources_str = json.dumps({}) with tempfile.TemporaryDirectory(dir="/tmp") as temp_dir: # Start Ray head process @@ -100,7 +100,7 @@ def start_ray_head( "--dashboard-host=0.0.0.0", f"--dashboard-agent-listen-port={dashboard_agent_listen_port}", "--resources", - logical_resources, + logical_resources_str, ], env=head_env, stdout=subprocess.PIPE, From 3cd1cceb71889680981b91c4f62d0d2762d7b41f Mon Sep 17 00:00:00 2001 From: Dong Wang Date: Sun, 11 Jan 2026 20:05:09 +0000 Subject: [PATCH 4/7] revert the changes to head, keep it cpu only --- matrix/cli.py | 2 - matrix/client/endpoint_cache.py | 7 +- matrix/cluster/ray_cluster.py | 16 +- matrix/cluster/ray_head_job.py | 24 +-- matrix/common/cluster_info.py | 1 - 
.../integration/cluster/test_no_array_mode.py | 180 +----------------- 6 files changed, 9 insertions(+), 221 deletions(-) diff --git a/matrix/cli.py b/matrix/cli.py index 2671397..c7193d3 100644 --- a/matrix/cli.py +++ b/matrix/cli.py @@ -95,8 +95,6 @@ def start_cluster( enable_grafana (bool, optional): If True, enable prometheus and grafana dashboard. force_new_head (bool): force to remove head.json if haven't run 'matrix stop_cluster'. use_array (bool): If True, use Slurm job arrays for workers (default: True). - When False, workers are submitted as individual jobs and the head node - allocates GPU when starting a new cluster with workers. Returns: None diff --git a/matrix/client/endpoint_cache.py b/matrix/client/endpoint_cache.py index 2d566aa..ba115a7 100644 --- a/matrix/client/endpoint_cache.py +++ b/matrix/client/endpoint_cache.py @@ -45,12 +45,7 @@ async def __call__(self, force_update=False): if status is not None and status == 200: ray_query_result = json.loads(content) head_ip = ray_query_result["controller_info"]["node_ip"] - self.ips = set([y["node_ip"] for x, y in ray_query_result["proxies"].items() if y["status"] == "HEALTHY"]) # type: ignore[attr-defined] - if ( - len(self.ips) > 1 - and not self.cluster_info.head_is_worker - ): - self.ips.discard(head_ip) + self.ips = set([y["node_ip"] for x, y in ray_query_result["proxies"].items() if y["status"] == "HEALTHY" and y["node_ip"] != head_ip]) # type: ignore[attr-defined] else: raise Exception(f"status: {status}, {content}") else: diff --git a/matrix/cluster/ray_cluster.py b/matrix/cluster/ray_cluster.py index a9ebe9c..2e606ec 100644 --- a/matrix/cluster/ray_cluster.py +++ b/matrix/cluster/ray_cluster.py @@ -352,8 +352,6 @@ def start( for monitoring (default: True). force_new_head (bool): force to remove head.json if haven't run 'matrix stop_cluster'. use_array (bool): If True, use Slurm job arrays for workers (default: True). - When False, workers are submitted as individual jobs and the head node - allocates GPU when starting a new cluster with workers. 
""" import submitit @@ -388,18 +386,11 @@ def start( print(f"Adding workers to existing cluster:\n{self.cluster_info()}") else: head_params = requirements.copy() - if ( - use_array and add_workers > 0 - ): # cpu host, overwrite as the param are meant for workers + if add_workers > 0: # cpu host, overwrite as the param are meant for workers head_params["gpus_per_node"] = 0 head_params["cpus_per_task"] = 20 head_params = _apply_default_requirements(head_params, executor) - logical_resources = { - f"{key}-{value}": 1 - for key, value in head_params.items() - if key in _SLURM_KEY_ALIASES.values() - } print(f"Head Slurm parameters: {head_params}") s_executor = submitit.AutoExecutor( @@ -416,9 +407,6 @@ def start( self._cluster_json, worker_wait_timeout_seconds, executor, - head_params, - logical_resources, - not use_array, ) self._add_job(head_job) create_symlinks(self._log_dir, "head", head_job.paths) @@ -455,8 +443,6 @@ def start( self.start_grafana(force=True) # start the workers - if is_new_cluster and not use_array: - add_workers -= 1 # head becomes a worker too if add_workers > 0: worker_params = _apply_default_requirements(requirements, executor) if not use_array: diff --git a/matrix/cluster/ray_head_job.py b/matrix/cluster/ray_head_job.py index 864256c..9521b20 100644 --- a/matrix/cluster/ray_head_job.py +++ b/matrix/cluster/ray_head_job.py @@ -31,9 +31,6 @@ def start_ray_head( cluster_json_path: Path, worker_wait_timeout_seconds: int, executor: str, - slurm_requirements: tp.Dict[str, tp.Any], - logical_resources: tp.Dict[str, tp.Any], - head_is_worker: bool = False, ): """Start the head node of the Ray cluster on slurm.""" hostname = socket.gethostname() @@ -66,20 +63,6 @@ def start_ray_head( ip_address = socket.gethostbyname(hostname) print(f"Host {hostname}:{port}, IP {ip_address}") - # Determine resource allocation for head - if head_is_worker: - # Head acts as worker - allocate actual resources using same logic as workers - num_cpus, num_gpus = ( - slurm_requirements["cpus_per_task"], - slurm_requirements["gpus_per_node"], - ) - logical_resources_str = json.dumps(logical_resources) - else: - # Head only - no compute resources - num_cpus = 0 - num_gpus = 0 - logical_resources_str = json.dumps({}) - with tempfile.TemporaryDirectory(dir="/tmp") as temp_dir: # Start Ray head process ray_process = subprocess.Popen( @@ -94,13 +77,11 @@ def start_ray_head( f"--metrics-export-port={metrics_port}", f"--temp-dir={temp_dir}", "--num-cpus", - str(num_cpus), + "0", "--num-gpus", - str(num_gpus), + "0", "--dashboard-host=0.0.0.0", f"--dashboard-agent-listen-port={dashboard_agent_listen_port}", - "--resources", - logical_resources_str, ], env=head_env, stdout=subprocess.PIPE, @@ -132,7 +113,6 @@ def start_ray_head( dashboard_agent_listen_port=int(dashboard_agent_listen_port), temp_dir=temp_dir, executor=executor, - head_is_worker=head_is_worker, ) with cluster_json_path.open("w") as f: json.dump(dataclasses.asdict(info), f) diff --git a/matrix/common/cluster_info.py b/matrix/common/cluster_info.py index 10e037d..459cc9d 100644 --- a/matrix/common/cluster_info.py +++ b/matrix/common/cluster_info.py @@ -28,7 +28,6 @@ class ClusterInfo: dashboard_agent_listen_port: tp.Optional[int] = None temp_dir: tp.Optional[str] = None executor: tp.Optional[str] = None - head_is_worker: tp.Optional[bool] = None def get_head_http_host(cluster_info: ClusterInfo) -> str: diff --git a/tests/integration/cluster/test_no_array_mode.py b/tests/integration/cluster/test_no_array_mode.py index d136693..34e1383 100644 --- 
a/tests/integration/cluster/test_no_array_mode.py +++ b/tests/integration/cluster/test_no_array_mode.py @@ -32,22 +32,6 @@ def matrix_cluster_no_array() -> Generator[Any, Any, Any]: yield cli -@pytest.fixture(scope="module") -def matrix_cluster_no_array_multi_worker() -> Generator[Any, Any, Any]: - """Start and stop Ray cluster with use_array=False and multiple workers.""" - with tempfile.TemporaryDirectory() as temp_dir: - cli = Cli(cluster_id=str(uuid.uuid4()), matrix_dir=temp_dir) - cli.start_cluster( - add_workers=2, - slurm=None, - local={"gpus_per_node": 0, "cpus_per_task": 2}, - enable_grafana=False, - use_array=False, - ) - with cli.cluster: - yield cli - - def test_cluster_starts_no_array(matrix_cluster_no_array: Cli) -> None: """Test that cluster starts successfully with use_array=False.""" cli = matrix_cluster_no_array @@ -58,8 +42,8 @@ def test_cluster_starts_no_array(matrix_cluster_no_array: Cli) -> None: assert cluster_info.port > 0, "Port should be set" -def test_head_has_resources_no_array(matrix_cluster_no_array: Cli) -> None: - """Test that head node has CPU resources allocated when use_array=False.""" +def test_workers_have_resources_no_array(matrix_cluster_no_array: Cli) -> None: + """Test that workers have CPU resources allocated when use_array=False.""" cli = matrix_cluster_no_array cluster_info = cli.cluster.cluster_info() assert cluster_info is not None @@ -72,10 +56,10 @@ def test_head_has_resources_no_array(matrix_cluster_no_array: Cli) -> None: # Get cluster resources resources = ray.cluster_resources() - # Head should have CPU resources since it's acting as worker + # Workers should have CPU resources assert ( resources.get("CPU", 0) > 0 - ), "Head should have CPU resources when use_array=False" + ), "Workers should have CPU resources when use_array=False" def test_deploy_hello_no_array(matrix_cluster_no_array: Cli) -> None: @@ -93,76 +77,12 @@ def test_deploy_hello_no_array(matrix_cluster_no_array: Cli) -> None: assert cli.check_health("hello") -def test_cluster_multi_worker_no_array( - matrix_cluster_no_array_multi_worker: Cli, -) -> None: - """Test cluster with multiple workers and use_array=False.""" - cli = matrix_cluster_no_array_multi_worker - cluster_info = cli.cluster.cluster_info() - - assert cluster_info is not None, "Cluster info should exist" - - # Initialize ray connection - from matrix.utils.ray import init_ray_if_necessary - - init_ray_if_necessary(cluster_info) - - # Get cluster resources - resources = ray.cluster_resources() - - # Should have resources from head (acting as worker) + 1 additional worker - assert resources.get("CPU", 0) > 0, "Cluster should have CPU resources" - - # Wait for workers to join (local mode might be slower) - max_wait = 30 # seconds - start_time = time.time() - expected_nodes = 2 # head (acting as worker) + 1 additional worker - - while time.time() - start_time < max_wait: - nodes = ray.nodes() - alive_nodes = [n for n in nodes if n["Alive"]] - if len(alive_nodes) >= expected_nodes: - break - time.sleep(2) - - # Check final node count - nodes = ray.nodes() - alive_nodes = [n for n in nodes if n["Alive"]] - # In local mode, workers might not show as separate nodes, so check for at least 1 - assert ( - len(alive_nodes) >= 1 - ), f"Should have at least 1 alive node, got {len(alive_nodes)}" - - # More importantly, check that we have enough CPU resources - # With add_workers=2 and cpus_per_task=2, we should have at least 4 CPUs - # (2 from head + 2 from worker) - total_cpu = resources.get("CPU", 0) - assert total_cpu >= 2, 
f"Should have at least 2 CPUs, got {total_cpu}" - - -def test_deploy_on_multi_worker_no_array( - matrix_cluster_no_array_multi_worker: Cli, -) -> None: - """Test deployment on multi-worker cluster with use_array=False.""" - cli = matrix_cluster_no_array_multi_worker - cli.deploy_applications(applications=[{"name": "hello_multi", "app_type": "hello"}]) - - for _ in range(10): - status = cli.app.app_status("hello_multi") - if not status_is_pending(status): - break - time.sleep(5) - - assert status_is_success(status), f"Bad status {status}" - assert cli.check_health("hello_multi") - - def test_incremental_scaling_no_array() -> None: """Test that workers can be added incrementally with use_array=False.""" with tempfile.TemporaryDirectory() as temp_dir: cli = Cli(cluster_id=str(uuid.uuid4()), matrix_dir=temp_dir) - # Start with 1 worker (head acts as worker) + # Start with 1 worker cli.start_cluster( add_workers=1, slurm=None, @@ -212,93 +132,3 @@ def test_incremental_scaling_no_array() -> None: # Cleanup cli.cluster.stop() - - -def test_head_only_gets_gpu_resources() -> None: - """Test that head gets GPU resources when add_workers=0.""" - with tempfile.TemporaryDirectory() as temp_dir: - cli = Cli(cluster_id=str(uuid.uuid4()), matrix_dir=temp_dir) - - # Start cluster with only head (add_workers=0) - # In this case, head should get full requirements including GPU resources - cli.start_cluster( - add_workers=0, - slurm=None, - local={"gpus_per_node": 0, "cpus_per_task": 4}, - enable_grafana=False, - use_array=False, # use_array shouldn't matter when add_workers=0 - ) - - cluster_info = cli.cluster.cluster_info() - assert cluster_info is not None - - from matrix.utils.ray import init_ray_if_necessary - - init_ray_if_necessary(cluster_info) - - # Check that head has CPU resources - resources = ray.cluster_resources() - cpu = resources.get("CPU", 0) - assert cpu > 0, f"Head-only cluster should have CPU resources, got {cpu}" - - # Cleanup - cli.cluster.stop() - - -def test_array_mode_comparison() -> None: - """Test that array mode and non-array mode allocate resources differently.""" - # Test 1: Array mode - head should have 0 resources when workers exist - with tempfile.TemporaryDirectory() as temp_dir: - cli_array = Cli(cluster_id=str(uuid.uuid4()), matrix_dir=temp_dir) - - cli_array.start_cluster( - add_workers=1, - slurm=None, - local={"gpus_per_node": 0, "cpus_per_task": 2}, - enable_grafana=False, - use_array=True, # Array mode - ) - - cluster_info_array = cli_array.cluster.cluster_info() - assert cluster_info_array is not None - - from matrix.utils.ray import init_ray_if_necessary - - init_ray_if_necessary(cluster_info_array) - - # In array mode, head gets 0 resources - resources_array = ray.cluster_resources() - # Head has 0 CPUs/GPUs, but worker has resources - # Total should still be > 0 from worker - assert ( - resources_array.get("CPU", 0) > 0 - ), "Array mode should have CPU from workers" - - cli_array.cluster.stop() - - # Test 2: Non-array mode - head should have resources when acting as worker - with tempfile.TemporaryDirectory() as temp_dir: - cli_no_array = Cli(cluster_id=str(uuid.uuid4()), matrix_dir=temp_dir) - - cli_no_array.start_cluster( - add_workers=1, - slurm=None, - local={"gpus_per_node": 0, "cpus_per_task": 2}, - enable_grafana=False, - use_array=False, # Non-array mode - ) - - cluster_info_no_array = cli_no_array.cluster.cluster_info() - assert cluster_info_no_array is not None - - from matrix.utils.ray import init_ray_if_necessary - - 
init_ray_if_necessary(cluster_info_no_array) - - # In non-array mode, head acts as worker and has resources - resources_no_array = ray.cluster_resources() - assert ( - resources_no_array.get("CPU", 0) > 0 - ), "Non-array mode head should have CPU resources" - - cli_no_array.cluster.stop() From e40021f78e5e7a93f16072660c7ad98655c1a0dd Mon Sep 17 00:00:00 2001 From: Dong Wang Date: Sun, 11 Jan 2026 20:30:43 +0000 Subject: [PATCH 5/7] get timeout_min from slurm --- matrix/cluster/ray_cluster.py | 42 +++++++++++++++++++++++++++++++---- 1 file changed, 38 insertions(+), 4 deletions(-) diff --git a/matrix/cluster/ray_cluster.py b/matrix/cluster/ray_cluster.py index 2e606ec..131c746 100644 --- a/matrix/cluster/ray_cluster.py +++ b/matrix/cluster/ray_cluster.py @@ -48,6 +48,34 @@ def _normalize_slurm_keys( return normalized +def _parse_time_limit(time_str: str) -> int | None: + """Parse SLURM time limit string to minutes. + + Formats: infinite, DAYS-HH:MM:SS, HH:MM:SS, MM:SS, MM + """ + if not time_str or time_str.lower() == "infinite": + return None + + try: + days = 0 + if "-" in time_str: + days_part, time_str = time_str.split("-", 1) + days = int(days_part) + + parts = time_str.split(":") + if len(parts) == 3: # HH:MM:SS + hours, minutes, seconds = map(int, parts) + elif len(parts) == 2: # MM:SS + hours = 0 + minutes, seconds = map(int, parts) + else: # MM + return int(parts[0]) + + return days * 24 * 60 + hours * 60 + minutes + except (ValueError, IndexError): + return None + + def _get_slurm_default_requirements(requirements: dict[str, tp.Any]): """ Extract SLURM partition info including CPU and memory requirements. @@ -72,10 +100,10 @@ def _get_slurm_default_requirements(requirements: dict[str, tp.Any]): requirements["partition"] = partition # Get detailed info for the partition - # %P=partition, %c=CPUs, %m=memory(MB), %l=time_limit, %N=nodes + # %G=GRES(GPUs), %c=CPUs, %m=memory(MB), %l=time_limit sinfo_output = ( subprocess.check_output( - ["sinfo", "-h", "-p", partition, "-o", "%G %c %m"], + ["sinfo", "-h", "-p", partition, "-o", "%G %c %m %l"], stderr=subprocess.PIPE, ) .decode() @@ -87,11 +115,12 @@ def _get_slurm_default_requirements(requirements: dict[str, tp.Any]): max_cpus = 0 max_memory = 0 max_gpus = 0 + max_time_min = 0 gpu_type = None for line in lines: parts = line.split() - if len(parts) >= 3: + if len(parts) >= 4: gpu_info = _parse_gpu_gres(parts[0]) if gpu_info: if gpu_info["count"] > max_gpus: @@ -102,11 +131,16 @@ def _get_slurm_default_requirements(requirements: dict[str, tp.Any]): max_cpus = max(max_cpus, cpus) memory = _parse_slurm_value(parts[2]) max_memory = max(max_memory, memory) + time_min = _parse_time_limit(parts[3]) + if time_min is not None: + max_time_min = max(max_time_min, time_min) requirements["cpus_per_task"] = max_cpus requirements["mem_gb"] = max_memory // 1024 if max_gpus > 0: requirements["gpus_per_node"] = max_gpus + if max_time_min > 0: + requirements["timeout_min"] = max_time_min except subprocess.CalledProcessError as e: print(f"Error running sinfo: {e}") @@ -156,7 +190,6 @@ def _apply_default_requirements( """ default_params: dict[str, tp.Any] = { "ntasks_per_node": 1, - "timeout_min": 10080, } partition = requirements.get("partition") @@ -176,6 +209,7 @@ def _apply_default_requirements( default_params["cpus_per_task"] = num_cpus default_params["mem_gb"] = mem_gb default_params["gpus_per_node"] = num_gpus + default_params["timeout_min"] = 10080 # fallback for local executor # Merge defaults into requirements (user values take precedence) result 
= requirements.copy() From 53e14d9171fdfdf21440e640c6f57bb253ee696d Mon Sep 17 00:00:00 2001 From: Dong Wang Date: Sun, 11 Jan 2026 20:35:12 +0000 Subject: [PATCH 6/7] lint --- matrix/cluster/ray_cluster.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/matrix/cluster/ray_cluster.py b/matrix/cluster/ray_cluster.py index 131c746..8d15f0c 100644 --- a/matrix/cluster/ray_cluster.py +++ b/matrix/cluster/ray_cluster.py @@ -420,7 +420,9 @@ def start( print(f"Adding workers to existing cluster:\n{self.cluster_info()}") else: head_params = requirements.copy() - if add_workers > 0: # cpu host, overwrite as the param are meant for workers + if ( + add_workers > 0 + ): # cpu host, overwrite as the param are meant for workers head_params["gpus_per_node"] = 0 head_params["cpus_per_task"] = 20 From ea4b0f18544e9ef8894c2588e5390e7ce73da11e Mon Sep 17 00:00:00 2001 From: Dong Wang Date: Sun, 11 Jan 2026 23:26:55 +0000 Subject: [PATCH 7/7] only set VLLM_CACHE_ROOT. cache_dir may be node specific --- matrix/app_server/deploy_utils.py | 40 +------------------------------ 1 file changed, 1 insertion(+), 39 deletions(-) diff --git a/matrix/app_server/deploy_utils.py b/matrix/app_server/deploy_utils.py index 17e2858..e1a70ad 100644 --- a/matrix/app_server/deploy_utils.py +++ b/matrix/app_server/deploy_utils.py @@ -83,11 +83,7 @@ OUTLINES_CACHE_DIR: {{ temp_dir }}/.outlines RAY_DEBUG: legacy TIKTOKEN_RS_CACHE_DIR: {{ temp_dir }} - VLLM_CACHE_ROOT: {{ cache_dir["VLLM_CACHE_ROOT"] }} - TORCH_EXTENSIONS_DIR: {{ cache_dir["TORCH_EXTENSIONS_DIR"] }} - TORCHINDUCTOR_CACHE_DIR: {{ cache_dir["TORCHINDUCTOR_CACHE_DIR"] }} - TORCH_COMPILE_DEBUG_DIR: {{ cache_dir["TORCH_COMPILE_DEBUG_DIR"] }} - TRITON_CACHE_DIR: {{ cache_dir["TRITON_CACHE_DIR"] }} + VLLM_CACHE_ROOT: {{ temp_dir }}/.cache/vllm args: model: {{ app.model_name }} {% for key, value in app.items() %} @@ -375,38 +371,6 @@ def delete_apps(cluster_info, apps_list: List[Dict[str, Union[str, int]]] | None print(f"Actors deleted {actors}") -def setup_native_cache_dirs(base_dir=None): - """ - Make Torch / vLLM caches ABI-safe across: - - vLLM versions - - PyTorch versions - - CUDA versions - - """ - import pathlib - - import torch - import vllm - - if base_dir is None: - base_dir = os.path.expanduser("~/.cache/matrix") - - tag = f"torch{torch.__version__}-cuda{torch.version.cuda or 'cpu'}-vllm{vllm.__version__}" - - def d(name): - path = pathlib.Path(base_dir) / tag / name - path.mkdir(parents=True, exist_ok=True) - return str(path) - - return { - "VLLM_CACHE_ROOT": d("vllm"), - "TORCH_EXTENSIONS_DIR": d("torch-extensions"), - "TORCHINDUCTOR_CACHE_DIR": d("torch-inductor"), - "TORCH_COMPILE_DEBUG_DIR": d("torch-compile"), - "TRITON_CACHE_DIR": d("triton"), - } - - def get_yaml_for_deployment( cluster_info: ClusterInfo, action: Action, @@ -421,7 +385,6 @@ def get_yaml_for_deployment( from matrix.app_server.llm.ray_serve_vllm import BaseDeployment temp_dir = cluster_info.temp_dir - cache_dir = setup_native_cache_dirs() if yaml_config is None: assert applications is not None yaml_str = Template(common_config).render( @@ -481,7 +444,6 @@ def get_yaml_for_deployment( temp_dir=temp_dir, non_model_params=non_model_params, app=app, - cache_dir=cache_dir, ) elif app_type == "code": if "name" not in app:
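
A minimal usage sketch of the new use_array flag, based on the integration test added in this series (tests/integration/cluster/test_no_array_mode.py). The cluster_id and matrix_dir values below are illustrative; per the PATCH 1/7 subject, the equivalent command-line form is --use_array False.

import tempfile
import uuid

from matrix.cli import Cli

with tempfile.TemporaryDirectory() as temp_dir:
    cli = Cli(cluster_id=str(uuid.uuid4()), matrix_dir=temp_dir)

    # On Slurm, use_array=False submits the workers as one multi-node job
    # (nodes=add_workers) instead of a job array, so all workers share a
    # single allocation; after PATCH 4/7 the head node stays CPU-only.
    # Here a local executor is used, mirroring the integration test.
    cli.start_cluster(
        add_workers=1,
        slurm=None,
        local={"gpus_per_node": 0, "cpus_per_task": 2},
        enable_grafana=False,
        use_array=False,
    )
    try:
        info = cli.cluster.cluster_info()
        assert info is not None and info.port > 0
    finally:
        cli.cluster.stop()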