Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 82 additions & 1 deletion examples/online_serving/qwen3_tts/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,90 @@ with open("output.wav", "wb") as f:
f.write(response.content)
```

## Streaming Text Input

The `/v1/audio/speech/stream` WebSocket endpoint accepts text incrementally (e.g., from a real-time STT pipeline), buffers and splits at sentence boundaries, and generates audio per sentence.

> **Note:** This is streaming *text input* only. Each sentence produces a complete audio response. For streaming audio output, see PR #1189.

### Quick Start

```bash
# Send full text (sentences are auto-detected)
python streaming_speech_client.py \
--text "Hello world. How are you? I am fine."

# Simulate STT: send text word-by-word
python streaming_speech_client.py \
--text "Hello world. How are you? I am fine." \
--simulate-stt --stt-delay 0.1

# VoiceDesign task
python streaming_speech_client.py \
--text "Today is a great day. The weather is nice." \
--task-type VoiceDesign \
--instructions "A cheerful young female voice"
```

### WebSocket Protocol

**Client -> Server:**

```jsonc
// 1. Session config (sent once first)
{"type": "session.config", "voice": "Vivian", "task_type": "CustomVoice", "language": "Auto"}

// 2. Text chunks (sent incrementally)
{"type": "input.text", "text": "Hello, how are you? "}

// 3. End of input (flushes remaining buffer)
{"type": "input.done"}
```

**Server -> Client:**

```jsonc
// Audio metadata (before binary frame)
{"type": "audio.start", "sentence_index": 0, "sentence_text": "Hello, how are you?", "format": "wav"}

// Binary WebSocket frame: raw audio bytes

// Per-sentence completion
{"type": "audio.done", "sentence_index": 0}

// Session complete
{"type": "session.done", "total_sentences": 3}

