Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 15 additions & 3 deletions crates/corro-admin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ pub enum LogCommand {
pub enum ClusterCommand {
Rejoin,
Members,
MembershipStates,
MembershipStates { local: bool },
SetId(ClusterId),
}

Expand Down Expand Up @@ -347,9 +347,20 @@ async fn handle_conn(
}
send_success(&mut stream).await;
}
Command::Cluster(ClusterCommand::MembershipStates) => {
Command::Cluster(ClusterCommand::MembershipStates { local }) => {
info_log(&mut stream, "gathering membership state").await;

if local {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok so generally don't dump all peers we how about but only the ones we are connected to directly.

let members = agent.members().read().states.clone();
for member in members {
match serde_json::to_value(&member) {
Ok(json) => send(&mut stream, Response::Json(json)).await,
Err(e) => send_error(&mut stream, e).await,
}
}
send_success(&mut stream).await;
continue;
}
let (tx, mut rx) = mpsc::channel(1024);
if let Err(e) = agent
.tx_foca()
Expand Down Expand Up @@ -386,10 +397,11 @@ async fn handle_conn(

let (cb_tx, cb_rx) = oneshot::channel();

let member_id = agent.member_id();
agent
.tx_foca()
.blocking_send(FocaInput::Cmd(FocaCmd::ChangeIdentity(
agent.actor(cluster_id),
agent.actor(cluster_id, member_id),
cb_tx,
)))
.map_err(|_| ProcessingError::Send)?;
Expand Down
16 changes: 12 additions & 4 deletions crates/corro-agent/src/agent/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ pub fn spawn_swim_announcer(agent: &Agent, gossip_addr: SocketAddr, mut tripwire
_ = timer.as_mut() => {}
}

// TODO: find way to find and filter out addrs with a different membership id
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So different peers which share a IP?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we keep sending out Announce messages even to peers that we learn have a different membership id. We could pass this information around, and exclude the peer's ip when we want to announce. But it is mostly an optimization so I left it out of this pr

match bootstrap::generate_bootstrap(
agent.config().gossip.bootstrap.as_slice(),
gossip_addr,
Expand Down Expand Up @@ -301,9 +302,14 @@ pub async fn handle_notifications(
info!("Member Up {actor:?} (result: {member_added_res:?})");

match member_added_res {
MemberAddedResult::NewMember => {
debug!("Member Added {actor:?}");
counter!("corro.gossip.member.added", "id" => actor.id().0.to_string(), "addr" => actor.addr().to_string()).increment(1);
MemberAddedResult::NewMember | MemberAddedResult::Removed => {
if matches!(member_added_res, MemberAddedResult::Removed) {
debug!("Member Removed {actor:?} due to member id mismatch");
counter!("corro.gossip.member.removed", "id" => actor.id().0.to_string(), "addr" => actor.addr().to_string()).increment(1);
} else {
debug!("Member Added {actor:?}");
counter!("corro.gossip.member.added", "id" => actor.id().0.to_string(), "addr" => actor.addr().to_string()).increment(1);
}

let members_len = { agent.members().read().states.len() as u32 };

Expand Down Expand Up @@ -832,7 +838,9 @@ pub async fn handle_sync(
.iter()
// Filter out self
.filter(|(id, state)| {
**id != agent.actor_id() && state.cluster_id == agent.cluster_id()
**id != agent.actor_id()
&& state.cluster_id == agent.cluster_id()
&& state.member_id == agent.member_id()
})
// Grab a ring-buffer index to the member RTT range
.map(|(id, state)| {
Expand Down
7 changes: 5 additions & 2 deletions crates/corro-agent/src/agent/run_root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,17 +83,20 @@ async fn run(
let (notifications_tx, notifications_rx) =
bounded(pconf.notifications_channel_len, "notifications");

let member_states = util::load_member_states(&agent).await;

//// Start the main SWIM runtime loop
runtime_loop(
// here the agent already has the current cluster_id, we don't need to pass one
agent.actor(None),
agent.actor(None, agent.config().gossip.member_id),
agent.clone(),
transport.clone(),
rx_foca,
rx_bcast,
to_send_tx,
notifications_tx,
tripwire.clone(),
member_states.clone(),
);

//// Update member connection RTTs
Expand All @@ -102,7 +105,7 @@ async fn run(
handlers::spawn_swim_announcer(&agent, gossip_addr, tripwire.clone());

// Load existing cluster members into the SWIM runtime
util::initialise_foca(&agent).await;
util::initialise_foca(&agent, member_states).await;

// Load schema from paths
let stmts = corro_utils::read_files_from_paths(&agent.config().db.schema_paths).await?;
Expand Down
2 changes: 1 addition & 1 deletion crates/corro-agent/src/agent/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ pub async fn setup(conf: Config, tripwire: Tripwire) -> eyre::Result<(Agent, Age
}

// do this early to error earlier
let members = Members::default();
let members = Members::new(conf.gossip.member_id);

let actor_id = {
// we need to set auto_vacuum before any tables are created
Expand Down
99 changes: 98 additions & 1 deletion crates/corro-agent/src/agent/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use crate::{
use corro_tests::*;
use corro_types::change::Change;
use corro_types::{
actor::ActorId,
actor::{ActorId, MemberId},
api::{ExecResponse, ExecResult, Statement},
base::{dbsr, dbsri, dbvri, CrsqlDbVersion, CrsqlDbVersionRange, CrsqlSeq},
broadcast::{ChangeSource, ChangeV1, Changeset},
Expand Down Expand Up @@ -1397,3 +1397,100 @@ async fn many_small_changes() -> eyre::Result<()> {

Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_diff_member_id() -> eyre::Result<()> {
_ = tracing_subscriber::fmt::try_init();
let (tripwire, tripwire_worker, tripwire_tx) = Tripwire::new_simple();

let (ta1_tp, ta1_tp_worker, ta1_tp_tx) = Tripwire::new_simple();
let ta1 = launch_test_agent(|conf| conf.member_id(MemberId(1)).build(), ta1_tp.clone()).await?;
let ta2 = launch_test_agent(
|conf| {
conf.bootstrap(vec![ta1.agent.gossip_addr().to_string()])
.member_id(MemberId(2))
.build()
},
tripwire.clone(),
)
.await?;

let ta3 = launch_test_agent(
|conf| {
conf.bootstrap(vec![ta2.agent.gossip_addr().to_string()])
.member_id(MemberId(1))
.build()
},
tripwire.clone(),
)
.await?;

let ta4 = launch_test_agent(
|conf| {
conf.bootstrap(vec![ta1.agent.gossip_addr().to_string()])
.member_id(MemberId(2))
.build()
},
tripwire.clone(),
)
.await?;

// small sleep to ensure they announce themselves to each other
tokio::time::sleep(Duration::from_secs(3)).await;

let members = ta1.agent.members().read().states.clone();
assert_eq!(members.len(), 1);
assert!(members.contains_key(&ta3.agent.actor_id()));

let members = ta3.agent.members().read().states.clone();
assert_eq!(members.len(), 1);
assert!(members.contains_key(&ta1.agent.actor_id()));

let members = ta2.agent.members().read().states.clone();
assert_eq!(members.len(), 1);
assert!(members.contains_key(&ta4.agent.actor_id()));

let members = ta4.agent.members().read().states.clone();
assert_eq!(members.len(), 1);
assert!(members.contains_key(&ta2.agent.actor_id()));

// restart ta1 with a different membership id
println!("ta1: {:?}", ta1.agent.actor_id());
let ta1_db_path = ta1.agent.db_path();
let ta1_gossip_addr = ta1.agent.gossip_addr();
ta1_tp_tx.send(()).await.ok();
ta1_tp_worker.await;
tokio::time::sleep(Duration::from_secs(2)).await;
let ta1 = launch_test_agent(
|conf| {
conf.member_id(MemberId(2))
.db_path(ta1_db_path)
.gossip_addr(ta1_gossip_addr)
.bootstrap(vec![ta2.agent.gossip_addr().to_string()])
.build()
},
tripwire.clone(),
)
.await?;

tokio::time::sleep(Duration::from_secs(3)).await;

let members = ta1.agent.members().read().states.clone();
assert_eq!(members.len(), 2);
assert!(members.contains_key(&ta2.agent.actor_id()));
assert!(members.contains_key(&ta4.agent.actor_id()));

let members = ta3.agent.members().read().states.clone();
assert_eq!(members.len(), 0);

let members = ta2.agent.members().read().states.clone();
assert_eq!(members.len(), 2);
assert!(members.contains_key(&ta1.agent.actor_id()));
assert!(members.contains_key(&ta4.agent.actor_id()));

tripwire_tx.send(()).await.ok();
tripwire_worker.await;
wait_for_all_pending_handles().await;

Ok(())
}
3 changes: 1 addition & 2 deletions crates/corro-agent/src/agent/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,7 @@ use tower_http::trace::TraceLayer;
use tracing::{debug, error, info, trace, warn};
use tripwire::{Outcome, PreemptibleFutureExt, Tripwire};

pub async fn initialise_foca(agent: &Agent) {
let states = load_member_states(agent).await;
pub async fn initialise_foca(agent: &Agent, states: Vec<(SocketAddr, Member<Actor>)>) {
if !states.is_empty() {
let mut foca_states = BTreeMap::<SocketAddr, Member<Actor>>::new();

Expand Down
1 change: 1 addition & 0 deletions crates/corro-agent/src/api/peer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2428,6 +2428,7 @@ mod tests {
plaintext: false,
max_mtu: None,
disable_gso: false,
member_id: None,
};

let server = gossip_server_endpoint(&gossip_config).await?;
Expand Down
18 changes: 15 additions & 3 deletions crates/corro-agent/src/broadcast/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::{
};

use bytes::{BufMut, Bytes, BytesMut};
use foca::{BincodeCodec, Foca, Identity, NoCustomBroadcast, OwnedNotification, Timer};
use foca::{BincodeCodec, Foca, Identity, Member, NoCustomBroadcast, OwnedNotification, Timer};
use futures::{
stream::{FusedStream, FuturesUnordered},
Future,
Expand Down Expand Up @@ -127,6 +127,7 @@ pub fn runtime_loop(
to_send_tx: CorroSender<(Actor, Bytes)>,
notifications_tx: CorroSender<OwnedNotification<Actor>>,
tripwire: Tripwire,
member_states: Vec<(SocketAddr, Member<Actor>)>,
) {
debug!("starting runtime loop for actor: {actor:?}");
let rng = StdRng::from_os_rng();
Expand Down Expand Up @@ -169,7 +170,10 @@ pub fn runtime_loop(
let mut metrics_interval = tokio::time::interval(Duration::from_secs(10));
let mut last_cluster_size = unsafe { NonZeroU32::new_unchecked(1) };

let mut last_states = HashMap::new();
let mut last_states = member_states
.into_iter()
.map(|(_, member)| (member.id().id(), (member, None)))
.collect::<HashMap<_, _>>();
let mut diff_last_states_every = tokio::time::interval(Duration::from_secs(60));

#[derive(EnumDiscriminants)]
Expand Down Expand Up @@ -832,11 +836,17 @@ fn diff_member_states(
}

let members = agent.members().read();
let member_id = agent.config().gossip.member_id;

let to_update = foca_states
.iter()
.filter_map(|(id, member)| {
let member = *member;

if member.id().member_id() != member_id {
return None;
}

let rtt = members
.rtts
.get(&member.id().addr())
Expand Down Expand Up @@ -865,7 +875,8 @@ fn diff_member_states(
let mut to_delete = vec![];

last_states.retain(|id, _v| {
if foca_states.contains_key(id) {
let foca_state = foca_states.get(id);
if foca_state.is_some() && foca_state.unwrap().id().member_id() == member_id {
true
} else {
to_delete.push(*id);
Expand Down Expand Up @@ -1126,6 +1137,7 @@ mod tests {
ta2_gossip_addr,
Default::default(),
ta1.agent.cluster_id(),
None,
);
ta1.agent.members().write().add_member(&ta2_actor);

Expand Down
6 changes: 3 additions & 3 deletions crates/corro-agent/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use tokio::{
sync::{mpsc, Mutex, RwLock},
time::error::Elapsed,
};
use tracing::{debug, debug_span, info, warn, Instrument};
use tracing::{debug, debug_span, info, trace, warn, Instrument};

use crate::api::peer::gossip_client_endpoint;

Expand Down Expand Up @@ -80,11 +80,11 @@ impl Transport {
#[tracing::instrument(skip(self, data), fields(buf_size = data.len()), level = "debug", err)]
pub async fn send_datagram(&self, addr: SocketAddr, data: Bytes) -> Result<(), TransportError> {
let conn = self.connect(addr).await?;
debug!("connected to {addr}");
trace!("connected to {addr}");

match conn.send_datagram(data.clone()) {
Ok(send) => {
debug!("sent datagram to {addr}");
trace!("sent datagram to {addr}");
return Ok(send);
}
Err(SendDatagramError::ConnectionLost(e)) => {
Expand Down
Loading
Loading