Skip to content

[V1, Feature] Add OpenAI Realtime WebSocket endpoint (M0)#385

Open
PopSoda2002 wants to merge 16 commits intosgl-project:mainfrom
PopSoda2002:v1_realtime
Open

[V1, Feature] Add OpenAI Realtime WebSocket endpoint (M0)#385
PopSoda2002 wants to merge 16 commits intosgl-project:mainfrom
PopSoda2002:v1_realtime

Conversation

@PopSoda2002
Copy link
Copy Markdown
Collaborator

@PopSoda2002 PopSoda2002 commented May 4, 2026

Summary

Mounts /v1/realtime, an OpenAI-Realtime-compatible WebSocket API on sglang-omni v1, to enable streaming audio in / streaming transcript deltas out for low-latency voice agents and live transcription/translation. Disabled by default — opt in with --enable-realtime.

Related discussions: vLLM streaming-realtime, sgl-project/sglang#22474 (upstream RFC scoped to transcription only — this PR goes for the OpenAI superset instead).

What's in this PR

New package: sglang_omni_v1/serve/realtime/

  • events.py — Pydantic schemas for the OpenAI Realtime client/server event vocabulary
  • audio_buffer.py — append-only PCM16 rolling buffer
  • session.py — per-WebSocket state machine; dispatches client events, drives the engine via the existing Coordinator.stream(), translates engine deltas back to Realtime server events
  • manager.py — in-memory session_id → RealtimeSession registry
image

Misc

image image

Mounts /v1/realtime, an OpenAI-Realtime-compatible WebSocket API for
streaming audio in / streaming transcript deltas out. Disabled by
default; opt in with `--enable-realtime` on the server.

Scope (M0 — protocol surface):
  * Client events: session.update, input_audio_buffer.append/commit/clear,
    conversation.item.create (text-only), response.create/cancel
  * Server events: session.created/updated, input_audio_buffer.committed/
    cleared, conversation.item.created, transcription.delta/completed/
    failed, response.created/text.delta/text.done/done, error
  * Lifecycle: each commit / response.create builds a fresh OmniRequest
    against the existing chat-completion path. Anchor-request lifecycle
    with KV preservation lands in M1 without changing this wire surface.

What this PR adds:
  * sglang_omni_v1/serve/realtime/ — events.py (Pydantic schemas),
    audio_buffer.py (rolling PCM16 buffer), session.py (per-WS state
    machine), manager.py (in-memory registry)
  * preprocessing/audio.py: pcm16_bytes_to_float32 fast path (no
    PyAV/WAV header parse) for streaming PCM frames
  * serve/openai_api.py: _register_realtime + create_app(enable_realtime=)
  * serve/launcher.py + cli/serve.py: --enable-realtime flag
  * examples/realtime_file_client.py — replay WAV at real-time rate
  * examples/realtime_mic_client.py — mic streaming via sounddevice
  * tests/test_v1_realtime_smoke.py — 18 tests covering PCM16 round-trip,
    audio buffer mutation, event parsing, and full WebSocket E2E via
    FastAPI TestClient with a fake Client (no GPU required)

Out of scope (deferred):
  * Server VAD / turn detection (M2)
  * Audio output / response.audio.delta (M3)
  * Function calling / tools (M3)
  * Anchor-request KV preservation (M1)
  * Ming-Omni support (M4)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@PopSoda2002 PopSoda2002 requested a review from FrankLeeeee as a code owner May 4, 2026 02:47
@PopSoda2002 PopSoda2002 marked this pull request as draft May 4, 2026 02:52
PopSoda2002 and others added 10 commits May 4, 2026 04:59
GPU-validated end-to-end against Qwen3-Omni-30B-A3B-Instruct on 2x H200.
This commit lands the M2 server-VAD path + M3 polish on top of the M0
protocol scaffolding from the previous commit.

M0 hotfix — audio IPC (was blocking GPU validation):
  * Pipeline IPC layer (msgpack) cannot serialize numpy.ndarray. The M0
    transcription path passed `metadata["audios"] = [np_array]` and
    crashed before reaching the encoder.
  * RealtimeAudioBuffer.to_wav_data_uri() now serializes the buffer as
    `data:audio/wav;base64,...`; the multimodal resource connector
    decodes it back to float32 on the preprocessor side. No coordinator
    or proto changes needed.

