11use futures:: { FutureExt , SinkExt , StreamExt } ;
2+ use rand:: { SeedableRng , Rng } ;
23use tokio_listener:: { Connection , Listener } ;
34
45use std:: cell:: { Cell , RefCell } ;
@@ -13,11 +14,16 @@ use tungstenite::Message;
1314
1415type UsualClient = tokio_tungstenite:: WebSocketStream < Connection > ;
1516
16- type ClientSink = Rc < tokio:: sync:: Mutex < futures:: stream:: SplitSink < UsualClient , Message > > > ;
1717type ClientStream = futures:: stream:: SplitStream < UsualClient > ;
1818type AllClients = Rc < RefCell < Slab < ClientSink > > > ;
1919type Url2Clientset = HashMap < String , AllClients > ;
2020
21+ #[ derive( Clone ) ]
22+ struct ClientSink {
23+ s : Rc < tokio:: sync:: Mutex < futures:: stream:: SplitSink < UsualClient , Message > > > ,
24+ queue : Option < Rc < tokio:: sync:: mpsc:: Sender < Message > > > ,
25+ }
26+
2127const DEFAULT_MAXURLS : usize = 64 ;
2228
2329async fn process_client_messages (
@@ -28,6 +34,7 @@ async fn process_client_messages(
2834 flags : & flags:: Wsbroad ,
2935) -> Result < ( ) > {
3036 let mut ids_for_backpressure = vec ! [ ] ;
37+ let mut rng = flags. stochastic_queue . map ( |_|rand:: rngs:: SmallRng :: from_entropy ( ) ) ;
3138 while let Some ( m) = stream. next ( ) . await {
3239 let m = m?;
3340
@@ -36,7 +43,7 @@ async fn process_client_messages(
3643 }
3744 let fwd = match m {
3845 Message :: Ping ( p) => {
39- sink. lock ( ) . await . send ( Message :: Pong ( p) ) . await ?;
46+ sink. s . lock ( ) . await . send ( Message :: Pong ( p) ) . await ?;
4047 continue ;
4148 }
4249 Message :: Pong ( _) => continue ,
@@ -57,7 +64,8 @@ async fn process_client_messages(
5764 let all_borrowed = all. borrow ( ) ;
5865 if let Some ( cs) = all_borrowed. get ( id) . cloned ( ) {
5966 drop ( all_borrowed) ;
60- let mut cs_lock = cs. lock ( ) . await ;
67+ let mut cs_lock = cs. s . lock ( ) . await ;
68+
6169 let ret = cs_lock. send ( fwd. clone ( ) ) . await ;
6270 drop ( cs_lock) ;
6371
@@ -69,17 +77,43 @@ async fn process_client_messages(
6977 }
7078 }
7179 } else {
80+ // must not await inside this loop
7281 for ( id, i) in all. borrow ( ) . iter ( ) {
7382 if !flags. reflexive {
7483 if id == my_id {
7584 continue ;
7685 }
7786 }
7887
79- // should be always uncontended when backpressure is not enabled
80- let mut cs_lock = i. lock ( ) . await ;
81-
82- let _ = cs_lock. send ( fwd. clone ( ) ) . now_or_never ( ) ;
88+ if let Some ( ref queue) = i. queue {
89+ // stochastic queue mode
90+
91+ if queue. max_capacity ( ) == 1 {
92+ let _ = queue. try_send ( fwd. clone ( ) ) ;
93+ } else {
94+ let mut denominator = queue. max_capacity ( ) as u32 ;
95+ let mut numerator = queue. capacity ( ) as u32 ;
96+ if denominator > 2 {
97+ // first half of the queue is a free ride
98+ denominator /= 2 ;
99+ numerator = numerator. saturating_sub ( denominator) ;
100+ denominator += 1 ;
101+ } else {
102+ denominator += 1 ;
103+ }
104+ if rng. as_mut ( ) . unwrap ( ) . gen_ratio ( numerator, denominator) {
105+ let _ = queue. try_send ( fwd. clone ( ) ) ;
106+ } else {
107+ // actively drop message
108+ }
109+ }
110+ } else {
111+ // should be always uncontended when backpressure is not enabled and qlen is not used
112+ if let Ok ( mut cs_lock) = i. s . try_lock ( ) {
113+ let _ = cs_lock. send ( fwd. clone ( ) ) . now_or_never ( ) ;
114+ drop ( cs_lock) ;
115+ }
116+ }
83117 }
84118 }
85119 }
@@ -88,15 +122,41 @@ async fn process_client_messages(
88122}
89123
90124async fn serve_client ( client : UsualClient , all : AllClients , flags : & flags:: Wsbroad ) -> Result < ( ) > {
91- let ( sink, stream) = client. split ( ) ;
92- let sink: ClientSink = Rc :: new ( tokio:: sync:: Mutex :: new ( sink) ) ;
125+ let ( ws_write_part, ws_read_part) = client. split ( ) ;
126+
127+ let s = Rc :: new ( tokio:: sync:: Mutex :: new ( ws_write_part) ) ;
128+ let mut sink = ClientSink {
129+ s,
130+ queue : None ,
131+ } ;
132+
133+ let jh = if let Some ( slen) = flags. stochastic_queue {
134+ let ( tx, mut rx) = tokio:: sync:: mpsc:: channel ( slen) ;
135+ let s2 = sink. s . clone ( ) ;
136+ let jh = tokio:: task:: spawn_local ( async move {
137+ while let Some ( msg) = rx. recv ( ) . await {
138+ if s2. lock ( ) . await . send ( msg) . await . is_err ( ) {
139+ break ;
140+ }
141+ }
142+ } ) ;
143+ sink. queue = Some ( Rc :: new ( tx) ) ;
144+ Some ( jh)
145+ } else {
146+ None
147+ } ;
93148
94149 let my_id: usize = all. borrow_mut ( ) . insert ( sink. clone ( ) ) ;
95150
96- let ret = process_client_messages ( my_id, sink, stream, & all, flags) . await ;
151+ let ret = process_client_messages ( my_id, sink, ws_read_part, & all, flags) . await ;
152+
153+ if let Some ( jh) = jh {
154+ jh. abort ( ) ;
155+ let _ = jh. await ;
156+ }
97157
98158 let sink = all. borrow_mut ( ) . remove ( my_id) ;
99- sink. lock ( ) . await . send ( Message :: Close ( None ) ) . await ?;
159+ sink. s . lock ( ) . await . send ( Message :: Close ( None ) ) . await ?;
100160 ret?;
101161
102162 Ok ( ( ) )
@@ -194,7 +254,7 @@ async fn client_accepting_loop(listener: &mut Listener, flags: Rc<flags::Wsbroad
194254}
195255
196256mod flags {
197- use tokio_listener:: { ListenerAddress , UnixChmodVariant , TcpKeepaliveParams } ;
257+ use tokio_listener:: { ListenerAddress , TcpKeepaliveParams , UnixChmodVariant } ;
198258
199259 xflags:: xflags! {
200260 src "./src/main.rs"
@@ -225,6 +285,8 @@ mod flags {
225285 optional --tcp-keepalive ka_triplet: TcpKeepaliveParams
226286
227287 /// try to set SO_REUSEPORT, so that multiple processes can accept connections from the same port in a round-robin fashion.
288+ ///
289+ /// Obviously, URL domains would be different based on which instance does the client land.
228290 optional --tcp-reuse-port
229291
230292 /// set socket's IPV6_V6ONLY to true, to avoid receiving IPv4 connections on IPv6 socket
@@ -243,16 +305,16 @@ mod flags {
243305
244306 /// The target minimum size of the in-app write buffer to reach before writing the data to the underlying stream.
245307 /// The default value is 128 KiB, but wsbroad flushes after sending every message, so this may be unrelevant.
246- ///
308+ ///
247309 /// May be 0. Needs to be less that --max-write-buffer-size.
248310 optional --write-buffer-size size_bytes: usize
249311
250312 /// The max size of the in-app write buffer in bytes. Default is 4 MiB.
251- ///
252- /// This affects how much messages get buffered before droppign
313+ ///
314+ /// This affects how much messages get buffered before droppign
253315 /// or slowing down sender begins. Note that --send-buffer-size also affects
254316 /// this behaviour.
255- ///
317+ ///
256318 /// Also indirectly affects max message size
257319 optional --max-write-buffer-size size_bytes: usize
258320
@@ -285,6 +347,18 @@ mod flags {
285347
286348 /// Set TCP_NODELAY to deliver small messages with less latency
287349 optional --nodelay
350+
351+ /// drop messages to slow receivers not in clusters (i.e. multiple dropped messages in a row),
352+ /// but with increasing probability based on congestion level.
353+ /// Value is maximum additional queue length. The bigger - the more uniformly message going to be
354+ /// dropped when overloaded, but the higher there may be latency for message that go though
355+ ///
356+ /// Short queue descreases thgouhput.
357+ ///
358+ /// Note that other buffers (--max-write-buffer-size and --send-buffer-size) still apply after this queue.
359+ ///
360+ /// Unlike other options, the unit is messages, not bytes
361+ optional --stochastic-queue qlen: usize
288362 }
289363 }
290364 // generated start
@@ -315,6 +389,7 @@ mod flags {
315389 pub backpressure : bool ,
316390 pub backpressure_with_errors : bool ,
317391 pub nodelay : bool ,
392+ pub stochastic_queue : Option < usize > ,
318393 }
319394
320395 impl Wsbroad {
@@ -355,7 +430,11 @@ async fn main() -> Result<()> {
355430 uopts. tcp_only_v6 = flags. tcp_only_v6 ;
356431 uopts. tcp_listen_backlog = flags. tcp_listen_backlog ;
357432 uopts. recv_buffer_size = flags. recv_buffer_size ;
358- uopts. send_buffer_size = flags. send_buffer_size ;
433+ uopts. send_buffer_size = flags. send_buffer_size ;
434+
435+ if flags. stochastic_queue . is_some ( ) && ( flags. backpressure || flags. backpressure_with_errors ) {
436+ anyhow:: bail!( "--stochastic-queue is incompatibel with --backpressure" ) ;
437+ }
359438
360439 let mut listener = Listener :: bind ( & flags. listen_addr , & sopts, & uopts) . await ?;
361440
0 commit comments