Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
393eb38
spike
patrick-ogrady Apr 18, 2026
3748eb9
add certified
patrick-ogrady Apr 19, 2026
1f7f6f5
nits
patrick-ogrady Apr 19, 2026
650af6d
add tests
patrick-ogrady Apr 19, 2026
e1f6db9
fmt
patrick-ogrady Apr 19, 2026
08fa5dc
add regression test for pruning
patrick-ogrady Apr 19, 2026
3425d43
duplicate key
patrick-ogrady Apr 19, 2026
1693203
more tests
patrick-ogrady Apr 19, 2026
f8c29ed
cleanup fix
patrick-ogrady Apr 19, 2026
fc9d8b4
fmt
patrick-ogrady Apr 19, 2026
d38c058
cleanup
patrick-ogrady Apr 19, 2026
0f6c93b
fix docs
patrick-ogrady Apr 19, 2026
2545abe
increase coverage
patrick-ogrady Apr 20, 2026
81c3b60
test cleanup
patrick-ogrady Apr 20, 2026
9889ccb
use debug
patrick-ogrady Apr 20, 2026
b21d855
dedup persistence logic
patrick-ogrady Apr 20, 2026
12f6e2e
nits
patrick-ogrady Apr 20, 2026
162b596
nit
patrick-ogrady Apr 20, 2026
1f37b01
cleanup naming
patrick-ogrady Apr 20, 2026
b5887dd
cleanup comments on stage
patrick-ogrady Apr 20, 2026
962231e
cleanup check test
patrick-ogrady Apr 20, 2026
b440162
revert comment
patrick-ogrady Apr 20, 2026
5c165ae
cleanup comment
patrick-ogrady Apr 20, 2026
0e5ff9c
remove period
patrick-ogrady Apr 20, 2026
2fbbd2e
cleanup comments
patrick-ogrady Apr 20, 2026
91575dd
cleanup
patrick-ogrady Apr 20, 2026
898e58d
nit tests
patrick-ogrady Apr 20, 2026
7892cee
useless tests
patrick-ogrady Apr 20, 2026
e5ac784
nit
patrick-ogrady Apr 20, 2026
284e0ed
collapse helpers
patrick-ogrady Apr 20, 2026
eaa0120
nits
patrick-ogrady Apr 20, 2026
2fa60c4
nits
patrick-ogrady Apr 20, 2026
6e2b9c1
cleanup comments
patrick-ogrady Apr 20, 2026
ce44aea
nit
patrick-ogrady Apr 20, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 31 additions & 1 deletion consensus/src/marshal/application/validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,44 @@
//! This module centralizes pure invariant checks shared across marshal verification
//! and certification flows.

use crate::types::{Epoch, Epocher, Height, Round};
use crate::{
marshal::core::{Mailbox, Variant},
types::{Epoch, Epocher, Height, Round},
};
use commonware_cryptography::certificate::Scheme;
use commonware_utils::sync::Mutex;
use std::sync::Arc;

/// Cache for the last block built during proposal, shared between the
/// proposer task and the broadcast path.
pub(crate) type LastBuilt<B> = Arc<Mutex<Option<(Round, B)>>>;

/// Which stage of verification a block has reached.
///
/// This is used to determine which marshal cache a block should be stored in.
#[derive(Clone, Copy, Debug)]
pub(crate) enum Stage {
/// The block has been verified (store in `verified_blocks`).
Verified,
/// The block has been certified (store in `notarized_blocks`).
Certified,
Comment thread
patrick-ogrady marked this conversation as resolved.
}

impl Stage {
/// Store `block` in the marshal cache for the provided stage.
pub(crate) async fn store<S: Scheme, V: Variant>(
self,
marshal: &mut Mailbox<S, V>,
round: Round,
block: V::Block,
) -> bool {
match self {
Self::Verified => marshal.verified(round, block).await,
Self::Certified => marshal.certified(round, block).await,
}
}
}

