Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
393eb38
spike
patrick-ogrady Apr 18, 2026
3748eb9
add certified
patrick-ogrady Apr 19, 2026
1f7f6f5
nits
patrick-ogrady Apr 19, 2026
650af6d
add tests
patrick-ogrady Apr 19, 2026
e1f6db9
fmt
patrick-ogrady Apr 19, 2026
08fa5dc
add regression test for pruning
patrick-ogrady Apr 19, 2026
3425d43
duplicate key
patrick-ogrady Apr 19, 2026
1693203
more tests
patrick-ogrady Apr 19, 2026
f8c29ed
cleanup fix
patrick-ogrady Apr 19, 2026
fc9d8b4
fmt
patrick-ogrady Apr 19, 2026
d38c058
cleanup
patrick-ogrady Apr 19, 2026
0f6c93b
fix docs
patrick-ogrady Apr 19, 2026
2545abe
increase coverage
patrick-ogrady Apr 20, 2026
81c3b60
test cleanup
patrick-ogrady Apr 20, 2026
9889ccb
use debug
patrick-ogrady Apr 20, 2026
b21d855
dedup persistence logic
patrick-ogrady Apr 20, 2026
12f6e2e
nits
patrick-ogrady Apr 20, 2026
162b596
nit
patrick-ogrady Apr 20, 2026
1f37b01
cleanup naming
patrick-ogrady Apr 20, 2026
b5887dd
cleanup comments on stage
patrick-ogrady Apr 20, 2026
962231e
cleanup check test
patrick-ogrady Apr 20, 2026
b440162
revert comment
patrick-ogrady Apr 20, 2026
5c165ae
cleanup comment
patrick-ogrady Apr 20, 2026
0e5ff9c
remove period
patrick-ogrady Apr 20, 2026
2fbbd2e
cleanup comments
patrick-ogrady Apr 20, 2026
91575dd
cleanup
patrick-ogrady Apr 20, 2026
898e58d
nit tests
patrick-ogrady Apr 20, 2026
7892cee
useless tests
patrick-ogrady Apr 20, 2026
e5ac784
nit
patrick-ogrady Apr 20, 2026
284e0ed
collapse helpers
patrick-ogrady Apr 20, 2026
eaa0120
nits
patrick-ogrady Apr 20, 2026
2fa60c4
nits
patrick-ogrady Apr 20, 2026
6e2b9c1
cleanup comments
patrick-ogrady Apr 20, 2026
ce44aea
nit
patrick-ogrady Apr 20, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 35 additions & 8 deletions consensus/src/marshal/coding/marshaled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,15 @@ use rand::Rng;
use std::sync::{Arc, OnceLock};
use tracing::{debug, warn};

/// Which marshal cache a verified coded block should land in.
#[derive(Clone, Copy, Debug)]
enum CodingPersistMode {
Comment thread
patrick-ogrady marked this conversation as resolved.
Outdated
/// Write to `verified_blocks` via `Mailbox::verified`.
Verified,
/// Write to `notarized_blocks` via `Mailbox::certified`.
Certified,
}

/// The [`CodingConfig`] used for genesis blocks. These blocks are never broadcasted in
/// the proposal phase, and thus the configuration is irrelevant.
const GENESIS_CODING_CONFIG: CodingConfig = CodingConfig {
Expand Down Expand Up @@ -299,6 +308,7 @@ where
consensus_context: Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
commitment: Commitment,
prefetched_block: Option<CodedBlock<B, C, H>>,
persist: CodingPersistMode,
) -> oneshot::Receiver<bool> {
let mut marshal = self.marshal.clone();
let mut application = self.application.clone();
Expand Down Expand Up @@ -424,9 +434,15 @@ where
is_valid = validity_request => is_valid,
};
timer.observe();
if application_valid && !marshal.verified(round, block).await {
debug!(?round, "marshal unable to accept block");
return;
if application_valid {
let persisted = match persist {
CodingPersistMode::Verified => marshal.verified(round, block).await,
CodingPersistMode::Certified => marshal.certified(round, block).await,
};
if !persisted {
debug!(?round, "marshal unable to accept block");
return;
}
}
tx.send_lossy(application_valid);
});
Expand Down Expand Up @@ -779,7 +795,12 @@ where
// Kick off deferred verification early to hide verification latency behind
// shard validity checks and network latency for collecting votes.
let round = consensus_context.round;
let task = self.deferred_verify(consensus_context, payload, None);
let task = self.deferred_verify(
consensus_context,
payload,
None,
CodingPersistMode::Verified,
);
self.verification_tasks.insert(round, payload, task);

