Skip to content

Commit f3fcf58

Browse files
committed
fix: config_rx of Loss[Replay]Cell
1 parent 57322ea commit f3fcf58

File tree

1 file changed

+30
-46
lines changed

1 file changed

+30
-46
lines changed

rattan-core/src/cells/loss.rs

Lines changed: 30 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -54,27 +54,17 @@ where
5454
R: Rng,
5555
{
5656
egress: mpsc::UnboundedReceiver<P>,
57-
pattern: Arc<AtomicRawCell<LossPattern>>,
58-
inner_pattern: Box<LossPattern>,
57+
/// This `Arc` is shared with the `LossCellControlInterface`.
58+
pattern_to_set: Arc<AtomicRawCell<LossPattern>>,
59+
pattern_in_use: Box<LossPattern>,
5960
/// How many packets have been lost consecutively
6061
prev_loss: usize,
6162
rng: R,
6263
state: AtomicCellState,
63-
config_rx: mpsc::UnboundedReceiver<LossCellConfig>,
6464
notify_rx: Option<tokio::sync::broadcast::Receiver<crate::control::RattanNotify>>,
6565
started: bool,
6666
}
6767

68-
impl<P, R> LossCellEgress<P, R>
69-
where
70-
P: Packet + Send + Sync,
71-
R: Rng + Send + Sync,
72-
{
73-
fn set_config(&mut self, config: LossCellConfig) {
74-
*self.inner_pattern = config.pattern;
75-
}
76-
}
77-
7868
#[async_trait]
7969
impl<P, R> Egress<P> for LossCellEgress<P, R>
8070
where
@@ -87,18 +77,17 @@ where
8777

8878
// It could be None only if the other end of the channel has closed.
8979
let packet = self.egress.recv().await?;
90-
while let Ok(config) = self.config_rx.try_recv() {
91-
self.set_config(config);
92-
}
93-
9480
let packet = crate::check_cell_state!(self.state, packet);
95-
if let Some(pattern) = self.pattern.swap_null() {
96-
self.inner_pattern = pattern;
97-
debug!(?self.inner_pattern, "Set inner pattern:");
81+
82+
// Try to update the config.
83+
if let Some(pattern) = self.pattern_to_set.swap_null() {
84+
self.pattern_in_use = pattern;
85+
debug!(?self.pattern_in_use, "Set inner pattern:");
9886
}
99-
let loss_rate = match self.inner_pattern.get(self.prev_loss) {
87+
88+
let loss_rate = match self.pattern_in_use.get(self.prev_loss) {
10089
Some(&loss_rate) => loss_rate,
101-
None => *self.inner_pattern.last().unwrap_or(&0.0),
90+
None => *self.pattern_in_use.last().unwrap_or(&0.0),
10291
};
10392
let rand_num = self.rng.random_range(0.0..1.0);
10493
if rand_num < loss_rate {
@@ -142,17 +131,15 @@ impl LossCellConfig {
142131
}
143132

144133
pub struct LossCellControlInterface {
145-
config_tx: mpsc::UnboundedSender<LossCellConfig>,
134+
pattern_to_set: Arc<AtomicRawCell<LossPattern>>,
146135
}
147136

148137
impl ControlInterface for LossCellControlInterface {
149138
type Config = LossCellConfig;
150139

151140
fn set_config(&self, config: Self::Config) -> Result<(), Error> {
152141
info!("Setting loss pattern to: {:?}", config.pattern);
153-
self.config_tx
154-
.send(config)
155-
.map_err(|_| Error::ConfigError("Control channel is closed.".to_string()))?;
142+
self.pattern_to_set.store(Box::new(config.pattern));
156143
Ok(())
157144
}
158145
}
@@ -198,22 +185,20 @@ where
198185
let pattern = pattern.into();
199186
debug!(?pattern, "New LossCell");
200187
let (rx, tx) = mpsc::unbounded_channel();
201-
let (config_tx, config_rx) = mpsc::unbounded_channel();
202-
let pattern = Arc::new(AtomicRawCell::new(Box::new(pattern)));
188+
let pattern_to_set = Arc::new(AtomicRawCell::new(Box::new(pattern)));
203189
Ok(LossCell {
204190
ingress: Arc::new(LossCellIngress { ingress: rx }),
205191
egress: LossCellEgress {
206192
egress: tx,
207-
pattern: Arc::clone(&pattern),
208-
inner_pattern: Box::default(),
193+
pattern_to_set: Arc::clone(&pattern_to_set),
194+
pattern_in_use: Box::default(),
209195
prev_loss: 0,
210196
rng,
211197
state: AtomicCellState::new(CellState::Drop),
212198
notify_rx: None,
213-
config_rx,
214199
started: false,
215200
},
216-
control_interface: Arc::new(LossCellControlInterface { config_tx }),
201+
control_interface: Arc::new(LossCellControlInterface { pattern_to_set }),
217202
})
218203
}
219204
}
@@ -229,7 +214,7 @@ where
229214
trace: Box<dyn LossTrace>,
230215
current_loss_pattern: TimedConfig<LossPattern>,
231216
next_change: Instant,
232-
config_rx: mpsc::UnboundedReceiver<LossReplayCellConfig>,
217+
trace_to_set: Arc<AtomicRawCell<Box<dyn LossTraceConfig>>>,
233218
/// How many packets have been lost consecutively
234219
prev_loss: usize,
235220
rng: R,
@@ -257,9 +242,9 @@ where
257242
self.current_loss_pattern.update(loss, change_time);
258243
}
259244

