Skip to content

Commit 0e4e201

Browse files
committed
fix: Expired not fire timely due to connection retry backoff
1 parent b0ce0b8 commit 0e4e201

File tree

4 files changed

+50
-34
lines changed

4 files changed

+50
-34
lines changed

src/deadline.rs

+9
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::future::Future;
22
use std::pin::Pin;
33
use std::task::{Context, Poll};
4+
use std::time::Duration;
45

56
use tokio::time::{self, Instant, Sleep};
67

@@ -20,6 +21,14 @@ impl Deadline {
2021
pub fn elapsed(&self) -> bool {
2122
self.sleep.as_ref().map(|f| f.is_elapsed()).unwrap_or(false)
2223
}
24+
25+
/// Remaining timeout.
26+
pub fn timeout(&self) -> Duration {
27+
match self.sleep.as_ref() {
28+
None => Duration::MAX,
29+
Some(sleep) => sleep.deadline().saturating_duration_since(Instant::now()),
30+
}
31+
}
2332
}
2433

2534
impl Future for Deadline {

src/endpoint.rs

+14-12
Original file line numberDiff line numberDiff line change
@@ -209,15 +209,15 @@ impl IterableEndpoints {
209209
self.start = self.next;
210210
}
211211

212-
pub async fn next(&mut self) -> Option<EndpointRef<'_>> {
212+
pub async fn next(&mut self, max_delay: Duration) -> Option<EndpointRef<'_>> {
213213
let index = self.index()?;
214-
self.delay(index).await;
214+
self.delay(index, max_delay).await;
215215
self.step();
216216
Some(self.endpoints[index.offset].to_ref())
217217
}
218218

