Skip to content

Commit 54154fa

Browse files
aunthVlad Masliankostoropoli
authored
Feature/scoring fixes (#103)
* fix conflicts * fix conflicts * use PeerScore instead of 3 separate scores * add decay and test for it * fix clippy * apply fixes * fix clippy * add decay to commands handler and update the test * apply fixes * fix clippy * fix clippy * doc: clarify why scoring system is disable in byos --------- Co-authored-by: Vlad Maslianko <masliankovladik@gmail.ocm> Co-authored-by: Jose Storopoli <jose@storopoli.com>
1 parent 760ec56 commit 54154fa

File tree

6 files changed

+263
-61
lines changed

6 files changed

+263
-61
lines changed

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/p2p/src/commands.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,12 @@ use libp2p::PeerId;
77
use libp2p::identity::PublicKey;
88
use tokio::sync::oneshot;
99

10+
#[cfg(all(
11+
any(feature = "gossipsub", feature = "request-response"),
12+
not(feature = "byos")
13+
))]
14+
use crate::score_manager::PeerScore;
15+
1016
/// Commands that users can send to the P2P node.
1117
#[derive(Debug)]
1218
pub enum Command {
@@ -35,6 +41,18 @@ pub enum Command {
3541
target_transport_id: PeerId,
3642
},
3743

44+
/// Gets [`PeerScore`] for a specific peer by [`PeerId`].
45+
#[cfg(all(
46+
any(feature = "gossipsub", feature = "request-response"),
47+
not(feature = "byos")
48+
))]
49+
GetPeerScore {
50+
/// Transport `PeerId` to query.
51+
peer_id: PeerId,
52+
/// Channel to send the response back.
53+
response_sender: oneshot::Sender<PeerScore>,
54+
},
55+
3856
/// Directly queries P2P state (doesn't produce events).
3957
QueryP2PState(QueryP2PStateCommand),
4058
}

crates/p2p/src/score_manager.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ pub const DEFAULT_GOSSIP_APP_SCORE: f64 = 0.0;
1010

1111
#[cfg(any(feature = "gossipsub", feature = "request-response"))]
1212
use std::collections::HashMap;
13+
#[cfg(any(feature = "gossipsub", feature = "request-response"))]
14+
use std::time::SystemTime;
1315

1416
#[cfg(any(feature = "gossipsub", feature = "request-response"))]
1517
use libp2p::identity::PeerId;
@@ -26,6 +28,13 @@ pub struct ScoreManager {
2628
/// [`HashMap`] of request-response score for each peer.
2729
#[cfg(feature = "request-response")]
2830
req_resp_app_score: HashMap<PeerId, f64>,
31+
32+
/// Storage for last scoring decay time.
33+
#[cfg(all(
34+
any(feature = "gossipsub", feature = "request-response"),
35+
not(feature = "byos")
36+
))]
37+
last_scoring_decay_time: HashMap<PeerId, SystemTime>,
2938
}
3039

3140
impl ScoreManager {
@@ -37,6 +46,9 @@ impl ScoreManager {
3746

3847
#[cfg(feature = "request-response")]
3948
req_resp_app_score: HashMap::new(),
49+
50+
#[cfg(any(feature = "request-response", feature = "gossipsub"))]
51+
last_scoring_decay_time: HashMap::new(),
4052
}
4153
}
4254

@@ -67,6 +79,18 @@ impl ScoreManager {
6779
pub fn update_req_resp_app_score(&mut self, peer_id: &PeerId, new_score: f64) {
6880
self.req_resp_app_score.insert(*peer_id, new_score);
6981
}
82+
83+
/// Updates the last scoring decay time for a given [`PeerId`].
84+
///
85+
/// Returns `Some(SystemTime)` containing the previous decay time if it existed,
86+
/// or `None` if this is the first time updating the peer.
87+
pub fn update_last_scoring_decay_time(
88+
&mut self,
89+
peer_id: PeerId,
90+
time: SystemTime,
91+
) -> Option<SystemTime> {
92+
self.last_scoring_decay_time.insert(peer_id, time)
93+
}
7094
}
7195

7296
/// All scores for a peer.

crates/p2p/src/swarm/mod.rs

Lines changed: 92 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1141,6 +1141,18 @@ impl P2P {
11411141
return Ok(());
11421142
}
11431143

