diff --git a/Cargo.lock b/Cargo.lock index 9fdc96e..8e14015 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2549,6 +2549,7 @@ dependencies = [ name = "percas-gossip" version = "0.4.0" dependencies = [ + "arc-swap", "backon", "exn", "fastimer", diff --git a/crates/gossip/Cargo.toml b/crates/gossip/Cargo.toml index 77a64ad..d428076 100644 --- a/crates/gossip/Cargo.toml +++ b/crates/gossip/Cargo.toml @@ -25,6 +25,7 @@ repository.workspace = true version.workspace = true [dependencies] +arc-swap = { workspace = true } backon = { workspace = true } exn = { workspace = true } fastimer = { workspace = true } diff --git a/crates/gossip/src/gossip.rs b/crates/gossip/src/gossip.rs index 7430ce4..f7d186d 100644 --- a/crates/gossip/src/gossip.rs +++ b/crates/gossip/src/gossip.rs @@ -14,9 +14,9 @@ use std::path::PathBuf; use std::sync::Arc; -use std::sync::RwLock; use std::time::Duration; +use arc_swap::ArcSwap; use backon::ConstantBuilder; use backon::Retryable; use exn::Result; @@ -61,11 +61,11 @@ pub type GossipFuture = JoinHandle>; pub struct GossipState { dir: PathBuf, initial_peers: Vec, - current_node: RwLock, + current_node: ArcSwap, transport: Transport, - membership: RwLock, - ring: RwLock>>, + membership: ArcSwap, + ring: ArcSwap>, } impl GossipState { @@ -73,23 +73,23 @@ impl GossipState { Self { dir, initial_peers, - current_node: RwLock::new(current_node), - membership: RwLock::new(Membership::default()), + current_node: ArcSwap::new(Arc::new(current_node)), + membership: ArcSwap::new(Arc::new(Membership::default())), transport: Transport::new(), - ring: RwLock::new(Arc::new(HashRing::default())), + ring: ArcSwap::new(Arc::new(HashRing::default())), } } pub fn current(&self) -> NodeInfo { - self.current_node.read().unwrap().clone() + (**self.current_node.load()).clone() } - pub fn membership(&self) -> Membership { - self.membership.read().unwrap().clone() + pub fn membership(&self) -> Arc { + self.membership.load_full() } pub fn ring(&self) -> Arc> { - self.ring.read().unwrap().clone() + self.ring.load_full() } /// Start the gossip protocol. @@ -101,11 +101,12 @@ impl GossipState { let mut gossip_futs = vec![]; // Fast bootstrap - self.membership.write().unwrap().update_member(MemberState { - info: self.current(), - status: MemberStatus::Alive, - heartbeat: Timestamp::now(), - }); + self.membership + .store(Arc::new(Membership::from_iter([MemberState { + info: self.current(), + status: MemberStatus::Alive, + heartbeat: Timestamp::now(), + }]))); let state_clone = self.clone(); rt.spawn(async move { @@ -255,50 +256,51 @@ impl GossipState { log::debug!("received message: {message:?}"); let result = match message { GossipMessage::Ping(info) => { - self.membership.write().unwrap().update_member(MemberState { + let mut membership = (**self.membership.load()).clone(); + membership.update_member(MemberState { info: info.clone(), status: MemberStatus::Alive, heartbeat: Timestamp::now(), }); + self.membership.store(Arc::new(membership)); // Respond with an ack Some(GossipMessage::Ack(self.current())) } GossipMessage::Ack(info) => { - self.membership.write().unwrap().update_member(MemberState { + let mut membership = (**self.membership.load()).clone(); + membership.update_member(MemberState { info: info.clone(), status: MemberStatus::Alive, heartbeat: Timestamp::now(), }); + self.membership.store(Arc::new(membership)); None } GossipMessage::Sync { members } => { + let mut membership = (**self.membership.load()).clone(); for member in members { - self.membership.write().unwrap().update_member(member); + membership.update_member(member); } // Ensure the current node is alive - self.membership.write().unwrap().update_member(MemberState { + membership.update_member(MemberState { info: self.current(), status: MemberStatus::Alive, heartbeat: Timestamp::now(), }); + self.membership.store(Arc::new(membership.clone())); + // Respond with the current membership - let members = self.membership.read().unwrap().members().clone(); Some(GossipMessage::Sync { - members: members.values().cloned().collect(), + members: membership.into_members().into_values().collect(), }) } }; - if self - .membership - .read() - .unwrap() - .is_dead(self.current().node_id) - { + if self.membership.load().is_dead(self.current().node_id) { log::info!("current node is marked as dead; advancing incarnation"); self.advance_incarnation(); } @@ -307,13 +309,14 @@ impl GossipState { } fn advance_incarnation(&self) { - let mut current = self.current_node.write().unwrap(); + let mut current = self.current(); current.advance_incarnation(); current.persist(&node_file_path(&self.dir)); + self.current_node.store(Arc::new(current)); } fn remove_dead_members(&self) -> Vec { - let mut members = self.membership.write().unwrap(); + let mut members = (**self.membership.load()).clone(); let dead_members: Vec = members .members() .iter() @@ -332,6 +335,8 @@ impl GossipState { members.remove_member(dead_member.node_id); } + self.membership.store(Arc::new(members)); + dead_members } @@ -421,19 +426,20 @@ impl GossipState { fn rebuild_ring(&self) { // Ensure the current node is alive - let mut membership = self.membership.write().unwrap(); + let mut membership = (**self.membership.load()).clone(); membership.update_member(MemberState { info: self.current(), status: MemberStatus::Alive, heartbeat: Timestamp::now(), }); - *self.ring.write().unwrap() = - Arc::new(HashRing::from(membership.members().keys().cloned())); + self.ring.store(Arc::new(HashRing::from( + membership.members().keys().cloned(), + ))); } fn mark_dead(&self, peer: &NodeInfo) { - let mut members = self.membership.write().unwrap(); + let mut members = (**self.membership.load()).clone(); if let Some(last_seen) = members.members().get(&peer.node_id).map(|m| m.heartbeat) { let member = MemberState { info: peer.clone(), @@ -442,6 +448,7 @@ impl GossipState { }; members.update_member(member); } + self.membership.store(Arc::new(members)); } } diff --git a/crates/gossip/src/member.rs b/crates/gossip/src/member.rs index 71db715..80f0a4b 100644 --- a/crates/gossip/src/member.rs +++ b/crates/gossip/src/member.rs @@ -58,6 +58,10 @@ impl Membership { &self.members } + pub fn into_members(self) -> BTreeMap { + self.members + } + pub fn is_dead(&self, id: Uuid) -> bool { self.members .get(&id) @@ -129,6 +133,16 @@ impl Membership { } } +impl FromIterator for Membership { + fn from_iter>(iter: T) -> Self { + let mut membership = Membership::default(); + for member in iter { + membership.update_member(member); + } + membership + } +} + #[cfg(test)] mod membership_tests { use jiff::Timestamp;