Skip to content

Commit a07ee17

Browse files
committed
[consensus/simplex] infer ancestor certifications from notarization
When a notarization for view V arrives, the f+1 signers must have certified V's parent before voting. Walk the ancestor chain and infer certifications for any ancestors that are in Ready state, notifying the resolver and reporter for each. Fixes sync ordering (single sync_all rather than one fsync per ancestor), avoids partial state mutation in certified, and handles replay safely: inference is skipped during journal replay and deferred to a post-replay pass once all Artifact::Certification entries are applied, preventing a panic when an ancestor was certified false by the automaton.
1 parent 5cce12d commit a07ee17

4 files changed

Lines changed: 520 additions & 19 deletions

File tree

consensus/src/simplex/actors/voter/actor.rs

Lines changed: 91 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,29 @@ impl<
251251
}
252252
}
253253

254+
/// Syncs all open journal sections.
255+
async fn sync_all_journal(&mut self) {
256+
if let Some(journal) = self.journal.as_mut() {
257+
journal.sync_all().await.expect("unable to sync journal");
258+
}
259+
}
260+
261+
/// Records a notarization during journal replay without running ancestor inference.
262+
///
263+
/// Inference is deferred to a post-replay pass (see `run`) so that
264+
/// Artifact::Certification entries already in the journal are applied before any
265+
/// inference fires. Running inference here would certify ancestors as true before
266+
/// their journal entry (which may say false) is replayed, causing a conflict panic.
267+
async fn record_notarization(&mut self, notarization: Notarization<S, D>) {
268+
let view = notarization.view();
269+
let artifact = Artifact::Notarization(notarization.clone());
270+
let (added, equivocator) = self.state.add_notarization(notarization);
271+
if added {
272+
self.append_journal(view, artifact).await;
273+
}
274+
self.block_equivocator(equivocator).await;
275+
}
276+
254277
/// Send a vote to every peer.
255278
async fn broadcast_vote<T: Sender>(
256279
&mut self,
@@ -414,33 +437,57 @@ impl<
414437
}
415438

416439
/// Records a notarization certificate and blocks any equivocating leader.
417-
async fn handle_notarization(&mut self, notarization: Notarization<S, D>) {
440+
/// Returns the notarizations of any ancestors inferred as certified so callers
441+
/// can forward them to the resolver and reporter.
442+
async fn handle_notarization(
443+
&mut self,
444+
notarization: Notarization<S, D>,
445+
) -> Vec<Notarization<S, D>> {
418446
let view = notarization.view();
419447
let artifact = Artifact::Notarization(notarization.clone());
420448
let (added, equivocator) = self.state.add_notarization(notarization);
449+
let mut inferred = Vec::new();
421450
if added {
422451
self.append_journal(view, artifact).await;
452+
// The f+1 signers of `view` must have certified the parent before voting,
453+
// and by the same logic the grandparent must have been certified before
454+
// the parent was proposed. Walk the ancestor chain until we reach a view
455+
// that is already certified, finalized, or has no notarization.
456+
let mut current = view;
457+
while let Some(ancestor) = self.state.infer_parent(current) {
458+
if let Some(n) = self.handle_certification(ancestor, true).await {
459+
inferred.push(n);
460+
}
461+
current = ancestor;
462+
}
463+
// Sync the notarization section first so it is durable before any ancestor
464+
// certification sections. This ordering ensures crash recovery always sees the
465+
// notarization that triggered the inferred certifications.
466+
self.sync_journal(view).await;
467+
if !inferred.is_empty() {
468+
// Sync all ancestor certification sections concurrently in a single call
469+
// rather than one fsync per ancestor.
470+
self.sync_all_journal().await;
471+
}
423472
}
424473
self.block_equivocator(equivocator).await;
474+
inferred
425475
}
426476

427477
/// Handles the certification of a proposal.
428478
///
429479
/// The certification may succeed, in which case the proposal can be used in future views—
430480
/// or fail, in which case we should nullify the view as fast as possible.
481+
///
482+
/// Appends to the journal but does not sync; callers are responsible for syncing.
431483
async fn handle_certification(
432484
&mut self,
433485
view: View,
434486
success: bool,
435487
) -> Option<Notarization<S, D>> {
436-
// Get the notarization before advancing state
437488
let notarization = self.state.certified(view, success)?;
438-
439-
// Persist certification result for recovery
440489
let artifact = Artifact::Certification(Rnd::new(self.state.epoch(), view), success);
441490
self.append_journal(view, artifact.clone()).await;
442-
self.sync_journal(view).await;
443-
444491
Some(notarization)
445492
}
446493

@@ -515,9 +562,13 @@ impl<
515562
.await;
516563
}
517564
// Update our local round with the certificate.
518-
self.handle_notarization(notarization.clone()).await;
519-
// Persist the certificate before informing others.
520-
self.sync_journal(view).await;
565+
// handle_notarization syncs all written sections before returning.
566+
let inferred = self.handle_notarization(notarization.clone()).await;
567+
for n in inferred {
568+
resolver.updated(Certificate::Notarization(n.clone())).await;
569+
resolver.certified(n.view(), true).await;
570+
self.reporter.report(Activity::Certification(n)).await;
571+
}
521572
// Broadcast the notarization certificate
522573
debug!(proposal=?notarization.proposal, "broadcasting notarization");
523574
self.broadcast_certificate(
@@ -744,7 +795,7 @@ impl<
744795
self.reporter.report(Activity::Notarize(notarize)).await;
745796
}
746797
Artifact::Notarization(notarization) => {
747-
self.handle_notarization(notarization.clone()).await;
798+
self.record_notarization(notarization.clone()).await;
748799
resolver
749800
.updated(Certificate::Notarization(notarization.clone()))
750801
.await;
@@ -796,6 +847,29 @@ impl<
796847
}
797848
self.journal = Some(journal);
798849

