Skip to content

Commit fe69e74

Browse files
committed
fix: AuthFailed not reported in Connector::connect
`AuthFailed` is a terminal error. It should be reported eagerly if possible. Currently, there are two problems: * `AuthFailed` is not reported in connection establishment. * Session could immediately fail after successful `connect`.
1 parent 96ee554 commit fe69e74

File tree

4 files changed

+46
-7
lines changed

4 files changed

+46
-7
lines changed

src/client/mod.rs

+6-4
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use const_format::formatcp;
1010
use either::{Either, Left, Right};
1111
use ignore_result::Ignore;
1212
use thiserror::Error;
13-
use tokio::sync::{mpsc, watch};
13+
use tokio::sync::mpsc;
1414
use tracing::field::display;
1515
use tracing::{instrument, Span};
1616

@@ -248,9 +248,8 @@ impl Client {
248248
session: SessionInfo,
249249
timeout: Duration,
250250
requester: mpsc::UnboundedSender<SessionOperation>,
251-
state_receiver: watch::Receiver<SessionState>,
251+
state_watcher: StateWatcher,
252252
) -> Client {
253-
let state_watcher = StateWatcher::new(state_receiver);
254253
Client { chroot, version, session, session_timeout: timeout, requester, state_watcher }
255254
}
256255

@@ -1672,11 +1671,14 @@ impl Connector {
16721671
let (sender, receiver) = mpsc::unbounded_channel();
16731672
let session_info = session.session.clone();
16741673
let session_timeout = session.session_timeout;
1674+
let mut state_watcher = StateWatcher::new(state_receiver);
1675+
// Consume all state changes so far.
1676+
state_watcher.state();
16751677
tokio::spawn(async move {
16761678
session.serve(endpoints, conn, buf, connecting_depot, receiver).await;
16771679
});
16781680
let client =
1679-
Client::new(chroot.to_owned(), self.server_version, session_info, session_timeout, sender, state_receiver);
1681+
Client::new(chroot.to_owned(), self.server_version, session_info, session_timeout, sender, state_watcher);
16801682
Ok(client)
16811683
}
16821684

src/error.rs

+4
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,10 @@ pub struct OtherError {
100100
}
101101

102102
impl Error {
103+
pub(crate) fn is_terminated(&self) -> bool {
104+
matches!(self, Self::NoHosts | Self::SessionExpired | Self::AuthFailed | Self::ClientClosed)
105+
}
106+
103107
pub(crate) fn has_no_data_change(&self) -> bool {
104108
match self {
105109
Self::NoNode

src/session/mod.rs

+13-2
Original file line numberDiff line numberDiff line change
@@ -291,7 +291,9 @@ impl Session {
291291
fn handle_reply(&mut self, header: ReplyHeader, body: &[u8], depot: &mut Depot) -> Result<(), Error> {
292292
if header.err == ErrorCode::SessionExpired as i32 {
293293
return Err(Error::SessionExpired);
294-
} else if header.err == ErrorCode::AuthFailed as i32 {
294+
} else if header.err == ErrorCode::AuthFailed as i32
295+
|| header.err == ErrorCode::SessionClosedRequireSaslAuth as i32
296+
{
295297
return Err(Error::AuthFailed);
296298
}
297299
if header.xid == PredefinedXid::Notification as i32 {
@@ -423,6 +425,7 @@ impl Session {
423425
buf: &mut Vec<u8>,
424426
depot: &mut Depot,
425427
) -> Result<(), Error> {
428+
let mut pinged = false;
426429
let mut tick = time::interval(self.tick_timeout);
427430
tick.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
428431
while !(self.session_state.is_connected() && depot.is_empty()) {
@@ -441,6 +444,13 @@ impl Session {
441444
}
442445
},
443446
}
447+
if self.session_state.is_connected() && !pinged {
448+
// Send opcode other than `auth` and `sasl` to get possible AuthFailed if
449+
// "zookeeper.enforce.auth.enabled".
450+
pinged = true;
451+
self.send_ping(depot, Instant::now());
452+
depot.write_operations(conn)?;
453+
}
444454
}
445455
Ok(())
446456
}
@@ -571,6 +581,7 @@ impl Session {
571581
self.last_send = Instant::now();
572582
self.last_recv = self.last_send;
573583
self.last_ping = None;
584+
self.change_state(SessionState::Disconnected);
574585
match self.serve_connecting(&mut conn, buf, depot).await {
575586
Err(err) => {
576587
warn!("fails to establish session to {} due to {}", endpoint, err);
@@ -602,7 +613,7 @@ impl Session {
602613
Err(err) => err,
603614
Ok(conn) => return Ok(conn),
604615
};
605-
while last_error != Error::NoHosts && last_error != Error::SessionExpired {
616+
while !last_error.is_terminated() {
606617
if deadline.elapsed() {
607618
return Err(Error::Timeout);
608619
}

tests/zookeeper.rs

+23-1
Original file line numberDiff line numberDiff line change
@@ -390,7 +390,13 @@ impl Cluster {
390390
..Default::default()
391391
};
392392
match tls.as_ref() {
393-
None => {},
393+
None => {
394+
if standalone && !configs.is_empty() {
395+
configs += "clientPort=2181\n";
396+
configs += "initLimit=5\n";
397+
configs += "syncLimit=2\n";
398+
}
399+
},
394400
Some(tls) => {
395401
configs += r"
396402
ssl.clientAuth=need
@@ -1181,6 +1187,22 @@ async fn test_no_auth() {
11811187
assert_eq!(no_auth_client.set_data("/acl_test_2", b"set_my_data", None).await.unwrap_err(), zk::Error::NoAuth);
11821188
}
11831189

1190+
#[test_log::test(tokio::test)]
1191+
#[should_panic(expected = "AuthFailed")]
1192+
async fn test_auth_failed() {
1193+
let cluster = Cluster::with_options(ClusterOptions {
1194+
configs: vec![
1195+
"authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider",
1196+
"enforce.auth.enabled=true",
1197+
"enforce.auth.schemes=sasl",
1198+
"sessionRequireClientSASLAuth=true",
1199+
],
1200+
..Default::default()
1201+
})
1202+
.await;
1203+
cluster.client(None).await;
1204+
}
1205+
11841206
#[test_log::test(tokio::test)]
11851207
async fn test_delete() {
11861208
let cluster = Cluster::new().await;

0 commit comments

Comments
 (0)