Skip to content

Commit a01365d

Browse files
authored
Merge pull request astarte-platform#446 from joshuachp/refactor/retry-logic
refactor(retry): improve the retry logic
2 parents a0273b3 + 8558092 commit a01365d

File tree

4 files changed

+66
-13
lines changed

4 files changed

+66
-13
lines changed

src/connection/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use tokio::task::JoinHandle;
2626
use tracing::{debug, error, info, trace, warn};
2727

2828
use crate::error::Report;
29+
use crate::retry::ExponentialIter;
2930
use crate::state::{SharedState, Status};
3031
use crate::transport::TransportError;
3132
use crate::Timestamp;
@@ -87,6 +88,7 @@ where
8788
sender: C::Sender,
8889
state: Arc<SharedState>,
8990
resend: Option<JoinHandle<()>>,
91+
backoff: ExponentialIter,
9092
}
9193

9294
impl<C> DeviceConnection<C>
@@ -107,6 +109,7 @@ where
107109
connection,
108110
sender,
109111
resend: None,
112+
backoff: ExponentialIter::default(),
110113
}
111114
}
112115

src/connection/resend.rs

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ use crate::builder::DEFAULT_CHANNEL_SIZE;
2525
use crate::error::Report;
2626
use crate::retention::memory::ItemValue;
2727
use crate::retention::{RetentionId, StoredRetention, StoredRetentionExt};
28-
use crate::retry::ExponentialIter;
2928
use crate::state::SharedState;
3029
use crate::store::wrapper::StoreWrapper;
3130
use crate::store::StoreCapabilities;
@@ -115,7 +114,7 @@ where
115114

116115
match resend.await {
117116
Ok(()) => {
118-
trace!("resend task joined")
117+
trace!("task already exited")
119118
}
120119
Err(err) if err.is_cancelled() => {
121120
debug!("resend task was cancelled");
@@ -191,10 +190,32 @@ where
191190
{
192191
let interfaces = self.state.interfaces.read().await;
193192
debug!("reconnecting");
194-
let mut exp = ExponentialIter::default();
193+
194+
// Wait before trying to reconnect the first time, this will prevent cases where the
195+
// connection will loop that will keep throwing errors.
196+
//
197+
// The back-off will start with a wait time of 0s and increase exponentially until it
198+
// reaches a max. We also keep track of when the last disconnection happened, to reset the
199+
// wait time only if the connection has been stable for a certain duration of time.
200+
//
201+
// An example of an error loop is when the MQTT connection Interfaces (the device
202+
// introspection) contains interfaces not installed on Astarte:
203+
//
204+
// - Device: publishes on that interface
205+
// - Astarte: disconnects us since we don't have that interface in the introspection
206+
// - Device: reconnects and succeeds on the first try (the while loop bellow)
207+
// - Device: publishes on the same interface and gets disconnected again.
208+
//
209+
// If we didn't keep track of the last disconnection, the error loop above would continue to
210+
// happen without timeouts, wasting device bandwidth and resources.
211+
let timeout = self.backoff.next();
212+
213+
debug!("waiting {timeout} seconds before retrying");
214+
215+
tokio::time::sleep(Duration::from_secs(timeout)).await;
195216

196217
while !self.connection.reconnect(&interfaces).await? {
197-
let timeout = exp.next();
218+
let timeout = self.backoff.next();
198219

199220
debug!("waiting {timeout} seconds before retrying");
200221

src/retention/mod.rs

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -517,8 +517,9 @@ impl Context {
517517
let timestamp = TimestampMillis::now();
518518

519519
// We want the values to be unique, this will wrap around, but it will never wrap on the
520-
// same ms
521-
let counter = self.counter.fetch_add(1, Ordering::AcqRel);
520+
// same ms, the ordering can be relaxed since the only guarantee we need is for the counter
521+
// to yield unique values.
522+
let counter = self.counter.fetch_add(1, Ordering::Relaxed);
522523

523524
Id { timestamp, counter }
524525
}
@@ -540,35 +541,40 @@ mod tests {
540541
#[test]
541542
fn id_should_be_unique() {
542543
const NUM: usize = 5;
544+
const CAP: usize = 1000;
543545
let ctx = Arc::new(Context::new());
544546

545-
let (tx, rx) = std::sync::mpsc::sync_channel::<Id>(NUM);
547+
let (tx, rx) = std::sync::mpsc::sync_channel::<Vec<Id>>(NUM);
546548

547549
for _ in 0..NUM {
548550
std::thread::spawn({
549551
let ctx = Arc::clone(&ctx);
550552
let tx = tx.clone();
551553

552554
move || {
555+
let mut gen = Vec::with_capacity(CAP);
553556
let mut prev = ctx.next();
554-
for _i in 0..1000 {
557+
for _i in 0..CAP {
555558
let new = ctx.next();
556559

557-
tx.send(new).expect("channel closed");
558-
559560
assert!(new > prev);
560561

562+
gen.push(prev);
561563
prev = new;
562564
}
565+
566+
tx.send(gen).expect("channel closed");
563567
}
564568
});
565569
}
566570

567571
drop(tx);
568572

569573
let mut recvd = HashSet::new();
570-
while let Ok(new) = rx.recv() {
571-
assert!(recvd.insert(new));
574+
while let Ok(gen) = rx.recv() {
575+
for i in gen {
576+
assert!(recvd.insert(i));
577+
}
572578
}
573579
}
574580
}

src/retry.rs

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,33 @@
1818

1919
//! Module to handle the retry of the MQTT connection
2020
21+
use std::time::{Duration, Instant};
22+
23+
use tracing::trace;
24+
2125
/// Iterator that yields a delay that will increase exponentially till the max,
2226
#[derive(Debug, Clone, Copy)]
2327
pub(crate) struct ExponentialIter {
2428
n: u32,
2529
max: u32,
30+
reset_after: Duration,
31+
last: Option<Instant>,
2632
}
2733

2834
impl ExponentialIter {
2935
pub(crate) fn next(&mut self) -> u64 {
36+
if self
37+
.last
38+
.is_some_and(|instant| instant.elapsed() > self.reset_after)
39+
{
40+
trace!("resetting timeout");
41+
42+
// Start from the beginning
43+
self.n = 0;
44+
}
45+
46+
self.last = Some(Instant::now());
47+
3048
let v = ((self.n > 0) as u64).wrapping_shl(self.n.saturating_sub(1));
3149

3250
self.n = self.n.saturating_add(1).min(self.max);
@@ -37,7 +55,12 @@ impl ExponentialIter {
3755

3856
impl Default for ExponentialIter {
3957
fn default() -> Self {
40-
Self { n: 0, max: 9 }
58+
Self {
59+
n: 0,
60+
max: 9,
61+
reset_after: Duration::from_secs(256 * 4),
62+
last: None,
63+
}
4164
}
4265
}
4366

0 commit comments

Comments
 (0)