219-
async fn delay(&self, index: Index) {
220-
let timeout = Self::timeout(index, self.endpoints.len());
219+
async fn delay(&self, index: Index, max_delay: Duration) {
220+
let timeout = max_delay.min(Self::timeout(index, self.endpoints.len()));
221221
if timeout != Duration::ZERO {
222222
tokio::time::sleep(timeout).await;
223223
}
@@ -338,22 +338,24 @@ mod tests {
338338

339339
#[tokio::test]
340340
async fn test_iterable_endpoints_next() {
341+
use std::time::Duration;
342+
341343
use assertor::*;
342344

343345
use super::{parse_connect_string, EndpointRef, Index, IterableEndpoints};
344346
let (endpoints, _) = parse_connect_string("host1:2181,tcp://host2,tcp+tls://host3:2182", true).unwrap();
345347
let mut endpoints = IterableEndpoints::from(endpoints.as_slice());
346-
assert_eq!(endpoints.next().await, Some(EndpointRef::new("host1", 2181, true)));
347-
assert_eq!(endpoints.next().await, Some(EndpointRef::new("host2", 2181, false)));
348-
assert_eq!(endpoints.next().await, Some(EndpointRef::new("host3", 2182, true)));
349-
assert_eq!(endpoints.next().await, None);
348+
assert_eq!(endpoints.next(Duration::MAX).await, Some(EndpointRef::new("host1", 2181, true)));
349+
assert_eq!(endpoints.next(Duration::MAX).await, Some(EndpointRef::new("host2", 2181, false)));
350+
assert_eq!(endpoints.next(Duration::MAX).await, Some(EndpointRef::new("host3", 2182, true)));
351+
assert_eq!(endpoints.next(Duration::MAX).await, None);
350352

351353
endpoints.cycle();
352354
let start = std::time::Instant::now();
353-
assert_eq!(endpoints.next().await, Some(EndpointRef::new("host1", 2181, true)));
354-
assert_eq!(endpoints.next().await, Some(EndpointRef::new("host2", 2181, false)));
355-
assert_eq!(endpoints.next().await, Some(EndpointRef::new("host3", 2182, true)));
356-
assert_eq!(endpoints.next().await, Some(EndpointRef::new("host1", 2181, true)));
355+
assert_eq!(endpoints.next(Duration::MAX).await, Some(EndpointRef::new("host1", 2181, true)));
356+
assert_eq!(endpoints.next(Duration::MAX).await, Some(EndpointRef::new("host2", 2181, false)));
357+
assert_eq!(endpoints.next(Duration::MAX).await, Some(EndpointRef::new("host3", 2182, true)));
358+
assert_eq!(endpoints.next(Duration::MAX).await, Some(EndpointRef::new("host1", 2181, true)));
357359
let delay = IterableEndpoints::timeout(Index { offset: 0, cycles: 1 }, 3)
358360
+ IterableEndpoints::timeout(Index { offset: 1, cycles: 1 }, 3);
359361
let now = std::time::Instant::now();

src/session/mod.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ impl Session {
149149
let mut depot = Depot::for_serving();
150150
let mut unwatch_requester = self.unwatch_receiver.take().unwrap();
151151
endpoints.cycle();
152+
endpoints.reset();
152153
self.serve_once(conn, &mut endpoints, &mut buf, &mut depot, &mut requester, &mut unwatch_requester).await;
153154
while !self.session_state.is_terminated() {
154155
let conn = match self.start(&mut endpoints, &mut buf, &mut connecting_trans).await {
@@ -538,7 +539,7 @@ impl Session {
538539
buf: &mut Vec<u8>,
539540
depot: &mut Depot,
540541
) -> Result<Connection, Error> {
541-
let Some(endpoint) = endpoints.next().await else {
542+
let Some(endpoint) = endpoints.next(deadline.timeout()).await else {
542543
return Err(Error::NoHosts);
543544
};
544545
let mut conn = match self.connector.connect(endpoint, deadline).await {

tests/zookeeper.rs

+25-21
Original file line numberDiff line numberDiff line change
@@ -1776,6 +1776,29 @@ async fn test_tls() {
17761776
assert_eq!(client2.get_data("/a").await.unwrap_err(), zk::Error::NoAuth);
17771777
}
17781778

1779+
trait StateWaiter {
1780+
async fn wait(&mut self, expected: zk::SessionState, timeout: Option<Duration>);
1781+
}
1782+
1783+
impl StateWaiter for zk::StateWatcher {
1784+
async fn wait(&mut self, expected: zk::SessionState, timeout: Option<Duration>) {
1785+
let timeout = timeout.unwrap_or_else(|| Duration::from_secs(60));
1786+
let mut sleep = tokio::time::sleep(timeout);
1787+
let mut got = self.state();
1788+
loop {
1789+
if got == expected {
1790+
break;
1791+
} else if got.is_terminated() {
1792+
panic!("expect {expected}, but got terminal state {got}")
1793+
}
1794+
select! {
1795+
state = self.changed() => got = state,
1796+
_ = unsafe { Pin::new_unchecked(&mut sleep) } => panic!("expect {expected}, but still {got} after {}s", timeout.as_secs()),
1797+
}
1798+
}
1799+
}
1800+
}
1801+
17791802
#[cfg(target_os = "linux")]
17801803
#[test_case(true; "tls")]
17811804
#[test_case(false; "plaintext")]
@@ -1811,15 +1834,7 @@ async fn test_readonly(tls: bool) {
18111834
cluster.by_id(2).stop();
18121835

18131836
// Quorum session will expire finally.
1814-
let mut timeout = tokio::time::sleep(2 * client.session_timeout());
1815-
loop {
1816-
select! {
1817-
state = state_watcher.changed() => if state == zk::SessionState::Expired {
1818-
break
1819-
},
1820-
_ = unsafe { Pin::new_unchecked(&mut timeout) } => panic!("expect Expired, but got {}", state_watcher.state()),
1821-
}
1822-
}
1837+
state_watcher.wait(zk::SessionState::Expired, Some(2 * client.session_timeout())).await;
18231838

18241839
logs.wait_for_message("Read-only server started").unwrap();
18251840

@@ -1842,18 +1857,7 @@ async fn test_readonly(tls: bool) {
18421857
cluster.by_id(1).start();
18431858
cluster.by_id(2).start();
18441859

1845-
let mut timeout = tokio::time::sleep(Duration::from_secs(60));
1846-
loop {
1847-
select! {
1848-
state = state_watcher.changed() => match state {
1849-
zk::SessionState::SyncConnected => break,
1850-
zk::SessionState::Disconnected | zk::SessionState::ConnectedReadOnly => continue,
1851-
state => panic!("expect SyncConnected, but got {}", state),
1852-
},
1853-
_ = unsafe { Pin::new_unchecked(&mut timeout) } => panic!("expect SyncConnected, but got {}", state_watcher.state()),
1854-
}
1855-
}
1856-
1860+
state_watcher.wait(zk::SessionState::SyncConnected, None).await;
18571861
client.create("/z", b"", PERSISTENT_OPEN).await.unwrap();
18581862
}
18591863

0 commit comments

Comments
 (0)