Skip to content

Commit 01355be

Browse files
authored
Merge pull request #193 from zeromq/wrap-tests-in-cfg-test
enable async dispatcher macros only for tests
2 parents a0ef148 + 3f74ee4 commit 01355be

9 files changed

+416
-382
lines changed

.github/workflows/main-ci.yml

+2-2
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ jobs:
2929
uses: actions-rs/cargo@v1
3030
with:
3131
command: clippy
32-
args: --all --all-targets --no-default-features --features async-std-runtime,all-transport -- --deny warnings
32+
args: --all --all-targets --no-default-features --features async-std-runtime,all-transport,async-dispatcher-macros -- --deny warnings
3333

3434
test:
3535
name: Test
@@ -63,7 +63,7 @@ jobs:
6363
uses: actions-rs/cargo@v1
6464
with:
6565
command: test
66-
args: --all --no-default-features --features async-dispatcher-runtime,all-transport
66+
args: --all --no-default-features --features async-dispatcher-runtime,all-transport,async-dispatcher-macros
6767

6868
fmt:
6969
name: Formatting

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ default = ["tokio-runtime", "all-transport"]
1313
tokio-runtime = ["tokio", "tokio-util"]
1414
async-std-runtime = ["async-std"]
1515
async-dispatcher-runtime = ["async-std", "async-dispatcher"]
16+
async-dispatcher-macros = ["async-dispatcher/macros"]
1617
all-transport = ["ipc-transport", "tcp-transport"]
1718
ipc-transport = []
1819
tcp-transport = []

src/async_rt/mod.rs

+4-1
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,8 @@ extern crate async_std;
1212
#[cfg(feature = "async-std-runtime")]
1313
pub use async_std::{main, test};
1414

15-
#[cfg(feature = "async-dispatcher-runtime")]
15+
#[cfg(all(
16+
feature = "async-dispatcher-runtime",
17+
feature = "async-dispatcher-macros"
18+
))]
1619
pub use async_dispatcher::{main, test};

tests/message.rs

