Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
393eb38
spike
patrick-ogrady Apr 18, 2026
3748eb9
add certified
patrick-ogrady Apr 19, 2026
1f7f6f5
nits
patrick-ogrady Apr 19, 2026
650af6d
add tests
patrick-ogrady Apr 19, 2026
e1f6db9
fmt
patrick-ogrady Apr 19, 2026
08fa5dc
add regression test for pruning
patrick-ogrady Apr 19, 2026
3425d43
duplicate key
patrick-ogrady Apr 19, 2026
1693203
more tests
patrick-ogrady Apr 19, 2026
f8c29ed
cleanup fix
patrick-ogrady Apr 19, 2026
fc9d8b4
fmt
patrick-ogrady Apr 19, 2026
d38c058
cleanup
patrick-ogrady Apr 19, 2026
0f6c93b
fix docs
patrick-ogrady Apr 19, 2026
2545abe
increase coverage
patrick-ogrady Apr 20, 2026
81c3b60
test cleanup
patrick-ogrady Apr 20, 2026
9889ccb
use debug
patrick-ogrady Apr 20, 2026
b21d855
dedup persistence logic
patrick-ogrady Apr 20, 2026
12f6e2e
nits
patrick-ogrady Apr 20, 2026
162b596
nit
patrick-ogrady Apr 20, 2026
1f37b01
cleanup naming
patrick-ogrady Apr 20, 2026
b5887dd
cleanup comments on stage
patrick-ogrady Apr 20, 2026
962231e
cleanup check test
patrick-ogrady Apr 20, 2026
b440162
revert comment
patrick-ogrady Apr 20, 2026
5c165ae
cleanup comment
patrick-ogrady Apr 20, 2026
0e5ff9c
remove period
patrick-ogrady Apr 20, 2026
2fbbd2e
cleanup comments
patrick-ogrady Apr 20, 2026
91575dd
cleanup
patrick-ogrady Apr 20, 2026
898e58d
nit tests
patrick-ogrady Apr 20, 2026
7892cee
useless tests
patrick-ogrady Apr 20, 2026
e5ac784
nit
patrick-ogrady Apr 20, 2026
284e0ed
collapse helpers
patrick-ogrady Apr 20, 2026
eaa0120
nits
patrick-ogrady Apr 20, 2026
2fa60c4
nits
patrick-ogrady Apr 20, 2026
6e2b9c1
cleanup comments
patrick-ogrady Apr 20, 2026
ce44aea
nit
patrick-ogrady Apr 20, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 35 additions & 8 deletions consensus/src/marshal/coding/marshaled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,15 @@ use rand::Rng;
use std::sync::{Arc, OnceLock};
use tracing::{debug, warn};

/// Which marshal cache a verified coded block should land in.
#[derive(Clone, Copy, Debug)]
enum CodingPersistMode {
Comment thread
patrick-ogrady marked this conversation as resolved.
Outdated
/// Write to `verified_blocks` via `Mailbox::verified`.
Verified,
/// Write to `notarized_blocks` via `Mailbox::certified`.
Certified,
}

/// The [`CodingConfig`] used for genesis blocks. These blocks are never broadcasted in
/// the proposal phase, and thus the configuration is irrelevant.
const GENESIS_CODING_CONFIG: CodingConfig = CodingConfig {
Expand Down Expand Up @@ -299,6 +308,7 @@ where
consensus_context: Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
commitment: Commitment,
prefetched_block: Option<CodedBlock<B, C, H>>,
persist: CodingPersistMode,
) -> oneshot::Receiver<bool> {
let mut marshal = self.marshal.clone();
let mut application = self.application.clone();
Expand Down Expand Up @@ -424,9 +434,15 @@ where
is_valid = validity_request => is_valid,
};
timer.observe();
if application_valid && !marshal.verified(round, block).await {
debug!(?round, "marshal unable to accept block");
return;
if application_valid {
let persisted = match persist {
CodingPersistMode::Verified => marshal.verified(round, block).await,
CodingPersistMode::Certified => marshal.certified(round, block).await,
};
if !persisted {
debug!(?round, "marshal unable to accept block");
return;
}
}
tx.send_lossy(application_valid);
});
Expand Down Expand Up @@ -779,7 +795,12 @@ where
// Kick off deferred verification early to hide verification latency behind
// shard validity checks and network latency for collecting votes.
let round = consensus_context.round;
let task = self.deferred_verify(consensus_context, payload, None);
let task = self.deferred_verify(
consensus_context,
payload,
None,
CodingPersistMode::Verified,
);
self.verification_tasks.insert(round, payload, task);

