Skip to content

Commit bd6ac31

Browse files
spike
1 parent d2b918e commit bd6ac31

18 files changed

Lines changed: 222 additions & 729 deletions

File tree

consensus/src/lib.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -196,14 +196,14 @@ stability_scope!(BETA, cfg(not(target_arch = "wasm32")) {
196196

197197
/// Broadcast a payload to the given recipients.
198198
///
199-
/// Returns `true` when the relay accepted the payload for the requested
200-
/// broadcast plan. Returns `false` when the relay could not complete the
201-
/// handoff.
199+
/// Broadcast is best-effort dissemination and does not signal
200+
/// durability; callers that need durability must rely on
201+
/// [`Automaton::propose`] or [`Automaton::verify`].
202202
fn broadcast(
203203
&mut self,
204204
payload: Self::Digest,
205205
plan: Self::Plan,
206-
) -> impl Future<Output = bool> + Send;
206+
) -> impl Future<Output = ()> + Send;
207207
}
208208

209209
/// Reporter is the interface responsible for reporting activity to some external actor.

consensus/src/marshal/application/validation.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,6 @@ use crate::{
88
types::{Epoch, Epocher, Height, Round},
99
};
1010
use commonware_cryptography::certificate::Scheme;
11-
use commonware_utils::sync::Mutex;
12-
use std::sync::Arc;
13-
14-
/// Cache for the last block built during proposal, shared between the
15-
/// proposer task and the broadcast path.
16-
pub(crate) type LastBuilt<B> = Arc<Mutex<Option<(Round, B)>>>;
1711

1812
/// Which stage of verification a block has reached.
1913
///

consensus/src/marshal/coding/marshaled.rs

