Skip to content

[Feature] Prometheus Metrics for Telemetry #3628

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: staging
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ metrics = [
"snarkos-node-tcp/metrics"
]
history = [ "snarkos-node-rest/history" ]
telemetry = [ "snarkos-node-bft/telemetry", "snarkos-node-rest/telemetry" ]
telemetry = [ "snarkos-node-bft/telemetry", "snarkos-node-consensus/telemetry", "snarkos-node-rest/telemetry" ]
cuda = [
"snarkvm/cuda",
"snarkos-account/cuda",
Expand Down
55 changes: 32 additions & 23 deletions node/bft/src/helpers/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ use snarkvm::{
committee::Committee,
narwhal::{BatchCertificate, BatchHeader, Subdag},
},
prelude::{Address, Field, Network, cfg_iter},
prelude::{Address, Field, Network, cfg_chunks, cfg_iter},
};

use indexmap::{IndexMap, IndexSet};
use parking_lot::RwLock;
use rayon::prelude::*;
use std::sync::Arc;
use std::{collections::BTreeMap, sync::Arc};

// TODO: Consider other metrics to track:
// - Response time
Expand All @@ -41,7 +41,7 @@ use std::sync::Arc;
pub struct Telemetry<N: Network> {
/// The certificates seen for each round
/// A mapping of `round` to set of certificate IDs.
tracked_certificates: Arc<RwLock<IndexMap<u64, IndexSet<Field<N>>>>>,
tracked_certificates: Arc<RwLock<BTreeMap<u64, IndexSet<Field<N>>>>>,

/// The total number of signatures seen for a validator, including for their own certificates.
/// A mapping of `address` to a mapping of `round` to `count`.
Expand All @@ -51,8 +51,11 @@ pub struct Telemetry<N: Network> {
/// A mapping of `address` to a list of rounds.
validator_certificates: Arc<RwLock<IndexMap<Address<N>, IndexSet<u64>>>>,

/// The certificate and signature participation scores for each validator.
participation_scores: Arc<RwLock<IndexMap<Address<N>, (f64, f64)>>>,
/// The certificate, signature, and combined participation scores for each validator.
/// Certificate Score: The % of rounds the validator has a valid certificate
/// Signature Score: The % of certificates the validator has a valid signature for
/// Combined Score: The weighted score using the certificate and signature scores
participation_scores: Arc<RwLock<IndexMap<Address<N>, (f64, f64, f64)>>>,
}

impl<N: Network> Default for Telemetry<N> {
Expand All @@ -76,27 +79,15 @@ impl<N: Network> Telemetry<N> {
// TODO (raychu86): Consider using committee lookback here.
/// Fetch the participation scores for each validator in the committee set.
pub fn get_participation_scores(&self, committee: &Committee<N>) -> IndexMap<Address<N>, f64> {
// Calculate the combined score with custom weights:
// - 90% certificate participation score
// - 10% signature participation score
fn weighted_score(certificate_score: f64, signature_score: f64) -> f64 {
let score = (0.9 * certificate_score) + (0.1 * signature_score);

// Truncate to the last 2 decimal places.
(score * 100.0).round() / 100.0
}

// Fetch the participation scores.
let participation_scores = self.participation_scores.read();
// Calculate the weighted score for each validator.
committee
.members()
.iter()
.map(|(address, _)| {
let score = participation_scores
.get(address)
.map(|(certificate_score, signature_score)| weighted_score(*certificate_score, *signature_score))
.unwrap_or(0.0);
let score =
participation_scores.get(address).map(|(_, _, combined_score)| *combined_score).unwrap_or(0.0);
(*address, score)
})
.collect()
Expand Down Expand Up @@ -163,13 +154,22 @@ impl<N: Network> Telemetry<N> {

/// Calculate and update the participation scores for each validator.
pub fn update_participation_scores(&self) {
// Calculate the combined score with custom weights:
// - 90% certificate participation score
// - 10% signature participation score
fn weighted_score(certificate_score: f64, signature_score: f64) -> f64 {
let score = (0.9 * certificate_score) + (0.1 * signature_score);

// Truncate to the last 2 decimal places.
(score * 100.0).round() / 100.0
}

// Fetch the certificates and signatures.
let tracked_certificates = self.tracked_certificates.read();
let validator_signatures = self.validator_signatures.read();
let validator_certificates = self.validator_certificates.read();

// Fetch the total number of certificates.
let num_certificate_rounds = tracked_certificates.len();
let total_certificates = validator_certificates.values().map(|rounds| rounds.len()).sum::<usize>();

// Calculate the signature participation scores for each validator.
Expand All @@ -182,10 +182,18 @@ impl<N: Network> Telemetry<N> {
.collect();

// Calculate the certificate participation scores for each validator.
// This score is based on how many certificates the validator has included in every two rounds.
let tracked_rounds: Vec<_> = tracked_certificates.keys().skip_while(|r| *r % 2 == 0).copied().collect();
let certificate_participation_scores: IndexMap<_, _> = cfg_iter!(validator_certificates)
.map(|(address, certificate_rounds)| {
// Calculate a rough score for the validator based on the number of certificates seen.
let score = certificate_rounds.len() as f64 / num_certificate_rounds as f64 * 100.0;
// Count the number of round pairs that are included in the certificate rounds.
let num_included_round_pairs = cfg_chunks!(tracked_rounds, 2)
.filter(|chunk| chunk.iter().any(|r| certificate_rounds.contains(r)))
.count();
// Calculate the number of round pairs.
let num_round_pairs = (tracked_rounds.len().saturating_add(1)).saturating_div(2);
// Calculate the score based on the number of certificate rounds the validator is a part of.
let score = num_included_round_pairs as f64 / num_round_pairs.max(1) as f64 * 100.0;
(*address, score as u16)
})
.collect();
Expand All @@ -197,7 +205,8 @@ impl<N: Network> Telemetry<N> {
for address in validator_addresses {
let signature_score = *signature_participation_scores.get(&address).unwrap_or(&0) as f64;
let certificate_score = *certificate_participation_scores.get(&address).unwrap_or(&0) as f64;
new_participation_scores.insert(address, (certificate_score, signature_score));
let combined_score = weighted_score(certificate_score, signature_score);
new_participation_scores.insert(address, (certificate_score, signature_score, combined_score));
}

// Update the participation scores.
Expand Down
1 change: 1 addition & 0 deletions node/consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ locktick = [
"snarkvm/locktick"
]
metrics = [ "dep:metrics" ]
telemetry = [ "snarkos-node-bft/telemetry" ]
cuda = [ "snarkvm/cuda", "snarkos-account/cuda", "snarkos-node-bft-ledger-service/cuda" ]

[dependencies.aleo-std]
Expand Down
20 changes: 20 additions & 0 deletions node/consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,9 @@ impl<N: Network> Consensus<N> {
self.ledger.check_next_block(&next_block)?;
// Advance to the next block.
self.ledger.advance_to_next_block(&next_block)?;
#[cfg(feature = "telemetry")]
// Fetch the latest committee
let latest_committee = self.ledger.current_committee()?;

// If the next block starts a new epoch, clear the existing solutions.
if next_block.height() % N::NUM_BLOCKS_PER_EPOCH == 0 {
Expand All @@ -553,6 +556,23 @@ impl<N: Network> Consensus<N> {
metrics::gauge(metrics::blocks::PROOF_TARGET, proof_target as f64);
metrics::gauge(metrics::blocks::COINBASE_TARGET, coinbase_target as f64);
metrics::gauge(metrics::blocks::CUMULATIVE_PROOF_TARGET, cumulative_proof_target as f64);

#[cfg(feature = "telemetry")]
{
// Retrieve the latest participation scores.
let participation_scores =
self.bft().primary().gateway().validator_telemetry().get_participation_scores(&latest_committee);

// Log the participation scores.
for (address, participation_score) in participation_scores {
metrics::histogram_label(
metrics::consensus::VALIDATOR_PARTICIPATION,
"validator_address",
address.to_string(),
participation_score,
)
}
}
}
Ok(())
}
Expand Down
1 change: 1 addition & 0 deletions node/metrics/src/names.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ pub mod consensus {
pub const UNCONFIRMED_SOLUTIONS: &str = "snarkos_consensus_unconfirmed_solutions_total";
pub const TRANSMISSION_LATENCY: &str = "snarkos_consensus_transmission_latency";
pub const STALE_UNCONFIRMED_TRANSMISSIONS: &str = "snarkos_consensus_stale_unconfirmed_transmissions";
pub const VALIDATOR_PARTICIPATION: &str = "snarkos_consensus_validator_participation";
}

pub mod router {
Expand Down
2 changes: 1 addition & 1 deletion node/rest/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ edition = "2021"
default = [ "parallel" ]
parallel = [ "rayon" ]
history = [ "snarkvm-synthesizer/history" ]
telemetry = [ ]
telemetry = [ "snarkos-node-consensus/telemetry" ]
cuda = [ "snarkvm/cuda", "snarkos-node-consensus/cuda", "snarkos-node-router/cuda" ]
locktick = [
"dep:locktick",
Expand Down