Skip to content

Commit 64222b1

Browse files
authored
Merge pull request #743 from Migorithm/feat/742
feat: lset
2 parents ad46bb3 + 630950a commit 64222b1

11 files changed

Lines changed: 183 additions & 31 deletions

File tree

duva-client/src/controller.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ impl<T> ClientController<T> {
9696
};
9797
Response::Null
9898
},
99-
| Set { .. } | SetWithExpiry { .. } | LTrim { .. } => match query_io {
99+
| Set { .. } | SetWithExpiry { .. } | LTrim { .. } | LSet { .. } => match query_io {
100100
| QueryIO::SimpleString(_) => Response::String("OK".into()),
101101
| QueryIO::Err(value) => Response::Error(value),
102102
| _ => Response::FormatError,

duva/src/domains/caches/actor.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,22 @@ impl CacheActor {
220220
| _ => Err(anyhow::anyhow!(WRONG_TYPE_ERR_MSG)),
221221
}
222222
}
223+
224+
pub(crate) fn lset(
225+
&mut self,
226+
key: String,
227+
index: isize,
228+
val: String,
229+
) -> Result<(), anyhow::Error> {
230+
let Some(var) = self.cache.get_mut(&key) else {
231+
return Err(anyhow::anyhow!("ERR no such key"));
232+
};
233+
let CacheValue { value: TypedValue::List(list), .. } = var else {
234+
return Err(anyhow::anyhow!(WRONG_TYPE_ERR_MSG));
235+
};
236+
237+
list.lset(index, val)
238+
}
223239
}
224240

225241
#[derive(Clone, Debug)]

duva/src/domains/caches/cache_manager.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,9 @@ impl CacheManager {
198198
| WriteRequest::LPushX { key, value } => {
199199
self.route_lpushx(key, value, log_index).await?;
200200
},
201+
| WriteRequest::LSet { key, index, value } => {
202+
self.route_lset(key, index, value, log_index).await?;
203+
},
201204
};
202205

203206
// * This is to wake up the cache actors to process the pending read requests
@@ -422,6 +425,21 @@ impl CacheManager {
422425
let value = rx.await??;
423426
Ok(value)
424427
}
428+
429+
pub(crate) async fn route_lset(
430+
&self,
431+
key: String,
432+
index: isize,
433+
value: String,
434+
current_idx: u64,
435+
) -> Result<String> {
436+
let (tx, rx) = tokio::sync::oneshot::channel();
437+
self.select_shard(key.as_str())
438+
.send(CacheCommand::LSet { key, index, value, callback: tx.into() })
439+
.await?;
440+
rx.await??;
441+
Ok(IndexedValueCodec::encode("", current_idx))
442+
}
425443
}
426444

427445
pub struct IndexedValueCodec;

duva/src/domains/caches/cache_objects/types/quicklist.rs

