Skip to content

Commit f250fdb

Browse files
committed
webrtc: Fix data channel not closing after FIN/FIN_ACK by detecting substream drop
1 parent 4695143 commit f250fdb

File tree

1 file changed

+52
-0
lines changed

1 file changed

+52
-0
lines changed

src/transport/webrtc/substream.rs

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,17 @@ impl Stream for SubstreamHandle {
212212
type Item = Event;
213213

214214
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
215+
// Check if Substream has been dropped (inbound channel closed)
216+
// If so, and we're in a terminal state, signal stream closure
217+
if self.inbound_tx.is_closed() {
218+
let state = *self.state.lock();
219+
// Only close if we're in a terminal state (FinAcked or SendClosed)
220+
// This ensures proper cleanup after FIN/FIN_ACK handshake completes
221+
if matches!(state, State::FinAcked | State::SendClosed) {
222+
return Poll::Ready(None);
223+
}
224+
}
225+
215226
self.rx.poll_recv(cx)
216227
}
217228
}
@@ -1122,4 +1133,45 @@ mod tests {
11221133
assert!(result.is_err());
11231134
assert_eq!(result.unwrap_err().kind(), std::io::ErrorKind::BrokenPipe);
11241135
}
1136+
1137+
#[tokio::test]
1138+
async fn handle_signals_closure_after_substream_dropped() {
1139+
use futures::StreamExt;
1140+
1141+
let (mut substream, mut handle) = Substream::new();
1142+
1143+
// Complete shutdown handshake
1144+
let shutdown_task = tokio::spawn(async move {
1145+
substream.shutdown().await.unwrap();
1146+
// Substream will be dropped here
1147+
});
1148+
1149+
// Receive FIN
1150+
assert_eq!(
1151+
handle.next().await,
1152+
Some(Event::Message {
1153+
payload: vec![],
1154+
flag: Some(Flag::Fin as i32)
1155+
})
1156+
);
1157+
1158+
// Send FIN_ACK
1159+
handle
1160+
.on_message(WebRtcMessage {
1161+
payload: None,
1162+
flag: Some(Flag::FinAck as i32),
1163+
})
1164+
.await
1165+
.unwrap();
1166+
1167+
// Wait for shutdown to complete and Substream to drop
1168+
shutdown_task.await.unwrap();
1169+
1170+
// Verify handle signals closure (returns None)
1171+
assert_eq!(
1172+
handle.next().await,
1173+
None,
1174+
"SubstreamHandle should signal closure after Substream is dropped"
1175+
);
1176+
}
11251177
}

0 commit comments

Comments
 (0)