Skip to content

Commit e52e92c

Browse files
authored
Instant ttft oversaturation (#607)
## Summary When over-saturation detection is enabled (`--detect-saturation`), the constraint can only receive TTFT data after a request fully completes. With large models and long contexts, no request completes within the `minimum_duration` window (default 30s), so the constraint falls back to concurrent slope alone and stops prematurely. This PR adds time-bounded instant TTFT notifications: when over-saturation detection is enabled, worker processes monitor for first-token arrival during streaming and send a `"first_token"` status update before the request completes. This gives the constraint real TTFT data for a two-signal decision. Notifications are sent only during the first `minimum_duration` seconds of the benchmark to limit IPC overhead. ## Details - [x] Add `"first_token"` to `RequestInfo.status` literal (`schemas/info.py`) - [x] Add TTFT polling monitor to `WorkerProcess` — spawns an async task per request that detects `first_token_iteration` and sends a `"first_token"` update (`scheduler/worker.py`) - [x] Time-bound the monitor: notifications stop after `minimum_duration` seconds via `instant_ttft_duration` (`scheduler/worker.py`) - [x] Handle `"first_token"` in `WorkerGroupState` — no request count changes, passes through to constraints (`scheduler/worker_group.py`) - [x] Extract `minimum_duration` from `OverSaturationConstraint` to configure worker TTFT duration (`scheduler/worker_group.py`) - [x] Accept TTFT from both `"first_token"` and `"completed"` in the constraint, deduplicated by request ID (`scheduler/constraints/saturation.py`) - [x] Add 8 tests covering happy path, dedup, missing timings, backward compatibility, concurrent isolation, disabled mode, reset, and multi-request slope building ## Test Plan - Run `pytest tests/unit/scheduler/ tests/unit/schemas/ tests/unit/backends/` — 1077 passed - Run `pre-commit run --files` on changed files — all checks pass - Verify with `--detect-saturation` on a large model with long context (>10k tokens) that the benchmark no longer stops prematurely
## Related Issues - Resolves #606 --- - [x] "I certify that all code in this PR is my own, except as noted below." ## Use of AI - [x] Includes AI-assisted code completion - [x] Includes code generated by an AI application - [x] Includes AI-generated tests (NOTE: AI-written tests should have a docstring that includes `## WRITTEN BY AI ##`)
2 parents 4f9d266 + f9267cb commit e52e92c

File tree

7 files changed

+327
-17
lines changed

7 files changed

+327
-17
lines changed

src/guidellm/backends/openai/http.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,7 @@ async def resolve( # type: ignore[override]
279279
request: GenerationRequest,
280280
request_info: RequestInfo,
281281
history: list[tuple[GenerationRequest, GenerationResponse]] | None = None,
282-
) -> AsyncIterator[tuple[GenerationResponse, RequestInfo]]:
282+
) -> AsyncIterator[tuple[GenerationResponse | None, RequestInfo]]:
283283
"""
284284
Process generation request and yield progressive responses.
285285
@@ -377,6 +377,7 @@ async def resolve( # type: ignore[override]
377377
if request_info.timings.first_token_iteration is None:
378378
request_info.timings.first_token_iteration = iter_time
379379
request_info.timings.token_iterations = 0
380+
yield None, request_info
380381

381382
request_info.timings.last_token_iteration = iter_time
382383
request_info.timings.token_iterations += iterations

src/guidellm/scheduler/constraints/saturation.py

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,7 @@ def reset(self) -> None:
367367
self.ttft_violations_counter = 0
368368
self.total_finished_ever = 0
369369
self.total_started_ever = 0
370+
self._ttft_reported_request_ids: set[str] = set()
370371
self.concurrent_slope_checker = SlopeChecker(
371372
moe_threshold=self.moe_threshold, confidence=self.confidence, eps=self.eps
372373
)
@@ -519,17 +520,19 @@ def __call__(
519520
self._add_started(
520521
{"concurrent_requests": concurrent_requests, "duration": duration}
521522
)
522-
elif (
523-
request_info.status == "completed"
524-
and request_info.timings
525-
and request_info.timings.first_token_iteration
526-
and request_info.timings.request_start
527-
):
528-
ttft = (
529-
request_info.timings.first_token_iteration
530-
- request_info.timings.request_start
531-
)
532-
self._add_finished({"ttft": ttft, "duration": duration})
523+
elif request_info.status in ("first_token", "completed"):
524+
if (
525+
request_info.request_id not in self._ttft_reported_request_ids
526+
and request_info.timings
527+
and request_info.timings.first_token_iteration
528+
and request_info.timings.request_start
529+
):
530+
self._ttft_reported_request_ids.add(request_info.request_id)
531+
ttft = (
532+
request_info.timings.first_token_iteration
533+
- request_info.timings.request_start
534+
)
535+
self._add_finished({"ttft": ttft, "duration": duration})
533536

534537
self._update_duration(duration)
535538
is_over_saturated = self._check_alert()

src/guidellm/scheduler/schemas.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,14 +119,15 @@ async def resolve(
119119
request: RequestT,
120120
request_info: RequestInfo,
121121
history: list[tuple[RequestT, ResponseT]] | None = None,
122-
) -> AsyncIterator[tuple[ResponseT, RequestInfo]]:
122+
) -> AsyncIterator[tuple[ResponseT | None, RequestInfo]]:
123123
"""
124124
Process a request and yield incremental response updates.
125125
126126
:param request: The request object to process
127127
:param request_info: Scheduling metadata and timing information
128128
:param history: Conversation history for multi-turn requests
129-
:yield: Tuples of (response, updated_request_info) for each response chunk
129+
:yield: Tuples of (response, updated_request_info) for each response chunk.
130+
Response may be None for intermediate updates (e.g., first token arrival).
130131
:raises Exception: Implementation-specific exceptions for processing failures
131132
"""
132133

src/guidellm/scheduler/worker.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -365,11 +365,18 @@ async def _process_next_request(self, target_start: float):
365365
async for resp, info in self.backend.resolve( # type: ignore[attr-defined]
366366
request, request_info, None
367367
):
368-
response = resp
369368
request_info = info
370369
if request_info is None:
371370
raise RuntimeError("Received invalid request info from backend")
372371

372+
if (
373+
resp is None
374+
and request_info.timings.first_token_iteration is not None
375+
):
376+
self._send_update("first_token", None, request, request_info)
377+
378+
response = resp
379+
373380
# Complete the request
374381
request_info.timings.resolve_end = time.time()
375382
self._send_update("completed", response, request, request_info)
@@ -428,7 +435,12 @@ async def _schedule_request(
428435
def _send_update(
429436
self,
430437
new_status: Literal[
431-
"pending", "in_progress", "completed", "errored", "cancelled"
438+
"pending",
439+
"in_progress",
440+
"first_token",
441+
"completed",
442+
"errored",
443+
"cancelled",
432444
],
433445
response: ResponseT | None,
434446
request: RequestT | MultiTurnRequestT[RequestT],

src/guidellm/scheduler/worker_group.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -643,6 +643,8 @@ def _update_state_request_counts(self, info: RequestInfo):
643643
self._state.pending_requests = len(self._pending_request_ids)
644644
self._processing_request_ids.add(info.request_id)
645645
self._state.processing_requests = len(self._processing_request_ids)
646+
elif info.status == "first_token":
647+
pass
646648
elif info.status == "completed":
647649
info.timings.finalized = finalized
648650
self._processing_request_ids.remove(info.request_id)

src/guidellm/schemas/info.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,13 @@ class RequestInfo(StandardBaseModel):
126126
default_factory=lambda: str(uuid.uuid4()),
127127
)
128128
status: Literal[
129-
"queued", "pending", "in_progress", "completed", "errored", "cancelled"
129+
"queued",
130+
"pending",
131+
"in_progress",
132+
"first_token",
133+
"completed",
134+
"errored",
135+
"cancelled",
130136
] = Field(description="Current processing status of the request", default="queued")
131137
scheduler_node_id: int = Field(
132138
description="ID/rank of the scheduler node handling the request",

0 commit comments

Comments
 (0)