Skip to content

Commit cd7ba84

Browse files
alec-flowersclaude
andcommitted
feat: add mocker backend for smoke-testing the full srt-slurm pipeline
Adds a new `mocker` backend type that uses `dynamo.mocker` — a scheduler simulator that validates model paths, loads tokenizer config, and registers with etcd/NATS discovery without loading model weights. This enables fast end-to-end validation of SLURM jobs, container mounts, discovery, frontend routing, and benchmark clients. Usage: set `backend.type: mocker` in any recipe to swap in the mock server. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 835ddb6 commit cd7ba84

7 files changed

Lines changed: 786 additions & 3 deletions

File tree

recipes/mocker/agg.yaml

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
# Mocker smoke test - Aggregated mode
2+
# Validates full srt-slurm pipeline (SLURM, container, mounts, tokenizer,
3+
# discovery, frontend, benchmark) without loading model weights.
4+
#
5+
# Usage:
6+
# srtctl apply -f recipes/mocker/agg.yaml
7+
# srtctl dry-run -f recipes/mocker/agg.yaml
8+
9+
name: "smoke-test-agg"
10+
11+
slurm:
12+
time_limit: "00:15:00"
13+
14+
model:
15+
path: "hf:Qwen/Qwen3-0.6B"
16+
container: "dynamo-sglang"
17+
precision: "fp16"
18+
19+
resources:
20+
gpu_type: "gb200"
21+
gpus_per_node: 4
22+
agg_nodes: 1
23+
agg_workers: 1
24+
25+
frontend:
26+
type: dynamo
27+
enable_multiple_frontends: false
28+
29+
backend:
30+
type: mocker
31+
speedup_ratio: 100
32+
engine_type: vllm
33+
34+
benchmark:
35+
type: "sa-bench"
36+
isl: 128
37+
osl: 128
38+
concurrencies: "4"

recipes/mocker/disagg.yaml

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
# Mocker smoke test - Disaggregated mode
2+
# Validates prefill/decode disaggregation pipeline with bootstrap rendezvous.
3+
#
4+
# Usage:
5+
# srtctl apply -f recipes/mocker/disagg.yaml
6+
# srtctl dry-run -f recipes/mocker/disagg.yaml
7+
8+
name: "smoke-test-disagg"
9+
10+
slurm:
11+
time_limit: "00:15:00"
12+
13+
model:
14+
path: "hf:Qwen/Qwen3-0.6B"
15+
container: "dynamo-sglang"
16+
precision: "fp16"
17+
18+
resources:
19+
gpu_type: "gb200"
20+
gpus_per_node: 4
21+
prefill_nodes: 1
22+
decode_nodes: 0
23+
prefill_workers: 1
24+
decode_workers: 1
25+
gpus_per_prefill: 1
26+
gpus_per_decode: 1
27+
28+
frontend:
29+
type: dynamo
30+
enable_multiple_frontends: false
31+
32+
backend:
33+
type: mocker
34+
speedup_ratio: 100
35+
engine_type: vllm
36+
37+
benchmark:
38+
type: "sa-bench"
39+
isl: 128
40+
osl: 128
41+
concurrencies: "4"

