Speech to text streaming support #4140
Conversation
Force-pushed from b3f80c0 to 530e420
Pull request overview
Adds MediaPipe LOOPBACK-based streaming support for Speech-to-Text (STT) so audio endpoints can return partial transcription results over Server-Sent Events (SSE), aligning STT behavior with existing streaming patterns used by LLM graphs.
Changes:
- Enable LOOPBACK/back-edge wiring for STT graphs (test graphs + exported graph templates).
- Add multipart `"stream"` field detection in REST v3 payload creation and implement streaming logic in `S2tCalculator`.
- Update demo docs/scripts to reference the 2026.1 release assets/images.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| `src/test/audio/graph_stt.pbtxt` | Adds LOOPBACK/back_edge + SyncSet handler for STT streaming in test graph. |
| `src/test/audio/graph_stt_word_timestamps.pbtxt` | Same as above, for the word-timestamps STT test graph. |
| `src/mediapipe_internal/mediapipegraphexecutor.hpp` | Ensures additional side packets are provided for streaming graph execution. |
| `src/http_rest_api_handler.cpp` | Detects multipart `stream=true` and routes v3 REST requests into streaming inference. |
| `src/graph_export/graph_export.cpp` | Exports STT graphs with LOOPBACK/back_edge + SyncSet handler to support streaming. |
| `src/audio/speech_to_text/s2t_calculator.cc` | Implements STT streaming via background generation + LOOPBACK iterations emitting SSE chunks. |
| `demos/common/export_models/export_model.py` | Updates exported STT graph template to include LOOPBACK/back_edge + SyncSet handler. |
| `demos/audio/README.md` | Updates demo instructions to use release URLs/tags and `pip3` in some commands. |
```cpp
static std::string wrapTextInServerSideEventMessage(const std::string& text) {
    std::stringstream ss;
    ss << "data: " << text << "\n\n";
    return ss.str();
}
```
`wrapTextInServerSideEventMessage` uses `std::stringstream`, but this file doesn't include `<sstream>`. This relies on transitive includes and can break compilation depending on the toolchain; add the missing standard header explicitly.
```cpp
if (language.size() > ISO_LANG_CODE_MAX) {
    return absl::InvalidArgumentError("Invalid language code.");
}
SPDLOG_LOGGER_TRACE(s2t_calculator_logger, "Received language: {}");
```
The log line `SPDLOG_LOGGER_TRACE(..., "Received language: {}")` has a `{}` placeholder but no argument provided. With fmt/spdlog format checking this can fail to compile or throw at runtime; pass `language` (or remove the placeholder).
Suggested change:
```diff
- SPDLOG_LOGGER_TRACE(s2t_calculator_logger, "Received language: {}");
+ SPDLOG_LOGGER_TRACE(s2t_calculator_logger, "Received language: {}", language);
```
```cpp
// Streaming with timestamps: GenAI streams chunk-level batches, not per-token
// Streaming without timestamps: GenAI streams per-token decoded text
generateFuture_ = std::async(std::launch::async,
    [pipe, rawSpeech = std::move(rawSpeech), config, streamerCallback, queue]() mutable -> ov::genai::WhisperDecodedResults {
        try {
            std::unique_lock lock(pipe->sttPipelineMutex);
            auto result = pipe->sttPipeline->generate(rawSpeech, config, streamerCallback);
            lock.unlock();
            queue->setDone();
            return result;
        } catch (...) {
            queue->setDone();
            throw;
        }
    });
```
Streaming spawns a background thread via std::async(std::launch::async, ...) stored in generateFuture_, but there is no corresponding teardown path (e.g., in Close()/cancellation) to ensure the async task is completed/cancelled before graph shutdown. This can lead to long shutdowns or hangs if a stream is aborted mid-generation. Consider adding an explicit cancellation flag checked by the streamer callback (return StreamingStatus::CANCEL) and ensuring the future is waited/joined during node teardown.
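A minimal sketch of what that could look like, reusing the members from the snippet above (`generateFuture_`, the streaming queue); the `cancelRequested_` flag and the `Close()` override are hypothetical additions, and `ov::genai::StreamingStatus::CANCEL` is the mechanism this comment refers to:

```cpp
// Sketch only: cancellation flag polled by the streamer callback, joined in
// Close(). cancelRequested_ and this Close() override do not exist in the PR.
std::atomic<bool> cancelRequested_{false};

auto streamerCallback = [this](std::string chunk) {
    if (cancelRequested_.load()) {
        return ov::genai::StreamingStatus::CANCEL;  // ask generate() to stop early
    }
    streamingQueue_->push(std::move(chunk));
    return ov::genai::StreamingStatus::RUNNING;
};

absl::Status Close(mediapipe::CalculatorContext* cc) override {
    cancelRequested_ = true;     // makes the callback return CANCEL
    if (generateFuture_.valid()) {
        generateFuture_.wait();  // join the background task before teardown
    }
    return absl::OkStatus();
}
```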
```cpp
// Thread-safe queue for streaming partial transcription results from the
// background generate() thread to the MediaPipe LOOPBACK loop.
class StreamingTextQueue {
public:
    void push(std::string text) {
        std::lock_guard<std::mutex> lock(mutex_);
        queue_.push(std::move(text));
        cv_.notify_one();
    }

    // Signals that generation has finished (successfully or with error).
    void setDone() {
        std::lock_guard<std::mutex> lock(mutex_);
        done_ = true;
        cv_.notify_one();
    }

    // Blocks until a text chunk is available or generation is done.
    // Returns true if a chunk was retrieved, false if done and queue is empty.
    bool waitAndPop(std::string& out) {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return !queue_.empty() || done_; });
        if (!queue_.empty()) {
            out = std::move(queue_.front());
            queue_.pop();
            return true;
        }
        return false;  // done and empty
    }

    bool isDone() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return done_ && queue_.empty();
    }

private:
    mutable std::mutex mutex_;
    std::condition_variable cv_;
    std::queue<std::string> queue_;
    bool done_ = false;
};
```
StreamingTextQueue is unbounded, and processStreamingIteration() drains at most one chunk per LOOPBACK iteration. If the GenAI callback produces chunks faster than the graph can schedule LOOPBACK iterations / send responses, this queue can grow without bound and increase memory usage for long streams. Consider bounding/coalescing queued chunks (e.g., keep only the latest chunk or drain all available chunks per iteration) to avoid unbounded growth.
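One possible shape for the coalescing option, as a sketch; `drainAll()` would be a new method on `StreamingTextQueue`, not existing code:

```cpp
// Blocks like waitAndPop(), but concatenates every chunk currently queued so
// each LOOPBACK iteration sends at most one coalesced SSE message.
bool drainAll(std::string& out) {
    std::unique_lock<std::mutex> lock(mutex_);
    cv_.wait(lock, [this] { return !queue_.empty() || done_; });
    bool gotData = false;
    while (!queue_.empty()) {
        out += std::move(queue_.front());
        queue_.pop();
        gotData = true;
    }
    return gotData;  // false only when done and empty
}
```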
```cpp
std::stringstream ss;
ss << "data: " << text << "\n\n";
return ss.str();
```
This introduces a local wrapTextInServerSideEventMessage() helper, but an identical helper already exists in the LLM serving code (src/llm/servable.cpp). Duplicating this increases maintenance cost and risks behavior drift (e.g., future SSE formatting changes). Consider reusing the existing utility (or moving it to a shared header) instead of duplicating it here.
Suggested change:
```diff
- std::stringstream ss;
- ss << "data: " << text << "\n\n";
- return ss.str();
+ std::string message;
+ message.reserve(sizeof("data: ") - 1 + text.size() + sizeof("\n\n") - 1);
+ message.append("data: ");
+ message.append(text);
+ message.append("\n\n");
+ return message;
```
```cpp
std::string streamField = payload.multipartParser->getFieldByName("stream");
bool requestStreaming = (streamField == "true") && hasLoopback_;
if (streamField == "true" && !hasLoopback_) {
    return absl::InvalidArgumentError("streaming is not supported for this graph configuration (LOOPBACK not configured)");
}
```
Streaming support is enabled by the new stream multipart field and LOOPBACK-driven execution, but there are no unit/functional tests covering the streaming response path (SSE framing, [DONE] terminator, and partial chunk emission). The existing audio HTTP tests cover unary behavior; please add a streaming test case to prevent regressions.
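A rough shape such a test could take; the fixture name and the `sendStreamingRequest` helper are hypothetical, not existing test utilities:

```cpp
TEST_F(AudioHttpTest, SttStreamingEmitsSseChunksAndDoneTerminator) {
    // Hypothetical helper: POSTs multipart audio with stream=true and
    // collects the raw SSE chunks as they arrive.
    std::vector<std::string> chunks = sendStreamingRequest("/v3/audio/transcriptions");
    ASSERT_GE(chunks.size(), 2u);  // at least one partial chunk plus terminator
    for (size_t i = 0; i + 1 < chunks.size(); ++i) {
        EXPECT_EQ(chunks[i].rfind("data: ", 0), 0u) << "bad SSE framing";
        EXPECT_EQ(chunks[i].substr(chunks[i].size() - 2), "\n\n");
    }
    EXPECT_EQ(chunks.back(), "data: [DONE]\n\n");
}
```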
| oss << R"( | ||
| input_stream: "HTTP_REQUEST_PAYLOAD:input" | ||
| output_stream: "HTTP_RESPONSE_PAYLOAD:output" | ||
| node { | ||
| name: ")" | ||
| << exportSettings.modelName << R"(" | ||
| calculator: "S2tCalculator" | ||
| input_side_packet: "STT_NODE_RESOURCES:s2t_servable" | ||
| input_stream: "LOOPBACK:loopback" | ||
| input_stream: "HTTP_REQUEST_PAYLOAD:input" | ||
| output_stream: "LOOPBACK:loopback" | ||
| output_stream: "HTTP_RESPONSE_PAYLOAD:output" | ||
| input_stream_info: { | ||
| tag_index: 'LOOPBACK:0', | ||
| back_edge: true | ||
| } | ||
| node_options: { | ||
| [type.googleapis.com / mediapipe.S2tCalculatorOptions]: { | ||
| models_path: ")" | ||
| << modelsPath << R"(" | ||
| target_device: ")" << exportSettings.targetDevice << R"(" | ||
| )"; | ||
| if (pluginConfigOpt.has_value()) { | ||
| oss << R"(plugin_config: ')" << pluginConfigOpt.value() << R"(' | ||
| )"; | ||
| } | ||
| oss << R"(} | ||
| } | ||
| input_stream_handler { | ||
| input_stream_handler: "SyncSetInputStreamHandler", | ||
| options { | ||
| [mediapipe.SyncSetInputStreamHandlerOptions.ext] { | ||
| sync_set { | ||
| tag_index: "LOOPBACK:0" | ||
| } | ||
| } | ||
| } | ||
| } | ||
| })"; |
createSpeechToTextGraphTemplate() now emits LOOPBACK/back_edge configuration for STT graphs. The unit test expectations in src/test/graph_export_test.cpp still assert the previous graph contents without LOOPBACK, so CI will likely fail unless those expected strings are updated accordingly.
Force-pushed from f67ddfe to 6a92481
mzegla left a comment:
We support streaming for stateful LLMs. This new machinery for audio looks different as I recall. Seems quite complex here. Couldn't we reuse the flow of legacy/stateful pipelines?
```cpp
// --- First iteration: new request ---
if (cc->Inputs().Tag(INPUT_TAG_NAME).IsEmpty()) {
    return absl::OkStatus();
}
```
when does it happen exactly?
I assume it could happen with invalid graph.pbtxt
```cpp
    serializeTimestamps(writer, result, config);
}
if (endpoint == Endpoint::TRANSLATIONS) {
    float temperature = 1.0f;
```
why is this hardcoded? why this specific value?
This is the default value from the GenAI generation config. It could probably be annotated with a comment to make clear what the default value is.
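For instance, a sketch (the constant name is illustrative, not existing code):

```cpp
// Mirrors the ov::genai generation config default rather than a magic number.
constexpr float DEFAULT_TEMPERATURE = 1.0f;
float temperature = DEFAULT_TEMPERATURE;
```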
```cpp
        return absl::InvalidArgumentError("Temperature out of range(0.0, 2.0)");
    temperature = temp.value();
} else {
    temperature = 1.0f;
// ...
static absl::Status parseTemperature(const ovms::HttpPayload& payload, float& temperature);
// ...
static std::string serializeTextChunk(const std::string& text) {
```
Isn't it duplicated code? I suppose we already have quite a lot of logic for serialization.
```cpp
bool hasData = streamingQueue_->waitAndPop(chunk);

if (hasData) {
    accumulatedText_ += chunk;
```
why do we need to accumulate text?
```cpp
// serialization and LOOPBACK signaling.
class S2tStreamingHandler {
public:
    bool isActive() const { return isStreaming_; }
```
do we need that? couldn't we rely on loopback being on/off?
```cpp
    return false;  // done and empty
}

bool isDone() const {
```
The logic of this function does not keep to the setter/getter pair idea: setDone() does not guarantee that a following isDone() returns true.
Either setDone should
a) clear the queue, or
b) assert the queue is empty and fail if called on a non-empty queue;
or isDone should only check the flag set by the setter, or isDone should be renamed so it is not read as a simple getter for done_.
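For the rename option, a sketch; `isDrained()` is a hypothetical name for the same body:

```cpp
// Says what it actually checks: generation finished AND all chunks consumed.
bool isDrained() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return done_ && queue_.empty();
}
```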
```cpp
    req->path().find("/responses") != std::string::npos;
bool isLongRunning = req->path().find("/completions") != std::string::npos ||
                     req->path().find("/responses") != std::string::npos ||
                     req->path().find("/audio/") != std::string::npos;
```
why does audio end with a slash while the others don't?
```diff
  // Here we need to schedule the request to the separate thread pool
  // in order to use disconnection callback of drogon.
- if (isTextGeneration) {
+ if (isLongRunning) {
```
Because it's not related to text generation only :)
But this name is not perfect, I agree.
What is breaking out of text generation here? Speech generation?
```cpp
accumulatedText_.clear();
streamingQueue_ = std::make_shared<ovms::StreamingTextQueue>();

auto queue = streamingQueue_;
```
Why is there a copy of the queue shared reference here? Can't you use streamingQueue_ directly?
```cpp
    isStreaming_ = false;
    return status;
}
generateFuture_ = std::async(std::launch::async,
```
Do we really want to create a new thread each time (use of std::async)?
Shouldn't we follow the ContinuousBatchingPipeline convention and implement add_request, read, and read_all in GenAI?
Maybe we could use a thread pool of size 1 to ensure it's all sequential and no new threads are created? Or reuse the LLMPipeline generate-with-streaming logic.
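A sketch of the size-1 pool idea: one long-lived worker consuming queued jobs, so streaming requests run sequentially without a per-request thread. All names here are illustrative, not existing OVMS classes:

```cpp
#include <condition_variable>
#include <functional>
#include <future>
#include <mutex>
#include <queue>
#include <thread>

class SingleThreadExecutor {
public:
    SingleThreadExecutor() : worker_([this] { run(); }) {}
    ~SingleThreadExecutor() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stop_ = true;
        }
        cv_.notify_one();
        worker_.join();
    }
    // Enqueue a job; jobs execute in submission order on the single worker.
    std::future<void> submit(std::function<void()> job) {
        std::packaged_task<void()> task(std::move(job));
        std::future<void> fut = task.get_future();
        {
            std::lock_guard<std::mutex> lock(mutex_);
            jobs_.push(std::move(task));
        }
        cv_.notify_one();
        return fut;
    }

private:
    void run() {
        for (;;) {
            std::packaged_task<void()> job;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                cv_.wait(lock, [this] { return stop_ || !jobs_.empty(); });
                if (stop_ && jobs_.empty())
                    return;
                job = std::move(jobs_.front());
                jobs_.pop();
            }
            job();  // sequential execution, no thread spawned per request
        }
    }
    std::mutex mutex_;
    std::condition_variable cv_;
    std::queue<std::packaged_task<void()>> jobs_;
    bool stop_ = false;
    std::thread worker_;  // declared last so run() only sees initialized members
};
```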
Pull request overview
Copilot reviewed 29 out of 29 changed files in this pull request and generated 3 comments.
Comments suppressed due to low confidence (1)
src/llm/language_model/legacy/legacy_executor.cpp:31
`LegacyExecutor::processRequest()` accesses `requests.front()` without holding `queueMutex`, while `addRequest()` can push concurrently. This is a data race (and the reference from `front()` can be invalidated depending on the underlying container). Consider locking `queueMutex`, moving the front request out + popping it, then releasing the lock before running generation and signaling the context.
```cpp
void LegacyExecutor::processRequest() {
    OVMS_PROFILE_FUNCTION();
    auto& requestExecutionContext = requests.front();
    if (requestExecutionContext->clientDisconnected) {
        requestExecutionContext->success = false;
```
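A sketch of the fix described above, assuming the members shown in the snippet (`requests`, `queueMutex`) and that the queue holds `shared_ptr`s:

```cpp
void LegacyExecutor::processRequest() {
    OVMS_PROFILE_FUNCTION();
    std::shared_ptr<LegacyServableExecutionContext> requestExecutionContext;
    {
        std::lock_guard<std::mutex> lock(queueMutex);
        requestExecutionContext = std::move(requests.front());  // take ownership
        requests.pop();
    }  // lock released before the long-running generation below
    if (requestExecutionContext->clientDisconnected) {
        requestExecutionContext->success = false;
        // ... signal the context and return, as in the original
    }
    // ... run generation without holding queueMutex
}
```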
```cpp
#include <atomic>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

#include <spdlog/spdlog.h>
```
executor_base.hpp calls exit(1) but does not include <cstdlib>, so the header is not self-contained and may fail to compile depending on include order. Add the proper standard header for exit (or avoid exit entirely if possible).
```cpp
#include <sstream>
#include <string>

namespace ovms {

inline std::string wrapTextInServerSideEventMessage(const std::string& text) {
    std::stringstream ss;
    ss << "data: " << text << "\n\n";
    return ss.str();
```
wrapTextInServerSideEventMessage() uses std::stringstream in a header-only inline helper. This adds extra allocations/overhead on the streaming hot path (and pulls in <sstream> transitively). Consider building the string with direct concatenation/append (e.g., pre-sizing and appending) to reduce per-chunk overhead and compile-time impact.
Suggested change:
```diff
- #include <sstream>
  #include <string>

  namespace ovms {

  inline std::string wrapTextInServerSideEventMessage(const std::string& text) {
-     std::stringstream ss;
-     ss << "data: " << text << "\n\n";
-     return ss.str();
+     std::string result;
+     result.reserve(6 + text.size() + 2);
+     result.append("data: ");
+     result.append(text);
+     result.append("\n\n");
+     return result;
```
```cpp
sttPipeline = std::make_shared<ov::genai::WhisperPipeline>(parsedModelsPath.string(), nodeOptions.target_device(), config);

streamingExecutor = std::make_unique<SttExecutorWrapper>();
}
```
SttServable eagerly creates SttExecutorWrapper in the constructor, which spawns a dedicated background thread per STT node even if streaming is never used. This can add noticeable resource overhead (threads/stack memory) on servers with many graphs/nodes. Consider lazy-initializing the executor on the first streaming request (or reusing an existing shared executor/thread pool) so non-streaming deployments pay no cost.
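A sketch of the lazy-init variant; `streamingExecutorOnce_` (a `std::once_flag` member) and `getStreamingExecutor()` are hypothetical additions to `SttServable`:

```cpp
// Deferred creation: non-streaming deployments never spawn the worker thread.
SttExecutorWrapper& SttServable::getStreamingExecutor() {
    std::call_once(streamingExecutorOnce_, [this] {
        streamingExecutor = std::make_unique<SttExecutorWrapper>();
    });
    return *streamingExecutor;
}
```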
```cpp
inputSidePackets[RERANK_SESSION_SIDE_PACKET_TAG] = mediapipe::MakePacket<RerankServableMap>(this->sidePacketMaps.rerankServableMap).At(STARTING_TIMESTAMP);
inputSidePackets[STT_SESSION_SIDE_PACKET_TAG] = mediapipe::MakePacket<SttServableMap>(this->sidePacketMaps.sttServableMap).At(STARTING_TIMESTAMP);
inputSidePackets[TTS_SESSION_SIDE_PACKET_TAG] = mediapipe::MakePacket<TtsServableMap>(this->sidePacketMaps.ttsServableMap).At(STARTING_TIMESTAMP);
inputSidePackets[IMAGE_GEN_SESSION_SIDE_PACKET_TAG] = mediapipe::MakePacket<ImageGenerationPipelinesMap>(this->sidePacketMaps.imageGenPipelinesMap).At(STARTING_TIMESTAMP);
```
Why is it all added now? Were they missing all along and it's kind of a bugfix?
Added by mistake; only `inputSidePackets[STT_SESSION_SIDE_PACKET_TAG]` is relevant.
```cpp
void LegacyExecutor::addRequest(std::shared_ptr<LegacyServableExecutionContext> request) {
    std::unique_lock<std::mutex> lock(queueMutex);
    requests.push(request);
    cv.notify_one();
```
Couldn't we use the Executor base implementation?
```cpp
}

// --- LOOPBACK iteration: drain streaming queue ---
if (hasLoopback_ && !cc->Inputs().Tag(LOOPBACK_TAG_NAME).IsEmpty()) {
```
Isn't it the same as loopbackEmpty?
```cpp
// Reused by both streaming start (for config) and unary path.
// Kept here to avoid duplicating the parsing logic.
static absl::Status applyTranscriptionConfig(ov::genai::WhisperGenerationConfig& config,
    const std::shared_ptr<ovms::SttServable>& pipe, const ovms::HttpPayload& payload);
```
Used for both unary and streaming, so it should be at the servable level, I guess.
Keeping it in the streaming handler is also confusing because it has no direct connection to streaming (it's the input preparation phase).
```cpp
std::condition_variable cv;
std::queue<RequestT> requests;
std::mutex queueMutex;
```
Why can't we have addRequest at this level?
```cpp
using SttStreamingJob = std::packaged_task<ov::genai::WhisperDecodedResults()>;

struct SttExecutor : public Executor<SttStreamingJob> {
```
Can you move this to the cpp file?
Force-pushed from 411838d to d46f69d
Force-pushed from 40774fa to 6c35ff8
🛠 Summary
CVS-181778
CVS-185048
🧪 Checklist