Skip to content

Commit f4b92ad

Browse files
committed
wait on drop
1 parent dc51bdd commit f4b92ad

6 files changed

Lines changed: 18 additions & 9 deletions

File tree

duva/src/domains/caches/cache_manager.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,13 @@ impl CacheManager {
225225
}
226226

227227
pub(crate) async fn drop_cache(&self) {
228-
join_all(self.inboxes.iter().map(|shard| shard.send(CacheCommand::Drop))).await;
228+
let (txs, rxs) = self.oneshot_channels();
229+
join_all(
230+
self.chain(txs)
231+
.map(|(shard, sender)| shard.send(CacheCommand::Drop { callback: sender })),
232+
)
233+
.await;
234+
235+
join_all(rxs.into_iter().map(|rx| rx)).await;
229236
}
230237
}

duva/src/domains/caches/command.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,6 @@ pub enum CacheCommand {
1313
IndexGet { key: String, read_idx: u64, callback: oneshot::Sender<QueryIO> },
1414
Ping,
1515
StopSentinel,
16-
Drop,
16+
Drop { callback: oneshot::Sender<()> },
1717
Exists { key: String, callback: oneshot::Sender<bool> },
1818
}

duva/src/presentation/clients/controllers/handler.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,6 @@ impl ClientController<Handler> {
9494
}
9595
},
9696
ClientAction::ReplicaOf(peer_identifier) => {
97-
// TODO should check if the peer is in the cluster?
9897
self.cluster_communication_manager.replicaof(peer_identifier.clone()).await;
9998

10099
let (tx, rx) = tokio::sync::oneshot::channel();

duva/src/presentation/clusters/connection_manager.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ impl ClusterConnectionManager {
2929
self,
3030
self_port: u16,
3131
connect_to: PeerIdentifier,
32-
Sender: tokio::sync::oneshot::Sender<()>,
32+
sender: tokio::sync::oneshot::Sender<()>,
3333
) -> anyhow::Result<()> {
3434
// Base case
3535
let existing_peers = self.get_peers().await?;
@@ -45,7 +45,7 @@ impl ClusterConnectionManager {
4545
.await?
4646
.set_replication_info(&self)
4747
.await?
48-
.create_peer_cmd(self.clone(), Sender)?;
48+
.create_peer_cmd(self.clone(), sender)?;
4949
self.send(add_peer_cmd).await?;
5050

5151
// Discover additional peers concurrently

duva/src/presentation/clusters/outbound/stream.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ impl OutboundStream {
114114
pub(crate) fn create_peer_cmd(
115115
self,
116116
cluster_actor_handler: Sender<ClusterCommand>,
117-
Sender: tokio::sync::oneshot::Sender<()>,
117+
sender: tokio::sync::oneshot::Sender<()>,
118118
) -> anyhow::Result<(ClusterCommand, Vec<PeerIdentifier>)> {
119119
let mut connection_info =
120120
self.connected_node_info.context("Connected node info not found")?;
@@ -127,6 +127,6 @@ impl OutboundStream {
127127
cluster_actor_handler,
128128
);
129129

130-
Ok((ClusterCommand::AddPeer(AddPeer { peer_id: self.connect_to, peer }, Sender), peer_list))
130+
Ok((ClusterCommand::AddPeer(AddPeer { peer_id: self.connect_to, peer }, sender), peer_list))
131131
}
132132
}

duva/src/services/handlers/cache.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,9 @@ impl CacheActor {
6161
}
6262
};
6363
},
64-
CacheCommand::Drop => {
64+
CacheCommand::Drop { callback } => {
6565
self.cache.clear();
66+
let _ = callback.send(());
6667
},
6768
}
6869
}
@@ -104,7 +105,9 @@ mod test {
104105
self.0.send(CacheCommand::Ping).await.unwrap();
105106
}
106107
async fn drop(&self) {
107-
self.0.send(CacheCommand::Drop).await.unwrap();
108+
let (tx, rx) = oneshot::channel();
109+
self.0.send(CacheCommand::Drop { callback: tx }).await.unwrap();
110+
let _ = rx.await;
108111
}
109112
}
110113

0 commit comments

Comments
 (0)