Skip to content

Commit ad46bb3

Browse files
authored
Merge pull request #740 from Migorithm/feat/739
feat: lindex
2 parents c07a009 + 0f91df9 commit ad46bb3

13 files changed

Lines changed: 108 additions & 17 deletions

File tree

duva-client/src/broker/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ impl Broker {
130130
// pull-based leader discovery
131131

132132
async fn discover_leader(&mut self) -> Result<(), IoError> {
133-
for node in self.topology.node_infos.iter().map(|n| n.peer_id.clone()).into_iter() {
133+
for node in self.topology.node_infos.iter().map(|n| n.peer_id.clone()) {
134134
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
135135
println!("Trying to connect to node: {node}...");
136136

duva-client/src/controller.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ impl<T> ClientController<T> {
4242
match kind {
4343
| Ping
4444
| Get { .. }
45+
| LIndex { .. }
4546
| IndexGet { .. }
4647
| Echo { .. }
4748
| Config { .. }

duva/src/domains/caches/actor.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,20 @@ impl CacheActor {
206206
| _ => Err(anyhow::anyhow!(WRONG_TYPE_ERR_MSG)),
207207
}
208208
}
209+
210+
pub(crate) fn lindex(&mut self, key: String, index: isize) -> anyhow::Result<CacheValue> {
211+
let Some(CacheValue { value, .. }) = self.cache.get_mut(&key) else {
212+
return Ok(CacheValue::new(TypedValue::Null));
213+
};
214+
match value {
215+
| TypedValue::List(list) => Ok(list
216+
.lindex(index)
217+
.map(|v| CacheValue::new(TypedValue::String(v)))
218+
.unwrap_or_default()),
219+
220+
| _ => Err(anyhow::anyhow!(WRONG_TYPE_ERR_MSG)),
221+
}
222+
}
209223
}
210224

211225
#[derive(Clone, Debug)]

duva/src/domains/caches/cache_manager.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -413,6 +413,15 @@ impl CacheManager {
413413

414414
Ok(IndexedValueCodec::encode("".to_string(), current_idx))
415415
}
416+
417+
pub(crate) async fn route_lindex(&self, key: String, index: isize) -> Result<CacheValue> {
418+
let (tx, rx) = tokio::sync::oneshot::channel();
419+
self.select_shard(&key)
420+
.send(CacheCommand::LIndex { key, index, callback: tx.into() })
421+
.await?;
422+
let value = rx.await??;
423+
Ok(value)
424+
}
416425
}
417426

418427
pub struct IndexedValueCodec;

duva/src/domains/caches/cache_objects/types/quicklist.rs

Lines changed: 34 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -294,11 +294,11 @@ impl QuickList {
294294
}
295295

296296
let max_bytes = kb * 1024;
297+
let merge_threshold = max_bytes / 4;
297298

298299
let should_merge = {
299300
let node = &self.nodes[index];
300301
let next = &self.nodes[index + 1];
301-
let merge_threshold = max_bytes / 4;
302302

303303
(node.entry_count > 0 && next.entry_count > 0)
304304
&& (node.byte_size() < merge_threshold || next.byte_size() < merge_threshold)
@@ -408,10 +408,13 @@ impl QuickList {
408408
pub fn compress(&mut self) {
409409
let node_count = self.nodes.len();
410410
if self.compress_depth > 0 && node_count > self.compress_depth * 2 {
411-
for i in self.compress_depth..(node_count - self.compress_depth) {
412-
if let Some(node) = self.nodes.get_mut(i) {
413-
node.try_compress();
414-
}
411+
for node in self
412+
.nodes
413+
.iter_mut()
414+
.skip(self.compress_depth)
415+
.take(node_count - self.compress_depth * 2)
416+
{
417+
node.try_compress();
415418
}
416419
}
417420
}
@@ -502,6 +505,32 @@ impl QuickList {
502505
self.rpush(element);
503506
}
504507
}
508+
509+
pub(crate) fn lindex(&mut self, index: isize) -> Option<Bytes> {
510+
if self.len == 0 {
511+
return None;
512+
}
513+
514+
// Calculate absolute index
515+
let len = self.len as isize;
516+
let index = if index < 0 { (len + index).max(0) } else { index } as usize;
517+
518+
if index >= self.len {
519+
return None; // Out of bounds
520+
}
521+
522+
let mut current_index = 0;
523+
for node in &mut self.nodes {
524+
if current_index + node.entry_count > index {
525+
node.ensure_decompressed(&self.fill_factor);
526+
if let NodeData::Uncompressed(ziplist) = &node.data {
527+
return ziplist.to_vec().get(index - current_index).cloned();
528+
}
529+
}
530+
current_index += node.entry_count;
531+
}
532+
None
533+
}
505534
}
506535

507536
#[cfg(test)]

duva/src/domains/caches/command.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,4 +85,9 @@ pub(crate) enum CacheCommand {
8585
end: isize,
8686
callback: Callback<anyhow::Result<()>>,
8787
},
88+
LIndex {
89+
key: String,
90+
index: isize,
91+
callback: Callback<anyhow::Result<CacheValue>>,
92+
},
8893
}

duva/src/domains/caches/service.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,9 @@ impl CacheActor {
9191
| CacheCommand::LTrim { key, start, end, callback } => {
9292
let _ = callback.send(self.ltrim(key, start, end));
9393
},
94+
| CacheCommand::LIndex { key, index, callback } => {
95+
let _ = callback.send(self.lindex(key, index));
96+
},
9497
}
9598
}
9699
Ok(self)

duva/src/domains/cluster_actors/topology.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ impl NodeReplInfo {
3333
}
3434
}
3535

36-
pub fn from_replication_state(replication_state: &ReplicationState) -> Self {
36+
pub(crate) fn from_replication_state(replication_state: &ReplicationState) -> Self {
3737
Self {
3838
peer_id: replication_state.self_identifier(),
3939
repl_id: replication_state.replid.clone(),

duva/src/domains/query_io.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -467,9 +467,7 @@ mod test {
467467
use super::*;
468468
use crate::domains::caches::cache_objects::CacheEntry;
469469
use crate::domains::cluster_actors::hash_ring::{BatchId, HashRing};
470-
use crate::domains::cluster_actors::replication::{
471-
ReplicationId, ReplicationRole, ReplicationState,
472-
};
470+
use crate::domains::cluster_actors::replication::{ReplicationId, ReplicationRole};
473471
use crate::domains::cluster_actors::topology::NodeReplInfo;
474472
use crate::domains::operation_logs::WriteRequest;
475473
use crate::domains::peers::command::BannedPeer;

duva/src/presentation/clients/controller.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -184,8 +184,6 @@ impl ClientController {
184184
| ClientAction::RPush { key, value } => QueryIO::SimpleString(
185185
self.cache_manager.route_rpush(key, value, current_index.unwrap()).await?.into(),
186186
),
187-
188-
//TODO
189187
| ClientAction::RPushX { key, value } => todo!(),
190188
| ClientAction::RPop { key, count } => {
191189
let values = self.cache_manager.route_rpop(key, count).await?;
@@ -208,6 +206,9 @@ impl ClientController {
208206
let values = self.cache_manager.route_lrange(key, start, end).await?;
209207
QueryIO::Array(values.into_iter().map(|v| QueryIO::BulkString(v.into())).collect())
210208
},
209+
| ClientAction::LIndex { key, index } => {
210+
self.cache_manager.route_lindex(key, index).await?.into()
211+
},
211212
};
212213

213214
Ok(response)

0 commit comments

Comments
 (0)