Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,16 +194,12 @@ stability_scope!(BETA, cfg(not(target_arch = "wasm32")) {
/// treat every broadcast identically can set this to `()`.
type Plan: Send;

/// Broadcast a payload to the given recipients.
///
/// Returns `true` when the relay accepted the payload for the requested
/// broadcast plan. Returns `false` when the relay could not complete the
/// handoff.
/// Broadcast a payload according to the given plan.
fn broadcast(
&mut self,
payload: Self::Digest,
plan: Self::Plan,
) -> impl Future<Output = bool> + Send;
) -> impl Future<Output = ()> + Send;
}

/// Reporter is the interface responsible for reporting activity to some external actor.
Expand Down
6 changes: 0 additions & 6 deletions consensus/src/marshal/application/validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,6 @@ use crate::{
types::{Epoch, Epocher, Height, Round},
};
use commonware_cryptography::certificate::Scheme;
use commonware_utils::sync::Mutex;
use std::sync::Arc;

/// Cache for the last block built during proposal, shared between the
/// proposer task and the broadcast path.
pub(crate) type LastBuilt<B> = Arc<Mutex<Option<(Round, B)>>>;

/// Which stage of verification a block has reached.
///
Expand Down
79 changes: 26 additions & 53 deletions consensus/src/marshal/coding/marshaled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,7 @@ use crate::{
marshal::{
ancestry::AncestorStream,
application::{
validation::{
is_inferred_reproposal_at_certify, is_valid_reproposal_at_verify, LastBuilt, Stage,
},
validation::{is_inferred_reproposal_at_certify, is_valid_reproposal_at_verify, Stage},
verification_tasks::VerificationTasks,
},
coding::{
Expand All @@ -106,6 +104,7 @@ use commonware_cryptography::{
Committable, Digestible, Hasher,
};
use commonware_macros::select;
use commonware_p2p::Recipients;
use commonware_parallel::Strategy;
use commonware_runtime::{
telemetry::metrics::histogram::{Buckets, Timed},
Expand All @@ -116,7 +115,6 @@ use commonware_utils::{
fallible::OneshotExt,
oneshot::{self, error::RecvError},
},
sync::Mutex,
NZU16,
};
use futures::future::{ready, try_join, Either, Ready};
Expand Down Expand Up @@ -183,7 +181,6 @@ where
scheme_provider: Z,
epocher: ES,
strategy: S,
last_built: LastBuilt<CodedBlock<B, C, H>>,
verification_tasks: VerificationTasks<Commitment>,
cached_genesis: Arc<OnceLock<(Commitment, CodedBlock<B, C, H>)>>,

Expand Down Expand Up @@ -266,7 +263,6 @@ where
scheme_provider,
strategy,
epocher,
last_built: Arc::new(Mutex::new(None)),
verification_tasks: VerificationTasks::new(),
cached_genesis: Arc::new(OnceLock::new()),

Expand Down Expand Up @@ -491,15 +487,15 @@ where
/// boundary block to avoid creating blocks that would be invalidated by the epoch transition.
///
/// The proposal operation is spawned in a background task and returns a receiver that will
/// contain the proposed block's digest when ready. The built block is cached for later
/// broadcasting.
/// contain the proposed block's commitment when ready. The built block is persisted via
/// [`core::Mailbox::verified`] before the commitment is delivered, so consensus can rely
/// on the block surviving restart.
async fn propose(
&mut self,
consensus_context: Context<Commitment, <Z::Scheme as CertificateScheme>::PublicKey>,
) -> oneshot::Receiver<Self::Digest> {
let mut marshal = self.marshal.clone();
let mut application = self.application.clone();
let last_built = self.last_built.clone();
let epocher = self.epocher.clone();
let strategy = self.strategy.clone();
let cached_genesis = self.cached_genesis.clone();
Expand Down Expand Up @@ -537,10 +533,6 @@ where
if let Some(block) = marshal.get_verified(consensus_context.round).await {
let commitment = block.commitment();
let round = consensus_context.round;
{
let mut lock = last_built.lock();
*lock = Some((round, block));
}
let success = tx.send_lossy(commitment);
debug!(
?round,
Expand Down Expand Up @@ -593,11 +585,14 @@ where
if parent.height() == last_in_epoch {
let commitment = parent.commitment();
let round = consensus_context.round;
{
let mut lock = last_built.lock();
*lock = Some((round, parent));
if !marshal.verified(round, parent).await {
debug!(
?round,
?commitment,
"marshal rejected re-proposed boundary block"
);
return;
}

let success = tx.send_lossy(commitment);
debug!(
?round,
Expand Down Expand Up @@ -643,11 +638,10 @@ where

let commitment = coded_block.commitment();
let round = consensus_context.round;
{
let mut lock = last_built.lock();
*lock = Some((round, coded_block));
if !marshal.verified(round, coded_block).await {
debug!(?round, ?commitment, "marshal rejected proposed block");
return;
}

let success = tx.send_lossy(commitment);
debug!(?round, ?commitment, success, "proposed new block");
});
Expand Down Expand Up @@ -973,38 +967,17 @@ where
type PublicKey = <Z::Scheme as CertificateScheme>::PublicKey;
type Plan = Plan<Self::PublicKey>;

async fn broadcast(&mut self, commitment: Self::Digest, plan: Self::Plan) -> bool {
match plan {
Plan::Propose => {
let Some((round, block)) = self.last_built.lock().take() else {
warn!(?commitment, "missing block to broadcast");
return false;
};
if block.commitment() != commitment {
warn!(
round = %round,
commitment = %block.commitment(),
height = %block.height(),
"skipping requested broadcast of block with mismatched commitment"
);
return false;
};
let height = block.height();
if !self.marshal.proposed(round, block).await {
warn!(?round, ?commitment, %height, "marshal unable to accept block");
return false;
}
debug!(?round, ?commitment, %height, "requested broadcast of built block");
true
}
Plan::Forward { .. } => {
// Coding variant does not support targeted forwarding;
Comment thread
patrick-ogrady marked this conversation as resolved.
// peers reconstruct blocks from erasure-coded shards.
//
// TODO(#3389): Support checked data forwarding for PhasedScheme.
true
}
}
async fn broadcast(&mut self, commitment: Self::Digest, plan: Self::Plan) {
// Coding variant does not support targeted forwarding;
// peers reconstruct blocks from erasure-coded shards.
//
// TODO(#3389): Support checked data forwarding for PhasedScheme.
let Plan::Propose { round } = plan else {
return;
};
self.marshal
.forward(round, commitment, Recipients::All)
.await;
}
}

Expand Down
Loading
Loading