Skip to content

Commit ee450f3

Browse files
committed
Expand Omni V1 IPC lifecycle tests
1 parent 70ac338 commit ee450f3

1 file changed

Lines changed: 138 additions & 2 deletions

File tree

tests/test_v1_ipc_runtime_dir.py

Lines changed: 138 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,14 @@
33

44
from __future__ import annotations
55

6+
import asyncio
67
import tempfile
78
import unittest
89
from pathlib import Path
910
from unittest.mock import AsyncMock, patch
1011

1112
import pytest
13+
from fastapi import FastAPI
1214

1315
pytest.importorskip("torch")
1416

@@ -24,7 +26,37 @@ def noop_factory():
2426
return None
2527

2628

27-
def _make_config(base_path: str) -> PipelineConfig:
29+
class _FakeControlPlane:
30+
def __init__(self, recv_endpoint: str):
31+
self.recv_endpoint = recv_endpoint
32+
33+
34+
class _FakeStage:
35+
name = "preprocessing"
36+
37+
def __init__(self, recv_endpoint: str):
38+
self.control_plane = _FakeControlPlane(recv_endpoint)
39+
40+
async def run(self) -> None:
41+
await asyncio.Event().wait()
42+
43+
44+
class _FakeCoordinator:
45+
def __init__(self):
46+
self.started = False
47+
self.stopped = False
48+
49+
async def start(self) -> None:
50+
self.started = True
51+
52+
async def run_completion_loop(self) -> None:
53+
await asyncio.Event().wait()
54+
55+
async def stop(self) -> None:
56+
self.stopped = True
57+
58+
59+
def _make_config(base_path: str, *, scheme: str = "ipc") -> PipelineConfig:
2860
return PipelineConfig(
2961
model_path="Qwen/Qwen3-Omni-30B-A3B-Instruct",
3062
entry_stage="preprocessing",
@@ -36,13 +68,31 @@ def _make_config(base_path: str) -> PipelineConfig:
3668
)
3769
],
3870
endpoints=EndpointsConfig(
39-
scheme="ipc",
71+
scheme=scheme,
4072
base_path=base_path,
4173
),
4274
)
4375

4476

4577
class TestV1IpcRuntimeDir(unittest.TestCase):
78+
def test_ipc_runtime_dir_close_is_idempotent(self) -> None:
79+
with tempfile.TemporaryDirectory() as tmp_dir:
80+
config = _make_config(tmp_dir)
81+
runtime_dir = create_ipc_runtime_dir(config)
82+
self.assertIsNotNone(runtime_dir)
83+
runtime_path = runtime_dir.path
84+
85+
runtime_dir.close()
86+
runtime_dir.close()
87+
88+
self.assertFalse(runtime_path.exists())
89+
90+
def test_create_ipc_runtime_dir_returns_none_for_tcp(self) -> None:
91+
with tempfile.TemporaryDirectory() as tmp_dir:
92+
config = _make_config(tmp_dir, scheme="tcp")
93+
94+
self.assertIsNone(create_ipc_runtime_dir(config))
95+
4696
def test_ipc_runtime_dirs_are_unique_for_same_model_name(self) -> None:
4797
with tempfile.TemporaryDirectory() as tmp_dir:
4898
config = _make_config(tmp_dir)
@@ -110,6 +160,27 @@ def test_caller_owned_ipc_dir_is_not_removed_on_compile_failure(self) -> None:
110160
runtime_dir.close()
111161
self.assertFalse(runtime_path.exists())
112162

163+
def test_compile_core_returns_owned_runtime_dir_for_successful_ipc_compile(
164+
self,
165+
) -> None:
166+
with tempfile.TemporaryDirectory() as tmp_dir:
167+
config = _make_config(tmp_dir)
168+
169+
_coordinator, stages, runtime_dir = compile_pipeline_core(config)
170+
self.assertIsNotNone(runtime_dir)
171+
runtime_path = runtime_dir.path
172+
173+
try:
174+
self.assertTrue(runtime_path.exists())
175+
self.assertIn(
176+
str(runtime_path),
177+
stages[0].control_plane.recv_endpoint,
178+
)
179+
finally:
180+
runtime_dir.close()
181+
182+
self.assertFalse(runtime_path.exists())
183+
113184

114185
class TestV1MultiProcessRunnerIpcCleanup(unittest.IsolatedAsyncioTestCase):
115186
async def test_mp_runner_cleans_runtime_dir_on_start_failure(self) -> None:
@@ -127,3 +198,68 @@ async def test_mp_runner_cleans_runtime_dir_on_start_failure(self) -> None:
127198
await runner.start()
128199

129200
self.assertEqual(list(Path(tmp_dir).iterdir()), [])
201+
202+
async def test_mp_runner_cleans_runtime_dir_on_stop(self) -> None:
203+
with tempfile.TemporaryDirectory() as tmp_dir:
204+
config = _make_config(tmp_dir)
205+
from sglang_omni_v1.pipeline.mp_runner import MultiProcessPipelineRunner
206+
207+
runner = MultiProcessPipelineRunner(config)
208+
await runner.start(timeout=30.0)
209+
210+
runtime_path = None
211+
try:
212+
runtime_dirs = [
213+
path for path in Path(tmp_dir).iterdir() if path.is_dir()
214+
]
215+
self.assertEqual(len(runtime_dirs), 1)
216+
runtime_path = runtime_dirs[0]
217+
self.assertTrue(runtime_path.exists())
218+
finally:
219+
await runner.stop()
220+
221+
self.assertIsNotNone(runtime_path)
222+
self.assertFalse(runtime_path.exists())
223+
224+
225+
class TestV1LauncherIpcCleanup(unittest.IsolatedAsyncioTestCase):
226+
async def test_single_process_launcher_cleans_runtime_dir_on_server_exit(
227+
self,
228+
) -> None:
229+
with tempfile.TemporaryDirectory() as tmp_dir:
230+
config = _make_config(tmp_dir)
231+
runtime_dir = create_ipc_runtime_dir(config)
232+
self.assertIsNotNone(runtime_dir)
233+
runtime_path = runtime_dir.path
234+
stage = _FakeStage(f"ipc://{runtime_path}/stage_preprocessing.sock")
235+
coordinator = _FakeCoordinator()
236+
app = FastAPI()
237+
server_serve = AsyncMock(return_value=None)
238+
239+
from sglang_omni_v1.serve.launcher import _run_server
240+
241+
with (
242+
patch(
243+
"sglang_omni_v1.serve.launcher._find_available_port",
244+
return_value=8000,
245+
),
246+
patch(
247+
"sglang_omni_v1.serve.launcher.compile_pipeline_core",
248+
return_value=(coordinator, [stage], runtime_dir),
249+
),
250+
patch(
251+
"sglang_omni_v1.serve.launcher.create_app",
252+
return_value=app,
253+
),
254+
patch(
255+
"sglang_omni_v1.serve.launcher.uvicorn.Server.serve",
256+
new=server_serve,
257+
),
258+
):
259+
await _run_server(config, port=8000)
260+
261+
self.assertTrue(coordinator.started)
262+
self.assertTrue(coordinator.stopped)
263+
server_serve.assert_awaited_once()
264+
self.assertIn("/start_profile", {route.path for route in app.routes})
265+
self.assertFalse(runtime_path.exists())

0 commit comments

Comments
 (0)