Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
ae0d082
perf(http3): eliminate allocations with zero-copy `DatagramPayload``
larseggert Sep 25, 2025
8aaa3c2
Minimize diff
larseggert Sep 25, 2025
013518b
Update neqo-http3/src/features/extended_connect/session.rs
larseggert Sep 25, 2025
0a53834
Update neqo-http3/src/lib.rs
larseggert Sep 25, 2025
1776fe8
Fixes
larseggert Sep 25, 2025
af45985
Update neqo-http3/src/lib.rs
larseggert Sep 25, 2025
6cebdba
Use Bytes
larseggert Sep 30, 2025
86bfd5b
Merge branch 'main' into fix-2854-v2
larseggert Sep 30, 2025
618f3c0
Update neqo-http3/src/lib.rs
larseggert Sep 30, 2025
7651468
Update neqo-http3/src/features/extended_connect/session.rs
larseggert Sep 30, 2025
1bdaaae
fmt
larseggert Sep 30, 2025
f1cb050
Update neqo-http3/src/features/extended_connect/session.rs
larseggert Sep 30, 2025
906b4dd
clippy
larseggert Sep 30, 2025
d6b102f
Merge branch 'fix-2854-v2' of github.com:larseggert/neqo into fix-285…
larseggert Sep 30, 2025
bc0f298
import
larseggert Sep 30, 2025
54b031f
Update neqo-http3/src/features/extended_connect/session.rs
larseggert Oct 7, 2025
d6fe04b
No more Bytes dep
larseggert Oct 8, 2025
29c4633
Update neqo-common/src/bytes.rs
larseggert Oct 8, 2025
8b28568
Comments from @mxinden
larseggert Oct 9, 2025
f118659
Merge branch 'fix-2854-v2' of github.com:larseggert/neqo into fix-285…
larseggert Oct 9, 2025
398eaca
Merge branch 'main' into fix-2854-v2
larseggert Oct 9, 2025
5dd98cf
Final fixes
larseggert Oct 10, 2025
5272e0a
Fixes
larseggert Oct 13, 2025
a04ce1e
Update neqo-common/src/bytes.rs
larseggert Oct 13, 2025
586f50d
impl From<Vec<u8>> for Bytes
larseggert Oct 13, 2025
e795ac1
Merge branch 'main' into fix-2854-v2
larseggert Oct 13, 2025
6c9fe27
feat(common/datagram): support Bytes as data store
mxinden Oct 13, 2025
04d26ed
Merge pull request #38 from mxinden/fix-2854-v2
larseggert Oct 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 138 additions & 0 deletions neqo-common/src/bytes.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.

/// Owned contiguous byte array with an optional offset.
///
/// Inspired by `bytes` crate's `Bytes`.
#[derive(Debug, Clone)]
pub struct Bytes {
data: Vec<u8>,
offset: usize,
}

impl Bytes {
/// Create a new `Bytes` with the given data and offset.
///
/// # Panics
///
/// Panics if `offset > data.len()`.
#[must_use]
pub fn new(data: Vec<u8>, offset: usize) -> Self {
assert!(
offset <= data.len(),
"offset {offset} is out of bounds for data of length {}",
data.len()
);
Self { data, offset }
}

#[must_use]
pub fn len(&self) -> usize {
self.data.len() - self.offset
}

#[must_use]
pub fn is_empty(&self) -> bool {
self.len() == 0

Check warning on line 39 in neqo-common/src/bytes.rs

View workflow job for this annotation

GitHub Actions / Find mutants

Missed mutant

replace Bytes::is_empty -> bool with true
}

/// Skips the first `n` bytes, consuming and returning `self`.
///
/// # Panics
///
/// Panics if `n > self.len()`.
#[must_use]
pub fn skip(mut self, n: usize) -> Self {
assert!(
n <= self.len(),
"cannot skip {n} bytes when only {} bytes remain",
self.len()
);
self.offset += n;
self
}
}
Comment thread
larseggert marked this conversation as resolved.

impl AsRef<[u8]> for Bytes {
fn as_ref(&self) -> &[u8] {
&self.data[self.offset..]
}
}

impl AsMut<[u8]> for Bytes {
fn as_mut(&mut self) -> &mut [u8] {
&mut self.data[self.offset..]

Check warning on line 67 in neqo-common/src/bytes.rs

View workflow job for this annotation

GitHub Actions / Find mutants

Missed mutant

replace <impl AsMut<[u8]> for Bytes>::as_mut -> &mut[u8] with Vec::leak(vec![1])

Check warning on line 67 in neqo-common/src/bytes.rs

View workflow job for this annotation

GitHub Actions / Find mutants

Missed mutant

replace <impl AsMut<[u8]> for Bytes>::as_mut -> &mut[u8] with Vec::leak(vec![0])

Check warning on line 67 in neqo-common/src/bytes.rs

View workflow job for this annotation

GitHub Actions / Find mutants

Missed mutant

replace <impl AsMut<[u8]> for Bytes>::as_mut -> &mut[u8] with Vec::leak(Vec::new())
}
}

impl PartialEq for Bytes {
fn eq(&self, other: &Self) -> bool {
self.as_ref() == other.as_ref()

Check warning on line 73 in neqo-common/src/bytes.rs

View workflow job for this annotation

GitHub Actions / Find mutants

Missed mutant

replace <impl PartialEq for Bytes>::eq -> bool with true
}
}

impl Eq for Bytes {}

impl From<Vec<u8>> for Bytes {
fn from(data: Vec<u8>) -> Self {
Self::new(data, 0)
}
}

impl<const N: usize> PartialEq<[u8; N]> for Bytes {
fn eq(&self, other: &[u8; N]) -> bool {
self.as_ref() == other.as_slice()

Check warning on line 87 in neqo-common/src/bytes.rs

View workflow job for this annotation

GitHub Actions / Find mutants

Missed mutant

replace == with != in <impl PartialEq<[u8; N]> for Bytes>::eq

Check warning on line 87 in neqo-common/src/bytes.rs

View workflow job for this annotation

GitHub Actions / Find mutants

Missed mutant

replace <impl PartialEq<[u8; N]> for Bytes>::eq -> bool with false

Check warning on line 87 in neqo-common/src/bytes.rs

View workflow job for this annotation

GitHub Actions / Find mutants

Missed mutant

replace <impl PartialEq<[u8; N]> for Bytes>::eq -> bool with true
}
}

impl PartialEq<[u8]> for Bytes {
fn eq(&self, other: &[u8]) -> bool {
self.as_ref() == other

Check warning on line 93 in neqo-common/src/bytes.rs

View workflow job for this annotation

GitHub Actions / Find mutants

Missed mutant

replace <impl PartialEq<[u8]> for Bytes>::eq -> bool with false

Check warning on line 93 in neqo-common/src/bytes.rs

View workflow job for this annotation

GitHub Actions / Find mutants

Missed mutant

replace <impl PartialEq<[u8]> for Bytes>::eq -> bool with true
}
}

#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
use crate::Bytes;

#[test]
#[should_panic(expected = "offset 4 is out of bounds for data of length 3")]
fn illegal_offset() {
_ = Bytes::new(vec![1, 2, 3], 4);
}

#[test]
fn len() {
let b = Bytes::new(vec![1, 2, 3, 4], 1);
assert_eq!(b.len(), 3);
}

#[test]
fn is_empty() {
let b = Bytes::new(vec![1, 2, 3, 4], 4);
assert!(b.is_empty());
}

#[test]
fn skip() {
let b = Bytes::new(vec![1, 2, 3, 4], 1).skip(2);
assert_eq!(b.as_ref(), &[4]);
}

#[test]
#[should_panic(expected = "cannot skip 4 bytes when only 3 bytes remain")]
fn illegal_skip() {
_ = Bytes::new(vec![1, 2, 3, 4], 1).skip(4);
}

#[test]
fn is_equal() {
let a = Bytes::new(vec![1, 2, 3, 4], 1);
let b = Bytes::from(vec![2, 3, 4]);
assert_eq!(a, b);
}
}
9 changes: 8 additions & 1 deletion neqo-common/src/datagram.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::{
ops::{Deref, DerefMut},
};

use crate::{hex_with_len, Tos};
use crate::{hex_with_len, Bytes, Tos};

#[derive(Clone, PartialEq, Eq)]
pub struct Datagram<D = Vec<u8>> {
Expand Down Expand Up @@ -130,6 +130,13 @@ impl<'a> Datagram<&'a mut [u8]> {
}
}

impl Datagram<Bytes> {
#[must_use]
pub const fn from_bytes(src: SocketAddr, dst: SocketAddr, tos: Tos, d: Bytes) -> Self {
Self { src, dst, tos, d }
}
}

impl<D: AsRef<[u8]>> AsRef<[u8]> for Datagram<D> {
fn as_ref(&self) -> &[u8] {
self.d.as_ref()
Expand Down
2 changes: 2 additions & 0 deletions neqo-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

#![cfg_attr(coverage_nightly, feature(coverage_attribute))]

pub mod bytes;
mod codec;
mod datagram;
pub mod event;
Expand All @@ -26,6 +27,7 @@ use strum::Display;
#[cfg(feature = "build-fuzzing-corpus")]
pub use self::fuzz::write_item_to_fuzzing_corpus;
pub use self::{
bytes::Bytes,
codec::{Buffer, Decoder, Encoder, MAX_VARINT},
datagram::{Datagram, DatagramBatch},
header::Header,
Expand Down
8 changes: 4 additions & 4 deletions neqo-http3/src/client_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

use std::{cell::RefCell, collections::VecDeque, rc::Rc};

use neqo_common::{event::Provider as EventProvider, qtrace, Header};
use neqo_common::{event::Provider as EventProvider, qtrace, Bytes, Header};
use neqo_crypto::ResumptionToken;
use neqo_transport::{AppError, StreamId, StreamType};

Expand Down Expand Up @@ -40,7 +40,7 @@ pub enum WebTransportEvent {
},
Datagram {
session_id: StreamId,
datagram: Vec<u8>,
datagram: Bytes,
},
}

Expand All @@ -62,7 +62,7 @@ pub enum ConnectUdpEvent {
},
Datagram {
session_id: StreamId,
datagram: Vec<u8>,
datagram: Bytes,
},
}

Expand Down Expand Up @@ -283,7 +283,7 @@ impl ExtendedConnectEvents for Http3ClientEvents {
fn new_datagram(
&self,
session_id: StreamId,
datagram: Vec<u8>,
datagram: Bytes,
connect_type: ExtendedConnectType,
) {
let event = match connect_type {
Expand Down
24 changes: 17 additions & 7 deletions neqo-http3/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ use std::{
time::Instant,
};

use neqo_common::{qdebug, qerror, qinfo, qtrace, qwarn, Decoder, Header, MessageType, Role};
use neqo_common::{
qdebug, qerror, qinfo, qtrace, qwarn, Bytes, Decoder, Header, MessageType, Role,
};
use neqo_qpack as qpack;
use neqo_transport::{
streams::SendOrder, AppError, CloseReason, Connection, DatagramTracking, State, StreamId,
Expand Down Expand Up @@ -660,18 +662,26 @@ impl Http3Connection {
}
}

pub(crate) fn handle_datagram(&mut self, datagram: &[u8]) {
let mut decoder = Decoder::new(datagram);
let Some(stream) = decoder
.decode_varint()
.and_then(|id| self.recv_streams.get_mut(&StreamId::from(id * 4)))
pub(crate) fn handle_datagram(&mut self, datagram: Vec<u8>) {
let mut decoder = Decoder::new(&datagram);
let Some(id) = decoder.decode_varint() else {
qdebug!("[{self}] handle_datagram: failed to decode session ID");
return;
};
let varint_len = decoder.offset();

let Some(stream) = self
.recv_streams
.get_mut(&StreamId::from(id * 4))
.and_then(|s| s.extended_connect_session())
else {
qdebug!("[{self}] handle_datagram for unknown extended connect session");
return;
};

stream.borrow_mut().datagram(decoder.as_ref());
stream
.borrow_mut()
.datagram(Bytes::new(datagram, varint_len));
}

fn check_stream_exists(&self, stream_type: Http3StreamType) -> Res<()> {
Expand Down
2 changes: 1 addition & 1 deletion neqo-http3/src/connection_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1157,7 +1157,7 @@ impl Http3Client {
}
}
ConnectionEvent::Datagram(dgram) => {
self.base_handler.handle_datagram(&dgram);
self.base_handler.handle_datagram(dgram);
}
ConnectionEvent::SendStreamComplete { .. }
| ConnectionEvent::OutgoingDatagramOutcome { .. }
Expand Down
2 changes: 1 addition & 1 deletion neqo-http3/src/connection_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ impl Http3ServerHandler {
s.stream_writable();
}
}
ConnectionEvent::Datagram(dgram) => self.base_handler.handle_datagram(&dgram),
ConnectionEvent::Datagram(dgram) => self.base_handler.handle_datagram(dgram),
ConnectionEvent::AuthenticationNeeded
| ConnectionEvent::EchFallbackAuthenticationNeeded { .. }
| ConnectionEvent::ZeroRttRejected
Expand Down
26 changes: 17 additions & 9 deletions neqo-http3/src/features/extended_connect/connect_udp_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::{
time::Instant,
};

use neqo_common::{Decoder, Encoder};
use neqo_common::{Bytes, Decoder, Encoder};
use neqo_transport::{Connection, StreamId};

use crate::{
Expand Down Expand Up @@ -88,10 +88,13 @@ impl Protocol for Session {
encoder.encode_varint(0u64);
}

fn dgram_context_id<'a>(&self, datagram: &'a [u8]) -> Result<&'a [u8], DgramContextIdError> {
let mut decoder = Decoder::new(datagram);
match decoder.decode_varint() {
Some(0) => Ok(decoder.decode_remainder()),
fn dgram_context_id(&self, datagram: Bytes) -> Result<Bytes, DgramContextIdError> {
let (context_id, offset) = {
let mut decoder = Decoder::new(datagram.as_ref());
(decoder.decode_varint(), decoder.offset())
};
match context_id {
Some(0) => Ok(datagram.skip(offset)),
Some(context_id) => Err(DgramContextIdError::UnknownIdentifier(context_id)),
None => {
// > all HTTP Datagrams associated with UDP Proxying request streams start with a Context ID field;
Expand All @@ -105,6 +108,7 @@ impl Protocol for Session {

#[cfg(test)]
mod tests {
use neqo_common::Bytes;
use neqo_transport::StreamId;

use super::Session;
Expand All @@ -115,13 +119,17 @@ mod tests {
let session = Session::new(StreamId::new(42));
// Varint [0x00] is 0, i.e. a supported connect-udp context ID.
assert_eq!(
session.dgram_context_id(&[0x00, 0x00, 0x00]).unwrap(),
&[0x00, 0x00]
session
.dgram_context_id(Bytes::from(vec![0x00, 0x00, 0x00]))
.unwrap(),
Bytes::from(vec![0x00, 0x00])
);
// Varint [0x40 0x00] is 0 as well, thus a supported connect-udp context ID, too.
assert_eq!(
session.dgram_context_id(&[0x40, 0x00, 0x00, 0x00]).unwrap(),
&[0x00, 0x00]
session
.dgram_context_id(Bytes::from(vec![0x40, 0x00, 0x00, 0x00]))
.unwrap(),
Bytes::from(vec![0x00, 0x00])
);
}
}
4 changes: 2 additions & 2 deletions neqo-http3/src/features/extended_connect/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ mod tests;

use std::{cell::RefCell, fmt::Debug, mem, rc::Rc};

use neqo_common::{Header, Role};
use neqo_common::{Bytes, Header, Role};
use neqo_transport::StreamId;

use crate::{
Expand Down Expand Up @@ -51,7 +51,7 @@ pub(crate) trait ExtendedConnectEvents: Debug {
fn new_datagram(
&self,
session_id: StreamId,
datagram: Vec<u8>,
datagram: Bytes,
connect_type: ExtendedConnectType,
);
}
Expand Down
20 changes: 11 additions & 9 deletions neqo-http3/src/features/extended_connect/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::{
time::Instant,
};

use neqo_common::{qdebug, qtrace, Encoder, Header, MessageType, Role};
use neqo_common::{qdebug, qtrace, Bytes, Encoder, Header, MessageType, Role};
use neqo_transport::{AppError, Connection, DatagramTracking, StreamId};

use crate::{
Expand Down Expand Up @@ -398,20 +398,22 @@ impl Session {
Ok(())
}

pub(crate) fn datagram(&self, datagram: &[u8]) {
pub(crate) fn datagram(&self, datagram: Bytes) {
if self.state != State::Active {
qdebug!("[{self}]: received datagram on {:?} session.", self.state);
return;
}
let datagram = match self.protocol.dgram_context_id(datagram) {
Ok(datagram) => datagram,

// dgram_context_id returns the payload after stripping any context ID
Comment thread
larseggert marked this conversation as resolved.
match self.protocol.dgram_context_id(datagram) {
Ok(slice) => {
self.events
.new_datagram(self.id, slice, self.protocol.connect_type());
}
Err(e) => {
qdebug!("[{self}]: received datagram with invalid context identifier: {e}");
return;
}
};
self.events
.new_datagram(self.id, datagram.to_vec(), self.protocol.connect_type());
}
}

fn has_data_to_send(&self) -> bool {
Expand Down Expand Up @@ -561,7 +563,7 @@ pub(crate) trait Protocol: Debug + Display {

fn write_datagram_prefix(&self, encoder: &mut Encoder);

fn dgram_context_id<'a>(&self, datagram: &'a [u8]) -> Result<&'a [u8], DgramContextIdError>;
fn dgram_context_id(&self, datagram: Bytes) -> Result<Bytes, DgramContextIdError>;
}

#[derive(Debug, Error)]
Expand Down
Loading
Loading