Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions duva-client/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ impl<T> ClientController<T> {
| Config { .. }
| Info
| ClusterForget { .. }
| Role
| ReplicaOf { .. }
| ClusterInfo => match query_io {
| QueryIO::Null => Response::Null,
Expand Down Expand Up @@ -130,7 +129,7 @@ impl<T> ClientController<T> {
}
Response::Array(keys)
},
| ClusterNodes => {
| Role | ClusterNodes => {
let QueryIO::Array(value) = query_io else {
return Response::FormatError;
};
Expand Down
12 changes: 12 additions & 0 deletions duva/src/domains/cluster_actors/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,18 @@ impl<T: TWriteAheadLog> ClusterActor<T> {
})
}

pub(crate) fn get_sorted_roles(&mut self) -> Vec<(PeerIdentifier, ReplicationRole)> {
let mut replica = self
.members
.iter()
.filter(|(_, peer_state)| peer_state.is_replica(&self.replication.replid))
.map(|(peer_id, peer_state)| (peer_id.clone(), peer_state.role().clone()))
.chain(iter::once((self.replication.self_identifier(), self.replication.role.clone())))
.collect::<Vec<_>>();
replica.sort_by_key(|(_, role)| role.clone());
replica
}

fn shard_leaders(&self) -> Vec<(ReplicationId, PeerIdentifier)> {
let iter = self
.members
Expand Down
1 change: 1 addition & 0 deletions duva/src/domains/cluster_actors/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ pub enum ClientMessage {
LeaderReqConsensus(ConsensusRequest),
ClusterNodes(Callback<Vec<PeerState>>),
GetRole(Callback<ReplicationRole>),
GetRoles(Callback<Vec<(PeerIdentifier, ReplicationRole)>>),
SubscribeToTopologyChange(Callback<tokio::sync::broadcast::Receiver<Topology>>),
ClusterMeet(PeerIdentifier, LazyOption, Callback<anyhow::Result<()>>),
GetTopology(Callback<Topology>),
Expand Down
6 changes: 4 additions & 2 deletions duva/src/domains/cluster_actors/replication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,11 +167,13 @@ impl From<String> for ReplicationId {
}
}

#[derive(Debug, Clone, PartialEq, Eq, bincode::Encode, bincode::Decode, Default)]
#[derive(
Debug, Clone, PartialEq, Eq, bincode::Encode, bincode::Decode, Default, PartialOrd, Ord,
)]
pub enum ReplicationRole {
Leader,
#[default]
Follower,
Leader,
}

