Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions consensus/fuzz/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ use commonware_p2p::{
simulated::{Config as NetworkConfig, Link, Network, Oracle, SplitOrigin, SplitTarget},
Recipients,
};
use commonware_parallel::Sequential;
use commonware_runtime::{
buffer::paged::CacheRef, deterministic, Clock, IoBuf, Metrics, Runner, Spawner,
};
Expand Down Expand Up @@ -402,7 +401,6 @@ where
replay_buffer: NZUsize!(1024 * 1024),
write_buffer: NZUsize!(1024 * 1024),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
strategy: Sequential,
forwarding: ForwardingPolicy::Disabled,
};
let engine = Engine::new(context.with_label("engine"), engine_cfg);
Expand Down Expand Up @@ -636,7 +634,6 @@ fn run_with_twin_mutator<P: simplex::Simplex>(input: FuzzInput) {
replay_buffer: NZUsize!(1024 * 1024),
write_buffer: NZUsize!(1024 * 1024),
page_cache: CacheRef::from_pooler(&primary_context, PAGE_SIZE, PAGE_CACHE_SIZE),
strategy: Sequential,
forwarding: ForwardingPolicy::Disabled,
};
let engine = Engine::new(primary_context.with_label("engine"), engine_cfg);
Expand Down
5 changes: 0 additions & 5 deletions consensus/src/aggregation/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use commonware_cryptography::{
Digest,
};
use commonware_p2p::Blocker;
use commonware_parallel::Strategy;
use commonware_runtime::buffer::paged::CacheRef;
use commonware_utils::NonZeroDuration;
use std::num::{NonZeroU64, NonZeroUsize};
Expand All @@ -21,7 +20,6 @@ pub struct Config<
Z: Reporter<Activity = Activity<P::Scheme, D>>,
M: Monitor<Index = Epoch>,
B: Blocker<PublicKey = <P::Scheme as Scheme>::PublicKey>,
T: Strategy,
> {
/// Tracks the current state of consensus (to determine which participants should
/// be involved in the current broadcast attempt).
Expand Down Expand Up @@ -79,7 +77,4 @@ pub struct Config<

/// Page cache for the journal.
pub journal_page_cache: CacheRef,

/// Strategy for parallel operations.
pub strategy: T,
}
36 changes: 22 additions & 14 deletions consensus/src/aggregation/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,14 @@ use commonware_p2p::{
utils::codec::{wrap, WrappedSender},
Blocker, Receiver, Recipients, Sender,
};
use commonware_parallel::Strategy;
use commonware_runtime::{
buffer::paged::CacheRef,
spawn_cell,
telemetry::metrics::{
histogram,
status::{CounterExt, GaugeExt, Status},
},
BufferPooler, Clock, ContextCell, Handle, Metrics, Spawner, Storage,
BufferPooler, Clock, ContextCell, Handle, Metrics, Spawner, Storage, Strategist,
};
use commonware_storage::journal::segmented::variable::{Config as JConfig, Journal};
use commonware_utils::{futures::Pool as FuturesPool, ordered::Quorum, N3f1, PrioritySet};
Expand Down Expand Up @@ -71,14 +70,13 @@ struct DigestRequest<D: Digest, E: Clock> {

/// Instance of the engine.
pub struct Engine<
E: BufferPooler + Clock + Spawner + Storage + Metrics + CryptoRngCore,
E: BufferPooler + Clock + Spawner + Storage + Metrics + CryptoRngCore + Strategist,
P: Provider<Scope = Epoch>,
D: Digest,
A: Automaton<Context = Height, Digest = D> + Clone,
Z: Reporter<Activity = Activity<P::Scheme, D>>,
M: Monitor<Index = Epoch>,
B: Blocker<PublicKey = <P::Scheme as Scheme>::PublicKey>,
T: Strategy,
> {
// ---------- Interfaces ----------
context: ContextCell<E>,
Expand All @@ -87,7 +85,6 @@ pub struct Engine<
provider: P,
reporter: Z,
blocker: B,
strategy: T,

// Pruning
/// A tuple representing the epochs to keep in memory.
Expand Down Expand Up @@ -154,18 +151,17 @@ pub struct Engine<
}

impl<
E: BufferPooler + Clock + Spawner + Storage + Metrics + CryptoRngCore,
E: BufferPooler + Clock + Spawner + Storage + Metrics + CryptoRngCore + Strategist,
P: Provider<Scope = Epoch, Scheme: scheme::Scheme<D>>,
D: Digest,
A: Automaton<Context = Height, Digest = D> + Clone,
Z: Reporter<Activity = Activity<P::Scheme, D>>,
M: Monitor<Index = Epoch>,
B: Blocker<PublicKey = <P::Scheme as Scheme>::PublicKey>,
T: Strategy,
> Engine<E, P, D, A, Z, M, B, T>
> Engine<E, P, D, A, Z, M, B>
{
/// Creates a new engine with the given context and configuration.
pub fn new(context: E, cfg: Config<P, D, A, Z, M, B, T>) -> Self {
pub fn new(context: E, cfg: Config<P, D, A, Z, M, B>) -> Self {
// TODO(#1833): Metrics should use the post-start context
let metrics = metrics::Metrics::init(context.clone());

Expand All @@ -176,7 +172,6 @@ impl<
monitor: cfg.monitor,
provider: cfg.provider,
blocker: cfg.blocker,
strategy: cfg.strategy,
epoch_bounds: cfg.epoch_bounds,
window: HeightDelta::new(cfg.window.into()),
activity_timeout: cfg.activity_timeout,
Expand Down Expand Up @@ -385,7 +380,7 @@ impl<
}

// Validate that we need to process the ack
if let Err(err) = self.validate_ack(&ack, &sender) {
if let Err(err) = self.validate_ack(&ack, &sender).await {
if err.blockable() {
commonware_p2p::block!(
self.blocker,
Expand Down Expand Up @@ -530,9 +525,16 @@ impl<
let filtered = acks
.values()
.filter(|a| a.item.digest == ack.item.digest)
.cloned()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wonder if there is a way to avoid this clone with some more careful lifetime management?

.collect::<Vec<_>>();
if filtered.len() >= quorum as usize {
if let Some(certificate) = Certificate::from_acks(&*scheme, filtered, &self.strategy) {
let certificate = self
.context
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should keep strategy less tightly bound to context? We could accept strategy with it?

.with_strategy(move |strategy| {
Certificate::from_acks(&*scheme, filtered.iter(), strategy)
})
.await;
if let Some(certificate) = certificate {
self.metrics.certificates.inc();
self.handle_certificate(certificate).await;
}
Expand Down Expand Up @@ -613,7 +615,7 @@ impl<
/// Takes a raw ack (from sender) from the p2p network and validates it.
///
/// Returns an error if the ack is invalid.
fn validate_ack(
async fn validate_ack(
&mut self,
ack: &Ack<P::Scheme, D>,
sender: &<P::Scheme as Scheme>::PublicKey,
Expand Down Expand Up @@ -679,7 +681,13 @@ impl<
}

// Validate signature
if !ack.verify(&mut self.context, &*scheme, &self.strategy) {
let ack = ack.clone();
let mut rng = self.context.clone();
let valid = self
.context
.with_strategy(move |strategy| ack.verify(&mut rng, &*scheme, strategy))
.await;
if !valid {
return Err(Error::InvalidAckSignature);
}

Expand Down
6 changes: 0 additions & 6 deletions consensus/src/aggregation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ mod tests {
};
use commonware_macros::{select, test_group, test_traced};
use commonware_p2p::simulated::{Link, Network, Oracle, Receiver, Sender};
use commonware_parallel::Sequential;
use commonware_runtime::{
buffer::paged::CacheRef,
deterministic::{self, Context},
Expand Down Expand Up @@ -253,7 +252,6 @@ mod tests {
journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
journal_compression: Some(3),
journal_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
strategy: Sequential,
},
);

Expand Down Expand Up @@ -502,7 +500,6 @@ mod tests {
PAGE_SIZE,
PAGE_CACHE_SIZE,
),
strategy: Sequential,
},
);

Expand Down Expand Up @@ -656,7 +653,6 @@ mod tests {
PAGE_SIZE,
PAGE_CACHE_SIZE,
),
strategy: Sequential,
},
);

Expand Down Expand Up @@ -743,7 +739,6 @@ mod tests {
PAGE_SIZE,
PAGE_CACHE_SIZE,
),
strategy: Sequential,
},
);

Expand Down Expand Up @@ -1091,7 +1086,6 @@ mod tests {
PAGE_SIZE,
PAGE_CACHE_SIZE,
),
strategy: Sequential,
},
);

Expand Down
Loading
Loading