Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
145 changes: 79 additions & 66 deletions src/domains/cluster_actors/actor.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use super::commands::AddPeer;
use super::commands::ConsensusClientResponse;
use super::commands::ReplicationResponse;
use super::commands::RequestVote;
use super::commands::RequestVoteReply;
use super::commands::WriteConsensusResponse;
use super::heartbeats::heartbeat::AppendEntriesRPC;
use super::heartbeats::heartbeat::ClusterHeartBeat;
use super::heartbeats::scheduler::HeartBeatScheduler;
Expand Down Expand Up @@ -62,14 +63,14 @@ impl ClusterActor {

pub(crate) fn replicas(&self) -> impl Iterator<Item = (&PeerIdentifier, &Peer, u64)> {
self.members.iter().filter_map(|(id, peer)| match &peer.kind {
PeerKind::Replica { watermark: hwm, replid } => Some((id, peer, *hwm)),
PeerKind::Replica { match_index: hwm, replid } => Some((id, peer, *hwm)),
_ => None,
})
}

pub(crate) fn replicas_mut(&mut self) -> impl Iterator<Item = (&mut Peer, u64)> {
self.members.values_mut().into_iter().filter_map(|peer| match peer.kind.clone() {
PeerKind::Replica { watermark: hwm, replid } => Some((peer, hwm)),
PeerKind::Replica { match_index: hwm, replid } => Some((peer, hwm)),
_ => None,
})
}
Expand Down Expand Up @@ -195,33 +196,33 @@ impl ClusterActor {
self.remove_banned_peers().await;
}

pub(crate) fn update_on_hertbeat_message(&mut self, heartheat: &HeartBeatMessage) {
if let Some(peer) = self.members.get_mut(&heartheat.heartbeat_from) {
pub(crate) fn update_on_hertbeat_message(&mut self, from: &PeerIdentifier, log_index: u64) {
if let Some(peer) = self.members.get_mut(from) {
peer.last_seen = Instant::now();

if let PeerKind::Replica { watermark: hwm, .. } = &mut peer.kind {
*hwm = heartheat.hwm;
if let PeerKind::Replica { match_index, .. } = &mut peer.kind {
*match_index = log_index;
}
}
}
pub(crate) async fn try_create_append_entries(
&mut self,
logger: &mut ReplicatedLogs<impl TWriteAheadLog>,
log: &WriteRequest,
) -> Result<Vec<WriteOperation>, WriteConsensusResponse> {
) -> Result<Vec<WriteOperation>, ConsensusClientResponse> {
if !self.replication.is_leader_mode {
return Err(WriteConsensusResponse::Err("Write given to follower".into()));
return Err(ConsensusClientResponse::Err("Write given to follower".into()));
}

let Ok(append_entries) =
logger.create_log_entries(&log, self.take_low_watermark(), self.replication.term).await
else {
return Err(WriteConsensusResponse::Err("Write operation failed".into()));
return Err(ConsensusClientResponse::Err("Write operation failed".into()));
};

// Skip consensus for no replicas
if self.replicas().count() == 0 {
return Err(WriteConsensusResponse::LogIndex(Some(logger.log_index)));
return Err(ConsensusClientResponse::LogIndex(Some(logger.log_index)));
}

Ok(append_entries)
Expand All @@ -231,14 +232,14 @@ impl ClusterActor {
&mut self,
logger: &mut ReplicatedLogs<impl TWriteAheadLog>,
log: WriteRequest,
callback: tokio::sync::oneshot::Sender<WriteConsensusResponse>,
) -> anyhow::Result<()> {
callback: tokio::sync::oneshot::Sender<ConsensusClientResponse>,
) {
let (prev_log_index, prev_term) = (logger.log_index, logger.term);
let append_entries = match self.try_create_append_entries(logger, &log).await {
Ok(entries) => entries,
Err(err) => {
let _ = callback.send(err);
return Ok(());
return;
},
};

Expand All @@ -248,8 +249,6 @@ impl ClusterActor {
.collect::<FuturesUnordered<_>>()
.for_each(|_| async {})
.await;

Ok(())
}

pub(crate) async fn install_leader_state(
Expand Down Expand Up @@ -285,38 +284,32 @@ impl ClusterActor {
.values()
.into_iter()
.filter_map(|peer| match &peer.kind {
PeerKind::Replica { watermark, replid } => Some(*watermark),
PeerKind::Replica { match_index: watermark, replid } => Some(*watermark),
_ => None,
})
.min()
}

pub(crate) fn apply_acks(&mut self, offsets: Vec<u64>) {
offsets.into_iter().for_each(|offset| {
if let Some(mut consensus) = self.consensus_tracker.take(&offset) {
println!("[INFO] Received acks for log index num: {}", offset);
consensus.increase_vote();
pub(crate) fn track_replication_progress(&mut self, res: ReplicationResponse) {
if let Some(mut consensus) = self.consensus_tracker.take(&res.log_idx) {
println!("[INFO] Received acks for log index num: {}", res.log_idx);
consensus.increase_vote();

if let Some(consensus) = consensus.maybe_not_finished(offset) {
self.consensus_tracker.insert(offset, consensus);
}
if let Some(consensus) = consensus.maybe_not_finished(res.log_idx) {
self.consensus_tracker.insert(res.log_idx, consensus);
}
});
}
}

// After send_ack: Leader updates its knowledge of follower's progress
async fn send_ack(&mut self, send_to: &PeerIdentifier, log_index: u64) {
async fn send_ack(&mut self, send_to: &PeerIdentifier, log_idx: u64, is_granted: bool) {
if let Some(leader) = self.members.get_mut(send_to) {
// TODO send the last offset instead of multiple offsets.
let _ = leader.write_io(QueryIO::AppendEntriesResponse(vec![log_index])).await;
let _ = leader
.write_io(ReplicationResponse::new(log_idx, is_granted, &self.replication))
.await;
}
}

// After send_negative_ack: Leader needs to backtrack and send earlier entries
async fn send_negative_ack(&self, send_to: &PeerIdentifier, prev_log_index: u64) {
//TODO
}

pub(crate) async fn send_commit_heartbeat(&mut self, offset: u64) {
// TODO is there any case where I can use offset input?
self.replication.hwm.fetch_add(1, Ordering::Relaxed);
Expand Down Expand Up @@ -360,13 +353,13 @@ impl ClusterActor {
wal.truncate_after(rpc.prev_log_index).await;
}

self.send_negative_ack(&rpc.heartbeat_from, wal.log_index).await;
self.send_ack(&rpc.from, wal.log_index, false).await;
return Err(e);
}

let match_index = wal.write_log_entries(std::mem::take(&mut rpc.append_entries)).await?;

self.send_ack(&rpc.heartbeat_from, match_index).await;
self.send_ack(&rpc.from, match_index, true).await;
Ok(())
}

Expand Down Expand Up @@ -433,10 +426,10 @@ impl ClusterActor {
.values()
.into_iter()
.map(|peer| match &peer.kind {
PeerKind::Replica { watermark, replid } => {
PeerKind::Replica { match_index, replid } => {
format!("{} {} 0", peer.addr, replid)
},
PeerKind::NonDataPeer { replid } => {
PeerKind::NonDataPeer { replid, match_index } => {
format!("{} {} 0", peer.addr, replid)
},
})
Expand Down Expand Up @@ -476,27 +469,18 @@ impl ClusterActor {
let _ = peer.write_io(RequestVoteReply { term, vote_granted: grant_vote }).await;
}

pub(crate) async fn tally_vote(
&mut self,
request_vote_reply: RequestVoteReply,
logger: &ReplicatedLogs<impl TWriteAheadLog>,
) {
if !self.replication.should_become_leader(request_vote_reply.vote_granted) {
pub(crate) async fn tally_vote(&mut self, logger: &ReplicatedLogs<impl TWriteAheadLog>) {
if !self.replication.election_state.may_become_leader() {
return;
}
self.become_leader().await;

if self.replication.is_leader_mode {
self.heartbeat_scheduler.switch().await;
}
let msg = self.replication.default_heartbeat(0, logger.log_index, logger.term);

self.replicas_mut()
.map(|(peer, _)| peer.write_io(AppendEntriesRPC(msg.clone())))
.collect::<FuturesUnordered<_>>()
.for_each(|_| async {})
.await;

//TODO - how to notify other followers of this election? What's the rule?
}

pub(crate) fn reset_election_timeout(&mut self, leader_id: &PeerIdentifier) {
Expand Down Expand Up @@ -531,18 +515,42 @@ impl ClusterActor {
}
}

// TODO the node should step down to follower state if it’s a leader or candidate (Raft rule).
pub(crate) fn apply_term_then_may_stepdown(
pub(crate) fn maybe_update_term(
&mut self,
new_term: u64,
heartbeat_from: &PeerIdentifier,
wal: &mut ReplicatedLogs<impl TWriteAheadLog>,
) {
if new_term > self.replication.term {
self.replication.term = new_term;
wal.term = new_term;
}
}

/// Rejects a heartbeat that carries a term older than this node's current term.
///
/// When `heartbeat.term` is lower than `self.replication.term`, a non-granted
/// acknowledgement carrying `wal.log_index` is sent to `heartbeat.from`.
///
/// Returns `true` if the heartbeat was rejected, `false` otherwise (in which
/// case nothing is sent).
pub(crate) async fn maybe_reject(
    &mut self,
    heartbeat: &HeartBeatMessage,
    wal: &ReplicatedLogs<impl TWriteAheadLog>,
) -> bool {
    let is_stale = heartbeat.term < self.replication.term;
    if is_stale {
        // `false` marks the acknowledgement as not granted.
        self.send_ack(&heartbeat.from, wal.log_index, false).await;
    }
    is_stale
}

/// Puts this node into follower mode.
///
/// Used when:
/// 1) on follower's consensus rejection when term is not matched
/// 2) step down operation is given from user
///
/// Updates the replication state via `become_follower(None)` first, then
/// awaits the heartbeat scheduler's switch to follower mode.
pub(crate) async fn step_down(&mut self) {
// NOTE(review): `None` presumably means "no known leader yet" — confirm against `become_follower`.
self.replication.become_follower(None);
self.heartbeat_scheduler.turn_follower_mode().await;
}

/// Puts this node into leader mode.
///
/// Writes an "Election succeeded" line to stderr, updates the replication
/// state via `become_leader()`, then awaits the heartbeat scheduler's switch
/// to leader mode.
async fn become_leader(&mut self) {
// `\x1b[32m` / `\x1b[0m` are ANSI escape codes: green foreground, then reset.
eprintln!("\x1b[32m[INFO] Election succeeded\x1b[0m");
self.replication.become_leader();
self.heartbeat_scheduler.turn_leader_mode().await;
}
}

#[cfg(test)]
Expand Down Expand Up @@ -588,7 +596,7 @@ mod test {
prev_log_term: 0,
append_entries: op_logs,
ban_list: vec![],
heartbeat_from: PeerIdentifier::new("localhost", 8080),
from: PeerIdentifier::new("localhost", 8080),
replid: ReplicationId::Key("localhost".to_string().into()),
hop_count: 0,
cluster_nodes: vec![],
Expand Down Expand Up @@ -619,7 +627,7 @@ mod test {
key.to_string(),
TcpStream::connect(bind_addr).await.unwrap(),
PeerKind::Replica {
watermark: follower_hwm,
match_index: follower_hwm,
replid: ReplicationId::Key("localhost".to_string().into()),
},
cluster_sender.clone(),
Expand Down Expand Up @@ -686,8 +694,7 @@ mod test {
WriteRequest::Set { key: "foo".into(), value: "bar".into() },
tx,
)
.await
.unwrap();
.await;

// THEN
assert_eq!(cluster_actor.consensus_tracker.len(), 0);
Expand Down Expand Up @@ -717,8 +724,7 @@ mod test {
WriteRequest::Set { key: "foo".into(), value: "bar".into() },
tx,
)
.await
.unwrap();
.await;

// THEN
assert_eq!(cluster_actor.consensus_tracker.len(), 1);
Expand All @@ -745,18 +751,23 @@ mod test {
WriteRequest::Set { key: "foo".into(), value: "bar".into() },
client_request_sender,
)
.await
.unwrap();
.await;

// WHEN
cluster_actor.apply_acks(vec![1]);
cluster_actor.apply_acks(vec![1]);
let follower_res = ReplicationResponse {
log_idx: 1,
term: 0,
is_granted: true,
from: PeerIdentifier("repl1".into()), //TODO Must be changed if "update_match_index" becomes idempotent operation on peer id
};
cluster_actor.track_replication_progress(follower_res.clone());
cluster_actor.track_replication_progress(follower_res.clone());

// up to this point, tracker hold the consensus
assert_eq!(cluster_actor.consensus_tracker.len(), 1);

// ! Majority votes made
cluster_actor.apply_acks(vec![1]);
cluster_actor.track_replication_progress(follower_res.clone());

// THEN
assert_eq!(cluster_actor.consensus_tracker.len(), 0);
Expand Down Expand Up @@ -1201,7 +1212,7 @@ mod test {
create_peer(
key.to_string(),
TcpStream::connect(bind_addr).await.unwrap(),
PeerKind::Replica { watermark: 0, replid: repl_id.clone() },
PeerKind::Replica { match_index: 0, replid: repl_id.clone() },
cluster_sender.clone(),
),
);
Expand All @@ -1221,6 +1232,7 @@ mod test {
TcpStream::connect(bind_addr).await.unwrap(),
PeerKind::NonDataPeer {
replid: ReplicationId::Key(second_shard_repl_id.to_string()),
match_index: 0,
},
cluster_sender.clone(),
),
Expand All @@ -1236,6 +1248,7 @@ mod test {
TcpStream::connect(bind_addr_for_second_shard).await.unwrap(),
PeerKind::NonDataPeer {
replid: ReplicationId::Key(second_shard_repl_id.to_string()),
match_index: 0,
},
cluster_sender.clone(),
),
Expand Down Expand Up @@ -1288,7 +1301,7 @@ mod test {
let peer = create_peer(
"foo".to_string(),
TcpStream::connect(bind_addr).await.unwrap(),
PeerKind::Replica { watermark: 0, replid: ReplicationId::Key(repl_id.to_string()) },
PeerKind::Replica { match_index: 0, replid: ReplicationId::Key(repl_id.to_string()) },
cluster_actor.self_handler.clone(),
);

Expand Down
Loading