Skip to content

Commit ef7d41a

Browse files
more
1 parent 31ef410 commit ef7d41a

2 files changed

Lines changed: 119 additions & 6 deletions

File tree

consensus/src/marshal/core/actor.rs

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use crate::{
1818
scheme::Scheme,
1919
types::{verify_certificates, Finalization, Notarization, Subject},
2020
},
21-
types::{Epoch, Epocher, Height, Round, ViewDelta},
21+
types::{Epoch, Epocher, Height, Round, View, ViewDelta},
2222
Block, Epochable, Heightable, Reporter,
2323
};
2424
use bytes::Bytes;
@@ -54,7 +54,11 @@ use std::{collections::BTreeMap, future::Future, num::NonZeroUsize, sync::Arc};
5454
use tracing::{debug, warn};
5555

5656
/// The key used to store the last application-acknowledged height.
57-
const LATEST_KEY: U64 = U64::new(0xFF);
57+
const LATEST_HEIGHT_KEY: U64 = U64::new(0xFF);
58+
/// The key used to store the epoch component of the processed round floor.
59+
const LATEST_ROUND_EPOCH_KEY: U64 = U64::new(0xFE);
60+
/// The key used to store the view component of the processed round floor.
61+
const LATEST_ROUND_VIEW_KEY: U64 = U64::new(0xFD);
5862

5963
// Resolver request keys are expressed in the variant commitment type, which
6064
// may differ from the block digest for coded variants.
@@ -195,7 +199,7 @@ where
195199
)
196200
.await
197201
.expect("failed to initialize application metadata");
198-
let last_applied_height = application_metadata.get(&LATEST_KEY).copied();
202+
let last_applied_height = application_metadata.get(&LATEST_HEIGHT_KEY).copied();
199203
let processed_floor_height = last_applied_height.unwrap_or(Height::zero());
200204

201205
// Genesis is seeded as a local anchor. If the application has not
@@ -215,8 +219,11 @@ where
215219
}
216220
Start::Floor(finalization) => Some(finalization),
217221
};
218-
let last_processed_round =
222+
let height_derived_processed_round =
219223
Self::latest_processed_round(&finalizations_by_height, processed_floor_height).await;
224+
let last_processed_round = Self::metadata_round_floor(&application_metadata)
225+
.unwrap_or(Round::zero())
226+
.max(height_derived_processed_round);
220227

221228
// Create metrics
222229
let finalized_height = context.gauge("finalized_height", "Finalized height of application");
@@ -485,7 +492,7 @@ where
485492
}
486493
};
487494

488-
// Persist buffered processed-height updates once after draining all ready acks.
495+
// Persist buffered progress updates once after draining all ready acks.
489496
self.application_metadata
490497
.sync()
491498
.await
@@ -2042,7 +2049,7 @@ where
20422049
>,
20432050
) {
20442051
self.last_applied_height = Some(height);
2045-
self.application_metadata.put(LATEST_KEY, height);
2052+
self.application_metadata.put(LATEST_HEIGHT_KEY, height);
20462053
self.floor.set_processed_height(height);
20472054
let _ = self
20482055
.processed_height
@@ -2052,6 +2059,13 @@ where
20522059
resolver.retain(handler::above_height_floor::<V::Commitment>(height));
20532060
}
20542061

2062+
/// Returns the durably stored processed round floor, if present.
2063+
fn metadata_round_floor(metadata: &Metadata<E, U64, Height>) -> Option<Round> {
2064+
let epoch = metadata.get(&LATEST_ROUND_EPOCH_KEY)?;
2065+
let view = metadata.get(&LATEST_ROUND_VIEW_KEY)?;
2066+
Some(Round::new(Epoch::new(epoch.get()), View::new(view.get())))
2067+
}
2068+
20552069
/// Returns the latest known finalization round at or below the processed height.
20562070
async fn latest_processed_round(finalizations_by_height: &FC, height: Height) -> Round {
20572071
let Some(finalization_height) = finalizations_by_height
@@ -2106,6 +2120,10 @@ where
21062120

21072121
let previous = self.floor.processed_round();
21082122
self.floor.set_processed_round(round);
2123+
self.application_metadata
2124+
.put(LATEST_ROUND_EPOCH_KEY, Height::new(round.epoch().get()));
2125+
self.application_metadata
2126+
.put(LATEST_ROUND_VIEW_KEY, Height::new(round.view().get()));
21092127

21102128
// Retain view-indexed cache data for a window behind the previously
21112129
// processed finalized block.

consensus/src/marshal/standard/mod.rs

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4851,6 +4851,101 @@ mod tests {
48514851
});
48524852
}
48534853

