Skip to content

Commit 70c3522

Browse files
authored
Merge pull request #438 from Migorithm/feat/435
feat: exists command on cli
2 parents ec072c9 + 2378936 commit 70c3522

10 files changed

Lines changed: 93 additions & 14 deletions

File tree

duva-client/src/command.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,14 @@ pub fn take_input(action: &str, args: &[&str]) -> Result<ClientInputKind, String
4848
}
4949
Ok(ClientInputKind::Del)
5050
},
51+
"EXISTS" => {
52+
if args.len() < 1 {
53+
return Err(
54+
"(error) ERR wrong number of arguments for 'exists' command".to_string()
55+
);
56+
}
57+
Ok(ClientInputKind::Exists)
58+
},
5159

5260
"PING" => Ok(ClientInputKind::Ping),
5361
"ECHO" => {
@@ -106,4 +114,5 @@ pub enum ClientInputKind {
106114
ClusterInfo,
107115
ClusterNodes,
108116
ClusterForget,
117+
Exists,
109118
}

duva-client/src/controller.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -132,14 +132,13 @@ impl<T> ClientController<T> {
132132
return Err("Unexpected response format".to_string());
133133
},
134134
},
135-
Del => {
135+
Del | Exists => {
136136
let QueryIO::SimpleString(value) = query_io else {
137137
return Err("Unexpected response format".to_string());
138138
};
139139
let deleted_count = value.parse::<u64>().unwrap();
140140
println!("(integer) {}", deleted_count);
141141
},
142-
143142
Set => {
144143
let v = match query_io {
145144
QueryIO::SimpleString(value) => value,
@@ -154,7 +153,6 @@ impl<T> ClientController<T> {
154153
self.latest_known_index = rindex.parse::<u64>().unwrap();
155154
println!("OK");
156155
},
157-
158156
ClusterNodes => {
159157
let QueryIO::Array(value) = query_io else {
160158
return Err("Unexpected response format".to_string());

duva/src/domains/caches/actor.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,13 @@ impl CacheActor {
6060
let _ = callback.send(false);
6161
}
6262
}
63+
pub(crate) fn exists(&self, key: String, callback: oneshot::Sender<bool>) {
64+
if self.cache.get(&key).is_some() {
65+
let _ = callback.send(true);
66+
} else {
67+
let _ = callback.send(false);
68+
}
69+
}
6370
pub(crate) fn get(&self, key: &str, callback: oneshot::Sender<QueryIO>) {
6471
let _ = callback.send(self.cache.get(key).cloned().into());
6572
}

duva/src/domains/caches/cache_manager.rs

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,11 @@ use chrono::Utc;
1414
use futures::StreamExt;
1515
use futures::future::join_all;
1616
use futures::stream::FuturesUnordered;
17+
use tokio::sync::oneshot::error::RecvError;
18+
1719
use std::sync::Arc;
1820
use std::sync::atomic::AtomicU64;
21+
1922
use std::{hash::Hasher, iter::Zip};
2023
use tokio::sync::oneshot::Sender;
2124
use tokio::task::JoinHandle;
@@ -169,27 +172,43 @@ impl CacheManager {
169172
}
170173

171174
pub(crate) async fn route_delete(&self, keys: Vec<String>) -> Result<u64> {
175+
let closure = |key, callback| -> CacheCommand { CacheCommand::Delete { key, callback } };
172176
// Create futures for all delete operations at once
173-
let results = FuturesUnordered::from_iter(keys.into_iter().map(|key| {
174-
let (tx, rx) = tokio::sync::oneshot::channel();
175-
async move {
176-
let _ =
177-
self.select_shard(&key).send(CacheCommand::Delete { key, callback: tx }).await;
178-
rx.await
179-
}
180-
}))
181-
.collect::<Vec<_>>()
182-
.await;
177+
let results = self.send_selectively(keys, closure).await;
183178

184179
let deleted = results.into_iter().filter_map(|r| r.ok().filter(|&success| success)).count();
185-
186180
Ok(deleted as u64)
187181
}
182+
pub(crate) async fn route_exists(&self, keys: Vec<String>) -> Result<u64> {
183+
let closure = |key, callback| -> CacheCommand { CacheCommand::Exists { key, callback } };
184+
// Create futures for all delete operations at once
185+
let results = self.send_selectively(keys, closure).await;
186+
187+
let found = results.into_iter().filter_map(|r| r.ok().filter(|&success| success)).count();
188+
Ok(found as u64)
189+
}
190+
188191
pub(crate) fn select_shard(&self, key: &str) -> &CacheCommandSender {
189192
let shard_key = self.take_shard_key_from_str(key);
190193
&self.inboxes[shard_key]
191194
}
192195

196+
async fn send_selectively<T>(
197+
&self,
198+
keys: Vec<String>,
199+
func: fn(String, Sender<T>) -> CacheCommand,
200+
) -> Vec<Result<T, RecvError>> {
201+
FuturesUnordered::from_iter(keys.into_iter().map(|key| {
202+
let (tx, rx) = tokio::sync::oneshot::channel();
203+
async move {
204+
let _ = self.select_shard(&key).send(func(key, tx)).await;
205+
rx.await
206+
}
207+
}))
208+
.collect::<Vec<_>>()
209+
.await
210+
}
211+
193212
fn take_shard_key_from_str(&self, s: &str) -> usize {
194213
let mut hasher = std::hash::DefaultHasher::new();
195214
std::hash::Hash::hash(&s, &mut hasher);

duva/src/domains/caches/command.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,5 @@ pub enum CacheCommand {
1313
IndexGet { key: String, read_idx: u64, callback: oneshot::Sender<QueryIO> },
1414
Ping,
1515
StopSentinel,
16+
Exists { key: String, callback: oneshot::Sender<bool> },
1617
}

duva/src/presentation/clients/controllers/handler.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@ impl ClientController<Handler> {
6868
ClientAction::Delete { keys } => {
6969
QueryIO::SimpleString(self.cache_manager.route_delete(keys).await?.to_string())
7070
},
71+
72+
ClientAction::Exists { keys } => {
73+
QueryIO::SimpleString(self.cache_manager.route_exists(keys).await?.to_string())
74+
},
7175
ClientAction::Info => QueryIO::BulkString(
7276
self.cluster_communication_manager
7377
.replication_info()

duva/src/presentation/clients/parser.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ pub fn parse_query(
2828
}
2929
},
3030
("del", keys) => ClientAction::Delete { keys: keys.to_vec() },
31+
("exists", keys) => ClientAction::Exists { keys: keys.to_vec() },
3132
("echo", [value]) => ClientAction::Echo(value.to_string()),
3233
("config", [key, value]) => {
3334
ClientAction::Config { key: key.to_string(), value: value.to_string() }

duva/src/presentation/clients/request.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ pub(crate) enum ClientAction {
2020
ClusterInfo,
2121
ClusterNodes,
2222
ClusterForget(PeerIdentifier),
23+
Exists { keys: Vec<String> },
2324
}
2425

2526
impl ClientAction {

duva/src/services/handlers/cache.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ impl CacheActor {
3838
CacheCommand::Delete { key, callback } => {
3939
self.delete(key, callback);
4040
},
41+
CacheCommand::Exists { key, callback } => {
42+
self.exists(key, callback);
43+
},
4144
CacheCommand::Save { outbox } => {
4245
outbox
4346
.send(SaveCommand::LocalShardSize {

duva/tests/test_exists.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/// The following is to test out the set operation with expiry
2+
/// Firstly, we set a key with a value and an expiry of 300ms
3+
/// Then we get the key and check if the value is returned
4+
/// After 300ms, we get the key again and check if the value is not returned (-1)
5+
mod common;
6+
use common::{ServerEnv, array, spawn_server_process};
7+
8+
use duva::{clients::ClientStreamHandler, domains::query_parsers::query_io::QueryIO};
9+
10+
#[tokio::test]
11+
async fn test_exists() {
12+
// GIVEN
13+
let env = ServerEnv::default();
14+
let process = spawn_server_process(&env);
15+
16+
let mut h = ClientStreamHandler::new(process.bind_addr()).await;
17+
assert_eq!(
18+
h.send_and_get(&array(vec!["SET", "a", "b"])).await,
19+
QueryIO::SimpleString("OK RINDEX 1".into()).serialize()
20+
);
21+
assert_eq!(
22+
h.send_and_get(&array(vec!["SET", "c", "d"])).await,
23+
QueryIO::SimpleString("OK RINDEX 2".into()).serialize()
24+
);
25+
26+
// WHEN & THEN
27+
assert_eq!(
28+
h.send_and_get(&array(vec!["exists", "a", "c", "d"])).await,
29+
QueryIO::SimpleString("2".into()).serialize() // 2 means 2 keys deleted
30+
);
31+
32+
assert_eq!(
33+
h.send_and_get(&array(vec!["exists", "x"])).await,
34+
QueryIO::SimpleString("0".into()).serialize() // 2 means 2 keys deleted
35+
);
36+
}

0 commit comments

Comments
 (0)