Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion consensus/src/threshold_simplex/actors/resolver/ingress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub struct Mailbox {
}

impl Mailbox {
pub(super) fn new(sender: mpsc::Sender<Message>) -> Self {
pub(crate) fn new(sender: mpsc::Sender<Message>) -> Self {
Self { sender }
}

Expand Down
2 changes: 2 additions & 0 deletions consensus/src/threshold_simplex/actors/resolver/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ pub use actor::Actor;
use commonware_cryptography::Scheme;
use governor::Quota;
pub use ingress::Mailbox;
#[cfg(test)]
pub use ingress::Message;
use std::time::Duration;

pub struct Config<C: Scheme, S: Supervisor> {
Expand Down
20 changes: 16 additions & 4 deletions consensus/src/threshold_simplex/actors/voter/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1367,6 +1367,7 @@ impl<
round.advance_deadline = Some(self.context.current() + self.notarization_timeout);
round.set_leader(seed);
self.view = view;
self.current_view.set(view as i64);

// If we are backfilling, exit early
if self.journal.is_none() {
Expand Down Expand Up @@ -1452,6 +1453,9 @@ impl<
.await
.expect("unable to prune journal");
}

// Update metrics
self.tracked_views.set(self.views.len() as i64);
}

async fn notarize(&mut self, sender: &C::PublicKey, notarize: wire::Notarize) {
Expand Down Expand Up @@ -2554,11 +2558,23 @@ impl<
match msg {
Message::Notarization{ notarization } => {
view = notarization.message.proposal.as_ref().unwrap().view;
if !self.interesting(view, false) {
// It is possible that we receive a notarization for a view
// that we have pruned. We should ignore this.
debug!(view, "backfilled notarization is not interesting");
continue;
}
debug!(view, "received notarization from backfiller");
self.handle_notarization(notarization).await;
},
Message::Nullification { nullification } => {
view = nullification.view;
if !self.interesting(view, false) {
// It is possible that we receive a notarization for a view
// that we have pruned. We should ignore this.
debug!(view, "backfilled nullification is not interesting");
continue;
}
debug!(view, "received nullification from backfiller");
self.handle_nullification(nullification).await;
},
Expand Down Expand Up @@ -2638,10 +2654,6 @@ impl<
// After sending all required messages, prune any views
// we no longer need
self.prune_views().await;

// Update metrics
self.current_view.set(view as i64);
self.tracked_views.set(self.views.len() as i64);
}
}
}
2 changes: 1 addition & 1 deletion consensus/src/threshold_simplex/actors/voter/ingress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub struct Mailbox<D: Array> {
}

impl<D: Array> Mailbox<D> {
pub(super) fn new(sender: mpsc::Sender<Message<D>>) -> Self {
pub(crate) fn new(sender: mpsc::Sender<Message<D>>) -> Self {
Self { sender }
}

Expand Down
315 changes: 315 additions & 0 deletions consensus/src/threshold_simplex/actors/voter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,318 @@ pub struct Config<
pub skip_timeout: View,
pub replay_concurrency: usize,
}

#[cfg(test)]
mod tests {
use super::*;
use crate::threshold_simplex::{
actors::resolver,
encoder::{
finalize_namespace, notarize_namespace, proposal_message, seed_message, seed_namespace,
},
mocks, wire, Prover,
};
use bytes::Bytes;
use commonware_cryptography::{
bls12381::{
dkg::ops,
primitives::{
group::Element,
ops::{partial_sign_message, threshold_signature_recover},
poly,
},
},
hash,
sha256::Digest,
Ed25519, Sha256, Signer,
};
use commonware_macros::test_traced;
use commonware_p2p::{
simulated::{Config as NConfig, Link, Network},
Receiver, Recipients, Sender,
};
use commonware_runtime::{deterministic::Executor, Metrics, Runner, Spawner};
use commonware_storage::journal::variable::{Config as JConfig, Journal};
use commonware_utils::quorum;
use futures::{channel::mpsc, StreamExt};
use prost::Message;
use std::time::Duration;
use std::{collections::BTreeMap, sync::Arc};

// Mock Committer (no-op)
#[derive(Clone)]
struct MockCommitter;
impl Committer for MockCommitter {
type Digest = <Sha256 as commonware_cryptography::Hasher>::Digest;
async fn prepared(&mut self, _proof: Bytes, _payload: Self::Digest) {}
async fn finalized(&mut self, _proof: Bytes, _payload: Self::Digest) {}
}

// Test for late notarization causing a panic
#[test_traced]
fn test_old_backfill() {
let n = 5;
let threshold = quorum(n).expect("unable to calculate threshold");
let namespace = b"consensus".to_vec();
let (executor, mut context, _) = Executor::timed(Duration::from_secs(10));
executor.start(async move {
// Create simulated network
let (network, mut oracle) = Network::new(
context.with_label("network"),
NConfig {
max_size: 1024 * 1024,
},
);
network.start();

// Get participants
let mut schemes = Vec::new();
let mut validators = Vec::new();
for i in 0..n {
let scheme = Ed25519::from_seed(i as u64);
let pk = scheme.public_key();
schemes.push(scheme);
validators.push(pk);
}
validators.sort();
schemes.sort_by_key(|s| s.public_key());

// Derive threshold
let (public, shares) = ops::generate_shares(&mut context, None, n, threshold);
let pk = poly::public(&public);
let prover = Prover::<Digest>::new(*pk, &namespace);

// Initialize voter actor
let (done_sender, _) = mpsc::unbounded();
let scheme = schemes[0].clone();
let validator = scheme.public_key();
let mut participants = BTreeMap::new();
participants.insert(0, (public.clone(), validators.clone(), shares[0]));
let supervisor_config = mocks::supervisor::Config {
prover: prover.clone(),
participants,
};
let supervisor = mocks::supervisor::Supervisor::new(supervisor_config);
let relay = Arc::new(mocks::relay::Relay::new());
let application_cfg = mocks::application::Config {
hasher: Sha256::default(),
relay: relay.clone(),
participant: validator.clone(),
tracker: done_sender.clone(),
propose_latency: (10.0, 5.0),
verify_latency: (10.0, 5.0),
};
let (actor, application) = mocks::application::Application::new(
context.with_label("application"),
application_cfg,
);
actor.start();
let cfg = JConfig {
partition: "test".to_string(),
};
let journal = Journal::init(context.with_label("journal"), cfg)
.await
.expect("unable to create journal");

let cfg = Config {
crypto: scheme,
automaton: application.clone(),
relay: application.clone(),
committer: MockCommitter,
supervisor,

namespace: namespace.clone(),
mailbox_size: 10,
leader_timeout: Duration::from_secs(5),
notarization_timeout: Duration::from_secs(5),
nullify_retry: Duration::from_secs(5),
activity_timeout: 10,
skip_timeout: 10,
replay_concurrency: 1,
};
let (actor, mut mailbox) = Actor::new(context.clone(), journal, cfg);

// Create a dummy backfiller mailbox (not used in this path)
let (backfiller_sender, mut backfiller_receiver) = mpsc::channel(1);
let backfiller = resolver::Mailbox::new(backfiller_sender);

// Create a dummy network mailbox
let peer = schemes[1].public_key();
let (voter_sender, voter_receiver) =
oracle.register(validator.clone(), 0).await.unwrap();
let (mut peer_sender, mut peer_receiver) =
oracle.register(peer.clone(), 0).await.unwrap();
oracle
.add_link(
validator.clone(),
peer.clone(),
Link {
latency: 0.0,
jitter: 0.0,
success_rate: 1.0,
},
)
.await
.unwrap();
oracle
.add_link(
peer,
validator,
Link {
latency: 0.0,
jitter: 0.0,
success_rate: 1.0,
},
)
.await
.unwrap();

// Drain peer receiver
context.with_label("peer_receiver").spawn(|_| async move {
loop {
peer_receiver.recv().await.unwrap();
}
});

// Run the actor, expecting it to panic
actor.start(backfiller, voter_sender, voter_receiver);

// Send finalization over network
let payload = hash(b"test");
let proposal = wire::Proposal {
view: 100,
parent: 50,
payload: payload.to_vec(),
};
let message = proposal_message(proposal.view, proposal.parent, &payload);
let finalize_namespace = finalize_namespace(&namespace);
let partials: Vec<_> = shares
.iter()
.map(|share| partial_sign_message(share, Some(&finalize_namespace), &message))
.collect();
let proposal_signature = threshold_signature_recover(threshold, partials)
.unwrap()
.serialize();
let seed_namespace = seed_namespace(&namespace);
let message = seed_message(proposal.view);
let partials: Vec<_> = shares
.iter()
.map(|share| partial_sign_message(share, Some(&seed_namespace), &message))
.collect();
let seed_signature = threshold_signature_recover(threshold, partials)
.unwrap()
.serialize();
let finalization = wire::Finalization {
proposal: Some(proposal),
proposal_signature,
seed_signature,
};
let msg = wire::Voter {
payload: Some(wire::voter::Payload::Finalization(finalization)),
}
.encode_to_vec()
.into();
peer_sender
.send(Recipients::All, msg, true)
.await
.expect("failed to send message");

// Wait for backfiller to be notified
let msg = backfiller_receiver
.next()
.await
.expect("failed to receive backfiller message");
match msg {
resolver::Message::Finalized { view } => {
assert_eq!(view, 100);
}
_ => panic!("unexpected backfiller message"),
}

// Send old notarization from backfiller
let payload = hash(b"test2");
let proposal = wire::Proposal {
view: 50,
parent: 49,
payload: payload.to_vec(),
};
let message = proposal_message(proposal.view, proposal.parent, &payload);
let notarize_namespace = notarize_namespace(&namespace);
let partials: Vec<_> = shares
.iter()
.map(|share| partial_sign_message(share, Some(&notarize_namespace), &message))
.collect();
let proposal_signature = threshold_signature_recover(threshold, partials)
.unwrap()
.serialize();
let message = seed_message(proposal.view);
let partials: Vec<_> = shares
.iter()
.map(|share| partial_sign_message(share, Some(&seed_namespace), &message))
.collect();
let seed_signature = threshold_signature_recover(threshold, partials)
.unwrap()
.serialize();
let notarization = wire::Notarization {
proposal: Some(proposal),
proposal_signature,
seed_signature,
};
let notarization = crate::Parsed {
message: notarization,
digest: payload,
};
mailbox.notarization(notarization).await;

// Send new finalization
let payload = hash(b"test3");
let proposal = wire::Proposal {
view: 300,
parent: 100,
payload: payload.to_vec(),
};
let message = proposal_message(proposal.view, proposal.parent, &payload);
let partials: Vec<_> = shares
.iter()
.map(|share| partial_sign_message(share, Some(&finalize_namespace), &message))
.collect();
let proposal_signature = threshold_signature_recover(threshold, partials)
.unwrap()
.serialize();
let message = seed_message(proposal.view);
let partials: Vec<_> = shares
.iter()
.map(|share| partial_sign_message(share, Some(&seed_namespace), &message))
.collect();
let seed_signature = threshold_signature_recover(threshold, partials)
.unwrap()
.serialize();
let finalization = wire::Finalization {
proposal: Some(proposal),
proposal_signature,
seed_signature,
};
let msg = wire::Voter {
payload: Some(wire::voter::Payload::Finalization(finalization)),
}
.encode_to_vec()
.into();
peer_sender
.send(Recipients::All, msg, true)
.await
.expect("failed to send message");

// Wait for backfiller to be notified
let msg = backfiller_receiver
.next()
.await
.expect("failed to receive backfiller message");
match msg {
resolver::Message::Finalized { view } => {
assert_eq!(view, 300);
}
_ => panic!("unexpected progress"),
}
});
}
}
Loading