@@ -28,6 +28,7 @@ use ckb_systemtime::{Duration, Instant};
2828use ckb_util:: { Condvar , Mutex , RwLock } ;
2929use futures:: { Future , channel:: mpsc:: Sender } ;
3030use ipnetwork:: IpNetwork ;
31+ use p2p:: multiaddr:: MultiAddr ;
3132use p2p:: {
3233 SessionId , async_trait,
3334 builder:: ServiceBuilder ,
@@ -67,6 +68,54 @@ const P2P_TRY_SEND_INTERVAL: Duration = Duration::from_millis(100);
6768// After 5 minutes we consider this dial hang
6869const DIAL_HANG_TIMEOUT : Duration = Duration :: from_secs ( 300 ) ;
6970
71+ /// CKB node's public addresses:
72+ ///
73+ /// This struct holds the public addresses of the CKB node, categorized by how they were obtained.
74+ pub struct PublicAddresses {
75+ /// Addresses explicitly configured by the user in the ckb.toml configuration file.
76+ /// These addresses are considered static and represent the node's intended public endpoints.
77+ configured : HashSet < MultiAddr > ,
78+
79+ /// Addresses discovered dynamically at runtime through observing successful outbound connections.
80+ /// These addresses may change over time and are managed behind a `RwLock` to allow concurrent
81+ /// read access while providing exclusive write access for updates. Addresses that fail to connect
82+ /// are removed from this set.
83+ discovered : RwLock < HashSet < Multiaddr > > ,
84+ }
85+
86+ impl PublicAddresses {
87+ fn new ( configured : HashSet < MultiAddr > , discovered : HashSet < Multiaddr > ) -> Self {
88+ Self {
89+ configured,
90+ discovered : RwLock :: new ( discovered) ,
91+ }
92+ }
93+
94+ fn all ( & self ) -> Vec < MultiAddr > {
95+ self . configured
96+ . iter ( )
97+ . chain ( self . discovered . read ( ) . iter ( ) )
98+ . cloned ( )
99+ . collect ( )
100+ }
101+
102+ fn contains ( & self , addr : & MultiAddr ) -> bool {
103+ self . discovered . read ( ) . contains ( addr) || self . configured . contains ( addr)
104+ }
105+
106+ fn count ( & self ) -> usize {
107+ self . configured . len ( ) + self . discovered . read ( ) . len ( )
108+ }
109+
110+ fn random_choose ( & self ) -> Option < MultiAddr > {
111+ let addrs = self . all ( ) ;
112+ if addrs. is_empty ( ) {
113+ return None ;
114+ }
115+ addrs. into_iter ( ) . choose ( & mut rand:: thread_rng ( ) )
116+ }
117+ }
118+
70119/// The global shared state of the network module
71120pub struct NetworkState {
72121 pub ( crate ) peer_registry : RwLock < PeerRegistry > ,
@@ -76,7 +125,7 @@ pub struct NetworkState {
76125 dialing_addrs : RwLock < HashMap < PeerId , Instant > > ,
77126 /// Node public addresses,
78127 /// includes manually public addrs and remote peer observed addrs
79- public_addrs : RwLock < HashSet < Multiaddr > > ,
128+ public_addrs : PublicAddresses ,
80129 pending_observed_addrs : RwLock < HashSet < Multiaddr > > ,
81130 local_private_key : secio:: SecioKeyPair ,
82131 local_peer_id : PeerId ,
@@ -99,7 +148,7 @@ impl NetworkState {
99148 let local_private_key = config. fetch_private_key ( ) ?;
100149 let local_peer_id = local_private_key. peer_id ( ) ;
101150 // set max score to public addresses
102- let public_addrs : HashSet < Multiaddr > = config
151+ let configured_public_addrs : HashSet < Multiaddr > = config
103152 . listen_addresses
104153 . iter ( )
105154 . chain ( config. public_addresses . iter ( ) )
@@ -114,6 +163,9 @@ impl NetworkState {
114163 }
115164 } )
116165 . collect ( ) ;
166+
167+ let discovered_public_addrs = HashSet :: new ( ) ;
168+ let public_addrs = PublicAddresses :: new ( configured_public_addrs, discovered_public_addrs) ;
117169 info ! ( "Loading the peer store. This process may take a few seconds to complete." ) ;
118170
119171 let peer_store = Mutex :: new ( PeerStore :: load_from_dir_or_default (
@@ -134,7 +186,7 @@ impl NetworkState {
134186 bootnodes,
135187 peer_registry : RwLock :: new ( peer_registry) ,
136188 dialing_addrs : RwLock :: new ( HashMap :: default ( ) ) ,
137- public_addrs : RwLock :: new ( public_addrs ) ,
189+ public_addrs,
138190 listened_addrs : RwLock :: new ( Vec :: new ( ) ) ,
139191 pending_observed_addrs : RwLock :: new ( HashSet :: default ( ) ) ,
140192 local_private_key,
@@ -334,7 +386,7 @@ impl NetworkState {
334386
335387 pub ( crate ) fn public_addrs ( & self , count : usize ) -> Vec < Multiaddr > {
336388 self . public_addrs
337- . read ( )
389+ . all ( )
338390 . iter ( )
339391 . take ( count)
340392 . cloned ( )
@@ -387,7 +439,7 @@ impl NetworkState {
387439 trace ! ( "Do not dial self: {:?}, {}" , peer_id, addr) ;
388440 return false ;
389441 }
390- if self . public_addrs . read ( ) . contains ( addr) {
442+ if self . public_addrs . contains ( addr) {
391443 trace ! (
392444 "Do not dial listened address(self): {:?}, {}" ,
393445 peer_id, addr
@@ -499,12 +551,12 @@ impl NetworkState {
499551 pub ( crate ) fn try_dial_observed_addrs ( & self , p2p_control : & ServiceControl ) {
500552 let mut pending_observed_addrs = self . pending_observed_addrs . write ( ) ;
501553 if pending_observed_addrs. is_empty ( ) {
502- let addrs = self . public_addrs . read ( ) ;
503- if addrs. is_empty ( ) {
554+ let addrs = & self . public_addrs ;
555+ if addrs. count ( ) == 0 {
504556 return ;
505557 }
506558 // random get addr
507- if let Some ( addr) = addrs. iter ( ) . choose ( & mut rand :: thread_rng ( ) ) {
559+ if let Some ( addr) = addrs. random_choose ( ) {
508560 if let Err ( err) = p2p_control. dial (
509561 addr. clone ( ) ,
510562 TargetProtocol :: Single ( SupportProtocols :: Identify . protocol_id ( ) ) ,
@@ -608,7 +660,11 @@ impl ServiceHandle for EventHandler {
608660 async fn handle_error ( & mut self , context : & mut ServiceContext , error : ServiceError ) {
609661 match error {
610662 ServiceError :: DialerError { address, error } => {
611- let mut public_addrs = self . network_state . public_addrs . write ( ) ;
663+ let mut discovered_public_addrs =
664+ self . network_state . public_addrs . discovered . write ( ) ;
665+
666+ let mut user_configured_public_addrs =
667+ self . network_state . public_addrs . configured . write ( ) ;
612668
613669 match error {
614670 DialerErrorKind :: HandshakeError ( HandshakeErrorKind :: SecioError (
@@ -617,7 +673,7 @@ impl ServiceHandle for EventHandler {
617673 debug ! ( "dial observed address success: {:?}" , address) ;
618674 if let Some ( ip) = multiaddr_to_socketaddr ( & address) {
619675 if is_reachable ( ip. ip ( ) ) {
620- public_addrs . insert ( address) ;
676+ discovered_public_addrs . insert ( address) ;
621677 }
622678 }
623679 return ;
@@ -631,9 +687,18 @@ impl ServiceHandle for EventHandler {
631687 debug ! ( "DialerError({}) {}" , address, error) ;
632688 }
633689 }
634- if public_addrs. remove ( & address) {
690+
691+ if user_configured_public_addrs. contains ( & address) {
692+ // don't remove the public_addr, sicne its user configred in ckb.toml
693+ warn ! (
694+ "Dial the public addr {} which is configured in ckb.toml failed, keep it." ,
695+ address
696+ ) ;
697+ }
698+
699+ if discovered_public_addrs. remove ( & address) {
635700 info ! (
636- "Dial {} failed, remove it from network_state.public_addrs" ,
701+ "Dial {} failed, remove it from network_state.public_addrs.discovered " ,
637702 address
638703 ) ;
639704 }
0 commit comments