@@ -108,7 +108,8 @@ impl Substream {
108108 let shutdown_waker = Arc :: new ( Mutex :: new ( None ) ) ;
109109
110110 let handle = SubstreamHandle {
111- tx : inbound_tx,
111+ inbound_tx,
112+ outbound_tx : outbound_tx. clone ( ) ,
112113 rx : outbound_rx,
113114 state : Arc :: clone ( & state) ,
114115 shutdown_waker : Arc :: clone ( & shutdown_waker) ,
@@ -133,7 +134,10 @@ pub struct SubstreamHandle {
133134 state : Arc < Mutex < State > > ,
134135
135136 /// TX channel for sending inbound messages from `peer` to the associated `Substream`.
136- tx : Sender < Event > ,
137+ inbound_tx : Sender < Event > ,
138+
139+ /// TX channel for sending outbound messages to `peer` (e.g., FIN_ACK responses).
140+ outbound_tx : Sender < Event > ,
137141
138142 /// RX channel for receiving outbound messages to `peer` from the associated `Substream`.
139143 rx : Receiver < Event > ,
@@ -151,11 +155,11 @@ impl SubstreamHandle {
151155 pub async fn on_message ( & self , message : WebRtcMessage ) -> crate :: Result < ( ) > {
152156 if let Some ( flag) = message. flag {
153157 if flag == Flag :: Fin as i32 {
154- // Received FIN, send FIN_ACK back
155- self . tx . send ( Event :: RecvClosed ) . await ?;
156- // Send FIN_ACK to acknowledge
158+ // Received FIN from remote, close our read half
159+ self . inbound_tx . send ( Event :: RecvClosed ) . await ?;
160+ // Send FIN_ACK back to remote
157161 return self
158- . tx
162+ . outbound_tx
159163 . send ( Event :: Message {
160164 payload : vec ! [ ] ,
161165 flag : Some ( Flag :: FinAck as i32 ) ,
@@ -190,7 +194,7 @@ impl SubstreamHandle {
190194 if let Some ( payload) = message. payload {
191195 if !payload. is_empty ( ) {
192196 return self
193- . tx
197+ . inbound_tx
194198 . send ( Event :: Message {
195199 payload,
196200 flag : None ,
@@ -509,7 +513,7 @@ mod tests {
509513 async fn read_small_frame ( ) {
510514 let ( mut substream, handle) = Substream :: new ( ) ;
511515 handle
512- . tx
516+ . inbound_tx
513517 . send ( Event :: Message {
514518 payload : vec ! [ 1u8 ; 256 ] ,
515519 flag : None ,
@@ -544,7 +548,7 @@ mod tests {
544548 first. extend_from_slice ( & vec ! [ 2u8 ; 256 ] ) ;
545549
546550 handle
547- . tx
551+ . inbound_tx
548552 . send ( Event :: Message {
549553 payload : first,
550554 flag : None ,
@@ -587,15 +591,15 @@ mod tests {
587591 first. extend_from_slice ( & vec ! [ 2u8 ; 256 ] ) ;
588592
589593 handle
590- . tx
594+ . inbound_tx
591595 . send ( Event :: Message {
592596 payload : first,
593597 flag : None ,
594598 } )
595599 . await
596600 . unwrap ( ) ;
597601 handle
598- . tx
602+ . inbound_tx
599603 . send ( Event :: Message {
600604 payload : vec ! [ 4u8 ; 2048 ] ,
601605 flag : None ,
@@ -740,7 +744,19 @@ mod tests {
740744
741745 #[ tokio:: test]
742746 async fn fin_ack_response_on_receiving_fin ( ) {
743- let ( _substream, handle) = Substream :: new ( ) ;
747+ let ( mut substream, mut handle) = Substream :: new ( ) ;
748+
749+ // Spawn task to consume inbound events sent to the substream
750+ let consumer_task = tokio:: spawn ( async move {
751+ // Substream should receive RecvClosed
752+ let mut buf = vec ! [ 0u8 ; 1024 ] ;
753+ match substream. read ( & mut buf) . await {
754+ Err ( e) if e. kind ( ) == std:: io:: ErrorKind :: BrokenPipe => {
755+ // Expected - read half closed
756+ }
757+ other => panic ! ( "Unexpected result: {:?}" , other) ,
758+ }
759+ } ) ;
744760
745761 // Simulate receiving FIN from remote
746762 handle
@@ -751,8 +767,17 @@ mod tests {
751767 . await
752768 . unwrap ( ) ;
753769
754- // Should have sent FIN_ACK back (this would be captured by the connection layer)
755- // In real scenario, the connection would read from handle.rx
770+ // Wait for consumer task to complete
771+ consumer_task. await . unwrap ( ) ;
772+
773+ // Verify FIN_ACK was sent outbound to network
774+ assert_eq ! (
775+ handle. next( ) . await ,
776+ Some ( Event :: Message {
777+ payload: vec![ ] ,
778+ flag: Some ( Flag :: FinAck as i32 )
779+ } )
780+ ) ;
756781 }
757782
758783 #[ tokio:: test]
@@ -910,7 +935,7 @@ mod tests {
910935
911936 #[ tokio:: test]
912937 async fn flag_are_mutually_exclusive ( ) {
913- let ( mut substream , handle) = Substream :: new ( ) ;
938+ let ( _substream , handle) = Substream :: new ( ) ;
914939
915940 // Test that STOP_SENDING (1) is handled correctly
916941 handle
@@ -1085,7 +1110,6 @@ mod tests {
10851110
10861111 #[ tokio:: test]
10871112 async fn closing_state_blocks_writes ( ) {
1088- use std:: { pin:: Pin , task:: Poll } ;
10891113 use tokio:: io:: AsyncWriteExt ;
10901114
10911115 let ( mut substream, handle) = Substream :: new ( ) ;
0 commit comments