Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
135 changes: 95 additions & 40 deletions consensus/src/simplex/actors/voter/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -499,10 +499,10 @@ impl<
certificate_sender: &mut WrappedSender<Sr, Certificate<S, D>>,
view: View,
resolved: Resolved,
) {
) -> Vec<View> {
// Construct a notarization certificate
let Some(notarization) = self.state.broadcast_notarization(view) else {
return;
return Vec::new();
};

// Only the leader sees an unbiased latency sample, so record it now.
Expand All @@ -517,10 +517,11 @@ impl<
.updated(Certificate::Notarization(notarization.clone()))
.await;
}
// Update our local round with the certificate.
self.handle_notarization(notarization.clone()).await;
// Persist the certificate before informing others.
// The notarization must be durable before we derive ancestor certifications from it.
self.sync_journal(view).await;
let inferred = self.apply_inferred_certifications(resolver, view).await;

// Broadcast the notarization certificate
debug!(proposal=?notarization.proposal, "broadcasting notarization");
self.broadcast_certificate(
Expand All @@ -532,6 +533,52 @@ impl<
self.reporter
.report(Activity::Notarization(notarization))
.await;
inferred
}

/// Journals the certification decision and signals the resolver and reporter.
///
/// Idempotent: returns `false` if the view was already concluded.
async fn conclude_certification(
&mut self,
resolver: &mut resolver::Mailbox<S, D>,
view: View,
success: bool,
) -> bool {
let Some(notarization) = self.handle_certification(view, success).await else {
return false;
};
resolver.certified(view, success).await;
if success {
self.reporter
.report(Activity::Certification(notarization))
.await;
}
true
}

/// Derives certifications for rounds whose notarizations prove certification without
/// waiting on the automaton, then journals and signals each one.
///
/// Important nuance: inferred certification establishes the protocol fact that the
/// round is certified, but it does not retroactively mean this replica locally verified
/// the proposal. Callers should therefore revisit the normal notify path for the inferred
/// view and rely on each vote/certificate constructor to enforce its own local gates.
async fn apply_inferred_certifications(
&mut self,
resolver: &mut resolver::Mailbox<S, D>,
view: View,
) -> Vec<View> {
let mut inferred = Vec::new();
for inferred_view in self.state.infer_certifications(view) {
if self
.conclude_certification(resolver, inferred_view, true)
.await
{
inferred.push(inferred_view);
}
}
inferred
}

/// Broadcast a nullify vote for `view` if the state machine allows it.
Expand Down Expand Up @@ -670,17 +717,24 @@ impl<
view: View,
resolved: Resolved,
) {
self.try_broadcast_notarize(batcher, vote_sender, view)
.await;
self.try_broadcast_notarization(resolver, certificate_sender, view, resolved)
.await;
// We handle broadcast of `Nullify` votes in `timeout`, so this only emits certificates.
self.try_broadcast_nullification(resolver, certificate_sender, view, resolved)
.await;
self.try_broadcast_finalize(batcher, vote_sender, view)
.await;
self.try_broadcast_finalization(resolver, certificate_sender, view, resolved)
.await;
// Use an explicit worklist so inferred certification can revisit the same notify flow
// without building a recursive async future.
let mut pending = vec![(view, resolved)];
while let Some((view, resolved)) = pending.pop() {
self.try_broadcast_notarize(batcher, vote_sender, view)
.await;
let inferred = self
.try_broadcast_notarization(resolver, certificate_sender, view, resolved)
.await;
// We handle broadcast of `Nullify` votes in `timeout`, so this only emits certificates.
self.try_broadcast_nullification(resolver, certificate_sender, view, resolved)
.await;
self.try_broadcast_finalize(batcher, vote_sender, view)
.await;
self.try_broadcast_finalization(resolver, certificate_sender, view, resolved)
.await;
pending.extend(inferred.into_iter().rev().map(|view| (view, Resolved::None)));
}
}

/// Spawns the actor event loop with the provided channels.
Expand Down Expand Up @@ -756,17 +810,8 @@ impl<
.await;
}
Artifact::Certification(round, success) => {
let Some(notarization) =
self.handle_certification(round.view(), success).await
else {
continue;
};
resolver.certified(round.view(), success).await;
if success {
self.reporter
.report(Activity::Certification(notarization))
.await;
}
self.conclude_certification(&mut resolver, round.view(), success)
.await;
}
Artifact::Nullify(nullify) => {
self.handle_nullify(nullify.clone()).await;
Expand Down Expand Up @@ -807,6 +852,13 @@ impl<
}
self.journal = Some(journal);

// Recover inferred certifications that were not journaled before a crash.
let recovery_views: Vec<View> = self.state.notarized_views_desc().collect();
let mut recovered_inferred = Vec::new();
for view in recovery_views {
recovered_inferred.extend(self.apply_inferred_certifications(&mut resolver, view).await);
}

// Log current view after recovery
let end = self.context.current();
let elapsed = end.duration_since(start).unwrap_or_default();
Expand All @@ -829,6 +881,17 @@ impl<
debug!(%observed_view, %leader, ?reason, "nullifying round");
self.state.trigger_timeout(observed_view, reason);
}
for view in recovered_inferred {
self.notify(
&mut batcher,
&mut resolver,
&mut vote_sender,
&mut certificate_sender,
view,
Resolved::None,
)
.await;
}

// Process messages
let mut pending_propose: Option<Request<Context<D, S::PublicKey>, D>> = None;
Expand Down Expand Up @@ -968,20 +1031,12 @@ impl<
if !certified {
warn!(?round, "proposal failed certification");
}
let Some(notarization) = self.handle_certification(view, certified).await
else {
continue;
};
// Always forward certification outcomes to resolver.
// This can happen after a nullification for the same view because
// certification is asynchronous; finalization is the boundary that
// cancels in-flight certification and suppresses late reporting.
resolver.certified(view, certified).await;
if certified {
self.reporter
.report(Activity::Certification(notarization))
.await;
}
// Always forward certification outcomes to resolver. This can happen
// after a nullification for the same view because certification is
// asynchronous; finalization is the boundary that cancels in-flight
// certification and suppresses late reporting.
self.conclude_certification(&mut resolver, view, certified)
.await;
}
Err(err) => {
// Unlike propose/verify (where failing to act will lead to a timeout
Expand Down
Loading
Loading