1144+
// NOTE: BYOS uses an allowlist, making scoring system useless.
1145+
#[cfg(not(feature = "byos"))]
1146+
{
1147+
let elapsed = self.get_elapsed_decay_time(&propagation_source);
1148+
if !elapsed.is_zero() {
1149+
let current_score = self.get_all_scores(&propagation_source).gossipsub_app_score;
1150+
let new_score = self.validator.apply_decay(&current_score, &elapsed);
1151+
self.score_manager
1152+
.update_gossipsub_app_score(&propagation_source, new_score);
1153+
}
1154+
}
1155+
11441156
let signed_gossipsub_message: SignedGossipsubMessage = match flexbuffers::from_slice(
11451157
&message.data,
11461158
) {
@@ -1259,19 +1271,11 @@ impl P2P {
12591271
self.score_manager
12601272
.update_gossipsub_app_score(&propagation_source, updated_score);
12611273

1262-
let PeerScore {
1263-
gossipsub_app_score,
1264-
gossipsub_internal_score,
1265-
#[cfg(feature = "request-response")]
1266-
req_resp_app_score,
1267-
} = self.get_all_scores(&propagation_source);
1274+
let peer_score = self.get_all_scores(&propagation_source);
12681275

12691276
if let Some(penalty) = self.validator.get_penalty(
12701277
&MessageType::Gossipsub(signed_gossipsub_message.message.message.clone()),
1271-
gossipsub_internal_score,
1272-
gossipsub_app_score,
1273-
#[cfg(feature = "request-response")]
1274-
req_resp_app_score,
1278+
&peer_score,
12751279
) {
12761280
self.apply_penalty(&propagation_source, penalty).await;
12771281
return Ok(());
@@ -1369,6 +1373,7 @@ impl P2P {
13691373

13701374
Ok(())
13711375
}
1376+
13721377
Command::DisconnectFromPeer {
13731378
#[cfg(feature = "byos")]
13741379
target_app_public_key,
@@ -1400,6 +1405,44 @@ impl P2P {
14001405

14011406
Ok(())
14021407
}
1408+
1409+
// NOTE: BYOS uses an allowlist, making scoring system useless.
1410+
#[cfg(all(
1411+
any(feature = "gossipsub", feature = "request-response"),
1412+
not(feature = "byos")
1413+
))]
1414+
Command::GetPeerScore {
1415+
peer_id,
1416+
response_sender,
1417+
} => {
1418+
let mut score = self.get_all_scores(&peer_id);
1419+
1420+
let elapsed = self.get_elapsed_decay_time(&peer_id);
1421+
if !elapsed.is_zero() {
1422+
#[cfg(feature = "gossipsub")]
1423+
{
1424+
let new_score = self
1425+
.validator
1426+
.apply_decay(&score.gossipsub_app_score, &elapsed);
1427+
self.score_manager
1428+
.update_gossipsub_app_score(&peer_id, new_score);
1429+
score.gossipsub_app_score = new_score;
1430+
}
1431+
#[cfg(feature = "request-response")]
1432+
{
1433+
let updated_score = self
1434+
.validator
1435+
.apply_decay(&score.req_resp_app_score, &elapsed);
1436+
self.score_manager
1437+
.update_req_resp_app_score(&peer_id, updated_score);
1438+
score.req_resp_app_score = updated_score;
1439+
}
1440+
}
1441+
1442+
let _ = response_sender.send(score);
1443+
Ok(())
1444+
}
1445+
14031446
Command::QueryP2PState(query) => match query {
14041447
QueryP2PStateCommand::IsConnected {
14051448
#[cfg(feature = "byos")]
@@ -1440,6 +1483,7 @@ impl P2P {
14401483

14411484
Ok(())
14421485
}
1486+
14431487
QueryP2PStateCommand::GetConnectedPeers { response_sender } => {
14441488
info!("Querying connected peers");
14451489
let peer_ids = self.swarm.connected_peers().cloned().collect::<Vec<_>>();
@@ -1466,6 +1510,7 @@ impl P2P {
14661510

14671511
Ok(())
14681512
}
1513+
14691514
QueryP2PStateCommand::GetMyListeningAddresses { response_sender } => {
14701515
info!("Querying my own local listening addresses.");
14711516
// We clone here because if not clone, we'll receive `Vec<&Multiaddr>`. Ok,
@@ -1673,6 +1718,33 @@ impl P2P {
16731718
Ok(())
16741719
}
16751720

1721+
// NOTE: BYOS uses an allowlist, making scoring system useless.
1722+
#[cfg(all(
1723+
any(feature = "gossipsub", feature = "request-response"),
1724+
not(feature = "byos")
1725+
))]
1726+
fn get_elapsed_decay_time(&mut self, peer_id: &PeerId) -> Duration {
1727+
let now = SystemTime::now();
1728+
let last_time = self
1729+
.score_manager
1730+
.update_last_scoring_decay_time(*peer_id, now);
1731+
1732+
last_time
1733+
.and_then(|last| now.duration_since(last).ok())
1734+
.unwrap_or_default()
1735+
}
1736+
1737+
#[cfg(all(feature = "request-response", not(feature = "byos")))]
1738+
async fn decay_request_response_score(&mut self, peer_id: &PeerId) {
1739+
let elapsed = self.get_elapsed_decay_time(peer_id);
1740+
if !elapsed.is_zero() {
1741+
let current_score = self.get_all_scores(peer_id).req_resp_app_score;
1742+
let updated_score = self.validator.apply_decay(&current_score, &elapsed);
1743+
self.score_manager
1744+
.update_req_resp_app_score(peer_id, updated_score);
1745+
}
1746+
}
1747+
16761748
/// Handles [`MessageEvent`] from the swarm.
16771749
#[cfg(feature = "request-response")]
16781750
async fn handle_message_event(
@@ -1732,6 +1804,7 @@ impl P2P {
17321804

17331805
#[cfg(not(feature = "byos"))]
17341806
{
1807+
self.decay_request_response_score(&peer_id).await;
17351808
let old_app_score = self
17361809
.score_manager
17371810
.get_req_resp_score(&peer_id)
@@ -1745,22 +1818,12 @@ impl P2P {
17451818
self.score_manager
17461819
.update_req_resp_app_score(&peer_id, updated_score);
17471820

1748-
let PeerScore {
1749-
#[cfg(feature = "gossipsub")]
1750-
gossipsub_internal_score,
1751-
#[cfg(feature = "gossipsub")]
1752-
gossipsub_app_score,
1753-
req_resp_app_score,
1754-
} = self.get_all_scores(&peer_id);
1821+
let peer_score = self.get_all_scores(&peer_id);
17551822

1756-
if let Some(penalty) = self.validator.get_penalty(
1757-
&MessageType::Request(request.message.clone()),
1758-
#[cfg(feature = "gossipsub")]
1759-
gossipsub_internal_score,
1760-
#[cfg(feature = "gossipsub")]
1761-
gossipsub_app_score,
1762-
req_resp_app_score,
1763-
) {
1823+
if let Some(penalty) = self
1824+
.validator
1825+
.get_penalty(&MessageType::Request(request.message.clone()), &peer_score)
1826+
{
17641827
self.apply_penalty(&peer_id, penalty).await;
17651828
return Ok(());
17661829
}
@@ -1904,8 +1967,10 @@ impl P2P {
19041967
return Ok(());
19051968
}
19061969

1970+
// NOTE: BYOS uses an allowlist, making scoring system useless.
19071971
#[cfg(not(feature = "byos"))]
19081972
{
1973+
self.decay_request_response_score(&peer_id).await;
19091974
let response: ResponseMessage = signed_response_message.message.clone();
19101975
let old_app_score = self
19111976
.score_manager
@@ -1920,21 +1985,11 @@ impl P2P {
19201985
self.score_manager
19211986
.update_req_resp_app_score(&peer_id, updated_score);
19221987

1923-
let PeerScore {
1924-
#[cfg(feature = "gossipsub")]
1925-
gossipsub_internal_score,
1926-
#[cfg(feature = "gossipsub")]
1927-
gossipsub_app_score,
1928-
req_resp_app_score,
1929-
} = self.get_all_scores(&peer_id);
1988+
let peer_score = self.get_all_scores(&peer_id);
19301989

19311990
if let Some(penalty) = self.validator.get_penalty(
19321991
&MessageType::Response(response.message.clone()),
1933-
#[cfg(feature = "gossipsub")]
1934-
gossipsub_internal_score,
1935-
#[cfg(feature = "gossipsub")]
1936-
gossipsub_app_score,
1937-
req_resp_app_score,
1992+
&peer_score,
19381993
) {
19391994
self.apply_penalty(&peer_id, penalty).await;
19401995
return Ok(());

0 commit comments

Comments
 (0)