@@ -285,8 +285,10 @@ fn spawn_read_write_tasks<
285285 // 1. The reader task detects that the destination has closed the connection via reader returning None and terminates,
286286 // 1.1. connection_closed_tx is sent `ConnectionError::OtherSideClosed`
287287 // 1.2. in_tx, out_tx (reader's copy), and reader_shutdown_tx are dropped
288- // 2. The writer task detects that reader_shutdown_tx was dropped via reader_shutdown_rx and terminates promptly,
289- // closing the TCP write half and preventing CLOSE-WAIT connection leaks.
288+ // 2. The writer task detects that reader_shutdown_tx was dropped via reader_shutdown_rx.
289+ // 2.1. The writer task drains any already-buffered messages using try_recv() (non-blocking).
290+ // 2.2. Once the buffer is empty, the writer task terminates, closing the TCP write half
291+ // and preventing CLOSE-WAIT connection leaks.
290292 // (Note: without reader_shutdown_rx, the writer would wait forever on out_rx.recv() since SinkConnection.out_tx still exists)
291293 // 3. Eventually when SinkConnection::recv/try_recv is called, it detects that in_tx is dropped by in_rx returning None
292294 // and returns the ConnectionError::OtherSideClosed received from connection_closed_rx.
@@ -305,7 +307,8 @@ fn spawn_read_write_tasks<
305307
306308 // This channel is used to signal the writer task when the reader task terminates.
307309 // When the reader task terminates (for any reason), it drops reader_shutdown_tx,
308- // which causes reader_shutdown_rx to return an error, signaling the writer to shut down.
310+ // which causes reader_shutdown_rx to complete, signaling the writer to begin graceful shutdown.
311+ // The writer then drains any buffered messages before terminating.
309312 // This prevents CLOSE-WAIT connection leaks that would occur if the writer task
310313 // kept waiting on out_rx.recv() while holding the TCP write half open.
311314 let ( reader_shutdown_tx, reader_shutdown_rx) = oneshot:: channel :: < ( ) > ( ) ;
@@ -441,38 +444,58 @@ async fn writer_task<C: CodecBuilder + 'static, W: AsyncWrite + Unpin + Send + '
441444 request_pending : Arc < RequestPending > ,
442445 mut reader_shutdown_rx : oneshot:: Receiver < ( ) > ,
443446) -> Result < ( ) , ConnectionError > {
447+ let mut reader_has_shutdown = false ;
448+
444449 loop {
445- tokio:: select! {
446- biased;
447- // Check if reader task has terminated - this prevents CLOSE-WAIT leaks
448- // when the broker closes the connection and shotover doesn't actively poll the connection.
449- _ = & mut reader_shutdown_rx => {
450- // Reader task has terminated, we should shut down too.
451- // This ensures the TCP write half is closed promptly.
452- return Ok ( ( ) ) ;
453- }
454- result = out_rx. recv( ) => {
455- if let Some ( messages) = result {
450+ // When reader has shutdown, only process already-buffered messages (non-blocking).
451+ // This prevents CLOSE-WAIT leaks while still allowing pending sends to complete.
452+ if reader_has_shutdown {
453+ match out_rx. try_recv ( ) {
454+ Ok ( messages) => {
456455 request_pending. add ( messages. len ( ) as u64 ) ;
457- match writer. send( messages) . await {
458- Err ( CodecWriteError :: Encoder ( err) ) => {
459- return Err ( ConnectionError :: MessageEncode ( Arc :: new( err) ) ) ;
460- }
461- Err ( CodecWriteError :: Io ( err) ) => {
462- if matches!(
463- err. kind( ) ,
464- ErrorKind :: BrokenPipe | ErrorKind :: ConnectionReset
465- ) {
466- return Err ( ConnectionError :: OtherSideClosed ) ;
467- } else {
468- return Err ( ConnectionError :: Io ( Arc :: new( err) ) ) ;
456+ // Ignore send errors when draining - connection is closing anyway
457+ let _ = writer. send ( messages) . await ;
458+ }
459+ Err ( _) => {
460+ // No more buffered messages, safe to terminate
461+ //TODO not sure if skipping error handling is correct, at least might need to log em
462+ return Ok ( ( ) ) ;
463+ }
464+ }
465+ } else {
466+ tokio:: select! {
467+ // Don't use biased - prefer sending messages over checking shutdown
468+ // to avoid dropping messages that are ready to send.
469+ result = out_rx. recv( ) => {
470+ if let Some ( messages) = result {
471+ request_pending. add( messages. len( ) as u64 ) ;
472+ match writer. send( messages) . await {
473+ Err ( CodecWriteError :: Encoder ( err) ) => {
474+ return Err ( ConnectionError :: MessageEncode ( Arc :: new( err) ) ) ;
469475 }
476+ Err ( CodecWriteError :: Io ( err) ) => {
477+ if matches!(
478+ err. kind( ) ,
479+ ErrorKind :: BrokenPipe | ErrorKind :: ConnectionReset
480+ ) {
481+ return Err ( ConnectionError :: OtherSideClosed ) ;
482+ } else {
483+ return Err ( ConnectionError :: Io ( Arc :: new( err) ) ) ;
484+ }
485+ }
486+ Ok ( ( ) ) => { }
470487 }
471- Ok ( ( ) ) => { }
488+ } else {
489+ // shotover is no longer sending responses, this task is no longer needed
490+ return Ok ( ( ) ) ;
472491 }
473- } else {
474- // shotover is no longer sending responses, this task is no longer needed
475- return Ok ( ( ) ) ;
492+ }
493+ // Check if reader task has terminated - this prevents CLOSE-WAIT leaks
494+ // when the broker closes the connection and shotover doesn't actively poll the connection.
495+ _ = & mut reader_shutdown_rx => {
496+ // Reader task has terminated. Don't terminate immediately -
497+ // first drain any buffered messages to avoid dropping them.
498+ reader_has_shutdown = true ;
476499 }
477500 }
478501 }
0 commit comments