Skip to content

Commit 8f063d6

Browse files
committed
Isolate Omni V1 IPC runtime directories
1 parent c430eec commit 8f063d6

6 files changed

Lines changed: 338 additions & 48 deletions

File tree

sglang_omni_router/serve.py

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -43,6 +43,8 @@ def build_parser() -> argparse.ArgumentParser:
4343
def main(argv: Sequence[str] | None = None) -> None:
4444
args = build_parser().parse_args(argv)
4545
logging.basicConfig(level=getattr(logging, args.log_level.upper(), logging.INFO))
46+
logging.getLogger("httpx").setLevel(logging.WARNING)
47+
logging.getLogger("httpcore").setLevel(logging.WARNING)
4648
config = build_router_config(
4749
worker_urls=args.worker_urls,
4850
host=args.host,

sglang_omni_v1/config/__init__.py

Lines changed: 11 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1,5 +1,11 @@
11
# SPDX-License-Identifier: Apache-2.0
2-
from sglang_omni_v1.config.compiler import compile_pipeline
2+
from sglang_omni_v1.config.compiler import (
3+
IpcRuntimeDir,
4+
compile_pipeline,
5+
compile_pipeline_core,
6+
create_ipc_runtime_dir,
7+
prepare_pipeline_runtime,
8+
)
39
from sglang_omni_v1.config.schema import (
410
EndpointsConfig,
511
PipelineConfig,
@@ -8,7 +14,11 @@
814
)
915

1016
__all__ = [
17+
"IpcRuntimeDir",
1118
"compile_pipeline",
19+
"compile_pipeline_core",
20+
"create_ipc_runtime_dir",
21+
"prepare_pipeline_runtime",
1222
"PipelineConfig",
1323
"StageConfig",
1424
"RelayConfig",

sglang_omni_v1/config/compiler.py

Lines changed: 133 additions & 26 deletions
Original file line number | Diff line number | Diff line change
@@ -4,7 +4,11 @@
44
from __future__ import annotations
55

66
import inspect
7+
import logging
8+
import re
9+
import shutil
710
import socket
11+
import tempfile
812
from pathlib import Path
913
from typing import Any
1014

@@ -14,43 +18,144 @@
1418
from sglang_omni_v1.pipeline.stage.input import InputHandler
1519
from sglang_omni_v1.utils import import_string
1620

21+
logger = logging.getLogger(__name__)
1722

18-
def compile_pipeline(config: PipelineConfig) -> tuple[Coordinator, list[Stage]]:
19-
"""Build the coordinator and stage objects from the pipeline configuration."""
20-
stages_cfg, name_map, entry_stage = config.apply_fusion()
21-
endpoints = _allocate_endpoints(config, stages=stages_cfg)
2223

23-
coordinator = Coordinator(
24-
completion_endpoint=endpoints["completion"],
25-
abort_endpoint=endpoints["abort"],
26-
entry_stage=entry_stage,
27-
terminal_stages=config.terminal_stages or None,
28-
)
24+
class IpcRuntimeDir:
25+
"""Runtime-owned IPC directory for one pipeline instance."""
26+
27+
def __init__(self, path: Path):
28+
self.path = path
29+
self._closed = False
2930

30-
stage_endpoints = {s.name: endpoints[f"stage_{s.name}"] for s in stages_cfg}
31+
def __enter__(self) -> IpcRuntimeDir:
32+
return self
3133

32-
stages: list[Stage] = []
33-
for stage_cfg in stages_cfg:
34-
stage = _compile_stage(
35-
stage_cfg, config, stage_endpoints, endpoints, name_map=name_map
34+
def __exit__(self, exc_type, exc, tb) -> None:
35+
self.close()
36+
37+
def close(self) -> None:
38+
if self._closed:
39+
return
40+
self._closed = True
41+
try:
42+
shutil.rmtree(self.path)
43+
except FileNotFoundError:
44+
return
45+
except OSError as exc:
46+
logger.warning("Failed to remove IPC runtime dir %s: %s", self.path, exc)
47+
48+
49+
def create_ipc_runtime_dir(config: PipelineConfig) -> IpcRuntimeDir | None:
50+
"""Create a per-run IPC namespace for one pipeline instance."""
51+
if config.endpoints.scheme != "ipc":
52+
return None
53+
54+
base_root = Path(config.endpoints.base_path)
55+
base_root.mkdir(parents=True, exist_ok=True)
56+
57+
namespace_prefix = re.sub(r"[^0-9a-z]+", "-", config.name.lower()).strip("-")
58+
if not namespace_prefix:
59+
namespace_prefix = "pipeline"
60+
path = Path(tempfile.mkdtemp(prefix=f"{namespace_prefix}-", dir=base_root))
61+
return IpcRuntimeDir(path)
62+
63+
64+
def prepare_pipeline_runtime(
65+
config: PipelineConfig,
66+
*,
67+
ipc_runtime_dir: IpcRuntimeDir | None = None,
68+
) -> tuple[
69+
list[StageConfig], dict[str, str], str, dict[str, str], IpcRuntimeDir | None, bool
70+
]:
71+
"""Prepare fused stages and endpoint allocation for one runtime."""
72+
runtime_dir = ipc_runtime_dir
73+
created_runtime_dir = None
74+
if runtime_dir is None:
75+
runtime_dir = create_ipc_runtime_dir(config)
76+
created_runtime_dir = runtime_dir
77+
owns_runtime_dir = created_runtime_dir is not None
78+
79+
try:
80+
stages_cfg, name_map, entry_stage = config.apply_fusion()
81+
endpoints = _allocate_endpoints(
82+
config,
83+
stages=stages_cfg,
84+
ipc_base_dir=runtime_dir.path if runtime_dir else None,
3685
)
37-
coordinator.register_stage(stage.name, stage.control_plane.recv_endpoint)
38-
stages.append(stage)
39-
40-
# Wire streaming targets
41-
stage_map = {stage.name: stage for stage in stages}
42-
cfg_map = {s.name: s for s in stages_cfg}
43-
for stage_cfg in stages_cfg:
44-
stage = stage_map.get(stage_cfg.name)
45-
if stage is not None:
86+
except Exception:
87+
if created_runtime_dir is not None:
88+
created_runtime_dir.close()
89+
raise
90+
91+
return stages_cfg, name_map, entry_stage, endpoints, runtime_dir, owns_runtime_dir
92+
93+
94+
def compile_pipeline_core(
95+
config: PipelineConfig,
96+
*,
97+
ipc_runtime_dir: IpcRuntimeDir | None = None,
98+
) -> tuple[Coordinator, list[Stage], IpcRuntimeDir | None]:
99+
"""Build the coordinator and stage objects from the pipeline configuration."""
100+
stages_cfg, name_map, entry_stage, endpoints, runtime_dir, owns_runtime_dir = (
101+
prepare_pipeline_runtime(
102+
config,
103+
ipc_runtime_dir=ipc_runtime_dir,
104+
)
105+
)
106+
107+
try:
108+
coordinator = Coordinator(
109+
completion_endpoint=endpoints["completion"],
110+
abort_endpoint=endpoints["abort"],
111+
entry_stage=entry_stage,
112+
terminal_stages=config.terminal_stages or None,
113+
)
114+
115+
stage_endpoints = {s.name: endpoints[f"stage_{s.name}"] for s in stages_cfg}
116+
117+
stages: list[Stage] = []
118+
for stage_cfg in stages_cfg:
119+
stage = _compile_stage(
120+
stage_cfg, config, stage_endpoints, endpoints, name_map=name_map
121+
)
122+
coordinator.register_stage(stage.name, stage.control_plane.recv_endpoint)
123+
stages.append(stage)
124+
125+
stage_map = {stage.name: stage for stage in stages}
126+
cfg_map = {s.name: s for s in stages_cfg}
127+
for stage_cfg in stages_cfg:
128+
stage = stage_map.get(stage_cfg.name)
129+
if stage is None:
130+
continue
46131
_wire_stream_targets(
47132
stage,
48133
stage_cfg,
49134
stage_map,
50135
gpu_placement=config.gpu_placement,
51136
cfg_map=cfg_map,
52137
)
138+
except Exception:
139+
if owns_runtime_dir and runtime_dir is not None:
140+
runtime_dir.close()
141+
raise
142+
143+
return coordinator, stages, runtime_dir
144+
145+
146+
def compile_pipeline(config: PipelineConfig) -> tuple[Coordinator, list[Stage]]:
147+
"""Build coordinator and stages directly from a pipeline config.
148+
149+
IPC pipelines need explicit runtime-directory ownership so multiple
150+
replicas cannot bind the same local sockets.
151+
"""
152+
if config.endpoints.scheme == "ipc":
153+
raise ValueError(
154+
"compile_pipeline() does not manage IPC runtime-dir ownership. "
155+
"Use compile_pipeline_core(...) or MultiProcessPipelineRunner."
156+
)
53157

158+
coordinator, stages, _ = compile_pipeline_core(config)
54159
return coordinator, stages
55160

56161

@@ -252,6 +357,7 @@ def _allocate_endpoints(
252357
config: PipelineConfig,
253358
*,
254359
stages: list[StageConfig],
360+
ipc_base_dir: Path | None = None,
255361
) -> dict[str, str]:
256362
endpoints: dict[str, str] = {}
257363

@@ -261,8 +367,9 @@ def _allocate_endpoints(
261367
endpoints["abort"] = config.abort_endpoint
262368

263369
if config.endpoints.scheme == "ipc":
264-
base_dir = Path(config.endpoints.base_path) / config.name
265-
base_dir.mkdir(parents=True, exist_ok=True)
370+
if ipc_base_dir is None:
371+
raise ValueError("IPC endpoint allocation requires an IPC runtime dir")
372+
base_dir = ipc_base_dir
266373
endpoints.setdefault("completion", f"ipc://{base_dir}/completion.sock")
267374
endpoints.setdefault("abort", f"ipc://{base_dir}/abort.sock")
268375
for s in stages:

sglang_omni_v1/pipeline/mp_runner.py

Lines changed: 38 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -16,10 +16,11 @@
1616
from typing import Any
1717

1818
from sglang_omni_v1.config.compiler import (
19-
_allocate_endpoints,
19+
IpcRuntimeDir,
2020
_build_relay_config,
2121
_detect_same_gpu_targets,
2222
_resolve_factory_args,
23+
prepare_pipeline_runtime,
2324
)
2425
from sglang_omni_v1.config.schema import PipelineConfig, StageConfig
2526
from sglang_omni_v1.pipeline import Coordinator
@@ -32,6 +33,10 @@
3233
def _build_stage_groups(
3334
config: PipelineConfig,
3435
ctx: multiprocessing.context.BaseContext | None = None,
36+
*,
37+
stages_cfg: list[StageConfig] | None = None,
38+
name_map: dict[str, str] | None = None,
39+
endpoints: dict[str, str] | None = None,
3540
) -> list[StageGroup]:
3641
"""Compile *config* into one :class:`StageGroup` per logical stage.
3742
@@ -41,8 +46,10 @@ def _build_stage_groups(
4146
if ctx is None:
4247
ctx = multiprocessing.get_context("spawn")
4348

44-
stages_cfg, name_map, _ = config.apply_fusion()
45-
endpoints = _allocate_endpoints(config, stages=stages_cfg)
49+
if stages_cfg is None or name_map is None or endpoints is None:
50+
if config.endpoints.scheme == "ipc":
51+
raise ValueError("_build_stage_groups requires prepared IPC endpoints")
52+
stages_cfg, name_map, _, endpoints, _, _ = prepare_pipeline_runtime(config)
4653
stage_endpoints = {s.name: endpoints[f"stage_{s.name}"] for s in stages_cfg}
4754
cfg_map = {s.name: s for s in stages_cfg}
4855

@@ -264,6 +271,7 @@ class MultiProcessPipelineRunner:
264271
def __init__(self, config: PipelineConfig):
265272
self._config = config
266273
self._coordinator: Coordinator | None = None
274+
self._ipc_runtime_dir: IpcRuntimeDir | None = None
267275
self._groups: list[StageGroup] = []
268276
self._completion_task: asyncio.Task | None = None
269277
self._monitor_task: asyncio.Task | None = None
@@ -281,10 +289,24 @@ async def start(self, timeout: float = 120.0) -> None:
281289

282290
try:
283291
ctx = multiprocessing.get_context("spawn")
284-
groups = _build_stage_groups(self._config, ctx)
285-
286-
stages_cfg, _, entry_stage = self._config.apply_fusion()
287-
endpoints = _allocate_endpoints(self._config, stages=stages_cfg)
292+
(
293+
stages_cfg,
294+
name_map,
295+
entry_stage,
296+
endpoints,
297+
self._ipc_runtime_dir,
298+
_,
299+
) = prepare_pipeline_runtime(
300+
self._config,
301+
ipc_runtime_dir=self._ipc_runtime_dir,
302+
)
303+
groups = _build_stage_groups(
304+
self._config,
305+
ctx,
306+
stages_cfg=stages_cfg,
307+
name_map=name_map,
308+
endpoints=endpoints,
309+
)
288310

289311
self._coordinator = Coordinator(
290312
completion_endpoint=endpoints["completion"],
@@ -373,6 +395,11 @@ async def stop(self) -> None:
373395

374396
await self._coordinator.stop()
375397
self._groups.clear()
398+
self._coordinator = None
399+
400+
if self._ipc_runtime_dir is not None:
401+
self._ipc_runtime_dir.close()
402+
self._ipc_runtime_dir = None
376403

377404
async def _cleanup_on_failure(self) -> None:
378405
"""Best-effort cleanup after a failed start()."""
@@ -401,3 +428,7 @@ async def _cleanup_on_failure(self) -> None:
401428
except Exception:
402429
pass
403430
self._coordinator = None
431+
432+
if self._ipc_runtime_dir is not None:
433+
self._ipc_runtime_dir.close()
434+
self._ipc_runtime_dir = None

sglang_omni_v1/serve/launcher.py

Lines changed: 25 additions & 14 deletions
Original file line number | Diff line number | Diff line change
@@ -29,14 +29,15 @@
2929
import os
3030
import socket
3131
import time
32+
from contextlib import suppress
3233
from typing import Any
3334

3435
import uvicorn
3536
from fastapi import APIRouter
3637
from pydantic import BaseModel
3738

3839
from sglang_omni_v1.client import Client
39-
from sglang_omni_v1.config import PipelineConfig, compile_pipeline
40+
from sglang_omni_v1.config import PipelineConfig, compile_pipeline_core
4041
from sglang_omni_v1.profiler.profiler_control import ProfilerControlClient
4142
from sglang_omni_v1.serve.openai_api import create_app
4243

@@ -189,20 +190,21 @@ async def _run_server(
189190
await mp_runner.stop()
190191
logger.info("Pipeline stopped.")
191192
else:
192-
coordinator, stages = compile_pipeline(pipeline_config)
193+
coordinator, stages, runtime_dir = compile_pipeline_core(pipeline_config)
193194
stage_endpoints = _collect_stage_control_endpoints(stages)
194-
195-
# Start coordinator + all stages as async tasks
196-
await coordinator.start()
197-
completion_task = asyncio.create_task(coordinator.run_completion_loop())
198-
stage_tasks = [asyncio.create_task(s.run()) for s in stages]
199-
logger.info(
200-
"Pipeline '%s' started (%d stages)",
201-
pipeline_config.name,
202-
len(stages),
203-
)
195+
completion_task = None
196+
stage_tasks = []
204197

205198
try:
199+
await coordinator.start()
200+
completion_task = asyncio.create_task(coordinator.run_completion_loop())
201+
stage_tasks = [asyncio.create_task(s.run()) for s in stages]
202+
logger.info(
203+
"Pipeline '%s' started (%d stages)",
204+
pipeline_config.name,
205+
len(stages),
206+
)
207+
206208
cl_kwargs = client_kwargs or {}
207209
client = Client(coordinator, **cl_kwargs)
208210
app = create_app(
@@ -227,8 +229,17 @@ async def _run_server(
227229
logger.info("Shutting down pipeline …")
228230
for t in stage_tasks:
229231
t.cancel()
230-
completion_task.cancel()
231-
await coordinator.stop()
232+
if completion_task is not None:
233+
completion_task.cancel()
234+
with suppress(asyncio.CancelledError):
235+
await completion_task
236+
if stage_tasks:
237+
await asyncio.gather(*stage_tasks, return_exceptions=True)
238+
try:
239+
await coordinator.stop()
240+
finally:
241+
if runtime_dir is not None:
242+
runtime_dir.close()
232243
logger.info("Pipeline stopped.")
233244

234245

0 commit comments

Comments (0)