+43-40
Original file line numberDiff line numberDiff line change
@@ -1,45 +1,48 @@
1-
use bytes::Bytes;
2-
use std::collections::vec_deque::VecDeque;
3-
use std::convert::TryFrom;
4-
use zeromq::ZmqMessage;
1+
#[cfg(test)]
2+
mod test {
3+
use bytes::Bytes;
4+
use std::collections::vec_deque::VecDeque;
5+
use std::convert::TryFrom;
6+
use zeromq::ZmqMessage;
57

6-
#[test]
7-
fn test_split_off() {
8-
let mut frames = VecDeque::with_capacity(5);
9-
frames.push_back(Bytes::from("id1"));
10-
frames.push_back(Bytes::from("id2"));
11-
frames.push_back(Bytes::from(""));
12-
frames.push_back(Bytes::from("data1"));
13-
frames.push_back(Bytes::from("data2"));
14-
let mut m = ZmqMessage::try_from(frames).unwrap();
15-
let data = m.split_off(3);
16-
assert_eq!(m.len(), 3);
17-
assert_eq!(m.get(0), Some(&Bytes::from("id1")));
18-
assert_eq!(m.get(1), Some(&Bytes::from("id2")));
19-
assert_eq!(m.get(2), Some(&Bytes::from("")));
20-
assert_eq!(data.len(), 2);
21-
assert_eq!(data.get(0), Some(&Bytes::from("data1")));
22-
assert_eq!(data.get(1), Some(&Bytes::from("data2")));
23-
}
8+
#[test]
9+
fn test_split_off() {
10+
let mut frames = VecDeque::with_capacity(5);
11+
frames.push_back(Bytes::from("id1"));
12+
frames.push_back(Bytes::from("id2"));
13+
frames.push_back(Bytes::from(""));
14+
frames.push_back(Bytes::from("data1"));
15+
frames.push_back(Bytes::from("data2"));
16+
let mut m = ZmqMessage::try_from(frames).unwrap();
17+
let data = m.split_off(3);
18+
assert_eq!(m.len(), 3);
19+
assert_eq!(m.get(0), Some(&Bytes::from("id1")));
20+
assert_eq!(m.get(1), Some(&Bytes::from("id2")));
21+
assert_eq!(m.get(2), Some(&Bytes::from("")));
22+
assert_eq!(data.len(), 2);
23+
assert_eq!(data.get(0), Some(&Bytes::from("data1")));
24+
assert_eq!(data.get(1), Some(&Bytes::from("data2")));
25+
}
2426

25-
#[test]
26-
fn test_prepend() {
27-
let mut frames = VecDeque::with_capacity(2);
28-
frames.push_back(Bytes::from("data1"));
29-
frames.push_back(Bytes::from("data2"));
30-
let mut m = ZmqMessage::try_from(frames).unwrap();
27+
#[test]
28+
fn test_prepend() {
29+
let mut frames = VecDeque::with_capacity(2);
30+
frames.push_back(Bytes::from("data1"));
31+
frames.push_back(Bytes::from("data2"));
32+
let mut m = ZmqMessage::try_from(frames).unwrap();
3133

32-
let mut envelope_frames = VecDeque::with_capacity(3);
33-
envelope_frames.push_back(Bytes::from("id1"));
34-
envelope_frames.push_back(Bytes::from("id2"));
35-
envelope_frames.push_back(Bytes::from(""));
36-
let envelope = ZmqMessage::try_from(envelope_frames).unwrap();
34+
let mut envelope_frames = VecDeque::with_capacity(3);
35+
envelope_frames.push_back(Bytes::from("id1"));
36+
envelope_frames.push_back(Bytes::from("id2"));
37+
envelope_frames.push_back(Bytes::from(""));
38+
let envelope = ZmqMessage::try_from(envelope_frames).unwrap();
3739

38-
m.prepend(&envelope);
39-
assert_eq!(m.len(), 5);
40-
assert_eq!(m.get(0), Some(&Bytes::from("id1")));
41-
assert_eq!(m.get(1), Some(&Bytes::from("id2")));
42-
assert_eq!(m.get(2), Some(&Bytes::from("")));
43-
assert_eq!(m.get(3), Some(&Bytes::from("data1")));
44-
assert_eq!(m.get(4), Some(&Bytes::from("data2")));
40+
m.prepend(&envelope);
41+
assert_eq!(m.len(), 5);
42+
assert_eq!(m.get(0), Some(&Bytes::from("id1")));
43+
assert_eq!(m.get(1), Some(&Bytes::from("id2")));
44+
assert_eq!(m.get(2), Some(&Bytes::from("")));
45+
assert_eq!(m.get(3), Some(&Bytes::from("data1")));
46+
assert_eq!(m.get(4), Some(&Bytes::from("data2")));
47+
}
4548
}

tests/pub_sub.rs

+90-87
Original file line numberDiff line numberDiff line change
@@ -1,104 +1,107 @@
1-
use zeromq::prelude::*;
2-
use zeromq::Endpoint;
3-
use zeromq::ZmqMessage;
4-
use zeromq::__async_rt as async_rt;
1+
#[cfg(test)]
2+
mod test {
3+
use zeromq::prelude::*;
4+
use zeromq::Endpoint;
5+
use zeromq::ZmqMessage;
6+
use zeromq::__async_rt as async_rt;
57

6-
use futures_channel::{mpsc, oneshot};
7-
use futures_util::{SinkExt, StreamExt};
8-
use std::time::Duration;
8+
use futures_channel::{mpsc, oneshot};
9+
use futures_util::{SinkExt, StreamExt};
10+
use std::time::Duration;
911

10-
#[async_rt::test]
11-
async fn test_pub_sub_sockets() {
12-
pretty_env_logger::try_init().ok();
12+
#[async_rt::test]
13+
async fn test_pub_sub_sockets() {
14+
pretty_env_logger::try_init().ok();
1315

14-
async fn helper(bind_addr: &'static str) {
15-
// We will join on these at the end to determine if any tasks we spawned
16-
// panicked
17-
let mut task_handles = Vec::new();
18-
let payload = chrono::Utc::now().to_rfc2822();
16+
async fn helper(bind_addr: &'static str) {
17+
// We will join on these at the end to determine if any tasks we spawned
18+
// panicked
19+
let mut task_handles = Vec::new();
20+
let payload = chrono::Utc::now().to_rfc2822();
1921

20-
let cloned_payload = payload.clone();
21-
let (server_stop_sender, mut server_stop) = oneshot::channel::<()>();
22-
let (has_bound_sender, has_bound) = oneshot::channel::<Endpoint>();
23-
task_handles.push(async_rt::task::spawn(async move {
24-
let mut pub_socket = zeromq::PubSocket::new();
25-
let bound_to = pub_socket
26-
.bind(bind_addr)
27-
.await
28-
.unwrap_or_else(|e| panic!("Failed to bind to {}: {}", bind_addr, e));
29-
has_bound_sender
30-
.send(bound_to)
31-
.expect("channel was dropped");
22+
let cloned_payload = payload.clone();
23+
let (server_stop_sender, mut server_stop) = oneshot::channel::<()>();
24+
let (has_bound_sender, has_bound) = oneshot::channel::<Endpoint>();
25+
task_handles.push(async_rt::task::spawn(async move {
26+
let mut pub_socket = zeromq::PubSocket::new();
27+
let bound_to = pub_socket
28+
.bind(bind_addr)
29+
.await
30+
.unwrap_or_else(|e| panic!("Failed to bind to {}: {}", bind_addr, e));
31+
has_bound_sender
32+
.send(bound_to)
33+
.expect("channel was dropped");
3234

33-
loop {
34-
if let Ok(Some(_)) = server_stop.try_recv() {
35-
break;
35+
loop {
36+
if let Ok(Some(_)) = server_stop.try_recv() {
37+
break;
38+
}
39+
40+
let s: String = cloned_payload.clone();
41+
let m = ZmqMessage::from(s);
42+
pub_socket.send(m).await.expect("Failed to send");
43+
async_rt::task::sleep(Duration::from_millis(1)).await;
3644
}
3745

38-
let s: String = cloned_payload.clone();
39-
let m = ZmqMessage::from(s);
40-
pub_socket.send(m).await.expect("Failed to send");
41-
async_rt::task::sleep(Duration::from_millis(1)).await;
46+
let errs = pub_socket.close().await;
47+
if !errs.is_empty() {
48+
panic!("Could not unbind socket: {:?}", errs);
49+
}
50+
}));
51+
// Block until the pub has finished binding
52+
// TODO: ZMQ sockets should not care about this sort of ordering.
53+
// See https://github.com/zeromq/zmq.rs/issues/73
54+
let bound_addr = has_bound.await.expect("channel was cancelled");
55+
if let Endpoint::Tcp(_host, port) = bound_addr.clone() {
56+
assert_ne!(port, 0);
4257
}
4358

44-
let errs = pub_socket.close().await;
45-
if !errs.is_empty() {
46-
panic!("Could not unbind socket: {:?}", errs);
47-
}
48-
}));
49-
// Block until the pub has finished binding
50-
// TODO: ZMQ sockets should not care about this sort of ordering.
51-
// See https://github.com/zeromq/zmq.rs/issues/73
52-
let bound_addr = has_bound.await.expect("channel was cancelled");
53-
if let Endpoint::Tcp(_host, port) = bound_addr.clone() {
54-
assert_ne!(port, 0);
55-
}
59+
let (sub_results_sender, sub_results) = mpsc::channel(100);
60+
for _ in 0..10 {
61+
let mut cloned_sub_sender = sub_results_sender.clone();
62+
let cloned_payload = payload.clone();
63+
let cloned_bound_addr = bound_addr.to_string();
64+
task_handles.push(async_rt::task::spawn(async move {
65+
let mut sub_socket = zeromq::SubSocket::new();
66+
sub_socket
67+
.connect(&cloned_bound_addr)
68+
.await
69+
.unwrap_or_else(|_| panic!("Failed to connect to {}", bind_addr));
5670

57-
let (sub_results_sender, sub_results) = mpsc::channel(100);
58-
for _ in 0..10 {
59-
let mut cloned_sub_sender = sub_results_sender.clone();
60-
let cloned_payload = payload.clone();
61-
let cloned_bound_addr = bound_addr.to_string();
62-
task_handles.push(async_rt::task::spawn(async move {
63-
let mut sub_socket = zeromq::SubSocket::new();
64-
sub_socket
65-
.connect(&cloned_bound_addr)
66-
.await
67-
.unwrap_or_else(|_| panic!("Failed to connect to {}", bind_addr));
71+
sub_socket.subscribe("").await.expect("Failed to subscribe");
6872

69-
sub_socket.subscribe("").await.expect("Failed to subscribe");
73+
async_rt::task::sleep(std::time::Duration::from_millis(500)).await;
7074

71-
async_rt::task::sleep(std::time::Duration::from_millis(500)).await;
75+
for _ in 0..10 {
76+
let recv_message = sub_socket.recv().await.unwrap();
77+
let recv_payload =
78+
String::from_utf8(recv_message.get(0).unwrap().to_vec()).unwrap();
79+
assert_eq!(cloned_payload, recv_payload);
80+
cloned_sub_sender.send(()).await.unwrap();
81+
}
82+
}));
83+
}
84+
drop(sub_results_sender);
85+
let res_vec: Vec<()> = sub_results.collect().await;
86+
assert_eq!(100, res_vec.len());
7287

73-
for _ in 0..10 {
74-
let recv_message = sub_socket.recv().await.unwrap();
75-
let recv_payload =
76-
String::from_utf8(recv_message.get(0).unwrap().to_vec()).unwrap();
77-
assert_eq!(cloned_payload, recv_payload);
78-
cloned_sub_sender.send(()).await.unwrap();
79-
}
80-
}));
88+
server_stop_sender.send(()).unwrap();
89+
for t in task_handles {
90+
t.await.expect("Task failed unexpectedly!");
91+
}
8192
}
82-
drop(sub_results_sender);
83-
let res_vec: Vec<()> = sub_results.collect().await;
84-
assert_eq!(100, res_vec.len());
8593

86-
server_stop_sender.send(()).unwrap();
87-
for t in task_handles {
88-
t.await.expect("Task failed unexpectedly!");
89-
}
94+
let addrs = vec![
95+
"tcp://localhost:0",
96+
"tcp://127.0.0.1:0",
97+
"tcp://[::1]:0",
98+
"tcp://127.0.0.1:0",
99+
"tcp://localhost:0",
100+
"tcp://127.0.0.1:0",
101+
"tcp://[::1]:0",
102+
"ipc://asdf.sock",
103+
"ipc://anothersocket-asdf",
104+
];
105+
futures_util::future::join_all(addrs.into_iter().map(helper)).await;
90106
}
91-
92-
let addrs = vec![
93-
"tcp://localhost:0",
94-
"tcp://127.0.0.1:0",
95-
"tcp://[::1]:0",
96-
"tcp://127.0.0.1:0",
97-
"tcp://localhost:0",
98-
"tcp://127.0.0.1:0",
99-
"tcp://[::1]:0",
100-
"ipc://asdf.sock",
101-
"ipc://anothersocket-asdf",
102-
];
103-
futures_util::future::join_all(addrs.into_iter().map(helper)).await;
104107
}

0 commit comments

Comments
 (0)