From 0ce17007e59d3670916e8a8c2cb57e210276246f Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Fri, 11 Apr 2025 14:30:21 -0700 Subject: [PATCH 1/9] create outdated worker --- consensus/src/threshold_simplex/mocks/mod.rs | 1 + .../src/threshold_simplex/mocks/outdated.rs | 216 ++++++++++++++++++ 2 files changed, 217 insertions(+) create mode 100644 consensus/src/threshold_simplex/mocks/outdated.rs diff --git a/consensus/src/threshold_simplex/mocks/mod.rs b/consensus/src/threshold_simplex/mocks/mod.rs index b9a3a949f84..c3e783b7d2b 100644 --- a/consensus/src/threshold_simplex/mocks/mod.rs +++ b/consensus/src/threshold_simplex/mocks/mod.rs @@ -3,5 +3,6 @@ pub mod application; pub mod conflicter; pub mod nuller; +pub mod outdated; pub mod relay; pub mod supervisor; diff --git a/consensus/src/threshold_simplex/mocks/outdated.rs b/consensus/src/threshold_simplex/mocks/outdated.rs new file mode 100644 index 00000000000..9af26f32aba --- /dev/null +++ b/consensus/src/threshold_simplex/mocks/outdated.rs @@ -0,0 +1,216 @@ +//! Byzantine participant that sends conflicting notarize/finalize messages. + +use crate::{ + threshold_simplex::{ + encoder::{ + finalize_namespace, notarize_namespace, nullify_message, nullify_namespace, + proposal_message, seed_message, seed_namespace, + }, + wire::{self, Proposal}, + View, + }, + ThresholdSupervisor, +}; +use bytes::Bytes; +use commonware_cryptography::{ + bls12381::primitives::{group, ops}, + Hasher, +}; +use commonware_p2p::{Receiver, Recipients, Sender}; +use commonware_runtime::{Clock, Handle, Spawner}; +use prost::Message; +use rand::{CryptoRng, Rng}; +use std::{collections::HashMap, marker::PhantomData}; +use tracing::debug; + +pub struct Config< + S: ThresholdSupervisor, +> { + pub supervisor: S, + pub namespace: Vec, + pub view_delta: u64, +} + +pub struct Outdated< + E: Clock + Rng + CryptoRng + Spawner, + H: Hasher, + S: ThresholdSupervisor, +> { + context: E, + supervisor: S, + _hasher: PhantomData, + + seed_namespace: Vec, + notarize_namespace: Vec, + nullify_namespace: Vec, + finalize_namespace: Vec, + + history: HashMap, + view_delta: u64, +} + +impl< + E: Clock + Rng + CryptoRng + Spawner, + H: Hasher, + S: ThresholdSupervisor, + > Outdated +{ + pub fn new(context: E, cfg: Config) -> Self { + Self { + context, + supervisor: cfg.supervisor, + _hasher: PhantomData, + + seed_namespace: seed_namespace(&cfg.namespace), + notarize_namespace: notarize_namespace(&cfg.namespace), + nullify_namespace: nullify_namespace(&cfg.namespace), + finalize_namespace: finalize_namespace(&cfg.namespace), + + history: HashMap::new(), + view_delta: cfg.view_delta, + } + } + + pub fn start(mut self, voter_network: (impl Sender, impl Receiver)) -> Handle<()> { + self.context.spawn_ref()(self.run(voter_network)) + } + + async fn run(mut self, voter_network: (impl Sender, impl Receiver)) { + let (mut sender, mut receiver) = voter_network; + while let Ok((s, msg)) = receiver.recv().await { + // Parse message + let msg = match wire::Voter::decode(msg) { + Ok(msg) => msg, + Err(err) => { + debug!(?err, sender = ?s, "failed to decode message"); + continue; + } + }; + let payload = match msg.payload { + Some(payload) => payload, + None => { + debug!(sender = ?s, "message missing payload"); + continue; + } + }; + + // Process message + match payload { + wire::voter::Payload::Nullify(nullify) => { + // Get our index + let view = nullify.view; + let share = self.supervisor.share(view).unwrap(); + + // Nullify provided view + let view = view.saturating_sub(self.view_delta); + let message = nullify_message(view); + let view_signature = + ops::partial_sign_message(share, Some(&self.nullify_namespace), &message) + .serialize(); + let message = seed_message(view); + let seed_signature = + ops::partial_sign_message(share, Some(&self.seed_namespace), &message) + .serialize(); + let n = wire::Nullify { + view, + view_signature, + seed_signature, + }; + let msg = wire::Voter { + payload: Some(wire::voter::Payload::Nullify(n)), + } + .encode_to_vec() + .into(); + sender.send(Recipients::All, msg, true).await.unwrap(); + } + wire::voter::Payload::Notarize(notarize) => { + // Get our index + let proposal = match notarize.proposal { + Some(proposal) => proposal, + None => { + debug!(sender = ?s, "notarize missing proposal"); + continue; + } + }; + let Ok(payload) = H::Digest::try_from(&proposal.payload) else { + debug!(sender = ?s, "invalid payload"); + continue; + }; + let view = proposal.view; + let share = self.supervisor.share(view).unwrap(); + + // Store proposal + self.history.insert(view, proposal.clone()); + + // Notarize old digest + let view = view.saturating_sub(self.view_delta); + let Some(proposal) = self.history.get(&view) else { + continue; + }; + let parent = proposal.parent; + let message = proposal_message(proposal.view, parent, &payload); + let proposal_signature = + ops::partial_sign_message(share, Some(&self.notarize_namespace), &message) + .serialize(); + let message = seed_message(view); + let seed_signature: Bytes = + ops::partial_sign_message(share, Some(&self.seed_namespace), &message) + .serialize() + .into(); + let n = wire::Notarize { + proposal: Some(proposal.clone()), + proposal_signature, + seed_signature: seed_signature.to_vec(), + }; + let msg = wire::Voter { + payload: Some(wire::voter::Payload::Notarize(n)), + } + .encode_to_vec() + .into(); + sender.send(Recipients::All, msg, true).await.unwrap(); + } + wire::voter::Payload::Finalize(finalize) => { + // Get our index + let proposal = match finalize.proposal { + Some(proposal) => proposal, + None => { + debug!(sender = ?s, "notarize missing proposal"); + continue; + } + }; + let Ok(payload) = H::Digest::try_from(&proposal.payload) else { + debug!(sender = ?s, "invalid payload"); + continue; + }; + let view = proposal.view; + let share = self.supervisor.share(view).unwrap(); + + // Store proposal + self.history.insert(view, proposal.clone()); + + // Finalize old digest + let view = view.saturating_sub(self.view_delta); + let Some(proposal) = self.history.get(&view) else { + continue; + }; + let parent = proposal.parent; + let message = proposal_message(view, parent, &payload); + let proposal_signature = + ops::partial_sign_message(share, Some(&self.finalize_namespace), &message) + .serialize(); + let f = wire::Finalize { + proposal: Some(proposal.clone()), + proposal_signature, + }; + let msg = wire::Voter { + payload: Some(wire::voter::Payload::Finalize(f)), + } + .encode_to_vec() + .into(); + sender.send(Recipients::All, msg, true).await.unwrap(); + } + _ => continue, + } + } + } +} From e24ea625541732df54419c834e2cf458e330ac19 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Fri, 11 Apr 2025 14:38:14 -0700 Subject: [PATCH 2/9] outdated test --- .../src/threshold_simplex/mocks/outdated.rs | 3 + consensus/src/threshold_simplex/mod.rs | 165 ++++++++++++++++++ 2 files changed, 168 insertions(+) diff --git a/consensus/src/threshold_simplex/mocks/outdated.rs b/consensus/src/threshold_simplex/mocks/outdated.rs index 9af26f32aba..31cd775231a 100644 --- a/consensus/src/threshold_simplex/mocks/outdated.rs +++ b/consensus/src/threshold_simplex/mocks/outdated.rs @@ -103,6 +103,7 @@ impl< // Nullify provided view let view = view.saturating_sub(self.view_delta); + debug!(?view, "nullifying outdated view"); let message = nullify_message(view); let view_signature = ops::partial_sign_message(share, Some(&self.nullify_namespace), &message) @@ -147,6 +148,7 @@ impl< let Some(proposal) = self.history.get(&view) else { continue; }; + debug!(?view, "notarizing outdated proposal"); let parent = proposal.parent; let message = proposal_message(proposal.view, parent, &payload); let proposal_signature = @@ -193,6 +195,7 @@ impl< let Some(proposal) = self.history.get(&view) else { continue; }; + debug!(?view, "finalizing outdated proposal"); let parent = proposal.parent; let message = proposal_message(view, parent, &payload); let proposal_signature = diff --git a/consensus/src/threshold_simplex/mod.rs b/consensus/src/threshold_simplex/mod.rs index 297501a5c58..7c54529ffdb 100644 --- a/consensus/src/threshold_simplex/mod.rs +++ b/consensus/src/threshold_simplex/mod.rs @@ -2376,4 +2376,169 @@ skip_timeout, assert!(count_nullify_and_finalize > 0); }); } + + #[test_traced] + fn test_outdated() { + // Create context + let n = 4; + let threshold = quorum(n).expect("unable to calculate threshold"); + let required_containers = 100; + let activity_timeout = 10; + let skip_timeout = 5; + let namespace = b"consensus".to_vec(); + let (executor, mut context, _) = Executor::timed(Duration::from_secs(30)); + executor.start(async move { + // Create simulated network + let (network, mut oracle) = Network::new( + context.with_label("network"), + Config { + max_size: 1024 * 1024, + }, + ); + + // Start network + network.start(); + + // Register participants + let mut schemes = Vec::new(); + let mut validators = Vec::new(); + for i in 0..n { + let scheme = Ed25519::from_seed(i as u64); + let pk = scheme.public_key(); + schemes.push(scheme); + validators.push(pk); + } + validators.sort(); + schemes.sort_by_key(|s| s.public_key()); + let mut registrations = register_validators(&mut oracle, &validators).await; + + // Link all validators + let link = Link { + latency: 10.0, + jitter: 1.0, + success_rate: 1.0, + }; + link_validators(&mut oracle, &validators, Action::Link(link), None).await; + + // Derive threshold + let (public, shares) = ops::generate_shares(&mut context, None, n, threshold); + let pk = poly::public(&public); + let prover = Prover::new(*pk, &namespace); + + // Create engines + let relay = Arc::new(mocks::relay::Relay::new()); + let mut supervisors = Vec::new(); + let (done_sender, mut done_receiver) = mpsc::unbounded(); + for (idx_scheme, scheme) in schemes.into_iter().enumerate() { + // Create scheme context + let context = context.with_label(&format!("validator-{}", scheme.public_key())); + + // Start engine + let validator = scheme.public_key(); + let mut participants = BTreeMap::new(); + participants.insert(0, (public.clone(), validators.clone(), shares[idx_scheme])); + let supervisor_config = mocks::supervisor::Config { + prover: prover.clone(), + participants, + }; + let supervisor = mocks::supervisor::Supervisor::new(supervisor_config); + let (voter, resolver) = registrations + .remove(&validator) + .expect("validator should be registered"); + if idx_scheme == 0 { + let cfg = mocks::outdated::Config { + supervisor, + namespace: namespace.clone(), + view_delta: activity_timeout + 10, + }; + let engine: mocks::outdated::Outdated<_, Sha256, _> = + mocks::outdated::Outdated::new(context.with_label("byzantine_engine"), cfg); + engine.start(voter); + } else { + supervisors.push(supervisor.clone()); + let application_cfg = mocks::application::Config { + hasher: Sha256::default(), + relay: relay.clone(), + participant: validator.clone(), + tracker: done_sender.clone(), + propose_latency: (10.0, 5.0), + verify_latency: (10.0, 5.0), + }; + let (actor, application) = mocks::application::Application::new( + context.with_label("application"), + application_cfg, + ); + actor.start(); + let cfg = JConfig { + partition: validator.to_string(), + }; + let journal = Journal::init(context.with_label("journal"), cfg) + .await + .expect("unable to create journal"); + let cfg = config::Config { + crypto: scheme, + automaton: application.clone(), + relay: application.clone(), + committer: application, + supervisor, + mailbox_size: 1024, + namespace: namespace.clone(), + leader_timeout: Duration::from_secs(1), + notarization_timeout: Duration::from_secs(2), + nullify_retry: Duration::from_secs(10), + fetch_timeout: Duration::from_secs(1), + activity_timeout, + skip_timeout, + max_fetch_count: 1, + max_fetch_size: 1024 * 512, + fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(1).unwrap()), + fetch_concurrent: 1, + replay_concurrency: 1, + }; + let engine = Engine::new(context.with_label("engine"), journal, cfg); + engine.start(voter, resolver); + } + } + + // Wait for all engines to finish + let mut completed = HashSet::new(); + let mut finalized = HashMap::new(); + loop { + let (validator, event) = done_receiver.next().await.unwrap(); + if let mocks::application::Progress::Finalized(proof, digest) = event { + let (view, _, payload, _, _) = prover.deserialize_finalization(proof).unwrap(); + if digest != payload { + panic!( + "finalization mismatch digest: {:?}, payload: {:?}", + digest, payload + ); + } + if let Some(previous) = finalized.insert(view, digest) { + if previous != digest { + panic!( + "finalization mismatch at {:?} previous: {:?}, current: {:?}", + view, previous, digest + ); + } + } + if (finalized.len() as u64) < required_containers { + continue; + } + completed.insert(validator); + } + if completed.len() == (n - 1) as usize { + break; + } + } + + // Check supervisors for correct activity + for supervisor in supervisors.iter() { + // No faults + { + let faults = supervisor.faults.lock().unwrap(); + assert_eq!(faults.len(), 0); + } + } + }); + } } From 81f63942822ed98488ee96b8559a188df6f4c902 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Fri, 11 Apr 2025 15:15:05 -0700 Subject: [PATCH 3/9] test compiles --- .../actors/resolver/ingress.rs | 2 +- .../threshold_simplex/actors/voter/ingress.rs | 2 +- .../src/threshold_simplex/actors/voter/mod.rs | 147 ++++++++++++++++++ 3 files changed, 149 insertions(+), 2 deletions(-) diff --git a/consensus/src/threshold_simplex/actors/resolver/ingress.rs b/consensus/src/threshold_simplex/actors/resolver/ingress.rs index ae57b56b2b6..da2841a052f 100644 --- a/consensus/src/threshold_simplex/actors/resolver/ingress.rs +++ b/consensus/src/threshold_simplex/actors/resolver/ingress.rs @@ -24,7 +24,7 @@ pub struct Mailbox { } impl Mailbox { - pub(super) fn new(sender: mpsc::Sender) -> Self { + pub(crate) fn new(sender: mpsc::Sender) -> Self { Self { sender } } diff --git a/consensus/src/threshold_simplex/actors/voter/ingress.rs b/consensus/src/threshold_simplex/actors/voter/ingress.rs index 8a8d1b1e85f..d8d5a0b287f 100644 --- a/consensus/src/threshold_simplex/actors/voter/ingress.rs +++ b/consensus/src/threshold_simplex/actors/voter/ingress.rs @@ -18,7 +18,7 @@ pub struct Mailbox { } impl Mailbox { - pub(super) fn new(sender: mpsc::Sender>) -> Self { + pub(crate) fn new(sender: mpsc::Sender>) -> Self { Self { sender } } diff --git a/consensus/src/threshold_simplex/actors/voter/mod.rs b/consensus/src/threshold_simplex/actors/voter/mod.rs index 9ac6cb28ade..f509193f9d9 100644 --- a/consensus/src/threshold_simplex/actors/voter/mod.rs +++ b/consensus/src/threshold_simplex/actors/voter/mod.rs @@ -35,3 +35,150 @@ pub struct Config< pub skip_timeout: View, pub replay_concurrency: usize, } + +#[cfg(test)] +mod tests { + use super::*; + use crate::threshold_simplex::{ + actors::resolver, + mocks, + wire::{self, backfiller}, + Prover, View, CONFLICTING_FINALIZE, CONFLICTING_NOTARIZE, FINALIZE, NOTARIZE, + NULLIFY_AND_FINALIZE, + }; + use bytes::Bytes; + use commonware_cryptography::{ + bls12381::{ + dkg::ops, + primitives::{ + group::{self, Share, Signature}, + poly::{self, Public}, + }, + }, + sha256::Digest, + Ed25519, Scheme as CryptoScheme, Sha256, Signer, + }; + use commonware_macros::test_traced; + use commonware_p2p::simulated::{Config as NConfig, Network}; + use commonware_runtime::{ + deterministic::{self, Context as DeterministicContext, Executor}, + Blob, Clock, Metrics, Runner, Spawner, Storage, + }; + use commonware_storage::journal::variable::{Config as JConfig, Journal}; + use commonware_utils::{quorum, Array}; + use futures::{ + channel::mpsc::{self, UnboundedReceiver}, + StreamExt, + }; + use prost::Message; + use std::time::Duration; + use std::{ + collections::BTreeMap, + sync::{atomic::AtomicI64, Arc}, + }; + + // Mock Committer (no-op) + #[derive(Clone)] + struct MockCommitter; + impl Committer for MockCommitter { + type Digest = ::Digest; + async fn prepared(&mut self, _proof: Bytes, _payload: Self::Digest) {} + async fn finalized(&mut self, _proof: Bytes, _payload: Self::Digest) {} + } + + // Test for late notarization causing a panic + #[test_traced] + fn test_late_notarization_panic() { + let n = 5; + let threshold = quorum(n).expect("unable to calculate threshold"); + let namespace = b"consensus".to_vec(); + let (executor, mut context, _) = Executor::timed(Duration::from_secs(10)); + executor.start(async move { + // Create simulated network + let (network, mut oracle) = Network::new( + context.with_label("network"), + NConfig { + max_size: 1024 * 1024, + }, + ); + network.start(); + + // Get participants + let mut schemes = Vec::new(); + let mut validators = Vec::new(); + for i in 0..n { + let scheme = Ed25519::from_seed(i as u64); + let pk = scheme.public_key(); + schemes.push(scheme); + validators.push(pk); + } + validators.sort(); + schemes.sort_by_key(|s| s.public_key()); + + // Derive threshold + let (public, shares) = ops::generate_shares(&mut context, None, n, threshold); + let pk = poly::public(&public); + let prover = Prover::::new(*pk, &namespace); + + // Initialize voter actor + let (done_sender, mut done_receiver) = mpsc::unbounded(); + let scheme = schemes[0].clone(); + let validator = scheme.public_key(); + let mut participants = BTreeMap::new(); + participants.insert(0, (public.clone(), validators.clone(), shares[0])); + let supervisor_config = mocks::supervisor::Config { + prover: prover.clone(), + participants, + }; + let supervisor = mocks::supervisor::Supervisor::new(supervisor_config); + let relay = Arc::new(mocks::relay::Relay::new()); + let application_cfg = mocks::application::Config { + hasher: Sha256::default(), + relay: relay.clone(), + participant: validator.clone(), + tracker: done_sender.clone(), + propose_latency: (10.0, 5.0), + verify_latency: (10.0, 5.0), + }; + let (actor, application) = mocks::application::Application::new( + context.with_label("application"), + application_cfg, + ); + actor.start(); + let cfg = JConfig { + partition: "test".to_string(), + }; + let journal = Journal::init(context.with_label("journal"), cfg) + .await + .expect("unable to create journal"); + + let cfg = Config { + crypto: scheme, + automaton: application.clone(), + relay: application.clone(), + committer: MockCommitter, + supervisor, + + namespace, + mailbox_size: 10, + leader_timeout: Duration::from_secs(5), + notarization_timeout: Duration::from_secs(5), + nullify_retry: Duration::from_secs(5), + activity_timeout: 10, + skip_timeout: 10, + replay_concurrency: 1, + }; + let (mut actor, mut mailbox) = Actor::new(context.clone(), journal, cfg); + + // Create a dummy backfiller mailbox (not used in this path) + let (backfiller_sender, backfiller_receiver) = mpsc::channel(1); + let backfiller = resolver::Mailbox::new(backfiller_sender); + + // Create a dummy network mailbox + let (voter_sender, voter_receiver) = oracle.register(validator, 0).await.unwrap(); + + // Run the actor, expecting it to panic + actor.start(backfiller, voter_sender, voter_receiver); + }); + } +} From 50b58d5b27368e0bfe576c7ce05530ac5568f764 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Fri, 11 Apr 2025 15:40:24 -0700 Subject: [PATCH 4/9] test runs --- .../src/threshold_simplex/actors/voter/mod.rs | 175 +++++++++++++++++- 1 file changed, 171 insertions(+), 4 deletions(-) diff --git a/consensus/src/threshold_simplex/actors/voter/mod.rs b/consensus/src/threshold_simplex/actors/voter/mod.rs index f509193f9d9..b102650adf1 100644 --- a/consensus/src/threshold_simplex/actors/voter/mod.rs +++ b/consensus/src/threshold_simplex/actors/voter/mod.rs @@ -41,6 +41,9 @@ mod tests { use super::*; use crate::threshold_simplex::{ actors::resolver, + encoder::{ + finalize_namespace, notarize_namespace, proposal_message, seed_message, seed_namespace, + }, mocks, wire::{self, backfiller}, Prover, View, CONFLICTING_FINALIZE, CONFLICTING_NOTARIZE, FINALIZE, NOTARIZE, @@ -51,15 +54,20 @@ mod tests { bls12381::{ dkg::ops, primitives::{ - group::{self, Share, Signature}, + group::{self, Element, Share, Signature}, + ops::{partial_sign_message, sign_message, threshold_signature_recover}, poly::{self, Public}, }, }, + hash, sha256::Digest, Ed25519, Scheme as CryptoScheme, Sha256, Signer, }; use commonware_macros::test_traced; - use commonware_p2p::simulated::{Config as NConfig, Network}; + use commonware_p2p::{ + simulated::{Config as NConfig, Link, Network}, + Recipients, Sender, + }; use commonware_runtime::{ deterministic::{self, Context as DeterministicContext, Executor}, Blob, Clock, Metrics, Runner, Spawner, Storage, @@ -159,7 +167,7 @@ mod tests { committer: MockCommitter, supervisor, - namespace, + namespace: namespace.clone(), mailbox_size: 10, leader_timeout: Duration::from_secs(5), notarization_timeout: Duration::from_secs(5), @@ -175,10 +183,169 @@ mod tests { let backfiller = resolver::Mailbox::new(backfiller_sender); // Create a dummy network mailbox - let (voter_sender, voter_receiver) = oracle.register(validator, 0).await.unwrap(); + let peer = schemes[1].public_key(); + let (voter_sender, voter_receiver) = + oracle.register(validator.clone(), 0).await.unwrap(); + let (mut peer_sender, _) = oracle.register(peer.clone(), 0).await.unwrap(); + oracle + .add_link( + validator.clone(), + peer.clone(), + Link { + latency: 0.0, + jitter: 0.0, + success_rate: 1.0, + }, + ) + .await + .unwrap(); + oracle + .add_link( + peer, + validator, + Link { + latency: 0.0, + jitter: 0.0, + success_rate: 1.0, + }, + ) + .await + .unwrap(); // Run the actor, expecting it to panic actor.start(backfiller, voter_sender, voter_receiver); + + // Send finalization over network + let payload = hash(b"test"); + let proposal = wire::Proposal { + view: 100, + parent: 50, + payload: payload.to_vec(), + }; + let message = proposal_message(proposal.view, proposal.parent, &payload); + let finalize_namespace = finalize_namespace(&namespace); + let partials: Vec<_> = shares + .iter() + .map(|share| partial_sign_message(share, Some(&finalize_namespace), &message)) + .collect(); + let proposal_signature = threshold_signature_recover(threshold, partials) + .unwrap() + .serialize(); + let seed_namespace = seed_namespace(&namespace); + let message = seed_message(proposal.view); + let partials: Vec<_> = shares + .iter() + .map(|share| partial_sign_message(share, Some(&seed_namespace), &message)) + .collect(); + let seed_signature = threshold_signature_recover(threshold, partials) + .unwrap() + .serialize(); + let finalization = wire::Finalization { + proposal: Some(proposal), + proposal_signature, + seed_signature, + }; + let msg = wire::Voter { + payload: Some(wire::voter::Payload::Finalization(finalization)), + } + .encode_to_vec() + .into(); + peer_sender + .send(Recipients::All, msg, true) + .await + .expect("failed to send message"); + + // Wait for application to be notified + let (_, progress) = done_receiver.next().await.expect("failed to receive done"); + match progress { + mocks::application::Progress::Finalized(_, finalized_payload) => { + assert_eq!(finalized_payload, payload); + } + _ => panic!("unexpected progress"), + } + panic!("expected panic"); + + // Send old notarization from backfiller + let payload = hash(b"test2"); + let proposal = wire::Proposal { + view: 50, + parent: 49, + payload: payload.to_vec(), + }; + let message = proposal_message(proposal.view, proposal.parent, &payload); + let notarize_namespace = notarize_namespace(&namespace); + let partials: Vec<_> = shares + .iter() + .map(|share| partial_sign_message(share, Some(¬arize_namespace), &message)) + .collect(); + let proposal_signature = threshold_signature_recover(threshold, partials) + .unwrap() + .serialize(); + let message = seed_message(proposal.view); + let partials: Vec<_> = shares + .iter() + .map(|share| partial_sign_message(share, Some(&seed_namespace), &message)) + .collect(); + let seed_signature = threshold_signature_recover(threshold, partials) + .unwrap() + .serialize(); + let notarization = wire::Notarization { + proposal: Some(proposal), + proposal_signature, + seed_signature, + }; + let notarization = crate::Parsed { + message: notarization, + digest: payload, + }; + mailbox.notarization(notarization).await; + + // Send new finalization + let payload = hash(b"test3"); + let proposal = wire::Proposal { + view: 300, + parent: 100, + payload: payload.to_vec(), + }; + let message = proposal_message(proposal.view, proposal.parent, &payload); + let partials: Vec<_> = shares + .iter() + .map(|share| partial_sign_message(share, Some(&finalize_namespace), &message)) + .collect(); + let proposal_signature = threshold_signature_recover(threshold, partials) + .unwrap() + .serialize(); + let message = seed_message(proposal.view); + let partials: Vec<_> = shares + .iter() + .map(|share| partial_sign_message(share, Some(&seed_namespace), &message)) + .collect(); + let seed_signature = threshold_signature_recover(threshold, partials) + .unwrap() + .serialize(); + let finalization = wire::Finalization { + proposal: Some(proposal), + proposal_signature, + seed_signature, + }; + let msg = wire::Voter { + payload: Some(wire::voter::Payload::Finalization(finalization)), + } + .encode_to_vec() + .into(); + peer_sender + .send(Recipients::All, msg, true) + .await + .expect("failed to send message"); + + // Wait for application to be notified + let (_, progress) = done_receiver.next().await.expect("failed to receive done"); + match progress { + mocks::application::Progress::Finalized(_, finalized_payload) => { + assert_eq!(finalized_payload, payload); + } + _ => panic!("unexpected progress"), + } }); } } From 9eb1d4fc8763d155fa89e78b9b4acee7ab042fe9 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Fri, 11 Apr 2025 15:59:00 -0700 Subject: [PATCH 5/9] reproduce panic --- .../threshold_simplex/actors/resolver/mod.rs | 2 +- .../src/threshold_simplex/actors/voter/mod.rs | 46 ++++++++----------- 2 files changed, 19 insertions(+), 29 deletions(-) diff --git a/consensus/src/threshold_simplex/actors/resolver/mod.rs b/consensus/src/threshold_simplex/actors/resolver/mod.rs index 247d4a2b096..a82dc664e24 100644 --- a/consensus/src/threshold_simplex/actors/resolver/mod.rs +++ b/consensus/src/threshold_simplex/actors/resolver/mod.rs @@ -5,7 +5,7 @@ use crate::Supervisor; pub use actor::Actor; use commonware_cryptography::Scheme; use governor::Quota; -pub use ingress::Mailbox; +pub use ingress::{Mailbox, Message}; use std::time::Duration; pub struct Config { diff --git a/consensus/src/threshold_simplex/actors/voter/mod.rs b/consensus/src/threshold_simplex/actors/voter/mod.rs index b102650adf1..54e518077f3 100644 --- a/consensus/src/threshold_simplex/actors/voter/mod.rs +++ b/consensus/src/threshold_simplex/actors/voter/mod.rs @@ -44,46 +44,34 @@ mod tests { encoder::{ finalize_namespace, notarize_namespace, proposal_message, seed_message, seed_namespace, }, - mocks, - wire::{self, backfiller}, - Prover, View, CONFLICTING_FINALIZE, CONFLICTING_NOTARIZE, FINALIZE, NOTARIZE, - NULLIFY_AND_FINALIZE, + mocks, wire, Prover, }; use bytes::Bytes; use commonware_cryptography::{ bls12381::{ dkg::ops, primitives::{ - group::{self, Element, Share, Signature}, - ops::{partial_sign_message, sign_message, threshold_signature_recover}, - poly::{self, Public}, + group::Element, + ops::{partial_sign_message, threshold_signature_recover}, + poly, }, }, hash, sha256::Digest, - Ed25519, Scheme as CryptoScheme, Sha256, Signer, + Ed25519, Sha256, Signer, }; use commonware_macros::test_traced; use commonware_p2p::{ simulated::{Config as NConfig, Link, Network}, Recipients, Sender, }; - use commonware_runtime::{ - deterministic::{self, Context as DeterministicContext, Executor}, - Blob, Clock, Metrics, Runner, Spawner, Storage, - }; + use commonware_runtime::{deterministic::Executor, Metrics, Runner}; use commonware_storage::journal::variable::{Config as JConfig, Journal}; - use commonware_utils::{quorum, Array}; - use futures::{ - channel::mpsc::{self, UnboundedReceiver}, - StreamExt, - }; + use commonware_utils::quorum; + use futures::{channel::mpsc, StreamExt}; use prost::Message; use std::time::Duration; - use std::{ - collections::BTreeMap, - sync::{atomic::AtomicI64, Arc}, - }; + use std::{collections::BTreeMap, sync::Arc}; // Mock Committer (no-op) #[derive(Clone)] @@ -179,7 +167,7 @@ mod tests { let (mut actor, mut mailbox) = Actor::new(context.clone(), journal, cfg); // Create a dummy backfiller mailbox (not used in this path) - let (backfiller_sender, backfiller_receiver) = mpsc::channel(1); + let (backfiller_sender, mut backfiller_receiver) = mpsc::channel(1); let backfiller = resolver::Mailbox::new(backfiller_sender); // Create a dummy network mailbox @@ -256,14 +244,16 @@ mod tests { .expect("failed to send message"); // Wait for application to be notified - let (_, progress) = done_receiver.next().await.expect("failed to receive done"); - match progress { - mocks::application::Progress::Finalized(_, finalized_payload) => { - assert_eq!(finalized_payload, payload); + let msg = backfiller_receiver + .next() + .await + .expect("failed to receive backfiller message"); + match msg { + resolver::Message::Finalized { view } => { + assert_eq!(view, 100); } - _ => panic!("unexpected progress"), + _ => panic!("unexpected backfiller message"), } - panic!("expected panic"); // Send old notarization from backfiller let payload = hash(b"test2"); From a1c5a9a8fe96c2f047287908b958b7843513e0e8 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Fri, 11 Apr 2025 15:59:38 -0700 Subject: [PATCH 6/9] fix lint --- consensus/src/threshold_simplex/actors/resolver/mod.rs | 4 +++- consensus/src/threshold_simplex/actors/voter/mod.rs | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/consensus/src/threshold_simplex/actors/resolver/mod.rs b/consensus/src/threshold_simplex/actors/resolver/mod.rs index a82dc664e24..d49f8c45f5c 100644 --- a/consensus/src/threshold_simplex/actors/resolver/mod.rs +++ b/consensus/src/threshold_simplex/actors/resolver/mod.rs @@ -5,7 +5,9 @@ use crate::Supervisor; pub use actor::Actor; use commonware_cryptography::Scheme; use governor::Quota; -pub use ingress::{Mailbox, Message}; +pub use ingress::Mailbox; +#[cfg(test)] +pub use ingress::Message; use std::time::Duration; pub struct Config { diff --git a/consensus/src/threshold_simplex/actors/voter/mod.rs b/consensus/src/threshold_simplex/actors/voter/mod.rs index 54e518077f3..96a60fdf363 100644 --- a/consensus/src/threshold_simplex/actors/voter/mod.rs +++ b/consensus/src/threshold_simplex/actors/voter/mod.rs @@ -164,7 +164,7 @@ mod tests { skip_timeout: 10, replay_concurrency: 1, }; - let (mut actor, mut mailbox) = Actor::new(context.clone(), journal, cfg); + let (actor, mut mailbox) = Actor::new(context.clone(), journal, cfg); // Create a dummy backfiller mailbox (not used in this path) let (backfiller_sender, mut backfiller_receiver) = mpsc::channel(1); From d9ff34df95c600a7b9b98f3383bd16ac23554403 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Fri, 11 Apr 2025 16:07:05 -0700 Subject: [PATCH 7/9] fix finish --- .../src/threshold_simplex/actors/voter/mod.rs | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/consensus/src/threshold_simplex/actors/voter/mod.rs b/consensus/src/threshold_simplex/actors/voter/mod.rs index 96a60fdf363..9be564827ca 100644 --- a/consensus/src/threshold_simplex/actors/voter/mod.rs +++ b/consensus/src/threshold_simplex/actors/voter/mod.rs @@ -117,7 +117,7 @@ mod tests { let prover = Prover::::new(*pk, &namespace); // Initialize voter actor - let (done_sender, mut done_receiver) = mpsc::unbounded(); + let (done_sender, _) = mpsc::unbounded(); let scheme = schemes[0].clone(); let validator = scheme.public_key(); let mut participants = BTreeMap::new(); @@ -243,7 +243,7 @@ mod tests { .await .expect("failed to send message"); - // Wait for application to be notified + // Wait for backfiller to be notified let msg = backfiller_receiver .next() .await @@ -328,11 +328,14 @@ mod tests { .await .expect("failed to send message"); - // Wait for application to be notified - let (_, progress) = done_receiver.next().await.expect("failed to receive done"); - match progress { - mocks::application::Progress::Finalized(_, finalized_payload) => { - assert_eq!(finalized_payload, payload); + // Wait for backfiller to be notified + let msg = backfiller_receiver + .next() + .await + .expect("failed to receive backfiller message"); + match msg { + resolver::Message::Finalized { view } => { + assert_eq!(view, 300); } _ => panic!("unexpected progress"), } From 93f7ccc500805c2fe03da2156eb43991b5eb5168 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Fri, 11 Apr 2025 16:12:39 -0700 Subject: [PATCH 8/9] no more errors --- .../src/threshold_simplex/actors/voter/actor.rs | 16 ++++++++++++---- .../src/threshold_simplex/actors/voter/mod.rs | 16 ++++++++++++---- 2 files changed, 24 insertions(+), 8 deletions(-) diff --git a/consensus/src/threshold_simplex/actors/voter/actor.rs b/consensus/src/threshold_simplex/actors/voter/actor.rs index 776a91de964..c2b95f2d43e 100644 --- a/consensus/src/threshold_simplex/actors/voter/actor.rs +++ b/consensus/src/threshold_simplex/actors/voter/actor.rs @@ -1367,6 +1367,7 @@ impl< round.advance_deadline = Some(self.context.current() + self.notarization_timeout); round.set_leader(seed); self.view = view; + self.current_view.set(view as i64); // If we are backfilling, exit early if self.journal.is_none() { @@ -1452,6 +1453,9 @@ impl< .await .expect("unable to prune journal"); } + + // Update metrics + self.tracked_views.set(self.views.len() as i64); } async fn notarize(&mut self, sender: &C::PublicKey, notarize: wire::Notarize) { @@ -2554,11 +2558,19 @@ impl< match msg { Message::Notarization{ notarization } => { view = notarization.message.proposal.as_ref().unwrap().view; + if !self.interesting(view, false) { + debug!(view, "backfilled notarization is not interesting"); + continue; + } debug!(view, "received notarization from backfiller"); self.handle_notarization(notarization).await; }, Message::Nullification { nullification } => { view = nullification.view; + if !self.interesting(view, false) { + debug!(view, "backfilled nullification is not interesting"); + continue; + } debug!(view, "received nullification from backfiller"); self.handle_nullification(nullification).await; }, @@ -2638,10 +2650,6 @@ impl< // After sending all required messages, prune any views // we no longer need self.prune_views().await; - - // Update metrics - self.current_view.set(view as i64); - self.tracked_views.set(self.views.len() as i64); } } } diff --git a/consensus/src/threshold_simplex/actors/voter/mod.rs b/consensus/src/threshold_simplex/actors/voter/mod.rs index 9be564827ca..2d9f1fbc292 100644 --- a/consensus/src/threshold_simplex/actors/voter/mod.rs +++ b/consensus/src/threshold_simplex/actors/voter/mod.rs @@ -63,9 +63,9 @@ mod tests { use commonware_macros::test_traced; use commonware_p2p::{ simulated::{Config as NConfig, Link, Network}, - Recipients, Sender, + Receiver, Recipients, Sender, }; - use commonware_runtime::{deterministic::Executor, Metrics, Runner}; + use commonware_runtime::{deterministic::Executor, Metrics, Runner, Spawner}; use commonware_storage::journal::variable::{Config as JConfig, Journal}; use commonware_utils::quorum; use futures::{channel::mpsc, StreamExt}; @@ -84,7 +84,7 @@ mod tests { // Test for late notarization causing a panic #[test_traced] - fn test_late_notarization_panic() { + fn test_old_backfill() { let n = 5; let threshold = quorum(n).expect("unable to calculate threshold"); let namespace = b"consensus".to_vec(); @@ -174,7 +174,8 @@ mod tests { let peer = schemes[1].public_key(); let (voter_sender, voter_receiver) = oracle.register(validator.clone(), 0).await.unwrap(); - let (mut peer_sender, _) = oracle.register(peer.clone(), 0).await.unwrap(); + let (mut peer_sender, mut peer_receiver) = + oracle.register(peer.clone(), 0).await.unwrap(); oracle .add_link( validator.clone(), @@ -200,6 +201,13 @@ mod tests { .await .unwrap(); + // Drain peer receiver + context.with_label("peer_receiver").spawn(|_| async move { + loop { + peer_receiver.recv().await.unwrap(); + } + }); + // Run the actor, expecting it to panic actor.start(backfiller, voter_sender, voter_receiver); From 60e658bb60da21094c1148b0261ac444f88ba479 Mon Sep 17 00:00:00 2001 From: Patrick O'Grady Date: Fri, 11 Apr 2025 16:16:21 -0700 Subject: [PATCH 9/9] add comments --- consensus/src/threshold_simplex/actors/voter/actor.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/consensus/src/threshold_simplex/actors/voter/actor.rs b/consensus/src/threshold_simplex/actors/voter/actor.rs index c2b95f2d43e..1a4400b3ec0 100644 --- a/consensus/src/threshold_simplex/actors/voter/actor.rs +++ b/consensus/src/threshold_simplex/actors/voter/actor.rs @@ -2559,6 +2559,8 @@ impl< Message::Notarization{ notarization } => { view = notarization.message.proposal.as_ref().unwrap().view; if !self.interesting(view, false) { + // It is possible that we receive a notarization for a view + // that we have pruned. We should ignore this. debug!(view, "backfilled notarization is not interesting"); continue; } @@ -2568,6 +2570,8 @@ impl< Message::Nullification { nullification } => { view = nullification.view; if !self.interesting(view, false) { + // It is possible that we receive a notarization for a view + // that we have pruned. We should ignore this. debug!(view, "backfilled nullification is not interesting"); continue; }