Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions consensus/src/marshal/coding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -779,6 +779,11 @@ mod tests {
harness::ack_pipeline_backlog_persists_on_restart::<CodingHarness>();
}

#[test_traced("WARN")]
fn test_coding_genesis_emitted_once() {
harness::genesis_emitted_once::<CodingHarness>();
}

#[test_traced("WARN")]
fn test_coding_proposed_success_implies_recoverable_after_restart() {
harness::proposed_success_implies_recoverable_after_restart::<CodingHarness>(0..16);
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/marshal/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::num::{NonZeroU64, NonZeroUsize};
pub enum Start<S: Scheme, C: Digest, B> {
/// Start from the height-zero genesis block.
Genesis(B),
/// Start from an already-processed finalized commitment.
/// Start from a finalized commitment.
Floor(Finalization<S, C>),
}

Expand Down
12 changes: 6 additions & 6 deletions consensus/src/marshal/core/acks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,12 @@ impl<V: Variant, A: Acknowledgement> PendingAcks<V, A> {
}

/// Returns the next height to dispatch while preserving sequential order.
pub(super) fn next_dispatch_height(&self, last_processed_height: Height) -> Height {
pub(super) fn next_dispatch_height(&self, start_height: Height) -> Height {
self.queue
.back()
.map(|ack| ack.height.next())
.or_else(|| self.current.as_ref().map(|ack| ack.height.next()))
.unwrap_or_else(|| last_processed_height.next())
.unwrap_or(start_height)
}

/// Enqueues a newly dispatched ack, arming it immediately when idle.
Expand Down Expand Up @@ -136,18 +136,18 @@ mod tests {
fn enqueue_tracks_capacity_and_fifo_ready_order() {
let mut pending = PendingAcks::<TestVariant, Exact>::new(2);
assert!(pending.has_capacity());
assert_eq!(pending.next_dispatch_height(Height::new(7)), Height::new(8));
assert_eq!(pending.next_dispatch_height(Height::new(8)), Height::new(8));

let (first, first_ack) = pending_ack(8, 1);
pending.enqueue(first);
assert!(pending.has_capacity());
assert_eq!(pending.next_dispatch_height(Height::new(7)), Height::new(9));
assert_eq!(pending.next_dispatch_height(Height::new(8)), Height::new(9));

let (second, second_ack) = pending_ack(9, 2);
pending.enqueue(second);
assert!(!pending.has_capacity());
assert_eq!(
pending.next_dispatch_height(Height::new(7)),
pending.next_dispatch_height(Height::new(8)),
Height::new(10)
);

Expand Down Expand Up @@ -185,7 +185,7 @@ mod tests {
assert!(pending.pop_ready().is_none());
assert!(pending.has_capacity());
assert_eq!(
pending.next_dispatch_height(Height::new(9)),
pending.next_dispatch_height(Height::new(10)),
Height::new(10)
);
}
Expand Down
54 changes: 22 additions & 32 deletions consensus/src/marshal/core/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use super::{
delivery::PendingVerification,
floor::Floor,
mailbox::{CommitmentFallback, Mailbox, Message},
stream::Stream,
subscriptions::{Key as SubscriptionKey, KeyFor as SubscriptionKeyFor, Subscriptions},
variant::OptionalBuffer,
Buffer, Variant,
Expand Down Expand Up @@ -37,25 +38,18 @@ use commonware_runtime::{
telemetry::metrics::{Gauge, GaugeExt, MetricsExt as _},
BufferPooler, Clock, ContextCell, Handle, Metrics, Spawner, Storage,
};
use commonware_storage::{
archive::Identifier as ArchiveID,
metadata::{self, Metadata},
};
use commonware_storage::archive::Identifier as ArchiveID;
use commonware_utils::{
acknowledgement::Exact,
channel::{fallible::OneshotExt, oneshot},
futures::AbortablePool,
sequence::U64,
Acknowledgement, BoxedError,
};
use futures::{future::join_all, try_join};
use rand_core::CryptoRngCore;
use std::{collections::BTreeMap, future::Future, num::NonZeroUsize, sync::Arc};
use tracing::{debug, warn};

/// The key used to store the last processed height in the metadata store.
const LATEST_KEY: U64 = U64::new(0xFF);

// Resolver request keys are expressed in the variant commitment type, which
// may differ from the block digest for coded variants.
type ResolverRequestFor<V> = Key<<V as Variant>::Commitment>;
Expand Down Expand Up @@ -121,6 +115,8 @@ where
last_proposed_block: Option<(Round, V::Commitment, V::Block)>,
// Current processed floor and any pending floor update
floor: Floor<P::Scheme, V::Commitment>,
// Application delivery cursor
stream: Stream<E>,
// Pending application acknowledgements
pending_acks: PendingAcks<V, A>,
// Highest known finalized height
Expand All @@ -131,8 +127,6 @@ where
// ---------- Storage ----------
// Prunable cache
cache: cache::Manager<E, V, P::Scheme>,
// Metadata tracking application progress
application_metadata: Metadata<E, U64, Height>,
// Finalizations stored by height
finalizations_by_height: FC,
// Finalized blocks stored by height
Expand Down Expand Up @@ -183,20 +177,12 @@ where
)
.await;

// Initialize metadata tracking application progress
let application_metadata = Metadata::init(
let stream = Stream::new(
context.child("application_metadata"),
Comment thread
patrick-ogrady marked this conversation as resolved.
Outdated
metadata::Config {
partition: format!("{}-application-metadata", config.partition_prefix),
codec_config: (),
},
&config.partition_prefix,
)
.await
.expect("failed to initialize application metadata");
let last_processed_height = application_metadata
.get(&LATEST_KEY)
.copied()
.unwrap_or(Height::zero());
.await;
let last_processed_height = stream.processed_height().unwrap_or_else(Height::zero);

// Genesis is a local anchor. A floor finalization is verified and
// resolved after `run` receives the resolver and buffer.
Expand Down Expand Up @@ -241,11 +227,11 @@ where
strategy: config.strategy,
last_proposed_block: None,
floor,
stream,
pending_acks: PendingAcks::new(config.max_pending_acks.get()),
tip: Height::zero(),
block_subscriptions: Subscriptions::new(),
cache,
application_metadata,
finalizations_by_height,
finalized_blocks,
finalized_height,
Expand Down Expand Up @@ -482,8 +468,8 @@ where
}
};

// Persist buffered processed-height updates once after draining all ready acks.
self.application_metadata
// Persist buffered progress updates once after draining all ready acks.
self.stream
.sync()
.await
.expect("failed to sync application progress");
Expand Down Expand Up @@ -1142,11 +1128,15 @@ where
let _ = self.finalized_height.try_set(height.get());
}

// Update the processed floor after the anchor and certificate are durable.
self.update_processed_height(height, resolver);
self.update_processed_round_floor(height, round, resolver)
// The anchor is durable, but the application still needs to process it.
// Record the previous height so dispatch resumes at the anchor itself.
let dispatch_floor = height
.previous()
.expect("floor anchor above processed height must have predecessor");
self.update_processed_height(dispatch_floor, resolver);
self.update_processed_round_floor(dispatch_floor, round, resolver)
.await;
self.application_metadata
self.stream
.sync()
.await
.expect("failed to sync floor metadata");
Expand Down Expand Up @@ -1567,7 +1557,7 @@ where
///
/// Iteration M (ack handler, M > N):
/// ack handler -> update_processed_height -> metadata buffered
/// application_metadata.sync -> metadata durable
/// stream.sync -> metadata durable
/// ```
async fn try_dispatch_blocks(
&mut self,
Expand All @@ -1581,7 +1571,7 @@ where
while self.pending_acks.has_capacity() {
let next_height = self
.pending_acks
.next_dispatch_height(self.floor.processed_height());
.next_dispatch_height(self.stream.next_height());
let Some(block) = self.get_finalized_block(next_height).await else {
return;
};
Expand Down Expand Up @@ -2037,7 +2027,7 @@ where
PublicKey = <P::Scheme as CertificateScheme>::PublicKey,
>,
) {
self.application_metadata.put(LATEST_KEY, height);
self.stream.acknowledge(height);
self.floor.set_processed_height(height);
let _ = self
.processed_height
Expand Down
10 changes: 4 additions & 6 deletions consensus/src/marshal/core/mailbox.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,12 +144,11 @@ pub(crate) enum Message<S: Scheme, V: Variant> {
/// A channel signaled once the block is durably stored.
ack: Option<oneshot::Sender<()>>,
},
/// Attempts to set the sync starting point from an already-processed finalization.
/// Attempts to set the sync starting point from a finalized commitment.
///
/// If the verified finalization advances marshal's current floor, marshal
/// anchors on its block, prunes below it, then syncs and delivers blocks
/// starting at the floor height + 1. Stale or superseded floors may be
/// ignored.
/// starting at the floor height. Stale or superseded floors may be ignored.
///
/// To prune data without changing the sync starting point, use
/// [Message::Prune] instead.
Expand Down Expand Up @@ -756,12 +755,11 @@ impl<S: Scheme, V: Variant> Mailbox<S, V> {
receiver.await.is_ok()
}

/// Attempts to set the sync starting point from an already-processed finalization.
/// Attempts to set the sync starting point from a finalized commitment.
///
/// If the verified finalization advances marshal's current floor, marshal
/// anchors on its block, prunes below it, then syncs and delivers blocks
/// starting at the floor height + 1. Stale or superseded floors may be
/// ignored.
/// starting at the floor height. Stale or superseded floors may be ignored.
///
/// To prune data without changing the sync starting point, use
/// [Self::prune] instead.
Expand Down
1 change: 1 addition & 0 deletions consensus/src/marshal/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ mod acks;
pub(crate) mod cache;
mod delivery;
mod floor;
mod stream;

mod mailbox;
pub use mailbox::{CommitmentFallback, DigestFallback, Mailbox};
Expand Down
82 changes: 82 additions & 0 deletions consensus/src/marshal/core/stream.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
use crate::types::Height;
use commonware_storage::{
metadata::{self, Metadata},
Context,
};
use commonware_utils::sequence::U64;

/// The key used to store the last processed height in the metadata store.
const LATEST_KEY: U64 = U64::new(0xFF);

/// Last block acknowledged by the application.
#[derive(Clone, Copy)]
enum State {
Unprocessed,
Processed(Height),
}

impl State {
const fn new(processed_height: Option<Height>) -> Self {
match processed_height {
Some(height) => Self::Processed(height),
None => Self::Unprocessed,
}
}

const fn processed_height(self) -> Option<Height> {
match self {
Self::Unprocessed => None,
Self::Processed(height) => Some(height),
}
}

const fn next_height(self) -> Height {
match self {
Self::Unprocessed => Height::zero(),
Self::Processed(height) => height.next(),
}
}

const fn acknowledge(&mut self, height: Height) {
*self = Self::Processed(height);
}
}

/// Application delivery stream progress and durable metadata.
pub(super) struct Stream<E: Context> {
metadata: Metadata<E, U64, Height>,
state: State,
}

impl<E: Context> Stream<E> {
pub(super) async fn new(context: E, partition_prefix: &str) -> Self {
let metadata = Metadata::init(
context,
metadata::Config {
partition: format!("{partition_prefix}-application-metadata"),
Comment thread
patrick-ogrady marked this conversation as resolved.
Outdated
codec_config: (),
},
)
.await
.expect("failed to initialize application metadata");
let state = State::new(metadata.get(&LATEST_KEY).copied());
Self { metadata, state }
}

pub(super) const fn processed_height(&self) -> Option<Height> {
self.state.processed_height()
}

pub(super) const fn next_height(&self) -> Height {
self.state.next_height()
}

pub(super) fn acknowledge(&mut self, height: Height) {
self.state.acknowledge(height);
self.metadata.put(LATEST_KEY, height);
}

pub(super) async fn sync(&self) -> Result<(), metadata::Error> {
self.metadata.sync().await
}
}
Loading
Loading