4854+
#[test_traced("WARN")]
4855+
fn test_standard_floor_round_restored_before_anchor_ack() {
4856+
let runner = deterministic::Runner::timed(Duration::from_secs(30));
4857+
runner.start(|mut context| async move {
4858+
let Fixture { schemes, .. } =
4859+
bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
4860+
let partition_prefix = "floor-round-before-anchor-ack";
4861+
4862+
let floor_round = Round::new(Epoch::zero(), View::new(5));
4863+
let floor_block = make_raw_block(Sha256::hash(b"floor-parent"), Height::new(5), 500);
4864+
let floor_finalization = StandardHarness::make_finalization(
4865+
Proposal::new(
4866+
floor_round,
4867+
View::new(4),
4868+
StandardHarness::commitment(&floor_block),
4869+
),
4870+
&schemes,
4871+
QUORUM,
4872+
);
4873+
4874+
let (application, started) = HoldingBlockReporter::new_after(Height::zero());
4875+
let (mailbox, buffer, resolver, actor_handle) = start_standard_actor(
4876+
context.child("validator").with_attribute("index", 0),
4877+
partition_prefix,
4878+
ConstantProvider::new(schemes[0].clone()),
4879+
application,
4880+
Some(RecordingBuffer::default()),
4881+
Start::Genesis(StandardHarness::genesis_block(NUM_VALIDATORS as u16)),
4882+
)
4883+
.await;
4884+
4885+
assert!(mailbox.verified(floor_round, floor_block.clone()).await);
4886+
mailbox.set_floor(floor_finalization);
4887+
select! {
4888+
height = started => {
4889+
assert_eq!(
4890+
height.expect("floor anchor delivery signal missing"),
4891+
Height::new(5),
4892+
"floor anchor should be delivered before the simulated crash"
4893+
);
4894+
},
4895+
_ = context.sleep(Duration::from_secs(5)) => {
4896+
panic!("floor anchor was not delivered before restart");
4897+
},
4898+
}
4899+
4900+
actor_handle.abort();
4901+
drop(mailbox);
4902+
drop(buffer);
4903+
drop(resolver);
4904+
context.sleep(Duration::from_millis(1)).await;
4905+
4906+
let (mailbox, _buffer, resolver, _actor_handle) = start_standard_actor(
4907+
context
4908+
.child("validator_restart")
4909+
.with_attribute("index", 0),
4910+
partition_prefix,
4911+
ConstantProvider::new(schemes[0].clone()),
4912+
Application::<B>::manual_ack(),
4913+
Some(RecordingBuffer::default()),
4914+
Start::Genesis(StandardHarness::genesis_block(NUM_VALIDATORS as u16)),
4915+
)
4916+
.await;
4917+
4918+
let fetches_before = resolver.fetches().len();
4919+
let subscription = mailbox.subscribe_by_commitment(
4920+
Sha256::hash(b"missing-at-restored-floor-round"),
4921+
CommitmentFallback::FetchByRound { round: floor_round },
4922+
);
4923+
let barrier = make_raw_block(floor_block.digest(), Height::new(6), 600);
4924+
assert!(
4925+
mailbox
4926+
.verified(Round::new(Epoch::zero(), View::new(6)), barrier)
4927+
.await,
4928+
"barrier verification should be processed"
4929+
);
4930+
assert_eq!(
4931+
resolver.fetches().len(),
4932+
fetches_before,
4933+
"restart must restore the floor round before the anchor is acknowledged"
4934+
);
4935+
select! {
4936+
result = subscription => {
4937+
assert!(
4938+
result.is_err(),
4939+
"floor-round subscription should be canceled after restart"
4940+
);
4941+
},
4942+
_ = context.sleep(Duration::from_secs(5)) => {
4943+
panic!("floor-round subscription remained open after restart");
4944+
},
4945+
}
4946+
});
4947+
}
4948+
48544949
#[test_traced("WARN")]
48554950
fn test_standard_set_floor_prunes_round_bound_fetches() {
48564951
let runner = deterministic::Runner::timed(Duration::from_secs(30));

0 commit comments

Comments
 (0)