Skip to content

Commit dc51bdd

Browse files
committed
feat: remove option callback for AddPeer Command
Signed-off-by: Eun-chan Cho <eunchan.cho@talabat.com>
1 parent 23d994b commit dc51bdd

7 files changed

Lines changed: 21 additions & 17 deletions

File tree

duva/src/domains/cluster_actors/commands/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use crate::domains::{append_only_files::WriteRequest, peers::identifier::PeerIde
1414

1515
#[derive(Debug)]
1616
pub(crate) enum ClusterCommand {
17-
AddPeer(AddPeer, Option<tokio::sync::oneshot::Sender<()>>),
17+
AddPeer(AddPeer, tokio::sync::oneshot::Sender<()>),
1818
GetPeers(tokio::sync::oneshot::Sender<Vec<PeerIdentifier>>),
1919
ReplicationInfo(tokio::sync::oneshot::Sender<ReplicationState>),
2020

duva/src/lib.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -120,9 +120,9 @@ impl StartUpFacade {
120120
.await?
121121
.leader_bind_addr()
122122
.context("No leader bind address found")?;
123-
connection_manager
124-
.discover_cluster(self.config_manager.port, peer_identifier, None)
125-
.await?;
123+
let (tx, rx) = tokio::sync::oneshot::channel();
124+
connection_manager.discover_cluster(self.config_manager.port, peer_identifier, tx).await?;
125+
let _ = rx.await;
126126
Ok(())
127127
}
128128

duva/src/presentation/clients/controllers/handler.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,10 +99,9 @@ impl ClientController<Handler> {
9999

100100
let (tx, rx) = tokio::sync::oneshot::channel();
101101
ClusterConnectionManager(self.cluster_communication_manager.clone())
102-
.discover_cluster(self.config_manager.port, peer_identifier, Some(tx))
102+
.discover_cluster(self.config_manager.port, peer_identifier, tx)
103103
.await?;
104104
let _ = rx.await;
105-
106105
QueryIO::SimpleString("OK".into())
107106
},
108107
};

duva/src/presentation/clusters/connection_manager.rs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ impl ClusterConnectionManager {
1919
peer_stream.disseminate_peers(self.0.get_peers().await?).await?;
2020
peer_stream.may_try_sync(ccm, &connected_peer_info).await?;
2121

22-
self.send(peer_stream.into_add_peer(self.clone(), connected_peer_info)?).await?;
22+
let (tx, rx) = tokio::sync::oneshot::channel();
23+
self.send(peer_stream.into_add_peer(self.clone(), connected_peer_info, tx)?).await?;
2324

2425
Ok(())
2526
}
@@ -28,7 +29,7 @@ impl ClusterConnectionManager {
2829
self,
2930
self_port: u16,
3031
connect_to: PeerIdentifier,
31-
tx: Option<tokio::sync::oneshot::Sender<()>>,
32+
Sender: tokio::sync::oneshot::Sender<()>,
3233
) -> anyhow::Result<()> {
3334
// Base case
3435
let existing_peers = self.get_peers().await?;
@@ -44,19 +45,24 @@ impl ClusterConnectionManager {
4445
.await?
4546
.set_replication_info(&self)
4647
.await?
47-
.create_peer_cmd(self.clone(), tx)?;
48+
.create_peer_cmd(self.clone(), Sender)?;
4849
self.send(add_peer_cmd).await?;
4950

5051
// Discover additional peers concurrently
5152
// TODO Require investigation. Why does 'list_peer_binding_addrs' have to be called at here?
53+
let mut callbacks = Vec::new();
5254
for peer in peer_list {
5355
println!("Discovering peer: {}", peer);
56+
let (tx, rx) = tokio::sync::oneshot::channel();
5457
Box::pin(
55-
ClusterConnectionManager(self.0.clone()).discover_cluster(self_port, peer, None),
58+
ClusterConnectionManager(self.0.clone()).discover_cluster(self_port, peer, tx),
5659
)
5760
.await?;
61+
callbacks.push(rx);
62+
}
63+
for callback in callbacks {
64+
let _ = callback.await;
5865
}
59-
6066
Ok(())
6167
}
6268

duva/src/presentation/clusters/inbound/stream.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ impl InboundStream {
116116
self,
117117
cluster_actor_handler: Sender<ClusterCommand>,
118118
connected_peer_info: ConnectedPeerInfo,
119+
Sender: tokio::sync::oneshot::Sender<()>,
119120
) -> anyhow::Result<ClusterCommand> {
120121
let kind = self.decide_peer_kind(&connected_peer_info);
121122
let peer = create_peer(
@@ -124,7 +125,7 @@ impl InboundStream {
124125
kind,
125126
cluster_actor_handler,
126127
);
127-
Ok(ClusterCommand::AddPeer(AddPeer { peer_id: connected_peer_info.id, peer }, None))
128+
Ok(ClusterCommand::AddPeer(AddPeer { peer_id: connected_peer_info.id, peer }, Sender))
128129
}
129130

130131
pub(crate) fn decide_peer_kind(&self, connected_peer_info: &ConnectedPeerInfo) -> PeerState {

duva/src/presentation/clusters/outbound/stream.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ impl OutboundStream {
114114
pub(crate) fn create_peer_cmd(
115115
self,
116116
cluster_actor_handler: Sender<ClusterCommand>,
117-
tx: Option<tokio::sync::oneshot::Sender<()>>,
117+
Sender: tokio::sync::oneshot::Sender<()>,
118118
) -> anyhow::Result<(ClusterCommand, Vec<PeerIdentifier>)> {
119119
let mut connection_info =
120120
self.connected_node_info.context("Connected node info not found")?;
@@ -127,6 +127,6 @@ impl OutboundStream {
127127
cluster_actor_handler,
128128
);
129129

130-
Ok((ClusterCommand::AddPeer(AddPeer { peer_id: self.connect_to, peer }, tx), peer_list))
130+
Ok((ClusterCommand::AddPeer(AddPeer { peer_id: self.connect_to, peer }, Sender), peer_list))
131131
}
132132
}

duva/src/services/handlers/cluster.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,7 @@ impl ClusterActor {
2424
ClusterCommand::AddPeer(add_peer_cmd, callback) => {
2525
self.add_peer(add_peer_cmd).await;
2626
self.snapshot_topology().await;
27-
if let Some(callback) = callback {
28-
let _ = callback.send(());
29-
}
27+
let _ = callback.send(());
3028
},
3129
ClusterCommand::GetPeers(callback) => {
3230
let _ = callback.send(self.members.keys().cloned().collect::<Vec<_>>());

0 commit comments

Comments
 (0)