Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions python/sglang/srt/managers/tokenizer_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -1843,6 +1843,34 @@ def auto_create_handle_loop(self):
loop.create_task(print_exception_wrapper(self.sigterm_watchdog))
)

# Monitor this loop's scheduling lag. This process (the lone
# TokenizerManager, or each TokenizerWorker in multi-tokenizer mode)
# stamps received_time on the same loop, so its lag is the engine-side
# view of delay that accrues before received_time. The MultiTokenizerRouter
# runs a separate forwarding loop that does not stamp received_time, so it
# is intentionally not monitored here.
if self.enable_metrics:
self.asyncio_tasks.add(
loop.create_task(print_exception_wrapper(self.watch_event_loop_lag))
)

async def watch_event_loop_lag(self, interval: float = 0.1):
"""Sample this process's asyncio event-loop scheduling lag as a metric.

Each tick asks to sleep ``interval`` seconds. A healthy loop wakes on
time (lag ~0); a loop blocked by synchronous work or GIL contention wakes
late, and the overrun is how long it was unresponsive -- and thus unable
to stamp incoming requests' received_time.
"""
loop = asyncio.get_running_loop()
next_tick = loop.time() + interval
while True:
await asyncio.sleep(interval)
now = loop.time()
lag = now - next_tick
self.metrics_collector.observe_event_loop_lag(lag if lag > 0.0 else 0.0)
next_tick = now + interval
Comment on lines +1857 to +1872

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Issue: Potential Measurement Inaccuracy and Process Crash Risk

  1. Measurement Inaccuracy: In the current implementation, next_tick is calculated as now + interval before self.metrics_collector.observe_event_loop_lag is called. Any CPU time spent inside observe_event_loop_lag (or spent running other tasks on the event loop before the next await asyncio.sleep is reached) will be incorrectly counted as "scheduling lag" in the subsequent iteration. Measuring the scheduled wakeup time immediately before calling asyncio.sleep avoids this accumulation of overhead.
  2. Process Crash Risk: Since this background task is wrapped in print_exception_wrapper, any unhandled exception raised during metrics observation (e.g., if self.metrics_collector is None or Prometheus client fails) will trigger the wrapper's exception handler, which logs the error and terminates the entire server process. Wrapping the observation call in a try-except block prevents metrics failures from crashing the production server.

Suggested Improvement

We can simplify the logic, eliminate the stateful next_tick variable, and make the metric collection robust against unexpected errors.

Suggested change
async def watch_event_loop_lag(self, interval: float = 0.1):
"""Sample this process's asyncio event-loop scheduling lag as a metric.
Each tick asks to sleep ``interval`` seconds. A healthy loop wakes on
time (lag ~0); a loop blocked by synchronous work or GIL contention wakes
late, and the overrun is how long it was unresponsive -- and thus unable
to stamp incoming requests' received_time.
"""
loop = asyncio.get_running_loop()
next_tick = loop.time() + interval
while True:
await asyncio.sleep(interval)
now = loop.time()
lag = now - next_tick
self.metrics_collector.observe_event_loop_lag(lag if lag > 0.0 else 0.0)
next_tick = now + interval
async def watch_event_loop_lag(self, interval: float = 0.1):
"""Sample this process's asyncio event-loop scheduling lag as a metric.
Each tick asks to sleep ``interval`` seconds. A healthy loop wakes on
time (lag ~0); a loop blocked by synchronous work or GIL contention wakes
late, and the overrun is how long it was unresponsive -- and thus unable
to stamp incoming requests' received_time.
"""
loop = asyncio.get_running_loop()
while True:
scheduled_wakeup = loop.time() + interval
await asyncio.sleep(interval)
lag = loop.time() - scheduled_wakeup
try:
self.metrics_collector.observe_event_loop_lag(max(0.0, lag))
except Exception as e:
logger.warning("Failed to observe event loop lag: %s", e)


async def handle_loop(self):
"""The event loop that handles requests"""
while True:
Expand Down
33 changes: 33 additions & 0 deletions python/sglang/srt/observability/metrics_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -1632,6 +1632,34 @@ def __init__(
buckets=bucket_e2e_request_latency,
)

# Scheduling lag of this process's asyncio event loop, i.e. how far past
# its scheduled wake-up the lag monitor coroutine actually ran. The loop
# that stamps a request's received_time is the same one served here, so
# sustained lag is delay that accrues *before* received_time -- the slice
# of client-observed TTFT that engine TTFT (received_time -> first token)
# cannot see. Healthy lag is sub-millisecond; starvation lands at 0.1s+.
self.histogram_event_loop_lag = Histogram(
name="sglang:event_loop_lag_seconds",
documentation="Histogram of tokenizer-process asyncio event loop scheduling lag in seconds.",
labelnames=labels.keys(),
buckets=[
0.0005,
0.001,
0.0025,
0.005,
0.01,
0.025,
0.05,
0.1,
0.25,
0.5,
1.0,
2.5,
5.0,
10.0,
],
)

