Skip to content

Commit 826c503

Browse files
[consensus] Decouple Propose Durability from Broadcast (#3630)
1 parent d2b918e commit 826c503

20 files changed

Lines changed: 245 additions & 802 deletions

File tree

consensus/src/lib.rs

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -194,16 +194,12 @@ stability_scope!(BETA, cfg(not(target_arch = "wasm32")) {
194194
/// treat every broadcast identically can set this to `()`.
195195
type Plan: Send;
196196

197-
/// Broadcast a payload to the given recipients.
198-
///
199-
/// Returns `true` when the relay accepted the payload for the requested
200-
/// broadcast plan. Returns `false` when the relay could not complete the
201-
/// handoff.
197+
/// Broadcast a payload according to the given plan.
202198
fn broadcast(
203199
&mut self,
204200
payload: Self::Digest,
205201
plan: Self::Plan,
206-
) -> impl Future<Output = bool> + Send;
202+
) -> impl Future<Output = ()> + Send;
207203
}
208204

209205
/// Reporter is the interface responsible for reporting activity to some external actor.

consensus/src/marshal/application/validation.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,6 @@ use crate::{
88
types::{Epoch, Epocher, Height, Round},
99
};
1010
use commonware_cryptography::certificate::Scheme;
11-
use commonware_utils::sync::Mutex;
12-
use std::sync::Arc;
13-
14-
/// Cache for the last block built during proposal, shared between the
15-
/// proposer task and the broadcast path.
16-
pub(crate) type LastBuilt<B> = Arc<Mutex<Option<(Round, B)>>>;
1711

1812
/// Which stage of verification a block has reached.
1913
///

consensus/src/marshal/coding/marshaled.rs

Lines changed: 26 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -82,9 +82,7 @@ use crate::{
8282
marshal::{
8383
ancestry::AncestorStream,
8484
application::{
85-
validation::{
86-
is_inferred_reproposal_at_certify, is_valid_reproposal_at_verify, LastBuilt, Stage,
87-
},
85+
validation::{is_inferred_reproposal_at_certify, is_valid_reproposal_at_verify, Stage},
8886
verification_tasks::VerificationTasks,
8987
},
9088
coding::{
@@ -106,6 +104,7 @@ use commonware_cryptography::{
106104
Committable, Digestible, Hasher,
107105
};
108106
use commonware_macros::select;
107+
use commonware_p2p::Recipients;
109108
use commonware_parallel::Strategy;
110109
use commonware_runtime::{
111110
telemetry::metrics::histogram::{Buckets, Timed},
@@ -116,7 +115,6 @@ use commonware_utils::{
116115
fallible::OneshotExt,
117116
oneshot::{self, error::RecvError},
118117
},
119-
sync::Mutex,
120118
NZU16,
121119
};
122120
use futures::future::{ready, try_join, Either, Ready};
@@ -183,7 +181,6 @@ where
183181
scheme_provider: Z,
184182
epocher: ES,
185183
strategy: S,
186-
last_built: LastBuilt<CodedBlock<B, C, H>>,
187184
verification_tasks: VerificationTasks<Commitment>,
188185
cached_genesis: Arc<OnceLock<(Commitment, CodedBlock<B, C, H>)>>,
189186

@@ -266,7 +263,6 @@ where
266263
scheme_provider,
267264
strategy,
268265
epocher,
269-
last_built: Arc::new(Mutex::new(None)),
270266
verification_tasks: VerificationTasks::new(),
271267
cached_genesis: Arc::new(OnceLock::new()),
272268

@@ -491,15 +487,15 @@ where
491487
/// boundary block to avoid creating blocks that would be invalidated by the epoch transition.
492488
///
493489
/// The proposal operation is spawned in a background task and returns a receiver that will
494-
/// contain the proposed block's digest when ready. The built block is cached for later
495-
/// broadcasting.
490+
/// contain the proposed block's commitment when ready. The built block is persisted via
491+
/// [`core::Mailbox::verified`] before the commitment is delivered, so consensus can rely
492+
/// on the block surviving restart.
496493
async fn propose(
497494
&mut self,
498495
consensus_context: Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
499496
) -> oneshot::Receiver<Self::Digest> {
500497
let mut marshal = self.marshal.clone();
501498
let mut application = self.application.clone();
502-
let last_built = self.last_built.clone();
503499
let epocher = self.epocher.clone();
504500
let strategy = self.strategy.clone();
505501
let cached_genesis = self.cached_genesis.clone();
@@ -537,10 +533,6 @@ where
537533
if let Some(block) = marshal.get_verified(consensus_context.round).await {
538534
let commitment = block.commitment();
539535
let round = consensus_context.round;
540-
{
541-
let mut lock = last_built.lock();
542-
*lock = Some((round, block));
543-
}
544536
let success = tx.send_lossy(commitment);
545537
debug!(
546538
?round,
@@ -593,11 +585,14 @@ where
593585
if parent.height() == last_in_epoch {
594586
let commitment = parent.commitment();
595587
let round = consensus_context.round;
596-
{
597-
let mut lock = last_built.lock();
598-
*lock = Some((round, parent));
588+
if !marshal.verified(round, parent).await {
589+
debug!(
590+
?round,
591+
?commitment,
592+
"marshal rejected re-proposed boundary block"
593+
);
594+
return;
599595
}
600-
601596
let success = tx.send_lossy(commitment);
602597
debug!(
603598
?round,
@@ -643,11 +638,10 @@ where
643638

644639
let commitment = coded_block.commitment();
645640
let round = consensus_context.round;
646-
{
647-
let mut lock = last_built.lock();
648-
*lock = Some((round, coded_block));
641+
if !marshal.verified(round, coded_block).await {
642+
debug!(?round, ?commitment, "marshal rejected proposed block");
643+
return;
649644
}
650-
651645
let success = tx.send_lossy(commitment);
652646
debug!(?round, ?commitment, success, "proposed new block");
653647
});
@@ -973,38 +967,17 @@ where
973967
type PublicKey = <Z::Scheme as CertificateScheme>::PublicKey;
974968
type Plan = Plan<Self::PublicKey>;
975969

976-
async fn broadcast(&mut self, commitment: Self::Digest, plan: Self::Plan) -> bool {
977-
match plan {
978-
Plan::Propose => {
979-
let Some((round, block)) = self.last_built.lock().take() else {
980-
warn!(?commitment, "missing block to broadcast");
981-
return false;
982-
};
983-
if block.commitment() != commitment {
984-
warn!(
985-
round = %round,
986-
commitment = %block.commitment(),
987-
height = %block.height(),
988-
"skipping requested broadcast of block with mismatched commitment"
989-
);
990-
return false;
991-
};
992-
let height = block.height();
993-
if !self.marshal.proposed(round, block).await {
994-
warn!(?round, ?commitment, %height, "marshal unable to accept block");
995-
return false;
996-
}
997-
debug!(?round, ?commitment, %height, "requested broadcast of built block");
998-
true
999-
}
1000-
Plan::Forward { .. } => {
1001-
// Coding variant does not support targeted forwarding;
1002-
// peers reconstruct blocks from erasure-coded shards.
1003-
//
1004-
// TODO(#3389): Support checked data forwarding for PhasedScheme.
1005-
true
1006-
}
1007-
}
970+
async fn broadcast(&mut self, commitment: Self::Digest, plan: Self::Plan) {
971+
// Coding variant does not support targeted forwarding;
972+
// peers reconstruct blocks from erasure-coded shards.
973+
//
974+
// TODO(#3389): Support checked data forwarding for PhasedScheme.
975+
let Plan::Propose { round } = plan else {
976+
return;
977+
};
978+
self.marshal
979+
.forward(round, commitment, Recipients::All)
980+
.await;
1008981
}
1009982
}
1010983

0 commit comments

Comments
 (0)