Skip to content

Commit 82d25e5

Browse files
authored
fix: last_zxid_seen in ConnectRequest is not set (#39)
Closes #32.
1 parent 0e4e201 commit 82d25e5

File tree

3 files changed

+46
-7
lines changed

3 files changed

+46
-7
lines changed

src/client/mod.rs

+32
Original file line numberDiff line numberDiff line change
@@ -2257,4 +2257,36 @@ mod tests {
22572257
.unwrap_err())
22582258
.is_equal_to(Error::BadArguments(&"directory node must not be sequential"));
22592259
}
2260+
2261+
#[test_log::test(tokio::test)]
2262+
async fn session_last_zxid_seen() {
2263+
use testcontainers::clients::Cli as DockerCli;
2264+
use testcontainers::core::{Healthcheck, WaitFor};
2265+
use testcontainers::images::generic::GenericImage;
2266+
2267+
let healthcheck = Healthcheck::default()
2268+
.with_cmd(["./bin/zkServer.sh", "status"].iter())
2269+
.with_interval(Duration::from_secs(2))
2270+
.with_retries(60);
2271+
let image =
2272+
GenericImage::new("zookeeper", "3.9.0").with_healthcheck(healthcheck).with_wait_for(WaitFor::Healthcheck);
2273+
let docker = DockerCli::default();
2274+
let container = docker.run(image);
2275+
let endpoint = format!("127.0.0.1:{}", container.get_host_port(2181));
2276+
2277+
let client1 = Client::connector().detached().connect(&endpoint).await.unwrap();
2278+
client1.create("/n1", b"", &CreateMode::Persistent.with_acls(Acls::anyone_all())).await.unwrap();
2279+
2280+
let mut session = client1.into_session();
2281+
2282+
// Fail to connect with large zxid.
2283+
session.last_zxid = i64::MAX;
2284+
assert_that!(Client::connector().fail_eagerly().session(session.clone()).connect(&endpoint).await.unwrap_err())
2285+
.is_equal_to(Error::NoHosts);
2286+
2287+
// Succeed to connect with small zxid.
2288+
session.last_zxid = 0;
2289+
let client2 = Client::connector().fail_eagerly().session(session.clone()).connect(&endpoint).await.unwrap();
2290+
client2.create("/n2", b"", &CreateMode::Persistent.with_acls(Acls::anyone_all())).await.unwrap();
2291+
}
22602292
}

src/session/mod.rs

+6-6
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ impl Session {
102102

103103
configured_connection_timeout: connection_timeout,
104104

105-
last_zxid: 0,
105+
last_zxid: session.last_zxid,
106106
last_recv: now,
107107
last_send: now,
108108
last_ping: None,
@@ -286,15 +286,15 @@ impl Session {
286286
}
287287

288288
fn handle_reply(&mut self, header: ReplyHeader, body: &[u8], depot: &mut Depot) -> Result<(), Error> {
289-
if header.err == ErrorCode::SessionExpired.into() {
289+
if header.err == ErrorCode::SessionExpired as i32 {
290290
return Err(Error::SessionExpired);
291-
} else if header.err == ErrorCode::AuthFailed.into() {
291+
} else if header.err == ErrorCode::AuthFailed as i32 {
292292
return Err(Error::AuthFailed);
293293
}
294-
if header.xid == PredefinedXid::Notification.into() {
294+
if header.xid == PredefinedXid::Notification as i32 {
295295
self.handle_notification(header.zxid, body, depot)?;
296296
return Ok(());
297-
} else if header.xid == PredefinedXid::Ping.into() {
297+
} else if header.xid == PredefinedXid::Ping as i32 {
298298
depot.pop_ping()?;
299299
if let Some(last_ping) = self.last_ping.take() {
300300
let elapsed = Instant::now() - last_ping;
@@ -514,7 +514,7 @@ impl Session {
514514
fn send_connect(&self, depot: &mut Depot) {
515515
let request = ConnectRequest {
516516
protocol_version: 0,
517-
last_zxid_seen: 0,
517+
last_zxid_seen: self.last_zxid,
518518
timeout: self.session_timeout.as_millis() as i32,
519519
session_id: if self.session.readonly { 0 } else { self.session.id.0 },
520520
password: self.session.password.as_slice(),

src/session/types.rs

+8-1
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,18 @@ pub struct SessionInfo {
3030
#[derive_where(skip(Debug))]
3131
pub(crate) password: Vec<u8>,
3232
pub(crate) readonly: bool,
33+
/// Only set through test otherwise 0.
34+
///
35+
/// I thought to carry [Session::last_zxid] from [Client::session] to [Connector::session].
36+
/// This way session reestablishment API has not major difference with internal reconnection.
37+
/// But I think it is a ZooKeeper tradition to [Client::sync] after session reestablishment.
38+
/// We probably should not challenge this.
39+
pub(crate) last_zxid: i64,
3340
}
3441

3542
impl SessionInfo {
3643
pub(crate) fn new(id: SessionId, password: Vec<u8>) -> Self {
37-
Self { id, password, readonly: id.0 == 0 }
44+
Self { id, password, readonly: id.0 == 0, last_zxid: 0 }
3845
}
3946

4047
/// Session id.

0 commit comments

Comments
 (0)