Skip to content

Commit 8e1499f

Browse files
authored
Merge pull request #892 from Migorithm/refactor/seperate-out-peer-message
feat: seperate out peer message from query io
2 parents f6e33f9 + c9130d6 commit 8e1499f

20 files changed

Lines changed: 215 additions & 629 deletions

File tree

duva-client/src/broker/mod.rs

Lines changed: 22 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use duva::prelude::{Topology, anyhow};
2121
use duva::presentation::clients::request::{ClientAction, NonMutatingAction, ServerResponse};
2222
use futures::future::try_join_all;
2323

24-
use duva::prelude::anyhow::anyhow;
24+
use duva::prelude::anyhow::bail;
2525
use node_connections::NodeConnection;
2626
use read_stream::ServerStreamReader;
2727
use tracing::error;
@@ -37,8 +37,10 @@ pub struct Broker {
3737

3838
impl Broker {
3939
pub(crate) async fn new(server_addr: &PeerIdentifier) -> anyhow::Result<Self> {
40-
let (r, w, auth_response) = Broker::authenticate(&server_addr.clone(), None).await?;
40+
let stream =
41+
TcpStream::connect(server_addr.as_str()).await.map_err(|_| IoError::NotConnected)?;
4142

43+
let (r, w, auth_response) = Broker::authenticate(stream, Default::default()).await?;
4244
let (broker_tx, rx) = tokio::sync::mpsc::channel::<BrokerMessage>(2000);
4345

4446
let seed_replid = auth_response.replication_id;
@@ -109,21 +111,17 @@ impl Broker {
109111
}
110112

111113
pub(crate) async fn authenticate(
112-
server_addr: &PeerIdentifier,
113-
auth_request: Option<ConnectionRequest>,
114-
) -> Result<(ServerStreamReader, ServerStreamWriter, ConnectionResponse), IoError> {
115-
let mut stream =
116-
TcpStream::connect(server_addr.as_str()).await.map_err(|_| IoError::NotConnected)?;
117-
let request = auth_request.unwrap_or_default();
118-
let connection_req = ConnectionRequests::Authenticate(request);
119-
stream.serialized_write(connection_req).await?; // client_id not exist
120-
let connection_res: ConnectionResponses = stream.deserialized_read().await?;
121-
let auth_response = match connection_res {
122-
ConnectionResponses::Authenticated(response) => response,
123-
_ => return Err(IoError::Custom("Authentication failed".into())),
114+
mut stream: TcpStream,
115+
conn_req: ConnectionRequest,
116+
) -> anyhow::Result<(ServerStreamReader, ServerStreamWriter, ConnectionResponse)> {
117+
stream.serialized_write(ConnectionRequests::Authenticate(conn_req)).await?; // client_id not exist
118+
119+
let ConnectionResponses::Authenticated(response) = stream.deserialized_read().await? else {
120+
bail!("Authentication failed");
124121
};
122+
125123
let (r, w) = stream.into_split();
126-
Ok((ServerStreamReader(r), ServerStreamWriter(w), auth_response))
124+
Ok((ServerStreamReader(r), ServerStreamWriter(w), response))
127125
}
128126

129127
// pull-based leader discovery
@@ -161,12 +159,12 @@ impl Broker {
161159
}
162160

163161
async fn discover_leader_from(&mut self, follower: PeerIdentifier) -> anyhow::Result<()> {
164-
let mut stream =
165-
TcpStream::connect(follower.as_str()).await.map_err(|_| IoError::NotConnected)?;
162+
let mut stream = TcpStream::connect(follower.as_str()).await?;
166163
stream.serialized_write(ConnectionRequests::Discovery).await?;
167164
let ConnectionResponses::Discovery { leader_id } = stream.deserialized_read().await? else {
168-
return Err(anyhow!("Discovery failed"));
165+
bail!("Discovery failed!");
169166
};
167+
170168
self.add_node_connection(leader_id.clone()).await
171169
}
172170

@@ -190,14 +188,14 @@ impl Broker {
190188
let auth_req =
191189
ConnectionRequest { client_id: Some(self.client_id.to_string()), request_id: 0 };
192190

193-
let Ok((server_stream_reader, server_stream_writer, auth_response)) =
194-
Self::authenticate(&peer_id, Some(auth_req)).await
195-
else {
196-
return Err(anyhow::anyhow!("Authentication failed!"));
197-
};
191+
let stream =
192+
TcpStream::connect(peer_id.as_str()).await.map_err(|_| IoError::NotConnected)?;
193+
194+
let (server_stream_reader, server_stream_writer, auth_response) =
195+
Self::authenticate(stream, auth_req).await?;
198196

199197
if !auth_response.is_leader_node {
200-
return Err(anyhow::anyhow!("Only Leader connection is allowed!"));
198+
bail!("Only Leader connection is allowed!");
201199
}
202200

203201
self.node_connections.insert(

duva/src/adapters/io/tokio_stream.rs

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,11 @@
11
use crate::domains::interface::{TRead, TWrite};
22
use crate::domains::peers::connections::connection_types::{ReadConnected, WriteConnected};
33
use crate::domains::query_io::SERDE_CONFIG;
4-
use crate::domains::{IoError, TAsyncReadWrite, TSerdeRead, TSerdeWrite};
4+
use crate::domains::replications::messages::PeerMessage;
5+
use crate::domains::{
6+
IoError, TAsyncReadWrite, TReadBytes, TSerdeDynamicRead, TSerdeDynamicWrite, TSerdeRead,
7+
TSerdeWrite,
8+
};
59
use crate::domains::{QueryIO, deserialize};
610
use bytes::BytesMut;
711
use std::fmt::Debug;
@@ -20,7 +24,7 @@ impl<T: AsyncWriteExt + std::marker::Unpin + Sync + Send + Debug + 'static> TWri
2024
}
2125

2226
#[async_trait::async_trait]
23-
impl<T: AsyncReadExt + std::marker::Unpin + Sync + Send + Debug + 'static> TRead for T {
27+
impl<T: AsyncReadExt + std::marker::Unpin + Sync + Send + Debug + 'static> TReadBytes for T {
2428
// TCP doesn't inherently delimit messages.
2529
// The data arrives in a continuous stream of bytes. And
2630
// we might not receive all the data in one go.
@@ -52,7 +56,10 @@ impl<T: AsyncReadExt + std::marker::Unpin + Sync + Send + Debug + 'static> TRead
5256
}
5357
Ok(())
5458
}
59+
}
5560

61+
#[async_trait::async_trait]
62+
impl<T: AsyncReadExt + std::marker::Unpin + Sync + Send + Debug + 'static> TRead for T {
5663
async fn read_values(&mut self) -> Result<Vec<QueryIO>, IoError> {
5764
let mut buffer = BytesMut::with_capacity(INITIAL_CAPACITY);
5865
self.read_bytes(&mut buffer).await?;
@@ -75,6 +82,22 @@ impl<T: AsyncReadExt + std::marker::Unpin + Sync + Send + Debug + 'static> TRead
7582
}
7683
}
7784

85+
#[async_trait::async_trait]
86+
impl<T: AsyncReadExt + std::marker::Unpin + Sync + Send + Debug + 'static> TSerdeDynamicRead for T {
87+
async fn receive_peer_msgs(&mut self) -> Result<Vec<PeerMessage>, IoError> {
88+
let mut buffer = BytesMut::with_capacity(INITIAL_CAPACITY);
89+
self.read_bytes(&mut buffer).await?;
90+
let mut parsed_values = Vec::new();
91+
while !buffer.is_empty() {
92+
let (request, size) = bincode::decode_from_slice(&buffer, SERDE_CONFIG)
93+
.map_err(|e| IoError::Custom(e.to_string()))?;
94+
parsed_values.push(request);
95+
buffer = buffer.split_off(size);
96+
}
97+
Ok(parsed_values)
98+
}
99+
}
100+
78101
impl<T: AsyncWriteExt + std::marker::Unpin + Sync + Send + Debug + 'static> TSerdeWrite for T {
79102
async fn serialized_write(&mut self, buf: impl bincode::Encode + Send) -> Result<(), IoError> {
80103
let encoded = bincode::encode_to_vec(buf, SERDE_CONFIG)
@@ -83,6 +106,17 @@ impl<T: AsyncWriteExt + std::marker::Unpin + Sync + Send + Debug + 'static> TSer
83106
}
84107
}
85108

