Skip to content

Commit c58ed4c

Browse files
authored
Dont return quorum if requester isnt involved (#72)
* dont return quorum if requester isnt involved * refactor: make cloning more consistent * add unit test * refactor: take out some boilerplate
1 parent 79572e6 commit c58ed4c

File tree

1 file changed

+128
-2
lines changed

1 file changed

+128
-2
lines changed

src/lighthouse.rs

+128-2
Original file line numberDiff line numberDiff line change
@@ -458,7 +458,7 @@ impl LighthouseService for Arc<Lighthouse> {
458458
requester.replica_id.clone(),
459459
QuorumMemberDetails {
460460
joined: Instant::now(),
461-
member: requester,
461+
member: requester.clone(),
462462
},
463463
);
464464
let rx = state.channel.subscribe();
@@ -471,7 +471,28 @@ impl LighthouseService for Arc<Lighthouse> {
471471
rx
472472
};
473473

474-
let quorum = rx.recv().await.map_err(|e| Status::from_error(e.into()))?;
474+
let quorum = loop {
475+
let current_quorum = rx.recv().await.map_err(|e| Status::from_error(e.into()))?;
476+
477+
if current_quorum
478+
.participants
479+
.iter()
480+
.any(|p| p.replica_id == requester.replica_id)
481+
{
482+
break current_quorum;
483+
}
484+
485+
// Only continue the loop if the replica is not in the quorum
486+
let mut state = self.state.lock().await;
487+
state.participants.insert(
488+
requester.replica_id.clone(),
489+
QuorumMemberDetails {
490+
joined: Instant::now(),
491+
member: requester.clone(),
492+
},
493+
);
494+
info!("Replica {} not in quorum, retrying", &requester.replica_id);
495+
};
475496

476497
let reply = LighthouseQuorumResponse {
477498
quorum: Some(quorum),
@@ -995,4 +1016,109 @@ mod tests {
9951016
// replica_id changed
9961017
assert!(quorum_changed(&a, &c));
9971018
}
1019+
#[tokio::test]
1020+
1021+
async fn test_lighthouse_join_during_shrink() -> Result<()> {
1022+
fn create_member(id: &str, addr_num: &str, step: i64, shrink_only: bool) -> QuorumMember {
1023+
QuorumMember {
1024+
replica_id: id.to_string(),
1025+
address: format!("addr{}", addr_num),
1026+
store_address: format!("store{}", addr_num),
1027+
step,
1028+
world_size: 1,
1029+
shrink_only,
1030+
}
1031+
}
1032+
1033+
fn create_request(member: &QuorumMember) -> tonic::Request<LighthouseQuorumRequest> {
1034+
tonic::Request::new(LighthouseQuorumRequest {
1035+
requester: Some(member.clone()),
1036+
})
1037+
}
1038+
1039+
let opt = LighthouseOpt {
1040+
min_replicas: 2,
1041+
bind: "[::]:0".to_string(),
1042+
join_timeout_ms: 1000,
1043+
quorum_tick_ms: 10,
1044+
heartbeat_timeout_ms: 5000,
1045+
};
1046+
1047+
// Start the lighthouse service
1048+
let lighthouse = Lighthouse::new(opt).await?;
1049+
let lighthouse_task = tokio::spawn(lighthouse.clone().run());
1050+
1051+
// Create client to interact with lighthouse
1052+
let mut client = lighthouse_client_new(lighthouse.address()).await?;
1053+
1054+
// 1. First quorum
1055+
let mut first_request = create_request(&create_member("replica0", "1", 1, false));
1056+
let mut second_request = create_request(&create_member("replica1", "2", 1, false));
1057+
1058+
tokio::spawn({
1059+
let mut client = client.clone();
1060+
async move { client.quorum(first_request).await }
1061+
});
1062+
let first_response = client.quorum(second_request).await?;
1063+
let first_quorum = first_response.into_inner().quorum.unwrap();
1064+
assert_eq!(first_quorum.participants.len(), 2);
1065+
assert_eq!(first_quorum.participants[0].replica_id, "replica0");
1066+
assert_eq!(first_quorum.participants[1].replica_id, "replica1");
1067+
assert_eq!(first_quorum.participants[1].step, 1);
1068+
1069+
// 2. Quorum without joiner
1070+
let joining_request = create_request(&create_member("joiner", "3", 1, false));
1071+
let joining_task = tokio::spawn({
1072+
let mut client = client.clone();
1073+
async move { client.quorum(joining_request).await }
1074+
});
1075+
1076+
// Try to shrink only
1077+
first_request = create_request(&create_member("replica0", "1", 2, true));
1078+
second_request = create_request(&create_member("replica1", "2", 2, false));
1079+
1080+
tokio::spawn({
1081+
let mut client = client.clone();
1082+
async move { client.quorum(first_request).await }
1083+
});
1084+
let second_response = client.quorum(second_request).await?;
1085+
let second_quorum = second_response.into_inner().quorum.unwrap();
1086+
assert!(second_quorum
1087+
.participants
1088+
.iter()
1089+
.all(|p| p.replica_id != "joiner"));
1090+
assert_eq!(second_quorum.participants.len(), 2);
1091+
assert_eq!(second_quorum.participants[0].replica_id, "replica0");
1092+
assert_eq!(second_quorum.participants[1].replica_id, "replica1");
1093+
assert_eq!(second_quorum.participants[1].step, 2);
1094+
1095+
// 3. Quorum with joiner
1096+
first_request = create_request(&create_member("replica0", "1", 3, false));
1097+
second_request = create_request(&create_member("replica1", "2", 3, false));
1098+
1099+
tokio::spawn({
1100+
let mut client = client.clone();
1101+
async move { client.quorum(first_request).await }
1102+
});
1103+
let third_response = client.quorum(second_request).await?;
1104+
let third_quorum = third_response.into_inner().quorum.unwrap();
1105+
assert!(third_quorum
1106+
.participants
1107+
.iter()
1108+
.any(|p| p.replica_id == "joiner"));
1109+
assert_eq!(third_quorum.participants.len(), 3);
1110+
assert_eq!(third_quorum.participants[2].step, 3);
1111+
1112+
let join_result = joining_task.await?;
1113+
let join_quorum = join_result.unwrap().into_inner().quorum.unwrap();
1114+
assert!(join_quorum
1115+
.participants
1116+
.iter()
1117+
.any(|p| p.replica_id == "joiner"));
1118+
assert_eq!(join_quorum.participants.len(), 3);
1119+
assert_eq!(join_quorum.participants[2].step, 3);
1120+
1121+
lighthouse_task.abort();
1122+
Ok(())
1123+
}
9981124
}

0 commit comments

Comments
 (0)