Lines changed: 26 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ use crate::{
8383
ancestry::AncestorStream,
8484
application::{
8585
validation::{
86-
is_inferred_reproposal_at_certify, is_valid_reproposal_at_verify, LastBuilt, Stage,
86+
is_inferred_reproposal_at_certify, is_valid_reproposal_at_verify, Stage,
8787
},
8888
verification_tasks::VerificationTasks,
8989
},
@@ -106,6 +106,7 @@ use commonware_cryptography::{
106106
Committable, Digestible, Hasher,
107107
};
108108
use commonware_macros::select;
109+
use commonware_p2p::Recipients;
109110
use commonware_parallel::Strategy;
110111
use commonware_runtime::{
111112
telemetry::metrics::histogram::{Buckets, Timed},
@@ -116,7 +117,6 @@ use commonware_utils::{
116117
fallible::OneshotExt,
117118
oneshot::{self, error::RecvError},
118119
},
119-
sync::Mutex,
120120
NZU16,
121121
};
122122
use futures::future::{ready, try_join, Either, Ready};
@@ -183,7 +183,6 @@ where
183183
scheme_provider: Z,
184184
epocher: ES,
185185
strategy: S,
186-
last_built: LastBuilt<CodedBlock<B, C, H>>,
187186
verification_tasks: VerificationTasks<Commitment>,
188187
cached_genesis: Arc<OnceLock<(Commitment, CodedBlock<B, C, H>)>>,
189188

@@ -266,7 +265,6 @@ where
266265
scheme_provider,
267266
strategy,
268267
epocher,
269-
last_built: Arc::new(Mutex::new(None)),
270268
verification_tasks: VerificationTasks::new(),
271269
cached_genesis: Arc::new(OnceLock::new()),
272270

@@ -491,15 +489,15 @@ where
491489
/// boundary block to avoid creating blocks that would be invalidated by the epoch transition.
492490
///
493491
/// The proposal operation is spawned in a background task and returns a receiver that will
494-
/// contain the proposed block's digest when ready. The built block is cached for later
495-
/// broadcasting.
492+
/// contain the proposed block's commitment when ready. The built block is persisted via
493+
/// [`core::Mailbox::verified`] before the commitment is delivered, so consensus can rely
494+
/// on the block surviving restart.
496495
async fn propose(
497496
&mut self,
498497
consensus_context: Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
499498
) -> oneshot::Receiver<Self::Digest> {
500499
let mut marshal = self.marshal.clone();
501500
let mut application = self.application.clone();
502-
let last_built = self.last_built.clone();
503501
let epocher = self.epocher.clone();
504502
let strategy = self.strategy.clone();
505503
let cached_genesis = self.cached_genesis.clone();
@@ -537,10 +535,6 @@ where
537535
if let Some(block) = marshal.get_verified(consensus_context.round).await {
538536
let commitment = block.commitment();
539537
let round = consensus_context.round;
540-
{
541-
let mut lock = last_built.lock();
542-
*lock = Some((round, block));
543-
}
544538
let success = tx.send_lossy(commitment);
545539
debug!(
546540
?round,
@@ -593,11 +587,14 @@ where
593587
if parent.height() == last_in_epoch {
594588
let commitment = parent.commitment();
595589
let round = consensus_context.round;
596-
{
597-
let mut lock = last_built.lock();
598-
*lock = Some((round, parent));
590+
if !marshal.verified(round, parent).await {
591+
debug!(
592+
?round,
593+
?commitment,
594+
"marshal rejected re-proposed boundary block"
595+
);
596+
return;
599597
}
600-
601598
let success = tx.send_lossy(commitment);
602599
debug!(
603600
?round,
@@ -643,11 +640,10 @@ where
643640

644641
let commitment = coded_block.commitment();
645642
let round = consensus_context.round;
646-
{
647-
let mut lock = last_built.lock();
648-
*lock = Some((round, coded_block));
643+
if !marshal.verified(round, coded_block).await {
644+
debug!(?round, ?commitment, "marshal rejected proposed block");
645+
return;
649646
}
650-
651647
let success = tx.send_lossy(commitment);
652648
debug!(?round, ?commitment, success, "proposed new block");
653649
});
@@ -973,38 +969,17 @@ where
973969
type PublicKey = <Z::Scheme as CertificateScheme>::PublicKey;
974970
type Plan = Plan<Self::PublicKey>;
975971

976-
async fn broadcast(&mut self, commitment: Self::Digest, plan: Self::Plan) -> bool {
977-
match plan {
978-
Plan::Propose => {
979-
let Some((round, block)) = self.last_built.lock().take() else {
980-
warn!(?commitment, "missing block to broadcast");
981-
return false;
982-
};
983-
if block.commitment() != commitment {
984-
warn!(
985-
round = %round,
986-
commitment = %block.commitment(),
987-
height = %block.height(),
988-
"skipping requested broadcast of block with mismatched commitment"
989-
);
990-
return false;
991-
};
992-
let height = block.height();
993-
if !self.marshal.proposed(round, block).await {
994-
warn!(?round, ?commitment, %height, "marshal unable to accept block");
995-
return false;
996-
}
997-
debug!(?round, ?commitment, %height, "requested broadcast of built block");
998-
true
999-
}
1000-
Plan::Forward { .. } => {
1001-
// Coding variant does not support targeted forwarding;
1002-
// peers reconstruct blocks from erasure-coded shards.
1003-
//
1004-
// TODO(#3389): Support checked data forwarding for PhasedScheme.
1005-
true
1006-
}
1007-
}
972+
async fn broadcast(&mut self, commitment: Self::Digest, plan: Self::Plan) {
973+
// Coding only disseminates on the initial proposer broadcast; peers
974+
// reconstruct blocks from erasure-coded shards rather than receiving
975+
// targeted full-block forwards. TODO(#3389): checked data forwarding
976+
// for PhasedScheme.
977+
let Plan::Propose { round } = plan else {
978+
return;
979+
};
980+
self.marshal
981+
.forward(round, commitment, Recipients::All)
982+
.await;
1008983
}
1009984
}
1010985

0 commit comments

Comments
 (0)