/// Returns true if the block is at an epoch boundary (last block in its epoch).
#[inline]
fn is_at_epoch_boundary<ES: Epocher>(epocher: &ES, block_height: Height, epoch: Epoch) -> bool {
Expand Down
111 changes: 111 additions & 0 deletions consensus/src/marshal/application/verification_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,114 @@ where
.retain(|(task_round, _), _| task_round > finalized_round);
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::types::{Epoch, View};
use commonware_cryptography::{sha256::Digest as Sha256Digest, Hasher, Sha256};

type D = Sha256Digest;

fn round(view: u64) -> Round {
Round::new(Epoch::zero(), View::new(view))
}

fn pending_task() -> oneshot::Receiver<bool> {
let (_tx, rx) = oneshot::channel();
rx
}

#[test]
fn test_insert_and_take_returns_task() {
let tasks = VerificationTasks::<D>::new();
let digest = Sha256::hash(b"block");
tasks.insert(round(1), digest, pending_task());

assert!(tasks.take(round(1), digest).is_some());
assert!(
tasks.take(round(1), digest).is_none(),
"taking twice should yield None"
);
}

#[test]
fn test_take_absent_key_is_none() {
let tasks = VerificationTasks::<D>::new();
assert!(tasks.take(round(1), Sha256::hash(b"missing")).is_none());
}

#[test]
fn test_take_distinguishes_rounds_and_digests() {
let tasks = VerificationTasks::<D>::new();
let digest_a = Sha256::hash(b"a");
let digest_b = Sha256::hash(b"b");
tasks.insert(round(1), digest_a, pending_task());
tasks.insert(round(2), digest_a, pending_task());
tasks.insert(round(1), digest_b, pending_task());

assert!(tasks.take(round(1), digest_a).is_some());
assert!(tasks.take(round(2), digest_a).is_some());
assert!(tasks.take(round(1), digest_b).is_some());
}

#[test]
fn test_retain_after_drops_at_and_below_boundary() {
let tasks = VerificationTasks::<D>::new();
let digest = Sha256::hash(b"block");
tasks.insert(round(1), digest, pending_task());
tasks.insert(round(2), digest, pending_task());
tasks.insert(round(3), digest, pending_task());

tasks.retain_after(&round(2));

assert!(
tasks.take(round(1), digest).is_none(),
"tasks strictly below boundary should be dropped"
);
assert!(
tasks.take(round(2), digest).is_none(),
"tasks at boundary should be dropped"
);
assert!(
tasks.take(round(3), digest).is_some(),
"tasks strictly above boundary should be retained"
);
}

#[test]
fn test_retain_after_spans_epochs() {
let tasks = VerificationTasks::<D>::new();
let digest = Sha256::hash(b"block");
let early = Round::new(Epoch::zero(), View::new(100));
let late = Round::new(Epoch::new(1), View::zero());
tasks.insert(early, digest, pending_task());
tasks.insert(late, digest, pending_task());

tasks.retain_after(&early);

assert!(
tasks.take(early, digest).is_none(),
"task at boundary must be dropped"
);
assert!(
tasks.take(late, digest).is_some(),
"task in later epoch must outlive an earlier boundary"
);
}

#[test]
fn test_retain_after_empty_map_is_noop() {
let tasks = VerificationTasks::<D>::new();
tasks.retain_after(&round(5));
assert!(tasks.take(round(5), Sha256::hash(b"x")).is_none());
}

#[test]
fn test_default_matches_new() {
let default = <VerificationTasks<D> as Default>::default();
let digest = Sha256::hash(b"block");
default.insert(round(1), digest, pending_task());
assert!(default.take(round(1), digest).is_some());
}
}
21 changes: 14 additions & 7 deletions consensus/src/marshal/coding/marshaled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ use crate::{
ancestry::AncestorStream,
application::{
validation::{
is_inferred_reproposal_at_certify, is_valid_reproposal_at_verify, LastBuilt,
is_inferred_reproposal_at_certify, is_valid_reproposal_at_verify, LastBuilt, Stage,
},
verification_tasks::VerificationTasks,
},
Expand Down Expand Up @@ -299,6 +299,7 @@ where
consensus_context: Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
commitment: Commitment,
prefetched_block: Option<CodedBlock<B, C, H>>,
stage: Stage,
) -> oneshot::Receiver<bool> {
let mut marshal = self.marshal.clone();
let mut application = self.application.clone();
Expand Down Expand Up @@ -424,7 +425,7 @@ where
is_valid = validity_request => is_valid,
};
timer.observe();
if application_valid && !marshal.verified(round, block).await {
if application_valid && !stage.store(&mut marshal, round, block).await {
debug!(?round, "marshal unable to accept block");
return;
}
Expand Down Expand Up @@ -779,7 +780,7 @@ where
// Kick off deferred verification early to hide verification latency behind
// shard validity checks and network latency for collecting votes.
let round = consensus_context.round;
let task = self.deferred_verify(consensus_context, payload, None);
let task = self.deferred_verify(consensus_context, payload, None, Stage::Verified);
self.verification_tasks.insert(round, payload, task);

match scheme.me() {
Expand Down Expand Up @@ -895,9 +896,10 @@ where
round,
);
if is_reproposal {
// During crash recovery we may call `marshal.verified` twice for
// the same block; the call is idempotent.
if !marshaled.marshal.verified(round, block).await {
// Certifier holds a notarization for this block, so route
// the write to the notarized cache. `certified` is
// idempotent, so crash-recovery double-invocation is safe.
if !marshaled.marshal.certified(round, block).await {
debug!(?round, "marshal unable to accept block");
return;
}
Expand All @@ -916,7 +918,12 @@ where

// Use the block's embedded context for verification, passing the
// prefetched block to avoid fetching it again inside deferred_verify.
let verify_rx = marshaled.deferred_verify(embedded_context, payload, Some(block));
let verify_rx = marshaled.deferred_verify(
embedded_context,
payload,
Some(block),
Stage::Certified,
);
if let Ok(result) = verify_rx.await {
tx.send_lossy(result);
}
Expand Down
103 changes: 103 additions & 0 deletions consensus/src/marshal/coding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,109 @@ mod tests {
harness::finalize_same_height_different_views::<CodingHarness>();
}

#[test_traced("WARN")]
fn test_coding_certify_persists_equivocated_block() {
harness::certify_persists_equivocated_block::<CodingHarness>();
}

#[test_traced("WARN")]
fn test_coding_certify_at_later_view_survives_earlier_view_pruning() {
harness::certify_at_later_view_survives_earlier_view_pruning::<CodingHarness>();
}

/// Finalizing a descendant must not height-prune the shard-engine buffer before
/// `try_repair_gaps` has consumed buffer-only ancestors.
///
/// Places parent (height 1) and descendant (height 2) in the shard engine's
/// reconstructed-block cache via `proposed()`, then reports a finalization
/// for the descendant only.
#[test_traced("WARN")]
fn test_coding_store_finalization_does_not_prune_buffer_before_repair() {
let runner = deterministic::Runner::timed(Duration::from_secs(60));
runner.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let mut oracle =
setup_network_with_participants(context.clone(), NZUsize!(1), participants.clone())
.await;

let setup = CodingHarness::setup_validator(
context.with_label("validator_0"),
&mut oracle,
participants[0].clone(),
ConstantProvider::new(schemes[0].clone()),
)
.await;
let mut handle = harness::ValidatorHandle::<CodingHarness> {
mailbox: setup.mailbox,
extra: setup.extra,
};

// Build a 2-block chain: parent at height 1, descendant at height 2.
let parent_block = CodingHarness::make_test_block(
Sha256::hash(b""),
CodingHarness::genesis_parent_commitment(NUM_VALIDATORS as u16),
Height::new(1),
1,
NUM_VALIDATORS as u16,
);
let parent_digest = CodingHarness::digest(&parent_block);
let parent_commitment = CodingHarness::commitment(&parent_block);

let descendant_block = CodingHarness::make_test_block(
parent_digest,
parent_commitment,
Height::new(2),
2,
NUM_VALIDATORS as u16,
);
let descendant_commitment = CodingHarness::commitment(&descendant_block);

// Seed the shard engine's reconstructed-block cache with both blocks.
CodingHarness::propose(
&mut handle,
Round::new(Epoch::new(0), View::new(1)),
&parent_block,
)
.await;
CodingHarness::propose(
&mut handle,
Round::new(Epoch::new(0), View::new(2)),
&descendant_block,
)
.await;

// Report finalization for the descendant only. The parent has no
// finalization certificate: it must be archived by walking the
// parent link from the descendant and sourcing the block from the
// shard-engine buffer.
let descendant_proposal = Proposal {
round: Round::new(Epoch::new(0), View::new(2)),
parent: View::new(1),
payload: descendant_commitment,
};
let descendant_finalization =
CodingHarness::make_finalization(descendant_proposal, &schemes, QUORUM);
CodingHarness::report_finalization(&mut handle.mailbox, descendant_finalization).await;

// Wait until the descendant is archived: that proves finalization processing
// has completed, at which point the parent must already have been repaired
// from the shard buffer.
while handle.mailbox.get_block(Height::new(2)).await.is_none() {
context.sleep(Duration::from_millis(10)).await;
}

let parent = handle.mailbox.get_block(Height::new(1)).await;
assert!(
parent.is_some(),
"parent must be archived from shard buffer before height-prune evicts it"
);
});
}

#[test_traced("WARN")]
fn test_coding_init_processed_height() {
harness::init_processed_height::<CodingHarness>();
Expand Down
Loading
Loading