Skip to content

Commit 2fdb583

Browse files
committed
Add batched_sink utility for flushing sink elements together
1 parent b6945d8 commit 2fdb583

File tree

9 files changed

+163
-10
lines changed

9 files changed

+163
-10
lines changed

Cargo.lock

+18
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

hydro_cli/src/core/hydroflow_crate/ports.rs

-1
Original file line numberDiff line numberDiff line change
@@ -526,7 +526,6 @@ impl ServerConfig {
526526
}
527527

528528
ServerConfig::MergeSelect(underlying, key) => {
529-
dbg!(underlying);
530529
let key = *key;
531530
underlying
532531
.load_instantiated(

hydro_cli/src/lib.rs

+10-6
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,9 @@ struct SafeCancelToken {
3030
impl SafeCancelToken {
3131
fn safe_cancel(&mut self) {
3232
if let Some(token) = self.cancel_tx.take() {
33-
eprintln!("Received cancellation, cleaning up...");
34-
token.send(()).unwrap();
33+
if token.send(()).is_ok() {
34+
eprintln!("Received cancellation, cleaning up...");
35+
}
3536
} else {
3637
eprintln!("Already received cancellation, please be patient!");
3738
}
@@ -52,13 +53,16 @@ async def coroutine_to_safely_cancellable(c, cancel_token):
5253
while True:
5354
try:
5455
ok, cancel = await asyncio.shield(c)
56+
is_done = True
5557
except asyncio.CancelledError:
5658
cancel_token.safe_cancel()
59+
is_done = False
5760
58-
if not cancel:
59-
return ok
60-
else:
61-
raise asyncio.CancelledError()
61+
if is_done:
62+
if not cancel:
63+
return ok
64+
else:
65+
raise asyncio.CancelledError()
6266
"#,
6367
"coro_converter",
6468
"coro_converter",

hydro_cli_examples/examples/dedalus_sender/main.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::time::Duration;
2+
13
use hydroflow::{
24
tokio_stream::wrappers::IntervalStream,
35
util::{
@@ -31,7 +33,7 @@ async fn main() {
3133
.input repeated `repeat_iter(to_repeat.iter().cloned())`
3234
.input periodic `source_stream(periodic) -> map(|_| ())`
3335
.input peers `repeat_iter(peers.clone()) -> map(|p| (p,))`
34-
.async broadcast `map(|(node_id, v)| (node_id, serialize_to_bytes(v))) -> dest_sink(broadcast_sink)` `null::<(String,)>()`
36+
.async broadcast `map(|(node_id, v)| (node_id, serialize_to_bytes(v))) -> dest_sink_chunked(broadcast_sink, 8, Duration::from_millis(1))` `null::<(String,)>()`
3537
3638
broadcast@n(x) :~ repeated(x), periodic(), peers(n)
3739
"#

hydroflow/Cargo.toml

+2-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ edition = "2021"
55

66
[features]
77
default = [ "async", "macros" ]
8-
async = [ "futures" ]
8+
async = [ "futures", "futures-batch" ]
99
macros = [ "hydroflow_macro", "hydroflow_datalog" ]
1010
hydroflow_macro = [ "dep:hydroflow_macro" ]
1111
hydroflow_datalog = [ "dep:hydroflow_datalog" ]
@@ -52,6 +52,7 @@ bincode = "1.3"
5252
byteorder = "1.4.3"
5353
bytes = "1.1.0"
5454
futures = { version = "0.3", optional = true }
55+
futures-batch = { version = "0.6.1", optional = true }
5556
hydroflow_datalog = { optional = true, path = "../hydroflow_datalog" }
5657
hydroflow_lang = { path = "../hydroflow_lang" }
5758
hydroflow_macro = { optional = true, path = "../hydroflow_macro" }

hydroflow/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ pub mod util;
1414
pub use bincode;
1515
pub use bytes;
1616
pub use futures;
17+
pub use futures_batch;
1718
pub use pusherator;
1819
pub use rustc_hash;
1920
pub use serde;

hydroflow/src/util/mod.rs

+32-1
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,10 @@ pub mod cli;
1818

1919
use std::net::SocketAddr;
2020
use std::task::{Context, Poll};
21+
use std::time::Duration;
2122

2223
use bincode;
23-
use futures::Stream;
24+
use futures::{Sink, SinkExt, Stream};
2425
use serde::{Deserialize, Serialize};
2526

2627
pub fn unbounded_channel<T>() -> (
@@ -131,6 +132,36 @@ where
131132
slice.sort_unstable_by(|a, b| f(a).cmp(f(b)))
132133
}
133134

135+
/// Wraps the sink `s` so that items pushed into the returned sink are forwarded
/// to `s` in batches rather than one at a time.
///
/// Items sent to the returned sink are placed on an unbounded Tokio mpsc
/// channel. A task started with `tokio::spawn` reads that channel through
/// `chunks_timeout(cap, timeout)` and pushes every resulting batch into `s`
/// with a single `send_all` call.
///
/// * `s` - the underlying sink that ultimately receives the items.
/// * `cap` - chunk capacity handed to `chunks_timeout`.
/// * `timeout` - chunk timeout handed to `chunks_timeout`.
///
/// The returned sink yields `Err(())` when the channel send fails.
///
/// # Panics
///
/// The spawned task panics with "Batched sink failed" if `send_all` on `s`
/// returns an error.
///
/// NOTE(review): `tokio::spawn` is called directly, so this presumably has to be
/// invoked from within a Tokio runtime - confirm against callers.
/// NOTE(review): the channel is unbounded, so it looks like no back-pressure
/// from `s` reaches the producer - confirm this is intended.
pub fn batched_sink<I: Send + 'static, S: Sink<I> + Send + 'static>(
136+
s: S,
137+
cap: usize,
138+
timeout: Duration,
139+
) -> impl Sink<I, Error = ()> + Unpin {
140+
// Producer side (`send`) is handed to the returned sink; consumer side
// (`recv`) is moved into the forwarding task below.
let (send, recv) = tokio::sync::mpsc::unbounded_channel::<I>();
141+
142+
use futures::{stream, StreamExt};
143+
use futures_batch::ChunksTimeoutStreamExt;
144+
145+
// Forwarding task: drains the channel in chunks and writes each chunk to `s`.
tokio::spawn(async move {
146+
let recv_stream = tokio_stream::wrappers::UnboundedReceiverStream::new(recv);
147+
let mut batched_recv = recv_stream.chunks_timeout(cap, timeout);
148+
// Pinned on the heap so `send_all` can be called without requiring `S: Unpin`.
let mut s = Box::pin(s);
149+
150+
while let Some(batch) = batched_recv.next().await {
151+
// `send_all` consumes a stream of `Result`s, hence the `Ok` wrapping.
if s.send_all(&mut stream::iter(batch).map(|v| Ok(v)))
152+
.await
153+
.is_err()
154+
{
155+
// NOTE(review): this panic happens inside the spawned task; the
// caller holding the returned sink is presumably not notified directly.
panic!("Batched sink failed")
156+
}
157+
}
158+
});
159+
160+
// The returned sink threads `send` through `unfold` as its state: each item is
// pushed onto the channel and the sender is handed back for the next item.
Box::pin(futures::sink::unfold(send, |send, item| async move {
161+
send.send(item).map(|_| send).map_err(|_| ())
162+
}))
163+
}
164+
134165
#[cfg(test)]
135166
mod test {
136167
use super::*;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
use super::{make_missing_runtime_msg, FlowProperties, FlowPropertyVal};
2+
3+
use super::{
4+
OperatorConstraints, OperatorInstance, OperatorWriteOutput, WriteContextArgs, RANGE_0, RANGE_1,
5+
};
6+
7+
use quote::quote_spanned;
8+
9+
/// The same as `dest_sink`, but takes two additional parameters controlling
10+
/// when the data is actually flushed.
///
/// The operator takes three arguments, in order: the sink, a chunk size, and a
/// chunk delay. The chunk size and chunk delay are passed to `futures_batch`'s
/// `chunks_timeout`; the sink is flushed once after each chunk has been fed to it.
11+
#[hydroflow_internalmacro::operator_docgen]
12+
pub const DEST_SINK_CHUNKED: OperatorConstraints = OperatorConstraints {
13+
name: "dest_sink_chunked",
14+
hard_range_inn: RANGE_1,
15+
soft_range_inn: RANGE_1,
16+
hard_range_out: RANGE_0,
17+
soft_range_out: RANGE_0,
18+
num_args: 3,
19+
persistence_args: RANGE_0,
20+
type_args: RANGE_0,
21+
is_external_input: false,
22+
ports_inn: None,
23+
ports_out: None,
24+
properties: FlowProperties {
25+
deterministic: FlowPropertyVal::Preserve,
26+
monotonic: FlowPropertyVal::Preserve,
27+
inconsistency_tainted: false,
28+
},
29+
input_delaytype_fn: |_| None,
30+
write_fn: |wc @ &WriteContextArgs {
31+
root,
32+
hydroflow,
33+
op_span,
34+
ident,
35+
op_name,
36+
op_inst: OperatorInstance { arguments, .. },
37+
..
38+
},
39+
_| {
40+
// The three operator arguments, matching `num_args: 3` above.
let sink_arg = &arguments[0];
41+
let chunk_size_arg = &arguments[1];
42+
let chunk_delay_arg = &arguments[2];
43+
44+
// Identifiers for the two channel halves shared by the prologue and the
// iterator code generated below.
let send_ident = wc.make_ident("item_send");
45+
let recv_ident = wc.make_ident("item_recv");
46+
47+
let missing_runtime_msg = make_missing_runtime_msg(op_name);
48+
49+
// Prologue: creates the channel and spawns the task that drains it into the sink.
let write_prologue = quote_spanned! {op_span=>
50+
let (#send_ident, #recv_ident) = #root::tokio::sync::mpsc::unbounded_channel();
51+
{
52+
/// Function is needed so there is no ambiguity for what `Item` is used
53+
/// when calling `.flush()`.
54+
async fn sink_feed_flush<Sink, Item>(
55+
recv: #root::tokio::sync::mpsc::UnboundedReceiver<Item>,
56+
mut sink: Sink,
57+
) where
58+
Sink: ::std::marker::Unpin + #root::futures::Sink<Item>,
59+
Sink::Error: ::std::fmt::Debug,
60+
{
61+
use #root::futures::SinkExt;
62+
use #root::futures::StreamExt;
63+
use #root::futures_batch::ChunksTimeoutStreamExt;
64+
65+
let recv_stream = #root::tokio_stream::wrappers::UnboundedReceiverStream::new(recv);
66+
let mut batched_recv = Box::pin(recv_stream.chunks_timeout(#chunk_size_arg, #chunk_delay_arg));
67+
68+
// Feed every item of a chunk first, then flush once per chunk.
while let Some(batch) = batched_recv.next().await {
69+
for item in batch {
70+
sink.feed(item)
71+
.await
72+
.expect("Error processing async sink item.");
73+
}
74+
75+
sink.flush().await.expect("Failed to flush sink.");
76+
}
77+
}
78+
#hydroflow
79+
.spawn_task(sink_feed_flush(#recv_ident, #sink_arg))
80+
.expect(#missing_runtime_msg);
81+
}
82+
};
83+
84+
// Iterator: each item pushed into the operator is placed on the channel for
// the task spawned in the prologue.
let write_iterator = quote_spanned! {op_span=>
85+
let #ident = #root::pusherator::for_each::ForEach::new(|item| {
86+
#send_ident.send(item).expect("Failed to send async write item for processing.");
87+
});
88+
};
89+
90+
Ok(OperatorWriteOutput {
91+
write_prologue,
92+
write_iterator,
93+
..Default::default()
94+
})
95+
},
96+
};

hydroflow_lang/src/graph/ops/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,7 @@ declare_ops![
232232
demux::DEMUX,
233233
dest_file::DEST_FILE,
234234
dest_sink::DEST_SINK,
235+
dest_sink_chunked::DEST_SINK_CHUNKED,
235236
dest_sink_serde::DEST_SINK_SERDE,
236237
difference::DIFFERENCE,
237238
filter::FILTER,

0 commit comments

Comments
 (0)