Skip to content

Commit 2a6e7ba

Browse files
[p2p/authenticated] Refactor Dial Jitter (#3379)
1 parent f3b4e45 commit 2a6e7ba

22 files changed

Lines changed: 1675 additions & 670 deletions

File tree

p2p/src/authenticated/dialing.rs

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
use commonware_cryptography::PublicKey;
2+
use std::time::SystemTime;
3+
4+
/// Merges `b` into `a`, keeping the earliest time.
///
/// Returns `Some(b)` when `a` is `None`; otherwise returns `Some` of whichever of the two
/// times is smaller. The result is always `Some`, so callers can assign it straight back to an
/// `Option<SystemTime>` accumulator.
5+
pub(crate) fn earliest(a: Option<SystemTime>, b: SystemTime) -> Option<SystemTime> {
6+
Some(a.map_or(b, |a| a.min(b)))
7+
}
8+
9+
/// Result of checking whether a peer is dialable.
///
/// Only the `After` variant carries data: the time at which the peer becomes dialable.
10+
#[derive(Clone, Copy, Debug, PartialEq)]
11+
pub enum DialStatus {
12+
/// Peer can be dialed immediately.
13+
Now,
14+
/// Peer will become dialable at the given time.
// NOTE(review): presumably this timestamp is what gets folded into
// `Dialable::next_query_at` — confirm against the tracker implementation.
15+
After(SystemTime),
16+
/// Peer is not dialable.
17+
Unavailable,
18+
}
19+
20+
/// Result of attempting to reserve a peer.
///
/// None of the variants carry data; in particular `RateLimited` does not say when a retry
/// would succeed.
21+
#[derive(Clone, Copy, Debug, PartialEq)]
22+
pub enum ReserveResult {
23+
/// Reservation succeeded.
24+
Reserved,
25+
/// Reservation denied because not enough time has elapsed since the last reservation.
26+
RateLimited,
27+
/// Reservation denied for any other reason (already reserved, is self, etc.).
28+
Unavailable,
29+
}
30+
31+
/// Dialable peers and the next time it is worth querying again.
///
/// `C` is the public key type used to identify peers.
32+
#[derive(Clone, Debug)]
33+
pub struct Dialable<C: PublicKey> {
34+
/// Peers that can be dialed immediately.
35+
pub peers: Vec<C>,
36+
37+
/// Earliest known time at which another peer may become dialable.
///
/// `None` presumably means that no such time is known — confirm against the code that
/// builds this value.
38+
pub next_query_at: Option<SystemTime>,
39+
}
40+
41+
/// The default [`Dialable`] is empty: no peers and no known next query time.
///
/// Written by hand rather than derived: `#[derive(Default)]` would add a `C: Default` bound,
/// which this impl does not require.
impl<C: PublicKey> Default for Dialable<C> {
42+
fn default() -> Self {
43+
Self {
44+
peers: Vec::new(),
45+
next_query_at: None,
46+
}
47+
}
48+
}

p2p/src/authenticated/discovery/actors/dialer.rs

Lines changed: 213 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ use commonware_runtime::{
1818
Spawner, StreamOf,
1919
};
2020
use commonware_stream::encrypted::{dial, Config as StreamConfig};
21-
use commonware_utils::SystemTimeExt;
2221
use prometheus_client::metrics::{counter::Counter, family::Family};
2322
use rand::seq::SliceRandom;
2423
use rand_core::CryptoRngCore;
@@ -38,13 +37,10 @@ pub struct Config<C: Signer> {
3837
/// which we attempt to dial peers in general.
3938
pub dial_frequency: Duration,
4039

41-
/// The frequency at which to refresh the list of dialable peers if there are no more peers in
42-
/// the queue. This also limits the rate at which any single peer is dialed multiple times.
43-
///
44-
/// This approach attempts to help ensure that the connection rate-limiter is not maxed out for
45-
/// a single peer by preventing dialing it as fast as possible. This should make it easier for
46-
/// other peers to dial us.
47-
pub query_frequency: Duration,
40+
/// The maximum interval between tracker queries when the queue is empty. This tracks the
41+
/// configured peer connection cooldown, since that is the soonest any peer could become
42+
/// reservable again.
43+
pub peer_connection_cooldown: Duration,
4844

4945
/// Whether to allow dialing private IP addresses after DNS resolution.
5046
pub allow_private_ips: bool,
@@ -61,7 +57,7 @@ pub struct Actor<E: Spawner + Clock + Network + Resolver + Metrics, C: Signer> {
6157
// ---------- Configuration ----------
6258
stream_cfg: StreamConfig<C>,
6359
dial_frequency: Duration,
64-
query_frequency: Duration,
60+
peer_connection_cooldown: Duration,
6561
allow_private_ips: bool,
6662

6763
// ---------- Metrics ----------
@@ -86,7 +82,7 @@ impl<
8682
queue: Vec::new(),
8783
stream_cfg: cfg.stream_cfg,
8884
dial_frequency: cfg.dial_frequency,
89-
query_frequency: cfg.query_frequency,
85+
peer_connection_cooldown: cfg.peer_connection_cooldown,
9086
allow_private_ips: cfg.allow_private_ips,
9187
attempts,
9288
}
@@ -166,38 +162,37 @@ impl<
166162
mut supervisor: SupervisorMailbox<E, C>,
167163
) {
168164
let mut dial_deadline = self.context.current();
169-
let mut query_deadline = self.context.current();
170165
select_loop! {
171166
self.context,
172167
on_stopped => {
173168
debug!("context shutdown, stopping dialer");
174169
},
175170
_ = self.context.sleep_until(dial_deadline) => {
176-
// Update the deadline.
177-
dial_deadline = dial_deadline.add_jittered(&mut self.context, self.dial_frequency);
178-
179-
// Pop the queue until we can reserve a peer.
180-
// If a peer is reserved, attempt to dial it.
181-
while let Some(peer) = self.queue.pop() {
182-
// Attempt to reserve peer.
183-
let Some(reservation) = tracker.dial(peer).await else {
184-
continue;
185-
};
186-
self.dial_peer(reservation, &mut supervisor);
187-
break;
188-
}
189-
},
190-
_ = self.context.sleep_until(query_deadline) => {
191-
// Update the deadline.
192-
query_deadline =
193-
query_deadline.add_jittered(&mut self.context, self.query_frequency);
194-
195-
// Only update the queue if it is empty.
171+
// Refill the queue if empty.
172+
let now = self.context.current();
173+
let mut next_query_at = None;
196174
if self.queue.is_empty() {
197-
// Query the tracker for dialable peers and shuffle the list to prevent
198-
// starvation.
199-
self.queue = tracker.dialable().await;
175+
let dialable = tracker.dialable().await;
176+
self.queue = dialable.peers;
200177
self.queue.shuffle(&mut self.context);
178+
next_query_at = dialable.next_query_at;
179+
}
180+
181+
// Set next deadline.
182+
dial_deadline = if self.queue.is_empty() {
183+
let min = now + self.dial_frequency;
184+
let max = (now + self.peer_connection_cooldown).max(min);
185+
next_query_at.unwrap_or(max).clamp(min, max)
186+
} else {
187+
now + self.dial_frequency
188+
};
189+
190+
// Pop through peers until we can reserve and dial one.
191+
while let Some(peer) = self.queue.pop() {
192+
if let Some(reservation) = tracker.dial(peer).await {
193+
self.dial_peer(reservation, &mut supervisor);
194+
break;
195+
}
201196
}
202197
},
203198
}
@@ -208,7 +203,10 @@ impl<
208203
mod tests {
209204
use super::*;
210205
use crate::{
211-
authenticated::discovery::actors::tracker::{ingress::Releaser, Metadata},
206+
authenticated::{
207+
dialing::Dialable,
208+
discovery::actors::tracker::{ingress::Releaser, Metadata},
209+
},
212210
Ingress,
213211
};
214212
use commonware_cryptography::ed25519::{PrivateKey, PublicKey};
@@ -241,7 +239,7 @@ mod tests {
241239
let dialer_cfg = Config {
242240
stream_cfg: test_stream_config(signer),
243241
dial_frequency,
244-
query_frequency: Duration::from_secs(60),
242+
peer_connection_cooldown: Duration::from_secs(60),
245243
allow_private_ips: true,
246244
};
247245

@@ -277,7 +275,10 @@ mod tests {
277275
select! {
278276
msg = tracker_rx.recv() => match msg {
279277
Some(tracker::Message::Dialable { responder }) => {
280-
let _ = responder.send(peers.clone());
278+
let _ = responder.send(Dialable {
279+
peers: peers.clone(),
280+
next_query_at: Some(context.current()),
281+
});
281282
}
282283
Some(tracker::Message::Dial {
283284
public_key,
@@ -304,4 +305,179 @@ mod tests {
304305
);
305306
});
306307
}
308+
309+
/// Checks that a tracker-supplied `next_query_at` earlier than `dial_frequency` does not make
/// the dialer query the tracker again before `dial_frequency` has elapsed.
///
/// The mock tracker always answers with an empty peer list and `next_query_at = now + 100ms`,
/// and the test expects exactly one `Dialable` request within a 350ms window.
#[test]
310+
fn test_dialer_uses_tracker_next_query_deadline() {
311+
let executor = deterministic::Runner::timed(Duration::from_secs(10));
312+
executor.start(|context| async move {
313+
let signer = PrivateKey::from_seed(0);
314+
let dial_frequency = Duration::from_millis(500);
315+
316+
let dialer = Actor::new(
317+
context.with_label("dialer"),
318+
Config {
319+
stream_cfg: test_stream_config(signer),
320+
dial_frequency,
321+
// Cooldown equals dial_frequency, so the dialer's lower and upper bounds for the
// next query time coincide at now + 500ms.
peer_connection_cooldown: dial_frequency,
322+
allow_private_ips: true,
323+
},
324+
);
325+
326+
// The test plays the tracker role by reading `tracker_rx` directly.
let (tracker_mailbox, mut tracker_rx) =
327+
UnboundedMailbox::<tracker::Message<PublicKey>>::new();
328+
let (supervisor, mut supervisor_rx) =
329+
Mailbox::<spawner::Message<_, _, PublicKey>>::new(100);
330+
// Drain and discard anything sent to the supervisor mailbox.
context
331+
.with_label("supervisor")
332+
.spawn(|_| async move { while supervisor_rx.recv().await.is_some() {} });
333+
334+
let _handle = dialer.start(tracker_mailbox, supervisor);
335+
336+
// Tracker reports next_query_at=100ms, which is shorter than
337+
// dial_frequency=500ms. The dialer should clamp to dial_frequency,
338+
// so we only get 1 refresh in 350ms instead of 3-4.
339+
let mut refresh_count = 0;
340+
let deadline = context.current() + Duration::from_millis(350);
341+
loop {
342+
select! {
343+
// Count every `Dialable` request; any other tracker message is ignored.
msg = tracker_rx.recv() => if let Some(tracker::Message::Dialable { responder }) = msg {
344+
refresh_count += 1;
345+
let _ = responder.send(Dialable {
346+
peers: Vec::new(),
347+
next_query_at: Some(context.current() + Duration::from_millis(100)),
348+
});
349+
},
350+
// Stop observing once the 350ms window has elapsed.
_ = context.sleep_until(deadline) => break,
351+
}
352+
}
353+
354+
assert_eq!(
355+
refresh_count, 1,
356+
"expected 1 refresh (clamped to dial_frequency), got {}",
357+
refresh_count
358+
);
359+
});
360+
}
362+
/// Checks that queued peers are dialed one per `dial_frequency` tick.
///
/// The mock tracker hands out three peers and grants every reservation; with
/// `dial_frequency = 100ms` the test expects exactly three `Dial` requests within 250ms.
#[test]
363+
fn test_dialer_drains_queue_at_dial_frequency() {
364+
let executor = deterministic::Runner::timed(Duration::from_secs(10));
365+
executor.start(|context| async move {
366+
let signer = PrivateKey::from_seed(0);
367+
let dial_frequency = Duration::from_millis(100);
368+
369+
let dialer = Actor::new(
370+
context.with_label("dialer"),
371+
Config {
372+
stream_cfg: test_stream_config(signer),
373+
dial_frequency,
374+
peer_connection_cooldown: Duration::from_secs(60),
375+
allow_private_ips: true,
376+
},
377+
);
378+
379+
// The test plays the tracker role by reading `tracker_rx` directly.
let (tracker_mailbox, mut tracker_rx) =
380+
UnboundedMailbox::<tracker::Message<PublicKey>>::new();
381+
382+
// Separate mailbox backing the `Releaser` handed to each reservation; its receiver is
// kept alive but never read.
let (releaser_mailbox, _releaser_rx) =
383+
UnboundedMailbox::<tracker::Message<PublicKey>>::new();
384+
let releaser = Releaser::new(releaser_mailbox);
385+
386+
// Three peers with keys derived from seeds 0, 1 and 2.
let peers: Vec<PublicKey> = (0..3)
387+
.map(|i| PrivateKey::from_seed(i).public_key())
388+
.collect();
389+
390+
let (supervisor, mut supervisor_rx) =
391+
Mailbox::<spawner::Message<_, _, PublicKey>>::new(100);
392+
// Drain and discard anything sent to the supervisor mailbox.
context
393+
.with_label("supervisor")
394+
.spawn(|_| async move { while supervisor_rx.recv().await.is_some() {} });
395+
396+
let _handle = dialer.start(tracker_mailbox, supervisor);
397+
398+
let mut dial_count = 0;
399+
let deadline = context.current() + Duration::from_millis(250);
400+
loop {
401+
select! {
402+
msg = tracker_rx.recv() => match msg {
403+
// Every refresh returns the same three peers and no next-query hint.
Some(tracker::Message::Dialable { responder }) => {
404+
let _ = responder.send(Dialable {
405+
peers: peers.clone(),
406+
next_query_at: None,
407+
});
408+
}
409+
// Grant every reservation request and count it as one dial attempt.
Some(tracker::Message::Dial {
410+
public_key,
411+
reservation,
412+
}) => {
413+
dial_count += 1;
414+
let ingress: Ingress =
415+
SocketAddr::new(Ipv4Addr::LOCALHOST.into(), 8000).into();
416+
let metadata = Metadata::Dialer(public_key, ingress);
417+
let res = tracker::Reservation::new(metadata, releaser.clone());
418+
let _ = reservation.send(Some(res));
419+
}
420+
_ => {}
421+
},
422+
// Stop observing once the 250ms window has elapsed.
_ = context.sleep_until(deadline) => break,
423+
}
424+
}
425+
426+
assert_eq!(
427+
dial_count, 3,
428+
"expected queued peers to drain at dial_frequency, got {} dials",
429+
dial_count
430+
);
431+
});
432+
}
433+
434+
/// Checks that the dialer keeps running when `dial_frequency` (200ms) is larger than
/// `peer_connection_cooldown` (50ms).
///
/// The dialer clamps its next query time between bounds derived from those two settings, and
/// `clamp` panics if the lower bound exceeds the upper one. The mock tracker always answers
/// with no peers and no `next_query_at`, and the test expects exactly two `Dialable` requests
/// within 350ms.
#[test]
435+
fn test_dialer_does_not_panic_when_dial_frequency_exceeds_peer_connection_cooldown() {
436+
let executor = deterministic::Runner::timed(Duration::from_secs(10));
437+
executor.start(|context| async move {
438+
let signer = PrivateKey::from_seed(0);
439+
let dial_frequency = Duration::from_millis(200);
440+
441+
let dialer = Actor::new(
442+
context.with_label("dialer"),
443+
Config {
444+
stream_cfg: test_stream_config(signer),
445+
dial_frequency,
446+
// Deliberately shorter than dial_frequency to exercise the inverted-bounds case.
peer_connection_cooldown: Duration::from_millis(50),
447+
allow_private_ips: true,
448+
},
449+
);
450+
451+
// The test plays the tracker role by reading `tracker_rx` directly.
let (tracker_mailbox, mut tracker_rx) =
452+
UnboundedMailbox::<tracker::Message<PublicKey>>::new();
453+
let (supervisor, mut supervisor_rx) =
454+
Mailbox::<spawner::Message<_, _, PublicKey>>::new(100);
455+
// Drain and discard anything sent to the supervisor mailbox.
context
456+
.with_label("supervisor")
457+
.spawn(|_| async move { while supervisor_rx.recv().await.is_some() {} });
458+
459+
let _handle = dialer.start(tracker_mailbox, supervisor);
460+
461+
let mut refresh_count = 0;
462+
let deadline = context.current() + Duration::from_millis(350);
463+
loop {
464+
select! {
465+
// Count every `Dialable` request; any other tracker message is ignored.
msg = tracker_rx.recv() => if let Some(tracker::Message::Dialable { responder }) = msg {
466+
refresh_count += 1;
467+
let _ = responder.send(Dialable {
468+
peers: Vec::new(),
469+
next_query_at: None,
470+
});
471+
},
472+
// Stop observing once the 350ms window has elapsed.
_ = context.sleep_until(deadline) => break,
473+
}
474+
}
475+
476+
assert_eq!(
477+
refresh_count, 2,
478+
"expected 2 refreshes at dial_frequency without panicking, got {}",
479+
refresh_count
480+
);
481+
});
482+
}
307483
}

0 commit comments

Comments
 (0)