Skip to content

Commit c705611

Browse files
committed
peer list added auth response
1 parent d462314 commit c705611

7 files changed

Lines changed: 39 additions & 18 deletions

File tree

duva-client/src/controller.rs

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use duva::{
33
clients::authentications::{AuthRequest, AuthResponse},
44
domains::query_parsers::query_io::{QueryIO, deserialize},
55
prelude::{
6-
BytesMut,
6+
BytesMut, PeerIdentifier,
77
tokio::{
88
self,
99
io::{AsyncReadExt, AsyncWriteExt},
@@ -19,27 +19,31 @@ pub struct ClientController<T> {
1919
client_id: Uuid,
2020
request_id: u64,
2121
latest_known_index: u64,
22+
cluster_nodes: Vec<PeerIdentifier>,
2223
pub target: T,
2324
}
2425

2526
impl<T> ClientController<T> {
2627
pub async fn new(editor: T, server_addr: &str) -> Self {
27-
let (stream, client_id, request_id) =
28-
ClientController::<T>::authenticate(server_addr).await;
29-
Self { stream, client_id, target: editor, latest_known_index: 0, request_id }
28+
let (stream, mut auth_response) = ClientController::<T>::authenticate(server_addr).await;
29+
auth_response.cluster_nodes.push(server_addr.to_string().into());
30+
Self {
31+
stream,
32+
client_id: Uuid::parse_str(&auth_response.client_id).unwrap(),
33+
target: editor,
34+
latest_known_index: 0,
35+
request_id: auth_response.request_id,
36+
cluster_nodes: auth_response.cluster_nodes,
37+
}
3038
}
3139

32-
async fn authenticate(server_addr: &str) -> (TcpStream, Uuid, u64) {
40+
async fn authenticate(server_addr: &str) -> (TcpStream, AuthResponse) {
3341
let mut stream = TcpStream::connect(server_addr).await.unwrap();
3442
stream.serialized_write(AuthRequest::default()).await.unwrap(); // client_id not exist
3543

36-
let AuthResponse { client_id, request_id } = stream.deserialized_read().await.unwrap();
37-
38-
let client_id = Uuid::parse_str(&client_id).unwrap();
39-
println!("Client ID: {}", client_id);
40-
println!("Connected to Redis at {}", server_addr);
44+
let auth_response: AuthResponse = stream.deserialized_read().await.unwrap();
4145

42-
(stream, client_id, request_id)
46+
(stream, auth_response)
4347
}
4448

4549
pub async fn send_command(

duva/src/clients/authentications.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use crate::domains::peers::identifier::PeerIdentifier;
2+
13
#[derive(Debug, Clone, PartialEq, Eq, Default, bincode::Decode, bincode::Encode)]
24
pub struct AuthRequest {
35
pub client_id: Option<String>,
@@ -7,4 +9,5 @@ pub struct AuthRequest {
79
pub struct AuthResponse {
810
pub client_id: String,
911
pub request_id: u64,
12+
pub cluster_nodes: Vec<PeerIdentifier>,
1013
}

duva/src/clients/utils.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,8 @@ impl ClientStreamHandler {
3434

3535
stream.serialized_write(AuthRequest::default()).await.unwrap(); // client_id not exist
3636

37-
let AuthResponse { client_id, request_id } = stream.deserialized_read().await.unwrap();
37+
let AuthResponse { client_id, request_id, cluster_nodes } =
38+
stream.deserialized_read().await.unwrap();
3839
let client_id = Uuid::parse_str(&client_id).unwrap();
3940

4041
let (read_half, write_half) = stream.into_split();

duva/src/domains/peers/identifier.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use crate::{from_to, make_smart_pointer};
33
#[derive(
44
Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Hash, Default, bincode::Encode, bincode::Decode,
55
)]
6-
pub(crate) struct PeerIdentifier(pub String);
6+
pub struct PeerIdentifier(pub String);
77
impl PeerIdentifier {
88
pub(crate) fn new(host: &str, port: u16) -> Self {
99
Self(format!("{}:{}", host, port))

duva/src/domains/peers/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
pub mod connected_peer_info;
22
pub(crate) mod connected_types;
3-
pub(crate) mod identifier;
3+
pub mod identifier;
44

55
pub(crate) mod peer;

duva/src/lib.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use services::interface::TSerdeReadWrite;
2323
use tokio::net::TcpListener;
2424

2525
pub mod prelude {
26+
pub use crate::domains::peers::identifier::PeerIdentifier;
2627
pub use bytes;
2728
pub use bytes::BytesMut;
2829
pub use tokio;
@@ -128,8 +129,10 @@ impl StartUpFacade {
128129
let client_stream_listener = TcpListener::bind(&self.config_manager.bind_addr()).await?;
129130
println!("start listening on {}", self.config_manager.bind_addr());
130131
let mut conn_handlers: Vec<tokio::task::JoinHandle<()>> = Vec::with_capacity(100);
132+
131133
while let Ok((stream, _)) = client_stream_listener.accept().await {
132-
let Ok(client_stream) = ClientStream::authenticate(stream).await else {
134+
let peers = self.registry.cluster_communication_manager().get_peers().await?;
135+
let Ok(client_stream) = ClientStream::authenticate(stream, peers).await else {
133136
eprintln!("[ERROR] Failed to authenticate client stream");
134137
continue;
135138
};

duva/src/presentation/clients/stream.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@ use super::{parser::parse_query, request::ClientRequest};
22
use crate::{
33
TSerdeReadWrite,
44
clients::authentications::{AuthRequest, AuthResponse},
5-
domains::{IoError, cluster_actors::session::SessionRequest, query_parsers::QueryIO},
5+
domains::{
6+
IoError, cluster_actors::session::SessionRequest, peers::identifier::PeerIdentifier,
7+
query_parsers::QueryIO,
8+
},
69
make_smart_pointer,
710
services::interface::TRead,
811
};
@@ -18,7 +21,10 @@ pub struct ClientStream {
1821
make_smart_pointer!(ClientStream, TcpStream=>stream);
1922

2023
impl ClientStream {
21-
pub(crate) async fn authenticate(mut stream: TcpStream) -> Result<Self, IoError> {
24+
pub(crate) async fn authenticate(
25+
mut stream: TcpStream,
26+
peers: Vec<PeerIdentifier>,
27+
) -> Result<Self, IoError> {
2228
let auth_req: AuthRequest = stream.deserialized_read().await?;
2329

2430
let client_id = match auth_req.client_id {
@@ -30,7 +36,11 @@ impl ClientStream {
3036
};
3137

3238
stream
33-
.serialized_write(AuthResponse { client_id: client_id.to_string(), request_id: 0 })
39+
.serialized_write(AuthResponse {
40+
client_id: client_id.to_string(),
41+
request_id: 0,
42+
cluster_nodes: peers,
43+
})
3444
.await?;
3545

3646
Ok(Self { stream, client_id })

0 commit comments

Comments
 (0)