Skip to content

Commit 04e2261

Browse files
committed
feat: switch to noq
1 parent a261ae5 commit 04e2261

9 files changed

Lines changed: 105 additions & 73 deletions

File tree

compio-quic/Cargo.toml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ compio-log = { workspace = true }
2121
compio-net = { workspace = true }
2222
compio-runtime = { workspace = true, features = ["time"] }
2323

24-
quinn-proto = { version = "0.11.13", default-features = false }
24+
noq-proto = { version = "0.16.0", default-features = false }
2525
rustls = { workspace = true }
2626
rustls-platform-verifier = { workspace = true, optional = true }
2727
rustls-native-certs = { workspace = true, optional = true }
@@ -55,7 +55,7 @@ compio-runtime = { workspace = true, features = ["criterion"] }
5555

5656
criterion = { workspace = true, features = ["async_tokio"] }
5757
http = "1.1.0"
58-
quinn = "0.11.9"
58+
noq = "0.17.0"
5959
rand = { workspace = true }
6060
rcgen = "0.14.1"
6161
socket2 = { workspace = true, features = ["all"] }
@@ -68,9 +68,9 @@ io-compat = ["futures-util/io"]
6868
platform-verifier = ["dep:rustls-platform-verifier"]
6969
native-certs = ["dep:rustls-native-certs"]
7070
webpki-roots = ["dep:webpki-roots"]
71-
qlog = ["quinn-proto/qlog"]
71+
qlog = ["noq-proto/qlog"]
7272
h3 = ["dep:h3", "dep:h3-datagram"]
73-
ring = ["quinn-proto/rustls-ring"]
73+
ring = ["noq-proto/rustls-ring"]
7474
windows-gro = []
7575
sync = []
7676

compio-quic/src/builder.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::{io, sync::Arc};
22

