Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/gossip/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ repository.workspace = true
version.workspace = true

[dependencies]
arc-swap = { workspace = true }
backon = { workspace = true }
exn = { workspace = true }
fastimer = { workspace = true }
Expand Down
65 changes: 36 additions & 29 deletions crates/gossip/src/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::sync::Arc;
use std::sync::RwLock;
use std::time::Duration;

use arc_swap::ArcSwap;
use backon::ConstantBuilder;
use backon::Retryable;
use exn::Result;
Expand Down Expand Up @@ -64,8 +65,8 @@ pub struct GossipState {
current_node: RwLock<NodeInfo>,
transport: Transport,

membership: RwLock<Membership>,
ring: RwLock<Arc<HashRing<Uuid>>>,
membership: ArcSwap<Membership>,
ring: ArcSwap<HashRing<Uuid>>,
}

impl GossipState {
Expand All @@ -74,22 +75,22 @@ impl GossipState {
dir,
initial_peers,
current_node: RwLock::new(current_node),
membership: RwLock::new(Membership::default()),
membership: ArcSwap::new(Arc::new(Membership::default())),
transport: Transport::new(),
ring: RwLock::new(Arc::new(HashRing::default())),
ring: ArcSwap::new(Arc::new(HashRing::default())),
}
}

pub fn current(&self) -> NodeInfo {
self.current_node.read().unwrap().clone()
}

pub fn membership(&self) -> Membership {
self.membership.read().unwrap().clone()
pub fn membership(&self) -> Arc<Membership> {
self.membership.load_full()
}

pub fn ring(&self) -> Arc<HashRing<Uuid>> {
self.ring.read().unwrap().clone()
self.ring.load_full()
}

/// Start the gossip protocol.
Expand All @@ -101,11 +102,12 @@ impl GossipState {
let mut gossip_futs = vec![];

// Fast bootstrap
self.membership.write().unwrap().update_member(MemberState {
info: self.current(),
status: MemberStatus::Alive,
heartbeat: Timestamp::now(),
});
self.membership
.store(Arc::new(Membership::from_iter([MemberState {
info: self.current(),
status: MemberStatus::Alive,
heartbeat: Timestamp::now(),
}])));

let state_clone = self.clone();
rt.spawn(async move {
Expand Down Expand Up @@ -255,50 +257,51 @@ impl GossipState {
log::debug!("received message: {message:?}");
let result = match message {
GossipMessage::Ping(info) => {
self.membership.write().unwrap().update_member(MemberState {
let mut membership = (**self.membership.load()).clone();
membership.update_member(MemberState {
info: info.clone(),
status: MemberStatus::Alive,
heartbeat: Timestamp::now(),
});
self.membership.store(Arc::new(membership));

// Respond with an ack
Some(GossipMessage::Ack(self.current()))
}
GossipMessage::Ack(info) => {
self.membership.write().unwrap().update_member(MemberState {
let mut membership = (**self.membership.load()).clone();
membership.update_member(MemberState {
info: info.clone(),
status: MemberStatus::Alive,
heartbeat: Timestamp::now(),
});
self.membership.store(Arc::new(membership));

None
}
GossipMessage::Sync { members } => {
let mut membership = (**self.membership.load()).clone();
for member in members {
self.membership.write().unwrap().update_member(member);
membership.update_member(member);
}

// Ensure the current node is alive
self.membership.write().unwrap().update_member(MemberState {
membership.update_member(MemberState {
info: self.current(),
status: MemberStatus::Alive,
heartbeat: Timestamp::now(),
});

self.membership.store(Arc::new(membership.clone()));

// Respond with the current membership
let members = self.membership.read().unwrap().members().clone();
Some(GossipMessage::Sync {
members: members.values().cloned().collect(),
members: membership.into_members().into_values().collect(),
})
}
};

if self
.membership
.read()
.unwrap()
.is_dead(self.current().node_id)
{
if self.membership.load().is_dead(self.current().node_id) {
log::info!("current node is marked as dead; advancing incarnation");
self.advance_incarnation();
}
Expand All @@ -313,7 +316,7 @@ impl GossipState {
}

fn remove_dead_members(&self) -> Vec<NodeInfo> {
let mut members = self.membership.write().unwrap();
let mut members = (**self.membership.load()).clone();
let dead_members: Vec<NodeInfo> = members
.members()
.iter()
Expand All @@ -332,6 +335,8 @@ impl GossipState {
members.remove_member(dead_member.node_id);
}

self.membership.store(Arc::new(members));

dead_members
}

Expand Down Expand Up @@ -421,19 +426,20 @@ impl GossipState {

fn rebuild_ring(&self) {
// Ensure the current node is alive
let mut membership = self.membership.write().unwrap();
let mut membership = (**self.membership.load()).clone();
membership.update_member(MemberState {
info: self.current(),
status: MemberStatus::Alive,
heartbeat: Timestamp::now(),
});

*self.ring.write().unwrap() =
Arc::new(HashRing::from(membership.members().keys().cloned()));
self.ring.store(Arc::new(HashRing::from(
membership.members().keys().cloned(),
)));
}

fn mark_dead(&self, peer: &NodeInfo) {
let mut members = self.membership.write().unwrap();
let mut members = (**self.membership.load()).clone();
if let Some(last_seen) = members.members().get(&peer.node_id).map(|m| m.heartbeat) {
let member = MemberState {
info: peer.clone(),
Expand All @@ -442,6 +448,7 @@ impl GossipState {
};
members.update_member(member);
}
self.membership.store(Arc::new(members));
}
}

Expand Down
14 changes: 14 additions & 0 deletions crates/gossip/src/member.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ impl Membership {
&self.members
}

pub fn into_members(self) -> BTreeMap<Uuid, MemberState> {
self.members
}

pub fn is_dead(&self, id: Uuid) -> bool {
self.members
.get(&id)
Expand Down Expand Up @@ -129,6 +133,16 @@ impl Membership {
}
}

impl FromIterator<MemberState> for Membership {
fn from_iter<T: IntoIterator<Item = MemberState>>(iter: T) -> Self {
let mut membership = Membership::default();
for member in iter {
membership.update_member(member);
}
membership
}
}

#[cfg(test)]
mod membership_tests {
use jiff::Timestamp;
Expand Down
Loading