-
Notifications
You must be signed in to change notification settings - Fork 108
Expand file tree
/
Copy pathsse.rs
More file actions
59 lines (51 loc) · 1.62 KB
/
sse.rs
File metadata and controls
59 lines (51 loc) · 1.62 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
use std::{convert::From, future};
use async_sse::{Event as SseEvent, decode};
use crux_core::{Request, capability::Operation, command::StreamBuilder};
use facet::Facet;
use futures::{Stream, StreamExt, io::Cursor};
use serde::{Deserialize, Serialize, de::DeserializeOwned};
#[derive(Facet, Serialize, Deserialize, Clone, Debug, PartialEq, Eq)]
pub struct SseRequest {
pub url: String,
}
#[derive(Facet, Serialize, Deserialize, Debug, PartialEq, Eq)]
#[repr(C)]
pub enum SseResponse {
Chunk(Vec<u8>),
Done,
}
impl SseResponse {
#[must_use]
pub fn is_done(&self) -> bool {
matches!(self, SseResponse::Done)
}
}
impl Operation for SseRequest {
type Output = SseResponse;
}
pub fn get<Effect, Event, T>(
url: impl Into<String>,
) -> StreamBuilder<Effect, Event, impl Stream<Item = T>>
where
Effect: From<Request<SseRequest>> + Send + 'static,
Event: Send + 'static,
T: Send + DeserializeOwned,
{
let url = url.into();
StreamBuilder::new(|ctx| {
ctx.stream_from_shell(SseRequest { url })
.take_while(|response| future::ready(!response.is_done()))
.flat_map(|response| {
let SseResponse::Chunk(data) = response else {
unreachable!()
};
decode(Cursor::new(data))
})
.filter_map(|sse_event| async {
sse_event.ok().and_then(|event| match event {
SseEvent::Message(msg) => serde_json::from_slice(msg.data()).ok(),
SseEvent::Retry(_) => None, // do we need to worry about this?
})
})
})
}