Skip to content

Commit ae8de32

Browse files
authored
Place a buffer over each sink of a demux to avoid serial message sending (hydro-project#575)
1 parent fe4ecbf commit ae8de32

File tree

1 file changed

+21
-6
lines changed
  • hydroflow_cli_integration/src

1 file changed

+21
-6
lines changed

hydroflow_cli_integration/src/lib.rs

+21-6
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use std::{
1212
use bytes::{Bytes, BytesMut};
1313
use serde::{Deserialize, Serialize};
1414

15-
use futures::{ready, stream, Sink, Stream};
15+
use futures::{ready, sink::Buffer, stream, Sink, SinkExt, Stream};
1616

1717
use async_recursion::async_recursion;
1818
use async_trait::async_trait;
@@ -395,9 +395,14 @@ impl ConnectedSink for ConnectedBidi {
395395
}
396396
}
397397

398-
pub struct ConnectedDemux<T: ConnectedSink> {
398+
pub type BufferedDrain<S, I> = DemuxDrain<I, Buffer<S, I>>;
399+
400+
pub struct ConnectedDemux<T: ConnectedSink>
401+
where
402+
<T as ConnectedSink>::Input: Sync,
403+
{
399404
pub keys: Vec<u32>,
400-
sink: Option<DemuxDrain<T::Input, T::Sink>>,
405+
sink: Option<BufferedDrain<T::Sink, T::Input>>,
401406
}
402407

403408
#[pin_project]
@@ -460,7 +465,12 @@ where
460465
for (id, pipe) in demux {
461466
connected_demux.insert(
462467
id,
463-
Box::pin(T::from_defn(ServerOrBound::Server(pipe)).await.into_sink()),
468+
Box::pin(
469+
T::from_defn(ServerOrBound::Server(pipe))
470+
.await
471+
.into_sink()
472+
.buffer(1024),
473+
),
464474
);
465475
}
466476

@@ -481,7 +491,12 @@ where
481491
for (id, bound) in demux {
482492
connected_demux.insert(
483493
id,
484-
Box::pin(T::from_defn(ServerOrBound::Bound(bound)).await.into_sink()),
494+
Box::pin(
495+
T::from_defn(ServerOrBound::Bound(bound))
496+
.await
497+
.into_sink()
498+
.buffer(1024),
499+
),
485500
);
486501
}
487502

@@ -505,7 +520,7 @@ where
505520
<T as ConnectedSink>::Input: 'static + Sync,
506521
{
507522
type Input = (u32, T::Input);
508-
type Sink = DemuxDrain<T::Input, T::Sink>;
523+
type Sink = BufferedDrain<T::Sink, T::Input>;
509524

510525
fn into_sink(mut self) -> Self::Sink {
511526
self.sink.take().unwrap()

0 commit comments

Comments
 (0)