Skip to content

Commit dd097ec

Browse files
committed
[consensus/marshal] commonware-glue marshal changes
1 parent edc5cce commit dd097ec

8 files changed

Lines changed: 334 additions & 63 deletions

File tree

consensus/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,8 +233,8 @@ stability_scope!(ALPHA {
233233
pub mod ordered_broadcast;
234234
});
235235
stability_scope!(ALPHA, cfg(not(target_arch = "wasm32")) {
236-
use commonware_cryptography::certificate::Scheme;
237236
use crate::marshal::ancestry::Ancestry;
237+
use commonware_cryptography::certificate::Scheme;
238238
use commonware_runtime::{Clock, Metrics, Spawner};
239239
use rand::Rng;
240240

consensus/src/marshal/ancestry.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,12 +86,21 @@ pub struct AncestorStream<M: BlockProvider> {
8686
}
8787

8888
impl<M: BlockProvider> AncestorStream<M> {
89+
/// Returns a reference to the next block that will be yielded by the
90+
/// stream, without consuming it.
91+
///
92+
/// Returns `None` if the buffer is empty and the next block is being
93+
/// fetched asynchronously.
94+
pub fn peek(&self) -> Option<&M::Block> {
95+
self.buffered.last()
96+
}
97+
8998
/// Creates a new [AncestorStream] starting from the given ancestry.
9099
///
91100
/// # Panics
92101
///
93102
/// Panics if the initial blocks are not contiguous.
94-
pub(crate) fn new(marshal: M, initial: impl IntoIterator<Item = M::Block>) -> Self {
103+
pub fn new(marshal: M, initial: impl IntoIterator<Item = M::Block>) -> Self {
95104
let mut buffered = initial.into_iter().collect::<Vec<M::Block>>();
96105
buffered.sort_by_key(Heightable::height);
97106

consensus/src/marshal/coding/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -809,6 +809,11 @@ mod tests {
809809
harness::prune_finalized_archives::<CodingHarness>();
810810
}
811811

812+
#[test_traced("WARN")]
813+
fn test_coding_set_floor_without_pruning_preserves_archives() {
814+
harness::set_floor_without_pruning_preserves_archives::<CodingHarness>();
815+
}
816+
812817
#[test_traced("WARN")]
813818
fn test_coding_rejects_block_delivery_below_floor() {
814819
harness::reject_stale_block_delivery_after_floor_update::<CodingHarness>();

consensus/src/marshal/core/actor.rs

Lines changed: 40 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -353,9 +353,10 @@ where
353353

354354
// A configured floor follows the same path as `SetFloor`: verify it,
355355
// then apply a local anchor or fetch the anchor block.
356-
if let Some(finalization) = self.floor.take_pending_anchor() {
356+
if let Some((finalization, prune_archives)) = self.floor.take_pending_anchor() {
357357
self.install_floor(
358358
finalization,
359+
prune_archives,
359360
false,
360361
&mut resolver,
361362
&mut buffer,
@@ -692,9 +693,19 @@ where
692693
let finalization = self.get_finalization_by_height(height).await;
693694
response.send_lossy(finalization);
694695
}
696+
Message::GetProcessedHeight { response } => {
697+
response.send_lossy(self.floor.processed_height());
698+
}
695699
Message::HintFinalized { height, targets } => {
696-
// Skip if finalization is already available locally.
697-
if self.get_finalization_by_height(height).await.is_some() {
700+
// Skip if height is below the floor.
701+
if height < self.floor.processed_height() {
702+
return;
703+
}
704+
705+
// Skip if both the finalization and block are already local.
706+
if self.get_finalization_by_height(height).await.is_some()
707+
&& self.get_finalized_block(height).await.is_some()
708+
{
698709
return;
699710
}
700711

@@ -743,9 +754,19 @@ where
743754
.ignore();
744755
}
745756
}
746-
Message::SetFloor { finalization } => {
747-
self.install_floor(finalization, true, resolver, buffer, application)
748-
.await;
757+
Message::SetFloor {
758+
finalization,
759+
prune_archives,
760+
} => {
761+
self.install_floor(
762+
finalization,
763+
prune_archives,
764+
true,
765+
resolver,
766+
buffer,
767+
application,
768+
)
769+
.await;
749770
}
750771
Message::Prune { height } => {
751772
// Only allow pruning at or below the current floor.
@@ -991,6 +1012,7 @@ where
9911012
async fn install_floor<Buf, R>(
9921013
&mut self,
9931014
finalization: Finalization<P::Scheme, V::Commitment>,
1015+
prune_archives: bool,
9941016
skip_if_superseded: bool,
9951017
resolver: &mut R,
9961018
buffer: &mut OptionalBuffer<V, Buf>,
@@ -1034,7 +1056,7 @@ where
10341056
}
10351057

10361058
if let Some(block) = self.find_block_by_commitment(buffer, commitment).await {
1037-
self.floor.await_anchor(finalization);
1059+
self.floor.await_anchor(finalization, prune_archives);
10381060
assert!(
10391061
self.apply_floor_anchor(&block, buffer, application, resolver)
10401062
.await
@@ -1047,7 +1069,7 @@ where
10471069
self.pending_acks.clear();
10481070

10491071
debug!(?round, ?commitment, "starting fetch for floor block");
1050-
self.floor.await_anchor(finalization);
1072+
self.floor.await_anchor(finalization, prune_archives);
10511073
self.floor
10521074
.fetch_if_permitted(
10531075
resolver,
@@ -1094,7 +1116,7 @@ where
10941116
existing = %self.floor.processed_height(),
10951117
"floor not updated, at or below existing"
10961118
);
1097-
let finalization = self
1119+
let (finalization, _prune_archives) = self
10981120
.floor
10991121
.take_pending_anchor()
11001122
.expect("pending floor anchor missing");
@@ -1108,7 +1130,7 @@ where
11081130
}
11091131

11101132
let digest = block.digest();
1111-
let finalization = self
1133+
let (finalization, prune_archives) = self
11121134
.floor
11131135
.take_pending_anchor()
11141136
.expect("pending floor anchor missing");
@@ -1152,10 +1174,11 @@ where
11521174
// acks for blocks below the new floor cannot rewrite the processed floor.
11531175
self.pending_acks.clear();
11541176

1155-
// The floor is durable, so cache/finalized data below it can be pruned.
1156-
self.prune_after_floor(height)
1157-
.await
1158-
.expect("failed to prune data below floor");
1177+
if prune_archives {
1178+
self.prune_after_floor(height)
1179+
.await
1180+
.expect("failed to prune data below floor");
1181+
}
11591182

11601183
// Intentionally keep existing block subscriptions alive. Canceling
11611184
// waiters can have catastrophic consequences (nodes can get stuck in
@@ -1726,9 +1749,9 @@ where
17261749
finalization: Option<Finalization<P::Scheme, V::Commitment>>,
17271750
application: &mut impl Reporter<Activity = Update<V::ApplicationBlock, A>>,
17281751
) -> bool {
1729-
// Blocks below the last processed height are not useful to us, so we ignore them (this
1730-
// has the nice byproduct of ensuring we don't call a backing store with a block below the
1731-
// pruning boundary)
1752+
// Blocks at or below the last processed height are not useful to us, so we
1753+
// ignore them (this has the nice byproduct of ensuring we don't call
1754+
// a backing store with a block below the pruning boundary).
17321755
if height <= self.floor.processed_height() {
17331756
debug!(
17341757
%height,

consensus/src/marshal/core/floor.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ impl FetchAdmission {
4242
/// The processed floor plus any pending floor update awaiting its anchor block.
4343
pub(super) struct Floor<S: CertificateScheme, C: Digest> {
4444
processed: ProcessedFloor,
45-
pending: Option<Finalization<S, C>>,
45+
pending: Option<(Finalization<S, C>, bool)>,
4646
}
4747

4848
impl<S: CertificateScheme, C: Digest> Floor<S, C> {
@@ -60,7 +60,7 @@ impl<S: CertificateScheme, C: Digest> Floor<S, C> {
6060
) -> Self {
6161
Self {
6262
processed: ProcessedFloor { height, round },
63-
pending: Some(finalization),
63+
pending: Some((finalization, true)),
6464
}
6565
}
6666

@@ -87,22 +87,22 @@ impl<S: CertificateScheme, C: Digest> Floor<S, C> {
8787

8888
/// Returns true if a pending floor already supersedes the candidate floor round.
8989
pub(super) fn has_pending_anchor_at_or_after(&self, round: Round) -> bool {
90-
matches!(&self.pending, Some(pending) if pending.round() >= round)
90+
matches!(&self.pending, Some((pending, _)) if pending.round() >= round)
9191
}
9292

9393
/// Returns true when `commitment` is the awaited anchor.
9494
pub(super) fn matches_pending_anchor(&self, commitment: C) -> bool {
95-
matches!(&self.pending, Some(pending) if pending.proposal.payload == commitment)
95+
matches!(&self.pending, Some((pending, _)) if pending.proposal.payload == commitment)
9696
}
9797

9898
/// Records a verified floor finalization whose block anchor still needs to arrive.
99-
pub(super) fn await_anchor(&mut self, finalization: Finalization<S, C>) {
100-
self.pending = Some(finalization);
99+
pub(super) fn await_anchor(&mut self, finalization: Finalization<S, C>, prune_archives: bool) {
100+
self.pending = Some((finalization, prune_archives));
101101
}
102102

103103
/// Takes the pending anchor finalization, if any.
104104
#[must_use]
105-
pub(super) const fn take_pending_anchor(&mut self) -> Option<Finalization<S, C>> {
105+
pub(super) const fn take_pending_anchor(&mut self) -> Option<(Finalization<S, C>, bool)> {
106106
self.pending.take()
107107
}
108108

0 commit comments

Comments
 (0)