Skip to content

Commit d2b918e

Browse files
add verified check on propose
1 parent eca4cdc commit d2b918e

7 files changed

Lines changed: 342 additions & 3 deletions

File tree

consensus/src/marshal/coding/marshaled.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -529,6 +529,28 @@ where
529529
.with_label("propose")
530530
.with_attribute("round", consensus_context.round)
531531
.spawn(move |runtime_context| async move {
532+
// On leader recovery, marshal may already hold a verified
533+
// block for this round (persisted by a pre-crash propose
534+
// whose notarize vote never reached the journal). Building a
535+
// fresh block would land on the same view index in the prunable
536+
// archive and be silently dropped, so reuse the stored block instead.
537+
if let Some(block) = marshal.get_verified(consensus_context.round).await {
538+
let commitment = block.commitment();
539+
let round = consensus_context.round;
540+
{
541+
let mut lock = last_built.lock();
542+
*lock = Some((round, block));
543+
}
544+
let success = tx.send_lossy(commitment);
545+
debug!(
546+
?round,
547+
?commitment,
548+
success,
549+
"reused verified block from marshal on leader recovery"
550+
);
551+
return;
552+
}
553+
532554
let (parent_view, parent_commitment) = consensus_context.parent;
533555
let parent_request = fetch_parent(
534556
parent_commitment,

consensus/src/marshal/coding/mod.rs

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1982,4 +1982,99 @@ mod tests {
19821982
);
19831983
});
19841984
}
1985+
1986+
/// Regression: if marshal already holds a verified block for a round
1987+
/// (say, persisted by a pre-crash propose whose notarize vote never
1988+
/// reached the journal), a restarted leader's `propose` must return
1989+
/// that block's commitment instead of rebuilding. Otherwise the
1990+
/// new block lands on the same view index in the prunable archive,
1991+
/// gets silently dropped (`skip_if_index_exists=true`), and the
1992+
/// leader's notarize targets a commitment no peer can serve.
///
/// Outline: seed block A through `marshal.proposed`, hand the mock
/// application a different block B as its propose result, then assert that
/// `propose` yields A's commitment and that broadcasting A succeeds.
1993+
#[test_traced("WARN")]
1994+
fn test_propose_reuses_verified_block_on_restart() {
1995+
let runner = deterministic::Runner::timed(Duration::from_secs(60));
1996+
runner.start(|mut context| async move {
1997+
let Fixture {
1998+
participants,
1999+
schemes,
2000+
..
2001+
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
2002+
let mut oracle =
2003+
setup_network_with_participants(context.clone(), NZUsize!(1), participants.clone())
2004+
.await;
2005+
2006+
// Act as the first participant; it is also named as leader in `ctx` below.
let me = participants[0].clone();
2007+
let coding_config = coding_config_for_participants(NUM_VALIDATORS as u16);
2008+
2009+
let setup = CodingHarness::setup_validator(
2010+
context.with_label("validator_0"),
2011+
&mut oracle,
2012+
me.clone(),
2013+
ConstantProvider::new(schemes[0].clone()),
2014+
)
2015+
.await;
2016+
let marshal = setup.mailbox;
2017+
let shards = setup.extra;
2018+
2019+
// Build the genesis block; its coding commitment is the parent referenced by `ctx`.
let genesis_ctx = CodingCtx {
2020+
round: Round::zero(),
2021+
leader: default_leader(),
2022+
parent: (View::zero(), genesis_commitment()),
2023+
};
2024+
let genesis = make_coding_block(genesis_ctx, Sha256::hash(b""), Height::zero(), 0);
2025+
let genesis_parent_commitment = genesis_coding_commitment::<Sha256, _>(&genesis);
2026+
2027+
// The round under test: epoch zero, view 1, led by `me`.
let round = Round::new(Epoch::zero(), View::new(1));
2028+
let ctx = CodingCtx {
2029+
round,
2030+
leader: me.clone(),
2031+
parent: (View::zero(), genesis_parent_commitment),
2032+
};
2033+
2034+
// Seed block A in marshal's verified cache for `round`.
2035+
let block_a = make_coding_block(ctx.clone(), genesis.digest(), Height::new(1), 100);
2036+
let coded_a: CodedBlock<_, ReedSolomon<Sha256>, Sha256> =
2037+
CodedBlock::new(block_a.clone(), coding_config, &Sequential);
2038+
let commitment_a = coded_a.commitment();
2039+
assert!(marshal.proposed(round, coded_a).await);
2040+
2041+
// After restart, a fresh application would build a different
2042+
// block for the same round.
// Block B is built like A except for the final argument (200 instead of 100).
2043+
let block_b = make_coding_block(ctx.clone(), genesis.digest(), Height::new(1), 200);
2044+
let coded_b: CodedBlock<_, ReedSolomon<Sha256>, Sha256> =
2045+
CodedBlock::new(block_b.clone(), coding_config, &Sequential);
2046+
let commitment_b = coded_b.commitment();
2047+
assert_ne!(
2048+
commitment_a, commitment_b,
2049+
"test requires distinct commitments"
2050+
);
2051+
2052+
// Configure the mock application with block B as its propose result, so a
// returned commitment equal to A's shows the application's block was not used.
let mock_app: MockVerifyingApp<CodingB, S> =
2053+
MockVerifyingApp::new(genesis).with_propose_result(block_b);
2054+
let cfg = MarshaledConfig {
2055+
application: mock_app,
2056+
marshal: marshal.clone(),
2057+
shards: shards.clone(),
2058+
scheme_provider: ConstantProvider::new(schemes[0].clone()),
2059+
epocher: FixedEpocher::new(BLOCKS_PER_EPOCH),
2060+
strategy: Sequential,
2061+
};
2062+
let mut marshaled = Marshaled::new(context.clone(), cfg);
2063+
2064+
// `propose` is awaited twice: once for the call itself and once for the
// value it hands back, which must resolve to a commitment.
let commitment = marshaled
2065+
.propose(ctx)
2066+
.await
2067+
.await
2068+
.expect("propose must return a commitment");
2069+
assert_eq!(
2070+
commitment, commitment_a,
2071+
"propose must reuse the block marshal already persisted for this round"
2072+
);
2073+
2074+
// Broadcasting the reused commitment must also succeed.
assert!(
2075+
marshaled.broadcast(commitment_a, Plan::Propose).await,
2076+
"relay broadcast must succeed after re-propose"
2077+
);
2078+
});
2079+
}
19852080
}

