Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 73 additions & 36 deletions neqo-http3/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@
#[derive(Display)]
pub enum SessionAcceptAction {
Accept,
/// Accept the session and include additional headers in the 200 response.
AcceptWith(Vec<Header>),
Reject(Vec<Header>),
}

Expand Down Expand Up @@ -1334,44 +1336,63 @@
}
Ok(())
}
(Some(s), Some(_r), SessionAcceptAction::Accept) => {
let mut response_headers = vec![Header::new(":status", "200")];
if connect_type == ExtendedConnectType::ConnectUdp {
response_headers.push(Header::new("capsule-protocol", "?1"));
}

if s.http_stream()
.ok_or(Error::InvalidStreamId)?
.send_headers(&response_headers, conn)
.is_ok()
{
let extended_conn = Rc::new(RefCell::new(
extended_connect::session::Session::new_with_http_streams(
stream_id,
events,
self.role,
self.recv_streams
.remove(&stream_id)
.ok_or(Error::Internal)?,
self.send_streams
.remove(&stream_id)
.ok_or(Error::Internal)?,
connect_type,
)?,
));
self.add_streams(
stream_id,
Box::new(Rc::clone(&extended_conn)),
Box::new(extended_conn),
);
self.streams_with_pending_data.insert(stream_id);
} else {
self.cancel_fetch(stream_id, Error::HttpRequestRejected.code(), conn)?;
return Err(Error::InvalidStreamId);
}
Ok(())
(Some(_), Some(_), SessionAcceptAction::Accept) => {
self.do_accept_extended_connect(conn, stream_id, events, connect_type, &[])
}
(Some(_), Some(_), SessionAcceptAction::AcceptWith(extra)) => {
self.do_accept_extended_connect(conn, stream_id, events, connect_type, extra)
}
}
}

fn do_accept_extended_connect(
&mut self,
conn: &mut Connection,
stream_id: StreamId,
events: Box<dyn ExtendedConnectEvents>,
connect_type: ExtendedConnectType,
extra_headers: &[Header],
) -> Res<()> {
let mut response_headers = vec![Header::new(":status", "200")];
if connect_type == ExtendedConnectType::ConnectUdp {

Check warning on line 1357 in neqo-http3/src/connection.rs

View workflow job for this annotation

GitHub Actions / Find mutants

Missed mutant

replace == with != in Http3Connection::do_accept_extended_connect
response_headers.push(Header::new("capsule-protocol", "?1"));
}
response_headers.extend_from_slice(extra_headers);

if self
.send_streams
.get_mut(&stream_id)
.ok_or(Error::InvalidStreamId)?
.http_stream()
.ok_or(Error::InvalidStreamId)?
.send_headers(&response_headers, conn)
.is_ok()
{
let extended_conn = Rc::new(RefCell::new(
extended_connect::session::Session::new_with_http_streams(
stream_id,
events,
self.role,
self.recv_streams
.remove(&stream_id)
.ok_or(Error::Internal)?,
self.send_streams
.remove(&stream_id)
.ok_or(Error::Internal)?,
connect_type,
)?,
));
self.add_streams(
stream_id,
Box::new(Rc::clone(&extended_conn)),
Box::new(extended_conn),
);
self.streams_with_pending_data.insert(stream_id);
} else {
self.cancel_fetch(stream_id, Error::HttpRequestRejected.code(), conn)?;
return Err(Error::InvalidStreamId);
}
Ok(())
}

pub(crate) fn webtransport_close_session(
Expand All @@ -1398,6 +1419,22 @@
})
}

/// Get the negotiated protocol for a WebTransport session.
///
/// Returns `Ok(None)` if no protocol was negotiated.
/// Returns an error if the session does not exist or is not an extended CONNECT session.
pub(crate) fn webtransport_session_protocol(
&self,
session_id: StreamId,
) -> Res<Option<String>> {
let stream = self
.send_streams
.get(&session_id)
.filter(|s| s.stream_type() == Http3StreamType::ExtendedConnect)
.ok_or(Error::InvalidStreamId)?;
Ok(stream.session_protocol())
}

