Skip to content

Commit 468b7a9

Browse files
[examples] Migrate to Actor Mailbox (#3806)
1 parent 1b229b8 commit 468b7a9

7 files changed

Lines changed: 165 additions & 74 deletions

File tree

consensus/src/marshal/coding/shards/engine.rs

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ use commonware_runtime::{
166166
};
167167
use commonware_utils::{
168168
bitmap::BitMap,
169-
channel::{fallible::OneshotExt, mpsc, oneshot},
169+
channel::{fallible::OneshotExt, oneshot},
170170
ordered::{Quorum, Set},
171171
};
172172
use rand::Rng;
@@ -251,9 +251,9 @@ where
251251
/// Capacity of the channel between the background receiver and the engine.
252252
///
253253
/// The background receiver decodes incoming network messages in a separate
254-
/// task and forwards them to the engine over an `mpsc` channel with this
254+
/// task and forwards them to the engine over a mailbox with this
255255
/// capacity.
256-
pub background_channel_capacity: usize,
256+
pub background_channel_capacity: NonZeroUsize,
257257

258258
/// Provider for peer set information. Pre-leader shards are buffered per
259259
/// peer only while that peer appears in the
@@ -324,7 +324,7 @@ where
324324
latest_primary_peers: Set<P>,
325325

326326
/// Capacity of the background receiver channel.
327-
background_channel_capacity: usize,
327+
background_channel_capacity: NonZeroUsize,
328328

329329
/// An ephemeral cache of reconstructed blocks, keyed by commitment.
330330
///
@@ -412,8 +412,8 @@ where
412412
self.context.network_buffer_pool().clone(),
413413
sender,
414414
);
415-
let (receiver_service, mut receiver): (_, mpsc::Receiver<(P, Shard<C, H>)>) =
416-
WrappedBackgroundReceiver::new(
415+
let (receiver_service, mut receiver) =
416+
WrappedBackgroundReceiver::<_, P, X, _, Shard<C, H>>::new(
417417
self.context.child("shard_ingress"),
418418
receiver,
419419
self.shard_codec_cfg.clone(),
@@ -1801,7 +1801,7 @@ mod tests {
18011801
strategy: STRATEGY,
18021802
mailbox_size: NZUsize!(1024),
18031803
peer_buffer_size: NZUsize!(64),
1804-
background_channel_capacity: 1024,
1804+
background_channel_capacity: NZUsize!(1024),
18051805
peer_provider: oracle.manager(),
18061806
};
18071807

@@ -1840,7 +1840,7 @@ mod tests {
18401840
strategy: STRATEGY,
18411841
mailbox_size: NZUsize!(1024),
18421842
peer_buffer_size: NZUsize!(64),
1843-
background_channel_capacity: 1024,
1843+
background_channel_capacity: NZUsize!(1024),
18441844
peer_provider: oracle.manager(),
18451845
};
18461846

@@ -3624,7 +3624,7 @@ mod tests {
36243624
strategy: STRATEGY,
36253625
mailbox_size: NZUsize!(1024),
36263626
peer_buffer_size: NZUsize!(64),
3627-
background_channel_capacity: 1024,
3627+
background_channel_capacity: NZUsize!(1024),
36283628
peer_provider: oracle.manager(),
36293629
};
36303630

@@ -3753,7 +3753,7 @@ mod tests {
37533753
strategy: STRATEGY,
37543754
mailbox_size: NZUsize!(1024),
37553755
peer_buffer_size: NZUsize!(64),
3756-
background_channel_capacity: 1024,
3756+
background_channel_capacity: NZUsize!(1024),
37573757
peer_provider: oracle.manager(),
37583758
};
37593759
let (broadcaster_engine, broadcaster_mailbox) =
@@ -3776,7 +3776,7 @@ mod tests {
37763776
strategy: STRATEGY,
37773777
mailbox_size: NZUsize!(1024),
37783778
peer_buffer_size: NZUsize!(64),
3779-
background_channel_capacity: 1024,
3779+
background_channel_capacity: NZUsize!(1024),
37803780
peer_provider: oracle.manager(),
37813781
};
37823782
let (receiver_engine, receiver_mailbox) =
@@ -4555,7 +4555,7 @@ mod tests {
45554555
strategy: STRATEGY,
45564556
mailbox_size: NZUsize!(1024),
45574557
peer_buffer_size: NZUsize!(64),
4558-
background_channel_capacity: 1024,
4558+
background_channel_capacity: NZUsize!(1024),
45594559
peer_provider: oracle.manager(),
45604560
};
45614561

@@ -4658,7 +4658,7 @@ mod tests {
46584658
strategy: STRATEGY,
46594659
mailbox_size: NZUsize!(16),
46604660
peer_buffer_size: NZUsize!(4),
4661-
background_channel_capacity: 16,
4661+
background_channel_capacity: NZUsize!(16),
46624662
peer_provider: oracle.manager(),
46634663
};
46644664

@@ -4791,7 +4791,7 @@ mod tests {
47914791
strategy: STRATEGY,
47924792
mailbox_size: NZUsize!(1024),
47934793
peer_buffer_size: NZUsize!(64),
4794-
background_channel_capacity: 1024,
4794+
background_channel_capacity: NZUsize!(1024),
47954795
peer_provider: oracle.manager(),
47964796
};
47974797

@@ -4946,7 +4946,7 @@ mod tests {
49464946
strategy: STRATEGY,
49474947
mailbox_size: NZUsize!(1024),
49484948
peer_buffer_size: NZUsize!(64),
4949-
background_channel_capacity: 1024,
4949+
background_channel_capacity: NZUsize!(1024),
49504950
peer_provider: oracle.manager(),
49514951
};
49524952

consensus/src/marshal/mocks/harness.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2507,7 +2507,7 @@ impl TestHarness for CodingHarness {
25072507
strategy: Sequential,
25082508
mailbox_size: NZUsize!(10),
25092509
peer_buffer_size: NZUsize!(64),
2510-
background_channel_capacity: 1024,
2510+
background_channel_capacity: NZUsize!(1024),
25112511
peer_provider: oracle.manager(),
25122512
};
25132513
let (shard_engine, shard_mailbox) =
@@ -2694,7 +2694,7 @@ impl TestHarness for CodingHarness {
26942694
strategy: Sequential,
26952695
mailbox_size: NZUsize!(10),
26962696
peer_buffer_size: NZUsize!(64),
2697-
background_channel_capacity: 1024,
2697+
background_channel_capacity: NZUsize!(1024),
26982698
peer_provider: oracle.manager(),
26992699
};
27002700
let (shard_engine, shard_mailbox) =

examples/reshare/src/dkg/actor.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,7 @@ where
304304
share: epoch_state.share.clone(),
305305
dealers: dealers.clone(),
306306
};
307-
orchestrator.enter(transition).await;
307+
orchestrator.enter(transition);
308308

309309
// Register a channel for this round
310310
let (mut round_sender, mut round_receiver) = dkg_mux
@@ -576,7 +576,7 @@ where
576576
};
577577

