Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions polkadot/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ substrate-build-script-utils = { workspace = true, default-features = true }
runtime-benchmarks = ["polkadot-cli/runtime-benchmarks"]
try-runtime = ["polkadot-cli/try-runtime"]
fast-runtime = ["polkadot-cli/fast-runtime"]
test-persistence = ["polkadot-cli/test-persistence"]
runtime-metrics = ["polkadot-cli/runtime-metrics"]
pyroscope = ["polkadot-cli/pyroscope"]
jemalloc-allocator = ["polkadot-jemalloc-shim/jemalloc-allocator"]
Expand Down
1 change: 1 addition & 0 deletions polkadot/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ try-runtime = [
"sp-runtime/try-runtime",
]
fast-runtime = ["polkadot-service/fast-runtime"]
test-persistence = ["polkadot-service/test-persistence"]
pyroscope = ["dep:pyroscope", "pyroscope_pprofrs"]

# Configure the native runtimes to use.
Expand Down
3 changes: 3 additions & 0 deletions polkadot/node/network/collator-protocol/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@ polkadot-node-subsystem-util = { workspace = true, default-features = true }
polkadot-primitives = { workspace = true, default-features = true }
thiserror = { workspace = true }
tokio-util = { workspace = true }
codec.workspace = true

[dev-dependencies]
assert_matches = { workspace = true }
kvdb-memorydb = { workspace = true }
rstest = { workspace = true }
sc-network-types = { workspace = true, default-features = true }
sp-tracing = { workspace = true }
Expand All @@ -51,3 +53,4 @@ polkadot-primitives-test-helpers = { workspace = true }

[features]
default = []
test-persistence = []
15 changes: 10 additions & 5 deletions polkadot/node/network/collator-protocol/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

use std::{
collections::HashSet,
sync::Arc,
time::{Duration, Instant},
};

Expand All @@ -31,16 +32,16 @@ use futures::{
FutureExt, TryFutureExt,
};

use polkadot_node_subsystem_util::reputation::ReputationAggregator;
use polkadot_node_subsystem_util::{database::Database, reputation::ReputationAggregator};
use sp_keystore::KeystorePtr;

use polkadot_node_network_protocol::{
request_response::{v2 as protocol_v2, IncomingRequestReceiver},
PeerId, UnifiedReputationChange as Rep,
};
use polkadot_primitives::CollatorPair;

use polkadot_node_subsystem::{errors::SubsystemError, overseer, DummySubsystem, SpawnedSubsystem};
use polkadot_primitives::CollatorPair;
pub use validator_side_experimental::ReputationConfig;

