Skip to content

Commit 25e2706

Browse files
authored
feat(s2n-quic-dc): implement queue allocator/dispatcher (#2517)
1 parent fb30a2b commit 25e2706

File tree

14 files changed

+1515
-14
lines changed

14 files changed

+1515
-14
lines changed

dc/s2n-quic-dc/src/socket/recv/router.rs

+2
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ use s2n_quic_core::inet::{ExplicitCongestionNotification, SocketAddress};
1111

1212
/// Routes incoming packet segments to the appropriate destination
1313
pub trait Router {
14+
fn is_open(&self) -> bool;
15+
1416
#[inline(always)]
1517
fn tag_len(&self) -> usize {
1618
16

dc/s2n-quic-dc/src/stream/recv.rs

+1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
mod ack;
55
pub mod application;
66
pub(crate) mod buffer;
7+
pub mod dispatch;
78
mod error;
89
mod packet;
910
mod probes;
+209
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
1+
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
use crate::{credentials, packet, socket::recv::descriptor as desc, sync::ring_deque};
5+
use s2n_quic_core::{inet::SocketAddress, varint::VarInt};
6+
use tracing::debug;
7+
8+
mod descriptor;
9+
mod free_list;
10+
mod handle;
11+
mod pool;
12+
mod queue;
13+
mod sender;
14+
15+
#[cfg(test)]
16+
mod tests;
17+
18+
/// Allocate this many channels at a time
19+
///
20+
/// With `debug_assertions`, we allocate smaller pages to try and cover more
21+
/// branches in the allocator logic around growth.
22+
const PAGE_SIZE: usize = if cfg!(debug_assertions) { 8 } else { 256 };
23+
24+
pub type Error = queue::Error;
25+
pub type Control = handle::Control<desc::Filled>;
26+
pub type Stream = handle::Stream<desc::Filled>;
27+
28+
/// A queue allocator for registering a receiver to process packets
29+
/// for a given ID.
30+
#[derive(Clone)]
31+
pub struct Allocator {
32+
pool: pool::Pool<desc::Filled, PAGE_SIZE>,
33+
}
34+
35+
impl Allocator {
36+
pub fn new(
37+
stream_capacity: impl Into<ring_deque::Capacity>,
38+
control_capacity: impl Into<ring_deque::Capacity>,
39+
) -> Self {
40+
Self {
41+
pool: pool::Pool::new(
42+
VarInt::ZERO,
43+
stream_capacity.into(),
44+
control_capacity.into(),
45+
),
46+
}
47+
}
48+
49+
/// Creates an allocator with a non-zero queue id
50+
///
51+
/// This is used for patterns where the `queue_id=0` is special and used to
52+
/// indicate newly initialized flows waiting to be assigned. For example,
53+
/// a client sends a packet with `queue_id=0` to a server and waits for the
54+
/// server to respond with an actual `queue_id` for future packets from the client.
55+
pub fn new_non_zero(
56+
stream_capacity: impl Into<ring_deque::Capacity>,
57+
control_capacity: impl Into<ring_deque::Capacity>,
58+
) -> Self {
59+
Self {
60+
pool: pool::Pool::new(
61+
VarInt::from_u8(1),
62+
stream_capacity.into(),
63+
control_capacity.into(),
64+
),
65+
}
66+
}
67+
68+
#[inline]
69+
pub fn dispatcher(&self) -> Dispatch {
70+
Dispatch {
71+
senders: self.pool.senders(),
72+
is_open: true,
73+
}
74+
}
75+
76+
#[inline]
77+
pub fn alloc(&mut self) -> Option<(Control, Stream)> {
78+
self.pool.alloc()
79+
}
80+
81+
#[inline]
82+
pub fn alloc_or_grow(&mut self) -> (Control, Stream) {
83+
self.pool.alloc_or_grow()
84+
}
85+
}
86+
87+
/// A dispatcher which routes packets to the specified queue, if
88+
/// there is a registered receiver.
89+
#[derive(Clone)]
90+
pub struct Dispatch {
91+
senders: sender::Senders<desc::Filled, PAGE_SIZE>,
92+
is_open: bool,
93+
}
94+
95+
impl Dispatch {
96+
#[inline]
97+
pub fn send_control(
98+
&mut self,
99+
queue_id: VarInt,
100+
segment: desc::Filled,
101+
) -> Result<Option<desc::Filled>, Error> {
102+
let mut res = Err(Error::Unallocated);
103+
self.senders.lookup(queue_id, |sender| {
104+
res = sender.send_control(segment);
105+
});
106+
107+
if matches!(res, Err(Error::Closed)) {
108+
self.is_open = false;
109+
}
110+
111+
res
112+
}
113+
114+
#[inline]
115+
pub fn send_stream(
116+
&mut self,
117+
queue_id: VarInt,
118+
segment: desc::Filled,
119+
) -> Result<Option<desc::Filled>, Error> {
120+
let mut res = Err(Error::Unallocated);
121+
self.senders.lookup(queue_id, |sender| {
122+
res = sender.send_stream(segment);
123+
});
124+
125+
if matches!(res, Err(Error::Closed)) {
126+
self.is_open = false;
127+
}
128+
129+
res
130+
}
131+
}
132+
133+
impl crate::socket::recv::router::Router for Dispatch {
134+
#[inline(always)]
135+
fn is_open(&self) -> bool {
136+
self.is_open
137+
}
138+
139+
#[inline(always)]
140+
fn tag_len(&self) -> usize {
141+
16
142+
}
143+
144+
/// implement this so we don't get warnings about not handling it
145+
#[inline(always)]
146+
fn handle_control_packet(
147+
&mut self,
148+
_remote_address: SocketAddress,
149+
_ecn: s2n_quic_core::inet::ExplicitCongestionNotification,
150+
_packet: packet::control::decoder::Packet,
151+
) {
152+
}
153+
154+
#[inline]
155+
fn dispatch_control_packet(
156+
&mut self,
157+
_tag: packet::control::Tag,
158+
id: Option<packet::stream::Id>,
159+
credentials: credentials::Credentials,
160+
segment: desc::Filled,
161+
) {
162+
let Some(id) = id else {
163+
return;
164+
};
165+
166+
match self.send_control(id.queue_id, segment) {
167+
Ok(None) => {}
168+
Ok(Some(_prev)) => {
169+
// TODO increment metrics
170+
debug!(queue_id = %id.queue_id, "control queue overflow");
171+
}
172+
Err(_) => {
173+
// TODO increment metrics
174+
debug!(stream_id = ?id, ?credentials, "unroutable control packet");
175+
}
176+
}
177+
}
178+
179+
/// implement this so we don't get warnings about not handling it
180+
#[inline(always)]
181+
fn handle_stream_packet(
182+
&mut self,
183+
_remote_address: SocketAddress,
184+
_ecn: s2n_quic_core::inet::ExplicitCongestionNotification,
185+
_packet: packet::stream::decoder::Packet,
186+
) {
187+
}
188+
189+
#[inline]
190+
fn dispatch_stream_packet(
191+
&mut self,
192+
_tag: packet::stream::Tag,
193+
id: packet::stream::Id,
194+
credentials: credentials::Credentials,
195+
segment: desc::Filled,
196+
) {
197+
match self.send_stream(id.queue_id, segment) {
198+
Ok(None) => {}
199+
Ok(Some(_prev)) => {
200+
// TODO increment metrics
201+
debug!(queue_id = %id.queue_id, "stream queue overflow");
202+
}
203+
Err(_) => {
204+
// TODO increment metrics
205+
debug!(stream_id = ?id, ?credentials, "unroutable stream packet");
206+
}
207+
}
208+
}
209+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
version https://git-lfs.github.com/spec/v1
2+
oid sha256:64dc6c1ff0f7c89de06fcc817360f949a95b2d524afe99789e2429c9e082603f
3+
size 2488320

0 commit comments

Comments
 (0)