Skip to content

Commit b585a81

Browse files
authored
Get the negotiated protocol to support WebTransport (#3466)
1 parent a240086 commit b585a81

7 files changed

Lines changed: 227 additions & 37 deletions

File tree

neqo-http3/src/connection.rs

Lines changed: 73 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ pub struct RequestDescription<'b, T: RequestTarget> {
6060
#[derive(Display)]
6161
pub enum SessionAcceptAction {
6262
Accept,
63+
/// Accept the session and include additional headers in the 200 response.
64+
AcceptWith(Vec<Header>),
6365
Reject(Vec<Header>),
6466
}
6567

@@ -1334,44 +1336,63 @@ impl Http3Connection {
13341336
}
13351337
Ok(())
13361338
}
1337-
(Some(s), Some(_r), SessionAcceptAction::Accept) => {
1338-
let mut response_headers = vec![Header::new(":status", "200")];
1339-
if connect_type == ExtendedConnectType::ConnectUdp {
1340-
response_headers.push(Header::new("capsule-protocol", "?1"));
1341-
}
1342-
1343-
if s.http_stream()
1344-
.ok_or(Error::InvalidStreamId)?
1345-
.send_headers(&response_headers, conn)
1346-
.is_ok()
1347-
{
1348-
let extended_conn = Rc::new(RefCell::new(
1349-
extended_connect::session::Session::new_with_http_streams(
1350-
stream_id,
1351-
events,
1352-
self.role,
1353-
self.recv_streams
1354-
.remove(&stream_id)
1355-
.ok_or(Error::Internal)?,
1356-
self.send_streams
1357-
.remove(&stream_id)
1358-
.ok_or(Error::Internal)?,
1359-
connect_type,
1360-
)?,
1361-
));
1362-
self.add_streams(
1363-
stream_id,
1364-
Box::new(Rc::clone(&extended_conn)),
1365-
Box::new(extended_conn),
1366-
);
1367-
self.streams_with_pending_data.insert(stream_id);
1368-
} else {
1369-
self.cancel_fetch(stream_id, Error::HttpRequestRejected.code(), conn)?;
1370-
return Err(Error::InvalidStreamId);
1371-
}
1372-
Ok(())
1339+
(Some(_), Some(_), SessionAcceptAction::Accept) => {
1340+
self.do_accept_extended_connect(conn, stream_id, events, connect_type, &[])
13731341
}
1342+
(Some(_), Some(_), SessionAcceptAction::AcceptWith(extra)) => {
1343+
self.do_accept_extended_connect(conn, stream_id, events, connect_type, extra)
1344+
}
1345+
}
1346+
}
1347+
1348+
fn do_accept_extended_connect(
1349+
&mut self,
1350+
conn: &mut Connection,
1351+
stream_id: StreamId,
1352+
events: Box<dyn ExtendedConnectEvents>,
1353+
connect_type: ExtendedConnectType,
1354+
extra_headers: &[Header],
1355+
) -> Res<()> {
1356+
let mut response_headers = vec![Header::new(":status", "200")];
1357+
if connect_type == ExtendedConnectType::ConnectUdp {
1358+
response_headers.push(Header::new("capsule-protocol", "?1"));
1359+
}
1360+
response_headers.extend_from_slice(extra_headers);
1361+
1362+
if self
1363+
.send_streams
1364+
.get_mut(&stream_id)
1365+
.ok_or(Error::InvalidStreamId)?
1366+
.http_stream()
1367+
.ok_or(Error::InvalidStreamId)?
1368+
.send_headers(&response_headers, conn)
1369+
.is_ok()
1370+
{
1371+
let extended_conn = Rc::new(RefCell::new(
1372+
extended_connect::session::Session::new_with_http_streams(
1373+
stream_id,
1374+
events,
1375+
self.role,
1376+
self.recv_streams
1377+
.remove(&stream_id)
1378+
.ok_or(Error::Internal)?,
1379+
self.send_streams
1380+
.remove(&stream_id)
1381+
.ok_or(Error::Internal)?,
1382+
connect_type,
1383+
)?,
1384+
));
1385+
self.add_streams(
1386+
stream_id,
1387+
Box::new(Rc::clone(&extended_conn)),
1388+
Box::new(extended_conn),
1389+
);
1390+
self.streams_with_pending_data.insert(stream_id);
1391+
} else {
1392+
self.cancel_fetch(stream_id, Error::HttpRequestRejected.code(), conn)?;
1393+
return Err(Error::InvalidStreamId);
13741394
}
1395+
Ok(())
13751396
}
13761397

13771398
pub(crate) fn webtransport_close_session(
@@ -1398,6 +1419,22 @@ impl Http3Connection {
13981419
})
13991420
}
14001421

