Skip to content

Commit 43062b1

Browse files
authored
feat: add backoff between connection retries (#37)
Closes #35.
1 parent e206b9f commit 43062b1

File tree

4 files changed

+105
-28
lines changed

4 files changed

+105
-28
lines changed

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ rustls-pemfile = "2"
3434
webpki-roots = "0.26.1"
3535
derive-where = "1.2.7"
3636
tokio-rustls = "0.26.0"
37+
fastrand = "2.0.2"
3738

3839
[dev-dependencies]
3940
test-log = "0.2.12"

src/client/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -1658,6 +1658,7 @@ impl Connector {
16581658
self.connection_timeout,
16591659
);
16601660
let mut endpoints = IterableEndpoints::from(endpoints.as_slice());
1661+
endpoints.reset();
16611662
if !self.fail_eagerly {
16621663
endpoints.cycle();
16631664
}

src/endpoint.rs

+100-27
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use std::fmt::{self, Display, Formatter};
2+
use std::time::Duration;
23

34
use crate::chroot::Chroot;
45
use crate::error::Error;
@@ -157,12 +158,19 @@ pub fn parse_connect_string(s: &str, tls: bool) -> Result<(Vec<EndpointRef<'_>>,
157158
pub struct IterableEndpoints {
158159
cycle: bool,
159160
next: usize,
161+
start: usize,
160162
endpoints: Vec<Endpoint>,
161163
}
162164

165+
#[derive(Copy, Clone, Debug)]
166+
pub struct Index {
167+
offset: usize,
168+
cycles: usize,
169+
}
170+
163171
impl IterableEndpoints {
164172
pub fn new(endpoints: impl Into<Vec<Endpoint>>) -> Self {
165-
Self { cycle: false, next: 0, endpoints: endpoints.into() }
173+
Self { cycle: false, start: 0, next: 0, endpoints: endpoints.into() }
166174
}
167175

168176
pub fn len(&self) -> usize {
@@ -174,35 +182,68 @@ impl IterableEndpoints {
174182
}
175183

176184
pub fn cycle(&mut self) {
177-
if self.next >= self.endpoints.len() {
178-
self.next = 0;
179-
}
180185
self.cycle = true;
181186
}
182187

183-
pub fn next(&mut self) -> Option<EndpointRef<'_>> {
184-
let next = self.next;
185-
if next >= self.endpoints.len() {
188+
/// Resets all counters and shuffles endpoints for future iterations.
189+
pub fn reset(&mut self) {
190+
self.next = 0;
191+
self.start = 0;
192+
fastrand::shuffle(&mut self.endpoints);
193+
}
194+
195+
/// Starts an new iteration at where `peek` stopped.
196+
pub fn start(&mut self) {
197+
self.start = self.next;
198+
}
199+
200+
pub async fn next(&mut self) -> Option<EndpointRef<'_>> {
201+
let index = self.index()?;
202+
self.delay(index).await;
203+
self.step();
204+
Some(self.endpoints[index.offset].to_ref())
205+
}
206+
207+
async fn delay(&self, index: Index) {
208+
let timeout = Self::timeout(index, self.endpoints.len());
209+
if timeout != Duration::ZERO {
210+
tokio::time::sleep(timeout).await;
211+
}
212+
}
213+
214+
fn timeout(index: Index, size: usize) -> Duration {
215+
if index.cycles == 0 {
216+
return Duration::ZERO;
217+
}
218+
let unit = Duration::from_millis(100);
219+
if index.offset == 0 {
220+
let jitter = Duration::from_millis(fastrand::u32(0..100).into());
221+
let base = Duration::from_millis(1000).min(unit * size as u32);
222+
base * 2u32.pow(index.cycles as u32 - 1) + jitter
223+
} else {
224+
let jitter = Duration::from_millis(fastrand::u32(0..50).into());
225+
(unit * (index.offset as u32)) / 2 + jitter
226+
}
227+
}
228+
229+
/// Index for `next` and `peek`.
230+
fn index(&self) -> Option<Index> {
231+
let i = self.next - self.start;
232+
let n = self.endpoints.len();
233+
if i >= n && !self.cycle {
186234
return None;
187235
}
188-
self.step();
189-
let host = &self.endpoints[next];
190-
Some(host.to_ref())
236+
let offset = i % self.endpoints.len();
237+
let cycles = i / self.endpoints.len();
238+
Some(Index { offset, cycles })
191239
}
192240

193241
pub fn step(&mut self) {
194242
self.next += 1;
195-
if self.cycle && self.next >= self.endpoints.len() {
196-
self.next = 0;
197-
}
198243
}
199244

200245
pub fn peek(&self) -> Option<EndpointRef<'_>> {
201-
let next = self.next;
202-
if next >= self.endpoints.len() {
203-
return None;
204-
}
205-
Some(self.endpoints[next].to_ref())
246+
self.index().map(|index| self.endpoints[index.offset].to_ref())
206247
}
207248
}
208249

@@ -283,21 +324,53 @@ mod tests {
283324
);
284325
}
285326

327+
#[tokio::test]
328+
async fn test_iterable_endpoints_next() {
329+
use assertor::*;
330+
331+
use super::{parse_connect_string, EndpointRef, Index, IterableEndpoints};
332+
let (endpoints, _) = parse_connect_string("host1:2181,tcp://host2,tcp+tls://host3:2182", true).unwrap();
333+
let mut endpoints = IterableEndpoints::from(endpoints.as_slice());
334+
assert_eq!(endpoints.next().await, Some(EndpointRef::new("host1", 2181, true)));
335+
assert_eq!(endpoints.next().await, Some(EndpointRef::new("host2", 2181, false)));
336+
assert_eq!(endpoints.next().await, Some(EndpointRef::new("host3", 2182, true)));
337+
assert_eq!(endpoints.next().await, None);
338+
339+
endpoints.cycle();
340+
let start = std::time::Instant::now();
341+
assert_eq!(endpoints.next().await, Some(EndpointRef::new("host1", 2181, true)));
342+
assert_eq!(endpoints.next().await, Some(EndpointRef::new("host2", 2181, false)));
343+
assert_eq!(endpoints.next().await, Some(EndpointRef::new("host3", 2182, true)));
344+
assert_eq!(endpoints.next().await, Some(EndpointRef::new("host1", 2181, true)));
345+
let delay = IterableEndpoints::timeout(Index { offset: 0, cycles: 1 }, 3)
346+
+ IterableEndpoints::timeout(Index { offset: 1, cycles: 1 }, 3);
347+
let now = std::time::Instant::now();
348+
assert_that!(now).is_greater_than(start + delay);
349+
}
350+
286351
#[test]
287-
fn test_iterable_endpoints() {
352+
fn test_iterable_endpoints_peek() {
288353
use super::{parse_connect_string, EndpointRef, IterableEndpoints};
289354
let (endpoints, _) = parse_connect_string("host1:2181,tcp://host2,tcp+tls://host3:2182", true).unwrap();
290355
let mut endpoints = IterableEndpoints::from(endpoints.as_slice());
291-
assert_eq!(endpoints.next(), Some(EndpointRef::new("host1", 2181, true)));
292-
assert_eq!(endpoints.next(), Some(EndpointRef::new("host2", 2181, false)));
293-
assert_eq!(endpoints.next(), Some(EndpointRef::new("host3", 2182, true)));
294-
assert_eq!(endpoints.next(), None);
356+
assert_eq!(endpoints.peek(), Some(EndpointRef::new("host1", 2181, true)));
357+
// Successive `peek` without `step` doesn't advance.
358+
assert_eq!(endpoints.peek(), Some(EndpointRef::new("host1", 2181, true)));
359+
endpoints.step();
360+
assert_eq!(endpoints.peek(), Some(EndpointRef::new("host2", 2181, false)));
361+
endpoints.step();
362+
assert_eq!(endpoints.peek(), Some(EndpointRef::new("host3", 2182, true)));
363+
endpoints.step();
364+
assert_eq!(endpoints.peek(), None);
295365

296366
endpoints.cycle();
297-
assert_eq!(endpoints.next(), Some(EndpointRef::new("host1", 2181, true)));
298-
assert_eq!(endpoints.next(), Some(EndpointRef::new("host2", 2181, false)));
299-
assert_eq!(endpoints.next(), Some(EndpointRef::new("host3", 2182, true)));
300-
assert_eq!(endpoints.next(), Some(EndpointRef::new("host1", 2181, true)));
367+
assert_eq!(endpoints.peek(), Some(EndpointRef::new("host1", 2181, true)));
368+
endpoints.step();
369+
assert_eq!(endpoints.peek(), Some(EndpointRef::new("host2", 2181, false)));
370+
endpoints.step();
371+
assert_eq!(endpoints.peek(), Some(EndpointRef::new("host3", 2182, true)));
372+
endpoints.step();
373+
assert_eq!(endpoints.peek(), Some(EndpointRef::new("host1", 2181, true)));
301374
}
302375

303376
#[test]

src/session/mod.rs

+3-1
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ impl Session {
159159
},
160160
Ok(conn) => conn,
161161
};
162+
endpoints.reset();
162163
self.serve_once(conn, &mut endpoints, &mut buf, &mut depot, &mut requester, &mut unwatch_requester).await;
163164
}
164165
let err = self.state_error();
@@ -537,7 +538,7 @@ impl Session {
537538
buf: &mut Vec<u8>,
538539
depot: &mut Depot,
539540
) -> Result<Connection, Error> {
540-
let Some(endpoint) = endpoints.next() else {
541+
let Some(endpoint) = endpoints.next().await else {
541542
return Err(Error::NoHosts);
542543
};
543544
let mut conn = match self.connector.connect(endpoint, deadline).await {
@@ -577,6 +578,7 @@ impl Session {
577578
buf: &mut Vec<u8>,
578579
depot: &mut Depot,
579580
) -> Result<Connection, Error> {
581+
endpoints.start();
580582
let session_timeout = if self.session.id.0 == 0 { self.session_timeout } else { self.session_expired_timeout };
581583
let mut deadline = Deadline::until(self.last_recv + session_timeout);
582584
let mut last_error = match self.start_once(endpoints, &mut deadline, buf, depot).await {

0 commit comments

Comments
 (0)