Skip to content

Commit 5a4ac78

Browse files
committed
feat: voter votes only once
1 parent 92d25a1 commit 5a4ac78

4 files changed

Lines changed: 71 additions & 10 deletions

File tree

src/domains/cluster_actors/actor.rs

Lines changed: 47 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,7 @@ impl ClusterActor {
297297
pub(crate) fn track_replication_progress(&mut self, res: ReplicationResponse) {
298298
if let Some(mut consensus) = self.consensus_tracker.take(&res.log_idx) {
299299
println!("[INFO] Received acks for log index num: {}", res.log_idx);
300-
consensus.increase_vote();
300+
consensus.increase_vote(res.from);
301301

302302
if let Some(consensus) = consensus.maybe_not_finished(res.log_idx) {
303303
self.consensus_tracker.insert(res.log_idx, consensus);
@@ -745,7 +745,7 @@ mod test {
745745
}
746746

747747
#[tokio::test]
748-
async fn apply_acks_delete_consensus_voting_when_consensus_reached() {
748+
async fn test_consensus_voting_deleted_when_consensus_reached() {
749749
// GIVEN
750750
let mut test_logger = ReplicatedLogs::new(InMemoryWAL::default(), 0, 0);
751751
let mut cluster_actor = cluster_actor_create_helper();
@@ -771,23 +771,64 @@ mod test {
771771
log_idx: 1,
772772
term: 0,
773773
rej_reason: RejectionReason::None,
774-
from: PeerIdentifier("repl1".into()), //TODO Must be changed if "update_match_index" becomes idempotent operation on peer id
774+
from: PeerIdentifier("".into()),
775775
};
776-
cluster_actor.track_replication_progress(follower_res.clone());
777-
cluster_actor.track_replication_progress(follower_res.clone());
776+
cluster_actor.track_replication_progress(follower_res.clone().set_from("repl1"));
777+
cluster_actor.track_replication_progress(follower_res.clone().set_from("repl2"));
778778

779779
// up to this point, tracker hold the consensus
780780
assert_eq!(cluster_actor.consensus_tracker.len(), 1);
781+
assert_eq!(cluster_actor.consensus_tracker.get(&1).unwrap().voters.len(), 2);
781782

782783
// ! Majority votes made
783-
cluster_actor.track_replication_progress(follower_res.clone());
784+
cluster_actor.track_replication_progress(follower_res.set_from("repl3"));
784785

785786
// THEN
786787
assert_eq!(cluster_actor.consensus_tracker.len(), 0);
787788
assert_eq!(test_logger.log_index, 1);
789+
788790
client_wait.await.unwrap();
789791
}
790792

793+
#[tokio::test]
794+
async fn test_same_voter_can_vote_only_once() {
795+
// GIVEN
796+
let mut test_logger = ReplicatedLogs::new(InMemoryWAL::default(), 0, 0);
797+
let mut cluster_actor = cluster_actor_create_helper();
798+
799+
let (cluster_sender, _) = tokio::sync::mpsc::channel(100);
800+
801+
// - add followers to create quorum
802+
let cache_manager = CacheManager { inboxes: vec![] };
803+
cluster_member_create_helper(&mut cluster_actor, 0..4, cluster_sender, cache_manager, 0)
804+
.await;
805+
let (client_request_sender, client_wait) = tokio::sync::oneshot::channel();
806+
807+
cluster_actor
808+
.req_consensus(
809+
&mut test_logger,
810+
WriteRequest::Set { key: "foo".into(), value: "bar".into() },
811+
client_request_sender,
812+
)
813+
.await;
814+
815+
// WHEN
816+
assert_eq!(cluster_actor.consensus_tracker.len(), 1);
817+
let follower_res = ReplicationResponse {
818+
log_idx: 1,
819+
term: 0,
820+
rej_reason: RejectionReason::None,
821+
from: PeerIdentifier("repl1".into()), //TODO Must be changed if "update_match_index" becomes idempotent operation on peer id
822+
};
823+
cluster_actor.track_replication_progress(follower_res.clone());
824+
cluster_actor.track_replication_progress(follower_res.clone());
825+
cluster_actor.track_replication_progress(follower_res.clone());
826+
827+
// THEN - no change in consensus tracker even though the same voter voted multiple times
828+
assert_eq!(cluster_actor.consensus_tracker.len(), 1);
829+
assert_eq!(test_logger.log_index, 1);
830+
}
831+
791832
#[tokio::test]
792833
async fn logger_create_entries_from_lowest() {
793834
// GIVEN

src/domains/cluster_actors/commands/write_con.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,9 @@ impl ReplicationResponse {
3535
pub(crate) fn is_granted(&self) -> bool {
3636
self.rej_reason == RejectionReason::None
3737
}
38+
39+
#[cfg(test)]
40+
pub(crate) fn set_from(self, from: &str) -> Self {
41+
Self { from: PeerIdentifier(from.to_string()), ..self }
42+
}
3843
}

src/domains/cluster_actors/consensus/mod.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,16 @@ impl LogConsensusTracker {
1717
value: Sender<ConsensusClientResponse>,
1818
replica_count: usize,
1919
) {
20-
self.0
21-
.insert(key, ConsensusVoting { callback: value, pos_vt: 0, neg_vt: 0, replica_count });
20+
self.0.insert(
21+
key,
22+
ConsensusVoting {
23+
callback: value,
24+
pos_vt: 0, // no need for self vote
25+
neg_vt: 0,
26+
replica_count,
27+
voters: Vec::with_capacity(replica_count),
28+
},
29+
);
2230
}
2331
pub(crate) fn take(
2432
&mut self,

src/domains/cluster_actors/consensus/voting.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
use crate::domains::cluster_actors::commands::ConsensusClientResponse;
1+
use crate::domains::{
2+
cluster_actors::commands::ConsensusClientResponse, peers::identifier::PeerIdentifier,
3+
};
24
use tokio::sync::oneshot::Sender;
35
pub(crate) type ReplicationVote = Sender<ConsensusClientResponse>;
46

@@ -8,10 +10,15 @@ pub struct ConsensusVoting<T> {
810
pub(crate) pos_vt: u8,
911
pub(crate) neg_vt: u8,
1012
pub(crate) replica_count: usize,
13+
pub(crate) voters: Vec<PeerIdentifier>,
1114
}
1215
impl<T> ConsensusVoting<T> {
13-
pub(crate) fn increase_vote(&mut self) {
16+
pub(crate) fn increase_vote(&mut self, voter: PeerIdentifier) {
17+
if self.voters.iter().any(|v| v == &voter) {
18+
return;
19+
}
1420
self.pos_vt += 1;
21+
self.voters.push(voter);
1522
}
1623

1724
fn get_required_votes(&self) -> u8 {

0 commit comments

Comments
 (0)