consensus/src/marshal/core/actor.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -521,6 +521,14 @@ where
521521
};
522522
response.send_lossy(info);
523523
}
524+
Message::GetVerified { round, response } => {
525+
let block = self
526+
.cache
527+
.get_verified(round)
528+
.await
529+
.map(Into::into);
530+
response.send_lossy(block);
531+
}
524532
Message::Proposed { round, block, ack } => {
525533
self.cache_verified(round, block.digest(), block.clone())
526534
.await;

consensus/src/marshal/core/cache.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -371,6 +371,16 @@ where
371371
.expect("failed to get notarization")
372372
}
373373

374+
/// Get the block previously persisted in the verified archive for `round`.
///
/// Returns `None` when no cache exists for the round's epoch, or when that
/// epoch's verified archive holds no entry at the round's view index.
///
/// # Panics
///
/// Panics (via `expect`) if the archive lookup itself fails.
375+
pub(crate) async fn get_verified(&self, round: Round) -> Option<V::StoredBlock> {
376+
// Caches are keyed by epoch; a missing epoch short-circuits to `None`.
let cache = self.caches.get(&round.epoch())?;
377+
// Within the epoch, verified blocks are looked up by the round's view number.
cache
378+
.verified_blocks
379+
.get(Identifier::Index(round.view().get()))
380+
.await
381+
.expect("failed to get verified block")
382+
}
383+
374384
/// Get a finalization from the prunable archive by block digest.
375385
///
376386
/// SAFETY: For blocks/certificates admitted by marshal verification, a block digest

