Skip to content

Commit f02e9de

Browse files
committed
Handle Helicone streaming chat responses
1 parent dc1ae1b commit f02e9de

3 files changed

Lines changed: 563 additions & 1 deletion

File tree

crates/calciforge/src/proxy/helicone_router.rs

Lines changed: 351 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -19,7 +19,8 @@ use crate::{
1919
config::GatewayRetryConfig,
2020
proxy::backend::{BackendError, BackendType, ModelInfo, SecretsBackend},
2121
proxy::openai::{
22-
ChatCompletionRequest, ChatCompletionResponse, ChatMessage, ToolChoice, ToolDefinition,
22+
ChatCompletionRequest, ChatCompletionResponse, ChatMessage, Choice, FunctionCall,
23+
MessageContent, ToolCall, ToolChoice, ToolDefinition, Usage,
2324
},
2425
};
2526

@@ -230,6 +231,25 @@ impl HeliconeRouter {
230231
));
231232
}
232233

234+
let content_type = response
235+
.headers()
236+
.get(CONTENT_TYPE)
237+
.and_then(|value| value.to_str().ok())
238+
.unwrap_or("")
239+
.to_ascii_lowercase();
240+
if content_type.contains("text/event-stream") {
241+
let body = response.text().await.map_err(|e| {
242+
BackendError::transport(
243+
format!(
244+
"Failed to read Helicone streaming response for model '{}': {}",
245+
model_for_error, e
246+
),
247+
e.is_timeout(),
248+
)
249+
})?;
250+
return parse_streaming_chat_completion(&body, &model_for_error);
251+
}
252+
233253
let completion_response: ChatCompletionResponse = response.json().await.map_err(|e| {
234254
BackendError::InvalidResponse(format!(
235255
"Failed to parse Helicone response for model '{}': {}",
@@ -280,6 +300,222 @@ fn helicone_chat_completions_url(base_url: &str) -> Result<Url, HeliconeError> {
280300
Ok(url)
281301
}
282302

303+
/// Partial state of one tool call while it is reassembled from streamed deltas.
///
/// Streamed chunks carry a tool call in pieces; each piece that is present in a delta is
/// folded into this accumulator. Every field starts out empty via [`Default`].
#[derive(Debug, Default)]
struct ToolCallAccumulator {
    /// Tool call identifier; the most recently seen value wins.
    id: Option<String>,
    /// Tool call kind as reported by the stream; the most recently seen value wins.
    r#type: Option<String>,
    /// Name of the function being called; the most recently seen value wins.
    name: Option<String>,
    /// Concatenation of every `arguments` fragment seen so far, in arrival order.
    arguments: String,
}
310+
311+
fn parse_streaming_chat_completion(
312+
body: &str,
313+
requested_model: &str,
314+
) -> Result<ChatCompletionResponse, BackendError> {
315+
let mut id: Option<String> = None;
316+
let mut created: Option<u64> = None;
317+
let mut model: Option<String> = None;
318+
let mut role: Option<String> = None;
319+
let mut content = String::new();
320+
let mut reasoning = String::new();
321+
let mut reasoning_content = String::new();
322+
let mut finish_reason: Option<String> = None;
323+
let mut usage = Usage {
324+
prompt_tokens: 0,
325+
completion_tokens: 0,
326+
total_tokens: 0,
327+
};
328+
let mut tool_calls: HashMap<usize, ToolCallAccumulator> = HashMap::new();
329+
let mut saw_chunk = false;
330+
331+
for event in body.split("\n\n") {
332+
let mut data = String::new();
333+
for line in event.lines() {
334+
let Some(rest) = line.strip_prefix("data:") else {
335+
continue;
336+
};
337+
if !data.is_empty() {
338+
data.push('\n');
339+
}
340+
data.push_str(rest.trim_start());
341+
}
342+
let data = data.trim();
343+
if data.is_empty() || data == "[DONE]" {
344+
continue;
345+
}
346+
347+
saw_chunk = true;
348+
let value: serde_json::Value = serde_json::from_str(data).map_err(|e| {
349+
BackendError::InvalidResponse(format!(
350+
"Failed to parse Helicone streaming chunk for model '{}': {}",
351+
requested_model, e
352+
))
353+
})?;
354+
355+
if id.is_none() {
356+
id = value
357+
.get("id")
358+
.and_then(|v| v.as_str())
359+
.map(ToOwned::to_owned);
360+
}
361+
if created.is_none() {
362+
created = value.get("created").and_then(|v| v.as_u64());
363+
}
364+
if model.is_none() {
365+
model = value
366+
.get("model")
367+
.and_then(|v| v.as_str())
368+
.map(ToOwned::to_owned);
369+
}
370+
if let Some(chunk_usage) = value.get("usage") {
371+
usage.prompt_tokens = chunk_usage
372+
.get("prompt_tokens")
373+
.and_then(|v| v.as_u64())
374+
.unwrap_or(usage.prompt_tokens as u64) as u32;
375+
usage.completion_tokens = chunk_usage
376+
.get("completion_tokens")
377+
.and_then(|v| v.as_u64())
378+
.unwrap_or(usage.completion_tokens as u64)
379+
as u32;
380+
usage.total_tokens = chunk_usage
381+
.get("total_tokens")
382+
.and_then(|v| v.as_u64())
383+
.unwrap_or(usage.total_tokens as u64) as u32;
384+
}
385+
386+
let choices = value
387+
.get("choices")
388+
.and_then(|v| v.as_array())
389+
.ok_or_else(|| {
390+
BackendError::InvalidResponse(format!(
391+
"Helicone streaming chunk for model '{}' did not include choices",
392+
requested_model
393+
))
394+
})?;
395+
for choice in choices {
396+
if let Some(reason) = choice.get("finish_reason").and_then(|v| v.as_str()) {
397+
finish_reason = Some(reason.to_string());
398+
}
399+
let Some(delta) = choice.get("delta").and_then(|v| v.as_object()) else {
400+
continue;
401+
};
402+
if role.is_none() {
403+
role = delta
404+
.get("role")
405+
.and_then(|v| v.as_str())
406+
.map(ToOwned::to_owned);
407+
}
408+
if let Some(text) = delta.get("content").and_then(|v| v.as_str()) {
409+
content.push_str(text);
410+
}
411+
if let Some(text) = delta.get("reasoning").and_then(|v| v.as_str()) {
412+
reasoning.push_str(text);
413+
}
414+
if let Some(text) = delta.get("reasoning_content").and_then(|v| v.as_str()) {
415+
reasoning_content.push_str(text);
416+
}
417+
if let Some(calls) = delta.get("tool_calls").and_then(|v| v.as_array()) {
418+
for (fallback_index, call) in calls.iter().enumerate() {
419+
let index = call
420+
.get("index")
421+
.and_then(|v| v.as_u64())
422+
.map(|i| i as usize)
423+
.unwrap_or(fallback_index);
424+
let entry = tool_calls.entry(index).or_default();
425+
if let Some(value) = call.get("id").and_then(|v| v.as_str()) {
426+
entry.id = Some(value.to_string());
427+
}
428+
if let Some(value) = call.get("type").and_then(|v| v.as_str()) {
429+
entry.r#type = Some(value.to_string());
430+
}
431+
if let Some(function) = call.get("function") {
432+
if let Some(value) = function.get("name").and_then(|v| v.as_str()) {
433+
entry.name = Some(value.to_string());
434+
}
435+
if let Some(value) = function.get("arguments").and_then(|v| v.as_str()) {
436+
entry.arguments.push_str(value);
437+
}
438+
}
439+
}
440+
}
441+
}
442+
}
443+
444+
if !saw_chunk {
445+
return Err(BackendError::InvalidResponse(format!(
446+
"Helicone streaming response for model '{}' did not include any chunks",
447+
requested_model
448+
)));
449+
}
450+
451+
let mut tool_call_entries: Vec<(usize, ToolCall)> = Vec::new();
452+
for (index, call) in tool_calls {
453+
let Some(id) = call.id else {
454+
continue;
455+
};
456+
let Some(name) = call.name else {
457+
continue;
458+
};
459+
tool_call_entries.push((
460+
index,
461+
ToolCall {
462+
id,
463+
r#type: call.r#type.unwrap_or_else(|| "function".to_string()),
464+
function: FunctionCall {
465+
name,
466+
arguments: call.arguments,
467+
},
468+
},
469+
));
470+
}
471+
tool_call_entries.sort_by_key(|(index, _)| *index);
472+
let parsed_tool_calls: Vec<ToolCall> = tool_call_entries
473+
.into_iter()
474+
.map(|(_, call)| call)
475+
.collect();
476+
477+
let content = if content.is_empty() {
478+
None
479+
} else {
480+
Some(MessageContent::Text(content))
481+
};
482+
483+
Ok(ChatCompletionResponse {
484+
id: id.unwrap_or_else(|| "chatcmpl-helicone-stream".to_string()),
485+
object: "chat.completion".to_string(),
486+
created: created.unwrap_or(0),
487+
model: model.unwrap_or_else(|| requested_model.to_string()),
488+
choices: vec![Choice {
489+
index: 0,
490+
message: ChatMessage {
491+
role: role.unwrap_or_else(|| "assistant".to_string()),
492+
content,
493+
name: None,
494+
tool_calls: if parsed_tool_calls.is_empty() {
495+
None
496+
} else {
497+
Some(parsed_tool_calls)
498+
},
499+
tool_call_id: None,
500+
reasoning: if reasoning.is_empty() {
501+
None
502+
} else {
503+
Some(reasoning)
504+
},
505+
reasoning_content: if reasoning_content.is_empty() {
506+
None
507+
} else {
508+
Some(reasoning_content)
509+
},
510+
},
511+
finish_reason,
512+
logprobs: None,
513+
}],
514+
usage,
515+
system_fingerprint: None,
516+
})
517+
}
518+
283519
fn truncate_error_body(body: &str) -> String {
284520
const MAX_ERROR_BODY_CHARS: usize = 1024;
285521
let mut chars = body.chars();
@@ -513,6 +749,120 @@ mod tests {
513749
mock.assert_async().await;
514750
}
515751

752+
#[tokio::test]
753+
async fn chat_completion_parses_streaming_upstream_response() {
754+
let mut server = mockito::Server::new_async().await;
755+
let body = concat!(
756+
"data: {\"id\":\"chatcmpl-stream\",\"object\":\"chat.completion.chunk\",\"created\":1,\"model\":\"ollama/qwen3.6:27b\",\"choices\":[{\"index\":0,\"delta\":{\"role\":\"assistant\"},\"finish_reason\":null}]}\n\n",
757+
"data: {\"id\":\"chatcmpl-stream\",\"object\":\"chat.completion.chunk\",\"created\":1,\"model\":\"ollama/qwen3.6:27b\",\"choices\":[{\"index\":0,\"delta\":{\"content\":\"po\"},\"finish_reason\":null}]}\n\n",
758+
"data: {\"id\":\"chatcmpl-stream\",\"object\":\"chat.completion.chunk\",\"created\":1,\"model\":\"ollama/qwen3.6:27b\",\"choices\":[{\"index\":0,\"delta\":{\"content\":\"ng\"},\"finish_reason\":\"stop\"}],\"usage\":{\"prompt_tokens\":1,\"completion_tokens\":1,\"total_tokens\":2}}\n\n",
759+
"data: [DONE]\n\n",
760+
);
761+
let mock = server
762+
.mock("POST", "/v1/chat/completions")
763+
.match_body(Matcher::PartialJson(serde_json::json!({
764+
"model": "ollama/qwen3.6:27b",
765+
"stream": true
766+
})))
767+
.with_status(200)
768+
.with_header("content-type", "text/event-stream")
769+
.with_body(body)
770+
.create_async()
771+
.await;
772+
773+
let router = HeliconeRouter::new(config(format!("{}/v1/", server.url()))).unwrap();
774+
let result = router
775+
.chat_completion(
776+
"ollama/qwen3.6:27b".to_string(),
777+
vec![ChatMessage {
778+
role: "user".to_string(),
779+
content: Some(MessageContent::Text("hello".to_string())),
780+
name: None,
781+
tool_calls: None,
782+
tool_call_id: None,
783+
reasoning: None,
784+
reasoning_content: None,
785+
}],
786+
true,
787+
None,
788+
None,
789+
)
790+
.await
791+
.unwrap();
792+
793+
assert_eq!(result.model, "ollama/qwen3.6:27b");
794+
assert_eq!(
795+
result.choices[0]
796+
.message
797+
.content
798+
.as_ref()
799+
.and_then(MessageContent::to_text)
800+
.as_deref(),
801+
Some("pong")
802+
);
803+
assert_eq!(result.choices[0].finish_reason.as_deref(), Some("stop"));
804+
assert_eq!(result.usage.total_tokens, 2);
805+
mock.assert_async().await;
806+
}
807+
808+
#[tokio::test]
809+
async fn chat_completion_parses_streamed_tool_calls() {
810+
let mut server = mockito::Server::new_async().await;
811+
let body = concat!(
812+
"data: {\"id\":\"chatcmpl-tools\",\"object\":\"chat.completion.chunk\",\"created\":2,\"model\":\"ollama/qwen3.6:27b\",\"choices\":[{\"index\":0,\"delta\":{\"role\":\"assistant\",\"tool_calls\":[{\"index\":0,\"id\":\"call_1\",\"type\":\"function\",\"function\":{\"name\":\"fetch_url\",\"arguments\":\"{\\\"url\\\":\"}}]},\"finish_reason\":null}]}\n\n",
813+
"data: {\"id\":\"chatcmpl-tools\",\"object\":\"chat.completion.chunk\",\"created\":2,\"model\":\"ollama/qwen3.6:27b\",\"choices\":[{\"index\":0,\"delta\":{\"tool_calls\":[{\"index\":0,\"function\":{\"arguments\":\"\\\"https://example.test\\\"}\"}}]},\"finish_reason\":\"tool_calls\"}]}\n\n",
814+
"data: [DONE]\n\n",
815+
);
816+
let mock = server
817+
.mock("POST", "/v1/chat/completions")
818+
.match_body(Matcher::PartialJson(serde_json::json!({
819+
"model": "ollama/qwen3.6:27b",
820+
"stream": true
821+
})))
822+
.with_status(200)
823+
.with_header("content-type", "text/event-stream")
824+
.with_body(body)
825+
.create_async()
826+
.await;
827+
828+
let router = HeliconeRouter::new(config(format!("{}/v1/", server.url()))).unwrap();
829+
let result = router
830+
.chat_completion(
831+
"ollama/qwen3.6:27b".to_string(),
832+
vec![ChatMessage {
833+
role: "user".to_string(),
834+
content: Some(MessageContent::Text("use a tool".to_string())),
835+
name: None,
836+
tool_calls: None,
837+
tool_call_id: None,
838+
reasoning: None,
839+
reasoning_content: None,
840+
}],
841+
true,
842+
None,
843+
None,
844+
)
845+
.await
846+
.unwrap();
847+
848+
let tool_calls = result.choices[0]
849+
.message
850+
.tool_calls
851+
.as_ref()
852+
.expect("streamed tool calls should be preserved");
853+
assert_eq!(
854+
result.choices[0].finish_reason.as_deref(),
855+
Some("tool_calls")
856+
);
857+
assert_eq!(tool_calls[0].id, "call_1");
858+
assert_eq!(tool_calls[0].function.name, "fetch_url");
859+
assert_eq!(
860+
tool_calls[0].function.arguments,
861+
"{\"url\":\"https://example.test\"}"
862+
);
863+
mock.assert_async().await;
864+
}
865+
516866
#[tokio::test]
517867
async fn chat_completion_forwards_custom_headers() {
518868
let mut server = mockito::Server::new_async().await;

0 commit comments

Comments
 (0)