Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions duva/src/domains/caches/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,18 @@ impl CacheActor {

Ok(list.llen())
}
pub(crate) fn rpushx(&mut self, key: String, values: Vec<String>) -> usize {
let mut val = self.cache.get_mut(&key);

let Some(CacheValue { value: TypedValue::List(list), .. }) = val.as_mut() else {
return 0;
};
for v in values {
list.rpush(v.into());
}

list.llen()
}

pub(crate) fn pop(&mut self, key: String, count: usize, from_left: bool) -> Vec<String> {
let val = self.cache.remove(&key);
Expand Down
15 changes: 15 additions & 0 deletions duva/src/domains/caches/cache_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,21 @@ impl CacheManager {
let current_len = rx.await??;
Ok(IndexedValueCodec::encode(current_len, unwrap))
}
pub(crate) async fn route_rpushx(
&self,
key: String,
value: Vec<String>,
current_idx: u64,
) -> Result<String> {
let (tx, rx) = tokio::sync::oneshot::channel();

self.select_shard(&key)
.send(CacheCommand::RPushX { key, values: value, callback: tx.into() })
.await?;
let current_len = rx.await?;

Ok(IndexedValueCodec::encode(current_len, current_idx))
}

pub(crate) async fn route_rpop(&self, key: String, count: usize) -> Result<Vec<String>> {
let (tx, rx) = tokio::sync::oneshot::channel();
Expand Down
5 changes: 5 additions & 0 deletions duva/src/domains/caches/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,9 @@ pub(crate) enum CacheCommand {
value: String,
callback: Callback<anyhow::Result<()>>,
},
RPushX {
key: String,
values: Vec<String>,
callback: Callback<usize>,
},
}
3 changes: 3 additions & 0 deletions duva/src/domains/caches/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ impl CacheActor {
| CacheCommand::RPush { key, values, callback } => {
let _ = callback.send(self.rpush(key, values));
},
| CacheCommand::RPushX { key, values, callback } => {
let _ = callback.send(self.rpushx(key, values));
},
| CacheCommand::RPop { key, count, callback } => {
let _ = callback.send(self.pop(key, count, false));
},
Expand Down
4 changes: 3 additions & 1 deletion duva/src/presentation/clients/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,9 @@ impl ClientController {
| ClientAction::RPush { key, value } => QueryIO::SimpleString(
self.cache_manager.route_rpush(key, value, current_index.unwrap()).await?.into(),
),
| ClientAction::RPushX { key, value } => todo!(),
| ClientAction::RPushX { key, value } => QueryIO::SimpleString(
self.cache_manager.route_rpushx(key, value, current_index.unwrap()).await?.into(),
),
| ClientAction::RPop { key, count } => {
let values = self.cache_manager.route_rpop(key, count).await?;
if values.is_empty() {
Expand Down
1 change: 0 additions & 1 deletion duva/src/presentation/clients/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ pub enum ClientAction {
RPushX { key: String, value: Vec<String> },
RPop { key: String, count: usize },
LSet { key: String, index: isize, value: String },

LTrim { key: String, start: isize, end: isize },
LLen { key: String },
LRange { key: String, start: isize, end: isize },
Expand Down
1 change: 1 addition & 0 deletions duva/tests/client_ops/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ mod test_ltrim;
mod test_replication_info;
mod test_rpop;
mod test_rpush;
mod test_rpushx;
mod test_set_get;
mod test_snapshot_persists_and_recovers_state;
mod test_ttl;
26 changes: 26 additions & 0 deletions duva/tests/client_ops/test_rpushx.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
// Inserts specified values at the head of the list stored at key, only if key already exists and holds a list.

use crate::common::{Client, ServerEnv, spawn_server_process};

fn run_rpushx(env: ServerEnv) -> anyhow::Result<()> {
// GIVEN
let process = spawn_server_process(&env)?;

let mut h = Client::new(process.port);

//WHEN & THEN
assert_eq!(h.send_and_get(format!("RPUSH test 1")), "(integer) 1");
assert_eq!(h.send_and_get(format!("RPUSHX test 2")), "(integer) 2");
assert_eq!(h.send_and_get(format!("RPUSHX test2 1")), "(integer) 0");

Ok(())
}

#[test]
fn test_rpushx() -> anyhow::Result<()> {
for env in [ServerEnv::default(), ServerEnv::default().with_append_only(true)] {
run_rpushx(env)?;
}

Ok(())
}
Loading