Skip to content

Commit e3b00fd

Browse files
committed
chore: implement review feedback
1 parent e2dcc08 commit e3b00fd

File tree

10 files changed

+532
-383
lines changed

10 files changed

+532
-383
lines changed

network/src/network.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -508,7 +508,6 @@ impl NetworkState {
508508
}
509509

510510
// randomly select count addresses from observed_addrs
511-
#[cfg(not(target_family = "wasm"))]
512511
pub(crate) fn observed_addrs(&self, count: usize) -> Vec<Multiaddr> {
513512
let observed_addrs = self
514513
.observed_addrs

network/src/peer_store/peer_store_impl.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ impl PeerStore {
217217
self.addr_manager.fetch_random(count, filter)
218218
}
219219

220-
/// Return address that we never connected to, used for penetration.
220+
/// Return address that we never connected to, used for hole punching.
221221
pub fn fetch_nat_addrs(&mut self, count: usize, required_flags: Flags) -> Vec<AddrInfo> {
222222
// Get info:
223223
// 1. Never connected
@@ -230,7 +230,7 @@ impl PeerStore {
230230
&& extract_peer_id(&peer_addr.addr)
231231
.map(|peer_id| !peers.contains_key(&peer_id))
232232
.unwrap_or_default()
233-
&& peer_addr.connected(|t| t == 0)
233+
&& peer_addr.last_connected_at_ms == 0
234234
};
235235

236236
self.addr_manager.fetch_random(count, filter)

network/src/protocols/hole_punching/component/connection_request.rs

Lines changed: 125 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -14,18 +14,80 @@ use crate::{
1414
protocols::{
1515
SupportProtocols,
1616
hole_punching::{
17-
ADDRS_COUNT_LIMIT, HolePunching, MAX_TTL, PENETRATED_INTERVAL,
17+
ADDRS_COUNT_LIMIT, HolePunching, MAX_HOPS, PENETRATED_INTERVAL,
1818
component::{forward_request, init_delivered},
1919
status::{Status, StatusCode},
2020
},
2121
},
2222
};
2323

24+
struct RequestContent {
25+
from: PeerId,
26+
to: PeerId,
27+
listen_addrs: Vec<Multiaddr>,
28+
route: Vec<PeerId>,
29+
max_hops: u8,
30+
}
31+
32+
impl TryFrom<&packed::ConnectionRequestReader<'_>> for RequestContent {
33+
type Error = Status;
34+
35+
fn try_from(value: &packed::ConnectionRequestReader<'_>) -> Result<Self, Self::Error> {
36+
let from = PeerId::from_bytes(value.from().raw_data().to_vec()).map_err(|_| {
37+
StatusCode::InvalidFromPeerId.with_context("the from peer id is invalid")
38+
})?;
39+
let to = PeerId::from_bytes(value.to().raw_data().to_vec())
40+
.map_err(|_| StatusCode::InvalidToPeerId.with_context("the to peer id is invalid"))?;
41+
let listen_addrs: Vec<Multiaddr> = value
42+
.listen_addrs()
43+
.iter()
44+
.map(
45+
|raw| match Multiaddr::try_from(raw.bytes().raw_data().to_vec()) {
46+
Ok(mut addr) => {
47+
if let Some(peer_id) = extract_peer_id(&addr) {
48+
if peer_id != from {
49+
return Err(StatusCode::InvalidListenAddrLen
50+
.with_context("peer id in listen address is invalid"));
51+
}
52+
} else {
53+
addr.push(Protocol::P2P(Cow::Borrowed(from.as_bytes())));
54+
}
55+
Ok(addr)
56+
}
57+
Err(_) => Err(StatusCode::InvalidListenAddrLen
58+
.with_context("the listen address is invalid")),
59+
},
60+
)
61+
.collect::<Result<Vec<_>, _>>()?;
62+
63+
let route: Vec<PeerId> = value
64+
.route()
65+
.iter()
66+
.map(|raw| {
67+
PeerId::from_bytes(raw.raw_data().to_vec()).map_err(|_| {
68+
StatusCode::InvalidRoute.with_context("the route peer id is invalid")
69+
})
70+
})
71+
.collect::<Result<Vec<_>, _>>()?;
72+
73+
let max_hops: u8 = value.max_hops().into();
74+
75+
Ok(Self {
76+
from,
77+
to,
78+
listen_addrs,
79+
route,
80+
max_hops,
81+
})
82+
}
83+
}
84+
2485
pub(crate) struct ConnectionRequestProcess<'a> {
2586
message: packed::ConnectionRequestReader<'a>,
2687
protocol: &'a HolePunching,
2788
peer: PeerIndex,
2889
p2p_control: &'a ServiceAsyncControl,
90+
msg_item_id: u32,
2991
}
3092

