Skip to content

Commit 6b93d92

Browse files
committed
feat: support GSSAPI SASL authentication
Closes #12.
1 parent 15582ba commit 6b93d92

File tree

8 files changed

+292
-22
lines changed

8 files changed

+292
-22
lines changed

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ derive-where = "1.2.7"
3535
tokio-rustls = "0.26.0"
3636
fastrand = "2.0.2"
3737
tracing = "0.1.40"
38+
rsasl = { version = "2.0.1", default-features = false, features = ["provider", "gssapi", "config_builder", "registry_static", "std"] }
3839

3940
[dev-dependencies]
4041
test-log = { version = "0.2.15", features = ["log", "trace"] }

README.md

-3
Original file line numberDiff line numberDiff line change
@@ -79,9 +79,6 @@ latch.create("/app/data", b"data", &zk::CreateMode::Ephemeral.with_acls(zk::Acls
7979

8080
For more examples, see [zookeeper.rs](tests/zookeeper.rs).
8181

82-
## TODO
83-
* [ ] Sasl authentication
84-
8582
## License
8683
The MIT License (MIT). See [LICENSE](LICENSE) for the full license text.
8784

src/client/mod.rs

+10
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ use crate::proto::{
4545
};
4646
pub use crate::proto::{EnsembleUpdate, Stat};
4747
use crate::record::{self, Record, StaticRecord};
48+
use crate::sasl::SaslOptions;
4849
use crate::session::StateReceiver;
4950
pub use crate::session::{EventType, SessionId, SessionInfo, SessionState, WatchedEvent};
5051
use crate::tls::TlsOptions;
@@ -1538,6 +1539,7 @@ pub(crate) struct Version(u32, u32, u32);
15381539
#[derive(Clone, Debug)]
15391540
pub struct Connector {
15401541
tls: Option<TlsOptions>,
1542+
sasl: Option<SaslOptions>,
15411543
authes: Vec<AuthPacket>,
15421544
session: Option<SessionInfo>,
15431545
readonly: bool,
@@ -1553,6 +1555,7 @@ impl Connector {
15531555
fn new() -> Self {
15541556
Self {
15551557
tls: None,
1558+
sasl: None,
15561559
authes: Default::default(),
15571560
session: None,
15581561
readonly: false,
@@ -1624,6 +1627,12 @@ impl Connector {
16241627
self
16251628
}
16261629

1630+
/// Specifies SASL options.
1631+
pub fn sasl(&mut self, options: impl Into<SaslOptions>) -> &mut Self {
1632+
self.sasl = Some(options.into());
1633+
self
1634+
}
1635+
16271636
/// Fail session establishment eagerly with [Error::NoHosts] when all hosts has been tried.
16281637
///
16291638
/// This permits fail-fast without wait up to [Self::session_timeout] in [Self::connect]. This
@@ -1657,6 +1666,7 @@ impl Connector {
16571666
self.readonly,
16581667
self.detached,
16591668
tls_config,
1669+
self.sasl.take(),
16601670
self.session_timeout,
16611671
self.connection_timeout,
16621672
);

src/lib.rs

+2
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ mod endpoint;
66
mod error;
77
mod proto;
88
mod record;
9+
mod sasl;
910
mod session;
1011
mod tls;
1112
mod util;
@@ -14,3 +15,4 @@ pub use self::acl::{Acl, Acls, AuthId, AuthUser, Permission};
1415
pub use self::error::Error;
1516
pub use self::tls::TlsOptions;
1617
pub use crate::client::*;
18+
pub use crate::sasl::{GssapiSaslOptions, SaslOptions};

src/record.rs

+31-3
Original file line numberDiff line numberDiff line change
@@ -422,16 +422,44 @@ impl<'a> DeserializableRecord<'a> for &'a [u8] {
422422
type Error = InsufficientBuf;
423423

424424
fn deserialize(buf: &mut ReadingBuf<'a>) -> Result<&'a [u8], Self::Error> {
425+
Option::<&[u8]>::deserialize(buf).map(|opt| opt.unwrap_or_default())
426+
}
427+
}
428+
429+
impl SerializableRecord for Option<&[u8]> {
430+
fn serialize(&self, buf: &mut dyn BufMut) {
431+
match self {
432+
None => buf.put_i32(-1),
433+
Some(bytes) => bytes.serialize(buf),
434+
}
435+
}
436+
}
437+
438+
impl DynamicRecord for Option<&[u8]> {
439+
fn serialized_len(&self) -> usize {
440+
match self {
441+
None => 4,
442+
Some(bytes) => 4 + bytes.len(),
443+
}
444+
}
445+
}
446+
447+
impl<'a> DeserializableRecord<'a> for Option<&'a [u8]> {
448+
type Error = InsufficientBuf;
449+
450+
fn deserialize(buf: &mut ReadingBuf<'a>) -> Result<Option<&'a [u8]>, Self::Error> {
425451
let n = i32::deserialize(buf)?;
426-
if n <= 0 {
427-
return Ok(Default::default());
452+
if n < 0 {
453+
return Ok(None);
454+
} else if n == 0 {
455+
return Ok(Some(Default::default()));
428456
} else if n > buf.len() as i32 {
429457
return Err(InsufficientBuf);
430458
}
431459
let n = n as usize;
432460
let bytes = unsafe { buf.get_unchecked(..n) };
433461
unsafe { *buf = buf.get_unchecked(n..) };
434-
Ok(bytes)
462+
Ok(Some(bytes))
435463
}
436464
}
437465

src/sasl.rs

+149
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
use std::borrow::Cow;
2+
3+
use rsasl::callback::{Context, Request, SessionCallback, SessionData};
4+
use rsasl::mechanisms::gssapi::properties::GssService;
5+
use rsasl::prelude::*;
6+
use rsasl::property::Hostname;
7+
8+
use crate::error::Error;
9+
10+
pub(crate) type Result<T, E = crate::error::Error> = std::result::Result<T, E>;
11+
12+
#[derive(Clone, Debug)]
13+
enum SaslInnerOptions {
14+
Gssapi(GssapiSaslOptions),
15+
}
16+
17+
impl From<GssapiSaslOptions> for SaslOptions {
18+
fn from(options: GssapiSaslOptions) -> Self {
19+
Self(SaslInnerOptions::Gssapi(options))
20+
}
21+
}
22+
23+
/// Client side SASL options.
24+
#[derive(Clone, Debug)]
25+
pub struct SaslOptions(SaslInnerOptions);
26+
27+
impl SaslOptions {
28+
/// Constructs a default [GssapiSaslOptions] for further customization.
29+
///
30+
/// Make sure localhost is granted by Kerberos KDC, unlike Java counterpart this library
31+
/// provides no mean to grant ticket from KDC but simply utilizes whatever the ticket cache
32+
/// have.
33+
pub fn gssapi() -> GssapiSaslOptions {
34+
GssapiSaslOptions::new()
35+
}
36+
37+
pub(crate) fn new_session(&self, hostname: &str) -> Result<SaslSession> {
38+
match &self.0 {
39+
SaslInnerOptions::Gssapi(options) => {
40+
struct GssapiOptionsProvider {
41+
username: Cow<'static, str>,
42+
hostname: Cow<'static, str>,
43+
}
44+
impl SessionCallback for GssapiOptionsProvider {
45+
fn callback(
46+
&self,
47+
_session_data: &SessionData,
48+
_context: &Context,
49+
request: &mut Request<'_>,
50+
) -> Result<(), SessionError> {
51+
if request.is::<Hostname>() {
52+
request.satisfy::<Hostname>(&self.hostname)?;
53+
} else if request.is::<GssService>() {
54+
request.satisfy::<GssService>(&self.username)?;
55+
}
56+
Ok(())
57+
}
58+
}
59+
let provider = GssapiOptionsProvider {
60+
username: options.username.clone(),
61+
hostname: options.hostname_or(hostname),
62+
};
63+
let config = SASLConfig::builder().with_defaults().with_callback(provider).unwrap();
64+
let client = SASLClient::new(config);
65+
let session = client.start_suggested(&[Mechname::parse(b"GSSAPI").unwrap()]).unwrap();
66+
SaslSession::new(session)
67+
},
68+
}
69+
}
70+
}
71+
72+
pub struct SaslSession {
73+
output: Vec<u8>,
74+
session: Session,
75+
finished: bool,
76+
}
77+
78+
impl SaslSession {
79+
fn new(session: Session) -> Result<Self> {
80+
let mut session = Self { session, output: Default::default(), finished: false };
81+
if session.session.are_we_first() {
82+
session.step(Default::default())?;
83+
}
84+
Ok(session)
85+
}
86+
87+
pub fn name(&self) -> &str {
88+
self.session.get_mechname().as_str()
89+
}
90+
91+
pub fn initial(&self) -> &[u8] {
92+
&self.output
93+
}
94+
95+
pub fn step(&mut self, challenge: &[u8]) -> Result<Option<&[u8]>> {
96+
if self.finished {
97+
return Err(Error::UnexpectedError(format!("SASL {} session already finished", self.name())));
98+
}
99+
self.output.clear();
100+
match self.session.step(Some(challenge), &mut self.output).map_err(|e| Error::other(format!("{e}"), e))? {
101+
State::Running => Ok(Some(&self.output)),
102+
State::Finished(MessageSent::Yes) => {
103+
self.finished = true;
104+
Ok(Some(&self.output))
105+
},
106+
State::Finished(MessageSent::No) => {
107+
self.finished = true;
108+
Ok(None)
109+
},
110+
}
111+
}
112+
}
113+
114+
/// GSSAPI SASL options.
115+
#[derive(Clone, Debug)]
116+
pub struct GssapiSaslOptions {
117+
username: Cow<'static, str>,
118+
hostname: Option<Cow<'static, str>>,
119+
}
120+
121+
impl GssapiSaslOptions {
122+
fn new() -> Self {
123+
Self { username: Cow::from("zookeeper"), hostname: None }
124+
}
125+
126+
/// Specifies the primary part of Kerberos principal.
127+
///
128+
/// It is `zookeeper.sasl.client.username` in Java client, but the word "client" is misleading
129+
/// as it is the username of targeting server.
130+
///
131+
/// Defaults to "zookeeper".
132+
pub fn with_username(self, username: impl Into<Cow<'static, str>>) -> Self {
133+
Self { username: username.into(), ..self }
134+
}
135+
136+
/// Specifies the instance part of Kerberos principal.
137+
///
138+
/// Defaults to hostname or ip of targeting server in connecting string.
139+
pub fn with_hostname(self, hostname: impl Into<Cow<'static, str>>) -> Self {
140+
Self { hostname: Some(hostname.into()), ..self }
141+
}
142+
143+
fn hostname_or(&self, hostname: &str) -> Cow<'static, str> {
144+
match self.hostname.as_ref() {
145+
None => Cow::Owned(hostname.to_string()),
146+
Some(hostname) => hostname.clone(),
147+
}
148+
}
149+
}

src/session/depot.rs

+38-3
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ pub struct Depot {
2222
writing_operations: VecDeque<Operation>,
2323
written_operations: HashMap<i32, SessionOperation>,
2424

25+
sasl: bool,
26+
pending_operations: VecDeque<SessionOperation>,
27+
2528
watching_paths: HashMap<(&'static str, WatchMode), usize>,
2629
unwatching_paths: HashMap<(&'static str, WatchMode), SessionOperation>,
2730
}
@@ -31,10 +34,12 @@ impl Depot {
3134
let writing_capacity = 128usize;
3235
Depot {
3336
xid: Default::default(),
37+
sasl: false,
3438
pending_authes: Vec::with_capacity(5),
3539
writing_slices: Vec::with_capacity(writing_capacity),
3640
writing_operations: VecDeque::with_capacity(writing_capacity),
3741
written_operations: HashMap::with_capacity(128),
42+
pending_operations: Default::default(),
3843
watching_paths: HashMap::with_capacity(32),
3944
unwatching_paths: HashMap::with_capacity(32),
4045
}
@@ -43,23 +48,30 @@ impl Depot {
4348
pub fn for_connecting() -> Depot {
4449
Depot {
4550
xid: Default::default(),
51+
sasl: false,
4652
pending_authes: Default::default(),
4753
writing_slices: Vec::with_capacity(10),
4854
writing_operations: VecDeque::with_capacity(10),
4955
written_operations: HashMap::with_capacity(10),
56+
pending_operations: VecDeque::with_capacity(10),
5057
watching_paths: HashMap::new(),
5158
unwatching_paths: HashMap::new(),
5259
}
5360
}
5461

5562
/// Clear all buffered operations from previous run.
5663
pub fn clear(&mut self) {
64+
self.xid = Default::default();
65+
self.sasl = false;
5766
self.pending_authes.clear();
5867
self.writing_slices.clear();
5968
self.watching_paths.clear();
6069
self.unwatching_paths.clear();
6170
self.writing_operations.clear();
6271
self.written_operations.clear();
72+
self.pending_operations.clear();
73+
self.watching_paths.clear();
74+
self.unwatching_paths.clear();
6375
}
6476

6577
/// Error out ongoing operations except authes.
@@ -97,7 +109,7 @@ impl Depot {
97109

98110
/// Check whether there is any ongoing operations.
99111
pub fn is_empty(&self) -> bool {
100-
self.writing_operations.is_empty() && self.written_operations.is_empty()
112+
self.writing_operations.is_empty() && self.written_operations.is_empty() && self.pending_operations.is_empty()
101113
}
102114

103115
pub fn pop_request(&mut self, xid: i32) -> Result<SessionOperation, Error> {
@@ -107,11 +119,21 @@ impl Depot {
107119
}
108120
}
109121

110-
fn push_request(&mut self, mut operation: SessionOperation) {
111-
operation.request.set_xid(self.xid.next());
122+
fn write_session(&mut self, mut operation: SessionOperation) {
123+
if operation.request.get_xid() == 0 {
124+
operation.request.set_xid(self.xid.next());
125+
}
112126
self.push_operation(Operation::Session(operation));
113127
}
114128

129+
fn push_request(&mut self, operation: SessionOperation) {
130+
if self.sasl {
131+
self.pending_operations.push_back(operation);
132+
return;
133+
}
134+
self.write_session(operation);
135+
}
136+
115137
pub fn pop_ping(&mut self) -> Result<(), Error> {
116138
self.pop_request(PredefinedXid::Ping.into()).map(|_| ())
117139
}
@@ -122,6 +144,19 @@ impl Depot {
122144
self.writing_slices.push(IoSlice::new(buf));
123145
}
124146

147+
pub fn push_sasl(&mut self, token: &[u8]) {
148+
let operation = SessionOperation::new(OpCode::Sasl, &Some(token));
149+
self.write_session(operation);
150+
self.sasl = true;
151+
}
152+
153+
pub fn complete_sasl(&mut self) {
154+
self.sasl = false;
155+
while let Some(operation) = self.pending_operations.pop_front() {
156+
self.write_session(operation);
157+
}
158+
}
159+
125160
pub fn has_pending_writes(&self) -> bool {
126161
!self.writing_slices.is_empty()
127162
}

0 commit comments

Comments
 (0)