match scheme.me() {
Expand Down Expand Up @@ -895,9 +916,10 @@ where
round,
);
if is_reproposal {
// During crash recovery we may call `marshal.verified` twice for
// the same block; the call is idempotent.
if !marshaled.marshal.verified(round, block).await {
// Certifier holds a notarization for this block, so route
// the write to the notarized cache. `certified` is
// idempotent, so crash-recovery double-invocation is safe.
if !marshaled.marshal.certified(round, block).await {
debug!(?round, "marshal unable to accept block");
return;
}
Expand All @@ -916,7 +938,12 @@ where

// Use the block's embedded context for verification, passing the
// prefetched block to avoid fetching it again inside deferred_verify.
let verify_rx = marshaled.deferred_verify(embedded_context, payload, Some(block));
let verify_rx = marshaled.deferred_verify(
embedded_context,
payload,
Some(block),
CodingPersistMode::Certified,
);
if let Ok(result) = verify_rx.await {
tx.send_lossy(result);
}
Expand Down
108 changes: 108 additions & 0 deletions consensus/src/marshal/coding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,114 @@ mod tests {
harness::finalize_same_height_different_views::<CodingHarness>();
}

#[test_traced("WARN")]
fn test_coding_certify_persists_equivocated_block() {
harness::certify_persists_equivocated_block::<CodingHarness>();
}

#[test_traced("WARN")]
fn test_coding_certify_at_later_view_survives_earlier_view_pruning() {
harness::certify_at_later_view_survives_earlier_view_pruning::<CodingHarness>();
}

/// Regression test for issue #167: finalizing a descendant must not
Comment thread
patrick-ogrady marked this conversation as resolved.
Outdated
/// height-prune the shard-engine buffer before `try_repair_gaps` has
/// consumed buffer-only ancestors.
///
/// Places parent (height 1) and descendant (height 2) in the shard engine's
/// reconstructed-block cache via `proposed()`, then reports a finalization
/// for the descendant only. With the fix, the gap repair pass pulls the
/// parent from the buffer and archives it before any height-prune runs. The
/// pre-fix behavior would prune the parent from the buffer inside
/// `store_finalization(descendant)` and leave the finalized chain waiting
/// on a network fetch that never arrives.
#[test_traced("WARN")]
fn test_coding_store_finalization_does_not_prune_buffer_before_repair() {
let runner = deterministic::Runner::timed(Duration::from_secs(60));
runner.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let mut oracle =
setup_network_with_participants(context.clone(), NZUsize!(1), participants.clone())
.await;

let setup = CodingHarness::setup_validator(
context.with_label("validator_0"),
&mut oracle,
participants[0].clone(),
ConstantProvider::new(schemes[0].clone()),
)
.await;
let mut handle = harness::ValidatorHandle::<CodingHarness> {
mailbox: setup.mailbox,
extra: setup.extra,
};

// Build a 2-block chain: parent at height 1, descendant at height 2.
let parent_block = CodingHarness::make_test_block(
Sha256::hash(b""),
CodingHarness::genesis_parent_commitment(NUM_VALIDATORS as u16),
Height::new(1),
1,
NUM_VALIDATORS as u16,
);
let parent_digest = CodingHarness::digest(&parent_block);
let parent_commitment = CodingHarness::commitment(&parent_block);

let descendant_block = CodingHarness::make_test_block(
parent_digest,
parent_commitment,
Height::new(2),
2,
NUM_VALIDATORS as u16,
);
let descendant_commitment = CodingHarness::commitment(&descendant_block);

// Seed the shard engine's reconstructed-block cache with both blocks.
CodingHarness::propose(
&mut handle,
Round::new(Epoch::new(0), View::new(1)),
&parent_block,
)
.await;
CodingHarness::propose(
&mut handle,
Round::new(Epoch::new(0), View::new(2)),
&descendant_block,
)
.await;

// Report finalization for the descendant only. The parent has no
// finalization certificate: it must be archived by walking the
// parent link from the descendant and sourcing the block from the
// shard-engine buffer.
let descendant_proposal = Proposal {
round: Round::new(Epoch::new(0), View::new(2)),
parent: View::new(1),
payload: descendant_commitment,
};
let descendant_finalization =
CodingHarness::make_finalization(descendant_proposal, &schemes, QUORUM);
CodingHarness::report_finalization(&mut handle.mailbox, descendant_finalization).await;

context.sleep(Duration::from_millis(200)).await;

let parent = handle.mailbox.get_block(Height::new(1)).await;
assert!(
parent.is_some(),
"parent must be archived from shard buffer before height-prune evicts it"
);
let descendant = handle.mailbox.get_block(Height::new(2)).await;
assert!(
descendant.is_some(),
"descendant must be archived after finalization"
);
});
}

