Skip to content

Commit fc57269

Browse files
committed
feat: add a sync message to synchronize both clocks
1 parent d4e1331 commit fc57269

File tree

8 files changed

+918
-275
lines changed

8 files changed

+918
-275
lines changed

network/src/protocols/hole_punching/component/connection_request.rs

+161-186
Large diffs are not rendered by default.

network/src/protocols/hole_punching/component/connection_request_delivered.rs

+150-55
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::{borrow::Cow, net::SocketAddr};
22

33
use ckb_logger::debug;
4+
use ckb_systemtime::unix_time_as_millis;
45
use ckb_types::{packed, prelude::*};
56
use futures::future::select_ok;
67
use p2p::{
@@ -50,6 +51,9 @@ impl<'a> ConnectionRequestDeliveredProcess<'a> {
5051
.with_context("the listen address count is too large");
5152
}
5253
let route = self.message.route();
54+
if route.len() > 8 || self.message.sync_route().len() > 8 {
55+
return StatusCode::InvalidRoute.with_context("the route length is too long");
56+
}
5357
match route.iter().last() {
5458
Some(next_peer_id_data) => {
5559
let next_peer_id = match PeerId::from_bytes(next_peer_id_data.raw_data().to_vec()) {
@@ -117,70 +121,161 @@ impl<'a> ConnectionRequestDeliveredProcess<'a> {
117121
Err(_) => return StatusCode::InvalidToPeerId.into(),
118122
};
119123

120-
let mut tasks = Vec::new();
121-
let control: ServiceAsyncControl = self.p2p_control.clone();
122-
for listen_addr in self.message.listen_addrs().iter() {
123-
match Multiaddr::try_from(listen_addr.bytes().raw_data().to_vec()) {
124-
Ok(mut addr) => {
125-
if let Some(peer_id) = extract_peer_id(&addr) {
126-
if peer_id != to_peer_id {
127-
continue;
124+
let request_start = self.protocol.inflight_requests.write().remove(&to_peer_id);
125+
126+
match request_start {
127+
Some(start) => {
128+
let now = unix_time_as_millis();
129+
let ttl = now - start;
130+
let sync_route = self.message.sync_route();
131+
132+
match sync_route.iter().last() {
133+
Some(next_peer_id_data) => {
134+
let res = self
135+
.respond_sync(
136+
from_peer_id,
137+
next_peer_id_data.raw_data().to_vec(),
138+
)
139+
.await;
140+
if !res.is_ok() {
141+
return res;
128142
}
129-
} else {
130-
addr.push(Protocol::P2P(Cow::Borrowed(to_peer_id.as_bytes())));
131143
}
132-
match find_type(&addr) {
133-
TransportType::Memory => continue,
134-
TransportType::Onion => continue,
135-
TransportType::Ws => continue,
136-
TransportType::Wss => continue,
137-
TransportType::Tls => continue,
138-
TransportType::Tcp => {
139-
if addr
140-
.iter()
141-
.any(|p| matches!(p, Protocol::Dns4(_) | Protocol::Dns6(_)))
142-
{
143-
let control = control.clone();
144-
// If the address contains DNS4 or DNS6, we just dial it directly
145-
// without NAT traversal
146-
runtime::spawn(async move {
147-
let _ignore = control
148-
.dial(
149-
addr,
150-
TargetProtocol::Single(
151-
SupportProtocols::Identify.protocol_id(),
152-
),
153-
)
154-
.await;
155-
});
156-
} else {
157-
let task = try_nat_traversal(self.bind_addr, addr);
158-
tasks.push(Box::pin(task));
159-
}
160-
}
144+
None => {
145+
return StatusCode::Ignore.with_context("the sync route is empty");
161146
}
162147
}
163-
Err(_) => {
164-
continue;
165-
}
148+
self.try_nat_traversal(to_peer_id, ttl);
149+
150+
Status::ok()
166151
}
152+
None => StatusCode::Ignore.with_context("the request is not in flight"),
153+
}
154+
}
155+
}
156+
}
157+
158+
async fn respond_sync(&self, from_peer_id: PeerId, next_peer_id_data: Vec<u8>) -> Status {
159+
let next_peer_id = match PeerId::from_bytes(next_peer_id_data) {
160+
Ok(peer_id) => peer_id,
161+
Err(_) => {
162+
return StatusCode::InvalidRoute
163+
.with_context("the sync route last peer id is invalid");
164+
}
165+
};
166+
167+
let target_sid = self
168+
.protocol
169+
.network_state
170+
.peer_registry
171+
.read()
172+
.get_key_by_peer_id(&next_peer_id);
173+
174+
match target_sid {
175+
Some(next_peer) => {
176+
let message = self.message.to_entity();
177+
let new_route = packed::BytesVec::new_builder()
178+
.extend(
179+
message
180+
.sync_route()
181+
.into_iter()
182+
.take(self.message.sync_route().len() - 1),
183+
)
184+
.build();
185+
let content = packed::ConnectionSync::new_builder()
186+
.from(message.from())
187+
.to(message.to())
188+
.route(new_route)
189+
.build();
190+
let new_message = packed::HolePunchingMessage::new_builder()
191+
.set(content)
192+
.build()
193+
.as_bytes();
194+
let proto_id = SupportProtocols::HolePunching.protocol_id();
195+
debug!(
196+
"current peer is the target peer {}, respond the sync to next peer {} (id: {})",
197+
from_peer_id, next_peer, next_peer_id
198+
);
199+
if let Err(error) = self
200+
.p2p_control
201+
.send_message_to(next_peer, proto_id, new_message)
202+
.await
203+
{
204+
StatusCode::ForwardError.with_context(error)
205+
} else {
206+
Status::ok()
167207
}
208+
}
209+
None => {
210+
StatusCode::Ignore.with_context("the next peer in the sync route is disconnected")
211+
}
212+
}
213+
}
168214

169-
runtime::spawn(async move {
170-
if let Ok(((stream, addr), _)) = select_ok(tasks).await {
171-
let _ignore = control
172-
.raw_session(
173-
stream,
174-
addr,
175-
RawSessionInfo::outbound(TargetProtocol::Single(
176-
SupportProtocols::Identify.protocol_id(),
177-
)),
178-
)
179-
.await;
215+
fn try_nat_traversal(&self, to_peer_id: PeerId, ttl: u64) {
216+
let mut tasks = Vec::new();
217+
let control: ServiceAsyncControl = self.p2p_control.clone();
218+
for listen_addr in self.message.listen_addrs().iter() {
219+
match Multiaddr::try_from(listen_addr.bytes().raw_data().to_vec()) {
220+
Ok(mut addr) => {
221+
if let Some(peer_id) = extract_peer_id(&addr) {
222+
if peer_id != to_peer_id {
223+
continue;
224+
}
225+
} else {
226+
addr.push(Protocol::P2P(Cow::Borrowed(to_peer_id.as_bytes())));
180227
}
181-
});
182-
Status::ok()
228+
match find_type(&addr) {
229+
TransportType::Memory
230+
| TransportType::Onion
231+
| TransportType::Ws
232+
| TransportType::Wss
233+
| TransportType::Tls => continue,
234+
TransportType::Tcp => {
235+
if addr
236+
.iter()
237+
.any(|p| matches!(p, Protocol::Dns4(_) | Protocol::Dns6(_)))
238+
{
239+
let control = control.clone();
240+
// If the address contains DNS4 or DNS6, we just dial it directly
241+
// without NAT traversal
242+
runtime::spawn(async move {
243+
let _ignore = control
244+
.dial(
245+
addr,
246+
TargetProtocol::Single(
247+
SupportProtocols::Identify.protocol_id(),
248+
),
249+
)
250+
.await;
251+
});
252+
} else {
253+
let task = try_nat_traversal(self.bind_addr, addr);
254+
tasks.push(Box::pin(task));
255+
}
256+
}
257+
}
258+
}
259+
Err(_) => {
260+
continue;
261+
}
183262
}
184263
}
264+
265+
runtime::spawn(async move {
266+
tokio::time::sleep(std::time::Duration::from_millis(ttl / 2)).await;
267+
if let Ok(((stream, addr), _)) = select_ok(tasks).await {
268+
debug!("NAT traversal success, addr: {:?}", addr);
269+
let _ignore = control
270+
.raw_session(
271+
stream,
272+
addr,
273+
RawSessionInfo::outbound(TargetProtocol::Single(
274+
SupportProtocols::Identify.protocol_id(),
275+
)),
276+
)
277+
.await;
278+
}
279+
});
185280
}
186281
}

0 commit comments

Comments
 (0)