match scheme.me() {
Expand Down Expand Up @@ -895,9 +916,10 @@ where
round,
);
if is_reproposal {
// During crash recovery we may call `marshal.verified` twice for
// the same block; the call is idempotent.
if !marshaled.marshal.verified(round, block).await {
// Certifier holds a notarization for this block, so route
// the write to the notarized cache. `certified` is
// idempotent, so crash-recovery double-invocation is safe.
if !marshaled.marshal.certified(round, block).await {
debug!(?round, "marshal unable to accept block");
return;
}
Expand All @@ -916,7 +938,12 @@ where

// Use the block's embedded context for verification, passing the
// prefetched block to avoid fetching it again inside deferred_verify.
let verify_rx = marshaled.deferred_verify(embedded_context, payload, Some(block));
let verify_rx = marshaled.deferred_verify(
embedded_context,
payload,
Some(block),
CodingPersistMode::Certified,
);
if let Ok(result) = verify_rx.await {
tx.send_lossy(result);
}
Expand Down
103 changes: 103 additions & 0 deletions consensus/src/marshal/coding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,109 @@ mod tests {
harness::finalize_same_height_different_views::<CodingHarness>();
}

#[test_traced("WARN")]
fn test_coding_certify_persists_equivocated_block() {
harness::certify_persists_equivocated_block::<CodingHarness>();
}

/// Regression test for issue #167: finalizing a descendant must not
Comment thread
patrick-ogrady marked this conversation as resolved.
Outdated
/// height-prune the shard-engine buffer before `try_repair_gaps` has
/// consumed buffer-only ancestors.
///
/// Places parent (height 1) and descendant (height 2) in the shard engine's
/// reconstructed-block cache via `proposed()`, then reports a finalization
/// for the descendant only. With the fix, the gap repair pass pulls the
/// parent from the buffer and archives it before any height-prune runs. The
/// pre-fix behavior would prune the parent from the buffer inside
/// `store_finalization(descendant)` and leave the finalized chain waiting
/// on a network fetch that never arrives.
#[test_traced("WARN")]
fn test_coding_store_finalization_does_not_prune_buffer_before_repair() {
let runner = deterministic::Runner::timed(Duration::from_secs(60));
runner.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
let mut oracle =
setup_network_with_participants(context.clone(), NZUsize!(1), participants.clone())
.await;

let setup = CodingHarness::setup_validator(
context.with_label("validator_0"),
&mut oracle,
participants[0].clone(),
ConstantProvider::new(schemes[0].clone()),
)
.await;
let mut handle = harness::ValidatorHandle::<CodingHarness> {
mailbox: setup.mailbox,
extra: setup.extra,
};

// Build a 2-block chain: parent at height 1, descendant at height 2.
let parent_block = CodingHarness::make_test_block(
Sha256::hash(b""),
CodingHarness::genesis_parent_commitment(NUM_VALIDATORS as u16),
Height::new(1),
1,
NUM_VALIDATORS as u16,
);
let parent_digest = CodingHarness::digest(&parent_block);
let parent_commitment = CodingHarness::commitment(&parent_block);

let descendant_block = CodingHarness::make_test_block(
parent_digest,
parent_commitment,
Height::new(2),
2,
NUM_VALIDATORS as u16,
);
let descendant_commitment = CodingHarness::commitment(&descendant_block);

// Seed the shard engine's reconstructed-block cache with both blocks.
CodingHarness::propose(
&mut handle,
Round::new(Epoch::new(0), View::new(1)),
&parent_block,
)
.await;
CodingHarness::propose(
&mut handle,
Round::new(Epoch::new(0), View::new(2)),
&descendant_block,
)
.await;

// Report finalization for the descendant only. The parent has no
// finalization certificate: it must be archived by walking the
// parent link from the descendant and sourcing the block from the
// shard-engine buffer.
let descendant_proposal = Proposal {
round: Round::new(Epoch::new(0), View::new(2)),
parent: View::new(1),
payload: descendant_commitment,
};
let descendant_finalization =
CodingHarness::make_finalization(descendant_proposal, &schemes, QUORUM);
CodingHarness::report_finalization(&mut handle.mailbox, descendant_finalization).await;

context.sleep(Duration::from_millis(200)).await;

let parent = handle.mailbox.get_block(Height::new(1)).await;
assert!(
parent.is_some(),
"parent must be archived from shard buffer before height-prune evicts it"
);
let descendant = handle.mailbox.get_block(Height::new(2)).await;
assert!(
descendant.is_some(),
"descendant must be archived after finalization"
);
});
}