impl Display for ReplicationRole {
Expand Down
4 changes: 3 additions & 1 deletion duva/src/domains/cluster_actors/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use crate::domains::cluster_actors::ConnectionMessage;
use crate::domains::cluster_actors::SchedulerMessage;
use crate::domains::operation_logs::interfaces::TWriteAheadLog;
use crate::domains::peers::PeerMessage;

use crate::prelude::PeerIdentifier;
use crate::res_err;
use tokio::net::TcpStream;
Expand Down Expand Up @@ -109,6 +108,9 @@ impl<T: TWriteAheadLog> ClusterActor<T> {
| GetRole(callback) => {
callback.send(self.replication.role.clone());
},
| GetRoles(callback) => {
callback.send(self.get_sorted_roles());
},
| SubscribeToTopologyChange(callback) => {
callback.send(self.node_change_broadcast.subscribe());
},
Expand Down
3 changes: 1 addition & 2 deletions duva/src/presentation/clients/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,7 @@ impl ClientController {
QueryIO::SimpleString("OK".into())
},
| ClientAction::Role => {
let role = self.cluster_communication_manager.route_get_role();
QueryIO::SimpleString(role.await?.to_string().into())
self.cluster_communication_manager.route_get_roles().await?.into()
},
| ClientAction::Ttl { key } => {
QueryIO::SimpleString(self.cache_manager.route_ttl(key).await?.into())
Expand Down
6 changes: 6 additions & 0 deletions duva/src/presentation/clusters/communication_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,12 @@ impl ClusterCommunicationManager {
Ok(rx.await?)
}

pub(crate) async fn route_get_roles(&self) -> anyhow::Result<Vec<String>> {
let (tx, rx) = Callback::create();
self.send(ClientMessage::GetRoles(tx)).await?;
Ok(rx.await?.into_iter().map(|(id, role)| format!("{}:{}", id.0, role)).collect())
}

pub(crate) async fn route_get_role(&self) -> anyhow::Result<ReplicationRole> {
let (tx, rx) = Callback::create();
self.send(ClientMessage::GetRole(tx)).await?;
Expand Down
6 changes: 4 additions & 2 deletions duva/tests/cluster_ops/test_cluster_meet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ fn run_cluster_meet(append_only: bool) -> anyhow::Result<()> {
while until > std::time::Instant::now() {
let res = client_handler.send_and_get_vec("cluster info", 1);
if res.contains(&"cluster_known_nodes:3".to_string()) {
assert_eq!(client_handler.send_and_get("role"), "leader");
let role_response = client_handler.send_and_get_vec("role", 2);
assert!(role_response.contains(&format!("127.0.0.1:{}:{}", env3.port, "leader")));
success_cnt += 1;
break;
}
Expand All @@ -42,7 +43,8 @@ fn run_cluster_meet(append_only: bool) -> anyhow::Result<()> {
while until > std::time::Instant::now() {
let res = replica_handler.send_and_get_vec("cluster info", 1);
if res.contains(&"cluster_known_nodes:3".to_string()) {
assert_eq!(replica_handler.send_and_get("role"), "follower");
let role_response = client_handler.send_and_get_vec("role", 2);
assert!(role_response.contains(&format!("127.0.0.1:{}:{}", env4.port, "follower")));
success_cnt += 1;
break;
}
Expand Down
5 changes: 3 additions & 2 deletions duva/tests/cluster_ops/test_lazy_discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ fn run_lazy_discovery_of_leader(with_append_only: bool) -> anyhow::Result<()> {
assert_eq!(p1_h.send_and_get(format!("REPLICAOF 127.0.0.1 {}", &env2.port)), "OK");

// THEN
assert_eq!(p1_h.send_and_get("role"), "follower");
assert_eq!(p2_h.send_and_get("role"), "leader");
let role_response = p1_h.send_and_get_vec("role", 2);
assert!(role_response.contains(&format!("127.0.0.1:{}:{}", p1.port, "follower")));
assert!(role_response.contains(&format!("127.0.0.1:{}:{}", env2.port, "leader")));

// * Following is required to test replicaof successuflly update topology changes
p1.terminate()?;
Expand Down
8 changes: 4 additions & 4 deletions duva/tests/cluster_ops/test_reconnection_on_reboot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ fn run_reconnection_on_reboot(with_append_only: bool) -> anyhow::Result<()> {
//THEN

let mut cli_to_p2 = Client::new(p2.port);
let p2_role = cli_to_p2.send_and_get("ROLE");
let p2_role = cli_to_p2.send_and_get_vec("ROLE", 2);

let mut cli_to_p1 = Client::new(p1.port);
let p1_role = cli_to_p1.send_and_get("ROLE");
let p1_role = cli_to_p1.send_and_get_vec("ROLE", 2);

assert_eq!(p2_role, "follower");
assert_eq!(p1_role, "leader");
assert!(p2_role.contains(&format!("127.0.0.1:{}:{}", p2.port, "follower")));
assert!(p1_role.contains(&format!("127.0.0.1:{}:{}", p1.port, "leader")));

// Check that the values are still there
assert_eq!(cli_to_p2.send_and_get("GET x"), "value1");
Expand Down
3 changes: 2 additions & 1 deletion duva/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ pub fn spawn_server_process(env: &ServerEnv) -> anyhow::Result<TestProcessChild>
continue;
}

if role_res.is_empty() || (role_res != "leader" && role_res != "follower") {
if role_res.is_empty() || (role_res.contains("leader") && role_res.contains("follower"))
{
continue;
}

Expand Down
18 changes: 8 additions & 10 deletions duva/tests/replication_ops/test_leader_election.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,15 @@ fn run_set_twice_after_election(with_append_only: bool) -> anyhow::Result<()> {

panic_if_election_not_done(follower_p1.port, follower_p2.port);

let res = h1.send_and_get("role");
let res2 = h2.send_and_get("role");
let res = h1.send_and_get_vec("role", 3);

if res.contains(&"leader".to_string()) {
if res.contains(&format!("127.0.0.1:{}:{}", follower_p1.port, "leader")) {
// THEN - one of the replicas should become the leader
assert_eq!(h1.send_and_get("set 1 2"), "OK");
assert_eq!(h1.send_and_get("set 2 3"), "OK");
}

if res2 == "leader" {
if res.contains(&format!("127.0.0.1:{}:{}", follower_p2.port, "leader")) {
// THEN - one of the replicas should become the leader
assert_eq!(h2.send_and_get("set 1 2"), "OK");
assert_eq!(h2.send_and_get("set 2 3"), "OK");
Expand Down Expand Up @@ -112,22 +111,21 @@ fn panic_if_election_not_done(port1: u16, port2: u16) {
let mut first_election_cnt = 0;
let mut flag = false;
let mut h1 = Client::new(port1);
let mut h2 = Client::new(port2);

let start = std::time::Instant::now();
while first_election_cnt < 50 {
let res = h1.send_and_get("role");
let res2 = h2.send_and_get("role");
let res = h1.send_and_get_vec("role", 2);
println!(
"[{}ms] Poll {}: port1={} port2={} res1={} res2={}",
"[{}ms] Poll {}: port1={} port2={} res={:?}",
start.elapsed().as_millis(),
first_election_cnt,
port1,
port2,
res,
res2
);
if res == "leader" || res2 == "leader" {
if res.contains(&format!("127.0.0.1:{}:{}", port1, "leader"))
|| res.contains(&format!("127.0.0.1:{}:{}", port2, "leader"))
{
flag = true;
break;
}
Expand Down
Loading