Skip to content

Commit a6f5812

Browse files
committed
[runtime] Create a trait for briding async and parallel code
When calling into code doing heavy computation using a Strategy, you want to avoid blocking the tokio thread on waiting for the computation. Instead, you want the waiting to be async, so that other tasks can use the thread while you wait for the computation to finish in the rayon thread pool. To facilitate this pattern, a new capability, Strategist, is added to to the runtime Context. This takes in a synchronous closure, which needs a strategy to do parallel computation. Runtimes implement this correctly, by using, e.g. a oneshot channel to asynchronously wait on the rayon result, or using the tokio blocking pool for a sequential strategy. To use this change throughout the code base, places where a Strategy was stored have been replaced with using the context instead.
1 parent c6a805b commit a6f5812

33 files changed

Lines changed: 820 additions & 521 deletions

File tree

consensus/fuzz/src/lib.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ use commonware_p2p::{
3333
simulated::{Config as NetworkConfig, Link, Network, Oracle, SplitOrigin, SplitTarget},
3434
Recipients,
3535
};
36-
use commonware_parallel::Sequential;
3736
use commonware_runtime::{
3837
buffer::paged::CacheRef, deterministic, Clock, IoBuf, Metrics, Runner, Spawner,
3938
};
@@ -402,7 +401,6 @@ where
402401
replay_buffer: NZUsize!(1024 * 1024),
403402
write_buffer: NZUsize!(1024 * 1024),
404403
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
405-
strategy: Sequential,
406404
forwarding: ForwardingPolicy::Disabled,
407405
};
408406
let engine = Engine::new(context.with_label("engine"), engine_cfg);
@@ -636,7 +634,6 @@ fn run_with_twin_mutator<P: simplex::Simplex>(input: FuzzInput) {
636634
replay_buffer: NZUsize!(1024 * 1024),
637635
write_buffer: NZUsize!(1024 * 1024),
638636
page_cache: CacheRef::from_pooler(&primary_context, PAGE_SIZE, PAGE_CACHE_SIZE),
639-
strategy: Sequential,
640637
forwarding: ForwardingPolicy::Disabled,
641638
};
642639
let engine = Engine::new(primary_context.with_label("engine"), engine_cfg);

consensus/src/aggregation/config.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ use commonware_cryptography::{
88
Digest,
99
};
1010
use commonware_p2p::Blocker;
11-
use commonware_parallel::Strategy;
1211
use commonware_runtime::buffer::paged::CacheRef;
1312
use commonware_utils::NonZeroDuration;
1413
use std::num::{NonZeroU64, NonZeroUsize};
@@ -21,7 +20,6 @@ pub struct Config<
2120
Z: Reporter<Activity = Activity<P::Scheme, D>>,
2221
M: Monitor<Index = Epoch>,
2322
B: Blocker<PublicKey = <P::Scheme as Scheme>::PublicKey>,
24-
T: Strategy,
2523
> {
2624
/// Tracks the current state of consensus (to determine which participants should
2725
/// be involved in the current broadcast attempt).
@@ -79,7 +77,4 @@ pub struct Config<
7977

8078
/// Page cache for the journal.
8179
pub journal_page_cache: CacheRef,
82-
83-
/// Strategy for parallel operations.
84-
pub strategy: T,
8580
}