consensus/src/marshal/core/mailbox.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,13 @@ pub(crate) enum Message<S: Scheme, V: Variant> {
8585
/// A channel to send the retrieved block.
8686
response: oneshot::Sender<V::Block>,
8787
},
88+
/// A request to retrieve the verified block previously persisted for `round`.
89+
GetVerified {
90+
/// The round to query.
91+
round: Round,
92+
/// A channel to send the retrieved block, if any.
93+
response: oneshot::Sender<Option<V::Block>>,
94+
},
8895
/// A request to broadcast a proposed block to peers.
8996
Proposed {
9097
/// The round in which the block was proposed.
@@ -296,6 +303,14 @@ impl<S: Scheme, V: Variant> Mailbox<S, V> {
296303
.map(|block| AncestorStream::new(self.clone(), [V::into_inner(block)]))
297304
}
298305

306+
/// Returns the verified block previously persisted for `round`, if any.
///
/// Sends a `Message::GetVerified` request through the mailbox sender and
/// awaits the reply. The nested `Option` produced by the request is
/// flattened, so `None` means either that the request yielded no reply or
/// that no block was reported for `round`.
307+
pub async fn get_verified(&self, round: Round) -> Option<V::Block> {
308+
self.sender
309+
.request(|response| Message::GetVerified { round, response })
310+
.await
311+
.flatten()
312+
}
313+
299314
/// Requests that a proposed block is sent to peers, awaiting the actor's
300315
/// confirmation that the block has been durably persisted before returning.
301316
#[must_use = "callers must consider block durability before proceeding"]

consensus/src/marshal/standard/deferred.rs

Lines changed: 89 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,28 @@ where
312312
.with_label("propose")
313313
.with_attribute("round", consensus_context.round)
314314
.spawn(move |runtime_context| async move {
315+
// On leader recovery, marshal may already hold a verified
316+
// block for this round (persisted by a pre-crash propose
317+
// whose notarize vote never reached the journal). Building
318+
// a fresh block would land on the same view index in the
319+
// prunable archive and be silently dropped, so reuse the
320+
// stored block instead.
321+
if let Some(block) = marshal.get_verified(consensus_context.round).await {
322+
let digest = block.digest();
323+
{
324+
let mut lock = last_built.lock();
325+
*lock = Some((consensus_context.round, block));
326+
}
327+
let success = tx.send_lossy(digest);
328+
debug!(
329+
round = ?consensus_context.round,
330+
?digest,
331+
success,
332+
"reused verified block from marshal on leader recovery"
333+
);
334+
return;
335+
}
336+
315337
let (parent_view, parent_digest) = consensus_context.parent;
316338
let parent_request = fetch_parent(
317339
parent_digest,
@@ -683,9 +705,9 @@ mod tests {
683705
},
684706
verifying::{GatedVerifyingApp, MockVerifyingApp},
685707
},
686-
simplex::scheme::bls12381_threshold::vrf as bls12381_threshold_vrf,
708+
simplex::{scheme::bls12381_threshold::vrf as bls12381_threshold_vrf, Plan},
687709
types::{Epoch, Epocher, FixedEpocher, Height, Round, View},
688-
Automaton, CertifiableAutomaton,
710+
Automaton, CertifiableAutomaton, Relay,
689711
};
690712
use commonware_broadcast::Broadcaster;
691713
use commonware_cryptography::{
@@ -1101,4 +1123,69 @@ mod tests {
11011123
}
11021124
});
11031125
}
1126+
1127+
/// Regression: when marshal holds a verified block for a round from a
1128+
/// pre-crash propose, a restarted leader's `propose` must return that
1129+
/// block's digest instead of asking the application to build afresh.
1130+
/// See `standard::inline::tests::test_propose_reuses_verified_block_on_restart`.
///
/// Outline: seed block A through `marshal.proposed`, hand the mock
/// application a different block B as its propose result, then assert that
/// `propose` yields A's digest and that broadcasting A succeeds.
1131+
#[test_traced("WARN")]
1132+
fn test_propose_reuses_verified_block_on_restart() {
1133+
let runner = deterministic::Runner::timed(Duration::from_secs(30));
1134+
runner.start(|mut context| async move {
1135+
let Fixture {
1136+
participants,
1137+
schemes,
1138+
..
1139+
} = bls12381_threshold_vrf::fixture::<V, _>(&mut context, NAMESPACE, NUM_VALIDATORS);
1140+
let mut oracle =
1141+
setup_network_with_participants(context.clone(), NZUsize!(1), participants.clone())
1142+
.await;
1143+
1144+
// Act as the first participant; it is also named as leader in `ctx` below.
let me = participants[0].clone();
1145+
let setup = StandardHarness::setup_validator(
1146+
context.with_label("validator_0"),
1147+
&mut oracle,
1148+
me.clone(),
1149+
ConstantProvider::new(schemes[0].clone()),
1150+
)
1151+
.await;
1152+
let marshal = setup.mailbox;
1153+
1154+
// The round under test is epoch zero, view 1, with genesis as parent.
let genesis = make_raw_block(Sha256::hash(b""), Height::zero(), 0);
1155+
let round = Round::new(Epoch::zero(), View::new(1));
1156+
let ctx = Ctx {
1157+
round,
1158+
leader: me.clone(),
1159+
parent: (View::zero(), genesis.digest()),
1160+
};
1161+
// Seed block A in marshal for `round` via `proposed`.
let block_a = B::new::<Sha256>(ctx.clone(), genesis.digest(), Height::new(1), 100);
1162+
let digest_a = block_a.digest();
1163+
assert!(marshal.proposed(round, block_a.clone()).await);
1164+
1165+
// Block B is built like A except for the final argument (200 instead of 100).
let block_b = B::new::<Sha256>(ctx.clone(), genesis.digest(), Height::new(1), 200);
1166+
let digest_b = block_b.digest();
1167+
assert_ne!(digest_a, digest_b, "test requires distinct digests");
1168+
1169+
// Configure the mock application with block B as its propose result, so a
// returned digest equal to A's shows the application's block was not used.
let mock_app: MockVerifyingApp<B, S> =
1170+
MockVerifyingApp::new(genesis.clone()).with_propose_result(block_b);
1171+
let mut marshaled = Deferred::new(
1172+
context.clone(),
1173+
mock_app,
1174+
marshal.clone(),
1175+
FixedEpocher::new(BLOCKS_PER_EPOCH),
1176+
);
1177+
1178+
// `propose` hands back a value that is awaited separately for the digest.
let digest_rx = marshaled.propose(ctx).await;
1179+
let digest = digest_rx.await.expect("propose must return a digest");
1180+
assert_eq!(
1181+
digest, digest_a,
1182+
"propose must reuse the block marshal already persisted for this round"
1183+
);
1184+
1185+
// Broadcasting the reused digest must also succeed.
assert!(
1186+
marshaled.broadcast(digest_a, Plan::Propose).await,
1187+
"relay broadcast must succeed after re-propose"
1188+
);
1189+
});
1190+
}
11041191
}

0 commit comments

Comments
 (0)