Skip to content

Store peer penality records in the PeerDB in PeerInfo #7320

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 137 additions & 3 deletions beacon_node/lighthouse_network/src/peer_manager/peerdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,16 @@ use crate::{metrics, multiaddr::Multiaddr, types::Subnet, Enr, EnrExt, Gossipsub
use itertools::Itertools;
use logging::crit;
use peer_info::{ConnectionDirection, PeerConnectionStatus, PeerInfo};
use score::{PeerAction, ReportSource, Score, ScoreState};
use score::{PeerAction, PenaltyRecord, ReportSource, Score, ScoreState};
use serde::Serialize;
use std::net::IpAddr;
use std::time::Instant;
use std::{cmp::Ordering, fmt::Display};
use std::{
collections::{hash_map::Entry, HashMap, HashSet},
fmt::Formatter,
};
use strum::AsRefStr;
use sync_status::SyncStatus;
use tracing::{debug, error, trace, warn};
use types::data_column_custody_group::compute_subnets_for_node;
Expand Down Expand Up @@ -560,7 +562,11 @@ impl<E: EthSpec> PeerDB<E> {
info.apply_peer_action_to_score(action);
metrics::inc_counter_vec(
&metrics::PEER_ACTION_EVENTS_PER_CLIENT,
&[info.client().kind.as_ref(), action.as_ref(), source.into()],
&[
info.client().kind.as_ref(),
action.as_ref(),
source.as_ref(),
],
);
let result = Self::handle_score_transition(previous_state, peer_id, info);
if previous_state == info.score_state() {
Expand All @@ -571,6 +577,10 @@ impl<E: EthSpec> PeerDB<E> {
"Peer score adjusted"
);
}
self.add_penalty_record(
peer_id,
PenaltyRecord::new(action, source, msg, result.clone()),
);
match result {
ScoreTransitionResult::Banned => {
// The peer was banned as a result of this action.
Expand Down Expand Up @@ -752,6 +762,32 @@ impl<E: EthSpec> PeerDB<E> {
peer_id
}

fn add_penalty_record(&mut self, peer_id: &PeerId, penalty_record: PenaltyRecord) {
let info = self.peers.entry(*peer_id).or_insert_with(|| {
error!(%peer_id, "Adding a penalty record to a non-existant peer");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can happen in a race condition, not sure if it should be an error log

if self.disable_peer_scoring {
PeerInfo::trusted_peer_info()
} else {
PeerInfo::default()
}
});
info.add_penalty_record(penalty_record);
}

/// Gets all the penalty records for a given peer id
pub fn get_penalty_records(
&self,
peer_id: &PeerId,
) -> Option<impl Iterator<Item = &PenaltyRecord>> {
match self.peers.get(peer_id) {
Some(info) => Some(info.get_penalty_records()),
None => {
error!(%peer_id, "Getting the penalty records of a non-existant peer");
None
}
}
}

/// The connection state of the peer has been changed. Modify the peer in the db to ensure all
/// variables are in sync with libp2p.
/// Updating the state can lead to a `BanOperation` which needs to be processed via the peer
Expand Down Expand Up @@ -1225,7 +1261,9 @@ enum NewConnectionState {
}

/// The result of applying a score transition to a peer.
enum ScoreTransitionResult {
#[derive(Clone, Debug, Serialize, AsRefStr)]
#[strum(serialize_all = "snake_case")]
pub enum ScoreTransitionResult {
/// The peer has become disconnected.
Disconnected,
/// The peer has been banned.
Expand Down Expand Up @@ -1345,6 +1383,7 @@ impl BannedPeersCount {
mod tests {
use super::*;
use libp2p::core::multiaddr::Protocol;
use score::MAX_STORED_PENALTY_RECORDS;
use std::net::{Ipv4Addr, Ipv6Addr};
use types::MinimalEthSpec;

Expand Down Expand Up @@ -1896,6 +1935,101 @@ mod tests {
p
}

#[test]
fn test_add_penalty_record() {
let mut pdb = get_db();
let random_peer = PeerId::random();

pdb.connect_ingoing(&random_peer, "/ip4/0.0.0.0".parse().unwrap(), None);

// Check to see if get_penalty_records is able to get the associated peer info
assert!(pdb.get_penalty_records(&random_peer).is_some());

// Check to see if there are no initial penalty records
assert_eq!(
pdb.get_penalty_records(&random_peer)
.unwrap()
.into_iter()
.count(),
0
);

let _ = pdb.report_peer(
&random_peer,
PeerAction::HighToleranceError,
ReportSource::PeerManager,
"Minor report action",
);

// Check to see if there was a penalty record added
assert_eq!(
pdb.get_penalty_records(&random_peer)
.unwrap()
.into_iter()
.count(),
1
);

let first_record = pdb
.get_penalty_records(&random_peer)
.unwrap()
.into_iter()
.next()
.unwrap();

// Check to see the correct report action for the penalty record
assert!(matches!(
first_record.action,
PeerAction::HighToleranceError
));

// Check to see the correct report source for the penalty record
assert!(matches!(first_record.source, ReportSource::PeerManager));

// Check to see the correct message for the penalty record
assert_eq!(*first_record.msg, "Minor report action".to_string());

// Check to see the correct result for the penalty record
assert!(matches!(
first_record.result,
ScoreTransitionResult::NoAction
));

let _ = pdb.report_peer(
&random_peer,
PeerAction::HighToleranceError,
ReportSource::PeerManager,
"Minor report action",
);

// Check to see if there was a penalty record added
assert_eq!(
pdb.get_penalty_records(&random_peer)
.unwrap()
.into_iter()
.count(),
2
);

for _ in 1..(MAX_STORED_PENALTY_RECORDS + 1) {
let _ = pdb.report_peer(
&random_peer,
PeerAction::HighToleranceError,
ReportSource::PeerManager,
"Minor report action",
);
}

// Check to make sure that the number of penalty records don't exceed the maximum
assert_eq!(
pdb.get_penalty_records(&random_peer)
.unwrap()
.into_iter()
.count(),
MAX_STORED_PENALTY_RECORDS
);
}

#[test]
fn test_ban_address() {
let mut pdb = get_db();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::client::Client;
use super::score::{PeerAction, Score, ScoreState};
use super::score::{PeerAction, PenaltyRecord, Score, ScoreState, MAX_STORED_PENALTY_RECORDS};
use super::sync_status::SyncStatus;
use crate::discovery::Eth2Enr;
use crate::{rpc::MetaData, types::Subnet};
Expand All @@ -10,7 +10,7 @@ use serde::{
ser::{SerializeStruct, Serializer},
Serialize,
};
use std::collections::HashSet;
use std::collections::{HashSet, VecDeque};
use std::net::IpAddr;
use std::time::Instant;
use strum::AsRefStr;
Expand Down Expand Up @@ -46,6 +46,8 @@ pub struct PeerInfo<E: EthSpec> {
/// Note: Another reason to keep this separate to `self.subnets` is an upcoming change to
/// decouple custody requirements from the actual subnets, i.e. changing this to `custody_groups`.
custody_subnets: HashSet<DataColumnSubnetId>,
/// The record of penalties given to a peer
penalty_records: VecDeque<PenaltyRecord>,
/// The time we would like to retain this peer. After this time, the peer is no longer
/// necessary.
#[serde(skip)]
Expand Down Expand Up @@ -75,6 +77,7 @@ impl<E: EthSpec> Default for PeerInfo<E> {
is_trusted: false,
connection_direction: None,
enr: None,
penalty_records: VecDeque::new(),
}
}
}
Expand Down Expand Up @@ -387,6 +390,23 @@ impl<E: EthSpec> PeerInfo<E> {
self.meta_data = Some(meta_data);
}

/// Adds a penalty record
// VISIBILITY: The peer manager is able to add a penalty record
pub(in crate::peer_manager) fn add_penalty_record(&mut self, penalty_record: PenaltyRecord) {
self.penalty_records.push_back(penalty_record);
while self.penalty_records.len() > MAX_STORED_PENALTY_RECORDS {
self.penalty_records.pop_front();
}
}

/// Gets an iterator with the penalty records
// VISIBILITY: The peer manager is able to add a penalty record
pub(in crate::peer_manager) fn get_penalty_records(
&self,
) -> impl Iterator<Item = &PenaltyRecord> {
self.penalty_records.iter()
}

/// Sets the connection status of the peer.
pub(super) fn set_connection_status(&mut self, connection_status: PeerConnectionStatus) {
self.connection_status = connection_status
Expand Down
44 changes: 41 additions & 3 deletions beacon_node/lighthouse_network/src/peer_manager/peerdb/score.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@
//! As the logic develops this documentation will advance.
//!
//! The scoring algorithms are currently experimental.
use super::ScoreTransitionResult;
use crate::service::gossipsub_scoring_parameters::GREYLIST_THRESHOLD as GOSSIPSUB_GREYLIST_THRESHOLD;
use serde::Serialize;
use std::cmp::Ordering;
use std::sync::LazyLock;
use std::time::Instant;
use std::time::{Instant, SystemTime, UNIX_EPOCH};
use strum::AsRefStr;
use tokio::time::Duration;

Expand Down Expand Up @@ -39,11 +40,48 @@ const GOSSIPSUB_NEGATIVE_SCORE_WEIGHT: f64 =
(MIN_SCORE_BEFORE_DISCONNECT + 1.0) / GOSSIPSUB_GREYLIST_THRESHOLD;
const GOSSIPSUB_POSITIVE_SCORE_WEIGHT: f64 = GOSSIPSUB_NEGATIVE_SCORE_WEIGHT;

pub(crate) const MAX_STORED_PENALTY_RECORDS: usize = 20;

#[derive(Clone, Debug, Serialize)]
pub struct PenaltyRecord {
/// The action that caused the penalty
pub action: PeerAction,
/// Where the penalty came from
pub source: ReportSource,
/// The penalty message
pub msg: String,
/// The result of the penalty
pub result: ScoreTransitionResult,
/// The time when the penalty occured in unix millis
pub time_stamp: u128,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pub time_stamp: u128,
pub timestamp_ms: u128,

}

impl PenaltyRecord {
/// Create a new penalty record
pub fn new(
action: PeerAction,
source: ReportSource,
msg: impl Into<String>,
result: ScoreTransitionResult,
) -> PenaltyRecord {
let time_stamp: u128 = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map_or(0, |dur| dur.as_millis());
PenaltyRecord {
action: action,
source: source,
msg: msg.into(),
result: result,
time_stamp: time_stamp,
}
}
}

/// A collection of actions a peer can perform which will adjust its score.
/// Each variant has an associated score change.
// To easily assess the behaviour of scores changes the number of variants should stay low, and
// somewhat generic.
#[derive(Debug, Clone, Copy, AsRefStr)]
#[derive(Debug, Clone, Copy, AsRefStr, Serialize)]
#[strum(serialize_all = "snake_case")]
pub enum PeerAction {
/// We should not communicate more with this peer.
Expand All @@ -66,7 +104,7 @@ pub enum PeerAction {
}

/// Service reporting a `PeerAction` for a peer.
#[derive(Debug)]
#[derive(Clone, Debug, Serialize, AsRefStr)]
pub enum ReportSource {
Gossipsub,
RPC,
Expand Down