Skip to content

Network metrics per client #7445

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion beacon_node/lighthouse_network/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,14 @@ pub static RESPONSE_IDLING: LazyLock<Result<Histogram>> = LazyLock::new(|| {
)
});

pub static BYTES_RECEIVED_PER_CLIENT: LazyLock<Result<IntCounterVec>> = LazyLock::new(|| {
try_create_int_counter_vec(
"bytes_received_per_client",
"Total bytes received per client type",
&["Client"],
)
});

pub fn scrape_discovery_metrics() {
let metrics =
discv5::metrics::Metrics::from(discv5::Discv5::<discv5::DefaultProtocolId>::raw_metrics());
Expand All @@ -229,4 +237,4 @@ pub fn scrape_discovery_metrics() {
set_gauge_vec(&DISCOVERY_BYTES, &["outbound"], metrics.bytes_sent as i64);
set_gauge_vec(&NAT_OPEN, &["discv5_ipv4"], metrics.ipv4_contactable as i64);
set_gauge_vec(&NAT_OPEN, &["discv5_ipv6"], metrics.ipv6_contactable as i64);
}
}
16 changes: 16 additions & 0 deletions beacon_node/lighthouse_network/src/peer_manager/peerdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,11 @@ impl<E: EthSpec> PeerDB<E> {
}
}

/// Returns the sync start time of the peer if it exists
pub fn sync_start_time(&self, peer_id: &PeerId) -> Option<&Instant> {
self.peers.get(peer_id).and_then(|info| info.sync_start_time())
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't follow the meaning/purpose of sync start time, could you detail it?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added this to calculate sync time per client. When a sync request is received I update this parameter to record the time when the sync started here and later when the state of the peer changes to is_synced here I calculate the time taken to include in the metric.

}

/// Returns the current [`BanResult`] of the peer if banned. This doesn't check the connection state, rather the
/// underlying score of the peer. A peer may be banned but still in the connected state
/// temporarily.
Expand Down Expand Up @@ -395,6 +400,17 @@ impl<E: EthSpec> PeerDB<E> {
Some(info.update_sync_status(sync_status))
}

/// Updates the sync start time for a peer
pub fn update_sync_start_time(
&mut self,
peer_id: &PeerId,
sync_start_time: Instant
) {
if let Some(info) = self.peers.get_mut(peer_id) {
info.update_sync_start_time(sync_start_time);
}
}

/// Updates the scores of known peers according to their connection status and the time that
/// has passed. This function returns a list of peers that have been unbanned.
/// NOTE: Peer scores cannot be penalized during the update, they can only increase. Therefore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ pub struct PeerInfo<E: EthSpec> {
connection_direction: Option<ConnectionDirection>,
/// The enr of the peer, if known.
enr: Option<Enr>,
/// The time the peer was started syncing
#[serde(skip)]
sync_start_time: Option<Instant>,
}

impl<E: EthSpec> Default for PeerInfo<E> {
Expand All @@ -75,6 +78,7 @@ impl<E: EthSpec> Default for PeerInfo<E> {
is_trusted: false,
connection_direction: None,
enr: None,
sync_start_time: None,
}
}
}
Expand Down Expand Up @@ -148,6 +152,11 @@ impl<E: EthSpec> PeerInfo<E> {
&self.sync_status
}

/// Returns the sync start time of the peer.
pub fn sync_start_time(&self) -> Option<&Instant> {
self.sync_start_time.as_ref()
}

/// Returns the metadata for the peer if currently known.
pub fn meta_data(&self) -> Option<&MetaData<E>> {
self.meta_data.as_ref()
Expand Down Expand Up @@ -371,6 +380,11 @@ impl<E: EthSpec> PeerInfo<E> {
self.client = client
}

/// Updates the sync start time. Returns true if the start time was changed.
pub fn update_sync_start_time(&mut self, sync_start_time: Instant) {
self.sync_start_time = Some(sync_start_time);
}

/// Replaces the current listening addresses with those specified, returning the current
/// listening addresses.
// VISIBILITY: The peer manager is able to set the listening addresses
Expand Down
13 changes: 12 additions & 1 deletion beacon_node/lighthouse_network/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1507,6 +1507,7 @@ impl<E: EthSpec> Network<E> {
message_id: id,
message: gs_msg,
} => {
self.track_bytes_received_per_client(propagation_source, gs_msg.data.len());
// Note: We are keeping track here of the peer that sent us the message, not the
// peer that originally published the message.
match PubsubMessage::decode(&gs_msg.topic, &gs_msg.data, &self.fork_context) {
Expand Down Expand Up @@ -2221,4 +2222,14 @@ impl<E: EthSpec> Network<E> {
}
}
}
}

pub fn track_bytes_received_per_client(&mut self, peer_id: PeerId, bytes_len: usize) {
let client = self.network_globals.client(&peer_id).kind.to_string();
// update metric for bytes received per client
metrics::inc_counter_vec_by(
&metrics::BYTES_RECEIVED_PER_CLIENT,
&[&client],
bytes_len as u64,
);
}
}
41 changes: 41 additions & 0 deletions beacon_node/network/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,47 @@ pub static SAMPLING_REQUEST_RESULT: LazyLock<Result<IntCounterVec>> = LazyLock::
)
});