M1 — anchor request + KV preservation: SHELVED.
  * Pre-flight spike (playground/realtime_spike/encoder_acausality.py)
    numerically compared full-clip vs chunked encode on Qwen3-Omni's
    audio tower across {250, 500, 1000, 2000} ms widths.
  * Result: relative_diff = 0.59-1.47 across all widths. Encoder is
    strongly acausal — chunked embeddings are not equivalent to a full
    encode. Straight KV-extend would corrupt the thinker's prefix KV;
    the fallback ("re-encode prefix per append, preserve thinker KV")
    only saves system-prompt KV and is not worth the engineering cost
    of new ExtendInputMessage plumbing.
  * Plan revised: M1 dropped, M2 promoted to next milestone.

M2 — server VAD + multi-utterance:
  * silero-vad ONNX wrapper in serve/realtime/vad.py (~1 ms / 32 ms
    window, CPU thread). Frame-by-frame state machine emits
    speech_started / speech_stopped per OpenAI Realtime semantics.
  * RealtimeAudioBuffer.slice_to_wav_data_uri(start, end) and tail(n)
    so the session can commit only the speech segment with prefix
    padding, dropping post-utterance silence.
  * RealtimeSession wires VAD into _handle_audio_append; auto-commits
    on speech_stopped.
  * FIFO _transcription_queue + _drain_queue serialize utterances when
    the engine can't drain fast enough (manual or VAD-driven).
  * Default system prompt rewritten as a "realtime speech-to-text"
    framing — Qwen3-Omni's chatty default produced descriptions and
    refusals; the new prompt produces verbatim transcripts.
  * 7 new VAD tests on top of the 18 M0 tests (25 total, all green).
  * GPU validation: 12s multi-utterance fixture → 4 correct
    speech_started/stopped pairs → 4 verbatim transcripts via the queue.

M3 — polish:
  * benchmarks/realtime_latency.py — wall-clock measurement of manual
    and VAD modes. Manual mode H200 baseline: p50 = 190 ms, p95 = 303
    ms, max = 286 ms (commit -> first delta -> completed).
    benchmarks/baselines/realtime_latency_h200.json checked in.
  * examples/realtime_translate.py — same /v1/realtime path with
    `instructions` set for live streaming translation.
  * docs/basic_usage/realtime.md — overview, manual + VAD walkthroughs,
    supported event matrix, latency benchmark instructions.
  * realtime_file_client.py grew --server-vad flag and multi-utterance
    completion tracking.

Encoder cache (originally M3) deferred — under server VAD, fresh
audio segments per utterance dominate any cache hits we'd see, and
the encoder-acausality result removes the main motivation.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Net -264 lines across the realtime package after a pass with ruff +
manual review. No behavior change; 36/36 tests still green.

Removed dead code:
  * RealtimeAudioBuffer.numpy_to_wav_data_uri — never called
  * RealtimeAudioBuffer.to_numpy + duration_ms — only used internally
    by helpers that were themselves unused
  * Three stranded imports (numpy, sys, typing.Iterable)
  * RealtimeSessionManager._lock — asyncio.Lock that was created but
    never acquired
  * VAD module's _VAD_FRAME_MS constant — declared but unused
  * `_Emit` from vad.py __all__ (private dataclass leaked into the
    public API)

