Skip to content

Commit 372bfb7

Browse files
committed
feat: timestamp-based queue
1 parent 11a6ae8 commit 372bfb7

File tree

3 files changed

+164
-37
lines changed

3 files changed

+164
-37
lines changed

rattan-core/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ bytesize = "2.0.1"
3030
camellia = { git = "https://github.com/minhuw/camellia.git", optional = true }
3131
clap = { workspace = true }
3232
csv = { workspace = true }
33-
derive_more = { workspace = true}
33+
derive_more = { workspace = true }
3434
dyn-clone = "1.0"
3535
etherparse = "0.18.0"
3636
figment = { workspace = true }

rattan-core/src/cells/bandwidth/mod.rs

Lines changed: 63 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,15 @@ use netem_trace::{model::BwTraceConfig, Bandwidth, BwTrace, Delay};
66
#[cfg(feature = "serde")]
77
use serde::{Deserialize, Serialize};
88
use tokio::sync::mpsc;
9+
use tokio::sync::mpsc::error::TryRecvError;
910
use tokio::time::Instant;
1011
use tracing::{debug, info, trace};
1112

1213
use super::{
1314
AtomicCellState, Cell, CellState, ControlInterface, CurrentConfig, Egress, Ingress, Packet,
1415
LARGE_DURATION, TRACE_START_INSTANT,
1516
};
17+
use crate::cells::bandwidth::queue::AQM;
1618
use crate::error::Error;
1719
use crate::metal::timer::Timer;
1820

@@ -91,7 +93,7 @@ where
9193
egress: mpsc::UnboundedReceiver<P>,
9294
bw_type: BwType,
9395
bandwidth: Bandwidth,
94-
packet_queue: Q,
96+
packet_queue: AQM<Q, P>,
9597
next_available: Instant,
9698
config_rx: mpsc::UnboundedReceiver<BwCellConfig<P, Q>>,
9799
timer: Timer,
@@ -143,6 +145,7 @@ where
143145
/// queue and thus being returnd, this function returns `Some`. When this happens, the caller
144146
/// should send out the packet immediately.
145147
#[inline(always)]
148+
#[must_use]
146149
fn enqueue_packet(&mut self, new_packet: P) -> Option<P> {
147150
if let Some(packet_to_drop) = self.packet_queue.enqueue(new_packet) {
148151
let timestamp = packet_to_drop.get_timestamp();
@@ -155,7 +158,6 @@ where
155158
None
156159
}
157160
}
158-
159161
#[async_trait]
160162
impl<P, Q> Egress<P> for BwCellEgress<P, Q>
161163
where
@@ -187,7 +189,29 @@ where
187189
}
188190
}
189191

