Skip to content

Commit 319c68a

Browse files
authored
label workers by logical_resources to constrain deployment (#129)
1 parent c2a5eb5 commit 319c68a

File tree

4 files changed

+21
-4
lines changed

4 files changed

+21
-4
lines changed

matrix/app_server/llm/ray_serve_vllm.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -603,12 +603,22 @@ def _build_app(cli_args: Dict[str, Any], use_grpc) -> serve.Application:
603603
{"CPU": ray_resources.get("num_cpus", 1), accelerator: 1}
604604
) # for the vLLM actors
605605

606+
ray_resources.pop("num_cpus", None)
607+
ray_resources.pop("num_gpus", None)
608+
custom_resources = ray_resources.pop("resources", None)
609+
610+
# Add custom resources to placement group bundles if specified
611+
# This ensures the deployment is scheduled on nodes with the required resources
612+
if custom_resources:
613+
for bundle in pg_resources:
614+
bundle.update(custom_resources)
606615
# We use the "STRICT_PACK" strategy below to ensure all vLLM actors are placed on
607616
# the same Ray node.
608617
cls = VLLMDeployment if not use_grpc else GrpcDeployment
609618
return cls.options( # type: ignore[union-attr]
610619
placement_group_bundles=pg_resources,
611620
placement_group_strategy="STRICT_PACK" if pp == 1 else "PACK",
621+
**ray_resources,
612622
).bind(
613623
engine_args,
614624
parsed_args.response_role,

matrix/cli.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ def start_cluster(
8383
force_new_head: bool = False,
8484
use_array: bool = True,
8585
prometheus_scrape_interval: int = 10,
86+
logical_resources: tp.Dict[str, int] | None = None,
8687
) -> tp.Dict[str, tp.Any]:
8788
"""
8889
Starts the Ray cluster with the specified number of workers and additional configuration.
@@ -96,6 +97,8 @@ def start_cluster(
9697
enable_grafana (bool, optional): If True, enable prometheus and grafana dashboard.
9798
force_new_head (bool): force to remove head.json if haven't run 'matrix stop_cluster'.
9899
use_array (bool): If True, use Slurm job arrays for workers (default: True).
100+
logical_resources (dict, optional): Custom logical resources to add to workers.
101+
Keys are resource names, values are counts. Defaults to empty.
99102
100103
Returns:
101104
None
@@ -108,6 +111,7 @@ def start_cluster(
108111
force_new_head=force_new_head,
109112
use_array=use_array,
110113
prometheus_scrape_interval=prometheus_scrape_interval,
114+
logical_resources=logical_resources or {},
111115
)
112116
return convert_to_json_compatible(status)
113117

matrix/cluster/ray_cluster.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,7 @@ def start(
370370
force_new_head: bool = False,
371371
use_array: bool = True,
372372
prometheus_scrape_interval: int = 10,
373+
logical_resources: tp.Dict[str, int] | None = None,
373374
):
374375
"""
375376
Starts a Ray cluster on Slurm.
@@ -440,7 +441,7 @@ def start(
440441
cluster=executor,
441442
)
442443
s_executor.update_parameters(
443-
name=f"ray_head_{self.cluster_id}",
444+
name=f"matrix_head_{self.cluster_id}",
444445
**head_params,
445446
)
446447
head_job = s_executor.submit(
@@ -492,18 +493,19 @@ def start(
492493
num_jobs = 1
493494
else:
494495
num_jobs = add_workers
495-
logical_resources = {
496+
worker_logical_resources = {
496497
f"{key}-{value}": 1
497498
for key, value in worker_params.items()
498499
if key in _SLURM_KEY_ALIASES.values()
499500
}
501+
worker_logical_resources.update(logical_resources or {})
500502
print(f"Worker Slurm parameters: {worker_params}")
501503

502504
s_executor = submitit.AutoExecutor(
503505
folder=str(self._log_dir), cluster=executor
504506
)
505507
s_executor.update_parameters(
506-
name=f"ray_worker_{self.cluster_id}",
508+
name=f"matrix_worker_{self.cluster_id}",
507509
**worker_params,
508510
)
509511

@@ -522,7 +524,7 @@ def start(
522524
cluster_info,
523525
worker_wait_timeout_seconds,
524526
start_wait_time_seconds,
525-
logical_resources,
527+
worker_logical_resources,
526528
worker_params,
527529
)
528530
)

matrix/cluster/ray_worker_job.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ def _start_ray_worker(
8585
worker_env (Dict[str, str]): Worker environment variables
8686
num_cpus (int): Number of CPUs
8787
num_gpus (int): Number of GPUs
88+
logical_resources (str): JSON string of logical resources
8889
"""
8990
subprocess.run(
9091
[

0 commit comments

Comments
 (0)