diff --git a/consensus/fuzz/src/lib.rs b/consensus/fuzz/src/lib.rs index 88f5ec4bd33..a95198808f4 100644 --- a/consensus/fuzz/src/lib.rs +++ b/consensus/fuzz/src/lib.rs @@ -33,7 +33,6 @@ use commonware_p2p::{ simulated::{Config as NetworkConfig, Link, Network, Oracle, SplitOrigin, SplitTarget}, Recipients, }; -use commonware_parallel::Sequential; use commonware_runtime::{ buffer::paged::CacheRef, deterministic, Clock, IoBuf, Metrics, Runner, Spawner, }; @@ -402,7 +401,6 @@ where replay_buffer: NZUsize!(1024 * 1024), write_buffer: NZUsize!(1024 * 1024), page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE), - strategy: Sequential, forwarding: ForwardingPolicy::Disabled, }; let engine = Engine::new(context.with_label("engine"), engine_cfg); @@ -636,7 +634,6 @@ fn run_with_twin_mutator(input: FuzzInput) { replay_buffer: NZUsize!(1024 * 1024), write_buffer: NZUsize!(1024 * 1024), page_cache: CacheRef::from_pooler(&primary_context, PAGE_SIZE, PAGE_CACHE_SIZE), - strategy: Sequential, forwarding: ForwardingPolicy::Disabled, }; let engine = Engine::new(primary_context.with_label("engine"), engine_cfg); diff --git a/consensus/src/aggregation/config.rs b/consensus/src/aggregation/config.rs index 52c41647270..d05be569b30 100644 --- a/consensus/src/aggregation/config.rs +++ b/consensus/src/aggregation/config.rs @@ -8,7 +8,6 @@ use commonware_cryptography::{ Digest, }; use commonware_p2p::Blocker; -use commonware_parallel::Strategy; use commonware_runtime::buffer::paged::CacheRef; use commonware_utils::NonZeroDuration; use std::num::{NonZeroU64, NonZeroUsize}; @@ -21,7 +20,6 @@ pub struct Config< Z: Reporter>, M: Monitor, B: Blocker::PublicKey>, - T: Strategy, > { /// Tracks the current state of consensus (to determine which participants should /// be involved in the current broadcast attempt). @@ -79,7 +77,4 @@ pub struct Config< /// Page cache for the journal. pub journal_page_cache: CacheRef, - - /// Strategy for parallel operations. - pub strategy: T, } diff --git a/consensus/src/aggregation/engine.rs b/consensus/src/aggregation/engine.rs index a685e0f5fe4..409e962238d 100644 --- a/consensus/src/aggregation/engine.rs +++ b/consensus/src/aggregation/engine.rs @@ -20,7 +20,6 @@ use commonware_p2p::{ utils::codec::{wrap, WrappedSender}, Blocker, Receiver, Recipients, Sender, }; -use commonware_parallel::Strategy; use commonware_runtime::{ buffer::paged::CacheRef, spawn_cell, @@ -28,7 +27,7 @@ use commonware_runtime::{ histogram, status::{CounterExt, GaugeExt, Status}, }, - BufferPooler, Clock, ContextCell, Handle, Metrics, Spawner, Storage, + BufferPooler, Clock, ContextCell, Handle, Metrics, Spawner, Storage, Strategist, }; use commonware_storage::journal::segmented::variable::{Config as JConfig, Journal}; use commonware_utils::{futures::Pool as FuturesPool, ordered::Quorum, N3f1, PrioritySet}; @@ -71,14 +70,13 @@ struct DigestRequest { /// Instance of the engine. pub struct Engine< - E: BufferPooler + Clock + Spawner + Storage + Metrics + CryptoRngCore, + E: BufferPooler + Clock + Spawner + Storage + Metrics + CryptoRngCore + Strategist, P: Provider, D: Digest, A: Automaton + Clone, Z: Reporter>, M: Monitor, B: Blocker::PublicKey>, - T: Strategy, > { // ---------- Interfaces ---------- context: ContextCell, @@ -87,7 +85,6 @@ pub struct Engine< provider: P, reporter: Z, blocker: B, - strategy: T, // Pruning /// A tuple representing the epochs to keep in memory. @@ -154,18 +151,17 @@ pub struct Engine< } impl< - E: BufferPooler + Clock + Spawner + Storage + Metrics + CryptoRngCore, + E: BufferPooler + Clock + Spawner + Storage + Metrics + CryptoRngCore + Strategist, P: Provider>, D: Digest, A: Automaton + Clone, Z: Reporter>, M: Monitor, B: Blocker::PublicKey>, - T: Strategy, - > Engine + > Engine { /// Creates a new engine with the given context and configuration. - pub fn new(context: E, cfg: Config) -> Self { + pub fn new(context: E, cfg: Config) -> Self { // TODO(#1833): Metrics should use the post-start context let metrics = metrics::Metrics::init(context.clone()); @@ -176,7 +172,6 @@ impl< monitor: cfg.monitor, provider: cfg.provider, blocker: cfg.blocker, - strategy: cfg.strategy, epoch_bounds: cfg.epoch_bounds, window: HeightDelta::new(cfg.window.into()), activity_timeout: cfg.activity_timeout, @@ -385,7 +380,7 @@ impl< } // Validate that we need to process the ack - if let Err(err) = self.validate_ack(&ack, &sender) { + if let Err(err) = self.validate_ack(&ack, &sender).await { if err.blockable() { commonware_p2p::block!( self.blocker, @@ -530,9 +525,16 @@ impl< let filtered = acks .values() .filter(|a| a.item.digest == ack.item.digest) + .cloned() .collect::>(); if filtered.len() >= quorum as usize { - if let Some(certificate) = Certificate::from_acks(&*scheme, filtered, &self.strategy) { + let certificate = self + .context + .with_strategy(move |strategy| { + Certificate::from_acks(&*scheme, filtered.iter(), strategy) + }) + .await; + if let Some(certificate) = certificate { self.metrics.certificates.inc(); self.handle_certificate(certificate).await; } @@ -613,7 +615,7 @@ impl< /// Takes a raw ack (from sender) from the p2p network and validates it. /// /// Returns an error if the ack is invalid. - fn validate_ack( + async fn validate_ack( &mut self, ack: &Ack, sender: &::PublicKey, @@ -679,7 +681,13 @@ impl< } // Validate signature - if !ack.verify(&mut self.context, &*scheme, &self.strategy) { + let ack = ack.clone(); + let mut rng = self.context.clone(); + let valid = self + .context + .with_strategy(move |strategy| ack.verify(&mut rng, &*scheme, strategy)) + .await; + if !valid { return Err(Error::InvalidAckSignature); } diff --git a/consensus/src/aggregation/mod.rs b/consensus/src/aggregation/mod.rs index 6bf3d50f678..d67269dce3a 100644 --- a/consensus/src/aggregation/mod.rs +++ b/consensus/src/aggregation/mod.rs @@ -98,7 +98,6 @@ mod tests { }; use commonware_macros::{select, test_group, test_traced}; use commonware_p2p::simulated::{Link, Network, Oracle, Receiver, Sender}; - use commonware_parallel::Sequential; use commonware_runtime::{ buffer::paged::CacheRef, deterministic::{self, Context}, @@ -253,7 +252,6 @@ mod tests { journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(), journal_compression: Some(3), journal_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE), - strategy: Sequential, }, ); @@ -502,7 +500,6 @@ mod tests { PAGE_SIZE, PAGE_CACHE_SIZE, ), - strategy: Sequential, }, ); @@ -656,7 +653,6 @@ mod tests { PAGE_SIZE, PAGE_CACHE_SIZE, ), - strategy: Sequential, }, ); @@ -743,7 +739,6 @@ mod tests { PAGE_SIZE, PAGE_CACHE_SIZE, ), - strategy: Sequential, }, ); @@ -1091,7 +1086,6 @@ mod tests { PAGE_SIZE, PAGE_CACHE_SIZE, ), - strategy: Sequential, }, ); diff --git a/consensus/src/marshal/coding/marshaled.rs b/consensus/src/marshal/coding/marshaled.rs index 6078868dede..878cd98be73 100644 --- a/consensus/src/marshal/coding/marshaled.rs +++ b/consensus/src/marshal/coding/marshaled.rs @@ -44,7 +44,6 @@ //! shards: shard_mailbox, //! scheme_provider, //! epocher, -//! strategy, //! }; //! let application = Marshaled::new(context, cfg); //! ``` @@ -105,10 +104,9 @@ use commonware_cryptography::{ }; use commonware_macros::select; use commonware_p2p::Recipients; -use commonware_parallel::Strategy; use commonware_runtime::{ telemetry::metrics::histogram::{Buckets, Timed}, - Clock, Metrics, Spawner, Storage, + Clock, Metrics, Spawner, Storage, Strategist, }; use commonware_utils::{ channel::{ @@ -132,13 +130,12 @@ const GENESIS_CODING_CONFIG: CodingConfig = CodingConfig { /// Configuration for initializing [`Marshaled`]. #[allow(clippy::type_complexity)] -pub struct MarshaledConfig +pub struct MarshaledConfig where B: CertifiableBlock::PublicKey>>, C: CodingScheme, H: Hasher, Z: Provider>, - S: Strategy, ES: Epocher, { /// The underlying application to wrap. @@ -150,8 +147,6 @@ where pub shards: shards::Mailbox::PublicKey>, /// Provider for signing schemes scoped by epoch. pub scheme_provider: Z, - /// Strategy for parallel operations. - pub strategy: S, /// Strategy for determining epoch boundaries. pub epocher: ES, } @@ -163,15 +158,14 @@ where /// re-proposing boundary blocks during epoch transitions. #[derive(Clone)] #[allow(clippy::type_complexity)] -pub struct Marshaled +pub struct Marshaled where - E: Rng + Storage + Spawner + Metrics + Clock, + E: Rng + Storage + Spawner + Metrics + Clock + Strategist, A: Application, B: CertifiableBlock::PublicKey>>, C: CodingScheme, H: Hasher, Z: Provider>, - S: Strategy, ES: Epocher, { context: E, @@ -180,7 +174,6 @@ where shards: shards::Mailbox::PublicKey>, scheme_provider: Z, epocher: ES, - strategy: S, verification_tasks: VerificationTasks, cached_genesis: Arc)>>, @@ -190,9 +183,9 @@ where erasure_encode_duration: Timed, } -impl Marshaled +impl Marshaled where - E: Rng + Storage + Spawner + Metrics + Clock, + E: Rng + Storage + Spawner + Metrics + Clock + Strategist, A: VerifyingApplication< E, Block = B, @@ -203,7 +196,6 @@ where C: CodingScheme, H: Hasher, Z: Provider>, - S: Strategy, ES: Epocher, { /// Creates a new [`Marshaled`] wrapper. @@ -211,13 +203,12 @@ where /// # Panics /// /// Panics if the marshal metadata store cannot be initialized. - pub fn new(context: E, cfg: MarshaledConfig) -> Self { + pub fn new(context: E, cfg: MarshaledConfig) -> Self { let MarshaledConfig { application, marshal, shards, scheme_provider, - strategy, epocher, } = cfg; @@ -261,7 +252,6 @@ where marshal, shards, scheme_provider, - strategy, epocher, verification_tasks: VerificationTasks::new(), cached_genesis: Arc::new(OnceLock::new()), @@ -432,9 +422,9 @@ where } } -impl Automaton for Marshaled +impl Automaton for Marshaled where - E: Rng + Storage + Spawner + Metrics + Clock, + E: Rng + Storage + Spawner + Metrics + Clock + Strategist, A: VerifyingApplication< E, Block = B, @@ -445,7 +435,6 @@ where C: CodingScheme, H: Hasher, Z: Provider>, - S: Strategy, ES: Epocher, { type Digest = Commitment; @@ -497,7 +486,6 @@ where let mut marshal = self.marshal.clone(); let mut application = self.application.clone(); let epocher = self.epocher.clone(); - let strategy = self.strategy.clone(); let cached_genesis = self.cached_genesis.clone(); // If there's no scheme for the current epoch, we cannot verify the proposal. @@ -651,7 +639,11 @@ where build_timer.observe(); let mut erasure_timer = erasure_encode_duration.timer(); - let coded_block = CodedBlock::::new(built_block, coding_config, &strategy); + let coded_block = runtime_context + .with_strategy(move |strategy| { + CodedBlock::::new(built_block, coding_config, strategy) + }) + .await; erasure_timer.observe(); let commitment = coded_block.commitment(); @@ -847,9 +839,9 @@ where } } -impl CertifiableAutomaton for Marshaled +impl CertifiableAutomaton for Marshaled where - E: Rng + Storage + Spawner + Metrics + Clock, + E: Rng + Storage + Spawner + Metrics + Clock + Strategist, A: VerifyingApplication< E, Block = B, @@ -860,7 +852,6 @@ where C: CodingScheme, H: Hasher, Z: Provider>, - S: Strategy, ES: Epocher, { async fn certify(&mut self, round: Round, payload: Self::Digest) -> oneshot::Receiver { @@ -966,9 +957,9 @@ where } } -impl Relay for Marshaled +impl Relay for Marshaled where - E: Rng + Storage + Spawner + Metrics + Clock, + E: Rng + Storage + Spawner + Metrics + Clock + Strategist, A: Application< E, Block = B, @@ -978,7 +969,6 @@ where C: CodingScheme, H: Hasher, Z: Provider>, - S: Strategy, ES: Epocher, { type Digest = Commitment; @@ -999,9 +989,9 @@ where } } -impl Reporter for Marshaled +impl Reporter for Marshaled where - E: Rng + Storage + Spawner + Metrics + Clock, + E: Rng + Storage + Spawner + Metrics + Clock + Strategist, A: Application< E, Block = B, @@ -1011,7 +1001,6 @@ where C: CodingScheme, H: Hasher, Z: Provider>, - S: Strategy, ES: Epocher, { type Activity = A::Activity; diff --git a/consensus/src/marshal/coding/mod.rs b/consensus/src/marshal/coding/mod.rs index 35c6d728074..9cdad7e31ce 100644 --- a/consensus/src/marshal/coding/mod.rs +++ b/consensus/src/marshal/coding/mod.rs @@ -420,7 +420,6 @@ mod tests { shards: shards.clone(), scheme_provider: ConstantProvider::new(schemes[0].clone()), epocher: FixedEpocher::new(BLOCKS_PER_EPOCH), - strategy: Sequential, }; let mut marshaled = Marshaled::new(context.clone(), cfg); @@ -541,7 +540,6 @@ mod tests { shards: shards.clone(), scheme_provider: ConstantProvider::new(schemes[0].clone()), epocher: FixedEpocher::new(BLOCKS_PER_EPOCH), - strategy: Sequential, }; let mut marshaled = Marshaled::new(context.clone(), cfg); @@ -757,7 +755,6 @@ mod tests { shards: shards.clone(), scheme_provider: ConstantProvider::new(schemes[0].clone()), epocher: FixedEpocher::new(BLOCKS_PER_EPOCH), - strategy: Sequential, }; let mut marshaled = Marshaled::new(context.clone(), cfg); @@ -847,7 +844,6 @@ mod tests { shards: shards.clone(), scheme_provider: ConstantProvider::new(schemes[0].clone()), epocher: FixedEpocher::new(BLOCKS_PER_EPOCH), - strategy: Sequential, }; let mut marshaled = Marshaled::new(context.clone(), cfg); @@ -924,7 +920,6 @@ mod tests { shards: shards.clone(), scheme_provider: ConstantProvider::new(schemes[0].clone()), epocher: FixedEpocher::new(BLOCKS_PER_EPOCH), - strategy: Sequential, }; let mut marshaled = Marshaled::new(context.clone(), cfg); @@ -1116,7 +1111,6 @@ mod tests { shards: shards.clone(), scheme_provider: ConstantProvider::new(schemes[0].clone()), epocher: limited_epocher, - strategy: Sequential, }; let mut marshaled = Marshaled::new(context.clone(), cfg); @@ -1218,7 +1212,6 @@ mod tests { shards: shards.clone(), scheme_provider: ConstantProvider::new(schemes[0].clone()), epocher: FixedEpocher::new(BLOCKS_PER_EPOCH), - strategy: Sequential, }; let mut marshaled = Marshaled::new(context.clone(), cfg); @@ -1384,7 +1377,6 @@ mod tests { shards: shards.clone(), scheme_provider: ConstantProvider::new(schemes[0].clone()), epocher: FixedEpocher::new(BLOCKS_PER_EPOCH), - strategy: Sequential, }; let mut marshaled = Marshaled::new(context.clone(), cfg); @@ -1506,7 +1498,6 @@ mod tests { shards: shards.clone(), scheme_provider: ConstantProvider::new(schemes[0].clone()), epocher: FixedEpocher::new(BLOCKS_PER_EPOCH), - strategy: Sequential, }; let mut marshaled = Marshaled::new(context.clone(), cfg); @@ -1705,7 +1696,6 @@ mod tests { shards: setup.extra, scheme_provider: EmptyProvider, epocher: FixedEpocher::new(BLOCKS_PER_EPOCH), - strategy: Sequential, }; let mut marshaled = Marshaled::new(context.clone(), cfg); @@ -1810,7 +1800,6 @@ mod tests { shards: shards.clone(), scheme_provider: ConstantProvider::new(schemes[0].clone()), epocher: FixedEpocher::new(BLOCKS_PER_EPOCH), - strategy: Sequential, }; let mut marshaled = Marshaled::new(context.clone(), cfg); @@ -1928,7 +1917,6 @@ mod tests { shards: shards.clone(), scheme_provider: ConstantProvider::new(schemes[0].clone()), epocher: FixedEpocher::new(BLOCKS_PER_EPOCH), - strategy: Sequential, }; let mut marshaled = Marshaled::new(context.clone(), cfg); @@ -2042,7 +2030,6 @@ mod tests { shards: shards.clone(), scheme_provider: ConstantProvider::new(schemes[0].clone()), epocher: FixedEpocher::new(BLOCKS_PER_EPOCH), - strategy: Sequential, }; let mut marshaled = Marshaled::new(context.clone(), cfg); @@ -2132,7 +2119,6 @@ mod tests { shards: shards.clone(), scheme_provider: ConstantProvider::new(schemes[0].clone()), epocher: FixedEpocher::new(BLOCKS_PER_EPOCH), - strategy: Sequential, }; let mut marshaled = Marshaled::new(context.clone(), cfg); diff --git a/consensus/src/marshal/coding/shards/engine.rs b/consensus/src/marshal/coding/shards/engine.rs index 104a18459bf..1f6426e75fd 100644 --- a/consensus/src/marshal/coding/shards/engine.rs +++ b/consensus/src/marshal/coding/shards/engine.rs @@ -163,7 +163,7 @@ use commonware_parallel::Strategy; use commonware_runtime::{ spawn_cell, telemetry::metrics::{histogram::HistogramExt, status::GaugeExt}, - BufferPooler, Clock, ContextCell, Handle, Metrics, Spawner, + BufferPooler, Clock, ContextCell, Handle, Metrics, Spawner, Strategist, }; use commonware_utils::{ bitmap::BitMap, @@ -210,7 +210,7 @@ enum BlockSubscriptionKey { } /// Configuration for the [`Engine`]. -pub struct Config +pub struct Config where P: PublicKey, S: Provider, @@ -219,7 +219,6 @@ where C: CodingScheme, H: Hasher, B: CertifiableBlock, - T: Strategy, { /// The scheme provider. pub scheme_provider: S, @@ -233,9 +232,6 @@ where /// [`commonware_codec::Read`] configuration for decoding blocks. pub block_codec_cfg: B::Cfg, - /// The strategy used for parallel computation. - pub strategy: T, - /// The size of the mailbox buffer. pub mailbox_size: usize, @@ -269,9 +265,9 @@ where /// /// When enough [`Shard`]s are present in the mailbox, the [`Engine`] may facilitate /// reconstruction of the original [`CodedBlock`] and notify any subscribers waiting for it. -pub struct Engine +pub struct Engine where - E: BufferPooler + Rng + Spawner + Metrics + Clock, + E: BufferPooler + Rng + Spawner + Metrics + Clock + Strategist, S: Provider, S::Scheme: CertificateScheme, X: Blocker, @@ -280,7 +276,6 @@ where H: Hasher, B: CertifiableBlock, P: PublicKey, - T: Strategy, { /// Context held by the actor. context: ContextCell, @@ -300,9 +295,6 @@ where /// [`Read`] configuration for decoding [`CodedBlock`]s. block_codec_cfg: B::Cfg, - /// The strategy used for parallel shard verification. - strategy: T, - /// A map of [`Commitment`]s to [`ReconstructionState`]s. state: BTreeMap>, @@ -356,9 +348,9 @@ where metrics: ShardMetrics, } -impl Engine +impl Engine where - E: BufferPooler + Rng + Spawner + Metrics + Clock, + E: BufferPooler + Rng + Spawner + Metrics + Clock + Strategist, S: Provider, S::Scheme: CertificateScheme, X: Blocker, @@ -367,10 +359,9 @@ where H: Hasher, B: CertifiableBlock, P: PublicKey, - T: Strategy, { /// Create a new [`Engine`] with the given configuration. - pub fn new(context: E, config: Config) -> (Self, Mailbox) { + pub fn new(context: E, config: Config) -> (Self, Mailbox) { let metrics = ShardMetrics::new(&context); let (sender, mailbox) = mpsc::channel(config.mailbox_size); ( @@ -381,7 +372,6 @@ where blocker: config.blocker, shard_codec_cfg: config.shard_codec_cfg, block_codec_cfg: config.block_codec_cfg, - strategy: config.strategy, state: BTreeMap::new(), peer_buffers: BTreeMap::new(), peer_buffer_size: config.peer_buffer_size, @@ -415,6 +405,10 @@ where self.context.network_buffer_pool().clone(), sender, ); + let max_concurrency = self + .context + .with_strategy(|strategy| strategy.parallelism_hint()) + .await; let (receiver_service, mut receiver): (_, mpsc::Receiver<(P, Shard)>) = WrappedBackgroundReceiver::new( self.context.with_label("shard_ingress"), @@ -422,7 +416,7 @@ where self.shard_codec_cfg.clone(), self.blocker.clone(), self.background_channel_capacity, - &self.strategy, + max_concurrency, ); // Keep the handle alive to prevent the background receiver from being aborted. let _receiver_handle = receiver_service.start(); @@ -529,19 +523,13 @@ where continue; } - if let Some(state) = self.state.get_mut(&commitment) { - let round = state.round(); + if let Some(round) = self.state.get(&commitment).map(ReconstructionState::round) { let Some(scheme) = self.scheme_provider.scoped(round.epoch()) else { warn!(%commitment, "no scheme for epoch, ignoring shard"); continue; }; - let progressed = state - .on_network_shard( - peer, - shard, - InsertCtx::new(scheme.as_ref(), &self.strategy), - &mut self.blocker, - ) + let progressed = self + .handle_network_shard(commitment, peer, shard, scheme) .await; if progressed { self.try_advance(&mut sender, commitment).await; @@ -569,6 +557,36 @@ where true } + async fn handle_network_shard( + &mut self, + commitment: Commitment, + peer: P, + shard: Shard, + scheme: Arc, + ) -> bool { + let Some(mut state) = self.state.remove(&commitment) else { + return false; + }; + let mut blocker = self.blocker.clone(); + let participants_len = u64::try_from(scheme.participants().len()) + .expect("participant count impossibly out of bounds"); + let progressed = state + .on_network_shard(peer, shard, scheme.as_ref(), &mut blocker) + .await; + if progressed { + state = Self::try_transition_state( + self.context.clone(), + commitment, + participants_len, + state, + &mut blocker, + ) + .await; + } + self.state.insert(commitment, state); + progressed + } + /// Attempts to reconstruct a [`CodedBlock`] from the checked [`Shard`]s present in the /// [`ReconstructionState`]. /// @@ -577,29 +595,37 @@ where /// - `Ok(None)` if reconstruction could not be attempted due to insufficient checked shards. /// - `Err(_)` if reconstruction was attempted but failed. #[allow(clippy::type_complexity)] - fn try_reconstruct( + async fn try_reconstruct( &mut self, commitment: Commitment, ) -> Result>>, Error> { if let Some(block) = self.reconstructed_blocks.get(&commitment) { return Ok(Some(Arc::clone(block))); } - let Some(state) = self.state.get_mut(&commitment) else { + let Some(state) = self.state.remove(&commitment) else { return Ok(None); }; if state.checked_shards().len() < usize::from(commitment.config().minimum_shards.get()) { debug!(%commitment, "not enough checked shards to reconstruct block"); + self.state.insert(commitment, state); return Ok(None); } // Attempt to reconstruct the encoded blob let start = self.context.current(); - let blob = C::decode( - &commitment.config(), - &commitment.root(), - state.checked_shards().iter(), - &self.strategy, - ) - .map_err(Error::Coding)?; + let (state, blob) = self + .context + .with_strategy(move |strategy| { + let blob = C::decode( + &commitment.config(), + &commitment.root(), + state.checked_shards().iter(), + strategy, + ); + (state, blob) + }) + .await; + self.state.insert(commitment, state); + let blob = blob.map_err(Error::Coding)?; self.metrics .erasure_decode_duration .observe_between(start, self.context.current()); @@ -722,27 +748,63 @@ where } } - let Some(state) = self.state.get_mut(&commitment) else { + let Some(mut state) = self.state.remove(&commitment) else { return false; }; let round = state.round(); let Some(scheme) = self.scheme_provider.scoped(round.epoch()) else { warn!(%commitment, "no scheme for epoch, dropping buffered shards"); + self.state.insert(commitment, state); return false; }; + let participants_len = u64::try_from(scheme.participants().len()) + .expect("participant count impossibly out of bounds"); // Ingest buffered shards into the active reconstruction state. Batch verification // will be triggered if there are enough shards to meet the quorum threshold. + let mut blocker = self.blocker.clone(); let mut progressed = false; - let ctx = InsertCtx::new(scheme.as_ref(), &self.strategy); for (peer, shard) in buffered { - progressed |= state - .on_network_shard(peer, shard, ctx, &mut self.blocker) + let shard_progressed = state + .on_network_shard(peer, shard, scheme.as_ref(), &mut blocker) .await; + if !shard_progressed { + continue; + } + progressed = true; + state = Self::try_transition_state( + self.context.clone(), + commitment, + participants_len, + state, + &mut blocker, + ) + .await; } + self.state.insert(commitment, state); progressed } + async fn try_transition_state( + context: ContextCell, + commitment: Commitment, + participants_len: u64, + state: ReconstructionState, + blocker: &mut X, + ) -> ReconstructionState { + let (state, to_block) = context + .with_strategy(move |strategy| { + let mut state = state; + let to_block = state.try_transition(commitment, participants_len, strategy); + (state, to_block) + }) + .await; + for peer in to_block { + commonware_p2p::block!(blocker, peer, "invalid shard received"); + } + state + } + /// Cache a block and notify all subscribers waiting on it. fn cache_block(&mut self, block: Arc>) { let commitment = block.commitment(); @@ -776,7 +838,13 @@ where return; }; - let shard_count = block.shards(&self.strategy).len(); + let (block, shard_count) = self + .context + .with_strategy(move |strategy| { + let shard_count = block.shards(strategy).len(); + (block, shard_count) + }) + .await; if shard_count != participants.len() { warn!( %commitment, @@ -872,7 +940,7 @@ where } } - match self.try_reconstruct(commitment) { + match self.try_reconstruct(commitment).await { Ok(Some(block)) => { // Do not prune other reconstruction state here. A Byzantine // leader can equivocate by proposing multiple commitments in @@ -1215,17 +1283,16 @@ where H: Hasher, { /// Check whether quorum is met and, if so, batch-validate all pending - /// shards in parallel. Returns `Some(ReadyState)` on successful transition. - async fn try_transition( + /// shards in parallel. Returns blocking actions and a ready-state transition when successful. + fn try_transition( &mut self, commitment: Commitment, participants_len: u64, strategy: &impl Strategy, - blocker: &mut impl Blocker, - ) -> Option> { + ) -> TransitionAttempt { let minimum = usize::from(commitment.config().minimum_shards.get()); if self.common.checked_shards.len() + self.pending_shards.len() < minimum { - return None; + return TransitionAttempt::default(); } // Batch-validate all pending weak shards in parallel. @@ -1241,16 +1308,16 @@ where (peer, checked.ok()) }); - for peer in to_block { - commonware_p2p::block!(blocker, peer, "invalid shard received"); - } for checked in new_checked { self.common.checked_shards.push(checked); } // After validation, some may have failed; recheck threshold. if self.common.checked_shards.len() < minimum { - return None; + return TransitionAttempt { + ready: None, + to_block, + }; } // Transition to Ready. @@ -1260,37 +1327,33 @@ where &mut self.common, CommonState::new(leader, round, participants_len), ); - Some(ReadyState { common }) + TransitionAttempt { + ready: Some(ReadyState { common }), + to_block, + } } } -/// Context required for processing incoming network shards. -struct InsertCtx<'a, Sch, S> +struct TransitionAttempt where - Sch: CertificateScheme, - S: Strategy, + P: PublicKey, + C: CodingScheme, + H: Hasher, { - scheme: &'a Sch, - strategy: &'a S, - participants_len: u64, -} - -impl Clone for InsertCtx<'_, Sch, S> { - fn clone(&self) -> Self { - *self - } + ready: Option>, + to_block: Vec

, } -impl Copy for InsertCtx<'_, Sch, S> {} - -impl<'a, Sch: CertificateScheme, S: Strategy> InsertCtx<'a, Sch, S> { - fn new(scheme: &'a Sch, strategy: &'a S) -> Self { - let participants_len = u64::try_from(scheme.participants().len()) - .expect("participant count impossibly out of bounds"); +impl Default for TransitionAttempt +where + P: PublicKey, + C: CodingScheme, + H: Hasher, +{ + fn default() -> Self { Self { - scheme, - strategy, - participants_len, + ready: None, + to_block: Vec::new(), } } } @@ -1398,19 +1461,18 @@ where /// engine level in bounded per-peer queues until /// [`Discovered`](super::Message::Discovered) creates a /// reconstruction state for this commitment. - async fn on_network_shard( + async fn on_network_shard( &mut self, sender: P, shard: Shard, - ctx: InsertCtx<'_, Sch, S>, + scheme: &Sch, blocker: &mut X, ) -> bool where Sch: CertificateScheme, - S: Strategy, X: Blocker, { - let Some(sender_index) = ctx.scheme.participants().index(&sender) else { + let Some(sender_index) = scheme.participants().index(&sender) else { commonware_p2p::block!(blocker, sender, "shard sent by non-participant"); return false; }; @@ -1423,7 +1485,7 @@ where // Determine expected index based on sender role. let is_from_leader = sender == self.common().leader; let expected_participant = if is_from_leader { - ctx.scheme.me().unwrap_or(sender_index) + scheme.me().unwrap_or(sender_index) } else { sender_index }; @@ -1461,25 +1523,8 @@ where if is_from_leader && !self.common().assigned_shard_verified { let progressed = self .common_mut() - .verify_assigned_shard( - sender, - commitment, - indexed, - ctx.scheme.me().is_some(), - blocker, - ) + .verify_assigned_shard(sender, commitment, indexed, scheme.me().is_some(), blocker) .await; - - if progressed { - if let Self::AwaitingQuorum(state) = self { - if let Some(ready) = state - .try_transition(commitment, ctx.participants_len, ctx.strategy, blocker) - .await - { - *self = Self::Ready(ready); - } - } - } return progressed; } @@ -1495,14 +1540,24 @@ where .insert(indexed.index, indexed.data.clone()); state.common.contributed.set(u64::from(indexed.index), true); state.pending_shards.insert(sender, indexed); - if let Some(ready) = state - .try_transition(commitment, ctx.participants_len, ctx.strategy, blocker) - .await - { + true + } + + fn try_transition( + &mut self, + commitment: Commitment, + participants_len: u64, + strategy: &impl Strategy, + ) -> Vec

{ + let Self::AwaitingQuorum(state) = self else { + return Vec::new(); + }; + let TransitionAttempt { ready, to_block } = + state.try_transition(commitment, participants_len, strategy); + if let Some(ready) = ready { *self = Self::Ready(ready); } - - true + to_block } } @@ -1656,9 +1711,9 @@ mod tests { type Prov = MultiEpochProvider; type NetworkSender = simulated::Sender; type D = simulated::Manager; - type ShardEngine = Engine; + type ShardEngine = Engine; type ChurningShardEngine = - Engine; + Engine; async fn assert_blocked(oracle: &O, blocker: &P, blocked: &P) { let blocked_peers = oracle.blocked().await.unwrap(); @@ -1806,7 +1861,6 @@ mod tests { maximum_shard_size: MAX_SHARD_SIZE, }, block_codec_cfg: (), - strategy: STRATEGY, mailbox_size: 1024, peer_buffer_size: NZUsize!(64), background_channel_capacity: 1024, @@ -1843,7 +1897,6 @@ mod tests { maximum_shard_size: MAX_SHARD_SIZE, }, block_codec_cfg: (), - strategy: STRATEGY, mailbox_size: 1024, peer_buffer_size: NZUsize!(64), background_channel_capacity: 1024, @@ -3746,14 +3799,13 @@ mod tests { let scheme_provider = MultiEpochProvider::single(scheme_epoch0).with_epoch(Epoch::new(1), scheme_epoch1); - let config: Config<_, _, _, _, C, _, _, _> = Config { + let config: Config<_, _, _, _, C, _, _> = Config { scheme_provider, blocker: receiver_control.clone(), shard_codec_cfg: CodecConfig { maximum_shard_size: MAX_SHARD_SIZE, }, block_codec_cfg: (), - strategy: STRATEGY, mailbox_size: 1024, peer_buffer_size: NZUsize!(64), background_channel_capacity: 1024, @@ -3880,14 +3932,13 @@ mod tests { // and `ingest_buffered_shards`). Leader-shard validation is the third. // Any additional lookup for epoch 0 churns to `None`. let broadcaster_provider = ChurningProvider::new(broadcaster_scheme, 3); - let broadcaster_config: Config<_, _, _, _, C, _, _, _> = Config { + let broadcaster_config: Config<_, _, _, _, C, _, _> = Config { scheme_provider: broadcaster_provider, blocker: broadcaster_control.clone(), shard_codec_cfg: CodecConfig { maximum_shard_size: MAX_SHARD_SIZE, }, block_codec_cfg: (), - strategy: STRATEGY, mailbox_size: 1024, peer_buffer_size: NZUsize!(64), background_channel_capacity: 1024, @@ -3903,14 +3954,13 @@ mod tests { private_keys[receiver_idx].clone(), ) .expect("signer scheme should be created"); - let receiver_config: Config<_, _, _, _, C, _, _, _> = Config { + let receiver_config: Config<_, _, _, _, C, _, _> = Config { scheme_provider: MultiEpochProvider::single(receiver_scheme), blocker: receiver_control.clone(), shard_codec_cfg: CodecConfig { maximum_shard_size: MAX_SHARD_SIZE, }, block_codec_cfg: (), - strategy: STRATEGY, mailbox_size: 1024, peer_buffer_size: NZUsize!(64), background_channel_capacity: 1024, @@ -4741,14 +4791,13 @@ mod tests { ) .expect("signer scheme should be created"); - let config: Config<_, _, _, _, C, _, _, _> = Config { + let config: Config<_, _, _, _, C, _, _> = Config { scheme_provider: MultiEpochProvider::single(scheme), blocker: receiver_control.clone(), shard_codec_cfg: CodecConfig { maximum_shard_size: MAX_SHARD_SIZE, }, block_codec_cfg: (), - strategy: STRATEGY, mailbox_size: 1024, peer_buffer_size: NZUsize!(64), background_channel_capacity: 1024, @@ -4852,14 +4901,13 @@ mod tests { ) .expect("signer scheme should be created"); - let config: Config<_, _, _, _, C, _, _, _> = Config { + let config: Config<_, _, _, _, C, _, _> = Config { scheme_provider: MultiEpochProvider::single(scheme), blocker: receiver_control, shard_codec_cfg: CodecConfig { maximum_shard_size: MAX_SHARD_SIZE, }, block_codec_cfg: (), - strategy: STRATEGY, mailbox_size: 16, peer_buffer_size: NZUsize!(4), background_channel_capacity: 16, @@ -4984,7 +5032,7 @@ mod tests { Scheme::signer(SCHEME_NAMESPACE, epoch1_set.clone(), receiver_key.clone()) .expect("epoch 1 signer scheme should be created"); - let config: Config<_, _, _, _, C, _, _, _> = Config { + let config: Config<_, _, _, _, C, _, _> = Config { scheme_provider: MultiEpochProvider::single(scheme_epoch0) .with_epoch(Epoch::new(1), scheme_epoch1), blocker: receiver_control.clone(), @@ -4992,7 +5040,6 @@ mod tests { maximum_shard_size: MAX_SHARD_SIZE, }, block_codec_cfg: (), - strategy: STRATEGY, mailbox_size: 1024, peer_buffer_size: NZUsize!(64), background_channel_capacity: 1024, @@ -5145,14 +5192,13 @@ mod tests { ) .expect("signer scheme should be created"); - let config: Config<_, _, _, _, C, _, _, _> = Config { + let config: Config<_, _, _, _, C, _, _> = Config { scheme_provider: MultiEpochProvider::single(scheme), blocker: receiver_control.clone(), shard_codec_cfg: CodecConfig { maximum_shard_size: MAX_SHARD_SIZE, }, block_codec_cfg: (), - strategy: STRATEGY, mailbox_size: 1024, peer_buffer_size: NZUsize!(64), background_channel_capacity: 1024, diff --git a/consensus/src/marshal/config.rs b/consensus/src/marshal/config.rs index a9a8c66010b..ead13d067b0 100644 --- a/consensus/src/marshal/config.rs +++ b/consensus/src/marshal/config.rs @@ -3,7 +3,6 @@ use crate::{ Block, }; use commonware_cryptography::certificate::Provider; -use commonware_parallel::Strategy; use commonware_runtime::buffer::paged::CacheRef; use std::num::{NonZeroU64, NonZeroUsize}; @@ -26,12 +25,11 @@ use std::num::{NonZeroU64, NonZeroUsize}; /// height passes a prune target. The last processed height can be /// derived from an `Update::Block` at height `H` as /// `H - max_pending_acks` (the maximum backlog of blocks the application can buffer). -pub struct Config +pub struct Config where B: Block, P: Provider, ES: Epocher, - T: Strategy, { /// Provider for epoch-specific signing schemes. /// @@ -79,7 +77,4 @@ where /// yet been acknowledged. Increasing this value allows the application /// to buffer work while marshal continues dispatching, hiding ack latency. pub max_pending_acks: NonZeroUsize, - - /// Strategy for parallel operations. - pub strategy: T, } diff --git a/consensus/src/marshal/core/actor.rs b/consensus/src/marshal/core/actor.rs index b8d9e23576c..2bb8a50bbdf 100644 --- a/consensus/src/marshal/core/actor.rs +++ b/consensus/src/marshal/core/actor.rs @@ -24,11 +24,10 @@ use commonware_cryptography::{ }; use commonware_macros::select_loop; use commonware_p2p::Recipients; -use commonware_parallel::Strategy; use commonware_resolver::Resolver; use commonware_runtime::{ spawn_cell, telemetry::metrics::status::GaugeExt, BufferPooler, Clock, ContextCell, Handle, - Metrics, Spawner, Storage, + Metrics, Spawner, Storage, Strategist, }; use commonware_storage::{ archive::Identifier as ArchiveID, @@ -71,6 +70,38 @@ enum PendingVerification { }, } +#[derive(Clone)] +enum OwnedVerification { + Notarized(Notarization), + Finalized(Finalization), +} + +impl OwnedVerification { + fn epoch(&self) -> Epoch { + match self { + Self::Notarized(notarization) => notarization.epoch(), + Self::Finalized(finalization) => finalization.epoch(), + } + } + + const fn as_subject_and_certificate(&self) -> (Subject<'_, D>, &S::Certificate) { + match self { + Self::Finalized(finalization) => ( + Subject::Finalize { + proposal: &finalization.proposal, + }, + &finalization.certificate, + ), + Self::Notarized(notarization) => ( + Subject::Notarize { + proposal: ¬arization.proposal, + }, + ¬arization.certificate, + ), + } + } +} + /// A pending acknowledgement from the application for a block at the contained height/commitment. #[pin_project] struct PendingAck { @@ -198,9 +229,9 @@ type BlockSubscriptionKeyFor = /// finalization for a block that is ahead of its current view, it will request the missing blocks /// from its peers. This ensures that the actor can catch up to the rest of the network if it falls /// behind. -pub struct Actor +pub struct Actor where - E: BufferPooler + CryptoRngCore + Spawner + Metrics + Clock + Storage, + E: BufferPooler + CryptoRngCore + Spawner + Metrics + Clock + Storage + Strategist, V: Variant, P: Provider>, FC: Certificates< @@ -210,7 +241,6 @@ where >, FB: Blocks, ES: Epocher, - T: Strategy, A: Acknowledgement, { // ---------- Context ---------- @@ -231,9 +261,6 @@ where max_repair: NonZeroUsize, // Codec configuration for block type block_codec_config: ::Cfg, - // Strategy for parallel operations - strategy: T, - // ---------- State ---------- // Last proposed block last_proposed_block: Option<(Round, V::Commitment, V::Block)>, @@ -265,9 +292,9 @@ where processed_height: Gauge, } -impl Actor +impl Actor where - E: BufferPooler + CryptoRngCore + Spawner + Metrics + Clock + Storage, + E: BufferPooler + CryptoRngCore + Spawner + Metrics + Clock + Storage + Strategist, V: Variant, P: Provider>, FC: Certificates< @@ -277,7 +304,6 @@ where >, FB: Blocks, ES: Epocher, - T: Strategy, A: Acknowledgement, { /// Create a new application actor. @@ -285,7 +311,7 @@ where context: E, finalizations_by_height: FC, finalized_blocks: FB, - config: Config, + config: Config, ) -> (Self, Mailbox, Height) { // Initialize cache let prunable_config = cache::Config { @@ -344,7 +370,6 @@ where view_retention_timeout: config.view_retention_timeout, max_repair: config.max_repair, block_codec_config: config.block_codec_config, - strategy: config.strategy, last_proposed_block: None, last_processed_round: Round::zero(), last_processed_height, @@ -1091,36 +1116,36 @@ where let certs: Vec<_> = delivers .iter() .map(|item| match item { - PendingVerification::Finalized { finalization, .. } => ( - Subject::Finalize { - proposal: &finalization.proposal, - }, - &finalization.certificate, - ), - PendingVerification::Notarized { notarization, .. } => ( - Subject::Notarize { - proposal: ¬arization.proposal, - }, - ¬arization.certificate, - ), + PendingVerification::Finalized { finalization, .. } => { + OwnedVerification::Finalized(finalization.clone()) + } + PendingVerification::Notarized { notarization, .. } => { + OwnedVerification::Notarized(notarization.clone()) + } }) .collect(); // Batch verify using the all-epoch verifier if available, otherwise // batch verify per epoch using scoped verifiers. let verified = if let Some(scheme) = self.provider.all() { - verify_certificates(&mut self.context, scheme.as_ref(), &certs, &self.strategy) + let certs = certs.clone(); + let mut rng = self.context.clone(); + self.context + .with_strategy(move |strategy| { + let cert_refs: Vec<_> = certs + .iter() + .map(OwnedVerification::as_subject_and_certificate) + .collect(); + verify_certificates(&mut rng, scheme.as_ref(), &cert_refs, strategy) + }) + .await } else { let mut verified = vec![false; delivers.len()]; // Group indices by epoch. let mut by_epoch: BTreeMap> = BTreeMap::new(); - for (i, item) in delivers.iter().enumerate() { - let epoch = match item { - PendingVerification::Notarized { notarization, .. } => notarization.epoch(), - PendingVerification::Finalized { finalization, .. } => finalization.epoch(), - }; - by_epoch.entry(epoch).or_default().push(i); + for (i, cert) in certs.iter().enumerate() { + by_epoch.entry(cert.epoch()).or_default().push(i); } // Batch verify each epoch group. @@ -1128,9 +1153,18 @@ where let Some(scheme) = self.provider.scoped(*epoch) else { continue; }; - let group: Vec<_> = indices.iter().map(|&i| certs[i]).collect(); - let results = - verify_certificates(&mut self.context, scheme.as_ref(), &group, &self.strategy); + let group: Vec<_> = indices.iter().map(|&i| certs[i].clone()).collect(); + let mut rng = self.context.clone(); + let results = self + .context + .with_strategy(move |strategy| { + let group_refs: Vec<_> = group + .iter() + .map(OwnedVerification::as_subject_and_certificate) + .collect(); + verify_certificates(&mut rng, scheme.as_ref(), &group_refs, strategy) + }) + .await; for (j, &idx) in indices.iter().enumerate() { verified[idx] = results[j]; } diff --git a/consensus/src/marshal/mocks/harness.rs b/consensus/src/marshal/mocks/harness.rs index f1782b4d37e..31a0bfab86a 100644 --- a/consensus/src/marshal/mocks/harness.rs +++ b/consensus/src/marshal/mocks/harness.rs @@ -1567,7 +1567,6 @@ impl TestHarness for StandardHarness { key_write_buffer: NZUsize!(1024), value_write_buffer: NZUsize!(1024), page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE), - strategy: Sequential, }; let control = oracle.control(validator.clone()); let backfill = control.register(1, TEST_QUOTA).await.unwrap(); @@ -1802,7 +1801,6 @@ impl TestHarness for StandardHarness { key_write_buffer: NZUsize!(1024), value_write_buffer: NZUsize!(1024), page_cache: page_cache.clone(), - strategy: Sequential, }; let backfill = control.register(0, TEST_QUOTA).await.unwrap(); @@ -2370,7 +2368,6 @@ impl TestHarness for CodingHarness { key_write_buffer: NZUsize!(1024), value_write_buffer: NZUsize!(1024), page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE), - strategy: Sequential, }; let control = oracle.control(validator.clone()); @@ -2469,14 +2466,13 @@ impl TestHarness for CodingHarness { .expect("failed to initialize finalized blocks archive"); info!(elapsed = ?start.elapsed(), "restored finalized blocks archive"); - let shard_config: shards::Config<_, _, _, _, _, Sha256, _, _> = shards::Config { + let shard_config: shards::Config<_, _, _, _, _, Sha256, _> = shards::Config { scheme_provider: provider.clone(), blocker: oracle.control(validator.clone()), shard_codec_cfg: CodecConfig { maximum_shard_size: 1024 * 1024, }, block_codec_cfg: (), - strategy: Sequential, mailbox_size: 10, peer_buffer_size: NZUsize!(64), background_channel_capacity: 1024, @@ -2638,7 +2634,6 @@ impl TestHarness for CodingHarness { key_write_buffer: NZUsize!(1024), value_write_buffer: NZUsize!(1024), page_cache: page_cache.clone(), - strategy: Sequential, }; let backfill = control.register(0, TEST_QUOTA).await.unwrap(); @@ -2655,14 +2650,13 @@ impl TestHarness for CodingHarness { }; let resolver = resolver::init(&context, resolver_cfg, backfill); - let shard_config: shards::Config<_, _, _, _, _, Sha256, _, _> = shards::Config { + let shard_config: shards::Config<_, _, _, _, _, Sha256, _> = shards::Config { scheme_provider: provider.clone(), blocker: oracle.control(validator.clone()), shard_codec_cfg: CodecConfig { maximum_shard_size: 1024 * 1024, }, block_codec_cfg: (), - strategy: Sequential, mailbox_size: 10, peer_buffer_size: NZUsize!(64), background_channel_capacity: 1024, diff --git a/consensus/src/marshal/standard/mod.rs b/consensus/src/marshal/standard/mod.rs index c3f279a71f9..ddc137eff05 100644 --- a/consensus/src/marshal/standard/mod.rs +++ b/consensus/src/marshal/standard/mod.rs @@ -79,7 +79,6 @@ mod tests { simulated::{self, Network}, Recipients, }; - use commonware_parallel::Sequential; use commonware_resolver::Resolver; use commonware_runtime::{ buffer::paged::CacheRef, deterministic, Clock, Metrics, Quota, Runner, @@ -1785,7 +1784,6 @@ mod tests { key_write_buffer: NZUsize!(1024), value_write_buffer: NZUsize!(1024), page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE), - strategy: Sequential, }; let finalizations_by_height = immutable::Archive::init( context.with_label("finalizations_by_height"), @@ -1900,7 +1898,6 @@ mod tests { key_write_buffer: NZUsize!(1024), value_write_buffer: NZUsize!(1024), page_cache: page_cache.clone(), - strategy: Sequential, }; let finalizations_by_height = prunable::Archive::init( context.with_label("finalizations_by_height"), diff --git a/consensus/src/ordered_broadcast/config.rs b/consensus/src/ordered_broadcast/config.rs index 87a2a83ff34..57ccaa20325 100644 --- a/consensus/src/ordered_broadcast/config.rs +++ b/consensus/src/ordered_broadcast/config.rs @@ -4,7 +4,6 @@ use crate::{ Automaton, Monitor, Relay, Reporter, }; use commonware_cryptography::{certificate::Provider, Digest, Signer}; -use commonware_parallel::Strategy; use commonware_runtime::buffer::paged::CacheRef; use std::{ num::{NonZeroU64, NonZeroUsize}, @@ -21,7 +20,6 @@ pub struct Config< R: Relay, Z: Reporter>, M: Monitor, - T: Strategy, > { /// The signer used when this engine acts as a sequencer. /// @@ -96,7 +94,4 @@ pub struct Config< /// Page cache for the journal. pub journal_page_cache: CacheRef, - - /// Strategy for parallel operations. - pub strategy: T, } diff --git a/consensus/src/ordered_broadcast/engine.rs b/consensus/src/ordered_broadcast/engine.rs index 75fb05b2e12..06c30e1fef6 100644 --- a/consensus/src/ordered_broadcast/engine.rs +++ b/consensus/src/ordered_broadcast/engine.rs @@ -29,7 +29,6 @@ use commonware_p2p::{ utils::codec::{wrap, WrappedSender}, Receiver, Recipients, Sender, }; -use commonware_parallel::Strategy; use commonware_runtime::{ buffer::paged::CacheRef, spawn_cell, @@ -37,7 +36,7 @@ use commonware_runtime::{ histogram, status::{CounterExt, GaugeExt, Status}, }, - BufferPooler, Clock, ContextCell, Handle, Metrics, Spawner, Storage, + BufferPooler, Clock, ContextCell, Handle, Metrics, Spawner, Storage, Strategist, }; use commonware_storage::journal::segmented::variable::{Config as JournalConfig, Journal}; use commonware_utils::{channel::oneshot, futures::Pool as FuturesPool, ordered::Quorum}; @@ -63,7 +62,7 @@ struct Verify { /// Instance of the engine. pub struct Engine< - E: BufferPooler + Clock + Spawner + CryptoRngCore + Storage + Metrics, + E: BufferPooler + Clock + Spawner + CryptoRngCore + Storage + Metrics + Strategist, C: Signer, S: SequencersProvider, P: Provider>, @@ -72,7 +71,6 @@ pub struct Engine< R: Relay, Z: Reporter>, M: Monitor, - T: Strategy, > { //////////////////////////////////////// // Interfaces @@ -85,7 +83,6 @@ pub struct Engine< relay: R, monitor: M, reporter: Z, - strategy: T, //////////////////////////////////////// // Namespace Constants @@ -201,7 +198,7 @@ pub struct Engine< } impl< - E: BufferPooler + Clock + Spawner + CryptoRngCore + Storage + Metrics, + E: BufferPooler + Clock + Spawner + CryptoRngCore + Storage + Metrics + Strategist, C: Signer, S: SequencersProvider, P: Provider>, @@ -210,11 +207,10 @@ impl< R: Relay, Z: Reporter>, M: Monitor, - T: Strategy, - > Engine + > Engine { /// Creates a new engine with the given context and configuration. - pub fn new(context: E, cfg: Config) -> Self { + pub fn new(context: E, cfg: Config) -> Self { // TODO(#1833): Metrics should use the post-start context let metrics = metrics::Metrics::init(context.clone()); @@ -227,7 +223,6 @@ impl< relay: cfg.relay, reporter: cfg.reporter, monitor: cfg.monitor, - strategy: cfg.strategy, chunk_verifier: cfg.chunk_verifier, rebroadcast_timeout: cfg.rebroadcast_timeout, rebroadcast_deadline: None, @@ -404,7 +399,7 @@ impl< continue; } }; - let result = match self.validate_node(&node, &sender) { + let result = match self.validate_node(&node, &sender).await { Ok(result) => result, Err(err) => { debug!(?err, ?sender, "node validate failed"); @@ -453,7 +448,7 @@ impl< continue; } }; - if let Err(err) = self.validate_ack(&ack, &sender) { + if let Err(err) = self.validate_ack(&ack, &sender).await { debug!(?err, ?sender, "ack validate failed"); continue; }; @@ -627,10 +622,18 @@ impl< }; // Add the vote. If a new certificate is formed, handle it. - if let Some(certificate) = self - .ack_manager - .add_ack(ack, scheme.as_ref(), &self.strategy) - { + let mut ack_manager = std::mem::replace(&mut self.ack_manager, AckManager::new()); + let ack = ack.clone(); + let ack_for_assembly = ack.clone(); + let (ack_manager, certificate) = self + .context + .with_strategy(move |strategy| { + let certificate = ack_manager.add_ack(&ack_for_assembly, scheme.as_ref(), strategy); + (ack_manager, certificate) + }) + .await; + self.ack_manager = ack_manager; + if let Some(certificate) = certificate { debug!(epoch = %ack.epoch, sequencer = ?ack.chunk.sequencer, height = %ack.chunk.height, "recovered certificate"); self.metrics.certificates.inc(); self.handle_certificate(&ack.chunk, ack.epoch, certificate) @@ -879,7 +882,7 @@ impl< /// If valid (and not already the tracked tip for the sender), returns the implied /// parent chunk and its certificate. /// Else returns an error if the `Node` is invalid. - fn validate_node( + async fn validate_node( &mut self, node: &Node, sender: &C::PublicKey, @@ -901,19 +904,22 @@ impl< self.validate_chunk(&node.chunk, self.epoch)?; // Verify the node - node.verify( - &mut self.context, - &self.chunk_verifier, - &self.validators_provider, - &self.strategy, - ) + let node = node.clone(); + let chunk_verifier = self.chunk_verifier.clone(); + let validators_provider = self.validators_provider.clone(); + let mut rng = self.context.clone(); + self.context + .with_strategy(move |strategy| { + node.verify(&mut rng, &chunk_verifier, &validators_provider, strategy) + }) + .await } /// Takes a raw ack (from sender) from the p2p network and validates it. /// /// Returns the chunk, epoch, and vote if the ack is valid. /// Returns an error if the ack is invalid. - fn validate_ack( + async fn validate_ack( &mut self, ack: &Ack, sender: &::PublicKey, @@ -963,7 +969,13 @@ impl< } // Validate the vote signature - if !ack.verify(&mut self.context, scheme.as_ref(), &self.strategy) { + let ack = ack.clone(); + let mut rng = self.context.clone(); + let valid = self + .context + .with_strategy(move |strategy| ack.verify(&mut rng, scheme.as_ref(), strategy)) + .await; + if !valid { return Err(Error::InvalidAckSignature); } diff --git a/consensus/src/ordered_broadcast/mod.rs b/consensus/src/ordered_broadcast/mod.rs index 841a9b56cad..37a4e6eaf8a 100644 --- a/consensus/src/ordered_broadcast/mod.rs +++ b/consensus/src/ordered_broadcast/mod.rs @@ -92,7 +92,6 @@ mod tests { }; use commonware_macros::{select, test_group, test_traced}; use commonware_p2p::simulated::{Link, Network, Oracle, Receiver, Sender}; - use commonware_parallel::Sequential; use commonware_runtime::{ buffer::paged::CacheRef, deterministic::{self, Context}, @@ -265,7 +264,6 @@ mod tests { journal_name_prefix: format!("ordered-broadcast-seq-{validator}-"), journal_compression: Some(3), journal_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE), - strategy: Sequential, }, ); @@ -801,7 +799,6 @@ mod tests { PAGE_SIZE, PAGE_CACHE_SIZE, ), - strategy: Sequential, }, ); @@ -965,7 +962,6 @@ mod tests { PAGE_SIZE, PAGE_CACHE_SIZE, ), - strategy: Sequential, }, ); @@ -1023,7 +1019,6 @@ mod tests { PAGE_SIZE, PAGE_CACHE_SIZE, ), - strategy: Sequential, }, ); diff --git a/consensus/src/simplex/actors/batcher/actor.rs b/consensus/src/simplex/actors/batcher/actor.rs index 46c61ee4cec..fc6c4f5c5de 100644 --- a/consensus/src/simplex/actors/batcher/actor.rs +++ b/consensus/src/simplex/actors/batcher/actor.rs @@ -6,7 +6,7 @@ use crate::{ interesting, metrics::{Inbound, Peer, TimeoutReason}, scheme::Scheme, - types::{Activity, Certificate, Proposal, Vote}, + types::{Activity, Certificate, Finalization, Notarization, Nullification, Proposal, Vote}, Plan, }, types::{Epoch, Participant, View, ViewDelta}, @@ -15,14 +15,13 @@ use crate::{ use commonware_cryptography::Digest; use commonware_macros::select_loop; use commonware_p2p::{utils::codec::WrappedReceiver, Blocker, Receiver, Recipients}; -use commonware_parallel::Strategy; use commonware_runtime::{ spawn_cell, telemetry::metrics::{ histogram::{self, Buckets}, status::GaugeExt, }, - Clock, ContextCell, Handle, Metrics, Spawner, + Clock, ContextCell, Handle, Metrics, Spawner, Strategist, }; use commonware_utils::{ channel::{fallible::OneshotExt, mpsc}, @@ -43,15 +42,14 @@ struct Current { timed_out: bool, } -pub struct Actor +pub struct Actor where - E: Spawner + Metrics + Clock + CryptoRngCore, + E: Spawner + Metrics + Clock + CryptoRngCore + Strategist, S: Scheme, B: Blocker, D: Digest, Re: Reporter>, Rl: Relay, - T: Strategy, { context: ContextCell, @@ -61,7 +59,6 @@ where blocker: B, reporter: Re, relay: Rl, - strategy: T, activity_timeout: ViewDelta, skip_timeout: ViewDelta, @@ -80,17 +77,16 @@ where recover_latency: histogram::Timed, } -impl Actor +impl Actor where - E: Spawner + Metrics + Clock + CryptoRngCore, + E: Spawner + Metrics + Clock + CryptoRngCore + Strategist, S: Scheme, B: Blocker, D: Digest, Re: Reporter>, Rl: Relay>, - T: Strategy, { - pub fn new(context: E, cfg: Config) -> (Self, Mailbox) { + pub fn new(context: E, cfg: Config) -> (Self, Mailbox) { let participants = cfg.scheme.participants().clone(); let participant_count = participants.len(); let added = Counter::default(); @@ -148,7 +144,6 @@ where blocker: cfg.blocker, reporter: cfg.reporter, relay: cfg.relay, - strategy: cfg.strategy, activity_timeout: cfg.activity_timeout, skip_timeout: cfg.skip_timeout, @@ -179,6 +174,90 @@ where ) } + async fn verify_certificate( + context: ContextCell, + scheme: S, + certificate: Certificate, + ) -> bool { + let mut rng = context.clone(); + context + .with_strategy(move |strategy| match certificate { + Certificate::Notarization(notarization) => { + notarization.verify(&mut rng, &scheme, strategy) + } + Certificate::Nullification(nullification) => { + nullification.verify::<_, D>(&mut rng, &scheme, strategy) + } + Certificate::Finalization(finalization) => { + finalization.verify(&mut rng, &scheme, strategy) + } + }) + .await + } + + async fn process_round( + context: ContextCell, + mut round: Round, + ) -> ( + Round, + Option<(Vec>, Vec)>, + ) { + let mut rng = context.clone(); + context + .with_strategy(move |strategy| { + let verified = if round.ready_notarizes() { + Some(round.verify_notarizes(&mut rng, strategy)) + } else if round.ready_nullifies() { + Some(round.verify_nullifies(&mut rng, strategy)) + } else if round.ready_finalizes() { + Some(round.verify_finalizes(&mut rng, strategy)) + } else { + None + }; + (round, verified) + }) + .await + } + + async fn construct_notarization( + context: ContextCell, + scheme: S, + mut round: Round, + ) -> (Round, Option>) { + context + .with_strategy(move |strategy| { + let notarization = round.try_construct_notarization(&scheme, strategy); + (round, notarization) + }) + .await + } + + async fn construct_nullification( + context: ContextCell, + scheme: S, + mut round: Round, + ) -> (Round, Option>) { + context + .with_strategy(move |strategy| { + let nullification = round.try_construct_nullification(&scheme, strategy); + (round, nullification) + }) + .await + } + + async fn construct_finalization( + context: ContextCell, + scheme: S, + mut round: Round, + ) -> (Round, Option>) { + context + .with_strategy(move |strategy| { + let finalization = round.try_construct_finalization(&scheme, strategy); + (round, finalization) + }) + .await + } + /// Records the latest view message received from a participant. /// /// This mechanism is not resistant to malicious validators (nor is @@ -424,7 +503,13 @@ where } // Verify the certificate - if !notarization.verify(&mut self.context, &self.scheme, &self.strategy) { + if !Self::verify_certificate( + self.context.clone(), + self.scheme.clone(), + Certificate::Notarization(notarization.clone()), + ) + .await + { commonware_p2p::block!(self.blocker, sender, %view, "invalid notarization"); continue; } @@ -445,11 +530,13 @@ where } // Verify the certificate - if !nullification.verify::<_, D>( - &mut self.context, - &self.scheme, - &self.strategy, - ) { + if !Self::verify_certificate( + self.context.clone(), + self.scheme.clone(), + Certificate::Nullification(nullification.clone()), + ) + .await + { commonware_p2p::block!(self.blocker, sender, %view, "invalid nullification"); continue; } @@ -470,7 +557,13 @@ where } // Verify the certificate - if !finalization.verify(&mut self.context, &self.scheme, &self.strategy) { + if !Self::verify_certificate( + self.context.clone(), + self.scheme.clone(), + Certificate::Finalization(finalization.clone()), + ) + .await + { commonware_p2p::block!(self.blocker, sender, %view, "invalid finalization"); continue; } @@ -570,26 +663,23 @@ where } // Process the updated view (if any) - let Some(round) = work.get_mut(&updated_view) else { + let Some(round) = work.remove(&updated_view) else { continue; }; - // Batch verify votes if ready - let mut timer = self.verify_latency.timer(); - let verified = if round.ready_notarizes() { - Some(round.verify_notarizes(&mut self.context, &self.strategy)) - } else if round.ready_nullifies() { - Some(round.verify_nullifies(&mut self.context, &self.strategy)) - } else if round.ready_finalizes() { - Some(round.verify_finalizes(&mut self.context, &self.strategy)) - } else { - None + let (mut round, verified) = { + let mut timer = self.verify_latency.timer(); + let result = Self::process_round(self.context.clone(), round).await; + if result.1.is_some() { + timer.observe(); + } else { + timer.cancel(); + } + result }; // Process batch verification results if let Some((voters, failed)) = verified { - timer.observe(); - // Process verified votes let batch = voters.len() + failed.len(); trace!(view = %updated_view, batch, "batch verified votes"); @@ -612,7 +702,6 @@ where round.add_verified(valid); } } else { - timer.cancel(); trace!( current = %current.view, %finalized, @@ -620,11 +709,54 @@ where ); } + let (round, notarization) = { + let mut timer = self.recover_latency.timer(); + let result = Self::construct_notarization( + self.context.clone(), + self.scheme.clone(), + round, + ) + .await; + if result.1.is_some() { + timer.observe(); + } else { + timer.cancel(); + } + result + }; + let (round, nullification) = { + let mut timer = self.recover_latency.timer(); + let result = Self::construct_nullification( + self.context.clone(), + self.scheme.clone(), + round, + ) + .await; + if result.1.is_some() { + timer.observe(); + } else { + timer.cancel(); + } + result + }; + let (round, finalization) = { + let mut timer = self.recover_latency.timer(); + let result = Self::construct_finalization( + self.context.clone(), + self.scheme.clone(), + round, + ) + .await; + if result.1.is_some() { + timer.observe(); + } else { + timer.cancel(); + } + result + }; + // Try to construct and forward certificates - if let Some(notarization) = self - .recover_latency - .time_some(|| round.try_construct_notarization(&self.scheme, &self.strategy)) - { + if let Some(notarization) = notarization { debug!(view = %updated_view, "constructed notarization, forwarding to voter"); // Forward notarization to voter @@ -632,25 +764,21 @@ where .recovered(Certificate::Notarization(notarization)) .await; } - if let Some(nullification) = self - .recover_latency - .time_some(|| round.try_construct_nullification(&self.scheme, &self.strategy)) - { + if let Some(nullification) = nullification { debug!(view = %updated_view, "constructed nullification, forwarding to voter"); voter .recovered(Certificate::Nullification(nullification)) .await; } - if let Some(finalization) = self - .recover_latency - .time_some(|| round.try_construct_finalization(&self.scheme, &self.strategy)) - { + if let Some(finalization) = finalization { debug!(view = %updated_view, "constructed finalization, forwarding to voter"); voter .recovered(Certificate::Finalization(finalization)) .await; } + work.insert(updated_view, round); + // Drop any rounds that are no longer interesting while work.first_key_value().is_some_and(|(&view, _)| { !interesting(self.activity_timeout, finalized, current.view, view, false) diff --git a/consensus/src/simplex/actors/batcher/mod.rs b/consensus/src/simplex/actors/batcher/mod.rs index 6c52e1eab36..33c0bd2c5ad 100644 --- a/consensus/src/simplex/actors/batcher/mod.rs +++ b/consensus/src/simplex/actors/batcher/mod.rs @@ -11,21 +11,17 @@ use crate::{ pub use actor::Actor; use commonware_cryptography::certificate::Scheme; use commonware_p2p::Blocker; -use commonware_parallel::Strategy; pub use ingress::{Mailbox, Message}; pub use round::Round; pub use verifier::Verifier; -pub struct Config { +pub struct Config { pub scheme: S, pub blocker: B, pub reporter: Re, pub relay: Rl, - /// Strategy for parallel operations. - pub strategy: T, - pub activity_timeout: ViewDelta, pub skip_timeout: ViewDelta, pub epoch: Epoch, @@ -235,7 +231,6 @@ mod tests { blocker: oracle.control(me.clone()), reporter: reporter.clone(), relay: MockRelay::new(), - strategy: Sequential, activity_timeout: ViewDelta::new(10), skip_timeout: ViewDelta::new(5), epoch, @@ -407,7 +402,6 @@ mod tests { blocker: oracle.control(me.clone()), reporter: reporter.clone(), relay: MockRelay::new(), - strategy: Sequential, activity_timeout: ViewDelta::new(10), skip_timeout: ViewDelta::new(5), epoch, @@ -572,7 +566,6 @@ mod tests { blocker: oracle.control(me.clone()), reporter: reporter.clone(), relay: relay.clone(), - strategy: Sequential, activity_timeout: ViewDelta::new(10), skip_timeout: ViewDelta::new(5), epoch, @@ -722,7 +715,6 @@ mod tests { blocker: oracle.control(me.clone()), reporter: reporter.clone(), relay: relay.clone(), - strategy: Sequential, activity_timeout: ViewDelta::new(10), skip_timeout: ViewDelta::new(5), epoch, @@ -905,7 +897,6 @@ mod tests { blocker: oracle.control(me.clone()), reporter: reporter.clone(), relay: relay.clone(), - strategy: Sequential, activity_timeout: ViewDelta::new(10), skip_timeout: ViewDelta::new(5), epoch, @@ -1147,7 +1138,6 @@ mod tests { blocker: oracle.control(me.clone()), reporter: reporter.clone(), relay: relay.clone(), - strategy: Sequential, activity_timeout: ViewDelta::new(10), skip_timeout: ViewDelta::new(5), epoch, @@ -1376,7 +1366,6 @@ mod tests { blocker: oracle.control(me.clone()), reporter: reporter.clone(), relay: relay.clone(), - strategy: Sequential, activity_timeout: ViewDelta::new(10), skip_timeout: ViewDelta::new(5), epoch, @@ -1556,7 +1545,6 @@ mod tests { blocker: oracle.control(me.clone()), reporter: reporter.clone(), relay: relay.clone(), - strategy: Sequential, activity_timeout: ViewDelta::new(10), skip_timeout: ViewDelta::new(5), epoch, @@ -1786,7 +1774,6 @@ mod tests { blocker: oracle.control(me.clone()), reporter: reporter.clone(), relay: relay.clone(), - strategy: Sequential, activity_timeout: ViewDelta::new(10), skip_timeout: ViewDelta::new(5), epoch, @@ -1996,7 +1983,6 @@ mod tests { blocker: oracle.control(me.clone()), reporter: reporter.clone(), relay: MockRelay::new(), - strategy: Sequential, activity_timeout: ViewDelta::new(10), skip_timeout: ViewDelta::new(5), epoch, @@ -2195,7 +2181,6 @@ mod tests { blocker: oracle.control(me.clone()), reporter: reporter.clone(), relay: MockRelay::new(), - strategy: Sequential, activity_timeout: ViewDelta::new(10), skip_timeout: ViewDelta::new(5), epoch, @@ -2405,7 +2390,6 @@ mod tests { blocker: oracle.control(me.clone()), reporter: reporter.clone(), relay: MockRelay::new(), - strategy: Sequential, activity_timeout: ViewDelta::new(10), skip_timeout: ViewDelta::new(5), epoch, @@ -2532,7 +2516,6 @@ mod tests { blocker: oracle.control(me.clone()), reporter: reporter.clone(), relay: MockRelay::new(), - strategy: Sequential, activity_timeout: ViewDelta::new(10), skip_timeout: ViewDelta::new(5), epoch, @@ -2662,7 +2645,6 @@ mod tests { blocker: oracle.control(me.clone()), reporter: reporter.clone(), relay: MockRelay::new(), - strategy: Sequential, activity_timeout: ViewDelta::new(10), skip_timeout: ViewDelta::new(skip_timeout), epoch, @@ -2813,7 +2795,6 @@ mod tests { blocker: oracle.control(me.clone()), reporter: reporter.clone(), relay: MockRelay::new(), - strategy: Sequential, activity_timeout: ViewDelta::new(10), skip_timeout: ViewDelta::new(skip_timeout), epoch, @@ -2942,7 +2923,6 @@ mod tests { blocker: oracle.control(me.clone()), reporter: reporter.clone(), relay: MockRelay::new(), - strategy: Sequential, activity_timeout: ViewDelta::new(10), skip_timeout: ViewDelta::new(skip_timeout), epoch, @@ -3083,7 +3063,6 @@ mod tests { blocker: oracle.control(me.clone()), reporter: reporter.clone(), relay: MockRelay::new(), - strategy: Sequential, activity_timeout: ViewDelta::new(10), skip_timeout: ViewDelta::new(5), epoch, @@ -3212,7 +3191,6 @@ mod tests { blocker: oracle.control(me.clone()), reporter: reporter.clone(), relay: MockRelay::new(), - strategy: Sequential, activity_timeout: ViewDelta::new(10), skip_timeout: ViewDelta::new(5), epoch, @@ -3342,7 +3320,6 @@ mod tests { blocker: oracle.control(me.clone()), reporter: reporter.clone(), relay: MockRelay::new(), - strategy: Sequential, activity_timeout: ViewDelta::new(10), skip_timeout: ViewDelta::new(5), epoch, @@ -3541,7 +3518,6 @@ mod tests { blocker: oracle.control(me.clone()), reporter: reporter.clone(), relay: MockRelay::new(), - strategy: Sequential, activity_timeout: ViewDelta::new(10), skip_timeout: ViewDelta::new(5), epoch, @@ -3787,7 +3763,6 @@ mod tests { blocker: oracle.control(me.clone()), reporter: reporter.clone(), relay: MockRelay::new(), - strategy: Sequential, activity_timeout: ViewDelta::new(10), skip_timeout: ViewDelta::new(5), epoch, @@ -3999,7 +3974,6 @@ mod tests { blocker: oracle.control(me.clone()), reporter: reporter.clone(), relay: MockRelay::new(), - strategy: Sequential, activity_timeout: ViewDelta::new(10), skip_timeout: ViewDelta::new(5), epoch, diff --git a/consensus/src/simplex/actors/resolver/actor.rs b/consensus/src/simplex/actors/resolver/actor.rs index 6e66639ee0b..6def614acc1 100644 --- a/consensus/src/simplex/actors/resolver/actor.rs +++ b/consensus/src/simplex/actors/resolver/actor.rs @@ -16,9 +16,10 @@ use commonware_codec::{Decode, Encode}; use commonware_cryptography::Digest; use commonware_macros::select_loop; use commonware_p2p::{utils::StaticProvider, Blocker, Receiver, Sender}; -use commonware_parallel::Strategy; use commonware_resolver::p2p; -use commonware_runtime::{spawn_cell, BufferPooler, Clock, ContextCell, Handle, Metrics, Spawner}; +use commonware_runtime::{ + spawn_cell, BufferPooler, Clock, ContextCell, Handle, Metrics, Spawner, Strategist, +}; use commonware_utils::{ channel::{fallible::OneshotExt, mpsc}, ordered::Quorum, @@ -30,16 +31,14 @@ use tracing::debug; /// Requests are made concurrently to multiple peers. pub struct Actor< - E: BufferPooler + Clock + CryptoRngCore + Metrics + Spawner, + E: BufferPooler + Clock + CryptoRngCore + Metrics + Spawner + Strategist, S: Scheme, B: Blocker, D: Digest, - T: Strategy, > { context: ContextCell, scheme: S, blocker: Option, - strategy: T, epoch: Epoch, mailbox_size: usize, @@ -51,21 +50,19 @@ pub struct Actor< } impl< - E: BufferPooler + Clock + CryptoRngCore + Metrics + Spawner, + E: BufferPooler + Clock + CryptoRngCore + Metrics + Spawner + Strategist, S: Scheme, B: Blocker, D: Digest, - T: Strategy, - > Actor + > Actor { - pub fn new(context: E, cfg: Config) -> (Self, Mailbox) { + pub fn new(context: E, cfg: Config) -> (Self, Mailbox) { let (sender, receiver) = mpsc::channel(cfg.mailbox_size); ( Self { context: ContextCell::new(context), scheme: cfg.scheme, blocker: Some(cfg.blocker), - strategy: cfg.strategy, epoch: cfg.epoch, mailbox_size: cfg.mailbox_size, @@ -151,7 +148,7 @@ impl< } /// Validates an incoming message, returning the parsed message if valid. - fn validate(&mut self, view: View, data: Bytes) -> Option> { + async fn validate(&mut self, view: View, data: Bytes) -> Option> { // Decode message let incoming = Certificate::::decode_cfg(data, &self.scheme.certificate_codec_config()).ok()?; @@ -179,7 +176,14 @@ impl< ); return None; } - if !notarization.verify(&mut self.context, &self.scheme, &self.strategy) { + let scheme = self.scheme.clone(); + let verification = notarization.clone(); + let mut rng = self.context.clone(); + if !self + .context + .with_strategy(move |strategy| verification.verify(&mut rng, &scheme, strategy)) + .await + { debug!(%view, "notarization failed verification"); return None; } @@ -199,7 +203,14 @@ impl< ); return None; } - if !finalization.verify(&mut self.context, &self.scheme, &self.strategy) { + let scheme = self.scheme.clone(); + let verification = finalization.clone(); + let mut rng = self.context.clone(); + if !self + .context + .with_strategy(move |strategy| verification.verify(&mut rng, &scheme, strategy)) + .await + { debug!(%view, "finalization failed verification"); return None; } @@ -219,7 +230,16 @@ impl< ); return None; } - if !nullification.verify::<_, D>(&mut self.context, &self.scheme, &self.strategy) { + let scheme = self.scheme.clone(); + let verification = nullification.clone(); + let mut rng = self.context.clone(); + if !self + .context + .with_strategy(move |strategy| { + verification.verify::<_, D>(&mut rng, &scheme, strategy) + }) + .await + { debug!(%view, "nullification failed verification"); return None; } @@ -243,7 +263,7 @@ impl< response, } => { // Validate incoming message - let Some(parsed) = self.validate(view, data) else { + let Some(parsed) = self.validate(view, data).await else { // Resolver will block any peers that send invalid responses, so // we don't need to do again here response.send_lossy(false); diff --git a/consensus/src/simplex/actors/resolver/mod.rs b/consensus/src/simplex/actors/resolver/mod.rs index 5bede0a2a38..01dfce0f0e5 100644 --- a/consensus/src/simplex/actors/resolver/mod.rs +++ b/consensus/src/simplex/actors/resolver/mod.rs @@ -6,20 +6,16 @@ use crate::types::Epoch; pub use actor::Actor; use commonware_cryptography::certificate::Scheme; use commonware_p2p::Blocker; -use commonware_parallel::Strategy; pub use ingress::Mailbox; #[cfg(test)] pub use ingress::MailboxMessage; use std::time::Duration; -pub struct Config { +pub struct Config { pub scheme: S, pub blocker: B, - /// Strategy for parallel operations. - pub strategy: T, - pub epoch: Epoch, pub mailbox_size: usize, pub fetch_concurrent: usize, diff --git a/consensus/src/simplex/config.rs b/consensus/src/simplex/config.rs index 520c5a2a5fe..1bf12efd45b 100644 --- a/consensus/src/simplex/config.rs +++ b/consensus/src/simplex/config.rs @@ -8,7 +8,6 @@ use crate::{ }; use commonware_cryptography::{certificate::Scheme, Digest}; use commonware_p2p::Blocker; -use commonware_parallel::Strategy; use commonware_runtime::buffer::paged::CacheRef; use std::{num::NonZeroUsize, time::Duration}; @@ -41,7 +40,7 @@ impl ForwardingPolicy { } /// Configuration for the consensus engine. -pub struct Config +pub struct Config where S: Scheme, L: Elector, @@ -50,7 +49,6 @@ where A: CertifiableAutomaton>, R: Relay, F: Reporter>, - T: Strategy, { /// Signing scheme for the consensus engine. /// @@ -89,9 +87,6 @@ where /// automatically filter and verify activities based on scheme attributability. pub reporter: F, - /// Strategy for parallel operations. - pub strategy: T, - /// Partition for the consensus engine. pub partition: String, @@ -153,8 +148,7 @@ impl< A: CertifiableAutomaton>, R: Relay, F: Reporter>, - T: Strategy, - > Config + > Config { /// Assert enforces that all configuration values are valid. pub fn assert(&self) { diff --git a/consensus/src/simplex/engine.rs b/consensus/src/simplex/engine.rs index 22edfd73b58..51e19123f11 100644 --- a/consensus/src/simplex/engine.rs +++ b/consensus/src/simplex/engine.rs @@ -11,16 +11,15 @@ use crate::{ use commonware_cryptography::Digest; use commonware_macros::select; use commonware_p2p::{Blocker, Receiver, Sender}; -use commonware_parallel::Strategy; use commonware_runtime::{ - spawn_cell, BufferPooler, Clock, ContextCell, Handle, Metrics, Spawner, Storage, + spawn_cell, BufferPooler, Clock, ContextCell, Handle, Metrics, Spawner, Storage, Strategist, }; use rand_core::CryptoRngCore; use tracing::debug; /// Instance of `simplex` consensus engine. pub struct Engine< - E: BufferPooler + Clock + CryptoRngCore + Spawner + Storage + Metrics, + E: BufferPooler + Clock + CryptoRngCore + Spawner + Storage + Metrics + Strategist, S: Scheme, L: Elector, B: Blocker, @@ -28,22 +27,21 @@ pub struct Engine< A: CertifiableAutomaton, Digest = D>, R: Relay>, F: Reporter>, - T: Strategy, > { context: ContextCell, voter: voter::Actor, voter_mailbox: voter::Mailbox, - batcher: batcher::Actor, + batcher: batcher::Actor, batcher_mailbox: batcher::Mailbox, - resolver: resolver::Actor, + resolver: resolver::Actor, resolver_mailbox: resolver::Mailbox, } impl< - E: BufferPooler + Clock + CryptoRngCore + Spawner + Storage + Metrics, + E: BufferPooler + Clock + CryptoRngCore + Spawner + Storage + Metrics + Strategist, S: Scheme, L: Elector, B: Blocker, @@ -51,11 +49,10 @@ impl< A: CertifiableAutomaton, Digest = D>, R: Relay>, F: Reporter>, - T: Strategy, - > Engine + > Engine { /// Create a new `simplex` consensus engine. - pub fn new(context: E, cfg: Config) -> Self { + pub fn new(context: E, cfg: Config) -> Self { // Ensure configuration is valid cfg.assert(); @@ -67,7 +64,6 @@ impl< blocker: cfg.blocker.clone(), reporter: cfg.reporter.clone(), relay: cfg.relay.clone(), - strategy: cfg.strategy.clone(), epoch: cfg.epoch, mailbox_size: cfg.mailbox_size, activity_timeout: cfg.activity_timeout, @@ -105,7 +101,6 @@ impl< resolver::Config { blocker: cfg.blocker, scheme: cfg.scheme, - strategy: cfg.strategy, mailbox_size: cfg.mailbox_size, epoch: cfg.epoch, fetch_concurrent: cfg.fetch_concurrent, diff --git a/consensus/src/simplex/mod.rs b/consensus/src/simplex/mod.rs index 84c016727ef..8853b583957 100644 --- a/consensus/src/simplex/mod.rs +++ b/consensus/src/simplex/mod.rs @@ -784,7 +784,6 @@ mod tests { automaton: application.clone(), relay: application.clone(), reporter: reporter.clone(), - strategy: Sequential, partition: validator.to_string(), mailbox_size: 1024, epoch: Epoch::new(333), @@ -1037,7 +1036,6 @@ mod tests { automaton: application.clone(), relay: application.clone(), reporter: reporter.clone(), - strategy: Sequential, partition: validator.to_string(), mailbox_size: 1024, epoch: Epoch::new(333), @@ -1194,7 +1192,6 @@ mod tests { automaton: application.clone(), relay: application.clone(), reporter: reporter.clone(), - strategy: Sequential, partition: validator.to_string(), mailbox_size: 1024, epoch: Epoch::new(333), @@ -1350,7 +1347,6 @@ mod tests { automaton: application.clone(), relay: application.clone(), reporter: reporter.clone(), - strategy: Sequential, partition: validator.to_string(), mailbox_size: 1024, epoch: Epoch::new(333), @@ -1531,7 +1527,6 @@ mod tests { automaton: application.clone(), relay: application.clone(), reporter: reporter.clone(), - strategy: Sequential, partition: validator.to_string(), mailbox_size: 1024, epoch: Epoch::new(333), @@ -1652,7 +1647,6 @@ mod tests { automaton: application.clone(), relay: application.clone(), reporter: reporter.clone(), - strategy: Sequential, partition: me.to_string(), mailbox_size: 1024, epoch: Epoch::new(333), @@ -1786,7 +1780,6 @@ mod tests { automaton: application.clone(), relay: application.clone(), reporter: reporter.clone(), - strategy: Sequential, partition: validator.to_string(), mailbox_size: 1024, epoch: Epoch::new(333), @@ -2018,7 +2011,6 @@ mod tests { automaton: application.clone(), relay: application.clone(), reporter: reporter.clone(), - strategy: Sequential, partition: validator.to_string(), mailbox_size: 1024, epoch: Epoch::new(333), @@ -2183,7 +2175,6 @@ mod tests { automaton: application.clone(), relay: application.clone(), reporter: reporter.clone(), - strategy: Sequential, partition: validator.to_string(), mailbox_size: 1024, epoch: Epoch::new(333), @@ -2384,7 +2375,6 @@ mod tests { automaton: application.clone(), relay: application.clone(), reporter: reporter.clone(), - strategy: Sequential, partition: validator.to_string(), mailbox_size: 1024, epoch: Epoch::new(333), @@ -2581,7 +2571,6 @@ mod tests { automaton: application.clone(), relay: application.clone(), reporter: reporter.clone(), - strategy: Sequential, partition: validator.to_string(), mailbox_size: 1024, epoch: Epoch::new(333), @@ -2833,7 +2822,6 @@ mod tests { automaton: application.clone(), relay: application.clone(), reporter: reporter.clone(), - strategy: Sequential, partition: validator.to_string(), mailbox_size: 1024, epoch: Epoch::new(333), @@ -3009,7 +2997,6 @@ mod tests { automaton: application.clone(), relay: application.clone(), reporter: reporter.clone(), - strategy: Sequential, partition: validator.clone().to_string(), mailbox_size: 1024, epoch: Epoch::new(333), @@ -3177,7 +3164,6 @@ mod tests { automaton: application.clone(), relay: application.clone(), reporter: reporter.clone(), - strategy: Sequential, partition: validator.clone().to_string(), mailbox_size: 1024, epoch: Epoch::new(333), @@ -3369,7 +3355,6 @@ mod tests { automaton: application.clone(), relay: application.clone(), reporter: reporter.clone(), - strategy: Sequential, partition: validator.clone().to_string(), mailbox_size: 1024, epoch: Epoch::new(333), @@ -3530,7 +3515,6 @@ mod tests { automaton: application.clone(), relay: application.clone(), reporter: reporter.clone(), - strategy: Sequential, partition: validator.to_string(), mailbox_size: 1024, epoch: Epoch::new(333), @@ -3621,7 +3605,6 @@ mod tests { automaton: application.clone(), relay: application.clone(), reporter: reporter.clone(), - strategy: Sequential, partition: validator.to_string(), mailbox_size: 1024, epoch: Epoch::new(333), @@ -3845,7 +3828,6 @@ mod tests { automaton: application.clone(), relay: application.clone(), reporter: reporter.clone(), - strategy: Sequential, partition: validator.to_string(), mailbox_size: 1024, epoch: Epoch::new(333), @@ -3998,7 +3980,6 @@ mod tests { automaton: application.clone(), relay: application.clone(), reporter: reporter.clone(), - strategy: Sequential, partition: validator.clone().to_string(), mailbox_size: 1024, epoch: Epoch::new(333), @@ -4168,7 +4149,6 @@ mod tests { automaton: application.clone(), relay: application.clone(), reporter: reporter.clone(), - strategy: Sequential, partition: validator.clone().to_string(), mailbox_size: 1024, epoch: Epoch::new(333), @@ -4304,7 +4284,6 @@ mod tests { automaton: application.clone(), relay: application.clone(), reporter: reporter.clone(), - strategy: Sequential, partition: validator.to_string(), mailbox_size: 1024, epoch: Epoch::new(333), @@ -4467,7 +4446,6 @@ mod tests { automaton: application.clone(), relay: application.clone(), reporter: reporter.clone(), - strategy: Sequential, partition: participants[0].clone().to_string(), mailbox_size: 64, epoch: Epoch::new(333), @@ -4686,7 +4664,6 @@ mod tests { automaton: application.clone(), relay: application.clone(), reporter: attributable_reporter, - strategy: Sequential, partition: validator.to_string(), mailbox_size: 1024, epoch: Epoch::new(333), @@ -5039,7 +5016,6 @@ mod tests { automaton: application.clone(), relay: application.clone(), reporter: reporter.clone(), - strategy: Sequential, partition: validator.to_string(), mailbox_size: 1024, epoch: Epoch::new(333), @@ -5244,7 +5220,6 @@ mod tests { automaton: application.clone(), relay: application.clone(), reporter: reporter.clone(), - strategy: Sequential, partition: validator.to_string(), mailbox_size: 1024, epoch: Epoch::new(333), @@ -5384,7 +5359,6 @@ mod tests { automaton: application.clone(), relay: application.clone(), reporter: reporter.clone(), - strategy: Sequential, partition: validator.to_string(), mailbox_size: 1024, epoch: Epoch::new(333), @@ -5481,7 +5455,6 @@ mod tests { automaton: application.clone(), relay: application.clone(), reporter: selected_reporter, - strategy: Sequential, partition: validator.to_string(), mailbox_size: 1024, epoch: Epoch::new(333), @@ -5974,7 +5947,6 @@ mod tests { automaton: application.clone(), relay: application.clone(), reporter: reporter.clone(), - strategy: Sequential, partition: label, mailbox_size: 1024, epoch: Epoch::new(333), @@ -6043,7 +6015,6 @@ mod tests { automaton: application.clone(), relay: application.clone(), reporter: reporter.clone(), - strategy: Sequential, partition: label, mailbox_size: 1024, epoch: Epoch::new(333), diff --git a/examples/bridge/src/bin/validator.rs b/examples/bridge/src/bin/validator.rs index 0cde6405a83..ee922ae0dc6 100644 --- a/examples/bridge/src/bin/validator.rs +++ b/examples/bridge/src/bin/validator.rs @@ -17,7 +17,7 @@ use commonware_cryptography::{ }; use commonware_p2p::{authenticated, Manager as _}; use commonware_runtime::{ - buffer::paged::CacheRef, tokio, Metrics, Network, Quota, Runner, ThreadPooler, + buffer::paged::CacheRef, tokio, Metrics, Network, Quota, Runner, StrategyConfig, }; use commonware_stream::encrypted::{dial, Config as StreamConfig}; use commonware_utils::{from_hex, ordered::Set, union, NZUsize, TryCollect, NZU16, NZU32}; @@ -150,7 +150,9 @@ fn main() { .expect("Other identity not well-formed"); // Initialize context - let runtime_cfg = tokio::Config::new().with_storage_directory(storage_directory); + let runtime_cfg = tokio::Config::new() + .with_storage_directory(storage_directory) + .with_strategy(StrategyConfig::Rayon(NZUsize!(2))); let executor = tokio::Runner::new(runtime_cfg); // Configure indexer @@ -221,7 +223,6 @@ fn main() { ); // Initialize application - let strategy = context.clone().create_strategy(NZUsize!(2)).unwrap(); let consensus_namespace = union(APPLICATION_NAMESPACE, CONSENSUS_SUFFIX); let this_network = Scheme::signer(&consensus_namespace, validators.clone(), identity, share) @@ -261,7 +262,6 @@ fn main() { skip_timeout: ViewDelta::new(5), fetch_concurrent: 32, page_cache: CacheRef::from_pooler(&context, NZU16!(16_384), NZUsize!(10_000)), - strategy, forwarding: simplex::ForwardingPolicy::Disabled, }, ); diff --git a/examples/log/src/main.rs b/examples/log/src/main.rs index 76ae6ec08ca..8460e1ffe13 100644 --- a/examples/log/src/main.rs +++ b/examples/log/src/main.rs @@ -54,7 +54,6 @@ use commonware_consensus::{ }; use commonware_cryptography::{ed25519, Sha256, Signer as _}; use commonware_p2p::{authenticated::discovery, Manager as _}; -use commonware_parallel::Sequential; use commonware_runtime::{buffer::paged::CacheRef, tokio, Metrics, Quota, Runner}; use commonware_utils::{ordered::Set, union, NZUsize, TryCollect, NZU16, NZU32}; use std::{ @@ -225,7 +224,6 @@ fn main() { skip_timeout: ViewDelta::new(5), fetch_concurrent: 32, page_cache: CacheRef::from_pooler(&context, NZU16!(16_384), NZUsize!(10_000)), - strategy: Sequential, forwarding: simplex::ForwardingPolicy::Disabled, }; let engine = simplex::Engine::new(context.with_label("engine"), cfg); diff --git a/examples/reshare/src/engine.rs b/examples/reshare/src/engine.rs index b25db02c7f1..faa25bcd496 100644 --- a/examples/reshare/src/engine.rs +++ b/examples/reshare/src/engine.rs @@ -27,10 +27,9 @@ use commonware_cryptography::{ BatchVerifier, Hasher, Signer, }; use commonware_p2p::{Blocker, Manager, Receiver, Sender}; -use commonware_parallel::Strategy; use commonware_runtime::{ buffer::paged::CacheRef, spawn_cell, BufferPooler, Clock, ContextCell, Handle, Metrics, - Network, Spawner, Storage, + Network, Spawner, Storage, Strategist, }; use commonware_storage::archive::immutable; use commonware_utils::{channel::mpsc, union, NZUsize, NZU16, NZU32, NZU64}; @@ -60,13 +59,12 @@ const PAGE_CACHE_CAPACITY: NonZero = NZUsize!(8_192); // 32MB const MAX_REPAIR: NonZero = NZUsize!(50); const MAX_PENDING_ACKS: NonZero = NZUsize!(16); -pub struct Config +pub struct Config where P: Manager, C: Signer, B: Blocker, V: Variant, - T: Strategy, { pub signer: C, pub manager: P, @@ -77,12 +75,11 @@ where pub peer_config: PeerConfig, pub partition_prefix: String, pub freezer_table_initial_size: u32, - pub strategy: T, } -pub struct Engine +pub struct Engine where - E: BufferPooler + Spawner + Metrics + CryptoRngCore + Clock + Storage + Network, + E: BufferPooler + Spawner + Metrics + CryptoRngCore + Clock + Storage + Network + Strategist, C: Signer, P: Manager, B: Blocker, @@ -90,11 +87,10 @@ where V: Variant, S: Scheme, L: Elector, - T: Strategy, Provider: EpochProvider, { context: ContextCell, - config: Config, + config: Config, dkg: dkg::Actor, dkg_mailbox: dkg::Mailbox, buffer: buffered::Engine, P>, @@ -107,7 +103,6 @@ where immutable::Archive>, immutable::Archive>, FixedEpocher, - T, >, #[allow(clippy::type_complexity)] orchestrator: orchestrator::Actor< @@ -119,14 +114,13 @@ where Deferred, Block, FixedEpocher>, S, L, - T, >, orchestrator_mailbox: orchestrator::Mailbox, } -impl Engine +impl Engine where - E: BufferPooler + Spawner + Metrics + CryptoRngCore + Clock + Storage + Network, + E: BufferPooler + Spawner + Metrics + CryptoRngCore + Clock + Storage + Network + Strategist, C: Signer, P: Manager, B: Blocker, @@ -134,11 +128,10 @@ where V: Variant, S: Scheme, L: Elector, - T: Strategy, Batch: BatchVerifier, Provider: EpochProvider, { - pub async fn new(context: E, config: Config) -> Self { + pub async fn new(context: E, config: Config) -> Self { let page_cache = CacheRef::from_pooler(&context, PAGE_CACHE_PAGE_SIZE, PAGE_CACHE_CAPACITY); let consensus_namespace = union(&config.namespace, b"_CONSENSUS"); let num_participants = NZU32!(config.peer_config.max_participants_per_round()); @@ -282,7 +275,6 @@ where block_codec_config: num_participants, max_repair: MAX_REPAIR, max_pending_acks: MAX_PENDING_ACKS, - strategy: config.strategy.clone(), }, ) .await; @@ -301,7 +293,6 @@ where application, provider, marshal: marshal_mailbox, - strategy: config.strategy.clone(), muxer_size: MAILBOX_SIZE, mailbox_size: MAILBOX_SIZE, partition_prefix: format!("{}_consensus", config.partition_prefix), diff --git a/examples/reshare/src/main.rs b/examples/reshare/src/main.rs index efe55059e48..37242a74281 100644 --- a/examples/reshare/src/main.rs +++ b/examples/reshare/src/main.rs @@ -11,9 +11,9 @@ use commonware_consensus::simplex::elector::{Random, RoundRobin}; use commonware_cryptography::{bls12381::primitives::variant::MinSig, ed25519::PublicKey}; use commonware_runtime::{ tokio::{self, telemetry::Logging}, - Metrics, Runner, + Metrics, Runner, StrategyConfig, }; -use commonware_utils::{hex, NZU64}; +use commonware_utils::{hex, NZUsize, NZU64}; use std::{future::Future, num::NonZeroU64, path::PathBuf, pin::Pin}; use tracing::Level; @@ -153,7 +153,8 @@ fn main() { let config = tokio::Config::new() .with_worker_threads(app.worker_threads) - .with_catch_panics(false); + .with_catch_panics(false) + .with_strategy(StrategyConfig::Rayon(NZUsize!(2))); let runner = tokio::Runner::new(config); runner.start(|context| async move { // Initialize telemetry. diff --git a/examples/reshare/src/orchestrator/actor.rs b/examples/reshare/src/orchestrator/actor.rs index dec4a0f37a0..b943d1fcd86 100644 --- a/examples/reshare/src/orchestrator/actor.rs +++ b/examples/reshare/src/orchestrator/actor.rs @@ -19,10 +19,9 @@ use commonware_p2p::{ utils::mux::{Builder, MuxHandle, Muxer}, Blocker, Receiver, Sender, }; -use commonware_parallel::Strategy; use commonware_runtime::{ buffer::paged::CacheRef, spawn_cell, telemetry::metrics::status::GaugeExt, BufferPooler, Clock, - ContextCell, Handle, Metrics, Network, Spawner, Storage, + ContextCell, Handle, Metrics, Network, Spawner, Storage, Strategist, }; use commonware_utils::{channel::mpsc, vec::NonEmptyVec, NZUsize, NZU16}; use prometheus_client::metrics::gauge::Gauge; @@ -31,7 +30,7 @@ use std::{collections::BTreeMap, marker::PhantomData, time::Duration}; use tracing::{debug, info, warn}; /// Configuration for the orchestrator. -pub struct Config +pub struct Config where B: Blocker, V: Variant, @@ -41,13 +40,11 @@ where + Relay>, S: Scheme, L: Elector, - T: Strategy, { pub oracle: B, pub application: A, pub provider: Provider, pub marshal: MarshalMailbox>>, - pub strategy: T, pub muxer_size: usize, pub mailbox_size: usize, @@ -58,9 +55,9 @@ where pub _phantom: PhantomData, } -pub struct Actor +pub struct Actor where - E: BufferPooler + Spawner + Metrics + CryptoRngCore + Clock + Storage + Network, + E: BufferPooler + Spawner + Metrics + CryptoRngCore + Clock + Storage + Network + Strategist, B: Blocker, V: Variant, C: Signer, @@ -69,7 +66,6 @@ where + Relay>, S: Scheme, L: Elector, - T: Strategy, Provider: EpochProvider, { context: ContextCell, @@ -79,7 +75,6 @@ where oracle: B, marshal: MarshalMailbox>>, provider: Provider, - strategy: T, muxer_size: usize, partition_prefix: String, @@ -90,9 +85,9 @@ where _phantom: PhantomData, } -impl Actor +impl Actor where - E: BufferPooler + Spawner + Metrics + CryptoRngCore + Clock + Storage + Network, + E: BufferPooler + Spawner + Metrics + CryptoRngCore + Clock + Storage + Network + Strategist, B: Blocker, V: Variant, C: Signer, @@ -101,12 +96,11 @@ where + Relay>, S: scheme::Scheme, L: Elector, - T: Strategy, Provider: EpochProvider, { pub fn new( context: E, - config: Config, + config: Config, ) -> (Self, Mailbox) { let (sender, mailbox) = mpsc::channel(config.mailbox_size); let page_cache_ref = CacheRef::from_pooler(&context, NZU16!(16_384), NZUsize!(10_000)); @@ -123,7 +117,6 @@ where oracle: config.oracle, marshal: config.marshal, provider: config.provider, - strategy: config.strategy, muxer_size: config.muxer_size, partition_prefix: config.partition_prefix, page_cache_ref, @@ -328,7 +321,6 @@ where skip_timeout: ViewDelta::new(10), fetch_concurrent: 32, page_cache: self.page_cache_ref.clone(), - strategy: self.strategy.clone(), forwarding: simplex::ForwardingPolicy::Disabled, }, ); diff --git a/examples/reshare/src/validator.rs b/examples/reshare/src/validator.rs index fb8d19992c9..54f64fa419a 100644 --- a/examples/reshare/src/validator.rs +++ b/examples/reshare/src/validator.rs @@ -14,8 +14,8 @@ use commonware_cryptography::{ bls12381::primitives::variant::MinSig, ed25519, Hasher, Sha256, Signer, }; use commonware_p2p::authenticated::discovery; -use commonware_runtime::{tokio, Metrics, Quota, ThreadPooler}; -use commonware_utils::{union, union_unique, NZUsize, NZU32}; +use commonware_runtime::{tokio, Metrics, Quota}; +use commonware_utils::{union, union_unique, NZU32}; use futures::future::try_join_all; use std::{ net::{IpAddr, Ipv4Addr, SocketAddr}, @@ -116,8 +116,7 @@ pub async fn run( }; let marshal = marshal_resolver::init(&context, resolver_cfg, marshal); - let strategy = context.clone().create_strategy(NZUsize!(2)).unwrap(); - let engine = engine::Engine::<_, _, _, _, Sha256, MinSig, S, L, _>::new( + let engine = engine::Engine::<_, _, _, _, Sha256, MinSig, S, L>::new( context.with_label("engine"), engine::Config { signer: config.signing_key.clone(), @@ -129,7 +128,6 @@ pub async fn run( partition_prefix: "engine".to_string(), freezer_table_initial_size: 1024 * 1024, // 100mb peer_config, - strategy, }, ) .await; @@ -176,14 +174,13 @@ mod test { utils::mux, Message, Receiver, }; - use commonware_parallel::Sequential; use commonware_runtime::{ deterministic::{self, Runner}, Clock, Handle, Metrics, Quota, Runner as _, Spawner, }; use commonware_utils::{ channel::{mpsc, oneshot}, - test_rng_seeded, union, N3f1, TryCollect, + test_rng_seeded, union, N3f1, NZUsize, TryCollect, }; use rand::seq::SliceRandom; use rand_core::CryptoRngCore; @@ -384,7 +381,7 @@ mod test { }; let marshal = marshal_resolver::init(&validator_ctx.with_label("marshal"), resolver_cfg, marshal); - let engine = engine::Engine::<_, _, _, _, Sha256, MinSig, S, L, _>::new( + let engine = engine::Engine::<_, _, _, _, Sha256, MinSig, S, L>::new( validator_ctx.with_label("consensus"), engine::Config { signer: sk.clone(), @@ -396,7 +393,6 @@ mod test { partition_prefix: format!("validator_{}", &pk), freezer_table_initial_size: 1024, // 1mb peer_config: self.peer_config.clone(), - strategy: Sequential, }, ) .await; diff --git a/p2p/src/utils/codec.rs b/p2p/src/utils/codec.rs index 936024585d4..ba62fd85eab 100644 --- a/p2p/src/utils/codec.rs +++ b/p2p/src/utils/codec.rs @@ -4,7 +4,6 @@ use crate::{Blocker, CheckedSender, Receiver, Recipients, Sender}; use commonware_codec::{Codec, Error}; use commonware_cryptography::PublicKey; use commonware_macros::select_loop; -use commonware_parallel::Strategy; use commonware_runtime::{iobuf::EncodeExt, spawn_cell, BufferPool, ContextCell, Handle, Spawner}; use commonware_utils::{ channel::{fallible::AsyncFallibleExt, mpsc}, @@ -127,10 +126,9 @@ impl WrappedReceiver { /// backpressure on the event loop, such as signature verification, decryption, or intensive /// validity checks. /// -/// Concurrency is bounded by the provided [`Strategy`]'s -/// [`parallelism_hint`](Strategy::parallelism_hint): when the number of in-flight decode -/// tasks reaches this limit, the receiver stops accepting new messages until an in-flight -/// task completes, providing natural backpressure. +/// Concurrency is bounded by the configured `max_concurrency`: when the number of in-flight +/// decode tasks reaches this limit, the receiver stops accepting new messages until an +/// in-flight task completes, providing natural backpressure. pub struct WrappedBackgroundReceiver where E: Spawner, @@ -158,15 +156,14 @@ where /// Create a new [`WrappedBackgroundReceiver`]. /// /// `channel_capacity` controls the size of the internal channel to the consumer. - /// The `strategy`'s [`parallelism_hint`](Strategy::parallelism_hint) bounds the - /// number of in-flight decode tasks. + /// `max_concurrency` bounds the number of in-flight decode tasks. pub fn new( context: E, receiver: R, codec_config: V::Cfg, blocker: B, channel_capacity: usize, - strategy: &impl Strategy, + max_concurrency: usize, ) -> (Self, mpsc::Receiver<(P, V)>) { let (tx, rx) = mpsc::channel(channel_capacity); ( @@ -176,7 +173,7 @@ where codec_config, blocker, sender: tx, - max_concurrency: strategy.parallelism_hint().max(1), + max_concurrency: max_concurrency.max(1), }, rx, ) @@ -411,7 +408,7 @@ mod tests { (), control2.clone(), 16, - &Sequential, + Sequential.parallelism_hint(), ); let _handle = bg.start(); @@ -448,7 +445,7 @@ mod tests { (), control2.clone(), 16, - &Sequential, + Sequential.parallelism_hint(), ); let _handle = bg.start(); @@ -507,7 +504,7 @@ mod tests { (), control2.clone(), 16, - &Sequential, + Sequential.parallelism_hint(), ); let _handle = bg.start(); @@ -555,7 +552,7 @@ mod tests { (), control2.clone(), 16, - &Sequential, + Sequential.parallelism_hint(), ); let _handle = bg.start(); @@ -603,7 +600,7 @@ mod tests { (), control2.clone(), 16, - &Sequential, + Sequential.parallelism_hint(), ); let _handle = bg.start(); @@ -659,7 +656,7 @@ mod tests { (), NoopBlocker, count as usize, - &HintStrategy(8), + HintStrategy(8).parallelism_hint(), ); let _handle = bg.start(); diff --git a/parallel/src/lib.rs b/parallel/src/lib.rs index 60fe7248125..b6b7449f77e 100644 --- a/parallel/src/lib.rs +++ b/parallel/src/lib.rs @@ -504,6 +504,11 @@ commonware_macros::stability_scope!(BETA, cfg(feature = "std") { pub const fn with_pool(thread_pool: ThreadPool) -> Self { Self { thread_pool } } + + /// Returns the underlying [`ThreadPool`]. + pub const fn thread_pool(&self) -> &ThreadPool { + &self.thread_pool + } } impl Strategy for Rayon { diff --git a/runtime/src/deterministic.rs b/runtime/src/deterministic.rs index 20bad024e8d..537f65cd1b8 100644 --- a/runtime/src/deterministic.rs +++ b/runtime/src/deterministic.rs @@ -60,7 +60,8 @@ use crate::{ Panicker, Registry, ScopeGuard, }, validate_label, BufferPool, BufferPoolConfig, Clock, Error, Execution, Handle, ListenerOf, - Metrics as _, Panicked, Spawner as _, METRICS_PREFIX, + Metrics as _, Panicked, RuntimeStrategy, Spawner as _, StrategyConfig, ThreadPooler as _, + METRICS_PREFIX, }; #[cfg(feature = "external")] use crate::{Blocker, Pacer}; @@ -228,6 +229,9 @@ pub struct Config { /// Buffer pool configuration for storage I/O. storage_buffer_pool_cfg: BufferPoolConfig, + + /// Strategy used by [`crate::Strategist`]. + strategy_cfg: StrategyConfig, } impl Config { @@ -259,6 +263,7 @@ impl Config { storage_fault_cfg: FaultConfig::default(), network_buffer_pool_cfg, storage_buffer_pool_cfg, + strategy_cfg: StrategyConfig::default(), } } @@ -308,6 +313,11 @@ impl Config { self.storage_buffer_pool_cfg = cfg; self } + /// See [Config] + pub const fn with_strategy(mut self, cfg: StrategyConfig) -> Self { + self.strategy_cfg = cfg; + self + } /// Configure storage fault injection. /// @@ -344,6 +354,10 @@ impl Config { pub const fn storage_buffer_pool_config(&self) -> &BufferPoolConfig { &self.storage_buffer_pool_cfg } + /// See [Config] + pub const fn strategy(&self) -> &StrategyConfig { + &self.strategy_cfg + } /// Assert that the configuration is valid. pub fn assert(&self) { @@ -386,6 +400,7 @@ pub struct Executor { shutdown: Mutex, panicker: Panicker, dns: Mutex>>, + strategy_cfg: StrategyConfig, } impl Executor { @@ -473,6 +488,7 @@ pub struct Checkpoint { catch_panics: bool, network_buffer_pool_cfg: BufferPoolConfig, storage_buffer_pool_cfg: BufferPoolConfig, + strategy_cfg: StrategyConfig, } impl Checkpoint { @@ -713,6 +729,7 @@ impl Runner { catch_panics: executor.panicker.catch(), network_buffer_pool_cfg, storage_buffer_pool_cfg, + strategy_cfg: executor.strategy_cfg, }; (output, checkpoint) @@ -924,6 +941,8 @@ impl Clone for Context { impl Context { fn new(cfg: Config) -> (Self, Arc, Panicked) { + let strategy_cfg = cfg.strategy_cfg.clone(); + // Create a new registry let mut registry = Registry::new(); let runtime_registry = registry.root_mut().sub_registry_with_prefix(METRICS_PREFIX); @@ -984,25 +1003,22 @@ impl Context { shutdown: Mutex::new(Stopper::default()), panicker, dns: Mutex::new(HashMap::new()), + strategy_cfg, }); - - ( - Self { - name: String::new(), - attributes: Vec::new(), - scope: None, - executor: Arc::downgrade(&executor), - network: Arc::new(network), - storage: Arc::new(storage), - network_buffer_pool, - storage_buffer_pool, - tree: Tree::root(), - execution: Execution::default(), - traced: false, - }, - executor, - panicked, - ) + let context = Self { + name: String::new(), + attributes: Vec::new(), + scope: None, + executor: Arc::downgrade(&executor), + network: Arc::new(network), + storage: Arc::new(storage), + network_buffer_pool, + storage_buffer_pool, + tree: Tree::root(), + execution: Execution::default(), + traced: false, + }; + (context, executor, panicked) } /// Recover the inner state (deadline, metrics, auditor, rng, synced storage, etc.) from the @@ -1057,24 +1073,22 @@ impl Context { sleeping: Mutex::new(BinaryHeap::new()), shutdown: Mutex::new(Stopper::default()), panicker, + strategy_cfg: checkpoint.strategy_cfg.clone(), }); - ( - Self { - name: String::new(), - attributes: Vec::new(), - scope: None, - executor: Arc::downgrade(&executor), - network: Arc::new(network), - storage: checkpoint.storage, - network_buffer_pool, - storage_buffer_pool, - tree: Tree::root(), - execution: Execution::default(), - traced: false, - }, - executor, - panicked, - ) + let context = Self { + name: String::new(), + attributes: Vec::new(), + scope: None, + executor: Arc::downgrade(&executor), + network: Arc::new(network), + storage: checkpoint.storage, + network_buffer_pool, + storage_buffer_pool, + tree: Tree::root(), + execution: Execution::default(), + traced: false, + }; + (context, executor, panicked) } /// Upgrade Weak reference to [Executor]. @@ -1245,6 +1259,30 @@ impl crate::ThreadPooler for Context { } } +impl crate::Strategist for Context { + type Strategy = RuntimeStrategy; + + fn with_strategy(&self, f: F) -> impl Future + Send + where + F: FnOnce(&Self::Strategy) -> T + Send + 'static, + T: Send + 'static, + { + let handle = self.clone().shared(true).spawn(move |context| async move { + let strategy = match context.executor().strategy_cfg.clone() { + StrategyConfig::Sequential => RuntimeStrategy::default(), + StrategyConfig::Rayon(concurrency) => RuntimeStrategy::Rayon( + context + .with_label("strategy") + .create_strategy(concurrency) + .expect("failed to create runtime strategy"), + ), + }; + f(&strategy) + }); + async move { handle.await.expect("strategy task failed") } + } +} + impl crate::Metrics for Context { fn label(&self) -> String { self.name.clone() diff --git a/runtime/src/lib.rs b/runtime/src/lib.rs index 0294900de7f..4335b7cdd56 100644 --- a/runtime/src/lib.rs +++ b/runtime/src/lib.rs @@ -45,7 +45,9 @@ stability_scope!(BETA, cfg(not(target_arch = "wasm32")) { }); stability_scope!(BETA { use commonware_macros::select; - use commonware_parallel::{Rayon, ThreadPool}; + use commonware_parallel::{ + Rayon, Sequential, Strategy as ParallelStrategy, ThreadPool, + }; use iobuf::PoolError; use prometheus_client::registry::Metric; use rayon::ThreadPoolBuildError; @@ -252,6 +254,93 @@ stability_scope!(BETA { fn stopped(&self) -> signal::Signal; } + /// Runtime-owned strategy configuration for CPU-bound work. + #[derive(Clone, Debug, Default)] + pub enum StrategyConfig { + /// Execute strategy-backed work sequentially. + #[default] + Sequential, + /// Execute strategy-backed work on a rayon thread pool with the given concurrency. + Rayon(NonZeroUsize), + } + + /// Runtime-owned strategy used by [`Strategist`]. + #[derive(Clone, Debug)] + pub enum RuntimeStrategy { + Sequential(Sequential), + Rayon(Rayon), + } + + impl Default for RuntimeStrategy { + fn default() -> Self { + Self::Sequential(Sequential) + } + } + + impl ParallelStrategy for RuntimeStrategy { + fn fold_init( + &self, + iter: I, + init: INIT, + identity: ID, + fold_op: F, + reduce_op: RD, + ) -> R + where + I: IntoIterator + Send, + INIT: Fn() -> T + Send + Sync, + T: Send, + R: Send, + ID: Fn() -> R + Send + Sync, + F: Fn(R, &mut T, I::Item) -> R + Send + Sync, + RD: Fn(R, R) -> R + Send + Sync, + { + match self { + Self::Sequential(strategy) => { + strategy.fold_init(iter, init, identity, fold_op, reduce_op) + } + Self::Rayon(strategy) => { + strategy.fold_init(iter, init, identity, fold_op, reduce_op) + } + } + } + + fn join(&self, a: A, b: B) -> (RA, RB) + where + A: FnOnce() -> RA + Send, + B: FnOnce() -> RB + Send, + RA: Send, + RB: Send, + { + match self { + Self::Sequential(strategy) => strategy.join(a, b), + Self::Rayon(strategy) => strategy.join(a, b), + } + } + + fn parallelism_hint(&self) -> usize { + match self { + Self::Sequential(strategy) => strategy.parallelism_hint(), + Self::Rayon(strategy) => strategy.parallelism_hint(), + } + } + } + + /// Interface for runtimes that expose a built-in parallel strategy. + pub trait Strategist: Clone + Send + Sync + 'static { + /// Strategy implementation owned by the runtime. + type Strategy: ParallelStrategy; + + /// Executes CPU-bound work using the runtime's configured strategy. + /// + /// The closure and output must be `Send + 'static` because runtimes may move + /// the work to a blocking or rayon executor. + fn with_strategy(&self, f: F) -> impl Future + Send + where + F: FnOnce(&Self::Strategy) -> T + Send + 'static, + T: Send + 'static; + } + /// Trait for creating [rayon]-compatible thread pools with each worker thread /// placed on dedicated threads via [Spawner]. pub trait ThreadPooler: Spawner + Metrics { @@ -865,6 +954,7 @@ mod tests { use crate::telemetry::traces::collector::TraceStorage; use bytes::Bytes; use commonware_macros::{select, test_collect_traces}; + use commonware_parallel::Strategy as _; use commonware_utils::{ channel::{mpsc, oneshot}, sync::Mutex, @@ -4182,6 +4272,49 @@ mod tests { }); } + fn test_with_strategy(runner: R, expected_parallelism: usize) + where + R::Context: Strategist, + { + runner.start(|context| async move { + let (sum, parallelism) = context + .with_strategy(|strategy| { + let sum = + strategy.fold(0..1000u64, || 0u64, |acc, item| acc + item, |a, b| a + b); + (sum, strategy.parallelism_hint()) + }) + .await; + assert_eq!(sum, 499_500); + assert_eq!(parallelism, expected_parallelism); + }); + } + + #[test] + fn test_tokio_with_strategy_sequential() { + test_with_strategy(tokio::Runner::default(), 1); + } + + #[test] + fn test_tokio_with_strategy_rayon() { + let runner = tokio::Runner::new( + tokio::Config::default().with_strategy(StrategyConfig::Rayon(NZUsize!(4))), + ); + test_with_strategy(runner, 4); + } + + #[test] + fn test_deterministic_with_strategy_sequential() { + test_with_strategy(deterministic::Runner::default(), 1); + } + + #[test] + fn test_deterministic_with_strategy_rayon() { + let runner = deterministic::Runner::new( + deterministic::Config::default().with_strategy(StrategyConfig::Rayon(NZUsize!(4))), + ); + test_with_strategy(runner, 4); + } + fn test_buffer_pooler( runner: R, expected_network_max_per_class: usize, diff --git a/runtime/src/tokio/runtime.rs b/runtime/src/tokio/runtime.rs index a6de1585ee0..c654d80d871 100644 --- a/runtime/src/tokio/runtime.rs +++ b/runtime/src/tokio/runtime.rs @@ -20,13 +20,13 @@ use crate::{ utils::{ self, add_attribute, signal::Stopper, supervision::Tree, Panicker, Registry, ScopeGuard, }, - BufferPool, BufferPoolConfig, Clock, Error, Execution, Handle, Metrics as _, SinkOf, - Spawner as _, StreamOf, METRICS_PREFIX, + BufferPool, BufferPoolConfig, Clock, Error, Execution, Handle, Metrics as _, RuntimeStrategy, + SinkOf, Spawner as _, StrategyConfig, StreamOf, ThreadPooler as _, METRICS_PREFIX, }; use commonware_macros::{select, stability}; #[stability(BETA)] use commonware_parallel::ThreadPool; -use commonware_utils::{sync::Mutex, NZUsize}; +use commonware_utils::{channel::oneshot, sync::Mutex, NZUsize}; use futures::future::Either; use governor::clock::{Clock as GClock, ReasonablyRealtime}; use prometheus_client::{ @@ -43,7 +43,7 @@ use std::{ net::{IpAddr, SocketAddr}, num::NonZeroUsize, path::PathBuf, - sync::Arc, + sync::{Arc, OnceLock}, time::{Duration, SystemTime}, }; use tokio::runtime::{Builder, Runtime}; @@ -177,6 +177,9 @@ pub struct Config { /// Explicit buffer pool configuration for storage I/O, if provided. storage_buffer_pool_cfg: Option, + + /// Strategy used by [`crate::Strategist`]. + strategy_cfg: StrategyConfig, } impl Config { @@ -195,6 +198,7 @@ impl Config { network_cfg: NetworkConfig::default(), network_buffer_pool_cfg: None, storage_buffer_pool_cfg: None, + strategy_cfg: StrategyConfig::default(), } } @@ -259,6 +263,11 @@ impl Config { self.storage_buffer_pool_cfg = Some(cfg); self } + /// See [Config] + pub const fn with_strategy(mut self, cfg: StrategyConfig) -> Self { + self.strategy_cfg = cfg; + self + } // Getters /// See [Config] @@ -301,6 +310,10 @@ impl Config { pub const fn maximum_buffer_size(&self) -> usize { self.maximum_buffer_size } + /// See [Config] + pub const fn strategy(&self) -> &StrategyConfig { + &self.strategy_cfg + } /// Returns the network buffer pool config, deriving thread-cache /// parallelism from `worker_threads` if not explicitly configured. @@ -335,6 +348,7 @@ pub struct Executor { shutdown: Mutex, panicker: Panicker, thread_stack_size: usize, + strategy: OnceLock, } /// Implementation of [crate::Runner] for the `tokio` runtime. @@ -480,6 +494,7 @@ impl crate::Runner for Runner { shutdown: Mutex::new(Stopper::default()), panicker, thread_stack_size: self.cfg.thread_stack_size, + strategy: OnceLock::new(), }); // Get metrics @@ -501,6 +516,20 @@ impl crate::Runner for Runner { execution: Execution::default(), traced: false, }; + let strategy = match self.cfg.strategy_cfg { + StrategyConfig::Sequential => RuntimeStrategy::default(), + StrategyConfig::Rayon(concurrency) => RuntimeStrategy::Rayon( + context + .with_label("strategy") + .create_strategy(concurrency) + .expect("failed to create runtime strategy"), + ), + }; + executor + .strategy + .set(strategy) + .expect("runtime strategy already initialized"); + let output = executor.runtime.block_on(panicked.interrupt(f(context))); gauge.dec(); @@ -565,6 +594,13 @@ impl Context { fn metrics(&self) -> &Metrics { &self.executor.metrics } + + fn strategy(&self) -> &RuntimeStrategy { + self.executor + .strategy + .get() + .expect("runtime strategy not initialized") + } } impl crate::Spawner for Context { @@ -691,6 +727,38 @@ impl crate::ThreadPooler for Context { } } +#[stability(BETA)] +impl crate::Strategist for Context { + type Strategy = RuntimeStrategy; + + fn with_strategy(&self, f: F) -> impl Future + Send + where + F: FnOnce(&Self::Strategy) -> T + Send + 'static, + T: Send + 'static, + { + let strategy = self.strategy().clone(); + match strategy { + RuntimeStrategy::Sequential(strategy) => { + let strategy = RuntimeStrategy::Sequential(strategy); + let handle = self + .clone() + .shared(true) + .spawn(move |_| async move { f(&strategy) }); + Either::Left(async move { handle.await.expect("strategy task failed") }) + } + RuntimeStrategy::Rayon(strategy) => { + let (sender, receiver) = oneshot::channel(); + let pool = strategy.thread_pool().clone(); + let strategy = RuntimeStrategy::Rayon(strategy); + pool.spawn(move || { + let _ = sender.send(f(&strategy)); + }); + Either::Right(async move { receiver.await.expect("strategy task failed") }) + } + } + } +} + impl crate::Metrics for Context { fn label(&self) -> String { self.name.clone() diff --git a/runtime/src/utils/cell.rs b/runtime/src/utils/cell.rs index 713ecbbcc16..59103a4e781 100644 --- a/runtime/src/utils/cell.rs +++ b/runtime/src/utils/cell.rs @@ -206,6 +206,21 @@ commonware_macros::stability_scope!(BETA { use commonware_parallel::ThreadPool; use rayon::ThreadPoolBuildError; + impl crate::Strategist for Cell + where + C: crate::Strategist, + { + type Strategy = ::Strategy; + + fn with_strategy(&self, f: F) -> impl Future + Send + where + F: FnOnce(&Self::Strategy) -> T + Send + 'static, + T: Send + 'static, + { + self.as_present().with_strategy(f) + } + } + impl crate::ThreadPooler for Cell where C: crate::ThreadPooler,