Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/multi-packet-and-streaming-responses-design.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ This design allows simple, single-frame handlers to remain unchanged
(`Ok(my_frame.into())`) while providing powerful and efficient options for more
complex cases.

To simplify consumption, `Response::into_stream` converts any `Response`
variant into a `FrameStream`. Downstream code can iterate over frames without
matching `MultiPacket` or wiring channels.

### 4.2 The `WireframeError` Enum

To enable more robust error handling, a generic error enum will be introduced.
Expand Down
40 changes: 40 additions & 0 deletions src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,46 @@ impl<F, E> From<Vec<F>> for Response<F, E> {
fn from(v: Vec<F>) -> Self { Response::Vec(v) }
}

impl<F: Send + 'static, E: Send + 'static> Response<F, E> {
/// Convert this response into a stream of frames.
///
/// # Examples
///
/// ```
/// use futures::TryStreamExt;
/// use wireframe::Response;
///
/// # async fn demo() {
/// let (tx, rx) = tokio::sync::mpsc::channel(1);
/// tx.send(1u8).await.expect("send");
/// drop(tx);
/// let resp: Response<u8, ()> = Response::MultiPacket(rx);
/// let frames: Vec<u8> = resp
/// .into_stream()
/// .try_collect()
/// .await
/// .expect("stream error");
/// assert_eq!(frames, vec![1]);
/// # }
/// ```
Comment thread
coderabbitai[bot] marked this conversation as resolved.
#[must_use]
pub fn into_stream(self) -> FrameStream<F, E> {
Comment thread
leynos marked this conversation as resolved.
match self {
Response::Single(frame) => Box::pin(futures::stream::once(async move {
Ok::<F, WireframeError<E>>(frame)
})),
Response::Vec(frames) => Box::pin(futures::stream::iter(
frames.into_iter().map(|f| Ok::<F, WireframeError<E>>(f)),
)),
Response::Stream(stream) => stream,
Response::MultiPacket(rx) => Box::pin(futures::stream::unfold(rx, |mut rx| async {
rx.recv().await.map(|f| (Ok::<F, WireframeError<E>>(f), rx))
})),
Response::Empty => Box::pin(futures::stream::empty()),
}
}
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

/// A generic error type for wireframe operations.
///
/// # Examples
Expand Down
38 changes: 10 additions & 28 deletions tests/multi_packet.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
//! Tests for multi-packet responses using channels.

use futures::TryStreamExt;
use tokio::sync::mpsc;
use wireframe::Response;

#[derive(PartialEq, Debug)]
struct TestMsg(u8);

/// Drain all messages from the receiver.
async fn drain_all(mut rx: mpsc::Receiver<TestMsg>) -> Vec<TestMsg> {
let mut messages = Vec::new();
while let Some(msg) = rx.recv().await {
messages.push(msg);
}
messages
/// Drain all messages from the stream.
async fn drain_all(stream: wireframe::FrameStream<TestMsg, ()>) -> Vec<TestMsg> {
stream.try_collect::<Vec<_>>().await.expect("stream error")
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

/// Verifies that all messages sent through the channel are yielded by `Response::MultiPacket`.
/// Verify that all messages sent through the channel are yielded via
/// `Response::into_stream()` for the `MultiPacket` variant.
#[tokio::test]
async fn multi_packet_yields_messages() {
let (tx, rx) = mpsc::channel(4);
Expand All @@ -24,11 +22,7 @@ async fn multi_packet_yields_messages() {
drop(tx);

let resp: Response<TestMsg, ()> = Response::MultiPacket(rx);
let received = if let Response::MultiPacket(rx) = resp {
drain_all(rx).await
} else {
unreachable!()
};
let received = drain_all(resp.into_stream()).await;
assert_eq!(received, vec![TestMsg(1), TestMsg(2)]);
}

Expand All @@ -38,11 +32,7 @@ async fn multi_packet_empty_channel() {
let (tx, rx) = mpsc::channel(4);
drop(tx);
let resp: Response<TestMsg, ()> = Response::MultiPacket(rx);
let received = if let Response::MultiPacket(rx) = resp {
drain_all(rx).await
} else {
unreachable!()
};
let received = drain_all(resp.into_stream()).await;
assert!(received.is_empty());
}

Expand All @@ -53,11 +43,7 @@ async fn multi_packet_sender_dropped_before_all_messages() {
tx.send(TestMsg(1)).await.expect("send");
drop(tx);
let resp: Response<TestMsg, ()> = Response::MultiPacket(rx);
let received = if let Response::MultiPacket(rx) = resp {
drain_all(rx).await
} else {
unreachable!()
};
let received = drain_all(resp.into_stream()).await;
assert_eq!(received, vec![TestMsg(1)]);
}

Expand All @@ -71,11 +57,7 @@ async fn multi_packet_handles_channel_capacity() {
}
});
let resp: Response<TestMsg, ()> = Response::MultiPacket(rx);
let received = if let Response::MultiPacket(rx) = resp {
drain_all(rx).await
} else {
unreachable!()
};
let received = drain_all(resp.into_stream()).await;
send_task.await.expect("sender join");
assert_eq!(
Comment thread
leynos marked this conversation as resolved.
received,
Expand Down
12 changes: 7 additions & 5 deletions tests/world.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::{net::SocketAddr, sync::Arc};

use async_stream::try_stream;
use cucumber::World;
use futures::TryStreamExt;
use tokio::{net::TcpStream, sync::oneshot};
use tokio_util::sync::CancellationToken;
use wireframe::{
Expand Down Expand Up @@ -216,11 +217,12 @@ pub struct MultiPacketWorld {

impl MultiPacketWorld {
async fn drain(&mut self, resp: wireframe::Response<u8, ()>) {
if let wireframe::Response::MultiPacket(mut mp_rx) = resp {
while let Some(msg) = mp_rx.recv().await {
self.messages.push(msg);
}
}
let frames = resp
.into_stream()
.try_collect::<Vec<_>>()
.await
.expect("stream error");
self.messages.extend(frames);
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

/// Helper method to process messages through a multi-packet response.
Expand Down
Loading