diff --git a/Cargo.lock b/Cargo.lock index ed9f5816a..0bb053e21 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -997,15 +997,6 @@ dependencies = [ "rustc_version", ] -[[package]] -name = "fxhash" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c" -dependencies = [ - "byteorder", -] - [[package]] name = "generic-array" version = "0.14.9" @@ -2771,7 +2762,7 @@ dependencies = [ "pin-project-lite 0.2.16", "quinn-proto", "quinn-udp", - "rustc-hash", + "rustc-hash 1.1.0", "rustls 0.20.9", "thiserror 1.0.69", "tokio", @@ -2788,7 +2779,7 @@ dependencies = [ "bytes", "rand 0.8.5", "ring 0.16.20", - "rustc-hash", + "rustc-hash 1.1.0", "rustls 0.20.9", "slab", "thiserror 1.0.69", @@ -3024,6 +3015,12 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" +[[package]] +name = "rustc-hash" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d" + [[package]] name = "rustc_version" version = "0.4.1" @@ -3162,17 +3159,17 @@ dependencies = [ [[package]] name = "sctp-proto" -version = "0.3.0" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d4dea4fe3384a24652f065296ac333c810dfd0c5b39b98a2214762c16aaadc3c" +checksum = "423139d8cca3021b9d800f084a711ba2d23b508ae71b33dba167f11ca33e54c7" dependencies = [ "bytes", "crc", - "fxhash", "log", - "rand 0.8.5", + "rand 0.9.2", + "rustc-hash 2.1.1", "slab", - "thiserror 1.0.69", + "thiserror 2.0.17", ] [[package]] @@ -3455,9 +3452,9 @@ checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" [[package]] name = "str0m" -version = "0.9.0" +version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7f9fdffeb677e1d5d2cf84993f865143680b8e5eece3396fa488d43b34c1245" +checksum = "26890ff5b60e33eb8bedcf44792fc459c8f348ecbf2658edb19477571e547ac2" dependencies = [ "combine", "crc", @@ -3470,7 +3467,6 @@ dependencies = [ "sctp-proto", "serde", "sha1", - "thiserror 1.0.69", "tracing", ] diff --git a/Cargo.toml b/Cargo.toml index 893e94558..eb1ed99c4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -65,7 +65,7 @@ rcgen = { version = "0.14.5", optional = true } # End of Quic related dependencies. # WebRTC related dependencies. WebRTC is an experimental feature flag. The dependencies must be updated. -str0m = { version = "0.9.0", optional = true } +str0m = { version = "0.11.1", optional = true } # End of WebRTC related dependencies. # Fuzzing related dependencies. diff --git a/src/multistream_select/protocol.rs b/src/multistream_select/protocol.rs index b24b31da2..8ef11a0cb 100644 --- a/src/multistream_select/protocol.rs +++ b/src/multistream_select/protocol.rs @@ -220,6 +220,10 @@ impl Message { // Skip ahead to the next protocol. remaining = &tail[len..]; + if remaining.is_empty() { + // During negotiation the remote may not append a trailing newline. + break; + } } Ok(Message::Protocols(protocols)) @@ -230,7 +234,7 @@ impl Message { /// /// # Note /// -/// This is implementation is not compliant with the multistream-select protocol spec. +/// This implementation may not be compliant with the multistream-select protocol spec. /// The only purpose of this was to get the `multistream-select` protocol working with smoldot. pub fn webrtc_encode_multistream_message( messages: impl IntoIterator, @@ -249,9 +253,6 @@ pub fn webrtc_encode_multistream_message( header.append(&mut proto_bytes); } - // For the `Message::Protocols` to be interpreted correctly, it must be followed by a newline. - header.push(b'\n'); - Ok(BytesMut::from(&header[..])) } @@ -542,4 +543,21 @@ mod tests { ProtocolError::InvalidMessage ); } + + #[test] + fn test_decode_multiple_protocols_no_trailing_newline() { + let raw: [u8; 38] = [ + 19, 47, 109, 117, 108, 116, 105, 115, 116, 114, 101, 97, 109, 47, 49, 46, 48, 46, 48, + 10, 17, 47, 105, 112, 102, 115, 47, 112, 105, 110, 103, 47, 49, 46, 48, 46, 48, 10, + ]; + let bytes = Bytes::copy_from_slice(&raw); + + assert_eq!( + Message::decode(bytes).unwrap(), + Message::Protocols(vec![ + Protocol::try_from(Bytes::from_static(b"/multistream/1.0.0")).unwrap(), + Protocol::try_from(Bytes::from_static(b"/ipfs/ping/1.0.0")).unwrap(), + ]) + ); + } } diff --git a/src/transport/webrtc/connection.rs b/src/transport/webrtc/connection.rs index 6c1e57462..0fa7f098c 100644 --- a/src/transport/webrtc/connection.rs +++ b/src/transport/webrtc/connection.rs @@ -631,6 +631,8 @@ impl WebRtcConnection { protocol: protocol.to_string(), }); + self.rtc.channel(channel_id).unwrap().set_buffered_amount_low_threshold(1024); + tracing::trace!( target: LOG_TARGET, peer = ?self.peer, @@ -742,6 +744,20 @@ impl WebRtcConnection { continue; } + Event::ChannelBufferedAmountLow(channel_id) => { + if let Some(ChannelState::Closing) = self.channels.get(&channel_id) { + tracing::trace!( + target: LOG_TARGET, + peer = ?self.peer, + ?channel_id, + "buffer drained, closing channel", + ); + self.rtc.direct_api().close_data_channel(channel_id); + self.handles.remove(&channel_id); + } + + continue; + } event => { tracing::debug!( target: LOG_TARGET, @@ -794,10 +810,7 @@ impl WebRtcConnection { ?channel_id, "channel closed", ); - - self.rtc.direct_api().close_data_channel(channel_id); self.channels.insert(channel_id, ChannelState::Closing); - self.handles.remove(&channel_id); } Some((channel_id, Some(SubstreamEvent::Message(data)))) => { if let Err(error) = self.on_outbound_data(channel_id, data) { diff --git a/src/transport/webrtc/opening.rs b/src/transport/webrtc/opening.rs index 90fcfcc8c..ba6e2af7a 100644 --- a/src/transport/webrtc/opening.rs +++ b/src/transport/webrtc/opening.rs @@ -176,14 +176,14 @@ impl OpeningWebRtcConnection { .rtc .direct_api() .remote_dtls_fingerprint() - .clone() - .expect("fingerprint to exist"); + .expect("fingerprint to exist") + .clone(); Self::fingerprint_to_bytes(&fingerprint) } /// Get local fingerprint as bytes. fn local_fingerprint(&mut self) -> Vec { - Self::fingerprint_to_bytes(&self.rtc.direct_api().local_dtls_fingerprint()) + Self::fingerprint_to_bytes(self.rtc.direct_api().local_dtls_fingerprint()) } /// Convert `Fingerprint` to bytes. @@ -268,8 +268,8 @@ impl OpeningWebRtcConnection { .rtc .direct_api() .remote_dtls_fingerprint() - .clone() .expect("fingerprint to exist") + .clone() .bytes; const MULTIHASH_SHA256_CODE: u64 = 0x12;