Skip to content

Commit 1373ea3

Browse files
committed
Add inbound connection admission control to the QUIC endpoint
1 parent 487f8bc commit 1373ea3

5 files changed

Lines changed: 395 additions & 22 deletions

File tree

crates/anemo/src/config.rs

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use rcgen::{CertificateParams, KeyPair};
88
use rustls::pki_types::CertificateDer;
99
use rustls::pki_types::PrivateKeyDer;
1010
use serde::{Deserialize, Serialize};
11-
use std::{sync::Arc, time::Duration};
11+
use std::{num::NonZeroU32, sync::Arc, time::Duration};
1212

1313
/// Configuration for a [`Network`](crate::Network).
1414
#[derive(Clone, Debug, Default, Serialize, Deserialize)]
@@ -81,6 +81,36 @@ pub struct Config {
8181
#[serde(skip_serializing_if = "Option::is_none")]
8282
pub max_concurrent_connections: Option<usize>,
8383

84+
/// Whether to require QUIC source-address validation (via a stateless Retry
85+
/// packet) before beginning the TLS handshake for an inbound connection whose
86+
/// source address has not yet been validated.
87+
///
88+
/// When enabled, the first attempt from an unvalidated address is answered with
89+
/// a Retry instead of starting the handshake, forcing the peer to prove it can
90+
/// receive traffic at its claimed address.
91+
///
92+
/// If unspecified, this defaults to `true`.
93+
#[serde(skip_serializing_if = "Option::is_none")]
94+
pub require_inbound_address_validation: Option<bool>,
95+
96+
/// Maximum sustained rate, per source IP, of new inbound connections admitted to
97+
/// the TLS handshake, as a token-bucket refill rate in connections per second.
98+
/// Inbound attempts from a source IP exceeding this rate are dropped before the
99+
/// handshake begins.
100+
///
101+
/// If unspecified, this defaults to `10`. Setting it to `0` disables per-IP
102+
/// inbound connection rate limiting.
103+
#[serde(skip_serializing_if = "Option::is_none")]
104+
pub inbound_connection_rate_limit_per_ip: Option<u32>,
105+
106+
/// Token-bucket burst size for [`Config::inbound_connection_rate_limit_per_ip`],
107+
/// i.e. the maximum number of inbound connections from a single source IP that
108+
/// may be admitted in an instantaneous burst.
109+
///
110+
/// If unspecified, this defaults to `100`.
111+
#[serde(skip_serializing_if = "Option::is_none")]
112+
pub inbound_connection_rate_limit_burst_per_ip: Option<u32>,
113+
84114
/// Size of the broadcast channel use for subscribing to
85115
/// [`PeerEvent`](crate::types::PeerEvent)s via
86116
/// [`Network::subscribe`](crate::Network::subscribe).
@@ -278,6 +308,34 @@ impl Config {
278308
self.max_concurrent_connections
279309
}
280310

311+
pub(crate) fn require_inbound_address_validation(&self) -> bool {
312+
const DEFAULT: bool = true;
313+
314+
self.require_inbound_address_validation.unwrap_or(DEFAULT)
315+
}
316+
317+
/// Sustained per-source-IP inbound connection admission rate in connections per
318+
/// second. Returns `None` only when explicitly disabled by configuring `0`.
319+
pub(crate) fn inbound_connection_rate_limit_per_ip(&self) -> Option<NonZeroU32> {
320+
const DEFAULT_RATE_PER_SEC: u32 = 10;
321+
322+
// `NonZeroU32::new` yields `None` for a configured `0`, i.e. rate limiting off.
323+
NonZeroU32::new(
324+
self.inbound_connection_rate_limit_per_ip
325+
.unwrap_or(DEFAULT_RATE_PER_SEC),
326+
)
327+
}
328+
329+
pub(crate) fn inbound_connection_rate_limit_burst_per_ip(&self) -> NonZeroU32 {
330+
const DEFAULT_BURST: u32 = 100;
331+
332+
// A configured burst of `0` is nonsensical (it would admit nothing); fall back
333+
// to the default in that case as well as when unset.
334+
self.inbound_connection_rate_limit_burst_per_ip
335+
.and_then(NonZeroU32::new)
336+
.unwrap_or_else(|| NonZeroU32::new(DEFAULT_BURST).expect("DEFAULT_BURST is nonzero"))
337+
}
338+
281339
pub(crate) fn peer_event_broadcast_channel_capacity(&self) -> usize {
282340
const PEER_EVENT_BROADCAST_CHANNEL_CAPACITY: usize = 128;
283341

crates/anemo/src/endpoint.rs

Lines changed: 94 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -154,14 +154,68 @@ pin_project_lite::pin_project! {
154154
}
155155

156156
impl Future for Accept<'_> {
157-
type Output = Option<Connecting>;
157+
type Output = Option<Incoming>;
158158

159159
fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
160-
self.project().inner.poll(ctx).map(|maybe_connecting| {
161-
maybe_connecting
162-
.and_then(|incoming| incoming.accept().ok())
163-
.map(Connecting::new_inbound)
164-
})
160+
self.project()
161+
.inner
162+
.poll(ctx)
163+
.map(|maybe_incoming| maybe_incoming.map(Incoming::new))
164+
}
165+
}
166+
167+
/// An inbound connection attempt for which the server has not yet begun the TLS
168+
/// handshake.
169+
///
170+
/// Yielded by [`Endpoint::accept`]. Admission decisions (source-address validation,
171+
/// rate limiting) are made against this type before calling [`Incoming::accept`],
172+
/// which is what begins the handshake.
173+
#[derive(Debug)]
174+
#[must_use = "an Incoming must be accepted, retried, or ignored"]
175+
pub(crate) struct Incoming(quinn::Incoming);
176+
177+
impl Incoming {
178+
pub(crate) fn new(inner: quinn::Incoming) -> Self {
179+
Self(inner)
180+
}
181+
182+
/// The remote peer's UDP socket address.
183+
pub(crate) fn remote_address(&self) -> SocketAddr {
184+
self.0.remote_address()
185+
}
186+
187+
/// Whether the peer has proved it can receive traffic at `remote_address()`.
188+
pub(crate) fn remote_address_validated(&self) -> bool {
189+
self.0.remote_address_validated()
190+
}
191+
192+
/// Ensure the peer's source address is validated before the handshake.
193+
///
194+
/// If the address is already validated, returns the attempt so the caller can
195+
/// proceed. Otherwise sends a stateless Retry packet and consumes the attempt
196+
/// (returning `None`); the peer must re-initiate from a validated address.
197+
#[must_use = "a returned Incoming must be accepted or ignored"]
198+
pub(crate) fn validate_source(self) -> Option<Incoming> {
199+
if self.remote_address_validated() {
200+
return Some(self);
201+
}
202+
// The address is unvalidated, so a Retry is always legal here: quinn
203+
// guarantees may_retry() whenever the address is unvalidated.
204+
let _ = self.0.retry();
205+
None
206+
}
207+
208+
/// Drop the attempt without sending any response packet.
209+
pub(crate) fn ignore(self) {
210+
self.0.ignore()
211+
}
212+
213+
/// Begin the handshake for this connection.
214+
pub(crate) fn accept(self) -> Result<Connecting> {
215+
self.0
216+
.accept()
217+
.map_err(anyhow::Error::from)
218+
.map(Connecting::new_inbound)
165219
}
166220
}
167221

