Skip to content

Commit e2dcc08

Browse files
committed
feat: add random jitter to nat traversal and check tcp stream io error before return
1 parent 2f73b1a commit e2dcc08

File tree

3 files changed

+120
-41
lines changed

3 files changed

+120
-41
lines changed

network/src/protocols/hole_punching/component/mod.rs

Lines changed: 108 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,32 @@ use tokio::net::{TcpSocket, TcpStream};
2121
use crate::{PeerId, protocols::hole_punching::MAX_TTL};
2222

2323
// Attempt to establish a TCP connection with NAT traversal
24+
//
25+
// Why is random jitter time added in NAT traversal?
26+
//
27+
// 1. Prevents synchronization problems
28+
// - Without jitter, both parties might always send connection requests simultaneously
29+
// - When requests collide rather than complement each other, connection establishment fails
30+
//
31+
// 2. Avoids NAT filtering
32+
// - NAT devices often restrict or block perfectly regular connection attempts
33+
// - Random intervals make connection attempts appear more natural, avoiding detection
34+
// - Helps bypass NAT devices that might interpret regular patterns as scanning or attacks
35+
//
36+
// 3. Compensates for network uncertainties
37+
// - Real networks have inherent variations in packet delivery times
38+
// - System scheduling and network congestion create unpredictable delays
39+
// - Jitter accounts for these natural timing variations
40+
//
41+
// 4. Increases connection success probability
42+
// - Different system clocks and startup times can cause connection attempts to miss each other
43+
// - Random jitter expands the time window when connection attempts might overlap
44+
// - This "window expansion" strategy improves connection success rates
45+
//
46+
// 5. Breaks repetitive failure patterns
47+
// - A specific timing pattern can cause connection failure
48+
// - Using the same fixed interval would repeat the same failure
49+
// - Randomness helps break out of these failure modes
2450
#[cfg(not(target_family = "wasm"))]
2551
pub(crate) async fn try_nat_traversal(
2652
bind_addr: Option<SocketAddr>,
@@ -33,60 +59,103 @@ pub(crate) async fn try_nat_traversal(
3359
return Err(std::io::ErrorKind::InvalidInput.into());
3460
}
3561
};
36-
let now = Instant::now();
37-
let mut count = 0;
38-
loop {
39-
count += 1;
40-
if count / 5 > 30 && now.elapsed() > Duration::from_secs(30) {
41-
debug!("NAT traversal timed out");
42-
return Err(std::io::ErrorKind::TimedOut.into());
43-
}
44-
let socket = match bind_addr {
45-
Some(listen_addr) => match (listen_addr.ip(), net_addr.ip()) {
46-
(IpAddr::V4(_), IpAddr::V4(_)) => {
47-
let socket = TcpSocket::new_v4().unwrap();
48-
socket.set_reuseaddr(true).unwrap();
49-
#[cfg(all(unix, not(target_os = "solaris"), not(target_os = "illumos")))]
50-
socket.set_reuseport(true).unwrap();
51-
socket.bind(listen_addr).unwrap();
52-
socket
53-
}
54-
(IpAddr::V6(_), IpAddr::V6(_)) => {
55-
let socket = TcpSocket::new_v6().unwrap();
56-
socket.set_reuseaddr(true).unwrap();
57-
#[cfg(all(unix, not(target_os = "solaris"), not(target_os = "illumos")))]
58-
socket.set_reuseport(true).unwrap();
59-
socket.bind(listen_addr).unwrap();
60-
socket
61-
}
62-
(IpAddr::V4(_), IpAddr::V6(_)) => TcpSocket::new_v6().unwrap(),
63-
(IpAddr::V6(_), IpAddr::V4(_)) => TcpSocket::new_v4().unwrap(),
64-
},
65-
None => match net_addr.ip() {
66-
IpAddr::V4(_) => TcpSocket::new_v4().unwrap(),
67-
IpAddr::V6(_) => TcpSocket::new_v6().unwrap(),
68-
},
62+
63+
// Use a fixed interval but add a small amount of randomness
64+
let base_retry_interval = Duration::from_millis(200);
65+
66+
// Overall time budget shared by all connection attempts
67+
let timeout_duration = Duration::from_secs(30);
68+
let start_time = Instant::now();
69+
let mut retry_count = 0u32;
70+
while start_time.elapsed() < timeout_duration {
71+
retry_count += 1;
72+
73+
// Add a small amount of random jitter (up to ±49ms) to avoid conflicts
74+
// caused by continuous precise synchronization
75+
let jitter = Duration::from_millis(rand::random::<u64>() % 50);
76+
let actual_interval = if rand::random::<bool>() {
77+
base_retry_interval + jitter
78+
} else {
79+
base_retry_interval.saturating_sub(jitter)
6980
};
7081

82+
let socket = create_socket(bind_addr, net_addr)?;
83+
7184
match runtime::timeout(
7285
std::time::Duration::from_millis(200),
7386
socket.connect(net_addr),
7487
)
7588
.await
7689
{
77-
Ok(Ok(stream)) => break Ok((stream, addr)),
90+
Ok(Ok(stream)) => {
91+
// try to get the stored error from the underlying socket
92+
// if the socket is not connected, check_connection returns that error (it is only logged; the stream is still returned)
93+
if let Err(err) = check_connection(&stream) {
94+
debug!("Failed to connect to NAT(base check): {}", err);
95+
}
96+
return Ok((stream, addr));
97+
}
7898
Err(err) => {
79-
debug!("Failed to connect to NAT: {}", err);
80-
continue;
99+
debug!("Failed to connect to NAT(timeout): {}", err);
81100
}
82101
Ok(Err(err)) => {
83102
if err.kind() == std::io::ErrorKind::AddrNotAvailable {
84-
break Err(err);
103+
return Err(err);
85104
}
86-
debug!("Failed to connect to NAT: {}, {}", err.kind(), err);
87-
continue;
105+
debug!(
106+
"Failed to connect to NAT(other error): {}, {}",
107+
err.kind(),
108+
err
109+
);
88110
}
89111
}
112+
runtime::delay_for(actual_interval).await;
113+
}
114+
115+
debug!("Failed to connect to NAT after {} retries", retry_count);
116+
Err(std::io::ErrorKind::TimedOut.into())
117+
}
118+
119+
#[cfg(not(target_family = "wasm"))]
120+
fn create_socket(
121+
bind_addr: Option<SocketAddr>,
122+
target_addr: SocketAddr,
123+
) -> Result<TcpSocket, std::io::Error> {
124+
let socket = match bind_addr {
125+
Some(listen_addr) => match (listen_addr.ip(), target_addr.ip()) {
126+
(IpAddr::V4(_), IpAddr::V4(_)) => {
127+
let socket = TcpSocket::new_v4()?;
128+
socket.set_reuseaddr(true)?;
129+
#[cfg(all(unix, not(target_os = "solaris"), not(target_os = "illumos")))]
130+
socket.set_reuseport(true)?;
131+
socket.bind(listen_addr)?;
132+
socket
133+
}
134+
(IpAddr::V6(_), IpAddr::V6(_)) => {
135+
let socket = TcpSocket::new_v6()?;
136+
socket.set_reuseaddr(true)?;
137+
#[cfg(all(unix, not(target_os = "solaris"), not(target_os = "illumos")))]
138+
socket.set_reuseport(true)?;
139+
socket.bind(listen_addr)?;
140+
socket
141+
}
142+
(IpAddr::V4(_), IpAddr::V6(_)) => TcpSocket::new_v6()?,
143+
(IpAddr::V6(_), IpAddr::V4(_)) => TcpSocket::new_v4()?,
144+
},
145+
None => match target_addr.ip() {
146+
IpAddr::V4(_) => TcpSocket::new_v4()?,
147+
IpAddr::V6(_) => TcpSocket::new_v6()?,
148+
},
149+
};
150+
Ok(socket)
151+
}
152+
153+
#[cfg(not(target_family = "wasm"))]
154+
fn check_connection(stream: &TcpStream) -> Result<(), std::io::Error> {
155+
match stream.take_error() {
156+
Ok(Some(err)) => Err(err),
157+
Ok(None) => Ok(()),
158+
Err(err) => Err(err),
90159
}
91160
}
92161

network/src/protocols/identify/mod.rs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -492,7 +492,17 @@ impl Callback for IdentifyCallback {
492492

493493
/// Get local listen addresses
494494
fn local_listen_addrs(&mut self) -> Vec<Multiaddr> {
495-
self.listen_addrs()
495+
let mut listens = self.listen_addrs();
496+
497+
if listens.len() < MAX_RETURN_LISTEN_ADDRS {
498+
let observe_addrs = self
499+
.network_state
500+
.observed_addrs(MAX_RETURN_LISTEN_ADDRS - listens.len());
501+
listens.extend(observe_addrs);
502+
listens
503+
} else {
504+
listens
505+
}
496506
}
497507

498508
fn add_remote_listen_addrs(&mut self, session: &SessionContext, addrs: Vec<Multiaddr>) {

network/src/protocols/support_protocols.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ impl SupportProtocols {
9696
SupportProtocols::Alert => "/ckb/alt",
9797
SupportProtocols::LightClient => "/ckb/lightclient",
9898
SupportProtocols::Filter => "/ckb/filter",
99-
SupportProtocols::HolePunching => "/ckb/HolePunching",
99+
SupportProtocols::HolePunching => "/ckb/holepunching",
100100
}
101101
.to_owned()
102102
}

0 commit comments

Comments
 (0)