Skip to content

Commit b9f360f

Browse files
committed
Make consume_async propagate errors via Result
- consume_async now requires Future<Output = anyhow::Result<()>> - kdb_write_consumer uses ? operator to propagate connection/insert errors - Errors are surfaced in teardown via block_on(handle)?? - Updated all async consumers in examples/tests to return Ok(())
1 parent 1475a9a commit b9f360f

File tree

6 files changed

+22
-23
lines changed

6 files changed

+22
-23
lines changed

wingfoil/examples/async/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ let consumer = async move |mut source: Pin<Box<dyn FutStream<u32>>>| {
5252
while let Some((time, value)) = source.next().await {
5353
println!("{time:?}, {value:?}");
5454
}
55+
Ok(())
5556
};
5657

5758
produce_async(producer)

wingfoil/examples/async/main.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ fn main() {
1111
let run_for = RunFor::Duration(period * 5);
1212
let run_mode = RunMode::RealTime;
1313

14-
let producer = move |_ctx: RunParams| async move {
14+
let producer = move |_| async move {
1515
Ok(stream! {
1616
for i in 0.. {
1717
tokio::time::sleep(period).await; // simulate waiting IO
@@ -25,6 +25,7 @@ fn main() {
2525
while let Some((time, value)) = source.next().await {
2626
println!("{time:?}, {value:?}");
2727
}
28+
Ok(())
2829
};
2930

3031
produce_async(producer)

wingfoil/examples/rfq/order_gateway.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,13 @@ pub trait OrderGateway {
1717
pub struct RealTimeOrderGateway {}
1818

1919
impl RealTimeOrderGateway {
20-
async fn consume_orders(mut source: Pin<Box<dyn FutStream<TinyVec<[Order; 1]>>>>) {
20+
async fn consume_orders(
21+
mut source: Pin<Box<dyn FutStream<TinyVec<[Order; 1]>>>>,
22+
) -> anyhow::Result<()> {
2123
while let Some((_time, _value)) = source.next().await {
2224
//println!("{time:?}, {value:?}");
2325
}
26+
Ok(())
2427
}
2528
}
2629

wingfoil/src/adapters/kdb/write.rs

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -88,26 +88,20 @@ async fn kdb_write_consumer<T>(
8888
connection: KdbConnection,
8989
table_name: String,
9090
mut source: Pin<Box<dyn FutStream<TinyVec<[T; 1]>>>>,
91-
) where
91+
) -> anyhow::Result<()>
92+
where
9293
T: Element + Send + KdbSerialize + 'static,
9394
{
9495
let creds = connection.credentials_string();
9596

9697
// Connect to KDB
97-
let mut socket = match QStream::connect(
98+
let mut socket = QStream::connect(
9899
ConnectionMethod::TCP,
99100
&connection.host,
100101
connection.port,
101102
&creds,
102103
)
103-
.await
104-
{
105-
Ok(s) => s,
106-
Err(e) => {
107-
log::error!("KDB connection failed: {}", e);
108-
return;
109-
}
110-
};
104+
.await?;
111105

112106
// Process incoming records
113107
while let Some((_time, batch)) = source.next().await {
@@ -121,11 +115,10 @@ async fn kdb_write_consumer<T>(
121115
]);
122116

123117
// Send sync message to ensure insert completes before continuing
124-
if let Err(e) = socket.send_sync_message(&query).await {
125-
log::error!("KDB insert failed: {}", e);
126-
}
118+
socket.send_sync_message(&query).await?;
127119
}
128120
}
121+
Ok(())
129122
}
130123

131124
/// Extension trait for writing streams to KDB+ tables.

wingfoil/src/nodes/async_io.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -58,19 +58,19 @@ type ConsumerFunc<T, FUT> = Box<dyn FnOnce(Pin<Box<dyn FutStream<T>>>) -> FUT +
5858
pub(crate) struct AsyncConsumerNode<T, FUT>
5959
where
6060
T: Element + Send,
61-
FUT: Future<Output = ()> + Send + 'static,
61+
FUT: Future<Output = anyhow::Result<()>> + Send + 'static,
6262
{
6363
source: Rc<dyn Stream<T>>,
6464
sender: ChannelSender<T>,
6565
func: Option<ConsumerFunc<T, FUT>>,
66-
handle: Option<tokio::task::JoinHandle<()>>,
66+
handle: Option<tokio::task::JoinHandle<anyhow::Result<()>>>,
6767
rx: Option<ChannelReceiver<T>>,
6868
}
6969

7070
impl<T, FUT> AsyncConsumerNode<T, FUT>
7171
where
7272
T: Element + Send,
73-
FUT: Future<Output = ()> + Send + 'static,
73+
FUT: Future<Output = anyhow::Result<()>> + Send + 'static,
7474
{
7575
pub fn new(source: Rc<dyn Stream<T>>, func: ConsumerFunc<T, FUT>) -> Self {
7676
let (sender, receiver) = channel_pair(None);
@@ -91,7 +91,7 @@ where
9191
impl<T, FUT> MutableNode for AsyncConsumerNode<T, FUT>
9292
where
9393
T: Element + Send,
94-
FUT: Future<Output = ()> + Send + 'static,
94+
FUT: Future<Output = anyhow::Result<()>> + Send + 'static,
9595
{
9696
fn cycle(&mut self, state: &mut GraphState) -> anyhow::Result<bool> {
9797
self.sender.send(state, self.source.peek_value())?;
@@ -119,7 +119,7 @@ where
119119
.limit(run_mode, run_for)
120120
.to_stream();
121121
let fut = func(Box::pin(src));
122-
fut.await;
122+
fut.await
123123
};
124124
let handle = state.tokio_runtime().spawn(f);
125125
self.handle = Some(handle);
@@ -133,7 +133,7 @@ where
133133

134134
fn teardown(&mut self, state: &mut GraphState) -> anyhow::Result<()> {
135135
if let Some(handle) = self.handle.take() {
136-
state.tokio_runtime().block_on(handle)?;
136+
state.tokio_runtime().block_on(handle)??;
137137
}
138138
Ok(())
139139
}
@@ -405,6 +405,7 @@ mod tests {
405405
while let Some((time, value)) = source.next().await {
406406
println!("{time:?}, {value:?}");
407407
}
408+
Ok(())
408409
};
409410

410411
produce_async(example_producer)

wingfoil/src/nodes/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,7 @@ pub trait StreamOperators<T: Element> {
250250
) -> Rc<dyn Node>
251251
where
252252
T: Element + Send,
253-
FUT: Future<Output = ()> + Send + 'static;
253+
FUT: Future<Output = anyhow::Result<()>> + Send + 'static;
254254
fn finally<F: FnOnce(T, &GraphState) + 'static>(self: &Rc<Self>, func: F) -> Rc<dyn Node>;
255255
/// executes supplied closure on each tick
256256
fn for_each(self: &Rc<Self>, func: impl Fn(T, NanoTime) + 'static) -> Rc<dyn Node>;
@@ -410,7 +410,7 @@ where
410410
) -> Rc<dyn Node>
411411
where
412412
T: Element + Send,
413-
FUT: Future<Output = ()> + Send + 'static,
413+
FUT: Future<Output = anyhow::Result<()>> + Send + 'static,
414414
{
415415
AsyncConsumerNode::new(self.clone(), func).into_node()
416416
}

0 commit comments

Comments
 (0)