diff --git a/consensus/src/threshold_simplex/actors/resolver/ingress.rs b/consensus/src/threshold_simplex/actors/resolver/ingress.rs index ae57b56b2b6..da2841a052f 100644 --- a/consensus/src/threshold_simplex/actors/resolver/ingress.rs +++ b/consensus/src/threshold_simplex/actors/resolver/ingress.rs @@ -24,7 +24,7 @@ pub struct Mailbox { } impl Mailbox { - pub(super) fn new(sender: mpsc::Sender) -> Self { + pub(crate) fn new(sender: mpsc::Sender) -> Self { Self { sender } } diff --git a/consensus/src/threshold_simplex/actors/resolver/mod.rs b/consensus/src/threshold_simplex/actors/resolver/mod.rs index 247d4a2b096..d49f8c45f5c 100644 --- a/consensus/src/threshold_simplex/actors/resolver/mod.rs +++ b/consensus/src/threshold_simplex/actors/resolver/mod.rs @@ -6,6 +6,8 @@ pub use actor::Actor; use commonware_cryptography::Scheme; use governor::Quota; pub use ingress::Mailbox; +#[cfg(test)] +pub use ingress::Message; use std::time::Duration; pub struct Config { diff --git a/consensus/src/threshold_simplex/actors/voter/actor.rs b/consensus/src/threshold_simplex/actors/voter/actor.rs index 776a91de964..1a4400b3ec0 100644 --- a/consensus/src/threshold_simplex/actors/voter/actor.rs +++ b/consensus/src/threshold_simplex/actors/voter/actor.rs @@ -1367,6 +1367,7 @@ impl< round.advance_deadline = Some(self.context.current() + self.notarization_timeout); round.set_leader(seed); self.view = view; + self.current_view.set(view as i64); // If we are backfilling, exit early if self.journal.is_none() { @@ -1452,6 +1453,9 @@ impl< .await .expect("unable to prune journal"); } + + // Update metrics + self.tracked_views.set(self.views.len() as i64); } async fn notarize(&mut self, sender: &C::PublicKey, notarize: wire::Notarize) { @@ -2554,11 +2558,23 @@ impl< match msg { Message::Notarization{ notarization } => { view = notarization.message.proposal.as_ref().unwrap().view; + if !self.interesting(view, false) { + // It is possible that we receive a notarization for a view + // that we have pruned. We should ignore this. + debug!(view, "backfilled notarization is not interesting"); + continue; + } debug!(view, "received notarization from backfiller"); self.handle_notarization(notarization).await; }, Message::Nullification { nullification } => { view = nullification.view; + if !self.interesting(view, false) { + // It is possible that we receive a notarization for a view + // that we have pruned. We should ignore this. + debug!(view, "backfilled nullification is not interesting"); + continue; + } debug!(view, "received nullification from backfiller"); self.handle_nullification(nullification).await; }, @@ -2638,10 +2654,6 @@ impl< // After sending all required messages, prune any views // we no longer need self.prune_views().await; - - // Update metrics - self.current_view.set(view as i64); - self.tracked_views.set(self.views.len() as i64); } } } diff --git a/consensus/src/threshold_simplex/actors/voter/ingress.rs b/consensus/src/threshold_simplex/actors/voter/ingress.rs index 8a8d1b1e85f..d8d5a0b287f 100644 --- a/consensus/src/threshold_simplex/actors/voter/ingress.rs +++ b/consensus/src/threshold_simplex/actors/voter/ingress.rs @@ -18,7 +18,7 @@ pub struct Mailbox { } impl Mailbox { - pub(super) fn new(sender: mpsc::Sender>) -> Self { + pub(crate) fn new(sender: mpsc::Sender>) -> Self { Self { sender } } diff --git a/consensus/src/threshold_simplex/actors/voter/mod.rs b/consensus/src/threshold_simplex/actors/voter/mod.rs index 9ac6cb28ade..2d9f1fbc292 100644 --- a/consensus/src/threshold_simplex/actors/voter/mod.rs +++ b/consensus/src/threshold_simplex/actors/voter/mod.rs @@ -35,3 +35,318 @@ pub struct Config< pub skip_timeout: View, pub replay_concurrency: usize, } + +#[cfg(test)] +mod tests { + use super::*; + use crate::threshold_simplex::{ + actors::resolver, + encoder::{ + finalize_namespace, notarize_namespace, proposal_message, seed_message, seed_namespace, + }, + mocks, wire, Prover, + }; + use bytes::Bytes; + use commonware_cryptography::{ + bls12381::{ + dkg::ops, + primitives::{ + group::Element, + ops::{partial_sign_message, threshold_signature_recover}, + poly, + }, + }, + hash, + sha256::Digest, + Ed25519, Sha256, Signer, + }; + use commonware_macros::test_traced; + use commonware_p2p::{ + simulated::{Config as NConfig, Link, Network}, + Receiver, Recipients, Sender, + }; + use commonware_runtime::{deterministic::Executor, Metrics, Runner, Spawner}; + use commonware_storage::journal::variable::{Config as JConfig, Journal}; + use commonware_utils::quorum; + use futures::{channel::mpsc, StreamExt}; + use prost::Message; + use std::time::Duration; + use std::{collections::BTreeMap, sync::Arc}; + + // Mock Committer (no-op) + #[derive(Clone)] + struct MockCommitter; + impl Committer for MockCommitter { + type Digest = ::Digest; + async fn prepared(&mut self, _proof: Bytes, _payload: Self::Digest) {} + async fn finalized(&mut self, _proof: Bytes, _payload: Self::Digest) {} + } + + // Test for late notarization causing a panic + #[test_traced] + fn test_old_backfill() { + let n = 5; + let threshold = quorum(n).expect("unable to calculate threshold"); + let namespace = b"consensus".to_vec(); + let (executor, mut context, _) = Executor::timed(Duration::from_secs(10)); + executor.start(async move { + // Create simulated network + let (network, mut oracle) = Network::new( + context.with_label("network"), + NConfig { + max_size: 1024 * 1024, + }, + ); + network.start(); + + // Get participants + let mut schemes = Vec::new(); + let mut validators = Vec::new(); + for i in 0..n { + let scheme = Ed25519::from_seed(i as u64); + let pk = scheme.public_key(); + schemes.push(scheme); + validators.push(pk); + } + validators.sort(); + schemes.sort_by_key(|s| s.public_key()); + + // Derive threshold + let (public, shares) = ops::generate_shares(&mut context, None, n, threshold); + let pk = poly::public(&public); + let prover = Prover::::new(*pk, &namespace); + + // Initialize voter actor + let (done_sender, _) = mpsc::unbounded(); + let scheme = schemes[0].clone(); + let validator = scheme.public_key(); + let mut participants = BTreeMap::new(); + participants.insert(0, (public.clone(), validators.clone(), shares[0])); + let supervisor_config = mocks::supervisor::Config { + prover: prover.clone(), + participants, + }; + let supervisor = mocks::supervisor::Supervisor::new(supervisor_config); + let relay = Arc::new(mocks::relay::Relay::new()); + let application_cfg = mocks::application::Config { + hasher: Sha256::default(), + relay: relay.clone(), + participant: validator.clone(), + tracker: done_sender.clone(), + propose_latency: (10.0, 5.0), + verify_latency: (10.0, 5.0), + }; + let (actor, application) = mocks::application::Application::new( + context.with_label("application"), + application_cfg, + ); + actor.start(); + let cfg = JConfig { + partition: "test".to_string(), + }; + let journal = Journal::init(context.with_label("journal"), cfg) + .await + .expect("unable to create journal"); + + let cfg = Config { + crypto: scheme, + automaton: application.clone(), + relay: application.clone(), + committer: MockCommitter, + supervisor, + + namespace: namespace.clone(), + mailbox_size: 10, + leader_timeout: Duration::from_secs(5), + notarization_timeout: Duration::from_secs(5), + nullify_retry: Duration::from_secs(5), + activity_timeout: 10, + skip_timeout: 10, + replay_concurrency: 1, + }; + let (actor, mut mailbox) = Actor::new(context.clone(), journal, cfg); + + // Create a dummy backfiller mailbox (not used in this path) + let (backfiller_sender, mut backfiller_receiver) = mpsc::channel(1); + let backfiller = resolver::Mailbox::new(backfiller_sender); + + // Create a dummy network mailbox + let peer = schemes[1].public_key(); + let (voter_sender, voter_receiver) = + oracle.register(validator.clone(), 0).await.unwrap(); + let (mut peer_sender, mut peer_receiver) = + oracle.register(peer.clone(), 0).await.unwrap(); + oracle + .add_link( + validator.clone(), + peer.clone(), + Link { + latency: 0.0, + jitter: 0.0, + success_rate: 1.0, + }, + ) + .await + .unwrap(); + oracle + .add_link( + peer, + validator, + Link { + latency: 0.0, + jitter: 0.0, + success_rate: 1.0, + }, + ) + .await + .unwrap(); + + // Drain peer receiver + context.with_label("peer_receiver").spawn(|_| async move { + loop { + peer_receiver.recv().await.unwrap(); + } + }); + + // Run the actor, expecting it to panic + actor.start(backfiller, voter_sender, voter_receiver); + + // Send finalization over network + let payload = hash(b"test"); + let proposal = wire::Proposal { + view: 100, + parent: 50, + payload: payload.to_vec(), + }; + let message = proposal_message(proposal.view, proposal.parent, &payload); + let finalize_namespace = finalize_namespace(&namespace); + let partials: Vec<_> = shares + .iter() + .map(|share| partial_sign_message(share, Some(&finalize_namespace), &message)) + .collect(); + let proposal_signature = threshold_signature_recover(threshold, partials) + .unwrap() + .serialize(); + let seed_namespace = seed_namespace(&namespace); + let message = seed_message(proposal.view); + let partials: Vec<_> = shares + .iter() + .map(|share| partial_sign_message(share, Some(&seed_namespace), &message)) + .collect(); + let seed_signature = threshold_signature_recover(threshold, partials) + .unwrap() + .serialize(); + let finalization = wire::Finalization { + proposal: Some(proposal), + proposal_signature, + seed_signature, + }; + let msg = wire::Voter { + payload: Some(wire::voter::Payload::Finalization(finalization)), + } + .encode_to_vec() + .into(); + peer_sender + .send(Recipients::All, msg, true) + .await + .expect("failed to send message"); + + // Wait for backfiller to be notified + let msg = backfiller_receiver + .next() + .await + .expect("failed to receive backfiller message"); + match msg { + resolver::Message::Finalized { view } => { + assert_eq!(view, 100); + } + _ => panic!("unexpected backfiller message"), + } + + // Send old notarization from backfiller + let payload = hash(b"test2"); + let proposal = wire::Proposal { + view: 50, + parent: 49, + payload: payload.to_vec(), + }; + let message = proposal_message(proposal.view, proposal.parent, &payload); + let notarize_namespace = notarize_namespace(&namespace); + let partials: Vec<_> = shares + .iter() + .map(|share| partial_sign_message(share, Some(¬arize_namespace), &message)) + .collect(); + let proposal_signature = threshold_signature_recover(threshold, partials) + .unwrap() + .serialize(); + let message = seed_message(proposal.view); + let partials: Vec<_> = shares + .iter() + .map(|share| partial_sign_message(share, Some(&seed_namespace), &message)) + .collect(); + let seed_signature = threshold_signature_recover(threshold, partials) + .unwrap() + .serialize(); + let notarization = wire::Notarization { + proposal: Some(proposal), + proposal_signature, + seed_signature, + }; + let notarization = crate::Parsed { + message: notarization, + digest: payload, + }; + mailbox.notarization(notarization).await; + + // Send new finalization + let payload = hash(b"test3"); + let proposal = wire::Proposal { + view: 300, + parent: 100, + payload: payload.to_vec(), + }; + let message = proposal_message(proposal.view, proposal.parent, &payload); + let partials: Vec<_> = shares + .iter() + .map(|share| partial_sign_message(share, Some(&finalize_namespace), &message)) + .collect(); + let proposal_signature = threshold_signature_recover(threshold, partials) + .unwrap() + .serialize(); + let message = seed_message(proposal.view); + let partials: Vec<_> = shares + .iter() + .map(|share| partial_sign_message(share, Some(&seed_namespace), &message)) + .collect(); + let seed_signature = threshold_signature_recover(threshold, partials) + .unwrap() + .serialize(); + let finalization = wire::Finalization { + proposal: Some(proposal), + proposal_signature, + seed_signature, + }; + let msg = wire::Voter { + payload: Some(wire::voter::Payload::Finalization(finalization)), + } + .encode_to_vec() + .into(); + peer_sender + .send(Recipients::All, msg, true) + .await + .expect("failed to send message"); + + // Wait for backfiller to be notified + let msg = backfiller_receiver + .next() + .await + .expect("failed to receive backfiller message"); + match msg { + resolver::Message::Finalized { view } => { + assert_eq!(view, 300); + } + _ => panic!("unexpected progress"), + } + }); + } +} diff --git a/consensus/src/threshold_simplex/mocks/mod.rs b/consensus/src/threshold_simplex/mocks/mod.rs index b9a3a949f84..c3e783b7d2b 100644 --- a/consensus/src/threshold_simplex/mocks/mod.rs +++ b/consensus/src/threshold_simplex/mocks/mod.rs @@ -3,5 +3,6 @@ pub mod application; pub mod conflicter; pub mod nuller; +pub mod outdated; pub mod relay; pub mod supervisor; diff --git a/consensus/src/threshold_simplex/mocks/outdated.rs b/consensus/src/threshold_simplex/mocks/outdated.rs new file mode 100644 index 00000000000..31cd775231a --- /dev/null +++ b/consensus/src/threshold_simplex/mocks/outdated.rs @@ -0,0 +1,219 @@ +//! Byzantine participant that sends conflicting notarize/finalize messages. + +use crate::{ + threshold_simplex::{ + encoder::{ + finalize_namespace, notarize_namespace, nullify_message, nullify_namespace, + proposal_message, seed_message, seed_namespace, + }, + wire::{self, Proposal}, + View, + }, + ThresholdSupervisor, +}; +use bytes::Bytes; +use commonware_cryptography::{ + bls12381::primitives::{group, ops}, + Hasher, +}; +use commonware_p2p::{Receiver, Recipients, Sender}; +use commonware_runtime::{Clock, Handle, Spawner}; +use prost::Message; +use rand::{CryptoRng, Rng}; +use std::{collections::HashMap, marker::PhantomData}; +use tracing::debug; + +pub struct Config< + S: ThresholdSupervisor, +> { + pub supervisor: S, + pub namespace: Vec, + pub view_delta: u64, +} + +pub struct Outdated< + E: Clock + Rng + CryptoRng + Spawner, + H: Hasher, + S: ThresholdSupervisor, +> { + context: E, + supervisor: S, + _hasher: PhantomData, + + seed_namespace: Vec, + notarize_namespace: Vec, + nullify_namespace: Vec, + finalize_namespace: Vec, + + history: HashMap, + view_delta: u64, +} + +impl< + E: Clock + Rng + CryptoRng + Spawner, + H: Hasher, + S: ThresholdSupervisor, + > Outdated +{ + pub fn new(context: E, cfg: Config) -> Self { + Self { + context, + supervisor: cfg.supervisor, + _hasher: PhantomData, + + seed_namespace: seed_namespace(&cfg.namespace), + notarize_namespace: notarize_namespace(&cfg.namespace), + nullify_namespace: nullify_namespace(&cfg.namespace), + finalize_namespace: finalize_namespace(&cfg.namespace), + + history: HashMap::new(), + view_delta: cfg.view_delta, + } + } + + pub fn start(mut self, voter_network: (impl Sender, impl Receiver)) -> Handle<()> { + self.context.spawn_ref()(self.run(voter_network)) + } + + async fn run(mut self, voter_network: (impl Sender, impl Receiver)) { + let (mut sender, mut receiver) = voter_network; + while let Ok((s, msg)) = receiver.recv().await { + // Parse message + let msg = match wire::Voter::decode(msg) { + Ok(msg) => msg, + Err(err) => { + debug!(?err, sender = ?s, "failed to decode message"); + continue; + } + }; + let payload = match msg.payload { + Some(payload) => payload, + None => { + debug!(sender = ?s, "message missing payload"); + continue; + } + }; + + // Process message + match payload { + wire::voter::Payload::Nullify(nullify) => { + // Get our index + let view = nullify.view; + let share = self.supervisor.share(view).unwrap(); + + // Nullify provided view + let view = view.saturating_sub(self.view_delta); + debug!(?view, "nullifying outdated view"); + let message = nullify_message(view); + let view_signature = + ops::partial_sign_message(share, Some(&self.nullify_namespace), &message) + .serialize(); + let message = seed_message(view); + let seed_signature = + ops::partial_sign_message(share, Some(&self.seed_namespace), &message) + .serialize(); + let n = wire::Nullify { + view, + view_signature, + seed_signature, + }; + let msg = wire::Voter { + payload: Some(wire::voter::Payload::Nullify(n)), + } + .encode_to_vec() + .into(); + sender.send(Recipients::All, msg, true).await.unwrap(); + } + wire::voter::Payload::Notarize(notarize) => { + // Get our index + let proposal = match notarize.proposal { + Some(proposal) => proposal, + None => { + debug!(sender = ?s, "notarize missing proposal"); + continue; + } + }; + let Ok(payload) = H::Digest::try_from(&proposal.payload) else { + debug!(sender = ?s, "invalid payload"); + continue; + }; + let view = proposal.view; + let share = self.supervisor.share(view).unwrap(); + + // Store proposal + self.history.insert(view, proposal.clone()); + + // Notarize old digest + let view = view.saturating_sub(self.view_delta); + let Some(proposal) = self.history.get(&view) else { + continue; + }; + debug!(?view, "notarizing outdated proposal"); + let parent = proposal.parent; + let message = proposal_message(proposal.view, parent, &payload); + let proposal_signature = + ops::partial_sign_message(share, Some(&self.notarize_namespace), &message) + .serialize(); + let message = seed_message(view); + let seed_signature: Bytes = + ops::partial_sign_message(share, Some(&self.seed_namespace), &message) + .serialize() + .into(); + let n = wire::Notarize { + proposal: Some(proposal.clone()), + proposal_signature, + seed_signature: seed_signature.to_vec(), + }; + let msg = wire::Voter { + payload: Some(wire::voter::Payload::Notarize(n)), + } + .encode_to_vec() + .into(); + sender.send(Recipients::All, msg, true).await.unwrap(); + } + wire::voter::Payload::Finalize(finalize) => { + // Get our index + let proposal = match finalize.proposal { + Some(proposal) => proposal, + None => { + debug!(sender = ?s, "notarize missing proposal"); + continue; + } + }; + let Ok(payload) = H::Digest::try_from(&proposal.payload) else { + debug!(sender = ?s, "invalid payload"); + continue; + }; + let view = proposal.view; + let share = self.supervisor.share(view).unwrap(); + + // Store proposal + self.history.insert(view, proposal.clone()); + + // Finalize old digest + let view = view.saturating_sub(self.view_delta); + let Some(proposal) = self.history.get(&view) else { + continue; + }; + debug!(?view, "finalizing outdated proposal"); + let parent = proposal.parent; + let message = proposal_message(view, parent, &payload); + let proposal_signature = + ops::partial_sign_message(share, Some(&self.finalize_namespace), &message) + .serialize(); + let f = wire::Finalize { + proposal: Some(proposal.clone()), + proposal_signature, + }; + let msg = wire::Voter { + payload: Some(wire::voter::Payload::Finalize(f)), + } + .encode_to_vec() + .into(); + sender.send(Recipients::All, msg, true).await.unwrap(); + } + _ => continue, + } + } + } +} diff --git a/consensus/src/threshold_simplex/mod.rs b/consensus/src/threshold_simplex/mod.rs index 297501a5c58..7c54529ffdb 100644 --- a/consensus/src/threshold_simplex/mod.rs +++ b/consensus/src/threshold_simplex/mod.rs @@ -2376,4 +2376,169 @@ skip_timeout, assert!(count_nullify_and_finalize > 0); }); } + + #[test_traced] + fn test_outdated() { + // Create context + let n = 4; + let threshold = quorum(n).expect("unable to calculate threshold"); + let required_containers = 100; + let activity_timeout = 10; + let skip_timeout = 5; + let namespace = b"consensus".to_vec(); + let (executor, mut context, _) = Executor::timed(Duration::from_secs(30)); + executor.start(async move { + // Create simulated network + let (network, mut oracle) = Network::new( + context.with_label("network"), + Config { + max_size: 1024 * 1024, + }, + ); + + // Start network + network.start(); + + // Register participants + let mut schemes = Vec::new(); + let mut validators = Vec::new(); + for i in 0..n { + let scheme = Ed25519::from_seed(i as u64); + let pk = scheme.public_key(); + schemes.push(scheme); + validators.push(pk); + } + validators.sort(); + schemes.sort_by_key(|s| s.public_key()); + let mut registrations = register_validators(&mut oracle, &validators).await; + + // Link all validators + let link = Link { + latency: 10.0, + jitter: 1.0, + success_rate: 1.0, + }; + link_validators(&mut oracle, &validators, Action::Link(link), None).await; + + // Derive threshold + let (public, shares) = ops::generate_shares(&mut context, None, n, threshold); + let pk = poly::public(&public); + let prover = Prover::new(*pk, &namespace); + + // Create engines + let relay = Arc::new(mocks::relay::Relay::new()); + let mut supervisors = Vec::new(); + let (done_sender, mut done_receiver) = mpsc::unbounded(); + for (idx_scheme, scheme) in schemes.into_iter().enumerate() { + // Create scheme context + let context = context.with_label(&format!("validator-{}", scheme.public_key())); + + // Start engine + let validator = scheme.public_key(); + let mut participants = BTreeMap::new(); + participants.insert(0, (public.clone(), validators.clone(), shares[idx_scheme])); + let supervisor_config = mocks::supervisor::Config { + prover: prover.clone(), + participants, + }; + let supervisor = mocks::supervisor::Supervisor::new(supervisor_config); + let (voter, resolver) = registrations + .remove(&validator) + .expect("validator should be registered"); + if idx_scheme == 0 { + let cfg = mocks::outdated::Config { + supervisor, + namespace: namespace.clone(), + view_delta: activity_timeout + 10, + }; + let engine: mocks::outdated::Outdated<_, Sha256, _> = + mocks::outdated::Outdated::new(context.with_label("byzantine_engine"), cfg); + engine.start(voter); + } else { + supervisors.push(supervisor.clone()); + let application_cfg = mocks::application::Config { + hasher: Sha256::default(), + relay: relay.clone(), + participant: validator.clone(), + tracker: done_sender.clone(), + propose_latency: (10.0, 5.0), + verify_latency: (10.0, 5.0), + }; + let (actor, application) = mocks::application::Application::new( + context.with_label("application"), + application_cfg, + ); + actor.start(); + let cfg = JConfig { + partition: validator.to_string(), + }; + let journal = Journal::init(context.with_label("journal"), cfg) + .await + .expect("unable to create journal"); + let cfg = config::Config { + crypto: scheme, + automaton: application.clone(), + relay: application.clone(), + committer: application, + supervisor, + mailbox_size: 1024, + namespace: namespace.clone(), + leader_timeout: Duration::from_secs(1), + notarization_timeout: Duration::from_secs(2), + nullify_retry: Duration::from_secs(10), + fetch_timeout: Duration::from_secs(1), + activity_timeout, + skip_timeout, + max_fetch_count: 1, + max_fetch_size: 1024 * 512, + fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(1).unwrap()), + fetch_concurrent: 1, + replay_concurrency: 1, + }; + let engine = Engine::new(context.with_label("engine"), journal, cfg); + engine.start(voter, resolver); + } + } + + // Wait for all engines to finish + let mut completed = HashSet::new(); + let mut finalized = HashMap::new(); + loop { + let (validator, event) = done_receiver.next().await.unwrap(); + if let mocks::application::Progress::Finalized(proof, digest) = event { + let (view, _, payload, _, _) = prover.deserialize_finalization(proof).unwrap(); + if digest != payload { + panic!( + "finalization mismatch digest: {:?}, payload: {:?}", + digest, payload + ); + } + if let Some(previous) = finalized.insert(view, digest) { + if previous != digest { + panic!( + "finalization mismatch at {:?} previous: {:?}, current: {:?}", + view, previous, digest + ); + } + } + if (finalized.len() as u64) < required_containers { + continue; + } + completed.insert(validator); + } + if completed.len() == (n - 1) as usize { + break; + } + } + + // Check supervisors for correct activity + for supervisor in supervisors.iter() { + // No faults + { + let faults = supervisor.faults.lock().unwrap(); + assert_eq!(faults.len(), 0); + } + } + }); + } }