pub static BLOCKS_RECEIVED_PER_CLIENT: LazyLock<Result<IntCounterVec>> = LazyLock::new(|| {
try_create_int_counter_vec(
"blocks_received_per_client",
"Total number of blocks received for sync per client type",
&["Client"],
)
});

pub static GOSSIP_BLOCKS_IMPORTED_PER_CLIENT: LazyLock<Result<IntCounterVec>> = LazyLock::new(|| {
try_create_int_counter_vec(
"gossip_blocks_imported_per_client",
"Total number of blocks imported via gossipper client type",
&["Client"],
)
});

pub static RPC_BLOCKS_IMPORTED_PER_CLIENT: LazyLock<Result<IntCounterVec>> = LazyLock::new(|| {
try_create_int_counter_vec(
"gossip_blocks_imported_per_client",
"Total number of blocks imported via gossipper client type",
&["Client"],
)
});

pub static SYNC_TIME_PER_CLIENT: LazyLock<Result<HistogramVec>> = LazyLock::new(|| {
try_create_histogram_vec_with_buckets(
"client_sync_time_seconds",
"Time taken to sync with a client.",
Ok(vec![0.1, 0.2, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0, 120.0, 300.0]),
&["Client"],
)
});

pub static MESSAGES_RECEIVED_PER_CLIENT: LazyLock<Result<IntCounterVec>> = LazyLock::new(|| {
try_create_int_counter_vec(
"messages_received_per_client",
"Total messages received per client type and object type",
&["client", "object_type"],
)
});

pub fn register_finality_update_error(error: &LightClientFinalityUpdateError) {
inc_counter_vec(&GOSSIP_FINALITY_UPDATE_ERRORS_PER_TYPE, &[error.as_ref()]);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
metrics::inc_counter(
&metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_VERIFIED_TOTAL,
);
self.track_messages_received_per_client(peer_id, "gossip_attestation");

if let Err(e) = self
.chain
Expand Down Expand Up @@ -446,6 +447,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
match conversion_result {
Ok(Ok(attestation)) => {
let slot = attestation.data().slot;
self.track_messages_received_per_client(peer_id, "gossip_attestation");
if let Err(e) = self.send_unaggregated_attestation(
message_id.clone(),
peer_id,
Expand Down Expand Up @@ -683,6 +685,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
metrics::inc_counter(
&metrics::BEACON_PROCESSOR_AGGREGATED_ATTESTATION_VERIFIED_TOTAL,
);
self.track_messages_received_per_client(peer_id, "gossip_aggregate");

if let Err(e) = self
.chain
Expand Down Expand Up @@ -759,6 +762,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
&metrics::BEACON_DATA_COLUMN_GOSSIP_SLOT_START_DELAY_TIME,
delay,
);

match self
.chain
.verify_data_column_sidecar_for_gossip(column_sidecar.clone(), *subnet_id)
Expand Down Expand Up @@ -901,6 +905,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
let delay = get_slot_delay_ms(seen_duration, slot, &self.chain.slot_clock);
// Log metrics to track delay from other nodes on the network.
metrics::set_gauge(&metrics::BEACON_BLOB_DELAY_GOSSIP, delay.as_millis() as i64);

match self
.chain
.verify_blob_sidecar_for_gossip(blob_sidecar.clone(), blob_index)
Expand Down Expand Up @@ -1062,9 +1067,11 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
let block_root = verified_blob.block_root();
let blob_slot = verified_blob.slot();
let blob_index = verified_blob.id().index;
let client = self.network_globals.client(&peer_id).kind.to_string();

let result = self.chain.process_gossip_blob(verified_blob).await;
register_process_result_metrics(&result, metrics::BlockSource::Gossip, "blob");
self.track_messages_received_per_client(peer_id, "gossip_blob");

match &result {
Ok(AvailabilityProcessingStatus::Imported(block_root)) => {
Expand Down Expand Up @@ -1112,11 +1119,14 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
// If a block is in the da_checker, sync maybe awaiting for an event when block is finally
// imported. A block can become imported both after processing a block or blob. If a
// importing a block results in `Imported`, notify. Do not notify of blob errors.
if matches!(result, Ok(AvailabilityProcessingStatus::Imported(_))) {
self.send_sync_message(SyncMessage::GossipBlockProcessResult {
block_root,
imported: true,
});
let imported = matches!(result, Ok(AvailabilityProcessingStatus::Imported(_)));
self.send_sync_message(SyncMessage::GossipBlockProcessResult {
block_root,
imported,
});

if imported {
metrics::inc_counter_vec(&metrics::GOSSIP_BLOCKS_IMPORTED_PER_CLIENT, &[&client]);
}
}

Expand All @@ -1137,7 +1147,8 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
.process_gossip_data_columns(vec![verified_data_column], || Ok(()))
.await;
register_process_result_metrics(&result, metrics::BlockSource::Gossip, "data_column");

self.track_messages_received_per_client(peer_id, "gossip_data_column");

match result {
Ok(availability) => match availability {
AvailabilityProcessingStatus::Imported(block_root) => {
Expand Down Expand Up @@ -1533,6 +1544,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
invalid_block_storage: InvalidBlockStorage,
_seen_duration: Duration,
) {
self.track_messages_received_per_client(peer_id, "gossip_block");
let processing_start_time = Instant::now();
let block = verified_block.block.block_cloned();
let block_root = verified_block.block_root;
Expand Down Expand Up @@ -1677,10 +1689,15 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
self.maybe_store_invalid_block(&invalid_block_storage, block_root, &block, e);
}

let client = self.network_globals.client(&peer_id).kind.to_string();
let imported = matches!(result, Ok(AvailabilityProcessingStatus::Imported(_)));
self.send_sync_message(SyncMessage::GossipBlockProcessResult {
block_root,
imported: matches!(result, Ok(AvailabilityProcessingStatus::Imported(_))),
imported,
});
if imported {
metrics::inc_counter_vec(&metrics::GOSSIP_BLOCKS_IMPORTED_PER_CLIENT, &[&client]);
}
}

pub fn process_gossip_voluntary_exit(
Expand Down Expand Up @@ -1723,6 +1740,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
};

metrics::inc_counter(&metrics::BEACON_PROCESSOR_EXIT_VERIFIED_TOTAL);
self.track_messages_received_per_client(peer_id, "gossip_voluntary_exit");

self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept);

Expand Down Expand Up @@ -1784,6 +1802,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
};

metrics::inc_counter(&metrics::BEACON_PROCESSOR_PROPOSER_SLASHING_VERIFIED_TOTAL);
self.track_messages_received_per_client(peer_id, "gossip_proposer_slashing");

self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept);

Expand Down Expand Up @@ -1837,6 +1856,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
};

metrics::inc_counter(&metrics::BEACON_PROCESSOR_ATTESTER_SLASHING_VERIFIED_TOTAL);
self.track_messages_received_per_client(peer_id, "gossip_attester_slashing");

self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept);

Expand Down Expand Up @@ -1906,6 +1926,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
};

