Skip to content
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
393eb38
spike
patrick-ogrady Apr 18, 2026
3748eb9
add certified
patrick-ogrady Apr 19, 2026
1f7f6f5
nits
patrick-ogrady Apr 19, 2026
650af6d
add tests
patrick-ogrady Apr 19, 2026
e1f6db9
fmt
patrick-ogrady Apr 19, 2026
08fa5dc
add regression test for pruning
patrick-ogrady Apr 19, 2026
3425d43
duplicate key
patrick-ogrady Apr 19, 2026
1693203
more tests
patrick-ogrady Apr 19, 2026
f8c29ed
cleanup fix
patrick-ogrady Apr 19, 2026
fc9d8b4
fmt
patrick-ogrady Apr 19, 2026
d38c058
cleanup
patrick-ogrady Apr 19, 2026
0f6c93b
fix docs
patrick-ogrady Apr 19, 2026
2545abe
increase coverage
patrick-ogrady Apr 20, 2026
81c3b60
test cleanup
patrick-ogrady Apr 20, 2026
9889ccb
use debug
patrick-ogrady Apr 20, 2026
b21d855
dedup persistence logic
patrick-ogrady Apr 20, 2026
12f6e2e
nits
patrick-ogrady Apr 20, 2026
162b596
nit
patrick-ogrady Apr 20, 2026
1f37b01
cleanup naming
patrick-ogrady Apr 20, 2026
b5887dd
cleanup comments on stage
patrick-ogrady Apr 20, 2026
962231e
cleanup check test
patrick-ogrady Apr 20, 2026
b440162
revert comment
patrick-ogrady Apr 20, 2026
5c165ae
cleanup comment
patrick-ogrady Apr 20, 2026
0e5ff9c
remove period
patrick-ogrady Apr 20, 2026
2fbbd2e
cleanup comments
patrick-ogrady Apr 20, 2026
91575dd
cleanup
patrick-ogrady Apr 20, 2026
898e58d
nit tests
patrick-ogrady Apr 20, 2026
7892cee
useless tests
patrick-ogrady Apr 20, 2026
e5ac784
nit
patrick-ogrady Apr 20, 2026
284e0ed
collapse helpers
patrick-ogrady Apr 20, 2026
eaa0120
nits
patrick-ogrady Apr 20, 2026
2fa60c4
nits
patrick-ogrady Apr 20, 2026
6e2b9c1
cleanup comments
patrick-ogrady Apr 20, 2026
ce44aea
nit
patrick-ogrady Apr 20, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 111 additions & 0 deletions consensus/src/marshal/application/verification_tasks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,114 @@ where
.retain(|(task_round, _), _| task_round > finalized_round);
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::types::{Epoch, View};
use commonware_cryptography::{sha256::Digest as Sha256Digest, Hasher, Sha256};

type D = Sha256Digest;

fn round(view: u64) -> Round {
Round::new(Epoch::zero(), View::new(view))
}

fn pending_task() -> oneshot::Receiver<bool> {
let (_tx, rx) = oneshot::channel();
rx
}

#[test]
fn test_insert_and_take_returns_task() {
let tasks = VerificationTasks::<D>::new();
let digest = Sha256::hash(b"block");
tasks.insert(round(1), digest, pending_task());

assert!(tasks.take(round(1), digest).is_some());
assert!(
tasks.take(round(1), digest).is_none(),
"taking twice should yield None"
);
}

#[test]
fn test_take_absent_key_is_none() {
let tasks = VerificationTasks::<D>::new();
assert!(tasks.take(round(1), Sha256::hash(b"missing")).is_none());
}

#[test]
fn test_take_distinguishes_rounds_and_digests() {
let tasks = VerificationTasks::<D>::new();
let digest_a = Sha256::hash(b"a");
let digest_b = Sha256::hash(b"b");
tasks.insert(round(1), digest_a, pending_task());
tasks.insert(round(2), digest_a, pending_task());
tasks.insert(round(1), digest_b, pending_task());

assert!(tasks.take(round(1), digest_a).is_some());
assert!(tasks.take(round(2), digest_a).is_some());
assert!(tasks.take(round(1), digest_b).is_some());
}

#[test]
fn test_retain_after_drops_at_and_below_boundary() {
let tasks = VerificationTasks::<D>::new();
let digest = Sha256::hash(b"block");
tasks.insert(round(1), digest, pending_task());
tasks.insert(round(2), digest, pending_task());
tasks.insert(round(3), digest, pending_task());

tasks.retain_after(&round(2));

assert!(
tasks.take(round(1), digest).is_none(),
"tasks strictly below boundary should be dropped"
);
assert!(
tasks.take(round(2), digest).is_none(),
"tasks at boundary should be dropped"
);
assert!(
tasks.take(round(3), digest).is_some(),
"tasks strictly above boundary should be retained"
);
}