pub(crate) fn connect_udp_close_session(
&mut self,
conn: &mut Connection,
Expand Down
18 changes: 18 additions & 0 deletions neqo-http3/src/connection_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1414,6 +1414,24 @@ impl Http3Client {
pub const fn webtransport_enabled(&self) -> bool {
self.base_handler.webtransport_enabled()
}

/// Get the negotiated subprotocol for a WebTransport session.
///
/// Returns the parsed protocol string from the server's `wt-protocol` response header
/// (an [RFC 8941 Item](https://www.rfc-editor.org/rfc/rfc8941.html#name-items)),
/// or `None` if the server did not include a `wt-protocol` header (or its value was
/// not a valid sf-string).
///
/// **Note:** this returns the server's selected protocol without validating it against the
/// list of protocols offered by the client. Callers are responsible for checking that the
/// returned protocol was among those originally offered.
///
/// # Errors
///
/// Returns error if the session ID is invalid.
pub fn webtransport_session_protocol(&self, session_id: StreamId) -> Res<Option<String>> {
self.base_handler.webtransport_session_protocol(session_id)
}
}

impl EventProvider for Http3Client {
Expand Down
12 changes: 12 additions & 0 deletions neqo-http3/src/features/extended_connect/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,8 @@ impl Session {
);
State::Done
} else {
self.protocol.process_response_headers(&headers);

self.events.session_start(
self.protocol.connect_type(),
self.id,
Expand Down Expand Up @@ -459,6 +461,10 @@ impl Stream for Rc<RefCell<Session>> {
fn stream_type(&self) -> Http3StreamType {
Http3StreamType::ExtendedConnect
}

fn session_protocol(&self) -> Option<String> {
self.borrow().protocol.protocol().map(ToString::to_string)
}
}

impl RecvStream for Rc<RefCell<Session>> {
Expand Down Expand Up @@ -591,6 +597,12 @@ pub(crate) trait Protocol: Debug + Display {
(HashSet::default(), HashSet::default())
}

fn process_response_headers(&mut self, _headers: &[Header]) {}
Comment thread
jesup marked this conversation as resolved.

fn protocol(&self) -> Option<&str> {
None
Comment thread
jesup marked this conversation as resolved.
Comment thread
jesup marked this conversation as resolved.
}

fn write_datagram_prefix(&self, encoder: &mut Encoder);

fn dgram_context_id(&self, datagram: Bytes) -> Result<Bytes, DgramContextIdError>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -549,3 +549,88 @@ fn wt_goaway_draining_rejected_session() {
"Draining must be emitted before SessionClosed for rejected sessions"
);
}

#[test]
fn wt_session_protocol_none_when_no_header() {
let mut wt = WtTest::new();
let wt_session = wt.create_wt_session();
let session_id = wt_session.stream_id();
assert_eq!(
wt.client.webtransport_session_protocol(session_id).unwrap(),
None
);
}

#[test]
fn wt_session_protocol_quoted_value() {
let mut wt = WtTest::new();
let (session_id, _) =
wt.negotiate_wt_session(&SessionAcceptAction::AcceptWith(vec![Header::new(
"wt-protocol",
r#""myproto""#,
)]));
assert_eq!(
wt.client.webtransport_session_protocol(session_id).unwrap(),
Some("myproto".to_string())
);
}

#[test]
fn wt_session_protocol_parameters_stripped() {
let mut wt = WtTest::new();
let (session_id, _) =
wt.negotiate_wt_session(&SessionAcceptAction::AcceptWith(vec![Header::new(
"wt-protocol",
r#""myproto"; foo=bar; baz=2"#,
)]));
assert_eq!(
wt.client.webtransport_session_protocol(session_id).unwrap(),
Some("myproto".to_string())
);
}

#[test]
fn wt_session_protocol_malformed_unquoted_rejected() {
let mut wt = WtTest::new();
let (session_id, _) =
wt.negotiate_wt_session(&SessionAcceptAction::AcceptWith(vec![Header::new(
"wt-protocol",
"myproto",
)]));
assert_eq!(
wt.client.webtransport_session_protocol(session_id).unwrap(),
None
);
}

#[test]
fn wt_session_protocol_invalid_stream_id() {
let wt = WtTest::new();
assert_eq!(
wt.client
.webtransport_session_protocol(StreamId::new(9999))
.unwrap_err(),
Error::InvalidStreamId
);
}

#[test]
fn wt_session_protocol_non_webtransport_session() {
let mut wt = WtTest::new();
let stream_id = wt
.client
.fetch(
now(),
"GET",
("https", "something.com", "/"),
&[],
Priority::default(),
)
.unwrap();
assert_eq!(
wt.client
.webtransport_session_protocol(stream_id)
.unwrap_err(),
Error::InvalidStreamId
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ use std::{
time::Instant,
};

use neqo_common::{Bytes, Encoder, Role, qtrace};
use neqo_common::{Bytes, Encoder, Header, Role, qtrace};
use neqo_transport::{Connection, Error as TransportError, StreamId};
use sfv::{BareItem, Item, Parser};

use crate::{
Error, Http3StreamInfo, Http3StreamType, RecvStream, Res, SendStream,
Expand All @@ -34,6 +35,8 @@ pub struct Session {
///
/// [`HashSet`] size limited by QUIC connection stream limit.
pending_streams: HashSet<StreamId>,
/// The negotiated protocol from server response headers.
negotiated_protocol: Option<String>,
}

impl Display for Session {
Expand All @@ -52,6 +55,7 @@ impl Session {
recv_streams: HashSet::default(),
role,
pending_streams: HashSet::default(),
negotiated_protocol: None,
}
}
}
Expand Down Expand Up @@ -201,6 +205,24 @@ impl Protocol for Session {
)
}

fn process_response_headers(&mut self, headers: &[Header]) {
self.negotiated_protocol = headers
.iter()
.find(|h| h.name().eq_ignore_ascii_case("wt-protocol"))
.and_then(|h| Parser::new(h.value()).parse::<Item>().ok())
.and_then(|item| {
if let BareItem::String(s) = item.bare_item {
Some(s.into())
} else {
None
}
});
}

fn protocol(&self) -> Option<&str> {
self.negotiated_protocol.as_deref()
}

fn write_datagram_prefix(&self, _encoder: &mut Encoder) {
// WebTransport does not add prefix (i.e. context ID).
}
Expand Down
6 changes: 6 additions & 0 deletions neqo-http3/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,12 @@

trait Stream: Debug {
fn stream_type(&self) -> Http3StreamType;

// Unreachable: callers filter by ExtendedConnect before calling.
#[cfg_attr(coverage_nightly, coverage(off))]
fn session_protocol(&self) -> Option<String> {
None

Check warning on line 440 in neqo-http3/src/lib.rs

View workflow job for this annotation

GitHub Actions / Find mutants

Missed mutant

replace Stream::session_protocol -> Option<String> with Some("xyzzy".into())

Check warning on line 440 in neqo-http3/src/lib.rs

View workflow job for this annotation

GitHub Actions / Find mutants

Missed mutant

replace Stream::session_protocol -> Option<String> with Some(String::new())
}
}

trait RecvStream: Stream {
Expand Down
10 changes: 10 additions & 0 deletions neqo-http3/tests/connect_udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -669,3 +669,13 @@ fn session_lifecycle_with_http_datagram_capsule() {

qinfo!("HTTP DATAGRAM Capsule test completed successfully");
}

#[test]
fn connect_udp_session_protocol_returns_none() {
fixture_init();
let (client, _proxy, session_id, _proxy_session) = establish_new_session();
assert_eq!(
client.webtransport_session_protocol(session_id).unwrap(),
None
);
}
Loading