Skip to content

Commit c45b12b

Browse files
committed
feat: add a hote punching protocol try use on nat traversal
1 parent f0bb922 commit c45b12b

File tree

11 files changed

+1098
-105
lines changed

11 files changed

+1098
-105
lines changed

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

network/src/network.rs

Lines changed: 61 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use futures::{Future, channel::mpsc::Sender};
3030
use ipnetwork::IpNetwork;
3131
use p2p::{
3232
SessionId, async_trait,
33-
builder::ServiceBuilder,
33+
builder::{MetaBuilder, ServiceBuilder},
3434
bytes::Bytes,
3535
context::{ServiceContext, SessionContext},
3636
error::{DialerErrorKind, HandshakeErrorKind, ProtocolHandleErrorKind, SendErrorKind},
@@ -74,10 +74,9 @@ pub struct NetworkState {
7474
/// Node listened addresses
7575
pub(crate) listened_addrs: RwLock<Vec<Multiaddr>>,
7676
dialing_addrs: RwLock<HashMap<PeerId, Instant>>,
77-
/// Node public addresses,
78-
/// includes manually public addrs and remote peer observed addrs
79-
public_addrs: RwLock<HashSet<Multiaddr>>,
80-
pending_observed_addrs: RwLock<HashSet<Multiaddr>>,
77+
/// Node public addresses by config
78+
public_addrs: HashSet<Multiaddr>,
79+
observed_addrs: RwLock<HashMap<PeerIndex, Multiaddr>>,
8180
local_private_key: secio::SecioKeyPair,
8281
local_peer_id: PeerId,
8382
pub(crate) bootnodes: Vec<Multiaddr>,
@@ -139,9 +138,9 @@ impl NetworkState {
139138
bootnodes,
140139
peer_registry: RwLock::new(peer_registry),
141140
dialing_addrs: RwLock::new(HashMap::default()),
142-
public_addrs: RwLock::new(public_addrs),
141+
public_addrs,
143142
listened_addrs: RwLock::new(Vec::new()),
144-
pending_observed_addrs: RwLock::new(HashSet::default()),
143+
observed_addrs: RwLock::new(HashMap::default()),
145144
local_private_key,
146145
local_peer_id,
147146
active: AtomicBool::new(true),
@@ -187,9 +186,9 @@ impl NetworkState {
187186
bootnodes,
188187
peer_registry: RwLock::new(peer_registry),
189188
dialing_addrs: RwLock::new(HashMap::default()),
190-
public_addrs: RwLock::new(public_addrs),
189+
public_addrs,
191190
listened_addrs: RwLock::new(Vec::new()),
192-
pending_observed_addrs: RwLock::new(HashSet::default()),
191+
observed_addrs: RwLock::new(HashMap::default()),
193192
local_private_key,
194193
local_peer_id,
195194
active: AtomicBool::new(true),
@@ -338,12 +337,14 @@ impl NetworkState {
338337
}
339338

340339
pub(crate) fn public_addrs(&self, count: usize) -> Vec<Multiaddr> {
341-
self.public_addrs
342-
.read()
343-
.iter()
344-
.take(count)
345-
.cloned()
346-
.collect()
340+
if self.public_addrs.len() <= count {
341+
return self.public_addrs.iter().cloned().collect();
342+
} else {
343+
self.public_addrs
344+
.iter()
345+
.cloned()
346+
.choose_multiple(&mut rand::thread_rng(), count)
347+
}
347348
}
348349

349350
pub(crate) fn connection_status(&self) -> ConnectionStatus {
@@ -392,7 +393,7 @@ impl NetworkState {
392393
trace!("Do not dial self: {:?}, {}", peer_id, addr);
393394
return false;
394395
}
395-
if self.public_addrs.read().contains(addr) {
396+
if self.public_addrs.contains(addr) {
396397
trace!(
397398
"Do not dial listened address(self): {:?}, {}",
398399
peer_id, addr
@@ -500,40 +501,28 @@ impl NetworkState {
500501
}
501502
}
502503

503-
/// this method is intent to check observed addr by dial to self
504-
pub(crate) fn try_dial_observed_addrs(&self, p2p_control: &ServiceControl) {
505-
let mut pending_observed_addrs = self.pending_observed_addrs.write();
506-
if pending_observed_addrs.is_empty() {
507-
let addrs = self.public_addrs.read();
508-
if addrs.is_empty() {
509-
return;
510-
}
511-
// random get addr
512-
if let Some(addr) = addrs.iter().choose(&mut rand::thread_rng()) {
513-
if let Err(err) = p2p_control.dial(
514-
addr.clone(),
515-
TargetProtocol::Single(SupportProtocols::Identify.protocol_id()),
516-
) {
517-
trace!("try_dial_observed_addrs {err} failed in public address")
518-
}
519-
}
520-
} else {
521-
for addr in pending_observed_addrs.drain() {
522-
trace!("try dial observed addr: {:?}", addr);
523-
if let Err(err) = p2p_control.dial(
524-
addr,
525-
TargetProtocol::Single(SupportProtocols::Identify.protocol_id()),
526-
) {
527-
trace!("try_dial_observed_addrs {err} failed in pending observed addresses")
528-
}
529-
}
530-
}
504+
/// add observed address for identify protocol
505+
pub(crate) fn add_observed_addr(&self, session_id: SessionId, addr: Multiaddr) {
506+
let mut pending_observed_addrs = self.observed_addrs.write();
507+
pending_observed_addrs.insert(session_id, addr);
531508
}
532509

533-
/// add observed address for identify protocol
534-
pub(crate) fn add_observed_addrs(&self, iter: impl Iterator<Item = Multiaddr>) {
535-
let mut pending_observed_addrs = self.pending_observed_addrs.write();
536-
pending_observed_addrs.extend(iter)
510+
// randomly select count addresses from observed_addrs
511+
#[cfg(not(target_family = "wasm"))]
512+
pub(crate) fn observed_addrs(&self, count: usize) -> Vec<Multiaddr> {
513+
let observed_addrs = self
514+
.observed_addrs
515+
.read()
516+
.values()
517+
.cloned()
518+
.collect::<HashSet<_>>();
519+
if observed_addrs.len() <= count {
520+
return observed_addrs.into_iter().collect();
521+
} else {
522+
observed_addrs
523+
.into_iter()
524+
.choose_multiple(&mut rand::thread_rng(), count)
525+
}
537526
}
538527

539528
/// Network message processing controller, default is true, if false, discard any received messages
@@ -613,19 +602,11 @@ impl ServiceHandle for EventHandler {
613602
async fn handle_error(&mut self, context: &mut ServiceContext, error: ServiceError) {
614603
match error {
615604
ServiceError::DialerError { address, error } => {
616-
let mut public_addrs = self.network_state.public_addrs.write();
617-
618605
match error {
619606
DialerErrorKind::HandshakeError(HandshakeErrorKind::SecioError(
620607
SecioError::ConnectSelf,
621608
)) => {
622609
debug!("dial observed address success: {:?}", address);
623-
if let Some(ip) = multiaddr_to_socketaddr(&address) {
624-
if is_reachable(ip.ip()) {
625-
public_addrs.insert(address);
626-
}
627-
}
628-
return;
629610
}
630611
DialerErrorKind::IoError(e)
631612
if e.kind() == std::io::ErrorKind::AddrNotAvailable =>
@@ -636,12 +617,6 @@ impl ServiceHandle for EventHandler {
636617
debug!("DialerError({}) {}", address, error);
637618
}
638619
}
639-
if public_addrs.remove(&address) {
640-
info!(
641-
"Dial {} failed, remove it from network_state.public_addrs",
642-
address
643-
);
644-
}
645620
self.network_state.dial_failed(&address);
646621
}
647622
ServiceError::ProtocolError {
@@ -815,6 +790,10 @@ impl ServiceHandle for EventHandler {
815790
peer_store.remove_disconnected_peer(&session_context.address);
816791
});
817792
}
793+
self.network_state
794+
.observed_addrs
795+
.write()
796+
.remove(&session_context.id);
818797
}
819798
_ => {
820799
info!("p2p service event: {:?}", event);
@@ -937,6 +916,26 @@ impl NetworkService {
937916
protocol_metas.push(disconnect_message_meta);
938917
}
939918

919+
// HolePunching protocol
920+
#[cfg(not(target_family = "wasm"))]
921+
if config
922+
.support_protocols
923+
.contains(&SupportProtocol::HolePunching)
924+
{
925+
let hole_punching_state = Arc::clone(&network_state);
926+
let hole_punching_meta_builder: MetaBuilder = SupportProtocols::HolePunching.into();
927+
let hole_punching_meta = hole_punching_meta_builder
928+
.before_send(crate::compress::compress)
929+
.before_receive(|| Some(Box::new(crate::compress::decompress)))
930+
.service_handle(move || {
931+
ProtocolHandle::Callback(Box::new(
932+
crate::protocols::hole_punching::HolePunching::new(hole_punching_state),
933+
))
934+
})
935+
.build();
936+
protocol_metas.push(hole_punching_meta);
937+
}
938+
940939
let mut service_builder = ServiceBuilder::default();
941940
let yamux_config = YamuxConfig {
942941
max_stream_count: protocol_metas.len(),

network/src/peer_store/peer_store_impl.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,25 @@ impl PeerStore {
217217
self.addr_manager.fetch_random(count, filter)
218218
}
219219

220+
/// Return address that we never connected to, used for penetration.
221+
pub fn fetch_nat_addrs(&mut self, count: usize, required_flags: Flags) -> Vec<AddrInfo> {
222+
// Get info:
223+
// 1. Never connected
224+
// 2. Not already connected
225+
226+
let peers = &self.connected_peers;
227+
228+
let filter = |peer_addr: &AddrInfo| {
229+
required_flags_filter(required_flags, Flags::from_bits_truncate(peer_addr.flags))
230+
&& extract_peer_id(&peer_addr.addr)
231+
.map(|peer_id| !peers.contains_key(&peer_id))
232+
.unwrap_or_default()
233+
&& peer_addr.connected(|t| t == 0)
234+
};
235+
236+
self.addr_manager.fetch_random(count, filter)
237+
}
238+
220239
/// Return valid addrs that success connected, used for discovery.
221240
pub fn fetch_random_addrs(&mut self, count: usize, required_flags: Flags) -> Vec<AddrInfo> {
222241
// Get info:

network/src/protocols/identify/mod.rs

Lines changed: 5 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use p2p::{
99
bytes::Bytes,
1010
context::{ProtocolContext, ProtocolContextMutRef, SessionContext},
1111
multiaddr::{Multiaddr, Protocol},
12-
service::{SessionType, TargetProtocol},
12+
service::TargetProtocol,
1313
traits::ServiceProtocol,
1414
utils::{extract_peer_id, is_reachable, multiaddr_to_socketaddr},
1515
};
@@ -77,7 +77,7 @@ pub trait Callback: Clone + Send {
7777
/// Add remote peer's listen addresses
7878
fn add_remote_listen_addrs(&mut self, session: &SessionContext, addrs: Vec<Multiaddr>);
7979
/// Add our address observed by remote peer
80-
fn add_observed_addr(&mut self, addr: Multiaddr, ty: SessionType) -> MisbehaveResult;
80+
fn add_observed_addr(&mut self, addr: Multiaddr, sessioin_id: SessionId) -> MisbehaveResult;
8181
/// Report misbehavior
8282
fn misbehave(&mut self, session: &SessionContext, kind: Misbehavior) -> MisbehaveResult;
8383
}
@@ -173,7 +173,7 @@ impl<T: Callback> IdentifyProtocol<T> {
173173
return MisbehaveResult::Continue;
174174
}
175175

176-
self.callback.add_observed_addr(observed, info.session.ty)
176+
self.callback.add_observed_addr(observed, info.session.id)
177177
}
178178
}
179179

@@ -528,42 +528,14 @@ impl Callback for IdentifyCallback {
528528
})
529529
}
530530

531-
fn add_observed_addr(&mut self, mut addr: Multiaddr, ty: SessionType) -> MisbehaveResult {
532-
if ty.is_inbound() {
533-
// The address already been discovered by other peer
534-
return MisbehaveResult::Continue;
535-
}
536-
537-
// observed addr is not a reachable ip
538-
if !multiaddr_to_socketaddr(&addr)
539-
.map(|socket_addr| is_reachable(socket_addr.ip()))
540-
.unwrap_or(false)
541-
{
542-
return MisbehaveResult::Continue;
543-
}
544-
531+
fn add_observed_addr(&mut self, mut addr: Multiaddr, session_id: SessionId) -> MisbehaveResult {
545532
if extract_peer_id(&addr).is_none() {
546533
addr.push(Protocol::P2P(Cow::Borrowed(
547534
self.network_state.local_peer_id().as_bytes(),
548535
)))
549536
}
550537

551-
let source_addr = addr.clone();
552-
let observed_addrs_iter = self
553-
.listen_addrs()
554-
.into_iter()
555-
.filter_map(|listen_addr| multiaddr_to_socketaddr(&listen_addr))
556-
.map(|socket_addr| {
557-
addr.iter()
558-
.map(|proto| match proto {
559-
Protocol::Tcp(_) => Protocol::Tcp(socket_addr.port()),
560-
value => value,
561-
})
562-
.collect::<Multiaddr>()
563-
})
564-
.chain(::std::iter::once(source_addr));
565-
566-
self.network_state.add_observed_addrs(observed_addrs_iter);
538+
self.network_state.add_observed_addr(session_id, addr);
567539
// NOTE: for future usage
568540
MisbehaveResult::Continue
569541
}

network/src/protocols/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@ pub(crate) mod identify;
55
pub(crate) mod ping;
66
pub(crate) mod support_protocols;
77

8+
#[cfg(not(target_family = "wasm"))]
9+
pub(crate) mod hole_punching;
10+
811
#[cfg(test)]
912
mod tests;
1013

network/src/protocols/support_protocols.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ pub enum SupportProtocols {
5656
LightClient,
5757
/// Filter: A protocol used for client side block data filtering.
5858
Filter,
59+
/// HolePunching: A protocol used to connect peers behind firewalls or NAT routers.
60+
HolePunching,
5961
}
6062

6163
impl SupportProtocols {
@@ -74,6 +76,7 @@ impl SupportProtocols {
7476
SupportProtocols::Alert => 110,
7577
SupportProtocols::LightClient => 120,
7678
SupportProtocols::Filter => 121,
79+
SupportProtocols::HolePunching => 130,
7780
}
7881
.into()
7982
}
@@ -93,6 +96,7 @@ impl SupportProtocols {
9396
SupportProtocols::Alert => "/ckb/alt",
9497
SupportProtocols::LightClient => "/ckb/lightclient",
9598
SupportProtocols::Filter => "/ckb/filter",
99+
SupportProtocols::HolePunching => "/ckb/HolePunching",
96100
}
97101
.to_owned()
98102
}
@@ -117,6 +121,7 @@ impl SupportProtocols {
117121
SupportProtocols::RelayV3 => vec!["2".to_owned(), LASTEST_VERSION.to_owned()],
118122
SupportProtocols::LightClient => vec!["2".to_owned(), LASTEST_VERSION.to_owned()],
119123
SupportProtocols::Filter => vec!["2".to_owned(), LASTEST_VERSION.to_owned()],
124+
SupportProtocols::HolePunching => vec!["2".to_owned(), LASTEST_VERSION.to_owned()],
120125
}
121126
}
122127

@@ -134,6 +139,7 @@ impl SupportProtocols {
134139
SupportProtocols::Alert => 128 * 1024, // 128 KB
135140
SupportProtocols::LightClient => 2 * 1024 * 1024, // 2 MB
136141
SupportProtocols::Filter => 2 * 1024 * 1024, // 2 MB
142+
SupportProtocols::HolePunching => 512 * 1024, // 512 KB
137143
}
138144
}
139145

network/src/services/outbound_peer.rs

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -196,11 +196,6 @@ impl OutboundPeerService {
196196
}
197197
}
198198

199-
fn try_dial_observed(&self) {
200-
self.network_state
201-
.try_dial_observed_addrs(&self.p2p_control);
202-
}
203-
204199
fn update_outbound_connected_ms(&mut self) {
205200
if self.update_outbound_connected_count > 10 {
206201
let connected_outbounds: Vec<p2p::multiaddr::Multiaddr> =
@@ -256,8 +251,6 @@ impl Future for OutboundPeerService {
256251
self.dial_feeler();
257252
// keep outbound peer is enough
258253
self.try_dial_peers();
259-
// try dial observed addrs
260-
self.try_dial_observed();
261254
// Keep connected nodes up to date in the peer store
262255
self.update_outbound_connected_ms();
263256
}

resource/ckb.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ discovery_local_address = false # {{
105105
bootnode_mode = false
106106

107107
# Supported protocols list, only "Sync" and "Identify" are mandatory, others are optional
108-
support_protocols = ["Ping", "Discovery", "Identify", "Feeler", "DisconnectMessage", "Sync", "Relay", "Time", "Alert", "LightClient", "Filter"]
108+
support_protocols = ["Ping", "Discovery", "Identify", "Feeler", "DisconnectMessage", "Sync", "Relay", "Time", "Alert", "LightClient", "Filter", "HolePunching"]
109109

110110
# [network.sync.header_map]
111111
# memory_limit = "256MB"

0 commit comments

Comments
 (0)