Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions consensus/src/marshal/coding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -779,6 +779,11 @@ mod tests {
harness::ack_pipeline_backlog_persists_on_restart::<CodingHarness>();
}

#[test_traced("WARN")]
fn test_coding_genesis_emitted_once() {
harness::genesis_emitted_once::<CodingHarness>();
}

#[test_traced("WARN")]
fn test_coding_proposed_success_implies_recoverable_after_restart() {
harness::proposed_success_implies_recoverable_after_restart::<CodingHarness>(0..16);
Expand Down
6 changes: 5 additions & 1 deletion consensus/src/marshal/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,12 @@ use std::num::{NonZeroU64, NonZeroUsize};
/// supersedes the configured anchor.
pub enum Start<S: Scheme, C: Digest, B> {
/// Start from the height-zero genesis block.
///
/// Genesis is emitted to the application unless durable metadata shows it
Comment thread
patrick-ogrady marked this conversation as resolved.
Outdated
/// was already acknowledged.
Genesis(B),
/// Start from an already-processed finalized commitment.
/// Start from a finalized commitment, delivering the floor block to the
/// application if it has not already been acknowledged.
Floor(Finalization<S, C>),
}

Expand Down
12 changes: 6 additions & 6 deletions consensus/src/marshal/core/acks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,12 @@ impl<V: Variant, A: Acknowledgement> PendingAcks<V, A> {
}

/// Returns the next height to dispatch while preserving sequential order.
pub(super) fn next_dispatch_height(&self, last_processed_height: Height) -> Height {
pub(super) fn next_dispatch_height(&self, start_height: Height) -> Height {
self.queue
.back()
.map(|ack| ack.height.next())
.or_else(|| self.current.as_ref().map(|ack| ack.height.next()))
.unwrap_or_else(|| last_processed_height.next())
.unwrap_or(start_height)
}

/// Enqueues a newly dispatched ack, arming it immediately when idle.
Expand Down Expand Up @@ -136,18 +136,18 @@ mod tests {
fn enqueue_tracks_capacity_and_fifo_ready_order() {
let mut pending = PendingAcks::<TestVariant, Exact>::new(2);
assert!(pending.has_capacity());
assert_eq!(pending.next_dispatch_height(Height::new(7)), Height::new(8));
assert_eq!(pending.next_dispatch_height(Height::new(8)), Height::new(8));

let (first, first_ack) = pending_ack(8, 1);
pending.enqueue(first);
assert!(pending.has_capacity());
assert_eq!(pending.next_dispatch_height(Height::new(7)), Height::new(9));
assert_eq!(pending.next_dispatch_height(Height::new(8)), Height::new(9));

let (second, second_ack) = pending_ack(9, 2);
pending.enqueue(second);
assert!(!pending.has_capacity());
assert_eq!(
pending.next_dispatch_height(Height::new(7)),
pending.next_dispatch_height(Height::new(8)),
Height::new(10)
);

Expand Down Expand Up @@ -185,7 +185,7 @@ mod tests {
assert!(pending.pop_ready().is_none());
assert!(pending.has_capacity());
assert_eq!(
pending.next_dispatch_height(Height::new(9)),
pending.next_dispatch_height(Height::new(10)),
Height::new(10)
);
}
Expand Down
76 changes: 65 additions & 11 deletions consensus/src/marshal/core/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,47 @@ struct ResolverDelivery<V: Variant> {
response: oneshot::Sender<bool>,
}

/// Last block acknowledged by the application.
#[derive(Clone, Copy)]
enum ApplicationFloor {
Comment thread
patrick-ogrady marked this conversation as resolved.
Outdated
BeforeGenesis,
Acknowledged(Height),
}

impl ApplicationFloor {
Comment thread
patrick-ogrady marked this conversation as resolved.
Outdated
const fn from_metadata(height: Option<Height>) -> Self {
match height {
Some(height) => Self::Acknowledged(height),
None => Self::BeforeGenesis,
}
}

const fn processed_height(self) -> Height {
match self {
Self::BeforeGenesis => Height::zero(),
Self::Acknowledged(height) => height,
}
}

const fn round_restore_height(self) -> Height {
match self {
Self::BeforeGenesis => Height::zero(),
Self::Acknowledged(height) => height.next(),
}
}

const fn next_dispatch_height(self) -> Height {
match self {
Self::BeforeGenesis => Height::zero(),
Self::Acknowledged(height) => height.next(),
}
}

const fn acknowledge(&mut self, height: Height) {
*self = Self::Acknowledged(height);
}
}

/// The [Actor] is responsible for receiving uncertified blocks from the broadcast mechanism,
/// receiving notarizations and finalizations from consensus, and reconstructing a total order
/// of blocks.
Expand Down Expand Up @@ -121,6 +162,8 @@ where
last_proposed_block: Option<(Round, V::Commitment, V::Block)>,
// Current processed floor and any pending floor update
floor: Floor<P::Scheme, V::Commitment>,
// Application delivery cursor
application_floor: ApplicationFloor,
// Pending application acknowledgements
pending_acks: PendingAcks<V, A>,
// Highest known finalized height
Expand Down Expand Up @@ -193,10 +236,9 @@ where
)
.await
.expect("failed to initialize application metadata");
let last_processed_height = application_metadata
.get(&LATEST_KEY)
.copied()
.unwrap_or(Height::zero());
let application_floor =
ApplicationFloor::from_metadata(application_metadata.get(&LATEST_KEY).copied());
let last_processed_height = application_floor.processed_height();
Comment thread
patrick-ogrady marked this conversation as resolved.
Outdated

// Genesis is a local anchor. A floor finalization is verified and
// resolved after `run` receives the resolver and buffer.
Expand All @@ -213,8 +255,14 @@ where
}
Start::Floor(finalization) => Some(finalization),
};
let last_processed_round =
Self::latest_processed_round(&finalizations_by_height, last_processed_height).await;
// A floor transition to height H is persisted as H - 1 until the
// application acknowledges H. If that persisted cursor exists, looking
// one height ahead restores the anchor's round across the crash window.
let last_processed_round = Self::latest_processed_round(
&finalizations_by_height,
application_floor.round_restore_height(),
)
.await;

// Create metrics
let finalized_height = context.gauge("finalized_height", "Finalized height of application");
Expand All @@ -241,6 +289,7 @@ where
strategy: config.strategy,
last_proposed_block: None,
floor,
application_floor,
pending_acks: PendingAcks::new(config.max_pending_acks.get()),
tip: Height::zero(),
block_subscriptions: Subscriptions::new(),
Expand Down Expand Up @@ -482,7 +531,7 @@ where
}
};