Lines changed: 80 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ struct QuickListNode {
123123

124124
impl QuickListNode {
125125
/// Ensures the node is decompressed. Must be called before any read/write.
126-
fn ensure_decompressed(&mut self, fill_factor: &FillFactor) {
126+
fn ensure_decompressed(&mut self, fill_factor: &FillFactor) -> &mut Ziplist {
127127
let current_data =
128128
std::mem::replace(&mut self.data, NodeData::Uncompressed(Ziplist::default()));
129129
self.data = match current_data {
@@ -138,6 +138,9 @@ impl QuickListNode {
138138
},
139139
| uncompressed => uncompressed,
140140
};
141+
142+
let NodeData::Uncompressed(ziplist) = &mut self.data else { panic!() };
143+
ziplist
141144
}
142145

143146
/// Attempts to compress the node.
@@ -308,21 +311,13 @@ impl QuickList {
308311
if should_merge {
309312
// First, remove the next node from the list.
310313
let mut removed_node = self.nodes.remove(index + 1).unwrap();
311-
removed_node.ensure_decompressed(&self.fill_factor);
314+
let removed_ziplist = removed_node.ensure_decompressed(&self.fill_factor);
312315

313316
// Now, get mutable access to the current node to merge into.
314317
let current_node = &mut self.nodes[index];
315-
current_node.ensure_decompressed(&self.fill_factor);
316318

317-
// Use pattern matching to get the ziplists and perform the merge.
318-
if let (
319-
NodeData::Uncompressed(current_ziplist),
320-
NodeData::Uncompressed(removed_ziplist),
321-
) = (&mut current_node.data, &mut removed_node.data)
322-
{
323-
current_ziplist.extend_from_slice(removed_ziplist);
324-
current_node.entry_count += removed_node.entry_count;
325-
}
319+
current_node.ensure_decompressed(&self.fill_factor).extend_from_slice(removed_ziplist);
320+
current_node.entry_count += removed_node.entry_count;
326321

327322
self.return_node(removed_node);
328323
}
@@ -451,21 +446,21 @@ impl QuickList {
451446
}
452447

453448
// 3. Decompress the node and read its entries
454-
node.ensure_decompressed(&self.fill_factor);
455-
if let NodeData::Uncompressed(ziplist) = &node.data {
456-
let entries = ziplist.to_vec();
457-
for (i, entry) in entries.iter().enumerate() {
458-
let overall_index = current_index + i;
459-
if overall_index >= start && overall_index <= stop {
460-
// Clone the Bytes object and add it to our result.
461-
// Cloning is cheap (reference counted).
462-
result.push(entry.clone());
463-
}
464-
if overall_index >= stop {
465-
break;
466-
}
449+
let ziplist = node.ensure_decompressed(&self.fill_factor);
450+
451+
let entries = ziplist.to_vec();
452+
for (i, entry) in entries.iter().enumerate() {
453+
let overall_index = current_index + i;
454+
if overall_index >= start && overall_index <= stop {
455+
// Clone the Bytes object and add it to our result.
456+
// Cloning is cheap (reference counted).
457+
result.push(entry.clone());
458+
}
459+
if overall_index >= stop {
460+
break;
467461
}
468462
}
463+
469464
current_index += node_len;
470465
}
471466
result
@@ -522,15 +517,71 @@ impl QuickList {
522517
let mut current_index = 0;
523518
for node in &mut self.nodes {
524519
if current_index + node.entry_count > index {
525-
node.ensure_decompressed(&self.fill_factor);
526-
if let NodeData::Uncompressed(ziplist) = &node.data {
527-
return ziplist.to_vec().get(index - current_index).cloned();
528-
}
520+
let ziplist = node.ensure_decompressed(&self.fill_factor);
521+
return ziplist.to_vec().get(index - current_index).cloned();
529522
}
530523
current_index += node.entry_count;
531524
}
532525
None
533526
}
527+
528+
pub(crate) fn lset(&mut self, index: isize, value: String) -> anyhow::Result<()> {
529+
if self.len == 0 {
530+
return Err(anyhow::anyhow!("List is empty"));
531+
}
532+
533+
// Convert to absolute index
534+
let len = self.len as isize;
535+
let abs_index = if index < 0 { (len + index).max(0) } else { index } as usize;
536+
537+
if abs_index >= self.len {
538+
return Err(anyhow::anyhow!("Index out of bounds"));
539+
}
540+
541+
// Find the node containing this index
542+
let mut current_index = 0;
543+
544+
for node in &mut self.nodes {
545+
if current_index + node.entry_count <= abs_index {
546+
current_index += node.entry_count;
547+
continue;
548+
}
549+
550+
let local_index = abs_index - current_index;
551+
552+
let ziplist = node.ensure_decompressed(&self.fill_factor);
553+
let new_bytes = Bytes::from(value);
554+
let mut cursor = 0;
555+
556+
// Skip to the target entry
557+
for _ in 0..local_index {
558+
let entry_len =
559+
u32::from_le_bytes(ziplist[cursor..cursor + 4].try_into().unwrap()) as usize;
560+
cursor += 4 + entry_len;
561+
}
562+
563+
let old_entry_len =
564+
u32::from_le_bytes(ziplist[cursor..cursor + 4].try_into().unwrap()) as usize;
565+
566+
if new_bytes.len() == old_entry_len {
567+
// Same size - overwrite in place
568+
ziplist[cursor + 4..cursor + 4 + old_entry_len].copy_from_slice(&new_bytes);
569+
} else {
570+
// Different size - rebuild ziplist
571+
let mut new_ziplist =
572+
Vec::with_capacity(ziplist.len() - old_entry_len + new_bytes.len());
573+
new_ziplist.extend_from_slice(&ziplist[0..cursor]);
574+
new_ziplist.extend_from_slice(&(new_bytes.len() as u32).to_le_bytes());
575+
new_ziplist.extend_from_slice(&new_bytes);
576+
new_ziplist.extend_from_slice(&ziplist[cursor + 4 + old_entry_len..]);
577+
ziplist.0 = new_ziplist;
578+
}
579+
580+
return Ok(());
581+
}
582+
583+
Err(anyhow::anyhow!("Index not found"))
584+
}
534585
}
535586

536587
#[cfg(test)]

duva/src/domains/caches/command.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,4 +90,10 @@ pub(crate) enum CacheCommand {
9090
index: isize,
9191
callback: Callback<anyhow::Result<CacheValue>>,
9292
},
93+
LSet {
94+
key: String,
95+
index: isize,
96+
value: String,
97+
callback: Callback<anyhow::Result<()>>,
98+
},
9399
}

duva/src/domains/caches/service.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,9 @@ impl CacheActor {
9494
| CacheCommand::LIndex { key, index, callback } => {
9595
let _ = callback.send(self.lindex(key, index));
9696
},
97+
| CacheCommand::LSet { key, index, value, callback } => {
98+
let _ = callback.send(self.lset(key, index, value));
99+
},
97100
}
98101
}
99102
Ok(self)

duva/src/domains/operation_logs/operation.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ pub enum WriteRequest {
2626
RPush { key: String, value: Vec<String> },
2727
LTrim { key: String, start: isize, end: isize },
2828
LPushX { key: String, value: Vec<String> },
29+
LSet { key: String, index: isize, value: String },
2930
}
3031

3132
impl WriteOperation {
@@ -66,6 +67,7 @@ impl WriteRequest {
6667
| WriteRequest::RPush { key, .. } => vec![key],
6768
| WriteRequest::LTrim { key, .. } => vec![key],
6869
| WriteRequest::LPushX { key, .. } => vec![key],
70+
| WriteRequest::LSet { key, .. } => vec![key],
6971
}
7072
}
7173
}

duva/src/presentation/clients/controller.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,12 @@ impl ClientController {
209209
| ClientAction::LIndex { key, index } => {
210210
self.cache_manager.route_lindex(key, index).await?.into()
211211
},
212+
| ClientAction::LSet { key, index, value } => QueryIO::SimpleString(
213+
self.cache_manager
214+
.route_lset(key, index, value, current_index.unwrap())
215+
.await?
216+
.into(),
217+
),
212218
};
213219

214220
Ok(response)

duva/src/presentation/clients/request.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ pub enum ClientAction {
4343
RPush { key: String, value: Vec<String> },
4444
RPushX { key: String, value: Vec<String> },
4545
RPop { key: String, count: usize },
46+
LSet { key: String, index: isize, value: String },
4647

4748
LTrim { key: String, start: isize, end: isize },
4849
LLen { key: String },
@@ -78,7 +79,7 @@ impl ClientAction {
7879
| ClientAction::RPushX { key, value } => WriteRequest::RPush { key, value },
7980
| ClientAction::RPop { key, count } => WriteRequest::LPop { key, count },
8081
| ClientAction::LTrim { key, start, end } => WriteRequest::LTrim { key, start, end },
81-
82+
| ClientAction::LSet { key, index, value } => WriteRequest::LSet { key, index, value },
8283
| _ => {
8384
debug_assert!(false, "to_write_request called on non-write action: {self:?}");
8485
unreachable!(
@@ -107,6 +108,7 @@ impl ClientAction {
107108
| ClientAction::RPushX { .. }
108109
| ClientAction::RPop { .. }
109110
| ClientAction::LTrim { .. }
111+
| ClientAction::LSet { .. }
110112
)
111113
}
112114
}
@@ -385,6 +387,13 @@ pub fn extract_action(action: &str, args: &[&str]) -> anyhow::Result<ClientActio
385387
let index = args[1].parse::<isize>().context("Invalid index")?;
386388
Ok(ClientAction::LIndex { key, index })
387389
},
390+
| "LSET" => {
391+
require_exact_args(3)?;
392+
let key = args[0].to_string();
393+
let index = args[1].parse::<isize>().context("Invalid index")?;
394+
let value = args[2].to_string();
395+
Ok(ClientAction::LSet { key, index, value })
396+
},
388397

389398
// Add other commands as needed
390399
| unknown_cmd => Err(anyhow::anyhow!(

duva/tests/client_ops/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ mod test_lpop;
1414
mod test_lpush;
1515
mod test_lpushx;
1616
mod test_lrange;
17+
mod test_lset;
1718
mod test_ltrim;
1819
mod test_replication_info;
1920
mod test_rpop;

0 commit comments

Comments
 (0)