Skip to content

Commit d8b2c95

Browse files
[release] v2026.3.0 Testing (#183)
* spike * make it async * nit * fix tests * fmt * fix docs * fix timeouts * update version
1 parent a179785 commit d8b2c95

15 files changed

Lines changed: 395 additions & 248 deletions

File tree

Cargo.lock

Lines changed: 205 additions & 112 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,20 +20,20 @@ license = "MIT OR Apache-2.0"
2020
alto-chain = { version = "0.0.19", path = "chain" }
2121
alto-client = { version = "0.0.19", path = "client" }
2222
alto-types = { version = "0.0.19", path = "types" }
23-
commonware-broadcast = { git = "https://github.com/commonwarexyz/monorepo.git", rev = "16e98b5247c513361ef791d1556a01e1dfd4eb9e" }
24-
commonware-codec = { git = "https://github.com/commonwarexyz/monorepo.git", rev = "16e98b5247c513361ef791d1556a01e1dfd4eb9e" }
25-
commonware-consensus = { git = "https://github.com/commonwarexyz/monorepo.git", rev = "16e98b5247c513361ef791d1556a01e1dfd4eb9e" }
26-
commonware-cryptography = { git = "https://github.com/commonwarexyz/monorepo.git", rev = "16e98b5247c513361ef791d1556a01e1dfd4eb9e" }
27-
commonware-deployer = { git = "https://github.com/commonwarexyz/monorepo.git", rev = "16e98b5247c513361ef791d1556a01e1dfd4eb9e", default-features = false }
28-
commonware-macros = { git = "https://github.com/commonwarexyz/monorepo.git", rev = "16e98b5247c513361ef791d1556a01e1dfd4eb9e" }
29-
commonware-p2p = { git = "https://github.com/commonwarexyz/monorepo.git", rev = "16e98b5247c513361ef791d1556a01e1dfd4eb9e" }
30-
commonware-resolver = { git = "https://github.com/commonwarexyz/monorepo.git", rev = "16e98b5247c513361ef791d1556a01e1dfd4eb9e" }
31-
commonware-runtime = { git = "https://github.com/commonwarexyz/monorepo.git", rev = "16e98b5247c513361ef791d1556a01e1dfd4eb9e" }
32-
commonware-storage = { git = "https://github.com/commonwarexyz/monorepo.git", rev = "16e98b5247c513361ef791d1556a01e1dfd4eb9e" }
33-
commonware-stream = { git = "https://github.com/commonwarexyz/monorepo.git", rev = "16e98b5247c513361ef791d1556a01e1dfd4eb9e" }
34-
commonware-utils = { git = "https://github.com/commonwarexyz/monorepo.git", rev = "16e98b5247c513361ef791d1556a01e1dfd4eb9e" }
35-
commonware-math = { git = "https://github.com/commonwarexyz/monorepo.git", rev = "16e98b5247c513361ef791d1556a01e1dfd4eb9e" }
36-
commonware-parallel = { git = "https://github.com/commonwarexyz/monorepo.git", rev = "16e98b5247c513361ef791d1556a01e1dfd4eb9e" }
23+
commonware-broadcast = "2026.3.0"
24+
commonware-codec = "2026.3.0"
25+
commonware-consensus = "2026.3.0"
26+
commonware-cryptography = "2026.3.0"
27+
commonware-deployer = { version = "2026.3.0", default-features = false }
28+
commonware-macros = "2026.3.0"
29+
commonware-p2p = "2026.3.0"
30+
commonware-resolver = "2026.3.0"
31+
commonware-runtime = "2026.3.0"
32+
commonware-storage = "2026.3.0"
33+
commonware-stream = "2026.3.0"
34+
commonware-utils = "2026.3.0"
35+
commonware-math = "2026.3.0"
36+
commonware-parallel = "2026.3.0"
3737
thiserror = "2.0.12"
3838
bytes = "1.7.1"
3939
rand = "0.8.5"

chain/src/application.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
use alto_types::{Block, Context, Scheme, EPOCH};
22
use commonware_consensus::{
3-
marshal::{ingress::mailbox::AncestorStream, Update},
3+
marshal::{
4+
ancestry::{AncestorStream, BlockProvider},
5+
Update,
6+
},
47
types::{Height, Round, View},
58
Heightable, Reporter,
69
};
@@ -55,10 +58,10 @@ where
5558
self.genesis.as_ref().clone()
5659
}
5760

