diff --git a/consensus/src/marshal/coding/marshaled.rs b/consensus/src/marshal/coding/marshaled.rs index baa15c9d59..f8016b5b81 100644 --- a/consensus/src/marshal/coding/marshaled.rs +++ b/consensus/src/marshal/coding/marshaled.rs @@ -799,12 +799,25 @@ where }; build_timer.observe(&runtime_context); + let round = consensus_context.round; let erasure_timer = erasure_encode_duration.timer(&runtime_context); - let coded_block = CodedBlock::::new(built_block, coding_config, &strategy); + let handle = runtime_context + .child("erasure_encode") + .with_attribute("round", round) + .shared(true) + .spawn(move |_| async move { + CodedBlock::::new(built_block, coding_config, &strategy) + }); + let coded_block = select! { + _ = tx.closed() => { + debug!(reason = "consensus dropped receiver", "skipping proposal"); + return; + }, + result = handle => result.expect("strategy task failed"), + }; erasure_timer.observe(&runtime_context); let commitment = coded_block.commitment(); - let round = consensus_context.round; if !marshal.proposed(round, coded_block).await { debug!(?round, ?commitment, "marshal rejected proposed block"); return; diff --git a/consensus/src/marshal/coding/shards/engine.rs b/consensus/src/marshal/coding/shards/engine.rs index 171852d8e4..f2bc23f84c 100644 --- a/consensus/src/marshal/coding/shards/engine.rs +++ b/consensus/src/marshal/coding/shards/engine.rs @@ -485,17 +485,17 @@ where match message { Message::Proposed { block, round } => { - self.broadcast_shards(&mut sender, round, block); + self.broadcast_shards(&mut sender, round, block).await; } Message::Discovered { commitment, leader, round, } => { - self.handle_external_proposal(&mut sender, commitment, leader, round); + self.handle_external_proposal(&mut sender, commitment, leader, round).await; } Message::Notarized { commitment, round } => { - self.handle_notarized_commitment(&mut sender, commitment, round); + self.handle_notarized_commitment(&mut sender, commitment, round).await; } Message::GetByCommitment { commitment, @@ -546,13 +546,13 @@ where debug!("receiver closed, stopping shard engine"); return; } => { - self.handle_network_shard(&mut sender, peer, shard); + self.handle_network_shard(&mut sender, peer, shard).await; }, } } /// Handles a decoded shard received from the network. - fn handle_network_shard>( + async fn handle_network_shard>( &mut self, sender: &mut WrappedSender>, peer: P, @@ -591,24 +591,44 @@ where } } - let state = self - .state - .get_mut(&commitment) - .expect("state checked as present"); - let progressed = state.on_network_shard( - peer, - shard, - InsertCtx::new(scheme.as_ref(), &self.strategy), - &mut self.blocker, - ); + let progressed = self + .handle_state_network_shard(commitment, peer, shard, scheme) + .await; if progressed { - self.try_advance(sender, commitment); + self.try_advance(sender, commitment).await; } } else { self.buffer_peer_shard(peer, shard); } } + async fn handle_state_network_shard( + &mut self, + commitment: Commitment, + peer: P, + shard: Shard, + scheme: std::sync::Arc, + ) -> bool { + let Some(mut state) = self.state.remove(&commitment) else { + return false; + }; + let participants_len = u64::try_from(scheme.participants().len()) + .expect("participant count impossibly out of bounds"); + let progressed = state.on_network_shard( + peer, + shard, + InsertCtx::new(scheme.as_ref()), + &mut self.blocker, + ); + if progressed { + state = self + .try_transition_state(commitment, participants_len, state) + .await; + } + self.state.insert(commitment, state); + progressed + } + /// Returns whether an incoming network shard should still be processed. /// /// Shards for reconstructed commitments are normally ignored. The only @@ -637,30 +657,44 @@ where /// - `Ok(None)` if reconstruction could not be attempted due to insufficient checked shards. /// - `Err(_)` if reconstruction was attempted but failed. #[allow(clippy::type_complexity)] - fn try_reconstruct( + async fn try_reconstruct( &mut self, commitment: Commitment, ) -> Result>, Error> { if let Some(entry) = self.reconstructed_blocks.get(&commitment) { return Ok(Some(entry.block.clone())); } - let Some(state) = self.state.get_mut(&commitment) else { + let Some(state) = self.state.remove(&commitment) else { return Ok(None); }; let round = state.round(); if state.checked_shards().len() < usize::from(commitment.config().minimum_shards.get()) { debug!(%commitment, "not enough checked shards to reconstruct block"); + self.state.insert(commitment, state); return Ok(None); } // Attempt to reconstruct the encoded blob let start = self.context.current(); - let blob = C::decode( - &commitment.config(), - &commitment.root(), - state.checked_shards().iter(), - &self.strategy, - ) - .map_err(Error::Coding)?; + let handle = self + .context + .child("erasure_decode") + .with_attribute("round", round) + .shared(true) + .spawn({ + let strategy = self.strategy.clone(); + move |_| async move { + let blob = C::decode( + &commitment.config(), + &commitment.root(), + state.checked_shards().iter(), + &strategy, + ); + (state, blob) + } + }); + let (state, blob) = handle.await.expect("strategy task failed"); + self.state.insert(commitment, state); + let blob = blob.map_err(Error::Coding)?; self.metrics .erasure_decode_duration .observe_between(start, self.context.current()); @@ -703,7 +737,7 @@ where } /// Handles leader announcements for a commitment and advances reconstruction. - fn handle_external_proposal>( + async fn handle_external_proposal>( &mut self, sender: &mut WrappedSender>, commitment: Commitment, @@ -754,9 +788,9 @@ where ReconstructionState::new(Some(leader), round, participants_len), ); } - let buffered_progress = self.ingest_buffered_shards(commitment); + let buffered_progress = self.ingest_buffered_shards(commitment).await; if buffered_progress { - self.try_advance(sender, commitment); + self.try_advance(sender, commitment).await; } } @@ -765,7 +799,7 @@ where /// This is intentionally narrower than leader discovery: it may reconstruct /// the block from sender-indexed gossip shards, but it cannot mark the /// local assigned shard as verified. - fn handle_notarized_commitment>( + async fn handle_notarized_commitment>( &mut self, sender: &mut WrappedSender>, commitment: Commitment, @@ -775,9 +809,9 @@ where return; } if self.state.contains_key(&commitment) { - let buffered_progress = self.ingest_buffered_shards(commitment); + let buffered_progress = self.ingest_buffered_shards(commitment).await; if buffered_progress { - self.try_advance(sender, commitment); + self.try_advance(sender, commitment).await; } return; } @@ -791,9 +825,9 @@ where commitment, ReconstructionState::new(None, round, participants_len), ); - let buffered_progress = self.ingest_buffered_shards(commitment); + let buffered_progress = self.ingest_buffered_shards(commitment).await; if buffered_progress { - self.try_advance(sender, commitment); + self.try_advance(sender, commitment).await; } } @@ -828,7 +862,7 @@ where /// reconstruct from many peer-gossiped shards. The local assigned shard is /// different: it is only valid when it came from the leader, and the leader's /// identity is needed before it can be accepted as assigned-shard evidence. - fn ingest_buffered_shards(&mut self, commitment: Commitment) -> bool { + async fn ingest_buffered_shards(&mut self, commitment: Commitment) -> bool { let state = self .state .get(&commitment) @@ -867,21 +901,55 @@ where } } - let state = self + let mut state = self .state - .get_mut(&commitment) + .remove(&commitment) .expect("reconstruction state checked before buffered shard drain"); // Ingest buffered shards into the active reconstruction state. Batch verification // will be triggered if there are enough shards to meet the quorum threshold. let mut progressed = false; - let ctx = InsertCtx::new(scheme.as_ref(), &self.strategy); + let participants_len = u64::try_from(scheme.participants().len()) + .expect("participant count impossibly out of bounds"); + let ctx = InsertCtx::new(scheme.as_ref()); for (peer, shard) in buffered { progressed |= state.on_network_shard(peer, shard, ctx, &mut self.blocker); } + if progressed { + state = self + .try_transition_state(commitment, participants_len, state) + .await; + } + self.state.insert(commitment, state); progressed } + async fn try_transition_state( + &mut self, + commitment: Commitment, + participants_len: u64, + state: ReconstructionState, + ) -> ReconstructionState { + let handle = self + .context + .child("transition_reconstruction") + .with_attribute("round", state.round()) + .shared(true) + .spawn({ + let strategy = self.strategy.clone(); + move |_| async move { + let mut state = state; + let to_block = state.try_transition(commitment, participants_len, &strategy); + (state, to_block) + } + }); + let (state, to_block) = handle.await.expect("strategy task failed"); + for peer in to_block { + commonware_p2p::block!(self.blocker, peer, "invalid shard received"); + } + state + } + /// Cache a block and notify all subscribers waiting on it. fn cache_block(&mut self, round: Round, block: CodedBlock) { let commitment = block.commitment(); @@ -899,7 +967,7 @@ where /// /// - Participants receive the shard matching their participant index. /// - Non-participants in aggregate membership receive the leader's shard. - fn broadcast_shards>( + async fn broadcast_shards>( &mut self, sender: &mut WrappedSender>, round: Round, @@ -920,7 +988,19 @@ where return; }; - let shard_count = block.shards(&self.strategy).len(); + let handle = self + .context + .child("count_shards") + .with_attribute("round", round) + .shared(true) + .spawn({ + let strategy = self.strategy.clone(); + move |_| async move { + let shard_count = block.shards(&strategy).len(); + (block, shard_count) + } + }); + let (block, shard_count) = handle.await.expect("strategy task failed"); if shard_count != participants.len() { warn!( %commitment, @@ -992,7 +1072,7 @@ where /// Broadcasts any pending validated shard for the given commitment and attempts /// reconstruction. If reconstruction succeeds or fails, the state is cleaned /// up and subscribers are notified. - fn try_advance>( + async fn try_advance>( &mut self, sender: &mut WrappedSender>, commitment: Commitment, @@ -1010,7 +1090,7 @@ where } } - match self.try_reconstruct(commitment) { + match self.try_reconstruct(commitment).await { Ok(Some(block)) => { // Do not prune other reconstruction state here. A Byzantine // leader can equivocate by proposing multiple commitments in @@ -1362,17 +1442,16 @@ where H: Hasher, { /// Check whether quorum is met and, if so, batch-validate all pending - /// shards in parallel. Returns `Some(ReadyState)` on successful transition. + /// shards in parallel. fn try_transition( &mut self, commitment: Commitment, participants_len: u64, strategy: &impl Strategy, - blocker: &mut impl Blocker, - ) -> Option> { + ) -> (Option>, Vec

) { let minimum = usize::from(commitment.config().minimum_shards.get()); if self.common.checked_shards.len() + self.pending_shards.len() < minimum { - return None; + return (None, Vec::new()); } // Batch-validate all pending weak shards in parallel. @@ -1388,16 +1467,13 @@ where (peer, checked.ok()) }); - for peer in to_block { - commonware_p2p::block!(blocker, peer, "invalid shard received"); - } for checked in new_checked { self.common.checked_shards.push(checked); } // After validation, some may have failed; recheck threshold. if self.common.checked_shards.len() < minimum { - return None; + return (None, to_block); } // Transition to Ready. @@ -1407,38 +1483,29 @@ where &mut self.common, CommonState::new(leader, round, participants_len), ); - Some(ReadyState { common }) + (Some(ReadyState { common }), to_block) } } /// Context required for processing incoming network shards. -struct InsertCtx<'a, Sch, S> +struct InsertCtx<'a, Sch> where Sch: CertificateScheme, - S: Strategy, { scheme: &'a Sch, - strategy: &'a S, - participants_len: u64, } -impl Clone for InsertCtx<'_, Sch, S> { +impl Clone for InsertCtx<'_, Sch> { fn clone(&self) -> Self { *self } } -impl Copy for InsertCtx<'_, Sch, S> {} +impl Copy for InsertCtx<'_, Sch> {} -impl<'a, Sch: CertificateScheme, S: Strategy> InsertCtx<'a, Sch, S> { - fn new(scheme: &'a Sch, strategy: &'a S) -> Self { - let participants_len = u64::try_from(scheme.participants().len()) - .expect("participant count impossibly out of bounds"); - Self { - scheme, - strategy, - participants_len, - } +impl<'a, Sch: CertificateScheme> InsertCtx<'a, Sch> { + const fn new(scheme: &'a Sch) -> Self { + Self { scheme } } } @@ -1496,6 +1563,23 @@ where self.common().round } + /// Transition to `Ready` when enough shards have been collected. + fn try_transition( + &mut self, + commitment: Commitment, + participants_len: u64, + strategy: &impl Strategy, + ) -> Vec

{ + let Self::AwaitingQuorum(state) = self else { + return Vec::new(); + }; + let (ready, to_block) = state.try_transition(commitment, participants_len, strategy); + if let Some(ready) = ready { + *self = Self::Ready(ready); + } + to_block + } + /// Returns all verified shards accumulated for reconstruction. const fn checked_shards(&self) -> &[C::CheckedShard] { self.common().checked_shards.as_slice() @@ -1554,16 +1638,15 @@ where /// - Before a reconstruction state exists, shards are buffered at the /// engine level in bounded per-peer queues until [`Mailbox::discovered`] /// or [`Mailbox::notarized`] creates state for this commitment. - fn on_network_shard( + fn on_network_shard( &mut self, sender: P, shard: Shard, - ctx: InsertCtx<'_, Sch, S>, + ctx: InsertCtx<'_, Sch>, blocker: &mut X, ) -> bool where Sch: CertificateScheme, - S: Strategy, X: Blocker, { let Some(sender_index) = ctx.scheme.participants().index(&sender) else { @@ -1621,27 +1704,13 @@ where // even after transitioning to Ready. This ensures we broadcast // our own shard to help slower peers reach quorum. if is_from_leader && !self.common().assigned_shard_verified { - let progressed = self.common_mut().verify_assigned_shard( + return self.common_mut().verify_assigned_shard( sender, commitment, indexed, ctx.scheme.me().is_some(), blocker, ); - - if progressed { - if let Self::AwaitingQuorum(state) = self { - if let Some(ready) = state.try_transition( - commitment, - ctx.participants_len, - ctx.strategy, - blocker, - ) { - *self = Self::Ready(ready); - } - } - } - return progressed; } // Non-leader shards are only accepted while awaiting quorum. @@ -1656,11 +1725,6 @@ where .insert(indexed.index, indexed.data.clone()); state.common.contributed.set(u64::from(indexed.index), true); state.pending_shards.insert(sender, indexed); - if let Some(ready) = - state.try_transition(commitment, ctx.participants_len, ctx.strategy, blocker) - { - *self = Self::Ready(ready); - } true } diff --git a/consensus/src/marshal/core/actor.rs b/consensus/src/marshal/core/actor.rs index 2133ab6087..36c4aba10a 100644 --- a/consensus/src/marshal/core/actor.rs +++ b/consensus/src/marshal/core/actor.rs @@ -16,7 +16,7 @@ use crate::{ }, simplex::{ scheme::Scheme, - types::{verify_certificates, Finalization, Notarization, Subject}, + types::{verify_certificates, Finalization, Notarization}, }, types::{Epoch, Epocher, Height, Round, ViewDelta}, Block, Epochable, Heightable, Reporter, @@ -1366,59 +1366,71 @@ where return false; } - // Extract (subject, certificate) pairs for batch verification. - let certs: Vec<_> = delivers - .iter() - .map(|item| match item { - PendingVerification::Finalized { finalization, .. } => ( - Subject::Finalize { - proposal: &finalization.proposal, - }, - &finalization.certificate, - ), - PendingVerification::Notarized { notarization, .. } => ( - Subject::Notarize { - proposal: ¬arization.proposal, - }, - ¬arization.certificate, - ), - }) - .collect(); - // Batch verify using the all-epoch verifier if available, otherwise // batch verify per epoch using scoped verifiers. let verified = if let Some(scheme) = self.provider.all() { - verify_certificates( - self.context.as_mut(), - scheme.as_ref(), - &certs, - &self.strategy, - ) + let handle = self + .context + .child("verify_deliveries") + .with_attribute("count", delivers.len()) + .shared(true) + .spawn({ + let strategy = self.strategy.clone(); + move |mut context| async move { + let cert_refs: Vec<_> = delivers + .iter() + .map(PendingVerification::as_subject_and_certificate) + .collect(); + let verified = verify_certificates( + &mut context, + scheme.as_ref(), + &cert_refs, + &strategy, + ); + (delivers, verified) + } + }); + let (returned, verified) = handle.await.expect("strategy task failed"); + delivers = returned; + verified } else { let mut verified = vec![false; delivers.len()]; // Group indices by epoch. let mut by_epoch: BTreeMap> = BTreeMap::new(); - for (i, item) in delivers.iter().enumerate() { - let epoch = match item { - PendingVerification::Notarized { notarization, .. } => notarization.epoch(), - PendingVerification::Finalized { finalization, .. } => finalization.epoch(), - }; - by_epoch.entry(epoch).or_default().push(i); + for (i, cert) in delivers.iter().enumerate() { + by_epoch.entry(cert.epoch()).or_default().push(i); } // Batch verify each epoch group. - for (epoch, indices) in &by_epoch { - let Some(scheme) = self.provider.scoped(*epoch) else { + for (epoch, indices) in by_epoch { + let Some(scheme) = self.provider.scoped(epoch) else { continue; }; - let group: Vec<_> = indices.iter().map(|&i| certs[i]).collect(); - let results = verify_certificates( - self.context.as_mut(), - scheme.as_ref(), - &group, - &self.strategy, - ); + let handle = self + .context + .child("verify_epoch_deliveries") + .with_attribute("epoch", epoch) + .with_attribute("count", indices.len()) + .shared(true) + .spawn({ + let strategy = self.strategy.clone(); + move |mut context| async move { + let group_refs: Vec<_> = indices + .iter() + .map(|&i| delivers[i].as_subject_and_certificate()) + .collect(); + let results = verify_certificates( + &mut context, + scheme.as_ref(), + &group_refs, + &strategy, + ); + (delivers, indices, results) + } + }); + let (returned, indices, results) = handle.await.expect("strategy task failed"); + delivers = returned; for (j, &idx) in indices.iter().enumerate() { verified[idx] = results[j]; } diff --git a/consensus/src/marshal/core/delivery.rs b/consensus/src/marshal/core/delivery.rs index e67a06edb8..d363c12b6a 100644 --- a/consensus/src/marshal/core/delivery.rs +++ b/consensus/src/marshal/core/delivery.rs @@ -1,7 +1,10 @@ use super::Variant; -use crate::simplex::{ - scheme::Scheme, - types::{Finalization, Notarization}, +use crate::{ + simplex::{ + scheme::Scheme, + types::{Finalization, Notarization, Subject}, + }, + types::Epoch, Epochable, }; use commonware_cryptography::certificate::Scheme as CertificateScheme; use commonware_utils::channel::oneshot; @@ -34,4 +37,30 @@ where } } } + + pub(super) fn epoch(&self) -> Epoch { + match self { + Self::Notarized { notarization, .. } => notarization.epoch(), + Self::Finalized { finalization, .. } => finalization.epoch(), + } + } + + pub(super) const fn as_subject_and_certificate( + &self, + ) -> (Subject<'_, V::Commitment>, &S::Certificate) { + match self { + Self::Notarized { notarization, .. } => ( + Subject::Notarize { + proposal: ¬arization.proposal, + }, + ¬arization.certificate, + ), + Self::Finalized { finalization, .. } => ( + Subject::Finalize { + proposal: &finalization.proposal, + }, + &finalization.certificate, + ), + } + } } diff --git a/consensus/src/simplex/actors/batcher/actor.rs b/consensus/src/simplex/actors/batcher/actor.rs index 253a3ec96a..98888515de 100644 --- a/consensus/src/simplex/actors/batcher/actor.rs +++ b/consensus/src/simplex/actors/batcher/actor.rs @@ -6,7 +6,7 @@ use crate::{ interesting, metrics::{Inbound, Peer, TimeoutReason}, scheme::Scheme, - types::{Activity, Certificate, Proposal, Vote}, + types::{Activity, Certificate, Finalization, Notarization, Nullification, Proposal, Vote}, Plan, }, types::{Epoch, Participant, View, ViewDelta}, @@ -27,7 +27,7 @@ use commonware_runtime::{ }; use commonware_utils::ordered::{Quorum, Set}; use rand_core::CryptoRngCore; -use std::collections::BTreeMap; +use std::{collections::BTreeMap, future::Future, sync::Arc}; use tracing::{debug, trace}; /// Tracks the current view, its leader, and whether the voter has @@ -38,6 +38,9 @@ struct Current { timed_out: bool, } +type VerifiedVotes = (Vec>, Vec); +type RoundOutput = (Round, Option); + pub struct Actor where E: Spawner + Metrics + Clock + CryptoRngCore, @@ -51,7 +54,7 @@ where context: ContextCell, participants: Set, - scheme: S, + scheme: Arc, blocker: B, reporter: Re, @@ -86,7 +89,8 @@ where T: Strategy, { pub fn new(context: E, cfg: Config) -> (Self, Mailbox) { - let participants = cfg.scheme.participants().clone(); + let scheme = Arc::new(cfg.scheme); + let participants = scheme.participants().clone(); let participant_count = participants.len(); let added = context.counter("added", "number of messages added to the verifier"); let verified = context.counter("verified", "number of messages verified"); @@ -117,7 +121,7 @@ where context: ContextCell::new(context), participants, - scheme: cfg.scheme, + scheme, blocker: cfg.blocker, reporter: cfg.reporter, @@ -153,6 +157,102 @@ where ) } + fn verify_round( + &self, + view: View, + round: Round, + ) -> impl Future>> + Send + 'static { + let handle = self + .context + .child("verify_round") + .with_attribute("epoch", self.epoch) + .with_attribute("view", view) + .shared(true) + .spawn({ + let strategy = self.strategy.clone(); + move |mut context| async move { + let mut round = round; + let verified = if round.ready_notarizes() { + Some(round.verify_notarizes(&mut context, &strategy)) + } else if round.ready_nullifies() { + Some(round.verify_nullifies(&mut context, &strategy)) + } else if round.ready_finalizes() { + Some(round.verify_finalizes(&mut context, &strategy)) + } else { + None + }; + (round, verified) + } + }); + async move { handle.await.expect("strategy task failed") } + } + + fn try_construct_notarization( + &self, + view: View, + round: Round, + ) -> impl Future>> + Send + 'static { + let handle = self + .context + .child("construct_notarization") + .with_attribute("epoch", self.epoch) + .with_attribute("view", view) + .shared(true) + .spawn({ + let strategy = self.strategy.clone(); + move |_| async move { + let mut round = round; + let notarization = round.try_construct_notarization(&strategy); + (round, notarization) + } + }); + async move { handle.await.expect("strategy task failed") } + } + + fn try_construct_nullification( + &self, + view: View, + round: Round, + ) -> impl Future>> + Send + 'static { + let handle = self + .context + .child("construct_nullification") + .with_attribute("epoch", self.epoch) + .with_attribute("view", view) + .shared(true) + .spawn({ + let strategy = self.strategy.clone(); + move |_| async move { + let mut round = round; + let nullification = round.try_construct_nullification(&strategy); + (round, nullification) + } + }); + async move { handle.await.expect("strategy task failed") } + } + + fn try_construct_finalization( + &self, + view: View, + round: Round, + ) -> impl Future>> + Send + 'static { + let handle = self + .context + .child("construct_finalization") + .with_attribute("epoch", self.epoch) + .with_attribute("view", view) + .shared(true) + .spawn({ + let strategy = self.strategy.clone(); + move |_| async move { + let mut round = round; + let finalization = round.try_construct_finalization(&strategy); + (round, finalization) + } + }); + async move { handle.await.expect("strategy task failed") } + } + /// Records the latest view message received from a participant. /// /// This mechanism is not resistant to malicious validators (nor is @@ -534,24 +634,24 @@ where } // Process the updated view (if any) - let Some(round) = work.get_mut(&updated_view) else { + let Some(round) = work.remove(&updated_view) else { continue; }; // Batch verify votes if ready - let timer = self.verify_latency.timer(self.context.as_ref()); - let verified = if round.ready_notarizes() { - Some(round.verify_notarizes(self.context.as_mut(), &self.strategy)) - } else if round.ready_nullifies() { - Some(round.verify_nullifies(self.context.as_mut(), &self.strategy)) - } else if round.ready_finalizes() { - Some(round.verify_finalizes(self.context.as_mut(), &self.strategy)) + let (mut round, verified) = if round.ready_notarizes() + || round.ready_nullifies() + || round.ready_finalizes() + { + let timer = self.verify_latency.timer(self.context.as_ref()); + let (round, verified) = self.verify_round(updated_view, round).await; + (round, verified.map(|verified| (timer, verified))) } else { - None + (round, None) }; // Process batch verification results - if let Some((voters, failed)) = verified { + if let Some((timer, (voters, failed))) = verified { timer.observe(self.context.as_ref()); // Process verified votes @@ -584,32 +684,49 @@ where } // Try to construct and forward certificates - if let Some(notarization) = - self.recover_latency.time_some(self.context.as_ref(), || { - round.try_construct_notarization(&self.scheme, &self.strategy) - }) - { - debug!(view = %updated_view, "constructed notarization, forwarding to voter"); + let round = if round.ready_construct_notarization() { + let timer = self.recover_latency.timer(self.context.as_ref()); + let (round, notarization) = + self.try_construct_notarization(updated_view, round).await; + if let Some(notarization) = notarization { + timer.observe(self.context.as_ref()); + debug!(view = %updated_view, "constructed notarization, forwarding to voter"); + + // Forward notarization to voter + voter.recovered(Certificate::Notarization(notarization)); + } + round + } else { + round + }; + let round = if round.ready_construct_nullification() { + let timer = self.recover_latency.timer(self.context.as_ref()); + let (round, nullification) = + self.try_construct_nullification(updated_view, round).await; + if let Some(nullification) = nullification { + timer.observe(self.context.as_ref()); + debug!(view = %updated_view, "constructed nullification, forwarding to voter"); + voter.recovered(Certificate::Nullification(nullification)); + } + round + } else { + round + }; + let round = if round.ready_construct_finalization() { + let timer = self.recover_latency.timer(self.context.as_ref()); + let (round, finalization) = + self.try_construct_finalization(updated_view, round).await; + if let Some(finalization) = finalization { + timer.observe(self.context.as_ref()); + debug!(view = %updated_view, "constructed finalization, forwarding to voter"); + voter.recovered(Certificate::Finalization(finalization)); + } + round + } else { + round + }; - // Forward notarization to voter - voter.recovered(Certificate::Notarization(notarization)); - } - if let Some(nullification) = - self.recover_latency.time_some(self.context.as_ref(), || { - round.try_construct_nullification(&self.scheme, &self.strategy) - }) - { - debug!(view = %updated_view, "constructed nullification, forwarding to voter"); - voter.recovered(Certificate::Nullification(nullification)); - } - if let Some(finalization) = - self.recover_latency.time_some(self.context.as_ref(), || { - round.try_construct_finalization(&self.scheme, &self.strategy) - }) - { - debug!(view = %updated_view, "constructed finalization, forwarding to voter"); - voter.recovered(Certificate::Finalization(finalization)); - } + work.insert(updated_view, round); // Drop any rounds that are no longer interesting while work.first_key_value().is_some_and(|(&view, _)| { diff --git a/consensus/src/simplex/actors/batcher/round.rs b/consensus/src/simplex/actors/batcher/round.rs index 2e65987d57..54bf6fb57f 100644 --- a/consensus/src/simplex/actors/batcher/round.rs +++ b/consensus/src/simplex/actors/batcher/round.rs @@ -18,6 +18,7 @@ use commonware_utils::{ N3f1, }; use rand_core::CryptoRngCore; +use std::sync::Arc; /// Per-view state for vote accumulation and certificate tracking. pub struct Round< @@ -58,7 +59,7 @@ impl< R: Reporter>, > Round { - pub fn new(participants: Set, scheme: S, blocker: B, reporter: R) -> Self { + pub fn new(participants: Set, scheme: Arc, blocker: B, reporter: R) -> Self { let quorum = participants.quorum::(); let len = participants.len(); Self { @@ -393,12 +394,17 @@ impl< } } + /// Returns true if a notarization can be constructed from verified votes. + pub fn ready_construct_notarization(&self) -> bool { + !self.has_notarization() + && self.verified_votes.len_notarizes() >= self.participants.quorum::() + } + /// Attempts to construct a notarization certificate from verified votes. /// /// Returns the certificate if we have quorum and haven't already constructed one. pub fn try_construct_notarization( &mut self, - scheme: &S, strategy: &impl Strategy, ) -> Option> { if self.has_notarization() { @@ -407,18 +413,26 @@ impl< if self.verified_votes.len_notarizes() < self.participants.quorum::() { return None; } - let notarization = - Notarization::from_notarizes(scheme, self.verified_votes.iter_notarizes(), strategy)?; + let notarization = Notarization::from_notarizes( + self.verifier.scheme(), + self.verified_votes.iter_notarizes(), + strategy, + )?; self.set_notarization(notarization.clone()); Some(notarization) } + /// Returns true if a nullification can be constructed from verified votes. + pub fn ready_construct_nullification(&self) -> bool { + !self.has_nullification() + && self.verified_votes.len_nullifies() >= self.participants.quorum::() + } + /// Attempts to construct a nullification certificate from verified votes. /// /// Returns the certificate if we have quorum and haven't already constructed one. pub fn try_construct_nullification( &mut self, - scheme: &S, strategy: &impl Strategy, ) -> Option> { if self.has_nullification() { @@ -427,18 +441,26 @@ impl< if self.verified_votes.len_nullifies() < self.participants.quorum::() { return None; } - let nullification = - Nullification::from_nullifies(scheme, self.verified_votes.iter_nullifies(), strategy)?; + let nullification = Nullification::from_nullifies( + self.verifier.scheme(), + self.verified_votes.iter_nullifies(), + strategy, + )?; self.set_nullification(nullification.clone()); Some(nullification) } + /// Returns true if a finalization can be constructed from verified votes. + pub fn ready_construct_finalization(&self) -> bool { + !self.has_finalization() + && self.verified_votes.len_finalizes() >= self.participants.quorum::() + } + /// Attempts to construct a finalization certificate from verified votes. /// /// Returns the certificate if we have quorum and haven't already constructed one. pub fn try_construct_finalization( &mut self, - scheme: &S, strategy: &impl Strategy, ) -> Option> { if self.has_finalization() { @@ -447,8 +469,11 @@ impl< if self.verified_votes.len_finalizes() < self.participants.quorum::() { return None; } - let finalization = - Finalization::from_finalizes(scheme, self.verified_votes.iter_finalizes(), strategy)?; + let finalization = Finalization::from_finalizes( + self.verifier.scheme(), + self.verified_votes.iter_finalizes(), + strategy, + )?; self.set_finalization(finalization.clone()); Some(finalization) } diff --git a/consensus/src/simplex/actors/batcher/verifier.rs b/consensus/src/simplex/actors/batcher/verifier.rs index fffd7bfa87..effdeebe71 100644 --- a/consensus/src/simplex/actors/batcher/verifier.rs +++ b/consensus/src/simplex/actors/batcher/verifier.rs @@ -8,6 +8,7 @@ use crate::{ use commonware_cryptography::{certificate::Verification, Digest}; use commonware_parallel::Strategy; use rand_core::CryptoRngCore; +use std::sync::Arc; /// `Verifier` is a utility for tracking and verifying consensus messages. /// @@ -26,7 +27,7 @@ use rand_core::CryptoRngCore; /// [secp256r1]: crate::simplex::scheme::secp256r1 pub struct Verifier, D: Digest> { /// Signing scheme used to verify votes and assemble certificates. - scheme: S, + scheme: Arc, /// Required quorum size. quorum: usize, @@ -53,6 +54,11 @@ pub struct Verifier, D: Digest> { } impl, D: Digest> Verifier { + /// Returns the scheme used by this verifier. + pub(super) fn scheme(&self) -> &S { + self.scheme.as_ref() + } + /// Creates a new `Verifier`. /// /// # Arguments @@ -61,9 +67,9 @@ impl, D: Digest> Verifier { /// * `quorum` - An optional `u32` specifying the number of votes (2f+1) /// required to reach a quorum. If `None`, batch verification readiness /// checks based on quorum size are skipped. - pub const fn new(scheme: S, quorum: u32) -> Self { + pub fn new(scheme: impl Into>, quorum: u32) -> Self { Self { - scheme, + scheme: scheme.into(), // Store quorum as usize to simplify comparisons against queue lengths. quorum: quorum as usize, diff --git a/consensus/src/simplex/actors/resolver/actor.rs b/consensus/src/simplex/actors/resolver/actor.rs index aa67b254be..1c0bc386e7 100644 --- a/consensus/src/simplex/actors/resolver/actor.rs +++ b/consensus/src/simplex/actors/resolver/actor.rs @@ -22,7 +22,7 @@ use commonware_resolver::p2p; use commonware_runtime::{spawn_cell, BufferPooler, Clock, ContextCell, Handle, Metrics, Spawner}; use commonware_utils::{channel::fallible::OneshotExt, ordered::Quorum, sequence::U64}; use rand_core::CryptoRngCore; -use std::{num::NonZeroUsize, time::Duration}; +use std::{num::NonZeroUsize, sync::Arc, time::Duration}; use tracing::debug; /// Requests are made concurrently to multiple peers. @@ -34,7 +34,7 @@ pub struct Actor< T: Strategy, > { context: ContextCell, - scheme: S, + scheme: Arc, blocker: Option, strategy: T, @@ -60,7 +60,7 @@ impl< ( Self { context: ContextCell::new(context), - scheme: cfg.scheme, + scheme: Arc::new(cfg.scheme), blocker: Some(cfg.blocker), strategy: cfg.strategy, @@ -143,13 +143,13 @@ impl< if message.response_closed() { continue; } - self.handle_resolver(message, &mut voter, &mut resolver); + self.handle_resolver(message, &mut voter, &mut resolver).await; }, } } /// Validates an incoming message, returning the parsed message if valid. - fn validate(&mut self, view: View, data: Bytes) -> Option> { + async fn validate(&mut self, view: View, data: Bytes) -> Option> { // Decode message let incoming = Certificate::::decode_cfg(data, &self.scheme.certificate_codec_config()).ok()?; @@ -177,7 +177,22 @@ impl< ); return None; } - if !notarization.verify(self.context.as_mut(), &self.scheme, &self.strategy) { + let scheme = self.scheme.clone(); + let handle = self + .context + .child("verify_notarization") + .with_attribute("epoch", self.epoch) + .with_attribute("view", view) + .shared(true) + .spawn({ + let strategy = self.strategy.clone(); + move |mut context| async move { + let valid = notarization.verify(&mut context, &scheme, &strategy); + (notarization, valid) + } + }); + let (notarization, valid) = handle.await.expect("strategy task failed"); + if !valid { debug!(%view, "notarization failed verification"); return None; } @@ -197,7 +212,22 @@ impl< ); return None; } - if !finalization.verify(self.context.as_mut(), &self.scheme, &self.strategy) { + let scheme = self.scheme.clone(); + let handle = self + .context + .child("verify_finalization") + .with_attribute("epoch", self.epoch) + .with_attribute("view", view) + .shared(true) + .spawn({ + let strategy = self.strategy.clone(); + move |mut context| async move { + let valid = finalization.verify(&mut context, &scheme, &strategy); + (finalization, valid) + } + }); + let (finalization, valid) = handle.await.expect("strategy task failed"); + if !valid { debug!(%view, "finalization failed verification"); return None; } @@ -217,11 +247,23 @@ impl< ); return None; } - if !nullification.verify::<_, D>( - self.context.as_mut(), - &self.scheme, - &self.strategy, - ) { + let scheme = self.scheme.clone(); + let handle = self + .context + .child("verify_nullification") + .with_attribute("epoch", self.epoch) + .with_attribute("view", view) + .shared(true) + .spawn({ + let strategy = self.strategy.clone(); + move |mut context| async move { + let valid = + nullification.verify::<_, D>(&mut context, &scheme, &strategy); + (nullification, valid) + } + }); + let (nullification, valid) = handle.await.expect("strategy task failed"); + if !valid { debug!(%view, "nullification failed verification"); return None; } @@ -232,7 +274,7 @@ impl< } /// Handles a message from the [p2p::Engine]. - fn handle_resolver( + async fn handle_resolver( &mut self, message: HandlerMessage, voter: &mut voter::Mailbox, @@ -245,7 +287,7 @@ impl< response, } => { // Validate incoming message - let Some(parsed) = self.validate(view, data) else { + let Some(parsed) = self.validate(view, data).await else { // Resolver will block any peers that send invalid responses, so // we don't need to do again here response.send_lossy(false);