@@ -12,7 +12,7 @@ use std::{
12
12
use bytes:: { Bytes , BytesMut } ;
13
13
use serde:: { Deserialize , Serialize } ;
14
14
15
- use futures:: { ready, stream, Sink , Stream } ;
15
+ use futures:: { ready, sink :: Buffer , stream, Sink , SinkExt , Stream } ;
16
16
17
17
use async_recursion:: async_recursion;
18
18
use async_trait:: async_trait;
@@ -395,9 +395,14 @@ impl ConnectedSink for ConnectedBidi {
395
395
}
396
396
}
397
397
398
- pub struct ConnectedDemux < T : ConnectedSink > {
398
+ pub type BufferedDrain < S , I > = DemuxDrain < I , Buffer < S , I > > ;
399
+
400
+ pub struct ConnectedDemux < T : ConnectedSink >
401
+ where
402
+ <T as ConnectedSink >:: Input : Sync ,
403
+ {
399
404
pub keys : Vec < u32 > ,
400
- sink : Option < DemuxDrain < T :: Input , T :: Sink > > ,
405
+ sink : Option < BufferedDrain < T :: Sink , T :: Input > > ,
401
406
}
402
407
403
408
#[ pin_project]
@@ -460,7 +465,12 @@ where
460
465
for ( id, pipe) in demux {
461
466
connected_demux. insert (
462
467
id,
463
- Box :: pin ( T :: from_defn ( ServerOrBound :: Server ( pipe) ) . await . into_sink ( ) ) ,
468
+ Box :: pin (
469
+ T :: from_defn ( ServerOrBound :: Server ( pipe) )
470
+ . await
471
+ . into_sink ( )
472
+ . buffer ( 1024 ) ,
473
+ ) ,
464
474
) ;
465
475
}
466
476
@@ -481,7 +491,12 @@ where
481
491
for ( id, bound) in demux {
482
492
connected_demux. insert (
483
493
id,
484
- Box :: pin ( T :: from_defn ( ServerOrBound :: Bound ( bound) ) . await . into_sink ( ) ) ,
494
+ Box :: pin (
495
+ T :: from_defn ( ServerOrBound :: Bound ( bound) )
496
+ . await
497
+ . into_sink ( )
498
+ . buffer ( 1024 ) ,
499
+ ) ,
485
500
) ;
486
501
}
487
502
@@ -505,7 +520,7 @@ where
505
520
<T as ConnectedSink >:: Input : ' static + Sync ,
506
521
{
507
522
type Input = ( u32 , T :: Input ) ;
508
- type Sink = DemuxDrain < T :: Input , T :: Sink > ;
523
+ type Sink = BufferedDrain < T :: Sink , T :: Input > ;
509
524
510
525
fn into_sink ( mut self ) -> Self :: Sink {
511
526
self . sink . take ( ) . unwrap ( )
0 commit comments