Skip to content

Commit beb1684

Browse files
committed
feat: option to fail eagerly with Error::NoHosts
This reverses the default behavior and give us chance to not fail with Error::NoHosts. This commit also refactors test_readonly a bit with help from above. Closes #34.
1 parent 58f8fb4 commit beb1684

File tree

2 files changed

+39
-14
lines changed

2 files changed

+39
-14
lines changed

src/client/mod.rs

+16-1
Original file line numberDiff line numberDiff line change
@@ -1541,6 +1541,7 @@ pub struct Connector {
15411541
session: Option<SessionInfo>,
15421542
readonly: bool,
15431543
detached: bool,
1544+
fail_eagerly: bool,
15441545
server_version: Version,
15451546
session_timeout: Duration,
15461547
connection_timeout: Duration,
@@ -1555,6 +1556,7 @@ impl Connector {
15551556
session: None,
15561557
readonly: false,
15571558
detached: false,
1559+
fail_eagerly: false,
15581560
server_version: Version(u32::MAX, u32::MAX, u32::MAX),
15591561
session_timeout: Duration::ZERO,
15601562
connection_timeout: Duration::ZERO,
@@ -1621,6 +1623,15 @@ impl Connector {
16211623
self
16221624
}
16231625

1626+
/// Fail session establishment eagerly with [Error::NoHosts] when all hosts has been tried.
1627+
///
1628+
/// This permits fail-fast without wait up to [Self::session_timeout] in [Self::connect]. This
1629+
/// is not suitable for situations where ZooKeeper cluster is accessible via a single virtual IP.
1630+
pub fn fail_eagerly(&mut self) -> &mut Self {
1631+
self.fail_eagerly = true;
1632+
self
1633+
}
1634+
16241635
async fn connect_internally(&mut self, secure: bool, cluster: &str) -> Result<Client> {
16251636
let (endpoints, chroot) = endpoint::parse_connect_string(cluster, secure)?;
16261637
if let Some(session) = self.session.as_ref() {
@@ -1647,6 +1658,9 @@ impl Connector {
16471658
self.connection_timeout,
16481659
);
16491660
let mut endpoints = IterableEndpoints::from(endpoints.as_slice());
1661+
if !self.fail_eagerly {
1662+
endpoints.cycle();
1663+
}
16501664
let mut buf = Vec::with_capacity(4096);
16511665
let mut connecting_depot = Depot::for_connecting();
16521666
let conn = session.start(&mut endpoints, &mut buf, &mut connecting_depot).await?;
@@ -1677,8 +1691,9 @@ impl Connector {
16771691
/// plaintext protocol, while `server3` uses tls encrypted protocol.
16781692
///
16791693
/// # Notable errors
1680-
/// * [Error::NoHosts] if no host is available
1694+
/// * [Error::NoHosts] if no host is available and [Self::fail_eagerly] is turn on
16811695
/// * [Error::SessionExpired] if specified session expired
1696+
/// * [Error::Timeout] if no session established with in approximate [Self::session_timeout]
16821697
///
16831698
/// # Notable behaviors
16841699
/// The state of this connector is undefined after connection attempt no matter whether it is

tests/zookeeper.rs

+23-13
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,23 @@ async fn connect(cluster: &Cluster, chroot: &str) -> zk::Client {
156156

157157
#[test_log::test(tokio::test)]
158158
async fn test_connect_nohosts() {
159-
assert_that!(zk::Client::connect("127.0.0.1:100,127.0.0.1:101").await.unwrap_err()).is_equal_to(zk::Error::NoHosts);
159+
assert_that!(zk::Client::connector()
160+
.session_timeout(Duration::from_secs(24 * 3600))
161+
.fail_eagerly()
162+
.connect("127.0.0.1:100,127.0.0.1:101")
163+
.await
164+
.unwrap_err())
165+
.is_equal_to(zk::Error::NoHosts);
166+
}
167+
168+
#[test_log::test(tokio::test)]
169+
async fn test_connect_timeout() {
170+
assert_that!(zk::Client::connector()
171+
.session_timeout(Duration::from_secs(1))
172+
.connect("127.0.0.1:100,127.0.0.1:101")
173+
.await
174+
.unwrap_err())
175+
.is_equal_to(zk::Error::Timeout);
160176
}
161177

162178
#[test_log::test(tokio::test)]
@@ -1807,18 +1823,12 @@ async fn test_readonly(tls: bool) {
18071823

18081824
logs.wait_for_message("Read-only server started").unwrap();
18091825

1810-
// It takes time for the last server to serve request, so let's spin on connecting.
1811-
let mut timeout = tokio::time::sleep(Duration::from_secs(60));
1812-
let client = loop {
1813-
let mut connector = zk::Client::connector();
1814-
connector.session_timeout(Duration::from_secs(60)).readonly(true);
1815-
select! {
1816-
_ = unsafe { Pin::new_unchecked(&mut timeout) } => panic!("expect ConnectedReadOnly, but got no session"),
1817-
result = connector.connect("localhost:4001,localhost:4002,localhost:4003") => if let Ok(client) = result {
1818-
break client
1819-
},
1820-
}
1821-
};
1826+
let client = zk::Client::connector()
1827+
.readonly(true)
1828+
.session_timeout(Duration::from_secs(60))
1829+
.connect("localhost:4001,localhost:4002,localhost:4003")
1830+
.await
1831+
.unwrap();
18221832
assert_that!(client.create("/y", b"", PERSISTENT_OPEN).await.unwrap_err()).is_equal_to(zk::Error::NotReadOnly);
18231833

18241834
let session = client.session().clone();

0 commit comments

Comments
 (0)