#[test_traced("WARN")]
fn test_coding_init_processed_height() {
harness::init_processed_height::<CodingHarness>();
Expand Down
16 changes: 10 additions & 6 deletions consensus/src/marshal/coding/shards/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1017,13 +1017,17 @@ where

/// Prunes all blocks in the reconstructed block cache that are older than the block
/// with the given commitment. Also cleans up stale reconstruction state
/// and subscriptions.
/// Prune reconstruction state at or below the given commitment's height,
Comment thread
patrick-ogrady marked this conversation as resolved.
Outdated
/// alongside the per-proposal state and subscriptions for that round and
/// below.
///
/// This is the only place reconstruction state is pruned by round. We
/// intentionally avoid pruning on reconstruction success because a
/// Byzantine leader can equivocate, producing multiple valid commitments
/// in the same round. Both must remain recoverable until finalization
/// determines which one is canonical.
/// Called from marshal's application-ack path, which guarantees every
/// finalized block at or below `through` has already been archived,
/// synced, delivered, and acknowledged. Gap repair for those heights has
/// completed, so eviction here cannot race ahead of any repair lookup —
/// cascading by height safely sweeps both canonical ancestors (now only
/// reached via the finalized archive) and stale or equivocated blocks at
/// the same or lower heights.
fn prune(&mut self, through: Commitment) {
if let Some(height) = self.reconstructed_blocks.get(&through).map(|b| b.height()) {
self.reconstructed_blocks
Expand Down
43 changes: 19 additions & 24 deletions consensus/src/marshal/core/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,14 @@ where
// Apply in-memory progress updates for this acknowledged block.
self.handle_block_processed(height, commitment, &mut resolver)
.await;
// The application has archived, synced, delivered
// and acknowledged this block: gap repair is done
// for its height, so variant-specific buffer
// cleanup can safely cascade (e.g. the coding
// shard engine evicts this commitment along with
// any stale/equivocated blocks at the same or
// lower heights).
buffer.finalized(commitment).await;
Comment thread
patrick-ogrady marked this conversation as resolved.
Outdated
}
Err(e) => {
// Ack failures are fatal for marshal/application coordination.
Expand Down Expand Up @@ -543,6 +551,10 @@ where
self.cache_verified(round, block.digest(), block).await;
ack.send_lossy(());
}
Message::Certified { round, block, ack } => {
self.cache_block(round, block.digest(), block).await;
ack.send_lossy(());
}
Message::Notarization { notarization } => {
let round = notarization.round();
let commitment = notarization.proposal.payload;
Expand Down Expand Up @@ -588,7 +600,6 @@ where
block,
Some(finalization),
&mut application,
&mut buffer,
)
.await
{
Expand Down Expand Up @@ -758,7 +769,6 @@ where
response,
&mut delivers,
&mut application,
&mut buffer,
)
.await;
}
Expand All @@ -767,7 +777,7 @@ where

// Batch verify and process all delivers.
needs_sync |= self
.verify_delivered(delivers, &mut application, &mut buffer)
.verify_delivered(delivers, &mut application)
.await;

// Attempt to fill gaps before handling produce requests (so we
Expand Down Expand Up @@ -923,14 +933,13 @@ where
/// immediately. Finalized/Notarized delivers are parsed and structurally
/// validated, then collected into `delivers` for batch certificate verification.
/// Returns true if finalization archives were written and need syncing.
async fn handle_deliver<Buf: Buffer<V>>(
async fn handle_deliver(
&mut self,
key: Request<V::Commitment>,
value: Bytes,
response: oneshot::Sender<bool>,
delivers: &mut Vec<PendingVerification<P::Scheme, V>>,
application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
buffer: &mut Buf,
) -> bool {
match key {
Request::Block(commitment) => {
Expand All @@ -949,7 +958,7 @@ where
let digest = block.digest();
let finalization = self.cache.get_finalization_for(digest).await;
let wrote = self
.store_finalization(height, digest, block, finalization, application, buffer)
.store_finalization(height, digest, block, finalization, application)
.await;
debug!(?digest, %height, "received block");
response.send_lossy(true); // if a valid block is received, we should still send true (even if it was stale)
Expand Down Expand Up @@ -1045,11 +1054,10 @@ where

/// Batch verify pending certificates and process valid items. Returns true
/// if finalization archives were written and need syncing.
async fn verify_delivered<Buf: Buffer<V>>(
async fn verify_delivered(
&mut self,
mut delivers: Vec<PendingVerification<P::Scheme, V>>,
application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
buffer: &mut Buf,
) -> bool {
if delivers.is_empty() {
return false;
Expand Down Expand Up @@ -1132,14 +1140,7 @@ where
debug!(?round, %height, "received finalization");

wrote |= self
.store_finalization(
height,
digest,
block,
Some(finalization),
application,
buffer,
)
.store_finalization(height, digest, block, Some(finalization), application)
.await;
}
PendingVerification::Notarized {
Expand Down Expand Up @@ -1169,7 +1170,6 @@ where
block.clone(),
Some(finalization),
application,
buffer,
)
.await;
}
Expand Down Expand Up @@ -1414,14 +1414,13 @@ where
/// `select_loop!` so that archive data is durable before the ack handler
/// advances `last_processed_height`. See [`Self::try_dispatch_blocks`] for the
/// crash safety invariant.
async fn store_finalization<Buf: Buffer<V>>(
async fn store_finalization(
&mut self,
height: Height,
digest: <V::Block as Digestible>::Digest,
block: V::Block,
finalization: Option<Finalization<P::Scheme, V::Commitment>>,
application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
buffer: &mut Buf,
) -> bool {
// Blocks below the last processed height are not useful to us, so we ignore them (this
// has the nice byproduct of ensuring we don't call a backing store with a block below the
Expand All @@ -1438,7 +1437,6 @@ where
self.notify_subscribers(&block);

// Convert block to storage format
let commitment = V::commitment(&block);
let stored: V::StoredBlock = block.into();
let round = finalization.as_ref().map(|f| f.round());

Expand All @@ -1463,13 +1461,12 @@ where
panic!("failed to finalize: {e}");
}

// Update metrics, buffer, and application
// Update metrics and application.
if let Some(round) = round.filter(|_| height > self.tip) {
application.report(Update::Tip(round, height, digest)).await;
self.tip = height;
let _ = self.finalized_height.try_set(height.get());
}
buffer.finalized(commitment).await;

true
}
Expand Down Expand Up @@ -1592,7 +1589,6 @@ where
block,
Some(finalization),
application,
buffer,
)
.await;
} else {
Expand Down Expand Up @@ -1638,7 +1634,6 @@ where
block.clone(),
finalization,
application,
buffer,
)
.await;
debug!(height = %block.height(), "repaired block");
Expand Down
Loading
Loading