consensus/src/aggregation/engine.rs

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,14 @@ use commonware_p2p::{
2020
utils::codec::{wrap, WrappedSender},
2121
Blocker, Receiver, Recipients, Sender,
2222
};
23-
use commonware_parallel::Strategy;
2423
use commonware_runtime::{
2524
buffer::paged::CacheRef,
2625
spawn_cell,
2726
telemetry::metrics::{
2827
histogram,
2928
status::{CounterExt, GaugeExt, Status},
3029
},
31-
BufferPooler, Clock, ContextCell, Handle, Metrics, Spawner, Storage,
30+
BufferPooler, Clock, ContextCell, Handle, Metrics, Spawner, Storage, Strategist,
3231
};
3332
use commonware_storage::journal::segmented::variable::{Config as JConfig, Journal};
3433
use commonware_utils::{futures::Pool as FuturesPool, ordered::Quorum, N3f1, PrioritySet};
@@ -71,14 +70,13 @@ struct DigestRequest<D: Digest, E: Clock> {
7170

7271
/// Instance of the engine.
7372
pub struct Engine<
74-
E: BufferPooler + Clock + Spawner + Storage + Metrics + CryptoRngCore,
73+
E: BufferPooler + Clock + Spawner + Storage + Metrics + CryptoRngCore + Strategist,
7574
P: Provider<Scope = Epoch>,
7675
D: Digest,
7776
A: Automaton<Context = Height, Digest = D> + Clone,
7877
Z: Reporter<Activity = Activity<P::Scheme, D>>,
7978
M: Monitor<Index = Epoch>,
8079
B: Blocker<PublicKey = <P::Scheme as Scheme>::PublicKey>,
81-
T: Strategy,
8280
> {
8381
// ---------- Interfaces ----------
8482
context: ContextCell<E>,
@@ -87,7 +85,6 @@ pub struct Engine<
8785
provider: P,
8886
reporter: Z,
8987
blocker: B,
90-
strategy: T,
9188

9289
// Pruning
9390
/// A tuple representing the epochs to keep in memory.
@@ -154,18 +151,17 @@ pub struct Engine<
154151
}
155152

156153
impl<
157-
E: BufferPooler + Clock + Spawner + Storage + Metrics + CryptoRngCore,
154+
E: BufferPooler + Clock + Spawner + Storage + Metrics + CryptoRngCore + Strategist,
158155
P: Provider<Scope = Epoch, Scheme: scheme::Scheme<D>>,
159156
D: Digest,
160157
A: Automaton<Context = Height, Digest = D> + Clone,
161158
Z: Reporter<Activity = Activity<P::Scheme, D>>,
162159
M: Monitor<Index = Epoch>,
163160
B: Blocker<PublicKey = <P::Scheme as Scheme>::PublicKey>,
164-
T: Strategy,
165-
> Engine<E, P, D, A, Z, M, B, T>
161+
> Engine<E, P, D, A, Z, M, B>
166162
{
167163
/// Creates a new engine with the given context and configuration.
168-
pub fn new(context: E, cfg: Config<P, D, A, Z, M, B, T>) -> Self {
164+
pub fn new(context: E, cfg: Config<P, D, A, Z, M, B>) -> Self {
169165
// TODO(#1833): Metrics should use the post-start context
170166
let metrics = metrics::Metrics::init(context.clone());
171167

@@ -176,7 +172,6 @@ impl<
176172
monitor: cfg.monitor,
177173
provider: cfg.provider,
178174
blocker: cfg.blocker,
179-
strategy: cfg.strategy,
180175
epoch_bounds: cfg.epoch_bounds,
181176
window: HeightDelta::new(cfg.window.into()),
182177
activity_timeout: cfg.activity_timeout,
@@ -385,7 +380,7 @@ impl<
385380
}
386381

387382
// Validate that we need to process the ack
388-
if let Err(err) = self.validate_ack(&ack, &sender) {
383+
if let Err(err) = self.validate_ack(&ack, &sender).await {
389384
if err.blockable() {
390385
commonware_p2p::block!(
391386
self.blocker,
@@ -530,9 +525,16 @@ impl<
530525
let filtered = acks
531526
.values()
532527
.filter(|a| a.item.digest == ack.item.digest)
528+
.cloned()
533529
.collect::<Vec<_>>();
534530
if filtered.len() >= quorum as usize {
535-
if let Some(certificate) = Certificate::from_acks(&*scheme, filtered, &self.strategy) {
531+
let certificate = self
532+
.context
533+
.with_strategy(move |strategy| {
534+
Certificate::from_acks(&*scheme, filtered.iter(), strategy)
535+
})
536+
.await;
537+
if let Some(certificate) = certificate {
536538
self.metrics.certificates.inc();
537539
self.handle_certificate(certificate).await;
538540
}
@@ -613,7 +615,7 @@ impl<
613615
/// Takes a raw ack (from sender) from the p2p network and validates it.
614616
///
615617
/// Returns an error if the ack is invalid.
616-
fn validate_ack(
618+
async fn validate_ack(
617619
&mut self,
618620
ack: &Ack<P::Scheme, D>,
619621
sender: &<P::Scheme as Scheme>::PublicKey,
@@ -679,7 +681,13 @@ impl<
679681
}
680682

681683
// Validate signature
682-
if !ack.verify(&mut self.context, &*scheme, &self.strategy) {
684+
let ack = ack.clone();
685+
let mut rng = self.context.clone();
686+
let valid = self
687+
.context
688+
.with_strategy(move |strategy| ack.verify(&mut rng, &*scheme, strategy))
689+
.await;
690+
if !valid {
683691
return Err(Error::InvalidAckSignature);
684692
}
685693

consensus/src/aggregation/mod.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,6 @@ mod tests {
9898
};
9999
use commonware_macros::{select, test_group, test_traced};
100100
use commonware_p2p::simulated::{Link, Network, Oracle, Receiver, Sender};
101-
use commonware_parallel::Sequential;
102101
use commonware_runtime::{
103102
buffer::paged::CacheRef,
104103
deterministic::{self, Context},
@@ -253,7 +252,6 @@ mod tests {
253252
journal_heights_per_section: std::num::NonZeroU64::new(6).unwrap(),
254253
journal_compression: Some(3),
255254
journal_page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
256-
strategy: Sequential,
257255
},
258256
);
259257

@@ -502,7 +500,6 @@ mod tests {
502500
PAGE_SIZE,
503501
PAGE_CACHE_SIZE,
504502
),
505-
strategy: Sequential,
506503
},
507504
);
508505

@@ -656,7 +653,6 @@ mod tests {
656653
PAGE_SIZE,
657654
PAGE_CACHE_SIZE,
658655
),
659-
strategy: Sequential,
660656
},
661657
);
662658

@@ -743,7 +739,6 @@ mod tests {
743739
PAGE_SIZE,
744740
PAGE_CACHE_SIZE,
745741
),
746-
strategy: Sequential,
747742
},
748743
);
749744

@@ -1091,7 +1086,6 @@ mod tests {
10911086
PAGE_SIZE,
10921087
PAGE_CACHE_SIZE,
10931088
),
1094-
strategy: Sequential,
10951089
},
10961090
);
10971091

0 commit comments

Comments
 (0)