#[test_traced("WARN")]
fn test_coding_init_processed_height() {
harness::init_processed_height::<CodingHarness>();
Expand Down
54 changes: 31 additions & 23 deletions consensus/src/marshal/core/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,11 @@ where
pending_acks: PendingAcks<V, A>,
// Highest known finalized height
tip: Height,
// Commitment of the highest finalized block awaiting buffer cleanup.
// Deferred from `store_finalization` so that `try_repair_gaps` can consume
// buffer-only ancestors before variant cleanup (e.g. the coding shard
// engine's height-based prune) evicts them.
pending_buffer_finalize: Option<V::Commitment>,
Comment thread
patrick-ogrady marked this conversation as resolved.
Outdated
// Outstanding subscriptions for blocks
block_subscriptions: BTreeMap<BlockSubscriptionKeyFor<V>, BlockSubscription<V>>,

Expand Down Expand Up @@ -347,6 +352,7 @@ where
last_processed_height,
pending_acks: PendingAcks::new(config.max_pending_acks.get()),
tip: Height::zero(),
pending_buffer_finalize: None,
block_subscriptions: BTreeMap::new(),
cache,
application_metadata,
Expand Down Expand Up @@ -412,6 +418,7 @@ where
{
self.sync_finalized().await;
}
self.flush_buffer_finalize(&mut buffer).await;

// Attempt to dispatch the next finalized block to the application, if it is ready.
self.try_dispatch_blocks(&mut application).await;
Expand Down Expand Up @@ -543,6 +550,10 @@ where
self.cache_verified(round, block.digest(), block).await;
ack.send_lossy(());
}
Message::Certified { round, block, ack } => {
self.cache_block(round, block.digest(), block).await;
ack.send_lossy(());
}
Message::Notarization { notarization } => {
let round = notarization.round();
let commitment = notarization.proposal.payload;
Expand Down Expand Up @@ -588,13 +599,13 @@ where
block,
Some(finalization),
&mut application,
&mut buffer,
)
.await
{
self.try_repair_gaps(&mut buffer, &mut resolver, &mut application)
.await;
self.sync_finalized().await;
self.flush_buffer_finalize(&mut buffer).await;
self.try_dispatch_blocks(&mut application).await;
debug!(?round, %height, "finalized block stored");
}
Expand Down Expand Up @@ -758,7 +769,6 @@ where
response,
&mut delivers,
&mut application,
&mut buffer,
)
.await;
}
Expand All @@ -767,7 +777,7 @@ where

// Batch verify and process all delivers.
needs_sync |= self
.verify_delivered(delivers, &mut application, &mut buffer)
.verify_delivered(delivers, &mut application)
.await;

// Attempt to fill gaps before handling produce requests (so we
Expand All @@ -780,6 +790,7 @@ where
// durability).
if needs_sync {
self.sync_finalized().await;
self.flush_buffer_finalize(&mut buffer).await;
self.try_dispatch_blocks(&mut application).await;
}