1422+
/// Get the negotiated protocol for a WebTransport session.
1423+
///
1424+
/// Returns `Ok(None)` if no protocol was negotiated.
1425+
/// Returns an error if the session does not exist or is not an extended CONNECT session.
1426+
pub(crate) fn webtransport_session_protocol(
1427+
&self,
1428+
session_id: StreamId,
1429+
) -> Res<Option<String>> {
1430+
let stream = self
1431+
.send_streams
1432+
.get(&session_id)
1433+
.filter(|s| s.stream_type() == Http3StreamType::ExtendedConnect)
1434+
.ok_or(Error::InvalidStreamId)?;
1435+
Ok(stream.session_protocol())
1436+
}
1437+
14011438
pub(crate) fn connect_udp_close_session(
14021439
&mut self,
14031440
conn: &mut Connection,

neqo-http3/src/connection_client.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1414,6 +1414,24 @@ impl Http3Client {
14141414
pub const fn webtransport_enabled(&self) -> bool {
14151415
self.base_handler.webtransport_enabled()
14161416
}
1417+
1418+
/// Get the negotiated subprotocol for a WebTransport session.
1419+
///
1420+
/// Returns the parsed protocol string from the server's `wt-protocol` response header
1421+
/// (an [RFC 8941 Item](https://www.rfc-editor.org/rfc/rfc8941.html#name-items)),
1422+
/// or `None` if the server did not include a `wt-protocol` header (or its value was
1423+
/// not a valid sf-string).
1424+
///
1425+
/// **Note:** this returns the server's selected protocol without validating it against the
1426+
/// list of protocols offered by the client. Callers are responsible for checking that the
1427+
/// returned protocol was among those originally offered.
1428+
///
1429+
/// # Errors
1430+
///
1431+
/// Returns error if the session ID is invalid.
1432+
pub fn webtransport_session_protocol(&self, session_id: StreamId) -> Res<Option<String>> {
1433+
self.base_handler.webtransport_session_protocol(session_id)
1434+
}
14171435
}
14181436

14191437
impl EventProvider for Http3Client {

neqo-http3/src/features/extended_connect/session.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,8 @@ impl Session {
296296
);
297297
State::Done
298298
} else {
299+
self.protocol.process_response_headers(&headers);
300+
299301
self.events.session_start(
300302
self.protocol.connect_type(),
301303
self.id,
@@ -459,6 +461,10 @@ impl Stream for Rc<RefCell<Session>> {
459461
fn stream_type(&self) -> Http3StreamType {
460462
Http3StreamType::ExtendedConnect
461463
}
464+
465+
fn session_protocol(&self) -> Option<String> {
466+
self.borrow().protocol.protocol().map(ToString::to_string)
467+
}
462468
}
463469

464470
impl RecvStream for Rc<RefCell<Session>> {
@@ -591,6 +597,12 @@ pub(crate) trait Protocol: Debug + Display {
591597
(HashSet::default(), HashSet::default())
592598
}
593599

600+
fn process_response_headers(&mut self, _headers: &[Header]) {}
601+
602+
fn protocol(&self) -> Option<&str> {
603+
None
604+
}
605+
594606
fn write_datagram_prefix(&self, encoder: &mut Encoder);
595607

596608
fn dgram_context_id(&self, datagram: Bytes) -> Result<Bytes, DgramContextIdError>;

neqo-http3/src/features/extended_connect/tests/webtransport/sessions.rs

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -549,3 +549,88 @@ fn wt_goaway_draining_rejected_session() {
549549
"Draining must be emitted before SessionClosed for rejected sessions"
550550
);
551551
}
552+
553+
#[test]
554+
fn wt_session_protocol_none_when_no_header() {
555+
let mut wt = WtTest::new();
556+
let wt_session = wt.create_wt_session();
557+
let session_id = wt_session.stream_id();
558+
assert_eq!(
559+
wt.client.webtransport_session_protocol(session_id).unwrap(),
560+
None
561+
);
562+
}
563+
564+
#[test]
565+
fn wt_session_protocol_quoted_value() {
566+
let mut wt = WtTest::new();
567+
let (session_id, _) =
568+
wt.negotiate_wt_session(&SessionAcceptAction::AcceptWith(vec![Header::new(
569+
"wt-protocol",
570+
r#""myproto""#,
571+
)]));
572+
assert_eq!(
573+
wt.client.webtransport_session_protocol(session_id).unwrap(),
574+
Some("myproto".to_string())
575+
);
576+
}
577+
578+
#[test]
579+
fn wt_session_protocol_parameters_stripped() {
580+
let mut wt = WtTest::new();
581+
let (session_id, _) =
582+
wt.negotiate_wt_session(&SessionAcceptAction::AcceptWith(vec![Header::new(
583+
"wt-protocol",
584+
r#""myproto"; foo=bar; baz=2"#,
585+
)]));
586+
assert_eq!(
587+
wt.client.webtransport_session_protocol(session_id).unwrap(),
588+
Some("myproto".to_string())
589+
);
590+
}
591+
592+
#[test]
593+
fn wt_session_protocol_malformed_unquoted_rejected() {
594+
let mut wt = WtTest::new();
595+
let (session_id, _) =
596+
wt.negotiate_wt_session(&SessionAcceptAction::AcceptWith(vec![Header::new(
597+
"wt-protocol",
598+
"myproto",
599+
)]));
600+
assert_eq!(
601+
wt.client.webtransport_session_protocol(session_id).unwrap(),
602+
None
603+
);
604+
}
605+
606+
#[test]
607+
fn wt_session_protocol_invalid_stream_id() {
608+
let wt = WtTest::new();
609+
assert_eq!(
610+
wt.client
611+
.webtransport_session_protocol(StreamId::new(9999))
612+
.unwrap_err(),
613+
Error::InvalidStreamId
614+
);
615+
}
616+
617+
#[test]
618+
fn wt_session_protocol_non_webtransport_session() {
619+
let mut wt = WtTest::new();
620+
let stream_id = wt
621+
.client
622+
.fetch(
623+
now(),
624+
"GET",
625+
("https", "something.com", "/"),
626+
&[],
627+
Priority::default(),
628+
)
629+
.unwrap();
630+
assert_eq!(
631+
wt.client
632+
.webtransport_session_protocol(stream_id)
633+
.unwrap_err(),
634+
Error::InvalidStreamId
635+
);
636+
}

neqo-http3/src/features/extended_connect/webtransport_session.rs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,9 @@ use std::{
1111
time::Instant,
1212
};
1313

14-
use neqo_common::{Bytes, Encoder, Role, qtrace};
14+
use neqo_common::{Bytes, Encoder, Header, Role, qtrace};
1515
use neqo_transport::{Connection, Error as TransportError, StreamId};
16+
use sfv::{BareItem, Item, Parser};
1617

1718
use crate::{
1819
Error, Http3StreamInfo, Http3StreamType, RecvStream, Res, SendStream,
@@ -34,6 +35,8 @@ pub struct Session {
3435
///
3536
/// [`HashSet`] size limited by QUIC connection stream limit.
3637
pending_streams: HashSet<StreamId>,
38+
/// The negotiated protocol from server response headers.
39+
negotiated_protocol: Option<String>,
3740
}
3841

3942
impl Display for Session {
@@ -52,6 +55,7 @@ impl Session {
5255
recv_streams: HashSet::default(),
5356
role,
5457
pending_streams: HashSet::default(),
58+
negotiated_protocol: None,
5559
}
5660
}
5761
}
@@ -201,6 +205,24 @@ impl Protocol for Session {
201205
)
202206
}
203207

208+
fn process_response_headers(&mut self, headers: &[Header]) {
209+
self.negotiated_protocol = headers
210+
.iter()
211+
.find(|h| h.name().eq_ignore_ascii_case("wt-protocol"))
212+
.and_then(|h| Parser::new(h.value()).parse::<Item>().ok())
213+
.and_then(|item| {
214+
if let BareItem::String(s) = item.bare_item {
215+
Some(s.into())
216+
} else {
217+
None
218+
}
219+
});
220+
}
221+
222+
fn protocol(&self) -> Option<&str> {
223+
self.negotiated_protocol.as_deref()
224+
}
225+
204226
fn write_datagram_prefix(&self, _encoder: &mut Encoder) {
205227
// WebTransport does not add prefix (i.e. context ID).
206228
}

neqo-http3/src/lib.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -433,6 +433,12 @@ enum ReceiveOutput {
433433

434434
trait Stream: Debug {
435435
fn stream_type(&self) -> Http3StreamType;
436+
437+
// Unreachable: callers filter by ExtendedConnect before calling.
438+
#[cfg_attr(coverage_nightly, coverage(off))]
439+
fn session_protocol(&self) -> Option<String> {
440+
None
441+
}
436442
}
437443

438444
trait RecvStream: Stream {

neqo-http3/tests/connect_udp.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -669,3 +669,13 @@ fn session_lifecycle_with_http_datagram_capsule() {
669669

670670
qinfo!("HTTP DATAGRAM Capsule test completed successfully");
671671
}
672+
673+
#[test]
674+
fn connect_udp_session_protocol_returns_none() {
675+
fixture_init();
676+
let (client, _proxy, session_id, _proxy_session) = establish_new_session();
677+
assert_eq!(
678+
client.webtransport_session_protocol(session_id).unwrap(),
679+
None
680+
);
681+
}

0 commit comments

Comments
 (0)