mod collator_side;
mod validator_side;
Expand Down Expand Up @@ -91,6 +92,10 @@ pub enum ProtocolSide {
keystore: KeystorePtr,
/// Prometheus metrics for validators.
metrics: validator_side_experimental::Metrics,
/// Database used for reputation house keeping.
db: Arc<dyn Database>,
/// Reputation configuration (column number and persist interval).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

persist interval is not here

reputation_config: validator_side_experimental::ReputationConfig,
},
/// Collators operate on a parachain.
Collator {
Expand Down Expand Up @@ -148,8 +153,8 @@ impl<Context> CollatorProtocolSubsystem {
.map_err(|e| SubsystemError::with_origin("collator-protocol", e))
.boxed()
},
ProtocolSide::ValidatorExperimental { keystore, metrics } =>
validator_side_experimental::run(ctx, keystore, metrics)
ProtocolSide::ValidatorExperimental { keystore, metrics, db, reputation_config } =>
validator_side_experimental::run(ctx, keystore, metrics, db, reputation_config)
.map_err(|e| SubsystemError::with_origin("collator-protocol", e))
.boxed(),
ProtocolSide::Collator { peer_id, collator_pair, request_receiver_v2, metrics } =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use codec::{Decode, Encode};
use polkadot_node_network_protocol::{
peer_set::CollationVersion,
request_response::{outgoing::RequestError, v2 as request_v2},
Expand Down Expand Up @@ -83,8 +84,17 @@ pub const MAX_FETCH_DELAY: Duration = Duration::from_millis(300);
/// advertised collations.
pub const MIN_FETCH_TIMER_DELAY: Duration = Duration::from_millis(150);

/// How often to persist the reputation database to disk.
/// Using 10 minutes in production as a balance between data safety and disk I/O.
/// Using 30 seconds in test mode for faster test execution.
pub const REPUTATION_PERSIST_INTERVAL: Duration = if cfg!(feature = "test-persistence") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the feature name is not expressive enough (what persistence is this referring to).
a better alternative would be to make this duration configurable via CLI

Duration::from_secs(30)
} else {
Duration::from_secs(10 * 60)
};

/// Reputation score type.
#[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Clone, Copy, Default)]
#[derive(PartialEq, Eq, PartialOrd, Ord, Debug, Clone, Copy, Default, Encode, Decode)]
pub struct Score(u16);

impl Score {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ pub enum Error {
#[fatal]
#[error("Receiving message from overseer failed: {0}")]
SubsystemReceive(#[source] SubsystemError),
#[fatal]
#[error("Failed to initialize reputation database: {0}")]
ReputationDbInit(String),
#[error("Unable to retrieve block number for {0:?} from implicit view")]
BlockNumberNotFoundInImplicitView(Hash),
#[fatal(forward)]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,13 @@ mod state;
#[cfg(test)]
mod tests;

use crate::{validator_side_experimental::common::MIN_FETCH_TIMER_DELAY, LOG_TARGET};
use crate::{
validator_side_experimental::{
common::{MIN_FETCH_TIMER_DELAY, REPUTATION_PERSIST_INTERVAL},
peer_manager::PersistentDb,
},
LOG_TARGET,
};
use collation_manager::CollationManager;
use common::{ProspectiveCandidate, MAX_STORED_SCORES_PER_PARA};
use error::{log_error, FatalError, FatalResult, Result};
Expand All @@ -35,25 +41,41 @@ use polkadot_node_subsystem::{
messages::{CollatorProtocolMessage, NetworkBridgeEvent},
overseer, ActivatedLeaf, CollatorProtocolSenderTrait, FromOrchestra, OverseerSignal,
};
use polkadot_node_subsystem_util::database::Database;
use sp_keystore::KeystorePtr;
use std::{future, future::Future, pin::Pin, time::Duration};
use std::{future, future::Future, pin::Pin, sync::Arc, time::Duration};

use peer_manager::{Db, PeerManager};
#[cfg(test)]
use peer_manager::Db;
use peer_manager::PeerManager;

use state::State;

pub use crate::validator_side_metrics::Metrics;

/// Configuration for the reputation db.
#[derive(Debug, Clone, Copy)]
pub struct ReputationConfig {
/// The data column in the store to use for reputation data.
pub col_reputation_data: u32,
}

/// The main run loop.
#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
pub(crate) async fn run<Context>(
mut ctx: Context,
keystore: KeystorePtr,
metrics: Metrics,
db: Arc<dyn Database>,
reputation_config: ReputationConfig,
) -> FatalResult<()> {
gum::info!(LOG_TARGET, "Running experimental collator protocol");
if let Some(state) = initialize(&mut ctx, keystore, metrics).await? {
run_inner(ctx, state).await?;
gum::info!(
LOG_TARGET,
persist_interval_secs = REPUTATION_PERSIST_INTERVAL.as_secs(),
"Running experimental collator protocol"
);
if let Some(state) = initialize(&mut ctx, keystore, metrics, db, reputation_config).await? {
run_inner(ctx, state, REPUTATION_PERSIST_INTERVAL).await?;
}

Ok(())
Expand All @@ -64,7 +86,9 @@ async fn initialize<Context>(
ctx: &mut Context,
keystore: KeystorePtr,
metrics: Metrics,
) -> FatalResult<Option<State<Db>>> {
db: Arc<dyn Database>,
reputation_config: ReputationConfig,
) -> FatalResult<Option<State<peer_manager::PersistentDb>>> {
loop {
let first_leaf = match wait_for_first_leaf(ctx).await? {
Some(activated_leaf) => {
Expand All @@ -84,7 +108,27 @@ async fn initialize<Context>(

let scheduled_paras = collation_manager.assignments();

let backend = Db::new(MAX_STORED_SCORES_PER_PARA).await;
// Create PersistentDb with disk persistence
let backend = match peer_manager::PersistentDb::new(
db.clone(),
reputation_config,
MAX_STORED_SCORES_PER_PARA,
)
.await
{
Ok(backend) => backend,
Err(e) => {
gum::error!(
target: LOG_TARGET,
error = ?e,
"Failed to initialize persistent reputation DB"
);
return Err(FatalError::ReputationDbInit(format!(
"PersistentDb init failed: {:?}",
e
)))
},
};

match PeerManager::startup(backend, ctx.sender(), scheduled_paras.into_iter().collect())
.await
Expand Down Expand Up @@ -133,9 +177,21 @@ fn create_timer(maybe_delay: Option<Duration>) -> Fuse<Pin<Box<dyn Future<Output
timer.fuse()
}

/// Create the persistence timer that fires after the given interval.
fn create_persistence_timer(interval: Duration) -> Fuse<Pin<Box<dyn Future<Output = ()> + Send>>> {
let delay: Pin<Box<dyn Future<Output = ()> + Send>> = Box::pin(Delay::new(interval));
delay.fuse()
}

#[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)]
async fn run_inner<Context>(mut ctx: Context, mut state: State<Db>) -> FatalResult<()> {
async fn run_inner<Context>(
mut ctx: Context,
mut state: State<peer_manager::PersistentDb>,
persist_interval: Duration,
) -> FatalResult<()> {
let mut timer = create_timer(None);
let mut persistence_timer = create_persistence_timer(persist_interval);

loop {
select! {
// Calling `fuse()` here is useless, because the termination state of the resulting
Expand All @@ -153,7 +209,11 @@ async fn run_inner<Context>(mut ctx: Context, mut state: State<Db>) -> FatalResu
msg,
).await;
}
Ok(FromOrchestra::Signal(OverseerSignal::Conclude)) | Err(_) => break,
Ok(FromOrchestra::Signal(OverseerSignal::Conclude)) | Err(_) => {
// Persist to disk before shutdown
state.persist_to_disk();
break
},
Ok(FromOrchestra::Signal(OverseerSignal::BlockFinalized(hash, number))) => {
state.handle_finalized_block(ctx.sender(), hash, number).await?;
},
Expand All @@ -167,6 +227,12 @@ async fn run_inner<Context>(mut ctx: Context, mut state: State<Db>) -> FatalResu
// We don't need to do anything specific here.
// If the timer expires, we only need to trigger the advertisement fetching logic.
},
_ = &mut persistence_timer => {
// Periodic persistence - write reputation DB to disk
state.persist_to_disk();
// Reset the timer for the next interval
persistence_timer = create_persistence_timer(persist_interval);
},
}

// Now try triggering advertisement fetching, if we have room in any of the active leaves
Expand All @@ -186,7 +252,7 @@ async fn run_inner<Context>(mut ctx: Context, mut state: State<Db>) -> FatalResu
/// The main message receiver switch.
async fn process_msg<Sender: CollatorProtocolSenderTrait>(
sender: &mut Sender,
state: &mut State<Db>,
state: &mut State<PersistentDb>,
msg: CollatorProtocolMessage,
) {
use CollatorProtocolMessage::*;
Expand Down Expand Up @@ -239,7 +305,7 @@ async fn process_msg<Sender: CollatorProtocolSenderTrait>(
/// Bridge event switch.
async fn handle_network_msg<Sender: CollatorProtocolSenderTrait>(
sender: &mut Sender,
state: &mut State<Db>,
state: &mut State<PersistentDb>,
bridge_message: NetworkBridgeEvent<net_protocol::CollatorProtocolMessage>,
) -> Result<()> {
use NetworkBridgeEvent::*;
Expand Down Expand Up @@ -287,7 +353,7 @@ async fn handle_network_msg<Sender: CollatorProtocolSenderTrait>(

async fn process_incoming_peer_message<Sender: CollatorProtocolSenderTrait>(
sender: &mut Sender,
state: &mut State<Db>,
state: &mut State<PersistentDb>,
origin: PeerId,
msg: CollationProtocols<
protocol_v1::CollatorProtocolMessage,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@ impl Db {
}
}

type Timestamp = u128;
pub(crate) type Timestamp = u128;

#[derive(Clone, Debug)]
struct ScoreEntry {
score: Score,
last_bumped: Timestamp,
#[derive(Clone, Copy, Debug, codec::Encode, codec::Decode)]
pub(crate) struct ScoreEntry {
pub(crate) score: Score,
pub(crate) last_bumped: Timestamp,
}

#[async_trait]
Expand Down Expand Up @@ -222,6 +222,40 @@ impl Db {
}
}

/// Get the last finalized block number (for persistence).
pub(crate) fn get_last_finalized(&self) -> Option<BlockNumber> {
self.last_finalized
}

/// Set the last finalized block number (for loading from disk).
pub(crate) fn set_last_finalized(&mut self, last_finalized: Option<BlockNumber>) {
self.last_finalized = last_finalized;
}

/// Get reputations for a specific para (for persistence).
pub(crate) fn get_para_reputations(
&self,
para_id: &ParaId,
) -> Option<HashMap<PeerId, ScoreEntry>> {
self.db.get(para_id).cloned()
}

/// Set reputations for a specific para (for loading from disk).
pub(crate) fn set_para_reputations(
&mut self,
para_id: ParaId,
reputations: HashMap<PeerId, ScoreEntry>,
) {
self.db.insert(para_id, reputations);
}

/// Get all reputations (for persistence).
pub(crate) fn all_reputations(
&self,
) -> impl Iterator<Item = (&ParaId, &HashMap<PeerId, ScoreEntry>)> {
self.db.iter()
}

#[cfg(test)]
fn len(&self) -> usize {
self.db.len()
Expand Down
Loading
Loading