Skip to content

Commit 426cd1e

Browse files
Merge pull request #18 from dreamyang-liu/feat/launcher
Add launcher to allow launching worker from router
2 parents 19ed1fb + b7052a2 commit 426cd1e

File tree

14 files changed

+900
-21
lines changed

14 files changed

+900
-21
lines changed

.pre-commit-config.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ repos:
1717
rev: 5.13.2
1818
hooks:
1919
- id: isort
20+
args: ["--profile", "black"]
2021

2122
- repo: https://github.com/astral-sh/ruff-pre-commit
2223
rev: v0.11.7

README.md

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,30 @@ curl -X POST http://localhost:30081/update_weights_from_disk \
158158
-d '{"model_path": "Qwen/Qwen-Image-2512"}'
159159
```
160160

161+
### Auto-launch workers via YAML config
162+
163+
Instead of starting workers manually, you can let the router spawn and manage
164+
them through a launcher backend.
165+
166+
**Local subprocess launcher** (`examples/local_launcher.yaml`):
167+
168+
```bash
169+
sglang-d-router --port 30081 --launcher-config examples/local_launcher.yaml
170+
```
171+
172+
```yaml
173+
launcher:
174+
backend: local
175+
model: Qwen/Qwen-Image
176+
num_workers: 2
177+
num_gpus_per_worker: 1
178+
worker_base_port: 10090
179+
wait_timeout: 600
180+
```
181+
182+
Fields not set in the YAML fall back to defaults defined in each backend's
183+
config dataclass (see `LocalLauncherConfig`).
184+
161185
## Acknowledgment
162186

163187
This project is derived from [radixark/miles#544](https://github.com/radixark/miles/pull/544). Thanks to the original authors.

development.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
pip install -e .
77
```
88

9-
Run tests:
9+
Run CPU only tests:
1010

