Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion duva/src/adapters/wal/local_wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,12 @@ mod tests {
let (encoded, _): (WriteOperation, usize) =
bincode::decode_from_slice(&buf[1..], bincode::config::standard()).unwrap();

assert_eq!(encoded.request.key(), vec!["foo"]);
let key = match encoded.request {
WriteRequest::Set { key, .. } => key,
WriteRequest::SetWithExpiry { key, .. } => key,
WriteRequest::Delete { keys: key } => key[0].clone(),
};
assert_eq!(key, "foo");

Ok(())
}
Expand Down
75 changes: 3 additions & 72 deletions duva/src/domains/append_only_files/log.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
use crate::{
domains::query_parsers::{QueryIO, deserialize},
write_array,
};
use crate::domains::query_parsers::{QueryIO, deserialize};
use bytes::{Bytes, BytesMut};

#[derive(Debug, Clone, PartialEq, Eq, bincode::Encode, bincode::Decode)]
Expand All @@ -21,35 +18,14 @@ pub enum WriteRequest {
}

impl WriteOperation {
pub fn serialize(self) -> Bytes {
pub(crate) fn serialize(self) -> Bytes {
QueryIO::WriteOperation(self).serialize()
}
}

impl WriteRequest {
pub(crate) fn key(&self) -> Vec<String> {
match self {
WriteRequest::Set { key, .. } => vec![key.clone()],
WriteRequest::SetWithExpiry { key, .. } => vec![key.clone()],
WriteRequest::Delete { keys: key } => key.clone(),
}
}
pub fn to_array(self) -> QueryIO {
match self {
WriteRequest::Set { key, value } => write_array!("SET", key, value),
WriteRequest::SetWithExpiry { key, value, expires_at } => {
write_array!("SET", key, value, "px", expires_at.to_string())
},
WriteRequest::Delete { keys: key } => {
let mut args = vec!["DEL".to_string()];
args.extend(key);
QueryIO::Array(args.into_iter().map(QueryIO::BulkString).collect())
},
}
}

/// Deserialize `WriteOperation`s from the given bytes.
pub fn deserialize(mut bytes: BytesMut) -> anyhow::Result<Vec<WriteOperation>> {
pub(crate) fn deserialize(mut bytes: BytesMut) -> anyhow::Result<Vec<WriteOperation>> {
let mut ops: Vec<WriteOperation> = Vec::new();

while !bytes.is_empty() {
Expand All @@ -61,51 +37,6 @@ impl WriteRequest {
};
ops.push(write_operation);
}

Ok(ops)
}

pub fn new(cmd: String, args: std::vec::IntoIter<QueryIO>) -> anyhow::Result<Self> {
match cmd.as_str() {
"set" => Self::to_set(args),

_ => Err(anyhow::anyhow!("unsupported command")),
}
}

pub fn to_set(mut args: std::vec::IntoIter<QueryIO>) -> anyhow::Result<Self> {
match args.len() {
2 => {
let (Some(QueryIO::BulkString(key)), Some(QueryIO::BulkString(value))) =
(args.next(), args.next())
else {
return Err(anyhow::anyhow!("expected value"));
};

Ok(WriteRequest::Set { key, value })
},

4 => {
let (
Some(QueryIO::BulkString(key)),
Some(QueryIO::BulkString(value)),
Some(QueryIO::BulkString(_)),
Some(QueryIO::BulkString(expiry)),
) = (args.next(), args.next(), args.next(), args.next())
else {
return Err(anyhow::anyhow!("expected value"));
};

Ok(WriteRequest::SetWithExpiry { key, value, expires_at: expiry.parse()? })
},

_ => Err(anyhow::anyhow!("expected 2 or 4 arguments")),
}
}
}

impl From<WriteOperation> for Bytes {
fn from(op: WriteOperation) -> Self {
op.serialize()
}
}
18 changes: 11 additions & 7 deletions duva/src/domains/cluster_actors/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ impl ClusterActor {
);

for peer in self.members.values_mut() {
let _ = peer.write_io(msg.clone()).await;
let _ = peer.send_to_peer(msg.clone()).await;
}
}

Expand Down Expand Up @@ -277,7 +277,7 @@ impl ClusterActor {

// dbg!(self.replicas().count()); // CHECKED. replica count right.
self.generate_follower_entries(append_entries, prev_log_index, prev_term)
.map(|(peer, hb)| peer.write_io(AppendEntriesRPC(hb)))
.map(|(peer, hb)| peer.send_to_peer(AppendEntriesRPC(hb)))
.collect::<FuturesUnordered<_>>()
.for_each(|_| async {})
.await;
Expand Down Expand Up @@ -355,7 +355,11 @@ impl ClusterActor {
) {
if let Some(leader) = self.members.get_mut(send_to) {
let _ = leader
.write_io(ReplicationResponse::new(log_idx, rejection_reason, &self.replication))
.send_to_peer(ReplicationResponse::new(
log_idx,
rejection_reason,
&self.replication,
))
.await;
}
}
Expand Down Expand Up @@ -459,7 +463,7 @@ impl ClusterActor {

async fn send_to_replicas(&mut self, msg: impl Into<QueryIO> + Send + Clone) {
self.replicas_mut()
.map(|(peer, _)| peer.write_io(msg.clone()))
.map(|(peer, _)| peer.send_to_peer(msg.clone()))
.collect::<FuturesUnordered<_>>()
.for_each(|_| async {})
.await;
Expand Down Expand Up @@ -494,7 +498,7 @@ impl ClusterActor {

println!("[INFO] Running for election term {}", self.replication.term);
self.replicas_mut()
.map(|(peer, _)| peer.write_io(request_vote.clone()))
.map(|(peer, _)| peer.send_to_peer(request_vote.clone()))
.collect::<FuturesUnordered<_>>()
.for_each(|_| async {})
.await;
Expand All @@ -513,7 +517,7 @@ impl ClusterActor {
let Some(peer) = self.find_replica_mut(&request_vote.candidate_id) else {
return;
};
let _ = peer.write_io(RequestVoteReply { term, vote_granted: grant_vote }).await;
let _ = peer.send_to_peer(RequestVoteReply { term, vote_granted: grant_vote }).await;
}

pub(crate) async fn tally_vote(&mut self, logger: &ReplicatedLogs<impl TWriteAheadLog>) {
Expand All @@ -525,7 +529,7 @@ impl ClusterActor {
let msg =
self.replication.default_heartbeat(0, logger.last_log_index, logger.last_log_term);
self.replicas_mut()
.map(|(peer, _)| peer.write_io(AppendEntriesRPC(msg.clone())))
.map(|(peer, _)| peer.send_to_peer(AppendEntriesRPC(msg.clone())))
.collect::<FuturesUnordered<_>>()
.for_each(|_| async {})
.await;
Expand Down
7 changes: 5 additions & 2 deletions duva/src/domains/peers/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub(crate) struct Peer {
}

impl Peer {
pub fn new(
pub(crate) fn new(
addr: String,
w: OwnedWriteHalf,
kind: PeerState,
Expand All @@ -34,7 +34,10 @@ impl Peer {
}
}

pub(crate) async fn write_io(&mut self, io: impl Into<QueryIO> + Send) -> Result<(), IoError> {
pub(crate) async fn send_to_peer(
&mut self,
io: impl Into<QueryIO> + Send,
) -> Result<(), IoError> {
self.w_conn.stream.write_io(io).await
}

Expand Down