Skip to content

Commit 57322ea

Browse files
committed
feat: state migration for TBF upon config change
1 parent 67a7f2d commit 57322ea

File tree

3 files changed

+132
-82
lines changed

3 files changed

+132
-82
lines changed

rattan-core/src/cells/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -459,6 +459,7 @@ pub trait ControlInterface: Send + Sync + 'static {
459459
#[cfg(not(feature = "serde"))]
460460
type Config: Send;
461461
fn set_config(&self, config: Self::Config) -> Result<(), Error>;
462+
// TODO: add `set_config_at` to explicitly express the logical timestamp of config change.
462463
}
463464

464465
#[cfg(feature = "serde")]

rattan-core/src/cells/token_bucket/bucket.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,6 @@ impl TokenBucket {
7777

7878
// Allows maintaining the tokens (in Bytes) in bucket during a config change.
7979
// Not used in current token_bucket implementation.
80-
#[allow(unused)]
8180
pub fn change_config(
8281
&mut self,
8382
burst_size: ByteSize,

rattan-core/src/cells/token_bucket/mod.rs

Lines changed: 131 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -66,35 +66,54 @@ where
6666
peak_token_bucket: Option<TokenBucket>,
6767
max_size: ByteSize,
6868
packet_queue: DropTailQueue<P>,
69-
config_rx: mpsc::UnboundedReceiver<TokenBucketCellConfig>,
69+
config_rx: mpsc::UnboundedReceiver<(Instant, TokenBucketCellConfig)>,
7070
timer: Timer,
7171
state: AtomicCellState,
7272
notify_rx: Option<tokio::sync::broadcast::Receiver<crate::control::RattanNotify>>,
7373
started: bool,
7474
logical_clock: Instant,
7575
}
7676

77+
fn update_token_bucket(
78+
old: &mut Option<TokenBucket>,
79+
burst_size: ByteSize,
80+
token_rate: Bandwidth,
81+
timestamp: Instant,
82+
) {
83+
if let Some(old) = old.as_mut() {
84+
// Update the old bucket.
85+
old.change_config(burst_size, token_rate, timestamp);
86+
} else {
87+
// Init a new bucket
88+
let _ = old.insert(TokenBucket::new(
89+
burst_size,
90+
token_rate,
91+
timestamp - LARGE_DURATION,
92+
));
93+
}
94+
}
95+
7796
impl<P> TokenBucketCellEgress<P>
7897
where
7998
P: Packet + Send,
8099
{
81100
fn set_config(&mut self, config: TokenBucketCellConfig, timestamp: Instant) {
82101
let mut new_max_size = ByteSize(u64::MAX);
83-
self.token_bucket =
84-
if let (Some(burst_size), Some(token_rate)) = (config.burst, config.rate) {
85-
new_max_size = new_max_size.min(burst_size);
86-
TokenBucket::new(burst_size, token_rate, timestamp - LARGE_DURATION).into()
87-
} else {
88-
None
89-
};
90102

91-
self.peak_token_bucket =
92-
if let (Some(burst_size), Some(token_rate)) = (config.minburst, config.peakrate) {
93-
new_max_size = new_max_size.min(burst_size);
94-
TokenBucket::new(burst_size, token_rate, timestamp - LARGE_DURATION).into()
95-
} else {
96-
None
97-
};
103+
if let (Some(burst_size), Some(token_rate)) = (config.burst, config.rate) {
104+
new_max_size = new_max_size.min(burst_size);
105+
update_token_bucket(&mut self.token_bucket, burst_size, token_rate, timestamp);
106+
};
107+
108+
if let (Some(burst_size), Some(token_rate)) = (config.minburst, config.peakrate) {
109+
new_max_size = new_max_size.min(burst_size);
110+
update_token_bucket(
111+
&mut self.peak_token_bucket,
112+
burst_size,
113+
token_rate,
114+
timestamp,
115+
);
116+
};
98117

99118
// calculate max_size
100119
let old_max_size = self.max_size;
@@ -171,6 +190,9 @@ where
171190
.as_mut()
172191
.map(|t| t.reserve(current_size)),
173192
) {
193+
// None: The token bucket is not present.
194+
// Some(None): The token bucket is present, but there is not suffcient token.
195+
// Some(Some(token)): The token bucket is present, and there is token to be consumed.
174196
(Some(None), _) | (_, Some(None)) => {
175197
// We have packets waiting to be sent, yet at least one token bucket is not ready
176198
debug!(
@@ -183,14 +205,14 @@ where
183205
self.update_available(Some(current_bytes));
184206
return None;
185207
}
186-
(Some(Some(token)), None) | (None, Some(Some(token))) => {
187-
token.consume();
188-
}
189-
(Some(Some(token_a)), Some(Some(token_b))) => {
190-
token_a.consume();
191-
token_b.consume();
208+
(token_a, token_b) => {
209+
if let Some(token) = token_a.flatten() {
210+
token.consume()
211+
};
212+
if let Some(token) = token_b.flatten() {
213+
token.consume()
214+
};
192215
}
193-
(None, None) => {}
194216
}
195217
// Send this packet!
196218
let mut current_packet = if let Some(head_packet) = self.packet_queue.dequeue() {
@@ -246,8 +268,8 @@ where
246268
while new_packet.is_none() {
247269
tokio::select! {
248270
biased;
249-
Some(config) = self.config_rx.recv() => {
250-
self.set_config(config, Instant::now());
271+
Some((timestamp, config)) = self.config_rx.recv() => {
272+
self.set_config(config, timestamp);
251273
}
252274
recv_packet = self.egress.recv() => {
253275
// return None if the channel is closed
@@ -352,60 +374,64 @@ impl TokenBucketCellConfig {
352374
}
353375

354376
pub struct TokenBucketCellControlInterface {
355-
config_tx: mpsc::UnboundedSender<TokenBucketCellConfig>,
377+
config_tx: mpsc::UnboundedSender<(Instant, TokenBucketCellConfig)>,
356378
}
357379

358-
impl ControlInterface for TokenBucketCellControlInterface {
359-
type Config = TokenBucketCellConfig;
360-
361-
fn set_config(&self, config: Self::Config) -> Result<(), Error> {
362-
// set_config of TokenBucketCellControlInterface (send config through tx)
363-
if (config.rate.is_none() && config.burst.is_some())
364-
|| (config.rate.is_some() && config.burst.is_none())
365-
{
380+
fn sanity_check(config: TokenBucketCellConfig) -> Result<TokenBucketCellConfig, Error> {
381+
if (config.rate.is_none() && config.burst.is_some())
382+
|| (config.rate.is_some() && config.burst.is_none())
383+
{
384+
return Err(Error::ConfigError(
385+
"rate and burst should be provided together".to_string(),
386+
));
387+
}
388+
if (config.peakrate.is_none() && config.minburst.is_some())
389+
|| (config.peakrate.is_some() && config.minburst.is_none())
390+
{
391+
return Err(Error::ConfigError(
392+
"peakrate and minburst should be provided together".to_string(),
393+
));
394+
}
395+
if let Some(rate) = config.rate {
396+
if rate.as_bps() == 0 {
366397
return Err(Error::ConfigError(
367-
"rate and burst should be provided together".to_string(),
398+
"rate must be greater than 0".to_string(),
368399
));
369400
}
370-
if (config.peakrate.is_none() && config.minburst.is_some())
371-
|| (config.peakrate.is_some() && config.minburst.is_none())
372-
{
401+
}
402+
if let Some(burst) = config.burst {
403+
if burst.as_u64() == 0 {
373404
return Err(Error::ConfigError(
374-
"peakrate and minburst should be provided together".to_string(),
405+
"burst must be greater than 0".to_string(),
375406
));
376407
}
377-
if let Some(rate) = config.rate {
378-
if rate.as_bps() == 0 {
379-
return Err(Error::ConfigError(
380-
"rate must be greater than 0".to_string(),
381-
));
382-
}
383-
}
384-
if let Some(burst) = config.burst {
385-
if burst.as_u64() == 0 {
386-
return Err(Error::ConfigError(
387-
"burst must be greater than 0".to_string(),
388-
));
389-
}
390-
}
391-
if let Some(peakrate) = config.peakrate {
392-
if peakrate.as_bps() == 0 {
393-
return Err(Error::ConfigError(
394-
"peakrate must be greater than 0".to_string(),
395-
));
396-
}
408+
}
409+
if let Some(peakrate) = config.peakrate {
410+
if peakrate.as_bps() == 0 {
411+
return Err(Error::ConfigError(
412+
"peakrate must be greater than 0".to_string(),
413+
));
397414
}
398-
if let Some(minburst) = config.minburst {
399-
if minburst.as_u64() == 0 {
400-
return Err(Error::ConfigError(
401-
"minburst must be greater than 0".to_string(),
402-
));
403-
}
415+
}
416+
if let Some(minburst) = config.minburst {
417+
if minburst.as_u64() == 0 {
418+
return Err(Error::ConfigError(
419+
"minburst must be greater than 0".to_string(),
420+
));
404421
}
422+
}
423+
Ok(config)
424+
}
425+
426+
impl ControlInterface for TokenBucketCellControlInterface {
427+
type Config = TokenBucketCellConfig;
428+
429+
fn set_config(&self, config: Self::Config) -> Result<(), Error> {
430+
// set_config of TokenBucketCellControlInterface (send config through tx)
431+
let config = sanity_check(config)?;
405432
self.config_tx
406-
.send(config)
407-
.map_err(|_| Error::ConfigError("Control channel is closed.".to_string()))?;
408-
Ok(())
433+
.send((Instant::now(), config))
434+
.map_err(|_| Error::ConfigError("Control channel is closed.".to_string()))
409435
}
410436
}
411437

@@ -615,6 +641,8 @@ mod tests {
615641
packet_size: usize,
616642
config: TokenBucketCellConfig,
617643
updated_config: TokenBucketCellConfig,
644+
update_time_since_start: Duration,
645+
packet_send_time_after_update: Duration,
618646
) -> Result<(Vec<Duration>, Vec<Duration>), Error> {
619647
let rt = tokio::runtime::Builder::new_current_thread()
620648
.enable_all()
@@ -640,6 +668,7 @@ mod tests {
640668
let mut measured_delays = Vec::new();
641669

642670
let logical_time = Instant::now() + Duration::from_millis(10);
671+
let update_time = logical_time + update_time_since_start;
643672

644673
rt.block_on(async {
645674
timer.sleep_until(logical_time).await.unwrap();
@@ -674,9 +703,12 @@ mod tests {
674703
measured_delays.push(actual_receive_time.duration_since(send_time));
675704
}
676705

677-
controller_interface.config_tx.send(updated_config).unwrap();
706+
controller_interface
707+
.config_tx
708+
.send((update_time, updated_config))
709+
.unwrap();
678710

679-
let logical_time = Instant::now() + Duration::from_millis(10);
711+
let logical_time = logical_time + packet_send_time_after_update;
680712

681713
rt.block_on(async {
682714
timer.sleep_until(logical_time).await.unwrap();
@@ -867,8 +899,14 @@ mod tests {
867899
);
868900

869901
// 256 Bytes L3 length
870-
let (measured_delays, logical_delays) =
871-
get_delays_with_update(4, 256 + 14, config, updated_config)?;
902+
let (measured_delays, logical_delays) = get_delays_with_update(
903+
4,
904+
256 + 14,
905+
config,
906+
updated_config,
907+
Duration::from_millis(1100),
908+
Duration::from_millis(1140),
909+
)?;
872910

873911
// In this test, rate and burst is the bottleneck
874912
//
@@ -877,14 +915,17 @@ mod tests {
877915
// Each packet consumed token that needs 1s to refill after the config change.
878916
compare_delays(
879917
vec![
918+
// Start from an fully filled bucket, thus the first two packets is not delayed.
880919
Duration::ZERO,
881920
Duration::ZERO,
882921
Duration::from_millis(500),
883922
Duration::from_millis(1000),
884-
Duration::ZERO,
885-
Duration::ZERO,
886-
Duration::from_millis(1000),
887-
Duration::from_millis(2000),
923+
// Config change happens 100ms after 4th packet was sent, and 40ms before the 5th packet was sent.
924+
// token when the 5th packet arrives: 100ms / 500ms/pkt + 40ms / 1s/pkt = 0.24 pkt, or 240ms at 1s/pkt.
925+
Duration::from_millis(1000 - 240),
926+
Duration::from_millis(2000 - 240),
927+
Duration::from_millis(3000 - 240),
928+
Duration::from_millis(4000 - 240),
888929
],
889930
measured_delays,
890931
logical_delays,
@@ -914,8 +955,14 @@ mod tests {
914955
None,
915956
);
916957
// 256 Bytes L3 length
917-
let (measured_delays, logical_delays) =
918-
get_delays_with_update(4, 256 + 14, config, updated_config)?;
958+
let (measured_delays, logical_delays) = get_delays_with_update(
959+
4,
960+
256 + 14,
961+
config,
962+
updated_config,
963+
Duration::from_millis(3100),
964+
Duration::from_millis(3140),
965+
)?;
919966

920967
// In this test, rate and burst is the bottleneck
921968
//
@@ -924,14 +971,17 @@ mod tests {
924971
// Each packet consumed token that needs 500ms to refill after the config change.
925972
compare_delays(
926973
vec![
974+
// Start from an fully filled bucket, thus the first two packets is not delayed.
927975
Duration::ZERO,
928976
Duration::from_millis(1000),
929977
Duration::from_millis(2000),
930978
Duration::from_millis(3000),
931-
Duration::ZERO,
932-
Duration::from_millis(500),
933-
Duration::from_millis(1000),
934-
Duration::from_millis(1500),
979+
// Config change happens 100ms after 4th packet was sent, and 40ms before the 5th packet was sent.
980+
// token when the 5th packet arrives: 100ms / 1s/pkt + 40ms / 500ms/pkt = 0.18 pkt, or 90ms at 500ms/pkt.
981+
Duration::from_millis(500 - 90),
982+
Duration::from_millis(1000 - 90),
983+
Duration::from_millis(1500 - 90),
984+
Duration::from_millis(2000 - 90),
935985
],
936986
measured_delays,
937987
logical_delays,

0 commit comments

Comments
 (0)