Skip to content

Commit 2c3d135

Browse files
committed
Add real-time mode assertion for ZMQ nodes
ZMQ adapters only work in real-time mode since they use network I/O. Add assert_realtime parameter to ReceiverStream and check in start() for both sender and receiver nodes. Add tests to verify historical mode fails with appropriate error message.
1 parent 5c11004 commit 2c3d135

File tree

2 files changed

+43
-5
lines changed

2 files changed

+43
-5
lines changed

wingfoil/src/adapters/zmq.rs

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@ use std::rc::Rc;
33

44
use crate::channel::{ChannelSender, Message};
55
use crate::{
6-
Element, GraphState, IntoNode, IntoStream, MutableNode, Node, ReceiverStream, Stream, UpStreams,
6+
Element, GraphState, IntoNode, IntoStream, MutableNode, Node, ReceiverStream, RunMode, Stream,
7+
UpStreams,
78
};
89
use derive_new::new;
910
use serde::Serialize;
@@ -77,8 +78,7 @@ pub fn zmq_rec<T: Element + Send + DeserializeOwned>(
7778
) -> Rc<dyn Stream<TinyVec<[T; 1]>>> {
7879
let subscriber = ZeroMqSubscriber::new(address.to_string());
7980
let f = move |channel_sender| subscriber.run(channel_sender);
80-
let receiver_stream = ReceiverStream::new(f);
81-
receiver_stream.into_stream()
81+
ReceiverStream::new(f, true).into_stream()
8282
}
8383

8484
#[derive(new)]
@@ -108,7 +108,10 @@ impl<T: Element + Send + Serialize> MutableNode for ZeroMqSenderNode<T> {
108108
UpStreams::new(vec![self.src.clone().as_node()], vec![])
109109
}
110110

111-
fn start(&mut self, _: &mut GraphState) -> anyhow::Result<()> {
111+
fn start(&mut self, state: &mut GraphState) -> anyhow::Result<()> {
112+
if state.run_mode() != RunMode::RealTime {
113+
anyhow::bail!("ZMQ nodes only support real-time mode");
114+
}
112115
let context = zmq::Context::new();
113116
let socket = context.socket(zmq::SocketType::PUB)?;
114117
let address = format!("tcp://127.0.0.1:{:}", self.port);
@@ -252,4 +255,31 @@ mod tests {
252255
send.join().unwrap().unwrap();
253256
rec.join().unwrap().unwrap();
254257
}
258+
259+
#[test]
260+
fn zmq_send_historical_mode_fails() {
261+
use crate::NanoTime;
262+
let result = sender(Duration::from_millis(10), 5558)
263+
.run(RunMode::HistoricalFrom(NanoTime::ZERO), RunFor::Forever);
264+
let err = result.expect_err("expected historical mode to fail for zmq sender");
265+
let err_msg = format!("{err:?}");
266+
assert!(
267+
err_msg.contains("real-time"),
268+
"expected error to mention real-time, got: {err_msg}"
269+
);
270+
}
271+
272+
#[test]
273+
fn zmq_rec_historical_mode_fails() {
274+
use crate::NanoTime;
275+
let result = zmq_rec::<u64>("tcp://127.0.0.1:5559")
276+
.as_node()
277+
.run(RunMode::HistoricalFrom(NanoTime::ZERO), RunFor::Forever);
278+
let err = result.expect_err("expected historical mode to fail for zmq receiver");
279+
let err_msg = format!("{err:?}");
280+
assert!(
281+
err_msg.contains("real-time"),
282+
"expected error to mention real-time, got: {err_msg}"
283+
);
284+
}
255285
}

wingfoil/src/nodes/receiver.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ pub(crate) struct ReceiverStream<T: Element + Send> {
5151
inner: ChannelReceiverStream<T>,
5252
sender: Option<ChannelSender<T>>,
5353
state: State<T>,
54+
assert_realtime: bool,
5455
}
5556

5657
impl<T: Element + Send> MutableNode for ReceiverStream<T> {
@@ -76,6 +77,9 @@ impl<T: Element + Send> MutableNode for ReceiverStream<T> {
7677
}
7778

7879
fn start(&mut self, state: &mut crate::GraphState) -> anyhow::Result<()> {
80+
if self.assert_realtime && state.run_mode() != RunMode::RealTime {
81+
anyhow::bail!("ReceiverStream only supports real-time mode");
82+
}
7983
self.inner.start(state)
8084
}
8185

@@ -96,7 +100,10 @@ impl<T: Element + Send> StreamPeekRef<TinyVec<[T; 1]>> for ReceiverStream<T> {
96100
}
97101

98102
impl<T: Element + Send> ReceiverStream<T> {
99-
pub(crate) fn new(f: impl Fn(ChannelSender<T>) -> anyhow::Result<()> + Send + 'static) -> Self {
103+
pub(crate) fn new(
104+
f: impl Fn(ChannelSender<T>) -> anyhow::Result<()> + Send + 'static,
105+
assert_realtime: bool,
106+
) -> Self {
100107
let (sender, receiver) = channel_pair(None);
101108
let inner = ChannelReceiverStream::new(receiver, None, None);
102109
let sender = Some(sender);
@@ -105,6 +112,7 @@ impl<T: Element + Send> ReceiverStream<T> {
105112
inner,
106113
sender,
107114
state,
115+
assert_realtime,
108116
}
109117
}
110118
}

0 commit comments

Comments
 (0)