Skip to content

Commit f6e33f9

Browse files
authored
Merge pull request #890 from Migorithm/feat/client-response2
feat: expressive server response
2 parents cbd0244 + a1f0625 commit f6e33f9

26 files changed

Lines changed: 307 additions & 428 deletions

File tree

duva-client/src/broker/mod.rs

Lines changed: 25 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,11 @@ use duva::prelude::tokio::sync::mpsc::Receiver;
1414
use duva::prelude::tokio::sync::mpsc::Sender;
1515
use duva::prelude::uuid::Uuid;
1616
use duva::prelude::{
17-
BinBytes, ConnectionRequest, ConnectionRequests, ConnectionResponse, ConnectionResponses,
17+
ConnectionRequest, ConnectionRequests, ConnectionResponse, ConnectionResponses,
1818
};
1919
use duva::prelude::{PeerIdentifier, tokio};
2020
use duva::prelude::{Topology, anyhow};
21-
use duva::presentation::clients::request::{ClientAction, NonMutatingAction};
21+
use duva::presentation::clients::request::{ClientAction, NonMutatingAction, ServerResponse};
2222
use futures::future::try_join_all;
2323

2424
use duva::prelude::anyhow::anyhow;
@@ -67,18 +67,19 @@ impl Broker {
6767
let mut queue = CommandQueue::default();
6868
while let Some(msg) = self.rx.recv().await {
6969
match msg {
70-
BrokerMessage::FromServer(_, QueryIO::TopologyChange(topology)) => {
70+
BrokerMessage::FromServer(_, ServerResponse::TopologyChange(topology)) => {
7171
self.update_topology(topology).await;
7272
},
7373

74-
BrokerMessage::FromServer(repl_id, query_io) => {
74+
BrokerMessage::FromServer(repl_id, res) => {
7575
let Some(context) = queue.pop() else {
7676
continue;
7777
};
78+
7879
if matches!(context.client_action, ClientAction::Mutating(..)) {
79-
self.update_reqid(repl_id, &query_io);
80+
self.update_reqid(repl_id, &res);
8081
}
81-
queue.finalize_or_requeue(query_io, context);
82+
queue.finalize_or_requeue(res, context);
8283
},
8384

8485
BrokerMessage::FromServerError(repl_id, e) => {
@@ -97,9 +98,10 @@ impl Broker {
9798
context.expected_result_cnt = result_count;
9899
queue.push(context);
99100
} else {
100-
context.callback(QueryIO::Err(BinBytes::new(
101-
"Failed to route command. Try again after ttl time",
102-
)));
101+
context.callback(ServerResponse::Err {
102+
reason: "Failed to route command. Try again after ttl time".to_string(),
103+
request_id: 0,
104+
})
103105
};
104106
},
105107
}
@@ -284,22 +286,26 @@ impl Broker {
284286
}
285287
}
286288

287-
pub(crate) fn update_reqid(&mut self, replid: ReplicationId, res: &QueryIO) {
289+
pub(crate) fn update_reqid(&mut self, replid: ReplicationId, res: &ServerResponse) {
288290
if let Some(connection) = self.node_connections.get_mut(&replid) {
289291
// ! CONSIDER IDEMPOTENCY RULE
290292
// !
291293
// ! If request is updating action yet receive error, we need to increase the request id
292294
// ! otherwise, server will not be able to process the next command
295+
293296
match res {
294-
// * Current rule: s:value-idx:index_num
295-
QueryIO::SimpleString(v) => {
296-
let s = String::from_utf8_lossy(v);
297-
connection.request_id = IndexedValueCodec::decode_index(s)
298-
.filter(|&id| id > connection.request_id)
299-
.unwrap_or(connection.request_id);
297+
ServerResponse::WriteRes { res, .. } | ServerResponse::ReadRes { res, .. } => {
298+
if let QueryIO::BulkString(v) = res {
299+
let s = String::from_utf8_lossy(v);
300+
connection.request_id = IndexedValueCodec::decode_index(s)
301+
.filter(|&id| id > connection.request_id)
302+
.unwrap_or(connection.request_id);
303+
}
300304
},
301-
//TODO replace "self.request_id + 1" - make the call to get "current_index" from the server
302-
QueryIO::Err(_) => connection.request_id += 1,
305+
ServerResponse::Err { .. } => {
306+
connection.request_id += 1;
307+
},
308+
303309
_ => {},
304310
}
305311
} else {
@@ -312,7 +318,7 @@ impl Broker {
312318
}
313319

314320
pub enum BrokerMessage {
315-
FromServer(ReplicationId, QueryIO),
321+
FromServer(ReplicationId, ServerResponse),
316322
FromServerError(ReplicationId, IoError),
317323
ToServer(InputContext),
318324
}

duva-client/src/broker/node_connections.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,19 @@
11
use super::write_stream::MsgToServer;
2-
use duva::domains::query_io::QueryIO::SessionRequest;
2+
use duva::domains::query_io::SERDE_CONFIG;
33
use duva::domains::replications::ReplicationId;
44
use duva::make_smart_pointer;
55
use duva::prelude::PeerIdentifier;
66
use duva::prelude::anyhow;
77
use duva::prelude::anyhow::Context;
8+
use duva::prelude::bincode;
89
use duva::prelude::rand;
910
use duva::prelude::rand::SeedableRng;
1011
use duva::prelude::rand::rngs::StdRng;
1112
use duva::prelude::rand::seq::IteratorRandom;
1213
use duva::prelude::tokio::sync::mpsc;
1314
use duva::prelude::tokio::sync::oneshot;
1415
use duva::presentation::clients::request::ClientAction;
16+
use duva::presentation::clients::request::SessionRequest;
1517
use futures::future::join_all;
1618
use futures::future::try_join_all;
1719
use std::collections::HashMap;
@@ -96,7 +98,7 @@ impl NodeConnection {
9698
pub(crate) async fn send(&self, client_action: ClientAction) -> anyhow::Result<()> {
9799
let session_request = SessionRequest { request_id: self.request_id, action: client_action };
98100
self.writer
99-
.send(MsgToServer::Command(session_request.serialize().to_vec()))
101+
.send(MsgToServer::Command(bincode::encode_to_vec(session_request, SERDE_CONFIG)?))
100102
.await
101103
.context("Failed to send commend")
102104
}

duva-client/src/broker/read_stream.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,7 @@
11
use crate::broker::BrokerMessage;
2+
use duva::domains::TSerdeRead;
23
use duva::domains::replications::ReplicationId;
3-
use duva::{
4-
domains::interface::TRead,
5-
prelude::tokio::{self, net::tcp::OwnedReadHalf, sync::oneshot},
6-
};
4+
use duva::prelude::tokio::{self, net::tcp::OwnedReadHalf, sync::oneshot};
75

86
pub struct ServerStreamReader(pub(crate) OwnedReadHalf);
97
impl ServerStreamReader {
@@ -18,11 +16,11 @@ impl ServerStreamReader {
1816
let controller_sender = controller_sender.clone();
1917

2018
loop {
21-
match self.0.read_values().await {
22-
Ok(query_ios) => {
23-
for query_io in query_ios {
19+
match self.0.deserialized_reads().await {
20+
Ok(server_responses) => {
21+
for res in server_responses {
2422
if controller_sender
25-
.send(BrokerMessage::FromServer(replication_id.clone(), query_io))
23+
.send(BrokerMessage::FromServer(replication_id.clone(), res))
2624
.await
2725
.is_err()
2826
{

duva-client/src/command.rs

Lines changed: 50 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use duva::domains::caches::cache_manager::IndexedValueCodec;
22
use duva::domains::replications::LogEntry;
33
use duva::prelude::BinBytes;
44
use duva::prelude::anyhow::{self, Context};
5-
use duva::presentation::clients::request::NonMutatingAction;
5+
use duva::presentation::clients::request::{NonMutatingAction, ServerResponse};
66
use duva::{
77
domains::query_io::QueryIO, prelude::tokio::sync::oneshot,
88
presentation::clients::request::ClientAction,
@@ -22,16 +22,21 @@ impl CommandQueue {
2222
self.queue.pop_front()
2323
}
2424

25-
pub(crate) fn finalize_or_requeue(&mut self, query_io: QueryIO, mut context: InputContext) {
25+
pub(crate) fn finalize_or_requeue(
26+
&mut self,
27+
query_io: ServerResponse,
28+
mut context: InputContext,
29+
) {
2630
context.results.push(query_io);
2731

2832
if context.results.len() != context.expected_result_cnt {
2933
self.push(context);
3034
return;
3135
}
3236

33-
let result =
34-
context.get_result().unwrap_or_else(|err| QueryIO::Err(BinBytes::new(err.to_string())));
37+
let result = context
38+
.get_result()
39+
.unwrap_or_else(|err| ServerResponse::Err { reason: err.to_string(), request_id: 0 });
3540
context.callback(result);
3641
}
3742
}
@@ -47,19 +52,19 @@ pub fn separate_command_and_args(args: Vec<&str>) -> (&str, Vec<&str>) {
4752
#[derive(Debug)]
4853
pub struct InputContext {
4954
pub(crate) client_action: ClientAction,
50-
pub(crate) callback: oneshot::Sender<(ClientAction, QueryIO)>,
51-
pub(crate) results: Vec<QueryIO>,
55+
pub(crate) callback: oneshot::Sender<(ClientAction, ServerResponse)>,
56+
pub(crate) results: Vec<ServerResponse>,
5257
pub(crate) expected_result_cnt: usize,
5358
}
5459
impl InputContext {
5560
pub fn new(
5661
client_action: ClientAction,
57-
callback: oneshot::Sender<(ClientAction, QueryIO)>,
62+
callback: oneshot::Sender<(ClientAction, ServerResponse)>,
5863
) -> Self {
5964
Self { client_action, callback, results: Vec::new(), expected_result_cnt: 0 }
6065
}
6166

62-
pub(crate) fn callback(self, query_io: QueryIO) {
67+
pub(crate) fn callback(self, query_io: ServerResponse) {
6368
let action_debug = format!("{:?}", self.client_action);
6469
self.callback.send((self.client_action, query_io)).unwrap_or_else(|_| {
6570
// Log callback failure for debugging
@@ -70,51 +75,67 @@ impl InputContext {
7075
});
7176
}
7277

73-
pub(crate) fn get_result(&mut self) -> anyhow::Result<QueryIO> {
78+
pub(crate) fn get_result(&mut self) -> anyhow::Result<ServerResponse> {
7479
use NonMutatingAction::*;
7580
let res = std::mem::take(&mut self.results);
81+
let mut highest_req_id = 0; // TODO for now, set request id to the highest one
82+
let mut iterator = res.into_iter();
7683

7784
match self.client_action {
7885
ClientAction::NonMutating(Keys { pattern: _ } | MGet { keys: _ }) => {
79-
let mut init = QueryIO::Array(Vec::with_capacity(res.len()));
80-
for item in res {
81-
init = init.merge(item)?;
86+
let mut init = QueryIO::Array(Vec::with_capacity(iterator.len()));
87+
88+
while let Some(ServerResponse::ReadRes { res, request_id }) = iterator.next() {
89+
init = init.merge(res)?;
90+
highest_req_id = highest_req_id.max(request_id);
8291
}
83-
Ok(init)
92+
Ok(ServerResponse::ReadRes { res: init, request_id: highest_req_id })
8493
},
94+
8595
ClientAction::NonMutating(Exists { keys: _ }) => {
8696
let mut count = 0;
87-
for result in res {
88-
let QueryIO::SimpleString(byte) = result else {
89-
return Err(anyhow::anyhow!("Expected SimpleString result"));
90-
};
97+
98+
while let Some(ServerResponse::ReadRes {
99+
res: QueryIO::BulkString(byte),
100+
request_id,
101+
}) = iterator.next()
102+
{
91103
let num = String::from_utf8(byte.to_vec())
92104
.context("Failed to convert byte to string")?;
93105
let num = num.parse::<u64>().context("Failed to parse string to u64")?;
94106

95107
count += num;
108+
highest_req_id = highest_req_id.max(request_id);
96109
}
97-
Ok(QueryIO::SimpleString(BinBytes::new(count.to_string())))
110+
111+
Ok(ServerResponse::ReadRes {
112+
res: QueryIO::BulkString(BinBytes::new(count.to_string())),
113+
request_id: highest_req_id,
114+
})
98115
},
99116
ClientAction::Mutating(LogEntry::Delete { keys: _ }) => {
100117
let mut count = 0;
101-
for result in res {
102-
let QueryIO::SimpleString(value) = result else {
103-
return Err(anyhow::anyhow!("Expected SimpleString result"));
104-
};
118+
119+
while let Some(ServerResponse::WriteRes {
120+
res: QueryIO::BulkString(value),
121+
request_id,
122+
..
123+
}) = iterator.next()
124+
{
105125
let decoded_value =
106126
IndexedValueCodec::decode_value(String::from_utf8_lossy(&value)).unwrap();
107127

108128
count += decoded_value;
129+
highest_req_id = highest_req_id.max(request_id);
109130
}
110-
Ok(QueryIO::SimpleString(BinBytes::new(count.to_string())))
111-
},
112-
_ => {
113-
if res.len() != 1 {
114-
return Err(anyhow::anyhow!("Expected exactly one result"));
115-
}
116-
Ok(res[0].clone())
131+
132+
Ok(ServerResponse::WriteRes {
133+
res: QueryIO::BulkString(BinBytes::new(count.to_string())),
134+
log_index: 0, // TODO
135+
request_id: highest_req_id,
136+
})
117137
},
138+
_ => iterator.next().ok_or(anyhow::anyhow!("Expected exactly one result")),
118139
}
119140
}
120141
}

0 commit comments

Comments
 (0)