Skip to content

Commit 645b913

Browse files
committed
merge conflict resolved
2 parents 50d3cb8 + 8634269 commit 645b913

8 files changed

Lines changed: 221 additions & 177 deletions

File tree

duva-client/src/broker/mod.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -142,9 +142,7 @@ impl Broker {
142142
// ! If request is updating action yet receive error, we need to increase the request id
143143
// ! otherwise, server will not be able to process the next command
144144
fn extract_req_id(request_id: u64, kind: &ClientAction, query_io: &QueryIO) -> Option<u64> {
145-
if !kind.consensus_required() {
146-
return None;
147-
}
145+
kind.to_write_request()?;
148146
match query_io {
149147
// * Current rule: s:value-idx:index_num
150148
| QueryIO::SimpleString(v) => {

duva-client/src/cli/editor/completion.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@ pub(crate) static COMMANDS: &[&str] = &[
3232
"cluster reshard",
3333
"info replication",
3434
"replicaof",
35+
"lpush",
36+
"lpushx",
37+
"rpush",
38+
"rpushx",
3539
];
3640

3741
macro_rules! new_pair {
@@ -143,6 +147,15 @@ impl Completer for DuvaHinter {
143147
candidates.push(new_pair!("key"));
144148
}
145149
},
150+
| "lpush" | "lpushx" | "rpush" | "rpushx" => {
151+
if previous_words.len() == 1 {
152+
// Suggest "key" after set
153+
candidates.push(new_pair!("key"));
154+
} else if previous_words.len() > 1 {
155+
// Suggest "value" after set key
156+
candidates.push(new_pair!("value"));
157+
}
158+
},
146159
| "get" | "incr" | "decr" | "ttl" => {
147160
if previous_words.len() == 1 {
148161
// Suggest "index" after get key

duva-client/src/cli/editor/hints.rs

Lines changed: 100 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@ pub struct CommandHint {
1515
}
1616

1717
impl CommandHint {
18-
fn new(text: &str, complete_up_to: &str) -> Self {
19-
assert!(text.starts_with(complete_up_to));
20-
Self { display: text.into(), complete_up_to: complete_up_to.len() }
18+
fn new(text: &str) -> Self {
19+
let first = text.split_whitespace().next().unwrap_or("");
20+
Self { display: text.into(), complete_up_to: first.len() + 1 }
2121
}
2222

2323
fn suffix(&self, strip_chars: usize) -> Self {
@@ -55,91 +55,93 @@ impl Hinter for DuvaHinter {
5555

5656
let command = parts[0];
5757
let args_count = parts.len() - 1;
58-
let ends_with_space = line.ends_with(" ");
59-
60-
if let Some(patterns) = self.dynamic_hints.get(command.to_lowercase().as_str()) {
61-
for hint in patterns {
62-
// Skip if we have more args than required (unless it's a repeating pattern)
63-
if args_count > hint.args_required && !hint.repeat_last_arg {
64-
continue;
65-
}
66-
67-
// For repeating patterns, show the hint after reaching minimum args
68-
if hint.repeat_last_arg && args_count >= hint.args_required {
69-
let hint_text = if hint.args_required == 0 && args_count == 0 {
70-
hint.hint_text
71-
} else {
72-
// After first argument, show just the repeating part
73-
hint.hint_text
74-
.split_once(' ')
75-
.map(|(_, rest)| rest)
76-
.unwrap_or(hint.hint_text)
77-
};
78-
79-
return Some(CommandHint::new(
80-
if ends_with_space {
81-
hint_text.to_string()
82-
} else {
83-
format!(" {hint_text}")
84-
}
85-
.as_str(),
86-
"",
87-
));
88-
}
89-
90-
// Original behavior for non-repeating patterns
91-
if args_count == hint.args_required {
92-
return Some(CommandHint::new(
93-
if ends_with_space {
94-
hint.hint_text.to_string()
95-
} else {
96-
format!(" {}", hint.hint_text)
97-
}
98-
.as_str(),
99-
"",
100-
));
101-
}
102-
}
58+
59+
let needs_space = !line.ends_with(" ");
60+
61+
if let Some(hint) = self.get_dynamic_hint(command, args_count, needs_space) {
62+
return Some(hint);
10363
}
10464

105-
// Default behavior - try to match against full hints
106-
let mut matching_hints =
107-
self.default_hints
108-
.iter()
109-
.filter_map(|hint| {
110-
if hint.display.starts_with(line) { Some(hint.suffix(pos)) } else { None }
111-
})
112-
.collect::<Vec<_>>();
65+
// Fall back to default hints
66+
self.get_default_hint(line, pos)
67+
}
68+
}
69+
70+
impl DuvaHinter {
71+
fn get_dynamic_hint(
72+
&self,
73+
command: &str,
74+
args_count: usize,
75+
needs_space: bool,
76+
) -> Option<CommandHint> {
77+
let patterns = self.dynamic_hints.get(command.to_lowercase().as_str())?;
78+
79+
for hint in patterns {
80+
let hint_text = if hint.repeat_last_arg && args_count >= hint.args_required {
81+
// Show repeating pattern
82+
self.get_repeating_hint_text(hint, args_count)
83+
} else if args_count == hint.args_required && !hint.repeat_last_arg {
84+
// Show exact match
85+
hint.hint_text
86+
} else {
87+
continue;
88+
};
89+
90+
let hint_text = if needs_space { &format!(" {hint_text}") } else { hint_text };
91+
return Some(CommandHint::new(hint_text));
92+
}
93+
94+
None
95+
}
96+
97+
fn get_repeating_hint_text(&self, hint: &DynamicHint, args_count: usize) -> &str {
98+
if hint.args_required == 0 && args_count == 0 {
99+
hint.hint_text
100+
} else {
101+
hint.hint_text.split_once(' ').map(|(_, rest)| rest).unwrap_or(hint.hint_text)
102+
}
103+
}
113104

114-
matching_hints.sort_by(|a, b| a.display.len().cmp(&b.display.len()));
115-
matching_hints.into_iter().next()
105+
fn get_default_hint(&self, line: &str, pos: usize) -> Option<CommandHint> {
106+
self.default_hints
107+
.iter()
108+
.filter_map(
109+
|hint| {
110+
if hint.display.starts_with(line) { Some(hint.suffix(pos)) } else { None }
111+
},
112+
)
113+
.min_by_key(|hint| hint.display.len())
116114
}
117115
}
118116

119117
pub(crate) fn default_hints() -> HashSet<CommandHint> {
120118
let mut set = HashSet::new();
121-
set.insert(CommandHint::new("get key", "get "));
122-
set.insert(CommandHint::new("set key value", "set "));
123-
set.insert(CommandHint::new("set key value [px expr]", "set "));
124-
set.insert(CommandHint::new("append key value", "append "));
125-
set.insert(CommandHint::new("incr key", "incr "));
126-
set.insert(CommandHint::new("incrby key value", "incrby "));
127-
set.insert(CommandHint::new("decr key", "decr "));
128-
set.insert(CommandHint::new("decrby key value", "decrby "));
129-
set.insert(CommandHint::new("cluster info", "cluster "));
130-
set.insert(CommandHint::new("cluster nodes", "cluster "));
131-
set.insert(CommandHint::new("cluster forget node", "cluster "));
132-
set.insert(CommandHint::new("cluster reshard", "cluster "));
133-
set.insert(CommandHint::new("cluster meet node [lazy|eager]", "cluster "));
134-
set.insert(CommandHint::new("ping", ""));
135-
set.insert(CommandHint::new("keys pattern", "keys "));
136-
set.insert(CommandHint::new("info [section]", ""));
137-
set.insert(CommandHint::new("info replication", ""));
138-
set.insert(CommandHint::new("exists key [key ...]", "exists "));
139-
set.insert(CommandHint::new("mget key [key ...]", "mget "));
140-
set.insert(CommandHint::new("del key [key ...]", "del "));
141-
set.insert(CommandHint::new("ttl key", "ttl "));
142-
set.insert(CommandHint::new("replicaof host port", "replicaof "));
119+
set.insert(CommandHint::new("get key"));
120+
set.insert(CommandHint::new("set key value"));
121+
set.insert(CommandHint::new("set key value [px expr]"));
122+
set.insert(CommandHint::new("append key value"));
123+
set.insert(CommandHint::new("incr key"));
124+
set.insert(CommandHint::new("incrby key value"));
125+
set.insert(CommandHint::new("decr key"));
126+
set.insert(CommandHint::new("decrby key value"));
127+
set.insert(CommandHint::new("cluster info"));
128+
set.insert(CommandHint::new("cluster nodes"));
129+
set.insert(CommandHint::new("cluster forget node"));
130+
set.insert(CommandHint::new("cluster reshard"));
131+
set.insert(CommandHint::new("cluster meet node [lazy|eager]"));
132+
set.insert(CommandHint::new("ping"));
133+
set.insert(CommandHint::new("keys pattern"));
134+
set.insert(CommandHint::new("info [section]"));
135+
set.insert(CommandHint::new("info replication"));
136+
set.insert(CommandHint::new("exists key [key ...]"));
137+
set.insert(CommandHint::new("mget key [key ...]"));
138+
set.insert(CommandHint::new("del key [key ...]"));
139+
set.insert(CommandHint::new("ttl key"));
140+
set.insert(CommandHint::new("replicaof host port"));
141+
set.insert(CommandHint::new("lpush key value [value ...]"));
142+
set.insert(CommandHint::new("lpushx key value [value ...]"));
143+
set.insert(CommandHint::new("rpush key value [value ...]"));
144+
set.insert(CommandHint::new("rpushx key value [value ...]"));
143145

144146
set
145147
}
@@ -168,6 +170,7 @@ pub(crate) fn dynamic_hints() -> HashMap<&'static str, Vec<DynamicHint>> {
168170
vec![hint!("key value", 0), hint!("value", 1), hint!("[px expr]", 2), hint!("expr", 3)],
169171
);
170172
map.insert("append", vec![hint!("key value", 0), hint!("value", 1)]);
173+
171174
map.insert("incrby", vec![hint!("key increment", 0), hint!("increment", 1)]);
172175
map.insert("decrby", vec![hint!("key decrement", 0), hint!("decrement", 1)]);
173176

@@ -178,6 +181,23 @@ pub(crate) fn dynamic_hints() -> HashMap<&'static str, Vec<DynamicHint>> {
178181
map.insert("exists", vec![hint!("key [key ...]", 0, repeat), hint!("[key ...]", 1, repeat)]);
179182
map.insert("del", vec![hint!("key [key ...]", 0, repeat), hint!("[key ...]", 1, repeat)]);
180183
map.insert("mget", vec![hint!("key [key ...]", 0, repeat), hint!("[key ...]", 1, repeat)]);
184+
map.insert(
185+
"lpush",
186+
vec![hint!("key value [value ...]", 0, repeat), hint!("[value ...]", 1, repeat)],
187+
);
188+
map.insert(
189+
"lpushx",
190+
vec![hint!("key value [value ...]", 0, repeat), hint!("[value ...]", 1, repeat)],
191+
);
192+
map.insert(
193+
"rpush",
194+
vec![hint!("key value [value ...]", 0, repeat), hint!("[value ...]", 1, repeat)],
195+
);
196+
197+
map.insert(
198+
"rpushx",
199+
vec![hint!("key value [value ...]", 0, repeat), hint!("[value ...]", 1, repeat)],
200+
);
181201
map.insert("replicaof", vec![hint!("host port", 0), hint!("port", 1)]);
182202

183203
map

duva/src/domains/caches/cache_manager.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,12 @@ impl CacheManager {
216216
| WriteRequest::LSet { key, index, value } => {
217217
self.route_lset(key, index, value, log_index).await?;
218218
},
219+
| WriteRequest::RPop { key, count } => {
220+
self.route_rpop(key, count).await?;
221+
},
222+
| WriteRequest::RPushX { key, value } => {
223+
self.route_rpushx(key, value, log_index).await?;
224+
},
219225
};
220226

221227
// * This is to wake up the cache actors to process the pending read requests

duva/src/domains/operation_logs/operation.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,13 @@ pub enum WriteRequest {
2222
Decr { key: String, delta: i64 },
2323
Incr { key: String, delta: i64 },
2424
LPush { key: String, value: Vec<String> },
25+
LPushX { key: String, value: Vec<String> },
2526
LPop { key: String, count: usize },
2627
RPush { key: String, value: Vec<String> },
28+
RPop { key: String, count: usize },
29+
RPushX { key: String, value: Vec<String> },
2730
LTrim { key: String, start: isize, end: isize },
28-
LPushX { key: String, value: Vec<String> },
31+
2932
LSet { key: String, index: isize, value: String },
3033
}
3134

@@ -68,6 +71,8 @@ impl WriteRequest {
6871
| WriteRequest::LTrim { key, .. } => vec![key],
6972
| WriteRequest::LPushX { key, .. } => vec![key],
7073
| WriteRequest::LSet { key, .. } => vec![key],
74+
| WriteRequest::RPop { key, .. } => vec![key],
75+
| WriteRequest::RPushX { key, .. } => vec![key],
7176
}
7277
}
7378
}

duva/src/presentation/clients/controller.rs

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
1-
use super::request::ClientRequest;
21
use crate::config::ENV;
32
use crate::domains::QueryIO;
43
use crate::domains::caches::cache_manager::CacheManager;
54
use crate::domains::caches::cache_objects::{CacheEntry, CacheValue, TypedValue};
6-
use crate::domains::cluster_actors::{ClientMessage, ConsensusClientResponse, ConsensusRequest};
5+
use crate::domains::cluster_actors::{
6+
ClientMessage, ConsensusClientResponse, ConsensusRequest, SessionRequest,
7+
};
8+
use crate::domains::operation_logs::WriteRequest;
79
use crate::domains::saves::actor::SaveTarget;
810
use crate::prelude::PeerIdentifier;
911
use crate::presentation::clients::request::ClientAction;
@@ -224,25 +226,28 @@ impl ClientController {
224226

225227
pub(crate) async fn make_consensus(
226228
&self,
227-
request: ClientRequest,
228-
) -> anyhow::Result<(ClientAction, u64)> {
229+
session_req: SessionRequest,
230+
write_req: WriteRequest,
231+
cli_action: &mut ClientAction,
232+
) -> anyhow::Result<u64> {
229233
let (tx, consensus_res) = tokio::sync::oneshot::channel();
230234

231235
self.cluster_communication_manager
232236
.send(ClientMessage::LeaderReqConsensus(ConsensusRequest::new(
233-
request.action.clone().to_write_request(),
237+
write_req,
234238
tx,
235-
Some(request.session_req),
239+
Some(session_req),
236240
)))
237241
.await?;
238242

239243
match consensus_res.await? {
240244
| ConsensusClientResponse::AlreadyProcessed { key: keys, index } => {
241245
// * Conversion! request has already been processed so we need to convert it to get
242246
let action = ClientAction::MGet { keys };
243-
Ok((action, index))
247+
*cli_action = action;
248+
Ok(index)
244249
},
245-
| ConsensusClientResponse::LogIndex(idx) => Ok((request.action, idx)),
250+
| ConsensusClientResponse::LogIndex(idx) => Ok(idx),
246251
| ConsensusClientResponse::Err(error_msg) => Err(anyhow::anyhow!(error_msg)),
247252
}
248253
}

0 commit comments

Comments
 (0)