Skip to content

Commit 13e4f0e

Browse files
committed
add bincode io
1 parent 72080bb commit 13e4f0e

18 files changed

Lines changed: 172 additions & 236 deletions

File tree

duva-client/src/broker/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use duva::prelude::tokio::sync::mpsc::Receiver;
1414
use duva::prelude::tokio::sync::mpsc::Sender;
1515
use duva::prelude::uuid::Uuid;
1616
use duva::prelude::{
17-
ConnectionRequest, ConnectionRequests, ConnectionResponse, ConnectionResponses,
17+
BinBytes, ConnectionRequest, ConnectionRequests, ConnectionResponse, ConnectionResponses,
1818
};
1919
use duva::prelude::{PeerIdentifier, tokio};
2020
use duva::prelude::{Topology, anyhow};
@@ -97,9 +97,9 @@ impl Broker {
9797
context.expected_result_cnt = result_count;
9898
queue.push(context);
9999
} else {
100-
context.callback(QueryIO::Err(
101-
"Failed to route command. Try again after ttl time".into(),
102-
));
100+
context.callback(QueryIO::Err(BinBytes::new(
101+
"Failed to route command. Try again after ttl time",
102+
)));
103103
};
104104
},
105105
}

duva-client/src/command.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use duva::domains::caches::cache_manager::IndexedValueCodec;
22
use duva::domains::replications::LogEntry;
3+
use duva::prelude::BinBytes;
34
use duva::prelude::anyhow::{self, Context};
45
use duva::presentation::clients::request::NonMutatingAction;
56
use duva::{
@@ -30,7 +31,7 @@ impl CommandQueue {
3031
}
3132

3233
let result =
33-
context.get_result().unwrap_or_else(|err| QueryIO::Err(err.to_string().into()));
34+
context.get_result().unwrap_or_else(|err| QueryIO::Err(BinBytes::new(err.to_string())));
3435
context.callback(result);
3536
}
3637
}
@@ -93,7 +94,7 @@ impl InputContext {
9394

9495
count += num;
9596
}
96-
Ok(QueryIO::SimpleString(count.to_string().into()))
97+
Ok(QueryIO::SimpleString(BinBytes::new(count.to_string())))
9798
},
9899
ClientAction::Mutating(LogEntry::Delete { keys: _ }) => {
99100
let mut count = 0;
@@ -106,7 +107,7 @@ impl InputContext {
106107

107108
count += decoded_value;
108109
}
109-
Ok(QueryIO::SimpleString(count.to_string().into()))
110+
Ok(QueryIO::SimpleString(BinBytes::new(count.to_string())))
110111
},
111112
_ => {
112113
if res.len() != 1 {

duva-client/src/controller.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -46,14 +46,14 @@ fn render_return(kind: ClientAction, query_io: QueryIO) -> Response {
4646
| ClusterInfo,
4747
) => match query_io {
4848
QueryIO::Null => Response::Null,
49-
QueryIO::SimpleString(value) => Response::String(value),
50-
QueryIO::BulkString(value) => Response::String(value),
51-
QueryIO::Err(value) => Response::Error(value),
49+
QueryIO::SimpleString(value) => Response::String(value.into()),
50+
QueryIO::BulkString(value) => Response::String(value.into()),
51+
QueryIO::Err(value) => Response::Error(value.into()),
5252
_err => Response::FormatError,
5353
},
5454
Mutating(LogEntry::Delete { .. }) | NonMutating(Exists { .. } | LLen { .. }) => {
5555
if let QueryIO::Err(value) = query_io {
56-
return Response::Error(value);
56+
return Response::Error(value.into());
5757
}
5858

5959
let QueryIO::SimpleString(value) = query_io else {
@@ -79,7 +79,7 @@ fn render_return(kind: ClientAction, query_io: QueryIO) -> Response {
7979
let s: Option<i64> = IndexedValueCodec::decode_value(s);
8080
Response::Integer(s.unwrap().to_string().into())
8181
},
82-
QueryIO::Err(value) => Response::Error(value),
82+
QueryIO::Err(value) => Response::Error(value.into()),
8383

8484
_ => Response::FormatError,
8585
},
@@ -92,13 +92,13 @@ fn render_return(kind: ClientAction, query_io: QueryIO) -> Response {
9292
Mutating(LogEntry::Set { .. } | LogEntry::LTrim { .. } | LogEntry::LSet { .. }) => {
9393
match query_io {
9494
QueryIO::SimpleString(_) => Response::String("OK".into()),
95-
QueryIO::Err(value) => Response::Error(value),
95+
QueryIO::Err(value) => Response::Error(value.into()),
9696
_ => Response::FormatError,
9797
}
9898
},
9999
NonMutating(ClusterMeet { .. } | ClusterReshard) => match query_io {
100100
QueryIO::Null => Response::String("OK".into()),
101-
QueryIO::Err(value) => Response::Error(value),
101+
QueryIO::Err(value) => Response::Error(value.into()),
102102
_ => Response::FormatError,
103103
},
104104
Mutating(LogEntry::Append { .. }) => match query_io {
@@ -107,7 +107,7 @@ fn render_return(kind: ClientAction, query_io: QueryIO) -> Response {
107107
let s: Option<i64> = IndexedValueCodec::decode_value(s);
108108
Response::String(s.unwrap().to_string().into())
109109
},
110-
QueryIO::Err(value) => Response::Error(value),
110+
QueryIO::Err(value) => Response::Error(value.into()),
111111
_ => Response::FormatError,
112112
},
113113
Mutating(LogEntry::LPop { .. } | LogEntry::RPop { .. })
@@ -137,11 +137,11 @@ fn render_return(kind: ClientAction, query_io: QueryIO) -> Response {
137137
let QueryIO::BulkString(value) = item else {
138138
return Response::FormatError;
139139
};
140-
nodes.push(Response::String(value));
140+
nodes.push(Response::String(value.into()));
141141
}
142142
Response::Array(nodes)
143143
},
144-
QueryIO::Err(value) => Response::Error(value),
144+
QueryIO::Err(value) => Response::Error(value.into()),
145145
_ => Response::FormatError,
146146
},
147147

duva/Cargo.toml

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,3 @@ lzf = { version = "1.0.0" }
3838
[dev-dependencies]
3939
tempfile = "3.19.1"
4040
criterion = "0.7.0"
41-
42-
[[bench]]
43-
name = "benchmark"
44-
harness = false

duva/benches/benchmark.rs

Lines changed: 0 additions & 13 deletions
This file was deleted.

duva/benches/benchmark_query.rs

Lines changed: 0 additions & 74 deletions
This file was deleted.

duva/src/adapters/io/tokio_stream.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,8 @@ impl From<ErrorKind> for IoError {
148148

149149
#[cfg(test)]
150150
pub mod test_tokio_stream_impl {
151+
use crate::types::BinBytes;
152+
151153
use super::*;
152154
#[derive(Debug, PartialEq, bincode::Encode, bincode::Decode)]
153155
struct TestMessage {
@@ -228,10 +230,12 @@ pub mod test_tokio_stream_impl {
228230
assert_eq!(parsed_values.len(), 2);
229231
assert_eq!(
230232
parsed_values[0],
231-
QueryIO::SimpleString("FULLRESYNC 8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb 0".into())
233+
QueryIO::SimpleString(BinBytes::new(
234+
"FULLRESYNC 8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb 0"
235+
))
232236
);
233237

234-
assert_eq!(parsed_values[1], QueryIO::SimpleString("PEERS 127.0.0.1:6378".into()));
238+
assert_eq!(parsed_values[1], QueryIO::SimpleString(BinBytes::new("PEERS 127.0.0.1:6378")));
235239
}
236240

237241
#[tokio::test]

duva/src/domains/caches/cache_objects/value.rs

Lines changed: 2 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,9 @@
1-
use bincode::{
2-
BorrowDecode, Decode,
3-
de::Decoder,
4-
enc::Encoder,
5-
error::{DecodeError, EncodeError},
6-
};
71
use bytes::Bytes;
82
use chrono::{DateTime, Utc};
93

104
use crate::{
115
domains::caches::cache_objects::{THasExpiry, types::quicklist::QuickList},
12-
from_to, make_smart_pointer,
6+
types::BinBytes,
137
};
148

159
#[derive(Debug, PartialEq, Eq, Clone, Default, bincode::Encode, bincode::Decode)]
@@ -49,35 +43,10 @@ impl CacheValue {
4943
pub enum TypedValue {
5044
#[default]
5145
Null,
52-
String(BytesWrapper),
46+
String(BinBytes),
5347
List(QuickList),
5448
}
5549

56-
#[derive(Debug, PartialEq, Eq, Clone, Default)]
57-
pub(crate) struct BytesWrapper(Bytes); // To minimize bincode implementation boilerplate as Bytes does not implement bincode traits
58-
make_smart_pointer!(BytesWrapper, Bytes);
59-
from_to!(Bytes, BytesWrapper);
60-
61-
impl bincode::Encode for BytesWrapper {
62-
fn encode<E: Encoder>(&self, encoder: &mut E) -> Result<(), EncodeError> {
63-
self.0.as_ref().encode(encoder)
64-
}
65-
}
66-
impl<Ctx> Decode<Ctx> for BytesWrapper {
67-
fn decode<D: Decoder>(decoder: &mut D) -> Result<Self, DecodeError> {
68-
let vec: Vec<u8> = Decode::decode(decoder)?;
69-
Ok(BytesWrapper(Bytes::from(vec)))
70-
}
71-
}
72-
impl<'de, Ctx> BorrowDecode<'de, Ctx> for BytesWrapper {
73-
fn borrow_decode<D: bincode::de::BorrowDecoder<'de>>(
74-
decoder: &mut D,
75-
) -> Result<Self, DecodeError> {
76-
let slice: &'de [u8] = BorrowDecode::borrow_decode(decoder)?;
77-
Ok(BytesWrapper(Bytes::copy_from_slice(slice)))
78-
}
79-
}
80-
8150
pub const WRONG_TYPE_ERR_MSG: &str =
8251
"WRONGTYPE Operation against a key holding the wrong kind of value";
8352

duva/src/domains/peers/connections/inbound/request.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::domains::replications::*;
2+
use crate::types::BinBytes;
23
use crate::{domains::QueryIO, err, from_to, make_smart_pointer};
34
use anyhow::Context;
4-
use bytes::Bytes;
55

66
pub(crate) struct HandShakeRequest {
77
pub(crate) command: HandShakeRequestEnum,
@@ -58,14 +58,14 @@ impl HandShakeRequest {
5858
}
5959
}
6060

61-
pub(crate) fn extract_capa(&self) -> anyhow::Result<Vec<(Bytes, Bytes)>> {
61+
pub(crate) fn extract_capa(&self) -> anyhow::Result<Vec<(BinBytes, BinBytes)>> {
6262
self.match_query(HandShakeRequestEnum::ReplConf)?;
6363
if self.args.is_empty() || !self.args.len().is_multiple_of(2) {
6464
return Err(anyhow::anyhow!("Invalid number of arguments"));
6565
}
6666

6767
// Process pairs directly using chunks_exact
68-
let capabilities: Vec<(Bytes, Bytes)> = self
68+
let capabilities: Vec<(BinBytes, BinBytes)> = self
6969
.args
7070
.chunks_exact(2)
7171
.filter_map(|chunk| match (&chunk[0], &chunk[1]) {
@@ -79,7 +79,7 @@ impl HandShakeRequest {
7979
.collect();
8080

8181
// Validate last capability is psync2
82-
if capabilities.last().context("No capabilities given")?.1 != "psync2" {
82+
if *capabilities.last().context("No capabilities given")?.1 != "psync2" {
8383
return Err(anyhow::anyhow!("psync2 must be given as the last capability"));
8484
}
8585
Ok(capabilities)

duva/src/domains/peers/connections/inbound/stream.rs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use crate::domains::peers::identifier::PeerIdentifier;
1212

1313
use crate::domains::peers::peer::Peer;
1414
use crate::domains::peers::service::PeerListener;
15-
use bytes::Bytes;
15+
use crate::types::BinBytes;
1616

1717
// The following is used only when the node is in leader mode
1818
#[derive(Debug)]
@@ -46,7 +46,7 @@ impl InboundStream {
4646
let cmd = self.extract_cmd().await?;
4747
cmd.match_query(HandShakeRequestEnum::Ping)?;
4848

49-
self.w.write(QueryIO::SimpleString("PONG".into())).await?;
49+
self.w.write(QueryIO::SimpleString(BinBytes::new("PONG"))).await?;
5050
Ok(())
5151
}
5252

@@ -55,15 +55,15 @@ impl InboundStream {
5555

5656
let port = cmd.extract_listening_port()?;
5757

58-
self.w.write(QueryIO::SimpleString("OK".into())).await?;
58+
self.w.write(QueryIO::SimpleString(BinBytes::new("OK"))).await?;
5959

6060
Ok(port)
6161
}
6262

63-
async fn recv_replconf_capa(&mut self) -> anyhow::Result<Vec<(Bytes, Bytes)>> {
63+
async fn recv_replconf_capa(&mut self) -> anyhow::Result<Vec<(BinBytes, BinBytes)>> {
6464
let cmd = self.extract_cmd().await?;
6565
let capa_val_vec = cmd.extract_capa()?;
66-
self.w.write(QueryIO::SimpleString("OK".into())).await?;
66+
self.w.write(QueryIO::SimpleString(BinBytes::new("OK"))).await?;
6767
Ok(capa_val_vec)
6868
}
6969
async fn recv_psync(&mut self) -> anyhow::Result<(ReplicationId, u64, ReplicationRole, u64)> {
@@ -81,10 +81,9 @@ impl InboundStream {
8181
);
8282

8383
self.w
84-
.write(QueryIO::SimpleString(
85-
format!("FULLRESYNC {id} {self_replid} {self_repl_offset} {self_role} {term}")
86-
.into(),
87-
))
84+
.write(QueryIO::SimpleString(BinBytes::new(format!(
85+
"FULLRESYNC {id} {self_replid} {self_repl_offset} {self_role} {term}"
86+
))))
8887
.await?;
8988
self.recv_ok().await?;
9089
Ok((inbound_repl_id, offset, role, term))

0 commit comments

Comments
 (0)