Skip to content

Commit fe5da8a

Browse files
committed
modified Pacer such that congestion controller metrics can be used to adjust the burst size and the overall pacing rate of the connection
1 parent 17a4406 commit fe5da8a

6 files changed

Lines changed: 111 additions & 16 deletions

File tree

quinn-proto/src/congestion.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,11 +85,14 @@ pub trait Controller: Send + Sync {
8585
fn window(&self) -> u64;
8686

8787
/// Retrieve implementation-specific metrics used to populate `qlog` traces when they are enabled
88+
/// This is also used to alter the pacing of the connection with
89+
/// `pacing_rate` and `send_quantum`
8890
fn metrics(&self) -> ControllerMetrics {
8991
ControllerMetrics {
9092
congestion_window: self.window(),
9193
ssthresh: None,
9294
pacing_rate: None,
95+
send_quantum: None,
9396
}
9497
}
9598

@@ -103,16 +106,20 @@ pub trait Controller: Send + Sync {
103106
fn into_any(self: Box<Self>) -> Box<dyn Any>;
104107
}
105108

106-
/// Common congestion controller metrics
109+
/// Common congestion controller metrics used both for logging purposes
110+
/// but also to alter the pacing of the connection with
111+
/// `pacing_rate` and `send_quantum`
107112
#[derive(Default)]
108113
#[non_exhaustive]
109114
pub struct ControllerMetrics {
110115
/// Congestion window (bytes)
111116
pub congestion_window: u64,
112117
/// Slow start threshold (bytes)
113118
pub ssthresh: Option<u64>,
114-
/// Pacing rate (bits/s)
119+
/// Pacing rate (bytes/s)
115120
pub pacing_rate: Option<u64>,
121+
/// Send Quantum (bytes) used to control the size of packet bursts
122+
pub send_quantum: Option<u64>,
116123
}
117124

118125
/// Constructs controllers on demand

quinn-proto/src/congestion/bbr/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -497,7 +497,8 @@ impl Controller for Bbr {
497497
ControllerMetrics {
498498
congestion_window: self.window(),
499499
ssthresh: None,
500-
pacing_rate: Some(self.pacing_rate * 8),
500+
pacing_rate: Some(self.pacing_rate),
501+
send_quantum: None,
501502
}
502503
}
503504

quinn-proto/src/congestion/cubic.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,7 @@ impl Controller for Cubic {
254254
congestion_window: self.window(),
255255
ssthresh: Some(self.state.ssthresh),
256256
pacing_rate: None,
257+
send_quantum: None,
257258
}
258259
}
259260

quinn-proto/src/congestion/new_reno.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ impl Controller for NewReno {
120120
congestion_window: self.window(),
121121
ssthresh: Some(self.ssthresh),
122122
pacing_rate: None,
123+
send_quantum: None,
123124
}
124125
}
125126

