@@ -30,7 +30,7 @@ use futures::{Future, channel::mpsc::Sender};
3030use ipnetwork:: IpNetwork ;
3131use p2p:: {
3232 SessionId , async_trait,
33- builder:: ServiceBuilder ,
33+ builder:: { MetaBuilder , ServiceBuilder } ,
3434 bytes:: Bytes ,
3535 context:: { ServiceContext , SessionContext } ,
3636 error:: { DialerErrorKind , HandshakeErrorKind , ProtocolHandleErrorKind , SendErrorKind } ,
@@ -74,10 +74,9 @@ pub struct NetworkState {
7474 /// Node listened addresses
7575 pub ( crate ) listened_addrs : RwLock < Vec < Multiaddr > > ,
7676 dialing_addrs : RwLock < HashMap < PeerId , Instant > > ,
77- /// Node public addresses,
78- /// includes manually public addrs and remote peer observed addrs
79- public_addrs : RwLock < HashSet < Multiaddr > > ,
80- pending_observed_addrs : RwLock < HashSet < Multiaddr > > ,
77+ /// Node public addresses by config
78+ public_addrs : HashSet < Multiaddr > ,
79+ observed_addrs : RwLock < HashMap < PeerIndex , Multiaddr > > ,
8180 local_private_key : secio:: SecioKeyPair ,
8281 local_peer_id : PeerId ,
8382 pub ( crate ) bootnodes : Vec < Multiaddr > ,
@@ -139,9 +138,9 @@ impl NetworkState {
139138 bootnodes,
140139 peer_registry : RwLock :: new ( peer_registry) ,
141140 dialing_addrs : RwLock :: new ( HashMap :: default ( ) ) ,
142- public_addrs : RwLock :: new ( public_addrs ) ,
141+ public_addrs,
143142 listened_addrs : RwLock :: new ( Vec :: new ( ) ) ,
144- pending_observed_addrs : RwLock :: new ( HashSet :: default ( ) ) ,
143+ observed_addrs : RwLock :: new ( HashMap :: default ( ) ) ,
145144 local_private_key,
146145 local_peer_id,
147146 active : AtomicBool :: new ( true ) ,
@@ -187,9 +186,9 @@ impl NetworkState {
187186 bootnodes,
188187 peer_registry : RwLock :: new ( peer_registry) ,
189188 dialing_addrs : RwLock :: new ( HashMap :: default ( ) ) ,
190- public_addrs : RwLock :: new ( public_addrs ) ,
189+ public_addrs,
191190 listened_addrs : RwLock :: new ( Vec :: new ( ) ) ,
192- pending_observed_addrs : RwLock :: new ( HashSet :: default ( ) ) ,
191+ observed_addrs : RwLock :: new ( HashMap :: default ( ) ) ,
193192 local_private_key,
194193 local_peer_id,
195194 active : AtomicBool :: new ( true ) ,
@@ -338,12 +337,14 @@ impl NetworkState {
338337 }
339338
340339 pub ( crate ) fn public_addrs ( & self , count : usize ) -> Vec < Multiaddr > {
341- self . public_addrs
342- . read ( )
343- . iter ( )
344- . take ( count)
345- . cloned ( )
346- . collect ( )
340+ if self . public_addrs . len ( ) <= count {
341+ return self . public_addrs . iter ( ) . cloned ( ) . collect ( ) ;
342+ } else {
343+ self . public_addrs
344+ . iter ( )
345+ . cloned ( )
346+ . choose_multiple ( & mut rand:: thread_rng ( ) , count)
347+ }
347348 }
348349
349350 pub ( crate ) fn connection_status ( & self ) -> ConnectionStatus {
@@ -392,7 +393,7 @@ impl NetworkState {
392393 trace ! ( "Do not dial self: {:?}, {}" , peer_id, addr) ;
393394 return false ;
394395 }
395- if self . public_addrs . read ( ) . contains ( addr) {
396+ if self . public_addrs . contains ( addr) {
396397 trace ! (
397398 "Do not dial listened address(self): {:?}, {}" ,
398399 peer_id, addr
@@ -500,40 +501,28 @@ impl NetworkState {
500501 }
501502 }
502503
503- /// this method is intent to check observed addr by dial to self
504- pub ( crate ) fn try_dial_observed_addrs ( & self , p2p_control : & ServiceControl ) {
505- let mut pending_observed_addrs = self . pending_observed_addrs . write ( ) ;
506- if pending_observed_addrs. is_empty ( ) {
507- let addrs = self . public_addrs . read ( ) ;
508- if addrs. is_empty ( ) {
509- return ;
510- }
511- // random get addr
512- if let Some ( addr) = addrs. iter ( ) . choose ( & mut rand:: thread_rng ( ) ) {
513- if let Err ( err) = p2p_control. dial (
514- addr. clone ( ) ,
515- TargetProtocol :: Single ( SupportProtocols :: Identify . protocol_id ( ) ) ,
516- ) {
517- trace ! ( "try_dial_observed_addrs {err} failed in public address" )
518- }
519- }
520- } else {
521- for addr in pending_observed_addrs. drain ( ) {
522- trace ! ( "try dial observed addr: {:?}" , addr) ;
523- if let Err ( err) = p2p_control. dial (
524- addr,
525- TargetProtocol :: Single ( SupportProtocols :: Identify . protocol_id ( ) ) ,
526- ) {
527- trace ! ( "try_dial_observed_addrs {err} failed in pending observed addresses" )
528- }
529- }
530- }
504+ /// add observed address for identify protocol
505+ pub ( crate ) fn add_observed_addr ( & self , session_id : SessionId , addr : Multiaddr ) {
506+ let mut pending_observed_addrs = self . observed_addrs . write ( ) ;
507+ pending_observed_addrs. insert ( session_id, addr) ;
531508 }
532509
533- /// add observed address for identify protocol
534- pub ( crate ) fn add_observed_addrs ( & self , iter : impl Iterator < Item = Multiaddr > ) {
535- let mut pending_observed_addrs = self . pending_observed_addrs . write ( ) ;
536- pending_observed_addrs. extend ( iter)
510+ // randomly select count addresses from observed_addrs
511+ #[ cfg( not( target_family = "wasm" ) ) ]
512+ pub ( crate ) fn observed_addrs ( & self , count : usize ) -> Vec < Multiaddr > {
513+ let observed_addrs = self
514+ . observed_addrs
515+ . read ( )
516+ . values ( )
517+ . cloned ( )
518+ . collect :: < HashSet < _ > > ( ) ;
519+ if observed_addrs. len ( ) <= count {
520+ return observed_addrs. into_iter ( ) . collect ( ) ;
521+ } else {
522+ observed_addrs
523+ . into_iter ( )
524+ . choose_multiple ( & mut rand:: thread_rng ( ) , count)
525+ }
537526 }
538527
539528 /// Network message processing controller, default is true, if false, discard any received messages
@@ -613,19 +602,11 @@ impl ServiceHandle for EventHandler {
613602 async fn handle_error ( & mut self , context : & mut ServiceContext , error : ServiceError ) {
614603 match error {
615604 ServiceError :: DialerError { address, error } => {
616- let mut public_addrs = self . network_state . public_addrs . write ( ) ;
617-
618605 match error {
619606 DialerErrorKind :: HandshakeError ( HandshakeErrorKind :: SecioError (
620607 SecioError :: ConnectSelf ,
621608 ) ) => {
622609 debug ! ( "dial observed address success: {:?}" , address) ;
623- if let Some ( ip) = multiaddr_to_socketaddr ( & address) {
624- if is_reachable ( ip. ip ( ) ) {
625- public_addrs. insert ( address) ;
626- }
627- }
628- return ;
629610 }
630611 DialerErrorKind :: IoError ( e)
631612 if e. kind ( ) == std:: io:: ErrorKind :: AddrNotAvailable =>
@@ -636,12 +617,6 @@ impl ServiceHandle for EventHandler {
636617 debug ! ( "DialerError({}) {}" , address, error) ;
637618 }
638619 }
639- if public_addrs. remove ( & address) {
640- info ! (
641- "Dial {} failed, remove it from network_state.public_addrs" ,
642- address
643- ) ;
644- }
645620 self . network_state . dial_failed ( & address) ;
646621 }
647622 ServiceError :: ProtocolError {
@@ -815,6 +790,10 @@ impl ServiceHandle for EventHandler {
815790 peer_store. remove_disconnected_peer ( & session_context. address ) ;
816791 } ) ;
817792 }
793+ self . network_state
794+ . observed_addrs
795+ . write ( )
796+ . remove ( & session_context. id ) ;
818797 }
819798 _ => {
820799 info ! ( "p2p service event: {:?}" , event) ;
@@ -937,6 +916,26 @@ impl NetworkService {
937916 protocol_metas. push ( disconnect_message_meta) ;
938917 }
939918
919+ // HolePunching protocol
920+ #[ cfg( not( target_family = "wasm" ) ) ]
921+ if config
922+ . support_protocols
923+ . contains ( & SupportProtocol :: HolePunching )
924+ {
925+ let hole_punching_state = Arc :: clone ( & network_state) ;
926+ let hole_punching_meta_builder: MetaBuilder = SupportProtocols :: HolePunching . into ( ) ;
927+ let hole_punching_meta = hole_punching_meta_builder
928+ . before_send ( crate :: compress:: compress)
929+ . before_receive ( || Some ( Box :: new ( crate :: compress:: decompress) ) )
930+ . service_handle ( move || {
931+ ProtocolHandle :: Callback ( Box :: new (
932+ crate :: protocols:: hole_punching:: HolePunching :: new ( hole_punching_state) ,
933+ ) )
934+ } )
935+ . build ( ) ;
936+ protocol_metas. push ( hole_punching_meta) ;
937+ }
938+
940939 let mut service_builder = ServiceBuilder :: default ( ) ;
941940 let yamux_config = YamuxConfig {
942941 max_stream_count : protocol_metas. len ( ) ,
0 commit comments