Skip to content

Commit 2e82028

Browse files
authored
Merge pull request #131 from nearai/fix/sse-error-chunk-propagation
fix(proxy): propagate SSE error chunks as upstream errors
2 parents 64161cb + 1f71cc7 commit 2e82028

2 files changed

Lines changed: 146 additions & 0 deletions

File tree

src/proxy.rs

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -390,6 +390,29 @@ pub async fn proxy_json_request(
390390
assembler.process_chunk(&chunk);
391391
}
392392
}
393+
// If the stream surfaced an upstream error chunk (e.g. SGLang queue-full
394+
// abort), propagate it as a real upstream error. Otherwise the empty
395+
// `choices: []` final chunk would be signed and returned as HTTP 200,
396+
// hiding the failure from cloud-api's retry logic.
397+
if let Some(err) = assembler.take_error() {
398+
let status_code = err
399+
.get("code")
400+
.and_then(|v| v.as_u64())
401+
.and_then(|c| u16::try_from(c).ok())
402+
.and_then(|c| StatusCode::from_u16(c).ok())
403+
.unwrap_or(StatusCode::BAD_GATEWAY);
404+
let body_bytes = Bytes::from(
405+
serde_json::to_vec(&serde_json::json!({ "error": err }))
406+
.map_err(|e| AppError::Internal(e.into()))?,
407+
);
408+
let reqwest_status = reqwest::StatusCode::from_u16(status_code.as_u16())
409+
.unwrap_or(reqwest::StatusCode::BAD_GATEWAY);
410+
log_upstream_error(reqwest_status, url, &body_bytes);
411+
return Err(AppError::Upstream {
412+
status: status_code,
413+
body: body_bytes,
414+
});
415+
}
393416
assembler.into_response(&opts.id_prefix)
394417
} else {
395418
// Backend returned plain JSON — process as before.
@@ -473,6 +496,13 @@ struct StreamingResponseAssembler {
473496
usage: Option<serde_json::Value>,
474497
metadata: Option<serde_json::Value>,
475498
shape: ResponseShape,
499+
/// First `event["error"]` object seen in the stream. SGLang aborts (e.g.
500+
/// `--max-queued-requests` overflow) emit `data: {"error": {...}}` then
501+
/// continue with `[DONE]` plus an empty-choices/zero-usage chunk, so the
502+
/// upstream returns HTTP 200 with a valid SSE shape. Capturing this lets
503+
/// the caller surface a real upstream error instead of returning a
504+
/// phantom HTTP 200 + empty choices.
505+
error: Option<serde_json::Value>,
476506
}
477507

478508
/// Accumulates delta fields for a single choice.
@@ -496,9 +526,15 @@ impl StreamingResponseAssembler {
496526
usage: None,
497527
metadata: None,
498528
shape,
529+
error: None,
499530
}
500531
}
501532

