Skip to content

Commit a52617e

Browse files
committed
Add tests for MultiPacket channels
1 parent 47cbb8a commit a52617e

5 files changed

Lines changed: 129 additions & 23 deletions

File tree

src/response.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,42 @@ pub enum Response<F, E = ()> {
5050
/// A potentially unbounded stream of frames.
5151
Stream(FrameStream<F, E>),
5252
/// Frames delivered through a channel.
53+
///
54+
/// # Usage and lifecycle
55+
///
56+
/// `MultiPacket` wraps a [`tokio::sync::mpsc::Receiver`] that yields frames
57+
/// (`F`) sent from another task. The receiver should be polled until it
58+
/// returns `None`, signalling the channel has closed and no more frames will
59+
/// be sent. Frames are yielded in send order.
60+
/// Back-pressure follows the channel's capacity: senders await when it is
61+
/// full. The stream ends once all senders are dropped and `recv` returns
62+
/// `None`.
63+
///
64+
/// # Resource management
65+
///
66+
/// To avoid resource leaks or deadlocks:
67+
/// - Drop the sender once all frames are sent.
68+
/// - Poll the receiver to completion, consuming all frames.
69+
/// - If the sender is dropped early, the receiver yields `None` and no further frames will
70+
/// arrive.
71+
///
72+
/// # Examples
73+
///
74+
/// ```rust
75+
/// use tokio::sync::mpsc;
76+
/// use wireframe::Response;
77+
///
78+
/// # tokio_test::block_on(async {
79+
/// let (tx, rx) = mpsc::channel(1);
80+
/// tx.send(1u8).await.expect("send");
81+
/// drop(tx);
82+
/// if let Response::MultiPacket(mut rx) = Response::MultiPacket(rx) {
83+
/// while let Some(f) = rx.recv().await {
84+
/// assert_eq!(f, 1);
85+
/// }
86+
/// }
87+
/// # });
88+
/// ```
5389
MultiPacket(mpsc::Receiver<F>),
5490
/// A response with no frames.
5591
Empty,

tests/features/multi_packet.feature

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,3 +2,7 @@ Feature: Multi-packet responses
22
Scenario: messages from a multi-packet response are delivered sequentially
33
When a multi-packet response emits messages
44
Then all messages are received in order
5+
6+
Scenario: no messages are emitted from a multi-packet response
7+
When a multi-packet response emits no messages
8+
Then no messages are received

tests/multi_packet.rs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,3 +23,57 @@ async fn multi_packet_yields_messages() {
2323
}
2424
assert_eq!(received, vec![TestMsg(1), TestMsg(2)]);
2525
}
26+
27+
/// Yields no messages when the channel is immediately closed.
28+
#[tokio::test]
29+
async fn multi_packet_empty_channel() {
30+
let (_tx, rx) = mpsc::channel(4);
31+
drop(_tx);
32+
let resp: Response<TestMsg, ()> = Response::MultiPacket(rx);
33+
let mut received = Vec::new();
34+
if let Response::MultiPacket(mut rx) = resp {
35+
while let Some(msg) = rx.recv().await {
36+
received.push(msg);
37+
}
38+
}
39+
assert!(received.is_empty());
40+
}
41+
42+
/// Stops yielding when the sender is dropped before all messages are sent.
43+
#[tokio::test]
44+
async fn multi_packet_sender_dropped_before_all_messages() {
45+
let (tx, rx) = mpsc::channel(4);
46+
tx.send(TestMsg(1)).await.expect("send");
47+
drop(tx);
48+
let resp: Response<TestMsg, ()> = Response::MultiPacket(rx);
49+
let mut received = Vec::new();
50+
if let Response::MultiPacket(mut rx) = resp {
51+
while let Some(msg) = rx.recv().await {
52+
received.push(msg);
53+
}
54+
}
55+
assert_eq!(received, vec![TestMsg(1)]);
56+
}
57+
58+
/// Handles more messages than the channel capacity allows.
59+
#[tokio::test]
60+
async fn multi_packet_handles_channel_capacity() {
61+
let (tx, rx) = mpsc::channel(2);
62+
let send_task = tokio::spawn(async move {
63+
for i in 0..4u8 {
64+
tx.send(TestMsg(i)).await.expect("send");
65+
}
66+
});
67+
let resp: Response<TestMsg, ()> = Response::MultiPacket(rx);
68+
let mut received = Vec::new();
69+
if let Response::MultiPacket(mut rx) = resp {
70+
while let Some(msg) = rx.recv().await {
71+
received.push(msg);
72+
}
73+
}
74+
send_task.await.expect("sender join");
75+
assert_eq!(
76+
received,
77+
vec![TestMsg(0), TestMsg(1), TestMsg(2), TestMsg(3)]
78+
);
79+
}