Trimmed comments per CLAUDE.md (default to no comments; only WHY,
not WHAT):
  * Section dividers (# Public API / # Internals / # Mutation / # Read)
  * Multi-line explanations of variables whose names already say it
  * Module docstrings narrating M0/M1/M2/M3 milestones — referenced
    process state that's now ancient history; replaced with one-line
    descriptions of what each module does
  * Verbose docstrings on small wrappers (`_load_model`, `reset`,
    `numpy_to_wav_data_uri`)

Style fixes:
  * Two long lines wrapped (>88 cols)
  * `from typing import Iterable` removed where unused

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Fixes the four findings (3 high, 1 medium) from /codex:adversarial-review.
43/43 tests passing (18 smoke + 7 VAD + 7 hardening + 11 chat-completions).

sgl-project#1 [HIGH] server_vad crashed on clean install (vad.py:50-59)
  * silero-vad and onnxruntime were imported lazily but never declared
    as dependencies; clean install + --enable-realtime + server_vad
    config crashed the WS with ModuleNotFoundError instead of returning
    a recoverable error.
  * Added silero-vad>=5.1, onnxruntime>=1.17, websockets>=12.0 to
    pyproject.toml [project].dependencies.
  * Introduced VADUnavailable exception; session.update catches both
    import and init failures, sends a structured `error` event with
    code="server_vad_unavailable", and falls back to manual mode
    without dropping the WebSocket.

sgl-project#2 [HIGH] All sessions shared a single LSTM-stateful VAD model (vad.py:46-61)
  * _model_cache singleton meant concurrent sessions cross-contaminated
    silero's hidden state, and one session's reset_states() wiped
    another session's pending utterance.
  * Removed the singleton; each StreamingVAD constructs its own model
    via _load_model_instance(). Silero-vad ONNX is ~1MB so per-session
    instances are cheap.

sgl-project#3 [HIGH] Unbounded buffer = single-WS DoS (audio_buffer.py:43-53)
  * append_b64 had no size cap. A client streaming silence forever (or
    one that never sends commit/clear) could grow the bytearray until
    the worker OOM'd.
  * Added max_bytes parameter (default 60s × 16kHz × 2B = 1.92 MB).
    Overflow raises new AudioBufferOverflow exception. Session handler
    sends an `error` event with code="input_audio_buffer_too_large"
    and closes the WS with code 1009 ("message too big").

sgl-project#4 [MED] Manual commit left VAD/origin state stale (session.py:394-405)
  * Auto-commit reset _vad, _utterance_start_byte, and advanced
    _buffer_origin_samples; manual commit only cleared the buffer. A
    client that mixed manual commits with active server_vad would carry
    stale LSTM and offset state into the next utterance.
  * Extracted _drop_buffer_and_reset_vad() helper; both auto-commit,
    manual commit, and audio_buffer.clear all go through it now.

New regression tests (tests/test_v1_realtime_hardening.py):
  * test_session_update_returns_error_when_vad_unavailable — sgl-project#1
  * test_streaming_vad_constructs_a_distinct_model_per_instance — sgl-project#2
  * test_audio_buffer_rejects_overflow_on_append — sgl-project#3
  * test_audio_buffer_overflow_extends_audio_buffer_error_hierarchy — sgl-project#3
  * test_websocket_closes_on_oversized_append — sgl-project#3
  * test_manual_commit_resets_vad_and_origin — sgl-project#4
  * test_drop_buffer_and_reset_vad_helper_unifies_paths — sgl-project#4 structural

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Excalidraw + rendered PNG showing the realtime architecture:

  - WebSocket transport (left): chunks streaming in via
    input_audio_buffer.append at ~30 Hz
  - RealtimeSession (middle): RealtimeAudioBuffer (with 1.92 MB cap),
    silero VAD per-session state machine, slice → WAV data URI, FIFO
    transcription queue, drainer
  - Engine pipeline (right): Preprocessor → Audio Encoder (highlighted
    acausal) → Thinker LLM → Decode → token stream back to client
  - M1 spike summary: encoder rel_diff = 0.59–1.47 across chunk widths,
    killed anchor-request KV-extend
  - Codex hardening fixes: 4 callouts
  - "On the wire" JSON evidence: 3 real event examples

Drawn via the excalidraw-diagram skill; rendered via Playwright +
chromium-headless. The .excalidraw source is editable in Excalidraw
itself if anyone wants to iterate.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
v1 had three issues:
  - Engine column flowed top-down while everything else was left-to-right
    → eye traced an awkward U-shape
  - VAD state machine was just a row of text ("IDLE → SPEAKING → IDLE")
    rather than visualized
  - "ACAUSAL" was buried as a small subtitle inside the encoder; the
    spike result at the bottom had no visual link to the encoder it
    was about
  - Coordinator label overlapped the AudioBuffer rectangle

v2 fixes:
  - Engine pipeline is horizontal (top-right): Preprocess → Audio
    Encoder → Thinker → Decode. Whole diagram now flows L→R then loops
    back via a single clean return arrow.
  - VAD is drawn as a real two-state machine with labeled transitions
    (prob ≥ 0.5 forward, silence ≥ 500 ms back).
  - Audio Encoder uses warning red, has ACAUSAL stamp, and a dashed
    red callout arrow visually connects it to the M1 SPIKE block at
    the bottom that explains why.
  - Coordinator + msgpack IPC labels now live in the clear vertical
    channel between FIFO and Preprocess.

Same content (M1 spike + 4 Codex hardening fixes + 3 wire-format JSON
examples) preserved; just better visual argument.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Per user style preference for the realtime package: no leading-
underscore identifiers, no try/except blocks. 43/43 tests still pass.

Renames (no behavior change):
  * RealtimeAudioBuffer: _buf/_source_sr/_target_sr/_channels/_max_bytes
    → buf/source_sr/target_sr/channels/max_bytes
  * StreamingVAD: _model/_leftover_pcm/_samples_consumed/_is_speech/
    _silence_run_samples/_last_speech_offset/_infer
    → model/leftover_pcm/samples_consumed/is_speech/silence_run_samples/
      last_speech_offset/infer
  * RealtimeSessionManager: _client/_model_name/_sessions
    → client/model_name/sessions
  * RealtimeSession: _audio_buffer/_session_object/_vad/_conversation/
    _closed/_active_*/_transcription_queue/_queue_drainer/
    _buffer_origin_samples/_utterance_start_byte/...
    → audio_buffer/session_object/vad/conversation/closed/active_*/...
  * Module privates: _DEFAULT_MAX_BUFFER_BYTES → DEFAULT_MAX_BUFFER_BYTES,
    _VAD_FRAME_SAMPLES → VAD_FRAME_SAMPLES, _VAD_SAMPLE_RATE →
    VAD_SAMPLE_RATE, _Emit → Emit, _Base → EventBase,
    _CLIENT_EVENT_TYPES → CLIENT_EVENT_TYPES, _load_model_instance
    → load_silero_model
  * Methods: _drop_buffer_and_reset_vad → drop_buffer_and_reset_vad,
    _handle_* → handle_*, _run_transcription → run_transcription,
    _build_* → build_*, _send → send, _send_error → send_error, etc.

try/except removals (replaced with `contextlib.suppress` + sentinels,
early-return guards, or tuple-returning APIs):

  * audio_buffer.py: append_b64 now returns (bytes_appended, error_kind)
    where error_kind is None / "invalid_b64" / "overflow". Removed the
    AudioBufferError / AudioBufferOverflow exception classes — the
    Literal type alias `AppendError` is the new contract. Internal
    base64 decode wrapped in `suppress(binascii.Error, ValueError)`.

  * vad.py: load_silero_model returns model or raises VADUnavailable;
    internal failures wrapped with `suppress`. No try/except.

  * events.py: parse_client_event uses `suppress(ValidationError)` and
    returns Optional[ClientEvent]. CLIENT_EVENT_TYPES is now public.

  * session.py:
    - run() loops while not self.closed; receive_text wrapped in
      `suppress(WebSocketDisconnect, RuntimeError)` so a disconnect or
      a self-initiated close just breaks the loop cleanly.
    - decode_payload helper returns Optional[dict] via suppress.
    - handle_audio_append branches on the (n, err) tuple from append_b64
      instead of catching AudioBufferOverflow / AudioBufferError.
    - VAD construction in handle_session_update wraps in
      `suppress(VADUnavailable)` and emits a structured error if the
      result is None.
    - run_transcription / run_text_response use nested `suppress`
      blocks plus a `succeeded`/`engine_failed`/`cancelled` sentinel
      pattern: ClientError + Exception suppressed inside, CancelledError
      suppressed outside. A return statement on the success path skips
      the failure-event emit.
    - drain_queue: `suppress(asyncio.CancelledError)` for the outer loop,
      `suppress(asyncio.TimeoutError)` for the queue.get wait,
      `suppress(Exception)` for awaiting the active task.
    - send/teardown/handle_response_cancel: all best-effort cleanups
      use `suppress(Exception)` instead of try/except.

Tests updated for the renamed identifiers and the tuple-returning
`append_b64`:
  * tests/test_v1_realtime_smoke.py — `(n, err) = buf.append_b64(...)`
    instead of `pytest.raises(AudioBufferError)`.
  * tests/test_v1_realtime_vad.py — direct attribute access
    (vad.is_speech etc.) instead of `vad._is_speech`.
  * tests/test_v1_realtime_hardening.py — monkeypatch
    `load_silero_model` instead of `_load_model_instance`; assert
    `(0, "overflow")` instead of catching `AudioBufferOverflow`.

Net: zero try/except blocks in the realtime package; zero leading-
underscore identifiers in code I authored. Existing pre-PR conventions
(`_register_*` in openai_api.py, etc.) untouched since they're not
mine to restyle.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Per user preference ("不要做任何的 error 处理 出错就错了"): no
try/except, no contextlib.suppress, no defensive checks. If something
fails, the exception propagates. Memory updated to reflect both rules.

Changes:
  * audio_buffer.py — removed `decode_audio_b64` + suppress wrapper.
    `append_b64` now calls `base64.b64decode` directly; bad input
    raises `binascii.Error`. AppendError type narrowed to "overflow"
    (the only structured error path remaining).
  * vad.py — removed `VADUnavailable`, removed all suppress around
    silero import / load. If silero isn't installed, ImportError
    propagates. If load fails, the underlying exception propagates.
  * events.py — removed `suppress(ValidationError)` from
    `parse_client_event`. Malformed events raise; unrecognized types
    still return None.
  * session.py:
    - run() uses raw `websocket.receive()` instead of `receive_text()`,
      handling the `websocket.disconnect` ASGI message in-band so a
      normal disconnect doesn't raise. No try/except around the loop.
    - drain_queue() uses `asyncio.wait({task})` (which doesn't re-raise
      task results) and explicitly retrieves `task.exception()` to
      silence GC warnings. No suppress.
    - run_transcription / run_text_response are now linear: ClientError
      and CancelledError propagate from the engine stream straight up,
      contained by the drainer's `asyncio.wait`. No "transcription.failed"
      synthetic event on engine error — the user's accepted tradeoff
      for not handling errors.
    - send() / handle_response_cancel / teardown — removed all suppress;
      send-after-disconnect and abort failures propagate. teardown uses
      `asyncio.wait` to cancel/await pending tasks without re-raising.

Tests updated:
  * test_audio_buffer_propagates_bad_base64: now `pytest.raises(binascii.Error)`.
  * test_websocket_invalid_json_kills_session: now wraps the entire
    TestClient block in `pytest.raises(json.JSONDecodeError)`.
  * test_session_update_propagates_vad_load_failure (was
    `..._returns_error_when_vad_unavailable`): now wraps in
    `pytest.raises(ImportError)`.
  * test_audio_buffer_invalid_b64_propagates: same shape as above.

43/43 tests pass; ruff clean.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
User asked to remove the standalone examples / docs / spike artifacts
and replace them with a single web demo. Net: −7 ad-hoc files,
+4 self-contained demo files in playground/web/realtime/.

Removed:
  * docs/basic_usage/realtime.md
  * examples/realtime_file_client.py
  * examples/realtime_mic_client.py
  * examples/realtime_translate.py
  * playground/realtime_spike/encoder_acausality.py
  * playground/realtime_spike/pr385_architecture.excalidraw
  * playground/realtime_spike/pr385_architecture.png

Added (playground/web/realtime/):
  * index.html — single-page UI: server URL, turn-detection mode
    (manual vs server_vad), instructions box, mic start/stop, manual
    commit, clear buffer, live transcripts, event log
  * app.js — WebSocket client, getUserMedia → AudioWorklet @ 16 kHz →
    Float32 → PCM16 LE → base64 → input_audio_buffer.append; renders
    transcription.delta / .completed events as live utterance cards
  * styles.css — minimal dark-theme vanilla CSS, no framework
  * README.md — run instructions (sgl-omni serve --enable-realtime,
    python -m http.server 8080)

No build step, no package.json. Matches existing playground/web/
conventions. Per house style: zero error handling — status pills
update on state changes, exceptions bubble to the browser console.

The protocol surface (server VAD, manual commit, multi-utterance
queue) is now demoable end-to-end in a browser. Scope of the PR
unchanged; this is documentation/UX sugar on top of what already
shipped.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Per the user-installed frontend-design skill: commit to a bold,
context-specific aesthetic instead of generic dark-corporate UI. The
realtime endpoint IS a wire service — voice in one end, transcript
out the other — so set it as one.

Aesthetic direction: editorial broadsheet
  * Cream paper background (#f4f0e6) with deep ink (#1a1814) and
    vermilion (#c5392b) press-red as the single accent
  * Display: Fraunces variable serif (italic, opsz=144) for masthead
    + section heads — extremely characterful, "Wire Service" reads as
    a real magazine title
  * Body: Newsreader (italic) for transcript prose with `::first-letter`
    drop caps in Fraunces vermilion — drop caps stream-update naturally
    as deltas arrive
  * Mono: JetBrains Mono for technical inputs (URLs, event types,
    timestamps, buttons that lean technical)
  * Numbered Roman section heads (i / ii / iii) in italic red
  * Centered fleuron (❦) divider between control sections
  * Masthead corners with VOL/EDITION metadata in mono caps
  * "ON THE WIRE / OFFLINE" live pill in the transcript header,
    pulsing dot
  * Live oscilloscope rendered to canvas in vermilion ink on a faint
    grid (replaces the old <meter> VU bar)
  * In-progress utterances get a vermilion left rule with a slow
    pulse stripe; completed turns get a thin ink rule
  * Animations: staggered page-load reveal (masthead → controls →
    transcripts → wire feed) via animation-delay; respects
    prefers-reduced-motion

Files:
  * index.html (173 lines) — semantic editorial markup
  * styles.css (~720 lines after fleuron cleanup) — variable Fraunces,
    Newsreader, JetBrains Mono via Google Fonts
  * app.js (422 lines) — same client logic as before; DOM building
    updated for numbered serials, drop-cap-friendly p.utterance-body,
    canvas-based oscilloscope (FFT analyser at 1024)
  * preview.png — screenshot of the populated state for the README

Per house style: zero error handling on the page. WS / mic / fetch
errors update the status pill and surface in the browser console.
The frontend-design skill explicitly warns against AI-slop aesthetics
(Inter/Roboto, purple gradients on white) — none used here.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…shot

The previous preview.png was a JS-mocked DOM populated with sample
utterances. This one is the actual rendered state after running the
demo end-to-end on GPU:

  * sglang-omni server (Qwen3-Omni-30B-A3B, 2x H200 TP) on :8765
  * playground/web/realtime/ served via python -m http.server :8090
  * headless Chromium with --use-file-for-fake-audio-capture=
    /tmp/realtime_fixture.wav driving a real audio stream into
    getUserMedia
  * UI: connect → start mic → 12s of audio → manual commit
  * server emitted transcription.delta + .completed for the audio,
    page rendered '№ 001 — A man is talking about a video.' with the
    full Wire Service treatment (drop cap, italic body, oscilloscope
    showing live waveform, Wire Feed populated with real protocol
    traffic)

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@PopSoda2002
Copy link
Copy Markdown
Collaborator Author

Ref: SGlang omni: sgl-project/sglang#22848, have a reproducible CI

@PopSoda2002
Copy link
Copy Markdown
Collaborator Author

realtime, trained, qwen3 omni, encoder, bidirectional, causcal. voxtral realtime https://huggingface.co/mistralai/Voxtral-Mini-4B-Realtime-2602

@PopSoda2002 PopSoda2002 marked this pull request as ready for review May 8, 2026 04:03
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Put these data in the Py files. Do not make a separate JSON file.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Baseline JSON has only the manual block — even though the PR title bills server VAD as a core feature. Combined with [P0-BLOCKER] #1, my read is VAD mode hung on the second segment during benchmarking, so only the manual numbers shipped. CI gating off this baseline gives the VAD path zero coverage. Also n=5 with quantiles(vals, n=20)[-1] extrapolates p95 = 303 ms above max = 285 ms — statistically an artifact, visually wrong. Bump the fallback to len >= 20, or fall back to max for n < 20; or switch to numpy.percentile(vals, 95).

Comment on lines +111 to +117
def reset(self) -> None:
self.leftover_pcm.clear()
self.is_speech = False
self.silence_run_samples = 0
self.last_speech_offset = self.samples_consumed
if hasattr(self.vad_model, "reset_states"):
self.vad_model.reset_states() # type: ignore[union-attr]
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

StreamingVAD.reset() deliberately does not zero self.samples_consumed: line 111-117 only clears leftover_pcm / is_speech / silence_run_samples and sets last_speech_offset = self.samples_consumed (line 115), not 0. But handle_vad_emit treats emit.sample_offset as a byte offset inside the current audio_buffer: vad_byte = max(0, emit.sample_offset * 2) (line 220), then utterance_start_byte = min(vad_byte, audio_buffer.num_bytes) (line 221). The two coordinate systems happen to agree on the first utterance; after auto_commit_utterance → drop_buffer_and_reset_vad (session.py:295) the audio_buffer is cleared while vad.samples_consumed stays. The next emit's sample_offset is now an absolute counter (much larger than the fresh buffer's num_bytes), min(...) clamps to num_bytes, auto_commit_utterance sees start_byte == end_byte (line 285) and returns early — the whole utterance is silently dropped. The client receives speech_started / speech_stopped but never the matching committed / transcription.completed. The wall-clock timestamp_ms = offsets_to_ms(self.buffer_origin_samples + emit.sample_offset) (line 217) also double-counts and drifts by ~2x.

Fix:

# vad.py
def reset(self) -> None:
    self.leftover_pcm.clear()
    self.is_speech = False
    self.silence_run_samples = 0
    self.samples_consumed = 0          # zero the absolute counter
    self.last_speech_offset = 0        # always 0 after reset
    if hasattr(self.vad_model, "reset_states"):
        self.vad_model.reset_states()  # type: ignore[union-attr]

Add a unit test: run two back-to-back utterances with the same StreamingVAD, and assert that after SPEECH_STOPPED of the second utterance, auto_commit_utterance switches to the correct buffer slice and the client receives the corresponding committed event.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

grep -rn '_send_error\|make_event("error"' sglang_omni_v1/serve/realtime/ returns 0 hits — there is zero structured-error emission in the implementation. Every external-boundary guard is a bare assert:

  • session.py:144-145: payload = json.loads(raw); assert isinstance(payload, dict)
  • session.py:150-152: assert event_type in SUPPORTED_CLIENT_EVENT_TYPES
  • session.py:166-176: input_audio_format / audio modality / semantic_vad asserts
  • session.py:204-206: assert err != "overflow"
  • session.py:303: assert not self.audio_buffer.is_empty()
  • session.py:344-346: response-already-active assert
  • session.py:353: audio-output-modality assert

But tests/ contains 6 assertions of the form evt["type"] == "error" + a specific error.code (table above). Each trigger goes assert → AssertionError → ASGI 1006 → TestClient sees WebSocketDisconnect → the test's evt["type"] == "error" assertion fails. Six tests can never pass against the current implementation. The test_websocket_invalid_json_kills_session test (smoke.py:314) is actually the correct one — it explicitly pytest.raises(JSONDecodeError). The other six are the contradiction.

Fix: Add the _send_error helper at the implementation layer, refactor all external boundary guards to emit structured error events followed by graceful shutdown. This complies with the OpenAI Realtime protocol contract and directly resolves the below [P0-BLOCKER] issue regarding buffer overflow DoS vulnerability.

async def _send_error(self, code: str, message: str, event_id: str | None = None) -> None:
    await self.send(make_event(
        "error",
        error={
            "type": "invalid_request_error",
            "code": code,
            "message": message,
            "event_id": event_id,
            "param": None,
        },
    ))

# handle_audio_append:
if err == "overflow":
    await self._send_error(
        "input_audio_buffer_too_large",
        f"buffer would exceed {self.audio_buffer.max_bytes} bytes",
    )
    await self.websocket.close(code=1009)
    self.closed = True
    return

# handle_session_update / dispatch / handle_audio_commit / handle_response_create
# Apply the same refactoring: replace each assert with the corresponding _send_error + close logic.

Either implement the error event at the implementation layer, or modify the tests to use pytest.raises(WebSocketDisconnect) — yet the former is the correct approach aligned with the protocol specification.

def previous_item_id(self) -> str | None:
return self.conversation[-1].item_id if self.conversation else None

async def teardown(self) -> None:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

File: sglang_omni_v1/serve/openai_api.py:469-476sglang_omni_v1/serve/realtime/session.py:571-592sglang_omni_v1/serve/realtime/manager.py:37-39

The WebSocket route's finally block only calls manager.close(session.session_id) (manager.py:37-39 just pops from a dict). RealtimeSession.teardown() at session.py:571-592 — which does Client.abort(active_request_id), cancels active_task, cancels queue_drainer, closes the websocket — is dead code. After client disconnect the drainer coroutine is still parked on await self.transcription_queue.get(), active_task is still streaming, no Client.abort() is issued — the pipeline keeps the in-flight request running to completion. One leaked GPU inference + one leaked asyncio task per unexpected disconnect. Under sustained load this is leak-to-OOM.

Fix:

@app.websocket("/v1/realtime")
async def realtime(websocket: WebSocket) -> None:
    await websocket.accept()
    session = manager.open(websocket)
    try:
        await session.run()
    finally:
        await session.teardown()           # MUST be awaited
        manager.close(session.session_id)

def _boom():
raise ImportError("simulated missing silero-vad")

monkeypatch.setattr(vad_module, "load_silero_model", _boom)
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

File: tests/test_v1_realtime_hardening.py:81, 124, 128, 131, 212sglang_omni_v1/serve/realtime/vad.py:8, 50

All three monkeypatch.setattr(vad_module, "load_silero_model", ...) calls reference a non-existent attribute. vad.py:8 imports load_silero_vad (not load_silero_model); vad.py:50 calls load_silero_vad(onnx=True) directly — the module has no load_silero_model name. monkeypatch.setattr raises AttributeError on a missing attr, so all three tests fail at setup. Likewise a.model is not b.model (line 128) and constructed == [a.model, b.model] (line 131) reference model, but the implementation uses vad_model. test_session_update_propagates_vad_load_failure / test_streaming_vad_constructs_a_distinct_model_per_instance / test_manual_commit_resets_vad_and_origin either crash at import or silently skip in CI.

Fix: Expose the import at the module level with a unified alias in vad.py, and rename the attribute vad_model to model:

# vad.py
from silero_vad import load_silero_vad as load_silero_model

class StreamingVAD:
    def __init__(self, config: VADConfig | None = None) -> None:
        ...
        self.model = load_silero_model()

This allows direct patching in tests. This fix also naturally enables the "silero ORT singletonization" mentioned in [P1-PERF] below — the model parameter can be injected externally, and after singletonization, Test #2 should be revised to "validate the per-process model is shared".

assert payload is not None, "Audio buffer became empty before commit"
await self.commit_user_audio_item(payload)

async def handle_item_create(self, event: ConversationItemCreate) -> None:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

handle_item_create silently drops input_audio content parts

Lines 314-318 keep only input_text and text; the comment claims audio attachments belong on input_audio_buffer.* but the OpenAI spec also lets conversation.item.create carry input_audio (base64 PCM) inline. M0 may legitimately defer, but the current impl silently discards without error — clients never know. Per house style: emit a structured error event when input_audio content is present.

)
if chunk.finish_reason is not None:
finish_reason = chunk.finish_reason
usage = chunk.usage.to_dict() if chunk.usage else None
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assumes the Usage type has a to_dict method. If CompletionStreamChunk.usage ever migrates to e.g. dataclasses.asdict, this raises AttributeError. Tighten to usage = dataclasses.asdict(chunk.usage) if chunk.usage else None, or formalize a Usage protocol.

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove smoke test

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both VAD tests skip when /tmp/realtime_fixture.wav is absent (lines 96-100 and 205-207). The PR doesn't document how to generate it, so in CI both tests skip silently — the only two real end-to-end VAD tests are de facto absent from the CI matrix.

)


def test_drop_buffer_and_reset_vad_helper_unifies_paths() -> None:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Counting source-code substring occurrences as "the helper is called from ≥ 3 sites" is fragile — any refactor breaks the test without a real correctness regression. Replace with a behavioral test: drive manual commit + auto commit + clear through a real session, assert all three paths leave the VAD in the post-reset state. This naturally merges with the regression test for [P0-BLOCKER] #1.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants