Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 1 addition & 39 deletions matrix/app_server/deploy_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,7 @@
OUTLINES_CACHE_DIR: {{ temp_dir }}/.outlines
RAY_DEBUG: legacy
TIKTOKEN_RS_CACHE_DIR: {{ temp_dir }}
VLLM_CACHE_ROOT: {{ cache_dir["VLLM_CACHE_ROOT"] }}
TORCH_EXTENSIONS_DIR: {{ cache_dir["TORCH_EXTENSIONS_DIR"] }}
TORCHINDUCTOR_CACHE_DIR: {{ cache_dir["TORCHINDUCTOR_CACHE_DIR"] }}
TORCH_COMPILE_DEBUG_DIR: {{ cache_dir["TORCH_COMPILE_DEBUG_DIR"] }}
TRITON_CACHE_DIR: {{ cache_dir["TRITON_CACHE_DIR"] }}
VLLM_CACHE_ROOT: {{ temp_dir }}/.cache/vllm
args:
model: {{ app.model_name }}
{% for key, value in app.items() %}
Expand Down Expand Up @@ -375,38 +371,6 @@ def delete_apps(cluster_info, apps_list: List[Dict[str, Union[str, int]]] | None
print(f"Actors deleted {actors}")


def setup_native_cache_dirs(base_dir=None):
    """Create per-ABI cache directories for Torch / vLLM native artifacts.

    Caches are keyed by (torch version, CUDA version, vLLM version) so that
    compiled artifacts built under one ABI combination are never reused
    under another.

    Args:
        base_dir: Root directory for the caches; defaults to
            ``~/.cache/matrix``.

    Returns:
        Mapping of cache-related environment variable names to directory
        paths (each directory is created if missing).
    """
    import pathlib

    import torch
    import vllm

    root = pathlib.Path(
        base_dir if base_dir is not None else os.path.expanduser("~/.cache/matrix")
    )
    cuda_tag = torch.version.cuda or "cpu"
    tag = f"torch{torch.__version__}-cuda{cuda_tag}-vllm{vllm.__version__}"

    def make_dir(name):
        # Create eagerly so callers can hand the path straight to env vars.
        target = root / tag / name
        target.mkdir(parents=True, exist_ok=True)
        return str(target)

    subdirs = {
        "VLLM_CACHE_ROOT": "vllm",
        "TORCH_EXTENSIONS_DIR": "torch-extensions",
        "TORCHINDUCTOR_CACHE_DIR": "torch-inductor",
        "TORCH_COMPILE_DEBUG_DIR": "torch-compile",
        "TRITON_CACHE_DIR": "triton",
    }
    return {env_var: make_dir(sub) for env_var, sub in subdirs.items()}


def get_yaml_for_deployment(
cluster_info: ClusterInfo,
action: Action,
Expand All @@ -421,7 +385,6 @@ def get_yaml_for_deployment(
from matrix.app_server.llm.ray_serve_vllm import BaseDeployment

temp_dir = cluster_info.temp_dir
cache_dir = setup_native_cache_dirs()
if yaml_config is None:
assert applications is not None
yaml_str = Template(common_config).render(
Expand Down Expand Up @@ -481,7 +444,6 @@ def get_yaml_for_deployment(
temp_dir=temp_dir,
non_model_params=non_model_params,
app=app,
cache_dir=cache_dir,
)
elif app_type == "code":
if "name" not in app:
Expand Down
15 changes: 6 additions & 9 deletions matrix/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,25 +81,21 @@ def start_cluster(
local: tp.Dict[str, tp.Union[str, int]] | None = None,
enable_grafana: bool = False,
force_new_head: bool = False,
use_array: bool = True,
) -> tp.Dict[str, tp.Any]:
"""
Starts the Ray cluster with additional keyword arguments. Only do this for new clusters.

Args:
**kwargs: Arbitrary keyword arguments passed to the RayCluster's start_head method.
"""
"""
Starts the Ray cluster with the specified number of workers and additional configuration.

Can add additional workers if the cluster already exists.

Args:
add_workers (int): Number of worker nodes to add in the cluster.
slurm (dict, optional): resources for slurm cluster.
local (dict, optional): resources for local cluster.
enable_grafana (bool, optional): If True, enable prometheus and grafana dashboard.
force_new_head (bool): force to remove head.json if haven't run 'matrix stop_cluster'.

use_array (bool): If True, use Slurm job arrays for workers (default: True).

Returns:
None
"""
Expand All @@ -109,6 +105,7 @@ def start_cluster(
local,
enable_grafana=enable_grafana,
force_new_head=force_new_head,
use_array=use_array,
)
return convert_to_json_compatible(status)

Expand Down
168 changes: 123 additions & 45 deletions matrix/cluster/ray_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
import typing as tp
from pathlib import Path

import psutil

from matrix.common import JOB_MANAGER_STORE
from matrix.common.cluster_info import ClusterInfo
from matrix.utils.basics import convert_to_json_compatible
Expand Down Expand Up @@ -46,6 +48,34 @@ def _normalize_slurm_keys(
return normalized


def _parse_time_limit(time_str: str) -> int | None:
"""Parse SLURM time limit string to minutes.

Formats: infinite, DAYS-HH:MM:SS, HH:MM:SS, MM:SS, MM
"""
if not time_str or time_str.lower() == "infinite":
return None

try:
days = 0
if "-" in time_str:
days_part, time_str = time_str.split("-", 1)
days = int(days_part)

parts = time_str.split(":")
if len(parts) == 3: # HH:MM:SS
hours, minutes, seconds = map(int, parts)
elif len(parts) == 2: # MM:SS
hours = 0
minutes, seconds = map(int, parts)
else: # MM
return int(parts[0])

return days * 24 * 60 + hours * 60 + minutes
except (ValueError, IndexError):
return None


def _get_slurm_default_requirements(requirements: dict[str, tp.Any]):
"""
Extract SLURM partition info including CPU and memory requirements.
Expand All @@ -70,10 +100,10 @@ def _get_slurm_default_requirements(requirements: dict[str, tp.Any]):
requirements["partition"] = partition

# Get detailed info for the partition
# %P=partition, %c=CPUs, %m=memory(MB), %l=time_limit, %N=nodes
# %G=GRES(GPUs), %c=CPUs, %m=memory(MB), %l=time_limit
sinfo_output = (
subprocess.check_output(
["sinfo", "-h", "-p", partition, "-o", "%G %c %m"],
["sinfo", "-h", "-p", partition, "-o", "%G %c %m %l"],
stderr=subprocess.PIPE,
)
.decode()
Expand All @@ -85,11 +115,12 @@ def _get_slurm_default_requirements(requirements: dict[str, tp.Any]):
max_cpus = 0
max_memory = 0
max_gpus = 0
max_time_min = 0
gpu_type = None

for line in lines:
parts = line.split()
if len(parts) >= 3:
if len(parts) >= 4:
gpu_info = _parse_gpu_gres(parts[0])
if gpu_info:
if gpu_info["count"] > max_gpus:
Expand All @@ -100,11 +131,16 @@ def _get_slurm_default_requirements(requirements: dict[str, tp.Any]):
max_cpus = max(max_cpus, cpus)
memory = _parse_slurm_value(parts[2])
max_memory = max(max_memory, memory)
time_min = _parse_time_limit(parts[3])
if time_min is not None:
max_time_min = max(max_time_min, time_min)

requirements["cpus_per_task"] = max_cpus
requirements["mem_gb"] = max_memory // 1024
if max_gpus > 0:
requirements["gpus_per_node"] = max_gpus
if max_time_min > 0:
requirements["timeout_min"] = max_time_min

except subprocess.CalledProcessError as e:
print(f"Error running sinfo: {e}")
Expand Down Expand Up @@ -138,6 +174,58 @@ def _parse_gpu_gres(gres_str):
return None


def _apply_default_requirements(
    requirements: dict[str, tp.Any],
    executor: str,
) -> dict[str, tp.Any]:
    """
    Apply default Slurm requirements and auto-detect resources from partition.

    Args:
        requirements: User-provided requirements; user values always take
            precedence over detected defaults.
        executor: "slurm" or "local"

    Returns:
        Updated copy of ``requirements`` with defaults and auto-detected
        values filled in (the input dict is not mutated).
    """
    default_params: dict[str, tp.Any] = {
        "ntasks_per_node": 1,
    }

    partition = requirements.get("partition")
    if partition:
        default_params["partition"] = partition

    # Auto-detect resources from Slurm partition info
    if executor == "slurm":
        default_params = _get_slurm_default_requirements(default_params)
    else:
        # Local executor: detect resources from the current host.
        num_cpus = max((os.cpu_count() or 0), 1)
        mem_gb = psutil.virtual_memory().total // (1024**3)
        num_gpus = len(
            [s for s in os.environ.get("CUDA_VISIBLE_DEVICES", "").split(",") if s]
        )

        default_params["cpus_per_task"] = num_cpus
        default_params["mem_gb"] = mem_gb
        default_params["gpus_per_node"] = num_gpus
        default_params["timeout_min"] = 10080  # fallback for local executor

    # Merge defaults into requirements (user values take precedence)
    result = requirements.copy()
    result.update(
        {key: value for key, value in default_params.items() if key not in requirements}
    )
    if "mem_gb" not in requirements and "cpus_per_task" in requirements:
        # Scale detected memory proportionally to the requested CPU share.
        # Guard against missing/zero detected values (e.g. when the sinfo
        # query failed and left mem_gb/cpus_per_task unset or zero), which
        # would otherwise raise KeyError / ZeroDivisionError here.
        base_cpus = default_params.get("cpus_per_task")
        base_mem = default_params.get("mem_gb")
        if base_cpus and base_mem is not None:
            result["mem_gb"] = (
                base_mem * requirements["cpus_per_task"] // base_cpus
            )  # take a fraction based on cpu

    return result


class RayCluster:
"""
Manages the lifecycle of a Ray cluster on a Slurm-based system.
Expand Down Expand Up @@ -279,6 +367,7 @@ def start(
local: tp.Dict[str, tp.Union[str, int]] | None,
enable_grafana: bool = False,
force_new_head: bool = False,
use_array: bool = True,
):
"""
Starts a Ray cluster on Slurm.
Expand All @@ -296,6 +385,7 @@ def start(
enable_grafana (bool): Whether to start Prometheus and Grafana
for monitoring (default: True).
force_new_head (bool): force to remove head.json if haven't run 'matrix stop_cluster'.
use_array (bool): If True, use Slurm job arrays for workers (default: True).
"""
import submitit

Expand Down Expand Up @@ -325,29 +415,24 @@ def start(
if self._cluster_json.exists():
self._cluster_json.unlink()

if self._cluster_json.exists():
is_new_cluster = not self._cluster_json.exists()
if not is_new_cluster:
print(f"Adding workers to existing cluster:\n{self.cluster_info()}")
# TODO: check the cluster is alive
else:
# start the head node
head_params = requirements.copy()
if (
add_workers > 0
): # cpu host, overwrite as the param are meant for workers
head_params["gpus_per_node"] = 0
head_params["cpus_per_task"] = 20

head_params = _apply_default_requirements(head_params, executor)
print(f"Head Slurm parameters: {head_params}")

s_executor = submitit.AutoExecutor(
folder=str(self._log_dir),
cluster=executor,
)
head_default_params = {"timeout_min": 10080, "cpus_per_task": 20}
if add_workers == 0:
head_params = requirements
else:
head_params = {
k: v for k, v in requirements.items() if k in common_params
}
head_params.update(
{
key: value
for key, value in head_default_params.items()
if key not in head_params
}
)
s_executor.update_parameters(
name=f"ray_head_{self.cluster_id}",
**head_params,
Expand Down Expand Up @@ -395,51 +480,44 @@ def start(

# start the workers
if add_workers > 0:
worker_params = _apply_default_requirements(requirements, executor)
if not use_array:
worker_params["nodes"] = add_workers
num_jobs = 1
else:
num_jobs = add_workers
logical_resources = {
f"{key}-{value}": 1
for key, value in worker_params.items()
if key in _SLURM_KEY_ALIASES.values()
}
print(f"Worker Slurm parameters: {worker_params}")

s_executor = submitit.AutoExecutor(
folder=str(self._log_dir), cluster=executor
)
default_params: dict[str, tp.Any] = {
"ntasks_per_node": 1,
"timeout_min": 10080,
}
partition = requirements.get("partition")
if partition:
default_params["partition"] = partition
if executor == "slurm":
default_params = _get_slurm_default_requirements(default_params)
requirements.update(
{
key: value
for key, value in default_params.items()
if key not in requirements
}
)
print(requirements)
s_executor.update_parameters(
name=f"ray_worker_{self.cluster_id}",
**requirements,
**worker_params,
)

cluster_info = self.cluster_info()
assert cluster_info is not None

# submit job
jobs = []
with (
s_executor.batch()
): # TODO set slurm array max parallelism here, because we really want all jobs to be scheduled at the same time
logical_resources = {
f"{key}-{value}": 1
for key, value in requirements.items()
if key in _SLURM_KEY_ALIASES.values()
}

for i in range(add_workers):
for i in range(num_jobs):
jobs.append(
s_executor.submit(
RayWorkerJob(
cluster_info,
worker_wait_timeout_seconds,
start_wait_time_seconds,
logical_resources,
worker_params,
)
)
)
Expand Down
Loading