[Feature][TTS] Streaming Text Input for Qwen3-TTS via WebSocket #1230
lishunyang12 wants to merge 2 commits into `vllm-project:main`
Conversation
Signed-off-by: lishunyang <lishunyang12@163.com>
Signed-off-by: lishunyang <lishunyang12@163.com>
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 3a53c3b43c
```python
audio_bytes, _ = await self._speech_service._generate_audio_bytes(
    request
)
```
Validate requested model before websocket generation
The websocket path never runs `_check_model`, so `session.config.model` is effectively ignored and audio is generated with whatever model is currently served. In contrast, the REST `/v1/audio/speech` path validates model selection first, so clients relying on model-specific behavior can get silent misrouting instead of an explicit error when they pass an unknown or mismatched model. Please add the same model check in the streaming flow before calling generation.
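A minimal sketch of the suggested check, assuming `_check_model` follows the REST path's convention of returning an error object or `None` (its exact signature is not shown in this diff):

```python
# Hypothetical placement inside the websocket generation path; names follow
# this review thread, signatures are assumed.
error = await self._speech_service._check_model(request)
if error is not None:
    await self._send_error(websocket, error.message)
    return

audio_bytes, _ = await self._speech_service._generate_audio_bytes(request)
```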
```python
_SENTENCE_BOUNDARY_RE = re.compile(
    r"(?<=[.!?])\s+"  # English punctuation followed by whitespace
```
Split on sentence-ending punctuation at chunk end
The English boundary regex requires trailing whitespace (`(?<=[.!?])\s+`), so a chunk like `"Hello world."` is not emitted until another message adds whitespace or `input.done` is sent. That adds avoidable latency in real-time streaming and contradicts the stated behavior of splitting at punctuation when a sentence is complete. Consider treating end-of-buffer punctuation as a boundary as well; see the sketch below.
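An illustrative variant of the regex (not the committed code) that also treats punctuation at the very end of the buffer as a boundary:

```python
import re

# Variant: match English sentence-final punctuation followed by whitespace
# *or* the end of the buffer, so "Hello world." flushes immediately.
_SENTENCE_BOUNDARY_RE = re.compile(
    r"(?<=[.!?])(?:\s+|$)"  # English punctuation + whitespace or end of buffer
    r"|(?<=[。!?,;])"       # CJK fullwidth punctuation (as in the PR)
)

assert re.split(_SENTENCE_BOUNDARY_RE, "Hello world.")[0] == "Hello world."
```

Note that `$` makes the split eager: a buffer ending in `.` is emitted even if the next chunk would have continued the sentence (e.g., `"3."` followed by `"14"`), which is the usual latency/accuracy trade-off here.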
@linyueqian would this help?

This is on the roadmap and should be nice to have, especially for real-world real-time scenarios. I will take a look at it later today.
Pull request overview
This PR adds streaming text input support for Qwen3-TTS via a WebSocket endpoint, `/v1/audio/speech/stream`. The implementation enables real-time text-to-speech workflows where text arrives incrementally (e.g., from STT or LLM streaming) and audio is generated per sentence rather than after the complete input arrives. This is distinct from PR #1189, which adds streaming audio output; this PR focuses solely on streaming text input.
Changes:

- Implements WebSocket-based incremental text input with automatic sentence boundary detection for English and CJK languages
- Refactors the existing REST TTS endpoint to extract core generation logic into a reusable `_generate_audio_bytes` method
- Adds comprehensive test coverage for sentence splitting logic and the WebSocket session lifecycle
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| `vllm_omni/entrypoints/openai/text_splitter.py` | New incremental sentence boundary detector supporting English (`.!?` + whitespace) and CJK fullwidth punctuation (`。!?,;`) with configurable `min_sentence_length` |
| `vllm_omni/entrypoints/openai/serving_speech_stream.py` | New WebSocket handler managing session lifecycle with config/idle timeouts (10s/30s), per-sentence audio generation, and error resilience |
| `vllm_omni/entrypoints/openai/serving_speech.py` | Refactored to extract `_generate_audio_bytes(request) → (bytes, media_type)` from `create_speech()` for reuse by the WebSocket handler |
| `vllm_omni/entrypoints/openai/protocol/audio.py` | Added `StreamingSpeechSessionConfig` Pydantic model mirroring `OpenAICreateSpeechRequest` fields (minus input text) |
| `vllm_omni/entrypoints/openai/api_server.py` | Registered WebSocket route `/v1/audio/speech/stream` and initialized `OmniStreamingSpeechHandler` in app state |
| `examples/online_serving/qwen3_tts/streaming_speech_client.py` | Example Python client with STT simulation mode, per-sentence audio file saving, and support for all task types |
| `examples/online_serving/qwen3_tts/README.md` | Comprehensive documentation of the WebSocket protocol, session parameters, and sentence detection rules |
| `tests/entrypoints/openai_api/test_text_splitter.py` | 20+ unit tests covering English/CJK/mixed splitting, incremental accumulation, flush behavior, and edge cases |
| `tests/entrypoints/openai_api/test_serving_speech_stream.py` | Integration tests for session lifecycle, multi-sentence handling, config validation, error scenarios, and generation failure recovery |
```python
max_new_tokens: int | None = None
ref_audio: str | None = None
ref_text: str | None = None
x_vector_only_mode: bool | None = None
```
`StreamingSpeechSessionConfig` lacks field validators for `instructions` length and `max_new_tokens` range that are present in the validation logic (`_validate_tts_request`). Consider adding `@field_validator` decorators to validate these constraints upfront when the session config is received, similar to how `speed` is validated with `Field` constraints. This would give clients earlier feedback instead of failing on the first sentence generation. For example: a max-length validator for `instructions` (500 chars) and a range validator for `max_new_tokens` (1-4096):
Suggested change:

```python
x_vector_only_mode: bool | None = None

# assumes: from pydantic import field_validator

@field_validator("instructions")
@classmethod
def validate_instructions(cls, v: str | None) -> str | None:
    if v is not None and len(v) > 500:
        raise ValueError("instructions must be at most 500 characters long")
    return v

@field_validator("max_new_tokens")
@classmethod
def validate_max_new_tokens(cls, v: int | None) -> int | None:
    if v is None:
        return v
    if not 1 <= v <= 4096:
        raise ValueError("max_new_tokens must be between 1 and 4096")
    return v
```
```python
self._buffer += text
return self._extract_sentences()
```
The buffer can grow unbounded if text is sent continuously without sentence boundaries. Consider adding a maximum buffer size limit to prevent potential denial-of-service attacks or memory exhaustion. For example, if a client sends megabytes of text without punctuation, the buffer will continue to grow until the system runs out of memory. A reasonable limit (e.g., 10KB-100KB) would prevent this while still allowing for long sentences.
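A sketch of such a cap inside `add_text`, with a hypothetical constant (`_MAX_BUFFER_CHARS` is not in the PR):

```python
_MAX_BUFFER_CHARS = 64 * 1024  # within the 10KB-100KB range suggested above

def add_text(self, text: str) -> list[str]:
    # Reject (or, alternatively, force-flush) once the pending buffer would
    # exceed the cap, instead of growing without bound.
    if len(self._buffer) + len(text) > _MAX_BUFFER_CHARS:
        raise ValueError("sentence buffer limit exceeded")
    self._buffer += text
    return self._extract_sentences()
```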
```python
if msg_type == "input.text":
    text = msg.get("text", "")
    sentences = splitter.add_text(text)
```
There is no validation on the length of individual text chunks or total accumulated text per session. A malicious client could send extremely large text chunks or accumulate unbounded text across multiple messages, potentially causing memory exhaustion or performance degradation. Consider adding limits such as: max text chunk size per message (e.g., 10KB), max total text per session (e.g., 100KB), or max number of sentences per session.
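A sketch of those limits in the message loop (the constants and error strings are hypothetical; `total_chars` is assumed to start at 0 when the session opens):

```python
MAX_CHUNK_CHARS = 10 * 1024     # per input.text message
MAX_SESSION_CHARS = 100 * 1024  # cumulative text per session

if msg_type == "input.text":
    text = msg.get("text", "")
    if len(text) > MAX_CHUNK_CHARS:
        await self._send_error(websocket, "text chunk exceeds per-message limit")
        continue
    total_chars += len(text)
    if total_chars > MAX_SESSION_CHARS:
        await self._send_error(websocket, "session text limit exceeded")
        break
    sentences = splitter.add_text(text)
```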
```python
if isinstance(message, bytes):
    # Binary frame: audio data
    filename = os.path.join(
        output_dir,
        f"sentence_{sentence_count:03d}.{response_format}",
    )
    with open(filename, "wb") as f:
        f.write(message)
    print(f" Saved audio: {filename} ({len(message)} bytes)")
    sentence_count += 1
```
The sentence counter is incremented on receiving binary audio data, but the actual sentence index comes from the server in the `audio.start` message. This creates a potential mismatch if `audio.start` and binary frames arrive in different orders, or if generation fails for a sentence (where `audio.done` is still sent but no binary frame). Consider using `msg["sentence_index"]` from the `audio.start` message to name the file instead of a local counter.
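A sketch of that change in the example client's receive loop (field names follow the `audio.start` message described in this thread):

```python
if isinstance(message, bytes):
    # Name the file after the index the server announced in audio.start,
    # rather than a locally incremented counter.
    filename = os.path.join(
        output_dir,
        f"sentence_{current_index:03d}.{response_format}",
    )
    with open(filename, "wb") as f:
        f.write(message)
else:
    msg = json.loads(message)
    if msg.get("type") == "audio.start":
        current_index = msg["sentence_index"]
```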
```python
# - CJK fullwidth: 。!?,;
_SENTENCE_BOUNDARY_RE = re.compile(
    r"(?<=[.!?])\s+"   # English punctuation followed by whitespace
    r"|(?<=[。!?,;])"  # CJK fullwidth punctuation
```
Splitting on CJK commas (,) and semicolons (;) may result in very short fragments that don't constitute complete sentences. In Chinese, commas are often used within a single sentence to separate clauses, similar to English. Consider whether these should be treated as sentence boundaries or only split on the stronger punctuation marks (。!?). This could lead to choppy audio output with many small fragments. The tests show this behavior (e.g., "你好," as a sentence), but it may not align with typical TTS use cases where complete sentences are preferred for natural prosody.
Suggested change:

```python
# - CJK fullwidth: 。!?
_SENTENCE_BOUNDARY_RE = re.compile(
    r"(?<=[.!?])\s+"  # English punctuation followed by whitespace
    r"|(?<=[。!?])"   # CJK fullwidth sentence-final punctuation
```
```python
try:
    await self._send_error(websocket, f"Internal error: {e}")
except Exception:
    pass
```
'except' clause does nothing but pass and there is no explanatory comment.
Suggested change (replace the bare `pass`):

```python
logger.debug(
    "Failed to send error message to streaming speech client",
    exc_info=True,
)
```
```python
sender_task.cancel()
try:
    await sender_task
except asyncio.CancelledError:
```
'except' clause does nothing but pass and there is no explanatory comment.
Suggested change:

```python
except asyncio.CancelledError:
    # Task cancellation is expected during shutdown; safe to ignore.
```
The streaming input approach makes sense for LLM → TTS pipelines. One suggestion: the buffering/split granularity should be configurable via […]. Also, how does this relate to PR #986, which adds engine-level […]?
Hi, if you add a markdown document under `./examples/*`, please also run `mkdocs serve` to sync those edits to `./docs/` before merging this PR. Also, please add markers to your test functions; I think two markers are needed for your files:

```python
import pytest

pytestmark = [pytest.mark.core_model, pytest.mark.cpu]
```

You can also check the markers usage under #577.
Summary
Add a WebSocket endpoint `/v1/audio/speech/stream` that accepts text input incrementally (e.g., from a real-time STT pipeline), buffers and splits it at sentence boundaries, and generates audio per sentence using the existing TTS pipeline.

This enables real-time text-to-speech workflows where text is produced progressively (speech-to-text, LLM token streaming, live captions) and audio needs to be generated as soon as complete sentences are available, rather than waiting for the entire input.

Scope: streaming text input only. Each sentence produces a complete audio response. Streaming audio output (chunked PCM) is tracked separately in PR #1189.
Motivation
The current `/v1/audio/speech` REST endpoint requires the full text upfront. In real-time pipelines (e.g., STT → LLM → TTS), text arrives incrementally. Without streaming input support, clients must either: […]

This PR solves both issues with a single WebSocket session that handles buffering, sentence detection, and per-sentence generation automatically.
WebSocket Protocol
Transport: WebSocket (industry standard — used by OpenAI Realtime API, ElevenLabs, Azure TTS)
Client → Server
Server → Client
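As a rough illustration of the protocol, a minimal client could look like the sketch below. Only the `input.text`, `input.done`, `audio.start`, and `audio.done` message types appear elsewhere in this PR; the `session.config` message name, its fields, and the termination condition are assumptions:

```python
import asyncio
import json

import websockets  # third-party package: pip install websockets


async def main() -> None:
    uri = "ws://localhost:8000/v1/audio/speech/stream"
    async with websockets.connect(uri) as ws:
        # First message configures the session (fields mirror the REST request).
        await ws.send(json.dumps({"type": "session.config", "response_format": "wav"}))
        # Incremental text, then signal that input is complete.
        await ws.send(json.dumps({"type": "input.text", "text": "Hello world. "}))
        await ws.send(json.dumps({"type": "input.done"}))
        while True:
            message = await ws.recv()
            if isinstance(message, bytes):
                print(f"received audio frame: {len(message)} bytes")
            else:
                event = json.loads(message)
                print("event:", event.get("type"))
                if event.get("type") == "audio.done":
                    break  # one sentence was sent, so stop after its audio


asyncio.run(main())
```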
Changes
New Files
- `vllm_omni/entrypoints/openai/text_splitter.py` — `SentenceSplitter`, an incremental sentence boundary detector. Regex-based splitting at English `.!?` + whitespace and CJK fullwidth `。!?,;`. Configurable `min_sentence_length` (default 2, CJK-friendly).
- `vllm_omni/entrypoints/openai/serving_speech_stream.py` — `OmniStreamingSpeechHandler`, a WebSocket session handler. Manages config validation, idle/config timeouts (30s/10s), per-sentence audio generation, and error resilience (one sentence failure doesn't kill the session).
- `examples/online_serving/qwen3_tts/streaming_speech_client.py` — example client with a `--simulate-stt` mode (word-by-word with configurable delay), all 3 task types (CustomVoice, VoiceDesign, Base); saves per-sentence audio files.
- `tests/entrypoints/openai_api/test_text_splitter.py` — unit tests for `SentenceSplitter`: English/Chinese/mixed splitting, incremental accumulation, flush behavior, edge cases.
- `tests/entrypoints/openai_api/test_serving_speech_stream.py` — WebSocket integration tests for session lifecycle, config validation, and error scenarios.
Modified Files

- `vllm_omni/entrypoints/openai/serving_speech.py` — extracted `_generate_audio_bytes(request) → (bytes, media_type)` from `create_speech()`. The REST endpoint delegates to it; the WebSocket handler reuses it per sentence. No behavior change for existing callers.
- `vllm_omni/entrypoints/openai/protocol/audio.py` — added the `StreamingSpeechSessionConfig` Pydantic model for WebSocket session configuration (mirrors `OpenAICreateSpeechRequest` fields minus `input`).
- `vllm_omni/entrypoints/openai/api_server.py` — added the `@router.websocket("/v1/audio/speech/stream")` route and `OmniStreamingSpeechHandler` initialization in `omni_init_app_state()`.
- `examples/online_serving/qwen3_tts/README.md` — documented the WebSocket protocol, session parameters, and sentence detection rules.
Design Decisions

- `min_sentence_length=2` — filters out degenerate fragments (e.g., a lone `.`) while supporting short CJK sentences like `你好!` (3 chars); see the usage sketch below.
- `_generate_audio_bytes()` extraction — the REST endpoint wraps the bytes in a `Response`; the WebSocket handler sends them raw. No code duplication.
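For illustration, the expected `SentenceSplitter` behavior under these rules (method names beyond `add_text` are assumed from the test descriptions, and the outputs assume the splitting regex shown earlier in this thread):

```python
from vllm_omni.entrypoints.openai.text_splitter import SentenceSplitter

splitter = SentenceSplitter(min_sentence_length=2)
print(splitter.add_text("Hello world. How are"))  # ['Hello world.']
print(splitter.add_text(" you? 你好!"))           # ['How are you?', '你好!']
print(splitter.flush())                           # [] -- nothing left buffered
```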
Test Plan

- `pytest tests/entrypoints/openai_api/test_text_splitter.py` — sentence splitter unit tests
- `pytest tests/entrypoints/openai_api/test_serving_speech_stream.py` — WebSocket integration tests
- `pytest tests/entrypoints/openai_api/test_serving_speech.py` — existing REST endpoint tests (verify the refactor is non-breaking)