diff --git a/consensus/src/lib.rs b/consensus/src/lib.rs index 4f656a4a309..3bb9663c7e4 100644 --- a/consensus/src/lib.rs +++ b/consensus/src/lib.rs @@ -194,16 +194,12 @@ stability_scope!(BETA, cfg(not(target_arch = "wasm32")) { /// treat every broadcast identically can set this to `()`. type Plan: Send; - /// Broadcast a payload to the given recipients. - /// - /// Returns `true` when the relay accepted the payload for the requested - /// broadcast plan. Returns `false` when the relay could not complete the - /// handoff. + /// Broadcast a payload according to the given plan. fn broadcast( &mut self, payload: Self::Digest, plan: Self::Plan, - ) -> impl Future + Send; + ) -> impl Future + Send; } /// Reporter is the interface responsible for reporting activity to some external actor. diff --git a/consensus/src/marshal/application/validation.rs b/consensus/src/marshal/application/validation.rs index e2e8c5e2b07..82b2697da6c 100644 --- a/consensus/src/marshal/application/validation.rs +++ b/consensus/src/marshal/application/validation.rs @@ -8,12 +8,6 @@ use crate::{ types::{Epoch, Epocher, Height, Round}, }; use commonware_cryptography::certificate::Scheme; -use commonware_utils::sync::Mutex; -use std::sync::Arc; - -/// Cache for the last block built during proposal, shared between the -/// proposer task and the broadcast path. -pub(crate) type LastBuilt = Arc>>; /// Which stage of verification a block has reached. /// diff --git a/consensus/src/marshal/coding/marshaled.rs b/consensus/src/marshal/coding/marshaled.rs index 48a7af37da5..4db0ddf6823 100644 --- a/consensus/src/marshal/coding/marshaled.rs +++ b/consensus/src/marshal/coding/marshaled.rs @@ -82,9 +82,7 @@ use crate::{ marshal::{ ancestry::AncestorStream, application::{ - validation::{ - is_inferred_reproposal_at_certify, is_valid_reproposal_at_verify, LastBuilt, Stage, - }, + validation::{is_inferred_reproposal_at_certify, is_valid_reproposal_at_verify, Stage}, verification_tasks::VerificationTasks, }, coding::{ @@ -106,6 +104,7 @@ use commonware_cryptography::{ Committable, Digestible, Hasher, }; use commonware_macros::select; +use commonware_p2p::Recipients; use commonware_parallel::Strategy; use commonware_runtime::{ telemetry::metrics::histogram::{Buckets, Timed}, @@ -116,7 +115,6 @@ use commonware_utils::{ fallible::OneshotExt, oneshot::{self, error::RecvError}, }, - sync::Mutex, NZU16, }; use futures::future::{ready, try_join, Either, Ready}; @@ -183,7 +181,6 @@ where scheme_provider: Z, epocher: ES, strategy: S, - last_built: LastBuilt>, verification_tasks: VerificationTasks, cached_genesis: Arc)>>, @@ -266,7 +263,6 @@ where scheme_provider, strategy, epocher, - last_built: Arc::new(Mutex::new(None)), verification_tasks: VerificationTasks::new(), cached_genesis: Arc::new(OnceLock::new()), @@ -491,15 +487,15 @@ where /// boundary block to avoid creating blocks that would be invalidated by the epoch transition. /// /// The proposal operation is spawned in a background task and returns a receiver that will - /// contain the proposed block's digest when ready. The built block is cached for later - /// broadcasting. + /// contain the proposed block's commitment when ready. The built block is persisted via + /// [`core::Mailbox::verified`] before the commitment is delivered, so consensus can rely + /// on the block surviving restart. async fn propose( &mut self, consensus_context: Context::PublicKey>, ) -> oneshot::Receiver { let mut marshal = self.marshal.clone(); let mut application = self.application.clone(); - let last_built = self.last_built.clone(); let epocher = self.epocher.clone(); let strategy = self.strategy.clone(); let cached_genesis = self.cached_genesis.clone(); @@ -537,10 +533,6 @@ where if let Some(block) = marshal.get_verified(consensus_context.round).await { let commitment = block.commitment(); let round = consensus_context.round; - { - let mut lock = last_built.lock(); - *lock = Some((round, block)); - } let success = tx.send_lossy(commitment); debug!( ?round, @@ -593,11 +585,14 @@ where if parent.height() == last_in_epoch { let commitment = parent.commitment(); let round = consensus_context.round; - { - let mut lock = last_built.lock(); - *lock = Some((round, parent)); + if !marshal.verified(round, parent).await { + debug!( + ?round, + ?commitment, + "marshal rejected re-proposed boundary block" + ); + return; } - let success = tx.send_lossy(commitment); debug!( ?round, @@ -643,11 +638,10 @@ where let commitment = coded_block.commitment(); let round = consensus_context.round; - { - let mut lock = last_built.lock(); - *lock = Some((round, coded_block)); + if !marshal.verified(round, coded_block).await { + debug!(?round, ?commitment, "marshal rejected proposed block"); + return; } - let success = tx.send_lossy(commitment); debug!(?round, ?commitment, success, "proposed new block"); }); @@ -973,38 +967,17 @@ where type PublicKey = ::PublicKey; type Plan = Plan; - async fn broadcast(&mut self, commitment: Self::Digest, plan: Self::Plan) -> bool { - match plan { - Plan::Propose => { - let Some((round, block)) = self.last_built.lock().take() else { - warn!(?commitment, "missing block to broadcast"); - return false; - }; - if block.commitment() != commitment { - warn!( - round = %round, - commitment = %block.commitment(), - height = %block.height(), - "skipping requested broadcast of block with mismatched commitment" - ); - return false; - }; - let height = block.height(); - if !self.marshal.proposed(round, block).await { - warn!(?round, ?commitment, %height, "marshal unable to accept block"); - return false; - } - debug!(?round, ?commitment, %height, "requested broadcast of built block"); - true - } - Plan::Forward { .. } => { - // Coding variant does not support targeted forwarding; - // peers reconstruct blocks from erasure-coded shards. - // - // TODO(#3389): Support checked data forwarding for PhasedScheme. - true - } - } + async fn broadcast(&mut self, commitment: Self::Digest, plan: Self::Plan) { + // Coding variant does not support targeted forwarding; + // peers reconstruct blocks from erasure-coded shards. + // + // TODO(#3389): Support checked data forwarding for PhasedScheme. + let Plan::Propose { round } = plan else { + return; + }; + self.marshal + .forward(round, commitment, Recipients::All) + .await; } } diff --git a/consensus/src/marshal/coding/mod.rs b/consensus/src/marshal/coding/mod.rs index f9c7ffaa68c..50b8741df16 100644 --- a/consensus/src/marshal/coding/mod.rs +++ b/consensus/src/marshal/coding/mod.rs @@ -80,11 +80,9 @@ mod tests { verifying::MockVerifyingApp, }, }, - simplex::{ - scheme::bls12381_threshold::vrf as bls12381_threshold_vrf, types::Proposal, Plan, - }, + simplex::{scheme::bls12381_threshold::vrf as bls12381_threshold_vrf, types::Proposal}, types::{coding::Commitment, Epoch, Epocher, FixedEpocher, Height, Round, View}, - Automaton, CertifiableAutomaton, Relay, + Automaton, CertifiableAutomaton, }; use commonware_codec::FixedSize; use commonware_coding::ReedSolomon; @@ -432,7 +430,6 @@ mod tests { let coded_parent = CodedBlock::new(parent.clone(), coding_config, &Sequential); let parent_commitment = coded_parent.commitment(); shards - .clone() .proposed(Round::new(Epoch::new(0), View::new(1)), coded_parent) .await; @@ -446,7 +443,7 @@ mod tests { let block_a = make_coding_block(context_a.clone(), parent_digest, Height::new(2), 200); let coded_block_a = CodedBlock::new(block_a.clone(), coding_config, &Sequential); let commitment_a = coded_block_a.commitment(); - shards.clone().proposed(round_a, coded_block_a).await; + shards.proposed(round_a, coded_block_a).await; // Block B at view 10 (height 2, different block same height - could happen with // different proposers or re-proposals) @@ -459,7 +456,7 @@ mod tests { let block_b = make_coding_block(context_b.clone(), parent_digest, Height::new(2), 300); let coded_block_b = CodedBlock::new(block_b.clone(), coding_config, &Sequential); let commitment_b = coded_block_b.commitment(); - shards.clone().proposed(round_b, coded_block_b).await; + shards.proposed(round_b, coded_block_b).await; context.sleep(Duration::from_millis(10)).await; @@ -558,7 +555,7 @@ mod tests { let block = make_coding_block(ctx.clone(), parent, Height::new(i), i * 100); let coded_block = CodedBlock::new(block.clone(), coding_config, &Sequential); last_commitment = coded_block.commitment(); - shards.clone().proposed(round, coded_block).await; + shards.proposed(round, coded_block).await; parent = block.digest(); last_view = View::new(i); } @@ -580,10 +577,7 @@ mod tests { let coded_boundary = CodedBlock::new(boundary_block.clone(), coding_config, &Sequential); let boundary_commitment = coded_boundary.commitment(); - shards - .clone() - .proposed(boundary_round, coded_boundary) - .await; + shards.proposed(boundary_round, coded_boundary).await; context.sleep(Duration::from_millis(10)).await; @@ -645,7 +639,6 @@ mod tests { // Make the non-boundary block available shards - .clone() .proposed(non_boundary_round, coded_non_boundary) .await; @@ -773,7 +766,6 @@ mod tests { let coded_parent = CodedBlock::new(parent.clone(), coding_config, &Sequential); let parent_commitment = coded_parent.commitment(); shards - .clone() .proposed(Round::new(Epoch::zero(), View::new(1)), coded_parent) .await; @@ -1134,7 +1126,6 @@ mod tests { let coded_parent = CodedBlock::new(parent.clone(), coding_config, &Sequential); let parent_commitment = coded_parent.commitment(); shards - .clone() .proposed(Round::new(Epoch::zero(), View::new(19)), coded_parent) .await; @@ -1148,7 +1139,6 @@ mod tests { let coded_block = CodedBlock::new(block.clone(), coding_config, &Sequential); let block_commitment = coded_block.commitment(); shards - .clone() .proposed(Round::new(Epoch::new(1), View::new(20)), coded_block) .await; @@ -1248,7 +1238,6 @@ mod tests { let coded_parent = CodedBlock::new(honest_parent.clone(), coding_config, &Sequential); let parent_commitment = coded_parent.commitment(); shards - .clone() .proposed(Round::new(Epoch::new(1), View::new(21)), coded_parent) .await; @@ -1270,10 +1259,7 @@ mod tests { let coded_malicious = CodedBlock::new(malicious_block.clone(), coding_config, &Sequential); let malicious_commitment = coded_malicious.commitment(); - shards - .clone() - .proposed(byzantine_round, coded_malicious) - .await; + shards.proposed(byzantine_round, coded_malicious).await; // Small delay to ensure broadcast is processed context.sleep(Duration::from_millis(10)).await; @@ -1318,10 +1304,7 @@ mod tests { let coded_malicious2 = CodedBlock::new(malicious_block2.clone(), coding_config, &Sequential); let malicious_commitment2 = coded_malicious2.commitment(); - shards - .clone() - .proposed(byzantine_round2, coded_malicious2) - .await; + shards.proposed(byzantine_round2, coded_malicious2).await; // Small delay to ensure broadcast is processed context.sleep(Duration::from_millis(10)).await; @@ -1410,7 +1393,7 @@ mod tests { let parent = make_coding_block(parent_ctx, genesis.digest(), Height::new(1), 100); let coded_parent = CodedBlock::new(parent.clone(), coding_config, &Sequential); let parent_commitment = coded_parent.commitment(); - shards.clone().proposed(parent_round, coded_parent).await; + shards.proposed(parent_round, coded_parent).await; // Create child at height 2. let child_round = Round::new(Epoch::zero(), View::new(2)); @@ -1422,7 +1405,7 @@ mod tests { let child = make_coding_block(child_ctx, parent.digest(), Height::new(2), 200); let coded_child = CodedBlock::new(child, coding_config, &Sequential); let child_commitment = coded_child.commitment(); - shards.clone().proposed(child_round, coded_child).await; + shards.proposed(child_round, coded_child).await; context.sleep(Duration::from_millis(10)).await; @@ -1531,7 +1514,7 @@ mod tests { let parent = make_coding_block(parent_context, genesis.digest(), Height::new(1), 100); let coded_parent = CodedBlock::new(parent.clone(), coding_config, &Sequential); let parent_commitment = coded_parent.commitment(); - shards.clone().proposed(parent_round, coded_parent).await; + shards.proposed(parent_round, coded_parent).await; // 3) Publish a valid child so optimistic verify can succeed. let round = Round::new(Epoch::zero(), View::new(2)); @@ -1544,7 +1527,7 @@ mod tests { make_coding_block(verify_context.clone(), parent.digest(), Height::new(2), 200); let coded_block = CodedBlock::new(block, coding_config, &Sequential); let commitment = coded_block.commitment(); - shards.clone().proposed(round, coded_block).await; + shards.proposed(round, coded_block).await; context.sleep(Duration::from_millis(10)).await; @@ -1639,7 +1622,7 @@ mod tests { // Validator 1 proposes coded_block_b (same inner block, different coding). // This stores it in v1's shard engine and actor cache. - assert!(v1_mailbox.proposed(round1, coded_block_b.clone()).await); + assert!(v1_mailbox.verified(round1, coded_block_b.clone()).await); context.sleep(Duration::from_millis(100)).await; // Create finalization referencing commitment_a (the "correct" commitment). @@ -1799,7 +1782,7 @@ mod tests { let parent = make_coding_block(parent_ctx, genesis.digest(), Height::new(1), 100); let coded_parent = CodedBlock::new(parent.clone(), coding_config, &Sequential); let parent_commitment = coded_parent.commitment(); - shards.clone().proposed(parent_round, coded_parent).await; + shards.proposed(parent_round, coded_parent).await; let child_round = Round::new(Epoch::zero(), View::new(2)); let child_ctx = CodingCtx { @@ -1811,7 +1794,7 @@ mod tests { let coded_child = CodedBlock::new(child.clone(), coding_config, &Sequential); let child_commitment = coded_child.commitment(); let child_digest = coded_child.digest(); - shards.clone().proposed(child_round, coded_child).await; + shards.proposed(child_round, coded_child).await; context.sleep(Duration::from_millis(10)).await; @@ -1870,10 +1853,10 @@ mod tests { } /// Regression: a proposer must be able to recover its own block after a - /// crash that occurs between `Marshaled::propose()` + `Relay::broadcast(Plan::Propose)` - /// and any verify-driven persistence. Without persisting on the broadcast - /// path, the block lives only in the in-memory shards cache and is lost - /// across restart. + /// crash that occurs immediately after `Marshaled::propose()` returns a + /// commitment. `propose` is responsible for persisting the block via + /// `marshal.verified`, so the block must survive restart even if + /// `Relay::broadcast` never runs or marshal aborts in between. #[test_traced("WARN")] fn test_marshaled_proposed_block_persists_across_restart() { let runner = deterministic::Runner::timed(Duration::from_secs(60)); @@ -1944,19 +1927,16 @@ mod tests { }; let mut marshaled = Marshaled::new(context.clone(), cfg); - // Drive the full leader-side propose + broadcast path. + // Drive the leader-side propose path. `propose` must persist the + // block before returning the commitment. let commitment = marshaled .propose(propose_context) .await .await .expect("propose should produce a commitment"); assert_eq!(commitment, expected_commitment); - assert!( - marshaled.broadcast(commitment, Plan::Propose).await, - "broadcast should persist the proposed block before returning" - ); - // Abort marshal immediately after broadcast returns; the propose + // Abort marshal immediately after propose returns; the propose // path must already have persisted the block. marshal_actor_handle.abort(); drop(marshaled); @@ -2036,7 +2016,7 @@ mod tests { let coded_a: CodedBlock<_, ReedSolomon, Sha256> = CodedBlock::new(block_a.clone(), coding_config, &Sequential); let commitment_a = coded_a.commitment(); - assert!(marshal.proposed(round, coded_a).await); + assert!(marshal.verified(round, coded_a).await); // After restart, a fresh application would build a different // block for the same round. @@ -2070,11 +2050,6 @@ mod tests { commitment, commitment_a, "propose must reuse the block marshal already persisted for this round" ); - - assert!( - marshaled.broadcast(commitment_a, Plan::Propose).await, - "relay broadcast must succeed after re-propose" - ); }); } } diff --git a/consensus/src/marshal/coding/variant.rs b/consensus/src/marshal/coding/variant.rs index cd939dffe74..49ca3df89e4 100644 --- a/consensus/src/marshal/coding/variant.rs +++ b/consensus/src/marshal/coding/variant.rs @@ -100,6 +100,7 @@ where } async fn send(&self, round: Round, block: CodedBlock, _recipients: Recipients

) { + // Targeted forwarding is not supported by the coding variant. self.proposed(round, block).await; } } diff --git a/consensus/src/marshal/core/actor.rs b/consensus/src/marshal/core/actor.rs index f6c6d168751..90cb2cd8fe4 100644 --- a/consensus/src/marshal/core/actor.rs +++ b/consensus/src/marshal/core/actor.rs @@ -529,18 +529,12 @@ where .map(Into::into); response.send_lossy(block); } - Message::Proposed { round, block, ack } => { - self.cache_verified(round, block.digest(), block.clone()) - .await; - buffer.send(round, block, Recipients::All).await; - ack.send_lossy(()); - } Message::Forward { round, commitment, - peers, + recipients, } => { - if peers.is_empty() { + if matches!(&recipients, Recipients::Some(peers) if peers.is_empty()) { continue; } let Some(block) = self.find_block_by_commitment(&buffer, commitment).await @@ -548,7 +542,7 @@ where debug!(?commitment, "block not found for forwarding"); continue; }; - buffer.send(round, block, Recipients::Some(peers)).await; + buffer.send(round, block, recipients).await; } Message::Verified { round, block, ack } => { // If the round has already been pruned by tip advancement, diff --git a/consensus/src/marshal/core/mailbox.rs b/consensus/src/marshal/core/mailbox.rs index c61107183f3..01ff946c2f3 100644 --- a/consensus/src/marshal/core/mailbox.rs +++ b/consensus/src/marshal/core/mailbox.rs @@ -9,6 +9,7 @@ use crate::{ Reporter, }; use commonware_cryptography::{certificate::Scheme, Digestible}; +use commonware_p2p::Recipients; use commonware_utils::{ channel::{fallible::AsyncFallibleExt, mpsc, oneshot}, vec::NonEmptyVec, @@ -92,23 +93,14 @@ pub(crate) enum Message { /// A channel to send the retrieved block, if any. response: oneshot::Sender>, }, - /// A request to broadcast a proposed block to peers. - Proposed { - /// The round in which the block was proposed. - round: Round, - /// The block to broadcast. - block: V::Block, - /// A channel signaled once the block is durably stored. - ack: oneshot::Sender<()>, - }, - /// A request to forward a block to a set of peers. + /// A request to forward a block to a set of recipients. Forward { /// The round in which the block was proposed. round: Round, /// The commitment of the block to forward. commitment: V::Commitment, - /// The peers to forward the block to. - peers: Vec, + /// The recipients to forward the block to. + recipients: Recipients, }, /// A notification that a block has been verified by the application. Verified { @@ -311,16 +303,6 @@ impl Mailbox { .flatten() } - /// Requests that a proposed block is sent to peers, awaiting the actor's - /// confirmation that the block has been durably persisted before returning. - #[must_use = "callers must consider block durability before proceeding"] - pub async fn proposed(&self, round: Round, block: V::Block) -> bool { - self.sender - .request(|ack| Message::Proposed { round, block, ack }) - .await - .is_some() - } - /// Notifies the actor that a block has been verified, awaiting the actor's /// confirmation that the block has been durably persisted before returning. #[must_use = "callers must consider block durability before proceeding"] @@ -365,13 +347,18 @@ impl Mailbox { self.sender.send_lossy(Message::Prune { height }).await; } - /// Forward a block to a set of peers. - pub async fn forward(&self, round: Round, commitment: V::Commitment, peers: Vec) { + /// Forward a block to a set of recipients. + pub async fn forward( + &self, + round: Round, + commitment: V::Commitment, + recipients: Recipients, + ) { self.sender .send_lossy(Message::Forward { round, commitment, - peers, + recipients, }) .await; } diff --git a/consensus/src/marshal/mocks/harness.rs b/consensus/src/marshal/mocks/harness.rs index ac1c60aa6b9..c348370c710 100644 --- a/consensus/src/marshal/mocks/harness.rs +++ b/consensus/src/marshal/mocks/harness.rs @@ -757,7 +757,7 @@ pub fn hailstorm( }) } -/// Contract: `marshal.proposed(...)=true` means the block survives an +/// Contract: `marshal.verified(...)=true` means the block survives an /// immediate crash and repeated recoveries. pub fn proposed_success_implies_recoverable_after_restart( seeds: impl IntoIterator, @@ -835,7 +835,7 @@ pub fn proposed_success_implies_recoverable_after_restart( .await; assert!( restarted.mailbox.get_block(&digest).await.is_some(), - "marshal.proposed() returning true must imply the block is recoverable \ + "marshal.verified() returning true must imply the block is recoverable \ after restart (seed={seed}, cycle={cycle})" ); } @@ -995,6 +995,8 @@ pub fn certify_at_later_view_survives_earlier_view_pruning() { let orphan_digest = H::digest(&orphan); // Verify `repeated` at V=1, then certify at V=25 (reproposal-style gap). + // The chain below starts at V=2 to avoid overwriting V=1 in the + // verified archive (which drops subsequent writes at an existing view). let v_early = Round::new(Epoch::zero(), View::new(1)); let v_late = Round::new(Epoch::zero(), View::new(25)); let mut peers: [ValidatorHandle; 0] = []; @@ -1010,7 +1012,7 @@ pub fn certify_at_later_view_survives_earlier_view_pruning() { // Drive the finalized chain forward to advance `last_processed_round` // past V=1's retention boundary but not past V=25's. With // view_retention_timeout=10 and prunable_items_per_section=10, - // processing views 1..=21 leaves `oldest_allowed=10` in both prunable + // processing views 2..=22 leaves `oldest_allowed=12` in both prunable // archives. V=1 is dropped, V=25 is retained. const CHAIN_LEN: u64 = 21; let mut parent = Sha256::hash(b""); @@ -1025,11 +1027,11 @@ pub fn certify_at_later_view_survives_earlier_view_pruning() { ); let digest = H::digest(&block); let commitment = H::commitment(&block); - let round = Round::new(Epoch::zero(), View::new(i)); + let round = Round::new(Epoch::zero(), View::new(i + 1)); H::propose(&mut handle, round, &block).await; let proposal = Proposal { round, - parent: View::new(i - 1), + parent: View::new(i), payload: commitment, }; let finalization = H::make_finalization(proposal, &schemes, QUORUM); @@ -1471,7 +1473,7 @@ impl TestHarness for StandardHarness { } async fn propose(handle: &mut ValidatorHandle, round: Round, block: &B) { - assert!(handle.mailbox.proposed(round, block.clone()).await); + assert!(handle.mailbox.verified(round, block.clone()).await); } async fn verify( @@ -2295,7 +2297,7 @@ impl TestHarness for CodingHarness { round: Round, block: &CodedBlock, Sha256>, ) { - assert!(handle.mailbox.proposed(round, block.clone()).await); + assert!(handle.mailbox.verified(round, block.clone()).await); } async fn verify( diff --git a/consensus/src/marshal/standard/deferred.rs b/consensus/src/marshal/standard/deferred.rs index 17740019523..857819a2bdb 100644 --- a/consensus/src/marshal/standard/deferred.rs +++ b/consensus/src/marshal/standard/deferred.rs @@ -74,7 +74,7 @@ use crate::{ marshal::{ ancestry::AncestorStream, application::{ - validation::{is_inferred_reproposal_at_certify, LastBuilt, Stage}, + validation::{is_inferred_reproposal_at_certify, Stage}, verification_tasks::VerificationTasks, }, core::Mailbox, @@ -93,17 +93,15 @@ use crate::{ }; use commonware_cryptography::{certificate::Scheme, Digestible}; use commonware_macros::select; +use commonware_p2p::Recipients; use commonware_runtime::{ telemetry::metrics::histogram::{Buckets, Timed}, Clock, Metrics, Spawner, }; -use commonware_utils::{ - channel::{fallible::OneshotExt, oneshot}, - sync::Mutex, -}; +use commonware_utils::channel::{fallible::OneshotExt, oneshot}; use rand::Rng; use std::sync::Arc; -use tracing::{debug, warn}; +use tracing::debug; /// An [`Application`] adapter that handles epoch transitions and validates block ancestry. /// @@ -146,7 +144,6 @@ where application: A, marshal: Mailbox>, epocher: ES, - last_built: LastBuilt, verification_tasks: VerificationTasks<::Digest>, build_duration: Timed, @@ -182,7 +179,6 @@ where application, marshal, epocher, - last_built: Arc::new(Mutex::new(None)), verification_tasks: VerificationTasks::new(), build_duration, @@ -293,15 +289,15 @@ where /// boundary block to avoid creating blocks that would be invalidated by the epoch transition. /// /// The proposal operation is spawned in a background task and returns a receiver that will - /// contain the proposed block's digest when ready. The built block is cached for later - /// broadcasting. + /// contain the proposed block's digest when ready. The built block is persisted via + /// [`Mailbox::verified`] before the digest is delivered, so consensus can rely on the + /// block surviving restart. async fn propose( &mut self, consensus_context: Context, ) -> oneshot::Receiver { let mut marshal = self.marshal.clone(); let mut application = self.application.clone(); - let last_built = self.last_built.clone(); let epocher = self.epocher.clone(); // Metrics @@ -320,10 +316,6 @@ where // stored block instead. if let Some(block) = marshal.get_verified(consensus_context.round).await { let digest = block.digest(); - { - let mut lock = last_built.lock(); - *lock = Some((consensus_context.round, block)); - } let success = tx.send_lossy(digest); debug!( round = ?consensus_context.round, @@ -372,11 +364,14 @@ where .expect("current epoch should exist"); if parent.height() == last_in_epoch { let digest = parent.digest(); - { - let mut lock = last_built.lock(); - *lock = Some((consensus_context.round, parent)); + if !marshal.verified(consensus_context.round, parent).await { + debug!( + round = ?consensus_context.round, + ?digest, + "marshal rejected re-proposed boundary block" + ); + return; } - let success = tx.send_lossy(digest); debug!( round = ?consensus_context.round, @@ -417,11 +412,14 @@ where build_timer.observe(); let digest = built_block.digest(); - { - let mut lock = last_built.lock(); - *lock = Some((consensus_context.round, built_block)); + if !marshal.verified(consensus_context.round, built_block).await { + debug!( + round = ?consensus_context.round, + ?digest, + "marshal rejected proposed block" + ); + return; } - let success = tx.send_lossy(digest); debug!( round = ?consensus_context.round, @@ -641,35 +639,12 @@ where type PublicKey = S::PublicKey; type Plan = Plan; - async fn broadcast(&mut self, digest: Self::Digest, plan: Plan) -> bool { - match plan { - Plan::Propose => { - let Some((round, block)) = self.last_built.lock().take() else { - warn!(?digest, "missing block to broadcast"); - return false; - }; - if block.digest() != digest { - warn!( - round = %round, - digest = %block.digest(), - height = %block.height(), - "skipping requested broadcast of block with mismatched digest" - ); - return false; - }; - let height = block.height(); - if !self.marshal.proposed(round, block).await { - warn!(?round, ?digest, %height, "marshal unable to accept block"); - return false; - } - debug!(?round, ?digest, %height, "requested broadcast of built block"); - true - } - Plan::Forward { round, peers } => { - self.marshal.forward(round, digest, peers).await; - true - } - } + async fn broadcast(&mut self, digest: Self::Digest, plan: Plan) { + let (round, recipients) = match plan { + Plan::Propose { round } => (round, Recipients::All), + Plan::Forward { round, recipients } => (round, recipients), + }; + self.marshal.forward(round, digest, recipients).await; } } @@ -705,9 +680,9 @@ mod tests { }, verifying::{GatedVerifyingApp, MockVerifyingApp}, }, - simplex::{scheme::bls12381_threshold::vrf as bls12381_threshold_vrf, Plan}, + simplex::scheme::bls12381_threshold::vrf as bls12381_threshold_vrf, types::{Epoch, Epocher, FixedEpocher, Height, Round, View}, - Automaton, CertifiableAutomaton, Relay, + Automaton, CertifiableAutomaton, }; use commonware_broadcast::Broadcaster; use commonware_cryptography::{ @@ -759,8 +734,7 @@ mod tests { let parent_digest = parent.digest(); assert!( marshal - .clone() - .proposed(Round::new(Epoch::new(0), View::new(1)), parent.clone()) + .verified(Round::new(Epoch::new(0), View::new(1)), parent.clone()) .await ); @@ -773,7 +747,7 @@ mod tests { }; let block_a = B::new::(context_a.clone(), parent_digest, Height::new(2), 200); let commitment_a = block_a.digest(); - assert!(marshal.clone().proposed(round_a, block_a.clone()).await); + assert!(marshal.verified(round_a, block_a.clone()).await); // Block B at view 10 (height 2, different block same height) let round_b = Round::new(Epoch::new(0), View::new(10)); @@ -784,7 +758,7 @@ mod tests { }; let block_b = B::new::(context_b.clone(), parent_digest, Height::new(2), 300); let commitment_b = block_b.digest(); - assert!(marshal.clone().proposed(round_b, block_b.clone()).await); + assert!(marshal.verified(round_b, block_b.clone()).await); context.sleep(Duration::from_millis(10)).await; @@ -894,7 +868,7 @@ mod tests { assert!( marshal .clone() - .proposed(Round::new(Epoch::zero(), View::new(19)), parent.clone()) + .verified(Round::new(Epoch::zero(), View::new(19)), parent.clone()) .await ); @@ -915,7 +889,7 @@ mod tests { assert!( marshal .clone() - .proposed(unsupported_round, block.clone()) + .verified(unsupported_round, block.clone()) .await ); @@ -987,7 +961,7 @@ mod tests { assert!( marshal .clone() - .proposed(Round::new(Epoch::zero(), View::new(1)), parent.clone()) + .verified(Round::new(Epoch::zero(), View::new(1)), parent.clone()) .await ); @@ -1000,7 +974,7 @@ mod tests { }; let block_a = B::new::(context_a, parent.digest(), Height::new(2), 200); let commitment_a = block_a.digest(); - assert!(marshal.clone().proposed(round_a, block_a).await); + assert!(marshal.verified(round_a, block_a).await); context.sleep(Duration::from_millis(10)).await; @@ -1160,7 +1134,7 @@ mod tests { }; let block_a = B::new::(ctx.clone(), genesis.digest(), Height::new(1), 100); let digest_a = block_a.digest(); - assert!(marshal.proposed(round, block_a.clone()).await); + assert!(marshal.verified(round, block_a.clone()).await); let block_b = B::new::(ctx.clone(), genesis.digest(), Height::new(1), 200); let digest_b = block_b.digest(); @@ -1181,11 +1155,6 @@ mod tests { digest, digest_a, "propose must reuse the block marshal already persisted for this round" ); - - assert!( - marshaled.broadcast(digest_a, Plan::Propose).await, - "relay broadcast must succeed after re-propose" - ); }); } } diff --git a/consensus/src/marshal/standard/inline.rs b/consensus/src/marshal/standard/inline.rs index 1be98c03680..ec89db53e14 100644 --- a/consensus/src/marshal/standard/inline.rs +++ b/consensus/src/marshal/standard/inline.rs @@ -45,7 +45,7 @@ use crate::{ marshal::{ ancestry::AncestorStream, - application::validation::{LastBuilt, Stage}, + application::validation::Stage, core::Mailbox, standard::{ validation::{ @@ -62,6 +62,7 @@ use crate::{ }; use commonware_cryptography::certificate::Scheme; use commonware_macros::select; +use commonware_p2p::Recipients; use commonware_runtime::{ telemetry::metrics::histogram::{Buckets, Timed}, Clock, Metrics, Spawner, @@ -73,7 +74,7 @@ use commonware_utils::{ use prometheus_client::metrics::histogram::Histogram; use rand::Rng; use std::{collections::BTreeSet, sync::Arc}; -use tracing::{debug, warn}; +use tracing::debug; /// Tracks `(round, digest)` pairs for which `verify` has already fetched the /// block, so `certify` can return immediately without re-subscribing to marshal. @@ -141,7 +142,6 @@ where application: A, marshal: Mailbox>, epocher: ES, - last_built: LastBuilt, available_blocks: AvailableBlocks, build_duration: Timed, @@ -162,8 +162,7 @@ where { /// Creates a new inline-verification wrapper. /// - /// Registers a `build_duration` histogram for proposal latency and initializes - /// the shared "last built block" cache used by [`Relay::broadcast`]. + /// Registers a `build_duration` histogram for proposal latency. pub fn new(context: E, application: A, marshal: Mailbox>, epocher: ES) -> Self { let build_histogram = Histogram::new(Buckets::LOCAL); context.register( @@ -178,7 +177,6 @@ where application, marshal, epocher, - last_built: Arc::new(Mutex::new(None)), available_blocks: Arc::new(Mutex::new(BTreeSet::new())), build_duration, } @@ -224,18 +222,15 @@ where /// Proposes a new block or re-proposes an epoch boundary block. /// /// Proposal runs in a spawned task and returns a receiver for the resulting digest. - /// The built block is cached in memory (`last_built`) for the subsequent - /// `Relay::broadcast(Plan::Propose)` call, which invokes `marshal.proposed()` - /// as the durable persistence boundary before consensus continues. Receiving - /// a digest from `propose()` alone does not mean the block is recoverable - /// after restart. + /// The built block is persisted via [`Mailbox::verified`] before the digest is + /// delivered, so a digest received from `propose()` implies the block is + /// recoverable after restart. async fn propose( &mut self, consensus_context: Context, ) -> oneshot::Receiver { let mut marshal = self.marshal.clone(); let mut application = self.application.clone(); - let last_built = self.last_built.clone(); let epocher = self.epocher.clone(); let build_duration = self.build_duration.clone(); @@ -252,10 +247,6 @@ where // stored block instead. if let Some(block) = marshal.get_verified(consensus_context.round).await { let digest = block.digest(); - { - let mut lock = last_built.lock(); - *lock = Some((consensus_context.round, block)); - } let success = tx.send_lossy(digest); debug!( round = ?consensus_context.round, @@ -302,11 +293,14 @@ where .expect("current epoch should exist"); if parent.height() == last_in_epoch { let digest = parent.digest(); - { - let mut lock = last_built.lock(); - *lock = Some((consensus_context.round, parent)); + if !marshal.verified(consensus_context.round, parent).await { + debug!( + round = ?consensus_context.round, + ?digest, + "marshal rejected re-proposed boundary block" + ); + return; } - let success = tx.send_lossy(digest); debug!( round = ?consensus_context.round, @@ -347,9 +341,13 @@ where build_timer.observe(); let digest = built_block.digest(); - { - let mut lock = last_built.lock(); - *lock = Some((consensus_context.round, built_block)); + if !marshal.verified(consensus_context.round, built_block).await { + debug!( + round = ?consensus_context.round, + ?digest, + "marshal rejected proposed block" + ); + return; } let success = tx.send_lossy(digest); debug!( @@ -524,33 +522,12 @@ where type PublicKey = S::PublicKey; type Plan = Plan; - async fn broadcast(&mut self, digest: Self::Digest, plan: Plan) -> bool { - match plan { - Plan::Propose => { - let Some((round, block)) = self.last_built.lock().take() else { - warn!(?digest, "missing block to broadcast"); - return false; - }; - if block.digest() != digest { - warn!( - round = %round, - digest = %block.digest(), - height = %block.height(), - "skipping requested broadcast of block with mismatched digest" - ); - return false; - }; - if !self.marshal.proposed(round, block).await { - warn!(?round, ?digest, "marshal unable to accept block"); - return false; - } - true - } - Plan::Forward { round, peers } => { - self.marshal.forward(round, digest, peers).await; - true - } - } + async fn broadcast(&mut self, digest: Self::Digest, plan: Plan) { + let (round, recipients) = match plan { + Plan::Propose { round } => (round, Recipients::All), + Plan::Forward { round, recipients } => (round, recipients), + }; + self.marshal.forward(round, digest, recipients).await; } } @@ -587,9 +564,7 @@ mod tests { }, verifying::{GatedVerifyingApp, MockVerifyingApp}, }, - simplex::{ - scheme::bls12381_threshold::vrf as bls12381_threshold_vrf, types::Context, Plan, - }, + simplex::{scheme::bls12381_threshold::vrf as bls12381_threshold_vrf, types::Context}, types::{Epoch, FixedEpocher, Height, Round, View}, Automaton, Block, CertifiableAutomaton, Relay, VerifyingApplication, }; @@ -670,7 +645,7 @@ mod tests { }; let parent = B::new::(parent_ctx, genesis.digest(), Height::new(1), 100); let parent_digest = parent.digest(); - assert!(marshal.proposed(parent_round, parent).await); + assert!(marshal.verified(parent_round, parent).await); let round = Round::new(Epoch::zero(), View::new(2)); let verify_context = Ctx { @@ -681,7 +656,7 @@ mod tests { let block = B::new::(verify_context.clone(), parent_digest, Height::new(2), 200); let digest = block.digest(); - assert!(marshal.proposed(round, block).await); + assert!(marshal.verified(round, block).await); // Complete verify first so the block is already available locally. let verify_rx = inline.verify(verify_context, digest).await; @@ -748,7 +723,7 @@ mod tests { }; let parent = B::new::(parent_ctx, genesis.digest(), Height::new(1), 100); let parent_digest = parent.digest(); - assert!(marshal.proposed(parent_round, parent).await); + assert!(marshal.verified(parent_round, parent).await); let round = Round::new(Epoch::zero(), View::new(2)); let verify_context = Ctx { @@ -759,7 +734,7 @@ mod tests { let block = B::new::(verify_context.clone(), parent_digest, Height::new(2), 200); let digest = block.digest(); - assert!(marshal.proposed(round, block).await); + assert!(marshal.verified(round, block).await); // Certify should still resolve by waiting on marshal block availability directly. let certify_rx = inline.certify(round, digest).await; @@ -824,7 +799,7 @@ mod tests { 1900, ); let boundary_digest = boundary_block.digest(); - assert!(marshal.proposed(boundary_round, boundary_block).await); + assert!(marshal.verified(boundary_round, boundary_block).await); let reproposal_round = Round::new(Epoch::zero(), View::new(boundary_height.get() + 1)); let reproposal_context = Ctx { @@ -1192,9 +1167,9 @@ mod tests { } /// Regression: if marshal persisted a verified block for a round before - /// a crash (say, via a prior `Relay::broadcast(Plan::Propose)` landing on - /// the verified cache) but the simplex notarize artifact never reached - /// the journal, a restarted leader must re-use the persisted block. + /// a crash (via a prior `propose` call) but the simplex notarize artifact + /// never reached the journal, a restarted leader must re-use the persisted + /// block. /// /// Otherwise the application is asked to build afresh, returns a new /// block whose digest does not match the one marshal already stored @@ -1235,7 +1210,7 @@ mod tests { }; let block_a = B::new::(ctx.clone(), genesis.digest(), Height::new(1), 100); let digest_a = block_a.digest(); - assert!(marshal.proposed(round, block_a.clone()).await); + assert!(marshal.verified(round, block_a.clone()).await); // After restart, the fresh application would build a different // block for the same round (distinct timestamp -> distinct digest). @@ -1258,14 +1233,6 @@ mod tests { digest, digest_a, "propose must reuse the block marshal already persisted for this round" ); - - // After the automaton hands the digest to the voter, the voter - // calls Relay::broadcast(Plan::Propose). That call must succeed so - // the leader actually votes instead of bailing out. - assert!( - inline.broadcast(digest_a, Plan::Propose).await, - "relay broadcast must succeed after re-propose" - ); }); } } diff --git a/consensus/src/marshal/standard/mod.rs b/consensus/src/marshal/standard/mod.rs index b4971181e57..c0657e35376 100644 --- a/consensus/src/marshal/standard/mod.rs +++ b/consensus/src/marshal/standard/mod.rs @@ -82,7 +82,7 @@ mod tests { use commonware_parallel::Sequential; use commonware_resolver::Resolver; use commonware_runtime::{ - buffer::paged::CacheRef, deterministic, Clock, Metrics, Quota, Runner, Spawner, + buffer::paged::CacheRef, deterministic, Clock, Metrics, Quota, Runner, }; use commonware_storage::{ archive::{immutable, prunable, Archive as _}, @@ -479,12 +479,12 @@ mod tests { .mailbox; assert!( peer_mailbox - .proposed(Round::new(Epoch::zero(), View::new(1)), block_one.clone()) + .verified(Round::new(Epoch::zero(), View::new(1)), block_one.clone()) .await ); assert!( peer_mailbox - .proposed(Round::new(Epoch::zero(), View::new(2)), block_two.clone()) + .verified(Round::new(Epoch::zero(), View::new(2)), block_two.clone()) .await ); StandardHarness::report_finalization(&mut peer_mailbox, finalization_two.clone()).await; @@ -576,17 +576,17 @@ mod tests { .mailbox; assert!( peer_mailbox - .proposed(Round::new(Epoch::zero(), View::new(1)), block_one.clone()) + .verified(Round::new(Epoch::zero(), View::new(1)), block_one.clone()) .await ); assert!( peer_mailbox - .proposed(Round::new(Epoch::zero(), View::new(2)), block_two.clone()) + .verified(Round::new(Epoch::zero(), View::new(2)), block_two.clone()) .await ); assert!( peer_mailbox - .proposed(Round::new(Epoch::zero(), View::new(3)), block_three.clone()) + .verified(Round::new(Epoch::zero(), View::new(3)), block_three.clone()) .await ); StandardHarness::report_finalization(&mut peer_mailbox, finalization_two.clone()).await; @@ -768,7 +768,7 @@ mod tests { for (i, block) in blocks.iter().enumerate() { assert!( peer_mailbox - .proposed( + .verified( Round::new(Epoch::zero(), View::new(block.height().get())), (*block).clone(), ) @@ -1200,7 +1200,7 @@ mod tests { assert!( marshal .clone() - .proposed(boundary_round, boundary_block.clone()) + .verified(boundary_round, boundary_block.clone()) .await ); @@ -1272,7 +1272,7 @@ mod tests { assert!( marshal .clone() - .proposed(boundary_round, boundary_block) + .verified(boundary_round, boundary_block) .await ); @@ -1311,7 +1311,7 @@ mod tests { assert!( marshal .clone() - .proposed(non_boundary_round, non_boundary_block) + .verified(non_boundary_round, non_boundary_block) .await ); @@ -1416,7 +1416,7 @@ mod tests { assert!( marshal .clone() - .proposed(malformed_round, malformed_block) + .verified(malformed_round, malformed_block) .await ); @@ -1456,7 +1456,7 @@ mod tests { let parent = B::new::(parent_context, genesis.digest(), Height::new(1), 300); let parent_digest = parent.digest(); - assert!(marshal.clone().proposed(parent_round, parent).await); + assert!(marshal.verified(parent_round, parent).await); let mismatch_round = Round::new(Epoch::zero(), View::new(3)); let mismatched_context = Ctx { @@ -1474,7 +1474,7 @@ mod tests { assert!( marshal .clone() - .proposed(mismatch_round, mismatched_block) + .verified(mismatch_round, mismatched_block) .await ); @@ -1549,7 +1549,7 @@ mod tests { }; let parent = B::new::(parent_context, genesis.digest(), Height::new(1), 100); let parent_digest = parent.digest(); - assert!(marshal.clone().proposed(parent_round, parent).await); + assert!(marshal.verified(parent_round, parent).await); // 2) Publish a valid child; only application-level verification should fail. let round = Round::new(Epoch::zero(), View::new(2)); @@ -1560,7 +1560,7 @@ mod tests { }; let block = B::new::(verify_context.clone(), parent_digest, Height::new(2), 200); let digest = block.digest(); - assert!(marshal.clone().proposed(round, block).await); + assert!(marshal.verified(round, block).await); context.sleep(Duration::from_millis(10)).await; @@ -1706,67 +1706,6 @@ mod tests { } } - /// A buffer whose `send` blocks until released, and signals when entered. - /// Used to verify `proposed` only resolves after `buffer.send` completes. - #[derive(Clone)] - struct GatingBuffer { - send_entered: Arc>>>, - release: Arc>>>, - } - - impl GatingBuffer { - fn new() -> (Self, oneshot::Receiver<()>, oneshot::Sender<()>) { - let (entered_tx, entered_rx) = oneshot::channel(); - let (release_tx, release_rx) = oneshot::channel(); - ( - Self { - send_entered: Arc::new(Mutex::new(Some(entered_tx))), - release: Arc::new(Mutex::new(Some(release_rx))), - }, - entered_rx, - release_tx, - ) - } - } - - impl crate::marshal::core::Buffer> for GatingBuffer { - type PublicKey = PublicKey; - type CachedBlock = B; - - async fn find_by_digest(&self, _digest: D) -> Option { - None - } - - async fn find_by_commitment(&self, _commitment: D) -> Option { - None - } - - async fn subscribe_by_digest(&self, _digest: D) -> oneshot::Receiver { - let (_sender, receiver) = oneshot::channel(); - receiver - } - - async fn subscribe_by_commitment( - &self, - _commitment: D, - ) -> oneshot::Receiver { - let (_sender, receiver) = oneshot::channel(); - receiver - } - - async fn finalized(&self, _commitment: D) {} - - async fn send(&self, _round: Round, _block: B, _recipients: Recipients) { - if let Some(entered) = self.send_entered.lock().take() { - entered.send_lossy(()); - } - let release = self.release.lock().take(); - if let Some(release) = release { - let _ = release.await; - } - } - } - /// A reporter that blocks inside `Update::Block` so tests can abort marshal /// exactly when application delivery starts. #[derive(Clone)] @@ -1914,70 +1853,6 @@ mod tests { (mailbox, buffer, resolver, actor_handle) } - /// Regression: `marshal.proposed` must not ack until the block has been - /// handed off to the provided buffer. - #[test_traced("WARN")] - fn test_standard_proposed_waits_for_buffer_send() { - let runner = deterministic::Runner::timed(Duration::from_secs(30)); - runner.start(|mut context| async move { - let Fixture { - participants, - schemes, - .. - } = bls12381_threshold_vrf::fixture::(&mut context, NAMESPACE, NUM_VALIDATORS); - let me = participants[0].clone(); - let partition_prefix = format!("proposed-waits-buffer-{me}"); - - let (buffer, send_entered, release) = GatingBuffer::new(); - let (mailbox, _buffer, _resolver, _actor_handle) = start_standard_actor( - context.with_label("validator_0"), - &partition_prefix, - ConstantProvider::new(schemes[0].clone()), - Application::::manual_ack(), - buffer, - ) - .await; - - let round = Round::new(Epoch::zero(), View::new(1)); - let block = make_raw_block(Sha256::hash(b""), Height::new(1), 100); - - // Drive `proposed` from a spawned task so we can observe its state - // from the main task via a completion channel. - let (done_tx, done_rx) = oneshot::channel(); - context - .with_label("proposed_caller") - .spawn(move |_| async move { - let ok = mailbox.proposed(round, block).await; - done_tx.send_lossy(ok); - }); - - // Wait for the marshal actor to enter `buffer.send`. - send_entered - .await - .expect("buffer.send should be entered after cache_verified"); - - // With the buffer held in `send`, `proposed` must remain pending. - // Poll it against a generous timer; the timer should always win. - futures::pin_mut!(done_rx); - select! { - _ = context.sleep(Duration::from_millis(500)) => {}, - _ = &mut done_rx => { - panic!("proposed returned before buffer.send released"); - }, - } - - // Releasing the gate lets `send` complete; `proposed` must then ack. - release.send_lossy(()); - let ok = select! { - result = &mut done_rx => result.expect("proposed channel closed"), - _ = context.sleep(Duration::from_secs(5)) => { - panic!("proposed did not complete after buffer release"); - }, - }; - assert!(ok, "proposed should return true after durable dispatch"); - }); - } - /// When the provider has no verifier for an epoch, in-flight deliveries /// for that epoch must be acknowledged (`true`) so the serving peer is /// not blamed, rather than rejected (`false`). @@ -2362,7 +2237,11 @@ mod tests { .await; mailbox - .forward(round, unknown, vec![participants[1].clone()]) + .forward( + round, + unknown, + Recipients::Some(vec![participants[1].clone()]), + ) .await; context.sleep(Duration::from_millis(50)).await; @@ -2401,7 +2280,9 @@ mod tests { assert!(mailbox.verified(round, block.clone()).await); let targets = vec![participants[1].clone(), participants[2].clone()]; - mailbox.forward(round, digest, targets.clone()).await; + mailbox + .forward(round, digest, Recipients::Some(targets.clone())) + .await; wait_until(&context, Duration::from_secs(5), "buffer.send", || { !buffer.sends.lock().is_empty() diff --git a/consensus/src/ordered_broadcast/mocks/automaton.rs b/consensus/src/ordered_broadcast/mocks/automaton.rs index 576971dbb7d..806846670cc 100644 --- a/consensus/src/ordered_broadcast/mocks/automaton.rs +++ b/consensus/src/ordered_broadcast/mocks/automaton.rs @@ -68,8 +68,7 @@ impl R for Automaton

{ type Plan = (); type PublicKey = P; - async fn broadcast(&mut self, payload: Self::Digest, _plan: ()) -> bool { + async fn broadcast(&mut self, payload: Self::Digest, _plan: ()) { trace!(?payload, "broadcast"); - true } } diff --git a/consensus/src/simplex/actors/batcher/actor.rs b/consensus/src/simplex/actors/batcher/actor.rs index d517e712fe3..46c61ee4cec 100644 --- a/consensus/src/simplex/actors/batcher/actor.rs +++ b/consensus/src/simplex/actors/batcher/actor.rs @@ -14,7 +14,7 @@ use crate::{ }; use commonware_cryptography::Digest; use commonware_macros::select_loop; -use commonware_p2p::{utils::codec::WrappedReceiver, Blocker, Receiver}; +use commonware_p2p::{utils::codec::WrappedReceiver, Blocker, Receiver, Recipients}; use commonware_parallel::Strategy; use commonware_runtime::{ spawn_cell, @@ -249,7 +249,7 @@ where proposal.payload, Plan::Forward { round: proposal.round, - peers, + recipients: Recipients::Some(peers), }, ) .await; diff --git a/consensus/src/simplex/actors/batcher/mod.rs b/consensus/src/simplex/actors/batcher/mod.rs index 5cf77ded322..6c52e1eab36 100644 --- a/consensus/src/simplex/actors/batcher/mod.rs +++ b/consensus/src/simplex/actors/batcher/mod.rs @@ -97,11 +97,14 @@ mod tests { type PublicKey = PublicKey; type Plan = Plan; - async fn broadcast(&mut self, payload: Sha256Digest, plan: Self::Plan) -> bool { - if let Plan::Forward { round, peers } = plan { + async fn broadcast(&mut self, payload: Sha256Digest, plan: Self::Plan) { + if let Plan::Forward { + round, + recipients: Recipients::Some(peers), + } = plan + { self.broadcasts.lock().push((payload, round, peers)); } - true } } diff --git a/consensus/src/simplex/actors/voter/actor.rs b/consensus/src/simplex/actors/voter/actor.rs index 55e66fff273..8a3ef3dfb4a 100644 --- a/consensus/src/simplex/actors/voter/actor.rs +++ b/consensus/src/simplex/actors/voter/actor.rs @@ -918,14 +918,15 @@ impl< } view = self.state.current_view(); - // Notify application of proposal - if !self.relay.broadcast(proposed, Plan::Propose).await { - warn!( - round = ?context.round, - "failed to broadcast proposed payload, stopping voter" - ); - break; - } + // Notify application of proposal. + self.relay + .broadcast( + proposed, + Plan::Propose { + round: context.round, + }, + ) + .await; }, (context, verified) = verify_wait => { // Clear verify waiter diff --git a/consensus/src/simplex/actors/voter/mod.rs b/consensus/src/simplex/actors/voter/mod.rs index 2441d2c2e81..1d724cc1d58 100644 --- a/consensus/src/simplex/actors/voter/mod.rs +++ b/consensus/src/simplex/actors/voter/mod.rs @@ -63,8 +63,8 @@ mod tests { secp256r1, Scheme, }, types::{ - Artifact, Certificate, Finalization, Finalize, Notarization, Notarize, - Nullification, Nullify, Proposal, Vote, + Certificate, Finalization, Finalize, Notarization, Notarize, Nullification, + Nullify, Proposal, Vote, }, }, types::{Participant, Round, View}, @@ -84,9 +84,8 @@ mod tests { use commonware_runtime::{ deterministic, telemetry::traces::collector::TraceStorage, Clock, Metrics, Quota, Runner, }; - use commonware_storage::journal::segmented::variable::{Config as JConfig, Journal}; use commonware_utils::{channel::mpsc, sync::Mutex, NZUsize, NZU16}; - use futures::{pin_mut, FutureExt, StreamExt}; + use futures::FutureExt; use std::{ num::{NonZeroU16, NonZeroU32}, sync::Arc, @@ -98,27 +97,6 @@ mod tests { const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(10); const TEST_QUOTA: Quota = Quota::per_second(NonZeroU32::MAX); - #[derive(Clone, Default)] - struct FailingRelay { - proposes: Arc>>, - } - - impl crate::Relay for FailingRelay { - type Digest = Sha256Digest; - type PublicKey = PublicKey; - type Plan = Plan; - - async fn broadcast(&mut self, payload: Sha256Digest, plan: Self::Plan) -> bool { - match plan { - Plan::Propose => { - self.proposes.lock().push(payload); - false - } - Plan::Forward { .. } => true, - } - } - } - async fn start_test_network_with_peers( context: deterministic::Context, peers: I, @@ -141,280 +119,6 @@ mod tests { oracle } - fn propose_broadcast_failure_stops_before_notarize(mut fixture: F) - where - S: Scheme, - F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture, - { - let namespace = b"propose_broadcast_failure_stops_before_notarize".to_vec(); - let partition = "propose_broadcast_failure_stops_before_notarize".to_string(); - let executor = deterministic::Runner::timed(Duration::from_secs(10)); - executor.start(|mut context| async move { - let Fixture { - participants, - schemes, - .. - } = fixture(&mut context, &namespace, 5); - let oracle = - start_test_network_with_peers(context.clone(), participants.clone(), true).await; - - let me = participants[0].clone(); - let elector = RoundRobin::::default(); - let reporter_cfg = mocks::reporter::Config { - participants: participants.clone().try_into().unwrap(), - scheme: schemes[0].clone(), - elector: elector.clone(), - }; - let reporter = mocks::reporter::Reporter::new( - context.with_label("reporter"), - reporter_cfg.clone(), - ); - - let app_relay = Arc::new(mocks::relay::Relay::new()); - let app_cfg = mocks::application::Config { - hasher: Sha256::default(), - relay: app_relay, - me: me.clone(), - propose_latency: (0.0, 0.0), - verify_latency: (0.0, 0.0), - certify_latency: (0.0, 0.0), - should_certify: mocks::application::Certifier::Always, - }; - let (app_actor, application) = - mocks::application::Application::new(context.with_label("app"), app_cfg); - app_actor.start(); - - let relay = FailingRelay::default(); - let propose_attempts = relay.proposes.clone(); - let voter_cfg = Config { - scheme: schemes[0].clone(), - elector, - blocker: oracle.control(me.clone()), - automaton: application.clone(), - relay, - reporter, - partition: partition.clone(), - epoch: Epoch::new(4), - mailbox_size: 128, - leader_timeout: Duration::from_secs(5), - certification_timeout: Duration::from_secs(5), - timeout_retry: Duration::from_mins(60), - activity_timeout: ViewDelta::new(10), - replay_buffer: NZUsize!(1024 * 1024), - write_buffer: NZUsize!(1024 * 1024), - page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE), - }; - let (voter, _mailbox) = Actor::new(context.with_label("voter"), voter_cfg); - let (resolver_sender, _resolver_receiver) = mpsc::channel(8); - let (batcher_sender, mut batcher_receiver) = mpsc::channel(8); - let (vote_sender, _) = oracle - .control(me.clone()) - .register(0, TEST_QUOTA) - .await - .unwrap(); - let (cert_sender, _) = oracle - .control(me.clone()) - .register(1, TEST_QUOTA) - .await - .unwrap(); - let handle = voter.start( - batcher::Mailbox::new(batcher_sender), - resolver::Mailbox::new(resolver_sender), - vote_sender, - cert_sender, - ); - - match batcher_receiver.recv().await.unwrap() { - batcher::Message::Update { - current, - leader, - response, - .. - } => { - assert_eq!(current, View::new(1)); - let _ = leader; - response.send(None).unwrap(); - } - _ => panic!("unexpected initial batcher message"), - } - - select! { - result = handle => { - result.expect("voter should stop cleanly after failed propose broadcast"); - }, - _ = context.sleep(Duration::from_secs(1)) => { - panic!("timed out waiting for voter to stop after failed propose broadcast"); - } - } - - assert_eq!( - propose_attempts.lock().len(), - 1, - "expected exactly one failed propose broadcast attempt" - ); - - while let Some(message) = batcher_receiver.recv().now_or_never().flatten() { - match message { - batcher::Message::Constructed(Vote::Notarize(notarize)) => { - panic!( - "unexpected notarize for view {} after failed propose broadcast", - notarize.view() - ); - } - batcher::Message::Update { response, .. } => { - response.send(None).unwrap(); - } - batcher::Message::Constructed(_) => {} - } - } - - let journal = Journal::<_, Artifact>::init( - context.with_label("journal_check"), - JConfig { - partition, - compression: None, - codec_config: schemes[0].certificate_codec_config(), - page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE), - write_buffer: NZUsize!(1024 * 1024), - }, - ) - .await - .expect("unable to open voter journal"); - let stream = journal - .replay(0, 0, NZUsize!(1024 * 1024)) - .await - .expect("unable to replay voter journal"); - pin_mut!(stream); - if let Some(entry) = stream.next().await { - let (_, _, _, artifact) = entry.expect("unable to decode voter journal artifact"); - panic!( - "failed propose broadcast must not leave durable vote remnants, found {artifact:?}" - ); - } - }); - } - - #[test_traced] - fn test_propose_broadcast_failure_stops_before_notarize() { - propose_broadcast_failure_stops_before_notarize::<_, _>( - bls12381_threshold_vrf::fixture::, - ); - propose_broadcast_failure_stops_before_notarize::<_, _>( - bls12381_threshold_vrf::fixture::, - ); - propose_broadcast_failure_stops_before_notarize::<_, _>( - bls12381_multisig::fixture::, - ); - propose_broadcast_failure_stops_before_notarize::<_, _>( - bls12381_multisig::fixture::, - ); - propose_broadcast_failure_stops_before_notarize::<_, _>(ed25519::fixture); - propose_broadcast_failure_stops_before_notarize::<_, _>(secp256r1::fixture); - } - - /// Engine must not panic when the voter exits cleanly after the local - /// relay rejects `Plan::Propose`. The voter treats that as a fatal stop; - /// the engine must agree and shut down gracefully. - #[test_traced] - fn test_engine_stops_cleanly_when_voter_exits_after_failed_propose_broadcast() { - let namespace = - b"engine_stops_cleanly_when_voter_exits_after_failed_propose_broadcast".to_vec(); - let partition = - "engine_stops_cleanly_when_voter_exits_after_failed_propose_broadcast".to_string(); - let executor = deterministic::Runner::timed(Duration::from_secs(10)); - executor.start(|mut context| async move { - let Fixture { - participants, - schemes, - .. - } = bls12381_threshold_vrf::fixture::(&mut context, &namespace, 5); - let oracle = - start_test_network_with_peers(context.clone(), participants.clone(), true).await; - - let me = participants[0].clone(); - let elector = RoundRobin::::default(); - let reporter_cfg = mocks::reporter::Config { - participants: participants.clone().try_into().unwrap(), - scheme: schemes[0].clone(), - elector: elector.clone(), - }; - let reporter = mocks::reporter::Reporter::new( - context.with_label("reporter"), - reporter_cfg.clone(), - ); - - let app_relay = Arc::new(mocks::relay::Relay::new()); - let app_cfg = mocks::application::Config { - hasher: Sha256::default(), - relay: app_relay, - me: me.clone(), - propose_latency: (0.0, 0.0), - verify_latency: (0.0, 0.0), - certify_latency: (0.0, 0.0), - should_certify: mocks::application::Certifier::Always, - }; - let (app_actor, application) = - mocks::application::Application::new(context.with_label("app"), app_cfg); - app_actor.start(); - - let cfg = crate::simplex::config::Config { - scheme: schemes[0].clone(), - elector, - blocker: oracle.control(me.clone()), - automaton: application, - relay: FailingRelay::default(), - reporter, - strategy: Sequential, - partition, - mailbox_size: 128, - epoch: Epoch::new(4), - leader_timeout: Duration::from_secs(5), - certification_timeout: Duration::from_secs(5), - timeout_retry: Duration::from_mins(60), - fetch_timeout: Duration::from_secs(1), - activity_timeout: ViewDelta::new(10), - skip_timeout: ViewDelta::new(5), - fetch_concurrent: 4, - replay_buffer: NZUsize!(1024 * 1024), - write_buffer: NZUsize!(1024 * 1024), - page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE), - forwarding: crate::simplex::config::ForwardingPolicy::Disabled, - }; - let engine = crate::simplex::Engine::new(context.with_label("engine"), cfg); - - let (vote_sender, vote_receiver) = oracle - .control(me.clone()) - .register(0, TEST_QUOTA) - .await - .unwrap(); - let (cert_sender, cert_receiver) = oracle - .control(me.clone()) - .register(1, TEST_QUOTA) - .await - .unwrap(); - let (resolver_sender, resolver_receiver) = oracle - .control(me.clone()) - .register(2, TEST_QUOTA) - .await - .unwrap(); - - let handle = engine.start( - (vote_sender, vote_receiver), - (cert_sender, cert_receiver), - (resolver_sender, resolver_receiver), - ); - - select! { - result = handle => { - result.expect("engine should stop cleanly after voter exit"); - }, - _ = context.sleep(Duration::from_secs(2)) => { - panic!("timed out waiting for engine to stop after voter exit"); - } - } - }); - } - fn build_notarization>( schemes: &[S], proposal: &Proposal, diff --git a/consensus/src/simplex/mocks/application.rs b/consensus/src/simplex/mocks/application.rs index 8602933211e..4648950ca75 100644 --- a/consensus/src/simplex/mocks/application.rs +++ b/consensus/src/simplex/mocks/application.rs @@ -116,8 +116,8 @@ impl Re for Mailbox { type PublicKey = P; type Plan = Plan

; - async fn broadcast(&mut self, payload: Self::Digest, _plan: Plan

) -> bool { - self.sender.send_lossy(Message::Broadcast { payload }).await + async fn broadcast(&mut self, payload: Self::Digest, _plan: Plan

) { + self.sender.send_lossy(Message::Broadcast { payload }).await; } } diff --git a/consensus/src/simplex/mod.rs b/consensus/src/simplex/mod.rs index 4a6738e04cd..84c016727ef 100644 --- a/consensus/src/simplex/mod.rs +++ b/consensus/src/simplex/mod.rs @@ -314,72 +314,71 @@ //! Before sending a message, the `Journal` sync is invoked to prevent inadvertent Byzantine behavior //! on restart (especially in the case of unclean shutdown). -use crate::types::Round; -use commonware_cryptography::PublicKey; - pub mod elector; pub mod scheme; pub mod types; cfg_if::cfg_if! { if #[cfg(not(target_arch = "wasm32"))] { + use crate::types::{Round, View, ViewDelta}; + use commonware_cryptography::PublicKey; + use commonware_p2p::Recipients; + mod actors; pub mod config; pub use config::{Config, ForwardingPolicy}; mod engine; pub use engine::Engine; mod metrics; - } -} -#[cfg(any(test, feature = "mocks"))] -pub mod mocks; - -#[cfg(not(target_arch = "wasm32"))] -use crate::types::{View, ViewDelta}; + /// The minimum view we are tracking both in-memory and on-disk. + pub(crate) const fn min_active(activity_timeout: ViewDelta, last_finalized: View) -> View { + last_finalized.saturating_sub(activity_timeout) + } -/// The minimum view we are tracking both in-memory and on-disk. -#[cfg(not(target_arch = "wasm32"))] -pub(crate) const fn min_active(activity_timeout: ViewDelta, last_finalized: View) -> View { - last_finalized.saturating_sub(activity_timeout) -} + /// Whether or not a view is interesting to us. This is a function + /// of both `min_active` and whether or not the view is too far + /// in the future (based on the view we are currently in). + pub(crate) fn interesting( + activity_timeout: ViewDelta, + last_finalized: View, + current: View, + pending: View, + allow_future: bool, + ) -> bool { + // If the view is genesis, skip it, genesis doesn't have votes + if pending.is_zero() { + return false; + } + if pending < min_active(activity_timeout, last_finalized) { + return false; + } + if !allow_future && pending > current.next() { + return false; + } + true + } -/// Whether or not a view is interesting to us. This is a function -/// of both `min_active` and whether or not the view is too far -/// in the future (based on the view we are currently in). -#[cfg(not(target_arch = "wasm32"))] -pub(crate) fn interesting( - activity_timeout: ViewDelta, - last_finalized: View, - current: View, - pending: View, - allow_future: bool, -) -> bool { - // If the view is genesis, skip it, genesis doesn't have votes - if pending.is_zero() { - return false; - } - if pending < min_active(activity_timeout, last_finalized) { - return false; - } - if !allow_future && pending > current.next() { - return false; + /// Describes how a payload should be broadcast to the network. + pub enum Plan { + /// Initial broadcast of a newly proposed block to all participants. + Propose { + /// The round in which the block was proposed. + round: Round, + }, + /// Forward a block to a specific set of peers. + Forward { + /// The round in which the forwarded block was proposed. + round: Round, + /// The recipients to forward the block to. + recipients: Recipients

, + }, + } } - true } -/// Describes how a payload should be broadcast to the network. -pub enum Plan { - /// Initial broadcast of a newly proposed block to all participants. - Propose, - /// Forward a block to a specific set of peers. - Forward { - /// The round in which the forwarded block was proposed. - round: Round, - /// The peers to forward the block to. - peers: Vec

