Skip to content

Commit bc9e93d

Browse files
authored
perf: lock-free routing (#118)
* lock-free routing * refine more
1 parent 67aa06a commit bc9e93d

File tree

4 files changed

+57
-34
lines changed

4 files changed

+57
-34
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/gossip/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ repository.workspace = true
2525
version.workspace = true
2626

2727
[dependencies]
28+
arc-swap = { workspace = true }
2829
backon = { workspace = true }
2930
exn = { workspace = true }
3031
fastimer = { workspace = true }

crates/gossip/src/gossip.rs

Lines changed: 41 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,9 @@
1414

1515
use std::path::PathBuf;
1616
use std::sync::Arc;
17-
use std::sync::RwLock;
1817
use std::time::Duration;
1918

19+
use arc_swap::ArcSwap;
2020
use backon::ConstantBuilder;
2121
use backon::Retryable;
2222
use exn::Result;
@@ -61,35 +61,35 @@ pub type GossipFuture = JoinHandle<Result<(), GossipError>>;
6161
pub struct GossipState {
6262
dir: PathBuf,
6363
initial_peers: Vec<Url>,
64-
current_node: RwLock<NodeInfo>,
64+
current_node: ArcSwap<NodeInfo>,
6565
transport: Transport,
6666

67-
membership: RwLock<Membership>,
68-
ring: RwLock<Arc<HashRing<Uuid>>>,
67+
membership: ArcSwap<Membership>,
68+
ring: ArcSwap<HashRing<Uuid>>,
6969
}
7070

7171
impl GossipState {
7272
pub fn new(current_node: NodeInfo, initial_peers: Vec<Url>, dir: PathBuf) -> Self {
7373
Self {
7474
dir,
7575
initial_peers,
76-
current_node: RwLock::new(current_node),
77-
membership: RwLock::new(Membership::default()),
76+
current_node: ArcSwap::new(Arc::new(current_node)),
77+
membership: ArcSwap::new(Arc::new(Membership::default())),
7878
transport: Transport::new(),
79-
ring: RwLock::new(Arc::new(HashRing::default())),
79+
ring: ArcSwap::new(Arc::new(HashRing::default())),
8080
}
8181
}
8282

8383
pub fn current(&self) -> NodeInfo {
84-
self.current_node.read().unwrap().clone()
84+
(**self.current_node.load()).clone()
8585
}
8686

87-
pub fn membership(&self) -> Membership {
88-
self.membership.read().unwrap().clone()
87+
pub fn membership(&self) -> Arc<Membership> {
88+
self.membership.load_full()
8989
}
9090

9191
pub fn ring(&self) -> Arc<HashRing<Uuid>> {
92-
self.ring.read().unwrap().clone()
92+
self.ring.load_full()
9393
}
9494

9595
/// Start the gossip protocol.
@@ -101,11 +101,12 @@ impl GossipState {
101101
let mut gossip_futs = vec![];
102102

103103
// Fast bootstrap
104-
self.membership.write().unwrap().update_member(MemberState {
105-
info: self.current(),
106-
status: MemberStatus::Alive,
107-
heartbeat: Timestamp::now(),
108-
});
104+
self.membership
105+
.store(Arc::new(Membership::from_iter([MemberState {
106+
info: self.current(),
107+
status: MemberStatus::Alive,
108+
heartbeat: Timestamp::now(),
109+
}])));
109110

110111
let state_clone = self.clone();
111112
rt.spawn(async move {
@@ -255,50 +256,51 @@ impl GossipState {
255256
log::debug!("received message: {message:?}");
256257
let result = match message {
257258
GossipMessage::Ping(info) => {
258-
self.membership.write().unwrap().update_member(MemberState {
259+
let mut membership = (**self.membership.load()).clone();
260+
membership.update_member(MemberState {
259261
info: info.clone(),
260262
status: MemberStatus::Alive,
261263
heartbeat: Timestamp::now(),
262264
});
265+
self.membership.store(Arc::new(membership));
263266

264267
// Respond with an ack
265268
Some(GossipMessage::Ack(self.current()))
266269
}
267270
GossipMessage::Ack(info) => {
268-
self.membership.write().unwrap().update_member(MemberState {
271+
let mut membership = (**self.membership.load()).clone();
272+
membership.update_member(MemberState {
269273
info: info.clone(),
270274
status: MemberStatus::Alive,
271275
heartbeat: Timestamp::now(),
272276
});
277+
self.membership.store(Arc::new(membership));
273278

274279
None
275280
}
276281
GossipMessage::Sync { members } => {
282+
let mut membership = (**self.membership.load()).clone();
277283
for member in members {
278-
self.membership.write().unwrap().update_member(member);
284+
membership.update_member(member);
279285
}
280286

281287
// Ensure the current node is alive
282-
self.membership.write().unwrap().update_member(MemberState {
288+
membership.update_member(MemberState {
283289
info: self.current(),
284290
status: MemberStatus::Alive,
285291
heartbeat: Timestamp::now(),
286292
});
287293

294+
self.membership.store(Arc::new(membership.clone()));
295+
288296
// Respond with the current membership
289-
let members = self.membership.read().unwrap().members().clone();
290297
Some(GossipMessage::Sync {
291-
members: members.values().cloned().collect(),
298+
members: membership.into_members().into_values().collect(),
292299
})
293300
}
294301
};
295302

296-
if self
297-
.membership
298-
.read()
299-
.unwrap()
300-
.is_dead(self.current().node_id)
301-
{
303+
if self.membership.load().is_dead(self.current().node_id) {
302304
log::info!("current node is marked as dead; advancing incarnation");
303305
self.advance_incarnation();
304306
}
@@ -307,13 +309,14 @@ impl GossipState {
307309
}
308310

309311
fn advance_incarnation(&self) {
310-
let mut current = self.current_node.write().unwrap();
312+
let mut current = self.current();
311313
current.advance_incarnation();
312314
current.persist(&node_file_path(&self.dir));
315+
self.current_node.store(Arc::new(current));
313316
}
314317

315318
fn remove_dead_members(&self) -> Vec<NodeInfo> {
316-
let mut members = self.membership.write().unwrap();
319+
let mut members = (**self.membership.load()).clone();
317320
let dead_members: Vec<NodeInfo> = members
318321
.members()
319322
.iter()
@@ -332,6 +335,8 @@ impl GossipState {
332335
members.remove_member(dead_member.node_id);
333336
}
334337

338+
self.membership.store(Arc::new(members));
339+
335340
dead_members
336341
}
337342

@@ -421,19 +426,20 @@ impl GossipState {
421426

422427
fn rebuild_ring(&self) {
423428
// Ensure the current node is alive
424-
let mut membership = self.membership.write().unwrap();
429+
let mut membership = (**self.membership.load()).clone();
425430
membership.update_member(MemberState {
426431
info: self.current(),
427432
status: MemberStatus::Alive,
428433
heartbeat: Timestamp::now(),
429434
});
430435

431-
*self.ring.write().unwrap() =
432-
Arc::new(HashRing::from(membership.members().keys().cloned()));
436+
self.ring.store(Arc::new(HashRing::from(
437+
membership.members().keys().cloned(),
438+
)));
433439
}
434440

435441
fn mark_dead(&self, peer: &NodeInfo) {
436-
let mut members = self.membership.write().unwrap();
442+
let mut members = (**self.membership.load()).clone();
437443
if let Some(last_seen) = members.members().get(&peer.node_id).map(|m| m.heartbeat) {
438444
let member = MemberState {
439445
info: peer.clone(),
@@ -442,6 +448,7 @@ impl GossipState {
442448
};
443449
members.update_member(member);
444450
}
451+
self.membership.store(Arc::new(members));
445452
}
446453
}
447454

crates/gossip/src/member.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,10 @@ impl Membership {
5858
&self.members
5959
}
6060

61+
pub fn into_members(self) -> BTreeMap<Uuid, MemberState> {
62+
self.members
63+
}
64+
6165
pub fn is_dead(&self, id: Uuid) -> bool {
6266
self.members
6367
.get(&id)
@@ -129,6 +133,16 @@ impl Membership {
129133
}
130134
}
131135

136+
impl FromIterator<MemberState> for Membership {
137+
fn from_iter<T: IntoIterator<Item = MemberState>>(iter: T) -> Self {
138+
let mut membership = Membership::default();
139+
for member in iter {
140+
membership.update_member(member);
141+
}
142+
membership
143+
}
144+
}
145+
132146
#[cfg(test)]
133147
mod membership_tests {
134148
use jiff::Timestamp;

0 commit comments

Comments
 (0)