Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions consensus/src/aggregation/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use commonware_cryptography::{
Digest,
};
use commonware_p2p::Blocker;
use commonware_parallel::Strategy;
use commonware_parallel::Bridge;
use commonware_runtime::buffer::paged::CacheRef;
use commonware_utils::NonZeroDuration;
use std::num::{NonZeroU64, NonZeroUsize};
Expand All @@ -21,7 +21,7 @@ pub struct Config<
Z: Reporter<Activity = Activity<P::Scheme, D>>,
M: Monitor<Index = Epoch>,
B: Blocker<PublicKey = <P::Scheme as Scheme>::PublicKey>,
T: Strategy,
T: Bridge,
> {
/// Tracks the current state of consensus (to determine which participants should
/// be involved in the current broadcast attempt).
Expand Down
19 changes: 13 additions & 6 deletions consensus/src/aggregation/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use commonware_p2p::{
utils::codec::{wrap, WrappedSender},
Blocker, Receiver, Recipients, Sender,
};
use commonware_parallel::Strategy;
use commonware_parallel::Bridge;
use commonware_runtime::{
buffer::paged::CacheRef,
spawn_cell,
Expand Down Expand Up @@ -78,7 +78,7 @@ pub struct Engine<
Z: Reporter<Activity = Activity<P::Scheme, D>>,
M: Monitor<Index = Epoch>,
B: Blocker<PublicKey = <P::Scheme as Scheme>::PublicKey>,
T: Strategy,
T: Bridge,
> {
// ---------- Interfaces ----------
context: ContextCell<E>,
Expand Down Expand Up @@ -161,7 +161,7 @@ impl<
Z: Reporter<Activity = Activity<P::Scheme, D>>,
M: Monitor<Index = Epoch>,
B: Blocker<PublicKey = <P::Scheme as Scheme>::PublicKey>,
T: Strategy,
T: Bridge,
> Engine<E, P, D, A, Z, M, B, T>
{
/// Creates a new engine with the given context and configuration.
Expand Down Expand Up @@ -385,7 +385,7 @@ impl<
}

// Validate that we need to process the ack
if let Err(err) = self.validate_ack(&ack, &sender) {
if let Err(err) = self.validate_ack(&ack, &sender).await {
if err.blockable() {
commonware_p2p::block!(
self.blocker,
Expand Down Expand Up @@ -613,7 +613,7 @@ impl<
/// Takes a raw ack (from sender) from the p2p network and validates it.
///
/// Returns an error if the ack is invalid.
fn validate_ack(
async fn validate_ack(
&mut self,
ack: &Ack<P::Scheme, D>,
sender: &<P::Scheme as Scheme>::PublicKey,
Expand Down Expand Up @@ -679,7 +679,14 @@ impl<
}

// Validate signature
if !ack.verify(&mut self.context, &*scheme, &self.strategy) {
let ack_clone = ack.clone();
let scheme = scheme.clone();
let mut context = self.context.with_label("verify");
if !self
.strategy
.spawn(move |s| ack_clone.verify(&mut context, &*scheme, &s))
.await
{
return Err(Error::InvalidAckSignature);
}

Expand Down
16 changes: 8 additions & 8 deletions consensus/src/marshal/coding/marshaled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ use commonware_cryptography::{
Committable, Digestible, Hasher,
};
use commonware_macros::select;
use commonware_parallel::Strategy;
use commonware_parallel::Bridge;
use commonware_runtime::{
telemetry::metrics::histogram::{Buckets, Timed},
Clock, Metrics, Spawner, Storage,
Expand Down Expand Up @@ -140,7 +140,7 @@ where
C: CodingScheme,
H: Hasher,
Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
S: Strategy,
S: Bridge,
ES: Epocher,
{
/// The underlying application to wrap.
Expand Down Expand Up @@ -173,7 +173,7 @@ where
C: CodingScheme,
H: Hasher,
Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
S: Strategy,
S: Bridge,
ES: Epocher,
{
context: E,
Expand Down Expand Up @@ -206,7 +206,7 @@ where
C: CodingScheme,
H: Hasher,
Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
S: Strategy,
S: Bridge,
ES: Epocher,
{
/// Creates a new [`Marshaled`] wrapper.
Expand Down Expand Up @@ -448,7 +448,7 @@ where
C: CodingScheme,
H: Hasher,
Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
S: Strategy,
S: Bridge,
ES: Epocher,
{
type Digest = Commitment;
Expand Down Expand Up @@ -825,7 +825,7 @@ where
C: CodingScheme,
H: Hasher,
Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
S: Strategy,
S: Bridge,
ES: Epocher,
{
async fn certify(&mut self, round: Round, payload: Self::Digest) -> oneshot::Receiver<bool> {
Expand Down Expand Up @@ -935,7 +935,7 @@ where
C: CodingScheme,
H: Hasher,
Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
S: Strategy,
S: Bridge,
ES: Epocher,
{
type Digest = Commitment;
Expand Down Expand Up @@ -988,7 +988,7 @@ where
C: CodingScheme,
H: Hasher,
Z: Provider<Scope = Epoch, Scheme: Scheme<Commitment>>,
S: Strategy,
S: Bridge,
ES: Epocher,
{
type Activity = A::Activity;
Expand Down
74 changes: 42 additions & 32 deletions consensus/src/marshal/coding/shards/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ use commonware_p2p::{
utils::codec::{WrappedBackgroundReceiver, WrappedSender},
Blocker, Provider as PeerProvider, Receiver, Recipients, Sender,
};
use commonware_parallel::Strategy;
use commonware_parallel::Bridge;
use commonware_runtime::{
spawn_cell,
telemetry::metrics::{histogram::HistogramExt, status::GaugeExt},
Expand Down Expand Up @@ -219,7 +219,7 @@ where
C: CodingScheme,
H: Hasher,
B: CertifiableBlock,
T: Strategy,
T: Bridge,
{
/// The scheme provider.
pub scheme_provider: S,
Expand Down Expand Up @@ -280,7 +280,7 @@ where
H: Hasher,
B: CertifiableBlock,
P: PublicKey,
T: Strategy,
T: Bridge,
{
/// Context held by the actor.
context: ContextCell<E>,
Expand Down Expand Up @@ -367,7 +367,7 @@ where
H: Hasher,
B: CertifiableBlock,
P: PublicKey,
T: Strategy,
T: Bridge,
{
/// Create a new [`Engine`] with the given configuration.
pub fn new(context: E, config: Config<P, S, X, D, C, H, B, T>) -> (Self, Mailbox<B, C, H, P>) {
Expand Down Expand Up @@ -577,7 +577,7 @@ where
/// - `Ok(None)` if reconstruction could not be attempted due to insufficient checked shards.
/// - `Err(_)` if reconstruction was attempted but failed.
#[allow(clippy::type_complexity)]
fn try_reconstruct(
async fn try_reconstruct(
&mut self,
commitment: Commitment,
) -> Result<Option<Arc<CodedBlock<B, C, H>>>, Error<C>> {
Expand All @@ -591,20 +591,21 @@ where
debug!(%commitment, "not enough checked shards to reconstruct block");
return Ok(None);
}
// Attempt to reconstruct the encoded blob

// Spawn only the erasure decode (which uses the strategy for parallel
// recovery) so we can yield while it runs.
let shards = state.take_checked_shards();
let start = self.context.current();
let blob = C::decode(
&commitment.config(),
&commitment.root(),
state.checked_shards().iter(),
&self.strategy,
)
.map_err(Error::Coding)?;
let blob = self
.strategy
.spawn(move |s| C::decode(&commitment.config(), &commitment.root(), shards.iter(), &s))
.await
.map_err(Error::Coding)?;
self.metrics
.erasure_decode_duration
.observe_between(start, self.context.current());

// Attempt to decode the block from the encoded blob
// Decode the block and validate the reconstruction.
let (inner, config): (B, CodingConfig) =
Decode::decode_cfg(&mut blob.as_slice(), &(self.block_codec_cfg.clone(), ()))?;

Expand Down Expand Up @@ -872,7 +873,7 @@ where
}
}

match self.try_reconstruct(commitment) {
match self.try_reconstruct(commitment).await {
Ok(Some(block)) => {
// Do not prune other reconstruction state here. A Byzantine
// leader can equivocate by proposing multiple commitments in
Expand Down Expand Up @@ -1220,26 +1221,30 @@ where
&mut self,
commitment: Commitment,
participants_len: u64,
strategy: &impl Strategy,
strategy: &impl Bridge,
blocker: &mut impl Blocker<PublicKey = P>,
) -> Option<ReadyState<P, C, H>> {
let minimum = usize::from(commitment.config().minimum_shards.get());
if self.common.checked_shards.len() + self.pending_shards.len() < minimum {
return None;
}

// Batch-validate all pending weak shards in parallel.
// Spawn batch-validation of all pending weak shards so we can yield
// while the work runs.
let pending = std::mem::take(&mut self.pending_shards);
let (new_checked, to_block) =
strategy.map_partition_collect_vec(pending, |(peer, shard)| {
let checked = C::check(
&commitment.config(),
&commitment.root(),
shard.index,
&shard.data,
);
(peer, checked.ok())
});
let (new_checked, to_block) = strategy
.spawn(move |s| {
s.map_partition_collect_vec(pending, |(peer, shard)| {
let checked = C::check(
&commitment.config(),
&commitment.root(),
shard.index,
&shard.data,
);
(peer, checked.ok())
})
})
.await;

for peer in to_block {
commonware_p2p::block!(blocker, peer, "invalid shard received");
Expand Down Expand Up @@ -1268,22 +1273,22 @@ where
struct InsertCtx<'a, Sch, S>
where
Sch: CertificateScheme,
S: Strategy,
S: Bridge,
{
scheme: &'a Sch,
strategy: &'a S,
participants_len: u64,
}

impl<Sch: CertificateScheme, S: Strategy> Clone for InsertCtx<'_, Sch, S> {
impl<Sch: CertificateScheme, S: Bridge> Clone for InsertCtx<'_, Sch, S> {
fn clone(&self) -> Self {
*self
}
}

impl<Sch: CertificateScheme, S: Strategy> Copy for InsertCtx<'_, Sch, S> {}
impl<Sch: CertificateScheme, S: Bridge> Copy for InsertCtx<'_, Sch, S> {}

impl<'a, Sch: CertificateScheme, S: Strategy> InsertCtx<'a, Sch, S> {
impl<'a, Sch: CertificateScheme, S: Bridge> InsertCtx<'a, Sch, S> {
fn new(scheme: &'a Sch, strategy: &'a S) -> Self {
let participants_len = u64::try_from(scheme.participants().len())
.expect("participant count impossibly out of bounds");
Expand Down Expand Up @@ -1345,6 +1350,11 @@ where
self.common().checked_shards.as_slice()
}

/// Takes the verified shards out of the state, leaving it empty.
fn take_checked_shards(&mut self) -> Vec<C::CheckedShard> {
std::mem::take(&mut self.common_mut().checked_shards)
}

/// Takes the pending action for this commitment's validated shard.
///
/// Returns [`None`] if the leader's shard hasn't been validated yet.
Expand Down Expand Up @@ -1407,7 +1417,7 @@ where
) -> bool
where
Sch: CertificateScheme<PublicKey = P>,
S: Strategy,
S: Bridge,
X: Blocker<PublicKey = P>,
{
let Some(sender_index) = ctx.scheme.participants().index(&sender) else {
Expand Down
4 changes: 2 additions & 2 deletions consensus/src/marshal/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::{
Block,
};
use commonware_cryptography::certificate::Provider;
use commonware_parallel::Strategy;
use commonware_parallel::Bridge;
use commonware_runtime::buffer::paged::CacheRef;
use std::num::{NonZeroU64, NonZeroUsize};

Expand Down Expand Up @@ -31,7 +31,7 @@ where
B: Block,
P: Provider<Scope = Epoch>,
ES: Epocher,
T: Strategy,
T: Bridge,
{
/// Provider for epoch-specific signing schemes.
///
Expand Down
6 changes: 3 additions & 3 deletions consensus/src/marshal/core/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use commonware_cryptography::{
};
use commonware_macros::select_loop;
use commonware_p2p::Recipients;
use commonware_parallel::Strategy;
use commonware_parallel::Bridge;
use commonware_resolver::Resolver;
use commonware_runtime::{
spawn_cell, telemetry::metrics::status::GaugeExt, BufferPooler, Clock, ContextCell, Handle,
Expand Down Expand Up @@ -210,7 +210,7 @@ where
>,
FB: Blocks<Block = V::StoredBlock>,
ES: Epocher,
T: Strategy,
T: Bridge,
A: Acknowledgement,
{
// ---------- Context ----------
Expand Down Expand Up @@ -275,7 +275,7 @@ where
>,
FB: Blocks<Block = V::StoredBlock>,
ES: Epocher,
T: Strategy,
T: Bridge,
A: Acknowledgement,
{
/// Create a new application actor.
Expand Down
4 changes: 2 additions & 2 deletions consensus/src/ordered_broadcast/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::{
Automaton, Monitor, Relay, Reporter,
};
use commonware_cryptography::{certificate::Provider, Digest, Signer};
use commonware_parallel::Strategy;
use commonware_parallel::Bridge;
use commonware_runtime::buffer::paged::CacheRef;
use std::{
num::{NonZeroU64, NonZeroUsize},
Expand All @@ -21,7 +21,7 @@ pub struct Config<
R: Relay<Digest = D, PublicKey = C::PublicKey, Plan = ()>,
Z: Reporter<Activity = Activity<C::PublicKey, P::Scheme, D>>,
M: Monitor<Index = Epoch>,
T: Strategy,
T: Bridge,
> {
/// The signer used when this engine acts as a sequencer.
///
Expand Down
Loading
Loading