Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions duva/src/domains/cluster_actors/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ use crate::domains::cluster_actors::topology::Topology;
use crate::domains::peers::command::BannedPeer;
use crate::domains::peers::command::BatchEntries;
use crate::domains::peers::command::BatchId;
use crate::domains::replications::messages::ElectionVote;
use crate::domains::replications::messages::RejectionReason;
use crate::domains::replications::messages::ReplicationAck;
use crate::domains::replications::messages::RequestVote;

use crate::domains::peers::command::HeartBeat;
use crate::domains::peers::command::PendingMigrationTask;
Expand Down Expand Up @@ -105,7 +109,7 @@ impl<T: TWriteAheadLog> ClusterActor<T> {
}

pub(crate) fn log_state(&self) -> &ReplicationState {
self.replication.get_state()
self.replication.state()
}

fn new(
Expand All @@ -123,7 +127,7 @@ impl<T: TWriteAheadLog> ClusterActor<T> {

let (tx, _) = tokio::sync::broadcast::channel::<Topology>(100);
let hash_ring = HashRing::default().add_partitions(vec![(
init_repl_state.state().replid.clone(),
init_repl_state.clone_state().replid.clone(),
init_repl_state.self_identifier(),
)]);

Expand Down Expand Up @@ -230,7 +234,7 @@ impl<T: TWriteAheadLog> ClusterActor<T> {
};

let outbound_stream =
OutboundStream { r, w, self_state: self.replication.state(), peer_state: None };
OutboundStream { r, w, self_state: self.replication.clone_state(), peer_state: None };

tokio::spawn(outbound_stream.add_peer(
self.replication.self_port,
Expand All @@ -249,7 +253,7 @@ impl<T: TWriteAheadLog> ClusterActor<T> {
r,
w,
host_ip,
self_state: self.replication.state(),
self_state: self.replication.clone_state(),
peer_state: Default::default(),
};

Expand Down Expand Up @@ -401,7 +405,7 @@ impl<T: TWriteAheadLog> ClusterActor<T> {
self.members
.values()
.map(|p| p.state().clone())
.chain(std::iter::once(self.replication.state()))
.chain(std::iter::once(self.replication.clone_state()))
.collect()
}
// #[instrument(level = tracing::Level::INFO, skip(self, request_vote))]
Expand Down Expand Up @@ -436,7 +440,6 @@ impl<T: TWriteAheadLog> ClusterActor<T> {
},
None => {
self.replication.revert_voting(current_term, &request_vote.candidate_id);
return;
},
};
}
Expand Down Expand Up @@ -769,7 +772,7 @@ impl<T: TWriteAheadLog> ClusterActor<T> {
.values()
.clone()
.map(|peer| peer.state().clone())
.chain(iter::once(self.replication.state()))
.chain(iter::once(self.replication.clone_state()))
.collect(),
hash_ring: self.hash_ring.clone(),
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ async fn test_reconnection_on_gossip() {
let listener = TcpListener::bind("127.0.0.1:44455").await.unwrap(); // ! Beaware that this is cluster port
let bind_addr = listener.local_addr().unwrap();

let mut replication_state = cluster_actor.replication.state();
let mut replication_state = cluster_actor.replication.clone_state();
replication_state.role = ReplicationRole::Follower;

let (tx, _rx) = Callback::create();
Expand Down
4 changes: 1 addition & 3 deletions duva/src/domains/cluster_actors/actor/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use crate::domains::QueryIO;
use crate::domains::caches::actor::CacheCommandSender;
use crate::domains::caches::cache_objects::CacheEntry;
use crate::domains::caches::command::CacheCommand;
use crate::domains::replications::ReplicatedLogs;

use crate::domains::peers::command::HeartBeat;

Expand Down Expand Up @@ -173,8 +172,7 @@ impl Helper {
last_log_index: 0,
term: 0,
};
let repllogs = ReplicatedLogs::new(MemoryOpLogs::default(), state);
let replication = Replication::new(8080, repllogs);
let replication = Replication::new(8080, MemoryOpLogs::default(), state);
let (_, cache_manager) = Helper::cache_manager();
ClusterActor::new(replication, 100, topology_writer, cache_manager)
}
Expand Down
18 changes: 7 additions & 11 deletions duva/src/domains/cluster_actors/actor/tests/replications.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use crate::domains::replications::ReplicatedLogs;

use super::*;

#[test]
Expand All @@ -13,31 +11,30 @@ fn logger_create_entries_from_lowest() {
last_log_index: 0,
term: 0,
};
let mut repllogs = ReplicatedLogs::new(MemoryOpLogs::default(), state);

let test_logs = vec![
Helper::write(1, 0, "foo", "bar"),
Helper::write(2, 0, "foo2", "bar"),
Helper::write(3, 0, "foo3", "bar"),
];
repllogs.write_many(test_logs.clone()).unwrap();

let mut repl_state = Replication::new(8080, MemoryOpLogs::default(), state);
repl_state.write_many(test_logs.clone()).unwrap();

// WHEN
const LOWEST_FOLLOWER_COMMIT_INDEX: u64 = 2;
let mut repl_state = Replication::new(8080, repllogs);

repl_state.increase_con_idx_by(LOWEST_FOLLOWER_COMMIT_INDEX);

let log = LogEntry::Set { key: "foo4".into(), value: "bar".into(), expires_at: None };
repl_state.write_single_entry(log, repl_state.state().term, None).unwrap();
repl_state.write_single_entry(log, repl_state.clone_state().term, None).unwrap();

let logs = repl_state.list_append_log_entries(Some(LOWEST_FOLLOWER_COMMIT_INDEX));

// THEN
assert_eq!(logs.len(), 2);
assert_eq!(logs[0].log_index, 3);
assert_eq!(logs[1].log_index, 4);
assert_eq!(repl_state.state().last_log_index, 4);
assert_eq!(repl_state.clone_state().last_log_index, 4);
}

#[tokio::test]
Expand Down Expand Up @@ -371,10 +368,9 @@ async fn follower_truncates_log_on_term_mismatch() {
.writer
.extend(vec![Helper::write(2, 1, "key1", "val1"), Helper::write(3, 1, "key2", "val2")]);

let logger = ReplicatedLogs::new(inmemory, state);

let mut cluster_actor = Helper::cluster_actor(ReplicationRole::Leader).await;
cluster_actor.replication.set_logger(logger);
cluster_actor.replication.set_target(inmemory);
cluster_actor.replication.set_state(state);

// Simulate an initial log entry at index 1, term 1
// WHEN: Leader sends an AppendEntries with prev_log_index=1, prev_log_term=2 (mismatch)
Expand Down
2 changes: 1 addition & 1 deletion duva/src/domains/cluster_actors/hash_ring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
/// The `HashRing` maps keys to physical nodes using virtual nodes to ensure
/// even distribution. Each physical node is represented by multiple virtual
/// nodes on the ring, determined by `vnode_num`.
use crate::ReplicationId;
use crate::domains::peers::command::MigrationChunk;
use crate::domains::replications::ReplicationId;
use crate::prelude::PeerIdentifier;
use std::collections::{BTreeMap, HashMap};
use std::ops::Deref;
Expand Down
2 changes: 1 addition & 1 deletion duva/src/domains/cluster_actors/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl<T: TWriteAheadLog> ClusterActor<T> {
callback.send(self.cluster_nodes());
},
ReplicationState(callback) => {
callback.send(self.replication.state());
callback.send(self.replication.clone_state());
},
Forget(peer_addr, callback) => {
if let Ok(Some(())) = self.forget_peer(peer_addr).await {
Expand Down
3 changes: 2 additions & 1 deletion duva/src/domains/query_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::domains::cluster_actors::topology::Topology;
use crate::domains::peers::command::{BatchEntries, BatchId, HeartBeat};

use crate::domains::replications::WriteOperation;
use crate::domains::replications::messages::{ElectionVote, ReplicationAck, RequestVote};

use crate::domains::replications::*;
use crate::presentation::clients::request::ClientAction;
Expand Down Expand Up @@ -496,7 +497,7 @@ mod test {
use super::*;
use crate::domains::caches::cache_objects::CacheEntry;
use crate::domains::cluster_actors::hash_ring::HashRing;
use crate::domains::replications::{ReplicationId, ReplicationRole};
use crate::domains::replications::ReplicationRole;

use crate::domains::peers::command::BannedPeer;
use crate::domains::peers::identifier::PeerIdentifier;
Expand Down
106 changes: 0 additions & 106 deletions duva/src/domains/replications/logger.rs

This file was deleted.

5 changes: 1 addition & 4 deletions duva/src/domains/replications/mod.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
pub(crate) mod consensus;
pub(crate) mod interfaces;
pub(crate) mod logger;
pub(crate) mod messages;
pub(crate) mod operation;
pub(crate) mod replication;
pub(crate) mod state;
pub(crate) use consensus::election::*;
pub(crate) use consensus::log::*;
pub(crate) use interfaces::*;
pub(crate) use logger::*;
pub(crate) use messages::*;
pub use operation::LogEntry;
pub use operation::WriteOperation;
pub(crate) mod messages;

pub use messages::ReplicationId;
pub use replication::ReplicationRole;
Expand Down
Loading
Loading