Skip to content

Commit de48f30

Browse files
perf(http-pd): use shared SseEncoder for logprob-merge SSE re-encode
Replace the per-chunk `serde_json::to_string` + `format!("data: …")` pair in `merge_streaming_logprobs` with the shared `common::sse::SseEncoder`. A single encoder is created once per stream in the `create_streaming_response` relay task and threaded into the merge helper, so the serialization buffer is reused across every re-encoded chunk (1 allocation per chunk instead of 2). The idiosyncratic streaming-error frame in `handle_decode_error_response` (`data: {'error': …}` without a trailing blank line) is left untouched to avoid changing its wire format. No behavior change on the success path. The one difference is the serialization-failure path: the old code fell back to an empty `data: ` frame via `unwrap_or_default()`, whereas the helper now returns `Err(())` and the caller's `unwrap_or(chunk)` forwards the original decode chunk unchanged. Signed-off-by: XinyueZhang369 <zoeyzhang369@gmail.com>
1 parent cac6755 commit de48f30

1 file changed

Lines changed: 12 additions & 8 deletions

File tree

model_gateway/src/routers/http/pd_router.rs

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ use crate::{
3636
common::{
3737
header_utils,
3838
retry::{is_retryable_status, RetryExecutor},
39+
sse::SseEncoder,
3940
},
4041
error,
4142
grpc::utils::{error_type_from_status, route_to_endpoint},
@@ -910,14 +911,20 @@ impl PDRouter {
910911
)]
911912
tokio::spawn(async move {
912913
futures_util::pin_mut!(stream);
914+
// Reusable SSE encoder for the logprob-merge re-encode path.
915+
let mut encoder = SseEncoder::new();
913916
while let Some(chunk_result) = stream.next().await {
914917
match chunk_result {
915918
Ok(chunk) => {
916919
let is_done = memmem::find(&chunk, b"data: [DONE]").is_some();
917920

918921
let result = if return_logprob && prefill_logprobs.is_some() {
919-
Self::merge_streaming_logprobs(prefill_logprobs.clone(), &chunk)
920-
.unwrap_or(chunk)
922+
Self::merge_streaming_logprobs(
923+
prefill_logprobs.clone(),
924+
&chunk,
925+
&mut encoder,
926+
)
927+
.unwrap_or(chunk)
921928
} else {
922929
chunk
923930
};
@@ -1138,6 +1145,7 @@ impl PDRouter {
11381145
fn merge_streaming_logprobs(
11391146
prefill_logprobs: Option<Value>,
11401147
decode_chunk: &[u8],
1148+
encoder: &mut SseEncoder,
11411149
) -> Result<bytes::Bytes, ()> {
11421150
// Skip non-data chunks
11431151
let chunk_str = std::str::from_utf8(decode_chunk).map_err(|_| ())?;
@@ -1168,12 +1176,8 @@ impl PDRouter {
11681176
}
11691177
}
11701178

1171-
// Re-serialize
1172-
let merged_str = format!(
1173-
"data: {}\n\n",
1174-
serde_json::to_string(&decode_json).unwrap_or_default()
1175-
);
1176-
Ok(bytes::Bytes::from(merged_str))
1179+
// Re-serialize via the shared encoder (reuses its buffer across chunks).
1180+
encoder.encode_data(&decode_json).map_err(|_| ())
11771181
}
11781182
}
11791183

0 commit comments

Comments
 (0)