109+
#[async_trait::async_trait]
110+
impl<T: AsyncWriteExt + std::marker::Unpin + Sync + Send + Debug + 'static> TSerdeDynamicWrite
111+
for T
112+
{
113+
async fn send(&mut self, msg: PeerMessage) -> Result<(), IoError> {
114+
let encoded = bincode::encode_to_vec(msg, SERDE_CONFIG)
115+
.map_err(|e| IoError::Custom(e.to_string()))?;
116+
self.write_all(&encoded).await.map_err(|e| io_error_from_kind(e.kind()))
117+
}
118+
}
119+
86120
impl<T: AsyncReadExt + std::marker::Unpin + Sync + Send + Debug + 'static> TSerdeRead for T {
87121
async fn deserialized_read<U>(&mut self) -> Result<U, IoError>
88122
where

duva/src/adapters/op_logs/disk_based.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1+
use crate::domains::query_io::SERDE_CONFIG;
12
use crate::domains::query_io::serialized_len_with_bincode;
2-
use crate::domains::query_io::{SERDE_CONFIG, WRITE_OP_PREFIX};
33
use crate::domains::replications::LogEntry;
44
use crate::domains::replications::WriteOperation;
55
use crate::domains::replications::interfaces::TWriteAheadLog;
@@ -13,6 +13,7 @@ use std::io::{ErrorKind, Read};
1313
use std::path::{Path, PathBuf};
1414

