Skip to content

Commit ff49d22

Browse files
authored
Merge pull request #887 from Migorithm/refactor/ch-type
rfct : ch type
2 parents 6028a86 + 476a470 commit ff49d22

3 files changed

Lines changed: 10 additions & 12 deletions

File tree

duva/src/domains/cluster_actors/queue.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -156,10 +156,10 @@ impl ClusterActorSender {
156156

157157
pub(crate) async fn route_subscribe_topology_change(
158158
&self,
159-
) -> anyhow::Result<tokio::sync::broadcast::Receiver<Topology>> {
159+
) -> tokio::sync::broadcast::Receiver<Topology> {
160160
let (tx, rx) = Callback::create();
161-
self.send(ClientMessage::SubscribeToTopologyChange(tx)).await?;
162-
Ok(rx.recv().await)
161+
self.send(ClientMessage::SubscribeToTopologyChange(tx)).await;
162+
rx.recv().await
163163
}
164164
}
165165

duva/src/lib.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -244,16 +244,14 @@ impl StartUpFacade {
244244
},
245245
ConnectionRequests::Authenticate(request) => {
246246
let client_id = writer.send_conn_res(&self.cluster_actor_sender, request).await?;
247-
let observer =
248-
self.cluster_actor_sender.route_subscribe_topology_change().await.unwrap();
249-
let stream_writer = writer.run(observer);
247+
let observer = self.cluster_actor_sender.route_subscribe_topology_change().await;
250248
let client_controller = ClientController {
251249
cluster_actor_sender: self.cluster_actor_sender.clone(),
252250
cache_manager: self.cache_manager.clone(),
253251
};
254252
tokio::spawn(
255253
ClientStreamReader { client_id, r: read_half }
256-
.handle_client_stream(client_controller, stream_writer),
254+
.handle_client_stream(client_controller, writer.run(observer)),
257255
);
258256
},
259257
}

duva/src/presentation/clients/stream.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -135,14 +135,14 @@ impl ClientStreamWriter {
135135

136136
let (client_id, request_id) = auth_req.deconstruct()?;
137137

138-
self.serialized_write(ConnectionResponses::Authenticated(ConnectionResponse {
139-
client_id: client_id.to_string(),
138+
let connection_response = ConnectionResponse {
139+
client_id: client_id.clone(),
140140
request_id,
141141
topology: cluster_manager.route_get_topology().await?,
142142
is_leader_node: replication_state.role == ReplicationRole::Leader,
143-
replication_id: replication_state.replid.clone(),
144-
}))
145-
.await?;
143+
replication_id: replication_state.replid,
144+
};
145+
self.serialized_write(ConnectionResponses::Authenticated(connection_response)).await?;
146146

147147
Ok(client_id)
148148
}

0 commit comments

Comments
 (0)