Skip to content

Commit 23ee755

Browse files
authored
feat(gateway): add Messages API streaming support to gRPC router (#758)
Signed-off-by: Simo Lin <linsimo.mark@gmail.com>
1 parent f687596 commit 23ee755

4 files changed

Lines changed: 934 additions & 25 deletions

File tree

model_gateway/src/routers/grpc/pipeline.rs

Lines changed: 26 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -274,11 +274,19 @@ impl RequestPipeline {
274274
configured_reasoning_parser: Option<String>,
275275
) -> Self {
276276
let processor = processor::ResponseProcessor::new(
277+
tool_parser_factory.clone(),
278+
reasoning_parser_factory.clone(),
279+
configured_tool_parser.clone(),
280+
configured_reasoning_parser.clone(),
281+
);
282+
283+
let streaming_processor = Arc::new(streaming::StreamingProcessor::new(
277284
tool_parser_factory,
278285
reasoning_parser_factory,
279286
configured_tool_parser,
280287
configured_reasoning_parser,
281-
);
288+
metrics_labels::BACKEND_REGULAR,
289+
));
282290

283291
let stages: Vec<Box<dyn PipelineStage>> = vec![
284292
Box::new(MessagePreparationStage),
@@ -291,7 +299,10 @@ impl RequestPipeline {
291299
Box::new(MessageRequestBuildingStage::new(false)), // No PD metadata
292300
Box::new(DispatchMetadataStage),
293301
Box::new(RequestExecutionStage::new(ExecutionMode::Single)),
294-
Box::new(MessageResponseProcessingStage::new(processor)),
302+
Box::new(MessageResponseProcessingStage::new(
303+
processor,
304+
streaming_processor,
305+
)),
295306
];
296307

297308
Self {
@@ -310,11 +321,19 @@ impl RequestPipeline {
310321
configured_reasoning_parser: Option<String>,
311322
) -> Self {
312323
let processor = processor::ResponseProcessor::new(
324+
tool_parser_factory.clone(),
325+
reasoning_parser_factory.clone(),
326+
configured_tool_parser.clone(),
327+
configured_reasoning_parser.clone(),
328+
);
329+
330+
let streaming_processor = Arc::new(streaming::StreamingProcessor::new(
313331
tool_parser_factory,
314332
reasoning_parser_factory,
315333
configured_tool_parser,
316334
configured_reasoning_parser,
317-
);
335+
metrics_labels::BACKEND_PD,
336+
));
318337

319338
let stages: Vec<Box<dyn PipelineStage>> = vec![
320339
Box::new(MessagePreparationStage),
@@ -327,7 +346,10 @@ impl RequestPipeline {
327346
Box::new(MessageRequestBuildingStage::new(true)), // Inject PD metadata
328347
Box::new(DispatchMetadataStage),
329348
Box::new(RequestExecutionStage::new(ExecutionMode::DualDispatch)),
330-
Box::new(MessageResponseProcessingStage::new(processor)),
349+
Box::new(MessageResponseProcessingStage::new(
350+
processor,
351+
streaming_processor,
352+
)),
331353
];
332354

333355
Self {

model_gateway/src/routers/grpc/regular/processor.rs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -487,13 +487,19 @@ impl ResponseProcessor {
487487
let complete = all_responses.into_iter().next().unwrap();
488488

489489
// Check parser availability
490-
// Always attempt reasoning parsing when a parser is available — some models'
491-
// chat templates emit thinking tokens regardless of the request's `thinking` config.
492-
let reasoning_parser_available = utils::check_reasoning_parser_availability(
493-
&self.reasoning_parser_factory,
494-
self.configured_reasoning_parser.as_deref(),
495-
&messages_request.model,
490+
// Only run reasoning parser when the user explicitly enabled thinking in the request.
491+
// Without this gate, the reasoning parser misclassifies normal text and tool call JSON
492+
// as thinking content, breaking tool use and producing incorrect content blocks.
493+
let separate_reasoning = matches!(
494+
&messages_request.thinking,
495+
Some(messages::ThinkingConfig::Enabled { .. })
496496
);
497+
let reasoning_parser_available = separate_reasoning
498+
&& utils::check_reasoning_parser_availability(
499+
&self.reasoning_parser_factory,
500+
self.configured_reasoning_parser.as_deref(),
501+
&messages_request.model,
502+
);
497503

498504
let tool_choice_enabled = !matches!(
499505
&messages_request.tool_choice,
@@ -508,7 +514,7 @@ impl ResponseProcessor {
508514
&messages_request.model,
509515
);
510516

511-
if !reasoning_parser_available {
517+
if separate_reasoning && !reasoning_parser_available {
512518
tracing::debug!(
513519
"No reasoning parser found for model '{}', reasoning content will not be separated",
514520
messages_request.model

model_gateway/src/routers/grpc/regular/stages/messages/response_processing.rs

Lines changed: 49 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,50 @@
1-
//! Message response processing stage: non-streaming response processing
1+
//! Message response processing stage: streaming and non-streaming response processing
22
//!
3-
//! Collects the backend response, converts it to an Anthropic `Message`,
4-
//! and stores it as FinalResponse::Messages.
5-
//! Streaming support will be added in a follow-up PR.
3+
//! - For streaming: Spawns background task and returns SSE response (early exit)
4+
//! - For non-streaming: Collects the backend response, converts it to an Anthropic `Message`,
5+
//! and stores it as FinalResponse::Messages.
6+
7+
use std::sync::Arc;
68

79
use async_trait::async_trait;
810
use axum::response::Response;
911
use tracing::error;
1012

11-
use crate::routers::{
12-
error,
13-
grpc::{
14-
common::stages::PipelineStage,
15-
context::{FinalResponse, RequestContext},
16-
regular::processor,
13+
use crate::{
14+
core::AttachedBody,
15+
routers::{
16+
error,
17+
grpc::{
18+
common::stages::PipelineStage,
19+
context::{FinalResponse, RequestContext},
20+
regular::{processor, streaming},
21+
},
1722
},
1823
};
1924

20-
/// Message response processing stage (non-streaming only)
25+
/// Message response processing stage
2126
pub(crate) struct MessageResponseProcessingStage {
2227
processor: processor::ResponseProcessor,
28+
streaming_processor: Arc<streaming::StreamingProcessor>,
2329
}
2430

2531
impl MessageResponseProcessingStage {
26-
pub fn new(processor: processor::ResponseProcessor) -> Self {
27-
Self { processor }
32+
pub fn new(
33+
processor: processor::ResponseProcessor,
34+
streaming_processor: Arc<streaming::StreamingProcessor>,
35+
) -> Self {
36+
Self {
37+
processor,
38+
streaming_processor,
39+
}
2840
}
2941
}
3042

3143
#[async_trait]
3244
impl PipelineStage for MessageResponseProcessingStage {
3345
async fn execute(&self, ctx: &mut RequestContext) -> Result<Option<Response>, Response> {
46+
let is_streaming = ctx.is_streaming();
47+
3448
// Extract execution result
3549
let execution_result = ctx.state.response.execution_result.take().ok_or_else(|| {
3650
error!(
@@ -66,6 +80,28 @@ impl PipelineStage for MessageResponseProcessingStage {
6680
)
6781
})?;
6882

83+
if is_streaming {
84+
// Streaming: use StreamingProcessor and return SSE response
85+
let response = self
86+
.streaming_processor
87+
.clone()
88+
.process_messages_streaming_response(
89+
execution_result,
90+
ctx.messages_request_arc(),
91+
dispatch,
92+
tokenizer,
93+
);
94+
95+
// Attach load guards for RAII lifecycle
96+
let response = match ctx.state.load_guards.take() {
97+
Some(guards) => AttachedBody::wrap_response(response, guards),
98+
None => response,
99+
};
100+
101+
return Ok(Some(response));
102+
}
103+
104+
// Non-streaming: delegate to ResponseProcessor
69105
let messages_request = ctx.messages_request_arc();
70106

71107
let stop_decoder = ctx.state.response.stop_decoder.as_mut().ok_or_else(|| {
@@ -79,7 +115,6 @@ impl PipelineStage for MessageResponseProcessingStage {
79115
)
80116
})?;
81117

82-
// Non-streaming: delegate to ResponseProcessor
83118
let response = self
84119
.processor
85120
.process_non_streaming_messages_response(

0 commit comments

Comments
 (0)