src/srtctl/backends/__init__.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,13 @@
1010
"""
1111

1212
from .base import BackendProtocol, BackendType, SrunConfig
13+
from .mocker import MockerProtocol, MockerServerConfig
1314
from .sglang import SGLangProtocol, SGLangServerConfig
1415
from .trtllm import TRTLLMProtocol, TRTLLMServerConfig
1516
from .vllm import VLLMProtocol, VLLMServerConfig
1617

1718
# Union type for all backend configs
18-
BackendConfig = SGLangProtocol | TRTLLMProtocol | VLLMProtocol
19+
BackendConfig = SGLangProtocol | TRTLLMProtocol | VLLMProtocol | MockerProtocol
1920

2021
__all__ = [
2122
# Base types
@@ -32,4 +33,7 @@
3233
# vLLM
3334
"VLLMProtocol",
3435
"VLLMServerConfig",
36+
# Mocker
37+
"MockerProtocol",
38+
"MockerServerConfig",
3539
]

src/srtctl/backends/base.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ class BackendType(str, Enum):
2222

2323
SGLANG = "sglang"
2424
TRTLLM = "trtllm"
25+
VLLM = "vllm"
26+
MOCKER = "mocker"
2527

2628

2729
@dataclass

src/srtctl/backends/mocker.py

Lines changed: 286 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,286 @@
1+
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2+
# SPDX-License-Identifier: Apache-2.0
3+
4+
"""
5+
Dynamo Mocker backend configuration.
6+
7+
Implements BackendProtocol for the dynamo.mocker scheduler simulator.
8+
Used for smoke-testing the full srt-slurm pipeline (SLURM, mounts,
9+
tokenizer, discovery, frontend, benchmark) without loading model weights.
10+
11+
The mocker validates model paths, reads tokenizer config, registers with
12+
etcd/NATS discovery, simulates scheduling and KV cache management, and
13+
generates random tokens at configurable simulated latency.
14+
"""
15+
16+
import builtins
17+
from collections.abc import Sequence
18+
from dataclasses import field
19+
from pathlib import Path
20+
from typing import (
21+
TYPE_CHECKING,
22+
Any,
23+
ClassVar,
24+
Literal,
25+
)
26+
27+
from marshmallow import Schema
28+
from marshmallow_dataclass import dataclass
29+
30+
if TYPE_CHECKING:
31+
from srtctl.backends.base import SrunConfig
32+
from srtctl.core.runtime import RuntimeContext
33+
from srtctl.core.topology import Endpoint, Process
34+
35+
# Type alias for worker modes
36+
WorkerMode = Literal["prefill", "decode", "agg"]
37+
38+
39+
@dataclass(frozen=True)
class MockerServerConfig:
    """Mocker CLI configuration per mode (prefill/decode/aggregated).

    Each mode can have its own configuration dict that gets converted
    to CLI flags when starting the mocker. Use for per-mode overrides
    of mocker-specific parameters.
    """

    # Per-mode override dicts; keys are turned into kebab-case CLI flags
    # when the worker command is built. None means "no overrides for
    # this mode".
    prefill: dict[str, Any] | None = None
    decode: dict[str, Any] | None = None
    aggregated: dict[str, Any] | None = None

    # NOTE(review): presumably replaced by a generated schema via the
    # marshmallow_dataclass decorator; declared so the attribute is
    # visible to type checkers — confirm against the other backends.
    Schema: ClassVar[type[Schema]] = Schema
53+
54+
55+
@dataclass(frozen=True)
class MockerProtocol:
    """Dynamo Mocker protocol - implements BackendProtocol.

    This frozen dataclass both holds configuration AND implements the
    BackendProtocol methods for process allocation and launching.

    The mocker is a drop-in replacement for real inference backends
    (sglang, vllm, trtllm) that simulates scheduling without loading
    model weights. It validates model paths, loads tokenizer config,
    and registers with etcd/NATS discovery identically to real workers.

    Example YAML:
        backend:
          type: mocker
          speedup_ratio: 100
          engine_type: vllm

    Or with per-mode overrides:
        backend:
          type: mocker
          speedup_ratio: 100
          engine_type: sglang
          mocker_config:
            prefill:
              max-num-seqs: 512
            decode:
              max-num-seqs: 128
    """

    # Discriminator used by the BackendConfig union to select this backend.
    type: Literal["mocker"] = "mocker"

    # Simulation parameters (forwarded as CLI flags to dynamo.mocker).
    engine_type: str = "vllm"
    speedup_ratio: float = 100.0
    decode_speedup_ratio: float = 1.0
    num_gpu_blocks_override: int = 16384
    max_num_seqs: int = 256
    max_num_batched_tokens: int = 8192
    block_size: int | None = None
    data_parallel_size: int = 1
    num_workers: int = 1
    startup_time: float | None = None
    kv_transfer_bandwidth: float | None = None
    kv_cache_dtype: str | None = None
    enable_prefix_caching: bool = True
    enable_chunked_prefill: bool = True
    preemption_mode: str | None = None

    # Environment variables per mode (merged into the worker's env).
    prefill_environment: dict[str, str] = field(default_factory=dict)
    decode_environment: dict[str, str] = field(default_factory=dict)
    aggregated_environment: dict[str, str] = field(default_factory=dict)

    # Per-mode CLI overrides (appended after the core flags, so they can
    # add — but not replace — earlier arguments).
    mocker_config: MockerServerConfig | None = None

    # builtins.type is used because the dataclass field `type` above
    # shadows the builtin name in this class body.
    Schema: ClassVar[builtins.type[Schema]] = Schema

    # =========================================================================
    # BackendProtocol Implementation
    # =========================================================================

    def get_srun_config(self) -> "SrunConfig":
        """Mocker uses per-process launching (one srun per node)."""
        # Local import to avoid importing srtctl internals at module load.
        from srtctl.backends.base import SrunConfig

        return SrunConfig(mpi=None, oversubscribe=False, launch_per_endpoint=False)

    def get_config_for_mode(self, mode: WorkerMode) -> dict[str, Any]:
        """Get merged config dict for a worker mode.

        Returns a copy, so callers may mutate the result freely.
        """
        if not self.mocker_config:
            return {}

        if mode == "prefill":
            return dict(self.mocker_config.prefill or {})
        elif mode == "decode":
            return dict(self.mocker_config.decode or {})
        elif mode == "agg":
            return dict(self.mocker_config.aggregated or {})
        return {}

    def get_environment_for_mode(self, mode: WorkerMode) -> dict[str, str]:
        """Get environment variables for a worker mode (copied)."""
        if mode == "prefill":
            return dict(self.prefill_environment)
        elif mode == "decode":
            return dict(self.decode_environment)
        elif mode == "agg":
            return dict(self.aggregated_environment)
        return {}

    def get_process_environment(self, process: "Process") -> dict[str, str]:
        """Get process-specific environment variables.

        The mocker does not need per-process env vars (no NIXL ports, etc.).
        """
        return {}

    def get_served_model_name(self, default: str) -> str:
        """Get served model name — mocker uses default (model path basename)."""
        return default

    def allocate_endpoints(
        self,
        num_prefill: int,
        num_decode: int,
        num_agg: int,
        gpus_per_prefill: int,
        gpus_per_decode: int,
        gpus_per_agg: int,
        gpus_per_node: int,
        available_nodes: Sequence[str],
    ) -> list["Endpoint"]:
        """Allocate endpoints to nodes.

        Delegates to the shared topology helper; the mocker has no
        backend-specific placement constraints.
        """
        from srtctl.core.topology import allocate_endpoints

        return allocate_endpoints(
            num_prefill=num_prefill,
            num_decode=num_decode,
            num_agg=num_agg,
            gpus_per_prefill=gpus_per_prefill,
            gpus_per_decode=gpus_per_decode,
            gpus_per_agg=gpus_per_agg,
            gpus_per_node=gpus_per_node,
            available_nodes=available_nodes,
        )

    def endpoints_to_processes(
        self,
        endpoints: list["Endpoint"],
        base_sys_port: int = 8081,
    ) -> list["Process"]:
        """Convert endpoints to processes (delegates to topology helper)."""
        from srtctl.core.topology import endpoints_to_processes

        return endpoints_to_processes(endpoints, base_sys_port=base_sys_port)

    def build_worker_command(
        self,
        process: "Process",
        endpoint_processes: list["Process"],
        runtime: "RuntimeContext",
        frontend_type: str = "dynamo",
        nsys_prefix: list[str] | None = None,
        dump_config_path: Path | None = None,
    ) -> list[str]:
        """Build the command to start a mocker worker process.

        The resulting argv order is: optional nsys prefix, the
        ``python3 -m dynamo.mocker`` invocation with the model path,
        disaggregation/bootstrap flags, core simulation flags,
        optional non-default flags, then per-mode overrides.

        Args:
            process: The process to start
            endpoint_processes: All processes for this endpoint (for multi-node)
            runtime: Runtime context with paths and settings
            frontend_type: Frontend type (mocker always uses dynamo discovery)
            nsys_prefix: Optional nsys profiling command prefix
            dump_config_path: Unused (mocker has no config dump)
        """
        mode = process.endpoint_mode
        config = self.get_config_for_mode(mode)

        # Determine model path: HF model ID or container mount path
        # NOTE(review): assumes non-HF models are mounted at /model inside
        # the container — confirm against the container mount setup.
        model_arg = str(runtime.model_path) if runtime.is_hf_model else "/model"

        # Start with nsys prefix if provided
        cmd: list[str] = list(nsys_prefix) if nsys_prefix else []

        cmd.extend(
            [
                "python3",
                "-m",
                "dynamo.mocker",
                "--model-path",
                model_arg,
            ]
        )

        # Disaggregation mode for prefill/decode workers
        if mode != "agg":
            cmd.extend(["--disaggregation-mode", mode])

        # Bootstrap port for prefill workers (disaggregated serving rendezvous)
        # NOTE(review): flag is plural (--bootstrap-ports) but a single port
        # is passed — verify against the dynamo.mocker CLI.
        if mode == "prefill" and process.bootstrap_port is not None:
            cmd.extend(["--bootstrap-ports", str(process.bootstrap_port)])

        # Core simulation parameters (always emitted)
        cmd.extend(["--engine-type", self.engine_type])
        cmd.extend(["--speedup-ratio", str(self.speedup_ratio)])
        cmd.extend(["--data-parallel-size", str(self.data_parallel_size)])
        cmd.extend(["--num-gpu-blocks-override", str(self.num_gpu_blocks_override)])
        cmd.extend(["--max-num-seqs", str(self.max_num_seqs)])
        cmd.extend(["--max-num-batched-tokens", str(self.max_num_batched_tokens)])

        # Optional parameters (only emitted when non-default)
        if self.decode_speedup_ratio != 1.0:
            cmd.extend(["--decode-speedup-ratio", str(self.decode_speedup_ratio)])
        if self.block_size is not None:
            cmd.extend(["--block-size", str(self.block_size)])
        if self.num_workers > 1:
            cmd.extend(["--num-workers", str(self.num_workers)])
        if self.startup_time is not None:
            cmd.extend(["--startup-time", str(self.startup_time)])
        if self.kv_transfer_bandwidth is not None:
            cmd.extend(["--kv-transfer-bandwidth", str(self.kv_transfer_bandwidth)])
        if self.kv_cache_dtype is not None:
            cmd.extend(["--kv-cache-dtype", self.kv_cache_dtype])
        if not self.enable_prefix_caching:
            cmd.append("--no-enable-prefix-caching")
        if not self.enable_chunked_prefill:
            cmd.append("--no-enable-chunked-prefill")
        if self.preemption_mode is not None:
            cmd.extend(["--preemption-mode", self.preemption_mode])

        # Per-mode config overrides from mocker_config
        cmd.extend(_config_to_cli_args(config))

        return cmd
271+
272+
273+
def _config_to_cli_args(config: dict[str, Any]) -> list[str]:
274+
"""Convert config dict to CLI arguments."""
275+
args: list[str] = []
276+
for key, value in sorted(config.items()):
277+
flag_name = key.replace("_", "-")
278+
if isinstance(value, bool):
279+
if value:
280+
args.append(f"--{flag_name}")
281+
elif isinstance(value, list):
282+
args.append(f"--{flag_name}")
283+
args.extend(str(v) for v in value)
284+
elif value is not None:
285+
args.extend([f"--{flag_name}", str(value)])
286+
return args

0 commit comments

Comments
 (0)