Skip to content

Commit 7953dca

Browse files
committed
refactor: merge connecting and serving depots into one
1 parent ea64b7f commit 7953dca

File tree

3 files changed

+14
-29
lines changed

3 files changed

+14
-29
lines changed

src/client/mod.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -1671,16 +1671,16 @@ impl Connector {
16711671
endpoints.cycle();
16721672
}
16731673
let mut buf = Vec::with_capacity(4096);
1674-
let mut connecting_depot = Depot::for_connecting();
1675-
let conn = session.start(&mut endpoints, &mut buf, &mut connecting_depot).await?;
1674+
let mut depot = Depot::new();
1675+
let conn = session.start(&mut endpoints, &mut buf, &mut depot).await?;
16761676
let (sender, receiver) = mpsc::unbounded_channel();
16771677
let session_info = session.session.clone();
16781678
let session_timeout = session.session_timeout;
16791679
let mut state_watcher = StateWatcher::new(state_receiver);
16801680
// Consume all state changes so far.
16811681
state_watcher.state();
16821682
tokio::spawn(async move {
1683-
session.serve(endpoints, conn, buf, connecting_depot, receiver).await;
1683+
session.serve(endpoints, conn, buf, depot, receiver).await;
16841684
});
16851685
let client =
16861686
Client::new(chroot.to_owned(), self.server_version, session_info, session_timeout, sender, state_watcher);

src/session/depot.rs

+8-22
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ use crate::proto::{OpCode, PredefinedXid, RemoveWatchesRequest};
1616
pub struct Depot {
1717
xid: Xid,
1818

19+
// If ongoing authes are interruptted due to disconnection, they will be retried after
20+
// session reestablishment as auth failure will terminate session.
1921
pending_authes: Vec<SessionOperation>,
2022

2123
writing_slices: Vec<IoSlice<'static>>,
@@ -30,7 +32,7 @@ pub struct Depot {
3032
}
3133

3234
impl Depot {
33-
pub fn for_serving() -> Depot {
35+
pub fn new() -> Self {
3436
let writing_capacity = 128usize;
3537
Depot {
3638
xid: Default::default(),
@@ -39,34 +41,18 @@ impl Depot {
3941
writing_slices: Vec::with_capacity(writing_capacity),
4042
writing_operations: VecDeque::with_capacity(writing_capacity),
4143
written_operations: HashMap::with_capacity(128),
42-
pending_operations: Default::default(),
44+
pending_operations: VecDeque::with_capacity(32),
4345
watching_paths: HashMap::with_capacity(32),
4446
unwatching_paths: HashMap::with_capacity(32),
4547
}
4648
}
4749

48-
pub fn for_connecting() -> Depot {
49-
Depot {
50-
xid: Default::default(),
51-
sasl: false,
52-
pending_authes: Default::default(),
53-
writing_slices: Vec::with_capacity(10),
54-
writing_operations: VecDeque::with_capacity(10),
55-
written_operations: HashMap::with_capacity(10),
56-
pending_operations: VecDeque::with_capacity(10),
57-
watching_paths: HashMap::new(),
58-
unwatching_paths: HashMap::new(),
59-
}
60-
}
61-
62-
/// Clear all buffered operations from previous run.
63-
pub fn clear(&mut self) {
64-
self.xid = Default::default();
50+
/// Reset state and clear buffered operations from previous run except `pending_authes`.
51+
pub fn reset(&mut self) {
52+
// We don't reset xid, so we get continuous xid (ignoring the overflow) even in case of
53+
// reconnection. This is helpful in diagnosis.
6554
self.sasl = false;
66-
self.pending_authes.clear();
6755
self.writing_slices.clear();
68-
self.watching_paths.clear();
69-
self.unwatching_paths.clear();
7056
self.writing_operations.clear();
7157
self.written_operations.clear();
7258
self.pending_operations.clear();

src/session/mod.rs

+3-4
Original file line numberDiff line numberDiff line change
@@ -225,16 +225,15 @@ impl Session {
225225
mut endpoints: IterableEndpoints,
226226
conn: Connection,
227227
mut buf: Vec<u8>,
228-
mut connecting_trans: Depot,
228+
mut depot: Depot,
229229
mut requester: mpsc::UnboundedReceiver<SessionOperation>,
230230
) {
231-
let mut depot = Depot::for_serving();
232231
let mut unwatch_requester = self.unwatch_receiver.take().unwrap();
233232
endpoints.cycle();
234233
endpoints.reset();
235234
self.serve_once(conn, &mut endpoints, &mut buf, &mut depot, &mut requester, &mut unwatch_requester).await;
236235
while !self.session_state.is_terminated() {
237-
let conn = match self.start(&mut endpoints, &mut buf, &mut connecting_trans).await {
236+
let conn = match self.start(&mut endpoints, &mut buf, &mut depot).await {
238237
Err(err) => {
239238
warn!("fail to connect to cluster {:?} due to {}", endpoints.endpoints(), err);
240239
self.resolve_start_error(&err);
@@ -683,7 +682,7 @@ impl Session {
683682
conn
684683
},
685684
};
686-
depot.clear();
685+
depot.reset();
687686
buf.clear();
688687
self.send_connect(depot);
689688
#[cfg(any(feature = "sasl-digest-md5", feature = "sasl-gssapi"))]

0 commit comments

Comments
 (0)