Expand Down Expand Up @@ -923,14 +934,13 @@ where
/// immediately. Finalized/Notarized delivers are parsed and structurally
/// validated, then collected into `delivers` for batch certificate verification.
/// Returns true if finalization archives were written and need syncing.
async fn handle_deliver<Buf: Buffer<V>>(
async fn handle_deliver(
&mut self,
key: Request<V::Commitment>,
value: Bytes,
response: oneshot::Sender<bool>,
delivers: &mut Vec<PendingVerification<P::Scheme, V>>,
application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
buffer: &mut Buf,
) -> bool {
match key {
Request::Block(commitment) => {
Expand All @@ -949,7 +959,7 @@ where
let digest = block.digest();
let finalization = self.cache.get_finalization_for(digest).await;
let wrote = self
.store_finalization(height, digest, block, finalization, application, buffer)
.store_finalization(height, digest, block, finalization, application)
.await;
debug!(?digest, %height, "received block");
response.send_lossy(true); // if a valid block is received, we should still send true (even if it was stale)
Expand Down Expand Up @@ -1045,11 +1055,10 @@ where

/// Batch verify pending certificates and process valid items. Returns true
/// if finalization archives were written and need syncing.
async fn verify_delivered<Buf: Buffer<V>>(
async fn verify_delivered(
&mut self,
mut delivers: Vec<PendingVerification<P::Scheme, V>>,
application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
buffer: &mut Buf,
) -> bool {
if delivers.is_empty() {
return false;
Expand Down Expand Up @@ -1132,14 +1141,7 @@ where
debug!(?round, %height, "received finalization");

wrote |= self
.store_finalization(
height,
digest,
block,
Some(finalization),
application,
buffer,
)
.store_finalization(height, digest, block, Some(finalization), application)
.await;
}
PendingVerification::Notarized {
Expand Down Expand Up @@ -1169,7 +1171,6 @@ where
block.clone(),
Some(finalization),
application,
buffer,
)
.await;
}
Expand Down Expand Up @@ -1414,14 +1415,13 @@ where
/// `select_loop!` so that archive data is durable before the ack handler
/// advances `last_processed_height`. See [`Self::try_dispatch_blocks`] for the
/// crash safety invariant.
async fn store_finalization<Buf: Buffer<V>>(
async fn store_finalization(
&mut self,
height: Height,
digest: <V::Block as Digestible>::Digest,
block: V::Block,
finalization: Option<Finalization<P::Scheme, V::Commitment>>,
application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
buffer: &mut Buf,
) -> bool {
// Blocks below the last processed height are not useful to us, so we ignore them (this
// has the nice byproduct of ensuring we don't call a backing store with a block below the
Expand Down Expand Up @@ -1463,17 +1463,27 @@ where
panic!("failed to finalize: {e}");
}

// Update metrics, buffer, and application
// Update metrics and application
if let Some(round) = round.filter(|_| height > self.tip) {
application.report(Update::Tip(round, height, digest)).await;
self.tip = height;
let _ = self.finalized_height.try_set(height.get());
self.pending_buffer_finalize = Some(commitment);
}
buffer.finalized(commitment).await;

true
}

/// Flush any deferred `buffer.finalized` notification accumulated by
/// `store_finalization`. Must be invoked after `try_repair_gaps` and
/// `sync_finalized` so that variant cleanup (e.g. coding height-prune)
/// only runs once buffer-only ancestors have been archived.
async fn flush_buffer_finalize<Buf: Buffer<V>>(&mut self, buffer: &mut Buf) {
if let Some(commitment) = self.pending_buffer_finalize.take() {
buffer.finalized(commitment).await;
}
}

/// Get the latest finalized block information (height and digest tuple).
///
/// Blocks are only finalized directly with a finalization or indirectly via a descendant
Expand Down Expand Up @@ -1592,7 +1602,6 @@ where
block,
Some(finalization),
application,
buffer,
)
.await;
} else {
Expand Down Expand Up @@ -1638,7 +1647,6 @@ where
block.clone(),
finalization,
application,
buffer,
)
.await;
debug!(height = %block.height(), "repaired block");
Expand Down
Loading
Loading