From d52e2dfce9d0ed7f5d9426ce350af82d5c62801d Mon Sep 17 00:00:00 2001
From: Alexandru Cihodaru
Date: Thu, 22 Jan 2026 20:07:10 +0200
Subject: [PATCH 1/4] Add a new column to parachains_db for collator reputation

Signed-off-by: Alexandru Cihodaru
---
 .../node/network/collator-protocol/src/lib.rs |  15 ++-
 .../src/validator_side_experimental/mod.rs    |  12 +-
 polkadot/node/service/src/builder/mod.rs      |   5 +
 polkadot/node/service/src/overseer.rs         |   6 +
 .../node/service/src/parachains_db/mod.rs     |  51 +++++---
 .../node/service/src/parachains_db/upgrade.rs | 113 +++++++++++++++---
 6 files changed, 165 insertions(+), 37 deletions(-)

diff --git a/polkadot/node/network/collator-protocol/src/lib.rs b/polkadot/node/network/collator-protocol/src/lib.rs
index d8ed813c0a98a..beabfe93f6f4e 100644
--- a/polkadot/node/network/collator-protocol/src/lib.rs
+++ b/polkadot/node/network/collator-protocol/src/lib.rs
@@ -23,6 +23,7 @@
 use std::{
 	collections::HashSet,
+	sync::Arc,
 	time::{Duration, Instant},
 };
 
@@ -31,16 +32,16 @@ use futures::{
 	FutureExt, TryFutureExt,
 };
 
-use polkadot_node_subsystem_util::reputation::ReputationAggregator;
+use polkadot_node_subsystem_util::{database::Database, reputation::ReputationAggregator};
 use sp_keystore::KeystorePtr;
 
 use polkadot_node_network_protocol::{
 	request_response::{v2 as protocol_v2, IncomingRequestReceiver},
 	PeerId, UnifiedReputationChange as Rep,
 };
-use polkadot_primitives::CollatorPair;
-
 use polkadot_node_subsystem::{errors::SubsystemError, overseer, DummySubsystem, SpawnedSubsystem};
+use polkadot_primitives::CollatorPair;
+pub use validator_side_experimental::ReputationConfig;
 
 mod collator_side;
 mod validator_side;
@@ -91,6 +92,10 @@ pub enum ProtocolSide {
 		keystore: KeystorePtr,
 		/// Prometheus metrics for validators.
 		metrics: validator_side_experimental::Metrics,
+		/// Database used for reputation housekeeping.
+		db: Arc<dyn Database>,
+		/// Reputation data column number.
+		reputation_col: u32,
 	},
 	/// Collators operate on a parachain.
 	Collator {
@@ -148,8 +153,8 @@ impl CollatorProtocolSubsystem {
 					.map_err(|e| SubsystemError::with_origin("collator-protocol", e))
 					.boxed()
 			},
-			ProtocolSide::ValidatorExperimental { keystore, metrics } =>
-				validator_side_experimental::run(ctx, keystore, metrics)
+			ProtocolSide::ValidatorExperimental { keystore, metrics, db, reputation_col } =>
+				validator_side_experimental::run(ctx, keystore, metrics, db, reputation_col)
 					.map_err(|e| SubsystemError::with_origin("collator-protocol", e))
 					.boxed(),
 			ProtocolSide::Collator { peer_id, collator_pair, request_receiver_v2, metrics } =>
diff --git a/polkadot/node/network/collator-protocol/src/validator_side_experimental/mod.rs b/polkadot/node/network/collator-protocol/src/validator_side_experimental/mod.rs
index 83ff5330c7aac..228c489bbfee9 100644
--- a/polkadot/node/network/collator-protocol/src/validator_side_experimental/mod.rs
+++ b/polkadot/node/network/collator-protocol/src/validator_side_experimental/mod.rs
@@ -35,8 +35,9 @@ use polkadot_node_subsystem::{
 	messages::{CollatorProtocolMessage, NetworkBridgeEvent},
 	overseer, ActivatedLeaf, CollatorProtocolSenderTrait, FromOrchestra, OverseerSignal,
 };
+use polkadot_node_subsystem_util::database::Database;
 use sp_keystore::KeystorePtr;
-use std::{future, future::Future, pin::Pin, time::Duration};
+use std::{future, future::Future, pin::Pin, sync::Arc, time::Duration};
 
 use peer_manager::{Db, PeerManager};
 
@@ -44,12 +45,21 @@ use state::State;
 
 pub use crate::validator_side_metrics::Metrics;
 
+/// Configuration for the reputation db.
+#[derive(Debug, Clone, Copy)]
+pub struct ReputationConfig {
+	/// The data column in the store to use for reputation data.
+	pub col_reputation_data: u32,
+}
+
 /// The main run loop.
 #[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
 pub(crate) async fn run<Context>(
 	mut ctx: Context,
 	keystore: KeystorePtr,
 	metrics: Metrics,
+	_db: Arc<dyn Database>,
+	_reputation_col: u32,
 ) -> FatalResult<()> {
 	gum::info!(LOG_TARGET, "Running experimental collator protocol");
 	if let Some(state) = initialize(&mut ctx, keystore, metrics).await? {
diff --git a/polkadot/node/service/src/builder/mod.rs b/polkadot/node/service/src/builder/mod.rs
index e21e4acd65bf6..a6a8342e5d7cc 100644
--- a/polkadot/node/service/src/builder/mod.rs
+++ b/polkadot/node/service/src/builder/mod.rs
@@ -34,6 +34,7 @@ use frame_benchmarking_cli::SUBSTRATE_REFERENCE_HARDWARE;
 use gum::info;
 use mmr_gadget::MmrGadget;
 use polkadot_availability_recovery::FETCH_CHUNKS_THRESHOLD;
+use polkadot_collator_protocol::ReputationConfig;
 use polkadot_node_core_approval_voting::Config as ApprovalVotingConfig;
 use polkadot_node_core_av_store::Config as AvailabilityConfig;
 use polkadot_node_core_candidate_validation::Config as CandidateValidationConfig;
@@ -424,6 +425,9 @@ where
 		stagnant_check_interval: Default::default(),
 		stagnant_check_mode: chain_selection_subsystem::StagnantCheckMode::PruneOnly,
 	};
+	let reputation_config = ReputationConfig {
+		col_reputation_data: parachains_db::REAL_COLUMNS.col_collator_reputation_data,
+	};
 
 	// Kusama + testnets get a higher threshold, we are conservative on Polkadot for now.
 	let fetch_chunks_threshold =
@@ -456,6 +460,7 @@ where
 		invulnerable_ah_collators,
 		collator_protocol_hold_off,
 		experimental_collator_protocol,
+		reputation_config,
 	})
 };
diff --git a/polkadot/node/service/src/overseer.rs b/polkadot/node/service/src/overseer.rs
index 4c8c943e8dd38..5e2e0842fd946 100644
--- a/polkadot/node/service/src/overseer.rs
+++ b/polkadot/node/service/src/overseer.rs
@@ -15,6 +15,7 @@
 // along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
 
 use super::{Error, IsParachainNode, Registry};
+use polkadot_collator_protocol::ReputationConfig;
 use polkadot_node_subsystem_types::{ChainApiBackend, RuntimeApiSubsystemClient};
 use polkadot_overseer::{DummySubsystem, InitializedOverseerBuilder, SubsystemError};
 use sp_core::traits::SpawnNamed;
@@ -149,6 +150,8 @@ pub struct ExtendedOverseerGenArgs {
 	pub collator_protocol_hold_off: Option,
 	/// Use experimental collator protocol
 	pub experimental_collator_protocol: bool,
+	/// Reputation DB config used by the experimental collator protocol.
+	pub reputation_config: ReputationConfig,
 }
 
 /// Obtain a prepared validator `Overseer`, that is initialized with all default values.
@@ -186,6 +189,7 @@ pub fn validator_overseer_builder<Spawner, RuntimeClient>(
 		invulnerable_ah_collators,
 		collator_protocol_hold_off,
 		experimental_collator_protocol,
+		reputation_config,
 	}: ExtendedOverseerGenArgs,
 ) -> Result<
 	InitializedOverseerBuilder<
@@ -307,6 +311,8 @@ where
 			ProtocolSide::ValidatorExperimental {
 				keystore: keystore.clone(),
 				metrics: Metrics::register(registry)?,
+				db: parachains_db.clone(),
+				reputation_col: reputation_config.col_reputation_data,
 			}
 		} else {
 			ProtocolSide::Validator {
diff --git a/polkadot/node/service/src/parachains_db/mod.rs b/polkadot/node/service/src/parachains_db/mod.rs
index 887db80a30348..9b69f3dba7c2d 100644
--- a/polkadot/node/service/src/parachains_db/mod.rs
+++ b/polkadot/node/service/src/parachains_db/mod.rs
@@ -47,15 +47,33 @@ pub(crate) mod columns {
 	}
 
 	pub mod v4 {
+		pub use super::v5::{NUM_COLUMNS, ORDERED_COL};
+	}
+
+	pub mod v5 {
 		pub const NUM_COLUMNS: u32 = 5;
+
+		pub const ORDERED_COL: &[u32] = &[
+			super::v6::COL_AVAILABILITY_META,
+			super::v6::COL_CHAIN_SELECTION_DATA,
+			super::v6::COL_DISPUTE_COORDINATOR_DATA,
+		];
+	}
+
+	pub mod v6 {
+		pub const NUM_COLUMNS: u32 = 6;
 		pub const COL_AVAILABILITY_DATA: u32 = 0;
 		pub const COL_AVAILABILITY_META: u32 = 1;
 		pub const COL_APPROVAL_DATA: u32 = 2;
 		pub const COL_CHAIN_SELECTION_DATA: u32 = 3;
 		pub const COL_DISPUTE_COORDINATOR_DATA: u32 = 4;
-
-		pub const ORDERED_COL: &[u32] =
-			&[COL_AVAILABILITY_META, COL_CHAIN_SELECTION_DATA, COL_DISPUTE_COORDINATOR_DATA];
+		pub const COL_COLLATOR_REPUTATION_DATA: u32 = 5;
+		pub const ORDERED_COL: &[u32] = &[
+			COL_AVAILABILITY_META,
+			COL_CHAIN_SELECTION_DATA,
+			COL_DISPUTE_COORDINATOR_DATA,
+			COL_COLLATOR_REPUTATION_DATA,
+		];
 	}
 }
 
@@ -73,16 +91,19 @@ pub struct ColumnsConfig {
 	pub col_chain_selection_data: u32,
 	/// The column used by dispute coordinator for data.
 	pub col_dispute_coordinator_data: u32,
+	/// The column used to keep data about collator reputation.
+	pub col_collator_reputation_data: u32,
 }
 
 /// The real columns used by the parachains DB.
#[cfg(any(test, feature = "full-node"))] pub const REAL_COLUMNS: ColumnsConfig = ColumnsConfig { - col_availability_data: columns::v4::COL_AVAILABILITY_DATA, - col_availability_meta: columns::v4::COL_AVAILABILITY_META, - col_approval_data: columns::v4::COL_APPROVAL_DATA, - col_chain_selection_data: columns::v4::COL_CHAIN_SELECTION_DATA, - col_dispute_coordinator_data: columns::v4::COL_DISPUTE_COORDINATOR_DATA, + col_availability_data: columns::v6::COL_AVAILABILITY_DATA, + col_availability_meta: columns::v6::COL_AVAILABILITY_META, + col_approval_data: columns::v6::COL_APPROVAL_DATA, + col_chain_selection_data: columns::v6::COL_CHAIN_SELECTION_DATA, + col_dispute_coordinator_data: columns::v6::COL_DISPUTE_COORDINATOR_DATA, + col_collator_reputation_data: columns::v6::COL_COLLATOR_REPUTATION_DATA, }; #[derive(PartialEq, Copy, Clone)] @@ -123,17 +144,17 @@ pub fn open_creating_rocksdb( let path = root.join("parachains").join("db"); - let mut db_config = DatabaseConfig::with_columns(columns::v4::NUM_COLUMNS); + let mut db_config = DatabaseConfig::with_columns(columns::v6::NUM_COLUMNS); let _ = db_config .memory_budget - .insert(columns::v4::COL_AVAILABILITY_DATA, cache_sizes.availability_data); + .insert(columns::v6::COL_AVAILABILITY_DATA, cache_sizes.availability_data); let _ = db_config .memory_budget - .insert(columns::v4::COL_AVAILABILITY_META, cache_sizes.availability_meta); + .insert(columns::v6::COL_AVAILABILITY_META, cache_sizes.availability_meta); let _ = db_config .memory_budget - .insert(columns::v4::COL_APPROVAL_DATA, cache_sizes.approval_data); + .insert(columns::v6::COL_APPROVAL_DATA, cache_sizes.approval_data); let path_str = path .to_str() @@ -144,7 +165,7 @@ pub fn open_creating_rocksdb( let db = Database::open(&db_config, &path_str)?; let db = polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter::new( db, - columns::v4::ORDERED_COL, + columns::v6::ORDERED_COL, ); Ok(Arc::new(db)) @@ -164,12 +185,12 @@ pub fn open_creating_paritydb( std::fs::create_dir_all(&path_str)?; upgrade::try_upgrade_db(&path, DatabaseKind::ParityDB, upgrade::CURRENT_VERSION)?; - let db = parity_db::Db::open_or_create(&upgrade::paritydb_version_3_config(&path)) + let db = parity_db::Db::open_or_create(&upgrade::paritydb_version_6_config(&path)) .map_err(|err| io::Error::new(io::ErrorKind::Other, format!("{:?}", err)))?; let db = polkadot_node_subsystem_util::database::paritydb_impl::DbAdapter::new( db, - columns::v4::ORDERED_COL, + columns::v6::ORDERED_COL, ); Ok(Arc::new(db)) } diff --git a/polkadot/node/service/src/parachains_db/upgrade.rs b/polkadot/node/service/src/parachains_db/upgrade.rs index 52b010f0b5d0b..ad9d950bafe4a 100644 --- a/polkadot/node/service/src/parachains_db/upgrade.rs +++ b/polkadot/node/service/src/parachains_db/upgrade.rs @@ -40,7 +40,8 @@ const VERSION_FILE_NAME: &'static str = "parachain_db_version"; /// Version 4 changes approval db format for `OurAssignment`. /// Version 5 changes approval db format to hold some additional /// information about delayed approvals. -pub(crate) const CURRENT_VERSION: Version = 5; +/// Version 6 adds a new column for collator reputation data. 
+pub(crate) const CURRENT_VERSION: Version = 6;
 
 #[derive(thiserror::Error, Debug)]
 pub enum Error {
@@ -111,6 +112,7 @@ pub(crate) fn try_upgrade_db_to_next_version(
 		// 3 -> 4 migration
 		Some(3) => migrate_from_version_3_or_4_to_5(db_path, db_kind, v1_to_latest)?,
 		Some(4) => migrate_from_version_3_or_4_to_5(db_path, db_kind, v2_to_latest)?,
+		Some(5) => migrate_from_version_5_to_6(db_path, db_kind)?,
 		// Already at current version, do nothing.
 		Some(CURRENT_VERSION) => CURRENT_VERSION,
 		// This is an arbitrary future version, we don't handle it.
@@ -228,7 +230,7 @@ where
 	};
 
 	gum::info!(target: LOG_TARGET, "Migration complete! ");
-	Ok(CURRENT_VERSION)
+	Ok(5)
 }
 
 fn migrate_from_version_2_to_3(path: &Path, db_kind: DatabaseKind) -> Result<Version, Error> {
@@ -243,6 +245,18 @@ fn migrate_from_version_2_to_3(path: &Path, db_kind: DatabaseKind) -> Result<Version, Error>
+fn migrate_from_version_5_to_6(path: &Path, db_kind: DatabaseKind) -> Result<Version, Error> {
+	gum::info!(target: LOG_TARGET, "Migrating parachains db from version 5 to version 6 ...");
+	match db_kind {
+		DatabaseKind::ParityDB => parity_db_migrate_from_version_5_to_6(path),
+		DatabaseKind::RocksDB => rocksdb_migrate_from_version_5_to_6(path),
+	}
+	.and_then(|result| {
+		gum::info!(target: LOG_TARGET, "Migration complete! ");
+		Ok(result)
+	})
+}
+
 /// Migration from version 0 to version 1:
 /// * the number of columns has changed from 3 to 5;
 fn rocksdb_migrate_from_version_0_to_1(path: &Path) -> Result<Version, Error> {
@@ -347,11 +361,24 @@ fn paritydb_fix_columns(
 	Ok(())
 }
 
+fn rocksdb_migrate_from_version_5_to_6(path: &Path) -> Result<Version, Error> {
+	use kvdb_rocksdb::{Database, DatabaseConfig};
+
+	let db_path = path
+		.to_str()
+		.ok_or_else(|| super::other_io_error("Invalid database path".into()))?;
+	let db_cfg = DatabaseConfig::with_columns(super::columns::v5::NUM_COLUMNS);
+	let mut db = Database::open(&db_cfg, db_path)?;
+
+	db.add_column()?;
+	Ok(6)
+}
+
 /// Database configuration for version 1.
 pub(crate) fn paritydb_version_1_config(path: &Path) -> parity_db::Options {
 	let mut options =
 		parity_db::Options::with_columns(&path, super::columns::v1::NUM_COLUMNS as u8);
-	for i in columns::v4::ORDERED_COL {
+	for i in columns::v5::ORDERED_COL {
 		options.columns[*i as usize].btree_index = true;
 	}
 
@@ -362,7 +389,7 @@ pub(crate) fn paritydb_version_2_config(path: &Path) -> parity_db::Options {
 	let mut options =
 		parity_db::Options::with_columns(&path, super::columns::v2::NUM_COLUMNS as u8);
-	for i in columns::v4::ORDERED_COL {
+	for i in columns::v5::ORDERED_COL {
 		options.columns[*i as usize].btree_index = true;
 	}
 
@@ -380,12 +407,21 @@ pub(crate) fn paritydb_version_3_config(path: &Path) -> parity_db::Options {
 	options
 }
 
+pub(crate) fn paritydb_version_6_config(path: &Path) -> parity_db::Options {
+	let mut options =
+		parity_db::Options::with_columns(&path, super::columns::v6::NUM_COLUMNS as u8);
+	for idx in columns::v6::ORDERED_COL {
+		options.columns[*idx as usize].btree_index = true;
+	}
+	options
+}
+
 /// Database configuration for version 0. This is useful just for testing.
#[cfg(test)] pub(crate) fn paritydb_version_0_config(path: &Path) -> parity_db::Options { let mut options = parity_db::Options::with_columns(&path, super::columns::v0::NUM_COLUMNS as u8); - options.columns[super::columns::v4::COL_AVAILABILITY_META as usize].btree_index = true; + options.columns[super::columns::v6::COL_AVAILABILITY_META as usize].btree_index = true; options } @@ -400,7 +436,7 @@ fn paritydb_migrate_from_version_0_to_1(path: &Path) -> Result { paritydb_fix_columns( path, paritydb_version_1_config(path), - vec![super::columns::v4::COL_DISPUTE_COORDINATOR_DATA], + vec![super::columns::v6::COL_DISPUTE_COORDINATOR_DATA], )?; Ok(1) @@ -426,6 +462,17 @@ fn paritydb_migrate_from_version_2_to_3(path: &Path) -> Result { Ok(3) } +/// Migration from version 5 to version 6: +/// - add a new column for reputation +fn parity_db_migrate_from_version_5_to_6(path: &Path) -> Result { + let mut options = paritydb_version_3_config(path); + let mut column_config = parity_db::ColumnOptions::default(); + column_config.btree_index = true; + parity_db::Db::add_column(&mut options, column_config) + .map_err(|e| other_io_error(format!("Error adding a new column {:?}", e)))?; + Ok(6) +} + /// Remove the lock file. If file is locked, it will wait up to 1s. #[cfg(test)] pub fn remove_file_lock(path: &std::path::Path) { @@ -454,7 +501,7 @@ pub fn remove_file_lock(path: &std::path::Path) { #[cfg(test)] mod tests { use super::{ - columns::{v2::COL_SESSION_WINDOW_DATA, v4::*}, + columns::{v2::COL_SESSION_WINDOW_DATA, v6::*}, *, }; use kvdb_rocksdb::{Database, DatabaseConfig}; @@ -558,7 +605,7 @@ mod tests { // We need to properly set db version for upgrade to work. fs::write(version_file_path(db_dir.path()), "1").expect("Failed to write DB version"); { - let db = DbAdapter::new(db, columns::v4::ORDERED_COL); + let db = DbAdapter::new(db, columns::v5::ORDERED_COL); db.write(DBTransaction { ops: vec![DBOp::Insert { col: COL_DISPUTE_COORDINATOR_DATA, @@ -576,7 +623,7 @@ mod tests { assert_eq!(db.num_columns(), super::columns::v2::NUM_COLUMNS); - let db = DbAdapter::new(db, columns::v4::ORDERED_COL); + let db = DbAdapter::new(db, columns::v5::ORDERED_COL); assert_eq!( db.get(COL_DISPUTE_COORDINATOR_DATA, b"1234").unwrap(), @@ -623,9 +670,9 @@ mod tests { try_upgrade_db(&db_dir.path(), DatabaseKind::RocksDB, 5).unwrap(); - let db_cfg = DatabaseConfig::with_columns(super::columns::v4::NUM_COLUMNS); + let db_cfg = DatabaseConfig::with_columns(super::columns::v5::NUM_COLUMNS); let db = Database::open(&db_cfg, db_path).unwrap(); - let db = DbAdapter::new(db, columns::v4::ORDERED_COL); + let db = DbAdapter::new(db, columns::v5::ORDERED_COL); v1_to_latest_sanity_check(std::sync::Arc::new(db), approval_cfg, expected_candidates) .unwrap(); @@ -654,9 +701,9 @@ mod tests { try_upgrade_db(&db_dir.path(), DatabaseKind::RocksDB, 5).unwrap(); - let db_cfg = DatabaseConfig::with_columns(super::columns::v4::NUM_COLUMNS); + let db_cfg = DatabaseConfig::with_columns(super::columns::v5::NUM_COLUMNS); let db = Database::open(&db_cfg, db_path).unwrap(); - let db = DbAdapter::new(db, columns::v4::ORDERED_COL); + let db = DbAdapter::new(db, columns::v5::ORDERED_COL); v1_to_latest_sanity_check(std::sync::Arc::new(db), approval_cfg, expected_candidates) .unwrap(); @@ -672,10 +719,10 @@ mod tests { fs::write(version_file_path(db_dir.path()), "0").expect("Failed to write DB version"); try_upgrade_db(&db_dir.path(), DatabaseKind::RocksDB, 5).unwrap(); - let db_cfg = DatabaseConfig::with_columns(super::columns::v4::NUM_COLUMNS); + let db_cfg 
= DatabaseConfig::with_columns(super::columns::v5::NUM_COLUMNS); let db = Database::open(&db_cfg, db_path).unwrap(); - assert_eq!(db.num_columns(), columns::v4::NUM_COLUMNS); + assert_eq!(db.num_columns(), columns::v5::NUM_COLUMNS); } #[test] @@ -696,7 +743,7 @@ mod tests { try_upgrade_db(&path, DatabaseKind::ParityDB, 5).unwrap(); let db = Db::open(&paritydb_version_3_config(&path)).unwrap(); - assert_eq!(db.num_columns(), columns::v4::NUM_COLUMNS as u8); + assert_eq!(db.num_columns(), columns::v5::NUM_COLUMNS as u8); } #[test] @@ -754,4 +801,38 @@ mod tests { assert_eq!(db.num_columns(), super::columns::v3::NUM_COLUMNS); } + + #[test] + fn test_paritydb_migrate_5_to_6() { + use parity_db::Db; + + let db_dir = tempfile::tempdir().unwrap(); + fs::write(version_file_path(db_dir.path()), "5").expect("Failed to write DB version."); + { + let db = Db::open_or_create(&paritydb_version_3_config(&db_dir.path())).unwrap(); + assert_eq!(db.num_columns(), columns::v5::NUM_COLUMNS as u8); + } + try_upgrade_db(db_dir.path(), DatabaseKind::ParityDB, 6).unwrap(); + let db = Db::open(&paritydb_version_6_config(&db_dir.path())).unwrap(); + assert_eq!(db.num_columns(), columns::v6::NUM_COLUMNS as u8); + } + + #[test] + fn test_rocksdb_migrate_5_to_6() { + let db_dir = tempfile::tempdir().unwrap(); + let db_path = db_dir.path().to_str().unwrap(); + let db_cfg = DatabaseConfig::with_columns(super::columns::v5::NUM_COLUMNS); + + { + let db = Database::open(&db_cfg, db_path).unwrap(); + assert_eq!(db.num_columns(), super::columns::v5::NUM_COLUMNS); + } + fs::write(version_file_path(db_dir.path()), "5").expect("Failed to write DB version."); + try_upgrade_db(&db_dir.path(), DatabaseKind::RocksDB, 6).unwrap(); + + let db_cfg = DatabaseConfig::with_columns(super::columns::v6::NUM_COLUMNS); + let db = Database::open(&db_cfg, db_path).unwrap(); + + assert_eq!(db.num_columns(), super::columns::v6::NUM_COLUMNS); + } } From f8eaa63733c60d251a0a09352b2881a151cad6fa Mon Sep 17 00:00:00 2001 From: Alexandru Cihodaru Date: Tue, 27 Jan 2026 22:06:02 +0200 Subject: [PATCH 2/4] Add persistent disk storage for collator reputation database Implements disk persistence for the experimental collator protocol's reputation system to preserve peer scores across validator restarts. 
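For reviewers, the resulting on-disk layout can be sketched as follows, using
the helpers introduced in persistence.rs below (illustrative only, not part of
the patch itself):

    // Per-para scores live under b"Rep_per_para" ++ u32 ParaId (big-endian),
    // metadata under b"Rep_meta"; values are SCALE-encoded.
    let key = para_reputation_key(ParaId::from(100)); // 16-byte key
    if let Some(raw) = db.get(config.col_reputation_data, &key)? {
        // StoredParaReputations wraps (SerializablePeerId, ScoreEntry) pairs.
        let scores = StoredParaReputations::decode(&mut &raw[..])?.to_hashmap();
    }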
Key changes: - Add PersistentDb wrapper that persists in-memory Db to disk - Implement periodic persistence (10 min prod, 30 sec test mode) - Add immediate persistence for slashes and para pruning - Create serialization layer for ScoreEntry and reputation data - Add ReputationConfig for database column configuration Signed-off-by: Alexandru Cihodaru --- Cargo.lock | 2 + polkadot/Cargo.toml | 1 + polkadot/cli/Cargo.toml | 1 + .../node/network/collator-protocol/Cargo.toml | 3 + .../node/network/collator-protocol/src/lib.rs | 8 +- .../src/validator_side_experimental/common.rs | 12 +- .../src/validator_side_experimental/error.rs | 3 + .../src/validator_side_experimental/mod.rs | 84 +- .../peer_manager/db.rs | 44 +- .../peer_manager/mod.rs | 17 +- .../peer_manager/persistence.rs | 146 ++++ .../peer_manager/persistent_db.rs | 783 ++++++++++++++++++ .../src/validator_side_experimental/state.rs | 17 +- polkadot/node/service/Cargo.toml | 3 + polkadot/node/service/src/overseer.rs | 2 +- 15 files changed, 1099 insertions(+), 27 deletions(-) create mode 100644 polkadot/node/network/collator-protocol/src/validator_side_experimental/peer_manager/persistence.rs create mode 100644 polkadot/node/network/collator-protocol/src/validator_side_experimental/peer_manager/persistent_db.rs diff --git a/Cargo.lock b/Cargo.lock index 9ceeed7e6e9f3..254fe78b86f3e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -15153,6 +15153,7 @@ dependencies = [ "futures", "futures-timer", "itertools 0.11.0", + "kvdb-memorydb", "parity-scale-codec", "polkadot-node-network-protocol", "polkadot-node-primitives", @@ -17249,6 +17250,7 @@ dependencies = [ "parity-scale-codec", "polkadot-primitives", "rand 0.8.5", + "regex", "sc-executor", "sc-runtime-utilities", "serde", diff --git a/polkadot/Cargo.toml b/polkadot/Cargo.toml index e5f51feefaa0d..3c0797f29824f 100644 --- a/polkadot/Cargo.toml +++ b/polkadot/Cargo.toml @@ -97,6 +97,7 @@ substrate-build-script-utils = { workspace = true, default-features = true } runtime-benchmarks = ["polkadot-cli/runtime-benchmarks"] try-runtime = ["polkadot-cli/try-runtime"] fast-runtime = ["polkadot-cli/fast-runtime"] +test-persistence = ["polkadot-cli/test-persistence"] runtime-metrics = ["polkadot-cli/runtime-metrics"] pyroscope = ["polkadot-cli/pyroscope"] jemalloc-allocator = ["polkadot-jemalloc-shim/jemalloc-allocator"] diff --git a/polkadot/cli/Cargo.toml b/polkadot/cli/Cargo.toml index 98b8d5743f445..36fb2df1ef26d 100644 --- a/polkadot/cli/Cargo.toml +++ b/polkadot/cli/Cargo.toml @@ -71,6 +71,7 @@ try-runtime = [ "sp-runtime/try-runtime", ] fast-runtime = ["polkadot-service/fast-runtime"] +test-persistence = ["polkadot-service/test-persistence"] pyroscope = ["dep:pyroscope", "pyroscope_pprofrs"] # Configure the native runtimes to use. 
diff --git a/polkadot/node/network/collator-protocol/Cargo.toml b/polkadot/node/network/collator-protocol/Cargo.toml
index cebc576871936..fae69fa826a09 100644
--- a/polkadot/node/network/collator-protocol/Cargo.toml
+++ b/polkadot/node/network/collator-protocol/Cargo.toml
@@ -31,9 +31,11 @@ polkadot-node-subsystem-util = { workspace = true, default-features = true }
 polkadot-primitives = { workspace = true, default-features = true }
 thiserror = { workspace = true }
 tokio-util = { workspace = true }
+codec = { workspace = true }
 
 [dev-dependencies]
 assert_matches = { workspace = true }
+kvdb-memorydb = { workspace = true }
 rstest = { workspace = true }
 sc-network-types = { workspace = true, default-features = true }
 sp-tracing = { workspace = true }
@@ -51,3 +53,4 @@ polkadot-primitives-test-helpers = { workspace = true }
 
 [features]
 default = []
+test-persistence = []
diff --git a/polkadot/node/network/collator-protocol/src/lib.rs b/polkadot/node/network/collator-protocol/src/lib.rs
index beabfe93f6f4e..8bc05f842e3fb 100644
--- a/polkadot/node/network/collator-protocol/src/lib.rs
+++ b/polkadot/node/network/collator-protocol/src/lib.rs
@@ -94,8 +94,8 @@ pub enum ProtocolSide {
 		metrics: validator_side_experimental::Metrics,
 		/// Database used for reputation housekeeping.
 		db: Arc<dyn Database>,
-		/// Reputation data column number.
-		reputation_col: u32,
+		/// Reputation database configuration (data column number).
+		reputation_config: validator_side_experimental::ReputationConfig,
 	},
 	/// Collators operate on a parachain.
 	Collator {
@@ -153,8 +153,8 @@ impl CollatorProtocolSubsystem {
 					.map_err(|e| SubsystemError::with_origin("collator-protocol", e))
 					.boxed()
 			},
-			ProtocolSide::ValidatorExperimental { keystore, metrics, db, reputation_col } =>
-				validator_side_experimental::run(ctx, keystore, metrics, db, reputation_col)
+			ProtocolSide::ValidatorExperimental { keystore, metrics, db, reputation_config } =>
+				validator_side_experimental::run(ctx, keystore, metrics, db, reputation_config)
 					.map_err(|e| SubsystemError::with_origin("collator-protocol", e))
 					.boxed(),
 			ProtocolSide::Collator { peer_id, collator_pair, request_receiver_v2, metrics } =>
diff --git a/polkadot/node/network/collator-protocol/src/validator_side_experimental/common.rs b/polkadot/node/network/collator-protocol/src/validator_side_experimental/common.rs
index 8dcfaba2b5c1e..45b5d8053beb9 100644
--- a/polkadot/node/network/collator-protocol/src/validator_side_experimental/common.rs
+++ b/polkadot/node/network/collator-protocol/src/validator_side_experimental/common.rs
@@ -14,6 +14,7 @@
 // You should have received a copy of the GNU General Public License
 // along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
 
+use codec::{Decode, Encode};
 use polkadot_node_network_protocol::{
 	peer_set::CollationVersion,
 	request_response::{outgoing::RequestError, v2 as request_v2},
@@ -83,8 +84,17 @@ pub const MAX_FETCH_DELAY: Duration = Duration::from_millis(300);
 /// advertised collations.
 pub const MIN_FETCH_TIMER_DELAY: Duration = Duration::from_millis(150);
 
+/// How often to persist the reputation database to disk.
+/// Using 10 minutes in production as a balance between data safety and disk I/O.
+/// Using 30 seconds in test mode for faster test execution.
+pub const REPUTATION_PERSIST_INTERVAL: Duration = if cfg!(feature = "test-persistence") {
+	Duration::from_secs(30)
+} else {
+	Duration::from_secs(10 * 60)
+};
+
 /// Reputation score type.
-#[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Clone, Copy, Default)]
+#[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Clone, Copy, Default, Encode, Decode)]
 pub struct Score(u16);
 
 impl Score {
diff --git a/polkadot/node/network/collator-protocol/src/validator_side_experimental/error.rs b/polkadot/node/network/collator-protocol/src/validator_side_experimental/error.rs
index f1a6015e2f14c..2dd6c2afa6bba 100644
--- a/polkadot/node/network/collator-protocol/src/validator_side_experimental/error.rs
+++ b/polkadot/node/network/collator-protocol/src/validator_side_experimental/error.rs
@@ -44,6 +44,9 @@ pub enum Error {
 	#[fatal]
 	#[error("Receiving message from overseer failed: {0}")]
 	SubsystemReceive(#[source] SubsystemError),
+	#[fatal]
+	#[error("Failed to initialize reputation database: {0}")]
+	ReputationDbInit(String),
 	#[error("Unable to retrieve block number for {0:?} from implicit view")]
 	BlockNumberNotFoundInImplicitView(Hash),
 	#[fatal(forward)]
diff --git a/polkadot/node/network/collator-protocol/src/validator_side_experimental/mod.rs b/polkadot/node/network/collator-protocol/src/validator_side_experimental/mod.rs
index 228c489bbfee9..d17b3148ae1cf 100644
--- a/polkadot/node/network/collator-protocol/src/validator_side_experimental/mod.rs
+++ b/polkadot/node/network/collator-protocol/src/validator_side_experimental/mod.rs
@@ -22,7 +22,13 @@ mod state;
 #[cfg(test)]
 mod tests;
 
-use crate::{validator_side_experimental::common::MIN_FETCH_TIMER_DELAY, LOG_TARGET};
+use crate::{
+	validator_side_experimental::{
+		common::{MIN_FETCH_TIMER_DELAY, REPUTATION_PERSIST_INTERVAL},
+		peer_manager::PersistentDb,
+	},
+	LOG_TARGET,
+};
 use collation_manager::CollationManager;
 use common::{ProspectiveCandidate, MAX_STORED_SCORES_PER_PARA};
 use error::{log_error, FatalError, FatalResult, Result};
@@ -39,7 +45,9 @@ use polkadot_node_subsystem_util::database::Database;
 use sp_keystore::KeystorePtr;
 use std::{future, future::Future, pin::Pin, sync::Arc, time::Duration};
 
-use peer_manager::{Db, PeerManager};
+#[cfg(test)]
+use peer_manager::Db;
+use peer_manager::PeerManager;
 
 use state::State;
 
@@ -58,12 +66,16 @@ pub(crate) async fn run<Context>(
 	mut ctx: Context,
 	keystore: KeystorePtr,
 	metrics: Metrics,
-	_db: Arc<dyn Database>,
-	_reputation_col: u32,
+	db: Arc<dyn Database>,
+	reputation_config: ReputationConfig,
 ) -> FatalResult<()> {
-	gum::info!(LOG_TARGET, "Running experimental collator protocol");
-	if let Some(state) = initialize(&mut ctx, keystore, metrics).await? {
-		run_inner(ctx, state).await?;
+	gum::info!(
+		LOG_TARGET,
+		persist_interval_secs = REPUTATION_PERSIST_INTERVAL.as_secs(),
+		"Running experimental collator protocol"
+	);
+	if let Some(state) = initialize(&mut ctx, keystore, metrics, db, reputation_config).await? {
+		run_inner(ctx, state, REPUTATION_PERSIST_INTERVAL).await?;
 	}
 
 	Ok(())
@@ -74,7 +86,9 @@ async fn initialize<Context>(
 	ctx: &mut Context,
 	keystore: KeystorePtr,
 	metrics: Metrics,
-) -> FatalResult<Option<State<Db>>> {
+	db: Arc<dyn Database>,
+	reputation_config: ReputationConfig,
+) -> FatalResult<Option<State<PersistentDb>>> {
 	loop {
 		let first_leaf = match wait_for_first_leaf(ctx).await? {
 			Some(activated_leaf) => {
@@ -94,7 +108,27 @@ async fn initialize<Context>(
 
 	let scheduled_paras = collation_manager.assignments();
 
-	let backend = Db::new(MAX_STORED_SCORES_PER_PARA).await;
+	// Create PersistentDb with disk persistence
+	let backend = match peer_manager::PersistentDb::new(
+		db.clone(),
+		reputation_config,
+		MAX_STORED_SCORES_PER_PARA,
+	)
+	.await
+	{
+		Ok(backend) => backend,
+		Err(e) => {
+			gum::error!(
+				target: LOG_TARGET,
+				error = ?e,
+				"Failed to initialize persistent reputation DB"
+			);
+			return Err(FatalError::ReputationDbInit(format!(
+				"PersistentDb init failed: {:?}",
+				e
+			)))
+		},
+	};
 
 	match PeerManager::startup(backend, ctx.sender(), scheduled_paras.into_iter().collect())
 		.await
@@ -143,9 +177,21 @@ fn create_timer(maybe_delay: Option<Duration>) -> Fuse<Pin<Box<dyn Future<Output = ()> + Send>>>
 	}
 }
 
+/// Create a timer used to periodically persist the reputation database.
+fn create_persistence_timer(interval: Duration) -> Fuse<Pin<Box<dyn Future<Output = ()> + Send>>> {
+	let delay: Pin<Box<dyn Future<Output = ()> + Send>> = Box::pin(Delay::new(interval));
+	delay.fuse()
+}
+
 #[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
-async fn run_inner<Context>(mut ctx: Context, mut state: State<Db>) -> FatalResult<()> {
+async fn run_inner<Context>(
+	mut ctx: Context,
+	mut state: State<PersistentDb>,
+	persist_interval: Duration,
+) -> FatalResult<()> {
 	let mut timer = create_timer(None);
+	let mut persistence_timer = create_persistence_timer(persist_interval);
+
 	loop {
 		select! {
 			// Calling `fuse()` here is useless, because the termination state of the resulting
@@ -163,7 +209,11 @@ async fn run_inner<Context>(mut ctx: Context, mut state: State<Db>) -> FatalResult<()> {
 						msg,
 					).await;
 				}
-				Ok(FromOrchestra::Signal(OverseerSignal::Conclude)) | Err(_) => break,
+				Ok(FromOrchestra::Signal(OverseerSignal::Conclude)) | Err(_) => {
+					// Persist to disk before shutdown
+					state.persist_to_disk();
+					break
+				},
 				Ok(FromOrchestra::Signal(OverseerSignal::BlockFinalized(hash, number))) => {
 					state.handle_finalized_block(ctx.sender(), hash, number).await?;
 				},
@@ -177,6 +227,12 @@ async fn run_inner<Context>(mut ctx: Context, mut state: State<Db>) -> FatalResult<()> {
 				// We don't need to do anything specific here.
 				// If the timer expires, we only need to trigger the advertisement fetching logic.
 			},
+			_ = &mut persistence_timer => {
+				// Periodic persistence - write reputation DB to disk
+				state.persist_to_disk();
+				// Reset the timer for the next interval
+				persistence_timer = create_persistence_timer(persist_interval);
+			},
 		}
 
 		// Now try triggering advertisement fetching, if we have room in any of the active leaves
@@ -196,7 +252,7 @@ async fn run_inner<Context>(mut ctx: Context, mut state: State<Db>) -> FatalResult<()> {
 /// The main message receiver switch.
 async fn process_msg<Sender: CollatorProtocolSenderTrait>(
 	sender: &mut Sender,
-	state: &mut State<Db>,
+	state: &mut State<PersistentDb>,
 	msg: CollatorProtocolMessage,
 ) {
 	use CollatorProtocolMessage::*;
@@ -249,7 +305,7 @@ async fn process_msg<Sender: CollatorProtocolSenderTrait>(
 /// Bridge event switch.
 async fn handle_network_msg<Sender: CollatorProtocolSenderTrait>(
 	sender: &mut Sender,
-	state: &mut State<Db>,
+	state: &mut State<PersistentDb>,
 	bridge_message: NetworkBridgeEvent<net_protocol::CollatorProtocolMessage>,
 ) -> Result<()> {
 	use NetworkBridgeEvent::*;
@@ -297,7 +353,7 @@ async fn handle_network_msg<Sender: CollatorProtocolSenderTrait>(
 
 async fn process_incoming_peer_message<Sender: CollatorProtocolSenderTrait>(
 	sender: &mut Sender,
-	state: &mut State<Db>,
+	state: &mut State<PersistentDb>,
 	origin: PeerId,
 	msg: CollationProtocols<
 		protocol_v1::CollatorProtocolMessage,
diff --git a/polkadot/node/network/collator-protocol/src/validator_side_experimental/peer_manager/db.rs b/polkadot/node/network/collator-protocol/src/validator_side_experimental/peer_manager/db.rs
index 2b3339decd1de..f047bc17e12f8 100644
--- a/polkadot/node/network/collator-protocol/src/validator_side_experimental/peer_manager/db.rs
+++ b/polkadot/node/network/collator-protocol/src/validator_side_experimental/peer_manager/db.rs
@@ -43,12 +43,12 @@ impl Db {
 	}
 }
 
-type Timestamp = u128;
+pub(crate) type Timestamp = u128;
 
-#[derive(Clone, Debug)]
-struct ScoreEntry {
-	score: Score,
-	last_bumped: Timestamp,
+#[derive(Clone, Copy, Debug, codec::Encode, codec::Decode)]
+pub(crate) struct ScoreEntry {
+	pub(crate) score: Score,
+	pub(crate) last_bumped: Timestamp,
 }
 
 #[async_trait]
@@ -222,6 +222,40 @@ impl Db {
 	}
 
+	/// Get the last finalized block number (for persistence).
+	pub(crate) fn get_last_finalized(&self) -> Option<BlockNumber> {
+		self.last_finalized
+	}
+
+	/// Set the last finalized block number (for loading from disk).
+	pub(crate) fn set_last_finalized(&mut self, last_finalized: Option<BlockNumber>) {
+		self.last_finalized = last_finalized;
+	}
+
+	/// Get reputations for a specific para (for persistence).
+	pub(crate) fn get_para_reputations(
+		&self,
+		para_id: &ParaId,
+	) -> Option<HashMap<PeerId, ScoreEntry>> {
+		self.db.get(para_id).cloned()
+	}
+
+	/// Set reputations for a specific para (for loading from disk).
+	pub(crate) fn set_para_reputations(
+		&mut self,
+		para_id: ParaId,
+		reputations: HashMap<PeerId, ScoreEntry>,
+	) {
+		self.db.insert(para_id, reputations);
+	}
+
+	/// Get all reputations (for persistence).
+	pub(crate) fn all_reputations(
+		&self,
+	) -> impl Iterator<Item = (&ParaId, &HashMap<PeerId, ScoreEntry>)> {
+		self.db.iter()
+	}
+
 	#[cfg(test)]
 	fn len(&self) -> usize {
 		self.db.len()
diff --git a/polkadot/node/network/collator-protocol/src/validator_side_experimental/peer_manager/mod.rs b/polkadot/node/network/collator-protocol/src/validator_side_experimental/peer_manager/mod.rs
index a72f843c651fb..7f0b9ffcb2c9d 100644
--- a/polkadot/node/network/collator-protocol/src/validator_side_experimental/peer_manager/mod.rs
+++ b/polkadot/node/network/collator-protocol/src/validator_side_experimental/peer_manager/mod.rs
@@ -16,6 +16,8 @@
 mod backend;
 mod connected;
 mod db;
+mod persistence;
+mod persistent_db;
 
 use futures::channel::oneshot;
 use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
@@ -32,7 +34,9 @@ use crate::{
 };
 pub use backend::Backend;
 use connected::ConnectedPeers;
+#[cfg(test)]
 pub use db::Db;
+pub use persistent_db::PersistentDb;
 use polkadot_node_network_protocol::{peer_set::PeerSet, PeerId};
 use polkadot_node_subsystem::{
 	messages::{ChainApiMessage, NetworkBridgeTxMessage},
@@ -69,7 +73,7 @@ enum DeclarationOutcome {
 }
 
 pub struct PeerManager<B: Backend> {
-	db: B,
+	pub(crate) db: B,
 	connected: ConnectedPeers,
 	/// The `SessionIndex` of the last finalized block
 	latest_finalized_session: Option<SessionIndex>,
@@ -117,6 +121,17 @@ impl<B: Backend> PeerManager<B> {
 
 		instance.db.process_bumps(latest_finalized_block_number, bumps, None).await;
 
+		if latest_finalized_block_number != processed_finalized_block_number {
+			gum::trace!(
+				target: LOG_TARGET,
+				blocks_processed = std::cmp::min(
+					latest_finalized_block_number.saturating_sub(processed_finalized_block_number),
+					MAX_STARTUP_ANCESTRY_LOOKBACK
+				),
+				"Startup lookback completed"
+			);
+		}
+
 		Ok(instance)
 	}
 
diff --git a/polkadot/node/network/collator-protocol/src/validator_side_experimental/peer_manager/persistence.rs b/polkadot/node/network/collator-protocol/src/validator_side_experimental/peer_manager/persistence.rs
new file mode 100644
index 0000000000000..beaca0e122f65
--- /dev/null
+++ b/polkadot/node/network/collator-protocol/src/validator_side_experimental/peer_manager/persistence.rs
@@ -0,0 +1,146 @@
+// Copyright (C) Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
+
+//! Serialization types for disk persistence of collator reputation data.
+
+use codec::{Decode, Encode};
+use polkadot_node_network_protocol::PeerId;
+use polkadot_primitives::{BlockNumber, Id as ParaId};
+use std::collections::HashMap;
+
+use super::db::ScoreEntry;
+
+/// Key prefix for per-para reputation data.
+pub const REPUTATION_PARA_PREFIX: &[u8; 12] = b"Rep_per_para";
+/// Key for metadata.
+pub const REPUTATION_META_KEY: &[u8; 8] = b"Rep_meta";
+
+/// Serializable PeerId wrapper.
+/// PeerId is a Multihash which can be converted to/from bytes.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct SerializablePeerId(pub PeerId);
+
+impl Encode for SerializablePeerId {
+	fn encode(&self) -> Vec<u8> {
+		self.0.to_bytes().encode()
+	}
+
+	fn encode_to<T: codec::Output + ?Sized>(&self, dest: &mut T) {
+		self.0.to_bytes().encode_to(dest)
+	}
+}
+
+impl Decode for SerializablePeerId {
+	fn decode<I: codec::Input>(input: &mut I) -> Result<Self, codec::Error> {
+		let bytes = Vec::<u8>::decode(input)?;
+		PeerId::from_bytes(&bytes)
+			.map(SerializablePeerId)
+			.map_err(|_| codec::Error::from("Invalid PeerId bytes"))
+	}
+}
+
+/// Stored reputations for a single para.
+/// This is the VALUE stored in the DB, keyed by ParaId.
+#[derive(Debug, Clone, Encode, Decode, Default)]
+pub struct StoredParaReputations {
+	/// Vec of (peer_id, score_entry) pairs.
+	pub entries: Vec<(SerializablePeerId, ScoreEntry)>,
+}
+
+impl StoredParaReputations {
+	/// Convert from in-memory HashMap to storable format.
+	pub fn from_hashmap(map: &HashMap<PeerId, ScoreEntry>) -> Self {
+		let entries = map
+			.iter()
+			.map(|(peer_id, entry)| (SerializablePeerId(*peer_id), *entry))
+			.collect();
+		StoredParaReputations { entries }
+	}
+
+	/// Convert to in-memory HashMap.
+	pub fn to_hashmap(&self) -> HashMap<PeerId, ScoreEntry> {
+		self.entries.iter().map(|(peer_id, entry)| (peer_id.0, *entry)).collect()
+	}
+}
+
+/// Metadata stored separately from per-para data.
+#[derive(Debug, Clone, Encode, Decode)]
+pub struct StoredMetadata {
+	/// The last finalized block number that was processed.
+	pub last_finalized: Option<BlockNumber>,
+}
+
+/// Generate key for a para's reputation data.
+/// Key format: "Rep_per_para" (12 bytes) + ParaId (4 bytes, big-endian).
+/// Using big-endian for lexicographic ordering when iterating.
+pub fn para_reputation_key(para_id: ParaId) -> [u8; 16] {
+	let mut key = [0u8; 12 + 4];
+	key[..12].copy_from_slice(REPUTATION_PARA_PREFIX);
+	// Use big-endian for lexicographic ordering
+	key[12..].copy_from_slice(&u32::from(para_id).to_be_bytes());
+	key
+}
+
+/// Returns the metadata key.
+pub fn metadata_key() -> &'static [u8] {
+	REPUTATION_META_KEY
+}
+
+/// Decode a para key to extract the ParaId.
+/// Returns None if the key doesn't match the expected format.
+pub fn decode_para_key(key: &[u8]) -> Option<ParaId> {
+	if key.len() != 16 || !key.starts_with(REPUTATION_PARA_PREFIX) {
+		return None
+	}
+	let mut bytes = [0u8; 4];
+	bytes.copy_from_slice(&key[12..16]);
+	Some(ParaId::from(u32::from_be_bytes(bytes)))
+}
+
+/// Errors during persistence operations.
+#[derive(Debug, thiserror::Error)]
+pub enum PersistenceError {
+	#[error("I/O error: {0}")]
+	Io(#[from] std::io::Error),
+	#[error("Codec error: {0}")]
+	Codec(#[from] codec::Error),
+}
+
+#[cfg(test)]
+mod tests {
+	use super::*;
+	use crate::validator_side_experimental::peer_manager::Score;
+
+	#[test]
+	fn stored_para_reputations_roundtrip() {
+		let mut map = HashMap::new();
+		let peer1 = PeerId::random();
+		let peer2 = PeerId::random();
+
+		map.insert(peer1, ScoreEntry { score: Score::new(100).unwrap(), last_bumped: 1234567890 });
+		map.insert(peer2, ScoreEntry { score: Score::new(50).unwrap(), last_bumped: 9876543210 });
+
+		let stored = StoredParaReputations::from_hashmap(&map);
+		let encoded = stored.encode();
+		let decoded = StoredParaReputations::decode(&mut &encoded[..]).expect("decode should work");
+
+		let restored_map = decoded.to_hashmap();
+
+		assert_eq!(restored_map.len(), 2);
+		assert_eq!(restored_map.get(&peer1).unwrap().score, Score::new(100).unwrap());
+		assert_eq!(restored_map.get(&peer2).unwrap().score, Score::new(50).unwrap());
+	}
+}
diff --git a/polkadot/node/network/collator-protocol/src/validator_side_experimental/peer_manager/persistent_db.rs b/polkadot/node/network/collator-protocol/src/validator_side_experimental/peer_manager/persistent_db.rs
new file mode 100644
index 0000000000000..c96091568a86c
--- /dev/null
+++ b/polkadot/node/network/collator-protocol/src/validator_side_experimental/peer_manager/persistent_db.rs
@@ -0,0 +1,783 @@
+// Copyright (C) Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
+
+//! Disk-backed reputation database for collator protocol.
+
+use async_trait::async_trait;
+use codec::{Decode, Encode};
+use polkadot_node_network_protocol::PeerId;
+use polkadot_node_subsystem_util::database::{DBTransaction, Database};
+use polkadot_primitives::{BlockNumber, Id as ParaId};
+use std::{
+	collections::{BTreeMap, BTreeSet, HashMap},
+	sync::Arc,
+};
+
+use crate::{
+	validator_side_experimental::{
+		common::Score,
+		peer_manager::{
+			backend::Backend,
+			db::Db,
+			persistence::{
+				decode_para_key, metadata_key, para_reputation_key, PersistenceError,
+				StoredMetadata, StoredParaReputations, REPUTATION_PARA_PREFIX,
+			},
+			ReputationUpdate,
+		},
+		ReputationConfig,
+	},
+	LOG_TARGET,
+};
+
+/// Persistent database implementation for collator reputation.
+///
+/// This wraps the in-memory `Db` and adds disk persistence capability.
+///
+/// **Persistence Policy:**
+/// - All operations (bumps, decays, queries) happen in-memory only
+/// - Disk writes happen:
+///   1. On slash operations (immediate, for security)
+///   2. When `persist()` is called explicitly by the main loop (periodic timer)
+///   3. On paras pruning (immediate)
+///
+/// The main loop is responsible for calling `persist()` periodically (currently every 10 minutes).
+pub struct PersistentDb {
+	/// In-memory database (does all the actual logic).
+	inner: Db,
+	/// Disk database handle.
+	disk_db: Arc<dyn Database>,
+	/// Column configuration.
+	config: ReputationConfig,
+}
+
+impl PersistentDb {
+	/// Create a new persistent DB, loading existing state from disk.
+	pub async fn new(
+		disk_db: Arc<dyn Database>,
+		config: ReputationConfig,
+		stored_limit_per_para: u16,
+	) -> Result<Self, PersistenceError> {
+		// Create empty in-memory DB
+		let inner = Db::new(stored_limit_per_para).await;
+
+		// Load data from disk into the in-memory DB
+		let mut instance = Self { inner, disk_db, config };
+		let (para_count, total_entries) = instance.load_from_disk().await?;
+
+		let last_finalized = instance.inner.processed_finalized_block_number().await;
+
+		// Log a summary of the loaded state.
+		if para_count > 0 || last_finalized.is_some() {
+			gum::trace!(
+				target: LOG_TARGET,
+				?last_finalized,
+				para_count,
+				total_peer_entries = total_entries,
+				"Loaded existing reputation DB from disk"
+			);
+		} else {
+			gum::trace!(
+				target: LOG_TARGET,
+				"Reputation DB initialized fresh (no existing data on disk)"
+			);
+		}
+
+		Ok(instance)
+	}
+
+	/// Load all data from disk into the in-memory DB.
+	/// Returns (para_count, total_entries) for logging purposes.
+	async fn load_from_disk(&mut self) -> Result<(usize, usize), PersistenceError> {
+		gum::trace!(
+			target: LOG_TARGET,
+			"Starting to load reputation data from disk"
+		);
+
+		// Load metadata
+		if let Some(meta) = self.load_metadata()? {
+			// Restore the last finalized block number via the setter exposed by `Db`.
+			self.inner.set_last_finalized(meta.last_finalized);
+			gum::debug!(
+				target: LOG_TARGET,
+				last_finalized = ?meta.last_finalized,
+				"Loaded reputation DB metadata from disk"
+			);
+		} else {
+			gum::debug!(
+				target: LOG_TARGET,
+				"No existing reputation metadata found on disk (fresh start)"
+			);
+		}
+
+		// Load all para reputations
+		let iter = self
+			.disk_db
+			.iter_with_prefix(self.config.col_reputation_data, REPUTATION_PARA_PREFIX);
+
+		let mut total_entries = 0;
+		let mut para_count = 0;
+		for result in iter {
+			let (key, value) = result.map_err(PersistenceError::Io)?;
+			if let Some(para_id) = decode_para_key(&key) {
+				let stored: StoredParaReputations =
+					Decode::decode(&mut &value[..]).map_err(PersistenceError::Codec)?;
+				let entries = stored.to_hashmap();
+				let entry_count = entries.len();
+				total_entries += entry_count;
+				para_count += 1;
+				gum::trace!(
+					target: LOG_TARGET,
+					?para_id,
+					peer_count = entry_count,
+					"Loaded reputation entries for para from disk"
+				);
+				self.inner.set_para_reputations(para_id, entries);
+			}
+		}
+
+		gum::debug!(
+			target: LOG_TARGET,
+			total_peer_entries = total_entries,
+			para_count,
+			"Completed loading reputation data from disk"
+		);
+
+		Ok((para_count, total_entries))
+	}
+
+	/// Load metadata from disk.
+	fn load_metadata(&self) -> Result<Option<StoredMetadata>, PersistenceError> {
+		match self.disk_db.get(self.config.col_reputation_data, metadata_key())? {
+			None => Ok(None),
+			Some(raw) =>
+				StoredMetadata::decode(&mut &raw[..]).map(Some).map_err(PersistenceError::Codec),
+		}
+	}
+
+	/// Persist a single para's data to disk (called immediately after slash).
+	fn persist_para(&self, para_id: &ParaId) -> Result<(), PersistenceError> {
+		let mut tx = DBTransaction::new();
+		let key = para_reputation_key(*para_id);
+
+		if let Some(peer_scores) = self.inner.get_para_reputations(para_id) {
+			if peer_scores.is_empty() {
+				tx.delete(self.config.col_reputation_data, &key);
+				gum::trace!(
+					target: LOG_TARGET,
+					?para_id,
+					"Deleted empty para reputation entry from disk"
+				);
+			} else {
+				let stored = StoredParaReputations::from_hashmap(&peer_scores);
+				tx.put_vec(self.config.col_reputation_data, &key, stored.encode());
+				gum::trace!(
+					target: LOG_TARGET,
+					?para_id,
+					peers = peer_scores.len(),
+					"Persisted para reputation to disk"
+				);
+			}
+		} else {
+			tx.delete(self.config.col_reputation_data, &key);
+			gum::trace!(
+				target: LOG_TARGET,
+				?para_id,
+				"Deleted removed para reputation entry from disk"
+			);
+		}
+
+		self.disk_db.write(tx).map_err(PersistenceError::Io)
+	}
+
+	/// Persist all in-memory data to disk.
+	///
+	/// This should be called periodically by the main loop (currently every 10 minutes).
+	/// It writes all reputation data and metadata in a single transaction.
+	pub fn persist(&self) -> Result<(), PersistenceError> {
+		let mut tx = DBTransaction::new();
+
+		// Write metadata
+		let meta = StoredMetadata { last_finalized: self.inner.get_last_finalized() };
+		tx.put_vec(self.config.col_reputation_data, metadata_key(), meta.encode());
+
+		// Write all para data
+		let mut total_entries = 0;
+		let mut para_count = 0;
+		let all_reps: Vec<_> = self.inner.all_reputations().collect();
+
+		for (para_id, peer_scores) in all_reps {
+			let key = para_reputation_key(*para_id);
+			if peer_scores.is_empty() {
+				tx.delete(self.config.col_reputation_data, &key);
+			} else {
+				let stored = StoredParaReputations::from_hashmap(peer_scores);
+				tx.put_vec(self.config.col_reputation_data, &key, stored.encode());
+				total_entries += peer_scores.len();
+				para_count += 1;
+			}
+		}
+
+		self.disk_db.write(tx).map_err(PersistenceError::Io)?;
+
+		gum::debug!(
+			target: LOG_TARGET,
+			total_peer_entries = total_entries,
+			para_count,
+			last_finalized = ?meta.last_finalized,
+			"Periodic persistence completed: reputation DB written to disk"
+		);
+
+		Ok(())
+	}
+}
+
+#[async_trait]
+impl Backend for PersistentDb {
+	async fn processed_finalized_block_number(&self) -> Option<BlockNumber> {
+		self.inner.processed_finalized_block_number().await
+	}
+
+	async fn query(&self, peer_id: &PeerId, para_id: &ParaId) -> Option<Score> {
+		self.inner.query(peer_id, para_id).await
+	}
+
+	async fn slash(&mut self, peer_id: &PeerId, para_id: &ParaId, value: Score) {
+		// Delegate to inner DB
+		self.inner.slash(peer_id, para_id, value).await;
+
+		// Immediately persist to disk after slash (security-critical)
+		match self.persist_para(para_id) {
+			Ok(()) => {
+				gum::debug!(
+					target: LOG_TARGET,
+					?para_id,
+					?peer_id,
+					slash_value = ?value,
+					"Slash persisted to disk immediately"
+				);
+			},
+			Err(e) => {
+				gum::error!(
+					target: LOG_TARGET,
+					?para_id,
+					?peer_id,
+					error = ?e,
+					"CRITICAL: Failed to persist reputation after slash to disk. \
+					Slash is recorded in-memory and will be persisted by the periodic timer."
+				);
+			},
+		}
+	}
+
+	async fn prune_paras(&mut self, registered_paras: BTreeSet<ParaId>) {
+		// Collect paras to prune before modifying state
+		let paras_to_prune: Vec<ParaId> = self
+			.inner
+			.all_reputations()
+			.filter(|(para_id, _)| !registered_paras.contains(*para_id))
+			.map(|(para_id, _)| *para_id)
+			.collect();
+		gum::trace!(target: LOG_TARGET, ?paras_to_prune, "Pruning unregistered paras from the reputation DB");
+
+		let pruned_count = paras_to_prune.len();
+
+		// Prune from in-memory state
+		self.inner.prune_paras(registered_paras.clone()).await;
+		let paras_after = self.inner.all_reputations().count();
+
+		// Persist with explicit deletion of pruned paras
+		let mut tx = DBTransaction::new();
+
+		// Delete pruned paras from disk
+		for para_id in &paras_to_prune {
+			let key = para_reputation_key(*para_id);
+			tx.delete(self.config.col_reputation_data, &key);
+		}
+
+		// Write remaining paras and metadata
+		let meta = StoredMetadata { last_finalized: self.inner.get_last_finalized() };
+		tx.put_vec(self.config.col_reputation_data, metadata_key(), meta.encode());
+
+		for (para_id, peer_scores) in self.inner.all_reputations() {
+			let key = para_reputation_key(*para_id);
+			if !peer_scores.is_empty() {
+				let stored = StoredParaReputations::from_hashmap(peer_scores);
+				tx.put_vec(self.config.col_reputation_data, &key, stored.encode());
+			}
+		}
+
+		match self.disk_db.write(tx).map_err(PersistenceError::Io) {
+			Ok(()) => {
+				gum::debug!(
+					target: LOG_TARGET,
+					pruned_para_count = pruned_count,
+					remaining_para_count = paras_after,
+					registered_para_count = registered_paras.len(),
+					"Prune paras persisted to disk immediately"
+				);
+			},
+			Err(e) => {
+				gum::error!(
+					target: LOG_TARGET,
+					error = ?e,
+					"Failed to persist reputation after pruning paras. \
+					Pruned data is removed from memory and will be persisted by the periodic timer."
+				);
+			},
+		}
+	}
+
+	async fn process_bumps(
+		&mut self,
+		leaf_number: BlockNumber,
+		bumps: BTreeMap<ParaId, HashMap<PeerId, Score>>,
+		decay_value: Option<Score>,
+	) -> Vec<ReputationUpdate> {
+		// Delegate to inner DB - NO PERSISTENCE HERE
+		// Persistence happens via the periodic timer calling persist()
+		self.inner.process_bumps(leaf_number, bumps, decay_value).await
+	}
+
+	async fn max_scores_for_paras(&self, paras: BTreeSet<ParaId>) -> HashMap<ParaId, Score> {
+		self.inner.max_scores_for_paras(paras).await
+	}
+}
+
+#[cfg(test)]
+mod tests {
+	use super::*;
+	use polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter;
+
+	const DATA_COL: u32 = 0;
+	const NUM_COLUMNS: u32 = 1;
+
+	fn make_db() -> Arc<dyn Database> {
+		let db = kvdb_memorydb::create(NUM_COLUMNS);
+		let db = DbAdapter::new(db, &[DATA_COL]);
+		Arc::new(db)
+	}
+
+	fn make_config() -> ReputationConfig {
+		ReputationConfig { col_reputation_data: DATA_COL }
+	}
+
+	#[tokio::test]
+	async fn load_from_empty_disk_fresh_start() {
+		// Test that PersistentDb can be created from an empty database (fresh start)
+		let disk_db = make_db();
+		let config = make_config();
+
+		let db = PersistentDb::new(disk_db, config, 100).await.expect("should create db");
+
+		// Fresh start should have no finalized block
+		assert_eq!(db.processed_finalized_block_number().await, None);
+	}
+
+	#[tokio::test]
+	async fn load_from_disk_with_existing_data() {
+		// Test that PersistentDb correctly loads existing data from disk
+		let disk_db = make_db();
+		let config = make_config();
+
+		let peer1 = PeerId::random();
+		let peer2 = PeerId::random();
+		let para_id_100 = ParaId::from(100);
+		let para_id_200 = ParaId::from(200);
+
+		// First, create a DB, add some data, and persist it
+		{
+			let mut db =
+				PersistentDb::new(disk_db.clone(), config, 100).await.expect("should create db");
+
+			// Process some bumps to add reputation data
+			let bumps = [
+				(para_id_100, [(peer1, Score::new(50).unwrap())].into_iter().collect()),
+				(para_id_200, [(peer2, Score::new(75).unwrap())].into_iter().collect()),
+			]
+			.into_iter()
+			.collect();
+
+			db.process_bumps(10, bumps, None).await;
+
+			// Persist to disk
+			db.persist().expect("should persist");
+		}
+
+		// Now create a new DB instance and verify data was loaded
+		{
+			let db = PersistentDb::new(disk_db, config, 100).await.expect("should create db");
+
+			// Verify data was loaded correctly
+			assert_eq!(db.processed_finalized_block_number().await, Some(10));
+			assert_eq!(db.query(&peer1, &para_id_100).await, Some(Score::new(50).unwrap()));
+			assert_eq!(db.query(&peer2, &para_id_200).await, Some(Score::new(75).unwrap()));
+			// Non-existent queries should return None
+			assert_eq!(db.query(&peer1, &para_id_200).await, None);
+			assert_eq!(db.query(&peer2, &para_id_100).await, None);
+		}
+	}
+
+	#[tokio::test]
+	async fn slash_persists_immediately() {
+		// Test that slash operations persist to disk immediately
+		let disk_db = make_db();
+		let config = make_config();
+
+		let peer = PeerId::random();
+		let para_id = ParaId::from(100);
+
+		// Create DB and add some reputation
+		{
+			let mut db =
+				PersistentDb::new(disk_db.clone(), config, 100).await.expect("should create db");
+
+			let bumps = [(para_id, [(peer, Score::new(100).unwrap())].into_iter().collect())]
+				.into_iter()
+				.collect();
+			db.process_bumps(10, bumps, None).await;
+
+			// Persist initial state
+			db.persist().expect("should persist");
+
+			// Now slash - this should persist immediately
+			db.slash(&peer, &para_id, Score::new(30).unwrap()).await;
+		}
+
+		// Create new DB instance and verify slash was persisted
+		{
+			let db = PersistentDb::new(disk_db, config,
100).await.expect("should create db"); + + // Score should be 100 - 30 = 70 + assert_eq!(db.query(&peer, ¶_id).await, Some(Score::new(70).unwrap())); + } + } + + #[tokio::test] + async fn slash_that_removes_entry_persists_immediately() { + // Test that a slash that reduces score to zero (removing entry) persists immediately + let disk_db = make_db(); + let config = make_config(); + + let peer = PeerId::random(); + let para_id = ParaId::from(100); + + // Create DB and add some reputation + { + let mut db = + PersistentDb::new(disk_db.clone(), config, 100).await.expect("should create db"); + + let bumps = [(para_id, [(peer, Score::new(50).unwrap())].into_iter().collect())] + .into_iter() + .collect(); + db.process_bumps(10, bumps, None).await; + db.persist().expect("should persist"); + + // Slash more than the current score - should remove entry + db.slash(&peer, ¶_id, Score::new(100).unwrap()).await; + } + + // Create new DB instance and verify entry was removed + { + let db = PersistentDb::new(disk_db, config, 100).await.expect("should create db"); + + // Entry should be gone + assert_eq!(db.query(&peer, ¶_id).await, None); + } + } + + #[tokio::test] + async fn prune_paras_persists_immediately() { + // Test that prune_paras persists immediately + let disk_db = make_db(); + let config = make_config(); + + let peer1 = PeerId::random(); + let peer2 = PeerId::random(); + let para_id_100 = ParaId::from(100); + let para_id_200 = ParaId::from(200); + let para_id_300 = ParaId::from(300); + + // Create DB and add reputation for multiple paras + { + let mut db = + PersistentDb::new(disk_db.clone(), config, 100).await.expect("should create db"); + + let bumps = [ + (para_id_100, [(peer1, Score::new(50).unwrap())].into_iter().collect()), + (para_id_200, [(peer2, Score::new(75).unwrap())].into_iter().collect()), + (para_id_300, [(peer1, Score::new(25).unwrap())].into_iter().collect()), + ] + .into_iter() + .collect(); + db.process_bumps(10, bumps, None).await; + db.persist().expect("should persist"); + + // Prune - only keep para 200 registered + let registered_paras = [para_id_200].into_iter().collect(); + db.prune_paras(registered_paras).await; + } + + // Create new DB instance and verify pruning was persisted + { + let db = PersistentDb::new(disk_db, config, 100).await.expect("should create db"); + + // Only para 200 should remain + assert_eq!(db.query(&peer1, ¶_id_100).await, None); + assert_eq!(db.query(&peer2, ¶_id_200).await, Some(Score::new(75).unwrap())); + assert_eq!(db.query(&peer1, ¶_id_300).await, None); + } + } + + #[tokio::test] + async fn periodic_persist_writes_all_data() { + // Test that persist() correctly writes all in-memory data + let disk_db = make_db(); + let config = make_config(); + + let peer1 = PeerId::random(); + let peer2 = PeerId::random(); + let para_id_100 = ParaId::from(100); + let para_id_200 = ParaId::from(200); + + // Create DB, add data, but DON'T persist yet + { + let mut db = + PersistentDb::new(disk_db.clone(), config, 100).await.expect("should create db"); + + // Add reputation via bumps (these don't trigger immediate persistence) + let bumps = [ + (para_id_100, [(peer1, Score::new(50).unwrap())].into_iter().collect()), + (para_id_200, [(peer2, Score::new(75).unwrap())].into_iter().collect()), + ] + .into_iter() + .collect(); + db.process_bumps(15, bumps, None).await; + + // Now call periodic persist + db.persist().expect("should persist"); + } + + // Reload and verify + { + let db = PersistentDb::new(disk_db, config, 100).await.expect("should create db"); + + 
+			assert_eq!(db.processed_finalized_block_number().await, Some(15));
+			assert_eq!(db.query(&peer1, &para_id_100).await, Some(Score::new(50).unwrap()));
+			assert_eq!(db.query(&peer2, &para_id_200).await, Some(Score::new(75).unwrap()));
+		}
+	}
+
+	#[tokio::test]
+	async fn data_survives_simulated_restart() {
+		// Test full restart scenario: create, populate, persist, drop, reload
+		let disk_db = make_db();
+		let config = make_config();
+
+		let peer1 = PeerId::random();
+		let peer2 = PeerId::random();
+		let peer3 = PeerId::random();
+		let para_id_100 = ParaId::from(100);
+		let para_id_200 = ParaId::from(200);
+
+		// Session 1: Create and populate
+		{
+			let mut db =
+				PersistentDb::new(disk_db.clone(), config, 100).await.expect("should create db");
+
+			let bumps = [
+				(
+					para_id_100,
+					[(peer1, Score::new(100).unwrap()), (peer2, Score::new(50).unwrap())]
+						.into_iter()
+						.collect(),
+				),
+				(para_id_200, [(peer3, Score::new(200).unwrap())].into_iter().collect()),
+			]
+			.into_iter()
+			.collect();
+			db.process_bumps(20, bumps, None).await;
+
+			// Slash peer2
+			db.slash(&peer2, &para_id_100, Score::new(25).unwrap()).await;
+
+			// Final persist before "shutdown"
+			db.persist().expect("should persist");
+		}
+
+		// Session 2: "Restart" - create new instance
+		{
+			let mut db =
+				PersistentDb::new(disk_db.clone(), config, 100).await.expect("should create db");
+
+			// Verify all data survived
+			assert_eq!(db.processed_finalized_block_number().await, Some(20));
+			assert_eq!(db.query(&peer1, &para_id_100).await, Some(Score::new(100).unwrap()));
+			assert_eq!(db.query(&peer2, &para_id_100).await, Some(Score::new(25).unwrap()));
+			assert_eq!(db.query(&peer3, &para_id_200).await, Some(Score::new(200).unwrap()));
+
+			// Continue with more operations
+			let bumps = [(para_id_100, [(peer1, Score::new(50).unwrap())].into_iter().collect())]
+				.into_iter()
+				.collect();
+			db.process_bumps(25, bumps, None).await;
+			db.persist().expect("should persist");
+		}
+
+		// Session 3: Verify continued state
+		{
+			let db = PersistentDb::new(disk_db, config, 100).await.expect("should create db");
+
+			assert_eq!(db.processed_finalized_block_number().await, Some(25));
+			// peer1 should now have 100 + 50 = 150
+			assert_eq!(db.query(&peer1, &para_id_100).await, Some(Score::new(150).unwrap()));
+		}
+	}
+
+	#[tokio::test]
+	async fn roundtrip_serialization_correctness() {
+		// Test that data roundtrips correctly through serialization
+		let disk_db = make_db();
+		let config = make_config();
+
+		// Create peers with specific scores to verify exact values
+		let peers: Vec<_> = (0..10).map(|_| PeerId::random()).collect();
+		let para_id = ParaId::from(42);
+
+		let original_scores: HashMap<PeerId, Score> = peers
+			.iter()
+			.enumerate()
+			.map(|(i, peer)| (*peer, Score::new((i as u16 + 1) * 100).unwrap()))
+			.collect();
+
+		// Store data
+		{
+			let mut db =
+				PersistentDb::new(disk_db.clone(), config, 100).await.expect("should create db");
+
+			let bumps =
+				[(para_id, original_scores.iter().map(|(peer, score)| (*peer, *score)).collect())]
+					.into_iter()
+					.collect();
+			db.process_bumps(100, bumps, None).await;
+			db.persist().expect("should persist");
+		}
+
+		// Reload and verify exact values
+		{
+			let db = PersistentDb::new(disk_db, config, 100).await.expect("should create db");
+
+			for (peer, expected_score) in &original_scores {
+				let actual_score = db.query(peer, &para_id).await;
+				assert_eq!(
+					actual_score,
+					Some(*expected_score),
+					"Score mismatch for peer after roundtrip"
+				);
+			}
+		}
+	}
+
+	#[tokio::test]
+	async fn bumps_without_persist_not_saved() {
+		// Test that bumps without explicit persist are NOT saved to disk
+		// (they only persist via periodic timer or slash)
+		let disk_db = make_db();
+		let config = make_config();
+
+		let peer = PeerId::random();
+		let para_id = ParaId::from(100);
+
+		// Create DB and add bumps, but DON'T persist
+		{
+			let mut db =
+				PersistentDb::new(disk_db.clone(), config, 100).await.expect("should create db");
+
+			let bumps = [(para_id, [(peer, Score::new(100).unwrap())].into_iter().collect())]
+				.into_iter()
+				.collect();
+			db.process_bumps(10, bumps, None).await;
+
+			// Verify in-memory state
+			assert_eq!(db.query(&peer, &para_id).await, Some(Score::new(100).unwrap()));
+
+			// Don't call persist - just drop
+		}
+
+		// Create new instance - data should NOT be there
+		{
+			let db = PersistentDb::new(disk_db, config, 100).await.expect("should create db");
+
+			// Data was never persisted
+			assert_eq!(db.query(&peer, &para_id).await, None);
+			assert_eq!(db.processed_finalized_block_number().await, None);
+		}
+	}
+
+	#[tokio::test]
+	async fn multiple_paras_multiple_peers() {
+		// Test handling of multiple paras with multiple peers each
+		let disk_db = make_db();
+		let config = make_config();
+
+		let peers: Vec<_> = (0..5).map(|_| PeerId::random()).collect();
+		let paras: Vec<_> = (100..105).map(ParaId::from).collect();
+
+		// Create complex state
+		{
+			let mut db =
+				PersistentDb::new(disk_db.clone(), config, 100).await.expect("should create db");
+
+			let bumps: BTreeMap<ParaId, HashMap<PeerId, Score>> = paras
+				.iter()
+				.enumerate()
+				.map(|(para_idx, para_id)| {
+					let peer_scores: HashMap<PeerId, Score> = peers
+						.iter()
+						.enumerate()
+						.map(|(peer_idx, peer)| {
+							let score = ((para_idx + 1) * 10 + peer_idx) as u16;
+							(*peer, Score::new(score).unwrap())
+						})
+						.collect();
+					(*para_id, peer_scores)
+				})
+				.collect();
+
+			db.process_bumps(50, bumps, None).await;
+			db.persist().expect("should persist");
+		}
+
+		// Verify all data
+		{
+			let db = PersistentDb::new(disk_db, config, 100).await.expect("should create db");
+
+			for (para_idx, para_id) in paras.iter().enumerate() {
+				for (peer_idx, peer) in peers.iter().enumerate() {
+					let expected_score = ((para_idx + 1) * 10 + peer_idx) as u16;
+					assert_eq!(
+						db.query(peer, para_id).await,
+						Some(Score::new(expected_score).unwrap()),
+						"Mismatch for para {} peer {}",
+						para_idx,
+						peer_idx
+					);
+				}
+			}
+		}
+	}
+}
diff --git a/polkadot/node/network/collator-protocol/src/validator_side_experimental/state.rs b/polkadot/node/network/collator-protocol/src/validator_side_experimental/state.rs
index 30afeab742576..4aed1232113fd 100644
--- a/polkadot/node/network/collator-protocol/src/validator_side_experimental/state.rs
+++ b/polkadot/node/network/collator-protocol/src/validator_side_experimental/state.rs
@@ -23,7 +23,7 @@ use crate::{
 			ProspectiveCandidate, TryAcceptOutcome, INVALID_COLLATION_SLASH,
 		},
 		error::{Error, FatalResult},
-		peer_manager::Backend,
+		peer_manager::{Backend, PersistentDb},
 		Metrics, PeerManager,
 	},
 	LOG_TARGET,
@@ -618,3 +618,18 @@ impl<B: Backend> State<B> {
 		self.collation_manager.advertisements()
 	}
 }
+
+// Specific implementation for PersistentDb to support disk persistence.
+impl State<PersistentDb> {
+	/// Persist the reputation database to disk.
+	/// Called periodically by the main loop timer.
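+	/// Errors are logged and swallowed here, so a failed write never takes down the subsystem.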
+	pub fn persist_to_disk(&self) {
+		if let Err(e) = self.peer_manager.db.persist() {
+			gum::error!(
+				target: LOG_TARGET,
+				error = ?e,
+				"Failed to persist reputation DB to disk"
+			);
+		}
+	}
+}
diff --git a/polkadot/node/service/Cargo.toml b/polkadot/node/service/Cargo.toml
index 50bb0dc698669..303645ccb0883 100644
--- a/polkadot/node/service/Cargo.toml
+++ b/polkadot/node/service/Cargo.toml
@@ -223,6 +223,9 @@ fast-runtime = [
 	"rococo-runtime?/fast-runtime",
 	"westend-runtime?/fast-runtime",
 ]
+test-persistence = [
+	"polkadot-collator-protocol?/test-persistence",
+]
 
 malus = ["full-node"]
 runtime-metrics = [
diff --git a/polkadot/node/service/src/overseer.rs b/polkadot/node/service/src/overseer.rs
index 5e2e0842fd946..bbfbd26a504b6 100644
--- a/polkadot/node/service/src/overseer.rs
+++ b/polkadot/node/service/src/overseer.rs
@@ -312,7 +312,7 @@ where
 				keystore: keystore.clone(),
 				metrics: Metrics::register(registry)?,
 				db: parachains_db.clone(),
-				reputation_col: reputation_config.col_reputation_data,
+				reputation_config,
 			}
 		} else {
 			ProtocolSide::Validator {

From 299194284b3f70412387e98b1b13e049111aaebb Mon Sep 17 00:00:00 2001
From: Alexandru Cihodaru
Date: Tue, 27 Jan 2026 22:14:01 +0200
Subject: [PATCH 3/4] Update undying collator to send valid PeerId in
 ApprovedPeer UMP signal

Signed-off-by: Alexandru Cihodaru
---
 polkadot/parachain/test-parachains/undying/src/lib.rs | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/polkadot/parachain/test-parachains/undying/src/lib.rs b/polkadot/parachain/test-parachains/undying/src/lib.rs
index ac27f58f38d8e..cd956c96a4614 100644
--- a/polkadot/parachain/test-parachains/undying/src/lib.rs
+++ b/polkadot/parachain/test-parachains/undying/src/lib.rs
@@ -166,8 +166,12 @@ pub fn execute(
 	);
 
 	if block_data.experimental_send_approved_peer {
+		// Create a valid PeerId in multihash format: [hash_code, digest_size, ...digest_bytes]
+		// Using multihash code 0x0 (identity hash) with 32 bytes of data
+		let mut peer_id_bytes = alloc::vec![0x0, 32]; // hash code 0x0, size 32
+		peer_id_bytes.extend_from_slice(&[1u8; 32]); // 32 bytes of data
 		upward_messages
-			.force_push(UMPSignal::ApprovedPeer(alloc::vec![1, 2, 3].try_into().unwrap()).encode());
+			.force_push(UMPSignal::ApprovedPeer(peer_id_bytes.try_into().unwrap()).encode());
 	}
 
 	// We need to clone the block data as the fn will mutate it's state.

From d1ae2f5968f5e10a876e21ac724196e479780b5e Mon Sep 17 00:00:00 2001
From: Alexandru Cihodaru
Date: Tue, 27 Jan 2026 22:28:26 +0200
Subject: [PATCH 4/4] Add zombienet tests for collator reputation persistence

Tests added:

- basic_persistence.rs: Validates that reputation data persists across
  validator restarts and that the startup lookback mechanism correctly
  processes missed blocks during downtime. Tests both large gaps (20+
  blocks) and small gaps (<20 blocks).
- pruning.rs: Verifies that reputation data is correctly pruned when
  parachains are deregistered. Tests para cleanup via sudo, session
  boundary detection, immediate persistence of pruned state, and correct
  loading of only remaining paras after restart.
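
Both tests drive a real zombienet network with the experimental collator
protocol enabled and verify behaviour by asserting on the validator's
collator-protocol log lines.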
Signed-off-by: Alexandru Cihodaru
---
 polkadot/zombienet-sdk-tests/Cargo.toml              |   1 +
 .../tests/functional/mod.rs                          |   1 +
 .../basic_persistence.rs                             | 359 ++++++++++++++++++
 .../functional/reputation_persistence/mod.rs         |   5 +
 .../reputation_persistence/pruning.rs                | 266 +++++++++++++
 5 files changed, 632 insertions(+)
 create mode 100644 polkadot/zombienet-sdk-tests/tests/functional/reputation_persistence/basic_persistence.rs
 create mode 100644 polkadot/zombienet-sdk-tests/tests/functional/reputation_persistence/mod.rs
 create mode 100644 polkadot/zombienet-sdk-tests/tests/functional/reputation_persistence/pruning.rs

diff --git a/polkadot/zombienet-sdk-tests/Cargo.toml b/polkadot/zombienet-sdk-tests/Cargo.toml
index d97326b0aaf01..9b78898b1fb46 100644
--- a/polkadot/zombienet-sdk-tests/Cargo.toml
+++ b/polkadot/zombienet-sdk-tests/Cargo.toml
@@ -19,6 +19,7 @@ log = { workspace = true }
 pallet-revive = { workspace = true, features = ["std"] }
 polkadot-primitives = { workspace = true, default-features = true }
 rand = { workspace = true }
+regex = { workspace = true }
 serde = { workspace = true }
 serde_json = { workspace = true }
 sp-core = { workspace = true }
diff --git a/polkadot/zombienet-sdk-tests/tests/functional/mod.rs b/polkadot/zombienet-sdk-tests/tests/functional/mod.rs
index e28cfb4039303..b987ef712855d 100644
--- a/polkadot/zombienet-sdk-tests/tests/functional/mod.rs
+++ b/polkadot/zombienet-sdk-tests/tests/functional/mod.rs
@@ -6,6 +6,7 @@ mod approved_peer_mixed_validators;
 mod async_backing_6_seconds_rate;
 mod dispute_old_finalized;
 mod duplicate_collations;
+mod reputation_persistence;
 mod shared_core_idle_parachain;
 mod spam_statement_distribution_requests;
 mod sync_backing;
diff --git a/polkadot/zombienet-sdk-tests/tests/functional/reputation_persistence/basic_persistence.rs b/polkadot/zombienet-sdk-tests/tests/functional/reputation_persistence/basic_persistence.rs
new file mode 100644
index 0000000000000..8828866f763a2
--- /dev/null
+++ b/polkadot/zombienet-sdk-tests/tests/functional/reputation_persistence/basic_persistence.rs
@@ -0,0 +1,359 @@
+// Copyright (C) Parity Technologies (UK) Ltd.
+// SPDX-License-Identifier: Apache-2.0
+
+//! Test: Basic Persistence on Graceful Shutdown with Startup Lookback Verification
+//!
+//! This test verifies that collator reputation data is correctly persisted to disk
+//! during normal operation, survives a graceful validator restart, and that the
+//! startup lookback mechanism correctly catches up on reputation bumps from blocks
+//! finalized between persistence and restart.
+//!
+//! ## Test Scenario - Phase 1: Large Gap (>= 20 blocks)
+//!
+//! 1. Spawn a network with 4 validators and 1 parachain (using experimental collator protocol)
+//! 2. Wait for parachain blocks to be produced (establishing reputation through backed candidates)
+//! 3. Wait for periodic persistence (using short interval for testing)
+//! 4. Record the finalized block number at persistence time
+//! 5. **Pause validator-0** (so it misses blocks being finalized)
+//! 6. Wait for 22+ finalized blocks while validator-0 is paused (creating a real gap)
+//! 7. Restart validator-0 (triggers full startup sequence)
+//! 8. Verify validator loads existing reputation from disk on restart
+//! 9. Verify the startup lookback processes blocks it missed while paused
+//! 10. Verify validator continues normal operation
+//!
+//! ## Test Scenario - Phase 2: Small Gap (< 20 blocks)
+//!
+//! 11. Pause validator-0 again
+//! 12. Wait for ~10 finalized blocks (smaller gap)
+//! 13. Restart validator-0 again
+//! 14. Verify the startup lookback processes the entire gap (not limited by MAX_STARTUP_ANCESTRY_LOOKBACK)
+//! 15. Verify processed block count matches the actual gap size
+//!
+//! ## Success Criteria
+//!
+//! - Validator logs show "Loaded existing reputation DB from disk" on both restarts
+//! - First lookback processes at least 20 blocks (large gap)
+//! - Second lookback processes exactly ~10 blocks (entire small gap)
+//! - Validator resumes backing candidates after both restarts
+//! - No errors about missing or corrupted data

+use anyhow::anyhow;
+use regex::Regex;
+use tokio::time::Duration;
+
+use cumulus_zombienet_sdk_helpers::assert_para_throughput;
+use polkadot_primitives::Id as ParaId;
+use serde_json::json;
+use zombienet_orchestrator::network::node::LogLineCountOptions;
+use zombienet_sdk::{
+	subxt::{OnlineClient, PolkadotConfig},
+	NetworkConfigBuilder,
+};
+
+const PARA_ID: u32 = 2000;
+
+#[tokio::test(flavor = "multi_thread")]
+async fn basic_persistence_test() -> Result<(), anyhow::Error> {
+	let _ = env_logger::try_init_from_env(
+		env_logger::Env::default().filter_or(env_logger::DEFAULT_FILTER_ENV, "info"),
+	);
+
+	let images = zombienet_sdk::environment::get_images_from_env();
+
+	let config = NetworkConfigBuilder::new()
+		.with_relaychain(|r| {
+			let r = r
+				.with_chain("rococo-local")
+				.with_default_command("polkadot")
+				.with_default_image(images.polkadot.as_str())
+				.with_default_args(vec![
+					("-lparachain=debug,parachain::collator-protocol=trace").into(),
+					("--experimental-collator-protocol").into(),
+				])
+				.with_genesis_overrides(json!({
+					"configuration": {
+						"config": {
+							"scheduler_params": {
+								"group_rotation_frequency": 4,
+								"num_cores": 1
+							}
+						}
+					}
+				}))
+				.with_node(|node| node.with_name("validator-0"));
+
+			(1..4)
+				.fold(r, |acc, i| acc.with_node(|node| node.with_name(&format!("validator-{i}"))))
+		})
+		.with_parachain(|p| {
+			p.with_id(PARA_ID)
+				.with_default_command("undying-collator")
+				.cumulus_based(false)
+				.with_default_image(
+					std::env::var("COL_IMAGE")
+						.unwrap_or("docker.io/paritypr/colander:latest".to_string())
+						.as_str(),
+				)
+				.with_default_args(vec![
+					("-lparachain=debug").into(),
+					("--experimental-send-approved-peer").into(),
+				])
+				.with_collator(|n| n.with_name("collator"))
+		})
+		.build()
+		.map_err(|e| {
+			let errs = e.into_iter().map(|e| e.to_string()).collect::<Vec<_>>().join(" ");
+			anyhow!("config errs: {errs}")
+		})?;
+
+	let spawn_fn = zombienet_sdk::environment::get_spawn_fn();
+	let network = spawn_fn(config).await?;
+
+	let validator_0 = network.get_node("validator-0")?;
+	let validator0_client: OnlineClient<PolkadotConfig> = validator_0.wait_client().await?;
+
+	// Verify validator-0 shows fresh start initially (no existing data)
+	let fresh_start_result = validator_0
+		.wait_log_line_count_with_timeout(
+			"Reputation DB initialized fresh",
+			false,
+			LogLineCountOptions::new(|n| n >= 1, Duration::from_secs(60), false),
+		)
+		.await?;
+	assert!(
+		fresh_start_result.success(),
+		"Expected validator to log 'Reputation DB initialized fresh' on initial startup"
+	);
+
+	log::info!("Network spawned, waiting for parachain blocks to be produced");
+	assert_para_throughput(&validator0_client, 10, [(ParaId::from(PARA_ID), 8..12)]).await?;
+
+	log::info!("Parachain blocks produced, waiting for periodic persistence");
+	let persistence_result = validator_0
+		.wait_log_line_count_with_timeout(
+			"Periodic persistence completed: reputation DB written to disk",
+			false,
+			LogLineCountOptions::new(|n| n >= 1, Duration::from_secs(60), false),
+		)
+		.await?;
+	assert!(persistence_result.success(), "Periodic persistence should have completed");
+
+	let logs_before_pause = validator_0.logs().await?;
+	let persistence_re = Regex::new(
+		r"Periodic persistence completed: reputation DB written to disk.*last_finalized=Some\((\d+)\)",
+	)?;
+
+	let mut block_at_persistence: Option<u32> = None;
+	for line in logs_before_pause.lines() {
+		if let Some(caps) = persistence_re.captures(line) {
+			block_at_persistence = caps.get(1).and_then(|m| m.as_str().parse().ok());
+		}
+	}
+
+	let block_at_persistence = block_at_persistence
+		.ok_or(anyhow!("Could not parse last_finalized from persistence log"))?;
+	log::info!("Periodic persistence completed at finalized block {}", block_at_persistence);
+
+	log::info!("Pausing validator-0 to create a block gap");
+	validator_0.pause().await?;
+
+	let validator_1 = network.get_node("validator-1")?;
+	let validator_1_client: OnlineClient<PolkadotConfig> = validator_1.wait_client().await?;
+	let mut finalized_blocks_1 = validator_1_client.blocks().subscribe_finalized().await?;
+
+	log::info!("Waiting for finalized blocks while validator-0 is paused");
+	let target_gap = 30u32;
+	let mut block_at_restart = block_at_persistence;
+	while block_at_restart < block_at_persistence + target_gap {
+		if let Some(Ok(block)) = finalized_blocks_1.next().await {
+			block_at_restart = block.number();
+			log::info!(
+				"Finalized block {} (gap: {})",
+				block_at_restart,
+				block_at_restart.saturating_sub(block_at_persistence)
+			);
+		}
+	}
+	log::info!(
+		"Gap created while validator-0 was paused: finalized block now at {}, gap of {} blocks",
+		block_at_restart,
+		block_at_restart.saturating_sub(block_at_persistence)
+	);
+
+	log::info!("Restarting validator-0 (full restart to trigger startup lookback)");
+	validator_0.restart(None).await?;
+	let _: OnlineClient<PolkadotConfig> = validator_0.wait_client().await?;
+	log::info!("Validator-0 restarted, verifying reputation loaded from disk");
+
+	let load_result = validator_0
+		.wait_log_line_count_with_timeout(
+			"Loaded existing reputation DB from disk",
+			false,
+			LogLineCountOptions::new(|n| n >= 1, Duration::from_secs(60), false),
+		)
+		.await?;
+	assert!(
+		load_result.success(),
+		"Expected validator to log 'Loaded existing reputation DB from disk' after restart"
+	);
+
+	let lookback_completed_result = validator_0
+		.wait_log_line_count_with_timeout(
+			"Startup lookback completed",
+			false,
+			LogLineCountOptions::new(|n| n >= 1, Duration::from_secs(30), false),
+		)
+		.await?;
+	assert!(
+		lookback_completed_result.success(),
+		"Expected validator to log 'Startup lookback completed' after restart"
+	);
+
+	let logs = validator_0.logs().await?;
+	let lookback_completed_re = Regex::new(r"Startup lookback completed.*blocks_processed=(\d+)")?;
+
+	let mut found_lookback_completed = false;
+	let mut blocks_processed: Option<u32> = None;
+
+	for line in logs.lines() {
+		if let Some(caps) = lookback_completed_re.captures(line) {
+			found_lookback_completed = true;
+			blocks_processed = caps.get(1).and_then(|m| m.as_str().parse().ok());
+			log::info!(
+				"Found startup lookback completed log: blocks_processed={}",
+				blocks_processed.unwrap()
+			);
+			break;
+		}
+	}
+
+	assert!(
+		found_lookback_completed,
+		"Expected to find 'Startup lookback completed' log with blocks_processed field"
+	);
+
+	let processed = blocks_processed.expect("blocks_processed should be present in log");
+	assert!(
+		processed == 20,
+		"Expected blocks_processed ({}) == MAX_STARTUP_ANCESTRY_LOOKBACK ({})",
+		processed,
+		20
+	);
+	log::info!(
+		"Lookback verification passed: processed {} blocks (< existing gap {})",
+		processed,
+		target_gap
+	);
+
+	log::info!("Verifying validator resumes normal operation");
+
+	let relay_client_after: OnlineClient<PolkadotConfig> = validator_0.wait_client().await?;
+	assert_para_throughput(&relay_client_after, 5, [(ParaId::from(PARA_ID), 4..6)]).await?;
+
+	// === Phase 2: Verify lookback processes entire gap when gap < MAX_STARTUP_ANCESTRY_LOOKBACK ===
+	log::info!("Phase 2: Testing lookback with smaller gap (< 20 blocks)");
+
+	// Wait for another periodic persistence to get a precise starting point
+	log::info!("Waiting for second periodic persistence");
+	let persistence_result_2 = validator_0
+		.wait_log_line_count_with_timeout(
+			"Periodic persistence completed: reputation DB written to disk",
+			false,
+			LogLineCountOptions::new(|n| n >= 2, Duration::from_secs(60), false),
+		)
+		.await?;
+	assert!(persistence_result_2.success(), "Second periodic persistence should have completed");
+
+	log::info!("Pausing validator-0 again to create a smaller gap");
+	validator_0.pause().await?;
+
+	let logs_before_second_pause = validator_0.logs().await?;
+	let mut block_before_second_pause: Option<u32> = None;
+
+	for line in logs_before_second_pause.lines().rev() {
+		if let Some(caps) = persistence_re.captures(line) {
+			block_before_second_pause = caps.get(1).and_then(|m| m.as_str().parse().ok());
+			if block_before_second_pause.is_some() {
+				break;
+			}
+		}
+	}
+
+	let block_before_second_pause = block_before_second_pause
+		.ok_or(anyhow!("Could not parse last_finalized from second persistence log"))?;
+	log::info!(
+		"Second periodic persistence completed at finalized block {}",
+		block_before_second_pause
+	);
+
+	let small_gap_target = 10u32;
+	let mut block_at_second_restart = block_before_second_pause;
+	while block_at_second_restart < block_before_second_pause + small_gap_target {
+		if let Some(Ok(block)) = finalized_blocks_1.next().await {
+			block_at_second_restart = block.number();
+			log::info!(
+				"Finalized block {} (gap: {})",
+				block_at_second_restart,
+				block_at_second_restart.saturating_sub(block_before_second_pause)
+			);
+		}
+	}
+	log::info!(
+		"Small gap created: {} blocks (from {} to {})",
+		block_at_second_restart.saturating_sub(block_before_second_pause),
+		block_before_second_pause,
+		block_at_second_restart
+	);
+
+	log::info!("Restarting validator-0 (second restart)");
+	validator_0.restart(None).await?;
+	let _: OnlineClient<PolkadotConfig> = validator_0.wait_client().await?;
+
+	let lookback_completed_result_2 = validator_0
+		.wait_log_line_count_with_timeout(
+			"Startup lookback completed",
+			false,
+			LogLineCountOptions::new(|n| n >= 2, Duration::from_secs(30), false),
+		)
+		.await?;
+	assert!(
+		lookback_completed_result_2.success(),
+		"Expected second 'Startup lookback completed' log"
+	);
+
+	let logs_second_restart = validator_0.logs().await?;
+
+	let mut last_blocks_processed: Option<u32> = None;
+	for line in logs_second_restart.lines().rev() {
+		if let Some(caps) = lookback_completed_re.captures(line) {
+			last_blocks_processed = caps.get(1).and_then(|m| m.as_str().parse().ok());
+			if last_blocks_processed.is_some() {
+				break;
+			}
+		}
+	}
+
+	let processed_second =
+		last_blocks_processed.expect("Should find second lookback completed log");
+	log::info!("Second lookback processed {} blocks", processed_second);
+
+	let expected_gap = block_at_second_restart.saturating_sub(block_before_second_pause);
+	log::info!(
+		"Second lookback: gap was {} blocks (from {} to {}), processed {}",
+		expected_gap,
+		block_before_second_pause,
+		block_at_second_restart,
+		processed_second
+	);
+
+	assert!(
+		expected_gap < 20,
+		"Expected second gap to be < 20 (to test no artificial limit), but got {}",
+		expected_gap
+	);
+
+	assert!(
+		processed_second >= expected_gap.saturating_sub(4) &&
+			processed_second <= expected_gap + 4,
+		"Expected second lookback to process entire gap (~{} blocks), but got {}",
+		expected_gap,
+		processed_second
+	);
+
+	log::info!(
+		"Basic persistence test completed successfully - both large and small gap tests passed"
+	);
+
+	Ok(())
+}
diff --git a/polkadot/zombienet-sdk-tests/tests/functional/reputation_persistence/mod.rs b/polkadot/zombienet-sdk-tests/tests/functional/reputation_persistence/mod.rs
new file mode 100644
index 0000000000000..45cd7ca9ffebf
--- /dev/null
+++ b/polkadot/zombienet-sdk-tests/tests/functional/reputation_persistence/mod.rs
@@ -0,0 +1,5 @@
+// Copyright (C) Parity Technologies (UK) Ltd.
+// SPDX-License-Identifier: Apache-2.0
+
+mod basic_persistence;
+mod pruning;
diff --git a/polkadot/zombienet-sdk-tests/tests/functional/reputation_persistence/pruning.rs b/polkadot/zombienet-sdk-tests/tests/functional/reputation_persistence/pruning.rs
new file mode 100644
index 0000000000000..464a3577cbf8d
--- /dev/null
+++ b/polkadot/zombienet-sdk-tests/tests/functional/reputation_persistence/pruning.rs
@@ -0,0 +1,266 @@
+// Copyright (C) Parity Technologies (UK) Ltd.
+// SPDX-License-Identifier: Apache-2.0
+
+//! Test: Reputation Pruning on Parachain Deregistration
+//!
+//! This test verifies that the reputation pruning mechanism correctly:
+//! 1. Builds reputation for multiple parachains
+//! 2. Detects when a parachain is deregistered
+//! 3. Prunes reputation data for the deregistered parachain
+//! 4. Persists the pruned state to disk
+//! 5. Correctly loads only the non-pruned data after restart
+//!
+//! ## Test Scenario
+//!
+//! 1. Spawn a network with 4 validators and 2 parachains
+//! 2. Wait for both parachains to produce blocks (establishing reputation for both)
+//! 3. Wait for periodic persistence (both paras' reputation on disk)
+//! 4. Record reputation entries for both parachains
+//! 5. **Deregister parachain 2001 using sudo**
+//! 6. Wait for session boundary (triggers pruning check)
+//! 7. Verify pruning logs show para 2001 was pruned
+//! 8. Wait for periodic persistence (pruned state written to disk)
+//! 9. Restart validator-0
+//! 10. Verify only para 2000's reputation was loaded (para 2001 pruned)
+//! 11. Verify validator continues normal operation with para 2000
+//!
+//! ## Success Criteria
+//!
+//! - Both parachains build reputation initially
+//! - After deregistration, pruning logs show para 2001 removed
+//! - After restart, only para 2000's reputation is loaded from disk
+//! - Para 2000 continues producing blocks normally
+
+use anyhow::anyhow;
+use regex::Regex;
+use tokio::time::Duration;
+
+use cumulus_zombienet_sdk_helpers::{assert_para_throughput, wait_for_first_session_change};
+use polkadot_primitives::Id as ParaId;
+use serde_json::json;
+use zombienet_orchestrator::network::node::LogLineCountOptions;
+use zombienet_sdk::{
+	subxt::{ext::scale_value::value, OnlineClient, PolkadotConfig},
+	subxt_signer::sr25519::dev,
+	NetworkConfigBuilder,
+};
+
+const PARA_ID_1: u32 = 2000;
+const PARA_ID_2: u32 = 2001;
+
+#[tokio::test(flavor = "multi_thread")]
+async fn pruning_test() -> Result<(), anyhow::Error> {
+	let _ = env_logger::try_init_from_env(
+		env_logger::Env::default().filter_or(env_logger::DEFAULT_FILTER_ENV, "info"),
+	);
+
+	let images = zombienet_sdk::environment::get_images_from_env();
+
+	let config = NetworkConfigBuilder::new()
+		.with_relaychain(|r| {
+			let r = r
+				.with_chain("rococo-local")
+				.with_default_command("polkadot")
+				.with_default_image(images.polkadot.as_str())
+				.with_default_args(vec![
+					("-lparachain=debug,parachain::collator-protocol=trace").into(),
+					("--experimental-collator-protocol").into(),
+				])
+				.with_genesis_overrides(json!({
+					"configuration": {
+						"config": {
+							"scheduler_params": {
+								"group_rotation_frequency": 4,
+								"num_cores": 2
+							}
+						}
+					}
+				}))
+				.with_node(|node| node.with_name("validator-0"));
+
+			(1..4)
+				.fold(r, |acc, i| acc.with_node(|node| node.with_name(&format!("validator-{i}"))))
+		})
+		.with_parachain(|p| {
+			p.with_id(PARA_ID_1)
+				.with_default_command("undying-collator")
+				.cumulus_based(false)
+				.with_default_image(
+					std::env::var("COL_IMAGE")
+						.unwrap_or("docker.io/paritypr/colander:latest".to_string())
+						.as_str(),
+				)
+				.with_default_args(vec![
+					("-lparachain=debug").into(),
+					("--experimental-send-approved-peer").into(),
+				])
+				.with_collator(|n| n.with_name("collator-1"))
+		})
+		.with_parachain(|p| {
+			p.with_id(PARA_ID_2)
+				.with_default_command("undying-collator")
+				.cumulus_based(false)
+				.with_default_image(
+					std::env::var("COL_IMAGE")
+						.unwrap_or("docker.io/paritypr/colander:latest".to_string())
+						.as_str(),
+				)
+				.with_default_args(vec![
+					("-lparachain=debug").into(),
+					("--experimental-send-approved-peer").into(),
+				])
+				.with_collator(|n| n.with_name("collator-2"))
+		})
+		.build()
+		.map_err(|e| {
+			let errs = e.into_iter().map(|e| e.to_string()).collect::<Vec<_>>().join(" ");
+			anyhow!("config errs: {errs}")
+		})?;
+
+	let spawn_fn = zombienet_sdk::environment::get_spawn_fn();
+	let network = spawn_fn(config).await?;
+
+	let validator_0 = network.get_node("validator-0")?;
+	let validator0_client: OnlineClient<PolkadotConfig> = validator_0.wait_client().await?;
+
+	// Verify validator-0 shows fresh start initially (no existing data)
+	let fresh_start_result = validator_0
+		.wait_log_line_count_with_timeout(
+			"Reputation DB initialized fresh",
+			false,
+			LogLineCountOptions::new(|n| n >= 1, Duration::from_secs(60), false),
+		)
+		.await?;
+	assert!(
+		fresh_start_result.success(),
+		"Expected validator to log 'Reputation DB initialized fresh' on initial startup"
+	);
+
+	log::info!("Network spawned, waiting for both parachains to produce blocks");
+	assert_para_throughput(
+		&validator0_client,
+		10,
+		[(ParaId::from(PARA_ID_1), 8..12), (ParaId::from(PARA_ID_2), 8..12)],
+	)
+	.await?;
+	log::info!("Both parachains producing blocks, waiting for initial periodic persistence");
+
+	let persistence_result = validator_0
+		.wait_log_line_count_with_timeout(
+			"Periodic persistence completed: reputation DB written to disk",
+			false,
+			LogLineCountOptions::new(|n| n >= 1, Duration::from_secs(60), false),
+		)
+		.await?;
+	assert!(persistence_result.success(), "Initial periodic persistence should have completed");
+	log::info!("Initial persistence completed - both paras' reputation on disk");
+
+	// Parse logs to verify both paras have reputation entries before pruning
+	let logs_before_pruning = validator_0.logs().await?;
+	let persistence_para_count_re = Regex::new(
+		r"Periodic persistence completed: reputation DB written to disk.*para_count=(\d+)",
+	)?;
+	let mut para_count_before_pruning: Option<u32> = None;
+	for line in logs_before_pruning.lines() {
+		if let Some(caps) = persistence_para_count_re.captures(line) {
+			para_count_before_pruning = caps.get(1).and_then(|m| m.as_str().parse().ok());
+		}
+	}
+
+	let para_count = para_count_before_pruning
+		.ok_or(anyhow!("Could not parse para_count from persistence log"))?;
+	log::info!("Before pruning: para_count={}", para_count);
+	assert_eq!(
+		para_count, 2,
+		"Expected 2 paras with reputation before pruning (2000 and 2001), but found {}",
+		para_count
+	);
+
+	log::info!(
+		"Cleaning up parachain 2001 using ParasSudoWrapper::sudo_schedule_para_cleanup + Paras::force_queue_action"
+	);
+	// Get Alice's signer
+	let alice = dev::alice();
+	let cleanup_calls = vec![
+		value! {
+			ParasSudoWrapper(sudo_schedule_para_cleanup { id: PARA_ID_2 })
+		},
+		value! {
+			Paras(force_queue_action { para: PARA_ID_2 })
+		},
+	];
+	let sudo_batch_call = zombienet_sdk::subxt::tx::dynamic(
+		"Sudo",
+		"sudo",
+		vec![value! {
+			Utility(batch_all { calls: cleanup_calls })
+		}],
+	);
+	// Submit the transaction
+	let tx_progress = validator0_client
+		.tx()
+		.sign_and_submit_then_watch_default(&sudo_batch_call, &alice)
+		.await?;
+	// Wait for finalization
+	let _finalized = tx_progress.wait_for_finalized_success().await?;
+	log::info!("Para cleanup scheduled and force_queue_action submitted successfully");
+
+	// Stop the collator for para 2001 since it's now being cleaned up
+	log::info!("Stopping collator-2 for the cleaned-up parachain 2001");
+	let collator_2 = network.get_node("collator-2")?;
+	collator_2.pause().await?;
+	log::info!("Parachain 2001 cleanup scheduled, waiting for session change");
+
+	// The cleanup is scheduled for the next session. We need to wait for at least one
+	// session change for the para to be fully offboarded.
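+	// (force_queue_action puts the para into the next session's action queue, so a single
+	// session change is enough for offboarding to take effect.)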
+	let mut best_blocks = validator0_client.blocks().subscribe_best().await?;
+	wait_for_first_session_change(&mut best_blocks).await?;
+	log::info!("Session change detected, para 2001 should now be offboarded");
+
+	log::info!("Waiting for pruning logs to confirm para 2001 was pruned");
+	let pruning_result = validator_0
+		.wait_log_line_count_with_timeout(
+			"Prune paras persisted to disk immediately pruned_para_count=1 remaining_para_count=1 registered_para_count=1",
+			false,
+			LogLineCountOptions::new(|n| n >= 1, Duration::from_secs(90), false),
+		)
+		.await?;
+	assert!(
+		pruning_result.success(),
+		"Expected validator to log 'Prune paras persisted to disk immediately' with pruned=1, remaining=1, registered=1"
+	);
+	log::info!("Pruning verified: pruned 1 para, 1 remaining, 1 registered");
+
+	log::info!("Restarting validator-0 to verify only para 2000's reputation loads");
+	validator_0.restart(None).await?;
+	let validator0_client_after: OnlineClient<PolkadotConfig> = validator_0.wait_client().await?;
+	log::info!("Validator-0 restarted, verifying reputation loaded from disk");
+
+	let load_result = validator_0
+		.wait_log_line_count_with_timeout(
+			"Loaded existing reputation DB from disk",
+			false,
+			LogLineCountOptions::new(|n| n >= 1, Duration::from_secs(60), false),
+		)
+		.await?;
+	assert!(
+		load_result.success(),
+		"Expected validator to log 'Loaded existing reputation DB from disk' after restart"
+	);
+
+	// Parse logs after restart to verify only para 2000 was loaded
+	let logs_after_restart = validator_0.logs().await?;
+	let load_re = Regex::new(r"Loaded existing reputation DB from disk.*para_count=(\d+)")?;
+	let mut para_count: Option<u32> = None;
+	for line in logs_after_restart.lines() {
+		if let Some(caps) = load_re.captures(line) {
+			para_count = caps.get(1).and_then(|m| m.as_str().parse().ok());
+			if para_count.is_some() {
+				break;
+			}
+		}
+	}
+	let count = para_count.ok_or(anyhow!("Could not parse para_count from load log"))?;
+	log::info!("After restart: para_count={}", count);
+	assert!(
+		count <= 1,
+		"Expected at most 1 para after pruning, but found {}",
+		count
+	);
+
+	log::info!("Verifying para 2000 continues normal operation (para 2001 is deregistered)");
+	assert_para_throughput(&validator0_client_after, 5, [(ParaId::from(PARA_ID_1), 4..6)])
+		.await?;
+
+	log::info!("Pruning test completed successfully");
+	Ok(())
+}