Skip to content

Commit f46b1e0

Browse files
committed
feat: add a hote puching protocol try use on nat traversal
1 parent f0bb922 commit f46b1e0

File tree

15 files changed

+1988
-104
lines changed

15 files changed

+1988
-104
lines changed

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

network/src/network.rs

Lines changed: 61 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use futures::{Future, channel::mpsc::Sender};
3030
use ipnetwork::IpNetwork;
3131
use p2p::{
3232
SessionId, async_trait,
33-
builder::ServiceBuilder,
33+
builder::{MetaBuilder, ServiceBuilder},
3434
bytes::Bytes,
3535
context::{ServiceContext, SessionContext},
3636
error::{DialerErrorKind, HandshakeErrorKind, ProtocolHandleErrorKind, SendErrorKind},
@@ -74,10 +74,9 @@ pub struct NetworkState {
7474
/// Node listened addresses
7575
pub(crate) listened_addrs: RwLock<Vec<Multiaddr>>,
7676
dialing_addrs: RwLock<HashMap<PeerId, Instant>>,
77-
/// Node public addresses,
78-
/// includes manually public addrs and remote peer observed addrs
79-
public_addrs: RwLock<HashSet<Multiaddr>>,
80-
pending_observed_addrs: RwLock<HashSet<Multiaddr>>,
77+
/// Node public addresses by config
78+
public_addrs: HashSet<Multiaddr>,
79+
observed_addrs: RwLock<HashMap<PeerIndex, Multiaddr>>,
8180
local_private_key: secio::SecioKeyPair,
8281
local_peer_id: PeerId,
8382
pub(crate) bootnodes: Vec<Multiaddr>,
@@ -139,9 +138,9 @@ impl NetworkState {
139138
bootnodes,
140139
peer_registry: RwLock::new(peer_registry),
141140
dialing_addrs: RwLock::new(HashMap::default()),
142-
public_addrs: RwLock::new(public_addrs),
141+
public_addrs,
143142
listened_addrs: RwLock::new(Vec::new()),
144-
pending_observed_addrs: RwLock::new(HashSet::default()),
143+
observed_addrs: RwLock::new(HashMap::default()),
145144
local_private_key,
146145
local_peer_id,
147146
active: AtomicBool::new(true),
@@ -187,9 +186,9 @@ impl NetworkState {
187186
bootnodes,
188187
peer_registry: RwLock::new(peer_registry),
189188
dialing_addrs: RwLock::new(HashMap::default()),
190-
public_addrs: RwLock::new(public_addrs),
189+
public_addrs,
191190
listened_addrs: RwLock::new(Vec::new()),
192-
pending_observed_addrs: RwLock::new(HashSet::default()),
191+
observed_addrs: RwLock::new(HashMap::default()),
193192
local_private_key,
194193
local_peer_id,
195194
active: AtomicBool::new(true),
@@ -338,12 +337,14 @@ impl NetworkState {
338337
}
339338

340339
pub(crate) fn public_addrs(&self, count: usize) -> Vec<Multiaddr> {
341-
self.public_addrs
342-
.read()
343-
.iter()
344-
.take(count)
345-
.cloned()
346-
.collect()
340+
if self.public_addrs.len() <= count {
341+
return self.public_addrs.iter().cloned().collect();
342+
} else {
343+
self.public_addrs
344+
.iter()
345+
.cloned()
346+
.choose_multiple(&mut rand::thread_rng(), count)
347+
}
347348
}
348349

349350
pub(crate) fn connection_status(&self) -> ConnectionStatus {
@@ -392,7 +393,7 @@ impl NetworkState {
392393
trace!("Do not dial self: {:?}, {}", peer_id, addr);
393394
return false;
394395
}
395-
if self.public_addrs.read().contains(addr) {
396+
if self.public_addrs.contains(addr) {
396397
trace!(
397398
"Do not dial listened address(self): {:?}, {}",
398399
peer_id, addr
@@ -500,40 +501,28 @@ impl NetworkState {
500501
}
501502
}
502503

503-
/// this method is intent to check observed addr by dial to self
504-
pub(crate) fn try_dial_observed_addrs(&self, p2p_control: &ServiceControl) {
505-
let mut pending_observed_addrs = self.pending_observed_addrs.write();
506-
if pending_observed_addrs.is_empty() {
507-
let addrs = self.public_addrs.read();
508-
if addrs.is_empty() {
509-
return;
510-
}
511-
// random get addr
512-
if let Some(addr) = addrs.iter().choose(&mut rand::thread_rng()) {
513-
if let Err(err) = p2p_control.dial(
514-
addr.clone(),
515-
TargetProtocol::Single(SupportProtocols::Identify.protocol_id()),
516-
) {
517-
trace!("try_dial_observed_addrs {err} failed in public address")
518-
}
519-
}
520-
} else {
521-
for addr in pending_observed_addrs.drain() {
522-
trace!("try dial observed addr: {:?}", addr);
523-
if let Err(err) = p2p_control.dial(
524-
addr,
525-
TargetProtocol::Single(SupportProtocols::Identify.protocol_id()),
526-
) {
527-
trace!("try_dial_observed_addrs {err} failed in pending observed addresses")
528-
}
529-
}
530-
}
504+
/// add observed address for identify protocol
505+
pub(crate) fn add_observed_addr(&self, session_id: SessionId, addr: Multiaddr) {
506+
let mut pending_observed_addrs = self.observed_addrs.write();
507+
pending_observed_addrs.insert(session_id, addr);
531508
}
532509

533-
/// add observed address for identify protocol
534-
pub(crate) fn add_observed_addrs(&self, iter: impl Iterator<Item = Multiaddr>) {
535-
let mut pending_observed_addrs = self.pending_observed_addrs.write();
536-
pending_observed_addrs.extend(iter)
510+
// randomly select count addresses from observed_addrs
511+
#[cfg(not(target_family = "wasm"))]
512+
pub(crate) fn observed_addrs(&self, count: usize) -> Vec<Multiaddr> {
513+
let observed_addrs = self
514+
.observed_addrs
515+
.read()
516+
.values()
517+
.cloned()
518+
.collect::<HashSet<_>>();
519+
if observed_addrs.len() <= count {
520+
return observed_addrs.into_iter().collect();
521+
} else {
522+
observed_addrs
523+
.into_iter()
524+
.choose_multiple(&mut rand::thread_rng(), count)
525+
}
537526
}
538527

539528
/// Network message processing controller, default is true, if false, discard any received messages
@@ -613,19 +602,11 @@ impl ServiceHandle for EventHandler {
613602
async fn handle_error(&mut self, context: &mut ServiceContext, error: ServiceError) {
614603
match error {
615604
ServiceError::DialerError { address, error } => {
616-
let mut public_addrs = self.network_state.public_addrs.write();
617-
618605
match error {
619606
DialerErrorKind::HandshakeError(HandshakeErrorKind::SecioError(
620607
SecioError::ConnectSelf,
621608
)) => {
622609
debug!("dial observed address success: {:?}", address);
623-
if let Some(ip) = multiaddr_to_socketaddr(&address) {
624-
if is_reachable(ip.ip()) {
625-
public_addrs.insert(address);
626-
}
627-
}
628-
return;
629610
}
630611
DialerErrorKind::IoError(e)
631612
if e.kind() == std::io::ErrorKind::AddrNotAvailable =>
@@ -636,12 +617,6 @@ impl ServiceHandle for EventHandler {
636617
debug!("DialerError({}) {}", address, error);
637618
}
638619
}
639-
if public_addrs.remove(&address) {
640-
info!(
641-
"Dial {} failed, remove it from network_state.public_addrs",
642-
address
643-
);
644-
}
645620
self.network_state.dial_failed(&address);
646621
}
647622
ServiceError::ProtocolError {
@@ -815,6 +790,10 @@ impl ServiceHandle for EventHandler {
815790
peer_store.remove_disconnected_peer(&session_context.address);
816791
});
817792
}
793+
self.network_state
794+
.observed_addrs
795+
.write()
796+
.remove(&session_context.id);
818797
}
819798
_ => {
820799
info!("p2p service event: {:?}", event);
@@ -937,6 +916,26 @@ impl NetworkService {
937916
protocol_metas.push(disconnect_message_meta);
938917
}
939918

919+
// HolePunching protocol
920+
#[cfg(not(target_family = "wasm"))]
921+
if config
922+
.support_protocols
923+
.contains(&SupportProtocol::HolePuching)
924+
{
925+
let hole_punching_state = Arc::clone(&network_state);
926+
let hole_punching_meta_builder: MetaBuilder = SupportProtocols::HolePuching.into();
927+
let hole_punching_meta = hole_punching_meta_builder
928+
.before_send(crate::compress::compress)
929+
.before_receive(|| Some(Box::new(crate::compress::decompress)))
930+
.service_handle(move || {
931+
ProtocolHandle::Callback(Box::new(
932+
crate::protocols::hole_puching::HolePuching::new(hole_punching_state),
933+
))
934+
})
935+
.build();
936+
protocol_metas.push(hole_punching_meta);
937+
}
938+
940939
let mut service_builder = ServiceBuilder::default();
941940
let yamux_config = YamuxConfig {
942941
max_stream_count: protocol_metas.len(),

network/src/peer_store/peer_store_impl.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,25 @@ impl PeerStore {
217217
self.addr_manager.fetch_random(count, filter)
218218
}
219219

220+
/// Return address that we never connected to, used for penetration.
221+
pub fn fetch_nat_addrs(&mut self, count: usize, required_flags: Flags) -> Vec<AddrInfo> {
222+
// Get info:
223+
// 1. Never connected
224+
// 2. Not already connected
225+
226+
let peers = &self.connected_peers;
227+
228+
let filter = |peer_addr: &AddrInfo| {
229+
required_flags_filter(required_flags, Flags::from_bits_truncate(peer_addr.flags))
230+
&& extract_peer_id(&peer_addr.addr)
231+
.map(|peer_id| !peers.contains_key(&peer_id))
232+
.unwrap_or_default()
233+
&& peer_addr.connected(|t| t == 0)
234+
};
235+
236+
self.addr_manager.fetch_random(count, filter)
237+
}
238+
220239
/// Return valid addrs that success connected, used for discovery.
221240
pub fn fetch_random_addrs(&mut self, count: usize, required_flags: Flags) -> Vec<AddrInfo> {
222241
// Get info:

0 commit comments

Comments
 (0)