Skip to content

Commit dd17638

Browse files
authored
Add scripted-runtime harness core and wire scheduler/IPC hooks (#27411)
1 parent 21201ef commit dd17638

24 files changed

Lines changed: 1313 additions & 4 deletions

.claude/rules/modify-component-must-read.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,4 @@ Before modifying the following components, read the listed skill first.
55
- **Speculative decoding code** (anything under `python/sglang/srt/speculative/`, related attention backends, scheduler accumulators, IPC fields, observability metrics, or CLI flags) → [`speculative-naming`](../skills/speculative-naming/SKILL.md)
66
- **`Scheduler` / `TokenizerManager` / `ModelRunner` `__init__`** (`python/sglang/srt/managers/scheduler.py`, `python/sglang/srt/managers/tokenizer_manager.py`, `python/sglang/srt/model_executor/model_runner.py`) → [`large-class-init-style`](../skills/large-class-init-style/SKILL.md)
77
- **Environment variables** (adding, renaming, or reviewing any `SGLANG_*` env var, migrating a legacy `SGL_*` alias, or touching `python/sglang/srt/environ.py`) → [`env-var-conventions`](../skills/env-var-conventions/SKILL.md)
8+
- **Scripted runtime** (anything related to the scripted runtime) → [`scripted-runtime-notes`](../skills/scripted-runtime-notes/SKILL.md)
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
---
2+
name: scripted-runtime-notes
3+
description: Requirements for the SGLang scripted runtime, chiefly when to add (vs not add) a harness API. Use for anything related to the scripted runtime.
4+
---
5+
6+
# Scripted Runtime — Notes
7+
8+
Notes for anything related to the SGLang scripted runtime.
9+
10+
## When to Add an API
11+
12+
Tests read `r.req.*` and `t._scheduler.*` directly — there is no encapsulation boundary. A thin wrapper buys zero isolation; it only grows the surface.
13+
14+
Add an API only if it does real work:
15+
16+
1. **Control primitive** — drives the engine through a real path (`start_req`, `pause_generation`, `abort`, `evict_radix`, `exhaust_kv`). Reuse the real path; never hand-mutate state.
17+
2. **Hook-backed** — value cannot be read from a snapshot; accumulate via `scheduler_hook.on_run_batch` or the recv proxy (`chunks_done`). Read-only; never monkey-patch; never add `*_count` to `srt/`.
18+
3. **Multi-structure derivation, widely reused** — scans `chunked_req` + `waiting_queue` + `running_batch` + `last_batch` (`is_idle`, `status`, `batch_composition`).
19+
20+
Else: don't. Read `r.req.X` / `t._scheduler.X` in the test; inline single-use accessors.
21+
22+
Never:
23+
24+
- Weaken an assertion to fit a missing probe.
25+
- Probe implementation details ("field non-None", "which branch ran") — assert the consequence.
26+
27+
## Other Tips
28+
29+
- **Engine-self-driven behavior: drive the real loop, don't call the private.** Never synchronously call a scheduler private (e.g. `scheduler._abort_on_waiting_timeout()`) from the harness/test — it runs at the wrong loop phase, bypasses the ordered `recv_requests``process_input_requests` injection, and can fire in states the real loop never reaches (e.g. while paused). For sweeps the engine runs itself (timeout/idle), enable the config/env and advance the loop with `yield`.

python/sglang/srt/environ.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,11 @@ class Envs:
337337
SGLANG_TEST_PD_DISAGG_DEVICES = EnvStr(None)
338338
SGLANG_TEST_FORCE_OPTIMISTIC_PREFILL_RETRY_PROB = EnvFloat(0.0)
339339

340+
SGLANG_TEST_SCRIPTED_RUNTIME = EnvBool(False)
341+
SGLANG_TEST_SCRIPTED_RUNTIME_IPC_ADDR = EnvStr(None)
342+
SGLANG_TEST_SCRIPTED_RUNTIME_OUT_OF_BAND_ERROR_PATH = EnvStr(None)
343+
SGLANG_TEST_SCRIPTED_RUNTIME_SYS_PATH_ENTRY = EnvStr(None)
344+
340345
# Model Parallel
341346
SGLANG_USE_MESSAGE_QUEUE_BROADCASTER = EnvBool(True)
342347
SGLANG_ONE_VISIBLE_DEVICE_PER_PROCESS = EnvBool(False)

python/sglang/srt/managers/scheduler.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -545,6 +545,8 @@ def __init__(
545545
# Init the grammar backend for constrained generation
546546
self.init_grammar_manager()
547547

548+
self.maybe_init_scripted_scheduler_hook()
549+
548550
self.init_request_receiver()
549551

550552
self.init_dp_attn_adapter()
@@ -611,6 +613,7 @@ def init_ipc_channels(self, port_args: PortArgs):
611613
self.ps.attn_tp_rank == 0
612614
or self.server_args.enable_metrics_for_all_schedulers
613615
),
616+
enable_scripted_runtime=envs.SGLANG_TEST_SCRIPTED_RUNTIME.get(),
614617
)
615618

