Skip to content

Commit 2d78d7e

Browse files
committed
webrtc: Address review feedback for FIN/FIN_ACK handshake
1 parent 057adbd commit 2d78d7e

File tree

1 file changed

+114
-27
lines changed

1 file changed

+114
-27
lines changed

src/transport/webrtc/substream.rs

Lines changed: 114 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -162,10 +162,25 @@ pub struct SubstreamHandle {
162162
impl SubstreamHandle {
163163
/// Handle message received from a remote peer.
164164
///
165-
/// If the message contains a flag, handle it first and appropriately close the correct
166-
/// side of the substream. If the message contained any payload, send it to the protocol for
167-
/// further processing.
165+
/// Process an incoming WebRTC message, handling any payload and flags.
166+
///
167+
/// Payload is processed first (if present), then flags are handled. This ensures that
168+
/// a FIN message containing final data will deliver that data before signaling closure.
168169
pub async fn on_message(&self, message: WebRtcMessage) -> crate::Result<()> {
170+
// Process payload first, before handling flags.
171+
// This ensures that if a FIN message contains data, we deliver it before closing.
172+
if let Some(payload) = message.payload {
173+
if !payload.is_empty() {
174+
self.inbound_tx
175+
.send(Event::Message {
176+
payload,
177+
flag: None,
178+
})
179+
.await?;
180+
}
181+
}
182+
183+
// Now handle flags
169184
if let Some(flag) = message.flag {
170185
match flag {
171186
Flag::Fin => {
@@ -191,6 +206,12 @@ impl SubstreamHandle {
191206
*state = State::FinAcked;
192207
// Wake up any task waiting on shutdown
193208
self.shutdown_waker.wake();
209+
} else {
210+
tracing::warn!(
211+
target: "litep2p::webrtc::substream",
212+
?state,
213+
"received FIN_ACK in unexpected state, ignoring"
214+
);
194215
}
195216
return Ok(());
196217
}
@@ -214,19 +235,6 @@ impl SubstreamHandle {
214235
}
215236
}
216237

217-
if let Some(payload) = message.payload {
218-
if !payload.is_empty() {
219-
return self
220-
.inbound_tx
221-
.send(Event::Message {
222-
payload,
223-
flag: None,
224-
})
225-
.await
226-
.map_err(From::from);
227-
}
228-
}
229-
230238
Ok(())
231239
}
232240
}
@@ -395,7 +403,10 @@ impl tokio::io::AsyncWrite for Substream {
395403
}
396404

397405
State::Closing => {
398-
// Already in closing state, continue with shutdown process
406+
// Already in closing state, continue with shutdown process.
407+
// Note: Concurrent calls to poll_shutdown violate AsyncWrite's contract
408+
// (requires &mut self). If this somehow happens, duplicate FIN messages
409+
// could be sent, but the protocol handles this gracefully.
399410
}
400411

401412
State::SendClosed => {
@@ -420,17 +431,16 @@ impl tokio::io::AsyncWrite for Substream {
420431
flag: Some(Flag::Fin),
421432
}) {
422433
Ok(()) => {
423-
// Register waker BEFORE transitioning state to avoid race condition:
424-
// If FIN_ACK arrives between state transition and waker registration,
425-
// on_message would call wake() before the waker is registered, causing
426-
// a missed wakeup.
427-
self.shutdown_waker.register(cx.waker());
428-
429-
// Transition to FinSent after successfully sending FIN
434+
// Transition to FinSent FIRST so that on_message can recognize FIN_ACK.
435+
// If we registered the waker first, a FIN_ACK arriving before state transition
436+
// would be ignored (on_message checks for FinSent state).
430437
*self.state.lock() = State::FinSent;
431438

432-
// Re-check state in case FIN_ACK arrived between state transition and now
433-
// (on_message may have already transitioned us to FinAcked)
439+
// Now register waker so we'll be notified when FIN_ACK arrives
440+
self.shutdown_waker.register(cx.waker());
441+
442+
// Re-check state in case FIN_ACK arrived between state transition and
443+
// waker registration (on_message may have already transitioned us to FinAcked)
434444
if matches!(*self.state.lock(), State::FinAcked) {
435445
return Poll::Ready(Ok(()));
436446
}
@@ -794,7 +804,7 @@ mod tests {
794804
let (mut substream, mut handle) = Substream::new();
795805

796806
// Spawn shutdown since it waits for FIN_ACK
797-
let _shutdown_task = tokio::spawn(async move {
807+
let shutdown_task = tokio::spawn(async move {
798808
substream.shutdown().await.unwrap();
799809
});
800810

@@ -809,6 +819,18 @@ mod tests {
809819

810820
// Verify state is FinSent
811821
assert!(matches!(*handle.state.lock(), State::FinSent));
822+
823+
// Send FIN_ACK to complete shutdown cleanly (avoids waiting for timeout)
824+
handle
825+
.on_message(WebRtcMessage {
826+
payload: None,
827+
flag: Some(Flag::FinAck),
828+
})
829+
.await
830+
.unwrap();
831+
832+
// Wait for shutdown to complete
833+
shutdown_task.await.unwrap();
812834
}
813835

814836
#[tokio::test]
@@ -1373,4 +1395,69 @@ mod tests {
13731395
"SubstreamHandle should signal closure after server receives FIN and drops Substream"
13741396
);
13751397
}
1398+
1399+
#[tokio::test]
1400+
async fn simultaneous_close() {
1401+
// Test simultaneous close where both sides send FIN at the same time.
1402+
// This verifies that:
1403+
// 1. Both sides can be in FinSent state simultaneously
1404+
// 2. Both sides correctly respond to FIN with FIN_ACK even when in FinSent state
1405+
// 3. Both sides eventually transition to FinAcked
1406+
1407+
let (mut substream, mut handle) = Substream::new();
1408+
1409+
// Local side initiates shutdown (sends FIN, transitions to FinSent)
1410+
let shutdown_task = tokio::spawn(async move {
1411+
substream.shutdown().await.unwrap();
1412+
});
1413+
1414+
// Wait for local FIN to be sent
1415+
assert_eq!(
1416+
handle.next().await,
1417+
Some(Event::Message {
1418+
payload: vec![],
1419+
flag: Some(Flag::Fin)
1420+
})
1421+
);
1422+
1423+
// Verify local is in FinSent state
1424+
assert!(matches!(*handle.state.lock(), State::FinSent));
1425+
1426+
// Now simulate remote also sending FIN (simultaneous close)
1427+
// This should trigger FIN_ACK response even though we're in FinSent state
1428+
handle
1429+
.on_message(WebRtcMessage {
1430+
payload: None,
1431+
flag: Some(Flag::Fin),
1432+
})
1433+
.await
1434+
.unwrap();
1435+
1436+
// Local should send FIN_ACK in response to remote's FIN
1437+
assert_eq!(
1438+
handle.next().await,
1439+
Some(Event::Message {
1440+
payload: vec![],
1441+
flag: Some(Flag::FinAck)
1442+
})
1443+
);
1444+
1445+
// Local should still be in FinSent (waiting for FIN_ACK from remote)
1446+
assert!(matches!(*handle.state.lock(), State::FinSent));
1447+
1448+
// Now remote sends FIN_ACK (completing their side of the handshake)
1449+
handle
1450+
.on_message(WebRtcMessage {
1451+
payload: None,
1452+
flag: Some(Flag::FinAck),
1453+
})
1454+
.await
1455+
.unwrap();
1456+
1457+
// Local should now transition to FinAcked
1458+
assert!(matches!(*handle.state.lock(), State::FinAcked));
1459+
1460+
// Shutdown should complete successfully
1461+
shutdown_task.await.unwrap();
1462+
}
13761463
}

0 commit comments

Comments
 (0)