Skip to content

Commit cc3232e

Browse files
adding unit tests around timeout
1 parent fafd818 commit cc3232e

File tree

2 files changed

+102
-2
lines changed

2 files changed

+102
-2
lines changed

src/tests/test_async_blocking.rs

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@ use super::common::*;
22
use crate::*;
33
use log::*;
44
use rstest::*;
5-
use std::sync::Arc;
65
use std::sync::atomic::{AtomicUsize, Ordering};
6+
use std::sync::Arc;
77
use std::thread;
88
use std::time::Duration;
99

@@ -49,6 +49,58 @@ async fn test_basic_1_tx_async_1_rx_blocking<T: AsyncTxTrait<usize>, R: Blocking
4949
rt.shutdown_background(); // Prevent panic on runtime drop
5050
}
5151

52+
#[rstest]
53+
#[case(spsc::bounded_tx_async_rx_blocking::<usize>(100))]
54+
#[case(mpsc::bounded_tx_async_rx_blocking::<usize>(100))]
55+
#[case(mpmc::bounded_tx_async_rx_blocking::<usize>(100))]
56+
#[tokio::test]
57+
async fn test_timeout_1_tx_async_1_rx_blocking<
58+
T: AsyncTxTrait<usize>,
59+
R: BlockingRxTrait<usize>,
60+
>(
61+
setup_log: (), #[case] channel: (T, R),
62+
) {
63+
let _ = setup_log; // Disable unused var warning
64+
let (tx, rx) = channel;
65+
66+
let rx_res = rx.try_recv();
67+
assert!(rx_res.is_err());
68+
assert!(rx_res.unwrap_err().is_empty());
69+
let batch_1: usize = 100;
70+
let batch_2: usize = 200;
71+
let rt = get_runtime();
72+
rt.spawn(async move {
73+
for i in 0..batch_1 {
74+
let tx_res = tx.send(i).await;
75+
assert!(tx_res.is_ok());
76+
}
77+
for i in batch_1..(batch_1 + batch_2) {
78+
assert!(tx.send(10 + i).await.is_ok());
79+
tokio::time::sleep(Duration::from_millis(2)).await;
80+
}
81+
82+
tokio::time::sleep(Duration::from_millis(200)).await;
83+
assert!(tx.send(123).await.is_ok());
84+
});
85+
for _ in 0..(batch_1 + batch_2) {
86+
match rx.recv() {
87+
Ok(i) => {
88+
debug!("recv {}", i);
89+
}
90+
Err(e) => {
91+
panic!("error {}", e);
92+
}
93+
}
94+
}
95+
96+
assert!(rx.recv_timeout(Duration::from_millis(100)).is_err());
97+
assert!(rx.recv_timeout(Duration::from_millis(200)).is_ok());
98+
99+
let res = rx.recv();
100+
assert!(res.is_err());
101+
rt.shutdown_background(); // Prevent panic on runtime drop
102+
}
103+
52104
#[rstest]
53105
#[case(mpsc::bounded_tx_async_rx_blocking::<usize>(10), 8)]
54106
#[case(mpsc::bounded_tx_async_rx_blocking::<usize>(10), 100)]

src/tests/test_blocking_async.rs

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ use crate::*;
33
use log::*;
44
use rstest::*;
55
use std::sync::{
6-
Arc,
76
atomic::{AtomicUsize, Ordering},
7+
Arc,
88
};
99
use std::thread;
1010
use std::time::*;
@@ -52,6 +52,54 @@ async fn test_basic_1_tx_blocking_1_rx_async<T: BlockingTxTrait<usize>, R: Async
5252
let _ = th.join();
5353
}
5454

55+
#[rstest]
56+
#[case(spsc::bounded_tx_blocking_rx_async::<usize>(10))]
57+
#[case(mpsc::bounded_tx_blocking_rx_async::<usize>(10))]
58+
#[case(mpmc::bounded_tx_blocking_rx_async::<usize>(10))]
59+
#[tokio::test]
60+
async fn test_timeout_1_tx_blocking_1_rx_async<
61+
T: BlockingTxTrait<usize>,
62+
R: AsyncRxTrait<usize>,
63+
>(
64+
setup_log: (), #[case] channel: (T, R),
65+
) {
66+
let _ = setup_log; // Disable unused var warning
67+
let (tx, rx) = channel;
68+
let rx_res = rx.try_recv();
69+
assert!(rx_res.is_err());
70+
assert!(rx_res.unwrap_err().is_empty());
71+
for i in 0usize..10 {
72+
let tx_res = tx.send(i);
73+
assert!(tx_res.is_ok());
74+
}
75+
let tx_res = tx.send_timeout(11, Duration::from_millis(100));
76+
assert!(tx_res.is_err());
77+
assert!(tx_res.unwrap_err().is_timeout());
78+
79+
let th = thread::spawn(move || {
80+
assert!(tx.send_timeout(10, Duration::from_millis(100)).is_err());
81+
assert!(tx.send_timeout(10, Duration::from_millis(200)).is_ok());
82+
});
83+
84+
tokio::time::sleep(Duration::from_millis(200)).await;
85+
86+
for i in 0usize..11 {
87+
match rx.recv().await {
88+
Ok(j) => {
89+
debug!("recv {}", i);
90+
assert_eq!(i, j);
91+
}
92+
Err(e) => {
93+
panic!("error {}", e);
94+
}
95+
}
96+
}
97+
let res = rx.recv().await;
98+
assert!(res.is_err());
99+
debug!("rx close");
100+
let _ = th.join();
101+
}
102+
55103
#[rstest]
56104
#[case(spsc::bounded_tx_blocking_rx_async::<usize>(1))]
57105
#[case(mpsc::bounded_tx_blocking_rx_async::<usize>(1))]

0 commit comments

Comments
 (0)