Skip to content

Commit 8578487

Browse files
committed
webrtc: Fix data channel closure by detecting substream drop regardless of state
1 parent 4695143 commit 8578487

File tree

1 file changed

+112
-1
lines changed

1 file changed

+112
-1
lines changed

src/transport/webrtc/substream.rs

Lines changed: 112 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,9 @@ impl SubstreamHandle {
157157
if flag == Flag::Fin as i32 {
158158
// Received FIN from remote, close our read half
159159
self.inbound_tx.send(Event::RecvClosed).await?;
160+
160161
// Send FIN_ACK back to remote
162+
// Note: We stay in current state to allow shutdown() to send our own FIN if needed
161163
return self
162164
.outbound_tx
163165
.send(Event::Message {
@@ -212,7 +214,27 @@ impl Stream for SubstreamHandle {
212214
type Item = Event;
213215

214216
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
215-
self.rx.poll_recv(cx)
217+
// First, try to drain any pending outbound messages
218+
match self.rx.poll_recv(cx) {
219+
Poll::Ready(Some(event)) => return Poll::Ready(Some(event)),
220+
Poll::Ready(None) => {
221+
// Outbound channel closed (all senders dropped)
222+
return Poll::Ready(None);
223+
}
224+
Poll::Pending => {
225+
// No messages available, check if we should signal closure
226+
}
227+
}
228+
229+
// Check if Substream has been dropped (inbound channel closed)
230+
// When Substream is dropped, there will be no more outbound messages
231+
// Since we've already tried to recv above and got Pending, we know the queue is empty
232+
// Therefore, it's safe to signal closure
233+
if self.inbound_tx.is_closed() {
234+
return Poll::Ready(None);
235+
}
236+
237+
Poll::Pending
216238
}
217239
}
218240

@@ -1122,4 +1144,93 @@ mod tests {
11221144
assert!(result.is_err());
11231145
assert_eq!(result.unwrap_err().kind(), std::io::ErrorKind::BrokenPipe);
11241146
}
1147+
1148+
#[tokio::test]
1149+
async fn handle_signals_closure_after_substream_dropped() {
1150+
use futures::StreamExt;
1151+
1152+
let (mut substream, mut handle) = Substream::new();
1153+
1154+
// Complete shutdown handshake (client-initiated)
1155+
let shutdown_task = tokio::spawn(async move {
1156+
substream.shutdown().await.unwrap();
1157+
// Substream will be dropped here
1158+
});
1159+
1160+
// Receive FIN
1161+
assert_eq!(
1162+
handle.next().await,
1163+
Some(Event::Message {
1164+
payload: vec![],
1165+
flag: Some(Flag::Fin as i32)
1166+
})
1167+
);
1168+
1169+
// Send FIN_ACK
1170+
handle
1171+
.on_message(WebRtcMessage {
1172+
payload: None,
1173+
flag: Some(Flag::FinAck as i32),
1174+
})
1175+
.await
1176+
.unwrap();
1177+
1178+
// Wait for shutdown to complete and Substream to drop
1179+
shutdown_task.await.unwrap();
1180+
1181+
// Verify handle signals closure (returns None)
1182+
assert_eq!(
1183+
handle.next().await,
1184+
None,
1185+
"SubstreamHandle should signal closure after Substream is dropped"
1186+
);
1187+
}
1188+
1189+
#[tokio::test]
1190+
async fn server_side_closure_after_receiving_fin() {
1191+
use futures::StreamExt;
1192+
1193+
let (mut substream, mut handle) = Substream::new();
1194+
1195+
// Spawn task to consume from substream (server side)
1196+
let server_task = tokio::spawn(async move {
1197+
let mut buf = vec![0u8; 1024];
1198+
// This should fail because we receive RecvClosed
1199+
match substream.read(&mut buf).await {
1200+
Err(e) if e.kind() == std::io::ErrorKind::BrokenPipe => {
1201+
// Expected - read half closed by FIN
1202+
}
1203+
other => panic!("Unexpected result: {:?}", other),
1204+
}
1205+
// Substream dropped here (server closes after receiving FIN)
1206+
});
1207+
1208+
// Remote (client) sends FIN
1209+
handle
1210+
.on_message(WebRtcMessage {
1211+
payload: None,
1212+
flag: Some(Flag::Fin as i32),
1213+
})
1214+
.await
1215+
.unwrap();
1216+
1217+
// Verify FIN_ACK was sent back
1218+
assert_eq!(
1219+
handle.next().await,
1220+
Some(Event::Message {
1221+
payload: vec![],
1222+
flag: Some(Flag::FinAck as i32)
1223+
})
1224+
);
1225+
1226+
// Wait for server to close substream
1227+
server_task.await.unwrap();
1228+
1229+
// Verify handle signals closure (returns None) - this is the key fix!
1230+
assert_eq!(
1231+
handle.next().await,
1232+
None,
1233+
"SubstreamHandle should signal closure after server receives FIN and drops Substream"
1234+
);
1235+
}
11251236
}

0 commit comments

Comments
 (0)