190-
let mut packet = self.packet_queue.dequeue();
192+
while self.packet_queue.need_more_packets(self.next_available) {
193+
match self.egress.try_recv() {
194+
Ok(new_packet) => {
195+
let new_packet = crate::check_cell_state!(self.state, new_packet);
196+
if let Some(packet) = self.enqueue_packet(new_packet) {
197+
return Some(packet);
198+
}
199+
}
200+
Err(TryRecvError::Empty) => {
201+
break;
202+
}
203+
Err(TryRecvError::Disconnected) => {
204+
return None;
205+
}
206+
}
207+
}
208+
// Here, either:
209+
// 1) No more packets can be retrived from egress, or
210+
// 2) A packet, that should enter the queue after `self.next_available` is seen.
211+
// Thus the `dequeue_at()` see a correct queue, containing any packet that should
212+
// enter the AQM at `self.next_available`.
213+
let mut packet = self.packet_queue.dequeue_at(self.next_available);
214+
191215
while packet.is_none() {
192216
// the queue is empty, wait for the next packet
193217
tokio::select! {
@@ -198,10 +222,11 @@ where
198222
// `new_packet` can be None only if `self.egress` is closed.
199223
new_packet = self.egress.recv() => {
200224
let new_packet = crate::check_cell_state!(self.state, new_packet?);
225+
let timestamp = new_packet.get_timestamp();
201226
if let Some(packet) = self.enqueue_packet(new_packet){
202227
return Some(packet);
203228
}
204-
packet = self.packet_queue.dequeue();
229+
packet = self.packet_queue.dequeue_at(timestamp);
205230
}
206231
}
207232
}
@@ -377,7 +402,7 @@ where
377402
egress: tx,
378403
bw_type: bw_type.into().unwrap_or_default(),
379404
bandwidth: bandwidth.into().unwrap_or(MAX_BANDWIDTH),
380-
packet_queue,
405+
packet_queue: AQM::new(packet_queue),
381406
next_available: Instant::now(),
382407
config_rx,
383408
timer: Timer::new()?,
@@ -400,7 +425,7 @@ where
400425
egress: mpsc::UnboundedReceiver<P>,
401426
bw_type: BwType,
402427
trace: Box<dyn BwTrace>,
403-
packet_queue: Q,
428+
packet_queue: AQM<Q, P>,
404429
current_bandwidth: CurrentConfig<Bandwidth>,
405430
next_available: Instant,
406431
next_change: Instant,
@@ -478,11 +503,12 @@ where
478503
if let Some((bandwidth, duration)) = self.trace.next_bw() {
479504
self.change_bandwidth(bandwidth, change_time);
480505
self.next_change = change_time + duration;
481-
#[cfg(test)]
482-
trace!(
483-
"Bandwidth changed to {:?}, next change after {:?}",
506+
// #[cfg(test)]
507+
eprintln!(
508+
"Bandwidth changed to {:?}, next change after {:?}. now {:?}",
484509
bandwidth,
485-
self.next_change - Instant::now()
510+
self.next_change - Instant::now(),
511+
Instant::now(),
486512
);
487513
true
488514
} else {
@@ -556,7 +582,29 @@ where
556582
}
557583
}
558584

559-
let mut packet = self.packet_queue.dequeue();
585+
while self.packet_queue.need_more_packets(self.next_available) {
586+
match self.egress.try_recv() {
587+
Ok(new_packet) => {
588+
let new_packet = crate::check_cell_state!(self.state, new_packet);
589+
if let Some(packet) = self.enqueue_packet(new_packet) {
590+
return Some(packet);
591+
}
592+
}
593+
Err(TryRecvError::Empty) => {
594+
break;
595+
}
596+
Err(TryRecvError::Disconnected) => {
597+
return None;
598+
}
599+
}
600+
}
601+
602+
// Here, either:
603+
// 1) No more packets can be retrived from egress, or
604+
// 2) A packet, that should enter the queue after `self.next_available` is seen.
605+
// Thus the `dequeue_at()` see a correct queue, containing any packet that should
606+
// enter the AQM at `self.next_available`.
607+
let mut packet = self.packet_queue.dequeue_at(self.next_available);
560608

561609
while packet.is_none() {
562610
// the queue is empty, wait for the next packet
@@ -571,10 +619,11 @@ where
571619
// `new_packet` can be None only if `self.egress` is closed.
572620
new_packet = self.egress.recv() => {
573621
let new_packet = crate::check_cell_state!(self.state, new_packet?);
622+
let timestamp = new_packet.get_timestamp();
574623
if let Some(packet) = self.enqueue_packet(new_packet){
575624
return Some(packet);
576625
}
577-
packet = self.packet_queue.dequeue();
626+
packet = self.packet_queue.dequeue_at(timestamp);
578627
}
579628
}
580629
}
@@ -606,6 +655,7 @@ where
606655
fn reset(&mut self) {
607656
self.next_available = *TRACE_START_INSTANT.get_or_init(Instant::now);
608657
self.next_change = *TRACE_START_INSTANT.get_or_init(Instant::now);
658+
eprintln!("Reset to {:?}", self.next_change);
609659
}
610660

