Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/gossip/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ repository.workspace = true
version.workspace = true

[dependencies]
arc-swap = { workspace = true }
backon = { workspace = true }
exn = { workspace = true }
fastimer = { workspace = true }
Expand Down
75 changes: 41 additions & 34 deletions crates/gossip/src/gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@

use std::path::PathBuf;
use std::sync::Arc;
use std::sync::RwLock;
use std::time::Duration;

use arc_swap::ArcSwap;
use backon::ConstantBuilder;
use backon::Retryable;
use exn::Result;
Expand Down Expand Up @@ -61,35 +61,35 @@ pub type GossipFuture = JoinHandle<Result<(), GossipError>>;
pub struct GossipState {
dir: PathBuf,
initial_peers: Vec<Url>,
current_node: RwLock<NodeInfo>,
current_node: ArcSwap<NodeInfo>,
transport: Transport,

membership: RwLock<Membership>,
ring: RwLock<Arc<HashRing<Uuid>>>,
membership: ArcSwap<Membership>,
ring: ArcSwap<HashRing<Uuid>>,
}

impl GossipState {
/// Creates a gossip state for `current_node`.
///
/// The membership and the hash ring start out as their `Default` values;
/// `initial_peers` and `dir` are stored as given.
pub fn new(current_node: NodeInfo, initial_peers: Vec<Url>, dir: PathBuf) -> Self {
    Self {
        dir,
        initial_peers,
        current_node: ArcSwap::new(Arc::new(current_node)),
        membership: ArcSwap::new(Arc::new(Membership::default())),
        transport: Transport::new(),
        ring: ArcSwap::new(Arc::new(HashRing::default())),
    }
}

/// Returns an owned copy of this node's current info.
pub fn current(&self) -> NodeInfo {
    // Deref the load guard and then the `Arc` to reach the `NodeInfo` to clone.
    (**self.current_node.load()).clone()
}

pub fn membership(&self) -> Membership {
self.membership.read().unwrap().clone()
pub fn membership(&self) -> Arc<Membership> {
self.membership.load_full()
}

/// Returns a shared handle to the current hash ring.
pub fn ring(&self) -> Arc<HashRing<Uuid>> {
    self.ring.load_full()
}

/// Start the gossip protocol.
Expand All @@ -101,11 +101,12 @@ impl GossipState {
let mut gossip_futs = vec![];

// Fast bootstrap
self.membership.write().unwrap().update_member(MemberState {
info: self.current(),
status: MemberStatus::Alive,
heartbeat: Timestamp::now(),
});
self.membership
.store(Arc::new(Membership::from_iter([MemberState {
info: self.current(),
status: MemberStatus::Alive,
heartbeat: Timestamp::now(),
}])));

let state_clone = self.clone();
rt.spawn(async move {
Expand Down Expand Up @@ -255,50 +256,51 @@ impl GossipState {
log::debug!("received message: {message:?}");
let result = match message {
GossipMessage::Ping(info) => {
self.membership.write().unwrap().update_member(MemberState {
let mut membership = (**self.membership.load()).clone();
membership.update_member(MemberState {
info: info.clone(),
status: MemberStatus::Alive,
heartbeat: Timestamp::now(),
});
self.membership.store(Arc::new(membership));

// Respond with an ack
Some(GossipMessage::Ack(self.current()))
}
GossipMessage::Ack(info) => {
self.membership.write().unwrap().update_member(MemberState {
let mut membership = (**self.membership.load()).clone();
membership.update_member(MemberState {
info: info.clone(),
status: MemberStatus::Alive,
heartbeat: Timestamp::now(),
});
self.membership.store(Arc::new(membership));

None
}
GossipMessage::Sync { members } => {
let mut membership = (**self.membership.load()).clone();
for member in members {
self.membership.write().unwrap().update_member(member);
membership.update_member(member);
}

// Ensure the current node is alive
self.membership.write().unwrap().update_member(MemberState {
membership.update_member(MemberState {
info: self.current(),
status: MemberStatus::Alive,
heartbeat: Timestamp::now(),
});

self.membership.store(Arc::new(membership.clone()));

// Respond with the current membership
let members = self.membership.read().unwrap().members().clone();
Some(GossipMessage::Sync {
members: members.values().cloned().collect(),
members: membership.into_members().into_values().collect(),
})
}
};

if self
.membership
.read()
.unwrap()
.is_dead(self.current().node_id)
{
if self.membership.load().is_dead(self.current().node_id) {
log::info!("current node is marked as dead; advancing incarnation");
self.advance_incarnation();
}
Expand All @@ -307,13 +309,14 @@ impl GossipState {
}

/// Advances this node's incarnation, persists the updated node info under
/// `self.dir`, and publishes it as the new current node.
fn advance_incarnation(&self) {
    // Work on an owned copy; the published snapshot is only replaced at the end.
    let mut current = self.current();
    current.advance_incarnation();
    current.persist(&node_file_path(&self.dir));
    // NOTE(review): load-then-store is not atomic; two concurrent callers could
    // both start from the same snapshot — confirm callers are serialized.
    self.current_node.store(Arc::new(current));
}

fn remove_dead_members(&self) -> Vec<NodeInfo> {
let mut members = self.membership.write().unwrap();
let mut members = (**self.membership.load()).clone();
let dead_members: Vec<NodeInfo> = members
.members()
.iter()
Expand All @@ -332,6 +335,8 @@ impl GossipState {
members.remove_member(dead_member.node_id);
}

self.membership.store(Arc::new(members));

dead_members
}

Expand Down Expand Up @@ -421,19 +426,20 @@ impl GossipState {

/// Rebuilds the hash ring from the member ids of the current membership
/// snapshot, with this node marked as alive.
fn rebuild_ring(&self) {
    // Ensure the current node is alive
    // NOTE(review): this update is applied to a local copy that is only used to
    // build the ring; it is never stored back into `self.membership` — confirm
    // that is intended.
    let mut membership = (**self.membership.load()).clone();
    membership.update_member(MemberState {
        info: self.current(),
        status: MemberStatus::Alive,
        heartbeat: Timestamp::now(),
    });

    self.ring.store(Arc::new(HashRing::from(
        membership.members().keys().cloned(),
    )));
}

fn mark_dead(&self, peer: &NodeInfo) {
let mut members = self.membership.write().unwrap();
let mut members = (**self.membership.load()).clone();
if let Some(last_seen) = members.members().get(&peer.node_id).map(|m| m.heartbeat) {
let member = MemberState {
info: peer.clone(),
Expand All @@ -442,6 +448,7 @@ impl GossipState {
};
members.update_member(member);
}
self.membership.store(Arc::new(members));
}
}

Expand Down
14 changes: 14 additions & 0 deletions crates/gossip/src/member.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ impl Membership {
&self.members
}

pub fn into_members(self) -> BTreeMap<Uuid, MemberState> {
self.members
}

pub fn is_dead(&self, id: Uuid) -> bool {
self.members
.get(&id)
Expand Down Expand Up @@ -129,6 +133,16 @@ impl Membership {
}
}

/// Builds a [`Membership`] by applying `update_member` to every yielded state,
/// in iteration order, starting from `Membership::default()`.
impl FromIterator<MemberState> for Membership {
    fn from_iter<T: IntoIterator<Item = MemberState>>(iter: T) -> Self {
        iter.into_iter()
            .fold(Membership::default(), |mut acc, state| {
                acc.update_member(state);
                acc
            })
    }
}

#[cfg(test)]
mod membership_tests {
use jiff::Timestamp;
Expand Down