Skip to content

Commit f6fadb0

Browse files
author
zhaojing.jz
committed
fix: check [DONE] SSE marker before deserialization in stream_mapped_raw_events
PR #537 refactored the streaming pipeline and removed reqwest-eventsource. The old `stream()` function checked for the `[DONE]` SSE end-of-stream marker *before* passing event data to `serde_json::from_str`. The refactored version delegated to `stream_mapped_raw_events`, which checks `[DONE]` *after* calling the `event_mapper` closure. Since the mapper in `stream()` deserializes JSON, this causes `[DONE]` to fail with: failed deserialization of: [DONE] Move the `[DONE]` check before the mapper call in both the WASM (futures::stream::unfold) and non-WASM (tokio::mpsc) implementations. Closes #542
1 parent b7fd7ea commit f6fadb0

1 file changed

Lines changed: 10 additions & 14 deletions

File tree

async-openai/src/client.rs

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -791,12 +791,8 @@ where
791791
let event_stream = Box::pin(eventsource_stream::EventStream::new(byte_stream));
792792

793793
Box::pin(futures::stream::unfold(
794-
(event_stream, event_mapper, false),
795-
|(mut event_stream, event_mapper, finished)| async move {
796-
if finished {
797-
return None;
798-
}
799-
794+
(event_stream, event_mapper),
795+
|(mut event_stream, event_mapper)| async move {
800796
loop {
801797
let event = match event_stream.next().await {
802798
Some(Ok(event)) => event,
@@ -805,20 +801,22 @@ where
805801
Err(OpenAIError::StreamError(Box::new(
806802
StreamError::EventStream(error.to_string()),
807803
))),
808-
(event_stream, event_mapper, true),
804+
(event_stream, event_mapper),
809805
));
810806
}
811807
None => return None,
812808
};
813809

814-
let done = event.data == "[DONE]";
810+
if event.data == "[DONE]" {
811+
return None;
812+
}
815813

816814
if event.event == "keepalive" {
817815
continue;
818816
}
819817

820818
let response = event_mapper(event);
821-
return Some((response, (event_stream, event_mapper, done)));
819+
return Some((response, (event_stream, event_mapper)));
822820
}
823821
},
824822
))
@@ -856,7 +854,9 @@ where
856854
break;
857855
}
858856
};
859-
let done = event.data == "[DONE]";
857+
if event.data == "[DONE]" {
858+
break;
859+
}
860860

861861
if event.event == "keepalive" {
862862
continue;
@@ -867,10 +867,6 @@ where
867867
if tx.send(response).is_err() {
868868
break;
869869
}
870-
871-
if done {
872-
break;
873-
}
874870
}
875871
});
876872

0 commit comments

Comments
 (0)