Skip to content

Commit bcd3a94

Browse files
committed
fix: session disconnected due to unblock multiple unwatching
`xid` for unwatching requests is not set in unblocking. Response for later unwatching will not find matching request as it was popped by earlier response. Closes #22.
1 parent 64cca9e commit bcd3a94

File tree

3 files changed

+39
-6
lines changed

3 files changed

+39
-6
lines changed

src/session/depot.rs

+9-5
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,11 @@ impl Depot {
107107
}
108108
}
109109

110+
fn push_request(&mut self, mut operation: SessionOperation) {
111+
operation.request.set_xid(self.xid.next());
112+
self.push_operation(Operation::Session(operation));
113+
}
114+
110115
pub fn pop_ping(&mut self) -> Result<(), Error> {
111116
self.pop_request(PredefinedXid::Ping.into()).map(|_| ())
112117
}
@@ -145,13 +150,13 @@ impl Depot {
145150
if *count == 0 {
146151
self.watching_paths.remove(&(path, mode));
147152
if let Some(operation) = self.unwatching_paths.remove(&(path, mode)) {
148-
self.push_operation(Operation::Session(operation));
153+
self.push_request(operation);
149154
}
150155
if self.has_watching_requests(path) {
151156
return;
152157
}
153158
if let Some(operation) = self.unwatching_paths.remove(&(path, WatchMode::Any)) {
154-
self.push_operation(Operation::Session(operation));
159+
self.push_request(operation);
155160
}
156161
}
157162
}
@@ -166,7 +171,7 @@ impl Depot {
166171
self.cancel_unwatch(path, mode);
167172
}
168173

169-
pub fn push_session(&mut self, mut operation: SessionOperation) {
174+
pub fn push_session(&mut self, operation: SessionOperation) {
170175
let info = operation.request.get_operation_info();
171176
log::debug!("ZooKeeper operation request: {:?}", info);
172177
if let (op_code, OpStat::Watch { path, mode }) = info {
@@ -185,8 +190,7 @@ impl Depot {
185190
self.watching_paths.insert((path, mode), count);
186191
}
187192
}
188-
operation.request.set_xid(self.xid.next());
189-
self.push_operation(Operation::Session(operation));
193+
self.push_request(operation);
190194
}
191195

192196
pub fn push_remove_watch(&mut self, path: &str, mode: WatchMode, responser: StateResponser) {

src/session/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ impl Session {
203203
) {
204204
if let Err(err) = self.serve_session(&sock, buf, depot, requester, unwatch_requester).await {
205205
self.resolve_serve_error(&err);
206-
log::debug!("ZooKeeper session {} state {} error {}", self.session_id, self.session_state, err);
206+
log::info!("ZooKeeper session {} state {} error {}", self.session_id, self.session_state, err);
207207
depot.error(&err);
208208
} else {
209209
self.change_state(SessionState::Disconnected);

tests/zookeeper.rs

+29
Original file line numberDiff line numberDiff line change
@@ -1164,6 +1164,35 @@ async fn test_persistent_watcher_passive_remove() {
11641164
assert_eq!(child_event.path, "/");
11651165
}
11661166

1167+
#[test_log::test(tokio::test)]
1168+
async fn test_fail_watch_with_multiple_unwatching() {
1169+
let docker = DockerCli::default();
1170+
let zookeeper = docker.run(zookeeper_image());
1171+
let zk_port = zookeeper.get_host_port(2181);
1172+
let cluster = format!("127.0.0.1:{}", zk_port);
1173+
1174+
let client = zk::Client::connect(&cluster).await.unwrap();
1175+
1176+
let (_, exist_watcher1) = client.check_and_watch_stat("/a1").await.unwrap();
1177+
let (_, exist_watcher2) = client.check_and_watch_stat("/a2").await.unwrap();
1178+
1179+
let mut state_watcher = client.state_watcher();
1180+
1181+
let data_watching1 = client.get_and_watch_data("/a1");
1182+
let data_watching2 = client.get_and_watch_data("/a2");
1183+
1184+
drop(exist_watcher1);
1185+
drop(exist_watcher2);
1186+
1187+
assert_that!(data_watching1.await.unwrap_err()).is_equal_to(zk::Error::NoNode);
1188+
assert_that!(data_watching2.await.unwrap_err()).is_equal_to(zk::Error::NoNode);
1189+
1190+
select! {
1191+
state = state_watcher.changed() => panic!("expect no state update, but got {state}"),
1192+
_ = tokio::time::sleep(Duration::from_millis(10)) => {},
1193+
}
1194+
}
1195+
11671196
#[test_log::test(tokio::test)]
11681197
async fn test_fail_watch_with_concurrent_passive_remove() {
11691198
let docker = DockerCli::default();

0 commit comments

Comments
 (0)