Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
196d625
update strom dependency which fixes disable remote fingerprint verifi…
timwu20 Oct 7, 2025
b875bbd
change str0m to 0.11.1
timwu20 Oct 20, 2025
925caa9
fix lint
timwu20 Oct 21, 2025
4200246
set and listen on ChannelBufferedAmountLow after stream closure
timwu20 Oct 20, 2025
85fae21
fix: multistream-select negotiation for outbound webrtc substreams
haikoschol Oct 27, 2025
2ec539b
close data channel immediately when substream is closed
haikoschol Nov 12, 2025
4e4a2d3
remove setting of buffered amount low threshold in str0m
haikoschol Nov 12, 2025
7947bf6
remove special case of receiving None instead of a SubstreamEvent
haikoschol Nov 12, 2025
3765d52
move handling of trailing newline to webrtc_listener_negotiate()
haikoschol Nov 12, 2025
fdedc4f
cover all negotiation responses included in tests
haikoschol Nov 13, 2025
1cd945b
Merge branch 'master' into haiko-webrtc-outbound-multistream-nego-fix
haikoschol Nov 13, 2025
6dcb15d
address review feedback
haikoschol Nov 13, 2025
9621e3c
extract loop to drain_trailing_protocols()
haikoschol Nov 13, 2025
daa8f71
fmt
haikoschol Nov 19, 2025
afd44c8
Merge branch 'master' into haiko-webrtc-outbound-multistream-nego-fix
haikoschol Nov 19, 2025
99f6547
consider truncated multistream messages invalid data
haikoschol Nov 19, 2025
c9a0140
Merge branch 'master' into haiko-webrtc-outbound-multistream-nego-fix
haikoschol Nov 25, 2025
90f0c86
Apply suggestions from code review
haikoschol Nov 26, 2025
c0a44c0
avoid Message::Protocols in webrtc_listener_negotiate()
haikoschol Nov 26, 2025
c27930a
remove handling of Event::ChannelBufferedAmountLow
haikoschol Dec 2, 2025
4ed539e
Merge branch 'master' into haiko-webrtc-outbound-multistream-nego-fix
haikoschol Dec 2, 2025
12801fd
Merge branch 'master' into haiko-webrtc-outbound-multistream-nego-fix
haikoschol Dec 12, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
/target
.idea

34 changes: 15 additions & 19 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ rcgen = { version = "0.14.5", optional = true }
# End of Quic related dependencies.

# WebRTC related dependencies. WebRTC is an experimental feature flag. The dependencies must be updated.
str0m = { version = "0.9.0", optional = true }
str0m = { version = "0.11.1", optional = true }
# End of WebRTC related dependencies.