tests/steps/multi_packet_steps.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,9 @@ async fn when_multi(world: &mut MultiPacketWorld) { world.process().await; }
88

99
#[then("all messages are received in order")]
1010
fn then_multi(world: &mut MultiPacketWorld) { world.verify(); }
11+
12+
#[when("a multi-packet response emits no messages")]
13+
async fn when_multi_empty(world: &mut MultiPacketWorld) { world.process_empty().await; }
14+
15+
#[then("no messages are received")]
16+
fn then_multi_empty(world: &mut MultiPacketWorld) { world.verify_empty(); }

tests/world.rs

Lines changed: 29 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -149,11 +149,6 @@ impl CorrelationWorld {
149149
///
150150
/// # Panics
151151
/// Panics if the actor fails to run successfully.
152-
/// Send messages through a multi-packet response and record them.
153-
///
154-
/// # Panics
155-
///
156-
/// Panics if sending to the channel fails.
157152
pub async fn process(&mut self) {
158153
let cid = self.cid;
159154
let stream: FrameStream<Envelope> = Box::pin(try_stream! {
@@ -166,16 +161,10 @@ impl CorrelationWorld {
166161
actor.run(&mut self.frames).await.expect("actor run failed");
167162
}
168163

169-
/// Verify that all received frames carry the expected correlation id.
170-
///
171-
/// # Panics
172-
/// Panics if any frame has a `correlation_id` that does not match the
173-
/// expected value.
174-
/// Verify messages were received in order.
164+
/// Verify that all received frames carry the expected correlation ID.
175165
///
176166
/// # Panics
177-
///
178-
/// Panics if the messages are not in the expected order.
167+
/// Panics if any frame has a `correlation_id` that does not match `self.cid`.
179168
pub fn verify(&self) {
180169
assert!(
181170
self.frames
@@ -197,11 +186,6 @@ impl StreamEndWorld {
197186
///
198187
/// # Panics
199188
/// Panics if the actor fails to run successfully.
200-
/// Send messages through a multi-packet response and record them.
201-
///
202-
/// # Panics
203-
///
204-
/// Panics if sending to the channel fails.
205189
pub async fn process(&mut self) {
206190
let stream: FrameStream<u8> = Box::pin(try_stream! {
207191
yield 1u8;
@@ -233,22 +217,44 @@ impl MultiPacketWorld {
233217
/// Send messages through a multi-packet response and record them.
234218
///
235219
/// # Panics
236-
///
237220
/// Panics if sending to the channel fails.
238221
pub async fn process(&mut self) {
239-
let (tx, rx) = tokio::sync::mpsc::channel(4);
222+
let (tx, ch_rx) = tokio::sync::mpsc::channel(4);
240223
tx.send(1u8).await.expect("send");
241224
tx.send(2u8).await.expect("send");
242225
tx.send(3u8).await.expect("send");
243226
drop(tx);
244-
let resp: wireframe::Response<u8, ()> = wireframe::Response::MultiPacket(rx);
245-
if let wireframe::Response::MultiPacket(mut rx) = resp {
246-
while let Some(msg) = rx.recv().await {
227+
let resp: wireframe::Response<u8, ()> = wireframe::Response::MultiPacket(ch_rx);
228+
if let wireframe::Response::MultiPacket(mut mp_rx) = resp {
229+
while let Some(msg) = mp_rx.recv().await {
230+
self.messages.push(msg);
231+
}
232+
}
233+
}
234+
235+
/// Send no messages through the channel and record them.
236+
///
237+
/// # Panics
238+
/// Panics if sending to the channel fails.
239+
pub async fn process_empty(&mut self) {
240+
let (_tx, ch_rx) = tokio::sync::mpsc::channel(4);
241+
drop(_tx);
242+
let resp: wireframe::Response<u8, ()> = wireframe::Response::MultiPacket(ch_rx);
243+
if let wireframe::Response::MultiPacket(mut mp_rx) = resp {
244+
while let Some(msg) = mp_rx.recv().await {
247245
self.messages.push(msg);
248246
}
249247
}
250248
}
251249

250+
/// Verify that no messages were received.
251+
///
252+
/// # Panics
253+
/// Panics if any messages are present.
254+
pub fn verify_empty(&self) {
255+
assert!(self.messages.is_empty());
256+
}
257+
252258
/// Verify messages were received in order.
253259
///
254260
/// # Panics

0 commit comments

Comments
 (0)