260-
fn set_config(&mut self, config: LossReplayCellConfig) {
245+
fn set_config(&mut self, trace_config: Box<dyn LossTraceConfig>) {
261246
tracing::debug!("Set inner trace config");
262-
self.trace = config.trace_config.into_model();
247+
self.trace = trace_config.into_model();
263248
let now = Instant::now();
264249
if !self.update_loss(now) {
265250
tracing::warn!("Setting null trace");
@@ -306,12 +291,13 @@ where
306291

307292
// It could be None only if the other end of the channel has closed.
308293
let packet = self.egress.recv().await?;
309-
while let Ok(config) = self.config_rx.try_recv() {
310-
self.set_config(config);
311-
}
312-
313294
let packet = crate::check_cell_state!(self.state, packet);
314295

296+
// Try to update the config.
297+
if let Some(trace) = self.trace_to_set.swap_null() {
298+
self.set_config(*trace);
299+
}
300+
315301
let timestamp = packet.get_timestamp();
316302

317303
// Update the config until it is applicable for the packet.
@@ -383,17 +369,15 @@ impl LossReplayCellConfig {
383369
}
384370

385371
pub struct LossReplayCellControlInterface {
386-
config_tx: mpsc::UnboundedSender<LossReplayCellConfig>,
372+
trace_to_set: Arc<AtomicRawCell<Box<dyn LossTraceConfig>>>,
387373
}
388374

389375
impl ControlInterface for LossReplayCellControlInterface {
390376
type Config = LossReplayCellConfig;
391377

392378
fn set_config(&self, config: Self::Config) -> Result<(), Error> {
393379
info!("Setting loss replay config");
394-
self.config_tx
395-
.send(config)
396-
.map_err(|_| Error::ConfigError("Control channel is closed.".to_string()))?;
380+
self.trace_to_set.store(Box::new(config.trace_config));
397381
Ok(())
398382
}
399383
}
@@ -437,22 +421,22 @@ where
437421
{
438422
pub fn new(trace: Box<dyn LossTrace>, rng: R) -> Result<LossReplayCell<P, R>, Error> {
439423
let (rx, tx) = mpsc::unbounded_channel();
440-
let (config_tx, config_rx) = mpsc::unbounded_channel();
424+
let trace_to_set = Arc::new(AtomicRawCell::new_null());
441425
Ok(LossReplayCell {
442426
ingress: Arc::new(LossReplayCellIngress { ingress: rx }),
443427
egress: LossReplayCellEgress {
444428
egress: tx,
445429
trace,
446430
current_loss_pattern: TimedConfig::default(),
447431
next_change: Instant::now(),
448-
config_rx,
432+
trace_to_set: trace_to_set.clone(),
449433
prev_loss: 0,
450434
rng,
451435
state: AtomicCellState::new(CellState::Drop),
452436
notify_rx: None,
453437
started: false,
454438
},
455-
control_interface: Arc::new(LossReplayCellControlInterface { config_tx }),
439+
control_interface: Arc::new(LossReplayCellControlInterface { trace_to_set }),
456440
})
457441
}
458442
}

0 commit comments

Comments
 (0)