611661
fn change_state(&self, state: CellState) {
@@ -770,7 +820,7 @@ where
770820
egress: tx,
771821
bw_type: bw_type.into().unwrap_or_default(),
772822
trace,
773-
packet_queue,
823+
packet_queue: AQM::new(packet_queue),
774824
current_bandwidth: CurrentConfig::default(),
775825
next_available: Instant::now(),
776826
next_change: Instant::now(),

rattan-core/src/cells/bandwidth/queue.rs

Lines changed: 100 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@ use std::fmt::Debug;
33

44
#[cfg(feature = "serde")]
55
use serde::{Deserialize, Serialize};
6-
use tokio::time::{Duration, Instant};
6+
use tokio::{
7+
sync::mpsc::{error::TryRecvError, UnboundedReceiver},
8+
time::{Duration, Instant},
9+
};
710
use tracing::{debug, trace};
811

912
use super::BwType;
@@ -14,6 +17,83 @@ fn serde_default<T: Default + PartialEq>(t: &T) -> bool {
1417
*t == Default::default()
1518
}
1619

20+
pub enum PacketInboundTryReceiveError {
21+
Empty,
22+
Failed,
23+
}
24+
25+
pub trait PacketInbound<P> {
26+
fn try_receive(&mut self) -> Result<P, PacketInboundTryReceiveError>;
27+
}
28+
29+
impl<P> PacketInbound<P> for UnboundedReceiver<P> {
30+
fn try_receive(&mut self) -> Result<P, PacketInboundTryReceiveError> {
31+
self.try_recv().map_err(|e| match e {
32+
TryRecvError::Empty => PacketInboundTryReceiveError::Empty,
33+
TryRecvError::Disconnected => PacketInboundTryReceiveError::Failed,
34+
})
35+
}
36+
}
37+
38+
pub struct AQM<Q, P>
39+
where
40+
Q: PacketQueue<P>,
41+
P: Packet,
42+
{
43+
inbound_buffer: VecDeque<P>,
44+
queue: Q,
45+
latest_enqueue_timestamp: Option<Instant>,
46+
}
47+
48+
impl<Q, P> AQM<Q, P>
49+
where
50+
Q: PacketQueue<P>,
51+
P: Packet,
52+
{
53+
pub fn new(queue: Q) -> Self {
54+
Self {
55+
inbound_buffer: VecDeque::with_capacity(1024),
56+
queue,
57+
latest_enqueue_timestamp: None,
58+
}
59+
}
60+
61+
pub fn configure(&mut self, config: Q::Config) {
62+
self.queue.configure(config);
63+
}
64+
65+
/// If this returns true, the caller should try to enqueue more packets.
66+
pub fn need_more_packets(&self, next_available: Instant) -> bool {
67+
self.latest_enqueue_timestamp
68+
.is_none_or(|t| t <= next_available)
69+
}
70+
71+
/// Return the packet immediately, if the inner queue is zero-buffered.
72+
pub fn enqueue(&mut self, packet: P) -> Option<P> {
73+
self.latest_enqueue_timestamp = packet.get_timestamp().into();
74+
if self.queue.is_zero_buffer() {
75+
packet.into()
76+
} else {
77+
self.inbound_buffer.push_back(packet);
78+
None
79+
}
80+
}
81+
82+
/// The caller ensures that:
83+
/// 1) This function is not called before the `timestamp` here.
84+
/// 2) The timestamp should be non-decending.
85+
pub fn dequeue_at(&mut self, timestamp: Instant) -> Option<P> {
86+
while let Some(head) = self.inbound_buffer.front() {
87+
if head.get_timestamp() <= timestamp {
88+
self.queue.enqueue(self.inbound_buffer.pop_front().unwrap());
89+
} else {
90+
break;
91+
}
92+
}
93+
self.queue.dequeue()
94+
}
95+
}
96+
1797
pub trait PacketQueue<P>: Send
1898
where
1999
P: Packet,
@@ -25,14 +105,18 @@ where
25105

26106
fn configure(&mut self, config: Self::Config);
27107

28-
/// Returns the packet if there is not space for it.
29-
fn enqueue(&mut self, packet: P) -> Option<P>;
108+
fn enqueue(&mut self, packet: P);
30109

31110
// If the queue is empty, return `None`
32111
fn dequeue(&mut self) -> Option<P>;
33112

34113
fn is_empty(&self) -> bool;
35114

115+
// Returns if the buffer is zero-sized.
116+
fn is_zero_buffer(&self) -> bool {
117+
false
118+
}
119+
36120
// How this queue measures the size of a packet.
37121
// Should return 0 if it measures the size of a packet based on its L3 size.
38122
// Should return 14 if it measures that based on its L2 size (L3 length + 14 bytes L2 header).
@@ -102,9 +186,8 @@ where
102186

103187
fn configure(&mut self, _config: Self::Config) {}
104188

105-
fn enqueue(&mut self, packet: P) -> Option<P> {
189+
fn enqueue(&mut self, packet: P) {
106190
self.queue.push_back(packet);
107-
None
108191
}
109192

110193
fn dequeue(&mut self) -> Option<P> {
@@ -212,13 +295,12 @@ where
212295
self.bw_type = config.bw_type;
213296
}
214297

215-
fn enqueue(&mut self, packet: P) -> Option<P> {
216-
if self.packet_limit.is_some_and(|limit| limit == 0)
298+
fn is_zero_buffer(&self) -> bool {
299+
self.packet_limit.is_some_and(|limit| limit == 0)
217300
|| self.byte_limit.is_some_and(|limit| limit == 0)
218-
{
219-
return packet.into();
220-
}
301+
}
221302

303+
fn enqueue(&mut self, packet: P) {
222304
if self
223305
.packet_limit
224306
.is_none_or(|limit| self.queue.len() < limit)
@@ -237,7 +319,6 @@ where
237319
"Drop packet(l3_len: {}, extra_len: {}) when enqueue", packet.l3_length(), self.bw_type.extra_length()
238320
);
239321
}
240-
None
241322
}
242323

243324
fn dequeue(&mut self) -> Option<P> {
@@ -351,13 +432,12 @@ where
351432
self.bw_type = config.bw_type;
352433
}
353434

354-
fn enqueue(&mut self, packet: P) -> Option<P> {
355-
if self.packet_limit.is_some_and(|limit| limit == 0)
435+
fn is_zero_buffer(&self) -> bool {
436+
self.packet_limit.is_some_and(|limit| limit == 0)
356437
|| self.byte_limit.is_some_and(|limit| limit == 0)
357-
{
358-
return packet.into();
359-
}
438+
}
360439

440+
fn enqueue(&mut self, packet: P) {
361441
self.now_bytes += packet.l3_length() + self.bw_type.extra_length();
362442
self.queue.push_back(packet);
363443
while self
@@ -374,7 +454,6 @@ where
374454
"Drop packet(l3_len: {}, extra_len: {}) when enqueue another packet", _packet.l3_length(), self.bw_type.extra_length()
375455
);
376456
}
377-
None
378457
}
379458

380459
fn dequeue(&mut self) -> Option<P> {
@@ -554,13 +633,12 @@ where
554633
self.config = config;
555634
}
556635

557-
fn enqueue(&mut self, packet: P) -> Option<P> {
558-
if self.config.packet_limit.is_some_and(|limit| limit == 0)
636+
fn is_zero_buffer(&self) -> bool {
637+
self.config.packet_limit.is_some_and(|limit| limit == 0)
559638
|| self.config.byte_limit.is_some_and(|limit| limit == 0)
560-
{
561-
return packet.into();
562-
}
639+
}
563640

641+
fn enqueue(&mut self, packet: P) {
564642
if self
565643
.config
566644
.packet_limit
@@ -582,7 +660,6 @@ where
582660
self.config.bw_type.extra_length()
583661
);
584662
}
585-
None
586663
}
587664

588665
fn dequeue(&mut self) -> Option<P> {

0 commit comments

Comments
 (0)