diff --git a/matrix/app_server/deploy_utils.py b/matrix/app_server/deploy_utils.py index 17e2858..e1a70ad 100644 --- a/matrix/app_server/deploy_utils.py +++ b/matrix/app_server/deploy_utils.py @@ -83,11 +83,7 @@ OUTLINES_CACHE_DIR: {{ temp_dir }}/.outlines RAY_DEBUG: legacy TIKTOKEN_RS_CACHE_DIR: {{ temp_dir }} - VLLM_CACHE_ROOT: {{ cache_dir["VLLM_CACHE_ROOT"] }} - TORCH_EXTENSIONS_DIR: {{ cache_dir["TORCH_EXTENSIONS_DIR"] }} - TORCHINDUCTOR_CACHE_DIR: {{ cache_dir["TORCHINDUCTOR_CACHE_DIR"] }} - TORCH_COMPILE_DEBUG_DIR: {{ cache_dir["TORCH_COMPILE_DEBUG_DIR"] }} - TRITON_CACHE_DIR: {{ cache_dir["TRITON_CACHE_DIR"] }} + VLLM_CACHE_ROOT: {{ temp_dir }}/.cache/vllm args: model: {{ app.model_name }} {% for key, value in app.items() %} @@ -375,38 +371,6 @@ def delete_apps(cluster_info, apps_list: List[Dict[str, Union[str, int]]] | None print(f"Actors deleted {actors}") -def setup_native_cache_dirs(base_dir=None): - """ - Make Torch / vLLM caches ABI-safe across: - - vLLM versions - - PyTorch versions - - CUDA versions - - """ - import pathlib - - import torch - import vllm - - if base_dir is None: - base_dir = os.path.expanduser("~/.cache/matrix") - - tag = f"torch{torch.__version__}-cuda{torch.version.cuda or 'cpu'}-vllm{vllm.__version__}" - - def d(name): - path = pathlib.Path(base_dir) / tag / name - path.mkdir(parents=True, exist_ok=True) - return str(path) - - return { - "VLLM_CACHE_ROOT": d("vllm"), - "TORCH_EXTENSIONS_DIR": d("torch-extensions"), - "TORCHINDUCTOR_CACHE_DIR": d("torch-inductor"), - "TORCH_COMPILE_DEBUG_DIR": d("torch-compile"), - "TRITON_CACHE_DIR": d("triton"), - } - - def get_yaml_for_deployment( cluster_info: ClusterInfo, action: Action, @@ -421,7 +385,6 @@ def get_yaml_for_deployment( from matrix.app_server.llm.ray_serve_vllm import BaseDeployment temp_dir = cluster_info.temp_dir - cache_dir = setup_native_cache_dirs() if yaml_config is None: assert applications is not None yaml_str = Template(common_config).render( @@ -481,7 +444,6 @@ def get_yaml_for_deployment( temp_dir=temp_dir, non_model_params=non_model_params, app=app, - cache_dir=cache_dir, ) elif app_type == "code": if "name" not in app: diff --git a/matrix/cli.py b/matrix/cli.py index 1a64906..c7193d3 100644 --- a/matrix/cli.py +++ b/matrix/cli.py @@ -81,25 +81,21 @@ def start_cluster( local: tp.Dict[str, tp.Union[str, int]] | None = None, enable_grafana: bool = False, force_new_head: bool = False, + use_array: bool = True, ) -> tp.Dict[str, tp.Any]: - """ - Starts the Ray cluster with additional keyword arguments. Only do this for new clusters. - - Args: - **kwargs: Arbitrary keyword arguments passed to the RayCluster's start_head method. - """ """ Starts the Ray cluster with the specified number of workers and additional configuration. - + Can add additional workers if the cluster already exists. - + Args: add_workers (int): Number of worker nodes to add in the cluster. slurm (dict, optional): resources for slurm cluster. local (dict, optional): resources for local cluster. enable_grafana (bool, optional): If True, enable prometheus and grafana dashboard. force_new_head (bool): force to remove head.json if haven't run 'matrix stop_cluster'. - + use_array (bool): If True, use Slurm job arrays for workers (default: True). + Returns: None """ @@ -109,6 +105,7 @@ def start_cluster( local, enable_grafana=enable_grafana, force_new_head=force_new_head, + use_array=use_array, ) return convert_to_json_compatible(status) diff --git a/matrix/cluster/ray_cluster.py b/matrix/cluster/ray_cluster.py index 23f6567..8d15f0c 100644 --- a/matrix/cluster/ray_cluster.py +++ b/matrix/cluster/ray_cluster.py @@ -18,6 +18,8 @@ import typing as tp from pathlib import Path +import psutil + from matrix.common import JOB_MANAGER_STORE from matrix.common.cluster_info import ClusterInfo from matrix.utils.basics import convert_to_json_compatible @@ -46,6 +48,34 @@ def _normalize_slurm_keys( return normalized +def _parse_time_limit(time_str: str) -> int | None: + """Parse SLURM time limit string to minutes. + + Formats: infinite, DAYS-HH:MM:SS, HH:MM:SS, MM:SS, MM + """ + if not time_str or time_str.lower() == "infinite": + return None + + try: + days = 0 + if "-" in time_str: + days_part, time_str = time_str.split("-", 1) + days = int(days_part) + + parts = time_str.split(":") + if len(parts) == 3: # HH:MM:SS + hours, minutes, seconds = map(int, parts) + elif len(parts) == 2: # MM:SS + hours = 0 + minutes, seconds = map(int, parts) + else: # MM + return int(parts[0]) + + return days * 24 * 60 + hours * 60 + minutes + except (ValueError, IndexError): + return None + + def _get_slurm_default_requirements(requirements: dict[str, tp.Any]): """ Extract SLURM partition info including CPU and memory requirements. @@ -70,10 +100,10 @@ def _get_slurm_default_requirements(requirements: dict[str, tp.Any]): requirements["partition"] = partition # Get detailed info for the partition - # %P=partition, %c=CPUs, %m=memory(MB), %l=time_limit, %N=nodes + # %G=GRES(GPUs), %c=CPUs, %m=memory(MB), %l=time_limit sinfo_output = ( subprocess.check_output( - ["sinfo", "-h", "-p", partition, "-o", "%G %c %m"], + ["sinfo", "-h", "-p", partition, "-o", "%G %c %m %l"], stderr=subprocess.PIPE, ) .decode() @@ -85,11 +115,12 @@ def _get_slurm_default_requirements(requirements: dict[str, tp.Any]): max_cpus = 0 max_memory = 0 max_gpus = 0 + max_time_min = 0 gpu_type = None for line in lines: parts = line.split() - if len(parts) >= 3: + if len(parts) >= 4: gpu_info = _parse_gpu_gres(parts[0]) if gpu_info: if gpu_info["count"] > max_gpus: @@ -100,11 +131,16 @@ def _get_slurm_default_requirements(requirements: dict[str, tp.Any]): max_cpus = max(max_cpus, cpus) memory = _parse_slurm_value(parts[2]) max_memory = max(max_memory, memory) + time_min = _parse_time_limit(parts[3]) + if time_min is not None: + max_time_min = max(max_time_min, time_min) requirements["cpus_per_task"] = max_cpus requirements["mem_gb"] = max_memory // 1024 if max_gpus > 0: requirements["gpus_per_node"] = max_gpus + if max_time_min > 0: + requirements["timeout_min"] = max_time_min except subprocess.CalledProcessError as e: print(f"Error running sinfo: {e}") @@ -138,6 +174,58 @@ def _parse_gpu_gres(gres_str): return None +def _apply_default_requirements( + requirements: dict[str, tp.Any], + executor: str, +) -> dict[str, tp.Any]: + """ + Apply default Slurm requirements and auto-detect resources from partition. + + Args: + requirements: User-provided requirements + executor: "slurm" or "local" + + Returns: + Updated requirements with defaults and auto-detected values + """ + default_params: dict[str, tp.Any] = { + "ntasks_per_node": 1, + } + + partition = requirements.get("partition") + if partition: + default_params["partition"] = partition + + # Auto-detect resources from Slurm partition info + if executor == "slurm": + default_params = _get_slurm_default_requirements(default_params) + else: + num_cpus = max((os.cpu_count() or 0), 1) + mem_gb = psutil.virtual_memory().total // (1024**3) + num_gpus = len( + [s for s in os.environ.get("CUDA_VISIBLE_DEVICES", "").split(",") if s] + ) + + default_params["cpus_per_task"] = num_cpus + default_params["mem_gb"] = mem_gb + default_params["gpus_per_node"] = num_gpus + default_params["timeout_min"] = 10080 # fallback for local executor + + # Merge defaults into requirements (user values take precedence) + result = requirements.copy() + result.update( + {key: value for key, value in default_params.items() if key not in requirements} + ) + if "mem_gb" not in requirements and "cpus_per_task" in requirements: + result["mem_gb"] = ( + default_params["mem_gb"] + * requirements["cpus_per_task"] + // default_params["cpus_per_task"] + ) # take a fraction based on cpu + + return result + + class RayCluster: """ Manages the lifecycle of a Ray cluster on a Slurm-based system. @@ -279,6 +367,7 @@ def start( local: tp.Dict[str, tp.Union[str, int]] | None, enable_grafana: bool = False, force_new_head: bool = False, + use_array: bool = True, ): """ Starts a Ray cluster on Slurm. @@ -296,6 +385,7 @@ def start( enable_grafana (bool): Whether to start Prometheus and Grafana for monitoring (default: True). force_new_head (bool): force to remove head.json if haven't run 'matrix stop_cluster'. + use_array (bool): If True, use Slurm job arrays for workers (default: True). """ import submitit @@ -325,29 +415,24 @@ def start( if self._cluster_json.exists(): self._cluster_json.unlink() - if self._cluster_json.exists(): + is_new_cluster = not self._cluster_json.exists() + if not is_new_cluster: print(f"Adding workers to existing cluster:\n{self.cluster_info()}") - # todo: check the cluser is alive else: - # start the head node + head_params = requirements.copy() + if ( + add_workers > 0 + ): # cpu host, overwrite as the param are meant for workers + head_params["gpus_per_node"] = 0 + head_params["cpus_per_task"] = 20 + + head_params = _apply_default_requirements(head_params, executor) + print(f"Head Slurm parameters: {head_params}") + s_executor = submitit.AutoExecutor( folder=str(self._log_dir), cluster=executor, ) - head_default_params = {"timeout_min": 10080, "cpus_per_task": 20} - if add_workers == 0: - head_params = requirements - else: - head_params = { - k: v for k, v in requirements.items() if k in common_params - } - head_params.update( - { - key: value - for key, value in head_default_params.items() - if key not in head_params - } - ) s_executor.update_parameters( name=f"ray_head_{self.cluster_id}", **head_params, @@ -395,44 +480,36 @@ def start( # start the workers if add_workers > 0: + worker_params = _apply_default_requirements(requirements, executor) + if not use_array: + worker_params["nodes"] = add_workers + num_jobs = 1 + else: + num_jobs = add_workers + logical_resources = { + f"{key}-{value}": 1 + for key, value in worker_params.items() + if key in _SLURM_KEY_ALIASES.values() + } + print(f"Worker Slurm parameters: {worker_params}") + s_executor = submitit.AutoExecutor( folder=str(self._log_dir), cluster=executor ) - default_params: dict[str, tp.Any] = { - "ntasks_per_node": 1, - "timeout_min": 10080, - } - partition = requirements.get("partition") - if partition: - default_params["partition"] = partition - if executor == "slurm": - default_params = _get_slurm_default_requirements(default_params) - requirements.update( - { - key: value - for key, value in default_params.items() - if key not in requirements - } - ) - print(requirements) s_executor.update_parameters( name=f"ray_worker_{self.cluster_id}", - **requirements, + **worker_params, ) cluster_info = self.cluster_info() assert cluster_info is not None + + # submit job jobs = [] with ( s_executor.batch() ): # TODO set slurm array max parallelism here, because we really want all jobs to be scheduled at the same time - logical_resources = { - f"{key}-{value}": 1 - for key, value in requirements.items() - if key in _SLURM_KEY_ALIASES.values() - } - - for i in range(add_workers): + for i in range(num_jobs): jobs.append( s_executor.submit( RayWorkerJob( @@ -440,6 +517,7 @@ def start( worker_wait_timeout_seconds, start_wait_time_seconds, logical_resources, + worker_params, ) ) ) diff --git a/matrix/cluster/ray_worker_job.py b/matrix/cluster/ray_worker_job.py index eae16f3..3865863 100644 --- a/matrix/cluster/ray_worker_job.py +++ b/matrix/cluster/ray_worker_job.py @@ -22,25 +22,17 @@ class _RayWorkerConfiguration: """Configuration class for Ray worker job settings.""" cluster_info: Any + slurm_requirements: Dict[str, Any] worker_wait_timeout_seconds: int = 60 start_wait_time_seconds: int = 60 environment: Dict[str, str] = field(default_factory=dict) logical_resources: str = "{}" def _determine_resource_allocation(self) -> tuple: - """Dynamically determine CPU and GPU resources based on execution environment.""" - executor_type = os.environ.get("SUBMITIT_EXECUTOR", "slurm") - - if executor_type == "local": - num_cpus = max((os.cpu_count() or 0) // 2, 1) - num_gpus = len( - [s for s in os.environ.get("CUDA_VISIBLE_DEVICES", "").split(",") if s] - ) - else: - num_cpus = int(os.environ.get("SLURM_CPUS_ON_NODE", 1)) - num_gpus = int(os.environ.get("SLURM_GPUS_ON_NODE", 0)) - - return num_cpus, num_gpus + return ( + self.slurm_requirements["cpus_per_task"], + self.slurm_requirements["gpus_per_node"], + ) class _RayWorkerJobExecutor: @@ -120,6 +112,7 @@ def __init__( worker_wait_timeout_seconds: int, start_wait_time_seconds: int, # TODO pass this around properly logical_resources: dict[str, Any], + slurm_requirements: dict[str, Any], ): # Store the cluster information for later use self.cluster_info = cluster_info @@ -130,6 +123,7 @@ def __init__( # Initial wait time for Ray worker to start self.start_wait_time_seconds = 60 self.logical_resources = logical_resources + self.slurm_requirements = slurm_requirements def __call__( self, @@ -146,6 +140,7 @@ def __call__( worker_wait_timeout_seconds=self.worker_wait_timeout_seconds, start_wait_time_seconds=self.start_wait_time_seconds, logical_resources=json.dumps(self.logical_resources), + slurm_requirements=self.slurm_requirements, ) _RayWorkerJobExecutor.execute(config) diff --git a/tests/integration/cluster/test_no_array_mode.py b/tests/integration/cluster/test_no_array_mode.py new file mode 100644 index 0000000..34e1383 --- /dev/null +++ b/tests/integration/cluster/test_no_array_mode.py @@ -0,0 +1,134 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the BSD-style license found in the +# LICENSE file in the root directory of this source tree. + +import tempfile +import time +import uuid +from typing import Any, Generator + +import pytest +import ray + +from matrix.cli import Cli +from matrix.utils.ray import status_is_pending, status_is_success + + +@pytest.fixture(scope="module") +def matrix_cluster_no_array() -> Generator[Any, Any, Any]: + """Start and stop Ray cluster with use_array=False for the duration of these tests.""" + with tempfile.TemporaryDirectory() as temp_dir: + cli = Cli(cluster_id=str(uuid.uuid4()), matrix_dir=temp_dir) + cli.start_cluster( + add_workers=1, + slurm=None, + local={"gpus_per_node": 0, "cpus_per_task": 2}, + enable_grafana=False, + use_array=False, + ) + with cli.cluster: + yield cli + + +def test_cluster_starts_no_array(matrix_cluster_no_array: Cli) -> None: + """Test that cluster starts successfully with use_array=False.""" + cli = matrix_cluster_no_array + cluster_info = cli.cluster.cluster_info() + + assert cluster_info is not None, "Cluster info should exist" + assert cluster_info.hostname is not None, "Hostname should be set" + assert cluster_info.port > 0, "Port should be set" + + +def test_workers_have_resources_no_array(matrix_cluster_no_array: Cli) -> None: + """Test that workers have CPU resources allocated when use_array=False.""" + cli = matrix_cluster_no_array + cluster_info = cli.cluster.cluster_info() + assert cluster_info is not None + + # Initialize ray connection + from matrix.utils.ray import init_ray_if_necessary + + init_ray_if_necessary(cluster_info) + + # Get cluster resources + resources = ray.cluster_resources() + + # Workers should have CPU resources + assert ( + resources.get("CPU", 0) > 0 + ), "Workers should have CPU resources when use_array=False" + + +def test_deploy_hello_no_array(matrix_cluster_no_array: Cli) -> None: + """Test that applications can be deployed with use_array=False.""" + cli = matrix_cluster_no_array + cli.deploy_applications(applications=[{"name": "hello", "app_type": "hello"}]) + + for _ in range(10): + status = cli.app.app_status("hello") + if not status_is_pending(status): + break + time.sleep(5) + + assert status_is_success(status), f"Bad status {status}" + assert cli.check_health("hello") + + +def test_incremental_scaling_no_array() -> None: + """Test that workers can be added incrementally with use_array=False.""" + with tempfile.TemporaryDirectory() as temp_dir: + cli = Cli(cluster_id=str(uuid.uuid4()), matrix_dir=temp_dir) + + # Start with 1 worker + cli.start_cluster( + add_workers=1, + slurm=None, + local={"gpus_per_node": 0, "cpus_per_task": 2}, + enable_grafana=False, + use_array=False, + ) + + cluster_info = cli.cluster.cluster_info() + assert cluster_info is not None + + from matrix.utils.ray import init_ray_if_necessary + + init_ray_if_necessary(cluster_info) + + # Check initial resources + initial_resources = ray.cluster_resources() + initial_cpu = initial_resources.get("CPU", 0) + assert initial_cpu > 0, "Should have CPU resources initially" + + # Add more workers + cli.start_cluster( + add_workers=1, + slurm=None, + local={"gpus_per_node": 0, "cpus_per_task": 2}, + enable_grafana=False, + use_array=False, + ) + + # Wait for new worker to join + max_wait = 30 # seconds + start_time = time.time() + new_cpu = initial_cpu + + while time.time() - start_time < max_wait: + time.sleep(2) + new_resources = ray.cluster_resources() + new_cpu = new_resources.get("CPU", 0) + if new_cpu > initial_cpu: + break + + # Check that resources increased (or at least stayed the same in local mode) + # In local mode, the worker might share the same host so CPU might not increase + assert ( + new_cpu >= initial_cpu + ), f"CPU should not decrease: {initial_cpu} -> {new_cpu}" + + # Cleanup + cli.cluster.stop()