Skip to content

Commit 0c72c7a

Browse files
committed
refactor: merge connecting and serving depots into one
Those two are there from initial day to keep `pending_authes` cross reconnection. But turns out, it is verbose and confusing. Treating `pending_authes` specially is enough for us to go. By merging them into one, we could share partial state among connection states, e.g. `xid`.
1 parent ea64b7f commit 0c72c7a

File tree

3 files changed

+15
-29
lines changed

3 files changed

+15
-29
lines changed

src/client/mod.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -1671,16 +1671,16 @@ impl Connector {
16711671
endpoints.cycle();
16721672
}
16731673
let mut buf = Vec::with_capacity(4096);
1674-
let mut connecting_depot = Depot::for_connecting();
1675-
let conn = session.start(&mut endpoints, &mut buf, &mut connecting_depot).await?;
1674+
let mut depot = Depot::new();
1675+
let conn = session.start(&mut endpoints, &mut buf, &mut depot).await?;
16761676
let (sender, receiver) = mpsc::unbounded_channel();
16771677
let session_info = session.session.clone();
16781678
let session_timeout = session.session_timeout;
16791679
let mut state_watcher = StateWatcher::new(state_receiver);
16801680
// Consume all state changes so far.
16811681
state_watcher.state();
16821682
tokio::spawn(async move {
1683-
session.serve(endpoints, conn, buf, connecting_depot, receiver).await;
1683+
session.serve(endpoints, conn, buf, depot, receiver).await;
16841684
});
16851685
let client =
16861686
Client::new(chroot.to_owned(), self.server_version, session_info, session_timeout, sender, state_watcher);

src/session/depot.rs

+9-22
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@ use crate::proto::{OpCode, PredefinedXid, RemoveWatchesRequest};
1616
pub struct Depot {
1717
xid: Xid,
1818

19+
// If ongoing authes are interruptted due to disconnection, they will be retried after
20+
// session reestablishment as auth failure will terminate session and should not be dropped
21+
// silently in case of disconnection.
1922
pending_authes: Vec<SessionOperation>,
2023

2124
writing_slices: Vec<IoSlice<'static>>,
@@ -30,7 +33,7 @@ pub struct Depot {
3033
}
3134

3235
impl Depot {
33-
pub fn for_serving() -> Depot {
36+
pub fn new() -> Self {
3437
let writing_capacity = 128usize;
3538
Depot {
3639
xid: Default::default(),
@@ -39,34 +42,18 @@ impl Depot {
3942
writing_slices: Vec::with_capacity(writing_capacity),
4043
writing_operations: VecDeque::with_capacity(writing_capacity),
4144
written_operations: HashMap::with_capacity(128),
42-
pending_operations: Default::default(),
45+
pending_operations: VecDeque::with_capacity(32),
4346
watching_paths: HashMap::with_capacity(32),
4447
unwatching_paths: HashMap::with_capacity(32),
4548
}
4649
}
4750

48-
pub fn for_connecting() -> Depot {
49-
Depot {
50-
xid: Default::default(),
51-
sasl: false,
52-
pending_authes: Default::default(),
53-
writing_slices: Vec::with_capacity(10),
54-
writing_operations: VecDeque::with_capacity(10),
55-
written_operations: HashMap::with_capacity(10),
56-
pending_operations: VecDeque::with_capacity(10),
57-
watching_paths: HashMap::new(),
58-
unwatching_paths: HashMap::new(),
59-
}
60-
}
61-
62-
/// Clear all buffered operations from previous run.
63-
pub fn clear(&mut self) {
64-
self.xid = Default::default();
51+
/// Reset state and clear buffered operations from previous run except `pending_authes`.
52+
pub fn reset(&mut self) {
53+
// We don't reset xid, so we get continuous xid (ignoring the overflow) even in case of
54+
// reconnection. This is helpful in diagnosis.
6555
self.sasl = false;
66-
self.pending_authes.clear();
6756
self.writing_slices.clear();
68-
self.watching_paths.clear();
69-
self.unwatching_paths.clear();
7057
self.writing_operations.clear();
7158
self.written_operations.clear();
7259
self.pending_operations.clear();

src/session/mod.rs

+3-4
Original file line numberDiff line numberDiff line change
@@ -225,16 +225,15 @@ impl Session {
225225
mut endpoints: IterableEndpoints,
226226
conn: Connection,
227227
mut buf: Vec<u8>,
228-
mut connecting_trans: Depot,
228+
mut depot: Depot,
229229
mut requester: mpsc::UnboundedReceiver<SessionOperation>,
230230
) {
231-
let mut depot = Depot::for_serving();
232231
let mut unwatch_requester = self.unwatch_receiver.take().unwrap();
233232
endpoints.cycle();
234233
endpoints.reset();
235234
self.serve_once(conn, &mut endpoints, &mut buf, &mut depot, &mut requester, &mut unwatch_requester).await;
236235
while !self.session_state.is_terminated() {
237-
let conn = match self.start(&mut endpoints, &mut buf, &mut connecting_trans).await {
236+
let conn = match self.start(&mut endpoints, &mut buf, &mut depot).await {
238237
Err(err) => {
239238
warn!("fail to connect to cluster {:?} due to {}", endpoints.endpoints(), err);
240239
self.resolve_start_error(&err);
@@ -683,7 +682,7 @@ impl Session {
683682
conn
684683
},
685684
};
686-
depot.clear();
685+
depot.reset();
687686
buf.clear();
688687
self.send_connect(depot);
689688
#[cfg(any(feature = "sasl-digest-md5", feature = "sasl-gssapi"))]

0 commit comments

Comments
 (0)