33
use compio_net::ToSocketAddrsAsync;
4-
use quinn_proto::{
4+
use noq_proto::{
55
ClientConfig, ServerConfig,
66
crypto::rustls::{QuicClientConfig, QuicServerConfig},
77
};

compio-quic/src/connection.rs

Lines changed: 59 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,10 @@ use futures_util::{
1717
select, stream,
1818
};
1919
#[cfg(rustls)]
20-
use quinn_proto::crypto::rustls::HandshakeData;
21-
use quinn_proto::{
22-
ConnectionHandle, ConnectionStats, Dir, EndpointEvent, Side, StreamEvent, StreamId, VarInt,
23-
congestion::Controller,
20+
use noq_proto::crypto::rustls::HandshakeData;
21+
use noq_proto::{
22+
ConnectionHandle, ConnectionStats, Dir, EndpointEvent, PathId, Side, StreamEvent, StreamId,
23+
VarInt, congestion::Controller,
2424
};
2525
use rustc_hash::FxHashMap as HashMap;
2626
use thiserror::Error;
@@ -36,12 +36,12 @@ use crate::{
3636
#[derive(Debug)]
3737
pub(crate) enum ConnectionEvent {
3838
Close(VarInt, Bytes),
39-
Proto(quinn_proto::ConnectionEvent),
39+
Proto(noq_proto::ConnectionEvent),
4040
}
4141

4242
#[derive(Debug)]
4343
pub(crate) struct ConnectionState {
44-
pub(crate) conn: quinn_proto::Connection,
44+
pub(crate) conn: noq_proto::Connection,
4545
pub(crate) error: Option<ConnectionError>,
4646
connected: bool,
4747
worker: Option<JoinHandle<()>>,
@@ -134,7 +134,7 @@ fn implicit_close(this: &Shared<ConnectionInner>) {
134134
impl ConnectionInner {
135135
fn new(
136136
handle: ConnectionHandle,
137-
conn: quinn_proto::Connection,
137+
conn: noq_proto::Connection,
138138
socket: Socket,
139139
events_tx: Sender<(ConnectionHandle, EndpointEvent)>,
140140
events_rx: Receiver<ConnectionEvent>,
@@ -246,7 +246,7 @@ impl ConnectionInner {
246246
}
247247

248248
while let Some(event) = state.conn.poll() {
249-
use quinn_proto::Event::*;
249+
use noq_proto::Event::*;
250250
match event {
251251
HandshakeDataReady => {
252252
if let Some(waker) = state.on_handshake_data.take() {
@@ -282,6 +282,16 @@ impl ConnectionInner {
282282
.for_each(Waker::wake),
283283
DatagramReceived => state.datagram_received.drain(..).for_each(Waker::wake),
284284
DatagramsUnblocked => state.datagrams_unblocked.drain(..).for_each(Waker::wake),
285+
286+
HandshakeConfirmed => {
287+
todo!()
288+
}
289+
Path(_) => {
290+
todo!()
291+
}
292+
NatTraversal(_) => {
293+
todo!()
294+
}
285295
}
286296
}
287297

@@ -313,19 +323,37 @@ macro_rules! conn_fn {
313323
/// This will return `None` for clients, or when the platform does not
314324
/// expose this information.
315325
pub fn local_ip(&self) -> Option<IpAddr> {
316-
self.0.state().conn.local_ip()
326+
let state = self.0.state();
327+
328+
state
329+
.conn
330+
.paths()
331+
.iter()
332+
.filter_map(|id| state.conn.network_path(*id).ok())
333+
.next()
334+
.unwrap()
335+
.local_ip
317336
}
318337

319338
/// The peer's UDP address.
320339
///
321340
/// Will panic if called after `poll` has returned `Ready`.
322341
pub fn remote_address(&self) -> SocketAddr {
323-
self.0.state().conn.remote_address()
342+
let state = self.0.state();
343+
344+
state
345+
.conn
346+
.paths()
347+
.iter()
348+
.filter_map(|id| state.conn.network_path(*id).ok())
349+
.next()
350+
.unwrap()
351+
.remote
324352
}
325353

326354
/// Current best estimate of this connection's latency (round-trip-time).
327-
pub fn rtt(&self) -> Duration {
328-
self.0.state().conn.rtt()
355+
pub fn rtt(&self, path_id: PathId) -> Option<Duration> {
356+
self.0.state().conn.rtt(path_id)
329357
}
330358

331359
/// Connection statistics.
@@ -335,8 +363,12 @@ macro_rules! conn_fn {
335363

336364
/// Current state of the congestion control algorithm. (For debugging
337365
/// purposes)
338-
pub fn congestion_state(&self) -> Box<dyn Controller> {
339-
self.0.state().conn.congestion_state().clone_box()
366+
pub fn congestion_state(&self, path_id: PathId) -> Option<Box<dyn Controller>> {
367+
self.0
368+
.state()
369+
.conn
370+
.congestion_state(path_id)
371+
.map(|state| state.clone_box())
340372
}
341373

342374
/// Cryptographic identity of the peer.
@@ -375,7 +407,7 @@ macro_rules! conn_fn {
375407
output: &mut [u8],
376408
label: &[u8],
377409
context: &[u8],
378-
) -> Result<(), quinn_proto::crypto::ExportKeyingMaterialError> {
410+
) -> Result<(), noq_proto::crypto::ExportKeyingMaterialError> {
379411
self.0
380412
.state()
381413
.conn
@@ -395,7 +427,7 @@ impl Connecting {
395427

396428
pub(crate) fn new(
397429
handle: ConnectionHandle,
398-
conn: quinn_proto::Connection,
430+
conn: noq_proto::Connection,
399431
socket: Socket,
400432
events_tx: Sender<(ConnectionHandle, EndpointEvent)>,
401433
events_rx: Receiver<ConnectionEvent>,
@@ -573,14 +605,14 @@ impl Connection {
573605
state.wake();
574606
}
575607

576-
/// See [`quinn_proto::TransportConfig::send_window()`]
608+
/// See [`noq_proto::TransportConfig::send_window()`]
577609
pub fn set_send_window(&self, send_window: u64) {
578610
let mut state = self.0.state();
579611
state.conn.set_send_window(send_window);
580612
state.wake();
581613
}
582614

583-
/// See [`quinn_proto::TransportConfig::receive_window()`]
615+
/// See [`noq_proto::TransportConfig::receive_window()`]
584616
pub fn set_receive_window(&self, receive_window: VarInt) {
585617
let mut state = self.0.state();
586618
state.conn.set_receive_window(receive_window);
@@ -685,7 +717,7 @@ impl Connection {
685717
cx: Option<&mut Context>,
686718
data: Bytes,
687719
) -> Result<(), Result<SendDatagramError, Bytes>> {
688-
use quinn_proto::SendDatagramError::*;
720+
use noq_proto::SendDatagramError::*;
689721
let mut state = self.0.try_state().map_err(|e| Ok(e.into()))?;
690722
state
691723
.conn
@@ -945,13 +977,13 @@ pub enum ConnectionError {
945977
/// The peer violated the QUIC specification as understood by this
946978
/// implementation
947979
#[error(transparent)]
948-
TransportError(#[from] quinn_proto::TransportError),
980+
TransportError(#[from] noq_proto::TransportError),
949981
/// The peer's QUIC stack aborted the connection automatically
950982
#[error("aborted by peer: {0}")]
951-
ConnectionClosed(quinn_proto::ConnectionClose),
983+
ConnectionClosed(noq_proto::ConnectionClose),
952984
/// The peer closed the connection
953985
#[error("closed by peer: {0}")]
954-
ApplicationClosed(quinn_proto::ApplicationClose),
986+
ApplicationClosed(noq_proto::ApplicationClose),
955987
/// The peer is unable to continue processing this connection, usually due
956988
/// to having restarted
957989
#[error("reset by peer")]
@@ -961,8 +993,8 @@ pub enum ConnectionError {
961993
///
962994
/// If neither side is sending keep-alives, a connection will time out after
963995
/// a long enough idle period even if the peer is still reachable. See
964-
/// also [`TransportConfig::max_idle_timeout()`](quinn_proto::TransportConfig::max_idle_timeout())
965-
/// and [`TransportConfig::keep_alive_interval()`](quinn_proto::TransportConfig::keep_alive_interval()).
996+
/// also [`TransportConfig::max_idle_timeout()`](noq_proto::TransportConfig::max_idle_timeout())
997+
/// and [`TransportConfig::keep_alive_interval()`](noq_proto::TransportConfig::keep_alive_interval()).
966998
#[error("timed out")]
967999
TimedOut,
9681000
/// The local application closed the connection
@@ -976,9 +1008,9 @@ pub enum ConnectionError {
9761008
CidsExhausted,
9771009
}
9781010

979-
impl From<quinn_proto::ConnectionError> for ConnectionError {
980-
fn from(value: quinn_proto::ConnectionError) -> Self {
981-
use quinn_proto::ConnectionError::*;
1011+
impl From<noq_proto::ConnectionError> for ConnectionError {
1012+
fn from(value: noq_proto::ConnectionError) -> Self {
1013+
use noq_proto::ConnectionError::*;
9821014

9831015
match value {
9841016
VersionMismatch => ConnectionError::VersionMismatch,

compio-quic/src/endpoint.rs

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@ use compio_net::UdpSocket;
2121
use compio_runtime::JoinHandle;
2222
use flume::{Receiver, Sender, unbounded};
2323
use futures_util::{FutureExt, StreamExt, future, select, task::AtomicWaker};
24-
use quinn_proto::{
24+
use noq_proto::{
2525
ClientConfig, ConnectError, ConnectionError, ConnectionHandle, DatagramEvent, EndpointConfig,
26-
EndpointEvent, ServerConfig, Transmit, VarInt,
26+
EndpointEvent, FourTuple, ServerConfig, Transmit, VarInt,
2727
};
2828
use rustc_hash::FxHashMap as HashMap;
2929

@@ -34,12 +34,12 @@ use crate::{
3434

3535
#[derive(Debug)]
3636
struct EndpointState {
37-
endpoint: quinn_proto::Endpoint,
37+
endpoint: noq_proto::Endpoint,
3838
worker: Option<JoinHandle<()>>,
3939
connections: HashMap<ConnectionHandle, Sender<ConnectionEvent>>,
4040
close: Option<(VarInt, Bytes)>,
4141
exit_on_idle: bool,
42-
incoming: VecDeque<quinn_proto::Incoming>,
42+
incoming: VecDeque<noq_proto::Incoming>,
4343
incoming_wakers: VecDeque<Waker>,
4444
stats: EndpointStats,
4545
}
@@ -68,8 +68,10 @@ impl EndpointState {
6868
let mut resp_buf = Vec::new();
6969
match self.endpoint.handle(
7070
now,
71-
meta.remote,
72-
meta.local_ip,
71+
FourTuple {
72+
remote: meta.remote,
73+
local_ip: meta.local_ip,
74+
},
7375
meta.ecn,
7476
data,
7577
&mut resp_buf,
@@ -112,7 +114,7 @@ impl EndpointState {
112114
self.connections.is_empty()
113115
}
114116

115-
fn poll_incoming(&mut self, cx: &mut Context) -> Poll<Option<quinn_proto::Incoming>> {
117+
fn poll_incoming(&mut self, cx: &mut Context) -> Poll<Option<noq_proto::Incoming>> {
116118
if self.close.is_none() {
117119
if let Some(incoming) = self.incoming.pop_front() {
118120
Poll::Ready(Some(incoming))
@@ -128,7 +130,7 @@ impl EndpointState {
128130
fn new_connection(
129131
&mut self,
130132
handle: ConnectionHandle,
131-
conn: quinn_proto::Connection,
133+
conn: noq_proto::Connection,
132134
socket: Socket,
133135
events_tx: Sender<(ConnectionHandle, EndpointEvent)>,
134136
) -> Connecting {
@@ -173,11 +175,10 @@ impl EndpointInner {
173175

174176
Ok(Self {
175177
state: Mutex::new(EndpointState {
176-
endpoint: quinn_proto::Endpoint::new(
178+
endpoint: noq_proto::Endpoint::new(
177179
Arc::new(config),
178180
server_config.map(Arc::new),
179181
allow_mtud,
180-
None,
181182
),
182183
worker: None,
183184
connections: HashMap::default(),
@@ -237,7 +238,7 @@ impl EndpointInner {
237238

238239
pub(crate) fn accept(
239240
&self,
240-
incoming: quinn_proto::Incoming,
241+
incoming: noq_proto::Incoming,
241242
server_config: Option<ServerConfig>,
242243
) -> Result<Connecting, ConnectionError> {
243244
let mut state = self.state.lock();
@@ -260,7 +261,7 @@ impl EndpointInner {
260261
}
261262
}
262263

263-
pub(crate) fn refuse(&self, incoming: quinn_proto::Incoming) {
264+
pub(crate) fn refuse(&self, incoming: noq_proto::Incoming) {
264265
let mut state = self.state.lock();
265266
state.stats.refused_handshakes += 1;
266267
let mut resp_buf = Vec::new();
@@ -269,18 +270,15 @@ impl EndpointInner {
269270
}
270271

271272
#[allow(clippy::result_large_err)]
272-
pub(crate) fn retry(
273-
&self,
274-
incoming: quinn_proto::Incoming,
275-
) -> Result<(), quinn_proto::RetryError> {
273+
pub(crate) fn retry(&self, incoming: noq_proto::Incoming) -> Result<(), noq_proto::RetryError> {
276274
let mut state = self.state.lock();
277275
let mut resp_buf = Vec::new();
278276
let transmit = state.endpoint.retry(incoming, &mut resp_buf)?;
279277
self.respond(resp_buf, transmit);
280278
Ok(())
281279
}
282280

283-
pub(crate) fn ignore(&self, incoming: quinn_proto::Incoming) {
281+
pub(crate) fn ignore(&self, incoming: noq_proto::Incoming) {
284282
let mut state = self.state.lock();
285283
state.stats.ignored_handshakes += 1;
286284
state.endpoint.ignore(incoming);

compio-quic/src/incoming.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,14 @@ use std::{
66
};
77

88
use futures_util::FutureExt;
9-
use quinn_proto::{ConnectionId, ServerConfig};
9+
use noq_proto::{ConnectionId, ServerConfig};
1010
use thiserror::Error;
1111

1212
use crate::{Connecting, Connection, ConnectionError, EndpointRef};
1313

1414
#[derive(Debug)]
1515
pub(crate) struct IncomingInner {
16-
pub(crate) incoming: quinn_proto::Incoming,
16+
pub(crate) incoming: noq_proto::Incoming,
1717
pub(crate) endpoint: EndpointRef,
1818
}
1919

@@ -23,7 +23,7 @@ pub(crate) struct IncomingInner {
2323
pub struct Incoming(Option<IncomingInner>);
2424

2525
impl Incoming {
26-
pub(crate) fn new(incoming: quinn_proto::Incoming, endpoint: EndpointRef) -> Self {
26+
pub(crate) fn new(incoming: noq_proto::Incoming, endpoint: EndpointRef) -> Self {
2727
Self(Some(IncomingInner { incoming, endpoint }))
2828
}
2929

@@ -103,7 +103,7 @@ impl Incoming {
103103

104104
/// The original destination CID when initiating the connection
105105
pub fn orig_dst_cid(&self) -> ConnectionId {
106-
*self.0.as_ref().unwrap().incoming.orig_dst_cid()
106+
self.0.as_ref().unwrap().incoming.orig_dst_cid()
107107
}
108108
}
109109

0 commit comments

Comments
 (0)