#[test]
fn test_retain_after_spans_epochs() {
let tasks = VerificationTasks::<D>::new();
let digest = Sha256::hash(b"block");
let early = Round::new(Epoch::zero(), View::new(100));
let late = Round::new(Epoch::new(1), View::zero());
tasks.insert(early, digest, pending_task());
tasks.insert(late, digest, pending_task());

tasks.retain_after(&early);

assert!(
tasks.take(early, digest).is_none(),
"task at boundary must be dropped"
);
assert!(
tasks.take(late, digest).is_some(),
"task in later epoch must outlive an earlier boundary"
);
}

#[test]
fn test_retain_after_empty_map_is_noop() {
let tasks = VerificationTasks::<D>::new();
tasks.retain_after(&round(5));
assert!(tasks.take(round(5), Sha256::hash(b"x")).is_none());
}

#[test]
fn test_default_matches_new() {
let default = <VerificationTasks<D> as Default>::default();
let digest = Sha256::hash(b"block");
default.insert(round(1), digest, pending_task());
assert!(default.take(round(1), digest).is_some());
}
}
43 changes: 35 additions & 8 deletions consensus/src/marshal/coding/marshaled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,15 @@ use rand::Rng;
use std::sync::{Arc, OnceLock};
use tracing::{debug, warn};

/// Which marshal cache a verified coded block should land in.
#[derive(Clone, Copy, Debug)]
enum CodingPersistMode {
Comment thread
patrick-ogrady marked this conversation as resolved.
Outdated
/// Write to `verified_blocks` via `Mailbox::verified`.
Verified,
/// Write to `notarized_blocks` via `Mailbox::certified`.
Certified,
}

/// The [`CodingConfig`] used for genesis blocks. These blocks are never broadcasted in
/// the proposal phase, and thus the configuration is irrelevant.
const GENESIS_CODING_CONFIG: CodingConfig = CodingConfig {
Expand Down Expand Up @@ -299,6 +308,7 @@ where
consensus_context: Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
commitment: Commitment,
prefetched_block: Option<CodedBlock<B, C, H>>,
persist: CodingPersistMode,
) -> oneshot::Receiver<bool> {
let mut marshal = self.marshal.clone();
let mut application = self.application.clone();
Expand Down Expand Up @@ -424,9 +434,15 @@ where
is_valid = validity_request => is_valid,
};
timer.observe();
if application_valid && !marshal.verified(round, block).await {
debug!(?round, "marshal unable to accept block");
return;
if application_valid {
let persisted = match persist {
CodingPersistMode::Verified => marshal.verified(round, block).await,
CodingPersistMode::Certified => marshal.certified(round, block).await,
};
if !persisted {
debug!(?round, "marshal unable to accept block");
return;
}
}
tx.send_lossy(application_valid);
});
Expand Down Expand Up @@ -779,7 +795,12 @@ where
// Kick off deferred verification early to hide verification latency behind
// shard validity checks and network latency for collecting votes.
let round = consensus_context.round;
let task = self.deferred_verify(consensus_context, payload, None);
let task = self.deferred_verify(
consensus_context,
payload,
None,
CodingPersistMode::Verified,
);
self.verification_tasks.insert(round, payload, task);