850+
// Run one inference pass now that all journal entries are applied and their
851+
// certification states are authoritative. This recovers certifications that
852+
// were inferred during the previous run but whose Artifact::Certification
853+
// journal entries were not synced before a crash.
854+
let infer_views = self.state.views_with_notarization();
855+
let mut any_inferred = false;
856+
for view in infer_views.into_iter().rev() {
857+
let mut current = view;
858+
while let Some(ancestor) = self.state.infer_parent(current) {
859+
let Some(n) = self.handle_certification(ancestor, true).await else {
860+
break;
861+
};
862+
resolver.updated(Certificate::Notarization(n.clone())).await;
863+
resolver.certified(n.view(), true).await;
864+
self.reporter.report(Activity::Certification(n)).await;
865+
any_inferred = true;
866+
current = ancestor;
867+
}
868+
}
869+
if any_inferred {
870+
self.sync_all_journal().await;
871+
}
872+
799873
// Log current view after recovery
800874
let end = self.context.current();
801875
let elapsed = end.duration_since(start).unwrap_or_default();
@@ -957,6 +1031,7 @@ impl<
9571031
else {
9581032
continue;
9591033
};
1034+
self.sync_journal(view).await;
9601035
// Always forward certification outcomes to resolver.
9611036
// This can happen after a nullification for the same view because
9621037
// certification is asynchronous; finalization is the boundary that
@@ -1005,7 +1080,12 @@ impl<
10051080
match certificate {
10061081
Certificate::Notarization(notarization) => {
10071082
trace!(%view, from_resolver, "received notarization");
1008-
self.handle_notarization(notarization).await;
1083+
let inferred = self.handle_notarization(notarization).await;
1084+
for n in inferred {
1085+
resolver.updated(Certificate::Notarization(n.clone())).await;
1086+
resolver.certified(n.view(), true).await;
1087+
self.reporter.report(Activity::Certification(n)).await;
1088+
}
10091089
if from_resolver {
10101090
resolved = Resolved::Notarization;
10111091
}

consensus/src/simplex/actors/voter/mod.rs

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6834,4 +6834,105 @@ mod tests {
68346834
first_view_progress_without_timeout::<_, _, RoundRobin>(ed25519::fixture);
68356835
first_view_progress_without_timeout::<_, _, RoundRobin>(secp256r1::fixture);
68366836
}
6837+
6838+
/// Verifies that certifying a view sends `resolver.certified(view, true)` and
6839+
/// `Activity::Certification` to the reporter. This covers the notification
6840+
/// infrastructure shared with the infer-path; the state-level infer logic is
6841+
/// exercised by the `infer_parent_*` unit tests in state.rs.
6842+
fn certification_notifies_resolver_and_reporter<S, F, L>(mut fixture: F)
6843+
where
6844+
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
6845+
F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
6846+
L: ElectorConfig<S>,
6847+
{
6848+
let n = 5;
6849+
let q = quorum(n);
6850+
let namespace = b"cert_notifies_resolver_reporter".to_vec();
6851+
let executor = deterministic::Runner::timed(Duration::from_secs(10));
6852+
executor.start(|mut context| async move {
6853+
let Fixture {
6854+
participants,
6855+
schemes,
6856+
..
6857+
} = fixture(&mut context, &namespace, n);
6858+
6859+
let oracle =
6860+
start_test_network_with_peers(context.clone(), participants.clone(), false).await;
6861+
6862+
let elector = L::default();
6863+
let (mut mailbox, mut batcher_receiver, mut resolver_receiver, _, reporter) =
6864+
setup_voter(
6865+
&mut context,
6866+
&oracle,
6867+
&participants,
6868+
&schemes,
6869+
elector,
6870+
Duration::from_secs(30),
6871+
Duration::from_secs(30),
6872+
Duration::from_secs(30),
6873+
mocks::application::Certifier::Always,
6874+
)
6875+
.await;
6876+
6877+
// Respond to initial batcher Update so the voter can proceed.
6878+
match batcher_receiver.recv().await.unwrap() {
6879+
batcher::Message::Update {
6880+
current, response, ..
6881+
} => {
6882+
assert_eq!(current, View::new(1));
6883+
response.send(None).unwrap();
6884+
}
6885+
_ => panic!("unexpected batcher message"),
6886+
}
6887+
6888+
// Build a notarization for view 1 (parent = genesis).
6889+
let target_view = View::new(1);
6890+
let payload = Sha256::hash(b"cert_notifies_v1");
6891+
let proposal = Proposal::new(
6892+
Round::new(Epoch::new(333), target_view),
6893+
View::zero(),
6894+
payload,
6895+
);
6896+
let (_, notarization) = build_notarization(&schemes, &proposal, q);
6897+
mailbox
6898+
.recovered(Certificate::Notarization(notarization.clone()))
6899+
.await;
6900+
6901+
// Wait for Certified { view: 1, success: true } from the resolver.
6902+
// The voter also sends Certificate(Notarization) from try_broadcast_notarization
6903+
// before the automaton responds; those are ignored here.
6904+
loop {
6905+
match resolver_receiver.recv().await.unwrap() {
6906+
MailboxMessage::Certified { view, success } if view == target_view => {
6907+
assert!(success, "certification must succeed");
6908+
break;
6909+
}
6910+
_ => continue,
6911+
}
6912+
}
6913+
6914+
// reporter.report runs synchronously before on_end, so reporter is
6915+
// guaranteed to be updated by the time Certified arrives in the channel.
6916+
assert!(reporter.certified.lock().contains(&target_view));
6917+
assert!(reporter.notarizations.lock().contains_key(&target_view));
6918+
});
6919+
}
6920+
6921+
#[test_traced]
6922+
fn test_certification_notifies_resolver_and_reporter() {
6923+
certification_notifies_resolver_and_reporter::<_, _, Random>(
6924+
bls12381_threshold_vrf::fixture::<MinPk, _>,
6925+
);
6926+
certification_notifies_resolver_and_reporter::<_, _, Random>(
6927+
bls12381_threshold_vrf::fixture::<MinSig, _>,
6928+
);
6929+
certification_notifies_resolver_and_reporter::<_, _, RoundRobin>(
6930+
bls12381_multisig::fixture::<MinPk, _>,
6931+
);
6932+
certification_notifies_resolver_and_reporter::<_, _, RoundRobin>(
6933+
bls12381_multisig::fixture::<MinSig, _>,
6934+
);
6935+
certification_notifies_resolver_and_reporter::<_, _, RoundRobin>(ed25519::fixture);
6936+
certification_notifies_resolver_and_reporter::<_, _, RoundRobin>(secp256r1::fixture);
6937+
}
68376938
}

consensus/src/simplex/actors/voter/round.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,12 @@ impl<S: Scheme, D: Digest> Round<S, D> {
219219
matches!(self.certify, CertifyState::Certified(true))
220220
}
221221

222+
/// Returns true if the round has a notarization and no certification has been
223+
/// initiated or completed.
224+
pub const fn is_certify_ready(&self) -> bool {
225+
matches!(self.certify, CertifyState::Ready) && self.notarization.is_some()
226+
}
227+
222228
/// Returns true if certification was aborted due to finalization.
223229
#[cfg(test)]
224230
pub const fn is_certify_aborted(&self) -> bool {

0 commit comments

Comments
 (0)