Commit e09b3cd

openai: handling for responses
Add parsing logic for Responses stream events.

Signed-off-by: Yusuke Shimizu <stm1051212@gmail.com>
1 parent dc71b88 commit e09b3cd

File tree

1 file changed: +126 -2

crates/goose/src/providers/formats/openai_responses.rs

Lines changed: 126 additions & 2 deletions
@@ -170,6 +170,52 @@ pub enum ResponsesStreamEvent {
     Error { error: Value },
 }

+fn is_known_responses_stream_event_type(event_type: &str) -> bool {
+    matches!(
+        event_type,
+        "response.created"
+            | "response.in_progress"
+            | "response.output_item.added"
+            | "response.content_part.added"
+            | "response.output_text.delta"
+            | "response.output_item.done"
+            | "response.content_part.done"
+            | "response.output_text.done"
+            | "response.completed"
+            | "response.failed"
+            | "response.function_call_arguments.delta"
+            | "response.function_call_arguments.done"
+            | "error"
+    )
+}
+
+fn parse_responses_stream_event(data_line: &str) -> anyhow::Result<Option<ResponsesStreamEvent>> {
+    let raw_event: Value = serde_json::from_str(data_line).map_err(|e| {
+        anyhow!(
+            "Failed to parse Responses stream event: {}: {:?}",
+            e,
+            data_line
+        )
+    })?;
+
+    let Some(event_type) = raw_event.get("type").and_then(Value::as_str) else {
+        return Ok(None);
+    };
+
+    if !is_known_responses_stream_event_type(event_type) {
+        return Ok(None);
+    }
+
+    let event = serde_json::from_value(raw_event).map_err(|e| {
+        anyhow!(
+            "Failed to parse Responses stream event: {}: {:?}",
+            e,
+            data_line
+        )
+    })?;
+    Ok(Some(event))
+}
+
 #[derive(Debug, Serialize, Deserialize)]
 pub struct ResponseMetadata {
     pub id: String,
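
As a rough sketch of the new helper's contract (illustrative only, not part of the commit; the JSON payloads are borrowed from the tests added further down): an event whose type field is missing or unrecognized comes back as Ok(None) so the caller can skip it, while a recognized type still deserializes into ResponsesStreamEvent.

    // Illustrative sketch; assumes the same module scope as the new helpers.
    // The function name is made up for this example.
    fn sketch_event_filtering() -> anyhow::Result<()> {
        // Unrecognized event types (e.g. a keepalive ping) are skippable, not fatal.
        assert!(parse_responses_stream_event(r#"{"type":"keepalive"}"#)?.is_none());

        // Events with no string "type" field are likewise skippable.
        assert!(parse_responses_stream_event(r#"{"foo":"bar"}"#)?.is_none());

        // Known event types still deserialize into the ResponsesStreamEvent enum.
        let delta = parse_responses_stream_event(
            r#"{"type":"response.output_text.delta","sequence_number":2,"item_id":"msg_1","output_index":0,"content_index":0,"delta":"Hello"}"#,
        )?;
        assert!(delta.is_some());
        Ok(())
    }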
@@ -589,6 +635,9 @@ where
         if response_str.trim().is_empty() {
             continue;
         }
+        if response_str.starts_with(':') {
+            continue;
+        }

         // Parse SSE format: "event: <type>\ndata: <json>"
         // For now, we only care about the data line
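
For context on the new starts_with(':') check: in the SSE wire format a line that begins with ':' is a comment, commonly sent as a keep-alive ping, and this hunk skips such lines before any further parsing. A standalone restatement of the skip logic, for illustration only (the function name is made up, not from the commit):

    // Blank lines were already skipped; ':'-prefixed SSE comment lines (keep-alives) now are too.
    fn should_skip_raw_sse_line(response_str: &str) -> bool {
        response_str.trim().is_empty() || response_str.starts_with(':')
    }
    // e.g. should_skip_raw_sse_line(": keep-alive") == true
    //      should_skip_raw_sse_line("data: {\"type\":\"response.completed\"}") == false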
@@ -606,8 +655,9 @@ where
             break 'outer;
         }

-        let event: ResponsesStreamEvent = serde_json::from_str(data_line)
-            .map_err(|e| anyhow!("Failed to parse Responses stream event: {}: {:?}", e, data_line))?;
+        let Some(event) = parse_responses_stream_event(data_line)? else {
+            continue;
+        };

         match event {
             ResponsesStreamEvent::ResponseCreated { response, .. } |
@@ -703,3 +753,77 @@ where
         }
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::conversation::message::MessageContent;
+    use futures::StreamExt;
+
+    #[tokio::test]
+    async fn test_responses_stream_ignores_keepalive_event() -> anyhow::Result<()> {
+        let lines = vec![
+            r#"data: {"type":"response.created","sequence_number":1,"response":{"id":"resp_1","object":"response","created_at":1737368310,"status":"in_progress","model":"gpt-5.2-pro","output":[]}}"#.to_string(),
+            r#"data: {"type":"keepalive"}"#.to_string(),
+            r#"data: {"type":"response.output_text.delta","sequence_number":2,"item_id":"msg_1","output_index":0,"content_index":0,"delta":"Hello"}"#.to_string(),
+            r#"data: {"type":"response.output_text.delta","sequence_number":3,"item_id":"msg_1","output_index":0,"content_index":0,"delta":" world"}"#.to_string(),
+            r#"data: {"type":"response.completed","sequence_number":4,"response":{"id":"resp_1","object":"response","created_at":1737368310,"status":"completed","model":"gpt-5.2-pro","output":[],"usage":{"input_tokens":10,"output_tokens":4,"total_tokens":14}}}"#.to_string(),
+            "data: [DONE]".to_string(),
+        ];
+
+        let response_stream = tokio_stream::iter(lines.into_iter().map(Ok));
+        let messages = responses_api_to_streaming_message(response_stream);
+        futures::pin_mut!(messages);
+
+        let mut text_parts = Vec::new();
+        let mut usage: Option<ProviderUsage> = None;
+
+        while let Some(item) = messages.next().await {
+            let (message, maybe_usage) = item?;
+            if let Some(msg) = message {
+                for content in msg.content {
+                    if let MessageContent::Text(text) = content {
+                        text_parts.push(text.text.clone());
+                    }
+                }
+            }
+            if let Some(final_usage) = maybe_usage {
+                usage = Some(final_usage);
+            }
+        }
+
+        assert_eq!(text_parts.concat(), "Hello world");
+        let usage = usage.expect("usage should be present at completion");
+        assert_eq!(usage.model, "gpt-5.2-pro");
+        assert_eq!(usage.usage.input_tokens, Some(10));
+        assert_eq!(usage.usage.output_tokens, Some(4));
+        assert_eq!(usage.usage.total_tokens, Some(14));
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_responses_stream_error_event_still_returns_error() -> anyhow::Result<()> {
+        let lines = vec![
+            r#"data: {"type":"error","error":{"message":"boom"}}"#.to_string(),
+            "data: [DONE]".to_string(),
+        ];
+
+        let response_stream = tokio_stream::iter(lines.into_iter().map(Ok));
+        let messages = responses_api_to_streaming_message(response_stream);
+        futures::pin_mut!(messages);
+
+        let first = messages
+            .next()
+            .await
+            .expect("stream should emit an error item");
+
+        assert!(first.is_err());
+        assert!(first
+            .expect_err("expected error")
+            .to_string()
+            .contains("Responses API error"));
+
+        Ok(())
+    }
+}
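
Assuming the package name follows the crates/goose path, the new tests should be runnable with a name filter along the lines of: cargo test -p goose openai_responses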
