Skip to content

Commit cb882d2

Browse files
nits
1 parent da1717e commit cb882d2

6 files changed

Lines changed: 183 additions & 32 deletions

File tree

consensus/src/marshal/coding/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -779,6 +779,11 @@ mod tests {
779779
harness::ack_pipeline_backlog_persists_on_restart::<CodingHarness>();
780780
}
781781

782+
#[test_traced("WARN")]
783+
fn test_coding_genesis_emitted_once() {
784+
harness::genesis_emitted_once::<CodingHarness>();
785+
}
786+
782787
#[test_traced("WARN")]
783788
fn test_coding_proposed_success_implies_recoverable_after_restart() {
784789
harness::proposed_success_implies_recoverable_after_restart::<CodingHarness>(0..16);

consensus/src/marshal/config.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ use std::num::{NonZeroU64, NonZeroUsize};
1717
/// supersedes the configured anchor.
1818
pub enum Start<S: Scheme, C: Digest, B> {
1919
/// Start from the height-zero genesis block.
20+
///
21+
/// Genesis is emitted to the application unless durable metadata shows it
22+
/// was already acknowledged.
2023
Genesis(B),
2124
/// Start from a finalized commitment, delivering the floor block to the
2225
/// application if it has not already been acknowledged.

consensus/src/marshal/core/acks.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -62,12 +62,12 @@ impl<V: Variant, A: Acknowledgement> PendingAcks<V, A> {
6262
}
6363

6464
/// Returns the next height to dispatch while preserving sequential order.
65-
pub(super) fn next_dispatch_height(&self, last_processed_height: Height) -> Height {
65+
pub(super) fn next_dispatch_height(&self, start_height: Height) -> Height {
6666
self.queue
6767
.back()
6868
.map(|ack| ack.height.next())
6969
.or_else(|| self.current.as_ref().map(|ack| ack.height.next()))
70-
.unwrap_or_else(|| last_processed_height.next())
70+
.unwrap_or(start_height)
7171
}
7272

7373
/// Enqueues a newly dispatched ack, arming it immediately when idle.
@@ -136,18 +136,18 @@ mod tests {
136136
fn enqueue_tracks_capacity_and_fifo_ready_order() {
137137
let mut pending = PendingAcks::<TestVariant, Exact>::new(2);
138138
assert!(pending.has_capacity());
139-
assert_eq!(pending.next_dispatch_height(Height::new(7)), Height::new(8));
139+
assert_eq!(pending.next_dispatch_height(Height::new(8)), Height::new(8));
140140

141141
let (first, first_ack) = pending_ack(8, 1);
142142
pending.enqueue(first);
143143
assert!(pending.has_capacity());
144-
assert_eq!(pending.next_dispatch_height(Height::new(7)), Height::new(9));
144+
assert_eq!(pending.next_dispatch_height(Height::new(8)), Height::new(9));
145145

146146
let (second, second_ack) = pending_ack(9, 2);
147147
pending.enqueue(second);
148148
assert!(!pending.has_capacity());
149149
assert_eq!(
150-
pending.next_dispatch_height(Height::new(7)),
150+
pending.next_dispatch_height(Height::new(8)),
151151
Height::new(10)
152152
);
153153

@@ -185,7 +185,7 @@ mod tests {
185185
assert!(pending.pop_ready().is_none());
186186
assert!(pending.has_capacity());
187187
assert_eq!(
188-
pending.next_dispatch_height(Height::new(9)),
188+
pending.next_dispatch_height(Height::new(10)),
189189
Height::new(10)
190190
);
191191
}

consensus/src/marshal/core/actor.rs

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,40 @@ struct ResolverDelivery<V: Variant> {
6868
response: oneshot::Sender<bool>,
6969
}
7070

71+
/// Last block acknowledged by the application.
72+
#[derive(Clone, Copy)]
73+
enum ApplicationFloor {
74+
BeforeGenesis,
75+
Acknowledged(Height),
76+
}
77+
78+
impl ApplicationFloor {
79+
const fn from_metadata(height: Option<Height>) -> Self {
80+
match height {
81+
Some(height) => Self::Acknowledged(height),
82+
None => Self::BeforeGenesis,
83+
}
84+
}
85+
86+
const fn processed_height(self) -> Height {
87+
match self {
88+
Self::BeforeGenesis => Height::zero(),
89+
Self::Acknowledged(height) => height,
90+
}
91+
}
92+
93+
fn next_dispatch_height(self) -> Height {
94+
match self {
95+
Self::BeforeGenesis => Height::zero(),
96+
Self::Acknowledged(height) => height.next(),
97+
}
98+
}
99+
100+
const fn acknowledge(&mut self, height: Height) {
101+
*self = Self::Acknowledged(height);
102+
}
103+
}
104+
71105
/// The [Actor] is responsible for receiving uncertified blocks from the broadcast mechanism,
72106
/// receiving notarizations and finalizations from consensus, and reconstructing a total order
73107
/// of blocks.
@@ -121,6 +155,8 @@ where
121155
last_proposed_block: Option<(Round, V::Commitment, V::Block)>,
122156
// Current processed floor and any pending floor update
123157
floor: Floor<P::Scheme, V::Commitment>,
158+
// Application delivery cursor
159+
application_floor: ApplicationFloor,
124160
// Pending application acknowledgements
125161
pending_acks: PendingAcks<V, A>,
126162
// Highest known finalized height
@@ -193,10 +229,9 @@ where
193229
)
194230
.await
195231
.expect("failed to initialize application metadata");
196-
let last_processed_height = application_metadata
197-
.get(&LATEST_KEY)
198-
.copied()
199-
.unwrap_or(Height::zero());
232+
let application_floor =
233+
ApplicationFloor::from_metadata(application_metadata.get(&LATEST_KEY).copied());
234+
let last_processed_height = application_floor.processed_height();
200235

201236
// Genesis is a local anchor. A floor finalization is verified and
202237
// resolved after `run` receives the resolver and buffer.
@@ -245,6 +280,7 @@ where
245280
strategy: config.strategy,
246281
last_proposed_block: None,
247282
floor,
283+
application_floor,
248284
pending_acks: PendingAcks::new(config.max_pending_acks.get()),
249285
tip: Height::zero(),
250286
block_subscriptions: Subscriptions::new(),
@@ -1586,7 +1622,7 @@ where
15861622
while self.pending_acks.has_capacity() {
15871623
let next_height = self
15881624
.pending_acks
1589-
.next_dispatch_height(self.floor.processed_height());
1625+
.next_dispatch_height(self.application_floor.next_dispatch_height());
15901626
let Some(block) = self.get_finalized_block(next_height).await else {
15911627
return;
15921628
};
@@ -2042,6 +2078,7 @@ where
20422078
PublicKey = <P::Scheme as CertificateScheme>::PublicKey,
20432079
>,
20442080
) {
2081+
self.application_floor.acknowledge(height);
20452082
self.application_metadata.put(LATEST_KEY, height);
20462083
self.floor.set_processed_height(height);
20472084
let _ = self

consensus/src/marshal/mocks/harness.rs

Lines changed: 82 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1323,7 +1323,7 @@ pub fn certify_at_later_view_survives_earlier_view_pruning<H: TestHarness>() {
13231323
parent = digest;
13241324
parent_commitment = commitment;
13251325
}
1326-
while (application.blocks().len() as u64) < CHAIN_LEN {
1326+
while application.tip().map(|(height, _)| height) != Some(Height::new(CHAIN_LEN)) {
13271327
context.sleep(Duration::from_millis(10)).await;
13281328
}
13291329
context.sleep(Duration::from_millis(100)).await;
@@ -1491,13 +1491,18 @@ pub fn delivery_visibility_implies_recoverable_after_restart<H: TestHarness>(
14911491
H::verify(&mut handle, round, &block, &mut peers).await;
14921492
H::report_finalization(&mut mailbox, finalization.clone()).await;
14931493

1494-
let height = application.acknowledged().await;
1495-
assert_eq!(
1496-
height,
1497-
Height::new(1),
1498-
"expected the first delivered finalized block to become visible at height 1 \
1499-
before restart (seed={seed})"
1500-
);
1494+
loop {
1495+
let height = application.acknowledged().await;
1496+
if height == Height::new(1) {
1497+
break;
1498+
}
1499+
assert_eq!(
1500+
height,
1501+
Height::zero(),
1502+
"only genesis may precede the first finalized block before restart \
1503+
(seed={seed})"
1504+
);
1505+
}
15011506
}
15021507
});
15031508

@@ -2925,7 +2930,7 @@ pub fn finalize<H: TestHarness>(seed: u64, link: Link, quorum_sees_finalization:
29252930
}
29262931
finished = true;
29272932
for app in applications.values() {
2928-
if app.blocks().len() != NUM_BLOCKS as usize {
2933+
if app.blocks().len() != (NUM_BLOCKS + 1) as usize {
29292934
finished = false;
29302935
break;
29312936
}
@@ -2981,6 +2986,7 @@ pub fn ack_pipeline_backlog<H: TestHarness>() {
29812986
extra: setup.extra,
29822987
}];
29832988
let mut handle = handles[0].clone();
2989+
assert_eq!(application.acknowledged().await, Height::zero());
29842990

29852991
let epocher = FixedEpocher::new(BLOCKS_PER_EPOCH);
29862992
let mut parent = Sha256::hash(b"");
@@ -3077,6 +3083,7 @@ pub fn ack_pipeline_backlog_persists_on_restart<H: TestHarness>() {
30773083
extra: setup.extra,
30783084
}];
30793085
let mut handle = handles[0].clone();
3086+
assert_eq!(application.acknowledged().await, Height::zero());
30803087

30813088
let epocher = FixedEpocher::new(BLOCKS_PER_EPOCH);
30823089
let mut parent = Sha256::hash(b"");
@@ -3145,6 +3152,67 @@ pub fn ack_pipeline_backlog_persists_on_restart<H: TestHarness>() {
31453152
});
31463153
}
31473154

3155+
/// Test that genesis is delivered once and then suppressed after its ack is durable.
3156+
pub fn genesis_emitted_once<H: TestHarness>() {
3157+
let runner = deterministic::Runner::timed(Duration::from_secs(60));
3158+
runner.start(|mut context| async move {
3159+
let Fixture {
3160+
participants,
3161+
schemes,
3162+
..
3163+
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
3164+
let mut oracle = setup_network_with_participants(
3165+
context.child("network"),
3166+
NZUsize!(1),
3167+
participants.clone(),
3168+
)
3169+
.await;
3170+
3171+
let validator = participants[0].clone();
3172+
let setup = H::setup_validator_with(
3173+
context.child("validator").with_attribute("index", 0),
3174+
&mut oracle,
3175+
validator.clone(),
3176+
ConstantProvider::new(schemes[0].clone()),
3177+
NZUsize!(1),
3178+
Application::<H::ApplicationBlock>::manual_ack(),
3179+
)
3180+
.await;
3181+
assert_eq!(setup.height, Height::zero());
3182+
assert_eq!(setup.application.acknowledged().await, Height::zero());
3183+
assert!(setup.application.blocks().contains_key(&Height::zero()));
3184+
3185+
// Let marshal persist the height-zero acknowledgement before restart.
3186+
context.sleep(Duration::from_millis(10)).await;
3187+
setup.actor_handle.abort();
3188+
drop(setup.mailbox);
3189+
drop(setup.extra);
3190+
context.sleep(Duration::from_millis(10)).await;
3191+
3192+
let restart = H::setup_validator_with(
3193+
context
3194+
.child("validator_restart")
3195+
.with_attribute("index", 0),
3196+
&mut oracle,
3197+
validator,
3198+
ConstantProvider::new(schemes[0].clone()),
3199+
NZUsize!(1),
3200+
Application::<H::ApplicationBlock>::manual_ack(),
3201+
)
3202+
.await;
3203+
assert_eq!(restart.height, Height::zero());
3204+
context.sleep(Duration::from_millis(100)).await;
3205+
assert!(
3206+
restart.application.blocks().is_empty(),
3207+
"genesis should not be emitted again after height zero is acknowledged"
3208+
);
3209+
assert!(
3210+
restart.application.pending_ack_heights().is_empty(),
3211+
"restart should not leave a duplicate genesis ack pending"
3212+
);
3213+
});
3214+
}
3215+
31483216
/// Test sync height floor.
31493217
pub fn sync_height_floor<H: TestHarness>() {
31503218
let runner = deterministic::Runner::new(
@@ -3240,7 +3308,7 @@ pub fn sync_height_floor<H: TestHarness>() {
32403308
context.sleep(Duration::from_secs(1)).await;
32413309
finished = true;
32423310
for app in applications.values().skip(1) {
3243-
if app.blocks().len() != NUM_BLOCKS as usize {
3311+
if app.blocks().len() != (NUM_BLOCKS + 1) as usize {
32443312
finished = false;
32453313
break;
32463314
}
@@ -3289,7 +3357,7 @@ pub fn sync_height_floor<H: TestHarness>() {
32893357
while !finished {
32903358
context.sleep(Duration::from_secs(1)).await;
32913359
finished = true;
3292-
if app.blocks().len() != (NUM_BLOCKS - NEW_SYNC_FLOOR + 1) as usize {
3360+
if app.blocks().len() != (NUM_BLOCKS - NEW_SYNC_FLOOR + 2) as usize {
32933361
finished = false;
32943362
continue;
32953363
}
@@ -3393,7 +3461,7 @@ pub fn prune_finalized_archives<H: TestHarness>() {
33933461
H::report_finalization(&mut mailbox, finalization).await;
33943462
}
33953463

3396-
while application.blocks().len() < 20 {
3464+
while application.tip().map(|(height, _)| height) != Some(Height::new(20)) {
33973465
context.sleep(Duration::from_millis(10)).await;
33983466
}
33993467

@@ -4712,7 +4780,7 @@ pub fn hint_finalized_triggers_fetch<H: TestHarness>() {
47124780
}
47134781

47144782
// Wait for validator 0 to process all blocks
4715-
while app0.blocks().len() < 5 {
4783+
while app0.tip().map(|(height, _)| height) != Some(Height::new(5)) {
47164784
context.sleep(Duration::from_millis(10)).await;
47174785
}
47184786

@@ -5027,7 +5095,7 @@ pub fn init_processed_height<H: TestHarness>() {
50275095
}
50285096

50295097
// Wait for application to process all blocks
5030-
while app.blocks().len() < 5 {
5098+
while app.tip().map(|(height, _)| height) != Some(Height::new(5)) {
50315099
context.sleep(Duration::from_millis(10)).await;
50325100
}
50335101

0 commit comments

Comments
 (0)