// Persist buffered processed-height updates once after draining all ready acks.
// Persist buffered progress updates once after draining all ready acks.
self.application_metadata
.sync()
.await
Expand Down Expand Up @@ -1139,9 +1188,13 @@ where
let _ = self.finalized_height.try_set(height.get());
}

// Update the processed floor after the anchor and certificate are durable.
self.update_processed_height(height, resolver);
self.update_processed_round_floor(height, round, resolver)
// The anchor is durable, but the application still needs to process it.
// Record the previous height so dispatch resumes at the anchor itself.
let dispatch_floor = height
.previous()
.expect("floor anchor above processed height must have predecessor");
self.update_processed_height(dispatch_floor, resolver);
self.update_processed_round_floor(dispatch_floor, round, resolver)
.await;
self.application_metadata
.sync()
Expand Down Expand Up @@ -1578,7 +1631,7 @@ where
while self.pending_acks.has_capacity() {
let next_height = self
.pending_acks
.next_dispatch_height(self.floor.processed_height());
.next_dispatch_height(self.application_floor.next_dispatch_height());
let Some(block) = self.get_finalized_block(next_height).await else {
return;
};
Expand Down Expand Up @@ -2034,6 +2087,7 @@ where
PublicKey = <P::Scheme as CertificateScheme>::PublicKey,
>,
) {
self.application_floor.acknowledge(height);
self.application_metadata.put(LATEST_KEY, height);
self.floor.set_processed_height(height);
let _ = self
Expand Down
10 changes: 4 additions & 6 deletions consensus/src/marshal/core/mailbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,11 @@ pub(crate) enum Message<S: Scheme, V: Variant> {
/// A channel signaled once the block is durably stored.
ack: Option<oneshot::Sender<()>>,
},
/// Attempts to set the sync starting point from an already-processed finalization.
/// Attempts to set the sync starting point from a finalized commitment.
///
/// If the verified finalization advances marshal's current floor, marshal
/// anchors on its block, prunes below it, then syncs and delivers blocks
/// starting at the floor height + 1. Stale or superseded floors may be
/// ignored.
/// starting at the floor height. Stale or superseded floors may be ignored.
///
/// To prune data without changing the sync starting point, use
/// [Message::Prune] instead.
Expand Down Expand Up @@ -741,12 +740,11 @@ impl<S: Scheme, V: Variant> Mailbox<S, V> {
receiver.await.is_ok()
}

/// Attempts to set the sync starting point from an already-processed finalization.
/// Attempts to set the sync starting point from a finalized commitment.
///
/// If the verified finalization advances marshal's current floor, marshal
/// anchors on its block, prunes below it, then syncs and delivers blocks
/// starting at the floor height + 1. Stale or superseded floors may be
/// ignored.
/// starting at the floor height. Stale or superseded floors may be ignored.
///
/// To prune data without changing the sync starting point, use
/// [Self::prune] instead.
Expand Down
Loading
Loading