Skip to content

Commit 9030111

Browse files
authored
periodically sync in-memory member state to foca state (#410)
Signed-off-by: Somtochi Onyekwere <[email protected]>
1 parent 68285c1 commit 9030111

File tree

2 files changed

+56
-4
lines changed

2 files changed

+56
-4
lines changed

crates/corro-agent/src/broadcast/mod.rs

Lines changed: 52 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,7 @@ pub fn runtime_loop(
147147
bounded(agent.config().perf.schedule_channel_len, "to_schedule");
148148

149149
let mut runtime: DispatchRuntime<Actor> =
150-
DispatchRuntime::new(to_send_tx, to_schedule_tx, notifications_tx);
150+
DispatchRuntime::new(to_send_tx, to_schedule_tx, notifications_tx.clone());
151151

152152
let (timer_tx, mut timer_rx) = mpsc::channel(10);
153153
let timer_spawner = TimerSpawner::new(timer_tx);
@@ -322,7 +322,7 @@ pub fn runtime_loop(
322322
gauge!("corro.gossip.cluster_size").set(last_cluster_size.get() as f64);
323323
}
324324
Branch::DiffMembers => {
325-
diff_member_states(&agent, &foca, &mut last_states);
325+
diff_member_states(&agent, &foca, &mut last_states, &notifications_tx);
326326
}
327327
}
328328

@@ -375,7 +375,9 @@ pub fn runtime_loop(
375375
}
376376
}
377377

378-
if let Some(handle) = diff_member_states(&agent, &foca, &mut last_states) {
378+
if let Some(handle) =
379+
diff_member_states(&agent, &foca, &mut last_states, &notifications_tx)
380+
{
379381
info!("Waiting on task to update member states...");
380382
if let Err(e) = handle.await {
381383
error!("could not await task to update member states: {e}");
@@ -820,6 +822,7 @@ fn diff_member_states(
820822
agent: &Agent,
821823
foca: &Foca<Actor, BincodeCodec<bincode::config::Configuration>, StdRng, NoCustomBroadcast>,
822824
last_states: &mut HashMap<ActorId, (foca::Member<Actor>, Option<u64>)>,
825+
notifications_tx: &CorroSender<foca::OwnedNotification<Actor>>,
823826
) -> Option<tokio::task::JoinHandle<()>> {
824827
let mut foca_states = HashMap::new();
825828
for member in foca.iter_membership_state() {
@@ -885,10 +888,51 @@ fn diff_member_states(
885888
}
886889
});
887890

891+
let mut foca_notifications = vec![];
892+
foca_notifications.extend(foca_states.iter().filter_map(|(id, member)| {
893+
let member = *member;
894+
if foca_state_is_active(&member.state()) && members.get(id).is_none() {
895+
Some(foca::OwnedNotification::MemberUp(member.id().clone()))
896+
} else {
897+
None
898+
}
899+
}));
900+
901+
foca_notifications.extend(members.states.iter().filter_map(|(id, member_state)| {
902+
match foca_states.get(id) {
903+
Some(foca_state) => {
904+
if !foca_state_is_active(&foca_state.state()) {
905+
Some(foca::OwnedNotification::MemberDown(foca_state.id().clone()))
906+
} else {
907+
None
908+
}
909+
}
910+
None => Some(foca::OwnedNotification::MemberDown(
911+
member_state.to_actor(*id),
912+
)),
913+
}
914+
}));
915+
916+
drop(members);
917+
918+
if !foca_notifications.is_empty() {
919+
info!(
920+
"Sending out {} foca notifications for members",
921+
foca_notifications.len()
922+
);
923+
// best effort update since we'd retry this function.
924+
for notification in foca_notifications {
925+
if let Err(e) = notifications_tx.try_send(notification) {
926+
error!("error dispatching notifications from diff_member_states: {e}");
927+
counter!("corro.channel.error", "type" => "full", "name" => "dispatch.notifications")
928+
.increment(1);
929+
}
930+
}
931+
}
932+
888933
if to_update.is_empty() && to_delete.is_empty() {
889934
return None;
890935
}
891-
892936
info!(
893937
"Scheduling cluster membership state update for {} members (delete: {})",
894938
to_update.len(),
@@ -968,6 +1012,10 @@ fn diff_member_states(
9681012
}))
9691013
}
9701014

1015+
fn foca_state_is_active(state: &foca::State) -> bool {
1016+
matches!(*state, foca::State::Alive | foca::State::Suspect)
1017+
}
1018+
9711019
fn make_foca_config(cluster_size: NonZeroU32) -> foca::Config {
9721020
let mut config = foca::Config::new_wan(cluster_size);
9731021
config.remove_down_after = Duration::from_secs(2 * 24 * 3600);

crates/corro-types/src/members.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,10 @@ impl MemberState {
4040
pub fn is_ring0(&self) -> bool {
4141
self.ring == Some(0)
4242
}
43+
44+
pub fn to_actor(&self, id: ActorId) -> Actor {
45+
Actor::new(id, self.addr, self.ts, self.cluster_id, self.member_id)
46+
}
4347
}
4448

4549
const RING_BUCKETS: [Range<u64>; 6] = [0..6, 6..15, 15..50, 50..100, 100..200, 200..300];

0 commit comments

Comments
 (0)