Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 61 additions & 2 deletions consensus/src/simplex/actors/voter/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,8 @@ impl<
///
/// The certification may succeed, in which case the proposal can be used in future views—
/// or fail, in which case we should nullify the view as fast as possible.
///
/// This is append-only: callers are responsible for calling sync_journal after this returns.
async fn handle_certification(
&mut self,
view: View,
Expand All @@ -430,7 +432,6 @@ impl<
// Persist certification result for recovery
let artifact = Artifact::Certification(Rnd::new(self.state.epoch(), view), success);
self.append_journal(view, artifact.clone()).await;
self.sync_journal(view).await;

Some(notarization)
}
Expand Down Expand Up @@ -507,8 +508,33 @@ impl<
}
// Update our local round with the certificate.
self.handle_notarization(notarization.clone()).await;
// Persist the certificate before informing others.
// Persist the certificate before informing others. The notarization is durable
// here, so it serves as proof that ancestor certifications derived below are valid.
self.sync_journal(view).await;

// Infer certifications for ancestors whose notarizations prove they were certified
// by f+1 signers before they could vote on view N.
let ancestors = self.state.infer_ancestors(view);
let epoch = self.state.epoch();
for (ancestor_view, _) in &ancestors {
self.append_journal(
*ancestor_view,
Artifact::Certification(Rnd::new(epoch, *ancestor_view), true),
)
.await;
}
if !ancestors.is_empty() {
if let Some(journal) = &self.journal {
journal.sync_all().await.expect("unable to sync journal");
}
for (ancestor_view, ancestor_notarization) in ancestors {
resolver.certified(ancestor_view, true).await;
self.reporter
.report(Activity::Certification(ancestor_notarization))
.await;
}
}