match scheme.me() {
Expand Down Expand Up @@ -895,9 +916,10 @@ where
round,
);
if is_reproposal {
// During crash recovery we may call `marshal.verified` twice for
// the same block; the call is idempotent.
if !marshaled.marshal.verified(round, block).await {
// Certifier holds a notarization for this block, so route
// the write to the notarized cache. `certified` is
// idempotent, so crash-recovery double-invocation is safe.
if !marshaled.marshal.certified(round, block).await {
debug!(?round, "marshal unable to accept block");
return;
}
Expand All @@ -916,7 +938,12 @@ where

// Use the block's embedded context for verification, passing the
// prefetched block to avoid fetching it again inside deferred_verify.
let verify_rx = marshaled.deferred_verify(embedded_context, payload, Some(block));
let verify_rx = marshaled.deferred_verify(
embedded_context,
payload,
Some(block),
CodingPersistMode::Certified,
);
if let Ok(result) = verify_rx.await {
tx.send_lossy(result);
}
Expand Down
108 changes: 108 additions & 0 deletions consensus/src/marshal/coding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,114 @@ mod tests {
harness::finalize_same_height_different_views::<CodingHarness>();
}

#[test_traced("WARN")]
fn test_coding_certify_persists_equivocated_block() {
harness::certify_persists_equivocated_block::<CodingHarness>();
}

#[test_traced("WARN")]
fn test_coding_certify_at_later_view_survives_earlier_view_pruning() {
harness::certify_at_later_view_survives_earlier_view_pruning::<CodingHarness>();
}

/// Regression test for issue #167: finalizing a descendant must not
Comment thread
patrick-ogrady marked this conversation as resolved.
Outdated
/// height-prune the shard-engine buffer before `try_repair_gaps` has
/// consumed buffer-only ancestors.
///
/// Places parent (height 1) and descendant (height 2) in the shard engine's
/// reconstructed-block cache via `proposed()`, then reports a finalization
/// for the descendant only. With the fix, the gap repair pass pulls the
/// parent from the buffer and archives it before any height-prune runs. The
/// pre-fix behavior would prune the parent from the buffer inside
/// `store_finalization(descendant)` and leave the finalized chain waiting
/// on a network fetch that never arrives.
#[test_traced("WARN")]
fn test_coding_store_finalization_does_not_prune_buffer_before_repair() {
let runner = deterministic::Runner::timed(Duration::from_secs(60));
runner.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let mut oracle =
setup_network_with_participants(context.clone(), NZUsize!(1), participants.clone())
.await;

let setup = CodingHarness::setup_validator(
context.with_label("validator_0"),
&mut oracle,
participants[0].clone(),
ConstantProvider::new(schemes[0].clone()),
)
.await;
let mut handle = harness::ValidatorHandle::<CodingHarness> {
mailbox: setup.mailbox,
extra: setup.extra,
};

// Build a 2-block chain: parent at height 1, descendant at height 2.
let parent_block = CodingHarness::make_test_block(
Sha256::hash(b""),
CodingHarness::genesis_parent_commitment(NUM_VALIDATORS as u16),
Height::new(1),
1,
NUM_VALIDATORS as u16,
);
let parent_digest = CodingHarness::digest(&parent_block);
let parent_commitment = CodingHarness::commitment(&parent_block);

let descendant_block = CodingHarness::make_test_block(
parent_digest,
parent_commitment,
Height::new(2),
2,
NUM_VALIDATORS as u16,
);
let descendant_commitment = CodingHarness::commitment(&descendant_block);

// Seed the shard engine's reconstructed-block cache with both blocks.
CodingHarness::propose(
&mut handle,
Round::new(Epoch::new(0), View::new(1)),
&parent_block,
)
.await;
CodingHarness::propose(
&mut handle,
Round::new(Epoch::new(0), View::new(2)),
&descendant_block,
)
.await;

// Report finalization for the descendant only. The parent has no
// finalization certificate: it must be archived by walking the
// parent link from the descendant and sourcing the block from the
// shard-engine buffer.
let descendant_proposal = Proposal {
round: Round::new(Epoch::new(0), View::new(2)),
parent: View::new(1),
payload: descendant_commitment,
};
let descendant_finalization =
CodingHarness::make_finalization(descendant_proposal, &schemes, QUORUM);
CodingHarness::report_finalization(&mut handle.mailbox, descendant_finalization).await;

context.sleep(Duration::from_millis(200)).await;

let parent = handle.mailbox.get_block(Height::new(1)).await;
assert!(
parent.is_some(),
"parent must be archived from shard buffer before height-prune evicts it"
);
let descendant = handle.mailbox.get_block(Height::new(2)).await;
assert!(
descendant.is_some(),
"descendant must be archived after finalization"
);
});
}

#[test_traced("WARN")]
fn test_coding_init_processed_height() {
harness::init_processed_height::<CodingHarness>();
Expand Down
16 changes: 10 additions & 6 deletions consensus/src/marshal/coding/shards/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1017,13 +1017,17 @@ where

/// Prunes all blocks in the reconstructed block cache that are older than the block
/// with the given commitment. Also cleans up stale reconstruction state
/// and subscriptions.
/// Prune reconstruction state at or below the given commitment's height,
Comment thread
patrick-ogrady marked this conversation as resolved.
Outdated
/// alongside the per-proposal state and subscriptions for that round and
/// below.
///
/// This is the only place reconstruction state is pruned by round. We
/// intentionally avoid pruning on reconstruction success because a
/// Byzantine leader can equivocate, producing multiple valid commitments
/// in the same round. Both must remain recoverable until finalization
/// determines which one is canonical.
/// Called from marshal's application-ack path, which guarantees every
/// finalized block at or below `through` has already been archived,
/// synced, delivered, and acknowledged. Gap repair for those heights has
/// completed, so eviction here cannot race ahead of any repair lookup —
/// cascading by height safely sweeps both canonical ancestors (now only
/// reached via the finalized archive) and stale or equivocated blocks at
/// the same or lower heights.
fn prune(&mut self, through: Commitment) {
if let Some(height) = self.reconstructed_blocks.get(&through).map(|b| b.height()) {
self.reconstructed_blocks
Expand Down
Loading
Loading