1515
const SEGMENT_SIZE: usize = 1024 * 1024; // 1MB per segment
16+
const WRITE_OP_PREFIX: char = '#';
1617

1718
/// A local write-ahead-log (WAL) file (op_logs) implementation using segmented logs.
1819
pub struct FileOpLogs {
@@ -587,7 +588,7 @@ mod tests {
587588
file.read_to_end(&mut buf).unwrap();
588589

589590
let (encoded, _): (WriteOperation, usize) =
590-
bincode::decode_from_slice(&buf[1..], bincode::config::standard()).unwrap();
591+
bincode::decode_from_slice(&buf[1..], SERDE_CONFIG).unwrap();
591592

592593
assert_eq!(encoded.entry, request);
593594
}

duva/src/domains/caches/cache_objects/entry.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ impl CacheEntry {
7070

7171
#[cfg(test)]
7272
mod tests {
73+
use crate::domains::query_io::SERDE_CONFIG;
74+
7375
use super::*;
7476
use bincode::{decode_from_slice, encode_to_vec};
7577
use chrono::{DateTime, Utc};
@@ -82,10 +84,10 @@ mod tests {
8284
let original_value = CacheValue::new("test_value").with_expiry(expiry_time);
8385

8486
// WHEN
85-
let encoded = encode_to_vec(&original_value, bincode::config::standard()).unwrap();
87+
let encoded = encode_to_vec(&original_value, SERDE_CONFIG).unwrap();
8688

8789
let (decoded_value, _): (CacheValue, usize) =
88-
decode_from_slice(&encoded, bincode::config::standard()).unwrap();
90+
decode_from_slice(&encoded, SERDE_CONFIG).unwrap();
8991

9092
// THEN - Verify the decoded value matches the original
9193
assert_eq!(decoded_value.value, original_value.value);
@@ -99,11 +101,11 @@ mod tests {
99101
let original_value = CacheValue::new("test_value_no_expiry");
100102

101103
// Encode the value
102-
let encoded = encode_to_vec(&original_value, bincode::config::standard()).unwrap();
104+
let encoded = encode_to_vec(&original_value, SERDE_CONFIG).unwrap();
103105

104106
// Decode the value back
105107
let (decoded_value, _): (CacheValue, usize) =
106-
decode_from_slice(&encoded, bincode::config::standard()).unwrap();
108+
decode_from_slice(&encoded, SERDE_CONFIG).unwrap();
107109

108110
// Verify the decoded value matches the original
109111
assert_eq!(decoded_value.value, original_value.value);
@@ -120,11 +122,11 @@ mod tests {
120122
.with_expiry(DateTime::from_timestamp_millis(expiry_millis).unwrap());
121123

122124
// Encode the entry
123-
let encoded = encode_to_vec(&original_entry, bincode::config::standard()).unwrap();
125+
let encoded = encode_to_vec(&original_entry, SERDE_CONFIG).unwrap();
124126

125127
// Decode the entry back
126128
let (decoded_entry, _): (CacheEntry, usize) =
127-
decode_from_slice(&encoded, bincode::config::standard()).unwrap();
129+
decode_from_slice(&encoded, SERDE_CONFIG).unwrap();
128130

129131
// Verify the decoded entry matches the original
130132
assert_eq!(decoded_entry.key(), original_entry.key());

duva/src/domains/caches/cache_objects/value.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -74,15 +74,16 @@ impl THasExpiry for CacheValue {
7474

7575
#[cfg(test)]
7676
mod tests {
77+
use crate::domains::query_io::SERDE_CONFIG;
78+
7779
use super::*;
7880
use bincode::{decode_from_slice, encode_to_vec};
7981

8082
#[test]
8183
fn test_string_variant_encode_decode() {
8284
let original = CacheValue::new("hello");
83-
let encoded = encode_to_vec(&original, bincode::config::standard()).unwrap();
84-
let (decoded, _): (CacheValue, usize) =
85-
decode_from_slice(&encoded, bincode::config::standard()).unwrap();
85+
let encoded = encode_to_vec(&original, SERDE_CONFIG).unwrap();
86+
let (decoded, _): (CacheValue, usize) = decode_from_slice(&encoded, SERDE_CONFIG).unwrap();
8687
assert_eq!(decoded, original);
8788
assert_eq!(decoded.value, TypedValue::String(Bytes::copy_from_slice(b"hello").into()));
8889
}
@@ -95,9 +96,8 @@ mod tests {
9596
q_list.rpush(i);
9697
}
9798
let original = CacheValue { value: TypedValue::List(q_list.clone()), expiry: None };
98-
let encoded = encode_to_vec(&original, bincode::config::standard()).unwrap();
99-
let (decoded, _): (CacheValue, usize) =
100-
decode_from_slice(&encoded, bincode::config::standard()).unwrap();
99+
let encoded = encode_to_vec(&original, SERDE_CONFIG).unwrap();
100+
let (decoded, _): (CacheValue, usize) = decode_from_slice(&encoded, SERDE_CONFIG).unwrap();
101101
assert_eq!(decoded, original);
102102
assert_eq!(decoded.value, TypedValue::List(q_list));
103103
}

duva/src/domains/cluster_actors/actor.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use crate::domains::peers::command::BannedPeer;
1818
use crate::domains::peers::command::BatchEntries;
1919
use crate::domains::peers::command::BatchId;
2020
use crate::domains::replications::messages::ElectionVote;
21+
use crate::domains::replications::messages::PeerMessage;
2122
use crate::domains::replications::messages::RejectionReason;
2223
use crate::domains::replications::messages::ReplicationAck;
2324
use crate::domains::replications::messages::RequestVote;
@@ -654,7 +655,7 @@ impl<T: TWriteAheadLog> ClusterActor<T> {
654655
}
655656

656657
let peer = self.members.get_mut(&request_to).unwrap();
657-
let _ = peer.send(QueryIO::StartRebalance).await;
658+
let _ = peer.send(PeerMessage::StartRebalance).await;
658659
}
659660
}
660661

@@ -861,7 +862,7 @@ impl<T: TWriteAheadLog> ClusterActor<T> {
861862

862863
async fn send_rpc_to_replicas(&mut self) {
863864
self.iter_follower_append_entries()
864-
.map(|(peer, hb)| peer.send(QueryIO::AppendEntriesRPC(hb)))
865+
.map(|(peer, hb)| peer.send(PeerMessage::AppendEntriesRPC(hb)))
865866
.collect::<FuturesUnordered<_>>()
866867
.for_each(|_| async {})
867868
.await;
@@ -1338,7 +1339,7 @@ impl<T: TWriteAheadLog> ClusterActor<T> {
13381339
warn!("No Member Found");
13391340
return;
13401341
};
1341-
let _ = peer.send(QueryIO::MigrationBatchAck(migrate_batch.batch_id)).await;
1342+
let _ = peer.send(PeerMessage::MigrationBatchAck(migrate_batch.batch_id)).await;
13421343
return;
13431344
}
13441345

@@ -1446,7 +1447,7 @@ impl<T: TWriteAheadLog> ClusterActor<T> {
14461447
let Some(peer) = self.members.get_mut(&to) else {
14471448
return;
14481449
};
1449-
let _ = peer.send(QueryIO::MigrationBatchAck(batch_id)).await;
1450+
let _ = peer.send(PeerMessage::MigrationBatchAck(batch_id)).await;
14501451
}
14511452

14521453
async fn update_cluster_members(
@@ -1468,7 +1469,7 @@ impl<T: TWriteAheadLog> ClusterActor<T> {
14681469
pub(crate) async fn process_graceful_shutdown(&mut self) {
14691470
self.members
14701471
.iter_mut()
1471-
.map(|(_, p)| p.send(QueryIO::CloseConnection))
1472+
.map(|(_, p)| p.send(PeerMessage::CloseConnection))
14721473
.collect::<FuturesUnordered<_>>()
14731474
.for_each(|_| async {})
14741475
.await;

duva/src/domains/cluster_actors/actor/tests/elections.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -194,7 +194,7 @@ async fn test_receive_election_vote_candidate_wins_election() {
194194
..Default::default()
195195
};
196196

197-
assert_expected_queryio(&replica1_fake_buf, QueryIO::AppendEntriesRPC(hb.clone())).await;
197+
assert_expected_queryio(&replica1_fake_buf, PeerMessage::AppendEntriesRPC(hb.clone())).await;
198198

199199
// hb that sends to cluster
200200
hb.append_entries = vec![];
@@ -203,7 +203,7 @@ async fn test_receive_election_vote_candidate_wins_election() {
203203

204204
assert_expected_queryio(
205205
&replica1_fake_buf,
206-
QueryIO::ClusterHeartBeat(hb.set_hashring(candidate_actor.hash_ring.clone())),
206+
PeerMessage::ClusterHeartBeat(hb.set_hashring(candidate_actor.hash_ring.clone())),
207207
)
208208
.await;
209209
}

0 commit comments

Comments
 (0)