616619
self.load_snapshot_writer = None
@@ -1601,6 +1604,19 @@ def init_lora_overlap_loader(self) -> None:
16011604
def init_grammar_manager(self) -> None:
16021605
self.grammar_manager = GrammarManager(self)
16031606

1607+
def maybe_init_scripted_scheduler_hook(self) -> None:
1608+
if envs.SGLANG_TEST_SCRIPTED_RUNTIME.get():
1609+
from sglang.test.scripted_runtime.scheduler_hook import (
1610+
ScriptedSchedulerHook,
1611+
)
1612+
1613+
self.scripted_scheduler_hook = ScriptedSchedulerHook(
1614+
scheduler=self,
1615+
tokenizer_recv_proxy=self.ipc_channels.recv_from_tokenizer,
1616+
)
1617+
else:
1618+
self.scripted_scheduler_hook = None
1619+
16041620
def init_request_receiver(self) -> None:
16051621
self.request_receiver = SchedulerRequestReceiver(
16061622
recv_from_tokenizer=self.ipc_channels.recv_from_tokenizer,
@@ -1623,6 +1639,7 @@ def init_request_receiver(self) -> None:
16231639
get_last_forward_mode=lambda: (
16241640
self.last_batch.forward_mode if self.last_batch is not None else None
16251641
),
1642+
scripted_scheduler_hook=self.scripted_scheduler_hook,
16261643
)
16271644