, - }, -} +#[cfg(any(test, feature = "mocks"))] +pub mod mocks; /// Convenience alias for [`N3f1::quorum`]. #[cfg(test)] diff --git a/examples/bridge/src/application/ingress.rs b/examples/bridge/src/application/ingress.rs index c1abebaf54f..6881ad3f127 100644 --- a/examples/bridge/src/application/ingress.rs +++ b/examples/bridge/src/application/ingress.rs @@ -96,12 +96,11 @@ impl Re for Mailbox { type PublicKey = PublicKey; type Plan = Plan; - async fn broadcast(&mut self, _: Self::Digest, _: Self::Plan) -> bool { + async fn broadcast(&mut self, _: Self::Digest, _: Self::Plan) { // We don't broadcast our raw messages to other peers. // // If we were building an EVM blockchain, for example, we'd // send the block to other peers here. - true } } diff --git a/examples/log/src/application/ingress.rs b/examples/log/src/application/ingress.rs index 064fdd7a095..17bb94244ab 100644 --- a/examples/log/src/application/ingress.rs +++ b/examples/log/src/application/ingress.rs @@ -85,11 +85,10 @@ impl Re for Mailbox { type PublicKey = PublicKey; type Plan = Plan; - async fn broadcast(&mut self, _: Self::Digest, _: Self::Plan) -> bool { + async fn broadcast(&mut self, _: Self::Digest, _: Self::Plan) { // We don't broadcast our raw messages to other peers. // // If we were building an EVM blockchain, for example, we'd // send the block to other peers here. - true } }