Skip to content

Commit 57966a5

Browse files
committed
add tests and metrics
Signed-off-by: Somtochi Onyekwere <[email protected]>
1 parent 1ec476b commit 57966a5

File tree

2 files changed

+105
-3
lines changed

2 files changed

+105
-3
lines changed

crates/corro-agent/src/agent/handlers.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -303,8 +303,13 @@ pub async fn handle_notifications(
303303

304304
match member_added_res {
305305
MemberAddedResult::NewMember | MemberAddedResult::Removed => {
306-
debug!("Member Added {actor:?}");
307-
counter!("corro.gossip.member.added", "id" => actor.id().0.to_string(), "addr" => actor.addr().to_string()).increment(1);
306+
if matches!(member_added_res, MemberAddedResult::Removed) {
307+
debug!("Member Removed {actor:?}");
308+
counter!("corro.gossip.member.removed", "id" => actor.id().0.to_string(), "addr" => actor.addr().to_string()).increment(1);
309+
} else {
310+
debug!("Member Added {actor:?}");
311+
counter!("corro.gossip.member.added", "id" => actor.id().0.to_string(), "addr" => actor.addr().to_string()).increment(1);
312+
}
308313

309314
let members_len = { agent.members().read().states.len() as u32 };
310315

crates/corro-agent/src/agent/tests.rs

Lines changed: 98 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use crate::{
3232
use corro_tests::*;
3333
use corro_types::change::Change;
3434
use corro_types::{
35-
actor::ActorId,
35+
actor::{ActorId, MemberId},
3636
api::{ExecResponse, ExecResult, Statement},
3737
base::{dbsr, dbsri, dbvri, CrsqlDbVersion, CrsqlDbVersionRange, CrsqlSeq},
3838
broadcast::{ChangeSource, ChangeV1, Changeset},
@@ -1397,3 +1397,100 @@ async fn many_small_changes() -> eyre::Result<()> {
13971397

13981398
Ok(())
13991399
}
1400+
1401+
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
1402+
async fn test_diff_member_id() -> eyre::Result<()> {
1403+
_ = tracing_subscriber::fmt::try_init();
1404+
let (tripwire, tripwire_worker, tripwire_tx) = Tripwire::new_simple();
1405+
1406+
let (ta1_tp, ta1_tp_worker, ta1_tp_tx) = Tripwire::new_simple();
1407+
let ta1 = launch_test_agent(|conf| conf.member_id(MemberId(1)).build(), ta1_tp.clone()).await?;
1408+
let ta2 = launch_test_agent(
1409+
|conf| {
1410+
conf.bootstrap(vec![ta1.agent.gossip_addr().to_string()])
1411+
.member_id(MemberId(2))
1412+
.build()
1413+
},
1414+
tripwire.clone(),
1415+
)
1416+
.await?;
1417+
1418+
let ta3 = launch_test_agent(
1419+
|conf| {
1420+
conf.bootstrap(vec![ta2.agent.gossip_addr().to_string()])
1421+
.member_id(MemberId(1))
1422+
.build()
1423+
},
1424+
tripwire.clone(),
1425+
)
1426+
.await?;
1427+
1428+
let ta4 = launch_test_agent(
1429+
|conf| {
1430+
conf.bootstrap(vec![ta1.agent.gossip_addr().to_string()])
1431+
.member_id(MemberId(2))
1432+
.build()
1433+
},
1434+
tripwire.clone(),
1435+
)
1436+
.await?;
1437+
1438+
// small sleep to ensure they announce themselves to each other
1439+
tokio::time::sleep(Duration::from_secs(3)).await;
1440+
1441+
let members = ta1.agent.members().read().states.clone();
1442+
assert_eq!(members.len(), 1);
1443+
assert!(members.get(&ta3.agent.actor_id()).is_some());
1444+
1445+
let members = ta3.agent.members().read().states.clone();
1446+
assert_eq!(members.len(), 1);
1447+
assert!(members.get(&ta1.agent.actor_id()).is_some());
1448+
1449+
let members = ta2.agent.members().read().states.clone();
1450+
assert_eq!(members.len(), 1);
1451+
assert!(members.get(&ta4.agent.actor_id()).is_some());
1452+
1453+
let members = ta4.agent.members().read().states.clone();
1454+
assert_eq!(members.len(), 1);
1455+
assert!(members.get(&ta2.agent.actor_id()).is_some());
1456+
1457+
// restart ta1 with a different membership id
1458+
println!("ta1: {:?}", ta1.agent.actor_id());
1459+
let ta1_db_path = ta1.agent.db_path();
1460+
let ta1_gossip_addr = ta1.agent.gossip_addr();
1461+
ta1_tp_tx.send(()).await.ok();
1462+
ta1_tp_worker.await;
1463+
tokio::time::sleep(Duration::from_secs(2)).await;
1464+
let ta1 = launch_test_agent(
1465+
|conf| {
1466+
conf.member_id(MemberId(2))
1467+
.db_path(ta1_db_path)
1468+
.gossip_addr(ta1_gossip_addr)
1469+
.bootstrap(vec![ta2.agent.gossip_addr().to_string()])
1470+
.build()
1471+
},
1472+
tripwire.clone(),
1473+
)
1474+
.await?;
1475+
1476+
tokio::time::sleep(Duration::from_secs(3)).await;
1477+
1478+
let members = ta1.agent.members().read().states.clone();
1479+
assert_eq!(members.len(), 2);
1480+
assert!(members.get(&ta2.agent.actor_id()).is_some());
1481+
assert!(members.get(&ta4.agent.actor_id()).is_some());
1482+
1483+
let members = ta3.agent.members().read().states.clone();
1484+
assert_eq!(members.len(), 0);
1485+
1486+
let members = ta2.agent.members().read().states.clone();
1487+
assert_eq!(members.len(), 2);
1488+
assert!(members.get(&ta1.agent.actor_id()).is_some());
1489+
assert!(members.get(&ta4.agent.actor_id()).is_some());
1490+
1491+
tripwire_tx.send(()).await.ok();
1492+
tripwire_worker.await;
1493+
wait_for_all_pending_handles().await;
1494+
1495+
Ok(())
1496+
}

0 commit comments

Comments
 (0)