Skip to content

Commit 2fc4a2c

Browse files
committed
webrtc: Add timeout for FIN_ACK to prevent indefinite waiting
1 parent 0027ec7 commit 2fc4a2c

File tree

1 file changed

+70
-2
lines changed

1 file changed

+70
-2
lines changed

src/transport/webrtc/substream.rs

Lines changed: 70 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,21 @@ use std::{
3333
pin::Pin,
3434
sync::Arc,
3535
task::{Context, Poll, Waker},
36+
time::{Duration, Instant},
3637
};
3738

3839
/// Maximum frame size.
3940
const MAX_FRAME_SIZE: usize = 16384;
4041

42+
/// Timeout for waiting on FIN_ACK after sending FIN.
43+
/// Matches go-libp2p's 5 second stream close timeout.
44+
#[cfg(not(test))]
45+
const FIN_ACK_TIMEOUT: Duration = Duration::from_secs(5);
46+
47+
/// Shorter timeout for tests.
48+
#[cfg(test)]
49+
const FIN_ACK_TIMEOUT: Duration = Duration::from_secs(2);
50+
4151
/// Substream event.
4252
#[derive(Debug, PartialEq, Eq)]
4353
pub enum Event {
@@ -87,6 +97,9 @@ pub struct Substream {
8797

8898
/// Waker to notify when shutdown completes (FIN_ACK received).
8999
shutdown_waker: Arc<Mutex<Option<Waker>>>,
100+
101+
/// Timestamp when FIN was sent, used for timeout.
102+
fin_sent_at: Option<Instant>,
90103
}
91104

92105
impl Substream {
@@ -111,6 +124,7 @@ impl Substream {
111124
rx: inbound_rx,
112125
read_buffer: BytesMut::new(),
113126
shutdown_waker,
127+
fin_sent_at: None,
114128
},
115129
handle,
116130
)
@@ -292,8 +306,21 @@ impl tokio::io::AsyncWrite for Substream {
292306
// Already received FIN_ACK, shutdown complete
293307
State::FinAcked => return Poll::Ready(Ok(())),
294308

295-
// Sent FIN, waiting for FIN_ACK - register waker and return Pending
309+
// Sent FIN, waiting for FIN_ACK - check timeout and return Pending
296310
State::FinSent => {
311+
// Check if we've exceeded the timeout waiting for FIN_ACK
312+
if let Some(sent_at) = self.fin_sent_at {
313+
if sent_at.elapsed() >= FIN_ACK_TIMEOUT {
314+
// Timeout exceeded, force complete shutdown
315+
tracing::debug!(
316+
target: "litep2p::webrtc::substream",
317+
"FIN_ACK timeout exceeded, forcing shutdown completion"
318+
);
319+
*self.state.lock() = State::FinAcked;
320+
return Poll::Ready(Ok(()));
321+
}
322+
}
323+
297324
// Store the waker so it can be triggered when we receive FIN_ACK
298325
*self.shutdown_waker.lock() = Some(cx.waker().clone());
299326
return Poll::Pending;
@@ -331,9 +358,18 @@ impl tokio::io::AsyncWrite for Substream {
331358
}) {
332359
Ok(()) => {
333360
// Transition to FinSent after successfully sending FIN
334-
// Now we need to wait for FIN_ACK, so store waker and return Pending
361+
// Record timestamp for timeout tracking
335362
*self.state.lock() = State::FinSent;
363+
self.fin_sent_at = Some(Instant::now());
336364
*self.shutdown_waker.lock() = Some(cx.waker().clone());
365+
366+
// Spawn timeout task to wake us after FIN_ACK_TIMEOUT
367+
let waker = cx.waker().clone();
368+
tokio::spawn(async move {
369+
tokio::time::sleep(FIN_ACK_TIMEOUT).await;
370+
waker.wake();
371+
});
372+
337373
Poll::Pending
338374
}
339375
Err(_) => Poll::Ready(Err(std::io::ErrorKind::BrokenPipe.into())),
@@ -975,6 +1011,38 @@ mod tests {
9751011
assert!(matches!(*handle.state.lock(), State::FinAcked));
9761012
}
9771013

1014+
#[tokio::test]
1015+
async fn shutdown_timeout_without_fin_ack() {
1016+
use tokio::time::{timeout, Duration};
1017+
1018+
let (mut substream, mut handle) = Substream::new();
1019+
1020+
// Spawn shutdown in background
1021+
let shutdown_task = tokio::spawn(async move {
1022+
substream.shutdown().await.unwrap();
1023+
});
1024+
1025+
// Wait for FIN to be sent
1026+
assert_eq!(handle.next().await, Some(Event::Message {
1027+
payload: vec![],
1028+
flag: Some(Flag::Fin as i32)
1029+
}));
1030+
1031+
// Verify we're in FinSent state
1032+
assert!(matches!(*handle.state.lock(), State::FinSent));
1033+
1034+
// DON'T send FIN_ACK - let it timeout
1035+
// The shutdown should complete after FIN_ACK_TIMEOUT (2 seconds in tests)
1036+
// Add a bit of buffer to the timeout
1037+
let result = timeout(Duration::from_secs(4), shutdown_task).await;
1038+
1039+
assert!(result.is_ok(), "Shutdown should complete after timeout");
1040+
assert!(result.unwrap().is_ok(), "Shutdown should succeed after timeout");
1041+
1042+
// Should have transitioned to FinAcked after timeout
1043+
assert!(matches!(*handle.state.lock(), State::FinAcked));
1044+
}
1045+
9781046
#[tokio::test]
9791047
async fn closing_state_blocks_writes() {
9801048
use tokio::io::AsyncWriteExt;

0 commit comments

Comments
 (0)