Skip to content

Commit bcb7da1

Browse files
committed
refresh ring eagerly
1 parent 7a9c310 commit bcb7da1

File tree

2 files changed

+24
-3
lines changed

2 files changed

+24
-3
lines changed

crates/cluster/src/gossip.rs

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ const DEFAULT_SYNC_INTERVAL: Duration = Duration::from_secs(5);
5959
const DEFAULT_RETRY_INTERVAL: Duration = Duration::from_secs(1);
6060
const DEFAULT_RETRIES: usize = 3;
6161

62-
const DEFAULT_REBUILD_RING_INTERVAL: Duration = Duration::from_secs(10);
62+
const DEFAULT_REBUILD_RING_INTERVAL: Duration = Duration::from_secs(5);
6363

6464
const DEFAULT_MEMBER_DEADLINE: Duration = Duration::from_secs(30);
6565

@@ -329,9 +329,16 @@ impl GossipState {
329329
}
330330

331331
fn rebuild_ring(&self) {
332-
let members = self.membership.read().unwrap();
332+
// Ensure the current node is alive
333+
let mut membership = self.membership.write().unwrap();
334+
membership.update_member(MemberState {
335+
info: self.current(),
336+
status: MemberStatus::Alive,
337+
heartbeat: Timestamp::now(),
338+
});
333339

334-
*self.ring.write().unwrap() = Arc::new(HashRing::from(members.members().keys().cloned()));
340+
*self.ring.write().unwrap() =
341+
Arc::new(HashRing::from(membership.members().keys().cloned()));
335342
}
336343

337344
fn mark_dead(&self, peer: &NodeInfo) {
@@ -429,6 +436,10 @@ async fn drive_gossip(state: Arc<GossipState>, runtime: &Runtime) -> Result<(),
429436
.iter()
430437
.nth(random::<usize>() % membership.members().len())
431438
{
439+
if member.status == MemberStatus::Dead {
440+
log::debug!("skipping dead member: {member:?}");
441+
continue;
442+
}
432443
log::debug!("pinging member: {member:?}");
433444
state.ping(member.info.clone()).await;
434445
} else {
@@ -452,6 +463,10 @@ async fn drive_gossip(state: Arc<GossipState>, runtime: &Runtime) -> Result<(),
452463
.iter()
453464
.nth(random::<usize>() % membership.members().len())
454465
{
466+
if member.status == MemberStatus::Dead {
467+
log::debug!("skipping dead member: {member:?}");
468+
continue;
469+
}
455470
log::debug!("syncing member: {member:?}");
456471
state.sync(member.info.clone()).await;
457472
} else {

crates/cluster/src/member.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ impl Membership {
6868
Entry::Occupied(mut entry) => {
6969
let current = entry.get_mut();
7070
if current.info.incarnation < member.info.incarnation {
71+
log::info!(target: "gossip", "advancing member incarnation from [{}] to [{}]: {member:?}", current.info.incarnation, member.info.incarnation);
7172
*current = member;
7273
return;
7374
}
@@ -76,15 +77,20 @@ impl Membership {
7677
}
7778
// If the incarnation is the same, we only accept downgrades
7879
current.status.downgrade_to(&member.status);
80+
if member.status == MemberStatus::Dead {
81+
log::info!(target: "gossip", "member confirmed dead: {member:?}");
82+
}
7983
current.heartbeat = current.heartbeat.max(member.heartbeat);
8084
}
8185
Entry::Vacant(entry) => {
86+
log::info!(target: "gossip", "adding new member: {member:?}");
8287
entry.insert(member);
8388
}
8489
}
8590
}
8691

8792
pub fn remove_member(&mut self, id: Uuid) {
93+
log::info!(target: "gossip", "removing member: {id}");
8894
self.members.remove(&id);
8995
}
9096
}

0 commit comments

Comments
 (0)