Skip to content

Commit a1f0625

Browse files
committed
use bulkstring as superset of simplestring, remove simple string
1 parent ce0768b commit a1f0625

9 files changed

Lines changed: 42 additions & 76 deletions

File tree

duva-client/src/broker/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -295,7 +295,7 @@ impl Broker {
295295

296296
match res {
297297
ServerResponse::WriteRes { res, .. } | ServerResponse::ReadRes { res, .. } => {
298-
if let QueryIO::SimpleString(v) = res {
298+
if let QueryIO::BulkString(v) = res {
299299
let s = String::from_utf8_lossy(v);
300300
connection.request_id = IndexedValueCodec::decode_index(s)
301301
.filter(|&id| id > connection.request_id)

duva-client/src/command.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ impl InputContext {
9696
let mut count = 0;
9797

9898
while let Some(ServerResponse::ReadRes {
99-
res: QueryIO::SimpleString(byte),
99+
res: QueryIO::BulkString(byte),
100100
request_id,
101101
}) = iterator.next()
102102
{
@@ -109,15 +109,15 @@ impl InputContext {
109109
}
110110

111111
Ok(ServerResponse::ReadRes {
112-
res: QueryIO::SimpleString(BinBytes::new(count.to_string())),
112+
res: QueryIO::BulkString(BinBytes::new(count.to_string())),
113113
request_id: highest_req_id,
114114
})
115115
},
116116
ClientAction::Mutating(LogEntry::Delete { keys: _ }) => {
117117
let mut count = 0;
118118

119119
while let Some(ServerResponse::WriteRes {
120-
res: QueryIO::SimpleString(value),
120+
res: QueryIO::BulkString(value),
121121
request_id,
122122
..
123123
}) = iterator.next()
@@ -130,7 +130,7 @@ impl InputContext {
130130
}
131131

132132
Ok(ServerResponse::WriteRes {
133-
res: QueryIO::SimpleString(BinBytes::new(count.to_string())),
133+
res: QueryIO::BulkString(BinBytes::new(count.to_string())),
134134
log_index: 0, // TODO
135135
request_id: highest_req_id,
136136
})

duva-client/src/controller.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,12 @@ fn render_return(kind: ClientAction, res: QueryIO) -> Response {
4545
| ClusterInfo,
4646
) => match res {
4747
QueryIO::Null => Response::Null,
48-
QueryIO::SimpleString(value) => Response::String(value.into()),
4948
QueryIO::BulkString(value) => Response::String(value.into()),
49+
5050
_err => Response::FormatError,
5151
},
5252
Mutating(LogEntry::Delete { .. }) | NonMutating(Exists { .. } | LLen { .. }) => {
53-
let QueryIO::SimpleString(value) = res else {
53+
let QueryIO::BulkString(value) = res else {
5454
return Response::FormatError;
5555
};
5656
match str::from_utf8(&value) {
@@ -68,7 +68,7 @@ fn render_return(kind: ClientAction, res: QueryIO) -> Response {
6868
| LogEntry::LPushX { .. }
6969
| LogEntry::RPushX { .. },
7070
) => match res {
71-
QueryIO::SimpleString(value) => {
71+
QueryIO::BulkString(value) => {
7272
let s = String::from_utf8_lossy(&value);
7373
let s: Option<i64> = IndexedValueCodec::decode_value(s);
7474
Response::Integer(s.unwrap().to_string().into())
@@ -84,7 +84,7 @@ fn render_return(kind: ClientAction, res: QueryIO) -> Response {
8484
},
8585
Mutating(LogEntry::Set { .. } | LogEntry::LTrim { .. } | LogEntry::LSet { .. }) => {
8686
match res {
87-
QueryIO::SimpleString(_) => Response::String("OK".into()),
87+
QueryIO::BulkString(_) => Response::String("OK".into()),
8888

8989
_ => Response::FormatError,
9090
}
@@ -95,7 +95,7 @@ fn render_return(kind: ClientAction, res: QueryIO) -> Response {
9595
_ => Response::FormatError,
9696
},
9797
Mutating(LogEntry::Append { .. }) => match res {
98-
QueryIO::SimpleString(value) => {
98+
QueryIO::BulkString(value) => {
9999
let s = String::from_utf8_lossy(&value);
100100
let s: Option<i64> = IndexedValueCodec::decode_value(s);
101101
Response::String(s.unwrap().to_string().into())

duva/src/adapters/io/tokio_stream.rs

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -213,8 +213,20 @@ pub mod test_tokio_stream_impl {
213213
async fn test_read_values() {
214214
let mut buffer = BytesMut::with_capacity(INITIAL_CAPACITY);
215215
// add a simple string to buffer
216-
buffer.extend_from_slice(b"+FULLRESYNC 8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb 0\r\n");
217-
buffer.extend_from_slice(b"+PEERS 127.0.0.1:6378\r\n");
216+
let sync_msg = "FULLRESYNC 8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb 0";
217+
218+
buffer.extend_from_slice(
219+
format!(
220+
"${}\r\nFULLRESYNC 8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb 0\r\n",
221+
sync_msg.len()
222+
)
223+
.as_bytes(),
224+
);
225+
226+
let peer_info_msg = "PEERS 127.0.0.1:6378";
227+
buffer.extend_from_slice(
228+
format!("${}\r\nPEERS 127.0.0.1:6378\r\n", peer_info_msg.len()).as_bytes(),
229+
);
218230
// add an integer to buffer
219231

220232
let mut parsed_values = vec![];
@@ -230,12 +242,12 @@ pub mod test_tokio_stream_impl {
230242
assert_eq!(parsed_values.len(), 2);
231243
assert_eq!(
232244
parsed_values[0],
233-
QueryIO::SimpleString(BinBytes::new(
245+
QueryIO::BulkString(BinBytes::new(
234246
"FULLRESYNC 8371b4fb1155b71f4a04d3e1bc3e18c4a990aeeb 0"
235247
))
236248
);
237249

238-
assert_eq!(parsed_values[1], QueryIO::SimpleString(BinBytes::new("PEERS 127.0.0.1:6378")));
250+
assert_eq!(parsed_values[1], QueryIO::BulkString(BinBytes::new("PEERS 127.0.0.1:6378")));
239251
}
240252

241253
#[tokio::test]

duva/src/domains/peers/connections/inbound/stream.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ impl InboundStream {
4646
let cmd = self.extract_cmd().await?;
4747
cmd.match_query(HandShakeRequestEnum::Ping)?;
4848

49-
self.w.write(QueryIO::SimpleString(BinBytes::new("PONG"))).await?;
49+
self.w.write(QueryIO::BulkString(BinBytes::new("PONG"))).await?;
5050
Ok(())
5151
}
5252

@@ -55,15 +55,15 @@ impl InboundStream {
5555

5656
let port = cmd.extract_listening_port()?;
5757

58-
self.w.write(QueryIO::SimpleString(BinBytes::new("OK"))).await?;
58+
self.w.write(QueryIO::BulkString(BinBytes::new("OK"))).await?;
5959

6060
Ok(port)
6161
}
6262

6363
async fn recv_replconf_capa(&mut self) -> anyhow::Result<Vec<(BinBytes, BinBytes)>> {
6464
let cmd = self.extract_cmd().await?;
6565
let capa_val_vec = cmd.extract_capa()?;
66-
self.w.write(QueryIO::SimpleString(BinBytes::new("OK"))).await?;
66+
self.w.write(QueryIO::BulkString(BinBytes::new("OK"))).await?;
6767
Ok(capa_val_vec)
6868
}
6969
async fn recv_psync(&mut self) -> anyhow::Result<(ReplicationId, u64, ReplicationRole, u64)> {
@@ -81,7 +81,7 @@ impl InboundStream {
8181
);
8282

8383
self.w
84-
.write(QueryIO::SimpleString(BinBytes::new(format!(
84+
.write(QueryIO::BulkString(BinBytes::new(format!(
8585
"FULLRESYNC {id} {self_replid} {self_repl_offset} {self_role} {term}"
8686
))))
8787
.await?;
@@ -98,7 +98,7 @@ impl InboundStream {
9898
let Some(query) = query_io.pop() else {
9999
return Err(anyhow::anyhow!("No query found"));
100100
};
101-
let QueryIO::SimpleString(val) = query else {
101+
let QueryIO::BulkString(val) = query else {
102102
return Err(anyhow::anyhow!("Invalid query"));
103103
};
104104
if val.as_ref() != b"ok" {

duva/src/domains/peers/connections/outbound/response.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ impl TryFrom<QueryIO> for ConnectionResponse {
4646
type Error = anyhow::Error;
4747
fn try_from(value: QueryIO) -> Result<Self, Self::Error> {
4848
match value {
49-
QueryIO::SimpleString(value) => Ok(String::from_utf8(value.0.into())?.try_into()?),
49+
QueryIO::BulkString(value) => Ok(String::from_utf8(value.0.into())?.try_into()?),
5050
_ => {
5151
eprintln!("Invalid command");
5252
Err(anyhow::anyhow!("Invalid command"))

duva/src/domains/peers/connections/outbound/stream.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ impl OutboundStream {
7272
}
7373

7474
async fn reply_with_ok(&mut self) -> anyhow::Result<()> {
75-
self.w.write(QueryIO::SimpleString(BinBytes::new("ok"))).await?;
75+
self.w.write(QueryIO::BulkString(BinBytes::new("ok"))).await?;
7676
Ok(())
7777
}
7878

duva/src/domains/query_io.rs

Lines changed: 3 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ use bytes::{Bytes, BytesMut};
1212

1313
// ! CURRENTLY, only ascii unicode(0-127) is supported
1414
const FILE_PREFIX: char = '\u{0066}';
15-
const SIMPLE_STRING_PREFIX: char = '+';
1615
const BULK_STRING_PREFIX: char = '$';
1716
const ARRAY_PREFIX: char = '*';
1817
const APPEND_ENTRY_RPC_PREFIX: char = '^';
@@ -40,7 +39,6 @@ macro_rules! write_array {
4039
pub enum QueryIO {
4140
#[default]
4241
Null,
43-
SimpleString(BinBytes),
4442
BulkString(BinBytes),
4543
Array(Vec<QueryIO>),
4644

@@ -63,14 +61,7 @@ impl QueryIO {
6361
pub fn serialize(self) -> BinBytes {
6462
match self {
6563
QueryIO::Null => BinBytes(NULL_PREFIX.to_string().into()),
66-
QueryIO::SimpleString(s) => {
67-
let mut buffer =
68-
BytesMut::with_capacity(SIMPLE_STRING_PREFIX.len_utf8() + s.len() + 2);
69-
buffer.extend_from_slice(&[SIMPLE_STRING_PREFIX as u8]);
70-
buffer.extend_from_slice(&s);
71-
buffer.extend_from_slice(b"\r\n");
72-
buffer.freeze().into()
73-
},
64+
7465
QueryIO::BulkString(s) => {
7566
let mut byte_mut = BytesMut::with_capacity(1 + 1 + s.len() + 4);
7667
byte_mut.extend_from_slice(BULK_STRING_PREFIX.encode_utf8(&mut [0; 4]).as_bytes());
@@ -202,7 +193,7 @@ impl QueryIO {
202193
}
203194

204195
pub(crate) fn convert_str_res(res: &str, index: u64) -> Self {
205-
QueryIO::SimpleString(BinBytes::new(IndexedValueCodec::encode(res, index)))
196+
QueryIO::BulkString(BinBytes::new(IndexedValueCodec::encode(res, index)))
206197
}
207198

208199
pub(crate) fn convert_str_vec_res(values: Vec<String>, index: u64) -> Self {
@@ -220,7 +211,7 @@ pub(crate) fn serialized_len_with_bincode<T: bincode::Encode>(prefix: char, arg:
220211
fn estimate_serialized_size(query: &QueryIO) -> usize {
221212
match query {
222213
QueryIO::Null => 1,
223-
QueryIO::SimpleString(s) => 1 + s.len() + 2,
214+
224215
QueryIO::BulkString(s) => 1 + s.len().to_string().len() + 2 + s.len() + 2,
225216
QueryIO::Array(array) => {
226217
let header = 1 + array.len().to_string().len() + 2;
@@ -286,10 +277,6 @@ pub fn deserialize(buffer: impl Into<Bytes>) -> Result<(QueryIO, usize)> {
286277

287278
fn deserialize_by_prefix(buffer: Bytes, prefix: char) -> Result<(QueryIO, usize)> {
288279
match prefix {
289-
SIMPLE_STRING_PREFIX => {
290-
let (bytes, len) = parse_simple_string(buffer)?;
291-
Ok((QueryIO::SimpleString(BinBytes(bytes)), len))
292-
},
293280
ARRAY_PREFIX => parse_array(buffer),
294281

295282
BULK_STRING_PREFIX => {
@@ -323,13 +310,6 @@ fn deserialize_by_prefix(buffer: Bytes, prefix: char) -> Result<(QueryIO, usize)
323310
}
324311
}
325312

326-
// +PING\r\n
327-
pub(crate) fn parse_simple_string(buffer: Bytes) -> Result<(Bytes, usize)> {
328-
let (line, len) = read_until_crlf_exclusive(&buffer.slice(1..))
329-
.ok_or(anyhow::anyhow!("Invalid simple string"))?;
330-
Ok((line.into(), len + 1))
331-
}
332-
333313
fn parse_array(buffer: Bytes) -> Result<(QueryIO, usize)> {
334314
// Skip the array type indicator (first byte)
335315
let mut offset = 1;
@@ -523,32 +503,6 @@ mod test {
523503
use chrono::DateTime;
524504
use uuid::Uuid;
525505

526-
#[test]
527-
fn test_deserialize_simple_string() {
528-
// GIVEN
529-
let buffer = Bytes::from("+OK\r\n");
530-
531-
// WHEN
532-
let (value, len) = parse_simple_string(buffer).unwrap();
533-
534-
// THEN
535-
assert_eq!(len, 5);
536-
assert_eq!(value, Bytes::from("OK"));
537-
}
538-
539-
#[test]
540-
fn test_deserialize_simple_string_ping() {
541-
// GIVEN
542-
let buffer = Bytes::from("+PING\r\n");
543-
544-
// WHEN
545-
let (value, len) = deserialize(buffer).unwrap();
546-
547-
// THEN
548-
assert_eq!(len, 7);
549-
assert_eq!(value, QueryIO::SimpleString(BinBytes(Bytes::from("PING"))));
550-
}
551-
552506
#[test]
553507
fn test_deserialize_bulk_string() {
554508
// GIVEN

duva/src/presentation/clients/controller.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ impl ClientController {
2727
use NonMutatingAction::*;
2828

2929
let response = match non_mutating {
30-
Ping => QueryIO::SimpleString(BinBytes::new("PONG")),
30+
Ping => QueryIO::BulkString(BinBytes::new("PONG")),
3131
Echo(val) => QueryIO::BulkString(BinBytes::new(val)),
3232

3333
Save => {
@@ -80,7 +80,7 @@ impl ClientController {
8080
}
8181
},
8282

83-
Exists { keys } => QueryIO::SimpleString(BinBytes::new(
83+
Exists { keys } => QueryIO::BulkString(BinBytes::new(
8484
self.cache_manager.route_exists(keys).await?.to_string(),
8585
)),
8686
Info => QueryIO::BulkString(BinBytes::new(
@@ -97,7 +97,7 @@ impl ClientController {
9797
.into(),
9898
ClusterForget(peer_identifier) => {
9999
match self.cluster_actor_sender.route_forget_peer(peer_identifier).await {
100-
Ok(true) => QueryIO::SimpleString(BinBytes::new("OK")),
100+
Ok(true) => QueryIO::BulkString(BinBytes::new("OK")),
101101
Ok(false) => {
102102
return Err(anyhow::anyhow!("No such peer"));
103103
},
@@ -112,16 +112,16 @@ impl ClientController {
112112
ClusterReshard => self.cluster_actor_sender.route_cluster_reshard().await?.into(),
113113
ReplicaOf(peer_identifier) => {
114114
self.cluster_actor_sender.route_replicaof(peer_identifier.clone()).await?;
115-
QueryIO::SimpleString(BinBytes::new("OK"))
115+
QueryIO::BulkString(BinBytes::new("OK"))
116116
},
117117
Role => self.cluster_actor_sender.route_get_roles().await?.into(),
118118
Ttl { key } => {
119-
QueryIO::SimpleString(BinBytes::new(self.cache_manager.route_ttl(key).await?))
119+
QueryIO::BulkString(BinBytes::new(self.cache_manager.route_ttl(key).await?))
120120
},
121121

122122
LLen { key } => {
123123
let len = self.cache_manager.route_llen(key).await?;
124-
QueryIO::SimpleString(BinBytes::new(len.to_string()))
124+
QueryIO::BulkString(BinBytes::new(len.to_string()))
125125
},
126126
LRange { key, start, end } => {
127127
let values = self.cache_manager.route_lrange(key, start, end).await?;

0 commit comments

Comments
 (0)