@@ -65,7 +65,7 @@ use crate::{
65
65
transform:: { DataTransform , IdentityTransform } ,
66
66
types:: {
67
67
ControlAction , Graft , IDontWant , IHave , IWant , Message , MessageAcceptance , MessageId ,
68
- PeerConnections , PeerInfo , PeerKind , Prune , RawMessage , RpcOut , Subscription ,
68
+ PeerDetails , PeerInfo , PeerKind , Prune , RawMessage , RpcOut , Subscription ,
69
69
SubscriptionAction ,
70
70
} ,
71
71
FailedMessages , PublishError , SubscriptionError , TopicScoreParams , ValidationError ,
@@ -270,7 +270,7 @@ pub struct Behaviour<D = IdentityTransform, F = AllowAllSubscriptionFilter> {
270
270
271
271
/// A set of connected peers, indexed by their [`PeerId`] tracking both the [`PeerKind`] and
272
272
/// the set of [`ConnectionId`]s.
273
- connected_peers : HashMap < PeerId , PeerConnections > ,
273
+ connected_peers : HashMap < PeerId , PeerDetails > ,
274
274
275
275
/// A set of all explicit peers. These are peers that remain connected and we unconditionally
276
276
/// forward messages to, outside of the scoring system.
@@ -308,10 +308,6 @@ pub struct Behaviour<D = IdentityTransform, F = AllowAllSubscriptionFilter> {
308
308
/// be removed from this list which may result in a true outbound rediscovery.
309
309
px_peers : HashSet < PeerId > ,
310
310
311
- /// Set of connected outbound peers (we only consider true outbound peers found through
312
- /// discovery and not by PX).
313
- outbound_peers : HashSet < PeerId > ,
314
-
315
311
/// Stores optional peer score data together with thresholds, decay interval and gossip
316
312
/// promises.
317
313
peer_score : Option < ( PeerScore , PeerScoreThresholds , Delay ) > ,
@@ -465,7 +461,6 @@ where
465
461
heartbeat : Delay :: new ( config. heartbeat_interval ( ) + config. heartbeat_initial_delay ( ) ) ,
466
462
heartbeat_ticks : 0 ,
467
463
px_peers : HashSet :: new ( ) ,
468
- outbound_peers : HashSet :: new ( ) ,
469
464
peer_score : None ,
470
465
count_received_ihave : HashMap :: new ( ) ,
471
466
count_sent_iwant : HashMap :: new ( ) ,
@@ -1380,6 +1375,8 @@ where
1380
1375
tracing:: error!( peer_id = %peer_id, "Peer non-existent when handling graft" ) ;
1381
1376
return ;
1382
1377
} ;
1378
+ // Needs to be here to comply with the borrow checker.
1379
+ let is_outbound = connected_peer. outbound ;
1383
1380
1384
1381
// For each topic, if a peer has grafted us, then we necessarily must be in their mesh
1385
1382
// and they must be subscribed to the topic. Ensure we have recorded the mapping.
@@ -1468,7 +1465,7 @@ where
1468
1465
// or if it is an outbound peer
1469
1466
let mesh_n_high = self . config . mesh_n_high_for_topic ( & topic_hash) ;
1470
1467
1471
- if peers. len ( ) >= mesh_n_high && !self . outbound_peers . contains ( peer_id ) {
1468
+ if peers. len ( ) >= mesh_n_high && !is_outbound {
1472
1469
to_prune_topics. insert ( topic_hash. clone ( ) ) ;
1473
1470
continue ;
1474
1471
}
@@ -2102,7 +2099,6 @@ where
2102
2099
for ( topic_hash, peers) in self . mesh . iter_mut ( ) {
2103
2100
let explicit_peers = & self . explicit_peers ;
2104
2101
let backoffs = & self . backoffs ;
2105
- let outbound_peers = & self . outbound_peers ;
2106
2102
2107
2103
let mesh_n = self . config . mesh_n_for_topic ( topic_hash) ;
2108
2104
let mesh_n_low = self . config . mesh_n_low_for_topic ( topic_hash) ;
@@ -2197,13 +2193,14 @@ where
2197
2193
shuffled[ ..peers. len ( ) - self . config . retain_scores ( ) ] . shuffle ( & mut rng) ;
2198
2194
2199
2195
// count total number of outbound peers
2200
- let mut outbound = {
2201
- let outbound_peers = & self . outbound_peers ;
2202
- shuffled
2203
- . iter ( )
2204
- . filter ( |p| outbound_peers. contains ( * p) )
2205
- . count ( )
2206
- } ;
2196
+ let mut outbound = shuffled
2197
+ . iter ( )
2198
+ . filter ( |peer_id| {
2199
+ self . connected_peers
2200
+ . get ( peer_id)
2201
+ . is_some_and ( |peer| peer. outbound )
2202
+ } )
2203
+ . count ( ) ;
2207
2204
2208
2205
// remove the first excess_peer_no allowed (by outbound restrictions) peers adding
2209
2206
// them to to_prune
@@ -2212,7 +2209,11 @@ where
2212
2209
if removed == excess_peer_no {
2213
2210
break ;
2214
2211
}
2215
- if self . outbound_peers . contains ( & peer) {
2212
+ if self
2213
+ . connected_peers
2214
+ . get ( & peer)
2215
+ . is_some_and ( |peer| peer. outbound )
2216
+ {
2216
2217
if outbound <= mesh_outbound_min {
2217
2218
// do not remove anymore outbound peers
2218
2219
continue ;
@@ -2236,18 +2237,28 @@ where
2236
2237
// do we have enough outbound peers?
2237
2238
if peers. len ( ) >= mesh_n_low {
2238
2239
// count number of outbound peers we have
2239
- let outbound = { peers. iter ( ) . filter ( |p| outbound_peers. contains ( * p) ) . count ( ) } ;
2240
+ let outbound = peers
2241
+ . iter ( )
2242
+ . filter ( |peer_id| {
2243
+ self . connected_peers
2244
+ . get ( peer_id)
2245
+ . is_some_and ( |peer| peer. outbound )
2246
+ } )
2247
+ . count ( ) ;
2240
2248
2241
2249
// if we have not enough outbound peers, graft to some new outbound peers
2242
2250
if outbound < mesh_outbound_min {
2243
2251
let needed = mesh_outbound_min - outbound;
2244
2252
let peer_list =
2245
- get_random_peers ( & self . connected_peers , topic_hash, needed, |peer| {
2246
- !peers. contains ( peer)
2247
- && !explicit_peers. contains ( peer)
2248
- && !backoffs. is_backoff_with_slack ( topic_hash, peer)
2249
- && * scores. get ( peer) . unwrap_or ( & 0.0 ) >= 0.0
2250
- && outbound_peers. contains ( peer)
2253
+ get_random_peers ( & self . connected_peers , topic_hash, needed, |peer_id| {
2254
+ !peers. contains ( peer_id)
2255
+ && !explicit_peers. contains ( peer_id)
2256
+ && !backoffs. is_backoff_with_slack ( topic_hash, peer_id)
2257
+ && * scores. get ( peer_id) . unwrap_or ( & 0.0 ) >= 0.0
2258
+ && self
2259
+ . connected_peers
2260
+ . get ( peer_id)
2261
+ . is_some_and ( |peer| peer. outbound )
2251
2262
} ) ;
2252
2263
2253
2264
for peer in & peer_list {
@@ -2904,15 +2915,6 @@ where
2904
2915
..
2905
2916
} : ConnectionEstablished ,
2906
2917
) {
2907
- // Diverging from the go implementation we only want to consider a peer as outbound peer
2908
- // if its first connection is outbound.
2909
-
2910
- if endpoint. is_dialer ( ) && other_established == 0 && !self . px_peers . contains ( & peer_id) {
2911
- // The first connection is outbound and it is not a peer from peer exchange => mark
2912
- // it as outbound peer
2913
- self . outbound_peers . insert ( peer_id) ;
2914
- }
2915
-
2916
2918
// Add the IP to the peer scoring system
2917
2919
if let Some ( ( peer_score, ..) ) = & mut self . peer_score {
2918
2920
if let Some ( ip) = get_ip_addr ( endpoint. get_remote_address ( ) ) {
@@ -3030,7 +3032,6 @@ where
3030
3032
3031
3033
// Forget px and outbound status for this peer
3032
3034
self . px_peers . remove ( & peer_id) ;
3033
- self . outbound_peers . remove ( & peer_id) ;
3034
3035
3035
3036
// If metrics are enabled, register the disconnection of a peer based on its protocol.
3036
3037
if let Some ( metrics) = self . metrics . as_mut ( ) {
@@ -3113,16 +3114,14 @@ where
3113
3114
// The protocol negotiation occurs once a message is sent/received. Once this happens we
3114
3115
// update the type of peer that this is in order to determine which kind of routing should
3115
3116
// occur.
3116
- let connected_peer = self
3117
- . connected_peers
3118
- . entry ( peer_id)
3119
- . or_insert ( PeerConnections {
3120
- kind : PeerKind :: Floodsub ,
3121
- connections : vec ! [ ] ,
3122
- sender : Sender :: new ( self . config . connection_handler_queue_len ( ) ) ,
3123
- topics : Default :: default ( ) ,
3124
- dont_send : LinkedHashMap :: new ( ) ,
3125
- } ) ;
3117
+ let connected_peer = self . connected_peers . entry ( peer_id) . or_insert ( PeerDetails {
3118
+ kind : PeerKind :: Floodsub ,
3119
+ connections : vec ! [ ] ,
3120
+ outbound : false ,
3121
+ sender : Sender :: new ( self . config . connection_handler_queue_len ( ) ) ,
3122
+ topics : Default :: default ( ) ,
3123
+ dont_send : LinkedHashMap :: new ( ) ,
3124
+ } ) ;
3126
3125
// Add the new connection
3127
3126
connected_peer. connections . push ( connection_id) ;
3128
3127
@@ -3140,16 +3139,16 @@ where
3140
3139
_: Endpoint ,
3141
3140
_: PortUse ,
3142
3141
) -> Result < THandler < Self > , ConnectionDenied > {
3143
- let connected_peer = self
3144
- . connected_peers
3145
- . entry ( peer_id )
3146
- . or_insert ( PeerConnections {
3147
- kind : PeerKind :: Floodsub ,
3148
- connections : vec ! [ ] ,
3149
- sender : Sender :: new ( self . config . connection_handler_queue_len ( ) ) ,
3150
- topics : Default :: default ( ) ,
3151
- dont_send : LinkedHashMap :: new ( ) ,
3152
- } ) ;
3142
+ let connected_peer = self . connected_peers . entry ( peer_id ) . or_insert ( PeerDetails {
3143
+ kind : PeerKind :: Floodsub ,
3144
+ connections : vec ! [ ] ,
3145
+ // Diverging from the go implementation we only want to consider a peer as outbound peer
3146
+ // if its first connection is outbound.
3147
+ outbound : ! self . px_peers . contains ( & peer_id ) ,
3148
+ sender : Sender :: new ( self . config . connection_handler_queue_len ( ) ) ,
3149
+ topics : Default :: default ( ) ,
3150
+ dont_send : LinkedHashMap :: new ( ) ,
3151
+ } ) ;
3153
3152
// Add the new connection
3154
3153
connected_peer. connections . push ( connection_id) ;
3155
3154
@@ -3399,7 +3398,7 @@ fn peer_added_to_mesh(
3399
3398
new_topics : Vec < & TopicHash > ,
3400
3399
mesh : & HashMap < TopicHash , BTreeSet < PeerId > > ,
3401
3400
events : & mut VecDeque < ToSwarm < Event , HandlerIn > > ,
3402
- connections : & HashMap < PeerId , PeerConnections > ,
3401
+ connections : & HashMap < PeerId , PeerDetails > ,
3403
3402
) {
3404
3403
// Ensure there is an active connection
3405
3404
let connection_id = match connections. get ( & peer_id) {
@@ -3441,7 +3440,7 @@ fn peer_removed_from_mesh(
3441
3440
old_topic : & TopicHash ,
3442
3441
mesh : & HashMap < TopicHash , BTreeSet < PeerId > > ,
3443
3442
events : & mut VecDeque < ToSwarm < Event , HandlerIn > > ,
3444
- connections : & HashMap < PeerId , PeerConnections > ,
3443
+ connections : & HashMap < PeerId , PeerDetails > ,
3445
3444
) {
3446
3445
// Ensure there is an active connection
3447
3446
let connection_id = match connections. get ( & peer_id) {
@@ -3479,7 +3478,7 @@ fn peer_removed_from_mesh(
3479
3478
/// filtered by the function `f`. The number of peers to get equals the output of `n_map`
3480
3479
/// that gets as input the number of filtered peers.
3481
3480
fn get_random_peers_dynamic (
3482
- connected_peers : & HashMap < PeerId , PeerConnections > ,
3481
+ connected_peers : & HashMap < PeerId , PeerDetails > ,
3483
3482
topic_hash : & TopicHash ,
3484
3483
// maps the number of total peers to the number of selected peers
3485
3484
n_map : impl Fn ( usize ) -> usize ,
@@ -3512,7 +3511,7 @@ fn get_random_peers_dynamic(
3512
3511
/// Helper function to get a set of `n` random gossipsub peers for a `topic_hash`
3513
3512
/// filtered by the function `f`.
3514
3513
fn get_random_peers (
3515
- connected_peers : & HashMap < PeerId , PeerConnections > ,
3514
+ connected_peers : & HashMap < PeerId , PeerDetails > ,
3516
3515
topic_hash : & TopicHash ,
3517
3516
n : usize ,
3518
3517
f : impl FnMut ( & PeerId ) -> bool ,
0 commit comments