Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 73 additions & 22 deletions consensus/src/marshal/core/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -474,11 +474,23 @@ where
"startup anchor below existing processed height, ignoring"
);
} else {
let anchor_commitment = V::commitment(&anchor);
match finalized_blocks
.get(ArchiveID::Index(anchor_height.get()))
.await
{
Ok(Some(_)) => {}
Ok(Some(stored)) => {
let stored = stored.into();
assert_eq!(
stored.height(),
anchor_height,
"stored genesis block height mismatch"
);
assert!(
Comment thread
commonware-llm marked this conversation as resolved.
Outdated
V::commitment(&stored) == anchor_commitment,
"stored genesis block does not match configured anchor"
);
}
Ok(None) => {
finalized_blocks
.put(anchor.into())
Expand Down Expand Up @@ -592,7 +604,6 @@ where
if let Some(finalization) = self.floor.take_pending_anchor() {
self.install_floor(
finalization,
true,
false,
&mut resolver,
&mut buffer,
Expand Down Expand Up @@ -752,7 +763,12 @@ where
// the retention floor (and no longer is required by consensus
// to make progress).
self.cache_verified(round, block.digest(), block.clone()).await;
self.apply_floor_anchor(&block, &mut application, &mut resolver)
self.apply_floor_anchor(
&block,
&mut buffer,
&mut application,
&mut resolver,
)
.await;
// Retain the block in memory so the subsequent
// `Forward` can broadcast it without reloading from
Expand All @@ -768,7 +784,12 @@ where
// the retention floor (and no longer is required by consensus
// to make progress).
self.cache_verified(round, block.digest(), block.clone()).await;
self.apply_floor_anchor(&block, &mut application, &mut resolver)
self.apply_floor_anchor(
&block,
&mut buffer,
&mut application,
&mut resolver,
)
.await;
ack.expect("durable ack present").send_lossy(());
}
Expand All @@ -778,7 +799,12 @@ where
// the retention floor (and no longer is required by consensus
// to make progress).
self.cache_block(round, block.digest(), block.clone()).await;
self.apply_floor_anchor(&block, &mut application, &mut resolver)
self.apply_floor_anchor(
&block,
&mut buffer,
&mut application,
&mut resolver,
)
.await;
ack.expect("durable ack present").send_lossy(());
}
Expand All @@ -802,7 +828,12 @@ where
{
// If found, persist the block
self.cache_block(round, digest, block.clone()).await;
self.apply_floor_anchor(&block, &mut application, &mut resolver)
self.apply_floor_anchor(
&block,
&mut buffer,
&mut application,
&mut resolver,
)
.await;
} else {
debug!(?round, "notarized block unavailable locally");
Expand All @@ -825,7 +856,12 @@ where
// finalization, advances floors, prunes below
// them, and resumes dispatch.
if self
.apply_floor_anchor(&block, &mut application, &mut resolver)
.apply_floor_anchor(
&block,
&mut buffer,
&mut application,
&mut resolver,
)
.await
{
continue;
Expand Down Expand Up @@ -948,7 +984,6 @@ where
Message::SetFloor { finalization } => {
self.install_floor(
finalization,
false,
true,
&mut resolver,
&mut buffer,
Expand Down Expand Up @@ -1010,6 +1045,7 @@ where
value,
response,
&mut delivers,
&mut buffer,
&mut application,
&mut resolver,
)
Expand All @@ -1023,7 +1059,7 @@ where

// Batch verify and process all delivers.
needs_sync |= self
.verify_delivered(delivers, &mut application, &mut resolver)
.verify_delivered(delivers, &mut buffer, &mut application, &mut resolver)
.await;

// Attempt to fill gaps before handling produce requests (so we
Expand Down Expand Up @@ -1225,7 +1261,6 @@ where
async fn install_floor<Buf, R>(
&mut self,
finalization: Finalization<P::Scheme, V::Commitment>,
fatal_on_invalid: bool,
skip_if_superseded: bool,
resolver: &mut R,
buffer: &mut Buf,
Expand All @@ -1249,11 +1284,7 @@ where
}

if !self.verify_finalization(&finalization) {
if fatal_on_invalid {
panic!("floor finalization must verify");
}
warn!(?round, "floor finalization failed verification");
return;
panic!("floor finalization must verify");
}
Comment thread
commonware-llm marked this conversation as resolved.
Outdated

let commitment = finalization.proposal.payload;
Expand All @@ -1270,7 +1301,9 @@ where

if let Some(block) = self.find_block_by_commitment(buffer, commitment).await {
self.floor.await_anchor(finalization);
let applied = self.apply_floor_anchor(&block, application, resolver).await;
let applied = self
.apply_floor_anchor(&block, buffer, application, resolver)
.await;
debug_assert!(applied);
return;
}
Expand All @@ -1295,9 +1328,10 @@ where
}

/// Applies a block if it satisfies the current floor transition.
async fn apply_floor_anchor(
async fn apply_floor_anchor<Buf: Buffer<V>>(
&mut self,
block: &V::Block,
buffer: &mut Buf,
application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
resolver: &mut impl Resolver<
Key = ResolverRequestFor<V>,
Expand Down Expand Up @@ -1334,6 +1368,9 @@ where
.expect("pending floor anchor missing");
self.update_processed_round_floor(height, finalization.round(), resolver)
.await;
if self.try_repair_gaps(buffer, resolver, application).await {
self.sync_finalized().await;
}
self.try_dispatch_blocks(application).await;
return true;
}
Expand Down Expand Up @@ -1391,6 +1428,9 @@ where
// Intentionally keep existing block subscriptions alive. Canceling
// waiters can have catastrophic consequences (nodes can get stuck in
// different views) as actors do not retry subscriptions on failed channels.
if self.try_repair_gaps(buffer, resolver, application).await {
self.sync_finalized().await;
}
self.try_dispatch_blocks(application).await;
true
}
Expand All @@ -1399,12 +1439,13 @@ where
/// immediately. Finalized/Notarized delivers are parsed and structurally
/// validated, then collected into `delivers` for batch certificate verification.
/// Returns true if finalization archives were written and need syncing.
async fn handle_deliver(
async fn handle_deliver<Buf: Buffer<V>>(
&mut self,
delivery: Delivery<ResolverRequestFor<V>, Annotation>,
mut value: Bytes,
response: oneshot::Sender<bool>,
delivers: &mut Vec<PendingVerification<P::Scheme, V>>,
buffer: &mut Buf,
application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
resolver: &mut impl Resolver<
Key = ResolverRequestFor<V>,
Expand All @@ -1428,7 +1469,10 @@ where
// This block may match the pending floor request. Whether it
// installs or is rejected as the floor anchor, do not also
// process it as an ordinary block delivery.
if self.apply_floor_anchor(&block, application, resolver).await {
if self
.apply_floor_anchor(&block, buffer, application, resolver)
.await
{
response.send_lossy(true);
return false;
}
Expand Down Expand Up @@ -1571,9 +1615,10 @@ where

/// Batch verify pending certificates and process valid items. Returns true
/// if finalization archives were written and need syncing.
async fn verify_delivered(
async fn verify_delivered<Buf: Buffer<V>>(
&mut self,
mut delivers: Vec<PendingVerification<P::Scheme, V>>,
buffer: &mut Buf,
application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
resolver: &mut impl Resolver<
Key = ResolverRequestFor<V>,
Expand Down Expand Up @@ -1673,7 +1718,10 @@ where

// The floor-anchor path fully handles this finalization
// and moves the lower bound past it.
if self.apply_floor_anchor(&block, application, resolver).await {
if self
.apply_floor_anchor(&block, buffer, application, resolver)
.await
{
continue;
}

Expand Down Expand Up @@ -1705,7 +1753,10 @@ where

// A notarized delivery can carry the pending floor block
// after the finalization is cached.
if self.apply_floor_anchor(&block, application, resolver).await {
if self
.apply_floor_anchor(&block, buffer, application, resolver)
.await
{
continue;
}

Expand Down
Loading
Loading