// Broadcast the notarization certificate
debug!(proposal=?notarization.proposal, "broadcasting notarization");
self.broadcast_certificate(
Expand Down Expand Up @@ -744,6 +770,9 @@ impl<
.await;
}
Artifact::Certification(round, success) => {
// No sync here: replay data is already durable. Inference does not
// fire during replay because try_broadcast_notarization is never
// called from the replay path.
let Some(notarization) =
self.handle_certification(round.view(), success).await
else {
Expand Down Expand Up @@ -795,6 +824,35 @@ impl<
}
self.journal = Some(journal);

// Post-replay inference pass: recover any ancestor certifications that were inferred
// before a crash but not yet written to the journal. CertifyState::Outstanding cannot
// appear here since the certify pool is empty on startup.
let notarized_views = self.state.notarized_views_descending();
let mut all_inferred: Vec<(View, Notarization<S, D>)> = Vec::new();
let epoch = self.state.epoch();
for view in notarized_views {
let ancestors = self.state.infer_ancestors(view);
all_inferred.extend(ancestors);
}
for (ancestor_view, _) in &all_inferred {
self.append_journal(
*ancestor_view,
Artifact::Certification(Rnd::new(epoch, *ancestor_view), true),
)
.await;
}
if !all_inferred.is_empty() {
if let Some(journal) = &self.journal {
journal.sync_all().await.expect("unable to sync journal");
}
for (ancestor_view, ancestor_notarization) in all_inferred {
resolver.certified(ancestor_view, true).await;
self.reporter
.report(Activity::Certification(ancestor_notarization))
.await;
}
}

// Log current view after recovery
let end = self.context.current();
let elapsed = end.duration_since(start).unwrap_or_default();
Expand Down Expand Up @@ -960,6 +1018,7 @@ impl<
else {
continue;
};
self.sync_journal(view).await;
// Always forward certification outcomes to resolver.
// This can happen after a nullification for the same view because
// certification is asynchronous; finalization is the boundary that
Expand Down
256 changes: 256 additions & 0 deletions consensus/src/simplex/actors/voter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8770,4 +8770,260 @@ mod tests {
batcher_update_triggers_timeout(ed25519::fixture);
batcher_update_triggers_timeout(secp256r1::fixture);
}

/// Verifies that after a crash, the post-replay inference pass certifies ancestors whose
/// notarizations were durable but whose certifications were not written before the crash.
///
/// Scenario:
/// 1. First run: view 3 certification hangs (Pending). View 4 notarization arrives from
/// resolver, triggering try_broadcast_notarization(4). Inference stops because view 3 is
/// Outstanding. Journal has notarization_3 and notarization_4 but no certification_3.
/// 2. Restart with Certifier::Cancel (automaton cannot certify). Inference must be the
/// source of certification. Post-replay pass certifies view 3 via infer_ancestors(4).
fn post_replay_inference_certifies_ancestor<S, F>(mut fixture: F)
where
S: Scheme<Sha256Digest, PublicKey = PublicKey>,
F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
{
let n = 5;
let quorum = quorum(n);
let namespace = b"post_replay_infer_ancestor".to_vec();
let executor = deterministic::Runner::timed(Duration::from_secs(30));
executor.start(|mut context| async move {
let Fixture {
participants,
schemes,
..
} = fixture(&mut context, &namespace, n);

let oracle =
start_test_network_with_peers(context.clone(), participants.clone(), true).await;

let me = participants[0].clone();
let elector = RoundRobin::<Sha256>::default();
let reporter_cfg = mocks::reporter::Config {
participants: participants.clone().try_into().unwrap(),
scheme: schemes[0].clone(),
elector: elector.clone(),
};
let reporter =
mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_cfg);
let relay = Arc::new(mocks::relay::Relay::new());

let partition = "post_replay_infer_ancestor".to_string();
let epoch = Epoch::new(333);

// First run: certification of view 3 hangs indefinitely (Pending).
let app_cfg = mocks::application::Config {
hasher: Sha256::default(),
relay: relay.clone(),
me: me.clone(),
propose_latency: (1.0, 0.0),
verify_latency: (1.0, 0.0),
certify_latency: (1.0, 0.0),
should_certify: mocks::application::Certifier::Pending,
};
let (app_actor, application) =
mocks::application::Application::new(context.with_label("app_pending"), app_cfg);
app_actor.start();

let voter_cfg = Config {
scheme: schemes[0].clone(),
elector: elector.clone(),
blocker: oracle.control(me.clone()),
automaton: application.clone(),
relay: application.clone(),
reporter: reporter.clone(),
partition: partition.clone(),
epoch,
mailbox_size: 128,
leader_timeout: Duration::from_secs(5),
certification_timeout: Duration::from_secs(5),
timeout_retry: Duration::from_mins(60),
activity_timeout: ViewDelta::new(10),
replay_buffer: NZUsize!(1024 * 1024),
write_buffer: NZUsize!(1024 * 1024),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
};
let (voter, mut mailbox) = Actor::new(context.with_label("voter_pending"), voter_cfg);

let (resolver_sender, _resolver_receiver) = mpsc::channel(8);
let (batcher_sender, mut batcher_receiver) = mpsc::channel(8);
let (vote_sender, _) = oracle
.control(me.clone())
.register(0, TEST_QUOTA)
.await
.unwrap();
let (cert_sender, _) = oracle
.control(me.clone())
.register(1, TEST_QUOTA)
.await
.unwrap();

let handle = voter.start(
batcher::Mailbox::new(batcher_sender),
resolver::Mailbox::new(resolver_sender),
vote_sender,
cert_sender,
);

if let batcher::Message::Update { response, .. } =
batcher_receiver.recv().await.unwrap()
{
response.send(None).unwrap();
}

// Advance to view 3 (finalizes view 2, puts us at view 3).
let view_3 = View::new(3);
let parent_payload = advance_to_view(
&mut mailbox,
&mut batcher_receiver,
&schemes,
quorum,
view_3,
)
.await;

// Send proposal for view 3 so the voter can verify and notarize it.
let proposal_3 = Proposal::new(
Round::new(epoch, view_3),
view_3.previous().unwrap(),
Sha256::hash(b"post_replay_infer_payload"),
);
let leader = participants[1].clone();
let contents = (proposal_3.round, parent_payload, 0u64).encode();
relay.broadcast(&leader, (proposal_3.payload, contents));
mailbox.proposal(proposal_3.clone()).await;

// Build notarization for view 3 and deliver it — this triggers a (pending) certify.
let (_, notarization_3) = build_notarization(&schemes, &proposal_3, quorum);
mailbox
.resolved(Certificate::Notarization(notarization_3))
.await;

// Give the actor time to process the notarization and dispatch the (pending) certify.
context.sleep(Duration::from_millis(200)).await;

// Build a notarization for view 4 (parent = view 3) and deliver it via resolver.
// This fires try_broadcast_notarization(4), which runs infer_ancestors(4). Because
// view 3 is Outstanding, inference stops — no certification_3 is written to journal.
let view_4 = View::new(4);
let proposal_4 = Proposal::new(
Round::new(epoch, view_4),
view_3,
Sha256::hash(b"post_replay_infer_payload_4"),
);
let (_, notarization_4) = build_notarization(&schemes, &proposal_4, quorum);
mailbox
.resolved(Certificate::Notarization(notarization_4))
.await;

// Wait for the notarization_4 broadcast to be processed (gives sync time).
context.sleep(Duration::from_millis(200)).await;

// Abort first voter — simulates crash.
handle.abort();

// Second run: Certifier::Cancel so the automaton cannot certify. If certification
// of view 3 occurs, it must come from the post-replay inference pass.
let app_cfg = mocks::application::Config {
hasher: Sha256::default(),
relay: relay.clone(),
me: me.clone(),
propose_latency: (1.0, 0.0),
verify_latency: (1.0, 0.0),
certify_latency: (1.0, 0.0),
should_certify: mocks::application::Certifier::Cancel,
};
let (app_actor, application) =
mocks::application::Application::new(context.with_label("app_cancel"), app_cfg);
app_actor.start();

let voter_cfg = Config {
scheme: schemes[0].clone(),
elector,
blocker: oracle.control(me.clone()),
automaton: application.clone(),
relay: application.clone(),
reporter: reporter.clone(),
partition,
epoch,
mailbox_size: 128,
leader_timeout: Duration::from_secs(5),
certification_timeout: Duration::from_secs(5),
timeout_retry: Duration::from_mins(60),
activity_timeout: ViewDelta::new(10),
replay_buffer: NZUsize!(1024 * 1024),
write_buffer: NZUsize!(1024 * 1024),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
};
let (voter, _mailbox) = Actor::new(context.with_label("voter_restarted"), voter_cfg);

let (resolver_sender, mut resolver_receiver) = mpsc::channel(8);
let (batcher_sender, mut batcher_receiver) = mpsc::channel(8);
let (vote_sender, _) = oracle
.control(me.clone())
.register(2, TEST_QUOTA)
.await
.unwrap();
let (cert_sender, _) = oracle
.control(me.clone())
.register(3, TEST_QUOTA)
.await
.unwrap();

voter.start(
batcher::Mailbox::new(batcher_sender),
resolver::Mailbox::new(resolver_sender),
vote_sender,
cert_sender,
);

if let batcher::Message::Update { response, .. } =
batcher_receiver.recv().await.unwrap()
{
response.send(None).unwrap();
}

// The post-replay inference pass must have certified view 3.
loop {
select! {
msg = resolver_receiver.recv() => match msg.unwrap() {
MailboxMessage::Certified { view, success } if view == view_3 => {
assert!(
success,
"post-replay inference must certify view 3 as successful"
);
return;
}
MailboxMessage::Certified { .. } | MailboxMessage::Certificate(_) => {}
},
msg = batcher_receiver.recv() => {
if let batcher::Message::Update { response, .. } = msg.unwrap() {
response.send(None).unwrap();
}
},
_ = context.sleep(Duration::from_secs(5)) => {
panic!(
"timed out waiting for post-replay inference to certify view {view_3}"
);
},
}
}
});
}

