Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,14 +196,14 @@ stability_scope!(BETA, cfg(not(target_arch = "wasm32")) {

/// Broadcast a payload to the given recipients.
///
/// Returns `true` when the relay accepted the payload for the requested
/// broadcast plan. Returns `false` when the relay could not complete the
/// handoff.
/// Broadcast is best-effort dissemination and does not signal
Comment thread
patrick-ogrady marked this conversation as resolved.
Outdated
/// durability; callers that need durability must rely on
/// [`Automaton::propose`] or [`Automaton::verify`].
fn broadcast(
&mut self,
payload: Self::Digest,
plan: Self::Plan,
) -> impl Future<Output = bool> + Send;
) -> impl Future<Output = ()> + Send;
}

/// Reporter is the interface responsible for reporting activity to some external actor.
Expand Down
6 changes: 0 additions & 6 deletions consensus/src/marshal/application/validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,6 @@ use crate::{
types::{Epoch, Epocher, Height, Round},
};
use commonware_cryptography::certificate::Scheme;
use commonware_utils::sync::Mutex;
use std::sync::Arc;

/// Cache for the last block built during proposal, shared between the
/// proposer task and the broadcast path.
pub(crate) type LastBuilt<B> = Arc<Mutex<Option<(Round, B)>>>;

/// Which stage of verification a block has reached.
///
Expand Down
79 changes: 26 additions & 53 deletions consensus/src/marshal/coding/marshaled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,7 @@ use crate::{
marshal::{
ancestry::AncestorStream,
application::{
validation::{
is_inferred_reproposal_at_certify, is_valid_reproposal_at_verify, LastBuilt, Stage,
},
validation::{is_inferred_reproposal_at_certify, is_valid_reproposal_at_verify, Stage},
verification_tasks::VerificationTasks,
},
coding::{
Expand All @@ -106,6 +104,7 @@ use commonware_cryptography::{
Committable, Digestible, Hasher,
};
use commonware_macros::select;
use commonware_p2p::Recipients;
use commonware_parallel::Strategy;
use commonware_runtime::{
telemetry::metrics::histogram::{Buckets, Timed},
Expand All @@ -116,7 +115,6 @@ use commonware_utils::{
fallible::OneshotExt,
oneshot::{self, error::RecvError},
},
sync::Mutex,
NZU16,
};
use futures::future::{ready, try_join, Either, Ready};
Expand Down Expand Up @@ -183,7 +181,6 @@ where
scheme_provider: Z,
epocher: ES,
strategy: S,
last_built: LastBuilt<CodedBlock<B, C, H>>,
verification_tasks: VerificationTasks<Commitment>,
cached_genesis: Arc<OnceLock<(Commitment, CodedBlock<B, C, H>)>>,

Expand Down Expand Up @@ -266,7 +263,6 @@ where
scheme_provider,
strategy,
epocher,
last_built: Arc::new(Mutex::new(None)),
verification_tasks: VerificationTasks::new(),
cached_genesis: Arc::new(OnceLock::new()),

Expand Down Expand Up @@ -491,15 +487,15 @@ where
/// boundary block to avoid creating blocks that would be invalidated by the epoch transition.
///
/// The proposal operation is spawned in a background task and returns a receiver that will
/// contain the proposed block's digest when ready. The built block is cached for later
/// broadcasting.
/// contain the proposed block's commitment when ready. The built block is persisted via
/// [`core::Mailbox::verified`] before the commitment is delivered, so consensus can rely
/// on the block surviving restart.
async fn propose(
&mut self,
consensus_context: Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
) -> oneshot::Receiver<Self::Digest> {
let mut marshal = self.marshal.clone();
let mut application = self.application.clone();
let last_built = self.last_built.clone();
let epocher = self.epocher.clone();
let strategy = self.strategy.clone();
let cached_genesis = self.cached_genesis.clone();
Expand Down Expand Up @@ -537,10 +533,6 @@ where
if let Some(block) = marshal.get_verified(consensus_context.round).await {
let commitment = block.commitment();
let round = consensus_context.round;
{
let mut lock = last_built.lock();
*lock = Some((round, block));
}
let success = tx.send_lossy(commitment);
debug!(
?round,
Expand Down Expand Up @@ -593,11 +585,14 @@ where
if parent.height() == last_in_epoch {
let commitment = parent.commitment();
let round = consensus_context.round;
{
let mut lock = last_built.lock();
*lock = Some((round, parent));
if !marshal.verified(round, parent).await {
debug!(
?round,
?commitment,
"marshal rejected re-proposed boundary block"
);
return;
}

let success = tx.send_lossy(commitment);
debug!(
?round,
Expand Down Expand Up @@ -643,11 +638,10 @@ where

let commitment = coded_block.commitment();
let round = consensus_context.round;
{
let mut lock = last_built.lock();
*lock = Some((round, coded_block));
if !marshal.verified(round, coded_block).await {
debug!(?round, ?commitment, "marshal rejected proposed block");
return;
}

let success = tx.send_lossy(commitment);
debug!(?round, ?commitment, success, "proposed new block");
});
Expand Down Expand Up @@ -973,38 +967,17 @@ where
type PublicKey = <Z::Scheme as CertificateScheme>::PublicKey;
type Plan = Plan<Self::PublicKey>;

async fn broadcast(&mut self, commitment: Self::Digest, plan: Self::Plan) -> bool {
match plan {
Plan::Propose => {
let Some((round, block)) = self.last_built.lock().take() else {
warn!(?commitment, "missing block to broadcast");
return false;
};
if block.commitment() != commitment {
warn!(
round = %round,
commitment = %block.commitment(),
height = %block.height(),
"skipping requested broadcast of block with mismatched commitment"
);
return false;
};
let height = block.height();
if !self.marshal.proposed(round, block).await {
warn!(?round, ?commitment, %height, "marshal unable to accept block");
return false;
}
debug!(?round, ?commitment, %height, "requested broadcast of built block");
true
}
Plan::Forward { .. } => {
// Coding variant does not support targeted forwarding;
Comment thread
patrick-ogrady marked this conversation as resolved.
// peers reconstruct blocks from erasure-coded shards.
//
// TODO(#3389): Support checked data forwarding for PhasedScheme.
true
}
}
async fn broadcast(&mut self, commitment: Self::Digest, plan: Self::Plan) {
// Coding only disseminates on the initial proposer broadcast; peers
// reconstruct blocks from erasure-coded shards rather than receiving
// targeted full-block forwards. TODO(#3389): checked data forwarding
// for PhasedScheme.
let Plan::Propose { round } = plan else {
return;
};
self.marshal
.forward(round, commitment, Recipients::All)
.await;
}
}

Expand Down
Loading
Loading