diff --git a/consensus/src/marshal/application/validation.rs b/consensus/src/marshal/application/validation.rs index 46df606ffa5..e2e8c5e2b07 100644 --- a/consensus/src/marshal/application/validation.rs +++ b/consensus/src/marshal/application/validation.rs @@ -3,7 +3,11 @@ //! This module centralizes pure invariant checks shared across marshal verification //! and certification flows. -use crate::types::{Epoch, Epocher, Height, Round}; +use crate::{ + marshal::core::{Mailbox, Variant}, + types::{Epoch, Epocher, Height, Round}, +}; +use commonware_cryptography::certificate::Scheme; use commonware_utils::sync::Mutex; use std::sync::Arc; @@ -11,6 +15,32 @@ use std::sync::Arc; /// proposer task and the broadcast path. pub(crate) type LastBuilt = Arc>>; +/// Which stage of verification a block has reached. +/// +/// This is used to determine which marshal cache a block should be stored in. +#[derive(Clone, Copy, Debug)] +pub(crate) enum Stage { + /// The block has been verified (store in `verified_blocks`). + Verified, + /// The block has been certified (store in `notarized_blocks`). + Certified, +} + +impl Stage { + /// Store `block` in the marshal cache for the provided stage. + pub(crate) async fn store( + self, + marshal: &mut Mailbox, + round: Round, + block: V::Block, + ) -> bool { + match self { + Self::Verified => marshal.verified(round, block).await, + Self::Certified => marshal.certified(round, block).await, + } + } +} + /// Returns true if the block is at an epoch boundary (last block in its epoch). #[inline] fn is_at_epoch_boundary(epocher: &ES, block_height: Height, epoch: Epoch) -> bool { diff --git a/consensus/src/marshal/application/verification_tasks.rs b/consensus/src/marshal/application/verification_tasks.rs index 98df16a77ab..7c85b0502e5 100644 --- a/consensus/src/marshal/application/verification_tasks.rs +++ b/consensus/src/marshal/application/verification_tasks.rs @@ -59,3 +59,114 @@ where .retain(|(task_round, _), _| task_round > finalized_round); } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::types::{Epoch, View}; + use commonware_cryptography::{sha256::Digest as Sha256Digest, Hasher, Sha256}; + + type D = Sha256Digest; + + fn round(view: u64) -> Round { + Round::new(Epoch::zero(), View::new(view)) + } + + fn pending_task() -> oneshot::Receiver { + let (_tx, rx) = oneshot::channel(); + rx + } + + #[test] + fn test_insert_and_take_returns_task() { + let tasks = VerificationTasks::::new(); + let digest = Sha256::hash(b"block"); + tasks.insert(round(1), digest, pending_task()); + + assert!(tasks.take(round(1), digest).is_some()); + assert!( + tasks.take(round(1), digest).is_none(), + "taking twice should yield None" + ); + } + + #[test] + fn test_take_absent_key_is_none() { + let tasks = VerificationTasks::::new(); + assert!(tasks.take(round(1), Sha256::hash(b"missing")).is_none()); + } + + #[test] + fn test_take_distinguishes_rounds_and_digests() { + let tasks = VerificationTasks::::new(); + let digest_a = Sha256::hash(b"a"); + let digest_b = Sha256::hash(b"b"); + tasks.insert(round(1), digest_a, pending_task()); + tasks.insert(round(2), digest_a, pending_task()); + tasks.insert(round(1), digest_b, pending_task()); + + assert!(tasks.take(round(1), digest_a).is_some()); + assert!(tasks.take(round(2), digest_a).is_some()); + assert!(tasks.take(round(1), digest_b).is_some()); + } + + #[test] + fn test_retain_after_drops_at_and_below_boundary() { + let tasks = VerificationTasks::::new(); + let digest = Sha256::hash(b"block"); + tasks.insert(round(1), digest, pending_task()); + tasks.insert(round(2), digest, pending_task()); + tasks.insert(round(3), digest, pending_task()); + + tasks.retain_after(&round(2)); + + assert!( + tasks.take(round(1), digest).is_none(), + "tasks strictly below boundary should be dropped" + ); + assert!( + tasks.take(round(2), digest).is_none(), + "tasks at boundary should be dropped" + ); + assert!( + tasks.take(round(3), digest).is_some(), + "tasks strictly above boundary should be retained" + ); + } + + #[test] + fn test_retain_after_spans_epochs() { + let tasks = VerificationTasks::::new(); + let digest = Sha256::hash(b"block"); + let early = Round::new(Epoch::zero(), View::new(100)); + let late = Round::new(Epoch::new(1), View::zero()); + tasks.insert(early, digest, pending_task()); + tasks.insert(late, digest, pending_task()); + + tasks.retain_after(&early); + + assert!( + tasks.take(early, digest).is_none(), + "task at boundary must be dropped" + ); + assert!( + tasks.take(late, digest).is_some(), + "task in later epoch must outlive an earlier boundary" + ); + } + + #[test] + fn test_retain_after_empty_map_is_noop() { + let tasks = VerificationTasks::::new(); + tasks.retain_after(&round(5)); + assert!(tasks.take(round(5), Sha256::hash(b"x")).is_none()); + } + + #[test] + fn test_default_matches_new() { + let default = as Default>::default(); + let digest = Sha256::hash(b"block"); + default.insert(round(1), digest, pending_task()); + assert!(default.take(round(1), digest).is_some()); + } +} diff --git a/consensus/src/marshal/coding/marshaled.rs b/consensus/src/marshal/coding/marshaled.rs index cc3d1def0ff..dcd0018d711 100644 --- a/consensus/src/marshal/coding/marshaled.rs +++ b/consensus/src/marshal/coding/marshaled.rs @@ -83,7 +83,7 @@ use crate::{ ancestry::AncestorStream, application::{ validation::{ - is_inferred_reproposal_at_certify, is_valid_reproposal_at_verify, LastBuilt, + is_inferred_reproposal_at_certify, is_valid_reproposal_at_verify, LastBuilt, Stage, }, verification_tasks::VerificationTasks, }, @@ -299,6 +299,7 @@ where consensus_context: Context::PublicKey>, commitment: Commitment, prefetched_block: Option>, + stage: Stage, ) -> oneshot::Receiver { let mut marshal = self.marshal.clone(); let mut application = self.application.clone(); @@ -424,7 +425,7 @@ where is_valid = validity_request => is_valid, }; timer.observe(); - if application_valid && !marshal.verified(round, block).await { + if application_valid && !stage.store(&mut marshal, round, block).await { debug!(?round, "marshal unable to accept block"); return; } @@ -779,7 +780,7 @@ where // Kick off deferred verification early to hide verification latency behind // shard validity checks and network latency for collecting votes. let round = consensus_context.round; - let task = self.deferred_verify(consensus_context, payload, None); + let task = self.deferred_verify(consensus_context, payload, None, Stage::Verified); self.verification_tasks.insert(round, payload, task); match scheme.me() { @@ -895,9 +896,10 @@ where round, ); if is_reproposal { - // During crash recovery we may call `marshal.verified` twice for - // the same block; the call is idempotent. - if !marshaled.marshal.verified(round, block).await { + // Certifier holds a notarization for this block, so route + // the write to the notarized cache. `certified` is + // idempotent, so crash-recovery double-invocation is safe. + if !marshaled.marshal.certified(round, block).await { debug!(?round, "marshal unable to accept block"); return; } @@ -916,7 +918,12 @@ where // Use the block's embedded context for verification, passing the // prefetched block to avoid fetching it again inside deferred_verify. - let verify_rx = marshaled.deferred_verify(embedded_context, payload, Some(block)); + let verify_rx = marshaled.deferred_verify( + embedded_context, + payload, + Some(block), + Stage::Certified, + ); if let Ok(result) = verify_rx.await { tx.send_lossy(result); } diff --git a/consensus/src/marshal/coding/mod.rs b/consensus/src/marshal/coding/mod.rs index 4b83aa9765a..e69e8f23d8b 100644 --- a/consensus/src/marshal/coding/mod.rs +++ b/consensus/src/marshal/coding/mod.rs @@ -259,6 +259,109 @@ mod tests { harness::finalize_same_height_different_views::(); } + #[test_traced("WARN")] + fn test_coding_certify_persists_equivocated_block() { + harness::certify_persists_equivocated_block::(); + } + + #[test_traced("WARN")] + fn test_coding_certify_at_later_view_survives_earlier_view_pruning() { + harness::certify_at_later_view_survives_earlier_view_pruning::(); + } + + /// Finalizing a descendant must not height-prune the shard-engine buffer before + /// `try_repair_gaps` has consumed buffer-only ancestors. + /// + /// Places parent (height 1) and descendant (height 2) in the shard engine's + /// reconstructed-block cache via `proposed()`, then reports a finalization + /// for the descendant only. + #[test_traced("WARN")] + fn test_coding_store_finalization_does_not_prune_buffer_before_repair() { + let runner = deterministic::Runner::timed(Duration::from_secs(60)); + runner.start(|mut context| async move { + let Fixture { + participants, + schemes, + .. + } = bls12381_threshold_vrf::fixture::(&mut context, NAMESPACE, NUM_VALIDATORS); + let mut oracle = + setup_network_with_participants(context.clone(), NZUsize!(1), participants.clone()) + .await; + + let setup = CodingHarness::setup_validator( + context.with_label("validator_0"), + &mut oracle, + participants[0].clone(), + ConstantProvider::new(schemes[0].clone()), + ) + .await; + let mut handle = harness::ValidatorHandle:: { + mailbox: setup.mailbox, + extra: setup.extra, + }; + + // Build a 2-block chain: parent at height 1, descendant at height 2. + let parent_block = CodingHarness::make_test_block( + Sha256::hash(b""), + CodingHarness::genesis_parent_commitment(NUM_VALIDATORS as u16), + Height::new(1), + 1, + NUM_VALIDATORS as u16, + ); + let parent_digest = CodingHarness::digest(&parent_block); + let parent_commitment = CodingHarness::commitment(&parent_block); + + let descendant_block = CodingHarness::make_test_block( + parent_digest, + parent_commitment, + Height::new(2), + 2, + NUM_VALIDATORS as u16, + ); + let descendant_commitment = CodingHarness::commitment(&descendant_block); + + // Seed the shard engine's reconstructed-block cache with both blocks. + CodingHarness::propose( + &mut handle, + Round::new(Epoch::new(0), View::new(1)), + &parent_block, + ) + .await; + CodingHarness::propose( + &mut handle, + Round::new(Epoch::new(0), View::new(2)), + &descendant_block, + ) + .await; + + // Report finalization for the descendant only. The parent has no + // finalization certificate: it must be archived by walking the + // parent link from the descendant and sourcing the block from the + // shard-engine buffer. + let descendant_proposal = Proposal { + round: Round::new(Epoch::new(0), View::new(2)), + parent: View::new(1), + payload: descendant_commitment, + }; + let descendant_finalization = + CodingHarness::make_finalization(descendant_proposal, &schemes, QUORUM); + CodingHarness::report_finalization(&mut handle.mailbox, descendant_finalization).await; + + // Wait until the descendant is archived: that proves finalization processing + // has completed, at which point the parent must already have been repaired + // from the shard buffer. + while handle.mailbox.get_block(Height::new(2)).await.is_none() { + context.sleep(Duration::from_millis(10)).await; + } + + let parent = handle.mailbox.get_block(Height::new(1)).await; + assert!( + parent.is_some(), + "parent must be archived from shard buffer before height-prune evicts it" + ); + }); + } + #[test_traced("WARN")] fn test_coding_init_processed_height() { harness::init_processed_height::(); diff --git a/consensus/src/marshal/core/actor.rs b/consensus/src/marshal/core/actor.rs index 1581358b7e0..7d1fa31f1d8 100644 --- a/consensus/src/marshal/core/actor.rs +++ b/consensus/src/marshal/core/actor.rs @@ -52,7 +52,7 @@ use std::{ pin::Pin, sync::Arc, }; -use tracing::{debug, error, info, warn}; +use tracing::{debug, error, warn}; /// The key used to store the last processed height in the metadata store. const LATEST_KEY: U64 = U64::new(0xFF); @@ -453,7 +453,7 @@ where result = self.pending_acks.current() => { // Start with the ack that woke this `select_loop!` arm. let mut pending = Some(self.pending_acks.complete_current(result)); - loop { + let last_acked_commitment = loop { let (height, commitment, result) = pending.take().expect("pending ack must exist"); match result { @@ -471,11 +471,11 @@ where // Opportunistically drain any additional already-ready acks so we // can persist one metadata sync for the whole batch below. - let Some(next) = self.pending_acks.pop_ready() else { - break; - }; - pending = Some(next); - } + match self.pending_acks.pop_ready() { + Some(next) => pending = Some(next), + None => break commitment, + } + }; // Persist buffered processed-height updates once after draining all ready acks. if let Err(e) = self.application_metadata.sync().await { @@ -483,12 +483,15 @@ where return; } + // Inform the buffer of the last acknowledged commitment (anything below this is safe to prune). + buffer.finalized(last_acked_commitment).await; + // Fill the pipeline self.try_dispatch_blocks(&mut application).await; }, // Handle consensus inputs before backfill or resolver traffic Some(message) = self.mailbox.recv() else { - info!("mailbox closed, shutting down"); + debug!("mailbox closed, shutting down"); break; } => { match message { @@ -543,6 +546,10 @@ where self.cache_verified(round, block.digest(), block).await; ack.send_lossy(()); } + Message::Certified { round, block, ack } => { + self.cache_block(round, block.digest(), block).await; + ack.send_lossy(()); + } Message::Notarization { notarization } => { let round = notarization.round(); let commitment = notarization.proposal.payload; @@ -588,7 +595,6 @@ where block, Some(finalization), &mut application, - &mut buffer, ) .await { @@ -730,7 +736,7 @@ where }, // Handle resolver messages last (batched up to max_repair, sync once) Some(message) = resolver_rx.recv() else { - info!("handler closed, shutting down"); + debug!("handler closed, shutting down"); return; } => { // Drain up to max_repair messages: blocks handled immediately, @@ -758,7 +764,6 @@ where response, &mut delivers, &mut application, - &mut buffer, ) .await; } @@ -767,7 +772,7 @@ where // Batch verify and process all delivers. needs_sync |= self - .verify_delivered(delivers, &mut application, &mut buffer) + .verify_delivered(delivers, &mut application) .await; // Attempt to fill gaps before handling produce requests (so we @@ -923,14 +928,13 @@ where /// immediately. Finalized/Notarized delivers are parsed and structurally /// validated, then collected into `delivers` for batch certificate verification. /// Returns true if finalization archives were written and need syncing. - async fn handle_deliver>( + async fn handle_deliver( &mut self, key: Request, value: Bytes, response: oneshot::Sender, delivers: &mut Vec>, application: &mut impl Reporter>, - buffer: &mut Buf, ) -> bool { match key { Request::Block(commitment) => { @@ -949,7 +953,7 @@ where let digest = block.digest(); let finalization = self.cache.get_finalization_for(digest).await; let wrote = self - .store_finalization(height, digest, block, finalization, application, buffer) + .store_finalization(height, digest, block, finalization, application) .await; debug!(?digest, %height, "received block"); response.send_lossy(true); // if a valid block is received, we should still send true (even if it was stale) @@ -1045,11 +1049,10 @@ where /// Batch verify pending certificates and process valid items. Returns true /// if finalization archives were written and need syncing. - async fn verify_delivered>( + async fn verify_delivered( &mut self, mut delivers: Vec>, application: &mut impl Reporter>, - buffer: &mut Buf, ) -> bool { if delivers.is_empty() { return false; @@ -1132,14 +1135,7 @@ where debug!(?round, %height, "received finalization"); wrote |= self - .store_finalization( - height, - digest, - block, - Some(finalization), - application, - buffer, - ) + .store_finalization(height, digest, block, Some(finalization), application) .await; } PendingVerification::Notarized { @@ -1169,7 +1165,6 @@ where block.clone(), Some(finalization), application, - buffer, ) .await; } @@ -1414,14 +1409,13 @@ where /// `select_loop!` so that archive data is durable before the ack handler /// advances `last_processed_height`. See [`Self::try_dispatch_blocks`] for the /// crash safety invariant. - async fn store_finalization>( + async fn store_finalization( &mut self, height: Height, digest: ::Digest, block: V::Block, finalization: Option>, application: &mut impl Reporter>, - buffer: &mut Buf, ) -> bool { // Blocks below the last processed height are not useful to us, so we ignore them (this // has the nice byproduct of ensuring we don't call a backing store with a block below the @@ -1438,7 +1432,6 @@ where self.notify_subscribers(&block); // Convert block to storage format - let commitment = V::commitment(&block); let stored: V::StoredBlock = block.into(); let round = finalization.as_ref().map(|f| f.round()); @@ -1463,13 +1456,12 @@ where panic!("failed to finalize: {e}"); } - // Update metrics, buffer, and application + // Update metrics and application if let Some(round) = round.filter(|_| height > self.tip) { application.report(Update::Tip(round, height, digest)).await; self.tip = height; let _ = self.finalized_height.try_set(height.get()); } - buffer.finalized(commitment).await; true } @@ -1592,7 +1584,6 @@ where block, Some(finalization), application, - buffer, ) .await; } else { @@ -1638,7 +1629,6 @@ where block.clone(), finalization, application, - buffer, ) .await; debug!(height = %block.height(), "repaired block"); diff --git a/consensus/src/marshal/core/mailbox.rs b/consensus/src/marshal/core/mailbox.rs index 2042b623be6..3dd41b1cdb8 100644 --- a/consensus/src/marshal/core/mailbox.rs +++ b/consensus/src/marshal/core/mailbox.rs @@ -112,6 +112,15 @@ pub(crate) enum Message { /// A channel signaled once the block is durably stored. ack: oneshot::Sender<()>, }, + /// A notification that a block has been certified by the application. + Certified { + /// The round in which the block was certified. + round: Round, + /// The certified block. + block: V::Block, + /// A channel signaled once the block is durably stored. + ack: oneshot::Sender<()>, + }, /// Sets the sync starting point (advances if higher than current). /// /// Marshal will sync and deliver blocks starting at `floor + 1`. Data below @@ -307,6 +316,16 @@ impl Mailbox { .is_some() } + /// Notifies the actor that a block has been certified, awaiting the actor's + /// confirmation that the block has been durably persisted before returning. + #[must_use = "callers must consider block durability before proceeding"] + pub async fn certified(&self, round: Round, block: V::Block) -> bool { + self.sender + .request(|ack| Message::Certified { round, block, ack }) + .await + .is_some() + } + /// Sets the sync starting point (advances if higher than current). /// /// Marshal will sync and deliver blocks starting at `floor + 1`. Data below diff --git a/consensus/src/marshal/mocks/harness.rs b/consensus/src/marshal/mocks/harness.rs index 18c9e0d06e4..5480677bc42 100644 --- a/consensus/src/marshal/mocks/harness.rs +++ b/consensus/src/marshal/mocks/harness.rs @@ -259,6 +259,13 @@ pub trait TestHarness: 'static + Sized { all_handles: &mut [ValidatorHandle], ) -> impl Future + Send; + /// Mark a block as certified. + fn certify( + handle: &mut ValidatorHandle, + round: Round, + block: &Self::TestBlock, + ) -> impl Future + Send; + /// Create a finalization certificate. fn make_finalization( proposal: Proposal, @@ -927,6 +934,207 @@ pub fn verified_success_implies_recoverable_after_restart( } } +/// Regression: when the same block is verified at an earlier view and later +/// certified at a much later view (epoch-boundary reproposal), both writes +/// must land so retention can prune the earlier view without losing the +/// block. A naive "skip the sibling write if the block's digest is already +/// present in the other archive" optimization is unsafe because the two +/// archives prune per-view on the same boundary: if the block lives only in +/// `verified_blocks[V_early]` and never gets written to +/// `notarized_blocks[V_late]`, advancing retention past V_early drops the +/// block even though V_late is still within the window. +pub fn certify_at_later_view_survives_earlier_view_pruning() { + let runner = deterministic::Runner::timed(Duration::from_secs(60)); + runner.start(|mut context| async move { + let Fixture { + participants, + schemes, + .. + } = bls12381_threshold_vrf::fixture::(&mut context, NAMESPACE, NUM_VALIDATORS); + let mut oracle = + setup_network_with_participants(context.clone(), NZUsize!(1), participants.clone()) + .await; + let setup = H::setup_validator( + context.with_label("validator_0"), + &mut oracle, + participants[0].clone(), + ConstantProvider::new(schemes[0].clone()), + ) + .await; + let application = setup.application; + let mut handle = ValidatorHandle:: { + mailbox: setup.mailbox, + extra: setup.extra, + }; + + // A repeated block that we will verify at an early view and certify + // at a later view. Its height is intentionally well beyond the chain + // we'll drive below, so it never enters the finalized archive via + // gap repair and lives solely in the prunable caches. + let repeated = H::make_test_block( + Sha256::hash(b""), + H::genesis_parent_commitment(NUM_VALIDATORS as u16), + Height::new(5_000), + 9_999, + NUM_VALIDATORS as u16, + ); + let repeated_digest = H::digest(&repeated); + + // Negative control: a verify-only block at the same early view. Because + // it is never certified, it lives solely in `verified_blocks[V=1]` and + // must disappear once retention pruning advances past V=1. Asserting it + // is gone confirms the prune actually fires at the expected floor, so + // the `repeated` survivor assertion below is genuinely load-bearing. + let orphan = H::make_test_block( + Sha256::hash(b"orphan"), + H::genesis_parent_commitment(NUM_VALIDATORS as u16), + Height::new(6_000), + 9_998, + NUM_VALIDATORS as u16, + ); + let orphan_digest = H::digest(&orphan); + + // Verify `repeated` at V=1, then certify at V=25 (reproposal-style gap). + let v_early = Round::new(Epoch::zero(), View::new(1)); + let v_late = Round::new(Epoch::zero(), View::new(25)); + let mut peers: [ValidatorHandle; 0] = []; + H::verify(&mut handle, v_early, &repeated, &mut peers).await; + assert!( + H::certify(&mut handle, v_late, &repeated).await, + "certify must ack" + ); + + // Verify `orphan` at V=1 only (no certify). + H::verify(&mut handle, v_early, &orphan, &mut peers).await; + + // Drive the finalized chain forward to advance `last_processed_round` + // past V=1's retention boundary but not past V=25's. With + // view_retention_timeout=10 and prunable_items_per_section=10, + // processing views 1..=21 leaves `oldest_allowed=10` in both prunable + // archives — V=1 is dropped, V=25 is retained. + const CHAIN_LEN: u64 = 21; + let mut parent = Sha256::hash(b""); + let mut parent_commitment = H::genesis_parent_commitment(NUM_VALIDATORS as u16); + for i in 1..=CHAIN_LEN { + let block = H::make_test_block( + parent, + parent_commitment, + Height::new(i), + i, + NUM_VALIDATORS as u16, + ); + let digest = H::digest(&block); + let commitment = H::commitment(&block); + let round = Round::new(Epoch::zero(), View::new(i)); + H::propose(&mut handle, round, &block).await; + let proposal = Proposal { + round, + parent: View::new(i - 1), + payload: commitment, + }; + let finalization = H::make_finalization(proposal, &schemes, QUORUM); + H::report_finalization(&mut handle.mailbox, finalization).await; + parent = digest; + parent_commitment = commitment; + } + while (application.blocks().len() as u64) < CHAIN_LEN { + context.sleep(Duration::from_millis(10)).await; + } + context.sleep(Duration::from_millis(100)).await; + + // Negative control: the verify-only orphan at V=1 must be gone, which + // proves retention pruning actually evicted V=1 at the expected floor. + assert!( + handle.mailbox.get_block(&orphan_digest).await.is_none(), + "verify-only block at V=1 must be evicted by retention pruning" + ); + + // The repeated block must still be retrievable: verified_blocks[V=1] + // has been pruned, but notarized_blocks[V=25] still holds it. + let recovered = handle.mailbox.get_block(&repeated_digest).await; + assert!( + recovered.is_some(), + "block certified at V=25 must survive retention pruning of V=1" + ); + assert_eq!(recovered.unwrap().digest(), repeated_digest); + }); +} + +/// Regression: when a leader equivocates, a validator may verify one block +/// (A) and then certify a different block (B) at the same round. `verified()` +/// and `certified()` must write to distinct archives so both blocks are +/// retained and retrievable; otherwise the second write collides on the same +/// prunable-archive index (`skip_if_index_exists=true`) and is silently +/// dropped despite the mailbox returning success. +pub fn certify_persists_equivocated_block() { + let runner = deterministic::Runner::timed(Duration::from_secs(60)); + runner.start(|mut context| async move { + let Fixture { + participants, + schemes, + .. + } = bls12381_threshold_vrf::fixture::(&mut context, NAMESPACE, NUM_VALIDATORS); + let mut oracle = + setup_network_with_participants(context.clone(), NZUsize!(1), participants.clone()) + .await; + let setup = H::setup_validator( + context.with_label("validator_0"), + &mut oracle, + participants[0].clone(), + ConstantProvider::new(schemes[0].clone()), + ) + .await; + let mut handle = ValidatorHandle:: { + mailbox: setup.mailbox, + extra: setup.extra, + }; + + let round = Round::new(Epoch::zero(), View::new(1)); + let parent = Sha256::hash(b""); + let parent_commitment = H::genesis_parent_commitment(NUM_VALIDATORS as u16); + + // Two distinct blocks at the same height/round (leader equivocation): + // distinct timestamps yield distinct digests. + let block_a = H::make_test_block( + parent, + parent_commitment, + Height::new(1), + 1, + NUM_VALIDATORS as u16, + ); + let digest_a = H::digest(&block_a); + let block_b = H::make_test_block( + parent, + parent_commitment, + Height::new(1), + 2, + NUM_VALIDATORS as u16, + ); + let digest_b = H::digest(&block_b); + assert_ne!(digest_a, digest_b, "test requires distinct digests"); + + let mut peers: [ValidatorHandle; 0] = []; + H::verify(&mut handle, round, &block_a, &mut peers).await; + assert!( + H::certify(&mut handle, round, &block_b).await, + "certified must ack" + ); + + let got_a = handle.mailbox.get_block(&digest_a).await; + assert!( + got_a.is_some(), + "verified block A must be persisted in verified_blocks" + ); + assert_eq!(got_a.unwrap().digest(), digest_a); + let got_b = handle.mailbox.get_block(&digest_b).await; + assert!( + got_b.is_some(), + "certified block B must be persisted despite a verify at the same round" + ); + assert_eq!(got_b.unwrap().digest(), digest_b); + }); +} + /// Contract: once marshal has delivered a finalized block to the application, /// that finalized block and its certificate must already be durable. pub fn delivery_visibility_implies_recoverable_after_restart( @@ -1275,6 +1483,10 @@ impl TestHarness for StandardHarness { assert!(handle.mailbox.verified(round, block.clone()).await); } + async fn certify(handle: &mut ValidatorHandle, round: Round, block: &B) -> bool { + handle.mailbox.certified(round, block.clone()).await + } + fn make_finalization(proposal: Proposal, schemes: &[S], quorum: u32) -> Finalization { let finalizes: Vec<_> = schemes .iter() @@ -1536,6 +1748,22 @@ impl TestHarness for InlineHarness { .await; } + async fn certify( + handle: &mut ValidatorHandle, + round: Round, + block: &Self::TestBlock, + ) -> bool { + StandardHarness::certify( + &mut ValidatorHandle:: { + mailbox: handle.mailbox.clone(), + extra: handle.extra.clone(), + }, + round, + block, + ) + .await + } + fn make_finalization( proposal: Proposal, schemes: &[S], @@ -1724,6 +1952,22 @@ impl TestHarness for DeferredHarness { .await; } + async fn certify( + handle: &mut ValidatorHandle, + round: Round, + block: &Self::TestBlock, + ) -> bool { + InlineHarness::certify( + &mut ValidatorHandle:: { + mailbox: handle.mailbox.clone(), + extra: handle.extra.clone(), + }, + round, + block, + ) + .await + } + fn make_finalization( proposal: Proposal, schemes: &[S], @@ -2063,6 +2307,14 @@ impl TestHarness for CodingHarness { assert!(handle.mailbox.verified(round, block.clone()).await); } + async fn certify( + handle: &mut ValidatorHandle, + round: Round, + block: &CodedBlock, Sha256>, + ) -> bool { + handle.mailbox.certified(round, block.clone()).await + } + fn make_finalization( proposal: Proposal, schemes: &[S], diff --git a/consensus/src/marshal/resolver/handler.rs b/consensus/src/marshal/resolver/handler.rs index df54ec15ab9..f0ffdcc0910 100644 --- a/consensus/src/marshal/resolver/handler.rs +++ b/consensus/src/marshal/resolver/handler.rs @@ -342,6 +342,16 @@ mod tests { assert_eq!(decoded, Request::Notarized { round }); } + #[test] + fn test_subject_decode_rejects_invalid_enum_tag() { + let bad = [3u8]; + let mut buf = bad.as_ref(); + assert!(matches!( + Request::::read(&mut buf), + Err(CodecError::InvalidEnum(3)) + )); + } + #[test] fn test_subject_hash() { use std::collections::HashSet; diff --git a/consensus/src/marshal/standard/deferred.rs b/consensus/src/marshal/standard/deferred.rs index 295f95b7217..396a86a0f19 100644 --- a/consensus/src/marshal/standard/deferred.rs +++ b/consensus/src/marshal/standard/deferred.rs @@ -74,7 +74,7 @@ use crate::{ marshal::{ ancestry::AncestorStream, application::{ - validation::{is_inferred_reproposal_at_certify, LastBuilt}, + validation::{is_inferred_reproposal_at_certify, LastBuilt, Stage}, verification_tasks::VerificationTasks, }, core::Mailbox, @@ -203,6 +203,7 @@ where &mut self, context: ::Context, block: B, + stage: Stage, ) -> oneshot::Receiver { let mut marshal = self.marshal.clone(); let mut application = self.application.clone(); @@ -226,6 +227,7 @@ where &mut application, &mut marshal, &mut tx, + stage, ) .await { @@ -500,7 +502,7 @@ where // Begin the rest of the verification process asynchronously. let round = context.round; - let task = marshaled.deferred_verify(context, block); + let task = marshaled.deferred_verify(context, block, Stage::Verified); marshaled.verification_tasks.insert(round, digest, task); tx.send_lossy(true); @@ -584,9 +586,10 @@ where round, ); if is_reproposal { - // It is possible that, during crash recovery, we call `marshal.verified` - // twice for the same block. That function is idempotent, so this is safe. - if !marshaled.marshal.verified(round, block).await { + // Certifier holds a notarization for this block, so route + // the write to the notarized cache. `certified` is + // idempotent, so crash-recovery double-invocation is safe. + if !marshaled.marshal.certified(round, block).await { debug!(?round, "marshal unable to accept block"); return; } @@ -594,7 +597,8 @@ where return; } - let verify_rx = marshaled.deferred_verify(embedded_context, block); + let verify_rx = + marshaled.deferred_verify(embedded_context, block, Stage::Certified); if let Ok(result) = verify_rx.await { tx.send_lossy(result); } diff --git a/consensus/src/marshal/standard/inline.rs b/consensus/src/marshal/standard/inline.rs index fe3c695b136..87be653eea8 100644 --- a/consensus/src/marshal/standard/inline.rs +++ b/consensus/src/marshal/standard/inline.rs @@ -45,7 +45,7 @@ use crate::{ marshal::{ ancestry::AncestorStream, - application::validation::LastBuilt, + application::validation::{LastBuilt, Stage}, core::Mailbox, standard::{ validation::{ @@ -224,8 +224,11 @@ where /// Proposes a new block or re-proposes an epoch boundary block. /// /// Proposal runs in a spawned task and returns a receiver for the resulting digest. - /// Blocks are persisted and broadcast via `marshal.proposed()` before the digest - /// is returned to consensus. + /// The built block is cached in memory (`last_built`) for the subsequent + /// `Relay::broadcast(Plan::Propose)` call, which invokes `marshal.proposed()` + /// as the durable persistence boundary before consensus continues. Receiving + /// a digest from `propose()` alone does not mean the block is recoverable + /// after restart. async fn propose( &mut self, consensus_context: Context, @@ -408,6 +411,7 @@ where &mut application, &mut marshal, &mut tx, + Stage::Verified, ) .await { @@ -461,8 +465,10 @@ where // `certify` resolving true drives the finalize vote, so mere // buffered availability is not sufficient here. Persist the - // block through marshal before signaling success. - if marshal.verified(round, block).await { + // block through marshal before signaling success. The caller + // holds a notarization for this block, so route it into the + // notarized cache directly rather than the verified cache. + if marshal.certified(round, block).await { tx.send_lossy(true); } }); diff --git a/consensus/src/marshal/standard/mod.rs b/consensus/src/marshal/standard/mod.rs index 1aa7bb27003..b4971181e57 100644 --- a/consensus/src/marshal/standard/mod.rs +++ b/consensus/src/marshal/standard/mod.rs @@ -201,6 +201,18 @@ mod tests { harness::verified_success_implies_recoverable_after_restart::(0..16); } + #[test_traced("WARN")] + fn test_standard_certify_persists_equivocated_block() { + harness::certify_persists_equivocated_block::(); + harness::certify_persists_equivocated_block::(); + } + + #[test_traced("WARN")] + fn test_standard_certify_at_later_view_survives_earlier_view_pruning() { + harness::certify_at_later_view_survives_earlier_view_pruning::(); + harness::certify_at_later_view_survives_earlier_view_pruning::(); + } + #[test_traced("WARN")] fn test_standard_delivery_visibility_implies_recoverable_after_restart() { harness::delivery_visibility_implies_recoverable_after_restart::(0..16); @@ -1580,48 +1592,22 @@ mod tests { } } - /// A no-op resolver used by tests that drive the marshal actor's - /// resolver_rx channel directly. Outbound fetches/cancellations are dropped. + /// Recorded `send` call on the [`RecordingBuffer`]. + type BufferSend = (Round, B, Recipients); + + /// A buffer that records each `send` invocation; other methods are no-ops. #[derive(Clone, Default)] - struct NoopResolver { - _keepalive: Option>>, + struct RecordingBuffer { + sends: Arc>>, } - impl NoopResolver { - fn holding(sender: mpsc::Sender>) -> Self { - Self { - _keepalive: Some(sender), - } + impl RecordingBuffer { + fn sends(&self) -> Vec { + self.sends.lock().clone() } } - impl Resolver for NoopResolver { - type Key = handler::Request; - type PublicKey = PublicKey; - - async fn fetch(&mut self, _key: Self::Key) {} - async fn fetch_all(&mut self, _keys: Vec) {} - async fn fetch_targeted( - &mut self, - _key: Self::Key, - _targets: NonEmptyVec, - ) { - } - async fn fetch_all_targeted( - &mut self, - _requests: Vec<(Self::Key, NonEmptyVec)>, - ) { - } - async fn cancel(&mut self, _key: Self::Key) {} - async fn clear(&mut self) {} - async fn retain(&mut self, _predicate: impl Fn(&Self::Key) -> bool + Send + 'static) {} - } - - /// A no-op buffer used by tests that do not need marshal's dissemination path. - #[derive(Clone, Default)] - struct NoopBuffer; - - impl crate::marshal::core::Buffer> for NoopBuffer { + impl crate::marshal::core::Buffer> for RecordingBuffer { type PublicKey = PublicKey; type CachedBlock = B; @@ -1648,7 +1634,76 @@ mod tests { async fn finalized(&self, _commitment: D) {} - async fn send(&self, _round: Round, _block: B, _recipients: Recipients) {} + async fn send(&self, round: Round, block: B, recipients: Recipients) { + self.sends.lock().push((round, block, recipients)); + } + } + + /// Recorded `fetch_targeted` call on the [`RecordingResolver`]. + type TargetedFetch = (handler::Request, NonEmptyVec); + + /// A resolver that records each `fetch_targeted` invocation; other + /// methods are no-ops. + /// + /// `_keepalive` optionally retains a resolver-message sender so the + /// actor's corresponding receiver stays alive when nothing else owns it. + #[derive(Clone, Default)] + struct RecordingResolver { + targeted: Arc>>, + _keepalive: Option>>, + } + + impl RecordingResolver { + fn holding(sender: mpsc::Sender>) -> Self { + Self { + targeted: Arc::new(Mutex::new(Vec::new())), + _keepalive: Some(sender), + } + } + + fn targeted(&self) -> Vec { + self.targeted.lock().clone() + } + + fn targeted_is_empty(&self) -> bool { + self.targeted.lock().is_empty() + } + } + + impl Resolver for RecordingResolver { + type Key = handler::Request; + type PublicKey = PublicKey; + + async fn fetch(&mut self, _key: Self::Key) {} + async fn fetch_all(&mut self, _keys: Vec) {} + async fn fetch_targeted(&mut self, key: Self::Key, targets: NonEmptyVec) { + self.targeted.lock().push((key, targets)); + } + async fn fetch_all_targeted( + &mut self, + requests: Vec<(Self::Key, NonEmptyVec)>, + ) { + self.targeted.lock().extend(requests); + } + async fn cancel(&mut self, _key: Self::Key) {} + async fn clear(&mut self) {} + async fn retain(&mut self, _predicate: impl Fn(&Self::Key) -> bool + Send + 'static) {} + } + + /// Poll `cond` on a 10ms tick until it returns true, panicking on timeout. + async fn wait_until bool>( + context: &deterministic::Context, + deadline: Duration, + label: &str, + mut cond: F, + ) { + let start = context.current(); + while !cond() { + if context.current().duration_since(start).unwrap_or_default() > deadline { + panic!("{label} did not hold within {deadline:?}"); + } + context.sleep(Duration::from_millis(10)).await; + } } /// A buffer whose `send` blocks until released, and signals when entered. @@ -1754,32 +1809,22 @@ mod tests { } } - async fn start_standard_actor>>( - context: deterministic::Context, - partition_prefix: &str, - provider: ConstantProvider, - application: R, - ) -> (Mailbox>, commonware_runtime::Handle<()>) { - start_standard_actor_with_buffer( - context, - partition_prefix, - provider, - application, - NoopBuffer, - ) - .await - } - - async fn start_standard_actor_with_buffer( + async fn start_standard_actor( context: deterministic::Context, partition_prefix: &str, provider: ConstantProvider, application: R, buffer: Buf, - ) -> (Mailbox>, commonware_runtime::Handle<()>) + ) -> ( + Mailbox>, + Buf, + RecordingResolver, + commonware_runtime::Handle<()>, + ) where R: Reporter>, - Buf: crate::marshal::core::Buffer, PublicKey = PublicKey, CachedBlock = B>, + Buf: crate::marshal::core::Buffer, PublicKey = PublicKey, CachedBlock = B> + + Clone, { let config = Config { provider, @@ -1863,12 +1908,10 @@ mod tests { ) .await; let (resolver_tx, resolver_rx) = mpsc::channel(100); - let actor_handle = actor.start( - application, - buffer, - (resolver_rx, NoopResolver::holding(resolver_tx)), - ); - (mailbox, actor_handle) + let resolver = RecordingResolver::holding(resolver_tx); + let actor_handle = + actor.start(application, buffer.clone(), (resolver_rx, resolver.clone())); + (mailbox, buffer, resolver, actor_handle) } /// Regression: `marshal.proposed` must not ack until the block has been @@ -1886,7 +1929,7 @@ mod tests { let partition_prefix = format!("proposed-waits-buffer-{me}"); let (buffer, send_entered, release) = GatingBuffer::new(); - let (mailbox, _actor_handle) = start_standard_actor_with_buffer( + let (mailbox, _buffer, _resolver, _actor_handle) = start_standard_actor( context.with_label("validator_0"), &partition_prefix, ConstantProvider::new(schemes[0].clone()), @@ -2037,7 +2080,7 @@ mod tests { actor.start( Application::::default(), buffer, - (resolver_rx, NoopResolver::default()), + (resolver_rx, RecordingResolver::default()), ); // Inject a Finalized delivery with garbage payload. The @@ -2096,11 +2139,12 @@ mod tests { ); let (application, started, release) = GatedBlockReporter::new(); - let (mut mailbox, actor_handle) = start_standard_actor( + let (mut mailbox, _buffer, _resolver, actor_handle) = start_standard_actor( context.with_label("validator_0"), &partition_prefix, ConstantProvider::new(schemes[0].clone()), application, + RecordingBuffer::default(), ) .await; @@ -2130,11 +2174,12 @@ mod tests { // Yield once so the aborted actor drops its storage handles before restart. context.sleep(Duration::from_millis(1)).await; - let (mailbox, _actor_handle) = start_standard_actor( + let (mailbox, _buffer, _resolver, _actor_handle) = start_standard_actor( context.with_label("validator_0_restart"), &partition_prefix, ConstantProvider::new(schemes[0].clone()), Application::::manual_ack(), + RecordingBuffer::default(), ) .await; @@ -2291,4 +2336,266 @@ mod tests { } }); } + + /// `Forward` for an unknown commitment must early-return without + /// dispatching, even when peers are provided. + #[test_traced("WARN")] + fn test_standard_forward_unknown_block_is_noop() { + let runner = deterministic::Runner::timed(Duration::from_secs(30)); + runner.start(|mut context| async move { + let Fixture { + participants, + schemes, + .. + } = bls12381_threshold_vrf::fixture::(&mut context, NAMESPACE, NUM_VALIDATORS); + let me = participants[0].clone(); + let round = Round::new(Epoch::zero(), View::new(1)); + let unknown = Sha256::hash(b"unknown-block"); + + let (mailbox, buffer, _resolver, _actor_handle) = start_standard_actor( + context.with_label("validator_0"), + &format!("forward-unknown-{me}"), + ConstantProvider::new(schemes[0].clone()), + Application::::manual_ack(), + RecordingBuffer::default(), + ) + .await; + + mailbox + .forward(round, unknown, vec![participants[1].clone()]) + .await; + context.sleep(Duration::from_millis(50)).await; + + assert!( + buffer.sends().is_empty(), + "forward for an unknown block must not dispatch" + ); + }); + } + + /// `Forward` for a block that marshal has cached must dispatch that block + /// to exactly the provided peer set via the buffer. + #[test_traced("WARN")] + fn test_standard_forward_cached_block_sends_to_peers() { + let runner = deterministic::Runner::timed(Duration::from_secs(30)); + runner.start(|mut context| async move { + let Fixture { + participants, + schemes, + .. + } = bls12381_threshold_vrf::fixture::(&mut context, NAMESPACE, NUM_VALIDATORS); + let me = participants[0].clone(); + let round = Round::new(Epoch::zero(), View::new(1)); + let block = make_raw_block(Sha256::hash(b""), Height::new(1), 100); + let digest = block.digest(); + + let (mailbox, buffer, _resolver, _actor_handle) = start_standard_actor( + context.with_label("validator_0"), + &format!("forward-cached-{me}"), + ConstantProvider::new(schemes[0].clone()), + Application::::manual_ack(), + RecordingBuffer::default(), + ) + .await; + + assert!(mailbox.verified(round, block.clone()).await); + + let targets = vec![participants[1].clone(), participants[2].clone()]; + mailbox.forward(round, digest, targets.clone()).await; + + wait_until(&context, Duration::from_secs(5), "buffer.send", || { + !buffer.sends.lock().is_empty() + }) + .await; + + let sends = buffer.sends(); + assert_eq!(sends.len(), 1); + let (sent_round, sent_block, sent_recipients) = &sends[0]; + assert_eq!(*sent_round, round); + assert_eq!(sent_block.digest(), digest); + match sent_recipients { + Recipients::Some(peers) => assert_eq!(peers, &targets), + other => panic!("expected Recipients::Some, got {other:?}"), + } + }); + } + + /// `HintFinalized` at or below the floor must be a no-op: marshal must + /// not fire a targeted resolver fetch since the hint is stale. + #[test_traced("WARN")] + fn test_standard_hint_finalized_below_floor_is_noop() { + let runner = deterministic::Runner::timed(Duration::from_secs(30)); + runner.start(|mut context| async move { + let Fixture { + participants, + schemes, + .. + } = bls12381_threshold_vrf::fixture::(&mut context, NAMESPACE, NUM_VALIDATORS); + let me = participants[0].clone(); + + let (mailbox, _buffer, resolver, _actor_handle) = start_standard_actor( + context.with_label("validator_0"), + &format!("hint-below-floor-{me}"), + ConstantProvider::new(schemes[0].clone()), + Application::::manual_ack(), + RecordingBuffer::default(), + ) + .await; + + // Raise the floor above the hint we are about to send. + mailbox.set_floor(Height::new(10)).await; + context.sleep(Duration::from_millis(50)).await; + + mailbox + .hint_finalized(Height::new(5), NonEmptyVec::new(participants[1].clone())) + .await; + context.sleep(Duration::from_millis(50)).await; + + assert!( + resolver.targeted_is_empty(), + "hint at or below floor must not fetch" + ); + }); + } + + /// `HintFinalized` for a height whose finalization is already durable must + /// be a no-op: marshal already has everything needed and must not + /// initiate a redundant fetch. + #[test_traced("WARN")] + fn test_standard_hint_finalized_skips_when_already_finalized() { + let runner = deterministic::Runner::timed(Duration::from_secs(30)); + runner.start(|mut context| async move { + let Fixture { + participants, + schemes, + .. + } = bls12381_threshold_vrf::fixture::(&mut context, NAMESPACE, NUM_VALIDATORS); + let me = participants[0].clone(); + let round = Round::new(Epoch::zero(), View::new(1)); + let block = make_raw_block(Sha256::hash(b""), Height::new(1), 100); + let finalization = StandardHarness::make_finalization( + Proposal::new(round, View::zero(), block.digest()), + &schemes, + QUORUM, + ); + + let (mut mailbox, _buffer, resolver, _actor_handle) = start_standard_actor( + context.with_label("validator_0"), + &format!("hint-already-final-{me}"), + ConstantProvider::new(schemes[0].clone()), + Application::::manual_ack(), + RecordingBuffer::default(), + ) + .await; + + assert!(mailbox.verified(round, block.clone()).await); + StandardHarness::report_finalization(&mut mailbox, finalization).await; + + // Wait until marshal has durably stored the finalization. + while mailbox.get_finalization(Height::new(1)).await.is_none() { + context.sleep(Duration::from_millis(10)).await; + } + + mailbox + .hint_finalized(Height::new(1), NonEmptyVec::new(participants[1].clone())) + .await; + context.sleep(Duration::from_millis(50)).await; + + assert!( + resolver.targeted_is_empty(), + "hint for a locally-finalized height must not fetch" + ); + }); + } + + /// `HintFinalized` above the floor for a not-yet-finalized height must + /// trigger exactly one targeted fetch via the resolver. + #[test_traced("WARN")] + fn test_standard_hint_finalized_emits_targeted_fetch() { + let runner = deterministic::Runner::timed(Duration::from_secs(30)); + runner.start(|mut context| async move { + let Fixture { + participants, + schemes, + .. + } = bls12381_threshold_vrf::fixture::(&mut context, NAMESPACE, NUM_VALIDATORS); + let me = participants[0].clone(); + + let (mailbox, _buffer, resolver, _actor_handle) = start_standard_actor( + context.with_label("validator_0"), + &format!("hint-targets-{me}"), + ConstantProvider::new(schemes[0].clone()), + Application::::manual_ack(), + RecordingBuffer::default(), + ) + .await; + + let target = participants[1].clone(); + mailbox + .hint_finalized(Height::new(7), NonEmptyVec::new(target.clone())) + .await; + + wait_until(&context, Duration::from_secs(5), "fetch_targeted", || { + !resolver.targeted.lock().is_empty() + }) + .await; + + let targeted = resolver.targeted(); + assert_eq!(targeted.len(), 1); + let (request, targets) = &targeted[0]; + assert_eq!( + request, + &handler::Request::Finalized { + height: Height::new(7) + } + ); + assert_eq!(&targets[..], &[target]); + }); + } + + /// `Prune` for a height above the floor must be rejected (warn + continue) + /// and must not advance the floor or alter the finalized archive contents. + #[test_traced("WARN")] + fn test_standard_prune_above_floor_is_rejected() { + let runner = deterministic::Runner::timed(Duration::from_secs(30)); + runner.start(|mut context| async move { + let Fixture { + participants, + schemes, + .. + } = bls12381_threshold_vrf::fixture::(&mut context, NAMESPACE, NUM_VALIDATORS); + let me = participants[0].clone(); + let round = Round::new(Epoch::zero(), View::new(1)); + let block = make_raw_block(Sha256::hash(b""), Height::new(1), 100); + let finalization = StandardHarness::make_finalization( + Proposal::new(round, View::zero(), block.digest()), + &schemes, + QUORUM, + ); + + let (mut mailbox, _buffer, _resolver, _actor_handle) = start_standard_actor( + context.with_label("validator_0"), + &format!("prune-above-floor-{me}"), + ConstantProvider::new(schemes[0].clone()), + Application::::manual_ack(), + RecordingBuffer::default(), + ) + .await; + + assert!(mailbox.verified(round, block.clone()).await); + StandardHarness::report_finalization(&mut mailbox, finalization).await; + + while mailbox.get_finalization(Height::new(1)).await.is_none() { + context.sleep(Duration::from_millis(10)).await; + } + + // Prune above the floor must be a no-op, not an error. + mailbox.prune(Height::new(100)).await; + context.sleep(Duration::from_millis(50)).await; + + // The finalized block and its finalization must still be retrievable. + assert!(mailbox.get_block(Height::new(1)).await.is_some()); + assert!(mailbox.get_finalization(Height::new(1)).await.is_some()); + }); + } } diff --git a/consensus/src/marshal/standard/validation.rs b/consensus/src/marshal/standard/validation.rs index 31d96901ef8..673f8e418a5 100644 --- a/consensus/src/marshal/standard/validation.rs +++ b/consensus/src/marshal/standard/validation.rs @@ -2,7 +2,7 @@ use crate::{ marshal::{ ancestry::AncestorStream, application::validation::{ - has_contiguous_height, is_block_in_expected_epoch, is_valid_reproposal_at_verify, + has_contiguous_height, is_block_in_expected_epoch, is_valid_reproposal_at_verify, Stage, }, core::Mailbox, standard::Standard, @@ -125,6 +125,7 @@ pub(super) async fn verify_with_parent( application: &mut A, marshal: &mut Mailbox>, tx: &mut oneshot::Sender, + stage: Stage, ) -> Option where E: Rng + Spawner + Metrics + Clock, @@ -201,7 +202,7 @@ where valid = validity_request => valid, }; - if application_valid && !marshal.verified(context.round, block).await { + if application_valid && !stage.store(marshal, context.round, block).await { debug!(round = ?context.round, "marshal unable to accept block"); return None; } diff --git a/consensus/src/simplex/actors/voter/mod.rs b/consensus/src/simplex/actors/voter/mod.rs index 87f25a34584..2441d2c2e81 100644 --- a/consensus/src/simplex/actors/voter/mod.rs +++ b/consensus/src/simplex/actors/voter/mod.rs @@ -312,6 +312,109 @@ mod tests { propose_broadcast_failure_stops_before_notarize::<_, _>(secp256r1::fixture); } + /// Engine must not panic when the voter exits cleanly after the local + /// relay rejects `Plan::Propose`. The voter treats that as a fatal stop; + /// the engine must agree and shut down gracefully. + #[test_traced] + fn test_engine_stops_cleanly_when_voter_exits_after_failed_propose_broadcast() { + let namespace = + b"engine_stops_cleanly_when_voter_exits_after_failed_propose_broadcast".to_vec(); + let partition = + "engine_stops_cleanly_when_voter_exits_after_failed_propose_broadcast".to_string(); + let executor = deterministic::Runner::timed(Duration::from_secs(10)); + executor.start(|mut context| async move { + let Fixture { + participants, + schemes, + .. + } = bls12381_threshold_vrf::fixture::(&mut context, &namespace, 5); + let oracle = + start_test_network_with_peers(context.clone(), participants.clone(), true).await; + + let me = participants[0].clone(); + let elector = RoundRobin::::default(); + let reporter_cfg = mocks::reporter::Config { + participants: participants.clone().try_into().unwrap(), + scheme: schemes[0].clone(), + elector: elector.clone(), + }; + let reporter = mocks::reporter::Reporter::new( + context.with_label("reporter"), + reporter_cfg.clone(), + ); + + let app_relay = Arc::new(mocks::relay::Relay::new()); + let app_cfg = mocks::application::Config { + hasher: Sha256::default(), + relay: app_relay, + me: me.clone(), + propose_latency: (0.0, 0.0), + verify_latency: (0.0, 0.0), + certify_latency: (0.0, 0.0), + should_certify: mocks::application::Certifier::Always, + }; + let (app_actor, application) = + mocks::application::Application::new(context.with_label("app"), app_cfg); + app_actor.start(); + + let cfg = crate::simplex::config::Config { + scheme: schemes[0].clone(), + elector, + blocker: oracle.control(me.clone()), + automaton: application, + relay: FailingRelay::default(), + reporter, + strategy: Sequential, + partition, + mailbox_size: 128, + epoch: Epoch::new(4), + leader_timeout: Duration::from_secs(5), + certification_timeout: Duration::from_secs(5), + timeout_retry: Duration::from_mins(60), + fetch_timeout: Duration::from_secs(1), + activity_timeout: ViewDelta::new(10), + skip_timeout: ViewDelta::new(5), + fetch_concurrent: 4, + replay_buffer: NZUsize!(1024 * 1024), + write_buffer: NZUsize!(1024 * 1024), + page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE), + forwarding: crate::simplex::config::ForwardingPolicy::Disabled, + }; + let engine = crate::simplex::Engine::new(context.with_label("engine"), cfg); + + let (vote_sender, vote_receiver) = oracle + .control(me.clone()) + .register(0, TEST_QUOTA) + .await + .unwrap(); + let (cert_sender, cert_receiver) = oracle + .control(me.clone()) + .register(1, TEST_QUOTA) + .await + .unwrap(); + let (resolver_sender, resolver_receiver) = oracle + .control(me.clone()) + .register(2, TEST_QUOTA) + .await + .unwrap(); + + let handle = engine.start( + (vote_sender, vote_receiver), + (cert_sender, cert_receiver), + (resolver_sender, resolver_receiver), + ); + + select! { + result = handle => { + result.expect("engine should stop cleanly after voter exit"); + }, + _ = context.sleep(Duration::from_secs(2)) => { + panic!("timed out waiting for engine to stop after voter exit"); + } + } + }); + } + fn build_notarization>( schemes: &[S], proposal: &Proposal, diff --git a/consensus/src/simplex/engine.rs b/consensus/src/simplex/engine.rs index 1b609032908..22edfd73b58 100644 --- a/consensus/src/simplex/engine.rs +++ b/consensus/src/simplex/engine.rs @@ -225,20 +225,20 @@ impl< certificate_sender, ); - // Wait for the resolver or voter to finish + // If any task completes, the engine should stop let mut shutdown = self.context.stopped(); select! { _ = &mut shutdown => { debug!("context shutdown, stopping engine"); }, - _ = &mut voter_task => { - panic!("voter should not finish"); + voter = &mut voter_task => { + debug!(?voter, "voter stopped, shutting down engine"); }, - _ = &mut batcher_task => { - panic!("batcher should not finish"); + batcher = &mut batcher_task => { + debug!(?batcher, "batcher stopped, shutting down engine"); }, - _ = &mut resolver_task => { - panic!("resolver should not finish"); + resolver = &mut resolver_task => { + debug!(?resolver, "resolver stopped, shutting down engine"); }, } } diff --git a/p2p/src/authenticated/discovery/network.rs b/p2p/src/authenticated/discovery/network.rs index 69a3cef9c43..d8408b794c8 100644 --- a/p2p/src/authenticated/discovery/network.rs +++ b/p2p/src/authenticated/discovery/network.rs @@ -201,26 +201,26 @@ impl< let mut shutdown = self.context.stopped(); - // Wait for first actor to exit + // If any task completes, the network should stop info!("network started"); select! { _ = &mut shutdown => { debug!("context shutdown, stopping network"); }, tracker = &mut tracker_task => { - panic!("tracker exited unexpectedly: {tracker:?}"); + debug!(?tracker, "tracker stopped, shutting down network"); }, router = &mut router_task => { - panic!("router exited unexpectedly: {router:?}"); + debug!(?router, "router stopped, shutting down network"); }, spawner = &mut spawner_task => { - panic!("spawner exited unexpectedly: {spawner:?}"); + debug!(?spawner, "spawner stopped, shutting down network"); }, listener = &mut listener_task => { - panic!("listener exited unexpectedly: {listener:?}"); + debug!(?listener, "listener stopped, shutting down network"); }, dialer = &mut dialer_task => { - panic!("dialer exited unexpectedly: {dialer:?}"); + debug!(?dialer, "dialer stopped, shutting down network"); }, } } diff --git a/p2p/src/authenticated/lookup/network.rs b/p2p/src/authenticated/lookup/network.rs index ddb54c633a4..79a78200c15 100644 --- a/p2p/src/authenticated/lookup/network.rs +++ b/p2p/src/authenticated/lookup/network.rs @@ -194,26 +194,26 @@ impl< let mut shutdown = self.context.stopped(); - // Wait for first actor to exit + // If any task completes, the network should stop info!("network started"); select! { _ = &mut shutdown => { debug!("context shutdown, stopping network"); }, tracker = &mut tracker_task => { - panic!("tracker exited unexpectedly: {tracker:?}"); + debug!(?tracker, "tracker stopped, shutting down network"); }, router = &mut router_task => { - panic!("router exited unexpectedly: {router:?}"); + debug!(?router, "router stopped, shutting down network"); }, spawner = &mut spawner_task => { - panic!("spawner exited unexpectedly: {spawner:?}"); + debug!(?spawner, "spawner stopped, shutting down network"); }, listener = &mut listener_task => { - panic!("listener exited unexpectedly: {listener:?}"); + debug!(?listener, "listener stopped, shutting down network"); }, dialer = &mut dialer_task => { - panic!("dialer exited unexpectedly: {dialer:?}"); + debug!(?dialer, "dialer stopped, shutting down network"); }, } } diff --git a/storage/src/archive/immutable/mod.rs b/storage/src/archive/immutable/mod.rs index 6db7410f25f..d64090f8da4 100644 --- a/storage/src/archive/immutable/mod.rs +++ b/storage/src/archive/immutable/mod.rs @@ -5,9 +5,9 @@ //! //! # Uniqueness //! -//! [Archive] assumes all stored indexes and keys are unique. If the same key is associated with -//! multiple `indices`, there is no guarantee which value will be returned. If the key is written to -//! an existing `index`, [Archive] will return an error. +//! [Archive] assumes all stored indices are unique. Writing to an occupied index is a no-op. +//! If the same key is associated with multiple indices, there is no guarantee which value will +//! be returned. //! //! # Compression //! diff --git a/storage/src/archive/mod.rs b/storage/src/archive/mod.rs index 8955af09c5b..dc290b1ffb3 100644 --- a/storage/src/archive/mod.rs +++ b/storage/src/archive/mod.rs @@ -1,7 +1,10 @@ //! A write-once key-value store for ordered data. //! -//! [Archive] is a key-value store designed for workloads where all data is written only once and is -//! uniquely associated with both an `index` and a `key`. +//! [Archive] is a key-value store designed for workloads where data is written only once and each +//! item is addressed by both an `index` and a `key`. Workloads with unique indices should use [Archive] +//! and workloads with overlapping indices should use [MultiArchive] (allows all items with the same index +//! to be retrieved). The same key may be stored at multiple indices in either case, and a key lookup may +//! return any of the associated values. use commonware_codec::Codec; use commonware_utils::Array; @@ -39,7 +42,7 @@ pub enum Error { RecordTooLarge, } -/// A write-once key-value store where each key is associated with a unique index. +/// A write-once key-value store addressed by both an index and a key. pub trait Archive: Send { /// The type of the key. type Key: Array; @@ -47,10 +50,12 @@ pub trait Archive: Send { /// The type of the value. type Value: Codec + Send; - /// Store an item in [Archive]. Both indices and keys are assumed to both be globally unique. + /// Store an item in [Archive]. /// - /// If the index already exists, put does nothing and returns. If the same key is stored multiple times - /// at different indices (not recommended), any value associated with the key may be returned. + /// Indices are unique: if the index already exists, put does nothing and returns. Duplicate + /// indices can be stored via [MultiArchive::put_multi]. Keys need not be unique — the same key + /// may be stored at multiple indices, and a subsequent [Archive::get] or [Archive::has] call + /// with an [Identifier::Key] identifier may return any of the values associated with that key. fn put( &mut self, index: u64, @@ -122,8 +127,7 @@ pub trait Archive: Send { /// /// Unlike [Archive::put], which is a no-op when the index already exists, /// [MultiArchive::put_multi] allows storing additional `(key, value)` pairs -/// at an existing index. As with [Archive::put], keys are assumed to be globally -/// unique, but duplicate keys are not rejected. +/// at an existing index. pub trait MultiArchive: Archive { /// Retrieve all values stored at the given index. /// @@ -383,6 +387,74 @@ mod tests { }); } + async fn test_duplicate_key_cross_index_impl( + mut archive: impl Archive, Value = i32>, + ) { + // Store the same key at two different indices; distinct values only so + // the test can observe which entry wins a key lookup. + let key = test_key("dupe-xindex"); + archive.put(2, key.clone(), 20).await.expect("put(2)"); + archive.put(5, key.clone(), 50).await.expect("put(5)"); + + // Both indices must resolve individually. + assert_eq!( + archive.get(Identifier::Index(2)).await.unwrap(), + Some(20), + "Index(2) must resolve to the value stored at 2" + ); + assert_eq!( + archive.get(Identifier::Index(5)).await.unwrap(), + Some(50), + "Index(5) must resolve to the value stored at 5" + ); + + // Key lookup may return either value per the contract; just assert it + // returns one of them and that `has` reports presence. + let got = archive + .get(Identifier::Key(&key)) + .await + .unwrap() + .expect("key lookup must find at least one entry"); + assert!(got == 20 || got == 50, "unexpected value: {got}"); + assert!(archive.has(Identifier::Key(&key)).await.unwrap()); + } + + #[test_traced] + fn test_duplicate_key_cross_index_prunable_no_compression() { + let executor = deterministic::Runner::default(); + executor.start(|context| async move { + let archive = create_prunable(context, None).await; + test_duplicate_key_cross_index_impl(archive).await; + }); + } + + #[test_traced] + fn test_duplicate_key_cross_index_prunable_compression() { + let executor = deterministic::Runner::default(); + executor.start(|context| async move { + let archive = create_prunable(context, Some(3)).await; + test_duplicate_key_cross_index_impl(archive).await; + }); + } + + #[test_traced] + fn test_duplicate_key_cross_index_immutable_no_compression() { + let executor = deterministic::Runner::default(); + executor.start(|context| async move { + let archive = create_immutable(context, None).await; + test_duplicate_key_cross_index_impl(archive).await; + }); + } + + #[test_traced] + fn test_duplicate_key_cross_index_immutable_compression() { + let executor = deterministic::Runner::default(); + executor.start(|context| async move { + let archive = create_immutable(context, Some(3)).await; + test_duplicate_key_cross_index_impl(archive).await; + }); + } + #[test_traced] fn test_duplicate_key_immutable_compression() { let executor = deterministic::Runner::default(); diff --git a/storage/src/archive/prunable/mod.rs b/storage/src/archive/prunable/mod.rs index 1025ab66735..6e2c01465e2 100644 --- a/storage/src/archive/prunable/mod.rs +++ b/storage/src/archive/prunable/mod.rs @@ -30,9 +30,13 @@ //! //! # Uniqueness //! -//! [Archive] assumes all stored indexes and keys are unique. If the same key is associated with -//! multiple `indices`, there is no guarantee which value will be returned. If the key is written to -//! an existing `index`, [Archive] will return an error. +//! Indices are unique for [Archive] and writing to an occupied index is a no-op. Duplicate +//! indices can be stored via [`crate::archive::MultiArchive::put_multi`]. +//! +//! Keys may be stored at multiple indices with either put variant. A lookup by +//! [`crate::archive::Identifier::Key`] may return any of the values at that key. Entries +//! whose index has been pruned are never returned or reported as present, so a key matching +//! both a pruned and a non-pruned entry resolves to the non-pruned entry. //! //! ## Conflicts //! @@ -681,6 +685,63 @@ mod tests { assert_eq!(state1, state2); } + /// Regression: when the same key is stored at multiple indices and the + /// earlier index is pruned, a subsequent `get`/`has` by key must resolve + /// to the surviving, non-pruned entry rather than report the pruned one. + /// Callers such as consensus's marshal cache rely on this to retain a + /// reproposal of the same block at a later index even after the + /// earlier index's retention window closes. + #[test_traced] + fn test_archive_key_lookup_skips_pruned_duplicates() { + let executor = deterministic::Runner::default(); + executor.start(|context| async move { + let cfg = Config { + translator: FourCap, + key_partition: "test-index".into(), + key_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE), + value_partition: "test-value".into(), + codec_config: (), + compression: None, + key_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER), + value_write_buffer: NZUsize!(DEFAULT_WRITE_BUFFER), + replay_buffer: NZUsize!(DEFAULT_REPLAY_BUFFER), + items_per_section: NZU64!(1), + }; + let mut archive = Archive::init(context.clone(), cfg) + .await + .expect("Failed to initialize archive"); + + // Same key stored at two different indices. Distinct values only + // to make it observable which entry wins; a real caller would + // store the same value (e.g. the same block) at both indices. + let key = test_key("dupe-key"); + archive.put(2, key.clone(), 20).await.unwrap(); + archive.put(5, key.clone(), 50).await.unwrap(); + + // Before pruning, either entry is a permitted answer per the + // trait contract. The implementation happens to return the + // earlier index, but we only assert a value is present. + assert!(archive.get(Identifier::Key(&key)).await.unwrap().is_some()); + assert!(archive.has(Identifier::Key(&key)).await.unwrap()); + + // Prune the earlier index (section 2). The later index must be + // the sole surviving answer. + archive.prune(3).await.unwrap(); + let got = archive.get(Identifier::Key(&key)).await.unwrap(); + assert_eq!( + got, + Some(50), + "key lookup must skip the pruned entry and return the surviving one" + ); + assert!(archive.has(Identifier::Key(&key)).await.unwrap()); + + // Prune past the later index too — now nothing survives. + archive.prune(6).await.unwrap(); + assert_eq!(archive.get(Identifier::Key(&key)).await.unwrap(), None); + assert!(!archive.has(Identifier::Key(&key)).await.unwrap()); + }); + } + #[test_traced] fn test_get_all_after_prune() { let executor = deterministic::Runner::default();