16281645
def init_dp_attn_adapter(self) -> None:
@@ -2978,6 +2995,9 @@ def run_batch(
29782995
self.forward_ct += 1
29792996
batch.forward_iter = self.forward_ct
29802997

2998+
if self.scripted_scheduler_hook is not None:
2999+
self.scripted_scheduler_hook.on_run_batch(batch)
3000+
29813001
# Whether to run the profiler
29823002
self.profiler_manager._profile_batch_predicate(batch)
29833003
if self.forward_sleep_time is not None:

python/sglang/srt/managers/scheduler_components/ipc_channels.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,21 @@
11
from dataclasses import dataclass
2-
from typing import Optional
2+
from typing import TYPE_CHECKING, Optional, Union
33

44
import zmq
55

66
from sglang.srt.managers.scheduler_components.output_sender import SenderWrapper
77
from sglang.srt.server_args import PortArgs
88
from sglang.srt.utils.network import get_zmq_socket
99

10+
if TYPE_CHECKING:
11+
from sglang.test.scripted_runtime.tokenizer_recv_proxy import (
12+
ScriptedTokenizerRecvProxy,
13+
)
14+
1015

1116
@dataclass(frozen=True, slots=True, kw_only=True)
1217
class SchedulerIpcChannels:
13-
recv_from_tokenizer: Optional[zmq.Socket]
18+
recv_from_tokenizer: Union[zmq.Socket, "ScriptedTokenizerRecvProxy"]
1419
recv_from_rpc: Optional[zmq.Socket]
1520
send_to_tokenizer: SenderWrapper
1621
send_to_detokenizer: SenderWrapper
@@ -24,13 +29,22 @@ def create(
2429
is_rank_zero: bool,
2530
skip_tokenizer_init: bool,
2631
metrics_enabled: bool,
32+
enable_scripted_runtime: bool,
2733
) -> "SchedulerIpcChannels":
2834
context = zmq.Context(2)
2935

3036
if is_rank_zero:
3137
recv_from_tokenizer = get_zmq_socket(
3238
context, zmq.PULL, port_args.scheduler_input_ipc_name, False
3339
)
40+
if enable_scripted_runtime:
41+
from sglang.test.scripted_runtime.tokenizer_recv_proxy import (
42+
ScriptedTokenizerRecvProxy,
43+
)
44+
45+
recv_from_tokenizer = ScriptedTokenizerRecvProxy(
46+
underlying=recv_from_tokenizer
47+
)
3448
recv_from_rpc = get_zmq_socket(
3549
context, zmq.DEALER, port_args.rpc_ipc_name, False
3650
)

python/sglang/srt/managers/scheduler_components/request_receiver.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,15 @@
3434
from sglang.srt.configs.model_config import ModelConfig
3535
from sglang.srt.distributed.parallel_state_wrapper import ParallelState
3636
from sglang.srt.server_args import ServerArgs
37+
from sglang.test.scripted_runtime.scheduler_hook import ScriptedSchedulerHook
38+
from sglang.test.scripted_runtime.tokenizer_recv_proxy import (
39+
ScriptedTokenizerRecvProxy,
40+
)
3741

3842

3943
@dataclass(kw_only=True, slots=True, frozen=True)
4044
class SchedulerRequestReceiver:
41-
recv_from_tokenizer: zmq.Socket
45+
recv_from_tokenizer: Union[zmq.Socket, "ScriptedTokenizerRecvProxy"]
4246
recv_from_rpc: Optional[zmq.Socket]
4347
recv_skipper: Any
4448
input_blocker: Any
@@ -56,6 +60,7 @@ class SchedulerRequestReceiver:
5660
max_recv_per_poll: int
5761
stream_output: Callable[..., None]
5862
get_last_forward_mode: Callable[[], Any]
63+
scripted_scheduler_hook: Optional["ScriptedSchedulerHook"] = None
5964

6065
def recv_limit_reached(self, num_recv_reqs: int) -> bool:
6166
if self.max_recv_per_poll < 0:
@@ -67,6 +72,9 @@ def recv_requests(
6772
) -> List[Union[TokenizedGenerateReqInput, TokenizedEmbeddingReqInput, Any]]:
6873
"""Receive results at tp_rank = 0 and broadcast it to all other TP ranks."""
6974

75+
if self.scripted_scheduler_hook is not None:
76+
self.scripted_scheduler_hook.step()
77+
7078
if self.recv_skipper is not None:
7179
if not self.recv_skipper.handle(self.get_last_forward_mode()):
7280
return []

python/sglang/srt/utils/network.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ def set_recv_opt():
231231
set_send_opt()
232232
elif socket_type == zmq.PULL:
233233
set_recv_opt()
234-
elif socket_type in [zmq.DEALER, zmq.REQ, zmq.REP]:
234+
elif socket_type in [zmq.DEALER, zmq.REQ, zmq.REP, zmq.PAIR]:
235235
set_send_opt()
236236
set_recv_opt()
237237
else:

python/sglang/test/scripted_runtime/__init__.py

Whitespace-only changes.
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
from __future__ import annotations
2+
3+
import asyncio
4+
import logging
5+
import threading
6+
from concurrent.futures import Future
7+
from typing import Any, Coroutine, Optional
8+
9+
import aiohttp
10+
11+
logger = logging.getLogger(__name__)
12+
13+
JOIN_TIMEOUT_S: float = 10.0
14+
15+
16+
class BackgroundHttpPoster:
17+
18+
def __init__(self) -> None:
19+
self._session: Optional[aiohttp.ClientSession] = None
20+
self._loop = asyncio.new_event_loop()
21+
self._thread = threading.Thread(
22+
target=self._run_loop, name="scripted-runtime-async", daemon=True
23+
)
24+
self._thread.start()
25+
26+
def _run_loop(self) -> None:
27+
asyncio.set_event_loop(self._loop)
28+
self._loop.run_forever()
29+
30+
def submit_coro(self, coro: Coroutine) -> None:
31+
future = asyncio.run_coroutine_threadsafe(coro, self._loop)
32+
future.add_done_callback(self._log_coro_exception)
33+
34+
@staticmethod
35+
def _log_coro_exception(future: Future) -> None:
36+
try:
37+
future.result()
38+
except asyncio.CancelledError:
39+
pass
40+
except Exception:
41+
logger.exception("scripted_runtime: background async coroutine failed")
42+
43+
async def post(self, url: str, json: Any) -> None:
44+
session = self._ensure_session()
45+
async with session.post(url, json=json) as resp:
46+
await resp.read()
47+
48+
def _ensure_session(self) -> aiohttp.ClientSession:
49+
if self._session is None or self._session.closed:
50+
self._session = aiohttp.ClientSession(
51+
connector=aiohttp.TCPConnector(limit=0)
52+
)
53+
return self._session
54+
55+
def close(self) -> None:
56+
try:
57+
if self._session is not None:
58+
future = asyncio.run_coroutine_threadsafe(
59+
self._session.close(), self._loop
60+
)
61+
future.result(timeout=JOIN_TIMEOUT_S)
62+
except Exception:
63+
logger.exception("scripted_runtime: failed to close aiohttp session")
64+
try:
65+
self._loop.call_soon_threadsafe(self._loop.stop)
66+
self._thread.join(timeout=JOIN_TIMEOUT_S)
67+
except Exception:
68+
logger.exception("scripted_runtime: failed to stop background async loop")
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from sglang.test.scripted_runtime.context.api import ScriptedContext
2+
3+
__all__ = ["ScriptedContext"]

0 commit comments

Comments
 (0)