diff --git a/Cargo.lock b/Cargo.lock index 07385d9fc72..0f9758fd654 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1519,6 +1519,8 @@ version = "2026.4.0" dependencies = [ "cfg-if", "commonware-macros", + "commonware-utils", + "futures", "proptest", "rayon", ] diff --git a/consensus/src/aggregation/config.rs b/consensus/src/aggregation/config.rs index 52c41647270..407cccab486 100644 --- a/consensus/src/aggregation/config.rs +++ b/consensus/src/aggregation/config.rs @@ -8,7 +8,7 @@ use commonware_cryptography::{ Digest, }; use commonware_p2p::Blocker; -use commonware_parallel::Strategy; +use commonware_parallel::Bridge; use commonware_runtime::buffer::paged::CacheRef; use commonware_utils::NonZeroDuration; use std::num::{NonZeroU64, NonZeroUsize}; @@ -21,7 +21,7 @@ pub struct Config< Z: Reporter>, M: Monitor, B: Blocker::PublicKey>, - T: Strategy, + T: Bridge, > { /// Tracks the current state of consensus (to determine which participants should /// be involved in the current broadcast attempt). diff --git a/consensus/src/aggregation/engine.rs b/consensus/src/aggregation/engine.rs index a685e0f5fe4..ece6cb4a683 100644 --- a/consensus/src/aggregation/engine.rs +++ b/consensus/src/aggregation/engine.rs @@ -20,7 +20,7 @@ use commonware_p2p::{ utils::codec::{wrap, WrappedSender}, Blocker, Receiver, Recipients, Sender, }; -use commonware_parallel::Strategy; +use commonware_parallel::Bridge; use commonware_runtime::{ buffer::paged::CacheRef, spawn_cell, @@ -78,7 +78,7 @@ pub struct Engine< Z: Reporter>, M: Monitor, B: Blocker::PublicKey>, - T: Strategy, + T: Bridge, > { // ---------- Interfaces ---------- context: ContextCell, @@ -161,7 +161,7 @@ impl< Z: Reporter>, M: Monitor, B: Blocker::PublicKey>, - T: Strategy, + T: Bridge, > Engine { /// Creates a new engine with the given context and configuration. @@ -385,7 +385,7 @@ impl< } // Validate that we need to process the ack - if let Err(err) = self.validate_ack(&ack, &sender) { + if let Err(err) = self.validate_ack(&ack, &sender).await { if err.blockable() { commonware_p2p::block!( self.blocker, @@ -613,7 +613,7 @@ impl< /// Takes a raw ack (from sender) from the p2p network and validates it. /// /// Returns an error if the ack is invalid. - fn validate_ack( + async fn validate_ack( &mut self, ack: &Ack, sender: &::PublicKey, @@ -679,7 +679,14 @@ impl< } // Validate signature - if !ack.verify(&mut self.context, &*scheme, &self.strategy) { + let ack_clone = ack.clone(); + let scheme = scheme.clone(); + let mut context = self.context.with_label("verify"); + if !self + .strategy + .spawn(move |s| ack_clone.verify(&mut context, &*scheme, &s)) + .await + { return Err(Error::InvalidAckSignature); } diff --git a/consensus/src/marshal/coding/marshaled.rs b/consensus/src/marshal/coding/marshaled.rs index ef04a4146bb..f5fa21f169f 100644 --- a/consensus/src/marshal/coding/marshaled.rs +++ b/consensus/src/marshal/coding/marshaled.rs @@ -106,7 +106,7 @@ use commonware_cryptography::{ Committable, Digestible, Hasher, }; use commonware_macros::select; -use commonware_parallel::Strategy; +use commonware_parallel::Bridge; use commonware_runtime::{ telemetry::metrics::histogram::{Buckets, Timed}, Clock, Metrics, Spawner, Storage, @@ -140,7 +140,7 @@ where C: CodingScheme, H: Hasher, Z: Provider>, - S: Strategy, + S: Bridge, ES: Epocher, { /// The underlying application to wrap. @@ -173,7 +173,7 @@ where C: CodingScheme, H: Hasher, Z: Provider>, - S: Strategy, + S: Bridge, ES: Epocher, { context: E, @@ -206,7 +206,7 @@ where C: CodingScheme, H: Hasher, Z: Provider>, - S: Strategy, + S: Bridge, ES: Epocher, { /// Creates a new [`Marshaled`] wrapper. @@ -448,7 +448,7 @@ where C: CodingScheme, H: Hasher, Z: Provider>, - S: Strategy, + S: Bridge, ES: Epocher, { type Digest = Commitment; @@ -825,7 +825,7 @@ where C: CodingScheme, H: Hasher, Z: Provider>, - S: Strategy, + S: Bridge, ES: Epocher, { async fn certify(&mut self, round: Round, payload: Self::Digest) -> oneshot::Receiver { @@ -935,7 +935,7 @@ where C: CodingScheme, H: Hasher, Z: Provider>, - S: Strategy, + S: Bridge, ES: Epocher, { type Digest = Commitment; @@ -988,7 +988,7 @@ where C: CodingScheme, H: Hasher, Z: Provider>, - S: Strategy, + S: Bridge, ES: Epocher, { type Activity = A::Activity; diff --git a/consensus/src/marshal/coding/shards/engine.rs b/consensus/src/marshal/coding/shards/engine.rs index 104a18459bf..2c7d2fb3683 100644 --- a/consensus/src/marshal/coding/shards/engine.rs +++ b/consensus/src/marshal/coding/shards/engine.rs @@ -159,7 +159,7 @@ use commonware_p2p::{ utils::codec::{WrappedBackgroundReceiver, WrappedSender}, Blocker, Provider as PeerProvider, Receiver, Recipients, Sender, }; -use commonware_parallel::Strategy; +use commonware_parallel::Bridge; use commonware_runtime::{ spawn_cell, telemetry::metrics::{histogram::HistogramExt, status::GaugeExt}, @@ -219,7 +219,7 @@ where C: CodingScheme, H: Hasher, B: CertifiableBlock, - T: Strategy, + T: Bridge, { /// The scheme provider. pub scheme_provider: S, @@ -280,7 +280,7 @@ where H: Hasher, B: CertifiableBlock, P: PublicKey, - T: Strategy, + T: Bridge, { /// Context held by the actor. context: ContextCell, @@ -367,7 +367,7 @@ where H: Hasher, B: CertifiableBlock, P: PublicKey, - T: Strategy, + T: Bridge, { /// Create a new [`Engine`] with the given configuration. pub fn new(context: E, config: Config) -> (Self, Mailbox) { @@ -577,7 +577,7 @@ where /// - `Ok(None)` if reconstruction could not be attempted due to insufficient checked shards. /// - `Err(_)` if reconstruction was attempted but failed. #[allow(clippy::type_complexity)] - fn try_reconstruct( + async fn try_reconstruct( &mut self, commitment: Commitment, ) -> Result>>, Error> { @@ -591,20 +591,21 @@ where debug!(%commitment, "not enough checked shards to reconstruct block"); return Ok(None); } - // Attempt to reconstruct the encoded blob + + // Spawn only the erasure decode (which uses the strategy for parallel + // recovery) so we can yield while it runs. + let shards = state.take_checked_shards(); let start = self.context.current(); - let blob = C::decode( - &commitment.config(), - &commitment.root(), - state.checked_shards().iter(), - &self.strategy, - ) - .map_err(Error::Coding)?; + let blob = self + .strategy + .spawn(move |s| C::decode(&commitment.config(), &commitment.root(), shards.iter(), &s)) + .await + .map_err(Error::Coding)?; self.metrics .erasure_decode_duration .observe_between(start, self.context.current()); - // Attempt to decode the block from the encoded blob + // Decode the block and validate the reconstruction. let (inner, config): (B, CodingConfig) = Decode::decode_cfg(&mut blob.as_slice(), &(self.block_codec_cfg.clone(), ()))?; @@ -872,7 +873,7 @@ where } } - match self.try_reconstruct(commitment) { + match self.try_reconstruct(commitment).await { Ok(Some(block)) => { // Do not prune other reconstruction state here. A Byzantine // leader can equivocate by proposing multiple commitments in @@ -1220,7 +1221,7 @@ where &mut self, commitment: Commitment, participants_len: u64, - strategy: &impl Strategy, + strategy: &impl Bridge, blocker: &mut impl Blocker, ) -> Option> { let minimum = usize::from(commitment.config().minimum_shards.get()); @@ -1228,18 +1229,22 @@ where return None; } - // Batch-validate all pending weak shards in parallel. + // Spawn batch-validation of all pending weak shards so we can yield + // while the work runs. let pending = std::mem::take(&mut self.pending_shards); - let (new_checked, to_block) = - strategy.map_partition_collect_vec(pending, |(peer, shard)| { - let checked = C::check( - &commitment.config(), - &commitment.root(), - shard.index, - &shard.data, - ); - (peer, checked.ok()) - }); + let (new_checked, to_block) = strategy + .spawn(move |s| { + s.map_partition_collect_vec(pending, |(peer, shard)| { + let checked = C::check( + &commitment.config(), + &commitment.root(), + shard.index, + &shard.data, + ); + (peer, checked.ok()) + }) + }) + .await; for peer in to_block { commonware_p2p::block!(blocker, peer, "invalid shard received"); @@ -1268,22 +1273,22 @@ where struct InsertCtx<'a, Sch, S> where Sch: CertificateScheme, - S: Strategy, + S: Bridge, { scheme: &'a Sch, strategy: &'a S, participants_len: u64, } -impl Clone for InsertCtx<'_, Sch, S> { +impl Clone for InsertCtx<'_, Sch, S> { fn clone(&self) -> Self { *self } } -impl Copy for InsertCtx<'_, Sch, S> {} +impl Copy for InsertCtx<'_, Sch, S> {} -impl<'a, Sch: CertificateScheme, S: Strategy> InsertCtx<'a, Sch, S> { +impl<'a, Sch: CertificateScheme, S: Bridge> InsertCtx<'a, Sch, S> { fn new(scheme: &'a Sch, strategy: &'a S) -> Self { let participants_len = u64::try_from(scheme.participants().len()) .expect("participant count impossibly out of bounds"); @@ -1345,6 +1350,11 @@ where self.common().checked_shards.as_slice() } + /// Takes the verified shards out of the state, leaving it empty. + fn take_checked_shards(&mut self) -> Vec { + std::mem::take(&mut self.common_mut().checked_shards) + } + /// Takes the pending action for this commitment's validated shard. /// /// Returns [`None`] if the leader's shard hasn't been validated yet. @@ -1407,7 +1417,7 @@ where ) -> bool where Sch: CertificateScheme, - S: Strategy, + S: Bridge, X: Blocker, { let Some(sender_index) = ctx.scheme.participants().index(&sender) else { diff --git a/consensus/src/marshal/config.rs b/consensus/src/marshal/config.rs index a9a8c66010b..c55f766970b 100644 --- a/consensus/src/marshal/config.rs +++ b/consensus/src/marshal/config.rs @@ -3,7 +3,7 @@ use crate::{ Block, }; use commonware_cryptography::certificate::Provider; -use commonware_parallel::Strategy; +use commonware_parallel::Bridge; use commonware_runtime::buffer::paged::CacheRef; use std::num::{NonZeroU64, NonZeroUsize}; @@ -31,7 +31,7 @@ where B: Block, P: Provider, ES: Epocher, - T: Strategy, + T: Bridge, { /// Provider for epoch-specific signing schemes. /// diff --git a/consensus/src/marshal/core/actor.rs b/consensus/src/marshal/core/actor.rs index 54597e252e4..0c180fff8ef 100644 --- a/consensus/src/marshal/core/actor.rs +++ b/consensus/src/marshal/core/actor.rs @@ -24,7 +24,7 @@ use commonware_cryptography::{ }; use commonware_macros::select_loop; use commonware_p2p::Recipients; -use commonware_parallel::Strategy; +use commonware_parallel::Bridge; use commonware_resolver::Resolver; use commonware_runtime::{ spawn_cell, telemetry::metrics::status::GaugeExt, BufferPooler, Clock, ContextCell, Handle, @@ -210,7 +210,7 @@ where >, FB: Blocks, ES: Epocher, - T: Strategy, + T: Bridge, A: Acknowledgement, { // ---------- Context ---------- @@ -275,7 +275,7 @@ where >, FB: Blocks, ES: Epocher, - T: Strategy, + T: Bridge, A: Acknowledgement, { /// Create a new application actor. diff --git a/consensus/src/ordered_broadcast/config.rs b/consensus/src/ordered_broadcast/config.rs index 87a2a83ff34..eb2e4f1f802 100644 --- a/consensus/src/ordered_broadcast/config.rs +++ b/consensus/src/ordered_broadcast/config.rs @@ -4,7 +4,7 @@ use crate::{ Automaton, Monitor, Relay, Reporter, }; use commonware_cryptography::{certificate::Provider, Digest, Signer}; -use commonware_parallel::Strategy; +use commonware_parallel::Bridge; use commonware_runtime::buffer::paged::CacheRef; use std::{ num::{NonZeroU64, NonZeroUsize}, @@ -21,7 +21,7 @@ pub struct Config< R: Relay, Z: Reporter>, M: Monitor, - T: Strategy, + T: Bridge, > { /// The signer used when this engine acts as a sequencer. /// diff --git a/consensus/src/ordered_broadcast/engine.rs b/consensus/src/ordered_broadcast/engine.rs index 75fb05b2e12..d5c0e5470c6 100644 --- a/consensus/src/ordered_broadcast/engine.rs +++ b/consensus/src/ordered_broadcast/engine.rs @@ -29,7 +29,7 @@ use commonware_p2p::{ utils::codec::{wrap, WrappedSender}, Receiver, Recipients, Sender, }; -use commonware_parallel::Strategy; +use commonware_parallel::Bridge; use commonware_runtime::{ buffer::paged::CacheRef, spawn_cell, @@ -72,7 +72,7 @@ pub struct Engine< R: Relay, Z: Reporter>, M: Monitor, - T: Strategy, + T: Bridge, > { //////////////////////////////////////// // Interfaces @@ -210,7 +210,7 @@ impl< R: Relay, Z: Reporter>, M: Monitor, - T: Strategy, + T: Bridge, > Engine { /// Creates a new engine with the given context and configuration. @@ -404,7 +404,7 @@ impl< continue; } }; - let result = match self.validate_node(&node, &sender) { + let result = match self.validate_node(&node, &sender).await { Ok(result) => result, Err(err) => { debug!(?err, ?sender, "node validate failed"); @@ -453,7 +453,7 @@ impl< continue; } }; - if let Err(err) = self.validate_ack(&ack, &sender) { + if let Err(err) = self.validate_ack(&ack, &sender).await { debug!(?err, ?sender, "ack validate failed"); continue; }; @@ -879,7 +879,7 @@ impl< /// If valid (and not already the tracked tip for the sender), returns the implied /// parent chunk and its certificate. /// Else returns an error if the `Node` is invalid. - fn validate_node( + async fn validate_node( &mut self, node: &Node, sender: &C::PublicKey, @@ -900,20 +900,21 @@ impl< // Validate chunk self.validate_chunk(&node.chunk, self.epoch)?; - // Verify the node - node.verify( - &mut self.context, - &self.chunk_verifier, - &self.validators_provider, - &self.strategy, - ) + // Verify the node (spawned to avoid blocking the async worker) + let node = node.clone(); + let chunk_verifier = self.chunk_verifier.clone(); + let validators_provider = self.validators_provider.clone(); + let mut context = self.context.with_label("verify"); + self.strategy + .spawn(move |s| node.verify(&mut context, &chunk_verifier, &validators_provider, &s)) + .await } /// Takes a raw ack (from sender) from the p2p network and validates it. /// /// Returns the chunk, epoch, and vote if the ack is valid. /// Returns an error if the ack is invalid. - fn validate_ack( + async fn validate_ack( &mut self, ack: &Ack, sender: &::PublicKey, @@ -963,7 +964,14 @@ impl< } // Validate the vote signature - if !ack.verify(&mut self.context, scheme.as_ref(), &self.strategy) { + let ack_clone = ack.clone(); + let scheme = scheme.clone(); + let mut context = self.context.with_label("verify"); + if !self + .strategy + .spawn(move |s| ack_clone.verify(&mut context, &*scheme, &s)) + .await + { return Err(Error::InvalidAckSignature); } diff --git a/consensus/src/simplex/actors/batcher/actor.rs b/consensus/src/simplex/actors/batcher/actor.rs index d517e712fe3..f7115aabb98 100644 --- a/consensus/src/simplex/actors/batcher/actor.rs +++ b/consensus/src/simplex/actors/batcher/actor.rs @@ -15,7 +15,7 @@ use crate::{ use commonware_cryptography::Digest; use commonware_macros::select_loop; use commonware_p2p::{utils::codec::WrappedReceiver, Blocker, Receiver}; -use commonware_parallel::Strategy; +use commonware_parallel::Bridge; use commonware_runtime::{ spawn_cell, telemetry::metrics::{ @@ -51,7 +51,7 @@ where D: Digest, Re: Reporter>, Rl: Relay, - T: Strategy, + T: Bridge, { context: ContextCell, @@ -88,7 +88,7 @@ where D: Digest, Re: Reporter>, Rl: Relay>, - T: Strategy, + T: Bridge, { pub fn new(context: E, cfg: Config) -> (Self, Mailbox) { let participants = cfg.scheme.participants().clone(); @@ -424,7 +424,12 @@ where } // Verify the certificate - if !notarization.verify(&mut self.context, &self.scheme, &self.strategy) { + let n = notarization.clone(); + let scheme = self.scheme.clone(); + let mut context = self.context.with_label("verify"); + if !self.strategy.spawn(move |s| { + n.verify(&mut context, &scheme, &s) + }).await { commonware_p2p::block!(self.blocker, sender, %view, "invalid notarization"); continue; } @@ -445,11 +450,12 @@ where } // Verify the certificate - if !nullification.verify::<_, D>( - &mut self.context, - &self.scheme, - &self.strategy, - ) { + let n = nullification.clone(); + let scheme = self.scheme.clone(); + let mut context = self.context.with_label("verify"); + if !self.strategy.spawn(move |s| { + n.verify::<_, D>(&mut context, &scheme, &s) + }).await { commonware_p2p::block!(self.blocker, sender, %view, "invalid nullification"); continue; } @@ -470,7 +476,12 @@ where } // Verify the certificate - if !finalization.verify(&mut self.context, &self.scheme, &self.strategy) { + let f = finalization.clone(); + let scheme = self.scheme.clone(); + let mut context = self.context.with_label("verify"); + if !self.strategy.spawn(move |s| { + f.verify(&mut context, &scheme, &s) + }).await { commonware_p2p::block!(self.blocker, sender, %view, "invalid finalization"); continue; } diff --git a/consensus/src/simplex/actors/batcher/mod.rs b/consensus/src/simplex/actors/batcher/mod.rs index 733c8963847..53ecd32110b 100644 --- a/consensus/src/simplex/actors/batcher/mod.rs +++ b/consensus/src/simplex/actors/batcher/mod.rs @@ -11,12 +11,12 @@ use crate::{ pub use actor::Actor; use commonware_cryptography::certificate::Scheme; use commonware_p2p::Blocker; -use commonware_parallel::Strategy; +use commonware_parallel::Bridge; pub use ingress::{Mailbox, Message}; pub use round::Round; pub use verifier::Verifier; -pub struct Config { +pub struct Config { pub scheme: S, pub blocker: B, diff --git a/consensus/src/simplex/actors/resolver/actor.rs b/consensus/src/simplex/actors/resolver/actor.rs index 6e66639ee0b..ec8771e5a07 100644 --- a/consensus/src/simplex/actors/resolver/actor.rs +++ b/consensus/src/simplex/actors/resolver/actor.rs @@ -16,7 +16,7 @@ use commonware_codec::{Decode, Encode}; use commonware_cryptography::Digest; use commonware_macros::select_loop; use commonware_p2p::{utils::StaticProvider, Blocker, Receiver, Sender}; -use commonware_parallel::Strategy; +use commonware_parallel::Bridge; use commonware_resolver::p2p; use commonware_runtime::{spawn_cell, BufferPooler, Clock, ContextCell, Handle, Metrics, Spawner}; use commonware_utils::{ @@ -34,7 +34,7 @@ pub struct Actor< S: Scheme, B: Blocker, D: Digest, - T: Strategy, + T: Bridge, > { context: ContextCell, scheme: S, @@ -55,7 +55,7 @@ impl< S: Scheme, B: Blocker, D: Digest, - T: Strategy, + T: Bridge, > Actor { pub fn new(context: E, cfg: Config) -> (Self, Mailbox) { @@ -151,7 +151,7 @@ impl< } /// Validates an incoming message, returning the parsed message if valid. - fn validate(&mut self, view: View, data: Bytes) -> Option> { + async fn validate(&mut self, view: View, data: Bytes) -> Option> { // Decode message let incoming = Certificate::::decode_cfg(data, &self.scheme.certificate_codec_config()).ok()?; @@ -179,7 +179,14 @@ impl< ); return None; } - if !notarization.verify(&mut self.context, &self.scheme, &self.strategy) { + let n = notarization.clone(); + let scheme = self.scheme.clone(); + let mut context = self.context.with_label("verify"); + if !self + .strategy + .spawn(move |s| n.verify(&mut context, &scheme, &s)) + .await + { debug!(%view, "notarization failed verification"); return None; } @@ -199,7 +206,14 @@ impl< ); return None; } - if !finalization.verify(&mut self.context, &self.scheme, &self.strategy) { + let f = finalization.clone(); + let scheme = self.scheme.clone(); + let mut context = self.context.with_label("verify"); + if !self + .strategy + .spawn(move |s| f.verify(&mut context, &scheme, &s)) + .await + { debug!(%view, "finalization failed verification"); return None; } @@ -219,7 +233,14 @@ impl< ); return None; } - if !nullification.verify::<_, D>(&mut self.context, &self.scheme, &self.strategy) { + let n = nullification.clone(); + let scheme = self.scheme.clone(); + let mut context = self.context.with_label("verify"); + if !self + .strategy + .spawn(move |s| n.verify::<_, D>(&mut context, &scheme, &s)) + .await + { debug!(%view, "nullification failed verification"); return None; } @@ -243,7 +264,7 @@ impl< response, } => { // Validate incoming message - let Some(parsed) = self.validate(view, data) else { + let Some(parsed) = self.validate(view, data).await else { // Resolver will block any peers that send invalid responses, so // we don't need to do again here response.send_lossy(false); diff --git a/consensus/src/simplex/actors/resolver/mod.rs b/consensus/src/simplex/actors/resolver/mod.rs index 5bede0a2a38..5e225afd547 100644 --- a/consensus/src/simplex/actors/resolver/mod.rs +++ b/consensus/src/simplex/actors/resolver/mod.rs @@ -6,13 +6,13 @@ use crate::types::Epoch; pub use actor::Actor; use commonware_cryptography::certificate::Scheme; use commonware_p2p::Blocker; -use commonware_parallel::Strategy; +use commonware_parallel::Bridge; pub use ingress::Mailbox; #[cfg(test)] pub use ingress::MailboxMessage; use std::time::Duration; -pub struct Config { +pub struct Config { pub scheme: S, pub blocker: B, diff --git a/consensus/src/simplex/config.rs b/consensus/src/simplex/config.rs index 520c5a2a5fe..b567a287fde 100644 --- a/consensus/src/simplex/config.rs +++ b/consensus/src/simplex/config.rs @@ -8,7 +8,7 @@ use crate::{ }; use commonware_cryptography::{certificate::Scheme, Digest}; use commonware_p2p::Blocker; -use commonware_parallel::Strategy; +use commonware_parallel::Bridge; use commonware_runtime::buffer::paged::CacheRef; use std::{num::NonZeroUsize, time::Duration}; @@ -50,7 +50,7 @@ where A: CertifiableAutomaton>, R: Relay, F: Reporter>, - T: Strategy, + T: Bridge, { /// Signing scheme for the consensus engine. /// @@ -153,7 +153,7 @@ impl< A: CertifiableAutomaton>, R: Relay, F: Reporter>, - T: Strategy, + T: Bridge, > Config { /// Assert enforces that all configuration values are valid. diff --git a/consensus/src/simplex/engine.rs b/consensus/src/simplex/engine.rs index 1b609032908..78ce6645155 100644 --- a/consensus/src/simplex/engine.rs +++ b/consensus/src/simplex/engine.rs @@ -11,7 +11,7 @@ use crate::{ use commonware_cryptography::Digest; use commonware_macros::select; use commonware_p2p::{Blocker, Receiver, Sender}; -use commonware_parallel::Strategy; +use commonware_parallel::Bridge; use commonware_runtime::{ spawn_cell, BufferPooler, Clock, ContextCell, Handle, Metrics, Spawner, Storage, }; @@ -28,7 +28,7 @@ pub struct Engine< A: CertifiableAutomaton, Digest = D>, R: Relay>, F: Reporter>, - T: Strategy, + T: Bridge, > { context: ContextCell, @@ -51,7 +51,7 @@ impl< A: CertifiableAutomaton, Digest = D>, R: Relay>, F: Reporter>, - T: Strategy, + T: Bridge, > Engine { /// Create a new `simplex` consensus engine. diff --git a/consensus/src/simplex/scheme/reporter.rs b/consensus/src/simplex/scheme/reporter.rs index 6563f2f7af2..462b8125c33 100644 --- a/consensus/src/simplex/scheme/reporter.rs +++ b/consensus/src/simplex/scheme/reporter.rs @@ -23,7 +23,7 @@ use crate::{ Reporter, }; use commonware_cryptography::{certificate, Digest}; -use commonware_parallel::Strategy; +use commonware_parallel::Bridge; use rand_core::CryptoRngCore; /// Reporter wrapper that filters and verifies activities based on scheme attributability. @@ -36,7 +36,7 @@ pub struct AttributableReporter< E: Clone + CryptoRngCore + Send + 'static, S: certificate::Scheme, D: Digest, - T: Strategy, + T: Bridge, R: Reporter>, > { /// RNG for certificate verification @@ -55,7 +55,7 @@ impl< E: Clone + CryptoRngCore + Send + 'static, S: certificate::Scheme, D: Digest, - T: Strategy, + T: Bridge, R: Reporter>, > AttributableReporter { @@ -75,7 +75,7 @@ impl< E: Clone + CryptoRngCore + Send + 'static, S: Scheme, D: Digest, - T: Strategy, + T: Bridge, R: Reporter>, > Reporter for AttributableReporter { @@ -83,12 +83,18 @@ impl< async fn report(&mut self, activity: Self::Activity) { // Verify peer activities if verification is enabled - if self.verify - && !activity.verified() - && !activity.verify(&mut self.rng, &self.scheme, &self.strategy) - { - // Drop unverified peer activity - return; + if self.verify && !activity.verified() { + let a = activity.clone(); + let scheme = self.scheme.clone(); + let mut rng = self.rng.clone(); + if !self + .strategy + .spawn(move |s| a.verify(&mut rng, &scheme, &s)) + .await + { + // Drop unverified peer activity + return; + } } // Filter based on scheme attributability diff --git a/parallel/Cargo.toml b/parallel/Cargo.toml index 555c9eb1e7e..82b6a61cea0 100644 --- a/parallel/Cargo.toml +++ b/parallel/Cargo.toml @@ -15,11 +15,18 @@ workspace = true [dependencies] cfg-if.workspace = true commonware-macros.workspace = true +commonware-utils = { workspace = true, optional = true } rayon = { workspace = true, optional = true } [dev-dependencies] +futures.workspace = true proptest.workspace = true [features] default = [ "std" ] -std = [ "commonware-macros/std", "dep:rayon" ] +std = [ + "commonware-macros/std", + "commonware-utils/std", + "dep:commonware-utils", + "dep:rayon", +] diff --git a/parallel/src/lib.rs b/parallel/src/lib.rs index 60fe7248125..11099c936cc 100644 --- a/parallel/src/lib.rs +++ b/parallel/src/lib.rs @@ -62,10 +62,16 @@ commonware_macros::stability_scope!(BETA { use cfg_if::cfg_if; - use core::fmt; + use core::{ + fmt, + future::Future, + pin::Pin, + task::{Context, Poll}, + }; cfg_if! { if #[cfg(feature = "std")] { + use commonware_utils::channel::oneshot; use rayon::{ iter::{IntoParallelIterator, ParallelIterator}, ThreadPool as RThreadPool, ThreadPoolBuildError, ThreadPoolBuilder, @@ -76,6 +82,101 @@ commonware_macros::stability_scope!(BETA { use alloc::vec::Vec; } } + + /// A handle to a computation dispatched via [`Bridge::spawn`]. + /// + /// `SpawnHandle` implements [`Future`] and resolves with the value returned by the + /// spawned closure. Implementations that complete their work inline (for example, + /// [`Sequential`]) return an already-resolved handle via [`SpawnHandle::ready`]; + /// implementations that defer work to another thread (for example, [`Rayon`]) return + /// a handle backed by a oneshot channel. + /// + /// # Panics + /// + /// When the handle is backed by a channel, polling it panics if the sender is + /// dropped without sending a value. Under rayon's default behavior a worker panic + /// aborts the process before the sender can drop, so this branch is primarily a + /// safety net for custom executors that install their own panic handlers. + pub struct SpawnHandle { + inner: SpawnHandleInner, + } + + enum SpawnHandleInner { + Ready(Option), + #[cfg(feature = "std")] + Pending(oneshot::Receiver), + } + + impl SpawnHandle { + /// Construct an already-resolved [`SpawnHandle`] from a value. + /// + /// Intended for [`Bridge`] implementations that compute `f` inline and have the + /// result available before returning the handle (like [`Sequential`]). Available in + /// `no_std` builds. + pub const fn ready(value: R) -> Self { + Self { + inner: SpawnHandleInner::Ready(Some(value)), + } + } + } + + // `SpawnHandle` never structurally pins its inner variants: the `Ready` slot hands + // its value out by move, and `oneshot::Receiver` is itself `Unpin`. Opting into + // `Unpin` unconditionally frees callers from requiring `R: Unpin`. + impl Unpin for SpawnHandle {} + + impl fmt::Debug for SpawnHandle { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match &self.inner { + SpawnHandleInner::Ready(value) => { + f.debug_tuple("SpawnHandle::Ready").field(value).finish() + } + #[cfg(feature = "std")] + SpawnHandleInner::Pending(_) => f.debug_struct("SpawnHandle::Pending").finish(), + } + } + } + + /// Construct a pending [`SpawnHandle`] from a oneshot receiver. + /// + /// Intended for implementors of [`Bridge::spawn`] that dispatch `f` to another + /// thread and deliver the result through a oneshot channel. Only available when the + /// `std` feature is enabled, because the underlying channel requires `std`. + #[cfg(feature = "std")] + impl From> for SpawnHandle { + fn from(receiver: oneshot::Receiver) -> Self { + Self { + inner: SpawnHandleInner::Pending(receiver), + } + } + } + + impl Future for SpawnHandle { + type Output = R; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + // The `Context` is only used by the `Pending` variant; silence any unused + // warnings in `no_std` builds where that variant does not exist. + #[cfg(not(feature = "std"))] + let _ = cx; + + let this = Pin::into_inner(self); + match &mut this.inner { + SpawnHandleInner::Ready(slot) => { + Poll::Ready(slot.take().expect("SpawnHandle polled after resolving")) + } + #[cfg(feature = "std")] + SpawnHandleInner::Pending(rx) => match Pin::new(rx).poll(cx) { + Poll::Ready(Ok(value)) => Poll::Ready(value), + Poll::Ready(Err(_)) => { + panic!("commonware-parallel: worker panicked before sending result") + } + Poll::Pending => Poll::Pending, + }, + } + } + } + /// A strategy for executing fold operations. /// /// This trait abstracts over sequential and parallel execution, allowing algorithms @@ -382,6 +483,46 @@ commonware_macros::stability_scope!(BETA { fn parallelism_hint(&self) -> usize; } + /// A bridge between async runtimes and a [`Strategy`]. + /// + /// Code that uses a [`Strategy`] directly is fully synchronous - it never + /// needs to `.await` anything. When callers in an async context want to + /// offload a [`Strategy`] computation without blocking the executor, they + /// can require `Bridge` instead and `.await` the returned [`SpawnHandle`]. + pub trait Bridge: Strategy { + /// Dispatch `f` to the underlying executor and return a [`SpawnHandle`] + /// that resolves to its result. + /// + /// The closure receives an owned clone of the strategy, which lets it + /// invoke [`Strategy`] methods without the caller having to pre-clone + /// `self`. + /// + /// # Panics + /// + /// If `f` panics, the spawned task's oneshot sender is dropped without + /// sending a value. The returned [`SpawnHandle`] will panic when polled + /// in that case, matching the behavior the caller would observe if they + /// had invoked `f` inline. + /// + /// # Examples + /// + /// ``` + /// use commonware_parallel::{Bridge, Sequential}; + /// use futures::executor::block_on; + /// + /// let strategy = Sequential; + /// let data = vec![1, 2, 3, 4, 5]; + /// let handle = strategy.spawn(move |s| { + /// s.fold(&data, || 0, |acc, &x| acc + x, |a, b| a + b) + /// }); + /// assert_eq!(block_on(handle), 15); + /// ``` + fn spawn(&self, f: F) -> SpawnHandle + where + F: FnOnce(Self) -> R + Send + 'static, + R: Send + 'static; + } + /// A sequential execution strategy. /// /// This strategy executes all operations on the current thread without any @@ -443,6 +584,16 @@ commonware_macros::stability_scope!(BETA { 1 } } + + impl Bridge for Sequential { + fn spawn(&self, f: F) -> SpawnHandle + where + F: FnOnce(Self) -> R + Send + 'static, + R: Send + 'static, + { + SpawnHandle::ready(f(self.clone())) + } + } }); commonware_macros::stability_scope!(BETA, cfg(feature = "std") { /// A clone-able wrapper around a [rayon]-compatible thread pool. @@ -562,18 +713,43 @@ commonware_macros::stability_scope!(BETA, cfg(feature = "std") { self.thread_pool.current_num_threads() } } + + impl Bridge for Rayon { + fn spawn(&self, f: F) -> SpawnHandle + where + F: FnOnce(Self) -> R + Send + 'static, + R: Send + 'static, + { + let (tx, rx) = oneshot::channel(); + let me = self.clone(); + self.thread_pool.spawn(move || { + let _ = tx.send(f(me)); + }); + rx.into() + } + } }); #[cfg(test)] mod test { - use crate::{Rayon, Sequential, Strategy}; + use crate::{Bridge, Rayon, Sequential, Strategy}; use core::num::NonZeroUsize; + use futures::executor::block_on; use proptest::prelude::*; fn parallel_strategy() -> Rayon { Rayon::new(NonZeroUsize::new(4).unwrap()).unwrap() } + #[test] + #[should_panic(expected = "worker failure")] + fn spawn_sequential_panic_propagates() { + let s = Sequential; + // Sequential runs `f` inline before returning the handle, so a panic in `f` + // unwinds through `spawn` itself - the original payload propagates directly. + let _handle = s.spawn(|_| -> () { panic!("worker failure") }); + } + proptest! { #[test] fn parallel_fold_init_matches_sequential(data in prop::collection::vec(any::(), 0..500)) { @@ -663,6 +839,54 @@ mod test { prop_assert_eq!(via_map, via_fold_init); } + #[test] + fn spawn_sequential_matches_direct_fold(data in prop::collection::vec(any::(), 0..500)) { + let s = Sequential; + + let direct: i32 = s.fold( + &data, + || 0i32, + |acc, &x| acc.wrapping_add(x), + |a, b| a.wrapping_add(b), + ); + + let handle = s.spawn(move |s| { + s.fold( + &data, + || 0i32, + |acc, &x| acc.wrapping_add(x), + |a, b| a.wrapping_add(b), + ) + }); + let via_spawn = block_on(handle); + + prop_assert_eq!(direct, via_spawn); + } + + #[test] + fn spawn_rayon_matches_direct_fold(data in prop::collection::vec(any::(), 0..500)) { + let s = parallel_strategy(); + + let direct: i32 = s.fold( + &data, + || 0i32, + |acc, &x| acc.wrapping_add(x), + |a, b| a.wrapping_add(b), + ); + + let handle = s.spawn(move |s| { + s.fold( + &data, + || 0i32, + |acc, &x| acc.wrapping_add(x), + |a, b| a.wrapping_add(b), + ) + }); + let via_spawn = block_on(handle); + + prop_assert_eq!(direct, via_spawn); + } + #[test] fn map_partition_collect_vec_returns_valid_results(data in prop::collection::vec(any::(), 0..500)) { let s = Sequential;