quinn-proto/src/connection/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -615,12 +615,15 @@ impl Connection {
615615

616616
// Check whether the next datagram is blocked by pacing
617617
let smoothed_rtt = self.path.rtt.get();
618+
let controller_metrics = self.path.congestion.metrics();
618619
if let Some(delay) = self.path.pacing.delay(
619620
smoothed_rtt,
620621
bytes_to_send,
621622
self.path.current_mtu(),
622623
self.path.congestion.window(),
623624
now,
625+
controller_metrics.send_quantum,
626+
controller_metrics.pacing_rate,
624627
) {
625628
self.timers.set(Timer::Pacing, delay);
626629
congestion_blocked = true;

quinn-proto/src/connection/pacing.rs

Lines changed: 95 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -45,13 +45,22 @@ impl Pacer {
4545
///
4646
/// The 5/4 ratio used here comes from the suggestion that N = 1.25 in the draft IETF RFC for
4747
/// QUIC.
48+
/// `capacity` in bytes is used to cap the number of bytes sent at once.
49+
/// It can be leveraged in congestion control to regulate the maximum size of transmission aggregates.
50+
/// e.g: <https://www.ietf.org/archive/id/draft-ietf-ccwg-bbr-04.html#name-send-quantum-csend_quantum>
51+
///
52+
/// `pacing_rate` in bytes/sec is used to pace the connection and control the upper limit of how fast
53+
/// we're sending data it can be leveraged in congestion control
54+
/// e.g: <https://www.ietf.org/archive/id/draft-ietf-ccwg-bbr-04.html#name-pacing-rate-cpacing_rate>
4855
pub(super) fn delay(
4956
&mut self,
5057
smoothed_rtt: Duration,
5158
bytes_to_send: u64,
5259
mtu: u16,
5360
window: u64,
5461
now: Instant,
62+
capacity: Option<u64>,
63+
pacing_rate: Option<u64>,
5564
) -> Option<Instant> {
5665
debug_assert_ne!(
5766
window, 0,
@@ -61,12 +70,29 @@ impl Pacer {
6170
if window != self.last_window || mtu != self.last_mtu {
6271
self.capacity = optimal_capacity(smoothed_rtt, window, mtu);
6372

64-
// Clamp the tokens
73+
// here we cap the number of bytes sent at once during a burst
6574
self.tokens = self.capacity.min(self.tokens);
6675
self.last_window = window;
6776
self.last_mtu = mtu;
6877
}
6978

79+
if let Some(capacity) = capacity {
80+
self.capacity = capacity;
81+
// here we cap the number of bytes sent at once during a burst
82+
self.tokens = self.capacity.min(self.tokens);
83+
}
84+
85+
if let Some(pacing_rate) = pacing_rate {
86+
// if the bytes to send are below or equal to our maximum burst size there is no need for delay
87+
if bytes_to_send <= self.capacity {
88+
return None;
89+
}
90+
let capped_bytes_to_send = bytes_to_send.max(self.capacity);
91+
// otherwise we calculate the delay such that we send at pacing_rate
92+
let delay = Duration::from_secs_f64(capped_bytes_to_send as f64 / pacing_rate as f64);
93+
return Some(now + delay);
94+
}
95+
7096
// if we can already send a packet, there is no need for delay
7197
if self.tokens >= bytes_to_send {
7298
return None;
@@ -174,17 +200,41 @@ mod tests {
174200

175201
assert!(
176202
Pacer::new(rtt, 30000, 1500, new_instant)
177-
.delay(Duration::from_micros(0), 0, 1500, 1, old_instant)
203+
.delay(
204+
Duration::from_micros(0),
205+
0,
206+
1500,
207+
1,
208+
old_instant,
209+
None,
210+
None
211+
)
178212
.is_none()
179213
);
180214
assert!(
181215
Pacer::new(rtt, 30000, 1500, new_instant)
182-
.delay(Duration::from_micros(0), 1600, 1500, 1, old_instant)
216+
.delay(
217+
Duration::from_micros(0),
218+
1600,
219+
1500,
220+
1,
221+
old_instant,
222+
None,
223+
None
224+
)
183225
.is_none()
184226
);
185227
assert!(
186228
Pacer::new(rtt, 30000, 1500, new_instant)
187-
.delay(Duration::from_micros(0), 1500, 1500, 3000, old_instant)
229+
.delay(
230+
Duration::from_micros(0),
231+
1500,
232+
1500,
233+
3000,
234+
old_instant,
235+
None,
236+
None
237+
)
188238
.is_none()
189239
);
190240
}
@@ -227,27 +277,27 @@ mod tests {
227277
assert_eq!(pacer.tokens, pacer.capacity);
228278
let initial_tokens = pacer.tokens;
229279

230-
pacer.delay(rtt, mtu as u64, mtu, window * 2, now);
280+
pacer.delay(rtt, mtu as u64, mtu, window * 2, now, None, None);
231281
assert_eq!(
232282
pacer.capacity,
233283
(2 * window as u128 * TARGET_BURST_INTERVAL.as_nanos() / rtt.as_nanos()) as u64
234284
);
235285
assert_eq!(pacer.tokens, initial_tokens);
236286

237-
pacer.delay(rtt, mtu as u64, mtu, window / 2, now);
287+
pacer.delay(rtt, mtu as u64, mtu, window / 2, now, None, None);
238288
assert_eq!(
239289
pacer.capacity,
240290
(window as u128 / 2 * TARGET_BURST_INTERVAL.as_nanos() / rtt.as_nanos()) as u64
241291
);
242292
assert_eq!(pacer.tokens, initial_tokens / 2);
243293

244-
pacer.delay(rtt, mtu as u64, mtu * 2, window, now);
294+
pacer.delay(rtt, mtu as u64, mtu * 2, window, now, None, None);
245295
assert_eq!(
246296
pacer.capacity,
247297
(window as u128 * TARGET_BURST_INTERVAL.as_nanos() / rtt.as_nanos()) as u64
248298
);
249299

250-
pacer.delay(rtt, mtu as u64, 20_000, window, now);
300+
pacer.delay(rtt, mtu as u64, 20_000, window, now, None, None);
251301
assert_eq!(pacer.capacity, 20_000_u64 * MIN_BURST_SIZE);
252302
}
253303

@@ -263,7 +313,7 @@ mod tests {
263313

264314
for _ in 0..packet_capacity {
265315
assert_eq!(
266-
pacer.delay(rtt, mtu as u64, mtu, window, old_instant),
316+
pacer.delay(rtt, mtu as u64, mtu, window, old_instant, None, None),
267317
None,
268318
"When capacity is available packets should be sent immediately"
269319
);
@@ -274,7 +324,7 @@ mod tests {
274324
let pace_duration = Duration::from_nanos((TARGET_BURST_INTERVAL.as_nanos() * 4 / 5) as u64);
275325

276326
let actual_delay = pacer
277-
.delay(rtt, mtu as u64, mtu, window, old_instant)
327+
.delay(rtt, mtu as u64, mtu, window, old_instant, None, None)
278328
.expect("Send must be delayed")
279329
.duration_since(old_instant);
280330

@@ -292,15 +342,17 @@ mod tests {
292342
mtu as u64,
293343
mtu,
294344
window,
295-
old_instant + pace_duration / 2
345+
old_instant + pace_duration / 2,
346+
None,
347+
None
296348
),
297349
None
298350
);
299351
assert_eq!(pacer.tokens, pacer.capacity / 2);
300352

301353
for _ in 0..packet_capacity / 2 {
302354
assert_eq!(
303-
pacer.delay(rtt, mtu as u64, mtu, window, old_instant),
355+
pacer.delay(rtt, mtu as u64, mtu, window, old_instant, None, None),
304356
None,
305357
"When capacity is available packets should be sent immediately"
306358
);
@@ -315,10 +367,40 @@ mod tests {
315367
mtu as u64,
316368
mtu,
317369
window,
318-
old_instant + pace_duration * 3 / 2
370+
old_instant + pace_duration * 3 / 2,
371+
None,
372+
None
319373
),
320374
None
321375
);
322376
assert_eq!(pacer.tokens, pacer.capacity);
323377
}
378+
379+
#[test]
380+
fn adjusts_capacity_from_congestion() {
381+
let window = 2_000_000;
382+
let mtu = 1500;
383+
let rtt = Duration::from_millis(50);
384+
let now = Instant::now();
385+
let send_quantum = Some(20000);
386+
let mut pacer = Pacer::new(rtt, window, mtu, now);
387+
pacer.delay(rtt, mtu as u64, mtu, window * 2, now, send_quantum, None);
388+
assert_eq!(pacer.capacity, 20000);
389+
}
390+
391+
#[test]
392+
fn adjusts_pacing_from_congestion_metrics() {
393+
let window = 2_000_000;
394+
let mtu = 1500;
395+
let rtt = Duration::from_millis(50);
396+
let now = Instant::now();
397+
let send_quantum = Some(20000);
398+
let pacing_rate = Some(200_000_000); //200 MB/s in bytes/s
399+
let mut pacer = Pacer::new(rtt, window, mtu, now);
400+
// 40_000 B / 200_000_000 MB/s = 0.0002 sec = 200 microseconds
401+
assert_eq!(
402+
pacer.delay(rtt, 40000, mtu, window, now, send_quantum, pacing_rate),
403+
Some(now + Duration::from_micros(200))
404+
)
405+
}
324406
}

0 commit comments

Comments
 (0)