def observe_one_finished_request(
self,
labels: Dict[str, str],
Expand Down Expand Up @@ -1688,6 +1716,11 @@ def report_cache_source(source: str, value: int):
def observe_time_to_first_token(self, labels: Dict[str, str], value: float):
self.histogram_time_to_first_token.labels(**labels).observe(value)

def observe_event_loop_lag(self, lag: float):
# Process-wide signal, so it carries the collector's own labels rather
# than per-request ones.
self.histogram_event_loop_lag.labels(**self.labels).observe(lag)

def check_time_to_first_token_straggler(self, value: float) -> bool:
his = self.histogram_time_to_first_token.labels(**self.labels)
total_observations = sum(bucket._value for bucket in his._buckets)
Expand Down
120 changes: 120 additions & 0 deletions test/registered/unit/observability/test_event_loop_lag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
"""Pure-CPU unit tests for the tokenizer event-loop-lag metric.

Covers the two halves of ``sglang:event_loop_lag_seconds``:

* ``TokenizerMetricsCollector`` registers the histogram and
``observe_event_loop_lag`` routes the value through the collector's labels.
* ``TokenizerManager.watch_event_loop_lag`` records a lag sample roughly equal
to how long the loop was blocked -- the real failure mode this metric exists
to surface (a starved loop that cannot promptly stamp ``received_time``).
"""

from sglang.test.ci.ci_register import register_cpu_ci

register_cpu_ci(est_time=5, suite="base-a-test-cpu")

import asyncio
import time
import unittest


class _RecordingCollector:
def __init__(self):
self.lags = []

def observe_event_loop_lag(self, lag):
self.lags.append(lag)


class _FakeCounter:
def __init__(self, name, documentation, labelnames):
pass


class _FakeHistogram:
by_name = {}

def __init__(self, name, documentation, labelnames, buckets):
self.name = name
self.buckets = buckets
self.observed = []
self.last_labels = None
_FakeHistogram.by_name[name] = self

def labels(self, **kwargs):
self.last_labels = kwargs
return self

def observe(self, value):
self.observed.append(value)


class _StubServerArgs:
"""Minimal ServerArgs stand-in for constructing the collector.

Only the two bucket-rule attributes are read in ``__init__``; ``None`` makes
``generate_buckets`` fall back to its defaults.
"""

prompt_tokens_buckets = None
generation_tokens_buckets = None


class TestEventLoopLagMetricWiring(unittest.TestCase):
def test_metric_registered_and_routes_through_labels(self):
from sglang.srt.observability.metrics_collector import (
TokenizerMetricsCollector,
)

_FakeHistogram.by_name = {}

class _DICollector(TokenizerMetricsCollector):
_counter_cls = _FakeCounter
_histogram_cls = _FakeHistogram

labels = {"model_name": "m", "engine_type": "unified"}
collector = _DICollector(server_args=_StubServerArgs(), labels=labels)

self.assertIn("sglang:event_loop_lag_seconds", _FakeHistogram.by_name)
hist = _FakeHistogram.by_name["sglang:event_loop_lag_seconds"]
# Sub-millisecond floor through multi-second ceiling.
self.assertEqual(hist.buckets[0], 0.0005)
self.assertEqual(hist.buckets[-1], 10.0)

collector.observe_event_loop_lag(0.7)
self.assertEqual(hist.observed, [0.7])
# Process-wide metric carries the collector's own labels.
self.assertEqual(hist.last_labels, labels)


class TestWatchEventLoopLag(unittest.TestCase):
def test_blocked_loop_records_lag(self):
from sglang.srt.managers.tokenizer_manager import TokenizerManager

# Bypass the heavy __init__; the monitor only touches metrics_collector.
tm = TokenizerManager.__new__(TokenizerManager)
collector = _RecordingCollector()
tm.metrics_collector = collector

async def scenario():
task = asyncio.create_task(tm.watch_event_loop_lag(interval=0.02))
await asyncio.sleep(0.06) # a few healthy ticks (lag ~0)
time.sleep(0.4) # synchronously block the loop
await asyncio.sleep(0.06) # let the overdue tick land
task.cancel()
try:
await task
except asyncio.CancelledError:
pass

asyncio.run(scenario())

self.assertGreaterEqual(len(collector.lags), 2)
self.assertTrue(
any(lag >= 0.3 for lag in collector.lags),
f"expected a >=0.3s lag sample from the 0.4s block, got {collector.lags}",
)


if __name__ == "__main__":
unittest.main()
Loading