# Fuzzing related dependencies.
Expand Down
35 changes: 26 additions & 9 deletions src/multistream_select/dialer_select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,12 +357,25 @@ impl WebRtcDialerState {
&mut self,
payload: Vec<u8>,
) -> Result<HandshakeResult, crate::error::NegotiationError> {
let Message::Protocols(protocols) =
Message::decode(payload.into()).map_err(|_| ParseError::InvalidData)?
else {
return Err(crate::error::NegotiationError::MultistreamSelectError(
NegotiationError::Failed,
));

let protocols = match Message::decode(payload.into()) {
Ok(Message::Header(HeaderLine::V1)) => {
self.state = HandshakeState::WaitingProtocol;
return Ok(HandshakeResult::NotReady);
}
Ok(Message::Protocol(protocol)) => vec![protocol],
Ok(Message::Protocols(protocols)) => protocols,
Ok(Message::NotAvailable) => {
return match &self.state {
HandshakeState::WaitingProtocol =>
Err(error::NegotiationError::MultistreamSelectError(
NegotiationError::Failed
)),
_ => Err(error::NegotiationError::StateMismatch),
}
}
Ok(Message::ListProtocols) => return Err(error::NegotiationError::StateMismatch),
Err(_) => return Err(error::NegotiationError::ParseError(ParseError::InvalidData)),
};

let mut protocol_iter = protocols.into_iter();
Expand Down Expand Up @@ -410,7 +423,9 @@ mod tests {
use super::*;
use crate::multistream_select::listener_select_proto;
use std::time::Duration;
use bytes::BufMut;
use tokio::net::{TcpListener, TcpStream};
use crate::multistream_select::protocol::MSG_MULTISTREAM_1_0;

#[tokio::test]
async fn select_proto_basic() {
Expand Down Expand Up @@ -805,17 +820,19 @@ mod tests {
}

#[test]
fn register_response_invalid_message() {
// send only header line
fn register_response_header_only() {
let mut bytes = BytesMut::with_capacity(32);
bytes.put_u8(MSG_MULTISTREAM_1_0.len() as u8);

let message = Message::Header(HeaderLine::V1);
message.encode(&mut bytes).map_err(|_| Error::InvalidData).unwrap();

let (mut dialer_state, _message) =
WebRtcDialerState::propose(ProtocolName::from("/13371338/proto/1"), vec![]).unwrap();

match dialer_state.register_response(bytes.freeze().to_vec()) {
Err(error::NegotiationError::MultistreamSelectError(NegotiationError::Failed)) => {}
Ok(HandshakeResult::NotReady) => {},
Err(err) => panic!("unexpected error: {:?}", err),
event => panic!("invalid event: {event:?}"),
}
}
Expand Down
26 changes: 22 additions & 4 deletions src/multistream_select/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,10 @@ impl Message {

// Skip ahead to the next protocol.
remaining = &tail[len..];
if remaining.is_empty() {
// During negotiation the remote may not append a trailing newline.
break;
}
}

Ok(Message::Protocols(protocols))
Expand All @@ -230,7 +234,7 @@ impl Message {
///
/// # Note
///
/// This is implementation is not compliant with the multistream-select protocol spec.
/// This implementation may not be compliant with the multistream-select protocol spec.
/// The only purpose of this was to get the `multistream-select` protocol working with smoldot.
pub fn webrtc_encode_multistream_message(
messages: impl IntoIterator<Item = Message>,
Expand All @@ -249,9 +253,6 @@ pub fn webrtc_encode_multistream_message(
header.append(&mut proto_bytes);
}

// For the `Message::Protocols` to be interpreted correctly, it must be followed by a newline.
header.push(b'\n');

Ok(BytesMut::from(&header[..]))
}

Expand Down Expand Up @@ -542,4 +543,21 @@ mod tests {
ProtocolError::InvalidMessage
);
}

#[test]
fn test_decode_multiple_protocols_no_trailing_newline() {
let raw: [u8; 38] = [
19, 47, 109, 117, 108, 116, 105, 115, 116, 114, 101, 97, 109, 47, 49, 46, 48, 46, 48,
10, 17, 47, 105, 112, 102, 115, 47, 112, 105, 110, 103, 47, 49, 46, 48, 46, 48, 10,
];
let bytes = Bytes::copy_from_slice(&raw);

assert_eq!(
Message::decode(bytes).unwrap(),
Message::Protocols(vec![
Protocol::try_from(Bytes::from_static(b"/multistream/1.0.0")).unwrap(),
Protocol::try_from(Bytes::from_static(b"/ipfs/ping/1.0.0")).unwrap(),
])
);
}
}
39 changes: 32 additions & 7 deletions src/transport/webrtc/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,21 +382,32 @@ impl WebRtcConnection {
target: LOG_TARGET,
peer = ?self.peer,
?channel_id,
data_len = ?data.len(),
"handle opening outbound substream",
);

let rtc_message = WebRtcMessage::decode(&data)
.map_err(|err| SubstreamError::NegotiationError(err.into()))?;
let message = rtc_message.payload.ok_or(SubstreamError::NegotiationError(
let payload = rtc_message.payload.ok_or(SubstreamError::NegotiationError(
ParseError::InvalidData.into(),
))?;

let HandshakeResult::Succeeded(protocol) = dialer_state.register_response(message)? else {
// All multistream-select messages are length-prefixed. Since this code path is not using
// multistream_select::protocol::MessageIO, we need to decode and remove the length here.
let remaining: &[u8] = &payload;
let (len, tail) = unsigned_varint::decode::usize(remaining).
map_err(|_| SubstreamError::NegotiationError(
ParseError::InvalidData.into(),
))?;

let message = tail[..len].to_vec();

let HandshakeResult::Succeeded(protocol) = dialer_state.register_response(message)? else {
tracing::trace!(
target: LOG_TARGET,
peer = ?self.peer,
?channel_id,
"multisteam-select handshake not ready",
"multistream-select handshake not ready",
);

self.channels.insert(
Expand Down Expand Up @@ -631,6 +642,8 @@ impl WebRtcConnection {
protocol: protocol.to_string(),
});

// self.rtc.channel(channel_id).unwrap().set_buffered_amount_low_threshold(1024);

tracing::trace!(
target: LOG_TARGET,
peer = ?self.peer,
Expand Down Expand Up @@ -742,6 +755,20 @@ impl WebRtcConnection {

continue;
}
Event::ChannelBufferedAmountLow(channel_id) => {
if let Some(ChannelState::Closing) = self.channels.get(&channel_id) {
tracing::trace!(
target: LOG_TARGET,
peer = ?self.peer,
?channel_id,
"buffer drained, closing channel",
);
self.rtc.direct_api().close_data_channel(channel_id);
self.handles.remove(&channel_id);
}

continue;
}
event => {
tracing::debug!(
target: LOG_TARGET,
Expand Down Expand Up @@ -787,17 +814,15 @@ impl WebRtcConnection {
},
event = self.handles.next() => match event {
None => unreachable!(),
Some((channel_id, None | Some(SubstreamEvent::Close))) => {
Some((_, None)) => {}
Some((channel_id, Some(SubstreamEvent::Close))) => {
tracing::trace!(
target: LOG_TARGET,
peer = ?self.peer,
?channel_id,
"channel closed",
);

self.rtc.direct_api().close_data_channel(channel_id);
self.channels.insert(channel_id, ChannelState::Closing);
self.handles.remove(&channel_id);
}
Some((channel_id, Some(SubstreamEvent::Message(data)))) => {
if let Err(error) = self.on_outbound_data(channel_id, data) {
Expand Down
8 changes: 4 additions & 4 deletions src/transport/webrtc/opening.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,14 +176,14 @@ impl OpeningWebRtcConnection {
.rtc
.direct_api()
.remote_dtls_fingerprint()
.clone()
.expect("fingerprint to exist");
.expect("fingerprint to exist")
.clone();
Self::fingerprint_to_bytes(&fingerprint)
}

/// Get local fingerprint as bytes.
fn local_fingerprint(&mut self) -> Vec<u8> {
Self::fingerprint_to_bytes(&self.rtc.direct_api().local_dtls_fingerprint())
Self::fingerprint_to_bytes(self.rtc.direct_api().local_dtls_fingerprint())
}

/// Convert `Fingerprint` to bytes.
Expand Down Expand Up @@ -268,8 +268,8 @@ impl OpeningWebRtcConnection {
.rtc
.direct_api()
.remote_dtls_fingerprint()
.clone()
.expect("fingerprint to exist")
.clone()
.bytes;

const MULTIHASH_SHA256_CODE: u64 = 0x12;
Expand Down