533+
/// Returns the upstream error chunk captured during stream processing, if any.
534+
fn take_error(&mut self) -> Option<serde_json::Value> {
535+
self.error.take()
536+
}
537+
502538
fn process_chunk(&mut self, chunk: &[u8]) {
503539
match std::str::from_utf8(chunk) {
504540
Ok(s) => self.line_buffer.push_str(s),
@@ -531,6 +567,16 @@ impl StreamingResponseAssembler {
531567
}
532568

533569
fn ingest_event(&mut self, event: &serde_json::Value) {
570+
// Capture the first upstream error chunk. SGLang surfaces aborts
571+
// (queue-full, priority-disabled, waiting timeout) by emitting
572+
// `data: {"error": {"object":"error","message":"...","type":"...","code":<http_status>}}`
573+
// mid-stream while keeping the SSE response otherwise well-formed.
574+
if self.error.is_none() {
575+
if let Some(err) = event.get("error").filter(|v| v.is_object()) {
576+
self.error = Some(err.clone());
577+
}
578+
}
579+
534580
// Capture top-level fields from the first event.
535581
if self.id.is_none() {
536582
self.id = event.get("id").and_then(|v| v.as_str()).map(String::from);
@@ -2441,6 +2487,58 @@ mod tests {
24412487
assert!(id.starts_with("cmpl-"), "should generate id: {id}");
24422488
}
24432489

2490+
#[test]
2491+
fn test_assembler_captures_sglang_queue_full_abort() {
2492+
// SGLang's `--max-queued-requests` abort emits an error data chunk
2493+
// mid-stream, then continues with an empty-choices/zero-usage chunk
2494+
// and `[DONE]`. The assembler must capture the error so the caller
2495+
// can surface it as a real upstream error instead of returning HTTP
2496+
// 200 with empty choices.
2497+
let mut asm = StreamingResponseAssembler::new(ResponseShape::ChatCompletion);
2498+
asm.process_chunk(
2499+
b"data: {\"error\":{\"object\":\"error\",\"message\":\"The request queue is full.\",\"type\":\"SERVICE_UNAVAILABLE\",\"code\":503}}\n\n",
2500+
);
2501+
asm.process_chunk(
2502+
b"data: {\"id\":\"chatcmpl-abc\",\"object\":\"chat.completion.chunk\",\"model\":\"glm-5.1\",\"created\":1779313724,\"choices\":[],\"usage\":{\"prompt_tokens\":0,\"completion_tokens\":0,\"total_tokens\":0}}\n\n",
2503+
);
2504+
asm.process_chunk(b"data: [DONE]\n\n");
2505+
2506+
let err = asm.take_error().expect("error chunk must be captured");
2507+
assert_eq!(err["code"], 503);
2508+
assert_eq!(err["type"], "SERVICE_UNAVAILABLE");
2509+
assert_eq!(err["message"], "The request queue is full.");
2510+
// Subsequent take returns None.
2511+
assert!(asm.take_error().is_none());
2512+
}
2513+
2514+
#[test]
2515+
fn test_assembler_ignores_non_object_error_field() {
2516+
// A `null` or string `error` field on a chunk must not be mistaken
2517+
// for an upstream abort.
2518+
let mut asm = StreamingResponseAssembler::new(ResponseShape::ChatCompletion);
2519+
asm.process_chunk(
2520+
b"data: {\"id\":\"c1\",\"error\":null,\"choices\":[{\"index\":0,\"delta\":{\"content\":\"hi\"},\"finish_reason\":\"stop\"}]}\n\ndata: [DONE]\n\n",
2521+
);
2522+
assert!(asm.take_error().is_none());
2523+
let resp = asm.into_response("chatcmpl");
2524+
assert_eq!(resp["choices"][0]["message"]["content"], "hi");
2525+
}
2526+
2527+
#[test]
2528+
fn test_assembler_keeps_first_error_chunk() {
2529+
// If the stream emits multiple error chunks (defensive — not observed
2530+
// in practice), keep the first so we surface the original failure.
2531+
let mut asm = StreamingResponseAssembler::new(ResponseShape::ChatCompletion);
2532+
asm.process_chunk(
2533+
b"data: {\"error\":{\"object\":\"error\",\"message\":\"first\",\"type\":\"SERVICE_UNAVAILABLE\",\"code\":503}}\n\n",
2534+
);
2535+
asm.process_chunk(
2536+
b"data: {\"error\":{\"object\":\"error\",\"message\":\"second\",\"type\":\"INTERNAL\",\"code\":500}}\n\n",
2537+
);
2538+
let err = asm.take_error().unwrap();
2539+
assert_eq!(err["message"], "first");
2540+
}
2541+
24442542
#[test]
24452543
fn test_inject_streaming() {
24462544
let body = br#"{"messages":[{"role":"user","content":"hi"}]}"#;

tests/integration.rs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -796,6 +796,54 @@ async fn test_completions_endpoint_sse_reassembly() {
796796
assert_eq!(body["usage"]["completion_tokens"], 3);
797797
}
798798

799+
// ---- SSE error chunk propagation ----
800+
801+
/// SGLang's `--max-queued-requests` abort emits `data: {"error": {...}}` then
802+
/// `data: {...,"choices":[],"usage":{0,0,0}}` then `[DONE]`, so the upstream
803+
/// returns HTTP 200 with a valid SSE shape. proxy_json_request must detect the
804+
/// error chunk and propagate the upstream status (503) rather than signing the
805+
/// empty-choices body and returning HTTP 200 to the caller.
806+
#[tokio::test]
807+
async fn test_chat_sse_queue_full_propagates_503() {
808+
let mock_server = MockServer::start().await;
809+
810+
let sse_body = concat!(
811+
"data: {\"error\":{\"object\":\"error\",\"message\":\"The request queue is full.\",\"type\":\"SERVICE_UNAVAILABLE\",\"code\":503}}\n\n",
812+
"data: {\"id\":\"chatcmpl-q1\",\"object\":\"chat.completion.chunk\",\"model\":\"test-model\",\"created\":100,\"choices\":[],\"usage\":{\"prompt_tokens\":0,\"completion_tokens\":0,\"total_tokens\":0}}\n\n",
813+
"data: [DONE]\n\n",
814+
);
815+
816+
Mock::given(method("POST"))
817+
.and(path("/v1/chat/completions"))
818+
.respond_with(
819+
ResponseTemplate::new(200).set_body_raw(sse_body.as_bytes(), "text/event-stream"),
820+
)
821+
.mount(&mock_server)
822+
.await;
823+
824+
let app = build_test_app(&mock_server.uri());
825+
826+
let response = app
827+
.oneshot(
828+
Request::builder()
829+
.method("POST")
830+
.uri("/v1/chat/completions")
831+
.header("content-type", "application/json")
832+
.header(auth_header().0, auth_header().1)
833+
.body(Body::from(
834+
r#"{"model":"test-model","messages":[{"role":"user","content":"hi"}]}"#,
835+
))
836+
.unwrap(),
837+
)
838+
.await
839+
.unwrap();
840+
841+
assert_eq!(response.status(), StatusCode::SERVICE_UNAVAILABLE);
842+
let body = body_to_json(response).await;
843+
assert_eq!(body["error"]["message"], "The request queue is full.");
844+
assert_eq!(body["error"]["type"], "SERVICE_UNAVAILABLE");
845+
}
846+
799847
// ---- Embeddings ----
800848

801849
#[tokio::test]

0 commit comments

Comments
 (0)