@@ -14,18 +14,80 @@ use crate::{
1414 protocols:: {
1515 SupportProtocols ,
1616 hole_punching:: {
17- ADDRS_COUNT_LIMIT , HolePunching , MAX_TTL , PENETRATED_INTERVAL ,
17+ ADDRS_COUNT_LIMIT , HolePunching , MAX_HOPS , PENETRATED_INTERVAL ,
1818 component:: { forward_request, init_delivered} ,
1919 status:: { Status , StatusCode } ,
2020 } ,
2121 } ,
2222} ;
2323
24+ struct RequestContent {
25+ from : PeerId ,
26+ to : PeerId ,
27+ listen_addrs : Vec < Multiaddr > ,
28+ route : Vec < PeerId > ,
29+ max_hops : u8 ,
30+ }
31+
32+ impl TryFrom < & packed:: ConnectionRequestReader < ' _ > > for RequestContent {
33+ type Error = Status ;
34+
35+ fn try_from ( value : & packed:: ConnectionRequestReader < ' _ > ) -> Result < Self , Self :: Error > {
36+ let from = PeerId :: from_bytes ( value. from ( ) . raw_data ( ) . to_vec ( ) ) . map_err ( |_| {
37+ StatusCode :: InvalidFromPeerId . with_context ( "the from peer id is invalid" )
38+ } ) ?;
39+ let to = PeerId :: from_bytes ( value. to ( ) . raw_data ( ) . to_vec ( ) )
40+ . map_err ( |_| StatusCode :: InvalidToPeerId . with_context ( "the to peer id is invalid" ) ) ?;
41+ let listen_addrs: Vec < Multiaddr > = value
42+ . listen_addrs ( )
43+ . iter ( )
44+ . map (
45+ |raw| match Multiaddr :: try_from ( raw. bytes ( ) . raw_data ( ) . to_vec ( ) ) {
46+ Ok ( mut addr) => {
47+ if let Some ( peer_id) = extract_peer_id ( & addr) {
48+ if peer_id != from {
49+ return Err ( StatusCode :: InvalidListenAddrLen
50+ . with_context ( "peer id in listen address is invalid" ) ) ;
51+ }
52+ } else {
53+ addr. push ( Protocol :: P2P ( Cow :: Borrowed ( from. as_bytes ( ) ) ) ) ;
54+ }
55+ Ok ( addr)
56+ }
57+ Err ( _) => Err ( StatusCode :: InvalidListenAddrLen
58+ . with_context ( "the listen address is invalid" ) ) ,
59+ } ,
60+ )
61+ . collect :: < Result < Vec < _ > , _ > > ( ) ?;
62+
63+ let route: Vec < PeerId > = value
64+ . route ( )
65+ . iter ( )
66+ . map ( |raw| {
67+ PeerId :: from_bytes ( raw. raw_data ( ) . to_vec ( ) ) . map_err ( |_| {
68+ StatusCode :: InvalidRoute . with_context ( "the route peer id is invalid" )
69+ } )
70+ } )
71+ . collect :: < Result < Vec < _ > , _ > > ( ) ?;
72+
73+ let max_hops: u8 = value. max_hops ( ) . into ( ) ;
74+
75+ Ok ( Self {
76+ from,
77+ to,
78+ listen_addrs,
79+ route,
80+ max_hops,
81+ } )
82+ }
83+ }
84+
2485pub ( crate ) struct ConnectionRequestProcess < ' a > {
2586 message : packed:: ConnectionRequestReader < ' a > ,
2687 protocol : & ' a HolePunching ,
2788 peer : PeerIndex ,
2889 p2p_control : & ' a ServiceAsyncControl ,
90+ msg_item_id : u32 ,
2991}
3092
3193impl < ' a > ConnectionRequestProcess < ' a > {
@@ -34,72 +96,68 @@ impl<'a> ConnectionRequestProcess<'a> {
3496 protocol : & ' a HolePunching ,
3597 peer : PeerIndex ,
3698 p2p_control : & ' a ServiceAsyncControl ,
99+ msg_item_id : u32 ,
37100 ) -> Self {
38101 Self {
39102 message,
40103 protocol,
41104 peer,
42105 p2p_control,
106+ msg_item_id,
43107 }
44108 }
45109
46110 pub ( crate ) async fn execute ( self ) -> Status {
47- if self . message . listen_addrs ( ) . len ( ) > ADDRS_COUNT_LIMIT
48- || self . message . listen_addrs ( ) . is_empty ( )
49- {
111+ let content = match RequestContent :: try_from ( & self . message ) {
112+ Ok ( content) => content,
113+ Err ( status) => return status,
114+ } ;
115+ if content. listen_addrs . len ( ) > ADDRS_COUNT_LIMIT || content. listen_addrs . is_empty ( ) {
50116 return StatusCode :: InvalidListenAddrLen
51117 . with_context ( "the listen address count is too large or empty" ) ;
52118 }
53- let ttl : u8 = self . message . ttl ( ) . into ( ) ;
54- if ttl > MAX_TTL {
119+
120+ if content . max_hops > MAX_HOPS {
55121 return StatusCode :: InvalidMaxTTL . into ( ) ;
56122 }
57- if self . message . route ( ) . len ( ) > 8 {
123+ if content . route . len ( ) > 8 {
58124 return StatusCode :: InvalidRoute . with_context ( "the route length is too long" ) ;
59125 }
60126
61127 let self_peer_id = self . protocol . network_state . local_peer_id ( ) ;
62- for peer_id_bytes in self . message . route ( ) . iter ( ) {
63- match PeerId :: from_bytes ( peer_id_bytes. raw_data ( ) . to_vec ( ) ) {
64- Ok ( peer_id) => {
65- if self_peer_id == & peer_id {
66- return StatusCode :: Ignore . with_context ( "the message is passed, ignore it" ) ;
67- }
68- }
69- Err ( _) => {
70- return StatusCode :: InvalidRoute . into ( ) ;
71- }
72- }
128+ if content. route . contains ( & self_peer_id) {
129+ return StatusCode :: Ignore . with_context ( "the message is passed, ignore it" ) ;
73130 }
74131
75- let from_peer_id = match PeerId :: from_bytes ( self . message . from ( ) . raw_data ( ) . to_vec ( ) ) {
76- Ok ( peer_id) => {
77- if self_peer_id == & peer_id {
78- return StatusCode :: Ignore . with_context ( "the message is passed, ignore it" ) ;
79- }
80- peer_id
81- }
82- Err ( _) => {
83- return StatusCode :: InvalidFromPeerId . into ( ) ;
84- }
85- } ;
86- let to_peer_id = match PeerId :: from_bytes ( self . message . to ( ) . raw_data ( ) . to_vec ( ) ) {
87- Ok ( peer_id) => peer_id,
88- Err ( _) => {
89- return StatusCode :: InvalidToPeerId . into ( ) ;
90- }
91- } ;
132+ if self
133+ . protocol
134+ . forward_rate_limiter
135+ . check_key ( & ( content. from . clone ( ) , content. to . clone ( ) , self . msg_item_id ) )
136+ . is_err ( )
137+ {
138+ debug ! (
139+ "from: {}, to {}, item_name: {}, rate limit is reached" ,
140+ content. from, content. to, "ConnectionRequest" ,
141+ ) ;
142+ return StatusCode :: TooManyRequests . with_context ( "ConnectionRequest" ) ;
143+ }
92144
93- if self_peer_id == & to_peer_id {
94- self . respond_delivered ( from_peer_id, & to_peer_id) . await
95- } else if ttl == 0u8 {
96- StatusCode :: ReachedMaxTTL . into ( )
145+ if self_peer_id == & content. to {
146+ self . respond_delivered ( content. from , & content. to , content. listen_addrs )
147+ . await
148+ } else if content. max_hops == 0u8 {
149+ StatusCode :: ReachedMaxHops . into ( )
97150 } else {
98- self . forward_message ( self_peer_id, & to_peer_id ) . await
151+ self . forward_message ( self_peer_id, & content . to ) . await
99152 }
100153 }
101154
102- async fn respond_delivered ( & self , from_peer_id : PeerId , to_peer_id : & PeerId ) -> Status {
155+ async fn respond_delivered (
156+ & self ,
157+ from_peer_id : PeerId ,
158+ to_peer_id : & PeerId ,
159+ remote_listens : Vec < Multiaddr > ,
160+ ) -> Status {
103161 if let Some ( ( _, t) ) = self . protocol . pending_delivered . read ( ) . get ( & from_peer_id) {
104162 let now = unix_time_as_millis ( ) ;
105163 if now - t < PENETRATED_INTERVAL {
@@ -140,6 +198,22 @@ impl<'a> ConnectionRequestProcess<'a> {
140198 to_peer_id
141199 ) ;
142200
201+ let remote_listens: Vec < Multiaddr > = remote_listens
202+ . into_iter ( )
203+ . filter_map ( |addr| match find_type ( & addr) {
204+ TransportType :: Memory
205+ | TransportType :: Onion
206+ | TransportType :: Ws
207+ | TransportType :: Wss
208+ | TransportType :: Tls => None ,
209+ TransportType :: Tcp => Some ( addr) ,
210+ } )
211+ . collect ( ) ;
212+
213+ if remote_listens. is_empty ( ) {
214+ return StatusCode :: Ignore . with_context ( "remote listen address is empty" ) ;
215+ }
216+
143217 if let Err ( error) = self
144218 . p2p_control
145219 . send_message_to ( self . peer , proto_id, new_message)
@@ -148,35 +222,6 @@ impl<'a> ConnectionRequestProcess<'a> {
148222 return StatusCode :: ForwardError . with_context ( error) ;
149223 }
150224
151- let remote_listens = self
152- . message
153- . listen_addrs ( )
154- . iter ( )
155- . filter_map (
156- |raw| match Multiaddr :: try_from ( raw. bytes ( ) . raw_data ( ) . to_vec ( ) ) {
157- Ok ( mut addr) => {
158- if let Some ( peer_id) = extract_peer_id ( & addr) {
159- if peer_id != from_peer_id {
160- return None ;
161- }
162- } else {
163- addr. push ( Protocol :: P2P ( Cow :: Borrowed ( from_peer_id. as_bytes ( ) ) ) ) ;
164- }
165-
166- match find_type ( & addr) {
167- TransportType :: Memory
168- | TransportType :: Onion
169- | TransportType :: Ws
170- | TransportType :: Wss
171- | TransportType :: Tls => None ,
172- TransportType :: Tcp => Some ( addr) ,
173- }
174- }
175- Err ( _) => None ,
176- } ,
177- )
178- . collect ( ) ;
179-
180225 let mut pending_delivered = self . protocol . pending_delivered . write ( ) ;
181226 let now = unix_time_as_millis ( ) ;
182227 pending_delivered. insert ( from_peer_id, ( remote_listens, now) ) ;
@@ -220,11 +265,20 @@ impl<'a> ConnectionRequestProcess<'a> {
220265 "target peer {} is not found, broadcast the request to more peers" ,
221266 to_peer_id
222267 ) ;
268+
269+ // Broadcast using gossip while removing the source of the message
223270 let sid = self . peer ;
271+ let mut init = 0usize ;
224272 if let Err ( error) = self
225273 . p2p_control
226274 . filter_broadcast (
227- TargetSession :: Filter ( Box :: new ( move |id| id != & sid) ) ,
275+ TargetSession :: Filter ( Box :: new ( move |id| {
276+ if id == & sid {
277+ return false ;
278+ }
279+ init += 1 ;
280+ if init % 3 != 0 { true } else { false }
281+ } ) ) ,
228282 proto_id,
229283 new_message,
230284 )
0 commit comments