Skip to content

Commit 175b0d8

Browse files
committed
Add Connector to deprecate ClientBuilder
Closes #28.
1 parent a946882 commit 175b0d8

File tree

2 files changed

+240
-121
lines changed

2 files changed

+240
-121
lines changed

src/client/mod.rs

+207-83
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ use std::borrow::Cow;
44
use std::fmt::Write as _;
55
use std::future::Future;
66
use std::mem::ManuallyDrop;
7-
use std::sync::Arc;
87
use std::time::Duration;
98

109
use const_format::formatcp;
@@ -227,14 +226,20 @@ impl Client {
227226

228227
/// Connects to ZooKeeper cluster.
229228
pub async fn connect(cluster: &str) -> Result<Self> {
230-
Self::builder().connect(cluster).await
229+
Self::connector().connect(cluster).await
231230
}
232231

233232
/// Creates a builder with configurable options in connecting to ZooKeeper cluster.
233+
#[deprecated(since = "0.7.0", note = "use Client::connector instead")]
234234
pub fn builder() -> ClientBuilder {
235235
ClientBuilder::new()
236236
}
237237

238+
/// Creates a builder with configurable options in connecting to ZooKeeper cluster.
239+
pub fn connector() -> Connector {
240+
Connector::new()
241+
}
242+
238243
pub(crate) fn new(
239244
chroot: OwnedChroot,
240245
version: Version,
@@ -1528,32 +1533,96 @@ impl Drop for OwnedLockClient {
15281533
#[derive(Copy, Clone, Debug, PartialEq, PartialOrd)]
15291534
pub(crate) struct Version(u32, u32, u32);
15301535

1531-
/// Builder for [Client] with more options than [Client::connect].
1536+
/// Options for tls connection.
1537+
#[derive(Debug)]
1538+
pub struct TlsOptions {
1539+
identity: Option<(Vec<CertificateDer<'static>>, PrivateKeyDer<'static>)>,
1540+
ca_certs: RootCertStore,
1541+
}
1542+
1543+
impl Clone for TlsOptions {
1544+
fn clone(&self) -> Self {
1545+
Self {
1546+
identity: self.identity.as_ref().map(|id| (id.0.clone(), id.1.clone_key())),
1547+
ca_certs: self.ca_certs.clone(),
1548+
}
1549+
}
1550+
}
1551+
1552+
impl Default for TlsOptions {
1553+
/// Tls options with well-known ca roots.
1554+
fn default() -> Self {
1555+
let mut options = Self::no_ca();
1556+
options.ca_certs.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
1557+
options
1558+
}
1559+
}
1560+
1561+
impl TlsOptions {
1562+
/// Tls options with no ca certificates. Use [TlsOptions::default] if well-known ca roots is
1563+
/// desirable.
1564+
pub fn no_ca() -> Self {
1565+
Self { ca_certs: RootCertStore::empty(), identity: None }
1566+
}
1567+
1568+
/// Adds new ca certificates.
1569+
pub fn with_pem_ca_certs(mut self, certs: &str) -> Result<Self> {
1570+
for r in rustls_pemfile::certs(&mut certs.as_bytes()) {
1571+
let cert = match r {
1572+
Ok(cert) => cert,
1573+
Err(err) => return Err(Error::other(format!("fail to read cert {}", err), err)),
1574+
};
1575+
if let Err(err) = self.ca_certs.add(cert) {
1576+
return Err(Error::other(format!("fail to add cert {}", err), err));
1577+
}
1578+
}
1579+
Ok(self)
1580+
}
1581+
1582+
/// Specifies client identity for server to authenticate.
1583+
pub fn with_pem_identity(mut self, cert: &str, key: &str) -> Result<Self> {
1584+
let r: std::result::Result<Vec<_>, _> = rustls_pemfile::certs(&mut cert.as_bytes()).collect();
1585+
let certs = match r {
1586+
Err(err) => return Err(Error::other(format!("fail to read cert {}", err), err)),
1587+
Ok(certs) => certs,
1588+
};
1589+
let key = match rustls_pemfile::private_key(&mut key.as_bytes()) {
1590+
Err(err) => return Err(Error::other(format!("fail to read client private key {err}"), err)),
1591+
Ok(None) => return Err(Error::BadArguments(&"no client private key")),
1592+
Ok(Some(key)) => key,
1593+
};
1594+
self.identity = Some((certs, key));
1595+
Ok(self)
1596+
}
1597+
1598+
fn take_roots(&mut self) -> RootCertStore {
1599+
std::mem::replace(&mut self.ca_certs, RootCertStore::empty())
1600+
}
1601+
}
1602+
1603+
/// A builder for [Client].
15321604
#[derive(Clone, Debug)]
1533-
pub struct ClientBuilder {
1534-
tls: bool,
1535-
trusted_certs: RootCertStore,
1536-
client_certs: Option<(Vec<CertificateDer<'static>>, Arc<PrivateKeyDer<'static>>)>,
1605+
pub struct Connector {
1606+
tls: Option<TlsOptions>,
15371607
authes: Vec<AuthPacket>,
1538-
version: Version,
15391608
session: Option<(SessionId, Vec<u8>)>,
15401609
readonly: bool,
15411610
detached: bool,
1611+
server_version: Version,
15421612
session_timeout: Duration,
15431613
connection_timeout: Duration,
15441614
}
15451615

1546-
impl ClientBuilder {
1616+
/// Builder for [Client] with more options than [Client::connect].
1617+
impl Connector {
15471618
fn new() -> Self {
15481619
Self {
1549-
tls: false,
1550-
trusted_certs: RootCertStore::empty(),
1551-
client_certs: None,
1620+
tls: None,
15521621
authes: Default::default(),
1553-
version: Version(u32::MAX, u32::MAX, u32::MAX),
15541622
session: None,
15551623
readonly: false,
15561624
detached: false,
1625+
server_version: Version(u32::MAX, u32::MAX, u32::MAX),
15571626
session_timeout: Duration::ZERO,
15581627
connection_timeout: Duration::ZERO,
15591628
}
@@ -1562,75 +1631,38 @@ impl ClientBuilder {
15621631
/// Specifies target session timeout to negotiate with ZooKeeper server.
15631632
///
15641633
/// Defaults to 6s.
1565-
pub fn with_session_timeout(&mut self, timeout: Duration) -> &mut Self {
1634+
pub fn session_timeout(&mut self, timeout: Duration) -> &mut Self {
15661635
self.session_timeout = timeout;
15671636
self
15681637
}
15691638

15701639
/// Specifies idle timeout to conclude a connection as loss.
15711640
///
15721641
/// Defaults to `2/5` of session timeout.
1573-
pub fn with_connection_timeout(&mut self, timeout: Duration) -> &mut Self {
1642+
pub fn connection_timeout(&mut self, timeout: Duration) -> &mut Self {
15741643
self.connection_timeout = timeout;
15751644
self
15761645
}
15771646

15781647
/// Specifies whether readonly server is allowed.
1579-
pub fn with_readonly(&mut self, readonly: bool) -> &mut ClientBuilder {
1648+
pub fn readonly(&mut self, readonly: bool) -> &mut Self {
15801649
self.readonly = readonly;
15811650
self
15821651
}
15831652

15841653
/// Specifies auth info for given authentication scheme.
1585-
pub fn with_auth(&mut self, scheme: String, auth: Vec<u8>) -> &mut ClientBuilder {
1654+
pub fn auth(&mut self, scheme: String, auth: Vec<u8>) -> &mut Self {
15861655
self.authes.push(AuthPacket { scheme, auth });
15871656
self
15881657
}
15891658

15901659
/// Specifies session to reestablish.
1591-
pub fn with_session(&mut self, id: SessionId, password: Vec<u8>) -> &mut Self {
1660+
pub fn session(&mut self, id: SessionId, password: Vec<u8>) -> &mut Self {
15921661
self.session = Some((id, password));
15931662
self
15941663
}
15951664

1596-
/// Assumes tls for server in connection string if no protocol specified individually.
1597-
/// See [Self::connect] for syntax to specify protocol individually.
1598-
pub fn assume_tls(&mut self) -> &mut Self {
1599-
self.tls = true;
1600-
self
1601-
}
1602-
1603-
/// Trusts certificates signed by given ca certificates.
1604-
pub fn trust_ca_pem_certs(&mut self, certs: &str) -> Result<&mut Self> {
1605-
for r in rustls_pemfile::certs(&mut certs.as_bytes()) {
1606-
let cert = match r {
1607-
Ok(cert) => cert,
1608-
Err(err) => return Err(Error::other(format!("fail to read cert {}", err), err)),
1609-
};
1610-
if let Err(err) = self.trusted_certs.add(cert) {
1611-
return Err(Error::other(format!("fail to add cert {}", err), err));
1612-
}
1613-
}
1614-
Ok(self)
1615-
}
1616-
1617-
/// Identifies client itself to server with given cert chain and private key.
1618-
pub fn use_client_pem_cert(&mut self, cert: &str, key: &str) -> Result<&mut Self> {
1619-
let r: std::result::Result<Vec<_>, _> = rustls_pemfile::certs(&mut cert.as_bytes()).collect();
1620-
let certs = match r {
1621-
Err(err) => return Err(Error::other(format!("fail to read cert {}", err), err)),
1622-
Ok(certs) => certs,
1623-
};
1624-
let key = match rustls_pemfile::private_key(&mut key.as_bytes()) {
1625-
Err(err) => return Err(Error::other(format!("fail to read client private key {err}"), err)),
1626-
Ok(None) => return Err(Error::BadArguments(&"no client private key")),
1627-
Ok(Some(key)) => key,
1628-
};
1629-
self.client_certs = Some((certs, Arc::new(key)));
1630-
Ok(self)
1631-
}
1632-
1633-
/// Specifies client assumed server version of ZooKeeper cluster.
1665+
/// Specifies target server version of ZooKeeper cluster.
16341666
///
16351667
/// Client will issue server compatible protocol to avoid [Error::Unimplemented] for some
16361668
/// operations. See [Client::create] for an example.
@@ -1639,30 +1671,25 @@ impl ClientBuilder {
16391671
///
16401672
/// [ZOOKEEPER-1381]: https://issues.apache.org/jira/browse/ZOOKEEPER-1381
16411673
/// [ZOOKEEPER-3762]: https://issues.apache.org/jira/browse/ZOOKEEPER-3762
1642-
pub fn assume_server_version(&mut self, major: u32, minor: u32, patch: u32) -> &mut Self {
1643-
self.version = Version(major, minor, patch);
1674+
pub fn server_version(&mut self, major: u32, minor: u32, patch: u32) -> &mut Self {
1675+
self.server_version = Version(major, minor, patch);
16441676
self
16451677
}
16461678

1647-
/// Detaches creating session so it will not be closed after all client instances dropped.
1648-
pub fn detach(&mut self) -> &mut Self {
1679+
/// Detaches created session so it will not be closed after all client instances dropped.
1680+
pub fn detached(&mut self) -> &mut Self {
16491681
self.detached = true;
16501682
self
16511683
}
16521684

1653-
/// Connects to ZooKeeper cluster.
1654-
///
1655-
/// Parameter `cluster` specifies connection string to ZooKeeper cluster. It has same syntax as
1656-
/// Java client except that you can specifies protocol for server individually. For example,
1657-
/// `tcp://server1,tcp+tls://server2:port,server3`. This claims that `server1` uses plaintext
1658-
/// protocol, `server2` uses tls encrypted protocol while `server3` uses tls if
1659-
/// [Self::assume_tls] is specified or plaintext otherwise.
1660-
///
1661-
/// # Notable errors
1662-
/// * [Error::NoHosts] if no host is available
1663-
/// * [Error::SessionExpired] if specified session expired
1664-
pub async fn connect(&mut self, cluster: &str) -> Result<Client> {
1665-
let (hosts, chroot) = util::parse_connect_string(cluster, self.tls)?;
1685+
/// Specifies tls options for connections to ZooKeeper.
1686+
pub fn tls(&mut self, options: TlsOptions) -> &mut Self {
1687+
self.tls = Some(options);
1688+
self
1689+
}
1690+
1691+
async fn connect_internally(&mut self, secure: bool, cluster: &str) -> Result<Client> {
1692+
let (hosts, chroot) = util::parse_connect_string(cluster, secure)?;
16661693
if let Some((id, password)) = &self.session {
16671694
if id.0 == 0 {
16681695
return Err(Error::BadArguments(&"session id must not be 0"));
@@ -1678,19 +1705,15 @@ impl ClientBuilder {
16781705
} else if self.connection_timeout < Duration::ZERO {
16791706
return Err(Error::BadArguments(&"connection timeout must not be negative"));
16801707
}
1681-
self.trusted_certs.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned());
1682-
let tls_config = if let Some((certs, private_key)) = self.client_certs.take() {
1683-
match ClientConfig::builder()
1684-
.with_root_certificates(std::mem::replace(&mut self.trusted_certs, RootCertStore::empty()))
1685-
.with_client_auth_cert(certs, Arc::try_unwrap(private_key).unwrap_or_else(|k| k.clone_key()))
1686-
{
1708+
let mut tls_options = self.tls.take().unwrap_or_default();
1709+
let tls_builder = ClientConfig::builder().with_root_certificates(tls_options.take_roots());
1710+
let tls_config = if let Some((client_cert, client_key)) = tls_options.identity.take() {
1711+
match tls_builder.with_client_auth_cert(client_cert, client_key) {
16871712
Ok(config) => config,
16881713
Err(err) => return Err(Error::other(format!("invalid client private key {err}"), err)),
16891714
}
16901715
} else {
1691-
ClientConfig::builder()
1692-
.with_root_certificates(std::mem::replace(&mut self.trusted_certs, RootCertStore::empty()))
1693-
.with_no_client_auth()
1716+
tls_builder.with_no_client_auth()
16941717
};
16951718
let (mut session, state_receiver) = Session::new(
16961719
self.session.take(),
@@ -1713,9 +1736,110 @@ impl ClientBuilder {
17131736
session.serve(servers, conn, buf, connecting_depot, receiver).await;
17141737
});
17151738
let client =
1716-
Client::new(chroot.to_owned(), self.version, session_info, session_timeout, sender, state_receiver);
1739+
Client::new(chroot.to_owned(), self.server_version, session_info, session_timeout, sender, state_receiver);
17171740
Ok(client)
17181741
}
1742+
1743+
/// Connects to ZooKeeper cluster.
1744+
///
1745+
/// Same to [Self::connect] except that `server1` will use tls encrypted protocol given
1746+
/// the connection string `server1,tcp://server2,tcp+tls://server3`.
1747+
pub async fn secure_connect(&mut self, cluster: &str) -> Result<Client> {
1748+
self.connect_internally(true, cluster).await
1749+
}
1750+
1751+
/// Connects to ZooKeeper cluster.
1752+
///
1753+
/// Parameter `cluster` specifies connection string to ZooKeeper cluster. It has same syntax as
1754+
/// Java client except that you can specifies protocol for server individually. For example,
1755+
/// `server1,tcp://server2,tcp+tls://server3`. This claims that `server1` and `server2` use
1756+
/// plaintext protocol, while `server3` uses tls encrypted protocol.
1757+
///
1758+
/// # Notable errors
1759+
/// * [Error::NoHosts] if no host is available
1760+
/// * [Error::SessionExpired] if specified session expired
1761+
///
1762+
/// # Notable behaviors
1763+
/// The state of this connector is undefined after connection attempt no matter whether it is
1764+
/// success or not.
1765+
pub async fn connect(&mut self, cluster: &str) -> Result<Client> {
1766+
self.connect_internally(false, cluster).await
1767+
}
1768+
}
1769+
1770+
/// Builder for [Client] with more options than [Client::connect].
1771+
#[derive(Clone, Debug)]
1772+
pub struct ClientBuilder {
1773+
connector: Connector,
1774+
}
1775+
1776+
impl ClientBuilder {
1777+
fn new() -> Self {
1778+
Self { connector: Connector::new() }
1779+
}
1780+
1781+
/// Specifies target session timeout to negotiate with ZooKeeper server.
1782+
///
1783+
/// Defaults to 6s.
1784+
pub fn with_session_timeout(&mut self, timeout: Duration) -> &mut Self {
1785+
self.connector.session_timeout(timeout);
1786+
self
1787+
}
1788+
1789+
/// Specifies idle timeout to conclude a connection as loss.
1790+
///
1791+
/// Defaults to `2/5` of session timeout.
1792+
pub fn with_connection_timeout(&mut self, timeout: Duration) -> &mut Self {
1793+
self.connector.connection_timeout(timeout);
1794+
self
1795+
}
1796+
1797+
/// Specifies whether readonly server is allowed.
1798+
pub fn with_readonly(&mut self, readonly: bool) -> &mut ClientBuilder {
1799+
self.connector.readonly = readonly;
1800+
self
1801+
}
1802+
1803+
/// Specifies auth info for given authentication scheme.
1804+
pub fn with_auth(&mut self, scheme: String, auth: Vec<u8>) -> &mut ClientBuilder {
1805+
self.connector.auth(scheme, auth);
1806+
self
1807+
}
1808+
1809+
/// Specifies session to reestablish.
1810+
pub fn with_session(&mut self, id: SessionId, password: Vec<u8>) -> &mut Self {
1811+
self.connector.session(id, password);
1812+
self
1813+
}
1814+
1815+
/// Specifies client assumed server version of ZooKeeper cluster.
1816+
///
1817+
/// Client will issue server compatible protocol to avoid [Error::Unimplemented] for some
1818+
/// operations. See [Client::create] for an example.
1819+
///
1820+
/// See [ZOOKEEPER-1381][] and [ZOOKEEPER-3762][] for references.
1821+
///
1822+
/// [ZOOKEEPER-1381]: https://issues.apache.org/jira/browse/ZOOKEEPER-1381
1823+
/// [ZOOKEEPER-3762]: https://issues.apache.org/jira/browse/ZOOKEEPER-3762
1824+
pub fn assume_server_version(&mut self, major: u32, minor: u32, patch: u32) -> &mut Self {
1825+
self.connector.server_version(major, minor, patch);
1826+
self
1827+
}
1828+
1829+
/// Detaches creating session so it will not be closed after all client instances dropped.
1830+
pub fn detach(&mut self) -> &mut Self {
1831+
self.connector.detached();
1832+
self
1833+
}
1834+
1835+
/// Connects to ZooKeeper cluster.
1836+
///
1837+
/// # Notable errors
1838+
/// * [Error::NoHosts] if no host is available
1839+
/// * [Error::SessionExpired] if specified session expired
1840+
pub async fn connect(&mut self, cluster: &str) -> Result<Client> {
1841+
self.connector.connect(cluster).await
1842+
}
17191843
}
17201844

17211845
trait MultiBuffer {

0 commit comments

Comments
 (0)