diff --git a/Cargo.lock b/Cargo.lock index 79bada330b537..255779e26545a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -15064,6 +15064,7 @@ name = "polkadot-availability-distribution" version = "7.0.0" dependencies = [ "assert_matches", + "async-trait", "fatality", "futures", "futures-timer", @@ -15080,6 +15081,7 @@ dependencies = [ "rand 0.8.5", "rstest", "sc-network", + "sc-network-types", "schnellru", "sp-core 28.0.0", "sp-keyring", @@ -15759,6 +15761,27 @@ dependencies = [ "tracing-gum", ] +[[package]] +name = "polkadot-node-core-rewards-statistics-collector" +version = "7.0.0" +dependencies = [ + "assert_matches", + "fatality", + "futures", + "polkadot-node-primitives", + "polkadot-node-subsystem", + "polkadot-node-subsystem-test-helpers", + "polkadot-node-subsystem-util", + "polkadot-primitives", + "sp-application-crypto", + "sp-authority-discovery", + "sp-core 28.0.0", + "sp-keyring", + "sp-tracing 16.0.0", + "thiserror 1.0.65", + "tracing-gum", +] + [[package]] name = "polkadot-node-core-runtime-api" version = "7.0.0" @@ -16550,6 +16573,7 @@ dependencies = [ "polkadot-node-core-pvf", "polkadot-node-core-pvf-checker", "polkadot-node-core-pvf-common", + "polkadot-node-core-rewards-statistics-collector", "polkadot-node-core-runtime-api", "polkadot-node-metrics", "polkadot-node-network-protocol", @@ -16909,6 +16933,7 @@ dependencies = [ "polkadot-node-core-provisioner", "polkadot-node-core-pvf", "polkadot-node-core-pvf-checker", + "polkadot-node-core-rewards-statistics-collector", "polkadot-node-core-runtime-api", "polkadot-node-network-protocol", "polkadot-node-primitives", @@ -17047,6 +17072,7 @@ dependencies = [ "polkadot-node-core-approval-voting-parallel", "polkadot-node-core-av-store", "polkadot-node-core-dispute-coordinator", + "polkadot-node-core-rewards-statistics-collector", "polkadot-node-metrics", "polkadot-node-network-protocol", "polkadot-node-primitives", diff --git a/Cargo.toml b/Cargo.toml index 34c921adbab38..9c2c92a549ae2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -178,6 +178,7 @@ members = [ "polkadot/node/core/pvf/common", "polkadot/node/core/pvf/execute-worker", "polkadot/node/core/pvf/prepare-worker", + "polkadot/node/core/rewards-statistics-collector", "polkadot/node/core/runtime-api", "polkadot/node/gum", "polkadot/node/gum/proc-macro", @@ -1140,6 +1141,7 @@ polkadot-node-core-pvf-checker = { path = "polkadot/node/core/pvf-checker", defa polkadot-node-core-pvf-common = { path = "polkadot/node/core/pvf/common", default-features = false } polkadot-node-core-pvf-execute-worker = { path = "polkadot/node/core/pvf/execute-worker", default-features = false } polkadot-node-core-pvf-prepare-worker = { path = "polkadot/node/core/pvf/prepare-worker", default-features = false } +polkadot-node-core-rewards-statistics-collector = { path = "polkadot/node/core/rewards-statistics-collector", default-features = false } polkadot-node-core-runtime-api = { path = "polkadot/node/core/runtime-api", default-features = false } polkadot-node-metrics = { path = "polkadot/node/metrics", default-features = false } polkadot-node-network-protocol = { path = "polkadot/node/network/protocol", default-features = false } diff --git a/cumulus/client/relay-chain-inprocess-interface/src/lib.rs b/cumulus/client/relay-chain-inprocess-interface/src/lib.rs index d98f2a24f8736..d70c3b82e40a4 100644 --- a/cumulus/client/relay-chain-inprocess-interface/src/lib.rs +++ b/cumulus/client/relay-chain-inprocess-interface/src/lib.rs @@ -419,6 +419,7 @@ fn build_polkadot_full_node( keep_finalized_for: None, 
 		invulnerable_ah_collators: HashSet::new(),
 		collator_protocol_hold_off: None,
+		verbose_approval_metrics: false,
 	};
 
 	let (relay_chain_full_node, paranode_req_receiver) = match config.network.network_backend {
diff --git a/cumulus/zombienet/zombienet-sdk-helpers/src/lib.rs b/cumulus/zombienet/zombienet-sdk-helpers/src/lib.rs
index 54c1be83bfb14..5e48aa9849038 100644
--- a/cumulus/zombienet/zombienet-sdk-helpers/src/lib.rs
+++ b/cumulus/zombienet/zombienet-sdk-helpers/src/lib.rs
@@ -29,7 +29,7 @@ use zombienet_sdk::subxt::{
 const WAIT_MAX_BLOCKS_FOR_SESSION: u32 = 50;
 
 /// Find an event in subxt `Events` and attempt to decode the fields of the event.
-fn find_event_and_decode_fields(
+pub fn find_event_and_decode_fields(
 	events: &Events,
 	pallet: &str,
 	variant: &str,
@@ -552,3 +552,16 @@ pub async fn wait_for_runtime_upgrade(
 
 	Err(anyhow!("Did not find a runtime upgrade"))
 }
+
+/// Builds a Prometheus label selector string.
+///
+/// This method receives the label and the attributes the label can have;
+/// useful for querying Prometheus metrics data in tests.
+pub fn report_label_with_attributes(label: &str, attributes: Vec<(&str, &str)>) -> String {
+	let mut attrs: Vec<String> = vec![];
+	for (k, v) in attributes {
+		attrs.push(format!("{k}=\"{v}\""));
+	}
+	let final_attrs = attrs.join(",");
+	format!("{label}{{{final_attrs}}}")
+}
diff --git a/polkadot/cli/src/cli.rs b/polkadot/cli/src/cli.rs
index fa8595cc7c57c..ab1a6dbd907af 100644
--- a/polkadot/cli/src/cli.rs
+++ b/polkadot/cli/src/cli.rs
@@ -166,6 +166,11 @@ pub struct RunCmd {
 	/// **Dangerous!** Do not touch unless explicitly advised to.
 	#[arg(long, hide = true)]
 	pub collator_protocol_hold_off: Option,
+
+	/// Enable or disable publishing per-validator collected approvals metrics
+	/// to Prometheus. Defaults to false if not specified.
+ #[arg(long)] + pub verbose_approval_metrics: bool, } #[allow(missing_docs)] diff --git a/polkadot/cli/src/command.rs b/polkadot/cli/src/command.rs index ba31f3130eff6..be84d6ca94a91 100644 --- a/polkadot/cli/src/command.rs +++ b/polkadot/cli/src/command.rs @@ -278,6 +278,7 @@ where telemetry_worker_handle: None, node_version, secure_validator_mode, + verbose_approval_metrics: cli.run.verbose_approval_metrics, workers_path: cli.run.workers_path, workers_names: None, overseer_gen, diff --git a/polkadot/node/core/approval-voting/benches/approval-voting-regression-bench.rs b/polkadot/node/core/approval-voting/benches/approval-voting-regression-bench.rs index 70ec6bff440b7..8e2b4693f19dd 100644 --- a/polkadot/node/core/approval-voting/benches/approval-voting-regression-bench.rs +++ b/polkadot/node/core/approval-voting/benches/approval-voting-regression-bench.rs @@ -83,6 +83,11 @@ fn main() -> Result<(), String> { ("Sent to peers", 63995.2200, 0.01), ])); messages.extend(average_usage.check_cpu_usage(&[("approval-voting-parallel", 12.3817, 0.1)])); + messages.extend(average_usage.check_cpu_usage(&[( + "rewards-statistics-collector", + 0.0085, + 0.0009, + )])); if messages.is_empty() { Ok(()) diff --git a/polkadot/node/core/approval-voting/src/import.rs b/polkadot/node/core/approval-voting/src/import.rs index 692fbff4cb8f1..5d3fe8c40dfdd 100644 --- a/polkadot/node/core/approval-voting/src/import.rs +++ b/polkadot/node/core/approval-voting/src/import.rs @@ -65,9 +65,9 @@ use crate::{ persisted_entries::CandidateEntry, }; -use polkadot_node_primitives::approval::time::{slot_number_to_tick, Tick}; - use super::{State, LOG_TARGET}; +use polkadot_node_primitives::approval::time::{slot_number_to_tick, Tick}; +use polkadot_node_subsystem::messages::RewardsStatisticsCollectorMessage; #[derive(Debug)] struct ImportedBlockInfo { @@ -337,7 +337,8 @@ pub struct BlockImportedCandidates { pub(crate) async fn handle_new_head< Sender: SubsystemSender + SubsystemSender - + SubsystemSender, + + SubsystemSender + + SubsystemSender, AVSender: SubsystemSender, B: Backend, >( diff --git a/polkadot/node/core/approval-voting/src/lib.rs b/polkadot/node/core/approval-voting/src/lib.rs index e01368979835b..98455b5080bc8 100644 --- a/polkadot/node/core/approval-voting/src/lib.rs +++ b/polkadot/node/core/approval-voting/src/lib.rs @@ -95,7 +95,7 @@ use persisted_entries::{ApprovalEntry, BlockEntry, CandidateEntry}; use polkadot_node_primitives::approval::time::{ slot_number_to_tick, Clock, ClockExt, DelayedApprovalTimer, SystemClock, Tick, }; - +use polkadot_node_subsystem::messages::RewardsStatisticsCollectorMessage; mod approval_checking; pub mod approval_db; mod backend; @@ -1249,6 +1249,7 @@ async fn run< Sender: SubsystemSender + SubsystemSender + SubsystemSender + + SubsystemSender + SubsystemSender + SubsystemSender + SubsystemSender @@ -1484,6 +1485,7 @@ pub async fn start_approval_worker< + SubsystemSender + SubsystemSender + SubsystemSender + + SubsystemSender + SubsystemSender + SubsystemSender + Clone, @@ -1563,6 +1565,7 @@ async fn handle_actions< + SubsystemSender + SubsystemSender + SubsystemSender + + SubsystemSender + Clone, ADSender: SubsystemSender, >( @@ -2029,6 +2032,7 @@ async fn handle_from_overseer< Sender: SubsystemSender + SubsystemSender + SubsystemSender + + SubsystemSender + Clone, ADSender: SubsystemSender, >( @@ -2609,11 +2613,10 @@ fn schedule_wakeup_action( block_hash: Hash, block_number: BlockNumber, candidate_hash: CandidateHash, - block_tick: Tick, + approval_status: &ApprovalStatus, 
 	tick_now: Tick,
-	required_tranches: RequiredTranches,
 ) -> Option<Action> {
-	let maybe_action = match required_tranches {
+	let maybe_action = match approval_status.required_tranches {
 		_ if approval_entry.is_approved() => None,
 		RequiredTranches::All => None,
 		RequiredTranches::Exact { next_no_show, last_assignment_tick, .. } => {
@@ -2652,7 +2655,7 @@ fn schedule_wakeup_action(
 			// Apply the clock drift to these tranches.
 			min_prefer_some(next_announced, our_untriggered)
-				.map(|t| t as Tick + block_tick + clock_drift)
+				.map(|t| t as Tick + approval_status.block_tick + clock_drift)
 		};
 
 		min_prefer_some(next_non_empty_tranche, next_no_show).map(|tick| {
@@ -2667,14 +2670,14 @@ fn schedule_wakeup_action(
 			tick,
 			?candidate_hash,
 			?block_hash,
-			block_tick,
+			approval_status.block_tick,
 			"Scheduling next wakeup.",
 		),
 		None => gum::trace!(
 			target: LOG_TARGET,
 			?candidate_hash,
 			?block_hash,
-			block_tick,
+			approval_status.block_tick,
 			"No wakeup needed.",
 		),
 		Some(_) => {}, // unreachable
@@ -2852,9 +2855,8 @@ where
 				block_entry.block_hash(),
 				block_entry.block_number(),
 				*assigned_candidate_hash,
-				status.block_tick,
+				&status,
 				tick_now,
-				status.required_tranches,
 			));
 		}
 
@@ -2906,7 +2908,7 @@ async fn import_approval(
 	wakeups: &Wakeups,
 ) -> SubsystemResult<(Vec<Action>, ApprovalCheckResult)>
 where
-	Sender: SubsystemSender<ChainSelectionMessage>,
+	Sender: SubsystemSender<ChainSelectionMessage> + SubsystemSender<RewardsStatisticsCollectorMessage>,
 {
 	macro_rules! respond_early {
 		($e: expr) => {{
@@ -3059,7 +3061,7 @@ async fn advance_approval_state(
 	wakeups: &Wakeups,
 ) -> Vec<Action>
 where
-	Sender: SubsystemSender<ChainSelectionMessage>,
+	Sender: SubsystemSender<ChainSelectionMessage> + SubsystemSender<RewardsStatisticsCollectorMessage>,
 {
 	let validator_index = transition.validator_index();
 
@@ -3184,17 +3186,18 @@ where
 		if is_approved {
 			approval_entry.mark_approved();
 		}
+
 		if newly_approved {
 			state.record_no_shows(session_index, para_id.into(), &status.no_show_validators);
 		}
+
 		actions.extend(schedule_wakeup_action(
 			&approval_entry,
 			block_hash,
 			block_number,
 			candidate_hash,
-			status.block_tick,
+			&status,
 			tick_now,
-			status.required_tranches,
 		));
 
 		if is_approved && transition.is_remote_approval() {
@@ -3234,6 +3237,35 @@ where
 				}
 			}
 		}
+
+		if newly_approved {
+			gum::debug!(
+				target: LOG_TARGET,
+				?block_hash,
+				?candidate_hash,
+				"Candidate newly approved, collecting useful approvals..."
+			);
+
+			collect_useful_approvals(sender, &status, block_hash, block_number, &candidate_entry);
+
+			if !status.no_show_validators.is_empty() {
+				_ = sender
+					.try_send_message(RewardsStatisticsCollectorMessage::NoShows(
+						block_hash,
+						block_number,
+						status.no_show_validators,
+					))
+					.map_err(|_| {
+						gum::warn!(
+							target: LOG_TARGET,
+							?candidate_hash,
+							?block_hash,
+							"Failed to send no shows to reward statistics subsystem",
+						);
+					});
+			}
+		}
+
 		// We have no need to write the candidate entry if all of the following
 		// is true:
 		//
@@ -3247,7 +3279,7 @@ where
 			// In all other cases, we need to write the candidate entry.
 			db.write_candidate_entry(candidate_entry);
 		}
-	}
+	};
 
 	actions
 }
@@ -3286,7 +3318,7 @@ fn should_trigger_assignment(
 	}
 }
 
-async fn process_wakeup<Sender: SubsystemSender<ChainSelectionMessage>>(
+async fn process_wakeup<Sender>(
 	sender: &mut Sender,
 	state: &mut State,
 	db: &mut OverlayedBackend<'_, impl Backend>,
@@ -3295,7 +3327,10 @@ async fn process_wakeup<Sender>(
 	candidate_hash: CandidateHash,
 	metrics: &Metrics,
 	wakeups: &Wakeups,
-) -> SubsystemResult<Vec<Action>> {
+) -> SubsystemResult<Vec<Action>>
+where
+	Sender: SubsystemSender<ChainSelectionMessage> + SubsystemSender<RewardsStatisticsCollectorMessage>,
+{
 	let block_entry = db.load_block_entry(&relay_block)?;
 	let candidate_entry = db.load_candidate_entry(&candidate_hash)?;
 
@@ -3707,7 +3742,7 @@ async fn launch_approval<
 	// have been done.
 
 #[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
 async fn issue_approval<
-	Sender: SubsystemSender<ChainSelectionMessage>,
+	Sender: SubsystemSender<ChainSelectionMessage> + SubsystemSender<RewardsStatisticsCollectorMessage>,
 	ADSender: SubsystemSender<ApprovalDistributionMessage>,
 >(
 	sender: &mut Sender,
@@ -4079,3 +4114,80 @@ fn compute_delayed_approval_sending_tick(
 	metrics.on_delayed_approval(sign_no_later_than.checked_sub(tick_now).unwrap_or_default());
 	sign_no_later_than
 }
+
+// Collect all the approvals required to approve the
+// candidate, ignoring any approval that belongs to
+// tranches that were not required.
+fn collect_useful_approvals<Sender>(
+	sender: &mut Sender,
+	status: &ApprovalStatus,
+	block_hash: Hash,
+	block_number: BlockNumber,
+	candidate_entry: &CandidateEntry,
+) where
+	Sender: SubsystemSender<RewardsStatisticsCollectorMessage>,
+{
+	let candidate_hash = candidate_entry.candidate.hash();
+	let candidate_approvals = candidate_entry.approvals();
+
+	let approval_entry = match candidate_entry.approval_entry(&block_hash) {
+		Some(approval_entry) => approval_entry,
+		None => {
+			gum::warn!(
+				target: LOG_TARGET,
+				?block_hash,
+				?block_number,
+				?candidate_hash,
+				"approval entry not found, cannot collect useful approvals."
+			);
+			return
+		},
+	};
+
+	let collected_useful_approvals: Vec<ValidatorIndex> = match status.required_tranches {
+		RequiredTranches::All =>
+			candidate_approvals.iter_ones().map(|idx| ValidatorIndex(idx as _)).collect(),
+		RequiredTranches::Exact { needed, .. } => {
+			let mut assigned_mask = approval_entry.assignments_up_to(needed);
+			assigned_mask &= candidate_approvals;
+			assigned_mask.iter_ones().map(|idx| ValidatorIndex(idx as _)).collect()
+		},
+		RequiredTranches::Pending { .. } => {
+			gum::warn!(
+				target: LOG_TARGET,
+				?block_hash,
+				?block_number,
+				?candidate_hash,
+				"approval status required tranches still pending when collecting useful approvals"
+			);
+			return
+		},
+	};
+
+	if !collected_useful_approvals.is_empty() {
+		let useful_approvals = collected_useful_approvals.len();
+		gum::debug!(
+			target: LOG_TARGET,
+			?block_hash,
+			?block_number,
+			?candidate_hash,
+			?useful_approvals,
+			"collected useful approvals"
+		);
+
+		_ = sender
+			.try_send_message(RewardsStatisticsCollectorMessage::CandidateApproved(
+				block_hash,
+				block_number,
+				collected_useful_approvals,
+			))
+			.map_err(|_| {
+				gum::warn!(
+					target: LOG_TARGET,
+					?candidate_hash,
+					?block_hash,
+					"Failed to send approvals to rewards statistics subsystem",
+				);
+			});
+	}
+}
diff --git a/polkadot/node/core/approval-voting/src/tests.rs b/polkadot/node/core/approval-voting/src/tests.rs
index db85f365dba3c..aed669099ee1e 100644
--- a/polkadot/node/core/approval-voting/src/tests.rs
+++ b/polkadot/node/core/approval-voting/src/tests.rs
@@ -650,6 +650,17 @@ fn make_candidate(para_id: ParaId, hash: &Hash) -> CandidateReceipt {
 	r
 }
 
+struct ExpectApprovalsStatsCollected {
+	block_hash: Hash,
+	validators: Vec<ValidatorIndex>,
+}
+
+impl ExpectApprovalsStatsCollected {
+	fn new(block_hash: Hash, validators: Vec<ValidatorIndex>) -> Self {
+		Self { block_hash, validators }
+	}
+}
+
 async fn import_approval(
 	overseer: &mut VirtualOverseer,
 	block_hash: Hash,
@@ -658,6 +669,7 @@ async fn import_approval(
 	candidate_hash: CandidateHash,
 	session_index: SessionIndex,
 	expect_chain_approved: bool,
+	expected_approvals_stats_collected: Option<ExpectApprovalsStatsCollected>,
 	signature_opt: Option<ValidatorSignature>,
 ) -> oneshot::Receiver<ApprovalCheckResult> {
 	let signature = signature_opt.unwrap_or(sign_approval(
@@ -681,6 +693,21 @@ async fn import_approval(
 		},
 	)
 	.await;
+
+	if let Some(expected_stats_collected) = expected_approvals_stats_collected {
+		assert_matches!(
+			overseer_recv(overseer).await,
+			AllMessages::RewardsStatisticsCollector(
RewardsStatisticsCollectorMessage::CandidateApproved( + b_hash, _b_number, validators, + ) + ) => { + assert_eq!(b_hash, expected_stats_collected.block_hash); + assert_eq!(validators, expected_stats_collected.validators); + } + ); + } + if expect_chain_approved { assert_matches!( overseer_recv(overseer).await, @@ -1277,6 +1304,7 @@ fn subsystem_rejects_approval_if_no_candidate_entry() { session_index, false, None, + None, ) .await; @@ -1318,6 +1346,7 @@ fn subsystem_rejects_approval_if_no_block_entry() { session_index, false, None, + None, ) .await; @@ -1383,6 +1412,7 @@ fn subsystem_rejects_approval_before_assignment() { session_index, false, None, + None, ) .await; @@ -1638,6 +1668,7 @@ fn subsystem_accepts_and_imports_approval_after_assignment() { candidate_hash, session_index, true, + Some(ExpectApprovalsStatsCollected::new(block_hash, vec![validator])), None, ) .await; @@ -1729,6 +1760,7 @@ fn subsystem_second_approval_import_only_schedules_wakeups() { session_index, false, None, + None, ) .await; @@ -1746,6 +1778,7 @@ fn subsystem_second_approval_import_only_schedules_wakeups() { session_index, false, None, + None, ) .await; @@ -1963,6 +1996,7 @@ fn test_approvals_on_fork_are_always_considered_after_no_show( 1, false, None, + None, ) .await; @@ -2356,6 +2390,7 @@ fn import_checked_approval_updates_entries_and_schedules() { candidate_hash, session_index, false, + None, Some(sig_a), ) .await; @@ -2383,6 +2418,10 @@ fn import_checked_approval_updates_entries_and_schedules() { candidate_hash, session_index, true, + Some(ExpectApprovalsStatsCollected::new( + block_hash, + vec![validator_index_a, validator_index_b], + )), Some(sig_b), ) .await; @@ -2511,6 +2550,15 @@ fn subsystem_import_checked_approval_sets_one_block_bit_at_a_time() { } else { sign_approval(Sr25519Keyring::Bob, *candidate_hash, session_index) }; + + let expected_stats_collected = if i == 1 { + Some(ExpectApprovalsStatsCollected::new(block_hash, vec![validator1, validator2])) + } else if i == 3 { + Some(ExpectApprovalsStatsCollected::new(block_hash, vec![validator1, validator2])) + } else { + None + }; + let rx = import_approval( &mut virtual_overseer, block_hash, @@ -2519,6 +2567,7 @@ fn subsystem_import_checked_approval_sets_one_block_bit_at_a_time() { *candidate_hash, session_index, expect_block_approved, + expected_stats_collected, Some(signature), ) .await; @@ -2791,6 +2840,7 @@ fn approved_ancestor_test( candidate_hash, i as u32 + 1, true, + Some(ExpectApprovalsStatsCollected::new(*block_hash, vec![validator])), None, ) .await; @@ -3362,8 +3412,20 @@ where } let n_validators = validators.len(); + let to_collect = min((n_validators / 3) + 1, approvals_to_import.len()); + let validators_collected = approvals_to_import[0..to_collect] + .iter() + .map(|validator_index| ValidatorIndex(*validator_index)) + .collect::>(); + for (i, &validator_index) in approvals_to_import.iter().enumerate() { let expect_chain_approved = 3 * (i + 1) > n_validators; + let expect_approvals_stats_collected = if expect_chain_approved { + Some(ExpectApprovalsStatsCollected::new(block_hash, validators_collected.clone())) + } else { + None + }; + let rx = import_approval( &mut virtual_overseer, block_hash, @@ -3372,6 +3434,7 @@ where candidate_hash, 1, expect_chain_approved, + expect_approvals_stats_collected, Some(sign_approval(validators[validator_index as usize], candidate_hash, 1)), ) .await; @@ -3731,6 +3794,7 @@ fn pre_covers_dont_stall_approval() { candidate_hash, session_index, false, + None, Some(sig_b), ) .await; @@ -3746,6 +3810,7 @@ fn 
pre_covers_dont_stall_approval() { candidate_hash, session_index, false, + None, Some(sig_c), ) .await; @@ -3763,7 +3828,7 @@ fn pre_covers_dont_stall_approval() { // that may not be the reality from the database's perspective. This could be avoided // entirely by having replies processed after database writes, but that would constitute a // larger refactor and incur a performance penalty. - futures_timer::Delay::new(Duration::from_millis(100)).await; + Delay::new(Duration::from_millis(100)).await; // The candidate should not be approved. let candidate_entry = store.load_candidate_entry(&candidate_hash).unwrap().unwrap(); @@ -3775,13 +3840,33 @@ fn pre_covers_dont_stall_approval() { clock.inner.lock().set_tick(30); // Sleep to ensure we get a consistent read on the database. - futures_timer::Delay::new(Duration::from_millis(100)).await; + Delay::new(Duration::from_millis(100)).await; // The next wakeup should observe the assignment & approval from // tranche 1, and the no-show from tranche 0 should be immediately covered. assert_eq!(clock.inner.lock().next_wakeup(), Some(31)); clock.inner.lock().set_tick(31); + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::RewardsStatisticsCollector(RewardsStatisticsCollectorMessage::CandidateApproved( + b_hash, _b_number, validators, + )) => { + assert_eq!(b_hash, block_hash); + assert_eq!(validators, vec![validator_index_b, validator_index_c]); + } + ); + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::RewardsStatisticsCollector(RewardsStatisticsCollectorMessage::NoShows( + b_hash, _b_number, validators + )) => { + assert_eq!(b_hash, block_hash); + assert_eq!(validators, vec![validator_index_a]); + } + ); + assert_matches!( overseer_recv(&mut virtual_overseer).await, AllMessages::ChainSelection(ChainSelectionMessage::Approved(b_hash)) => { @@ -3903,6 +3988,7 @@ fn waits_until_approving_assignments_are_old_enough() { candidate_hash, session_index, false, + None, Some(sig_a), ) .await; @@ -3919,6 +4005,7 @@ fn waits_until_approving_assignments_are_old_enough() { candidate_hash, session_index, false, + None, Some(sig_b), ) .await; @@ -3943,6 +4030,18 @@ fn waits_until_approving_assignments_are_old_enough() { // Sleep to ensure we get a consistent read on the database. 
futures_timer::Delay::new(Duration::from_millis(100)).await; + let _ = assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::RewardsStatisticsCollector( + RewardsStatisticsCollectorMessage::CandidateApproved( + b_hash, _b_number, validators + ) + ) => { + assert_eq!(b_hash, block_hash); + assert_eq!(validators, vec![validator_index_a, validator_index_b]); + } + ); + assert_matches!( overseer_recv(&mut virtual_overseer).await, AllMessages::ChainSelection(ChainSelectionMessage::Approved(b_hash)) => { diff --git a/polkadot/node/core/av-store/src/lib.rs b/polkadot/node/core/av-store/src/lib.rs index 016ec9e001034..5c1a8b92e9baf 100644 --- a/polkadot/node/core/av-store/src/lib.rs +++ b/polkadot/node/core/av-store/src/lib.rs @@ -48,7 +48,7 @@ use polkadot_node_subsystem::{ use polkadot_node_subsystem_util as util; use polkadot_primitives::{ BlockNumber, CandidateEvent, CandidateHash, CandidateReceiptV2 as CandidateReceipt, ChunkIndex, - CoreIndex, Hash, Header, NodeFeatures, ValidatorIndex, + CoreIndex, Hash, Header, NodeFeatures, SessionIndex, ValidatorIndex, }; use util::availability_chunks::availability_chunk_indices; @@ -154,6 +154,7 @@ enum State { struct CandidateMeta { state: State, data_available: bool, + session_index: Option, chunks_stored: BitVec, } @@ -793,6 +794,7 @@ fn note_block_backed( let meta = CandidateMeta { state: State::Unavailable(now.into()), data_available: false, + session_index: candidate.descriptor.session_index(), chunks_stored: bitvec::bitvec![u8, BitOrderLsb0; 0; n_validators], }; @@ -853,6 +855,11 @@ fn note_block_included( }, }; + let candidate_session_index = candidate.descriptor.session_index(); + if meta.session_index.is_none() && candidate_session_index.is_some() { + meta.session_index = candidate_session_index; + } + write_unfinalized_block_contains( db_transaction, config, @@ -1105,8 +1112,17 @@ fn process_message( }, AvailabilityStoreMessage::QueryChunk(candidate, validator_index, tx) => { let _timer = subsystem.metrics.time_get_chunk(); - let _ = - tx.send(load_chunk(&subsystem.db, &subsystem.config, &candidate, validator_index)?); + let chunk = load_chunk(&subsystem.db, &subsystem.config, &candidate, validator_index)?; + let chunk_meta = load_meta(&subsystem.db, &subsystem.config, &candidate)?; + + let _ = match (chunk, chunk_meta) { + (Some(mut chunk), Some(chunk_meta)) => { + chunk.session_index = chunk_meta.session_index; + tx.send(Some(chunk)) + }, + (Some(chunk), _) => tx.send(Some(chunk)), + _ => tx.send(None), + }; }, AvailabilityStoreMessage::QueryChunkSize(candidate, tx) => { let meta = load_meta(&subsystem.db, &subsystem.config, &candidate)?; @@ -1304,6 +1320,7 @@ fn store_available_data( CandidateMeta { state: State::Unavailable(now.into()), data_available: false, + session_index: None, chunks_stored: BitVec::new(), } }, @@ -1326,6 +1343,7 @@ fn store_available_data( chunk: chunk.clone(), proof, index: ChunkIndex(index as u32), + session_index: meta.session_index, }) .collect(); diff --git a/polkadot/node/core/av-store/src/tests.rs b/polkadot/node/core/av-store/src/tests.rs index 53188e6a9e6c0..40ad0441ac4d7 100644 --- a/polkadot/node/core/av-store/src/tests.rs +++ b/polkadot/node/core/av-store/src/tests.rs @@ -295,6 +295,7 @@ fn store_chunk_works() { chunk: vec![1, 2, 3], index: chunk_index, proof: Proof::try_from(vec![vec![3, 4, 5]]).unwrap(), + session_index: None, }; // Ensure an entry already exists. 
In reality this would come from watching @@ -308,6 +309,7 @@ fn store_chunk_works() { data_available: false, chunks_stored: bitvec::bitvec![u8, BitOrderLsb0; 0; n_validators], state: State::Unavailable(BETimestamp(0)), + session_index: None, }, ); }); @@ -347,6 +349,7 @@ fn store_chunk_does_nothing_if_no_entry_already() { chunk: vec![1, 2, 3], index: chunk_index, proof: Proof::try_from(vec![vec![3, 4, 5]]).unwrap(), + session_index: None, }; let (tx, rx) = oneshot::channel(); @@ -395,6 +398,7 @@ fn query_chunk_checks_meta() { v }, state: State::Unavailable(BETimestamp(0)), + session_index: None, }, ); }); @@ -532,6 +536,7 @@ fn store_pov_and_queries_work() { chunk: branch.1.to_vec(), index: validator_index.into(), proof: Proof::try_from(branch.0.clone()).unwrap(), + session_index: None, }; assert_eq!(chunk, expected_chunk); assert_eq!(chunk, query_all_chunks_res[validator_index as usize]); @@ -621,6 +626,7 @@ fn store_pov_and_queries_work() { chunk: branch.1.to_vec(), index: expected_chunk_index, proof: Proof::try_from(branch.0.clone()).unwrap(), + session_index: None, }; assert_eq!(chunk, expected_chunk); assert_eq!( @@ -690,6 +696,7 @@ fn query_all_chunks_works() { data_available: false, chunks_stored: bitvec::bitvec![u8, BitOrderLsb0; 0; n_validators as _], state: State::Unavailable(BETimestamp(0)), + session_index: None, }, ); }); @@ -698,6 +705,7 @@ fn query_all_chunks_works() { chunk: vec![1, 2, 3], index: ChunkIndex(1), proof: Proof::try_from(vec![vec![3, 4, 5]]).unwrap(), + session_index: None, }; let (tx, rx) = oneshot::channel(); @@ -1346,6 +1354,7 @@ fn query_chunk_size_works() { chunk: vec![1, 2, 3], index: chunk_index, proof: Proof::try_from(vec![vec![3, 4, 5]]).unwrap(), + session_index: None, }; // Ensure an entry already exists. In reality this would come from watching @@ -1359,6 +1368,7 @@ fn query_chunk_size_works() { data_available: false, chunks_stored: bitvec::bitvec![u8, BitOrderLsb0; 0; n_validators], state: State::Unavailable(BETimestamp(0)), + session_index: None, }, ); }); diff --git a/polkadot/node/core/rewards-statistics-collector/Cargo.toml b/polkadot/node/core/rewards-statistics-collector/Cargo.toml new file mode 100644 index 0000000000000..b92e98b4ee5cf --- /dev/null +++ b/polkadot/node/core/rewards-statistics-collector/Cargo.toml @@ -0,0 +1,33 @@ +[package] +name = "polkadot-node-core-rewards-statistics-collector" +version = "7.0.0" +description = "The Statistics Collector subsystem. Collects Approval Voting and Approvals Distributions stats." 
+authors.workspace = true +edition.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true + +[lints] +workspace = true + +[dependencies] +fatality = { workspace = true } +futures = { workspace = true } +gum = { workspace = true, default-features = true } +thiserror = { workspace = true } + +polkadot-node-primitives = { workspace = true, default-features = true } +polkadot-node-subsystem = { workspace = true, default-features = true } +polkadot-node-subsystem-util = { workspace = true, default-features = true } +polkadot-primitives = { workspace = true, default-features = true } + +[dev-dependencies] +assert_matches = { workspace = true } +polkadot-node-subsystem-test-helpers = { workspace = true } +polkadot-primitives = { workspace = true, features = ["test"] } +sp-application-crypto = { workspace = true, default-features = true } +sp-authority-discovery = { workspace = true, default-features = true } +sp-core = { workspace = true, default-features = true } +sp-keyring = { workspace = true, default-features = true } +sp-tracing = { workspace = true } diff --git a/polkadot/node/core/rewards-statistics-collector/src/approval_voting_metrics.rs b/polkadot/node/core/rewards-statistics-collector/src/approval_voting_metrics.rs new file mode 100644 index 0000000000000..8c7198fc88920 --- /dev/null +++ b/polkadot/node/core/rewards-statistics-collector/src/approval_voting_metrics.rs @@ -0,0 +1,61 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . 
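+
+//! Per-relay-block tallies of approval votes and observed no-shows, keyed by
+//! validator index. Note that both handlers below only update relay blocks
+//! that were already registered in the view by an `ActiveLeaves` signal
+//! (`and_modify` without a fallback insert): statistics for unknown blocks
+//! are silently dropped.
+//!
+//! A rough usage sketch (illustrative only; these items are crate-private,
+//! and the hash and session values are made up):
+//!
+//! ```ignore
+//! let mut view = View::new();
+//! let hash = Hash::repeat_byte(1);
+//! view.per_relay.insert((hash, 1), PerRelayView::new(1));
+//! handle_candidate_approved(&mut view, hash, 1, vec![ValidatorIndex(0)]);
+//! assert_eq!(view.per_relay[&(hash, 1)].approvals_stats.votes[&ValidatorIndex(0)], 1);
+//! ```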
+
+use crate::View;
+use polkadot_primitives::{BlockNumber, Hash, ValidatorIndex};
+use std::collections::BTreeMap;
+
+#[derive(Debug, Clone, Default, Eq, PartialEq)]
+pub struct ApprovalsStats {
+	pub votes: BTreeMap<ValidatorIndex, u32>,
+	pub no_shows: BTreeMap<ValidatorIndex, u32>,
+}
+
+pub fn handle_candidate_approved(
+	view: &mut View,
+	block_hash: Hash,
+	block_number: BlockNumber,
+	approvals: Vec<ValidatorIndex>,
+) {
+	view.per_relay.entry((block_hash, block_number)).and_modify(|relay_view| {
+		for validator_index in approvals {
+			relay_view
+				.approvals_stats
+				.votes
+				.entry(validator_index)
+				.and_modify(|count| *count = count.saturating_add(1))
+				.or_insert(1);
+		}
+	});
+}
+
+pub fn handle_observed_no_shows(
+	view: &mut View,
+	block_hash: Hash,
+	block_number: BlockNumber,
+	no_show_validators: Vec<ValidatorIndex>,
+) {
+	view.per_relay.entry((block_hash, block_number)).and_modify(|relay_view| {
+		for validator_index in no_show_validators {
+			relay_view
+				.approvals_stats
+				.no_shows
+				.entry(validator_index)
+				.and_modify(|count| *count = count.saturating_add(1))
+				.or_insert(1);
+		}
+	});
+}
diff --git a/polkadot/node/core/rewards-statistics-collector/src/availability_distribution_metrics.rs b/polkadot/node/core/rewards-statistics-collector/src/availability_distribution_metrics.rs
new file mode 100644
index 0000000000000..b543e363c23ff
--- /dev/null
+++ b/polkadot/node/core/rewards-statistics-collector/src/availability_distribution_metrics.rs
@@ -0,0 +1,116 @@
+// Copyright (C) Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot. If not, see <https://www.gnu.org/licenses/>.
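+
+//! Per-session accounting of availability chunk transfers, keyed by
+//! `AuthorityDiscoveryId`. Chunk downloads arrive keyed by `ValidatorIndex`
+//! and are resolved to authority ids through the cached session view, while
+//! uploads arrive as the set of authority ids advertised by the remote peer.
+//!
+//! A rough sketch of the counter semantics (illustrative only; `auth` stands
+//! in for some `AuthorityDiscoveryId`):
+//!
+//! ```ignore
+//! let mut chunks = AvailabilityChunks::new();
+//! chunks.note_candidate_chunk_downloaded(auth.clone(), 10);
+//! chunks.note_candidate_chunk_downloaded(auth.clone(), 5);
+//! assert_eq!(chunks.downloads[&auth], 15); // counts accumulate, saturating
+//! ```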
+
+use crate::{View, LOG_TARGET};
+use polkadot_primitives::{AuthorityDiscoveryId, SessionIndex, ValidatorIndex};
+use std::collections::{BTreeMap, HashMap, HashSet};
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct AvailabilityChunks {
+	pub downloads: BTreeMap<AuthorityDiscoveryId, u64>,
+	pub uploads: BTreeMap<AuthorityDiscoveryId, u64>,
+}
+
+impl AvailabilityChunks {
+	pub fn new() -> Self {
+		Self { downloads: Default::default(), uploads: Default::default() }
+	}
+
+	pub fn new_with_upload(auth_id: AuthorityDiscoveryId, count: u64) -> Self {
+		Self {
+			downloads: Default::default(),
+			uploads: vec![(auth_id, count)].into_iter().collect(),
+		}
+	}
+
+	pub fn note_candidate_chunk_downloaded(
+		&mut self,
+		authority_id: AuthorityDiscoveryId,
+		count: u64,
+	) {
+		self.downloads
+			.entry(authority_id)
+			.and_modify(|current| *current = current.saturating_add(count))
+			.or_insert(count);
+	}
+
+	pub fn note_candidate_chunk_uploaded(
+		&mut self,
+		authority_id: AuthorityDiscoveryId,
+		count: u64,
+	) {
+		self.uploads
+			.entry(authority_id)
+			.and_modify(|current| *current = current.saturating_add(count))
+			.or_insert(count);
+	}
+}
+
+// Whenever chunks are acquired during availability recovery we
+// record which validator provided them and how many chunks.
+pub fn handle_chunks_downloaded(
+	view: &mut View,
+	session_index: SessionIndex,
+	downloads: HashMap<ValidatorIndex, u64>,
+) {
+	let av_chunks = view
+		.availability_chunks
+		.entry(session_index)
+		.or_insert(AvailabilityChunks::new());
+
+	for (validator_index, download_count) in downloads {
+		let authority_id = view
+			.per_session
+			.get(&session_index)
+			.and_then(|session_view| session_view.authorities_ids.get(validator_index.0 as usize));
+
+		match authority_id {
+			Some(authority_id) => {
+				av_chunks.note_candidate_chunk_downloaded(authority_id.clone(), download_count);
+			},
+			None => {
+				gum::debug!(
+					target: LOG_TARGET,
+					validator_index = ?validator_index,
+					download_count = download_count,
+					session_idx = ?session_index,
+					"could not find validator authority id"
+				);
+			},
+		};
+	}
+}
+
+// `handle_chunk_uploaded` receives the authority ids of the peer
+// we just uploaded chunks to.
+pub fn handle_chunk_uploaded(
+	view: &mut View,
+	session_index: SessionIndex,
+	authority_ids: HashSet<AuthorityDiscoveryId>,
+) {
+	if let Some(session_info) = view.per_session.get(&session_index) {
+		let validator_authority_id =
+			session_info.authorities_ids.iter().find(|auth| authority_ids.contains(auth));
+
+		if let Some(auth_id) = validator_authority_id {
+			view.availability_chunks
+				.entry(session_index)
+				.and_modify(|av| av.note_candidate_chunk_uploaded(auth_id.clone(), 1))
+				.or_insert(AvailabilityChunks::new_with_upload(auth_id.clone(), 1));
+		}
+	}
+}
diff --git a/polkadot/node/core/rewards-statistics-collector/src/error.rs b/polkadot/node/core/rewards-statistics-collector/src/error.rs
new file mode 100644
index 0000000000000..73ac72012563d
--- /dev/null
+++ b/polkadot/node/core/rewards-statistics-collector/src/error.rs
@@ -0,0 +1,60 @@
+// Copyright (C) Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot. If not, see <https://www.gnu.org/licenses/>.
+
+//! Error types.
+
+use futures::channel::oneshot;
+use polkadot_node_subsystem::{ChainApiError, RuntimeApiError, SubsystemError};
+
+use crate::LOG_TARGET;
+use fatality::Nested;
+
+#[allow(missing_docs)]
+#[fatality::fatality(splitable)]
+pub enum Error {
+	#[fatal]
+	#[error("Receiving message from overseer failed: {0}")]
+	SubsystemReceive(#[source] SubsystemError),
+
+	#[error("Sending message to overseer failed: {0}")]
+	OverseerCommunication(#[source] oneshot::Canceled),
+
+	#[error("Failed to request runtime data: {0}")]
+	RuntimeApiCallError(#[source] RuntimeApiError),
+
+	#[error("Failed to request chain api data: {0}")]
+	ChainApiCallError(#[source] ChainApiError),
+}
+
+/// General `Result` type.
+pub type Result<T> = std::result::Result<T, Error>;
+
+/// Result for fatal only failures.
+pub type FatalResult<T> = std::result::Result<T, FatalError>;
+
+/// Utility for eating top-level errors and logging them.
+///
+/// We basically always want to try and continue on error. This utility function is meant to
+/// consume top-level errors by simply logging them.
+pub fn log_error(result: Result<()>, ctx: &'static str) -> FatalResult<()> {
+	match result.into_nested()? {
+		Ok(()) => Ok(()),
+		Err(jfyi) => {
+			gum::debug!(target: LOG_TARGET, error = ?jfyi, ctx);
+			Ok(())
+		},
+	}
+}
diff --git a/polkadot/node/core/rewards-statistics-collector/src/lib.rs b/polkadot/node/core/rewards-statistics-collector/src/lib.rs
new file mode 100644
index 0000000000000..2333a6c2d3d3d
--- /dev/null
+++ b/polkadot/node/core/rewards-statistics-collector/src/lib.rs
@@ -0,0 +1,354 @@
+// Copyright (C) Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot. If not, see <https://www.gnu.org/licenses/>.
+
+//! Implementation of the Rewards Statistics Collector subsystem.
+//! This component monitors and manages metrics related to parachain candidate approvals,
+//! including approval votes and the distribution of availability chunks (downloads and uploads).
+//!
+//! Its primary responsibility is to collect and track data reflecting the node's perspective
+//! on the approval work carried out by all session validators.
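+//!
+//! A rough sketch of the message and signal flow, as implemented in
+//! `run_iteration` below:
+//!
+//! ```text
+//! ActiveLeaves(leaf)               -> register (hash, number), cache SessionInfo
+//! CandidateApproved(h, n, votes)   -> bump per-relay approval tallies
+//! NoShows(h, n, validators)        -> bump per-relay no-show tallies
+//! ChunksDownloaded / ChunkUploaded -> bump per-session availability counters
+//! BlockFinalized(h, n)             -> fold the finalized per-relay stats into
+//!                                     per-session tallies, publish metrics and
+//!                                     prune stale forks and old sessions
+//! ```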
+
+use crate::error::{FatalError, FatalResult, JfyiError, Result};
+use futures::{channel::oneshot, prelude::*};
+use polkadot_node_primitives::{new_session_window_size, SessionWindowSize, DISPUTE_WINDOW};
+use polkadot_node_subsystem::{
+	messages::{ChainApiMessage, RewardsStatisticsCollectorMessage},
+	overseer, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError,
+};
+use polkadot_primitives::{AuthorityDiscoveryId, BlockNumber, Hash, SessionIndex, ValidatorIndex};
+use std::collections::{BTreeMap, HashMap};
+
+mod approval_voting_metrics;
+mod availability_distribution_metrics;
+mod error;
+pub mod metrics;
+#[cfg(test)]
+mod tests;
+
+use self::metrics::Metrics;
+use crate::{
+	approval_voting_metrics::{handle_candidate_approved, handle_observed_no_shows},
+	availability_distribution_metrics::{
+		handle_chunk_uploaded, handle_chunks_downloaded, AvailabilityChunks,
+	},
+};
+use approval_voting_metrics::ApprovalsStats;
+use polkadot_node_subsystem_util::{request_session_index_for_child, request_session_info};
+
+const MAX_SESSION_VIEWS_TO_KEEP: SessionWindowSize = DISPUTE_WINDOW;
+const MAX_AVAILABILITIES_TO_KEEP: SessionWindowSize = new_session_window_size!(3);
+
+const LOG_TARGET: &str = "parachain::rewards-statistics-collector";
+
+#[derive(Default)]
+pub struct Config {
+	pub verbose_approval_metrics: bool,
+}
+
+#[derive(Debug, Default, Clone)]
+struct PerRelayView {
+	session_index: SessionIndex,
+	approvals_stats: ApprovalsStats,
+}
+
+impl PerRelayView {
+	fn new(session_index: SessionIndex) -> Self {
+		PerRelayView { session_index, approvals_stats: ApprovalsStats::default() }
+	}
+}
+
+#[derive(Debug, Eq, PartialEq, Clone, Default)]
+struct PerValidatorTally {
+	no_shows: u32,
+	approvals: u32,
+}
+
+impl PerValidatorTally {
+	fn increment_noshow_by(&mut self, value: u32) {
+		self.no_shows = self.no_shows.saturating_add(value);
+	}
+
+	fn increment_approval_by(&mut self, value: u32) {
+		self.approvals = self.approvals.saturating_add(value);
+	}
+}
+
+#[derive(Debug, Eq, PartialEq, Clone)]
+pub struct PerSessionView {
+	authorities_ids: Vec<AuthorityDiscoveryId>,
+	validators_tallies: HashMap<ValidatorIndex, PerValidatorTally>,
+}
+
+impl PerSessionView {
+	fn new(authorities_ids: Vec<AuthorityDiscoveryId>) -> Self {
+		Self { authorities_ids, validators_tallies: HashMap::new() }
+	}
+}
+
+/// View holds the subsystem's internal state.
+struct View {
+	/// per_relay holds the collected approvals statistics for
+	/// all the candidates under the given unfinalized relay hash
+	per_relay: HashMap<(Hash, BlockNumber), PerRelayView>,
+	/// per_session holds session information (authorities lookup)
+	/// and approvals tallies, i.e. the aggregation of the approvals
+	/// statistics collected under finalized blocks
+	per_session: BTreeMap<SessionIndex, PerSessionView>,
+	/// availability_chunks holds the collected upload and download chunk
+	/// statistics per validator
+	availability_chunks: BTreeMap<SessionIndex, AvailabilityChunks>,
+	latest_finalized_block: (BlockNumber, Hash),
+}
+
+impl View {
+	fn new() -> Self {
+		View {
+			per_relay: HashMap::new(),
+			per_session: BTreeMap::new(),
+			availability_chunks: BTreeMap::new(),
+			latest_finalized_block: (0, Hash::default()),
+		}
+	}
+}
+
+/// The statistics collector subsystem.
+#[derive(Default)]
+pub struct RewardsStatisticsCollector {
+	metrics: Metrics,
+	config: Config,
+}
+
+impl RewardsStatisticsCollector {
+	/// Create a new instance of the `RewardsStatisticsCollector`.
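+	///
+	/// A minimal construction sketch: `Metrics::default()` carries no
+	/// registered Prometheus recorders (every call becomes a no-op), so it is
+	/// suitable for tests; production code registers the metrics against a
+	/// Prometheus registry instead.
+	///
+	/// ```ignore
+	/// let subsystem = RewardsStatisticsCollector::new(
+	/// 	Metrics::default(),
+	/// 	Config { verbose_approval_metrics: false },
+	/// );
+	/// ```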
+	pub fn new(metrics: Metrics, config: Config) -> Self {
+		Self { metrics, config }
+	}
+}
+
+#[overseer::subsystem(RewardsStatisticsCollector, error = SubsystemError, prefix = self::overseer)]
+impl<Context> RewardsStatisticsCollector
+where
+	Context: Send + Sync,
+{
+	fn start(self, ctx: Context) -> SpawnedSubsystem {
+		SpawnedSubsystem {
+			future: run(ctx, (self.metrics, self.config.verbose_approval_metrics))
+				.map_err(|e| SubsystemError::with_origin("statistics-parachains", e))
+				.boxed(),
+			name: "rewards-statistics-collector-subsystem",
+		}
+	}
+}
+
+#[overseer::contextbounds(RewardsStatisticsCollector, prefix = self::overseer)]
+async fn run<Context>(mut ctx: Context, metrics: (Metrics, bool)) -> FatalResult<()> {
+	let mut view = View::new();
+	loop {
+		error::log_error(
+			run_iteration(&mut ctx, &mut view, (&metrics.0, metrics.1)).await,
+			"Encountered issue during run iteration",
+		)?;
+	}
+}
+
+#[overseer::contextbounds(RewardsStatisticsCollector, prefix = self::overseer)]
+pub(crate) async fn run_iteration<Context>(
+	ctx: &mut Context,
+	view: &mut View,
+	// The boolean flag tells the subsystem's inner
+	// metrics to publish the accumulated tallies per
+	// session per validator; enabling it can add
+	// Prometheus overhead depending on the number of
+	// active validators.
+	metrics: (&Metrics, bool),
+) -> Result<()> {
+	loop {
+		match ctx.recv().await.map_err(FatalError::SubsystemReceive)? {
+			FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()),
+			FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) => {
+				if let Some(activated) = update.activated {
+					let relay_hash = activated.hash;
+					let relay_number = activated.number;
+
+					let session_idx = request_session_index_for_child(relay_hash, ctx.sender())
+						.await
+						.await
+						.map_err(JfyiError::OverseerCommunication)?
+						.map_err(JfyiError::RuntimeApiCallError)?;
+
+					view.per_relay
+						.insert((relay_hash, relay_number), PerRelayView::new(session_idx));
+
+					prune_based_on_session_windows(
+						view,
+						session_idx,
+						MAX_SESSION_VIEWS_TO_KEEP,
+						MAX_AVAILABILITIES_TO_KEEP,
+					);
+
+					if !view.per_session.contains_key(&session_idx) {
+						let session_info =
+							request_session_info(relay_hash, session_idx, ctx.sender())
+								.await
+								.await
+								.map_err(JfyiError::OverseerCommunication)?
+								.map_err(JfyiError::RuntimeApiCallError)?;
+
+						if let Some(session_info) = session_info {
+							view.per_session.insert(
+								session_idx,
+								PerSessionView::new(
+									session_info.discovery_keys.iter().cloned().collect(),
+								),
+							);
+						}
+					}
+				}
+			},
+			FromOrchestra::Signal(OverseerSignal::BlockFinalized(
+				fin_block_hash,
+				fin_block_number,
+			)) => {
+				// When a block is finalized we:
+				// 1. Prune unneeded forks.
+				// 2. Collect the statistics that belong to the finalized chain.
+				// 3. Remove the finalized entries from the mapping, leaving only
+				//    the still-unfinalized blocks.
+				let (tx, rx) = oneshot::channel();
+				let ancestor_req_message = ChainApiMessage::Ancestors {
+					hash: fin_block_hash,
+					k: fin_block_number.saturating_sub(view.latest_finalized_block.0) as _,
+					response_channel: tx,
+				};
+				ctx.send_message(ancestor_req_message).await;
+
+				let mut finalized_hashes = rx
+					.map_err(JfyiError::OverseerCommunication)
+					.await?
+ .map_err(JfyiError::ChainApiCallError)?; + finalized_hashes.push(fin_block_hash); + + let (mut before, after): (HashMap<_, _>, HashMap<_, _>) = view + .per_relay + .clone() + .into_iter() + .partition(|((_, relay_number), _)| *relay_number <= fin_block_number); + + before.retain(|(relay_hash, _), _| finalized_hashes.contains(relay_hash)); + let finalized_views: HashMap<&Hash, &PerRelayView> = before + .iter() + .map(|((relay_hash, _), per_relay_view)| (relay_hash, per_relay_view)) + .collect::>(); + + aggregate_finalized_approvals_stats(view, finalized_views, metrics); + log_session_view_general_stats(view); + + view.per_relay = after; + view.latest_finalized_block = (fin_block_number, fin_block_hash); + }, + FromOrchestra::Communication { msg } => match msg { + RewardsStatisticsCollectorMessage::ChunksDownloaded(session_index, downloads) => + handle_chunks_downloaded(view, session_index, downloads), + RewardsStatisticsCollectorMessage::ChunkUploaded(session_index, authority_ids) => + handle_chunk_uploaded(view, session_index, authority_ids), + RewardsStatisticsCollectorMessage::CandidateApproved( + block_hash, + block_number, + approvals, + ) => { + handle_candidate_approved(view, block_hash, block_number, approvals); + }, + RewardsStatisticsCollectorMessage::NoShows( + block_hash, + block_number, + no_show_validators, + ) => { + handle_observed_no_shows(view, block_hash, block_number, no_show_validators); + }, + }, + } + } +} + +// aggregate_finalized_approvals_stats will iterate over the finalized hashes +// tallying each collected approval stats on its correct session per validator index +fn aggregate_finalized_approvals_stats( + view: &mut View, + finalized_relays: HashMap<&Hash, &PerRelayView>, + metrics: (&Metrics, bool), +) { + for (_, per_relay_view) in finalized_relays { + if let Some(session_view) = view.per_session.get_mut(&per_relay_view.session_index) { + metrics.0.record_approvals_stats( + per_relay_view.session_index, + per_relay_view.approvals_stats.clone(), + // if true will report the metrics per validator index + metrics.1, + ); + + for (validator_idx, total_votes) in &per_relay_view.approvals_stats.votes { + session_view + .validators_tallies + .entry(*validator_idx) + .or_default() + .increment_approval_by(*total_votes); + } + + for (validator_idx, total_noshows) in &per_relay_view.approvals_stats.no_shows { + session_view + .validators_tallies + .entry(*validator_idx) + .or_default() + .increment_noshow_by(*total_noshows); + } + } + } +} + +// prune_based_on_session_windows prunes the per_session and the availability_chunks +// mappings based on a session windows avoiding them to grow indefinitely +fn prune_based_on_session_windows( + view: &mut View, + session_idx: SessionIndex, + max_session_view_to_keep: SessionWindowSize, + max_availabilities_to_keep: SessionWindowSize, +) { + if let Some(wipe_before) = session_idx.checked_sub(max_session_view_to_keep.get()) { + view.per_session = view.per_session.split_off(&wipe_before); + } + + if let Some(wipe_before) = session_idx.checked_sub(max_availabilities_to_keep.get()) { + view.availability_chunks = view.availability_chunks.split_off(&wipe_before); + } +} + +fn log_session_view_general_stats(view: &View) { + for (session_index, session_view) in &view.per_session { + let session_tally = session_view + .validators_tallies + .values() + .map(|tally| (tally.approvals, tally.no_shows)) + .fold((0, 0), |acc, (approvals, noshows)| (acc.0 + approvals, acc.1 + noshows)); + + gum::debug!( + target: LOG_TARGET, + session_idx = 
?session_index,
+			approvals = ?session_tally.0,
+			noshows = ?session_tally.1,
+			"session collected statistics",
+		);
+	}
+}
diff --git a/polkadot/node/core/rewards-statistics-collector/src/metrics.rs b/polkadot/node/core/rewards-statistics-collector/src/metrics.rs
new file mode 100644
index 0000000000000..70ff7d2439925
--- /dev/null
+++ b/polkadot/node/core/rewards-statistics-collector/src/metrics.rs
@@ -0,0 +1,142 @@
+// Copyright (C) Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot. If not, see <https://www.gnu.org/licenses/>.
+
+use crate::approval_voting_metrics::ApprovalsStats;
+use polkadot_node_subsystem_util::metrics::{
+	self,
+	prometheus::{self, U64},
+};
+use polkadot_primitives::SessionIndex;
+
+#[derive(Clone)]
+pub(crate) struct MetricsInner {
+	approvals_usage_per_session: prometheus::CounterVec<U64>,
+	no_shows_per_session: prometheus::CounterVec<U64>,
+
+	approvals_per_session_per_validator: prometheus::CounterVec<U64>,
+	no_shows_per_session_per_validator: prometheus::CounterVec<U64>,
+
+	submission_started: prometheus::Counter<U64>,
+}
+
+/// Rewards statistics collector metrics.
+#[derive(Default, Clone)]
+pub struct Metrics(pub(crate) Option<MetricsInner>);
+
+impl Metrics {
+	pub fn record_approvals_stats(
+		&self,
+		session: SessionIndex,
+		approval_stats: ApprovalsStats,
+		per_validator_metrics: bool,
+	) {
+		self.0.as_ref().map(|metrics| {
+			metrics
+				.approvals_usage_per_session
+				.with_label_values(&[session.to_string().as_str()])
+				.inc_by(approval_stats.votes.len() as u64);
+
+			metrics
+				.no_shows_per_session
+				.with_label_values(&[session.to_string().as_str()])
+				.inc_by(approval_stats.no_shows.len() as u64);
+
+			if per_validator_metrics {
+				for validator in &approval_stats.votes {
+					metrics
+						.approvals_per_session_per_validator
+						.with_label_values(&[
+							session.to_string().as_str(),
+							validator.0 .0.to_string().as_str(),
+						])
+						.inc()
+				}
+
+				for validator in &approval_stats.no_shows {
+					metrics
+						.no_shows_per_session_per_validator
+						.with_label_values(&[
+							session.to_string().as_str(),
+							validator.0 .0.to_string().as_str(),
+						])
+						.inc()
+				}
+			}
+		});
+	}
+
+	pub fn submit_approvals_tallies(&self, tallies: usize) {
+		self.0.as_ref().map(|metrics| {
+			metrics.submission_started.inc_by(tallies as u64);
+		});
+	}
+}
+
+impl metrics::Metrics for Metrics {
+	fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
+		let metrics = MetricsInner {
+			approvals_per_session_per_validator: prometheus::register(
+				prometheus::CounterVec::new(
+					prometheus::Opts::new(
+						"polkadot_parachain_rewards_statistics_collector_approvals_per_session_per_validator",
+						"Total number of useful approvals a given validator provided in a session.",
+					),
+					vec!["session", "validator_idx"].as_ref(),
+				)?,
+				registry,
+			)?,
+			no_shows_per_session_per_validator: prometheus::register(
+				prometheus::CounterVec::new(
+					prometheus::Opts::new(
+						"polkadot_parachain_rewards_statistics_collector_no_shows_per_session_per_validator",
+						"Total number of times a given validator no-showed in a session.",
+					),
+					vec!["session", "validator_idx"].as_ref(),
+				)?,
+				registry,
+			)?,
+			approvals_usage_per_session: prometheus::register(
+				prometheus::CounterVec::new(
+					prometheus::Opts::new(
+						"polkadot_parachain_rewards_statistics_collector_approvals_per_session",
+						"Total number of useful approvals in a session.",
+					),
+					vec!["session"].as_ref(),
+				)?,
+				registry,
+			)?,
+			no_shows_per_session: prometheus::register(
+				prometheus::CounterVec::new(
+					prometheus::Opts::new(
+						"polkadot_parachain_rewards_statistics_collector_no_shows_per_session",
+						"Total number of no-shows in a session.",
+					),
+					vec!["session"].as_ref(),
+				)?,
+				registry,
+			)?,
+			submission_started: prometheus::register(
+				prometheus::Counter::new(
+					"polkadot_parachain_rewards_statistics_collector_submission_started",
+					"The number of rewards tallies submitted.",
+				)?,
+				registry,
+			)?,
+		};
+
+		Ok(Metrics(Some(metrics)))
+	}
+}
diff --git a/polkadot/node/core/rewards-statistics-collector/src/tests.rs b/polkadot/node/core/rewards-statistics-collector/src/tests.rs
new file mode 100644
index 0000000000000..72b2baaac9ed7
--- /dev/null
+++ b/polkadot/node/core/rewards-statistics-collector/src/tests.rs
@@ -0,0 +1,642 @@
+// Copyright (C) Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot. If not, see <https://www.gnu.org/licenses/>.
+ +use super::*; +use assert_matches::assert_matches; +use overseer::FromOrchestra; +use polkadot_node_subsystem::{ + messages::{ + AllMessages, RewardsStatisticsCollectorMessage, RuntimeApiMessage, RuntimeApiRequest, + }, + ActivatedLeaf, ActiveLeavesUpdate, +}; +use polkadot_node_subsystem_test_helpers as test_helpers; +use polkadot_primitives::{Hash, SessionIndex, SessionInfo}; +use sp_application_crypto::Pair as PairT; +use sp_authority_discovery::AuthorityPair as AuthorityDiscoveryPair; +use sp_keyring::Sr25519Keyring; +use std::collections::HashSet; +use test_helpers::mock::new_leaf; + +type VirtualOverseer = polkadot_node_subsystem_test_helpers::TestSubsystemContextHandle< + RewardsStatisticsCollectorMessage, +>; + +async fn activate_leaf( + virtual_overseer: &mut VirtualOverseer, + activated: ActivatedLeaf, + session_index: SessionIndex, + session_info: Option, +) { + let activated_leaf_hash = activated.hash; + virtual_overseer + .send(FromOrchestra::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::start_work( + activated, + )))) + .await; + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(parent, RuntimeApiRequest::SessionIndexForChild(tx)) + ) if parent == activated_leaf_hash => { + tx.send(Ok(session_index)).unwrap(); + } + ); + + if let Some(session_info) = session_info { + assert_matches!( + virtual_overseer.recv().await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(_parent, RuntimeApiRequest::SessionInfo(req_session, tx)) + ) if req_session == session_index => { + tx.send(Ok(Some(session_info))).unwrap(); + } + ); + } +} + +async fn finalize_block( + virtual_overseer: &mut VirtualOverseer, + finalized: (Hash, BlockNumber), + latest_finalized_block_number: BlockNumber, + finalized_hashes: Vec, +) { + let fin_block_hash = finalized.0; + let fin_block_number = finalized.1; + + virtual_overseer + .send(FromOrchestra::Signal(OverseerSignal::BlockFinalized( + fin_block_hash, + fin_block_number, + ))) + .await; + + let expected_amt_request_blocks = + fin_block_number.saturating_sub(latest_finalized_block_number) as usize; + + assert_matches!( + virtual_overseer.recv().await, + AllMessages::ChainApi( + ChainApiMessage::Ancestors { hash, k, response_channel } + ) if hash == fin_block_hash && k == expected_amt_request_blocks => { + response_channel.send(Ok(finalized_hashes)).unwrap(); + } + ); +} + +async fn candidate_approved( + virtual_overseer: &mut VirtualOverseer, + rb_hash: Hash, + rb_number: BlockNumber, + approvals: Vec, +) { + let msg = FromOrchestra::Communication { + msg: RewardsStatisticsCollectorMessage::CandidateApproved(rb_hash, rb_number, approvals), + }; + virtual_overseer.send(msg).await; +} + +async fn no_shows( + virtual_overseer: &mut VirtualOverseer, + rb_hash: Hash, + rb_number: BlockNumber, + no_shows: Vec, +) { + let msg = FromOrchestra::Communication { + msg: RewardsStatisticsCollectorMessage::NoShows(rb_hash, rb_number, no_shows), + }; + virtual_overseer.send(msg).await; +} + +macro_rules! 
approvals_stats_assertion { + ($fn_name:ident, $field:ident) => { + fn $fn_name( + view: &View, + rb_hash: Hash, + rb_number: BlockNumber, + expected: Vec<(ValidatorIndex, u32)>, + ) { + let expected_map = expected.into_iter().collect::>(); + + let stats_for = + view.per_relay.get(&(rb_hash, rb_number)).unwrap().approvals_stats.clone(); + + assert!(stats_for.$field.eq(&expected_map)); + } + }; +} + +approvals_stats_assertion!(assert_votes, votes); +approvals_stats_assertion!(assert_no_shows, no_shows); + +fn test_harness>( + view: &mut View, + test: impl FnOnce(VirtualOverseer) -> T, +) { + sp_tracing::init_for_tests(); + + let pool = sp_core::testing::TaskExecutor::new(); + + let (mut context, virtual_overseer) = + polkadot_node_subsystem_test_helpers::make_subsystem_context(pool.clone()); + + let subsystem = async move { + if let Err(e) = run_iteration(&mut context, view, (&Metrics(None), true)).await { + panic!("{:?}", e); + } + + view + }; + + let test_fut = test(virtual_overseer); + + futures::pin_mut!(test_fut); + futures::pin_mut!(subsystem); + _ = futures::executor::block_on(future::join( + async move { + let mut virtual_overseer = test_fut.await; + virtual_overseer.send(FromOrchestra::Signal(OverseerSignal::Conclude)).await; + }, + subsystem, + )); +} + +#[test] +fn single_candidate_approved() { + let validator_idx = ValidatorIndex(2); + + let rb_hash = Hash::from_low_u64_be(132); + let rb_number: BlockNumber = 1; + + let leaf = new_leaf(rb_hash, rb_number); + + let mut view = View::new(); + test_harness(&mut view, |mut virtual_overseer| async move { + activate_leaf(&mut virtual_overseer, leaf, 1, Some(default_session_info(1))).await; + + candidate_approved(&mut virtual_overseer, rb_hash, rb_number, vec![validator_idx]).await; + virtual_overseer + }); + + assert_eq!(view.per_relay.len(), 1); + + assert_votes(&view, rb_hash, rb_number, vec![(validator_idx, 1)]); +} + +#[test] +fn candidate_approved_for_different_forks() { + let validator_idx0 = ValidatorIndex(0); + let validator_idx1 = ValidatorIndex(1); + + let rb_number: BlockNumber = 1; + let rb_hash_fork_0 = Hash::from_low_u64_be(132); + let rb_hash_fork_1 = Hash::from_low_u64_be(231); + + let mut view = View::new(); + test_harness(&mut view, |mut virtual_overseer| async move { + let leaf0 = new_leaf(rb_hash_fork_0, rb_number); + + let leaf1 = new_leaf(rb_hash_fork_1, rb_number); + + activate_leaf(&mut virtual_overseer, leaf0, 1, Some(default_session_info(1))).await; + + activate_leaf(&mut virtual_overseer, leaf1, 1, None).await; + + candidate_approved(&mut virtual_overseer, rb_hash_fork_0, rb_number, vec![validator_idx0]) + .await; + + candidate_approved(&mut virtual_overseer, rb_hash_fork_1, rb_number, vec![validator_idx1]) + .await; + + virtual_overseer + }); + + assert_eq!(view.per_relay.len(), 2); + + assert_votes(&view, rb_hash_fork_0, rb_number, vec![(validator_idx0, 1)]); + + assert_votes(&view, rb_hash_fork_1, rb_number, vec![(validator_idx1, 1)]); +} + +#[test] +fn candidate_approval_stats_with_no_shows() { + let approvals_from = vec![ValidatorIndex(0), ValidatorIndex(3)]; + let no_show_validators = vec![ValidatorIndex(1), ValidatorIndex(2)]; + + let rb_hash = Hash::from_low_u64_be(111); + let rb_number: BlockNumber = 1; + + let mut view = View::new(); + test_harness(&mut view, |mut virtual_overseer| async move { + let leaf1 = new_leaf(rb_hash, rb_number); + activate_leaf(&mut virtual_overseer, leaf1, 1, Some(default_session_info(1))).await; + + candidate_approved(&mut virtual_overseer, rb_hash, rb_number, 
approvals_from).await; + + no_shows(&mut virtual_overseer, rb_hash, rb_number, no_show_validators).await; + + virtual_overseer + }); + + assert_eq!(view.per_relay.len(), 1); + assert_votes(&view, rb_hash, rb_number, vec![(ValidatorIndex(0), 1), (ValidatorIndex(3), 1)]); + + assert_no_shows( + &view, + rb_hash, + rb_number, + vec![(ValidatorIndex(1), 1), (ValidatorIndex(2), 1)], + ); +} + +#[test] +fn note_chunks_downloaded() { + let session_idx: SessionIndex = 2; + let chunk_downloads = vec![(ValidatorIndex(0), 10u64), (ValidatorIndex(1), 2)]; + + let mut view = View::new(); + let authorities: Vec<AuthorityDiscoveryId> = + vec![Sr25519Keyring::Alice.public().into(), Sr25519Keyring::Bob.public().into()]; + + view.per_session.insert(session_idx, PerSessionView::new(authorities.clone())); + + test_harness(&mut view, |mut virtual_overseer| async move { + virtual_overseer + .send(FromOrchestra::Communication { + msg: RewardsStatisticsCollectorMessage::ChunksDownloaded( + session_idx, + HashMap::from_iter(chunk_downloads.clone().into_iter()), + ), + }) + .await; + + // Should increment only validator 0 + let second_round_of_downloads = vec![(ValidatorIndex(0), 5u64)]; + virtual_overseer + .send(FromOrchestra::Communication { + msg: RewardsStatisticsCollectorMessage::ChunksDownloaded( + session_idx, + HashMap::from_iter(second_round_of_downloads.into_iter()), + ), + }) + .await; + + virtual_overseer + }); + + assert_eq!(view.availability_chunks.len(), 1); + let ac = view.availability_chunks.get(&session_idx).unwrap(); + + assert_eq!(ac.downloads.len(), 2); + + let expected = vec![(ValidatorIndex(0), 15u64), (ValidatorIndex(1), 2)]; + + for (vidx, expected_count) in expected { + let auth_id = authorities.get(vidx.0 as usize).unwrap(); + let count = ac.downloads.get(auth_id).unwrap(); + assert_eq!(*count, expected_count); + } +} + +fn default_session_info(session_idx: SessionIndex) -> SessionInfo { + SessionInfo { + active_validator_indices: vec![], + random_seed: Default::default(), + dispute_period: session_idx, + validators: Default::default(), + discovery_keys: vec![], + assignment_keys: vec![], + validator_groups: Default::default(), + n_cores: 0, + zeroth_delay_tranche_width: 0, + relay_vrf_modulo_samples: 0, + n_delay_tranches: 0, + no_show_slots: 0, + needed_approvals: 0, + } +} + +#[test] +fn note_chunks_uploaded_to_active_validator() { + let activated_leaf_hash = Hash::from_low_u64_be(111); + let leaf1 = new_leaf(activated_leaf_hash, 1); + + let session_index: SessionIndex = 2; + let mut session_info: SessionInfo = default_session_info(session_index); + + let validator_idx_pair = AuthorityDiscoveryPair::generate(); + + let validator_idx_auth_id: AuthorityDiscoveryId = validator_idx_pair.0.public().into(); + session_info.discovery_keys = vec![validator_idx_auth_id.clone()]; + + let mut view = View::new(); + test_harness(&mut view, |mut virtual_overseer| async move { + activate_leaf(&mut virtual_overseer, leaf1, session_index, Some(session_info)).await; + + virtual_overseer + .send(FromOrchestra::Communication { + msg: RewardsStatisticsCollectorMessage::ChunkUploaded( + session_index, + HashSet::from_iter(vec![validator_idx_auth_id]), + ), + }) + .await; + + virtual_overseer + }); + + // Assert that the leaf was activated and the session info is present + let validator_idx_auth_id: AuthorityDiscoveryId = validator_idx_pair.0.public().into(); + let expected_view = PerSessionView::new(vec![validator_idx_auth_id.clone()]); + + assert_eq!(view.per_session.len(), 1); +
assert_eq!(view.per_session.get(&session_index).unwrap(), &expected_view); + + assert_eq!(view.availability_chunks.len(), 1); + + let mut expected_av_chunks = AvailabilityChunks::new(); + expected_av_chunks.note_candidate_chunk_uploaded(validator_idx_auth_id.clone(), 1); + + assert_eq!(view.availability_chunks.get(&session_index).unwrap(), &expected_av_chunks); +} + +#[test] +fn prune_unfinalized_forks() { + // Testing the pruning capabilities. + // Pruning happens when a block is finalized: all the data collected for + // the finalized chain should be kept (aggregated into the per-session + // view), while the data collected for unfinalized forks should be pruned. + + // Building a "chain" with the following relay blocks (all in the same session) + // A -> B + // A -> C -> D + + let hash_a = Hash::from_slice(&[00; 32]); + let number_a: BlockNumber = 1; + + let hash_b = Hash::from_slice(&[01; 32]); + let number_b: BlockNumber = 2; + + let hash_c = Hash::from_slice(&[02; 32]); + let number_c: BlockNumber = 2; + + let hash_d = Hash::from_slice(&[03; 32]); + let number_d: BlockNumber = 3; + + let session_zero: SessionIndex = 0; + + let mut view = View::new(); + test_harness(&mut view, |mut virtual_overseer| async move { + let leaf_a = new_leaf(hash_a, number_a); + + activate_leaf( + &mut virtual_overseer, + leaf_a, + session_zero, + Some(default_session_info(session_zero)), + ) + .await; + + candidate_approved( + &mut virtual_overseer, + hash_a, + number_a, + vec![ValidatorIndex(2), ValidatorIndex(3)], + ) + .await; + no_shows( + &mut virtual_overseer, + hash_a, + number_a, + vec![ValidatorIndex(0), ValidatorIndex(1)], + ) + .await; + + let leaf_b = new_leaf(hash_b, 2); + activate_leaf(&mut virtual_overseer, leaf_b, session_zero, None).await; + + candidate_approved( + &mut virtual_overseer, + hash_b, + number_b, + vec![ValidatorIndex(0), ValidatorIndex(1)], + ) + .await; + + let leaf_c = new_leaf(hash_c, 2); + activate_leaf(&mut virtual_overseer, leaf_c, session_zero, None).await; + + candidate_approved( + &mut virtual_overseer, + hash_c, + number_c, + vec![ValidatorIndex(0), ValidatorIndex(1), ValidatorIndex(2)], + ) + .await; + + let leaf_d = new_leaf(hash_d, 3); + activate_leaf(&mut virtual_overseer, leaf_d, session_zero, None).await; + + candidate_approved( + &mut virtual_overseer, + hash_d, + number_d, + vec![ValidatorIndex(0), ValidatorIndex(1)], + ) + .await; + + virtual_overseer + }); + + let expect = vec![ + ( + hash_a, + number_a, + vec![(ValidatorIndex(2), 1), (ValidatorIndex(3), 1)], + vec![(ValidatorIndex(0), 1), (ValidatorIndex(1), 1)], + ), + (hash_b, number_b, vec![(ValidatorIndex(0), 1), (ValidatorIndex(1), 1)], vec![]), + ( + hash_c, + number_c, + vec![(ValidatorIndex(0), 1), (ValidatorIndex(1), 1), (ValidatorIndex(2), 1)], + vec![], + ), + (hash_d, number_d, vec![(ValidatorIndex(0), 1), (ValidatorIndex(1), 1)], vec![]), + ]; + + assert_relay_view_approval_stats(&view, expect); + + // Finalizing block C should prune the current unfinalized mapping + // and aggregate data of the finalized chain into the per-session view; + // the collected data for block D should remain untouched + test_harness(&mut view, |mut virtual_overseer| async move { + finalize_block( + &mut virtual_overseer, + (hash_c, number_c), + 0 as BlockNumber, + // send the parent hash and the genesis hash (all zeroes) + vec![hash_a, Default::default()], + ) + .await; + virtual_overseer + }); + + let expect = + vec![(hash_d, number_d, vec![(ValidatorIndex(0), 1), (ValidatorIndex(1), 1)], vec![])]; + +
assert_relay_view_approval_stats(&view, expect); + + // Check that the data was aggregated correctly for the session view: + // it should aggregate approvals and no-shows collected on blocks + // A and C. + // Data collected on block B should be discarded. + // Data collected on block D should remain in the mapping as it was neither finalized nor pruned. + let expected_tallies = HashMap::from_iter(vec![ + (ValidatorIndex(0), PerValidatorTally { no_shows: 1, approvals: 1 }), + (ValidatorIndex(1), PerValidatorTally { no_shows: 1, approvals: 1 }), + (ValidatorIndex(2), PerValidatorTally { no_shows: 0, approvals: 2 }), + (ValidatorIndex(3), PerValidatorTally { no_shows: 0, approvals: 1 }), + ]); + + assert_per_session_tallies(&view.per_session, 0, expected_tallies); + // Creating 3 more relay blocks (E, F, G), all in session 1 + // D -> E -> F + // -> G + + let hash_e = Hash::from_slice(&[04; 32]); + let number_e: BlockNumber = 4; + + let hash_f = Hash::from_slice(&[05; 32]); + let number_f: BlockNumber = 5; + + let hash_g = Hash::from_slice(&[06; 32]); + let number_g: BlockNumber = 5; + + let session_one: SessionIndex = 1; + + test_harness(&mut view, |mut virtual_overseer| async move { + let leaf_e = new_leaf(hash_e, 4); + activate_leaf( + &mut virtual_overseer, + leaf_e, + session_one, + Some(default_session_info(session_one)), + ) + .await; + + candidate_approved( + &mut virtual_overseer, + hash_e, + number_e, + vec![ValidatorIndex(3), ValidatorIndex(1), ValidatorIndex(0)], + ) + .await; + no_shows(&mut virtual_overseer, hash_e, number_e, vec![ValidatorIndex(2)]).await; + + let leaf_f = new_leaf(hash_f, number_f); + activate_leaf(&mut virtual_overseer, leaf_f, session_one, None).await; + candidate_approved(&mut virtual_overseer, hash_f, number_f, vec![ValidatorIndex(3)]).await; + + let leaf_g = new_leaf(hash_g, number_g); + activate_leaf(&mut virtual_overseer, leaf_g, session_one, None).await; + candidate_approved(&mut virtual_overseer, hash_g, number_g, vec![ValidatorIndex(0)]).await; + no_shows(&mut virtual_overseer, hash_g, number_g, vec![ValidatorIndex(1)]).await; + + // Finalizing relay block E + finalize_block(&mut virtual_overseer, (hash_e, number_e), number_c, vec![hash_d, hash_c]) + .await; + + virtual_overseer + }); + + // Finalizing block E triggers the pruning mechanism; + // now it should aggregate collected data from blocks D and E, + // keeping only blocks F and G in the mapping + let expect = vec![ + (hash_f, number_f, vec![(ValidatorIndex(3), 1)], vec![]), + (hash_g, number_g, vec![(ValidatorIndex(0), 1)], vec![(ValidatorIndex(1), 1)]), + ]; + + assert_relay_view_approval_stats(&view, expect); + + // Assert tallies for session 0 + let expected_tallies = HashMap::from_iter(vec![ + ( + ValidatorIndex(0), + PerValidatorTally { + no_shows: 1, + // Validator 0's approvals increased from 1 to 2 + // as block D, with more approvals, was finalized + approvals: 2, + }, + ), + (ValidatorIndex(1), PerValidatorTally { no_shows: 1, approvals: 2 }), + (ValidatorIndex(2), PerValidatorTally { no_shows: 0, approvals: 2 }), + (ValidatorIndex(3), PerValidatorTally { no_shows: 0, approvals: 1 }), + ]); + + assert_per_session_tallies(&view.per_session, 0, expected_tallies); + + // Assert tallies for session 1 + let expected_tallies = HashMap::from_iter(vec![ + (ValidatorIndex(0), PerValidatorTally { no_shows: 0, approvals: 1 }), + (ValidatorIndex(1), PerValidatorTally { no_shows: 0, approvals: 1 }), + (ValidatorIndex(2), PerValidatorTally { no_shows: 1, approvals: 0 }), + (ValidatorIndex(3), PerValidatorTally
{ no_shows: 0, approvals: 1 }), + ]); + + assert_per_session_tallies(&view.per_session, 1, expected_tallies); +} + +fn assert_relay_view_approval_stats( + view: &View, + expected_relay_view_stats: Vec<( + Hash, + BlockNumber, + Vec<(ValidatorIndex, u32)>, + Vec<(ValidatorIndex, u32)>, + )>, +) { + assert_eq!(view.per_relay.len(), expected_relay_view_stats.len()); + + for (hash, block_number, votes, no_shows) in &expected_relay_view_stats { + assert_votes(&view, *hash, *block_number, votes.clone()); + assert_no_shows(&view, *hash, *block_number, no_shows.clone()); + } +} + +fn assert_per_session_tallies( + per_session_view: &BTreeMap<SessionIndex, PerSessionView>, + session_idx: SessionIndex, + expected_tallies: HashMap<ValidatorIndex, PerValidatorTally>, +) { + let session_view = per_session_view + .get(&session_idx) + .expect("session index should exist in the view"); + + assert_eq!(session_view.validators_tallies.len(), expected_tallies.len()); + for (validator_index, expected_tally) in expected_tallies.iter() { + assert_eq!( + session_view.validators_tallies.get(validator_index), + Some(expected_tally), + "unexpected value for validator index {:?}", + validator_index + ); + } +} diff --git a/polkadot/node/network/availability-distribution/Cargo.toml b/polkadot/node/network/availability-distribution/Cargo.toml index 93565628e6ed2..d691f566cc71f 100644 --- a/polkadot/node/network/availability-distribution/Cargo.toml +++ b/polkadot/node/network/availability-distribution/Cargo.toml @@ -37,11 +37,13 @@ thiserror = { workspace = true } [dev-dependencies] assert_matches = { workspace = true } +async-trait = { workspace = true } futures-timer = { workspace = true } polkadot-node-subsystem-test-helpers = { workspace = true } polkadot-primitives-test-helpers = { workspace = true } polkadot-subsystem-bench = { workspace = true } rstest = { workspace = true } +sc-network-types = { workspace = true, default-features = true } sp-keyring = { workspace = true, default-features = true } sp-tracing = { workspace = true, default-features = true } diff --git a/polkadot/node/network/availability-distribution/src/lib.rs b/polkadot/node/network/availability-distribution/src/lib.rs index 126a50d674f11..621c19f7f6c66 100644 --- a/polkadot/node/network/availability-distribution/src/lib.rs +++ b/polkadot/node/network/availability-distribution/src/lib.rs @@ -46,6 +46,7 @@ use responder::{run_chunk_receivers, run_pov_receiver}; mod metrics; /// Prometheus `Metrics` for availability distribution. pub use metrics::Metrics; +use polkadot_node_network_protocol::authority_discovery::AuthorityDiscovery; #[cfg(test)] mod tests; @@ -53,9 +54,10 @@ mod tests; const LOG_TARGET: &'static str = "parachain::availability-distribution"; /// The availability distribution subsystem. -pub struct AvailabilityDistributionSubsystem { +pub struct AvailabilityDistributionSubsystem<AD> { /// Easy and efficient runtime access for this subsystem. runtime: RuntimeInfo, + authority_discovery_service: AD, /// Receivers to receive messages from. recvs: IncomingRequestReceivers, /// Mapping of the req-response protocols to the full protocol names.
@@ -75,7 +77,10 @@ pub struct IncomingRequestReceivers { } #[overseer::subsystem(AvailabilityDistribution, error=SubsystemError, prefix=self::overseer)] -impl AvailabilityDistributionSubsystem { +impl AvailabilityDistributionSubsystem +where + AD: AuthorityDiscovery + Clone + Sync, +{ fn start(self, ctx: Context) -> SpawnedSubsystem { let future = self .run(ctx) @@ -87,21 +92,26 @@ impl AvailabilityDistributionSubsystem { } #[overseer::contextbounds(AvailabilityDistribution, prefix = self::overseer)] -impl AvailabilityDistributionSubsystem { +impl AvailabilityDistributionSubsystem +where + AD: AuthorityDiscovery + Clone + Sync, +{ /// Create a new instance of the availability distribution. pub fn new( keystore: KeystorePtr, recvs: IncomingRequestReceivers, req_protocol_names: ReqProtocolNames, + authority_discovery_service: AD, metrics: Metrics, ) -> Self { let runtime = RuntimeInfo::new(Some(keystore)); - Self { runtime, recvs, req_protocol_names, metrics } + Self { runtime, authority_discovery_service, recvs, req_protocol_names, metrics } } /// Start processing work as passed on from the Overseer. async fn run(self, mut ctx: Context) -> std::result::Result<(), FatalError> { - let Self { mut runtime, recvs, metrics, req_protocol_names } = self; + let Self { mut runtime, authority_discovery_service, recvs, metrics, req_protocol_names } = + self; let IncomingRequestReceivers { pov_req_receiver, @@ -123,6 +133,7 @@ impl AvailabilityDistributionSubsystem { "chunk-receiver", run_chunk_receivers( sender, + authority_discovery_service, chunk_req_v1_receiver, chunk_req_v2_receiver, metrics.clone(), diff --git a/polkadot/node/network/availability-distribution/src/requester/fetch_task/tests.rs b/polkadot/node/network/availability-distribution/src/requester/fetch_task/tests.rs index eedae3a429853..0699fc116d4fc 100644 --- a/polkadot/node/network/availability-distribution/src/requester/fetch_task/tests.rs +++ b/polkadot/node/network/availability-distribution/src/requester/fetch_task/tests.rs @@ -388,7 +388,12 @@ fn get_response( } .encode(), Protocol::ChunkFetchingV2 => if let Some((chunk, proof, index)) = chunk { - v2::ChunkFetchingResponse::Chunk(ErasureChunk { chunk, index, proof }) + v2::ChunkFetchingResponse::Chunk(ErasureChunk { + chunk, + index, + proof, + session_index: None, + }) } else { v2::ChunkFetchingResponse::NoSuchChunk } diff --git a/polkadot/node/network/availability-distribution/src/responder.rs b/polkadot/node/network/availability-distribution/src/responder.rs index 128c45a9e19df..cace89561f10d 100644 --- a/polkadot/node/network/availability-distribution/src/responder.rs +++ b/polkadot/node/network/availability-distribution/src/responder.rs @@ -23,11 +23,15 @@ use futures::{channel::oneshot, select, FutureExt}; use codec::{Decode, Encode}; use fatality::Nested; use polkadot_node_network_protocol::{ + authority_discovery::AuthorityDiscovery, request_response::{v1, v2, IncomingRequest, IncomingRequestReceiver, IsRequest}, UnifiedReputationChange as Rep, }; use polkadot_node_primitives::{AvailableData, ErasureChunk}; -use polkadot_node_subsystem::{messages::AvailabilityStoreMessage, SubsystemSender}; +use polkadot_node_subsystem::{ + messages::{AvailabilityStoreMessage, RewardsStatisticsCollectorMessage}, + SubsystemSender, +}; use polkadot_primitives::{CandidateHash, ValidatorIndex}; use crate::{ @@ -67,13 +71,16 @@ pub async fn run_pov_receiver( } /// Receiver task to be forked as a separate task to handle chunk requests. 
-pub async fn run_chunk_receivers( +pub async fn run_chunk_receivers( mut sender: Sender, + mut authority_discovery: AD, mut receiver_v1: IncomingRequestReceiver, mut receiver_v2: IncomingRequestReceiver, metrics: Metrics, ) where - Sender: SubsystemSender, + Sender: SubsystemSender + + SubsystemSender, + AD: AuthorityDiscovery + Clone + Sync, { let make_resp_v1 = |chunk: Option| match chunk { None => v1::ChunkFetchingResponse::NoSuchChunk, @@ -89,7 +96,7 @@ pub async fn run_chunk_receivers( select! { res = receiver_v1.recv(|| vec![COST_INVALID_REQUEST]).fuse() => match res.into_nested() { Ok(Ok(msg)) => { - answer_chunk_request_log(&mut sender, msg, make_resp_v1, &metrics).await; + answer_chunk_request_log(&mut sender, &mut authority_discovery, msg, make_resp_v1, &metrics).await; }, Err(fatal) => { gum::debug!( @@ -109,7 +116,7 @@ pub async fn run_chunk_receivers( }, res = receiver_v2.recv(|| vec![COST_INVALID_REQUEST]).fuse() => match res.into_nested() { Ok(Ok(msg)) => { - answer_chunk_request_log(&mut sender, msg.into(), make_resp_v2, &metrics).await; + answer_chunk_request_log(&mut sender, &mut authority_discovery, msg.into(), make_resp_v2, &metrics).await; }, Err(fatal) => { gum::debug!( @@ -158,18 +165,21 @@ pub async fn answer_pov_request_log( /// Variant of `answer_chunk_request` that does Prometheus metric and logging on errors. /// /// Any errors of `answer_request` will simply be logged. -pub async fn answer_chunk_request_log( +pub async fn answer_chunk_request_log( sender: &mut Sender, + authority_discovery: &mut AD, req: IncomingRequest, make_response: MakeResp, metrics: &Metrics, ) where + AD: AuthorityDiscovery, Req: IsRequest + Decode + Encode + Into, Req::Response: Encode, - Sender: SubsystemSender, + Sender: SubsystemSender + + SubsystemSender, MakeResp: Fn(Option) -> Req::Response, { - let res = answer_chunk_request(sender, req, make_response).await; + let res = answer_chunk_request(sender, authority_discovery, req, make_response).await; match res { Ok(result) => metrics.on_served_chunk(if result { SUCCEEDED } else { NOT_FOUND }), Err(err) => { @@ -212,13 +222,16 @@ where /// Answer an incoming chunk request by querying the av store. /// /// Returns: `Ok(true)` if chunk was found and served. -pub async fn answer_chunk_request( +pub async fn answer_chunk_request( sender: &mut Sender, + authority_discovery: &mut AD, req: IncomingRequest, make_response: MakeResp, ) -> Result where - Sender: SubsystemSender, + AD: AuthorityDiscovery, + Sender: SubsystemSender + + SubsystemSender, Req: IsRequest + Decode + Encode + Into, Req::Response: Encode, MakeResp: Fn(Option) -> Req::Response, @@ -231,6 +244,19 @@ where let result = chunk.is_some(); + if let Some(chunk) = &chunk { + let authority_ids = authority_discovery.get_authority_ids_by_peer_id(req.peer).await; + match (chunk.session_index, authority_ids) { + (Some(session_index), Some(authority_ids)) => { + _ = sender.try_send_message(RewardsStatisticsCollectorMessage::ChunkUploaded( + session_index, + authority_ids, + )); + }, + _ => {}, + }; + } + gum::trace!( target: LOG_TARGET, hash = ?payload.candidate_hash, diff --git a/polkadot/node/network/availability-distribution/src/tests/mock.rs b/polkadot/node/network/availability-distribution/src/tests/mock.rs index 0380bc7b7e12c..87ab16fc26274 100644 --- a/polkadot/node/network/availability-distribution/src/tests/mock.rs +++ b/polkadot/node/network/availability-distribution/src/tests/mock.rs @@ -16,21 +16,24 @@ //! 
Helper functions and tools to generate mock data useful for testing this subsystem. -use std::sync::Arc; - -use sp_keyring::Sr25519Keyring; +use std::{collections::HashSet, sync::Arc}; +use async_trait::async_trait; use polkadot_erasure_coding::{branches, obtain_chunks_v1 as obtain_chunks}; +use polkadot_node_network_protocol::authority_discovery::AuthorityDiscovery; use polkadot_node_primitives::{AvailableData, BlockData, ErasureChunk, PoV, Proof}; use polkadot_primitives::{ - CandidateCommitments, CandidateHash, ChunkIndex, CommittedCandidateReceiptV2, GroupIndex, Hash, - HeadData, Id as ParaId, IndexedVec, OccupiedCore, PersistedValidationData, SessionInfo, - ValidatorIndex, + AuthorityDiscoveryId, CandidateCommitments, CandidateHash, ChunkIndex, + CommittedCandidateReceiptV2, GroupIndex, Hash, HeadData, Id as ParaId, IndexedVec, + OccupiedCore, PersistedValidationData, SessionInfo, ValidatorIndex, }; use polkadot_primitives_test_helpers::{ dummy_collator, dummy_collator_signature, dummy_hash, dummy_validation_code, CandidateDescriptor, CommittedCandidateReceipt, }; +use sc_network::Multiaddr; +use sc_network_types::PeerId; +use sp_keyring::Sr25519Keyring; /// Create dummy session info with two validator groups. pub fn make_session_info() -> SessionInfo { @@ -159,8 +162,29 @@ pub fn get_valid_chunk_data( chunk: chunk.to_vec(), index: ChunkIndex(index as _), proof: Proof::try_from(proof).unwrap(), + session_index: None, }) .nth(chunk_index.0 as usize) .expect("There really should be enough chunks."); (root, chunk) } + +#[derive(Debug, Clone)] +pub struct MockEmptyAuthorityDiscovery; + +#[async_trait] +impl AuthorityDiscovery for MockEmptyAuthorityDiscovery { + async fn get_addresses_by_authority_id( + &mut self, + _authority: AuthorityDiscoveryId, + ) -> Option> { + None + } + + async fn get_authority_ids_by_peer_id( + &mut self, + _peer_id: PeerId, + ) -> Option> { + None + } +} diff --git a/polkadot/node/network/availability-distribution/src/tests/mod.rs b/polkadot/node/network/availability-distribution/src/tests/mod.rs index 078220607c37f..8792122be0eea 100644 --- a/polkadot/node/network/availability-distribution/src/tests/mod.rs +++ b/polkadot/node/network/availability-distribution/src/tests/mod.rs @@ -61,6 +61,7 @@ fn test_harness>( keystore, IncomingRequestReceivers { pov_req_receiver, chunk_req_v1_receiver, chunk_req_v2_receiver }, req_protocol_names, + mock::MockEmptyAuthorityDiscovery, Default::default(), ); let subsystem = subsystem.run(context); diff --git a/polkadot/node/network/availability-recovery/src/lib.rs b/polkadot/node/network/availability-recovery/src/lib.rs index 84d02f1c6ea45..12af83eece670 100644 --- a/polkadot/node/network/availability-recovery/src/lib.rs +++ b/polkadot/node/network/availability-recovery/src/lib.rs @@ -528,13 +528,13 @@ async fn handle_recover( let session_info = session_info.clone(); let n_validators = session_info.validators.len(); - launch_recovery_task( state, ctx, response_sender, recovery_strategies, RecoveryParams { + session_index, validator_authority_keys: session_info.discovery_keys.clone(), n_validators, threshold: recovery_threshold(n_validators)?, diff --git a/polkadot/node/network/availability-recovery/src/task/mod.rs b/polkadot/node/network/availability-recovery/src/task/mod.rs index 51b6d9adeee94..beead118f35d4 100644 --- a/polkadot/node/network/availability-recovery/src/task/mod.rs +++ b/polkadot/node/network/availability-recovery/src/task/mod.rs @@ -32,16 +32,22 @@ use crate::{metrics::Metrics, ErasureTask, PostRecoveryCheck, 
LOG_TARGET}; use codec::Encode; use polkadot_node_primitives::AvailableData; -use polkadot_node_subsystem::{messages::AvailabilityStoreMessage, overseer, RecoveryError}; -use polkadot_primitives::{AuthorityDiscoveryId, CandidateHash, Hash}; +use polkadot_node_subsystem::{ + messages::AvailabilityStoreMessage, overseer, RecoveryError, SubsystemSender, +}; +use polkadot_primitives::{AuthorityDiscoveryId, CandidateHash, Hash, SessionIndex}; use sc_network::ProtocolName; use futures::channel::{mpsc, oneshot}; +use polkadot_node_subsystem::messages::RewardsStatisticsCollectorMessage; use std::collections::VecDeque; /// Recovery parameters common to all strategies in a `RecoveryTask`. #[derive(Clone)] pub struct RecoveryParams { + /// Session index the validators belong to. + pub session_index: SessionIndex, + /// Discovery ids of `validators`. pub validator_authority_keys: Vec<AuthorityDiscoveryId>, @@ -96,7 +102,8 @@ pub struct RecoveryTask { impl<Sender> RecoveryTask<Sender> where - Sender: overseer::AvailabilityRecoverySenderTrait, + Sender: overseer::AvailabilityRecoverySenderTrait + + SubsystemSender<RewardsStatisticsCollectorMessage>, { /// Instantiate a new recovery task. pub fn new( @@ -174,11 +181,18 @@ where self.params.metrics.on_recovery_invalid(strategy_type), _ => self.params.metrics.on_recovery_failed(strategy_type), } - return Err(err); + _ = self.state.get_download_chunks_metrics(); + return Err(err) }, Ok(data) => { self.params.metrics.on_recovery_succeeded(strategy_type, data.encoded_size()); + _ = self.sender.try_send_message( + RewardsStatisticsCollectorMessage::ChunksDownloaded( + self.params.session_index, + self.state.get_download_chunks_metrics(), + ), + ); + return Ok(data) }, } } } diff --git a/polkadot/node/network/availability-recovery/src/task/strategy/full.rs b/polkadot/node/network/availability-recovery/src/task/strategy/full.rs index 882bdbd8cdafd..fd4d29ff33f49 100644 --- a/polkadot/node/network/availability-recovery/src/task/strategy/full.rs +++ b/polkadot/node/network/availability-recovery/src/task/strategy/full.rs @@ -62,7 +62,7 @@ impl RecoveryStrategy async fn run( mut self: Box<Self>, - _: &mut State, + state: &mut State, sender: &mut Sender, common_params: &RecoveryParams, ) -> Result<AvailableData, RecoveryError> { @@ -126,7 +126,8 @@ impl RecoveryStrategy ); common_params.metrics.on_full_request_succeeded(); - return Ok(data); + state.note_received_available_data(validator_index); + return Ok(data) }, None => { common_params.metrics.on_full_request_invalid(); diff --git a/polkadot/node/network/availability-recovery/src/task/strategy/mod.rs b/polkadot/node/network/availability-recovery/src/task/strategy/mod.rs index 2c6b20a234289..706a8554d6b71 100644 --- a/polkadot/node/network/availability-recovery/src/task/strategy/mod.rs +++ b/polkadot/node/network/availability-recovery/src/task/strategy/mod.rs @@ -211,17 +211,36 @@ pub struct State { /// A record of errors returned when requesting a chunk from a validator.
recorded_errors: HashMap<(AuthorityDiscoveryId, ValidatorIndex), ErrorRecord>, + + // A counter of received available data, including individual chunks and full available data + received_available_data_by: HashMap<ValidatorIndex, u64>, } impl State { pub fn new() -> Self { - Self { received_chunks: BTreeMap::new(), recorded_errors: HashMap::new() } + Self { + received_chunks: BTreeMap::new(), + recorded_errors: HashMap::new(), + received_available_data_by: HashMap::new(), + } } fn insert_chunk(&mut self, chunk_index: ChunkIndex, chunk: Chunk) { self.received_chunks.insert(chunk_index, chunk); } + // Increase the counter of received available data for the given validator index + fn note_received_available_data(&mut self, sender: ValidatorIndex) { + let counter = self.received_available_data_by.entry(sender).or_default(); + *counter += 1; + } + + // Drain the record of chunks received per validator, returning + // all the contained data + pub fn get_download_chunks_metrics(&mut self) -> HashMap<ValidatorIndex, u64> { + self.received_available_data_by.drain().collect() + } + fn chunk_count(&self) -> usize { self.received_chunks.len() } @@ -506,6 +525,7 @@ impl State { chunk.index, Chunk { chunk: chunk.chunk, validator_index }, ); + self.note_received_available_data(validator_index); } else { metrics.on_chunk_request_invalid(strategy_type); error_count += 1; @@ -669,6 +689,7 @@ mod tests { let (erasure_task_tx, _erasure_task_rx) = mpsc::channel(10); Self { + session_index: 0, validator_authority_keys: validator_authority_id(&validators), n_validators: validators.len(), threshold: recovery_threshold(validators.len()).unwrap(), diff --git a/polkadot/node/network/availability-recovery/src/tests.rs b/polkadot/node/network/availability-recovery/src/tests.rs index 2da3ea54d284f..08756188d10d5 100644 --- a/polkadot/node/network/availability-recovery/src/tests.rs +++ b/polkadot/node/network/availability-recovery/src/tests.rs @@ -15,6 +15,7 @@ // along with Polkadot. If not, see <https://www.gnu.org/licenses/>. use crate::task::{REGULAR_CHUNKS_REQ_RETRY_LIMIT, SYSTEMATIC_CHUNKS_REQ_RETRY_LIMIT}; +use polkadot_node_subsystem::messages::RewardsStatisticsCollectorMessage; use super::*; use std::{result::Result, sync::Arc, time::Duration}; @@ -792,6 +793,14 @@ fn availability_is_recovered_from_chunks_if_no_group_provided(#[case] systematic ) .await; + // Consume the statistics message sent after successful recovery + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::RewardsStatisticsCollector( + RewardsStatisticsCollectorMessage::ChunksDownloaded(_, _) + ) + ); + // Recovered data should match the original one. assert_eq!(rx.await.unwrap().unwrap(), test_state.available_data); @@ -921,6 +930,14 @@ fn availability_is_recovered_from_chunks_even_if_backing_group_supplied_if_chunk ) .await; + // Consume the statistics message sent after successful recovery + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::RewardsStatisticsCollector( + RewardsStatisticsCollectorMessage::ChunksDownloaded(_, _) + ) + ); + // Recovered data should match the original one. assert_eq!(rx.await.unwrap().unwrap(), test_state.available_data); @@ -1492,6 +1509,14 @@ fn recovers_from_only_chunks_if_pov_large( ) .await; + // Consume the statistics message sent after successful recovery + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::RewardsStatisticsCollector( + RewardsStatisticsCollectorMessage::ChunksDownloaded(_, _) + ) + ); + // Recovered data should match the original one.
assert_eq!(rx.await.unwrap().unwrap(), test_state.available_data); diff --git a/polkadot/node/network/protocol/src/request_response/v1.rs b/polkadot/node/network/protocol/src/request_response/v1.rs index 9108dee4f6148..591286d980924 100644 --- a/polkadot/node/network/protocol/src/request_response/v1.rs +++ b/polkadot/node/network/protocol/src/request_response/v1.rs @@ -82,7 +82,7 @@ pub struct ChunkResponse { } impl From for ChunkResponse { - fn from(ErasureChunk { chunk, index: _, proof }: ErasureChunk) -> Self { + fn from(ErasureChunk { chunk, index: _, proof, session_index: _ }: ErasureChunk) -> Self { ChunkResponse { chunk, proof } } } @@ -90,7 +90,12 @@ impl From for ChunkResponse { impl ChunkResponse { /// Re-build an `ErasureChunk` from response and request. pub fn recombine_into_chunk(self, req: &ChunkFetchingRequest) -> ErasureChunk { - ErasureChunk { chunk: self.chunk, proof: self.proof, index: req.index.into() } + ErasureChunk { + chunk: self.chunk, + proof: self.proof, + index: req.index.into(), + session_index: None, + } } } diff --git a/polkadot/node/overseer/src/dummy.rs b/polkadot/node/overseer/src/dummy.rs index 5caca808d90c5..5dbacc514af86 100644 --- a/polkadot/node/overseer/src/dummy.rs +++ b/polkadot/node/overseer/src/dummy.rs @@ -89,6 +89,7 @@ pub fn dummy_overseer_builder( DummySubsystem, DummySubsystem, DummySubsystem, + DummySubsystem, >, SubsystemError, > @@ -133,6 +134,7 @@ pub fn one_for_all_overseer_builder( Sub, Sub, Sub, + Sub, >, SubsystemError, > @@ -163,7 +165,8 @@ where + Subsystem, SubsystemError> + Subsystem, SubsystemError> + Subsystem, SubsystemError> - + Subsystem, SubsystemError>, + + Subsystem, SubsystemError> + + Subsystem, SubsystemError>, { let metrics = ::register(registry)?; @@ -192,6 +195,7 @@ where .dispute_distribution(subsystem.clone()) .chain_selection(subsystem.clone()) .prospective_parachains(subsystem.clone()) + .rewards_statistics_collector(subsystem.clone()) .activation_external_listeners(Default::default()) .active_leaves(Default::default()) .spawner(SpawnGlue(spawner)) diff --git a/polkadot/node/overseer/src/lib.rs b/polkadot/node/overseer/src/lib.rs index b3d5ffcaed6bc..c8c075643c435 100644 --- a/polkadot/node/overseer/src/lib.rs +++ b/polkadot/node/overseer/src/lib.rs @@ -84,7 +84,8 @@ use polkadot_node_subsystem_types::messages::{ ChainApiMessage, ChainSelectionMessage, CollationGenerationMessage, CollatorProtocolMessage, DisputeCoordinatorMessage, DisputeDistributionMessage, GossipSupportMessage, NetworkBridgeRxMessage, NetworkBridgeTxMessage, ProspectiveParachainsMessage, - ProvisionerMessage, RuntimeApiMessage, StatementDistributionMessage, + ProvisionerMessage, RewardsStatisticsCollectorMessage, RuntimeApiMessage, + StatementDistributionMessage, }; pub use polkadot_node_subsystem_types::{ @@ -518,6 +519,7 @@ pub struct Overseer { #[subsystem(AvailabilityDistributionMessage, sends: [ AvailabilityStoreMessage, ChainApiMessage, + RewardsStatisticsCollectorMessage, RuntimeApiMessage, NetworkBridgeTxMessage, ])] @@ -527,6 +529,7 @@ pub struct Overseer { NetworkBridgeTxMessage, RuntimeApiMessage, AvailabilityStoreMessage, + RewardsStatisticsCollectorMessage, ])] availability_recovery: AvailabilityRecovery, @@ -607,6 +610,7 @@ pub struct Overseer { CandidateValidationMessage, ChainApiMessage, ChainSelectionMessage, + RewardsStatisticsCollectorMessage, DisputeCoordinatorMessage, RuntimeApiMessage, ])] @@ -616,6 +620,7 @@ pub struct Overseer { CandidateValidationMessage, ChainApiMessage, ChainSelectionMessage, + 
RewardsStatisticsCollectorMessage, DisputeCoordinatorMessage, RuntimeApiMessage, NetworkBridgeTxMessage, @@ -659,6 +664,12 @@ pub struct Overseer { ])] prospective_parachains: ProspectiveParachains, + #[subsystem(RewardsStatisticsCollectorMessage, sends: [ + RuntimeApiMessage, + ChainApiMessage, + ])] + rewards_statistics_collector: RewardsStatisticsCollector, + /// External listeners waiting for a hash to be in the active-leave set. pub activation_external_listeners: HashMap>>>, diff --git a/polkadot/node/overseer/src/tests.rs b/polkadot/node/overseer/src/tests.rs index 06f94fd1c8dc2..4b90dd21eb7d6 100644 --- a/polkadot/node/overseer/src/tests.rs +++ b/polkadot/node/overseer/src/tests.rs @@ -980,10 +980,18 @@ fn test_prospective_parachains_msg() -> ProspectiveParachainsMessage { ) } +fn test_rewards_statistics_collector_msg() -> RewardsStatisticsCollectorMessage { + RewardsStatisticsCollectorMessage::CandidateApproved( + Hash::zero(), + 1, + vec![ValidatorIndex(1), ValidatorIndex(2)], + ) +} + // Checks that `stop`, `broadcast_signal` and `broadcast_message` are implemented correctly. #[test] fn overseer_all_subsystems_receive_signals_and_messages() { - const NUM_SUBSYSTEMS: usize = 24; + const NUM_SUBSYSTEMS: usize = 25; // -4 for BitfieldSigning, GossipSupport, AvailabilityDistribution and PvfCheckerSubsystem. const NUM_SUBSYSTEMS_MESSAGED: usize = NUM_SUBSYSTEMS - 4; @@ -1081,6 +1089,11 @@ fn overseer_all_subsystems_receive_signals_and_messages() { handle .send_msg_anon(AllMessages::ProspectiveParachains(test_prospective_parachains_msg())) .await; + handle + .send_msg_anon(AllMessages::RewardsStatisticsCollector( + test_rewards_statistics_collector_msg(), + )) + .await; // handle.send_msg_anon(AllMessages::PvfChecker(test_pvf_checker_msg())).await; // Wait until all subsystems have received. 
Otherwise the messages might race against @@ -1140,6 +1153,7 @@ fn context_holds_onto_message_until_enough_signals_received() { let (pvf_checker_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); let (prospective_parachains_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); let (approval_voting_parallel_tx, _) = metered::channel(CHANNEL_CAPACITY); + let (rewards_statistics_collector_tx, _) = metered::channel(CHANNEL_CAPACITY); let (candidate_validation_unbounded_tx, _) = metered::unbounded(); let (candidate_backing_unbounded_tx, _) = metered::unbounded(); @@ -1165,6 +1179,7 @@ fn context_holds_onto_message_until_enough_signals_received() { let (pvf_checker_unbounded_tx, _) = metered::unbounded(); let (prospective_parachains_unbounded_tx, _) = metered::unbounded(); let (approval_voting_parallel_unbounded_tx, _) = metered::unbounded(); + let (rewards_statistics_collector_unbounded_tx, _) = metered::unbounded(); let channels_out = ChannelsOut { candidate_validation: candidate_validation_bounded_tx.clone(), @@ -1191,6 +1206,7 @@ fn context_holds_onto_message_until_enough_signals_received() { pvf_checker: pvf_checker_bounded_tx.clone(), prospective_parachains: prospective_parachains_bounded_tx.clone(), approval_voting_parallel: approval_voting_parallel_tx.clone(), + rewards_statistics_collector: rewards_statistics_collector_tx.clone(), candidate_validation_unbounded: candidate_validation_unbounded_tx.clone(), candidate_backing_unbounded: candidate_backing_unbounded_tx.clone(), @@ -1216,6 +1232,7 @@ fn context_holds_onto_message_until_enough_signals_received() { pvf_checker_unbounded: pvf_checker_unbounded_tx.clone(), prospective_parachains_unbounded: prospective_parachains_unbounded_tx.clone(), approval_voting_parallel_unbounded: approval_voting_parallel_unbounded_tx.clone(), + rewards_statistics_collector_unbounded: rewards_statistics_collector_unbounded_tx.clone(), }; let (mut signal_tx, signal_rx) = metered::channel(CHANNEL_CAPACITY); @@ -1343,6 +1360,7 @@ impl IsPrioMessage for ProvisionerMessage {} impl IsPrioMessage for RuntimeApiMessage {} impl IsPrioMessage for BitfieldSigningMessage {} impl IsPrioMessage for PvfCheckerMessage {} +impl IsPrioMessage for RewardsStatisticsCollectorMessage {} impl Subsystem for SlowSubsystem where diff --git a/polkadot/node/primitives/src/lib.rs b/polkadot/node/primitives/src/lib.rs index c4516c561849a..7f6054794aba4 100644 --- a/polkadot/node/primitives/src/lib.rs +++ b/polkadot/node/primitives/src/lib.rs @@ -652,6 +652,8 @@ pub struct ErasureChunk { pub index: ChunkIndex, /// Proof for this chunk's branch in the Merkle tree. 
pub proof: Proof, + /// Session the chunk belongs to + pub session_index: Option, } impl ErasureChunk { diff --git a/polkadot/node/service/Cargo.toml b/polkadot/node/service/Cargo.toml index 50bb0dc698669..1cdfed19f6aa0 100644 --- a/polkadot/node/service/Cargo.toml +++ b/polkadot/node/service/Cargo.toml @@ -130,6 +130,7 @@ polkadot-node-core-prospective-parachains = { optional = true, workspace = true, polkadot-node-core-provisioner = { optional = true, workspace = true, default-features = true } polkadot-node-core-pvf = { optional = true, workspace = true, default-features = true } polkadot-node-core-pvf-checker = { optional = true, workspace = true, default-features = true } +polkadot-node-core-rewards-statistics-collector = { optional = true, workspace = true, default-features = true } polkadot-node-core-runtime-api = { optional = true, workspace = true, default-features = true } polkadot-statement-distribution = { optional = true, workspace = true, default-features = true } @@ -172,6 +173,7 @@ full-node = [ "polkadot-node-core-provisioner", "polkadot-node-core-pvf", "polkadot-node-core-pvf-checker", + "polkadot-node-core-rewards-statistics-collector", "polkadot-node-core-runtime-api", "polkadot-statement-distribution", ] diff --git a/polkadot/node/service/src/builder/mod.rs b/polkadot/node/service/src/builder/mod.rs index 3156f214468c6..2046463a309d4 100644 --- a/polkadot/node/service/src/builder/mod.rs +++ b/polkadot/node/service/src/builder/mod.rs @@ -41,6 +41,7 @@ use polkadot_node_core_chain_selection::{ self as chain_selection_subsystem, Config as ChainSelectionConfig, }; use polkadot_node_core_dispute_coordinator::Config as DisputeCoordinatorConfig; +use polkadot_node_core_rewards_statistics_collector::Config as RewardsStatisticsCollectorConfig; use polkadot_node_network_protocol::{ peer_set::{PeerSet, PeerSetProtocolNames}, request_response::{IncomingRequest, ReqProtocolNames}, @@ -76,6 +77,8 @@ pub struct NewFullParams { pub node_version: Option, /// Whether the node is attempting to run as a secure validator. pub secure_validator_mode: bool, + /// Whether the node will publish collected approval metrics per validator + pub verbose_approval_metrics: bool, /// An optional path to a directory containing the workers. pub workers_path: Option, /// Optional custom names for the prepare and execute workers. 
@@ -197,6 +200,7 @@ where telemetry_worker_handle: _, node_version, secure_validator_mode, + verbose_approval_metrics, workers_path, workers_names, overseer_gen, @@ -436,6 +440,9 @@ where }, }; + let rewards_statistics_collector_config = + RewardsStatisticsCollectorConfig { verbose_approval_metrics }; + Some(ExtendedOverseerGenArgs { keystore: keystore_container.local_keystore(), parachains_db, @@ -452,6 +459,7 @@ where fetch_chunks_threshold, invulnerable_ah_collators, collator_protocol_hold_off, + rewards_statistics_collector_config, }) }; diff --git a/polkadot/node/service/src/overseer.rs b/polkadot/node/service/src/overseer.rs index d6ed752b4c31c..0f8d5f254ff9e 100644 --- a/polkadot/node/service/src/overseer.rs +++ b/polkadot/node/service/src/overseer.rs @@ -25,6 +25,7 @@ use polkadot_node_core_av_store::Config as AvailabilityConfig; use polkadot_node_core_candidate_validation::Config as CandidateValidationConfig; use polkadot_node_core_chain_selection::Config as ChainSelectionConfig; use polkadot_node_core_dispute_coordinator::Config as DisputeCoordinatorConfig; +use polkadot_node_core_rewards_statistics_collector::Config as RewardsStatisticsCollectorConfig; use polkadot_node_network_protocol::{ peer_set::{PeerSet, PeerSetProtocolNames}, request_response::{ @@ -75,6 +76,7 @@ pub use polkadot_node_core_dispute_coordinator::DisputeCoordinatorSubsystem; pub use polkadot_node_core_prospective_parachains::ProspectiveParachainsSubsystem; pub use polkadot_node_core_provisioner::ProvisionerSubsystem; pub use polkadot_node_core_pvf_checker::PvfCheckerSubsystem; +pub use polkadot_node_core_rewards_statistics_collector::RewardsStatisticsCollector; pub use polkadot_node_core_runtime_api::RuntimeApiSubsystem; pub use polkadot_statement_distribution::StatementDistributionSubsystem; @@ -133,6 +135,7 @@ pub struct ExtendedOverseerGenArgs { pub candidate_req_v2_receiver: IncomingRequestReceiver, /// Configuration for the approval voting subsystem. pub approval_voting_config: ApprovalVotingConfig, + pub rewards_statistics_collector_config: RewardsStatisticsCollectorConfig, /// Receiver for incoming disputes. pub dispute_req_receiver: IncomingRequestReceiver, /// Configuration for the dispute coordinator subsystem. 
@@ -183,6 +186,7 @@ pub fn validator_overseer_builder( fetch_chunks_threshold, invulnerable_ah_collators, collator_protocol_hold_off, + rewards_statistics_collector_config, }: ExtendedOverseerGenArgs, ) -> Result< InitializedOverseerBuilder< @@ -192,7 +196,7 @@ pub fn validator_overseer_builder( PvfCheckerSubsystem, CandidateBackingSubsystem, StatementDistributionSubsystem, - AvailabilityDistributionSubsystem, + AvailabilityDistributionSubsystem, AvailabilityRecoverySubsystem, BitfieldSigningSubsystem, BitfieldDistributionSubsystem, @@ -218,6 +222,7 @@ pub fn validator_overseer_builder( DisputeDistributionSubsystem, ChainSelectionSubsystem, ProspectiveParachainsSubsystem, + RewardsStatisticsCollector, >, Error, > @@ -261,6 +266,7 @@ where chunk_req_v2_receiver, }, req_protocol_names.clone(), + authority_discovery_service.clone(), Metrics::register(registry)?, )) .availability_recovery(AvailabilityRecoverySubsystem::for_validator( @@ -350,6 +356,10 @@ where )) .chain_selection(ChainSelectionSubsystem::new(chain_selection_config, parachains_db)) .prospective_parachains(ProspectiveParachainsSubsystem::new(Metrics::register(registry)?)) + .rewards_statistics_collector(RewardsStatisticsCollector::new( + Metrics::register(registry)?, + rewards_statistics_collector_config, + )) .activation_external_listeners(Default::default()) .active_leaves(Default::default()) .supports_parachains(runtime_client) @@ -416,6 +426,7 @@ pub fn collator_overseer_builder( DummySubsystem, DummySubsystem, DummySubsystem, + DummySubsystem, >, Error, > @@ -495,6 +506,7 @@ where .dispute_distribution(DummySubsystem) .chain_selection(DummySubsystem) .prospective_parachains(DummySubsystem) + .rewards_statistics_collector(DummySubsystem) .activation_external_listeners(Default::default()) .active_leaves(Default::default()) .supports_parachains(runtime_client) diff --git a/polkadot/node/subsystem-bench/Cargo.toml b/polkadot/node/subsystem-bench/Cargo.toml index 99ece70d02a02..d537916fdada8 100644 --- a/polkadot/node/subsystem-bench/Cargo.toml +++ b/polkadot/node/subsystem-bench/Cargo.toml @@ -36,6 +36,7 @@ polkadot-availability-recovery = { features = ["subsystem-benchmarks"], workspac polkadot-dispute-distribution = { workspace = true, default-features = true } polkadot-node-core-av-store = { workspace = true, default-features = true } polkadot-node-core-dispute-coordinator = { workspace = true, default-features = true } +polkadot-node-core-rewards-statistics-collector = { workspace = true, default-features = true } polkadot-node-network-protocol = { workspace = true, default-features = true } polkadot-node-primitives = { workspace = true, default-features = true } polkadot-node-subsystem = { workspace = true, default-features = true } diff --git a/polkadot/node/subsystem-bench/src/lib/approval/mod.rs b/polkadot/node/subsystem-bench/src/lib/approval/mod.rs index f4dfa47ff7621..451abe4c00469 100644 --- a/polkadot/node/subsystem-bench/src/lib/approval/mod.rs +++ b/polkadot/node/subsystem-bench/src/lib/approval/mod.rs @@ -57,6 +57,7 @@ use polkadot_node_primitives::approval::time::{ use polkadot_node_core_approval_voting::{ ApprovalVotingSubsystem, Config as ApprovalVotingConfig, RealAssignmentCriteria, }; +use polkadot_node_core_rewards_statistics_collector::RewardsStatisticsCollector as RewardsStatisticsCollectorSubsystem; use polkadot_node_network_protocol::v3 as protocol_v3; use polkadot_node_primitives::approval::{self, v1::RelayVRFStory}; use polkadot_node_subsystem::{ @@ -853,6 +854,9 @@ fn build_overseer( let 
mock_rx_bridge = MockNetworkBridgeRx::new(network_receiver, None); let overseer_metrics = OverseerMetrics::try_register(&dependencies.registry).unwrap(); let task_handle = spawn_task_handle.clone(); + + let rewards_statistics_collector_subsystem = RewardsStatisticsCollectorSubsystem::default(); + let dummy = dummy_builder!(task_handle, overseer_metrics) .replace_chain_api(|_| mock_chain_api) .replace_chain_selection(|_| mock_chain_selection) @@ -860,7 +864,8 @@ fn build_overseer( .replace_network_bridge_tx(|_| mock_tx_bridge) .replace_network_bridge_rx(|_| mock_rx_bridge) .replace_availability_recovery(|_| MockAvailabilityRecovery::new()) - .replace_candidate_validation(|_| MockCandidateValidation::new()); + .replace_candidate_validation(|_| MockCandidateValidation::new()) + .replace_rewards_statistics_collector(|_| rewards_statistics_collector_subsystem); let (overseer, raw_handle) = if state.options.approval_voting_parallel_enabled { let approval_voting_parallel = ApprovalVotingParallelSubsystem::with_config_and_clock( @@ -1172,7 +1177,12 @@ pub async fn bench_approvals_run( ); env.collect_resource_usage( - &["approval-distribution", "approval-voting", "approval-voting-parallel"], + &[ + "approval-distribution", + "approval-voting", + "approval-voting-parallel", + "rewards-statistics-collector", + ], true, ) } diff --git a/polkadot/node/subsystem-bench/src/lib/availability/mod.rs b/polkadot/node/subsystem-bench/src/lib/availability/mod.rs index b346f988a3c90..1b496055acee7 100644 --- a/polkadot/node/subsystem-bench/src/lib/availability/mod.rs +++ b/polkadot/node/subsystem-bench/src/lib/availability/mod.rs @@ -28,6 +28,7 @@ use crate::{ network::new_network, usage::BenchmarkUsage, }; +use async_trait::async_trait; use colored::Colorize; use futures::{channel::oneshot, stream::FuturesUnordered, StreamExt}; @@ -40,6 +41,7 @@ use polkadot_availability_recovery::{AvailabilityRecoverySubsystem, RecoveryStra use polkadot_node_core_av_store::AvailabilityStoreSubsystem; use polkadot_node_metrics::metrics::Metrics; use polkadot_node_network_protocol::{ + authority_discovery::AuthorityDiscovery, request_response::{v1, v2, IncomingRequest}, OurView, }; @@ -49,15 +51,15 @@ use polkadot_node_subsystem::{ }; use polkadot_node_subsystem_types::messages::{AvailabilityStoreMessage, NetworkBridgeEvent}; use polkadot_overseer::{metrics::Metrics as OverseerMetrics, Handle as OverseerHandle}; -use polkadot_primitives::{Block, CoreIndex, GroupIndex, Hash}; +use polkadot_primitives::{AuthorityDiscoveryId, Block, CoreIndex, GroupIndex, Hash}; use sc_network::request_responses::{IncomingRequest as RawIncomingRequest, ProtocolConfig}; -use std::{ops::Sub, sync::Arc, time::Instant}; +use std::{collections::HashSet, ops::Sub, sync::Arc, time::Instant}; use strum::Display; +use sc_network_types::{multiaddr::Multiaddr, PeerId}; use sc_service::SpawnTaskHandle; use serde::{Deserialize, Serialize}; pub use test_state::TestState; - mod av_store_helpers; mod test_state; @@ -120,7 +122,7 @@ fn build_overseer_for_availability_write( spawn_task_handle: SpawnTaskHandle, runtime_api: MockRuntimeApi, (network_bridge_tx, network_bridge_rx): (MockNetworkBridgeTx, MockNetworkBridgeRx), - availability_distribution: AvailabilityDistributionSubsystem, + availability_distribution: AvailabilityDistributionSubsystem, chain_api: MockChainApi, availability_store: AvailabilityStoreSubsystem, bitfield_distribution: BitfieldDistribution, @@ -267,6 +269,7 @@ pub fn prepare_test( chunk_req_v2_receiver, }, state.req_protocol_names.clone(), + 
TestAuthorityDiscovery, Metrics::try_register(&dependencies.registry).unwrap(), ); @@ -506,3 +509,23 @@ pub async fn benchmark_availability_write( false, ) } + +#[derive(Debug, Clone)] +pub struct TestAuthorityDiscovery; + +#[async_trait] +impl AuthorityDiscovery for TestAuthorityDiscovery { + async fn get_addresses_by_authority_id( + &mut self, + _authority: AuthorityDiscoveryId, + ) -> Option> { + None + } + + async fn get_authority_ids_by_peer_id( + &mut self, + _peer_id: PeerId, + ) -> Option> { + None + } +} diff --git a/polkadot/node/subsystem-bench/src/lib/mock/dummy.rs b/polkadot/node/subsystem-bench/src/lib/mock/dummy.rs index 092a8fc5f4c12..d4f0fbb36dc27 100644 --- a/polkadot/node/subsystem-bench/src/lib/mock/dummy.rs +++ b/polkadot/node/subsystem-bench/src/lib/mock/dummy.rs @@ -99,3 +99,4 @@ mock!(ApprovalVoting); mock!(ApprovalVotingParallel); mock!(ApprovalDistribution); mock!(RuntimeApi); +mock!(RewardsStatisticsCollector); diff --git a/polkadot/node/subsystem-bench/src/lib/mock/mod.rs b/polkadot/node/subsystem-bench/src/lib/mock/mod.rs index a3702dfe792f4..181dd11ad8841 100644 --- a/polkadot/node/subsystem-bench/src/lib/mock/mod.rs +++ b/polkadot/node/subsystem-bench/src/lib/mock/mod.rs @@ -71,6 +71,7 @@ macro_rules! dummy_builder { .gossip_support(MockGossipSupport {}) .dispute_distribution(MockDisputeDistribution {}) .prospective_parachains(MockProspectiveParachains {}) + .rewards_statistics_collector(MockRewardsStatisticsCollector {}) .activation_external_listeners(Default::default()) .active_leaves(Default::default()) .metrics($metrics) diff --git a/polkadot/node/subsystem-test-helpers/src/lib.rs b/polkadot/node/subsystem-test-helpers/src/lib.rs index 6ed5896972982..ee82910e3a009 100644 --- a/polkadot/node/subsystem-test-helpers/src/lib.rs +++ b/polkadot/node/subsystem-test-helpers/src/lib.rs @@ -564,6 +564,7 @@ pub fn derive_erasure_chunks_with_proofs_and_root( chunk: chunk.to_vec(), index: ChunkIndex(index as _), proof: Proof::try_from(proof).unwrap(), + session_index: None, }) .collect::>(); diff --git a/polkadot/node/subsystem-types/src/messages.rs b/polkadot/node/subsystem-types/src/messages.rs index e813fae96afcd..de76471d6d0d0 100644 --- a/polkadot/node/subsystem-types/src/messages.rs +++ b/polkadot/node/subsystem-types/src/messages.rs @@ -1471,3 +1471,25 @@ pub enum ProspectiveParachainsMessage { oneshot::Sender>, ), } + +/// Messages sent to the Statistics Collector subsystem. 
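+/// +/// Rough flow (illustrative, based on the senders wired up in this change): +/// availability-recovery reports `ChunksDownloaded` after a recovery, the +/// availability-distribution responder reports `ChunkUploaded` when serving a +/// chunk, and approval voting reports `CandidateApproved` / `NoShows`; the +/// collector aggregates these into per-session, per-validator tallies.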
+#[derive(Debug)] +pub enum RewardsStatisticsCollectorMessage { + /// After retrieving chunks from validators, we collect how many chunks + /// were downloaded and who provided them + ChunksDownloaded(SessionIndex, HashMap<ValidatorIndex, u64>), + + /// After a chunk was uploaded to a peer that requested it, we have the + /// set of authority discovery ids of the requesting peer and the session + /// the chunk belongs to + ChunkUploaded(SessionIndex, HashSet<AuthorityDiscoveryId>), + + /// A candidate received enough approvals from the validators; we receive + /// the set of validators whose approvals effectively counted towards the + /// candidate's approval + CandidateApproved(Hash, BlockNumber, Vec<ValidatorIndex>), + + /// Set of validators who were supposed to issue approvals for a given + /// candidate but whose approvals were not sent or delivered in time + NoShows(Hash, BlockNumber, Vec<ValidatorIndex>), +} diff --git a/polkadot/node/test/service/src/lib.rs b/polkadot/node/test/service/src/lib.rs index be01a667c73d5..15eb0507fdaaa 100644 --- a/polkadot/node/test/service/src/lib.rs +++ b/polkadot/node/test/service/src/lib.rs @@ -88,6 +88,7 @@ pub fn new_full( enable_beefy: true, force_authoring_backoff: false, telemetry_worker_handle: None, + verbose_approval_metrics: false, node_version: None, secure_validator_mode: false, workers_path, diff --git a/polkadot/parachain/test-parachains/adder/collator/src/main.rs b/polkadot/parachain/test-parachains/adder/collator/src/main.rs index f6ed513c76a33..4c9b353faa944 100644 --- a/polkadot/parachain/test-parachains/adder/collator/src/main.rs +++ b/polkadot/parachain/test-parachains/adder/collator/src/main.rs @@ -101,6 +101,7 @@ fn main() -> Result<()> { keep_finalized_for: None, invulnerable_ah_collators: HashSet::new(), collator_protocol_hold_off: None, + verbose_approval_metrics: false, }, ) .map_err(|e| e.to_string())?; diff --git a/polkadot/parachain/test-parachains/undying/collator/src/main.rs b/polkadot/parachain/test-parachains/undying/collator/src/main.rs index 50392bce7d4f1..2d120cd714fb8 100644 --- a/polkadot/parachain/test-parachains/undying/collator/src/main.rs +++ b/polkadot/parachain/test-parachains/undying/collator/src/main.rs @@ -96,6 +96,7 @@ fn main() -> Result<()> { enable_beefy: false, force_authoring_backoff: false, telemetry_worker_handle: None, + verbose_approval_metrics: false, // Collators don't spawn PVF workers, so we can disable version checks. node_version: None, diff --git a/polkadot/zombienet-sdk-tests/tests/functional/mod.rs b/polkadot/zombienet-sdk-tests/tests/functional/mod.rs index e28cfb4039303..bd3e4789bd736 100644 --- a/polkadot/zombienet-sdk-tests/tests/functional/mod.rs +++ b/polkadot/zombienet-sdk-tests/tests/functional/mod.rs @@ -6,6 +6,8 @@ mod approved_peer_mixed_validators; mod async_backing_6_seconds_rate; mod dispute_old_finalized; mod duplicate_collations; +mod rewards_statistics_collector; +mod rewards_statistics_mixed_validators; mod shared_core_idle_parachain; mod spam_statement_distribution_requests; mod sync_backing; diff --git a/polkadot/zombienet-sdk-tests/tests/functional/rewards_statistics_collector.rs b/polkadot/zombienet-sdk-tests/tests/functional/rewards_statistics_collector.rs new file mode 100644 index 0000000000000..89b756c9ba00b --- /dev/null +++ b/polkadot/zombienet-sdk-tests/tests/functional/rewards_statistics_collector.rs @@ -0,0 +1,144 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: Apache-2.0 + +// Test that the rewards statistics collector aggregates per-validator approval +// statistics and exposes them as Prometheus metrics.
+ +use anyhow::anyhow; +use cumulus_zombienet_sdk_helpers::{ + assert_finality_lag, assert_para_throughput, report_label_with_attributes, +}; +use polkadot_primitives::{Id as ParaId, SessionIndex}; +use serde_json::json; +use std::{collections::HashMap, ops::Range}; +use subxt::{OnlineClient, PolkadotConfig}; +use zombienet_orchestrator::network::Network; +use zombienet_sdk::{LocalFileSystem, NetworkConfigBuilder}; + +#[tokio::test(flavor = "multi_thread")] +async fn rewards_statistics_collector_test() -> Result<(), anyhow::Error> { + let _ = env_logger::try_init_from_env( + env_logger::Env::default().filter_or(env_logger::DEFAULT_FILTER_ENV, "info"), + ); + + let images = zombienet_sdk::environment::get_images_from_env(); + + let config = NetworkConfigBuilder::new() + .with_relaychain(|r| { + let r = r + .with_chain("rococo-local") + .with_default_command("polkadot") + .with_default_image(images.polkadot.as_str()) + .with_default_args(vec![ + ("-lparachain=debug").into(), + ("--verbose-approval-metrics=true").into(), + ]) + .with_genesis_overrides(json!({ + "configuration": { + "config": { + "scheduler_params": { + "group_rotation_frequency": 4 + } + } + } + })) + .with_node(|node| node.with_name("validator-0")); + + (1..12) + .fold(r, |acc, i| acc.with_node(|node| node.with_name(&format!("validator-{i}")))) + }) + .with_parachain(|p| { + p.with_id(2000) + .with_default_command("adder-collator") + .with_default_image( + std::env::var("COL_IMAGE") + .unwrap_or("docker.io/paritypr/colander:latest".to_string()) + .as_str(), + ) + .cumulus_based(false) + .with_default_args(vec![("-lparachain=debug").into()]) + .with_collator(|n| n.with_name("collator-adder-2000")) + }) + .with_parachain(|p| { + p.with_id(2001) + .with_default_command("polkadot-parachain") + .with_default_image(images.cumulus.as_str()) + .with_default_args(vec![("-lparachain=debug,aura=debug").into()]) + .with_collator(|n| n.with_name("collator-2001")) + }) + .build() + .map_err(|e| { + let errs = e.into_iter().map(|e| e.to_string()).collect::<Vec<_>>().join(" "); + anyhow!("config errs: {errs}") + })?; + + let spawn_fn = zombienet_sdk::environment::get_spawn_fn(); + let network = spawn_fn(config).await?; + + let relay_node = network.get_node("validator-0")?; + let relay_client: OnlineClient<PolkadotConfig> = relay_node.wait_client().await?; + + assert_para_throughput( + &relay_client, + 15, + [(ParaId::from(2000), 11..16), (ParaId::from(2001), 11..16)] + .into_iter() + .collect::<HashMap<ParaId, Range<u32>>>(), + ) + .await?; + + // Assert the relay chain finality lag stays within acceptable bounds.
+ assert_finality_lag(&relay_client, 6).await?; + + assert_approval_usages_medians(1, 0..12, &network).await?; + + Ok(()) +} + +async fn assert_approval_usages_medians( + session: SessionIndex, + validators_range: Range<u32>, + network: &Network<LocalFileSystem>, +) -> Result<(), anyhow::Error> { + let mut medians = vec![]; + + for idx in validators_range.clone() { + let validator_identifier = format!("validator-{idx}"); + let relay_node = network.get_node(validator_identifier.clone())?; + + let approvals_per_session = report_label_with_attributes( + "polkadot_parachain_rewards_statistics_collector_approvals_per_session", + vec![("session", session.to_string().as_str()), ("chain", "rococo_local_testnet")], + ); + + let total_approvals = relay_node.reports(approvals_per_session.clone()).await?; + + let mut metrics = vec![]; + for validator_idx in validators_range.clone() { + let approvals_per_session_per_validator = + report_label_with_attributes( + "polkadot_parachain_rewards_statistics_collector_approvals_per_session_per_validator", + vec![ + ("session", session.to_string().as_str()), + ("validator_idx", validator_idx.to_string().as_str()), + ("chain", "rococo_local_testnet"), + ], + ); + metrics.push(approvals_per_session_per_validator); + } + + let mut total_sum = 0; + for metric_per_validator in metrics { + let validator_approvals_usage = + relay_node.reports(metric_per_validator.clone()).await?; + total_sum += validator_approvals_usage as u32; + } + + assert_eq!(total_sum, total_approvals as u32); + medians.push(total_sum / validators_range.len() as u32); + } + + log::info!("Collected medians for session {session}: {:?}", medians); + Ok(()) +} diff --git a/polkadot/zombienet-sdk-tests/tests/functional/rewards_statistics_mixed_validators.rs b/polkadot/zombienet-sdk-tests/tests/functional/rewards_statistics_mixed_validators.rs new file mode 100644 index 0000000000000..ef8ee095a2262 --- /dev/null +++ b/polkadot/zombienet-sdk-tests/tests/functional/rewards_statistics_mixed_validators.rs @@ -0,0 +1,238 @@ +// Copyright (C) Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: Apache-2.0 + +// Test that rewards statistics (approvals and no-shows per session) are collected +// correctly in a network that includes misbehaving (malus) validators.
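Both zombienet tests in this PR assert against Prometheus by querying node reports with a selector string built by `report_label_with_attributes`. As a concrete reference for what `relay_node.reports(..)` is given above, here is a small illustrative check of the selector shape, using the same metric name and label values as the test:

#[test]
fn approvals_selector_shape() {
    // The helper joins `key="value"` pairs with commas and wraps them in braces.
    let selector = report_label_with_attributes(
        "polkadot_parachain_rewards_statistics_collector_approvals_per_session",
        vec![("session", "1"), ("chain", "rococo_local_testnet")],
    );
    assert_eq!(
        selector,
        "polkadot_parachain_rewards_statistics_collector_approvals_per_session{session=\"1\",chain=\"rococo_local_testnet\"}"
    );
}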
+ +use anyhow::anyhow; +use cumulus_zombienet_sdk_helpers::{ + assert_finality_lag, find_event_and_decode_fields, report_label_with_attributes, + wait_for_first_session_change, +}; +use pallet_revive::H256; +use polkadot_primitives::{CandidateReceiptV2, Id as ParaId, SessionIndex}; +use serde_json::json; +use std::{collections::HashMap, ops::Range}; +use subxt::{OnlineClient, PolkadotConfig}; +use zombienet_orchestrator::network::Network; +use zombienet_sdk::{LocalFileSystem, NetworkConfigBuilder}; + +#[tokio::test(flavor = "multi_thread")] +async fn rewards_statistics_mixed_validators_test() -> Result<(), anyhow::Error> { + let _ = env_logger::try_init_from_env( + env_logger::Env::default().filter_or(env_logger::DEFAULT_FILTER_ENV, "info"), + ); + + let images = zombienet_sdk::environment::get_images_from_env(); + + let config = NetworkConfigBuilder::new() + .with_relaychain(|r| { + let r = r + .with_chain("rococo-local") + .with_default_command("polkadot") + .with_default_image(images.polkadot.as_str()) + .with_default_args(vec![("-lparachain=debug").into()]) + .with_genesis_overrides(json!({ + "configuration": { + "config": { + "scheduler_params": { + "num_cores": 1, + "group_rotation_frequency": 4 + }, + } + } + })) + .with_node(|node| node.with_name("validator-0")); + + let r = (1..9) + .fold(r, |acc, i| acc.with_node(|node| node.with_name(&format!("validator-{i}")))); + + (9..12).fold(r, |acc, i| { + acc.with_node(|node| { + node.with_name(&format!("malus-{i}")) + .with_args(vec![ + "-lparachain=debug,MALUS=trace".into(), + "--no-hardware-benchmarks".into(), + "--insecure-validator-i-know-what-i-do".into(), + ]) + .with_command("malus") + .with_subcommand("dispute-ancestor") + .invulnerable(false) + }) + }) + }) + .with_parachain(|p| { + p.with_id(1000) + .with_default_command("adder-collator") + .with_default_image( + std::env::var("COL_IMAGE") + .unwrap_or("docker.io/paritypr/colander:latest".to_string()) + .as_str(), + ) + .cumulus_based(false) + .with_default_args(vec![("-lparachain=debug").into()]) + .with_collator(|n| n.with_name("adder-collator-1000")) + }) + .build() + .map_err(|e| { + let errs = e.into_iter().map(|e| e.to_string()).collect::<Vec<_>>().join(" "); + anyhow!("config errs: {errs}") + })?; + + let spawn_fn = zombienet_sdk::environment::get_spawn_fn(); + let network = spawn_fn(config).await?; + + let relay_node = network.get_node("validator-0")?; + let relay_client: OnlineClient<PolkadotConfig> = relay_node.wait_client().await?; + + assert_para_throughput_for_included_parablocks( + &relay_client, + 20, + [(polkadot_primitives::Id::from(1000), (10..30, 8..14))].into_iter().collect(), + ) + .await?; + + // Assert the relay chain finality lag stays within acceptable bounds.
+ assert_finality_lag(&relay_client, 6).await?; + + assert_approval_usages_medians( + 1, + [("validator", 0..9), ("malus", 9..12)].into_iter().collect(), + &network, + ) + .await?; + + Ok(()) +} + +pub async fn assert_para_throughput_for_included_parablocks( + relay_client: &OnlineClient<PolkadotConfig>, + stop_after: u32, + expected_candidate_ranges: HashMap<ParaId, (Range<u32>, Range<u32>)>, +) -> Result<(), anyhow::Error> { + let mut blocks_sub = relay_client.blocks().subscribe_finalized().await?; + let mut candidate_backed_count: HashMap<ParaId, u32> = HashMap::new(); + let mut candidate_included_count: HashMap<ParaId, u32> = HashMap::new(); + let mut current_block_count = 0; + + let valid_para_ids: Vec<ParaId> = expected_candidate_ranges.keys().cloned().collect(); + + // Wait for the first session change; block production on the parachain starts after that. + wait_for_first_session_change(&mut blocks_sub).await?; + + while let Some(block) = blocks_sub.next().await { + let block = block?; + log::debug!("Finalized relay chain block {}", block.number()); + let events = block.events().await?; + let is_session_change = events.iter().any(|event| { + event.as_ref().is_ok_and(|event| { + event.pallet_name() == "Session" && event.variant_name() == "NewSession" + }) + }); + + // Do not count blocks with session changes; no candidates are backed in them. + if is_session_change { + continue; + } + + current_block_count += 1; + + let receipts_for_backed = find_event_and_decode_fields::<CandidateReceiptV2<H256>>( + &events, + "ParaInclusion", + "CandidateBacked", + )?; + + for receipt in receipts_for_backed { + let para_id = receipt.descriptor.para_id(); + log::debug!("Block backed for para_id {para_id}"); + if !valid_para_ids.contains(&para_id) { + return Err(anyhow!("Invalid ParaId detected: {}", para_id)); + } + *(candidate_backed_count.entry(para_id).or_default()) += 1; + } + + let receipts_for_included = find_event_and_decode_fields::<CandidateReceiptV2<H256>>( + &events, + "ParaInclusion", + "CandidateIncluded", + )?; + + for receipt in receipts_for_included { + let para_id = receipt.descriptor.para_id(); + log::debug!("Block included for para_id {para_id}"); + if !valid_para_ids.contains(&para_id) { + return Err(anyhow!("Invalid ParaId detected: {}", para_id)); + } + *(candidate_included_count.entry(para_id).or_default()) += 1; + } + + if current_block_count == stop_after { + break; + } + } + + log::info!( + "Reached {stop_after} finalized relay chain blocks that contain backed/included candidates.
The per-parachain distribution is: {:#?} {:#?}", + candidate_backed_count.iter().map(|(para_id, count)| format!("{para_id} has {count} backed candidates")).collect::<Vec<_>>(), + candidate_included_count.iter().map(|(para_id, count)| format!("{para_id} has {count} included candidates")).collect::<Vec<_>>() + ); + + for (para_id, expected_candidate_range) in expected_candidate_ranges { + let actual_backed = candidate_backed_count + .get(&para_id) + .ok_or_else(|| anyhow!("ParaId did not have any backed candidates"))?; + + let actual_included = candidate_included_count + .get(&para_id) + .ok_or_else(|| anyhow!("ParaId did not have any included candidates"))?; + + if !expected_candidate_range.0.contains(actual_backed) { + let range = expected_candidate_range.0; + return Err(anyhow!("Candidate Backed count {actual_backed} not within range {range:?}")) + } + + if !expected_candidate_range.1.contains(actual_included) { + let range = expected_candidate_range.1; + return Err(anyhow!( + "Candidate Included count {actual_included} not within range {range:?}" + )) + } + } + + Ok(()) +} + +async fn assert_approval_usages_medians( + session: SessionIndex, + validators_kind_and_range: Vec<(&str, Range<u32>)>, + network: &Network<LocalFileSystem>, +) -> Result<(), anyhow::Error> { + for (kind, validators_range) in validators_kind_and_range { + for idx in validators_range { + let validator_identifier = format!("{kind}-{idx}"); + let relay_node = network.get_node(validator_identifier)?; + + let approvals_per_session = report_label_with_attributes( + "polkadot_parachain_rewards_statistics_collector_approvals_per_session", + vec![("session", session.to_string().as_str()), ("chain", "rococo_local_testnet")], + ); + + let noshows_per_session = report_label_with_attributes( + "polkadot_parachain_rewards_statistics_collector_no_shows_per_session", + vec![("session", session.to_string().as_str()), ("chain", "rococo_local_testnet")], + ); + + let total_approvals = relay_node.reports(approvals_per_session).await?; + let total_noshows = relay_node.reports(noshows_per_session).await?; + + log::info!("Session {session}: {kind} #{idx} (Approvals: {total_approvals}, No-shows: {total_noshows})"); + + assert!(total_approvals >= 9.0); + assert!(total_noshows >= 3.0); + } + } + + Ok(()) +} diff --git a/prdoc/pr_9687.prdoc b/prdoc/pr_9687.prdoc new file mode 100644 index 0000000000000..3d526f53390e5 --- /dev/null +++ b/prdoc/pr_9687.prdoc @@ -0,0 +1,49 @@ +# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0 +# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json + +title: rewards-statistics-collector subsystem + +doc: + - audience: Node Dev + description: | + Creates the `Rewards Statistics Collector` subsystem. + + This subsystem monitors and collects metrics related to parachain candidate approvals, + including approval votes, distribution of availability chunks, chunk downloads, and chunk uploads. + + Its primary responsibility is to collect and track data reflecting the node's perspective + on the approval work carried out by all session validators. + + The PR modifies `ErasureChunk` to include the session index to which the chunk belongs. + + The PR includes a new CLI option, `verbose_approval_metrics`, which enables or disables + publishing per-validator approval metrics to Prometheus. If not specified it defaults to + false, and only per-session statistics metrics are published.
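One change called out above is easy to miss in the diff itself: the `ErasureChunk` extension. Judging from the `session_index: None` initializer added to the subsystem test helpers earlier in this diff, the extended primitive plausibly looks like the sketch below; the field order, doc wording, and the `Option` wrapper are inferred here, and the authoritative definition lives in polkadot-node-primitives.

// Reconstructed sketch of the extended availability chunk primitive.
use polkadot_primitives::{ChunkIndex, SessionIndex};
// `Proof` is the Merkle proof type from polkadot-node-primitives (path assumed).

pub struct ErasureChunk {
    /// The erasure-encoded chunk of data belonging to the candidate block.
    pub chunk: Vec<u8>,
    /// The index of this chunk in the erasure coding.
    pub index: ChunkIndex,
    /// Proof for this chunk's branch in the Merkle tree.
    pub proof: Proof,
    /// The session the chunk belongs to, consumed by the rewards statistics
    /// collector; the test helper above fills it with `None`.
    pub session_index: Option<SessionIndex>,
}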
+ +crates: + - name: polkadot-sdk + bump: minor + - name: polkadot-overseer + bump: major + - name: polkadot-node-primitives + bump: major + - name: polkadot-node-subsystem-types + bump: minor + - name: polkadot-node-network-protocol + bump: patch + - name: polkadot-service + bump: major + - name: polkadot-availability-distribution + bump: major + - name: polkadot-availability-recovery + bump: patch + - name: polkadot-node-core-approval-voting + bump: patch + - name: polkadot-node-core-av-store + bump: patch + - name: polkadot-node-core-rewards-statistics-collector + bump: patch + - name: cumulus-relay-chain-inprocess-interface + bump: patch + - name: polkadot-cli + bump: major diff --git a/umbrella/Cargo.toml b/umbrella/Cargo.toml index 074dd96fe9555..6aa96eabe6e68 100644 --- a/umbrella/Cargo.toml +++ b/umbrella/Cargo.toml @@ -916,6 +916,7 @@ node = [ "polkadot-node-core-pvf", "polkadot-node-core-pvf-checker", "polkadot-node-core-pvf-common", + "polkadot-node-core-rewards-statistics-collector", "polkadot-node-core-runtime-api", "polkadot-node-metrics", "polkadot-node-network-protocol", @@ -2479,6 +2480,11 @@ default-features = false optional = true path = "../polkadot/node/core/pvf/common" +[dependencies.polkadot-node-core-rewards-statistics-collector] +default-features = false +optional = true +path = "../polkadot/node/core/rewards-statistics-collector" + [dependencies.polkadot-node-core-runtime-api] default-features = false optional = true diff --git a/umbrella/src/lib.rs b/umbrella/src/lib.rs index ab74bef7f3a0b..083e47577066d 100644 --- a/umbrella/src/lib.rs +++ b/umbrella/src/lib.rs @@ -946,6 +946,11 @@ pub use polkadot_node_core_pvf_checker; #[cfg(feature = "polkadot-node-core-pvf-common")] pub use polkadot_node_core_pvf_common; +/// The Rewards Statistics Collector subsystem. Collects approval-voting and +/// approval-distribution stats. +#[cfg(feature = "polkadot-node-core-rewards-statistics-collector")] +pub use polkadot_node_core_rewards_statistics_collector; + /// Wrapper around the parachain-related runtime APIs. #[cfg(feature = "polkadot-node-core-runtime-api")] pub use polkadot_node_core_runtime_api;
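Finally, for readers wiring the new subsystem into a node: the dummy-builder change earlier in this diff shows that the overseer builder gained a `rewards_statistics_collector` slot. A real node service would fill that slot roughly as sketched below; the subsystem type name, constructor, and arguments are assumptions for illustration, not the new crate's actual API.

// Hypothetical wiring sketch; only the builder method name is taken from this diff.
let overseer_builder = overseer_builder.rewards_statistics_collector(
    RewardsStatisticsCollectorSubsystem::new(
        // Driven by the new `--verbose-approval-metrics` CLI flag.
        verbose_approval_metrics,
        Metrics::register(prometheus_registry)?,
    ),
);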