metrics::inc_counter(&metrics::BEACON_PROCESSOR_BLS_TO_EXECUTION_CHANGE_VERIFIED_TOTAL);
self.track_messages_received_per_client(peer_id, "gossip_bls_to_execution_change");

self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept);

Expand Down Expand Up @@ -1960,6 +1981,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {

// If the message is still timely, propagate it.
self.propagate_sync_message_if_timely(message_slot, message_id, peer_id);
self.track_messages_received_per_client(peer_id, "gossip_sync_committee_message");

// Register the sync signature with any monitored validators.
self.chain
Expand Down Expand Up @@ -2033,6 +2055,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
&self.chain.slot_clock,
);
metrics::inc_counter(&metrics::BEACON_PROCESSOR_SYNC_CONTRIBUTION_VERIFIED_TOTAL);
self.track_messages_received_per_client(peer_id, "gossip_sync_committee_contribution");

if let Err(e) = self
.chain
Expand Down Expand Up @@ -2060,6 +2083,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
{
Ok(_verified_light_client_finality_update) => {
self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept);
self.track_messages_received_per_client(peer_id, "gossip_finality_update");
}
Err(e) => {
metrics::register_finality_update_error(&e);
Expand Down Expand Up @@ -2122,6 +2146,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
);

self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept);
self.track_messages_received_per_client(peer_id, "gossip_optimistic_update");
}
Err(e) => {
match e {
Expand Down Expand Up @@ -3246,4 +3271,12 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
write_file(error_path, error.to_string().as_bytes());
}
}
}

fn track_messages_received_per_client(&self, peer_id: PeerId, object_type: &'static str) {
let client = self.network_globals.client(&peer_id).kind.to_string();
metrics::inc_counter_vec(
&metrics::MESSAGES_RECEIVED_PER_CLIENT,
&[&client, object_type]
);
}
}
7 changes: 7 additions & 0 deletions beacon_node/network/src/sync/backfill_sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ use std::collections::{
use std::sync::Arc;
use tracing::{debug, error, info, instrument, warn};
use types::{Epoch, EthSpec};
use crate::metrics::MESSAGES_RECEIVED_PER_CLIENT;

/// Blocks are downloaded in batches from peers. This constant specifies how many epochs worth of
/// blocks per batch are requested _at most_. A batch may request less blocks to account for
Expand Down Expand Up @@ -410,6 +411,12 @@ impl<T: BeaconChainTypes> BackFillSync<T> {
"Completed batch received"
);

let client = network.client_type(peer_id).kind.to_string();
metrics::inc_counter_vec(
&MESSAGES_RECEIVED_PER_CLIENT,
&[&client, "backfill_sync_blocks"]
);

// pre-emptively request more blocks from peers whilst we process current blocks,
self.request_batches(network)?;
self.process_completed_batches(network)
Expand Down
Loading