@@ -24,15 +24,15 @@ use crate::{
2424} ;
2525
2626use bytes:: { Buf , BufMut , BytesMut } ;
27- use futures:: Stream ;
27+ use futures:: { task :: AtomicWaker , Stream } ;
2828use parking_lot:: Mutex ;
2929use tokio:: sync:: mpsc:: { channel, Receiver , Sender } ;
3030use tokio_util:: sync:: PollSender ;
3131
3232use std:: {
3333 pin:: Pin ,
3434 sync:: Arc ,
35- task:: { Context , Poll , Waker } ,
35+ task:: { Context , Poll } ,
3636 time:: { Duration , Instant } ,
3737} ;
3838
@@ -96,7 +96,7 @@ pub struct Substream {
9696 rx : Receiver < Event > ,
9797
9898 /// Waker to notify when shutdown completes (FIN_ACK received).
99- shutdown_waker : Arc < Mutex < Option < Waker > > > ,
99+ shutdown_waker : Arc < AtomicWaker > ,
100100
101101 /// Timestamp when FIN was sent, used for timeout.
102102 fin_sent_at : Option < Instant > ,
@@ -108,7 +108,7 @@ impl Substream {
108108 let ( outbound_tx, outbound_rx) = channel ( 256 ) ;
109109 let ( inbound_tx, inbound_rx) = channel ( 256 ) ;
110110 let state = Arc :: new ( Mutex :: new ( State :: Open ) ) ;
111- let shutdown_waker = Arc :: new ( Mutex :: new ( None ) ) ;
111+ let shutdown_waker = Arc :: new ( AtomicWaker :: new ( ) ) ;
112112
113113 let handle = SubstreamHandle {
114114 inbound_tx,
@@ -146,7 +146,7 @@ pub struct SubstreamHandle {
146146 rx : Receiver < Event > ,
147147
148148 /// Waker to notify when shutdown completes (FIN_ACK received).
149- shutdown_waker : Arc < Mutex < Option < Waker > > > ,
149+ shutdown_waker : Arc < AtomicWaker > ,
150150}
151151
152152impl SubstreamHandle {
@@ -180,9 +180,7 @@ impl SubstreamHandle {
180180 if matches ! ( * state, State :: FinSent ) {
181181 * state = State :: FinAcked ;
182182 // Wake up any task waiting on shutdown
183- if let Some ( waker) = self . shutdown_waker . lock ( ) . take ( ) {
184- waker. wake ( ) ;
185- }
183+ self . shutdown_waker . wake ( ) ;
186184 }
187185 return Ok ( ( ) ) ;
188186 }
@@ -350,7 +348,7 @@ impl tokio::io::AsyncWrite for Substream {
350348 }
351349
352350 // Store the waker so it can be triggered when we receive FIN_ACK
353- * self . shutdown_waker . lock ( ) = Some ( cx. waker ( ) . clone ( ) ) ;
351+ self . shutdown_waker . register ( cx. waker ( ) ) ;
354352 return Poll :: Pending ;
355353 }
356354
@@ -389,7 +387,7 @@ impl tokio::io::AsyncWrite for Substream {
389387 // Record timestamp for timeout tracking
390388 * self . state . lock ( ) = State :: FinSent ;
391389 self . fin_sent_at = Some ( Instant :: now ( ) ) ;
392- * self . shutdown_waker . lock ( ) = Some ( cx. waker ( ) . clone ( ) ) ;
390+ self . shutdown_waker . register ( cx. waker ( ) ) ;
393391
394392 // Spawn timeout task to wake us after FIN_ACK_TIMEOUT
395393 let waker = cx. waker ( ) . clone ( ) ;
0 commit comments