Skip to content

Commit f87ceb0

Browse files
nits
1 parent 28f62e0 commit f87ceb0

4 files changed

Lines changed: 38 additions & 20 deletions

File tree

consensus/src/marshal/core/actor.rs

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ where
160160
finalizations_by_height: FC,
161161
mut finalized_blocks: FB,
162162
config: Config<P, ES, T, V::ApplicationBlock, V::Block, V::Commitment>,
163-
) -> (Self, Mailbox<P::Scheme, V>, Height) {
163+
) -> (Self, Mailbox<P::Scheme, V>, Option<Height>) {
164164
// Initialize cache
165165
let prunable_config = cache::Config {
166166
partition_prefix: format!("{}-cache", config.partition_prefix),
@@ -178,7 +178,8 @@ where
178178
.await;
179179

180180
let stream = Stream::new(context.child("stream"), &config.partition_prefix).await;
181-
let last_processed_height = stream.processed_height().unwrap_or_else(Height::zero);
181+
let last_processed_height = stream.processed_height();
182+
let processed_height_floor = last_processed_height.unwrap_or_else(Height::zero);
182183

183184
// Genesis is a local anchor. A floor finalization is verified and
184185
// resolved after `run` receives the resolver and buffer.
@@ -201,11 +202,13 @@ where
201202
// Create metrics
202203
let finalized_height = context.gauge("finalized_height", "Finalized height of application");
203204
let processed_height = context.gauge("processed_height", "Processed height of application");
204-
let _ = processed_height.try_set(last_processed_height.get());
205+
if let Some(last_processed_height) = last_processed_height {
206+
let _ = processed_height.try_set(last_processed_height.get());
207+
}
205208
let floor = pending_floor_anchor.map_or_else(
206-
|| Floor::resolved(last_processed_height, last_processed_round),
209+
|| Floor::resolved(processed_height_floor, last_processed_round),
207210
|finalization| {
208-
Floor::awaiting_anchor(last_processed_height, last_processed_round, finalization)
211+
Floor::awaiting_anchor(processed_height_floor, last_processed_round, finalization)
209212
},
210213
);
211214

@@ -241,7 +244,7 @@ where
241244
async fn ensure_genesis_anchor(
242245
finalized_blocks: &mut FB,
243246
anchor: V::Block,
244-
last_processed_height: Height,
247+
last_processed_height: Option<Height>,
245248
) {
246249
let anchor_height = anchor.height();
247250
let anchor_commitment = V::commitment(&anchor);
@@ -262,10 +265,12 @@ where
262265
);
263266
}
264267
Ok(None) => {
265-
if anchor_height < last_processed_height {
268+
if let Some(existing) =
269+
last_processed_height.filter(|height| anchor_height < *height)
270+
{
266271
warn!(
267272
height = %anchor_height,
268-
existing = %last_processed_height,
273+
%existing,
269274
"ignoring stale anchor"
270275
);
271276
return;
@@ -675,7 +680,7 @@ where
675680
response.send_lossy(finalization);
676681
}
677682
Message::GetProcessedHeight { response } => {
678-
response.send_lossy(self.floor.processed_height());
683+
response.send_lossy(self.stream.processed_height());
679684
}
680685
Message::HintFinalized { height, targets } => {
681686
// Skip if finalization is already available locally.
@@ -2034,7 +2039,13 @@ where
20342039
}
20352040

20362041
/// Returns the latest known finalization round at or below the processed height.
2037-
async fn latest_processed_round(finalizations_by_height: &FC, height: Height) -> Round {
2042+
async fn latest_processed_round(
2043+
finalizations_by_height: &FC,
2044+
height: Option<Height>,
2045+
) -> Round {
2046+
let Some(height) = height else {
2047+
return Round::zero();
2048+
};
20382049
let Some(finalization_height) = finalizations_by_height
20392050
.ranges_from(Height::zero())
20402051
.filter_map(|(start, end)| (start <= height).then_some(end.min(height)))

consensus/src/marshal/core/mailbox.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ pub(crate) enum Message<S: Scheme, V: Variant> {
5151
/// A request to retrieve the latest processed height.
5252
GetProcessedHeight {
5353
/// A channel to send the latest processed height.
54-
response: oneshot::Sender<Height>,
54+
response: oneshot::Sender<Option<Height>>,
5555
},
5656
/// A hint that a finalized block may be available at a given height.
5757
///
@@ -590,7 +590,7 @@ impl<S: Scheme, V: Variant> Mailbox<S, V> {
590590
let _ = self
591591
.sender
592592
.enqueue(Message::GetProcessedHeight { response });
593-
receiver.await.ok()
593+
receiver.await.ok().flatten()
594594
}
595595

596596
/// Hints that a finalized block may be available at the given height.

consensus/src/marshal/core/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
//! blocks_archive,
2929
//! config,
3030
//! ).await;
31+
//! // `last_height` is `None` until the application acknowledges a block.
3132
//!
3233
//! // Start with application and buffer
3334
//! actor.start(application, Some(buffer), resolver);

consensus/src/marshal/mocks/harness.rs

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ pub struct ValidatorSetup<H: TestHarness> {
168168
pub application: Application<H::ApplicationBlock>,
169169
pub mailbox: Mailbox<S, H::Variant>,
170170
pub extra: H::ValidatorExtra,
171-
pub height: Height,
171+
pub height: Option<Height>,
172172
pub actor_handle: commonware_runtime::Handle<()>,
173173
}
174174

@@ -839,7 +839,7 @@ pub fn hailstorm<H: TestHarness>(
839839
.await;
840840
assert_eq!(
841841
restarted.height,
842-
Height::new(persisted_height),
842+
Some(Height::new(persisted_height)),
843843
"validator {idx} should recover its persisted finalized height before replay"
844844
);
845845

@@ -3170,7 +3170,7 @@ pub fn ack_pipeline_backlog_persists_on_restart<H: TestHarness>() {
31703170
Application::manual_ack(),
31713171
)
31723172
.await;
3173-
assert_eq!(restart.height, Height::new(3));
3173+
assert_eq!(restart.height, Some(Height::new(3)));
31743174
});
31753175
}
31763176

@@ -3200,8 +3200,14 @@ pub fn genesis_emitted_once<H: TestHarness>() {
32003200
Application::<H::ApplicationBlock>::manual_ack(),
32013201
)
32023202
.await;
3203-
assert_eq!(setup.height, Height::zero());
3203+
assert_eq!(setup.height, None);
3204+
assert_eq!(setup.mailbox.get_processed_height().await, None);
32043205
assert_eq!(setup.application.acknowledged().await, Height::zero());
3206+
context.sleep(Duration::from_millis(10)).await;
3207+
assert_eq!(
3208+
setup.mailbox.get_processed_height().await,
3209+
Some(Height::zero())
3210+
);
32053211
assert!(setup.application.blocks().contains_key(&Height::zero()));
32063212

32073213
// Let marshal persist the height-zero acknowledgement before restart.
@@ -3222,7 +3228,7 @@ pub fn genesis_emitted_once<H: TestHarness>() {
32223228
Application::<H::ApplicationBlock>::manual_ack(),
32233229
)
32243230
.await;
3225-
assert_eq!(restart.height, Height::zero());
3231+
assert_eq!(restart.height, Some(Height::zero()));
32263232
context.sleep(Duration::from_millis(100)).await;
32273233
assert!(
32283234
restart.application.blocks().is_empty(),
@@ -5083,8 +5089,8 @@ pub fn init_processed_height<H: TestHarness>() {
50835089
};
50845090
let initial_height = setup.height;
50855091

5086-
// Initially should be zero (no blocks processed)
5087-
assert_eq!(initial_height, Height::zero());
5092+
// Initially no block has been durably acknowledged.
5093+
assert_eq!(initial_height, None);
50885094

50895095
// Finalize blocks 1-5
50905096
let mut parent = Sha256::hash(b"");
@@ -5137,7 +5143,7 @@ pub fn init_processed_height<H: TestHarness>() {
51375143
let recovered_height = setup2.height;
51385144

51395145
// Should have recovered to height 5
5140-
assert_eq!(recovered_height, Height::new(5));
5146+
assert_eq!(recovered_height, Some(Height::new(5)));
51415147
})
51425148
}
51435149

0 commit comments

Comments
 (0)