-
Notifications
You must be signed in to change notification settings - Fork 27
fix: multistream-select negotiation on outbound substream over webrtc #465
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 17 commits
196d625
b875bbd
925caa9
4200246
85fae21
2ec539b
4e4a2d3
7947bf6
3765d52
fdedc4f
1cd945b
6dcb15d
9621e3c
daa8f71
afd44c8
99f6547
c9a0140
90f0c86
c0a44c0
c27930a
4ed539e
12801fd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1 +1,3 @@ | ||
| /target | ||
| .idea | ||
|
|
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,11 +22,11 @@ | |
|
|
||
| use crate::{ | ||
| codec::unsigned_varint::UnsignedVarint, | ||
| error::{self, Error, ParseError}, | ||
| error::{self, Error, ParseError, SubstreamError}, | ||
| multistream_select::{ | ||
| protocol::{ | ||
| webrtc_encode_multistream_message, HeaderLine, Message, MessageIO, Protocol, | ||
| ProtocolError, | ||
| ProtocolError, PROTO_MULTISTREAM_1_0, | ||
| }, | ||
| Negotiated, NegotiationError, Version, | ||
| }, | ||
|
|
@@ -357,24 +357,57 @@ impl WebRtcDialerState { | |
| &mut self, | ||
| payload: Vec<u8>, | ||
| ) -> Result<HandshakeResult, crate::error::NegotiationError> { | ||
| let Message::Protocols(protocols) = | ||
| Message::decode(payload.into()).map_err(|_| ParseError::InvalidData)? | ||
| else { | ||
| return Err(crate::error::NegotiationError::MultistreamSelectError( | ||
| NegotiationError::Failed, | ||
| )); | ||
| // All multistream-select messages are length-prefixed. Since this code path is not using | ||
| // multistream_select::protocol::MessageIO, we need to decode and remove the length here. | ||
| let remaining: &[u8] = &payload; | ||
| let (len, tail) = unsigned_varint::decode::usize(remaining).map_err(|error| { | ||
| tracing::debug!( | ||
| target: LOG_TARGET, | ||
| ?error, | ||
| message = ?payload, | ||
| "Failed to decode length-prefix in multistream message"); | ||
| error::NegotiationError::ParseError(ParseError::InvalidData) | ||
| })?; | ||
|
|
||
| let payload = tail[..len].to_vec(); | ||
|
|
||
haikoschol marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| let message = Message::decode(payload.into()); | ||
haikoschol marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| tracing::trace!( | ||
| target: LOG_TARGET, | ||
| ?message, | ||
| "Decoded message while registering response", | ||
| ); | ||
|
|
||
| let mut protocols = match message { | ||
| Ok(Message::Header(HeaderLine::V1)) => { | ||
| vec![PROTO_MULTISTREAM_1_0] | ||
| } | ||
| Ok(Message::Protocol(protocol)) => vec![protocol], | ||
| Ok(Message::Protocols(protocols)) => protocols, | ||
| Ok(Message::NotAvailable) => | ||
| return match &self.state { | ||
| HandshakeState::WaitingProtocol => Err( | ||
| error::NegotiationError::MultistreamSelectError(NegotiationError::Failed), | ||
| ), | ||
| _ => Err(error::NegotiationError::StateMismatch), | ||
| }, | ||
| Ok(Message::ListProtocols) => return Err(error::NegotiationError::StateMismatch), | ||
| Err(_) => return Err(error::NegotiationError::ParseError(ParseError::InvalidData)), | ||
| }; | ||
|
|
||
| match drain_trailing_protocols(tail, len) { | ||
haikoschol marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| Ok(protos) => protocols.extend(protos), | ||
| Err(error) => return Err(error), | ||
| } | ||
|
|
||
| let mut protocol_iter = protocols.into_iter(); | ||
| loop { | ||
| match (&self.state, protocol_iter.next()) { | ||
| (HandshakeState::WaitingResponse, None) => | ||
| return Err(crate::error::NegotiationError::StateMismatch), | ||
| (HandshakeState::WaitingResponse, Some(protocol)) => { | ||
| let header = Protocol::try_from(&b"/multistream/1.0.0"[..]) | ||
| .expect("valid multitstream-select header"); | ||
|
|
||
| if protocol == header { | ||
| if protocol == PROTO_MULTISTREAM_1_0 { | ||
| self.state = HandshakeState::WaitingProtocol; | ||
| } else { | ||
| return Err(crate::error::NegotiationError::MultistreamSelectError( | ||
|
|
@@ -405,10 +438,66 @@ impl WebRtcDialerState { | |
| } | ||
| } | ||
|
|
||
| fn drain_trailing_protocols( | ||
| tail: &[u8], | ||
| len: usize, | ||
| ) -> Result<Vec<Protocol>, error::NegotiationError> { | ||
| let mut protocols = vec![]; | ||
| let mut remaining = &tail[len..]; | ||
haikoschol marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| loop { | ||
| if remaining.is_empty() { | ||
| break; | ||
| } | ||
|
|
||
| let (len, tail) = unsigned_varint::decode::usize(remaining).map_err(|error| { | ||
haikoschol marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| tracing::debug!( | ||
| target: LOG_TARGET, | ||
| ?error, | ||
| message = ?remaining, | ||
| "Failed to decode length-prefix in multistream message"); | ||
| error::NegotiationError::ParseError(ParseError::InvalidData) | ||
| })?; | ||
|
|
||
| if len > tail.len() { | ||
| tracing::debug!( | ||
| target: LOG_TARGET, | ||
| message = ?tail, | ||
| length_prefix = len, | ||
| actual_length = tail.len(), | ||
| "Truncated multistream message", | ||
| ); | ||
|
|
||
| return Err(error::NegotiationError::ParseError(ParseError::InvalidData)); | ||
| } | ||
|
|
||
| let payload = tail[..len].to_vec(); | ||
|
|
||
| match Message::decode(payload.into()) { | ||
haikoschol marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| Ok(Message::Protocol(protocol)) => protocols.push(protocol), | ||
| Err(error) => { | ||
| tracing::debug!( | ||
| target: LOG_TARGET, | ||
| ?error, | ||
| message = ?tail[..len], | ||
| "Failed to decode multistream message", | ||
| ); | ||
| return Err(error::NegotiationError::ParseError(ParseError::InvalidData)); | ||
| } | ||
| _ => return Err(error::NegotiationError::StateMismatch), | ||
| } | ||
|
|
||
| remaining = &tail[len..]; | ||
haikoschol marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| Ok(protocols) | ||
| } | ||
|
|
||
| #[cfg(test)] | ||
| mod tests { | ||
| use super::*; | ||
| use crate::multistream_select::listener_select_proto; | ||
| use crate::multistream_select::{listener_select_proto, protocol::MSG_MULTISTREAM_1_0}; | ||
| use bytes::BufMut; | ||
| use std::time::Duration; | ||
| use tokio::net::{TcpListener, TcpStream}; | ||
|
|
||
|
|
@@ -755,23 +844,18 @@ mod tests { | |
| fn propose() { | ||
| let (mut dialer_state, message) = | ||
| WebRtcDialerState::propose(ProtocolName::from("/13371338/proto/1"), vec![]).unwrap(); | ||
| let message = bytes::BytesMut::from(&message[..]).freeze(); | ||
|
|
||
| let Message::Protocols(protocols) = Message::decode(message).unwrap() else { | ||
| panic!("invalid message type"); | ||
| }; | ||
| let mut bytes = BytesMut::with_capacity(32); | ||
| bytes.put_u8(MSG_MULTISTREAM_1_0.len() as u8); | ||
| let _ = Message::Header(HeaderLine::V1).encode(&mut bytes).unwrap(); | ||
|
|
||
| assert_eq!(protocols.len(), 2); | ||
| assert_eq!( | ||
| protocols[0], | ||
| Protocol::try_from(&b"/multistream/1.0.0"[..]) | ||
| .expect("valid multitstream-select header") | ||
| ); | ||
| assert_eq!( | ||
| protocols[1], | ||
| Protocol::try_from(&b"/13371338/proto/1"[..]) | ||
| .expect("valid multitstream-select header") | ||
| ); | ||
| let proto = Protocol::try_from(&b"/13371338/proto/1"[..]).expect("valid protocol name"); | ||
| bytes.put_u8((proto.as_ref().len() + 1) as u8); // + 1 for \n | ||
| let _ = Message::Protocol(proto).encode(&mut bytes).unwrap(); | ||
|
|
||
| let expected_message = bytes.freeze().to_vec(); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. dq: This slightly changes the meaning of the messages. Before we were expecting: Now we are expecting: Hmm, one of those representation can't be spec compliant right? Also are we ignoring malformated messages:
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Right now the test ensures that the wire format that |
||
|
|
||
| assert_eq!(message, expected_message); | ||
| } | ||
|
|
||
| #[test] | ||
|
|
@@ -781,59 +865,59 @@ mod tests { | |
| vec![ProtocolName::from("/sup/proto/1")], | ||
| ) | ||
| .unwrap(); | ||
| let message = bytes::BytesMut::from(&message[..]).freeze(); | ||
|
|
||
| let Message::Protocols(protocols) = Message::decode(message).unwrap() else { | ||
| panic!("invalid message type"); | ||
| }; | ||
| let mut bytes = BytesMut::with_capacity(32); | ||
| bytes.put_u8(MSG_MULTISTREAM_1_0.len() as u8); | ||
| let _ = Message::Header(HeaderLine::V1).encode(&mut bytes).unwrap(); | ||
|
|
||
| assert_eq!(protocols.len(), 3); | ||
| assert_eq!( | ||
| protocols[0], | ||
| Protocol::try_from(&b"/multistream/1.0.0"[..]) | ||
| .expect("valid multitstream-select header") | ||
| ); | ||
| assert_eq!( | ||
| protocols[1], | ||
| Protocol::try_from(&b"/13371338/proto/1"[..]) | ||
| .expect("valid multitstream-select header") | ||
| ); | ||
| assert_eq!( | ||
| protocols[2], | ||
| Protocol::try_from(&b"/sup/proto/1"[..]).expect("valid multitstream-select header") | ||
| ); | ||
| let proto1 = Protocol::try_from(&b"/13371338/proto/1"[..]).expect("valid protocol name"); | ||
| bytes.put_u8((proto1.as_ref().len() + 1) as u8); // + 1 for \n | ||
| let _ = Message::Protocol(proto1).encode(&mut bytes).unwrap(); | ||
|
|
||
| let proto2 = Protocol::try_from(&b"/sup/proto/1"[..]).expect("valid protocol name"); | ||
| bytes.put_u8((proto2.as_ref().len() + 1) as u8); // + 1 for \n | ||
| let _ = Message::Protocol(proto2).encode(&mut bytes).unwrap(); | ||
|
|
||
| let expected_message = bytes.freeze().to_vec(); | ||
|
|
||
| assert_eq!(message, expected_message); | ||
| } | ||
|
|
||
| #[test] | ||
| fn register_response_invalid_message() { | ||
| // send only header line | ||
| fn register_response_header_only() { | ||
| let mut bytes = BytesMut::with_capacity(32); | ||
| bytes.put_u8(MSG_MULTISTREAM_1_0.len() as u8); | ||
|
|
||
| let message = Message::Header(HeaderLine::V1); | ||
| message.encode(&mut bytes).map_err(|_| Error::InvalidData).unwrap(); | ||
|
|
||
| let (mut dialer_state, _message) = | ||
| WebRtcDialerState::propose(ProtocolName::from("/13371338/proto/1"), vec![]).unwrap(); | ||
|
|
||
| match dialer_state.register_response(bytes.freeze().to_vec()) { | ||
| Err(error::NegotiationError::MultistreamSelectError(NegotiationError::Failed)) => {} | ||
| Ok(HandshakeResult::NotReady) => {} | ||
| Err(err) => panic!("unexpected error: {:?}", err), | ||
| event => panic!("invalid event: {event:?}"), | ||
| } | ||
| } | ||
|
|
||
| #[test] | ||
| fn header_line_missing() { | ||
| // header line missing | ||
| let mut bytes = BytesMut::with_capacity(256); | ||
| let message = Message::Protocols(vec![ | ||
| Protocol::try_from(&b"/13371338/proto/1"[..]).unwrap(), | ||
| Protocol::try_from(&b"/sup/proto/1"[..]).unwrap(), | ||
| ]); | ||
| message.encode(&mut bytes).map_err(|_| Error::InvalidData).unwrap(); | ||
| let proto = b"/13371338/proto/1"; | ||
| let mut bytes = BytesMut::with_capacity(proto.len() + 2); | ||
| bytes.put_u8((proto.len() + 1) as u8); | ||
|
|
||
| let response = Message::Protocol(Protocol::try_from(&proto[..]).unwrap()) | ||
| .encode(&mut bytes) | ||
| .expect("valid message encodes"); | ||
|
|
||
| let response = bytes.freeze().to_vec(); | ||
|
|
||
| let (mut dialer_state, _message) = | ||
| WebRtcDialerState::propose(ProtocolName::from("/13371338/proto/1"), vec![]).unwrap(); | ||
|
|
||
| match dialer_state.register_response(bytes.freeze().to_vec()) { | ||
| match dialer_state.register_response(response) { | ||
| Err(error::NegotiationError::MultistreamSelectError(NegotiationError::Failed)) => {} | ||
| event => panic!("invalid event: {event:?}"), | ||
| } | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.