Commit 6158641

Merge pull request #739 from NVIDIA/am/dynamo
Expand K8s Dynamo support to disagg and multinode
2 parents 6ef083c + 3fba04f commit 6158641

File tree: 9 files changed (+276, -115 lines)


conf/experimental/ai_dynamo/test/vllm.toml

Lines changed: 2 additions & 12 deletions
@@ -29,20 +29,10 @@ docker_image_url = "nvcr.io/nvidia/ai-dynamo/vllm-runtime:0.6.1.post1"
 decode-cmd = 'python3 -m dynamo.vllm'
 
 [cmd_args.dynamo.prefill_worker]
-num-nodes = 2
-gpu-memory-utilization = 0.95
-tensor-parallel-size = 8
 pipeline-parallel-size = 1
-data-parallel-size = 1
-extra-args = "--no-enable-expert-parallel"
 
 [cmd_args.dynamo.decode_worker]
-num-nodes = 2
-gpu-memory-utilization = 0.95
-tensor-parallel-size = 8
 pipeline-parallel-size = 1
-data-parallel-size = 1
-extra-args = "--no-enable-expert-parallel"
 
 [cmd_args.genai_perf]
 model = "Qwen/Qwen3-0.6B"
@@ -52,10 +42,10 @@ docker_image_url = "nvcr.io/nvidia/ai-dynamo/vllm-runtime:0.6.1.post1"
 output-tokens-mean = 500
 output-tokens-stddev = 0
 random-seed = 123
-request-count = 2
+request-count = 128
 synthetic-input-tokens-mean = 300
 synthetic-input-tokens-stddev = 0
-warmup-request-count = 1
+warmup-request-count = 10
 concurrency = 1
 extra-args = "--streaming -- -v --async"

conf/experimental/ai_dynamo/test_scenario/vllm_k8s.toml

Lines changed: 2 additions & 0 deletions
@@ -24,5 +24,7 @@ test_name = "vLLM-Qwen3-0.6B"
 [Tests.cmd_args.dynamo]
 [Tests.cmd_args.dynamo.prefill_worker]
 num-nodes = 1
+tensor-parallel-size = 8
 [Tests.cmd_args.dynamo.decode_worker]
 num-nodes = 1
+tensor-parallel-size = 8

conf/experimental/ai_dynamo/test_scenario/vllm_slurm.toml

Lines changed: 5 additions & 34 deletions
@@ -18,20 +18,13 @@ name = "dynamo-vllm-slurm"
 
 [[Tests]]
 id = "qwen3-0.6B"
-num_nodes = 3
+test_name = "vLLM-Qwen3-0.6B"
+num_nodes = 4
 time_limit = "00:20:00"
 
-name = "vllm"
-description = "vllm"
-test_template_name = "AIDynamo"
-
 [Tests.cmd_args]
-docker_image_url = "nvcr.io/nvidia/ai-dynamo/vllm-runtime:0.6.1.post1"
 
 [Tests.cmd_args.dynamo]
-backend = "vllm"
-model = "Qwen/Qwen3-0.6B"
-decode-cmd = 'python3 -m dynamo.vllm'
 decode-initialized-regex = 'VllmWorker.*has.been.initialized'
 etcd-cmd = "etcd --log-level debug"
 etcd-port = 2379
@@ -41,41 +34,19 @@ test_template_name = "AIDynamo"
 nats-port = 4222
 node-setup-cmd = "apt-get update -o APT::Sandbox::User=root && apt-get install -y curl libibverbs1 rdma-core ibverbs-utils libibumad3 libnuma1 librdmacm1 ibverbs-providers; /usr/local/ucx/bin/ucx_info -d |grep Transport | sort -u;"
 port = 8787
-prefill-cmd = 'python3 -m dynamo.vllm --is-prefill-worker'
 prefill-initialized-regex = 'VllmWorker.*has.been.initialized'
-workspace-path = "/workspace/"
 
 [Tests.cmd_args.dynamo.prefill_worker]
-data-parallel-size = 1
 gpu-memory-utilization = 0.90
 max_model_len = 19280
 num-nodes = 2
-pipeline-parallel-size = 1
-tensor-parallel-size = 2
-extra-args = "--no-enable-expert-parallel"
+tensor-parallel-size = 4
 
 [Tests.cmd_args.dynamo.decode_worker]
-data-parallel-size = 1
 gpu-memory-utilization = 0.90
 max_model_len = 19280
-num-nodes = 1
-pipeline-parallel-size = 1
-tensor-parallel-size = 2
-extra-args = "--no-enable-expert-parallel"
-
-[Tests.cmd_args.genai_perf]
-concurrency = 8
-endpoint = "v1/chat/completions"
-endpoint-type = "chat"
-extra-inputs = 'min_tokens:10'
-output-tokens-mean = 150
-output-tokens-stddev = 0
-random-seed = 123
-request-count = 128
-synthetic-input-tokens-mean = 3000
-synthetic-input-tokens-stddev = 0
-warmup-request-count = 8
-extra-args = "--streaming -- -v --async"
+num-nodes = 2
+tensor-parallel-size = 4
 
 [Tests.extra_env_vars]
 UCX_LOG_LEVEL = "warn"

src/cloudai/workloads/ai_dynamo/ai_dynamo.py

Lines changed: 38 additions & 7 deletions
@@ -16,7 +16,7 @@
 
 import logging
 from pathlib import Path
-from typing import Optional, Union
+from typing import Optional
 
 from pydantic import AliasChoices, BaseModel, ConfigDict, Field
 
@@ -31,10 +31,36 @@ class WorkerBaseArgs(BaseModel):
 
     model_config = ConfigDict(extra="allow", populate_by_name=True)
 
-    num_nodes: Union[int, list[int]] = Field(
-        serialization_alias="num-nodes", validation_alias=AliasChoices("num-nodes", "num_nodes")
+    num_nodes: int | list[int] = Field(
+        default=1, serialization_alias="num-nodes", validation_alias=AliasChoices("num-nodes", "num_nodes")
+    )
+    nodes: str | None = Field(default=None)
+
+    data_parallel_size: int | list[int] | None = Field(
+        default=None,
+        serialization_alias="data-parallel-size",
+        validation_alias=AliasChoices("data-parallel-size", "data_parallel_size"),
+    )
+    gpu_memory_utilization: float | list[float] | None = Field(
+        default=None,
+        serialization_alias="gpu-memory-utilization",
+        validation_alias=AliasChoices("gpu-memory-utilization", "gpu_memory_utilization"),
+    )
+    pipeline_parallel_size: int | list[int] | None = Field(
+        default=None,
+        serialization_alias="pipeline-parallel-size",
+        validation_alias=AliasChoices("pipeline-parallel-size", "pipeline_parallel_size"),
+    )
+    tensor_parallel_size: int | list[int] | None = Field(
+        default=None,
+        serialization_alias="tensor-parallel-size",
+        validation_alias=AliasChoices("tensor-parallel-size", "tensor_parallel_size"),
+    )
+    extra_args: str | list[str] | None = Field(
+        default=None,
+        serialization_alias="extra-args",
+        validation_alias=AliasChoices("extra-args", "extra_args"),
     )
-    nodes: Optional[str] = Field(default=None)
 
 
 class PrefillWorkerArgs(WorkerBaseArgs):
@@ -57,17 +83,22 @@ class AIDynamoArgs(BaseModel):
     model: str = "Qwen/Qwen3-0.6B"
     backend: str = "vllm"
     workspace_path: str = Field(
+        default="/workspace",
         serialization_alias="workspace-path",
         validation_alias=AliasChoices("workspace-path", "workspace_path"),
-        default="/workspace",
     )
-    decode_worker: DecodeWorkerArgs
+    decode_worker: DecodeWorkerArgs = Field(default_factory=DecodeWorkerArgs)
    decode_cmd: str = Field(
+        default="python3 -m dynamo.vllm",
         serialization_alias="decode-cmd",
         validation_alias=AliasChoices("decode-cmd", "decode_cmd"),
+    )
+    prefill_worker: PrefillWorkerArgs | None = None
+    prefill_cmd: str = Field(
         default="python3 -m dynamo.vllm",
+        serialization_alias="prefill-cmd",
+        validation_alias=AliasChoices("prefill-cmd", "prefill_cmd"),
     )
-    prefill_worker: PrefillWorkerArgs
 
 
 class GenAIPerfArgs(BaseModel):

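Note on the schema change above: every tuning knob on WorkerBaseArgs is now optional, num-nodes defaults to 1, and prefill_worker itself may be omitted, which is what distinguishes an aggregated run from a disaggregated one. A minimal sketch of the resulting behavior, assuming the module imports as cloudai.workloads.ai_dynamo.ai_dynamo and that the worker subclasses add no required fields of their own:

    from cloudai.workloads.ai_dynamo.ai_dynamo import DecodeWorkerArgs, PrefillWorkerArgs

    # DecodeWorkerArgs() must construct from defaults alone, since it is used as a default_factory.
    decode = DecodeWorkerArgs()
    print(decode.num_nodes, decode.tensor_parallel_size)  # 1 None

    # Hyphenated TOML keys and snake_case names are both accepted via AliasChoices.
    prefill = PrefillWorkerArgs.model_validate({"num-nodes": 2, "tensor-parallel-size": 4})
    print(prefill.model_dump(by_alias=True, exclude_none=True))
    # {'num-nodes': 2, 'tensor-parallel-size': 4}
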
src/cloudai/workloads/ai_dynamo/kubernetes_json_gen_strategy.py

Lines changed: 82 additions & 17 deletions
@@ -19,15 +19,19 @@
 from pathlib import Path
 from typing import Any, Dict, cast
 
+import yaml
+
 from cloudai.core import JsonGenStrategy
 from cloudai.systems.kubernetes import KubernetesSystem
 
-from .ai_dynamo import AIDynamoTestDefinition
+from .ai_dynamo import AIDynamoTestDefinition, WorkerBaseArgs
 
 
 class AIDynamoKubernetesJsonGenStrategy(JsonGenStrategy):
     """JSON generation strategy for AI Dynamo on Kubernetes systems."""
 
+    DEPLOYMENT_FILE_NAME = "deployment.yaml"
+
     def _install_python_packages(self, repo_root: Path, venv_pip: Path) -> None:
         installs = [
             ("perf_analyzer", repo_root),
@@ -68,30 +72,50 @@ def gen_frontend_dict(self) -> dict[str, Any]:
         }
 
     def gen_decode_dict(self) -> dict[str, Any]:
-        system = cast(KubernetesSystem, self.system)
         tdef = cast(AIDynamoTestDefinition, self.test_run.test)
-        return {
-            "dynamoNamespace": system.default_namespace,
-            "componentType": "worker",
-            "replicas": 1,
-            "resources": {"limits": {"gpu": f"{system.gpus_per_node}"}},
-            "extraPodSpec": {
-                "mainContainer": {
-                    "image": tdef.cmd_args.docker_image_url,
-                    "workingDir": tdef.cmd_args.dynamo.workspace_path,
-                    "command": tdef.cmd_args.dynamo.decode_cmd.split(),
-                    "args": ["--model", tdef.cmd_args.dynamo.model],
-                }
-            },
-        }
+
+        decode_cfg = self._get_base_service_dict()
+        decode_cfg["extraPodSpec"]["mainContainer"]["command"] = tdef.cmd_args.dynamo.decode_cmd.split()
+
+        args = ["--model", tdef.cmd_args.dynamo.model]
+        if tdef.cmd_args.dynamo.prefill_worker:
+            decode_cfg["subComponentType"] = "decode-worker"
+            args.append("--is-decode-worker")
+        args.extend(self._args_from_worker_config(tdef.cmd_args.dynamo.decode_worker))
+
+        decode_cfg["extraPodSpec"]["mainContainer"]["args"] = args
+
+        self._set_multinode_if_needed(decode_cfg, tdef.cmd_args.dynamo.decode_worker)
+
+        return decode_cfg
+
+    def gen_prefill_dict(self) -> dict[str, Any]:
+        tdef = cast(AIDynamoTestDefinition, self.test_run.test)
+        if not tdef.cmd_args.dynamo.prefill_worker:
+            raise ValueError("Prefill worker configuration is not defined in the test definition.")
+
+        prefill_cfg = self._get_base_service_dict()
+        prefill_cfg["subComponentType"] = "prefill"
+        prefill_cfg["extraPodSpec"]["mainContainer"]["command"] = tdef.cmd_args.dynamo.prefill_cmd.split()
+
+        prefill_cfg["extraPodSpec"]["mainContainer"]["args"] = [
+            "--model",
+            tdef.cmd_args.dynamo.model,
+            "--is-prefill-worker",
+            *self._args_from_worker_config(tdef.cmd_args.dynamo.prefill_worker),
+        ]
+
+        self._set_multinode_if_needed(prefill_cfg, tdef.cmd_args.dynamo.prefill_worker)
+
+        return prefill_cfg
 
     def gen_json(self) -> Dict[Any, Any]:
         td = cast(AIDynamoTestDefinition, self.test_run.test)
         k8s_system = cast(KubernetesSystem, self.system)
 
         self._setup_genai(td)
 
-        return {
+        deployment = {
             "apiVersion": "nvidia.com/v1alpha1",
             "kind": "DynamoGraphDeployment",
             "metadata": {"name": k8s_system.default_namespace},
@@ -102,3 +126,44 @@
                 },
             },
         }
+        if td.cmd_args.dynamo.prefill_worker:
+            deployment["spec"]["services"]["VllmPrefillWorker"] = self.gen_prefill_dict()
+
+        with (self.test_run.output_path / self.DEPLOYMENT_FILE_NAME).open("w") as f:
+            yaml.safe_dump(deployment, f)
+
+        return deployment
+
+    def _get_base_service_dict(self) -> dict[str, Any]:
+        system = cast(KubernetesSystem, self.system)
+        tdef = cast(AIDynamoTestDefinition, self.test_run.test)
+        return {
+            "dynamoNamespace": system.default_namespace,
+            "componentType": "worker",
+            "replicas": 1,
+            "resources": {"limits": {"gpu": f"{system.gpus_per_node}"}},
+            "extraPodSpec": {
+                "mainContainer": {
+                    "image": tdef.cmd_args.docker_image_url,
+                    "workingDir": tdef.cmd_args.dynamo.workspace_path,
+                }
+            },
+        }
+
+    def _to_dynamo_arg(self, arg_name: str) -> str:
+        return "--" + arg_name.replace("_", "-")
+
+    def _dynamo_args_dict(self, model: WorkerBaseArgs) -> dict:
+        return model.model_dump(exclude={"num_nodes", "extra_args", "nodes"}, exclude_none=True)
+
+    def _args_from_worker_config(self, worker: WorkerBaseArgs) -> list[str]:
+        args = []
+        for arg, value in self._dynamo_args_dict(worker).items():
+            args.extend([self._to_dynamo_arg(arg), str(value)])
+        if worker.extra_args:
+            args.append(f"{worker.extra_args}")
+        return args
+
+    def _set_multinode_if_needed(self, cfg: dict[str, Any], worker: WorkerBaseArgs) -> None:
+        if cast(int, worker.num_nodes) > 1:
+            cfg["multinode"] = {"nodeCount": worker.num_nodes}

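The new helpers above turn a worker's pydantic fields into vLLM CLI flags (underscores become dashes; num-nodes, nodes and extra-args are excluded from the dump) and add a multinode block only when num-nodes exceeds 1. A standalone illustration of that mapping, not the CloudAI class itself, using the tensor-parallel-size = 8 value from vllm_k8s.toml:

    from typing import Any

    def worker_cli_args(dumped: dict[str, Any], extra_args: str | None = None) -> list[str]:
        """Mirrors _args_from_worker_config: snake_case field names become --dashed flags."""
        args: list[str] = []
        for name, value in dumped.items():
            args.extend(["--" + name.replace("_", "-"), str(value)])
        if extra_args:
            args.append(extra_args)
        return args

    print(worker_cli_args({"tensor_parallel_size": 8}))
    # ['--tensor-parallel-size', '8']

    # num-nodes > 1 is expressed through the CRD's multinode block rather than a flag.
    num_nodes = 2
    print({"multinode": {"nodeCount": num_nodes}} if num_nodes > 1 else {})
    # {'multinode': {'nodeCount': 2}}
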
src/cloudai/workloads/ai_dynamo/report_generation_strategy.py

Lines changed: 12 additions & 5 deletions
@@ -20,11 +20,15 @@
 import logging
 import shutil
 from pathlib import Path
-from typing import ClassVar
+from typing import TYPE_CHECKING, ClassVar, cast
 
 from cloudai.core import METRIC_ERROR, ReportGenerationStrategy
+from cloudai.systems.kubernetes.kubernetes_system import KubernetesSystem
 from cloudai.systems.slurm.slurm_system import SlurmSystem
 
+if TYPE_CHECKING:
+    from .ai_dynamo import AIDynamoTestDefinition
+
 CSV_FILES_PATTERN = "profile*_genai_perf.csv"
 JSON_FILES_PATTERN = "profile*_genai_perf.json"
 
@@ -118,16 +122,19 @@ def get_metric(self, metric: str) -> float:
 
     def _calculate_total_gpus(self) -> int | None:
         gpus_per_node = None
-        if isinstance(self.system, SlurmSystem):
+        if isinstance(self.system, (SlurmSystem, KubernetesSystem)):
             gpus_per_node = self.system.gpus_per_node
 
         if gpus_per_node is None:
             return None
 
-        num_frontend_nodes = 1
-        num_prefill_nodes = self.test_run.test.cmd_args.dynamo.prefill_worker.num_nodes
-        num_decode_nodes = self.test_run.test.cmd_args.dynamo.decode_worker.num_nodes
+        tdef = cast("AIDynamoTestDefinition", self.test_run.test)
 
+        num_frontend_nodes = 1
+        num_prefill_nodes = (
+            cast(int, tdef.cmd_args.dynamo.prefill_worker.num_nodes) if tdef.cmd_args.dynamo.prefill_worker else 0
+        )
+        num_decode_nodes = cast(int, tdef.cmd_args.dynamo.decode_worker.num_nodes)
         return (num_frontend_nodes + num_prefill_nodes + num_decode_nodes) * gpus_per_node
 
     def _read_csv_sections(self, source_csv: Path) -> list[list[list[str]]]:

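Worked example of the updated _calculate_total_gpus, using the node counts from vllm_slurm.toml above and an assumed 8 GPUs per node (the real value comes from the system configuration):

    gpus_per_node = 8        # assumption for illustration
    num_frontend_nodes = 1
    num_prefill_nodes = 2    # becomes 0 when prefill_worker is omitted (aggregated serving)
    num_decode_nodes = 2

    print((num_frontend_nodes + num_prefill_nodes + num_decode_nodes) * gpus_per_node)  # 40
    print((num_frontend_nodes + 0 + num_decode_nodes) * gpus_per_node)                  # 24 without prefill
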
src/cloudai/workloads/ai_dynamo/slurm_command_gen_strategy.py

Lines changed: 14 additions & 4 deletions
@@ -77,7 +77,14 @@ def _gen_script_args(self, td: AIDynamoTestDefinition) -> List[str]:
             self._get_toml_args(
                 td.cmd_args.dynamo,
                 "--dynamo-",
-                exclude=["prefill_worker", "decode_worker", "genai_perf", "workspace_path", "decode_cmd"],
+                exclude=[
+                    "prefill_worker",
+                    "decode_worker",
+                    "genai_perf",
+                    "workspace_path",
+                    "decode_cmd",
+                    "prefill_cmd",
+                ],
             )
         )
 
@@ -106,7 +113,8 @@ def _gen_script_args(self, td: AIDynamoTestDefinition) -> List[str]:
             ]
         )
 
-        args.extend(self._get_toml_args(td.cmd_args.dynamo.prefill_worker, "--prefill-"))
+        if td.cmd_args.dynamo.prefill_worker:
+            args.extend(self._get_toml_args(td.cmd_args.dynamo.prefill_worker, "--prefill-"))
         args.extend(self._get_toml_args(td.cmd_args.dynamo.decode_worker, "--decode-"))
         args.extend(self._get_toml_args(td.cmd_args.genai_perf, "--genai-perf-"))
 
@@ -194,9 +202,11 @@ def get_cached_nodes_spec(self) -> tuple[int, list[str]]:
             return self._node_spec_cache[cache_key]
 
         td = cast(AIDynamoTestDefinition, self.test_run.test)
-        prefill_n = td.cmd_args.dynamo.prefill_worker.num_nodes
+        prefill_n, prefill_nodes = 0, ""
+        if td.cmd_args.dynamo.prefill_worker:
+            prefill_n = cast(int, td.cmd_args.dynamo.prefill_worker.num_nodes)
+            prefill_nodes = td.cmd_args.dynamo.prefill_worker.nodes
         decode_n = td.cmd_args.dynamo.decode_worker.num_nodes
-        prefill_nodes = td.cmd_args.dynamo.prefill_worker.nodes
         decode_nodes = td.cmd_args.dynamo.decode_worker.nodes
 
         assert isinstance(prefill_n, int), "prefill_worker.num_nodes must be an integer"

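Quick check of the optional-prefill handling in get_cached_nodes_spec, as a standalone sketch rather than the CloudAI class, with the node counts from vllm_slurm.toml:

    def worker_node_count(prefill_worker: dict | None, decode_n: int) -> int:
        prefill_n = 0
        if prefill_worker:                      # prefill_n stays 0 for aggregated runs
            prefill_n = prefill_worker["num_nodes"]
        return prefill_n + decode_n

    print(worker_node_count({"num_nodes": 2}, 2))  # 4 worker nodes; the scenario sets num_nodes = 4
    print(worker_node_count(None, 2))              # 2 for a run with no prefill worker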