1111
```bash
1212
pip install pytest

examples/local_launcher.yaml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
launcher:
2+
model: Qwen/Qwen-Image
3+
4+
num_workers: 8
5+
num_gpus_per_worker: 1
6+
worker_host: "127.0.0.1"
7+
worker_base_port: 10090
8+
9+
worker_extra_args: "--dit-cpu-offload false --text-encoder-cpu-offload false"
10+
11+
wait_timeout: 600

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ license = { text = "MIT" }
1212
dependencies = [
1313
"fastapi>=0.110",
1414
"httpx>=0.27",
15+
"omegaconf>=2.3",
1516
"uvicorn>=0.30",
1617
]
1718
classifiers = [

src/sglang_diffusion_routing/cli/main.py

Lines changed: 47 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,15 @@
66
import argparse
77
import asyncio
88
import sys
9+
import threading
910

1011
from sglang_diffusion_routing import DiffusionRouter
12+
from sglang_diffusion_routing.launcher import config as _lcfg
1113

1214

1315
def _run_router_server(
1416
args: argparse.Namespace,
15-
worker_urls: list[str] | None = None,
17+
router: DiffusionRouter,
1618
log_prefix: str = "[router]",
1719
) -> None:
1820
try:
@@ -22,10 +24,7 @@ def _run_router_server(
2224
"uvicorn is required to run router. Install with: pip install uvicorn"
2325
) from exc
2426

25-
worker_urls = list(
26-
worker_urls if worker_urls is not None else args.worker_urls or []
27-
)
28-
router = DiffusionRouter(args, verbose=args.verbose)
27+
worker_urls = list(args.worker_urls or [])
2928
refresh_tasks = []
3029
for url in worker_urls:
3130
normalized_url = router.normalize_worker_url(url)
@@ -97,13 +96,52 @@ def _add_router_args(parser: argparse.ArgumentParser) -> None:
9796
parser.add_argument(
9897
"--log-level", type=str, default="info", help="Uvicorn log level."
9998
)
99+
parser.add_argument(
100+
"--launcher-config",
101+
type=str,
102+
default=None,
103+
dest="launcher_config",
104+
help="YAML config for launching router managed workers (see examples/local_launcher.yaml).",
105+
)
100106

101107

102108
def _handle_router(args: argparse.Namespace) -> int:
103-
_run_router_server(
104-
args, worker_urls=list(args.worker_urls), log_prefix="[sglang-d-router]"
105-
)
106-
return 0
109+
log_prefix, backend, router = "[sglang-d-router]", None, None
110+
111+
try:
112+
router = DiffusionRouter(args, verbose=args.verbose)
113+
if args.launcher_config is not None:
114+
launcher_cfg = _lcfg.load_launcher_config(args.launcher_config)
115+
wait_timeout = launcher_cfg.wait_timeout
116+
backend = _lcfg.create_backend(launcher_cfg)
117+
backend.launch()
118+
threading.Thread(
119+
target=backend.wait_ready_and_register,
120+
kwargs=dict(
121+
register_func=router.register_worker,
122+
timeout=wait_timeout,
123+
log_prefix=log_prefix,
124+
),
125+
daemon=True,
126+
).start()
127+
128+
_run_router_server(args, router=router, log_prefix=log_prefix)
129+
return 0
130+
finally:
131+
# TODO (mengyang, shuwen, chenyang): refactor the exit logic of router and backend.
132+
if router is not None:
133+
try:
134+
asyncio.run(router.client.aclose())
135+
except Exception as exc:
136+
print(
137+
f"{log_prefix} warning: failed to close router client: {exc}",
138+
file=sys.stderr,
139+
flush=True,
140+
)
141+
if backend is not None:
142+
print(f"{log_prefix} shutting down managed workers...", flush=True)
143+
backend.shutdown()
144+
print(f"{log_prefix} all managed workers terminated.", flush=True)
107145

108146

109147
def build_parser() -> argparse.ArgumentParser:
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
"""Launcher backends for spinning up SGLang diffusion workers.
2+
3+
Right now only supports local backend, which launches workers as local subprocesses.
4+
We leave this module for future extensions on slurm or kubernetes.
5+
"""
6+
7+
from sglang_diffusion_routing.launcher.backend import (
8+
LaunchedWorker,
9+
LauncherBackend,
10+
WorkerLaunchResult,
11+
)
12+
from sglang_diffusion_routing.launcher.config import (
13+
create_backend,
14+
load_launcher_config,
15+
)
16+
from sglang_diffusion_routing.launcher.local import LocalLauncher, LocalLauncherConfig
17+
18+
__all__ = [
19+
"LaunchedWorker",
20+
"LauncherBackend",
21+
"LocalLauncher",
22+
"LocalLauncherConfig",
23+
"WorkerLaunchResult",
24+
"create_backend",
25+
"load_launcher_config",
26+
]
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
"""Abstract base class and shared data types for launcher backends."""
2+
3+
from __future__ import annotations
4+
5+
import subprocess
6+
from abc import ABC, abstractmethod
7+
from collections.abc import Callable
8+
from dataclasses import dataclass, field
9+
10+
11+
@dataclass
12+
class LaunchedWorker:
13+
"""A worker managed by a launcher backend."""
14+
15+
url: str
16+
process: subprocess.Popen
17+
18+
19+
@dataclass
20+
class WorkerLaunchResult:
21+
"""Aggregated result of launching worker subprocesses."""
22+
23+
workers: list[LaunchedWorker] = field(default_factory=list)
24+
all_processes: list[subprocess.Popen] = field(default_factory=list)
25+
26+
@property
27+
def urls(self) -> list[str]:
28+
return [w.url for w in self.workers]
29+
30+
31+
class LauncherBackend(ABC):
32+
"""Interface for launching and managing SGLang diffusion workers.
33+
34+
Each backend exposes the same lifecycle:
35+
launch → wait_ready_and_register → shutdown.
36+
"""
37+
38+
@abstractmethod
39+
def launch(self) -> list[str]:
40+
"""Launch workers and return their base URLs."""
41+
42+
@abstractmethod
43+
def wait_ready_and_register(
44+
self,
45+
register_func: Callable[[str], None],
46+
timeout: int,
47+
log_prefix: str = "[launcher]",
48+
) -> None:
49+
"""Wait for workers to become healthy and register each via register_func."""
50+
51+
@abstractmethod
52+
def shutdown(self) -> None:
53+
"""Clean up all managed workers."""
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
"""YAML configuration loading and backend factory."""
2+
3+
from __future__ import annotations
4+
5+
from pathlib import Path
6+
7+
import yaml
8+
from omegaconf import DictConfig, OmegaConf
9+
10+
from sglang_diffusion_routing.launcher.backend import LauncherBackend
11+
from sglang_diffusion_routing.launcher.local import LocalLauncher, LocalLauncherConfig
12+
13+
SCHEMA_REGISTRY: dict[str, type] = {
14+
"local": LocalLauncherConfig,
15+
}
16+
17+
BACKEND_REGISTRY: dict[str, type[LauncherBackend]] = {
18+
"local": LocalLauncher,
19+
}
20+
21+
22+
def load_launcher_config(config_path: str) -> DictConfig:
23+
"""Read a YAML config file and return a validated OmegaConf config.
24+
25+
1. Parse the YAML and extract the launcher mapping.
26+
2. Read the backend key to select the structured schema.
27+
3. Merge the YAML values onto the schema defaults.
28+
"""
29+
path = Path(config_path)
30+
if not path.is_file():
31+
raise FileNotFoundError(f"Config file not found: {config_path}")
32+
33+
with path.open() as f:
34+
raw = yaml.safe_load(f)
35+
36+
if not isinstance(raw, dict) or "launcher" not in raw:
37+
raise ValueError(
38+
f"Config file must contain a top-level 'launcher' key: {config_path}"
39+
)
40+
41+
launcher_raw = raw["launcher"]
42+
if not isinstance(launcher_raw, dict):
43+
raise ValueError("'launcher' must be a dictionary")
44+
45+
backend_name = launcher_raw.get("backend", "local")
46+
schema_cls = SCHEMA_REGISTRY.get(backend_name)
47+
if schema_cls is None:
48+
available = ", ".join(sorted(SCHEMA_REGISTRY))
49+
raise ValueError(
50+
f"Unknown launcher backend: {backend_name!r}. "
51+
f"Available backends: {available}"
52+
)
53+
54+
schema = OmegaConf.structured(schema_cls)
55+
yaml_cfg = OmegaConf.create(launcher_raw)
56+
merged = OmegaConf.merge(schema, yaml_cfg)
57+
return merged
58+
59+
60+
def create_backend(config: DictConfig) -> LauncherBackend:
61+
"""Instantiate a LauncherBackend from a validated config."""
62+
backend_name = config.backend
63+
cls = BACKEND_REGISTRY.get(backend_name)
64+
if cls is None:
65+
available = ", ".join(sorted(BACKEND_REGISTRY))
66+
raise ValueError(
67+
f"Unknown launcher backend: {backend_name!r}. "
68+
f"Available backends: {available}"
69+
)
70+
return cls(config)

0 commit comments

Comments
 (0)