Skip to content

Commit 5acdc54

Browse files
[p2p/utils] (Optionally) Wrap Channel with Codec (#830)
1 parent 5da3189 commit 5acdc54

9 files changed

Lines changed: 257 additions & 103 deletions

File tree

consensus/src/ordered_broadcast/engine.rs

Lines changed: 23 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,15 @@ use super::{
1313
AckManager, Config, TipManager,
1414
};
1515
use crate::{Automaton, Monitor, Relay, Reporter, Supervisor, ThresholdSupervisor};
16-
use commonware_codec::{DecodeExt, Encode};
1716
use commonware_cryptography::{
1817
bls12381::primitives::{group, poly},
1918
Digest, Scheme,
2019
};
2120
use commonware_macros::select;
22-
use commonware_p2p::{Receiver, Recipients, Sender};
21+
use commonware_p2p::{
22+
utils::codec::{wrap, WrappedSender},
23+
Receiver, Recipients, Sender,
24+
};
2325
use commonware_runtime::{
2426
telemetry::metrics::{
2527
histogram,
@@ -259,8 +261,8 @@ impl<
259261

260262
/// Inner run loop called by `start`.
261263
async fn run(mut self, chunk_network: (NetS, NetR), ack_network: (NetS, NetR)) {
262-
let (mut node_sender, mut node_receiver) = chunk_network;
263-
let (mut ack_sender, mut ack_receiver) = ack_network;
264+
let (mut node_sender, mut node_receiver) = wrap((), chunk_network.0, chunk_network.1);
265+
let (mut ack_sender, mut ack_receiver) = wrap((), ack_network.0, ack_network.1);
264266
let mut shutdown = self.context.stopped();
265267

266268
// Tracks if there is an outstanding proposal request to the automaton.
@@ -361,7 +363,7 @@ impl<
361363
}
362364
};
363365
let mut guard = self.metrics.nodes.guard(Status::Invalid);
364-
let node = match Node::decode(msg) {
366+
let node = match msg {
365367
Ok(node) => node,
366368
Err(err) => {
367369
warn!(?err, ?sender, "node decode failed");
@@ -405,7 +407,7 @@ impl<
405407
}
406408
};
407409
let mut guard = self.metrics.acks.guard(Status::Invalid);
408-
let ack = match Ack::decode(msg) {
410+
let ack = match msg {
409411
Ok(ack) => ack,
410412
Err(err) => {
411413
warn!(?err, ?sender, "ack decode failed");
@@ -469,7 +471,7 @@ impl<
469471
&mut self,
470472
context: &Context<C::PublicKey>,
471473
payload: &D,
472-
ack_sender: &mut NetS,
474+
ack_sender: &mut WrappedSender<NetS, (), Ack<C::PublicKey, D>>,
473475
) -> Result<(), Error> {
474476
// Get the tip
475477
let Some(tip) = self.tip_manager.get(&context.sequencer) else {
@@ -513,19 +515,15 @@ impl<
513515
recipients
514516
};
515517

518+
// Handle the ack internally
519+
self.handle_ack(&ack).await?;
520+
516521
// Send the ack to the network
517522
ack_sender
518-
.send(
519-
Recipients::Some(recipients),
520-
ack.encode().into(),
521-
self.priority_acks,
522-
)
523+
.send(Recipients::Some(recipients), ack, self.priority_acks)
523524
.await
524525
.map_err(|_| Error::UnableToSendMessage)?;
525526

526-
// Handle the ack internally
527-
self.handle_ack(&ack).await?;
528-
529527
// Emit the activity
530528
self.reporter
531529
.report(Activity::Proposal(Proposal::new(
@@ -669,7 +667,7 @@ impl<
669667
&mut self,
670668
context: Context<C::PublicKey>,
671669
payload: D,
672-
node_sender: &mut NetS,
670+
node_sender: &mut WrappedSender<NetS, (), Node<C, D>>,
673671
) -> Result<(), Error> {
674672
let mut guard = self.metrics.propose.guard(Status::Dropped);
675673
let me = self.crypto.public_key();
@@ -718,7 +716,7 @@ impl<
718716
self.propose_timer = Some(self.metrics.e2e_duration.timer());
719717

720718
// Broadcast to network
721-
if let Err(err) = self.broadcast(&node, node_sender, self.epoch).await {
719+
if let Err(err) = self.broadcast(node, node_sender, self.epoch).await {
722720
guard.set(Status::Failure);
723721
return Err(err);
724722
};
@@ -734,7 +732,10 @@ impl<
734732
/// - this instance is the sequencer for the current epoch.
735733
/// - this instance has a chunk to rebroadcast.
736734
/// - this instance has not yet collected the threshold signature for the chunk.
737-
async fn rebroadcast(&mut self, node_sender: &mut NetS) -> Result<(), Error> {
735+
async fn rebroadcast(
736+
&mut self,
737+
node_sender: &mut WrappedSender<NetS, (), Node<C, D>>,
738+
) -> Result<(), Error> {
738739
let mut guard = self.metrics.rebroadcast.guard(Status::Dropped);
739740

740741
// Unset the rebroadcast deadline
@@ -762,16 +763,16 @@ impl<
762763

763764
// Broadcast the message, which resets the rebroadcast deadline
764765
guard.set(Status::Failure);
765-
self.broadcast(&tip, node_sender, self.epoch).await?;
766+
self.broadcast(tip, node_sender, self.epoch).await?;
766767
guard.set(Status::Success);
767768
Ok(())
768769
}
769770

770771
/// Send a `Node` message to all validators in the given epoch.
771772
async fn broadcast(
772773
&mut self,
773-
node: &Node<C, D>,
774-
node_sender: &mut NetS,
774+
node: Node<C, D>,
775+
node_sender: &mut WrappedSender<NetS, (), Node<C, D>>,
775776
epoch: Epoch,
776777
) -> Result<(), Error> {
777778
// Get the validators for the epoch
@@ -786,7 +787,7 @@ impl<
786787
node_sender
787788
.send(
788789
Recipients::Some(validators.clone()),
789-
node.encode().into(),
790+
node,
790791
self.priority_proposals,
791792
)
792793
.await

consensus/src/simplex/actors/resolver/actor.rs

Lines changed: 32 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,15 @@ use crate::{
99
},
1010
Supervisor,
1111
};
12-
use commonware_codec::{Decode, Encode};
1312
use commonware_cryptography::{Digest, Scheme};
1413
use commonware_macros::select;
15-
use commonware_p2p::{utils::requester, Receiver, Recipients, Sender};
14+
use commonware_p2p::{
15+
utils::{
16+
codec::{wrap, WrappedSender},
17+
requester,
18+
},
19+
Receiver, Recipients, Sender,
20+
};
1621
use commonware_runtime::{Clock, Handle, Metrics, Spawner};
1722
use futures::{channel::mpsc, future::Either, StreamExt};
1823
use governor::clock::Clock as GClock;
@@ -191,7 +196,11 @@ impl<
191196
}
192197

193198
/// Concurrent indicates whether we should send a new request (only if we see a request for the first time)
194-
async fn send(&mut self, shuffle: bool, sender: &mut impl Sender<PublicKey = C::PublicKey>) {
199+
async fn send<Sr: Sender<PublicKey = C::PublicKey>>(
200+
&mut self,
201+
shuffle: bool,
202+
sender: &mut WrappedSender<Sr, (usize, usize), Backfiller<C::Signature, D>>,
203+
) {
195204
// Clear retry
196205
self.retry = None;
197206

@@ -255,9 +264,7 @@ impl<
255264

256265
// Create new message
257266
msg.id = request;
258-
let encoded = Backfiller::Request::<C::Signature, D>(msg.clone())
259-
.encode()
260-
.into();
267+
let encoded = Backfiller::Request(msg.clone());
261268

262269
// Try to send
263270
if sender
@@ -298,9 +305,16 @@ impl<
298305
async fn run(
299306
mut self,
300307
mut voter: voter::Mailbox<C::Signature, D>,
301-
mut sender: impl Sender<PublicKey = C::PublicKey>,
302-
mut receiver: impl Receiver<PublicKey = C::PublicKey>,
308+
sender: impl Sender<PublicKey = C::PublicKey>,
309+
receiver: impl Receiver<PublicKey = C::PublicKey>,
303310
) {
311+
// Wrap channel
312+
let (mut sender, mut receiver) = wrap(
313+
(self.max_fetch_count, self.max_participants),
314+
sender,
315+
receiver,
316+
);
317+
304318
// Wait for an event
305319
let mut current_view = 0;
306320
let mut finalized_view = 0;
@@ -423,14 +437,19 @@ impl<
423437
}
424438
},
425439
network = receiver.recv() => {
426-
let (s, msg) = network.unwrap();
427-
let msg = match Backfiller::decode_cfg(msg, &(self.max_fetch_count, self.max_participants)) {
440+
// Break if there is an internal error
441+
let Ok((s, msg)) = network else {
442+
break;
443+
};
444+
445+
// Skip if there is a decoding error
446+
let msg = match msg {
428447
Ok(msg) => msg,
429448
Err(err) => {
430449
warn!(?err, sender = ?s, "failed to decode message");
431450
self.requester.block(s);
432451
continue;
433-
},
452+
}
434453
};
435454
match msg {
436455
Backfiller::Request(request) => {
@@ -465,13 +484,11 @@ impl<
465484

466485
// Send response
467486
debug!(sender = ?s, ?notarizations, ?missing_notarizations, ?nullifications, ?missing_nullifications, "sending response");
468-
let msg = Backfiller::Response::<C::Signature, D>(Response::new(
487+
let msg = Backfiller::Response(Response::new(
469488
request.id,
470489
notarizations_found,
471490
nullifications_found,
472-
))
473-
.encode()
474-
.into();
491+
));
475492
sender
476493
.send(Recipients::One(s), msg, false)
477494
.await

consensus/src/simplex/actors/voter/actor.rs

Lines changed: 28 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,12 @@ use crate::{
1111
},
1212
Automaton, Relay, Reporter, Supervisor, LATENCY,
1313
};
14-
use commonware_codec::{Decode, Encode};
1514
use commonware_cryptography::{Digest, Scheme};
1615
use commonware_macros::select;
17-
use commonware_p2p::{Receiver, Recipients, Sender};
16+
use commonware_p2p::{
17+
utils::codec::{wrap, WrappedSender},
18+
Receiver, Recipients, Sender,
19+
};
1820
use commonware_runtime::{Clock, Handle, Metrics, Spawner, Storage};
1921
use commonware_storage::journal::variable::{Config as JConfig, Journal};
2022
use commonware_utils::quorum;
@@ -598,7 +600,10 @@ impl<
598600
null_retry
599601
}
600602

601-
async fn timeout(&mut self, sender: &mut impl Sender) {
603+
async fn timeout<Sr: Sender>(
604+
&mut self,
605+
sender: &mut WrappedSender<Sr, usize, Voter<C::Signature, D>>,
606+
) {
602607
// Set timeout fired
603608
let round = self.views.get_mut(&self.view).unwrap();
604609
let mut retry = false;
@@ -616,16 +621,14 @@ impl<
616621
let past_view = self.view - 1;
617622
if retry && past_view > 0 {
618623
if let Some(notarization) = self.construct_notarization(past_view, true) {
619-
let msg = Voter::Notarization(notarization).encode().into();
624+
let msg = Voter::Notarization(notarization);
620625
sender.send(Recipients::All, msg, true).await.unwrap();
621626
self.broadcast_messages
622627
.get_or_create(&metrics::NOTARIZATION)
623628
.inc();
624629
debug!(view = past_view, "rebroadcast entry notarization");
625630
} else if let Some(nullification) = self.construct_nullification(past_view, true) {
626-
let msg = Voter::Nullification::<C::Signature, D>(nullification)
627-
.encode()
628-
.into();
631+
let msg = Voter::Nullification(nullification);
629632
sender.send(Recipients::All, msg, true).await.unwrap();
630633
self.broadcast_messages
631634
.get_or_create(&metrics::NULLIFICATION)
@@ -663,7 +666,7 @@ impl<
663666
.expect("unable to sync journal");
664667

665668
// Broadcast nullify
666-
let msg = Voter::Nullify::<C::Signature, D>(nullify).encode().into();
669+
let msg = Voter::Nullify(nullify);
667670
sender.send(Recipients::All, msg, true).await.unwrap();
668671
self.broadcast_messages
669672
.get_or_create(&metrics::NULLIFY)
@@ -1444,10 +1447,10 @@ impl<
14441447
Some(Finalization::new(proposal, signatures))
14451448
}
14461449

1447-
async fn notify(
1450+
async fn notify<Sr: Sender>(
14481451
&mut self,
14491452
backfiller: &mut resolver::Mailbox<C::Signature, D>,
1450-
sender: &mut impl Sender,
1453+
sender: &mut WrappedSender<Sr, usize, Voter<C::Signature, D>>,
14511454
view: u64,
14521455
) {
14531456
// Attempt to notarize
@@ -1464,7 +1467,7 @@ impl<
14641467
.expect("unable to sync journal");
14651468

14661469
// Broadcast the notarize
1467-
let msg = Voter::Notarize(notarize).encode().into();
1470+
let msg = Voter::Notarize(notarize);
14681471
sender.send(Recipients::All, msg, true).await.unwrap();
14691472
self.broadcast_messages
14701473
.get_or_create(&metrics::NOTARIZE)
@@ -1500,7 +1503,7 @@ impl<
15001503
.await;
15011504

15021505
// Broadcast the notarization
1503-
let msg = Voter::Notarization(notarization).encode().into();
1506+
let msg = Voter::Notarization(notarization);
15041507
sender.send(Recipients::All, msg, true).await.unwrap();
15051508
self.broadcast_messages
15061509
.get_or_create(&metrics::NOTARIZATION)
@@ -1531,9 +1534,7 @@ impl<
15311534
.await;
15321535

15331536
// Broadcast the nullification
1534-
let msg = Voter::Nullification::<C::Signature, D>(nullification)
1535-
.encode()
1536-
.into();
1537+
let msg = Voter::Nullification(nullification);
15371538
sender.send(Recipients::All, msg, true).await.unwrap();
15381539
self.broadcast_messages
15391540
.get_or_create(&metrics::NULLIFICATION)
@@ -1573,7 +1574,7 @@ impl<
15731574
);
15741575
let finalization = self.construct_finalization(self.last_finalized, true);
15751576
if let Some(finalization) = finalization {
1576-
let msg = Voter::Finalization(finalization).encode().into();
1577+
let msg = Voter::Finalization(finalization);
15771578
sender
15781579
.send(Recipients::All, msg, true)
15791580
.await
@@ -1605,7 +1606,7 @@ impl<
16051606
.expect("unable to sync journal");
16061607

16071608
// Broadcast the finalize
1608-
let msg = Voter::Finalize(finalize).encode().into();
1609+
let msg = Voter::Finalize(finalize);
16091610
sender.send(Recipients::All, msg, true).await.unwrap();
16101611
self.broadcast_messages
16111612
.get_or_create(&metrics::FINALIZE)
@@ -1641,7 +1642,7 @@ impl<
16411642
.await;
16421643

16431644
// Broadcast the finalization
1644-
let msg = Voter::Finalization(finalization).encode().into();
1645+
let msg = Voter::Finalization(finalization);
16451646
sender.send(Recipients::All, msg, true).await.unwrap();
16461647
self.broadcast_messages
16471648
.get_or_create(&metrics::FINALIZATION)
@@ -1661,9 +1662,12 @@ impl<
16611662
async fn run(
16621663
mut self,
16631664
mut backfiller: resolver::Mailbox<C::Signature, D>,
1664-
mut sender: impl Sender,
1665-
mut receiver: impl Receiver,
1665+
sender: impl Sender,
1666+
receiver: impl Receiver,
16661667
) {
1668+
// Wrap channel
1669+
let (mut sender, mut receiver) = wrap(self.max_participants, sender, receiver);
1670+
16671671
// Compute genesis
16681672
let genesis = self.automaton.genesis().await;
16691673
self.genesis = Some(genesis);
@@ -1907,11 +1911,13 @@ impl<
19071911
}
19081912
},
19091913
msg = receiver.recv() => {
1910-
// Parse message
1914+
// Break if there is an internal error
19111915
let Ok((s, msg)) = msg else {
19121916
break;
19131917
};
1914-
let Ok(msg) = Voter::decode_cfg(msg, &self.max_participants) else {
1918+
1919+
// Skip if there is a decoding error
1920+
let Ok(msg) = msg else {
19151921
continue;
19161922
};
19171923

0 commit comments

Comments
 (0)