[S2-v1]: Add S2-Pro Streaming Vocoder #374
Among the concurrent S2-Pro changes I have reviewed, this PR has the most appropriate direction. All 17 files are placed under the V1 tree (`sglang_omni_v1`).
The goal: use CI together with full-suite benchmark results to rigorously guarantee that both streaming and non-streaming hit good precision with CUDA Graph enabled. Going forward, both streaming and non-streaming will default to CUDA Graph on. I'm actually not certain — on the surface this PR looks CUDA-Graph-friendly, but have you actually turned CUDA Graph on and run it? Worth checking with Xuesong for his take. Default to enabling it wherever possible.
zhaochenyang20
left a comment
New sampling path lacks alignment verification across PR #279, CUDA Graph, and streaming
The PR replaces the entire _decode_codebooks body — old softmax(biased_logits / 0.7) → multinomial is gone, new path is RAS + rep penalty + top-k + top-p with new GPU buffers (_sampling_*, _prev_tokens, _ras_*) and an implicit graph-capture contract (_graph_top_k = 30). The code reads graph-friendly (no .item(), no Python control flow on GPU values, fixed shapes pinned at attach()), but graph-friendly on paper is not the same as exercised under capture, and it does not automatically inherit Xuesong's PR #279 ("Align CUDA graph sampling with non-graph path") alignment guarantees.
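For reference, a minimal sketch of what a graph-capture-friendly top-k + top-p step can look like: fixed shapes, no `.item()`, no host-side branching on GPU values. This is an illustration of the constraint only, not the PR's actual `_decode_codebooks` code, and it omits RAS and the repetition penalty:

```python
import torch

def graph_safe_sample(logits: torch.Tensor, top_k: int = 30, top_p: float = 0.9,
                      temperature: float = 0.7) -> torch.Tensor:
    """Graph-capture-friendly sampling sketch: static shapes, no .item(),
    no Python control flow on GPU values."""
    logits = logits / temperature
    # Top-k: keep a fixed-size slice so the shape is static under capture.
    topk_vals, topk_idx = torch.topk(logits, top_k, dim=-1)
    probs = torch.softmax(topk_vals, dim=-1)
    # Top-p: mask the tail of the cumulative distribution.
    sorted_probs, sorted_idx = torch.sort(probs, descending=True, dim=-1)
    cumulative = torch.cumsum(sorted_probs, dim=-1)
    mask = cumulative - sorted_probs > top_p
    mask[..., 0] = False          # always keep the most likely token
    sorted_probs = sorted_probs.masked_fill(mask, 0.0)
    sorted_probs = sorted_probs / sorted_probs.sum(dim=-1, keepdim=True)
    choice = torch.multinomial(sorted_probs, 1)
    # Map back: sorted position -> top-k position -> vocab id.
    return topk_idx.gather(-1, sorted_idx.gather(-1, choice))
```

Note the in-place `mask[..., 0] = False` write: exactly the kind of operation that behaves correctly in eager mode yet must still be verified under capture.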
Precision must hold along three axes simultaneously; failing any one collapses the streaming pipeline:
- graph vs non-graph (PR #279's domain): #279 only aligned the old `softmax` / `multinomial` path. This PR rewrites it, so #279's invariants don't carry over automatically — in-place writes like `top_p_mask[..., 0] = False` and host→device transfers like `torch.tensor(recent, ...).to(device)` can diverge between capture and eager mode. Confirm with Xuesong directly whether the new sampling path still upholds #279's contract.
- streaming vs non-streaming: same sampling code, same codec weights, identical prompt — concatenated streaming audio must be byte-equal to non-streaming audio outside crossfade regions. The byte-equal CI in Section 2's P4 is the test for this, but it only has meaning once sampling drift is excluded — otherwise a byte-equal failure can't be attributed to either sampling or vocoder math.
- CUDA Graph default-on: going forward, both streaming and non-streaming will default to CUDA Graph on. The PR description shows non-streaming WER 0.89%, streaming WER 2.06% (over threshold), and latency numbers — but no data with CUDA Graph on. Looking graph-safe is not enough; Ratish must actually run it with CUDA Graph on, on the full TTS suite, for both streaming and non-streaming, and post the results.
Minimum unblock loop:
- (a) Ratish runs the 2 CUDA-Graph-on combinations of the 4-way matrix (streaming on × graph on, streaming off × graph on) on the full FishAudio S2-Pro TTS suite — not the 16-sample streaming subset currently in CI. Post results in PR description.
- (b) Hash the codec token sequence (or logits) emitted under graph-on vs graph-off and verify #279's alignment still holds. If it doesn't, port #279's fix forward into the new path.
- (c) Tag Xuesong in the PR for sign-off on the sampling rewrite; tag Yifei for CI threshold recalibration after the streaming sample-count alignment (see P2 below).
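Point (b) needs nothing more than a deterministic digest over the emitted tokens; a sketch (helper name is mine, not from the PR):

```python
import hashlib
import struct

def codec_token_digest(token_chunks) -> str:
    """Deterministic digest of an emitted codec-token sequence, for comparing
    graph-on vs graph-off runs bit-for-bit. token_chunks is an iterable of
    iterables of ints (e.g. per-step codebook token ids moved off-GPU)."""
    h = hashlib.sha256()
    for chunk in token_chunks:
        for tok in chunk:
            # Canonical little-endian int64 so the digest is device/dtype independent.
            h.update(struct.pack("<q", int(tok)))
    return h.hexdigest()

# Usage: run the same prompt under both configurations and compare:
#   assert codec_token_digest(tokens_graph_on) == codec_token_digest(tokens_graph_off)
```

Because the digest is over the flattened token stream, chunking differences between streaming and non-streaming runs do not affect it; only the token values do.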
zhaochenyang20
left a comment
The PR description mentions "Streaming consistency ERROR: throughput below threshold" without naming the magnitude. H20 CI numbers: streaming, concurrency=8, tok_per_s_agg drops from baseline 50.8 to 20.8 — a >2× regression. With the goal that streaming defaults on in production, this magnitude of perf regression cannot merge.
My diagnosis: vocoder co-locates with tts_engine on a single GPU and should default to CUDA IPC zero-copy; but the PR's stream_transport={"vocoder": "relay"} forces the vocoder path through relay (SHM serialize → ZMQ → SHM deserialize). Each codec-token chunk (shape [11, 1], dozens of bytes) goes through that pipeline; per-chunk overhead is amplified 8× under concurrency=8 with frequent small chunks, eating throughput. Same-GPU short chunks via relay is an anti-pattern that fights the very shape of streaming workloads (frequent small chunks).
Before merge, two pieces of profiling data:
- (a) Per-chunk transport breakdown: with `torch.profiler` or zmq+shm instrumentation, measure where each codec chunk spends its transport time. Expectation: same-GPU IPC < 100 µs; forced relay 1 ms+ minimum.
- (b) Re-benchmark after fixing the P1-MAINTAIN above: if the framework correctly selects transport (small same-GPU chunks still go through IPC, or a short-chunk threshold keeps the vocoder off relay), `tok_per_s_agg` should snap back to ≈50.8. If it stays at 20.8, the vocoder itself has a perf bug to localise (e.g. deque contention inside `_handle_new_request_batch`, O(N) deque rebuild inside `_clear_request_state`).
If (b) doesn't close the regression, the PR can't merge — streaming-default-on with throughput halved is not production-ready.
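For (a), the minimal shape of the instrumentation is a per-leg timing accumulator wrapped around each transport step (class and leg names are hypothetical; a real run would wrap the actual relay/IPC calls):

```python
import time
from collections import defaultdict

class ChunkTransportProfiler:
    """Per-chunk timing accumulator: wrap each transport leg
    (serialize, zmq send, shm read, ...) and report where time goes."""
    def __init__(self):
        self.totals = defaultdict(float)
        self.counts = defaultdict(int)

    def record(self, leg: str, seconds: float) -> None:
        self.totals[leg] += seconds
        self.counts[leg] += 1

    def time(self, leg: str):
        """Context manager: `with profiler.time("zmq_send"): ...`"""
        profiler = self
        class _Timer:
            def __enter__(self):
                self.t0 = time.perf_counter()
            def __exit__(self, *exc):
                profiler.record(leg, time.perf_counter() - self.t0)
        return _Timer()

    def report(self) -> dict:
        # Mean per-chunk latency per leg, in microseconds.
        return {leg: 1e6 * self.totals[leg] / self.counts[leg]
                for leg in self.totals}
```

If the relay legs (SHM serialize, ZMQ, SHM deserialize) dominate the report at small chunk sizes, that confirms the diagnosis above without touching vocoder internals.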
zhaochenyang20
left a comment
My stance: starting from this PR, streaming lives only in V1; no parallel V0 streaming implementation may remain. Otherwise every bug fix has to be done twice and "which path is canonical?" becomes unanswerable. Today the repo has sglang_omni/models/fishaudio_s2_pro/pipeline/streaming_vocoder.py (V0) and the new sglang_omni_v1/models/fishaudio_s2_pro/streaming_vocoder.py (V1) as near-line-by-line duplicates — build_stream_vocoder_chunk / flush_stream_vocoder_chunk / _apply_stream_crossfade / trim_retained_stream_codes / resolve_stream_overlap_tokens are five pure functions implemented identically on both sides; the only difference is where _StreamVocoderState lives (V0 serialises into payload.data, V1 keeps it in a scheduler dict). That's not a reason to keep two copies.
Recommended merge sequence:
- This PR: leave V0 streaming alone, but explicitly add to the PR description "after this lands, V0 streaming will be removed in PR #XXX, owner: Ratish" so the follow-up has a named owner and action.
- Immediate follow-up PR (within 1–2 days of this merging): delete `sglang_omni/models/fishaudio_s2_pro/pipeline/streaming_vocoder.py` and redirect any remaining V0 callers to V1 — or, preferred, retire V0's streaming entry point entirely in line with V1-First.
- After that, streaming exists in exactly one place; any streaming bug is fixed in one file.
I've emphasised V1-First repeatedly. This PR did the easy half right ("new code only in V1"); the harder half — "old code must go" — must close in the same review cycle, otherwise V0 streaming becomes unmaintained zombie code.
```python
                out.metadata,
            )
        else:
            await self._send_stream_to_coordinator(out)
```
New streaming model bypasses Design Doc's DataReadyMessage contract — owner-level decision required before merge
https://njpxzlfqwrqq.jp.larksuite.com/wiki/VTcEwGVEuihm6xkquEdjc424pNb
Once we have all settled on the new decision, we should also update the design docs.
V1 Design Doc defines a unified streaming model in the Inter-Stage Communication section: "Streaming: hidden states / codec codes flow via DataReadyMessage with chunk_id and is_done fields, parallel to normal result routing." Coordinator's responsibility is "Collects completions from terminal stages" — completions, not stream chunks. This PR adds a fallback in Stage._drain_outbox_external: when a terminal stage has no _stream_targets, it bypasses DataReadyMessage and calls control_plane.send_stream(StreamMessage(...)) direct to the Coordinator. This silently adds a second streaming path alongside the Design Doc's stated one. There is no StreamMessage type in the Design Doc's Message Types listing, and no "terminal-stage stream forwarding to coordinator" in the Coordinator's responsibility list.
This is not a local code decision — it changes V1's overall streaming model. Every future model author has to choose between the two paths, and anyone reading the Design Doc to understand V1 will trip over the hidden one. Owner must pick:
- Option (a): promote it to a first-class Design Doc concept. Add a Streaming-section entry "Terminal-stage stream forwarding: when a terminal stage emits stream chunks, they flow through `control_plane.send_stream(StreamMessage)` direct to the Coordinator, which forwards to HTTP"; add "Forwards stream chunks from terminal stages to clients" to the Coordinator responsibility list; add `StreamMessage` to Message Types.
- Option (b): rework the PR to use DataReadyMessage. Introduce a lightweight "client output collector" stage downstream of the vocoder that receives stream chunks via DataReadyMessage and hands them to the Coordinator. Preserves streaming-model uniqueness at the cost of an extra hop.
My lean is (a) — terminal-stage SSE forwarding direct-to-coordinator is cleanest for HTTP/SSE; an extra stage purely for design uniformity is design-for-design's-sake. But this is Chenyang's call as Design Doc owner, and whichever way it lands, the Design Doc must be edited in the same review cycle so code and doc don't diverge the moment this lands.
I went with option (a). The stage-to-stage streaming path remains DataReadyMessage for intermediate model data such as hidden states and codec codes. The direct StreamMessage path is only for terminal-stage client output: once the vocoder has decoded codec tokens into audio bytes, the payload is no longer inter-stage tensor state, it is already the HTTP/SSE artifact that the Coordinator needs to forward to the client. Adding a collector stage after the vocoder would preserve a single mechanism mechanically, but it would add another scheduler hop only to hand terminal client output back to the Coordinator.
zhaochenyang20
left a comment
tests/test_model/test_s2pro_tts_ci.py:80 (STREAMING_BENCHMARK_MAX_SAMPLES = 16)
Today's CI is asymmetric: non-streaming WER runs the full set, streaming WER runs only 16 samples. WER standard deviation at N=16 is roughly √(N_full / 16) times the full-set standard deviation, which means the 1.2% threshold at 16 samples has almost no discriminative power — a real 10% regression can pass because the sampled examples happen to be insensitive, and a harmless sampling jitter can push WER to 2%+ (this PR's measured 2.06% has exactly this signal-to-noise problem). Align streaming WER to the same full sample set as non-streaming. The threshold then needs recalibration; please coordinate with Yifei.
If CI duration is the concern, gate the 16-sample run behind a fast-CI flag, but the default and nightly runs must use the full set so WER actually serves as a regression net.
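The √(N_full / 16) scaling can be made concrete with a back-of-envelope helper (all numbers below are illustrative assumptions, not measured values):

```python
import math

def wer_threshold_margin(baseline_wer: float, full_n: int, subset_n: int,
                         full_sigma: float) -> float:
    """Illustrative threshold calculation: the subset's WER standard deviation
    scales as sqrt(full_n / subset_n) relative to the full-set sigma, so a
    2-sigma pass/fail threshold must widen by the same factor."""
    subset_sigma = full_sigma * math.sqrt(full_n / subset_n)
    return baseline_wer + 2 * subset_sigma

# With an assumed full-set sigma of 0.1% WER over 1000 samples, the 16-sample
# subset sigma is ~0.79%, so a defensible 2-sigma threshold around a 0.89%
# baseline is ~2.5% -- far looser than the configured 1.2%, which therefore
# mostly fires on sampling noise.
```

This is exactly why the 2.06% measurement is uninterpretable at N=16: it sits well inside the subset's noise band.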
zhaochenyang20
left a comment
Streaming-side failure semantics undefined — what does the client see when vocoder errors mid-stream?
On the RFC I noted streaming responses can't fail via HTTP 500 — the HTTP header flushes before the first chunk, so the status code is unchangeable. Streaming failure signalling must be: successful streams end with an explicit completion sentinel; failed streams abort the connection before the sentinel appears. Clients detect failure as "sentinel absent + connection closed prematurely."
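Concretely, the client-side rule reduces to a one-liner (the sentinel framing here is illustrative, not the PR's actual wire format):

```python
def stream_succeeded(events: list,
                     sentinel: bytes = b"data: [DONE]\n\n") -> bool:
    """Client-side failure detection per the rule above: a stream succeeded
    iff the completion sentinel appeared before the connection closed; an
    early close without the sentinel is a mid-stream failure."""
    return sentinel in events
```
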
This PR has fault tolerance in S2ProVocoderScheduler.start():
```python
except Exception as exc:
    logger.exception(...)
    self.outbox.put(OutgoingMessage(request_id=msg.request_id, type="error", data=exc))
    self.abort(msg.request_id)
```
But the PR description and tests don't pin down what `type="error"` means in a streaming context. Expected: when a terminal stage sees `type="error"` in `_drain_outbox_external` for a request that is currently streaming, it must emit no sentinel and abort the SSE connection / cancel the client async generator. Current code routes `type="error"` through `_route_result` (alongside results), with no streaming-specific path documented.
Not strictly a blocker for this PR — abort cleanup is in place — but pinning streaming failure semantics down here is cheap:
- Document in `S2ProVocoderScheduler` / runtime.py that streaming errors manifest as "abort connection without sentinel".
- Add a unit test parallel to `test_streaming_vocoder_abort_cleans_state_and_suppresses_final`: make `on_chunk` raise; assert the outbox has `type="error"` and does not have any subsequent stream chunk or result.
- In the terminal stage, when handling `type="error"` for a streaming request, take the abort-connection path (signal the coordinator to drop the SSE connection).
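The second bullet can be sketched as a self-contained test against a stub scheduler. The stub below is hypothetical and only mirrors the contract described above (error emitted once, nothing after it), not the real `S2ProVocoderScheduler`:

```python
import queue

class _StubScheduler:
    """Minimal stand-in for the error path: on_chunk failure must emit
    type="error" and suppress everything after it for that request."""
    def __init__(self, on_chunk):
        self.outbox = queue.Queue()
        self._on_chunk = on_chunk
        self._aborted = set()

    def handle_stream_chunk(self, request_id, chunk):
        if request_id in self._aborted:
            return                       # late chunks after abort: dropped
        try:
            self._on_chunk(chunk)
            self.outbox.put(("stream", request_id, chunk))
        except Exception as exc:
            self.outbox.put(("error", request_id, exc))
            self._aborted.add(request_id)

def test_error_suppresses_later_chunks():
    def boom(chunk):
        raise RuntimeError("decode failed")
    sched = _StubScheduler(boom)
    sched.handle_stream_chunk("req-1", b"\x00")
    sched.handle_stream_chunk("req-1", b"\x01")   # must be suppressed
    kinds = []
    while not sched.outbox.empty():
        kinds.append(sched.outbox.get()[0])
    assert kinds == ["error"]
```
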
zhaochenyang20
left a comment
Derived from previous comment. "_FakeCodec is fine for scheduler-state-machine unit tests, but you also need at least one end-to-end CI that runs the real codec and asserts the concatenated streaming audio is byte-equal to the non-streaming audio (allowing minor differences inside crossfade regions)." This round I audited every layer of streaming coverage — none of them protects that claim:
Layer 1: the 7 unit tests in tests/test_v1_fish_streaming_vocoder.py all use _FakeCodec. Its from_indices returns arange(tokens * frame_length) + (row + 1) — output depends only on the number of tokens, not on token values, and has no causal context (delay = 0). So even if _build_stream_vocoder_chunk's stream_overlap_tokens is silently ignored, window_offset is off-by-one, or audio_tensor[overlap_samples:] slices wrong, the tests pass as long as total length matches. _FakeCodec protects the state machine by design but structurally cannot protect chunked-vocoding math.
Layer 2: assert_streaming_consistency (tests/utils.py:299), used by test_voice_cloning_streaming_consistency and test_s2pro_streaming_consistency_from_artifacts. It checks three things only: exact prompt_tokens match, total/median completion_tokens within rtol, total audio_duration_s within rtol. Aggregate level, not sample level. A bug that shifts/drops/duplicates samples while preserving total length passes silently.
Layer 3: test_voice_cloning_streaming_wer. Threshold 1.2%, and this PR already failed at 2.06%. That's exactly the "streaming rotted, can't distinguish vocoder math from upstream sampling" state (c) was meant to prevent. A byte-equal test answers it precisely: if streaming concat ≈ non-streaming on non-crossfade regions, the WER regression is upstream of vocoding (sampling stochasticity / graph drift); if it isn't, chunked-vocoding has a math bug.
The fix bundles "real codec + byte-equal + 4-way matrix" together, shipped jointly with the graph-alignment verification.
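A sketch of the byte-equal assertion itself, operating on raw PCM bytes (function and parameter names are mine; the real test would feed the concatenated streaming output and the non-streaming reference from the real codec):

```python
def assert_byte_equal_outside_crossfade(streaming: bytes, non_streaming: bytes,
                                        chunk_bytes: int,
                                        crossfade_bytes: int) -> None:
    """Compare concatenated streaming audio to the non-streaming reference
    everywhere except a crossfade window around each chunk boundary.
    Byte equality on the kept regions, not approximate closeness."""
    assert len(streaming) == len(non_streaming), "length mismatch"
    excluded = set()
    for boundary in range(chunk_bytes, len(streaming), chunk_bytes):
        for i in range(max(0, boundary - crossfade_bytes),
                       min(len(streaming), boundary + crossfade_bytes)):
            excluded.add(i)              # crossfade window: differences allowed
    for i in range(len(streaming)):
        if i not in excluded:
            assert streaming[i] == non_streaming[i], f"mismatch at byte {i}"
```

A failure here localises the bug to chunked-vocoding math; a pass pushes the WER regression upstream to sampling, which is the attribution the current tests cannot make.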
```python
scheduler = FishScheduler.__new__(FishScheduler)
scheduler.outbox = queue.Queue()
scheduler._aborted_request_ids = set()
```
_aborted_request_ids: set[str] (init at line 323) only ever has items added (add at line 357); nothing removes them. A 24-hour server with 1000 aborts ends up holding 1000 stale UUID strings. The total memory is modest, but the pattern is wrong: an unbounded process-lifetime set with no eviction.
Fix: when _clear_request_state(request_id) finishes, also discard the id from _aborted_request_ids after some grace period — or just discard on the next _clear_request_state call for the same id, since by that time the upstream scheduler has already finalized the request and no further messages will arrive with that id.
Fix:
```python
def abort(self, request_id: str) -> None:
    self._aborted_request_ids.add(request_id)
    self._clear_request_state(request_id)

def _clear_request_state(self, request_id: str) -> None:
    self._payloads.pop(request_id, None)
    self._stream_states.pop(request_id, None)
    self._pending_done.discard(request_id)
    self._aborted_request_ids.discard(request_id)  # NEW: bound the set
    self._pending_messages = collections.deque(...)
```
Addressed this with bounded abort-id retention instead of immediate discard, because late queued stream messages must still be suppressed after abort.
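For illustration, bounded retention can be as small as an LRU-style set. This is a sketch of the assumed semantics (suppress late messages for recently aborted ids, evict the oldest past a cap), not the PR's actual code:

```python
from collections import OrderedDict

class BoundedAbortSet:
    """Bounded abort-id retention: keep recently aborted request ids so late
    queued stream messages are still suppressed after abort, but evict the
    oldest entry once the cap is reached so the set cannot grow forever."""
    def __init__(self, max_entries: int = 1024):
        self._ids = OrderedDict()
        self._max = max_entries

    def add(self, request_id: str) -> None:
        self._ids[request_id] = None
        self._ids.move_to_end(request_id)      # refresh recency
        while len(self._ids) > self._max:
            self._ids.popitem(last=False)      # evict oldest

    def __contains__(self, request_id: str) -> bool:
        return request_id in self._ids
```
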
PopSoda2002
left a comment
Hi @Ratish1 Thanks for your great work, can I ask if there any flag we need to open to enable streaming out?
No extra server-side flag is needed in this PR. For S2-Pro, streaming out is enabled per request with `stream=true`.
RFC: V1 Output Streaming for FishAudio S2-Pro
Motivation
FishAudio S2-Pro V1 currently has a staged TTS pipeline: `preprocessing` → `tts_engine` → `vocoder`.
Without output-side streaming, the autoregressive generator must finish all codec tokens before the vocoder can decode audio. This increases time-to-first-audio and leaves the V1 S2-Pro path behind the expected streaming behavior of the speech API.
This RFC proposes the current PR design: native V1 output streaming for FishAudio S2-Pro. The autoregressive stage emits incremental codec-token chunks to the vocoder, the vocoder decodes audio chunks as enough tokens accumulate, and the terminal vocoder stage forwards audio chunks to the coordinator/client before the final full result.
Scope
Included:
- `/v1/audio/speech` requests with `stream=true`.
- Incremental codec-token chunks from `tts_engine` to `vocoder`.
- Audio-chunk streaming from `vocoder` to the coordinator/client.

Not included:
Input-side streaming and a general streaming-in/streaming-out architecture should be covered by a broader follow-up RFC with further discussion.
Design
The design keeps model topology, transport, and output delivery as separate responsibilities.
- `S2ProPipelineConfig` declares topology: `tts_engine` has `stream_to=["vocoder"]`.
- `FishS2ProModelRunner` owns per-decode-step codec-token extraction from model buffers.
- `FishScheduler` decides whether a request is streaming and emits codec-token chunks only for `stream=true`.
- `Stage` owns stream routing between stages and terminal stream forwarding to the coordinator.
- `relay_io` owns stream tensor transfer through the existing relay path.
- `S2ProVocoderScheduler` owns streaming vocoder state and final audio result construction.
- `Coordinator` and the OpenAI-compatible speech endpoint own client-visible stream delivery.

The request parameter `stream=true` is the runtime source of truth. There is no model-specific stream transport knob.

```mermaid
flowchart LR
    A[HTTP /v1/audio/speech stream=true] --> B[Coordinator]
    B --> C[preprocessing]
    C --> D[tts_engine / FishScheduler]
    D -- codec chunks via DataReadyMessage --> E[vocoder / S2ProVocoderScheduler]
    E -- audio chunks via StreamMessage --> B
    E -- final full result via CompleteMessage --> B
    B --> F[SSE audio chunks + final event]
```

Runtime Flow

1. The coordinator routes the `StagePayload` to `preprocessing`.
2. `preprocessing` builds S2-Pro request state and routes it to `tts_engine`.
3. `tts_engine` runs the SGLang-backed Fish scheduler and model runner; each decode step exposes `latest_stream_code_chunk`.
4. `FishScheduler` emits `OutgoingMessage(type="stream", target="vocoder")` only when `payload.request.params["stream"]` is true.
5. `Stage` converts targeted stream output into a `DataReadyMessage` with `chunk_id` and relay metadata.
6. The receiving stage delivers each `StreamItem` into `S2ProVocoderScheduler`.
7. `S2ProVocoderScheduler` buffers codec chunks, decodes audio when the configured cadence is reached, and emits terminal audio stream chunks as `StreamMessage`.
8. When `tts_engine` finishes, `Stage._route_result` sends stream done to `vocoder`, then routes the final payload.
9. The final full result reaches the coordinator as a `CompleteMessage`.

Codec-Token Semantics
S2-Pro generation produces one semantic token plus multiple audio codebook tokens per step. The streaming path must not send the terminal semantic token to the vocoder.
The model runner therefore:
- detects the terminal semantic token `<|im_end|>`;
- excludes it from `output_codes`;
- populates `latest_stream_code_chunk` only for valid codec frames.

This keeps the streaming path aligned with the final non-streaming code path and prevents the terminal token from becoming an audible vocoder frame.
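The filtering contract can be illustrated with a toy extractor (the token id, list layout, and function name here are hypothetical):

```python
IM_END_ID = 4  # hypothetical id for the terminal <|im_end|> token

def extract_stream_chunk(step_tokens: list):
    """One decode step produces [semantic_token, codebook_0, ..., codebook_N].
    The terminal semantic token must never reach the vocoder as an audible
    frame. Returns (codec_frame_or_None, is_done)."""
    semantic, codebooks = step_tokens[0], step_tokens[1:]
    if semantic == IM_END_ID:
        return None, True       # terminal step: emit no codec frame
    return codebooks, False     # valid frame: forward codebook tokens only
```
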
Stream Transport
Stage-to-stage stream chunks use the existing relay path:
- the sender writes the chunk tensor with `relay_io.write_blob`;
- the sender emits `DataReadyMessage(chunk_id=...)`;
- chunk metadata travels in `shm_metadata["chunk_metadata"]`;
- the receiver reads the tensor with `relay_io.read_blob`.

The control message is sent before waiting for relay completion. This matches the existing NIXL credit pattern: the receiver must start reading before the sender waits, otherwise the transfer can deadlock.
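The ordering constraint can be demonstrated with in-process queues standing in for the relay and control planes (all names hypothetical; the real path uses SHM and ZMQ):

```python
import queue
import threading

def send_chunk(data_q, ctrl_q, credit_q, chunk: bytes) -> None:
    """Send-before-wait ordering: the control message is emitted BEFORE
    blocking on the receiver's credit, mirroring the NIXL credit pattern.
    Reversing the last two steps can deadlock with a blocking transport."""
    data_q.put(chunk)           # write the blob (stands in for relay_io.write_blob)
    ctrl_q.put("data_ready")    # control message first ...
    credit_q.get(timeout=5)     # ... then wait for the receiver's credit

def receive_chunk(data_q, ctrl_q, credit_q) -> bytes:
    assert ctrl_q.get(timeout=5) == "data_ready"
    chunk = data_q.get(timeout=5)   # stands in for relay_io.read_blob
    credit_q.put("credit")          # release the sender
    return chunk
```
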
CUDA IPC is intentionally not part of this PR. A same-GPU CUDA IPC attempt produced invalid-device-context failures in profiling, which indicates that the current V1 stream path does not yet own the CUDA IPC lifecycle. Correct CUDA IPC support would need framework-owned sender/receiver CUDA context setup, source tensor lifetime, IPC handle cleanup, abort cleanup, and fallback behavior. That belongs in the broader streaming transport RFC, not in the S2-Pro model implementation.
Terminal Stream Delivery
The current PR uses direct terminal-stage forwarding:
- intermediate stage-to-stage chunks use `DataReadyMessage`;
- terminal audio chunks flow as `StreamMessage` from the terminal stage to the coordinator;
- the final full result arrives as `CompleteMessage`.

This keeps the HTTP/SSE path simple: the terminal stage already owns user-visible audio chunks, so it forwards those chunks directly to the coordinator stream queue. An alternative is to add a downstream "client output collector" stage and force terminal audio chunks through another `DataReadyMessage` hop. That preserves a single inter-stage streaming shape but adds a stage whose only job is to hand terminal chunks back to the coordinator.

Recommended decision for this PR: make terminal-stage `StreamMessage` forwarding a first-class V1 streaming concept. It is the more direct design for client-visible output streaming and avoids adding a collector stage for S2-Pro.

Vocoder Semantics
`S2ProVocoderScheduler` handles both streaming and non-streaming requests.

For streaming requests:

- `new_request` stores the final payload context and opens per-request stream state;
- `stream_chunk` appends codec tokens and decodes when `stream_stride` or `stream_followup_stride` is reached;
- `stream_done` flushes the pending tail before final completion;
- the final result is still built from the full `output_codes` payload.

For non-streaming requests:
Failure And Abort Semantics
The PR keeps request-local cleanup at the scheduler/stage boundary:
Done-before-payload ordering is supported because stream chunks and stream done can arrive before the vocoder has received the final payload. The vocoder records pending done and finalizes once the payload arrives.
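A minimal sketch of that ordering rule (class and method names hypothetical): record pending done, and finalize only once both the done signal and the final payload have been seen, in either arrival order.

```python
class PendingDoneTracker:
    """Done-before-payload ordering: stream_done may arrive before the final
    payload, so record it and finalize only when both have been seen."""
    def __init__(self):
        self.pending_done = set()
        self.payloads = {}
        self.finalized = []

    def on_stream_done(self, request_id: str) -> None:
        self.pending_done.add(request_id)
        self._maybe_finalize(request_id)

    def on_final_payload(self, request_id: str, payload) -> None:
        self.payloads[request_id] = payload
        self._maybe_finalize(request_id)

    def _maybe_finalize(self, request_id: str) -> None:
        if request_id in self.pending_done and request_id in self.payloads:
            self.pending_done.discard(request_id)
            self.finalized.append(request_id)
```
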
Validation
Unit and docs coverage:
```
pytest tests/test_v1_fish_streaming_vocoder.py tests/test_v1_stage_terminal_stream.py -v -s -x
pytest tests/test_stream_queue.py -v -s -x
pytest tests/docs/s2pro/test_docs_tts_s2pro.py -v -s -x
```

Full SeedTTS EN streaming validation, concurrency 16:
The graph-off run is not the target serving configuration for S2-Pro V1. It confirms request completion and streaming accuracy, while graph-on remains the production/default performance path.
Stream Profiling
Profiling run: S2-Pro V1 streaming, concurrency 8, 50 measured requests plus 1 benchmark warmup request. The benchmark completed with 50/50 measured requests and 0 failures while stream profiling was enabled.
Profiler artifact:
`stream_after.jsonl`, 16,351 JSONL events, 0 parse errors. Event types:

- `stream_send_chunk`
- `stream_receive_chunk`
- `vocoder_scheduler_stream_chunk`
- `vocoder_stream_chunk`
- `vocoder_stream_flush`

Per-chunk transport timing from the profiling run: