Skip to content

Commit 4695143

Browse files
committed
webrtc: Fix FIN_ACK response to be sent outbound to network instead of inbound to self
1 parent 4c5917d commit 4695143

File tree

1 file changed

+40
-16
lines changed

1 file changed

+40
-16
lines changed

src/transport/webrtc/substream.rs

Lines changed: 40 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,8 @@ impl Substream {
108108
let shutdown_waker = Arc::new(Mutex::new(None));
109109

110110
let handle = SubstreamHandle {
111-
tx: inbound_tx,
111+
inbound_tx,
112+
outbound_tx: outbound_tx.clone(),
112113
rx: outbound_rx,
113114
state: Arc::clone(&state),
114115
shutdown_waker: Arc::clone(&shutdown_waker),
@@ -133,7 +134,10 @@ pub struct SubstreamHandle {
133134
state: Arc<Mutex<State>>,
134135

135136
/// TX channel for sending inbound messages from `peer` to the associated `Substream`.
136-
tx: Sender<Event>,
137+
inbound_tx: Sender<Event>,
138+
139+
/// TX channel for sending outbound messages to `peer` (e.g., FIN_ACK responses).
140+
outbound_tx: Sender<Event>,
137141

138142
/// RX channel for receiving outbound messages to `peer` from the associated `Substream`.
139143
rx: Receiver<Event>,
@@ -151,11 +155,11 @@ impl SubstreamHandle {
151155
pub async fn on_message(&self, message: WebRtcMessage) -> crate::Result<()> {
152156
if let Some(flag) = message.flag {
153157
if flag == Flag::Fin as i32 {
154-
// Received FIN, send FIN_ACK back
155-
self.tx.send(Event::RecvClosed).await?;
156-
// Send FIN_ACK to acknowledge
158+
// Received FIN from remote, close our read half
159+
self.inbound_tx.send(Event::RecvClosed).await?;
160+
// Send FIN_ACK back to remote
157161
return self
158-
.tx
162+
.outbound_tx
159163
.send(Event::Message {
160164
payload: vec![],
161165
flag: Some(Flag::FinAck as i32),
@@ -190,7 +194,7 @@ impl SubstreamHandle {
190194
if let Some(payload) = message.payload {
191195
if !payload.is_empty() {
192196
return self
193-
.tx
197+
.inbound_tx
194198
.send(Event::Message {
195199
payload,
196200
flag: None,
@@ -509,7 +513,7 @@ mod tests {
509513
async fn read_small_frame() {
510514
let (mut substream, handle) = Substream::new();
511515
handle
512-
.tx
516+
.inbound_tx
513517
.send(Event::Message {
514518
payload: vec![1u8; 256],
515519
flag: None,
@@ -544,7 +548,7 @@ mod tests {
544548
first.extend_from_slice(&vec![2u8; 256]);
545549

546550
handle
547-
.tx
551+
.inbound_tx
548552
.send(Event::Message {
549553
payload: first,
550554
flag: None,
@@ -587,15 +591,15 @@ mod tests {
587591
first.extend_from_slice(&vec![2u8; 256]);
588592

589593
handle
590-
.tx
594+
.inbound_tx
591595
.send(Event::Message {
592596
payload: first,
593597
flag: None,
594598
})
595599
.await
596600
.unwrap();
597601
handle
598-
.tx
602+
.inbound_tx
599603
.send(Event::Message {
600604
payload: vec![4u8; 2048],
601605
flag: None,
@@ -740,7 +744,19 @@ mod tests {
740744

741745
#[tokio::test]
742746
async fn fin_ack_response_on_receiving_fin() {
743-
let (_substream, handle) = Substream::new();
747+
let (mut substream, mut handle) = Substream::new();
748+
749+
// Spawn task to consume inbound events sent to the substream
750+
let consumer_task = tokio::spawn(async move {
751+
// Substream should receive RecvClosed
752+
let mut buf = vec![0u8; 1024];
753+
match substream.read(&mut buf).await {
754+
Err(e) if e.kind() == std::io::ErrorKind::BrokenPipe => {
755+
// Expected - read half closed
756+
}
757+
other => panic!("Unexpected result: {:?}", other),
758+
}
759+
});
744760

745761
// Simulate receiving FIN from remote
746762
handle
@@ -751,8 +767,17 @@ mod tests {
751767
.await
752768
.unwrap();
753769

754-
// Should have sent FIN_ACK back (this would be captured by the connection layer)
755-
// In real scenario, the connection would read from handle.rx
770+
// Wait for consumer task to complete
771+
consumer_task.await.unwrap();
772+
773+
// Verify FIN_ACK was sent outbound to network
774+
assert_eq!(
775+
handle.next().await,
776+
Some(Event::Message {
777+
payload: vec![],
778+
flag: Some(Flag::FinAck as i32)
779+
})
780+
);
756781
}
757782

758783
#[tokio::test]
@@ -910,7 +935,7 @@ mod tests {
910935

911936
#[tokio::test]
912937
async fn flag_are_mutually_exclusive() {
913-
let (mut substream, handle) = Substream::new();
938+
let (_substream, handle) = Substream::new();
914939

915940
// Test that STOP_SENDING (1) is handled correctly
916941
handle
@@ -1085,7 +1110,6 @@ mod tests {
10851110

10861111
#[tokio::test]
10871112
async fn closing_state_blocks_writes() {
1088-
use std::{pin::Pin, task::Poll};
10891113
use tokio::io::AsyncWriteExt;
10901114

10911115
let (mut substream, handle) = Substream::new();

0 commit comments

Comments
 (0)