#[test_traced]
fn test_post_replay_inference_certifies_ancestor() {
post_replay_inference_certifies_ancestor::<_, _>(
bls12381_threshold_vrf::fixture::<MinPk, _>,
);
post_replay_inference_certifies_ancestor::<_, _>(
bls12381_threshold_vrf::fixture::<MinSig, _>,
);
post_replay_inference_certifies_ancestor::<_, _>(bls12381_multisig::fixture::<MinPk, _>);
post_replay_inference_certifies_ancestor::<_, _>(bls12381_multisig::fixture::<MinSig, _>);
post_replay_inference_certifies_ancestor::<_, _>(ed25519::fixture);
post_replay_inference_certifies_ancestor::<_, _>(secp256r1::fixture);
}
}
5 changes: 5 additions & 0 deletions consensus/src/simplex/actors/voter/round.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,11 @@ impl<S: Scheme, D: Digest> Round<S, D> {
matches!(self.certify, CertifyState::Certified(true))
}

/// Returns true if certification has not yet started (inference-eligible).
pub const fn is_certify_ready(&self) -> bool {
matches!(self.certify, CertifyState::Ready)
}

/// Returns true if certification was aborted due to finalization.
#[cfg(test)]
pub const fn is_certify_aborted(&self) -> bool {
Expand Down
Loading