1+ use std:: collections:: VecDeque ;
2+
13use super :: communication_manager:: ClusterCommunicationManager ;
24use super :: outbound:: stream:: OutboundStream ;
35
46use crate :: domains:: cluster_actors:: commands:: ClusterCommand ;
57use crate :: domains:: peers:: identifier:: PeerIdentifier ;
68use crate :: { InboundStream , make_smart_pointer} ;
7- use tokio:: sync:: mpsc:: Sender ;
89
910pub ( crate ) struct ClusterConnectionManager ( pub ( crate ) ClusterCommunicationManager ) ;
1011
@@ -20,7 +21,7 @@ impl ClusterConnectionManager {
2021 peer_stream. may_try_sync ( ccm, & connected_peer_info) . await ?;
2122
2223 let ( tx, rx) = tokio:: sync:: oneshot:: channel ( ) ;
23- self . send ( peer_stream. into_add_peer ( self . clone ( ) , connected_peer_info, tx) ?) . await ?;
24+ self . send ( peer_stream. into_add_peer ( self . 0 . 0 . clone ( ) , connected_peer_info, tx) ?) . await ?;
2425
2526 Ok ( ( ) )
2627 }
@@ -29,45 +30,46 @@ impl ClusterConnectionManager {
2930 self ,
3031 self_port : u16 ,
3132 connect_to : PeerIdentifier ,
32- sender : tokio:: sync:: oneshot:: Sender < ( ) > ,
3333 ) -> anyhow:: Result < ( ) > {
34- // Base case
34+ let mut queue = VecDeque :: from ( vec ! [ connect_to. clone( ) ] ) ;
35+ let mut callbacks = Vec :: new ( ) ;
36+ while let Some ( connect_to) = queue. pop_front ( ) {
37+ let ( tx, rx) = tokio:: sync:: oneshot:: channel ( ) ;
38+ queue. extend (
39+ ClusterConnectionManager ( self . clone ( ) )
40+ . connect_to_server ( self_port, connect_to, tx)
41+ . await ?,
42+ ) ;
43+ callbacks. push ( rx) ;
44+ }
45+ for callback in callbacks {
46+ let _ = callback. await ;
47+ }
48+ Ok ( ( ) )
49+ }
50+
51+ async fn connect_to_server (
52+ self ,
53+ self_port : u16 ,
54+ connect_to : PeerIdentifier ,
55+ sender : tokio:: sync:: oneshot:: Sender < ( ) > ,
56+ ) -> anyhow:: Result < Vec < PeerIdentifier > > {
3557 let existing_peers = self . get_peers ( ) . await ?;
3658 if existing_peers. contains ( & connect_to) {
37- return Ok ( ( ) ) ;
59+ return Ok ( vec ! [ ] ) ;
3860 }
3961
40- // Recursive case
4162 let replication_info = self . replication_info ( ) . await ?;
4263 let ( add_peer_cmd, peer_list) = OutboundStream :: new ( connect_to, replication_info)
4364 . await ?
4465 . establish_connection ( self_port)
4566 . await ?
4667 . set_replication_info ( & self )
4768 . await ?
48- . create_peer_cmd ( self . clone ( ) , sender) ?;
69+ . create_peer_cmd ( self . 0 . 0 . clone ( ) , sender) ?;
4970 self . send ( add_peer_cmd) . await ?;
5071
51- // Discover additional peers concurrently
52- // TODO Require investigation. Why does 'list_peer_binding_addrs' have to be called at here?
53- let mut callbacks = Vec :: new ( ) ;
54- for peer in peer_list {
55- println ! ( "Discovering peer: {}" , peer) ;
56- let ( tx, rx) = tokio:: sync:: oneshot:: channel ( ) ;
57- Box :: pin (
58- ClusterConnectionManager ( self . 0 . clone ( ) ) . discover_cluster ( self_port, peer, tx) ,
59- )
60- . await ?;
61- callbacks. push ( rx) ;
62- }
63- for callback in callbacks {
64- let _ = callback. await ;
65- }
66- Ok ( ( ) )
67- }
68-
69- pub fn clone ( & self ) -> Sender < ClusterCommand > {
70- self . 0 . 0 . clone ( )
72+ Ok ( peer_list)
7173 }
7274
7375 pub async fn send ( & self , cmd : ClusterCommand ) -> anyhow:: Result < ( ) > {
0 commit comments