// Error (non-fatal, session continues)
{"type": "error", "message": "..."}
```

### Session Config Parameters

All parameters from the REST API are supported:

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `voice` | string | None | Speaker voice name |
| `task_type` | string | None | CustomVoice, VoiceDesign, or Base |
| `language` | string | None | Language code |
| `instructions` | string | None | Voice style instructions |
| `response_format` | string | "wav" | Audio format per sentence |
| `speed` | float | 1.0 | Playback speed (0.25-4.0) |
| `max_new_tokens` | int | None | Max tokens per sentence |
| `ref_audio` | string | None | Reference audio (Base task) |
| `ref_text` | string | None | Reference text (Base task) |

### Sentence Detection

Text is automatically split at sentence boundaries:
- **English:** `.` `!` `?` followed by whitespace
- **CJK:** fullwidth punctuation `。` `!` `?` `,` `;`

If text never forms a complete sentence, it is flushed when `input.done` is sent.

## Limitations

- **No streaming audio output**: Audio is generated completely before being returned — including per sentence on the WebSocket endpoint above, which streams text *input* only. Streaming audio output will be supported after the pipeline is disaggregated (see RFC #938).
- **Single request**: Batch processing is not yet optimized for online serving.

## Troubleshooting
Expand Down
234 changes: 234 additions & 0 deletions examples/online_serving/qwen3_tts/streaming_speech_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
"""WebSocket client for streaming text-input TTS.

Connects to the /v1/audio/speech/stream endpoint, sends text incrementally
(simulating real-time STT output), and saves per-sentence audio files.

Usage:
# Send full text at once
python streaming_speech_client.py --text "Hello world. How are you? I am fine."

# Simulate STT: send text word-by-word with delay
python streaming_speech_client.py \
--text "Hello world. How are you? I am fine." \
--simulate-stt --stt-delay 0.1

# VoiceDesign task
python streaming_speech_client.py \
--text "Today is a great day. The weather is nice." \
--task-type VoiceDesign \
--instructions "A cheerful young female voice"

# Base task (voice cloning)
python streaming_speech_client.py \
--text "Hello world. How are you?" \
--task-type Base \
--ref-audio /path/to/reference.wav \
--ref-text "Transcript of reference audio"

Requirements:
pip install websockets
"""

import argparse
import asyncio
import json
import os

try:
import websockets
except ImportError:
print("Please install websockets: pip install websockets")
raise SystemExit(1)


async def stream_tts(
    url: str,
    text: str,
    config: dict,
    output_dir: str,
    simulate_stt: bool = False,
    stt_delay: float = 0.1,
) -> None:
    """Connect to the streaming TTS endpoint and process audio responses.

    Args:
        url: WebSocket endpoint URL (e.g. ws://host:8000/v1/audio/speech/stream).
        text: Full text to synthesize.
        config: Session parameters merged into the ``session.config`` message.
        output_dir: Directory where per-sentence audio files are written.
        simulate_stt: If True, send the text word-by-word with a delay,
            mimicking incremental STT output.
        stt_delay: Delay in seconds between words when simulating STT.
    """
    os.makedirs(output_dir, exist_ok=True)

    async with websockets.connect(url) as ws:
        # 1. Send session config (must be the first message).
        config_msg = {"type": "session.config", **config}
        await ws.send(json.dumps(config_msg))
        print(f"Sent session config: {config}")

        # 2. Send text (either all at once or word-by-word)
        async def send_text():
            if simulate_stt:
                words = text.split(" ")
                for i, word in enumerate(words):
                    chunk = word + (" " if i < len(words) - 1 else "")
                    await ws.send(json.dumps({"type": "input.text", "text": chunk}))
                    print(f"  Sent: {chunk!r}")
                    await asyncio.sleep(stt_delay)
            else:
                await ws.send(json.dumps({"type": "input.text", "text": text}))
                print(f"Sent full text: {text!r}")

            # 3. Signal end of input so the server flushes its sentence buffer.
            await ws.send(json.dumps({"type": "input.done"}))
            print("Sent input.done")

        # Run sender and receiver concurrently
        sender_task = asyncio.create_task(send_text())

        response_format = config.get("response_format", "wav")
        saved_count = 0
        # Index announced by the most recent audio.start message. Using the
        # server-provided index (rather than a local counter) keeps filenames
        # aligned with the server's numbering even if a sentence fails and
        # produces audio.done without a binary frame.
        current_index = None

        try:
            while True:
                message = await ws.recv()

                if isinstance(message, bytes):
                    # Binary frame: raw audio bytes for the sentence last
                    # announced via audio.start.
                    index = current_index if current_index is not None else saved_count
                    filename = os.path.join(
                        output_dir,
                        f"sentence_{index:03d}.{response_format}",
                    )
                    with open(filename, "wb") as f:
                        f.write(message)
                    print(f"  Saved audio: {filename} ({len(message)} bytes)")
                    saved_count += 1
                else:
                    # JSON control frame
                    msg = json.loads(message)
                    msg_type = msg.get("type")

                    if msg_type == "audio.start":
                        current_index = msg["sentence_index"]
                        print(
                            f"  [sentence {msg['sentence_index']}] "
                            f"Generating: {msg['sentence_text']!r}"
                        )
                    elif msg_type == "audio.done":
                        print(f"  [sentence {msg['sentence_index']}] Done")
                    elif msg_type == "session.done":
                        print(
                            f"\nSession complete: "
                            f"{msg['total_sentences']} sentence(s) generated"
                        )
                        break
                    elif msg_type == "error":
                        # Non-fatal per protocol: the session continues.
                        print(f"  ERROR: {msg['message']}")
                    else:
                        print(f"  Unknown message: {msg}")
        finally:
            sender_task.cancel()
            try:
                await sender_task
            except asyncio.CancelledError:
                # Cancellation is the expected shutdown path for the sender.
                pass

    print(f"\nAudio files saved to: {output_dir}/")


def main():
    """CLI entry point: parse arguments, assemble the session config,
    and run the streaming TTS client."""
    parser = argparse.ArgumentParser(description="Streaming text-input TTS client")
    parser.add_argument(
        "--url",
        default="ws://localhost:8000/v1/audio/speech/stream",
        help="WebSocket endpoint URL",
    )
    parser.add_argument("--text", required=True, help="Text to synthesize")
    parser.add_argument(
        "--output-dir",
        default="streaming_tts_output",
        help="Directory to save audio files (default: streaming_tts_output)",
    )

    # Session config options
    parser.add_argument("--model", default=None, help="Model name")
    parser.add_argument("--voice", default="Vivian", help="Speaker voice")
    parser.add_argument(
        "--task-type",
        default="CustomVoice",
        choices=["CustomVoice", "VoiceDesign", "Base"],
        help="TTS task type",
    )
    parser.add_argument("--language", default="Auto", help="Language")
    parser.add_argument("--instructions", default=None, help="Voice style instructions")
    parser.add_argument(
        "--response-format",
        default="wav",
        choices=["wav", "pcm", "flac", "mp3", "aac", "opus"],
        help="Audio format",
    )
    parser.add_argument("--speed", type=float, default=1.0, help="Playback speed (0.25-4.0)")
    parser.add_argument("--max-new-tokens", type=int, default=None, help="Max tokens")

    # Base task options
    parser.add_argument("--ref-audio", default=None, help="Reference audio")
    parser.add_argument("--ref-text", default=None, help="Reference text")
    parser.add_argument(
        "--x-vector-only-mode",
        action="store_true",
        default=False,
        help="Speaker embedding only mode",
    )

    # STT simulation
    parser.add_argument(
        "--simulate-stt",
        action="store_true",
        help="Simulate STT by sending text word-by-word",
    )
    parser.add_argument(
        "--stt-delay",
        type=float,
        default=0.1,
        help="Delay between words in STT simulation (seconds)",
    )

    args = parser.parse_args()

    # Session config: include only options with a value; None means
    # "let the server use its default".
    config_keys = (
        "model",
        "voice",
        "task_type",
        "language",
        "instructions",
        "response_format",
        "speed",
        "max_new_tokens",
        "ref_audio",
        "ref_text",
    )
    config = {
        key: value
        for key in config_keys
        if (value := getattr(args, key, None)) is not None
    }
    if args.x_vector_only_mode:
        config["x_vector_only_mode"] = True

    asyncio.run(
        stream_tts(
            url=args.url,
            text=args.text,
            config=config,
            output_dir=args.output_dir,
            simulate_stt=args.simulate_stt,
            stt_delay=args.stt_delay,
        )
    )


# Script entry point: run the client only when executed directly.
if __name__ == "__main__":
    main()
Loading
Loading