578578
// Exit the engine for this epoch now that the boundary is finalized
579-
orchestrator.exit(epoch).await;
579+
orchestrator.exit(epoch);
580580

581581
// If the update is stop, wait forever.
582582
if let PostUpdate::Stop = callback.on_update(update).await {
@@ -773,7 +773,8 @@ mod tests {
773773
},
774774
);
775775
let (sender, receiver) = inert_channel(&peer_config.participants);
776-
let (orchestrator_sender, mut orchestrator_receiver) = mpsc::channel(4);
776+
let (orchestrator_sender, mut orchestrator_receiver) =
777+
mailbox::new(context.child("orchestrator_mailbox"), NZUsize!(4));
777778
actor.start(
778779
None,
779780
None,

examples/reshare/src/engine.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,7 @@ where
303303
marshal: marshal_mailbox,
304304
strategy: config.strategy.clone(),
305305
muxer_size: MAILBOX_SIZE.get(),
306-
mailbox_size: MAILBOX_SIZE.get(),
306+
mailbox_size: MAILBOX_SIZE,
307307
partition_prefix: format!("{}_consensus", config.partition_prefix),
308308
_phantom: PhantomData,
309309
},

examples/reshare/src/orchestrator/actor.rs

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use crate::{
55
orchestrator::{ingress::Message, Mailbox},
66
BLOCKS_PER_EPOCH,
77
};
8+
use commonware_actor::mailbox;
89
use commonware_consensus::{
910
marshal::{core::Mailbox as MarshalMailbox, standard::Standard},
1011
simplex::{self, elector::Config as Elector, scheme, types::Context, Plan},
@@ -17,7 +18,7 @@ use commonware_cryptography::{
1718
use commonware_macros::select_loop;
1819
use commonware_p2p::{
1920
utils::mux::{Builder, MuxHandle, Muxer},
20-
Blocker, Receiver, Sender,
21+
Blocker, Sender,
2122
};
2223
use commonware_parallel::Strategy;
2324
use commonware_runtime::{
@@ -26,9 +27,9 @@ use commonware_runtime::{
2627
telemetry::metrics::{Gauge, GaugeExt, MetricsExt as _},
2728
BufferPooler, Clock, ContextCell, Handle, Metrics, Network, Spawner, Storage,
2829
};
29-
use commonware_utils::{channel::mpsc, vec::NonEmptyVec, NZUsize, NZU16};
30+
use commonware_utils::{vec::NonEmptyVec, NZUsize, NZU16};
3031
use rand_core::CryptoRngCore;
31-
use std::{collections::BTreeMap, marker::PhantomData, time::Duration};
32+
use std::{collections::BTreeMap, marker::PhantomData, num::NonZeroUsize, time::Duration};
3233
use tracing::{debug, info, warn};
3334

3435
/// Configuration for the orchestrator.
@@ -51,7 +52,7 @@ where
5152
pub strategy: T,
5253

5354
pub muxer_size: usize,
54-
pub mailbox_size: usize,
55+
pub mailbox_size: NonZeroUsize,
5556

5657
// Partition prefix used for orchestrator metadata persistence
5758
pub partition_prefix: String,
@@ -74,7 +75,7 @@ where
7475
Provider<S, C>: EpochProvider<Variant = V, PublicKey = C::PublicKey, Scheme = S>,
7576
{
7677
context: ContextCell<E>,
77-
mailbox: mpsc::Receiver<Message<V, C::PublicKey>>,
78+
mailbox: mailbox::Receiver<Message<V, C::PublicKey>>,
7879
application: A,
7980

8081
oracle: B,
@@ -109,7 +110,7 @@ where
109110
context: E,
110111
config: Config<B, V, C, H, A, S, L, T>,
111112
) -> (Self, Mailbox<V, C::PublicKey>) {
112-
let (sender, mailbox) = mpsc::channel(config.mailbox_size);
113+
let (sender, mailbox) = mailbox::new(context.child("mailbox"), config.mailbox_size);
113114
let page_cache_ref = CacheRef::from_pooler(&context, NZU16!(16_384), NZUsize!(10_000));
114115

115116
// Register latest_epoch gauge for Grafana integration
@@ -138,15 +139,15 @@ where
138139
mut self,
139140
votes: (
140141
impl Sender<PublicKey = C::PublicKey>,
141-
impl Receiver<PublicKey = C::PublicKey>,
142+
impl commonware_p2p::Receiver<PublicKey = C::PublicKey>,
142143
),
143144
certificates: (
144145
impl Sender<PublicKey = C::PublicKey>,
145-
impl Receiver<PublicKey = C::PublicKey>,
146+
impl commonware_p2p::Receiver<PublicKey = C::PublicKey>,
146147
),
147148
resolver: (
148149
impl Sender<PublicKey = C::PublicKey>,
149-
impl Receiver<PublicKey = C::PublicKey>,
150+
impl commonware_p2p::Receiver<PublicKey = C::PublicKey>,
150151
),
151152
) -> Handle<()> {
152153
spawn_cell!(self.context, self.run(votes, certificates, resolver,))
@@ -156,15 +157,15 @@ where
156157
mut self,
157158
(vote_sender, vote_receiver): (
158159
impl Sender<PublicKey = C::PublicKey>,
159-
impl Receiver<PublicKey = C::PublicKey>,
160+
impl commonware_p2p::Receiver<PublicKey = C::PublicKey>,
160161
),
161162
(certificate_sender, certificate_receiver): (
162163
impl Sender<PublicKey = C::PublicKey>,
163-
impl Receiver<PublicKey = C::PublicKey>,
164+
impl commonware_p2p::Receiver<PublicKey = C::PublicKey>,
164165
),
165166
(resolver_sender, resolver_receiver): (
166167
impl Sender<PublicKey = C::PublicKey>,
167-
impl Receiver<PublicKey = C::PublicKey>,
168+
impl commonware_p2p::Receiver<PublicKey = C::PublicKey>,
168169
),
169170
) {
170171
// Start muxers for each physical channel used by consensus
@@ -287,15 +288,15 @@ where
287288
scheme: S,
288289
vote_mux: &mut MuxHandle<
289290
impl Sender<PublicKey = C::PublicKey>,
290-
impl Receiver<PublicKey = C::PublicKey>,
291+
impl commonware_p2p::Receiver<PublicKey = C::PublicKey>,
291292
>,
292293
certificate_mux: &mut MuxHandle<
293294
impl Sender<PublicKey = C::PublicKey>,
294-
impl Receiver<PublicKey = C::PublicKey>,
295+
impl commonware_p2p::Receiver<PublicKey = C::PublicKey>,
295296
>,
296297
resolver_mux: &mut MuxHandle<
297298
impl Sender<PublicKey = C::PublicKey>,
298-
impl Receiver<PublicKey = C::PublicKey>,
299+
impl commonware_p2p::Receiver<PublicKey = C::PublicKey>,
299300
>,
300301
) -> Handle<()> {
301302
// Start the new engine
Lines changed: 41 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
//! Inbound communication channel for epoch transitions.
22
3+
use commonware_actor::mailbox::{Policy, Sender};
34
use commonware_consensus::types::Epoch;
45
use commonware_cryptography::{
56
bls12381::primitives::{group, sharing::Sharing, variant::Variant},
67
PublicKey,
78
};
8-
use commonware_utils::{channel::mpsc, ordered::Set};
9+
use commonware_utils::ordered::Set;
10+
use std::collections::VecDeque;
911
use tracing::error;
1012

1113
/// Messages that can be sent to the orchestrator.
@@ -14,6 +16,36 @@ pub enum Message<V: Variant, P: PublicKey> {
1416
Exit(Epoch),
1517
}
1618

19+
impl<V: Variant, P: PublicKey> Policy for Message<V, P> {
20+
type Overflow = VecDeque<Self>;
21+
22+
fn handle(overflow: &mut VecDeque<Self>, message: Self) -> bool {
23+
match message {
24+
Self::Enter(transition) => {
25+
let epoch = transition.epoch;
26+
if let Some(index) = overflow
27+
.iter()
28+
.position(|pending| matches!(pending, Self::Exit(pending) if *pending == epoch))
29+
{
30+
overflow.remove(index);
31+
} else {
32+
overflow.push_back(Self::Enter(transition));
33+
}
34+
}
35+
Self::Exit(epoch) => {
36+
if let Some(index) = overflow.iter().position(
37+
|pending| matches!(pending, Self::Enter(pending) if pending.epoch == epoch),
38+
) {
39+
overflow.remove(index);
40+
} else {
41+
overflow.push_back(Self::Exit(epoch));
42+
}
43+
}
44+
}
45+
true
46+
}
47+
}
48+
1749
/// A notification of an epoch transition.
1850
pub struct EpochTransition<V: Variant, P: PublicKey> {
1951
/// The epoch to transition to.
@@ -29,24 +61,24 @@ pub struct EpochTransition<V: Variant, P: PublicKey> {
2961
/// Inbound communication channel for epoch transitions.
3062
#[derive(Debug, Clone)]
3163
pub struct Mailbox<V: Variant, P: PublicKey> {
32-
sender: mpsc::Sender<Message<V, P>>,
64+
sender: Sender<Message<V, P>>,
3365
}
3466

3567
impl<V: Variant, P: PublicKey> Mailbox<V, P> {
3668
/// Create a new [Mailbox].
37-
pub const fn new(sender: mpsc::Sender<Message<V, P>>) -> Self {
69+
pub const fn new(sender: Sender<Message<V, P>>) -> Self {
3870
Self { sender }
3971
}
4072

41-
pub async fn enter(&mut self, transition: EpochTransition<V, P>) {
42-
if let Err(err) = self.sender.send(Message::Enter(transition)).await {
43-
error!(?err, "failed to send epoch transition");
73+
pub fn enter(&mut self, transition: EpochTransition<V, P>) {
74+
if !self.sender.enqueue(Message::Enter(transition)).accepted() {
75+
error!("failed to send epoch transition");
4476
}
4577
}
4678

47-
pub async fn exit(&mut self, epoch: Epoch) {
48-
if let Err(err) = self.sender.send(Message::Exit(epoch)).await {
49-
error!(?err, "failed to send epoch exit");
79+
pub fn exit(&mut self, epoch: Epoch) {
80+
if !self.sender.enqueue(Message::Exit(epoch)).accepted() {
81+
error!("failed to send epoch exit");
5082
}
5183
}
5284
}

0 commit comments

Comments
 (0)