-
Notifications
You must be signed in to change notification settings - Fork 27
fix: multistream-select negotiation on outbound substream over webrtc #465
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 12 commits
196d625
b875bbd
925caa9
4200246
85fae21
2ec539b
4e4a2d3
7947bf6
3765d52
fdedc4f
1cd945b
6dcb15d
9621e3c
daa8f71
afd44c8
99f6547
c9a0140
90f0c86
c0a44c0
c27930a
4ed539e
12801fd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1 +1,3 @@ | ||
| /target | ||
| .idea | ||
|
|
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,11 +22,11 @@ | |
|
|
||
| use crate::{ | ||
| codec::unsigned_varint::UnsignedVarint, | ||
| error::{self, Error, ParseError}, | ||
| error::{self, Error, ParseError, SubstreamError}, | ||
| multistream_select::{ | ||
| protocol::{ | ||
| webrtc_encode_multistream_message, HeaderLine, Message, MessageIO, Protocol, | ||
| ProtocolError, | ||
| ProtocolError, PROTO_MULTISTREAM_1_0 | ||
| }, | ||
| Negotiated, NegotiationError, Version, | ||
| }, | ||
|
|
@@ -357,24 +357,95 @@ impl WebRtcDialerState { | |
| &mut self, | ||
| payload: Vec<u8>, | ||
| ) -> Result<HandshakeResult, crate::error::NegotiationError> { | ||
| let Message::Protocols(protocols) = | ||
| Message::decode(payload.into()).map_err(|_| ParseError::InvalidData)? | ||
| else { | ||
| return Err(crate::error::NegotiationError::MultistreamSelectError( | ||
| NegotiationError::Failed, | ||
| )); | ||
| // All multistream-select messages are length-prefixed. Since this code path is not using | ||
| // multistream_select::protocol::MessageIO, we need to decode and remove the length here. | ||
| let mut remaining: &[u8] = &payload; | ||
| let (len, tail) = unsigned_varint::decode::usize(remaining). | ||
| map_err(|error| { | ||
| tracing::debug!( | ||
| target: LOG_TARGET, | ||
| ?error, | ||
| message = ?payload, | ||
| "Failed to decode length-prefix in multistream message"); | ||
| error::NegotiationError::ParseError(ParseError::InvalidData) | ||
| })?; | ||
|
|
||
| let payload = tail[..len].to_vec(); | ||
|
|
||
haikoschol marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| let message = Message::decode(payload.into()); | ||
haikoschol marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| tracing::trace!( | ||
| target: LOG_TARGET, | ||
| ?message, | ||
| "Decoded message while registering response", | ||
| ); | ||
|
|
||
| let mut protocols = match message { | ||
| Ok(Message::Header(HeaderLine::V1)) => { | ||
| vec![PROTO_MULTISTREAM_1_0] | ||
| } | ||
| Ok(Message::Protocol(protocol)) => vec![protocol], | ||
| Ok(Message::Protocols(protocols)) => protocols, | ||
| Ok(Message::NotAvailable) => { | ||
| return match &self.state { | ||
| HandshakeState::WaitingProtocol => | ||
| Err(error::NegotiationError::MultistreamSelectError( | ||
| NegotiationError::Failed | ||
| )), | ||
| _ => Err(error::NegotiationError::StateMismatch), | ||
| } | ||
| } | ||
| Ok(Message::ListProtocols) => return Err(error::NegotiationError::StateMismatch), | ||
| Err(_) => return Err(error::NegotiationError::ParseError(ParseError::InvalidData)), | ||
| }; | ||
|
|
||
| remaining = &tail[len..]; | ||
|
|
||
| loop { | ||
haikoschol marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| if remaining.is_empty() { | ||
| break; | ||
| } | ||
|
|
||
| let (len, tail) = unsigned_varint::decode::usize(remaining). | ||
| map_err(|error| { | ||
| tracing::debug!( | ||
| target: LOG_TARGET, | ||
| ?error, | ||
| message = ?remaining, | ||
| "Failed to decode length-prefix in multistream message"); | ||
| error::NegotiationError::ParseError(ParseError::InvalidData) | ||
| })?; | ||
|
|
||
| if len > tail.len() { | ||
| break; | ||
| } | ||
|
|
||
| let payload = tail[..len].to_vec(); | ||
|
|
||
| match Message::decode(payload.into()) { | ||
haikoschol marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| Ok(Message::Protocol(protocol)) => protocols.push(protocol), | ||
| Err(error) => { | ||
| tracing::debug!( | ||
| target: LOG_TARGET, | ||
| ?error, | ||
| message = ?tail[..len], | ||
| "Failed to decode multistream message", | ||
| ); | ||
| return Err(error::NegotiationError::ParseError(ParseError::InvalidData)) | ||
| }, | ||
| _ => return Err(error::NegotiationError::StateMismatch), | ||
| } | ||
|
|
||
| remaining = &tail[len..]; | ||
| } | ||
|
|
||
| let mut protocol_iter = protocols.into_iter(); | ||
| loop { | ||
| match (&self.state, protocol_iter.next()) { | ||
| (HandshakeState::WaitingResponse, None) => | ||
| return Err(crate::error::NegotiationError::StateMismatch), | ||
| (HandshakeState::WaitingResponse, Some(protocol)) => { | ||
| let header = Protocol::try_from(&b"/multistream/1.0.0"[..]) | ||
| .expect("valid multitstream-select header"); | ||
|
|
||
| if protocol == header { | ||
| if protocol == PROTO_MULTISTREAM_1_0 { | ||
| self.state = HandshakeState::WaitingProtocol; | ||
| } else { | ||
| return Err(crate::error::NegotiationError::MultistreamSelectError( | ||
|
|
@@ -410,7 +481,9 @@ mod tests { | |
| use super::*; | ||
| use crate::multistream_select::listener_select_proto; | ||
| use std::time::Duration; | ||
| use bytes::BufMut; | ||
| use tokio::net::{TcpListener, TcpStream}; | ||
| use crate::multistream_select::protocol::MSG_MULTISTREAM_1_0; | ||
|
|
||
| #[tokio::test] | ||
| async fn select_proto_basic() { | ||
|
|
@@ -755,23 +828,18 @@ mod tests { | |
| fn propose() { | ||
| let (mut dialer_state, message) = | ||
| WebRtcDialerState::propose(ProtocolName::from("/13371338/proto/1"), vec![]).unwrap(); | ||
| let message = bytes::BytesMut::from(&message[..]).freeze(); | ||
|
|
||
| let Message::Protocols(protocols) = Message::decode(message).unwrap() else { | ||
| panic!("invalid message type"); | ||
| }; | ||
| let mut bytes = BytesMut::with_capacity(32); | ||
| bytes.put_u8(MSG_MULTISTREAM_1_0.len() as u8); | ||
| let _ = Message::Header(HeaderLine::V1).encode(&mut bytes).unwrap(); | ||
|
|
||
| assert_eq!(protocols.len(), 2); | ||
| assert_eq!( | ||
| protocols[0], | ||
| Protocol::try_from(&b"/multistream/1.0.0"[..]) | ||
| .expect("valid multitstream-select header") | ||
| ); | ||
| assert_eq!( | ||
| protocols[1], | ||
| Protocol::try_from(&b"/13371338/proto/1"[..]) | ||
| .expect("valid multitstream-select header") | ||
| ); | ||
| let proto = Protocol::try_from(&b"/13371338/proto/1"[..]).expect("valid protocol name"); | ||
| bytes.put_u8((proto.as_ref().len() + 1) as u8); // + 1 for \n | ||
| let _ = Message::Protocol(proto).encode(&mut bytes).unwrap(); | ||
|
|
||
| let expected_message = bytes.freeze().to_vec(); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. dq: This slightly changes the meaning of the messages. Before we were expecting: Now we are expecting: Hmm, one of those representation can't be spec compliant right? Also are we ignoring malformated messages:
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Right now the test ensures that the wire format that |
||
|
|
||
| assert_eq!(message, expected_message); | ||
| } | ||
|
|
||
| #[test] | ||
|
|
@@ -781,41 +849,38 @@ mod tests { | |
| vec![ProtocolName::from("/sup/proto/1")], | ||
| ) | ||
| .unwrap(); | ||
| let message = bytes::BytesMut::from(&message[..]).freeze(); | ||
|
|
||
| let Message::Protocols(protocols) = Message::decode(message).unwrap() else { | ||
| panic!("invalid message type"); | ||
| }; | ||
| let mut bytes = BytesMut::with_capacity(32); | ||
| bytes.put_u8(MSG_MULTISTREAM_1_0.len() as u8); | ||
| let _ = Message::Header(HeaderLine::V1).encode(&mut bytes).unwrap(); | ||
|
|
||
| assert_eq!(protocols.len(), 3); | ||
| assert_eq!( | ||
| protocols[0], | ||
| Protocol::try_from(&b"/multistream/1.0.0"[..]) | ||
| .expect("valid multitstream-select header") | ||
| ); | ||
| assert_eq!( | ||
| protocols[1], | ||
| Protocol::try_from(&b"/13371338/proto/1"[..]) | ||
| .expect("valid multitstream-select header") | ||
| ); | ||
| assert_eq!( | ||
| protocols[2], | ||
| Protocol::try_from(&b"/sup/proto/1"[..]).expect("valid multitstream-select header") | ||
| ); | ||
| let proto1 = Protocol::try_from(&b"/13371338/proto/1"[..]).expect("valid protocol name"); | ||
| bytes.put_u8((proto1.as_ref().len() + 1) as u8); // + 1 for \n | ||
| let _ = Message::Protocol(proto1).encode(&mut bytes).unwrap(); | ||
|
|
||
| let proto2 = Protocol::try_from(&b"/sup/proto/1"[..]).expect("valid protocol name"); | ||
| bytes.put_u8((proto2.as_ref().len() + 1) as u8); // + 1 for \n | ||
| let _ = Message::Protocol(proto2).encode(&mut bytes).unwrap(); | ||
|
|
||
| let expected_message = bytes.freeze().to_vec(); | ||
|
|
||
| assert_eq!(message, expected_message); | ||
| } | ||
|
|
||
| #[test] | ||
| fn register_response_invalid_message() { | ||
| // send only header line | ||
| fn register_response_header_only() { | ||
| let mut bytes = BytesMut::with_capacity(32); | ||
| bytes.put_u8(MSG_MULTISTREAM_1_0.len() as u8); | ||
|
|
||
| let message = Message::Header(HeaderLine::V1); | ||
| message.encode(&mut bytes).map_err(|_| Error::InvalidData).unwrap(); | ||
|
|
||
| let (mut dialer_state, _message) = | ||
| WebRtcDialerState::propose(ProtocolName::from("/13371338/proto/1"), vec![]).unwrap(); | ||
|
|
||
| match dialer_state.register_response(bytes.freeze().to_vec()) { | ||
| Err(error::NegotiationError::MultistreamSelectError(NegotiationError::Failed)) => {} | ||
| Ok(HandshakeResult::NotReady) => {}, | ||
| Err(err) => panic!("unexpected error: {:?}", err), | ||
| event => panic!("invalid event: {event:?}"), | ||
| } | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -351,6 +351,14 @@ pub fn webrtc_listener_negotiate<'a>( | |
| supported_protocols: &'a mut impl Iterator<Item = &'a ProtocolName>, | ||
| payload: Bytes, | ||
| ) -> crate::Result<ListenerSelectResult> { | ||
| let payload = if payload.len() > 2 && payload[0..payload.len() - 2] != b"\n\n"[..] { | ||
|
||
| let mut buf = BytesMut::from(payload); | ||
| buf.extend_from_slice(b"\n"); | ||
| buf.freeze() | ||
| } else { | ||
| payload | ||
| }; | ||
|
|
||
| let Message::Protocols(protocols) = Message::decode(payload).map_err(|_| Error::InvalidData)? | ||
| else { | ||
| return Err(Error::NegotiationError( | ||
|
|
@@ -466,12 +474,7 @@ mod tests { | |
| message.encode(&mut bytes).map_err(|_| Error::InvalidData).unwrap(); | ||
|
|
||
| match webrtc_listener_negotiate(&mut local_protocols.iter(), bytes.freeze()) { | ||
| Err(error) => assert!(std::matches!( | ||
| error, | ||
| Error::NegotiationError(error::NegotiationError::MultistreamSelectError( | ||
| NegotiationError::Failed | ||
| )) | ||
| )), | ||
| Err(error) => assert!(std::matches!(error, Error::InvalidData)), | ||
| event => panic!("invalid event: {event:?}"), | ||
| } | ||
| } | ||
|
|
@@ -495,12 +498,7 @@ mod tests { | |
| message.encode(&mut bytes).map_err(|_| Error::InvalidData).unwrap(); | ||
|
|
||
| match webrtc_listener_negotiate(&mut local_protocols.iter(), bytes.freeze()) { | ||
| Err(error) => assert!(std::matches!( | ||
| error, | ||
| Error::NegotiationError(error::NegotiationError::MultistreamSelectError( | ||
| NegotiationError::Failed | ||
| )) | ||
| )), | ||
| Err(error) => assert!(std::matches!(error, Error::InvalidData)), | ||
| event => panic!("invalid event: {event:?}"), | ||
| } | ||
| } | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.