3193
impl<'a> ConnectionRequestProcess<'a> {
@@ -34,72 +96,68 @@ impl<'a> ConnectionRequestProcess<'a> {
3496
protocol: &'a HolePunching,
3597
peer: PeerIndex,
3698
p2p_control: &'a ServiceAsyncControl,
99+
msg_item_id: u32,
37100
) -> Self {
38101
Self {
39102
message,
40103
protocol,
41104
peer,
42105
p2p_control,
106+
msg_item_id,
43107
}
44108
}
45109

46110
pub(crate) async fn execute(self) -> Status {
47-
if self.message.listen_addrs().len() > ADDRS_COUNT_LIMIT
48-
|| self.message.listen_addrs().is_empty()
49-
{
111+
let content = match RequestContent::try_from(&self.message) {
112+
Ok(content) => content,
113+
Err(status) => return status,
114+
};
115+
if content.listen_addrs.len() > ADDRS_COUNT_LIMIT || content.listen_addrs.is_empty() {
50116
return StatusCode::InvalidListenAddrLen
51117
.with_context("the listen address count is too large or empty");
52118
}
53-
let ttl: u8 = self.message.ttl().into();
54-
if ttl > MAX_TTL {
119+
120+
if content.max_hops > MAX_HOPS {
55121
return StatusCode::InvalidMaxTTL.into();
56122
}
57-
if self.message.route().len() > 8 {
123+
if content.route.len() > 8 {
58124
return StatusCode::InvalidRoute.with_context("the route length is too long");
59125
}
60126

61127
let self_peer_id = self.protocol.network_state.local_peer_id();
62-
for peer_id_bytes in self.message.route().iter() {
63-
match PeerId::from_bytes(peer_id_bytes.raw_data().to_vec()) {
64-
Ok(peer_id) => {
65-
if self_peer_id == &peer_id {
66-
return StatusCode::Ignore.with_context("the message is passed, ignore it");
67-
}
68-
}
69-
Err(_) => {
70-
return StatusCode::InvalidRoute.into();
71-
}
72-
}
128+
if content.route.contains(self_peer_id) {
129+
return StatusCode::Ignore.with_context("the message is passed, ignore it");
73130
}
74131

75-
let from_peer_id = match PeerId::from_bytes(self.message.from().raw_data().to_vec()) {
76-
Ok(peer_id) => {
77-
if self_peer_id == &peer_id {
78-
return StatusCode::Ignore.with_context("the message is passed, ignore it");
79-
}
80-
peer_id
81-
}
82-
Err(_) => {
83-
return StatusCode::InvalidFromPeerId.into();
84-
}
85-
};
86-
let to_peer_id = match PeerId::from_bytes(self.message.to().raw_data().to_vec()) {
87-
Ok(peer_id) => peer_id,
88-
Err(_) => {
89-
return StatusCode::InvalidToPeerId.into();
90-
}
91-
};
132+
if self
133+
.protocol
134+
.forward_rate_limiter
135+
.check_key(&(content.from.clone(), content.to.clone(), self.msg_item_id))
136+
.is_err()
137+
{
138+
debug!(
139+
"from: {}, to {}, item_name: {}, rate limit is reached",
140+
content.from, content.to, "ConnectionRequest",
141+
);
142+
return StatusCode::TooManyRequests.with_context("ConnectionRequest");
143+
}
92144

93-
if self_peer_id == &to_peer_id {
94-
self.respond_delivered(from_peer_id, &to_peer_id).await
95-
} else if ttl == 0u8 {
96-
StatusCode::ReachedMaxTTL.into()
145+
if self_peer_id == &content.to {
146+
self.respond_delivered(content.from, &content.to, content.listen_addrs)
147+
.await
148+
} else if content.max_hops == 0u8 {
149+
StatusCode::ReachedMaxHops.into()
97150
} else {
98-
self.forward_message(self_peer_id, &to_peer_id).await
151+
self.forward_message(self_peer_id, &content.to).await
99152
}
100153
}
101154

102-
async fn respond_delivered(&self, from_peer_id: PeerId, to_peer_id: &PeerId) -> Status {
155+
async fn respond_delivered(
156+
&self,
157+
from_peer_id: PeerId,
158+
to_peer_id: &PeerId,
159+
remote_listens: Vec<Multiaddr>,
160+
) -> Status {
103161
if let Some((_, t)) = self.protocol.pending_delivered.read().get(&from_peer_id) {
104162
let now = unix_time_as_millis();
105163
if now - t < PENETRATED_INTERVAL {
@@ -140,6 +198,22 @@ impl<'a> ConnectionRequestProcess<'a> {
140198
to_peer_id
141199
);
142200

201+
let remote_listens: Vec<Multiaddr> = remote_listens
202+
.into_iter()
203+
.filter_map(|addr| match find_type(&addr) {
204+
TransportType::Memory
205+
| TransportType::Onion
206+
| TransportType::Ws
207+
| TransportType::Wss
208+
| TransportType::Tls => None,
209+
TransportType::Tcp => Some(addr),
210+
})
211+
.collect();
212+
213+
if remote_listens.is_empty() {
214+
return StatusCode::Ignore.with_context("remote listen address is empty");
215+
}
216+
143217
if let Err(error) = self
144218
.p2p_control
145219
.send_message_to(self.peer, proto_id, new_message)
@@ -148,35 +222,6 @@ impl<'a> ConnectionRequestProcess<'a> {
148222
return StatusCode::ForwardError.with_context(error);
149223
}
150224

151-
let remote_listens = self
152-
.message
153-
.listen_addrs()
154-
.iter()
155-
.filter_map(
156-
|raw| match Multiaddr::try_from(raw.bytes().raw_data().to_vec()) {
157-
Ok(mut addr) => {
158-
if let Some(peer_id) = extract_peer_id(&addr) {
159-
if peer_id != from_peer_id {
160-
return None;
161-
}
162-
} else {
163-
addr.push(Protocol::P2P(Cow::Borrowed(from_peer_id.as_bytes())));
164-
}
165-
166-
match find_type(&addr) {
167-
TransportType::Memory
168-
| TransportType::Onion
169-
| TransportType::Ws
170-
| TransportType::Wss
171-
| TransportType::Tls => None,
172-
TransportType::Tcp => Some(addr),
173-
}
174-
}
175-
Err(_) => None,
176-
},
177-
)
178-
.collect();
179-
180225
let mut pending_delivered = self.protocol.pending_delivered.write();
181226
let now = unix_time_as_millis();
182227
pending_delivered.insert(from_peer_id, (remote_listens, now));
@@ -220,11 +265,20 @@ impl<'a> ConnectionRequestProcess<'a> {
220265
"target peer {} is not found, broadcast the request to more peers",
221266
to_peer_id
222267
);
268+
269+
// Broadcast using gossip while removing the source of the message
223270
let sid = self.peer;
271+
let mut init = 0usize;
224272
if let Err(error) = self
225273
.p2p_control
226274
.filter_broadcast(
227-
TargetSession::Filter(Box::new(move |id| id != &sid)),
275+
TargetSession::Filter(Box::new(move |id| {
276+
if id == &sid {
277+
return false;
278+
}
279+
init += 1;
280+
init % 3 != 0
281+
})),
228282
proto_id,
229283
new_message,
230284
)

0 commit comments

Comments
 (0)