58-
async fn propose(
61+
async fn propose<A: BlockProvider<Block = Self::Block>>(
5962
&mut self,
6063
(runtime_context, context): (E, Self::Context),
61-
mut ancestry: AncestorStream<Self::SigningScheme, Self::Block>,
64+
mut ancestry: AncestorStream<A, Self::Block>,
6265
) -> Option<Self::Block> {
6366
let parent = ancestry.next().await?;
6467

@@ -81,10 +84,10 @@ impl<E> commonware_consensus::VerifyingApplication<E> for Application
8184
where
8285
E: Rng + Spawner + Metrics + Clock,
8386
{
84-
async fn verify(
87+
async fn verify<A: BlockProvider<Block = Self::Block>>(
8588
&mut self,
8689
(runtime_context, _): (E, Context),
87-
mut ancestry: AncestorStream<Self::SigningScheme, Self::Block>,
90+
mut ancestry: AncestorStream<A, Self::Block>,
8891
) -> bool {
8992
let Some(block) = ancestry.next().await else {
9093
return false;

chain/src/engine.rs

Lines changed: 33 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,12 @@ use crate::{
55
use alto_types::{Activity, Block, Finalization, Scheme, EPOCH, EPOCH_LENGTH, NAMESPACE};
66
use commonware_broadcast::buffered;
77
use commonware_consensus::{
8-
application::marshaled::Marshaled as ConsensusMarshaled,
9-
marshal::{self, ingress::handler},
8+
marshal::{
9+
self,
10+
core::{Actor as MarshalActor, Mailbox as MarshalMailbox},
11+
resolver::handler,
12+
standard::{Deferred, Standard},
13+
},
1014
simplex::{self, elector::Random, Engine as Consensus},
1115
types::{Epoch, FixedEpocher, ViewDelta},
1216
Reporters,
@@ -17,7 +21,7 @@ use commonware_cryptography::{
1721
ed25519::PublicKey,
1822
sha256::Digest,
1923
};
20-
use commonware_p2p::{Blocker, Receiver, Sender};
24+
use commonware_p2p::{Blocker, Provider, Receiver, Sender};
2125
use commonware_parallel::Strategy;
2226
use commonware_resolver::Resolver;
2327
use commonware_runtime::{
@@ -40,7 +44,7 @@ use tracing::{error, info, warn};
4044

4145
/// Reporter type for [simplex::Engine].
4246
type Reporter<E, I> =
43-
Reporters<Activity, marshal::Mailbox<Scheme, Block>, Option<indexer::Pusher<E, I>>>;
47+
Reporters<Activity, MarshalMailbox<Scheme, Standard<Block>>, Option<indexer::Pusher<E, I>>>;
4448

4549
/// To better support peers near tip during network instability, we multiply
4650
/// the consensus activity timeout by this factor.
@@ -59,8 +63,14 @@ const MAX_REPAIR: NonZero<usize> = NZUsize!(20);
5963
const MAX_PENDING_ACKS: NonZero<usize> = NZUsize!(16);
6064

6165
/// Configuration for the [Engine].
62-
pub struct Config<B: Blocker<PublicKey = PublicKey>, I: Indexer, S: Strategy> {
66+
pub struct Config<
67+
B: Blocker<PublicKey = PublicKey>,
68+
P: Provider<PublicKey = PublicKey>,
69+
I: Indexer,
70+
S: Strategy,
71+
> {
6372
pub blocker: B,
73+
pub provider: P,
6474
pub partition_prefix: String,
6575
pub blocks_freezer_table_initial_size: u32,
6676
pub finalized_freezer_table_initial_size: u32,
@@ -72,7 +82,7 @@ pub struct Config<B: Blocker<PublicKey = PublicKey>, I: Indexer, S: Strategy> {
7282
pub deque_size: usize,
7383

7484
pub leader_timeout: Duration,
75-
pub notarization_timeout: Duration,
85+
pub certification_timeout: Duration,
7686
pub nullify_retry: Duration,
7787
pub fetch_timeout: Duration,
7888
pub activity_timeout: ViewDelta,
@@ -87,24 +97,25 @@ pub struct Config<B: Blocker<PublicKey = PublicKey>, I: Indexer, S: Strategy> {
8797
pub indexer: Option<I>,
8898
}
8999

90-
type Marshaled<E> = ConsensusMarshaled<E, Scheme, Application, Block, FixedEpocher>;
100+
type Marshaled<E> = Deferred<E, Scheme, Application, Block, FixedEpocher>;
91101

92102
/// The engine that drives the [Application].
93103
#[allow(clippy::type_complexity)]
94-
pub struct Engine<E, B, S, I>
104+
pub struct Engine<E, B, P, S, I>
95105
where
96106
E: BufferPooler + Clock + GClock + Rng + CryptoRng + Spawner + Storage + Metrics,
97107
B: Blocker<PublicKey = PublicKey>,
108+
P: Provider<PublicKey = PublicKey>,
98109
S: Strategy,
99110
I: Indexer,
100111
{
101112
context: ContextCell<E>,
102113

103-
buffer: buffered::Engine<E, PublicKey, Block>,
114+
buffer: buffered::Engine<E, PublicKey, Block, P>,
104115
buffer_mailbox: buffered::Mailbox<PublicKey, Block>,
105-
marshal: marshal::Actor<
116+
marshal: MarshalActor<
106117
E,
107-
Block,
118+
Standard<Block>,
108119
ConstantProvider<Scheme, Epoch>,
109120
immutable::Archive<E, Digest, Finalization>,
110121
immutable::Archive<E, Digest, Block>,
@@ -117,15 +128,16 @@ where
117128
Consensus<E, Scheme, Random, B, Digest, Marshaled<E>, Marshaled<E>, Reporter<E, I>, S>,
118129
}
119130

120-
impl<E, B, S, I> Engine<E, B, S, I>
131+
impl<E, B, P, S, I> Engine<E, B, P, S, I>
121132
where
122133
E: BufferPooler + Clock + GClock + Rng + CryptoRng + Spawner + ThreadPooler + Storage + Metrics,
123134
B: Blocker<PublicKey = PublicKey>,
135+
P: Provider<PublicKey = PublicKey>,
124136
S: Strategy,
125137
I: Indexer,
126138
{
127139
/// Create a new [Engine].
128-
pub async fn new(context: E, cfg: Config<B, I, S>) -> Self {
140+
pub async fn new(context: E, cfg: Config<B, P, I, S>) -> Self {
129141
// Create the buffer
130142
let (buffer, buffer_mailbox) = buffered::Engine::new(
131143
context.with_label("buffer"),
@@ -135,6 +147,7 @@ where
135147
deque_size: cfg.deque_size,
136148
priority: true,
137149
codec_config: (),
150+
peer_provider: cfg.provider,
138151
},
139152
);
140153

@@ -226,7 +239,7 @@ where
226239
.expect("failed to create scheme");
227240
let provider = ConstantProvider::new(scheme.clone());
228241
let epocher = FixedEpocher::new(EPOCH_LENGTH);
229-
let (marshal, marshal_mailbox, _) = marshal::Actor::init(
242+
let (marshal, marshal_mailbox, _) = MarshalActor::init(
230243
context.with_label("marshal"),
231244
finalizations_by_height,
232245
finalized_blocks,
@@ -287,8 +300,8 @@ where
287300
partition: format!("{}-consensus", cfg.partition_prefix),
288301
mailbox_size: cfg.mailbox_size,
289302
leader_timeout: cfg.leader_timeout,
290-
notarization_timeout: cfg.notarization_timeout,
291-
nullify_retry: cfg.nullify_retry,
303+
certification_timeout: cfg.certification_timeout,
304+
timeout_retry: cfg.nullify_retry,
292305
fetch_timeout: cfg.fetch_timeout,
293306
activity_timeout: cfg.activity_timeout,
294307
skip_timeout: cfg.skip_timeout,
@@ -335,8 +348,8 @@ where
335348
impl Receiver<PublicKey = PublicKey>,
336349
),
337350
marshal: (
338-
mpsc::Receiver<handler::Message<Block>>,
339-
impl Resolver<Key = handler::Request<Block>, PublicKey = PublicKey>,
351+
mpsc::Receiver<handler::Message<Digest>>,
352+
impl Resolver<Key = handler::Request<Digest>, PublicKey = PublicKey>,
340353
),
341354
) -> Handle<()> {
342355
spawn_cell!(
@@ -366,8 +379,8 @@ where
366379
impl Receiver<PublicKey = PublicKey>,
367380
),
368381
marshal: (
369-
mpsc::Receiver<handler::Message<Block>>,
370-
impl Resolver<Key = handler::Request<Block>, PublicKey = PublicKey>,
382+
mpsc::Receiver<handler::Message<Digest>>,
383+
impl Resolver<Key = handler::Request<Digest>, PublicKey = PublicKey>,
371384
),
372385
) {
373386
// Start the buffer

chain/src/indexer.rs

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
#[cfg(test)]
22
use alto_types::Identity;
33
use alto_types::{Activity, Block, Finalized, Notarized, Scheme, Seed, Seedable};
4-
use commonware_consensus::{marshal, Reporter, Viewable};
4+
use commonware_consensus::{
5+
marshal::{core::Mailbox as MarshalMailbox, standard::Standard},
6+
Reporter, Viewable,
7+
};
58
use commonware_parallel::Strategy;
69
use commonware_runtime::{Metrics, Spawner};
710
use std::future::Future;
@@ -99,12 +102,12 @@ impl<S: Strategy> Indexer for alto_client::Client<S> {
99102
pub struct Pusher<E: Spawner + Metrics, I: Indexer> {
100103
context: E,
101104
indexer: I,
102-
marshal: marshal::Mailbox<Scheme, Block>,
105+
marshal: MarshalMailbox<Scheme, Standard<Block>>,
103106
}
104107

105108
impl<E: Spawner + Metrics, I: Indexer> Pusher<E, I> {
106109
/// Create a new [Pusher].
107-
pub fn new(context: E, indexer: I, marshal: marshal::Mailbox<Scheme, Block>) -> Self {
110+
pub fn new(context: E, indexer: I, marshal: MarshalMailbox<Scheme, Standard<Block>>) -> Self {
108111
Self {
109112
context,
110113
indexer,
@@ -137,11 +140,14 @@ impl<E: Spawner + Metrics, I: Indexer> Reporter for Pusher<E, I> {
137140
// Upload block to indexer (once we have it)
138141
self.context.with_label("notarized_block").spawn({
139142
let indexer = self.indexer.clone();
140-
let mut marshal = self.marshal.clone();
143+
let marshal = self.marshal.clone();
141144
move |_| async move {
142145
// Wait for block
143146
let block = marshal
144-
.subscribe(Some(notarization.round()), notarization.proposal.payload)
147+
.subscribe_by_digest(
148+
Some(notarization.round()),
149+
notarization.proposal.payload,
150+
)
145151
.await
146152
.await;
147153
let Ok(block) = block else {
@@ -179,10 +185,13 @@ impl<E: Spawner + Metrics, I: Indexer> Reporter for Pusher<E, I> {
179185
// Upload block to indexer (once we have it)
180186
self.context.with_label("finalized_block").spawn({
181187
let indexer = self.indexer.clone();
182-
let mut marshal = self.marshal.clone();
188+
let marshal = self.marshal.clone();
183189
move |_| async move {
184190
let block = marshal
185-
.subscribe(Some(finalization.round()), finalization.proposal.payload)
191+
.subscribe_by_digest(
192+
Some(finalization.round()),
193+
finalization.proposal.payload,
194+
)
186195
.await
187196
.await;
188197
let Ok(block) = block else {

0 commit comments

Comments
 (0)