@@ -237,7 +291,14 @@ mod test {
237291
};
238292

239293
let peer_2 = async move {
240-
let connection = endpoint_2.accept().await.unwrap().await.unwrap();
294+
let connection = endpoint_2
295+
.accept()
296+
.await
297+
.unwrap()
298+
.accept()
299+
.unwrap()
300+
.await
301+
.unwrap();
241302
assert_eq!(connection.peer_id(), peer_id_1);
242303

243304
let mut recv = connection.accept_uni().await.unwrap();
@@ -299,10 +360,24 @@ mod test {
299360
};
300361

301362
let peer_2 = async move {
302-
let connection_1 = endpoint_2.accept().await.unwrap().await.unwrap();
363+
let connection_1 = endpoint_2
364+
.accept()
365+
.await
366+
.unwrap()
367+
.accept()
368+
.unwrap()
369+
.await
370+
.unwrap();
303371
assert_eq!(connection_1.peer_id(), peer_id_1);
304372

305-
let connection_2 = endpoint_2.accept().await.unwrap().await.unwrap();
373+
let connection_2 = endpoint_2
374+
.accept()
375+
.await
376+
.unwrap()
377+
.accept()
378+
.unwrap()
379+
.await
380+
.unwrap();
306381
assert_eq!(connection_2.peer_id(), peer_id_1);
307382
assert_ne!(connection_1.stable_id(), connection_2.stable_id());
308383

@@ -352,7 +427,16 @@ mod test {
352427

353428
let (connection_1_to_2, connection_2_to_1) = timeout(join(
354429
async { endpoint_1.connect(addr_2).unwrap().await.unwrap() },
355-
async { endpoint_2.accept().await.unwrap().await.unwrap() },
430+
async {
431+
endpoint_2
432+
.accept()
433+
.await
434+
.unwrap()
435+
.accept()
436+
.unwrap()
437+
.await
438+
.unwrap()
439+
},
356440
))
357441
.await
358442
.unwrap();

crates/anemo/src/network/connection_manager.rs

Lines changed: 59 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
1+
use super::inbound_rate_limit::InboundIpRateLimiter;
12
use super::request_handler::InboundRequestHandler;
23
use crate::{
34
config::Config,
45
connection::Connection,
5-
endpoint::{Connecting, Endpoint},
6+
endpoint::{Connecting, Endpoint, Incoming},
67
types::{Address, DisconnectReason, PeerAffinity, PeerEvent, PeerInfo},
78
ConnectionOrigin, PeerId, Request, Response, Result,
89
};
@@ -11,6 +12,7 @@ use std::{
1112
collections::{hash_map::Entry, HashMap},
1213
convert::Infallible,
1314
sync::{Arc, RwLock},
15+
time::Instant,
1416
};
1517
use tokio::{
1618
sync::{broadcast, mpsc, oneshot},
@@ -54,6 +56,10 @@ pub(crate) struct ConnectionManager {
5456
pending_dials: HashMap<PeerId, oneshot::Receiver<Result<PeerId>>>,
5557
dial_backoff_states: HashMap<PeerId, DialBackoffState>,
5658

59+
/// Per-source-IP rate limiter for admitting new inbound connections.
60+
/// `None` when inbound rate limiting is disabled.
61+
inbound_rate_limiter: Option<InboundIpRateLimiter>,
62+
5763
active_peers: ActivePeers,
5864
known_peers: KnownPeers,
5965

@@ -75,6 +81,9 @@ impl ConnectionManager {
7581
service: BoxCloneService<Request<Bytes>, Response<Bytes>, Infallible>,
7682
) -> (Self, mpsc::Sender<ConnectionManagerRequest>) {
7783
let (sender, receiver) = mpsc::channel(config.connection_manager_channel_capacity());
84+
let inbound_rate_limiter = config.inbound_connection_rate_limit_per_ip().map(|rate| {
85+
InboundIpRateLimiter::new(rate, config.inbound_connection_rate_limit_burst_per_ip())
86+
});
7887
(
7988
Self {
8089
config,
@@ -84,6 +93,7 @@ impl ConnectionManager {
8493
connection_handlers: JoinSet::new(),
8594
pending_dials: HashMap::default(),
8695
dial_backoff_states: HashMap::default(),
96+
inbound_rate_limiter,
8797
active_peers,
8898
known_peers,
8999
service,
@@ -142,9 +152,9 @@ impl ConnectionManager {
142152
}
143153
}
144154
}
145-
connecting = self.endpoint.accept() => {
146-
if let Some(connecting) = connecting {
147-
self.handle_incoming(connecting);
155+
incoming = self.endpoint.accept() => {
156+
if let Some(incoming) = incoming {
157+
self.handle_incoming(incoming);
148158
}
149159
},
150160
Some(connecting_output) = self.pending_connections.join_next() => {
@@ -229,15 +239,48 @@ impl ConnectionManager {
229239
self.dial_peer(address, peer_id, oneshot);
230240
}
231241

232-
fn handle_incoming(&mut self, connecting: Connecting) {
242+
fn handle_incoming(&mut self, incoming: Incoming) {
233243
trace!("received new incoming connection");
234244

235-
self.pending_connections.spawn(Self::handle_incoming_task(
236-
connecting,
237-
self.config.clone(),
238-
self.active_peers.clone(),
239-
self.known_peers.clone(),
240-
));
245+
let remote_address = incoming.remote_address();
246+
247+
// Validate source address, if enabled.
248+
let incoming = if self.config.require_inbound_address_validation() {
249+
match incoming.validate_source() {
250+
Some(incoming) => incoming,
251+
None => return,
252+
}
253+
} else {
254+
incoming
255+
};
256+
257+
// Apply per-source-IP inbound rate limit. Loopback sources are
258+
// exempt: they can only originate on this host (a remote attacker cannot
259+
// present a loopback source address), and exempting them avoids throttling
260+
// local/test topologies where many peers share a loopback address.
261+
if let Some(limiter) = self.inbound_rate_limiter.as_mut() {
262+
let ip = remote_address.ip();
263+
if !ip.is_loopback() && !limiter.check(ip, Instant::now()) {
264+
debug!(%remote_address, "dropping inbound connection: per-source-IP rate limit exceeded");
265+
incoming.ignore();
266+
return;
267+
}
268+
}
269+
270+
// Admit the connection and begin the handshake.
271+
match incoming.accept() {
272+
Ok(connecting) => {
273+
self.pending_connections.spawn(Self::handle_incoming_task(
274+
connecting,
275+
self.config.clone(),
276+
self.active_peers.clone(),
277+
self.known_peers.clone(),
278+
));
279+
}
280+
Err(e) => {
281+
debug!(%remote_address, "failed to accept inbound connection: {e}");
282+
}
283+
}
241284
}
242285

243286
async fn handle_incoming_task(
@@ -420,6 +463,11 @@ impl ConnectionManager {
420463
self.dial_peer(address, Some(peer.peer_id), sender);
421464
self.pending_dials.insert(peer.peer_id, receiver);
422465
}
466+
467+
// Clean up rate limiter buckets.
468+
if let Some(limiter) = self.inbound_rate_limiter.as_mut() {
469+
limiter.evict_idle(now);
470+
}
423471
}
424472

425473
#[instrument(level = "trace", skip_all, fields(peer_id = ?peer_id, address = ?address))]

0 commit comments

Comments
 (0)