diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index e5bdda384fe..06781cdae5f 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -7300,12 +7300,15 @@ impl BeaconChain { /// The epoch at which we require a data availability check in block processing. /// `None` if the `Deneb` fork is disabled. pub fn data_availability_boundary(&self) -> Option { - self.data_availability_checker.data_availability_boundary() + self.data_availability_checker + .custody_context() + .data_availability_boundary() } /// Returns true if epoch is within the data availability boundary pub fn da_check_required_for_epoch(&self, epoch: Epoch) -> bool { self.data_availability_checker + .custody_context() .da_check_required_for_epoch(epoch) } diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index df8c49f8deb..1b0aea5a2f0 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -645,26 +645,36 @@ pub fn signature_verify_chain_segment( &chain.spec, )?; - // unzip chain segment and verify kzg in bulk - let (roots, blocks): (Vec<_>, Vec<_>) = chain_segment.into_iter().unzip(); - let maybe_available_blocks = chain - .data_availability_checker - .verify_kzg_for_rpc_blocks(blocks)?; - // zip it back up - let mut signature_verified_blocks = roots - .into_iter() - .zip(maybe_available_blocks) - .map(|(block_root, maybe_available_block)| { - let consensus_context = ConsensusContext::new(maybe_available_block.slot()) - .set_current_block_root(block_root); - SignatureVerifiedBlock { - block: maybe_available_block, - block_root, - parent: None, - consensus_context, + let mut available_blocks = Vec::with_capacity(chain_segment.len()); + let mut signature_verified_blocks = Vec::with_capacity(chain_segment.len()); + + for (block_root, block) in chain_segment { + let consensus_context = + ConsensusContext::new(block.slot()).set_current_block_root(block_root); + + match block { + RpcBlock::FullyAvailable(available_block) => { + available_blocks.push(available_block.clone()); + signature_verified_blocks.push(SignatureVerifiedBlock { + block: MaybeAvailableBlock::Available(available_block), + block_root, + parent: None, + consensus_context, + }); } - }) - .collect::>(); + RpcBlock::BlockOnly { .. } => { + // RangeSync and BackfillSync already ensure that the chain segment is fully available + // so this shouldn't be possible in practice. + return Err(BlockError::InternalError( + "Chain segment is not fully available".to_string(), + )); + } + } + } + + chain + .data_availability_checker + .batch_verify_kzg_for_available_blocks(&available_blocks)?; // verify signatures let pubkey_cache = get_validator_pubkey_cache(chain)?; @@ -1297,16 +1307,28 @@ impl IntoExecutionPendingBlock for RpcBlock // Perform an early check to prevent wasting time on irrelevant blocks. let block_root = check_block_relevancy(self.as_block(), block_root, chain) .map_err(|e| BlockSlashInfo::SignatureNotChecked(self.signed_block_header(), e))?; - let maybe_available = chain - .data_availability_checker - .verify_kzg_for_rpc_block(self.clone()) - .map_err(|e| { - BlockSlashInfo::SignatureNotChecked( - self.signed_block_header(), - BlockError::AvailabilityCheck(e), - ) - })?; - SignatureVerifiedBlock::check_slashable(maybe_available, block_root, chain)? 
+ + let maybe_available_block = match &self { + RpcBlock::FullyAvailable(available_block) => { + chain + .data_availability_checker + .verify_kzg_for_available_block(available_block) + .map_err(|e| { + BlockSlashInfo::SignatureNotChecked( + self.signed_block_header(), + BlockError::AvailabilityCheck(e), + ) + })?; + MaybeAvailableBlock::Available(available_block.clone()) + } + // No need to perform KZG verification unless we have a fully available block + RpcBlock::BlockOnly { block, block_root } => MaybeAvailableBlock::AvailabilityPending { + block_root: *block_root, + block: block.clone(), + }, + }; + + SignatureVerifiedBlock::check_slashable(maybe_available_block, block_root, chain)? .into_execution_pending_block_slashable(block_root, chain, notify_execution_layer) } diff --git a/beacon_node/beacon_chain/src/block_verification_types.rs b/beacon_node/beacon_chain/src/block_verification_types.rs index f7831d5c770..fd9c4663079 100644 --- a/beacon_node/beacon_chain/src/block_verification_types.rs +++ b/beacon_node/beacon_chain/src/block_verification_types.rs @@ -1,204 +1,154 @@ use crate::data_availability_checker::AvailabilityCheckError; -pub use crate::data_availability_checker::{AvailableBlock, MaybeAvailableBlock}; -use crate::data_column_verification::{CustodyDataColumn, CustodyDataColumnList}; -use crate::{PayloadVerificationOutcome, get_block_root}; +pub use crate::data_availability_checker::{ + AvailableBlock, AvailableBlockData, MaybeAvailableBlock, +}; +use crate::{BeaconChainTypes, CustodyContext, PayloadVerificationOutcome}; use educe::Educe; -use ssz_types::VariableList; use state_processing::ConsensusContext; use std::fmt::{Debug, Formatter}; use std::sync::Arc; use types::data::BlobIdentifier; use types::{ - BeaconBlockRef, BeaconState, BlindedPayload, BlobSidecarList, Epoch, EthSpec, Hash256, + BeaconBlockRef, BeaconState, BlindedPayload, ChainSpec, Epoch, EthSpec, Hash256, SignedBeaconBlock, SignedBeaconBlockHeader, Slot, }; /// A block that has been received over RPC. It has 2 internal variants: /// -/// 1. `BlockAndBlobs`: A fully available post deneb block with all the blobs available. This variant -/// is only constructed after making consistency checks between blocks and blobs. -/// Hence, it is fully self contained w.r.t verification. i.e. this block has all the required -/// data to get verified and imported into fork choice. -/// -/// 2. `Block`: This can be a fully available pre-deneb block **or** a post-deneb block that may or may -/// not require blobs to be considered fully available. +/// 1. `FullyAvailable`: A fully available block. This can either be a pre-Deneb block, a +/// post-Deneb block with blobs, a post-Fulu block with the columns the node is required to custody, +/// or a post-Deneb block that doesn't require blobs/columns. Hence, it is fully self-contained w.r.t. +/// verification, i.e. this block has all the required data to get verified and imported into fork choice. /// -/// Note: We make a distinction over blocks received over gossip because -/// in a post-deneb world, the blobs corresponding to a given block that are received -/// over rpc do not contain the proposer signature for dos resistance. +/// 2. `BlockOnly`: This is a post-Deneb block that may or may not require blobs to be considered fully available.
#[derive(Clone, Educe)] #[educe(Hash(bound(E: EthSpec)))] -pub struct RpcBlock { - block_root: Hash256, - block: RpcBlockInner, +pub enum RpcBlock { + FullyAvailable(AvailableBlock), + BlockOnly { + block: Arc>, + block_root: Hash256, + }, } impl Debug for RpcBlock { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "RpcBlock({:?})", self.block_root) + write!(f, "RpcBlock({:?})", self.block_root()) } } impl RpcBlock { pub fn block_root(&self) -> Hash256 { - self.block_root + match self { + RpcBlock::FullyAvailable(available_block) => available_block.block_root(), + RpcBlock::BlockOnly { block_root, .. } => *block_root, + } } pub fn as_block(&self) -> &SignedBeaconBlock { - match &self.block { - RpcBlockInner::Block(block) => block, - RpcBlockInner::BlockAndBlobs(block, _) => block, - RpcBlockInner::BlockAndCustodyColumns(block, _) => block, + match self { + RpcBlock::FullyAvailable(available_block) => available_block.block(), + RpcBlock::BlockOnly { block, .. } => block, } } pub fn block_cloned(&self) -> Arc> { - match &self.block { - RpcBlockInner::Block(block) => block.clone(), - RpcBlockInner::BlockAndBlobs(block, _) => block.clone(), - RpcBlockInner::BlockAndCustodyColumns(block, _) => block.clone(), - } - } - - pub fn blobs(&self) -> Option<&BlobSidecarList> { - match &self.block { - RpcBlockInner::Block(_) => None, - RpcBlockInner::BlockAndBlobs(_, blobs) => Some(blobs), - RpcBlockInner::BlockAndCustodyColumns(_, _) => None, + match self { + RpcBlock::FullyAvailable(available_block) => available_block.block_cloned(), + RpcBlock::BlockOnly { block, .. } => block.clone(), } } - pub fn custody_columns(&self) -> Option<&CustodyDataColumnList> { - match &self.block { - RpcBlockInner::Block(_) => None, - RpcBlockInner::BlockAndBlobs(_, _) => None, - RpcBlockInner::BlockAndCustodyColumns(_, data_columns) => Some(data_columns), + pub fn block_data(&self) -> Option<&AvailableBlockData> { + match self { + RpcBlock::FullyAvailable(available_block) => Some(available_block.data()), + RpcBlock::BlockOnly { .. } => None, } } } -/// Note: This variant is intentionally private because we want to safely construct the -/// internal variants after applying consistency checks to ensure that the block and blobs -/// are consistent with respect to each other. -#[derive(Debug, Clone, Educe)] -#[educe(Hash(bound(E: EthSpec)))] -enum RpcBlockInner { - /// Single block lookup response. This should potentially hit the data availability cache. - Block(Arc>), - /// This variant is used with parent lookups and by-range responses. It should have all blobs - /// ordered, all block roots matching, and the correct number of blobs for this block. - BlockAndBlobs(Arc>, BlobSidecarList), - /// This variant is used with parent lookups and by-range responses. It should have all - /// requested data columns, all block roots matching for this block. - BlockAndCustodyColumns(Arc>, CustodyDataColumnList), -} - impl RpcBlock { - /// Constructs a `Block` variant. - pub fn new_without_blobs( - block_root: Option, + /// Constructs an `RpcBlock` from a block and optional availability data. + /// + /// This function creates an RpcBlock which can be in one of two states: + /// - `FullyAvailable`: When `block_data` is provided, the block contains all required + /// data for verification. + /// - `BlockOnly`: When `block_data` is `None`, the block may still need additional + /// data to be considered fully available (used during block lookups or when blobs + /// will arrive separately). 
+ /// + /// # Validation + /// + /// When `block_data` is provided, this function validates that: + /// - Block data is not provided when not required. + /// - Required blobs are present and match the expected count. + /// - Required custody columns are included based on the node's custody requirements. + /// + /// # Errors + /// + /// Returns `AvailabilityCheckError` if: + /// - `InvalidAvailableBlockData`: Block data is provided but not required. + /// - `MissingBlobs`: Block requires blobs but they are missing or incomplete. + /// - `MissingCustodyColumns`: Block requires custody columns but they are incomplete. + pub fn new( block: Arc<SignedBeaconBlock<E>>, - ) -> Self { - let block_root = block_root.unwrap_or_else(|| get_block_root(&block)); - - Self { - block_root, - block: RpcBlockInner::Block(block), + block_data: Option<AvailableBlockData<E>>, + custody_context: &CustodyContext<T>, + spec: Arc<ChainSpec>, + ) -> Result<Self, AvailabilityCheckError> + where + T: BeaconChainTypes, + { + match block_data { + Some(block_data) => Ok(RpcBlock::FullyAvailable(AvailableBlock::new( + block, + block_data, + custody_context, + spec, + )?)), + None => Ok(RpcBlock::BlockOnly { + block_root: block.canonical_root(), + block, + }), } } - /// Constructs a new `BlockAndBlobs` variant after making consistency - /// checks between the provided blocks and blobs. This struct makes no - /// guarantees about whether blobs should be present, only that they are - /// consistent with the block. An empty list passed in for `blobs` is - /// viewed the same as `None` passed in. - pub fn new( - block_root: Option<Hash256>, - block: Arc<SignedBeaconBlock<E>>, - blobs: Option<BlobSidecarList<E>>, - ) -> Result<Self, AvailabilityCheckError> { - let block_root = block_root.unwrap_or_else(|| get_block_root(&block)); - // Treat empty blob lists as if they are missing. - let blobs = blobs.filter(|b| !b.is_empty()); - - if let (Some(blobs), Ok(block_commitments)) = ( - blobs.as_ref(), - block.message().body().blob_kzg_commitments(), - ) { - if blobs.len() != block_commitments.len() { - return Err(AvailabilityCheckError::MissingBlobs); - } - for (blob, &block_commitment) in blobs.iter().zip(block_commitments.iter()) { - let blob_commitment = blob.kzg_commitment; - if blob_commitment != block_commitment { - return Err(AvailabilityCheckError::KzgCommitmentMismatch { - block_commitment, - blob_commitment, - }); - } - } - } - let inner = match blobs { - Some(blobs) => RpcBlockInner::BlockAndBlobs(block, blobs), - None => RpcBlockInner::Block(block), - }; - Ok(Self { - block_root, - block: inner, - }) - } - - pub fn new_with_custody_columns( - block_root: Option<Hash256>, - block: Arc<SignedBeaconBlock<E>>, - custody_columns: Vec<CustodyDataColumn<E>>, - ) -> Result<Self, AvailabilityCheckError> { - let block_root = block_root.unwrap_or_else(|| get_block_root(&block)); - - if block.num_expected_blobs() > 0 && custody_columns.is_empty() { - // The number of required custody columns is out of scope here. - return Err(AvailabilityCheckError::MissingCustodyColumns); - } - // Treat empty data column lists as if they are missing. - let inner = if !custody_columns.is_empty() { - RpcBlockInner::BlockAndCustodyColumns(block, VariableList::new(custody_columns)?)
- } else { - RpcBlockInner::Block(block) - }; - Ok(Self { - block_root, - block: inner, - }) - } - #[allow(clippy::type_complexity)] pub fn deconstruct( self, ) -> ( Hash256, Arc>, - Option>, - Option>, + Option>, ) { - let block_root = self.block_root(); - match self.block { - RpcBlockInner::Block(block) => (block_root, block, None, None), - RpcBlockInner::BlockAndBlobs(block, blobs) => (block_root, block, Some(blobs), None), - RpcBlockInner::BlockAndCustodyColumns(block, data_columns) => { - (block_root, block, None, Some(data_columns)) + match self { + RpcBlock::FullyAvailable(available_block) => { + let (block_root, block, block_data) = available_block.deconstruct(); + (block_root, block, Some(block_data)) } + RpcBlock::BlockOnly { block, block_root } => (block_root, block, None), } } + pub fn n_blobs(&self) -> usize { - match &self.block { - RpcBlockInner::Block(_) | RpcBlockInner::BlockAndCustodyColumns(_, _) => 0, - RpcBlockInner::BlockAndBlobs(_, blobs) => blobs.len(), + if let Some(block_data) = self.block_data() { + match block_data { + AvailableBlockData::NoData | AvailableBlockData::DataColumns(_) => 0, + AvailableBlockData::Blobs(blobs) => blobs.len(), + } + } else { + 0 } } + pub fn n_data_columns(&self) -> usize { - match &self.block { - RpcBlockInner::Block(_) | RpcBlockInner::BlockAndBlobs(_, _) => 0, - RpcBlockInner::BlockAndCustodyColumns(_, data_columns) => data_columns.len(), + if let Some(block_data) = self.block_data() { + match block_data { + AvailableBlockData::NoData | AvailableBlockData::Blobs(_) => 0, + AvailableBlockData::DataColumns(columns) => columns.len(), + } + } else { + 0 } } } @@ -500,17 +450,21 @@ impl AsBlock for RpcBlock { self.as_block().message() } fn as_block(&self) -> &SignedBeaconBlock { - match &self.block { - RpcBlockInner::Block(block) => block, - RpcBlockInner::BlockAndBlobs(block, _) => block, - RpcBlockInner::BlockAndCustodyColumns(block, _) => block, + match self { + Self::BlockOnly { + block, + block_root: _, + } => block, + Self::FullyAvailable(available_block) => available_block.block(), } } fn block_cloned(&self) -> Arc> { - match &self.block { - RpcBlockInner::Block(block) => block.clone(), - RpcBlockInner::BlockAndBlobs(block, _) => block.clone(), - RpcBlockInner::BlockAndCustodyColumns(block, _) => block.clone(), + match self { + RpcBlock::FullyAvailable(available_block) => available_block.block_cloned(), + RpcBlock::BlockOnly { + block, + block_root: _, + } => block.clone(), } } fn canonical_root(&self) -> Hash256 { diff --git a/beacon_node/beacon_chain/src/builder.rs b/beacon_node/beacon_chain/src/builder.rs index dc38fc1c292..f0444615473 100644 --- a/beacon_node/beacon_chain/src/builder.rs +++ b/beacon_node/beacon_chain/src/builder.rs @@ -918,7 +918,6 @@ where let genesis_time = head_snapshot.beacon_state.genesis_time(); let canonical_head = CanonicalHead::new(fork_choice, Arc::new(head_snapshot)); let shuffling_cache_size = self.chain_config.shuffling_cache_size; - let complete_blob_backfill = self.chain_config.complete_blob_backfill; // Calculate the weak subjectivity point in which to backfill blocks to. 
let genesis_backfill_slot = if self.chain_config.genesis_backfill { @@ -961,14 +960,18 @@ where self.node_custody_type, head_epoch, ordered_custody_column_indices, - &self.spec, + slot_clock.clone(), + self.chain_config.complete_blob_backfill, + self.spec.clone(), ) } else { ( CustodyContext::new( self.node_custody_type, + slot_clock.clone(), ordered_custody_column_indices, - &self.spec, + self.chain_config.complete_blob_backfill, + self.spec.clone(), ), None, ) @@ -1045,7 +1048,6 @@ where genesis_backfill_slot, data_availability_checker: Arc::new( DataAvailabilityChecker::new( - complete_blob_backfill, slot_clock, self.kzg.clone(), store, diff --git a/beacon_node/beacon_chain/src/custody_context.rs b/beacon_node/beacon_chain/src/custody_context.rs index c512ce616a1..3124f52cfb7 100644 --- a/beacon_node/beacon_chain/src/custody_context.rs +++ b/beacon_node/beacon_chain/src/custody_context.rs @@ -1,13 +1,17 @@ use parking_lot::RwLock; use serde::{Deserialize, Serialize}; +use slot_clock::SlotClock; use ssz_derive::{Decode, Encode}; use std::marker::PhantomData; +use std::sync::Arc; use std::{ collections::{BTreeMap, HashMap}, sync::atomic::{AtomicU64, Ordering}, }; use tracing::{debug, warn}; -use types::{ChainSpec, ColumnIndex, Epoch, EthSpec, Slot}; +use types::{ChainSpec, ColumnIndex, Epoch, EthSpec, SignedBeaconBlock, Slot}; + +use crate::BeaconChainTypes; /// A delay before making the CGC change effective to the data availability checker. pub const CUSTODY_CHANGE_DA_EFFECTIVE_DELAY_SECONDS: u64 = 30; @@ -235,8 +239,7 @@ impl NodeCustodyType { /// Contains all the information the node requires to calculate the /// number of columns to be custodied when checking for DA. -#[derive(Debug)] -pub struct CustodyContext<E: EthSpec> { +pub struct CustodyContext<T: BeaconChainTypes> { /// The Number of custody groups required based on the number of validators /// that is attached to this node. /// @@ -249,20 +252,42 @@ pub struct CustodyContext<E: EthSpec> { /// Stores an immutable, ordered list of all data column indices as determined by the node's NodeID /// on startup. This used to determine the node's custody columns. ordered_custody_column_indices: Vec<ColumnIndex>, - _phantom_data: PhantomData<E>, + slot_clock: T::SlotClock, + complete_blob_backfill: bool, + spec: Arc<ChainSpec>, + _phantom_data: PhantomData<T>, +} + +impl<T: BeaconChainTypes> std::fmt::Debug for CustodyContext<T> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("CustodyContext") + .field( + "validator_custody_count", + &self.validator_custody_count.load(Ordering::Relaxed), + ) + .field("validator_registrations", &self.validator_registrations) + .field( + "ordered_custody_column_indices", + &self.ordered_custody_column_indices, + ) + .field("complete_blob_backfill", &self.complete_blob_backfill) + .finish_non_exhaustive() + } } -impl<E: EthSpec> CustodyContext<E> { +impl<T: BeaconChainTypes> CustodyContext<T> { /// Create a new default custody context object when no persisted object /// exists. /// /// The `node_custody_type` value is based on current cli parameters. pub fn new( node_custody_type: NodeCustodyType, + slot_clock: T::SlotClock, ordered_custody_column_indices: Vec<ColumnIndex>, - spec: &ChainSpec, + complete_blob_backfill: bool, + spec: Arc<ChainSpec>, ) -> Self { - let cgc_override = node_custody_type.get_custody_count_override(spec); + let cgc_override = node_custody_type.get_custody_count_override(&spec); // If there's no override, we initialise `validator_custody_count` to 0. This has been the // existing behaviour and we maintain this for now to avoid a semantic schema change until // a later release.
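Reviewer note: the constructor change above threads three new pieces of state through `CustodyContext`: the node's slot clock, the `complete_blob_backfill` flag, and an owned `Arc<ChainSpec>`. A minimal sketch of wiring up the new signature, modelled on the `create_custody_context` test helper later in this diff (the `TestingSlotClock` parameters, the `EphemeralHarnessType` alias, and the exact module paths are assumptions taken from the test code rather than production usage):

```rust
use std::sync::Arc;
use std::time::Duration;

use beacon_chain::custody_context::{CustodyContext, NodeCustodyType};
use beacon_chain::test_utils::{EphemeralHarnessType, generate_data_column_indices_rand_order};
use slot_clock::TestingSlotClock;
use types::{ChainSpec, MainnetEthSpec, Slot};

type E = MainnetEthSpec;

fn demo_custody_context(spec: &ChainSpec) -> CustodyContext<EphemeralHarnessType<E>> {
    // Deterministic test clock; a real node passes the beacon chain's slot clock instead.
    let slot_clock = TestingSlotClock::new(
        Slot::new(0),
        Duration::from_secs(0),
        Duration::from_millis(spec.slot_duration_ms),
    );
    CustodyContext::new(
        NodeCustodyType::Fullnode,
        slot_clock,
        generate_data_column_indices_rand_order::<E>(),
        false, // complete_blob_backfill: keep only the rolling DA window
        Arc::new(spec.clone()),
    )
}
```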
@@ -270,6 +295,9 @@ impl<E: EthSpec> CustodyContext<E> { validator_custody_count: AtomicU64::new(cgc_override.unwrap_or(0)), validator_registrations: RwLock::new(ValidatorRegistrations::new(cgc_override)), ordered_custody_column_indices, + slot_clock, + complete_blob_backfill, + spec, _phantom_data: PhantomData, } } @@ -293,7 +321,9 @@ impl<E: EthSpec> CustodyContext<E> { node_custody_type: NodeCustodyType, head_epoch: Epoch, ordered_custody_column_indices: Vec<ColumnIndex>, - spec: &ChainSpec, + slot_clock: T::SlotClock, + complete_blob_backfill: bool, + spec: Arc<ChainSpec>, ) -> (Self, Option<CustodyCountChanged>) { let CustodyContextSsz { mut validator_custody_at_head, @@ -303,7 +333,7 @@ impl<E: EthSpec> CustodyContext<E> { let mut custody_count_changed = None; - if let Some(cgc_from_cli) = node_custody_type.get_custody_count_override(spec) { + if let Some(cgc_from_cli) = node_custody_type.get_custody_count_override(&spec) { debug!( ?node_custody_type, persisted_custody_count = validator_custody_at_head, @@ -359,6 +389,9 @@ impl<E: EthSpec> CustodyContext<E> { .collect(), }), ordered_custody_column_indices, + slot_clock, + complete_blob_backfill, + spec, _phantom_data: PhantomData, }; @@ -377,10 +410,10 @@ impl<E: EthSpec> CustodyContext<E> { current_slot: Slot, spec: &ChainSpec, ) -> Option<CustodyCountChanged> { - let Some((effective_epoch, new_validator_custody)) = self - .validator_registrations - .write() - .register_validators::<E>(validators_and_balance, current_slot, spec) + let Some((effective_epoch, new_validator_custody)) = + self.validator_registrations + .write() + .register_validators::<T::EthSpec>(validators_and_balance, current_slot, spec) else { return None; }; @@ -452,13 +485,13 @@ impl<E: EthSpec> CustodyContext<E> { /// Returns the count of columns this node must _sample_ for a block at `epoch` to import. pub fn num_of_data_columns_to_sample(&self, epoch: Epoch, spec: &ChainSpec) -> usize { let custody_group_count = self.custody_group_count_at_epoch(epoch, spec); - spec.sampling_size_columns::<E>(custody_group_count) + spec.sampling_size_columns::<T::EthSpec>(custody_group_count) .expect("should compute node sampling size from valid chain spec") } /// Returns whether the node should attempt reconstruction at a given epoch. pub fn should_attempt_reconstruction(&self, epoch: Epoch, spec: &ChainSpec) -> bool { - let min_columns_for_reconstruction = E::number_of_columns() / 2; + let min_columns_for_reconstruction = T::EthSpec::number_of_columns() / 2; // performing reconstruction is not necessary if sampling column count is exactly 50%, // because the node doesn't need the remaining columns. self.num_of_data_columns_to_sample(epoch, spec) > min_columns_for_reconstruction @@ -502,7 +535,7 @@ impl<E: EthSpec> CustodyContext<E> { }; // This is an unnecessary conversion for spec compliance, basically just multiplying by 1. - let columns_per_custody_group = spec.data_columns_per_group::<E>() as usize; + let columns_per_custody_group = spec.data_columns_per_group::<T::EthSpec>() as usize; let custody_column_count = columns_per_custody_group * custody_group_count; &self.ordered_custody_column_indices[..custody_column_count] @@ -527,6 +560,49 @@ impl<E: EthSpec> CustodyContext<E> { .write() .reset_validator_custody_requirements(effective_epoch); } + + /// Determines the blob requirements for an epoch. If the epoch is pre-Deneb, no blobs are required. + /// If the epoch is prior to the data availability boundary, no blobs are required. + pub fn blobs_required_for_epoch(&self, epoch: Epoch) -> bool { + self.da_check_required_for_epoch(epoch) && !self.spec.is_peer_das_enabled_for_epoch(epoch) + } + + /// Determines the data column requirements for an epoch.
+ /// - If the epoch is pre-peerdas, no data columns are required. + /// - If the epoch is from prior to the data availability boundary, no data columns are required. + pub fn data_columns_required_for_epoch(&self, epoch: Epoch) -> bool { + self.da_check_required_for_epoch(epoch) && self.spec.is_peer_das_enabled_for_epoch(epoch) + } + + /// See `Self::blobs_required_for_epoch` + pub fn blobs_required_for_block(&self, block: &SignedBeaconBlock) -> bool { + block.num_expected_blobs() > 0 && self.blobs_required_for_epoch(block.epoch()) + } + + /// See `Self::data_columns_required_for_epoch` + pub fn data_columns_required_for_block(&self, block: &SignedBeaconBlock) -> bool { + block.num_expected_blobs() > 0 && self.data_columns_required_for_epoch(block.epoch()) + } + + /// Returns true if the given epoch lies within the da boundary and false otherwise. + pub fn da_check_required_for_epoch(&self, block_epoch: Epoch) -> bool { + self.data_availability_boundary() + .is_some_and(|da_epoch| block_epoch >= da_epoch) + } + + /// The epoch at which we require a data availability check in block processing. + /// `None` if the `Deneb` fork is disabled. + pub fn data_availability_boundary(&self) -> Option { + let fork_epoch = self.spec.deneb_fork_epoch?; + + if self.complete_blob_backfill { + Some(fork_epoch) + } else { + let current_epoch = self.slot_clock.now()?.epoch(T::EthSpec::slots_per_epoch()); + self.spec + .min_epoch_data_availability_boundary(current_epoch) + } + } } /// Indicates that the custody group count (CGC) has increased. @@ -552,8 +628,8 @@ pub struct CustodyContextSsz { pub epoch_validator_custody_requirements: Vec<(Epoch, u64)>, } -impl From<&CustodyContext> for CustodyContextSsz { - fn from(context: &CustodyContext) -> Self { +impl From<&CustodyContext> for CustodyContextSsz { + fn from(context: &CustodyContext) -> Self { CustodyContextSsz { validator_custody_at_head: context.validator_custody_count.load(Ordering::Relaxed), // This field is deprecated and has no effect @@ -571,17 +647,39 @@ impl From<&CustodyContext> for CustodyContextSsz { #[cfg(test)] mod tests { + use std::time::Duration; + use super::*; - use crate::test_utils::generate_data_column_indices_rand_order; + use crate::test_utils::{EphemeralHarnessType, generate_data_column_indices_rand_order}; + use slot_clock::{SlotClock, TestingSlotClock}; use types::MainnetEthSpec; type E = MainnetEthSpec; + type T = EphemeralHarnessType; + + fn create_custody_context( + node_custody_type: NodeCustodyType, + spec: &ChainSpec, + ) -> CustodyContext { + let slot_clock = TestingSlotClock::new( + Slot::new(0), + Duration::from_secs(0), + Duration::from_millis(spec.slot_duration_ms), + ); + CustodyContext::::new( + node_custody_type, + slot_clock, + generate_data_column_indices_rand_order::(), + false, + Arc::new(spec.clone()), + ) + } fn setup_custody_context( spec: &ChainSpec, head_epoch: Epoch, epoch_and_cgc_tuples: Vec<(Epoch, u64)>, - ) -> CustodyContext { + ) -> CustodyContext { let cgc_at_head = epoch_and_cgc_tuples.last().unwrap().1; let ssz_context = CustodyContextSsz { validator_custody_at_head: cgc_at_head, @@ -589,19 +687,27 @@ mod tests { epoch_validator_custody_requirements: epoch_and_cgc_tuples, }; - let (custody_context, _) = CustodyContext::::new_from_persisted_custody_context( + let slot_clock = TestingSlotClock::new( + Slot::new(0), + Duration::from_secs(0), + Duration::from_millis(spec.slot_duration_ms), + ); + + let (custody_context, _) = CustodyContext::::new_from_persisted_custody_context( ssz_context, 
NodeCustodyType::Fullnode, head_epoch, generate_data_column_indices_rand_order::(), - spec, + slot_clock, + false, + Arc::new(spec.clone()), ); custody_context } fn complete_backfill_for_epochs( - custody_context: &CustodyContext, + custody_context: &CustodyContext, start_epoch: Epoch, end_epoch: Epoch, expected_cgc: u64, @@ -630,13 +736,21 @@ mod tests { epoch_validator_custody_requirements: vec![(Epoch::new(0), persisted_cgc)], }; + let slot_clock = TestingSlotClock::new( + Slot::new(0), + Duration::from_secs(0), + Duration::from_millis(spec.slot_duration_ms), + ); + let (custody_context, custody_count_changed) = - CustodyContext::::new_from_persisted_custody_context( + CustodyContext::::new_from_persisted_custody_context( ssz_context, target_node_custody_type, head_epoch, generate_data_column_indices_rand_order::(), - spec, + slot_clock, + false, + Arc::new(spec.clone()), ); // Verify CGC increased @@ -701,13 +815,21 @@ mod tests { epoch_validator_custody_requirements: vec![(Epoch::new(0), persisted_cgc)], }; + let slot_clock = TestingSlotClock::new( + Slot::new(0), + Duration::from_secs(0), + Duration::from_millis(spec.slot_duration_ms), + ); + let (custody_context, custody_count_changed) = - CustodyContext::::new_from_persisted_custody_context( + CustodyContext::::new_from_persisted_custody_context( ssz_context, target_node_custody_type, head_epoch, generate_data_column_indices_rand_order::(), - spec, + slot_clock, + false, + Arc::new(spec.clone()), ); // Verify CGC stays at persisted value (no reduction) @@ -728,11 +850,7 @@ mod tests { #[test] fn no_validators_supernode_default() { let spec = E::default_spec(); - let custody_context = CustodyContext::::new( - NodeCustodyType::Supernode, - generate_data_column_indices_rand_order::(), - &spec, - ); + let custody_context = create_custody_context(NodeCustodyType::Supernode, &spec); assert_eq!( custody_context.custody_group_count_at_head(&spec), spec.number_of_custody_groups @@ -746,11 +864,7 @@ mod tests { #[test] fn no_validators_semi_supernode_default() { let spec = E::default_spec(); - let custody_context = CustodyContext::::new( - NodeCustodyType::SemiSupernode, - generate_data_column_indices_rand_order::(), - &spec, - ); + let custody_context = create_custody_context(NodeCustodyType::SemiSupernode, &spec); assert_eq!( custody_context.custody_group_count_at_head(&spec), spec.number_of_custody_groups / 2 @@ -764,11 +878,7 @@ mod tests { #[test] fn no_validators_fullnode_default() { let spec = E::default_spec(); - let custody_context = CustodyContext::::new( - NodeCustodyType::Fullnode, - generate_data_column_indices_rand_order::(), - &spec, - ); + let custody_context = create_custody_context(NodeCustodyType::Fullnode, &spec); assert_eq!( custody_context.custody_group_count_at_head(&spec), spec.custody_requirement, @@ -783,11 +893,7 @@ mod tests { #[test] fn register_single_validator_should_update_cgc() { let spec = E::default_spec(); - let custody_context = CustodyContext::::new( - NodeCustodyType::Fullnode, - generate_data_column_indices_rand_order::(), - &spec, - ); + let custody_context = create_custody_context(NodeCustodyType::Fullnode, &spec); let bal_per_additional_group = spec.balance_per_additional_custody_group; let min_val_custody_requirement = spec.validator_custody_requirement; // One single node increases its balance over 3 epochs. 
@@ -801,7 +907,7 @@ mod tests { (vec![(0, 10 * bal_per_additional_group)], Some(10)), ]; - register_validators_and_assert_cgc::( + register_validators_and_assert_cgc( &custody_context, validators_and_expected_cgc_change, &spec, @@ -811,11 +917,7 @@ mod tests { #[test] fn register_multiple_validators_should_update_cgc() { let spec = E::default_spec(); - let custody_context = CustodyContext::::new( - NodeCustodyType::Fullnode, - generate_data_column_indices_rand_order::(), - &spec, - ); + let custody_context = create_custody_context(NodeCustodyType::Fullnode, &spec); let bal_per_additional_group = spec.balance_per_additional_custody_group; let min_val_custody_requirement = spec.validator_custody_requirement; // Add 3 validators over 3 epochs. @@ -842,21 +944,13 @@ mod tests { ), ]; - register_validators_and_assert_cgc::( - &custody_context, - validators_and_expected_cgc, - &spec, - ); + register_validators_and_assert_cgc(&custody_context, validators_and_expected_cgc, &spec); } #[test] fn register_validators_should_not_update_cgc_for_supernode() { let spec = E::default_spec(); - let custody_context = CustodyContext::::new( - NodeCustodyType::Supernode, - generate_data_column_indices_rand_order::(), - &spec, - ); + let custody_context = create_custody_context(NodeCustodyType::Supernode, &spec); let bal_per_additional_group = spec.balance_per_additional_custody_group; // Add 3 validators over 3 epochs. @@ -879,11 +973,7 @@ mod tests { ), ]; - register_validators_and_assert_cgc::( - &custody_context, - validators_and_expected_cgc, - &spec, - ); + register_validators_and_assert_cgc(&custody_context, validators_and_expected_cgc, &spec); let current_epoch = Epoch::new(2); assert_eq!( custody_context.num_of_custody_groups_to_sample(current_epoch, &spec), @@ -894,11 +984,7 @@ mod tests { #[test] fn cgc_change_should_be_effective_to_sampling_after_delay() { let spec = E::default_spec(); - let custody_context = CustodyContext::::new( - NodeCustodyType::Fullnode, - generate_data_column_indices_rand_order::(), - &spec, - ); + let custody_context = create_custody_context(NodeCustodyType::Fullnode, &spec); let current_slot = Slot::new(10); let current_epoch = current_slot.epoch(E::slots_per_epoch()); let default_sampling_size = @@ -929,11 +1015,7 @@ mod tests { #[test] fn validator_dropped_after_no_registrations_within_expiry_should_not_reduce_cgc() { let spec = E::default_spec(); - let custody_context = CustodyContext::::new( - NodeCustodyType::Fullnode, - generate_data_column_indices_rand_order::(), - &spec, - ); + let custody_context = create_custody_context(NodeCustodyType::Fullnode, &spec); let current_slot = Slot::new(10); let val_custody_units_1 = 10; let val_custody_units_2 = 5; @@ -975,11 +1057,7 @@ mod tests { #[test] fn validator_dropped_after_no_registrations_within_expiry() { let spec = E::default_spec(); - let custody_context = CustodyContext::::new( - NodeCustodyType::Fullnode, - generate_data_column_indices_rand_order::(), - &spec, - ); + let custody_context = create_custody_context(NodeCustodyType::Fullnode, &spec); let current_slot = Slot::new(10); let val_custody_units_1 = 10; let val_custody_units_2 = 5; @@ -1027,8 +1105,8 @@ mod tests { } /// Update the validator every epoch and assert cgc against expected values. 
- fn register_validators_and_assert_cgc( - custody_context: &CustodyContext, + fn register_validators_and_assert_cgc( + custody_context: &CustodyContext, validators_and_expected_cgc_changed: Vec<(ValidatorsAndBalances, Option)>, spec: &ChainSpec, ) { @@ -1051,12 +1129,7 @@ mod tests { #[test] fn custody_columns_for_epoch_no_validators_fullnode() { let spec = E::default_spec(); - let ordered_custody_column_indices = generate_data_column_indices_rand_order::(); - let custody_context = CustodyContext::::new( - NodeCustodyType::Fullnode, - ordered_custody_column_indices, - &spec, - ); + let custody_context = create_custody_context(NodeCustodyType::Fullnode, &spec); assert_eq!( custody_context.custody_columns_for_epoch(None, &spec).len(), @@ -1067,12 +1140,7 @@ mod tests { #[test] fn custody_columns_for_epoch_no_validators_supernode() { let spec = E::default_spec(); - let ordered_custody_column_indices = generate_data_column_indices_rand_order::(); - let custody_context = CustodyContext::::new( - NodeCustodyType::Supernode, - ordered_custody_column_indices, - &spec, - ); + let custody_context = create_custody_context(NodeCustodyType::Supernode, &spec); assert_eq!( custody_context.custody_columns_for_epoch(None, &spec).len(), @@ -1083,12 +1151,7 @@ mod tests { #[test] fn custody_columns_for_epoch_with_validators_should_match_cgc() { let spec = E::default_spec(); - let ordered_custody_column_indices = generate_data_column_indices_rand_order::(); - let custody_context = CustodyContext::::new( - NodeCustodyType::Fullnode, - ordered_custody_column_indices, - &spec, - ); + let custody_context = create_custody_context(NodeCustodyType::Fullnode, &spec); let val_custody_units = 10; let _ = custody_context.register_validators( @@ -1109,12 +1172,7 @@ mod tests { #[test] fn custody_columns_for_epoch_specific_epoch_uses_epoch_cgc() { let spec = E::default_spec(); - let ordered_custody_column_indices = generate_data_column_indices_rand_order::(); - let custody_context = CustodyContext::::new( - NodeCustodyType::Fullnode, - ordered_custody_column_indices, - &spec, - ); + let custody_context = create_custody_context(NodeCustodyType::Fullnode, &spec); let test_epoch = Epoch::new(5); let expected_cgc = custody_context.custody_group_count_at_epoch(test_epoch, &spec); @@ -1135,12 +1193,20 @@ mod tests { epoch_validator_custody_requirements: vec![], }; - let (custody_context, _) = CustodyContext::::new_from_persisted_custody_context( + let slot_clock = TestingSlotClock::new( + Slot::new(0), + Duration::from_secs(0), + Duration::from_millis(spec.slot_duration_ms), + ); + + let (custody_context, _) = CustodyContext::::new_from_persisted_custody_context( ssz_context, NodeCustodyType::Fullnode, Epoch::new(0), generate_data_column_indices_rand_order::(), - &spec, + slot_clock, + false, + Arc::new(spec.clone()), ); assert_eq!( @@ -1173,11 +1239,7 @@ mod tests { fn restore_semi_supernode_with_validators_can_exceed_64() { let spec = E::default_spec(); let semi_supernode_cgc = spec.number_of_custody_groups / 2; // 64 - let custody_context = CustodyContext::::new( - NodeCustodyType::SemiSupernode, - generate_data_column_indices_rand_order::(), - &spec, - ); + let custody_context = create_custody_context(NodeCustodyType::SemiSupernode, &spec); // Verify initial CGC is 64 (semi-supernode) assert_eq!( @@ -1323,12 +1385,20 @@ mod tests { ], }; - let (custody_context, _) = CustodyContext::::new_from_persisted_custody_context( + let slot_clock = TestingSlotClock::new( + Slot::new(0), + Duration::from_secs(0), + 
Duration::from_millis(spec.slot_duration_ms), + ); + + let (custody_context, _) = CustodyContext::::new_from_persisted_custody_context( ssz_context, NodeCustodyType::Fullnode, Epoch::new(20), generate_data_column_indices_rand_order::(), - &spec, + slot_clock, + false, + Arc::new(spec.clone()), ); // Verify head uses latest value diff --git a/beacon_node/beacon_chain/src/data_availability_checker.rs b/beacon_node/beacon_chain/src/data_availability_checker.rs index 7aec24b8e52..c50dbac3457 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker.rs @@ -1,17 +1,17 @@ use crate::blob_verification::{ GossipVerifiedBlob, KzgVerifiedBlob, KzgVerifiedBlobList, verify_kzg_for_blob_list, }; -use crate::block_verification_types::{ - AvailabilityPendingExecutedBlock, AvailableExecutedBlock, RpcBlock, -}; +use crate::block_verification_types::{AvailabilityPendingExecutedBlock, AvailableExecutedBlock}; use crate::data_availability_checker::overflow_lru_cache::{ DataAvailabilityCheckerInner, ReconstructColumnsDecision, }; use crate::{ BeaconChain, BeaconChainTypes, BeaconStore, BlockProcessStatus, CustodyContext, metrics, }; +use educe::Educe; use kzg::Kzg; use slot_clock::SlotClock; +use std::collections::HashSet; use std::fmt; use std::fmt::Debug; use std::num::NonZeroUsize; @@ -21,7 +21,7 @@ use task_executor::TaskExecutor; use tracing::{debug, error, instrument}; use types::data::{BlobIdentifier, BlobSidecar, FixedBlobSidecarList}; use types::{ - BlobSidecarList, BlockImportSource, ChainSpec, DataColumnSidecar, DataColumnSidecarList, Epoch, + BlobSidecarList, BlockImportSource, ChainSpec, DataColumnSidecar, DataColumnSidecarList, EthSpec, Hash256, SignedBeaconBlock, Slot, }; @@ -31,8 +31,8 @@ mod state_lru_cache; use crate::data_availability_checker::error::Error; use crate::data_column_verification::{ - CustodyDataColumn, GossipVerifiedDataColumn, KzgVerifiedCustodyDataColumn, - KzgVerifiedDataColumn, verify_kzg_for_data_column_list, + GossipVerifiedDataColumn, KzgVerifiedCustodyDataColumn, KzgVerifiedDataColumn, + verify_kzg_for_data_column_list, }; use crate::metrics::{ KZG_DATA_COLUMN_RECONSTRUCTION_ATTEMPTS, KZG_DATA_COLUMN_RECONSTRUCTION_FAILURES, @@ -80,11 +80,10 @@ const STATE_LRU_CAPACITY_NON_ZERO: NonZeroUsize = new_non_zero_usize(32); /// proposer. Having a capacity > 1 is an optimization to prevent sync lookup from having re-fetch /// data during moments of unstable network conditions. 
pub struct DataAvailabilityChecker<T: BeaconChainTypes> { - complete_blob_backfill: bool, availability_cache: Arc<DataAvailabilityCheckerInner<T>>, - slot_clock: T::SlotClock, kzg: Arc<Kzg>, - custody_context: Arc<CustodyContext<T::EthSpec>>, + custody_context: Arc<CustodyContext<T>>, + slot_clock: T::SlotClock, spec: Arc<ChainSpec>, } @@ -119,11 +118,10 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> { pub fn new( - complete_blob_backfill: bool, slot_clock: T::SlotClock, kzg: Arc<Kzg>, store: BeaconStore<T>, - custody_context: Arc<CustodyContext<T::EthSpec>>, + custody_context: Arc<CustodyContext<T>>, spec: Arc<ChainSpec>, ) -> Result<Self, AvailabilityCheckError> { let inner = DataAvailabilityCheckerInner::new( @@ -133,7 +131,6 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> { spec.clone(), )?; Ok(Self { - complete_blob_backfill, availability_cache: Arc::new(inner), slot_clock, kzg, @@ -142,7 +139,7 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> { }) } - pub fn custody_context(&self) -> &Arc<CustodyContext<T::EthSpec>> { + pub fn custody_context(&self) -> &Arc<CustodyContext<T>> { &self.custody_context } @@ -366,83 +363,61 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> { .remove_pre_execution_block(block_root); } - /// Verifies kzg commitments for an RpcBlock, returns a `MaybeAvailableBlock` that may - /// include the fully available block. + /// Verifies kzg commitments for an `AvailableBlock`. /// /// WARNING: This function assumes all required blobs are already present, it does NOT /// check if there are any missing blobs. - pub fn verify_kzg_for_rpc_block( + pub fn verify_kzg_for_available_block( &self, - block: RpcBlock<T::EthSpec>, - ) -> Result<MaybeAvailableBlock<T::EthSpec>, AvailabilityCheckError> { - let (block_root, block, blobs, data_columns) = block.deconstruct(); - if self.blobs_required_for_block(&block) { - return if let Some(blob_list) = blobs { - verify_kzg_for_blob_list(blob_list.iter(), &self.kzg) + available_block: &AvailableBlock<T::EthSpec>, + ) -> Result<(), AvailabilityCheckError> { + let block_data_required = self + .custody_context + .blobs_required_for_block(&available_block.block) + || self + .custody_context + .data_columns_required_for_block(&available_block.block); + match available_block.data() { + AvailableBlockData::NoData => { + if block_data_required { + if available_block.block.fork_name_unchecked().fulu_enabled() { + return Err(AvailabilityCheckError::MissingCustodyColumns); + } else { + return Err(AvailabilityCheckError::MissingBlobs); + } + } + } + AvailableBlockData::Blobs(blobs) => { + verify_kzg_for_blob_list(blobs.iter(), &self.kzg) .map_err(AvailabilityCheckError::InvalidBlobs)?; - Ok(MaybeAvailableBlock::Available(AvailableBlock { - block_root, - block, - blob_data: AvailableBlockData::Blobs(blob_list), - blobs_available_timestamp: None, - spec: self.spec.clone(), - })) - } else { - Ok(MaybeAvailableBlock::AvailabilityPending { block_root, block }) - }; - } - if self.data_columns_required_for_block(&block) { - return if let Some(data_column_list) = data_columns.as_ref() { - verify_kzg_for_data_column_list( - data_column_list - .iter() - .map(|custody_column| custody_column.as_data_column()), - &self.kzg, - ) - .map_err(AvailabilityCheckError::InvalidColumn)?; - Ok(MaybeAvailableBlock::Available(AvailableBlock { - block_root, - block, - blob_data: AvailableBlockData::DataColumns( - data_column_list - .into_iter() - .map(|d| d.clone_arc()) - .collect(), - ), - blobs_available_timestamp: None, - spec: self.spec.clone(), - })) - } else { - Ok(MaybeAvailableBlock::AvailabilityPending { block_root, block }) - }; + } + AvailableBlockData::DataColumns(data_columns) => { + verify_kzg_for_data_column_list(data_columns.iter(), &self.kzg) .map_err(AvailabilityCheckError::InvalidColumn)?; + } } - Ok(MaybeAvailableBlock::Available(AvailableBlock { - block_root, - block, - blob_data:
AvailableBlockData::NoData, - blobs_available_timestamp: None, - spec: self.spec.clone(), - })) + Ok(()) } - /// Checks if a vector of blocks are available. Returns a vector of `MaybeAvailableBlock` - /// This is more efficient than calling `verify_kzg_for_rpc_block` in a loop as it does - /// all kzg verification at once + /// Performs batch kzg verification for a vector of `AvailableBlocks`. This is more efficient than + /// calling `verify_kzg_for_available_block` in a loop. /// /// WARNING: This function assumes all required blobs are already present, it does NOT /// check if there are any missing blobs. #[instrument(skip_all)] - pub fn verify_kzg_for_rpc_blocks( + pub fn batch_verify_kzg_for_available_blocks( &self, - blocks: Vec>, - ) -> Result>, AvailabilityCheckError> { - let mut results = Vec::with_capacity(blocks.len()); - let all_blobs = blocks + available_blocks: &Vec>, + ) -> Result<(), AvailabilityCheckError> { + let all_blobs = available_blocks .iter() - .filter(|block| self.blobs_required_for_block(block.as_block())) + .filter(|available_block| { + self.custody_context + .blobs_required_for_block(&available_block.block) + }) // this clone is cheap as it's cloning an Arc - .filter_map(|block| block.blobs().cloned()) + .filter_map(|available_block| available_block.blob_data.blobs()) .flatten() .collect::>(); @@ -452,15 +427,35 @@ impl DataAvailabilityChecker { .map_err(AvailabilityCheckError::InvalidBlobs)?; } - let all_data_columns = blocks + let all_data_columns = available_blocks .iter() - .filter(|block| self.data_columns_required_for_block(block.as_block())) + .filter(|available_block| { + self.custody_context + .data_columns_required_for_block(&available_block.block) + }) // this clone is cheap as it's cloning an Arc - .filter_map(|block| block.custody_columns().cloned()) + .filter_map(|available_block| available_block.blob_data.data_columns()) .flatten() - .map(CustodyDataColumn::into_inner) .collect::>(); + for available_block in available_blocks { + let block_data_required = self + .custody_context + .blobs_required_for_block(&available_block.block) + || self + .custody_context + .data_columns_required_for_block(&available_block.block); + if let AvailableBlockData::NoData = available_block.data() + && block_data_required + { + if available_block.block.fork_name_unchecked().fulu_enabled() { + return Err(AvailabilityCheckError::MissingCustodyColumns); + } else { + return Err(AvailabilityCheckError::MissingBlobs); + } + } + } + // verify kzg for all data columns at once if !all_data_columns.is_empty() { // Attributes fault to the specific peer that sent an invalid column @@ -468,92 +463,7 @@ impl DataAvailabilityChecker { .map_err(AvailabilityCheckError::InvalidColumn)?; } - for block in blocks { - let (block_root, block, blobs, data_columns) = block.deconstruct(); - - let maybe_available_block = if self.blobs_required_for_block(&block) { - if let Some(blobs) = blobs { - MaybeAvailableBlock::Available(AvailableBlock { - block_root, - block, - blob_data: AvailableBlockData::Blobs(blobs), - blobs_available_timestamp: None, - spec: self.spec.clone(), - }) - } else { - MaybeAvailableBlock::AvailabilityPending { block_root, block } - } - } else if self.data_columns_required_for_block(&block) { - if let Some(data_columns) = data_columns { - MaybeAvailableBlock::Available(AvailableBlock { - block_root, - block, - blob_data: AvailableBlockData::DataColumns( - data_columns.into_iter().map(|d| d.into_inner()).collect(), - ), - blobs_available_timestamp: None, - spec: 
self.spec.clone(), - }) - } else { - MaybeAvailableBlock::AvailabilityPending { block_root, block } - } - } else { - MaybeAvailableBlock::Available(AvailableBlock { - block_root, - block, - blob_data: AvailableBlockData::NoData, - blobs_available_timestamp: None, - spec: self.spec.clone(), - }) - }; - - results.push(maybe_available_block); - } - - Ok(results) - } - - /// Determines the blob requirements for a block. If the block is pre-deneb, no blobs are required. - /// If the epoch is from prior to the data availability boundary, no blobs are required. - pub fn blobs_required_for_epoch(&self, epoch: Epoch) -> bool { - self.da_check_required_for_epoch(epoch) && !self.spec.is_peer_das_enabled_for_epoch(epoch) - } - - /// Determines the data column requirements for an epoch. - /// - If the epoch is pre-peerdas, no data columns are required. - /// - If the epoch is from prior to the data availability boundary, no data columns are required. - pub fn data_columns_required_for_epoch(&self, epoch: Epoch) -> bool { - self.da_check_required_for_epoch(epoch) && self.spec.is_peer_das_enabled_for_epoch(epoch) - } - - /// See `Self::blobs_required_for_epoch` - fn blobs_required_for_block(&self, block: &SignedBeaconBlock) -> bool { - block.num_expected_blobs() > 0 && self.blobs_required_for_epoch(block.epoch()) - } - - /// See `Self::data_columns_required_for_epoch` - fn data_columns_required_for_block(&self, block: &SignedBeaconBlock) -> bool { - block.num_expected_blobs() > 0 && self.data_columns_required_for_epoch(block.epoch()) - } - - /// The epoch at which we require a data availability check in block processing. - /// `None` if the `Deneb` fork is disabled. - pub fn data_availability_boundary(&self) -> Option { - let fork_epoch = self.spec.deneb_fork_epoch?; - - if self.complete_blob_backfill { - Some(fork_epoch) - } else { - let current_epoch = self.slot_clock.now()?.epoch(T::EthSpec::slots_per_epoch()); - self.spec - .min_epoch_data_availability_boundary(current_epoch) - } - } - - /// Returns true if the given epoch lies within the da boundary and false otherwise. - pub fn da_check_required_for_epoch(&self, block_epoch: Epoch) -> bool { - self.data_availability_boundary() - .is_some_and(|da_epoch| block_epoch >= da_epoch) + Ok(()) } /// Returns `true` if the current epoch is greater than or equal to the `Deneb` epoch. 
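Reviewer note: the requirement and boundary helpers removed above are moved rather than deleted; they now live on `CustodyContext` (see custody_context.rs earlier in this diff), so a single component owns the DA window. As a hedged worked example of the relocated boundary arithmetic, assuming mainnet-style parameters (a 4096-epoch retention window and `deneb_fork_epoch` = 269568): at current epoch 280000 the boundary is max(269568, 280000 - 4096) = 275904, so `da_check_required_for_epoch` returns true for epoch 276000 and false for epoch 275000; with `complete_blob_backfill` enabled the boundary stays pinned at 269568 and both epochs require the DA check.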
@@ -749,7 +659,8 @@ async fn availability_cache_maintenance_service( } } -#[derive(Debug)] +#[derive(Debug, Clone)] +// TODO(#8633) move this to `block_verification_types.rs` pub enum AvailableBlockData<E: EthSpec> { /// Block is pre-Deneb or has zero blobs NoData, /// Block is post-Deneb and has blobs Blobs(BlobSidecarList<E>), /// Block is post-PeerDAS and has data columns DataColumns(DataColumnSidecarList<E>), } +impl<E: EthSpec> AvailableBlockData<E> { + pub fn new_with_blobs(blobs: BlobSidecarList<E>) -> Self { + if blobs.is_empty() { + Self::NoData + } else { + Self::Blobs(blobs) + } + } + + pub fn new_with_data_columns(columns: DataColumnSidecarList<E>) -> Self { + if columns.is_empty() { + Self::NoData + } else { + Self::DataColumns(columns) + } + } + + pub fn blobs(&self) -> Option<BlobSidecarList<E>> { + match self { + AvailableBlockData::NoData => None, + AvailableBlockData::Blobs(blobs) => Some(blobs.clone()), + AvailableBlockData::DataColumns(_) => None, + } + } + + pub fn blobs_len(&self) -> usize { + if let Some(blobs) = self.blobs() { + blobs.len() + } else { + 0 + } + } + + pub fn data_columns(&self) -> Option<DataColumnSidecarList<E>> { + match self { + AvailableBlockData::NoData => None, + AvailableBlockData::Blobs(_) => None, + AvailableBlockData::DataColumns(data_columns) => Some(data_columns.clone()), + } + } + + pub fn data_columns_len(&self) -> usize { + if let Some(data_columns) = self.data_columns() { + data_columns.len() + } else { + 0 + } + } +} + /// A fully available block that is ready to be imported into fork choice. -#[derive(Debug)] +#[derive(Debug, Clone, Educe)] +#[educe(Hash(bound(E: EthSpec)))] pub struct AvailableBlock<E: EthSpec> { block_root: Hash256, block: Arc<SignedBeaconBlock<E>>, + #[educe(Hash(ignore))] blob_data: AvailableBlockData<E>, + #[educe(Hash(ignore))] /// Timestamp at which this block first became available (UNIX timestamp, time since 1970). blobs_available_timestamp: Option<Duration>, + #[educe(Hash(ignore))] pub spec: Arc<ChainSpec>, } impl<E: EthSpec> AvailableBlock<E> { - pub fn __new_for_testing( - block_root: Hash256, - block: Arc<SignedBeaconBlock<E>>, - data: AvailableBlockData<E>, + /// Constructs an `AvailableBlock` from a block and its data, after validating
+ /// the data against the block and the node's custody requirements. + /// + /// Returns `AvailabilityCheckError` if: + /// - `block_data` contains data not required by the block + /// - Required `block_data` is missing + /// - Blob count doesn't match the block's KZG commitment count + /// - Custody columns are incomplete + pub fn new( + block: Arc<SignedBeaconBlock<E>>, + block_data: AvailableBlockData<E>, + custody_context: &CustodyContext<T>, spec: Arc<ChainSpec>, - ) -> Self { - Self { - block_root, + ) -> Result<Self, AvailabilityCheckError> + where + T: BeaconChainTypes, + { + // Ensure block availability + let blobs_required = custody_context.blobs_required_for_block(&block); + let columns_required = custody_context.data_columns_required_for_block(&block); + + match &block_data { + AvailableBlockData::NoData => { + if columns_required { + return Err(AvailabilityCheckError::MissingCustodyColumns); + } else if blobs_required { + return Err(AvailabilityCheckError::MissingBlobs); + } + } + AvailableBlockData::Blobs(blobs) => { + if !blobs_required { + return Err(AvailabilityCheckError::InvalidAvailableBlockData); + } + + let Ok(block_kzg_commitments) = block.message().body().blob_kzg_commitments() + else { + return Err(AvailabilityCheckError::Unexpected( + "Expected blobs but could not fetch KZG commitments from the block" + .to_owned(), + )); + }; + + if blobs.len() != block_kzg_commitments.len() { + return Err(AvailabilityCheckError::MissingBlobs); + } + + for (blob, &block_kzg_commitment) in blobs.iter().zip(block_kzg_commitments.iter()) + { + if blob.kzg_commitment != block_kzg_commitment { + return Err(AvailabilityCheckError::KzgCommitmentMismatch { + blob_commitment: blob.kzg_commitment, + block_commitment: block_kzg_commitment, + }); + } + } + } + AvailableBlockData::DataColumns(data_columns) => { + if !columns_required { + return Err(AvailabilityCheckError::InvalidAvailableBlockData); + } + + let mut column_indices = custody_context + .custody_columns_for_epoch(Some(block.epoch()), &spec) + .iter() + .collect::<HashSet<_>>(); + + for data_column in data_columns { + column_indices.remove(&data_column.index); + } + + if !column_indices.is_empty() { + return Err(AvailabilityCheckError::MissingCustodyColumns); + } + } + } + + Ok(Self { + block_root: block.canonical_root(), block, - blob_data: data, + blob_data: block_data, blobs_available_timestamp: None, - spec, - } + spec: spec.clone(), + }) } pub fn block(&self) -> &SignedBeaconBlock<E> { @@ -801,6 +837,10 @@ impl<E: EthSpec> AvailableBlock<E> { &self.blob_data } + pub fn block_root(&self) -> Hash256 { + self.block_root + } + pub fn has_blobs(&self) -> bool { match self.blob_data { AvailableBlockData::NoData => false, @@ -864,7 +904,9 @@ impl<E: EthSpec> MaybeAvailableBlock<E> { mod test { use super::*; use crate::CustodyContext; + use crate::block_verification_types::RpcBlock; use crate::custody_context::NodeCustodyType; + use crate::data_column_verification::CustodyDataColumn; use crate::test_utils::{ EphemeralHarnessType, NumBlobs, generate_data_column_indices_rand_order, generate_rand_block_and_data_columns, get_kzg, }; @@ -877,7 +919,7 @@ mod test { use std::time::Duration; use store::HotColdDB; use types::data::DataColumn; - use types::{ChainSpec, ColumnIndex, EthSpec, ForkName, MainnetEthSpec, Slot}; + use types::{ChainSpec, ColumnIndex, Epoch, EthSpec, ForkName, MainnetEthSpec, Slot}; type E = MainnetEthSpec; type T = EphemeralHarnessType<E>; @@ -1057,9 +1099,6 @@ mod test { let custody_columns = if index == 0 { // 128 valid data columns in the first block
data_columns - .into_iter() - .map(CustodyDataColumn::from_asserted_custody) - .collect::>() } else { // invalid data columns in the second block data_columns @@ -1070,17 +1109,35 @@ mod test { ..d.as_ref().clone() }; CustodyDataColumn::from_asserted_custody(Arc::new(invalid_sidecar)) + .as_data_column() + .clone() }) .collect::>() }; - RpcBlock::new_with_custody_columns(None, Arc::new(block), custody_columns) - .expect("should create RPC block with custody columns") + let block_data = AvailableBlockData::new_with_data_columns(custody_columns); + let custody_context = Arc::new(new_custody_context(spec.clone())); + RpcBlock::new( + Arc::new(block), + Some(block_data), + &custody_context, + spec.clone(), + ) + .expect("should create RPC block with custody columns") + }) + .collect::>(); + + let available_blocks = blocks_with_columns + .iter() + .filter_map(|block| match block { + RpcBlock::FullyAvailable(available_block) => Some(available_block.clone()), + RpcBlock::BlockOnly { .. } => None, }) .collect::>(); // WHEN verifying all blocks together (totalling 256 data columns) - let verification_result = da_checker.verify_kzg_for_rpc_blocks(blocks_with_columns); + let verification_result = + da_checker.batch_verify_kzg_for_available_blocks(&available_blocks); // THEN batch block verification should fail due to 128 invalid columns in the second block verification_result.expect_err("should have failed to verify blocks"); @@ -1185,18 +1242,28 @@ mod test { let ordered_custody_column_indices = generate_data_column_indices_rand_order::(); let custody_context = Arc::new(CustodyContext::new( NodeCustodyType::Fullnode, + slot_clock.clone(), ordered_custody_column_indices, - &spec, + false, + spec.clone(), )); - let complete_blob_backfill = false; - DataAvailabilityChecker::new( - complete_blob_backfill, - slot_clock, - kzg, - store, - custody_context, - spec, + DataAvailabilityChecker::new(slot_clock, kzg, store, custody_context, spec) + .expect("should initialise data availability checker") + } + + fn new_custody_context(spec: Arc) -> CustodyContext { + let slot_clock = TestingSlotClock::new( + Slot::new(0), + Duration::from_secs(0), + Duration::from_secs(spec.seconds_per_slot), + ); + let ordered_custody_column_indices = generate_data_column_indices_rand_order::(); + CustodyContext::new( + NodeCustodyType::Fullnode, + slot_clock.clone(), + ordered_custody_column_indices, + false, + spec.clone(), ) - .expect("should initialise data availability checker") } } diff --git a/beacon_node/beacon_chain/src/data_availability_checker/error.rs b/beacon_node/beacon_chain/src/data_availability_checker/error.rs index c9efb7a4149..af3cb72c034 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/error.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/error.rs @@ -22,6 +22,7 @@ pub enum Error { BlockReplayError(state_processing::BlockReplayError), RebuildingStateCaches(BeaconStateError), SlotClockError, + InvalidAvailableBlockData, } #[derive(PartialEq, Eq)] @@ -44,7 +45,8 @@ impl Error { | Error::ParentStateMissing(_) | Error::BlockReplayError(_) | Error::RebuildingStateCaches(_) - | Error::SlotClockError => ErrorCategory::Internal, + | Error::SlotClockError + | Error::InvalidAvailableBlockData => ErrorCategory::Internal, Error::InvalidBlobs { .. } | Error::InvalidColumn { .. } | Error::ReconstructColumnsError { .. 
} diff --git a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs index ff098a827db..8a62d0e2e88 100644 --- a/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs +++ b/beacon_node/beacon_chain/src/data_availability_checker/overflow_lru_cache.rs @@ -402,7 +402,7 @@ pub struct DataAvailabilityCheckerInner { /// This cache holds a limited number of states in memory and reconstructs them /// from disk when necessary. This is necessary until we merge tree-states state_cache: StateLRUCache, - custody_context: Arc>, + custody_context: Arc>, spec: Arc, } @@ -419,7 +419,7 @@ impl DataAvailabilityCheckerInner { pub fn new( capacity: NonZeroUsize, beacon_store: BeaconStore, - custody_context: Arc>, + custody_context: Arc>, spec: Arc, ) -> Result { Ok(Self { @@ -833,13 +833,15 @@ mod test { }; use fork_choice::PayloadVerificationStatus; use logging::create_test_tracing_subscriber; + use slot_clock::{SlotClock, TestingSlotClock}; use state_processing::ConsensusContext; use std::collections::VecDeque; + use std::time::Duration; use store::{HotColdDB, ItemStore, StoreConfig, database::interface::BeaconNodeBackend}; use tempfile::{TempDir, tempdir}; use tracing::{debug_span, info}; - use types::new_non_zero_usize; use types::{ExecPayload, MinimalEthSpec}; + use types::{Slot, new_non_zero_usize}; const LOW_VALIDATOR_COUNT: usize = 32; const STATE_LRU_CAPACITY: usize = STATE_LRU_CAPACITY_NON_ZERO.get(); @@ -1012,6 +1014,7 @@ mod test { where E: EthSpec, T: BeaconChainTypes< + SlotClock = TestingSlotClock, HotStore = BeaconNodeBackend, ColdStore = BeaconNodeBackend, EthSpec = E, @@ -1023,10 +1026,19 @@ mod test { let spec = harness.spec.clone(); let test_store = harness.chain.store.clone(); let capacity_non_zero = new_non_zero_usize(capacity); + + let slot_clock = TestingSlotClock::new( + Slot::new(0), + Duration::from_secs(0), + Duration::from_millis(spec.slot_duration_ms), + ); + let custody_context = Arc::new(CustodyContext::new( NodeCustodyType::Fullnode, + slot_clock, generate_data_column_indices_rand_order::(), - &spec, + false, + spec.clone(), )); let cache = Arc::new( DataAvailabilityCheckerInner::::new( diff --git a/beacon_node/beacon_chain/src/historical_blocks.rs b/beacon_node/beacon_chain/src/historical_blocks.rs index 91b0f12cbb3..45ae9d7b844 100644 --- a/beacon_node/beacon_chain/src/historical_blocks.rs +++ b/beacon_node/beacon_chain/src/historical_blocks.rs @@ -157,12 +157,10 @@ impl BeaconChain { } match &block_data { - AvailableBlockData::NoData => {} - AvailableBlockData::Blobs(..) 
=> { - new_oldest_blob_slot = Some(block.slot()); - } + AvailableBlockData::NoData => (), + AvailableBlockData::Blobs(_) => new_oldest_blob_slot = Some(block.slot()), AvailableBlockData::DataColumns(_) => { - new_oldest_data_column_slot = Some(block.slot()); + new_oldest_data_column_slot = Some(block.slot()) } } diff --git a/beacon_node/beacon_chain/src/lib.rs b/beacon_node/beacon_chain/src/lib.rs index f92030a6714..e77739e2d53 100644 --- a/beacon_node/beacon_chain/src/lib.rs +++ b/beacon_node/beacon_chain/src/lib.rs @@ -78,7 +78,7 @@ pub use block_verification::{ BlockError, ExecutionPayloadError, ExecutionPendingBlock, GossipVerifiedBlock, IntoExecutionPendingBlock, IntoGossipVerifiedBlock, InvalidSignature, PayloadVerificationOutcome, PayloadVerificationStatus, build_blob_data_column_sidecars, - get_block_root, + get_block_root, signature_verify_chain_segment, }; pub use block_verification_types::AvailabilityPendingExecutedBlock; pub use block_verification_types::ExecutedBlock; diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index b6c235a4cb0..a9212bb8224 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -1,12 +1,12 @@ use crate::blob_verification::GossipVerifiedBlob; -use crate::block_verification_types::{AsBlock, RpcBlock}; +use crate::block_verification_types::{AsBlock, AvailableBlockData, RpcBlock}; use crate::custody_context::NodeCustodyType; -use crate::data_column_verification::CustodyDataColumn; +use crate::data_availability_checker::DataAvailabilityChecker; use crate::graffiti_calculator::GraffitiSettings; use crate::kzg_utils::build_data_column_sidecars; use crate::observed_operations::ObservationOutcome; pub use crate::persisted_beacon_chain::PersistedBeaconChain; -use crate::{BeaconBlockResponseWrapper, get_block_root}; +use crate::{BeaconBlockResponseWrapper, CustodyContext, get_block_root}; use crate::{ BeaconChain, BeaconChainTypes, BlockError, ChainConfig, ServerSentEventHandler, StateSkipConfig, @@ -213,6 +213,48 @@ pub fn test_spec() -> ChainSpec { spec } +pub fn test_da_checker( + spec: Arc, +) -> DataAvailabilityChecker> { + let slot_clock = TestingSlotClock::new( + Slot::new(0), + Duration::from_secs(0), + Duration::from_secs(spec.seconds_per_slot), + ); + let kzg = get_kzg(&spec); + let store = Arc::new(HotColdDB::open_ephemeral(<_>::default(), spec.clone()).unwrap()); + let ordered_custody_column_indices = generate_data_column_indices_rand_order::(); + let complete_blob_backfill = false; + let custody_context = Arc::new(CustodyContext::new( + NodeCustodyType::Fullnode, + slot_clock.clone(), + ordered_custody_column_indices, + complete_blob_backfill, + spec.clone(), + )); + DataAvailabilityChecker::new(slot_clock, kzg, store, custody_context, spec) + .expect("should initialise data availability checker") +} + +pub fn test_custody_context( + spec: Arc, +) -> CustodyContext> { + let slot_clock = TestingSlotClock::new( + Slot::new(0), + Duration::from_secs(0), + Duration::from_secs(spec.seconds_per_slot), + ); + let ordered_custody_column_indices = generate_data_column_indices_rand_order::(); + let complete_blob_backfill = false; + CustodyContext::new( + NodeCustodyType::Fullnode, + slot_clock.clone(), + ordered_custody_column_indices, + complete_blob_backfill, + spec.clone(), + ) +} + pub struct Builder { eth_spec_instance: T::EthSpec, spec: Option>, @@ -2380,8 +2422,16 @@ where ) -> Result { self.set_current_slot(slot); let (block, blob_items) = 
block_contents; + // Determine if block is available: it's available if it doesn't require blobs, + // or if it requires blobs and we have them + let has_blob_commitments = block + .message() + .body() + .blob_kzg_commitments() + .is_ok_and(|c| !c.is_empty()); + let is_available = !has_blob_commitments || blob_items.is_some(); - let rpc_block = self.build_rpc_block_from_blobs(block_root, block, blob_items)?; + let rpc_block = self.build_rpc_block_from_blobs(block, blob_items, is_available)?; let block_hash: SignedBeaconBlockHash = self .chain .process_block( @@ -2405,7 +2455,15 @@ where let (block, blob_items) = block_contents; let block_root = block.canonical_root(); - let rpc_block = self.build_rpc_block_from_blobs(block_root, block, blob_items)?; + // Determine if block is available: it's available if it doesn't require blobs, + // or if it requires blobs and we have them + let has_blob_commitments = block + .message() + .body() + .blob_kzg_commitments() + .is_ok_and(|c| !c.is_empty()); + let is_available = !has_blob_commitments || blob_items.is_some(); + let rpc_block = self.build_rpc_block_from_blobs(block, blob_items, is_available)?; let block_hash: SignedBeaconBlockHash = self .chain .process_block( @@ -2436,29 +2494,51 @@ where .blob_kzg_commitments() .is_ok_and(|c| !c.is_empty()); if !has_blobs { - return RpcBlock::new_without_blobs(Some(block_root), block); + return RpcBlock::new( + block, + Some(AvailableBlockData::NoData), + self.chain.data_availability_checker.custody_context(), + self.chain.spec.clone(), + ) + .unwrap(); } // Blobs are stored as data columns from Fulu (PeerDAS) if self.spec.is_peer_das_enabled_for_epoch(block.epoch()) { let columns = self.chain.get_data_columns(&block_root).unwrap().unwrap(); - let custody_columns = columns - .into_iter() - .map(CustodyDataColumn::from_asserted_custody) - .collect::>(); - RpcBlock::new_with_custody_columns(Some(block_root), block, custody_columns).unwrap() + let custody_columns = columns.into_iter().collect::>(); + let block_data = AvailableBlockData::new_with_data_columns(custody_columns); + RpcBlock::new( + block, + Some(block_data), + self.chain.data_availability_checker.custody_context(), + self.chain.spec.clone(), + ) + .unwrap() } else { let blobs = self.chain.get_blobs(&block_root).unwrap().blobs(); - RpcBlock::new(Some(block_root), block, blobs).unwrap() + let block_data = if let Some(blobs) = blobs { + AvailableBlockData::new_with_blobs(blobs) + } else { + AvailableBlockData::NoData + }; + + RpcBlock::new( + block, + Some(block_data), + self.chain.data_availability_checker.custody_context(), + self.chain.spec.clone(), + ) + .unwrap() } } /// Builds an `RpcBlock` from a `SignedBeaconBlock` and `BlobsList`. pub fn build_rpc_block_from_blobs( &self, - block_root: Hash256, block: Arc>>, blob_items: Option<(KzgProofs, BlobsList)>, + is_available: bool, ) -> Result, BlockError> { Ok(if self.spec.is_peer_das_enabled_for_epoch(block.epoch()) { let epoch = block.slot().epoch(E::slots_per_epoch()); @@ -2471,11 +2551,37 @@ where let columns = generate_data_column_sidecars_from_block(&block, &self.spec) .into_iter() .filter(|d| sampling_columns.contains(&d.index)) - .map(CustodyDataColumn::from_asserted_custody) .collect::>(); - RpcBlock::new_with_custody_columns(Some(block_root), block, columns)? + if is_available { + let block_data = AvailableBlockData::new_with_data_columns(columns); + RpcBlock::new( + block, + Some(block_data), + self.chain.data_availability_checker.custody_context(), + self.chain.spec.clone(), + )? 
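+                // Note that `is_available` reduces to a single predicate at both call
+                // sites above, roughly:
+                //
+                //   let has_blob_commitments = block.message().body()
+                //       .blob_kzg_commitments()
+                //       .is_ok_and(|c| !c.is_empty());
+                //   let is_available = !has_blob_commitments || blob_items.is_some();
+                //
+                // i.e. a block counts as fully available when it carries no blob
+                // commitments, or when the caller supplied the matching blob bundle.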
+ } else { + RpcBlock::new( + block, + None, + self.chain.data_availability_checker.custody_context(), + self.chain.spec.clone(), + )? + } + } else if is_available { + RpcBlock::new( + block, + Some(AvailableBlockData::NoData), + self.chain.data_availability_checker.custody_context(), + self.chain.spec.clone(), + )? } else { - RpcBlock::new_without_blobs(Some(block_root), block) + RpcBlock::new( + block, + None, + self.chain.data_availability_checker.custody_context(), + self.chain.spec.clone(), + )? } } else { let blobs = blob_items @@ -2484,7 +2590,27 @@ where }) .transpose() .unwrap(); - RpcBlock::new(Some(block_root), block, blobs)? + if is_available { + let block_data = if let Some(blobs) = blobs { + AvailableBlockData::new_with_blobs(blobs) + } else { + AvailableBlockData::NoData + }; + + RpcBlock::new( + block, + Some(block_data), + self.chain.data_availability_checker.custody_context(), + self.chain.spec.clone(), + )? + } else { + RpcBlock::new( + block, + None, + self.chain.data_availability_checker.custody_context(), + self.chain.spec.clone(), + )? + } }) } diff --git a/beacon_node/beacon_chain/tests/attestation_production.rs b/beacon_node/beacon_chain/tests/attestation_production.rs index a57c20211aa..e1c5849bddb 100644 --- a/beacon_node/beacon_chain/tests/attestation_production.rs +++ b/beacon_node/beacon_chain/tests/attestation_production.rs @@ -1,6 +1,7 @@ #![cfg(not(debug_assertions))] use beacon_chain::attestation_simulator::produce_unaggregated_attestation; +use beacon_chain::block_verification_types::RpcBlock; use beacon_chain::test_utils::{AttestationStrategy, BeaconChainHarness, BlockStrategy}; use beacon_chain::validator_monitor::UNAGGREGATED_ATTESTATION_LAG_SLOTS; use beacon_chain::{StateSkipConfig, WhenSlotSkipped, metrics}; @@ -221,14 +222,16 @@ async fn produces_attestations() { let rpc_block = harness.build_rpc_block_from_store_blobs(Some(block_root), Arc::new(block.clone())); - let beacon_chain::data_availability_checker::MaybeAvailableBlock::Available( - available_block, - ) = chain - .data_availability_checker - .verify_kzg_for_rpc_block(rpc_block) - .unwrap() - else { - panic!("block should be available") + + let available_block = match rpc_block { + RpcBlock::FullyAvailable(available_block) => { + chain + .data_availability_checker + .verify_kzg_for_available_block(&available_block) + .unwrap(); + available_block + } + RpcBlock::BlockOnly { .. } => panic!("block should be available"), }; let early_attestation = { @@ -288,14 +291,17 @@ async fn early_attester_cache_old_request() { let rpc_block = harness .build_rpc_block_from_store_blobs(Some(head.beacon_block_root), head.beacon_block.clone()); - let beacon_chain::data_availability_checker::MaybeAvailableBlock::Available(available_block) = - harness - .chain - .data_availability_checker - .verify_kzg_for_rpc_block(rpc_block) - .unwrap() - else { - panic!("block should be available") + + let available_block = match rpc_block { + RpcBlock::FullyAvailable(available_block) => { + harness + .chain + .data_availability_checker + .verify_kzg_for_available_block(&available_block) + .unwrap(); + available_block + } + RpcBlock::BlockOnly { .. 
} => panic!("block should be available"), }; harness diff --git a/beacon_node/beacon_chain/tests/blob_verification.rs b/beacon_node/beacon_chain/tests/blob_verification.rs index 019736ca01b..e39c53729fb 100644 --- a/beacon_node/beacon_chain/tests/blob_verification.rs +++ b/beacon_node/beacon_chain/tests/blob_verification.rs @@ -77,7 +77,7 @@ async fn rpc_blobs_with_invalid_header_signature() { // Process the block without blobs so that it doesn't become available. harness.advance_slot(); let rpc_block = harness - .build_rpc_block_from_blobs(block_root, signed_block.clone(), None) + .build_rpc_block_from_blobs(signed_block.clone(), None, false) .unwrap(); let availability = harness .chain @@ -85,11 +85,12 @@ async fn rpc_blobs_with_invalid_header_signature() { block_root, rpc_block, NotifyExecutionLayer::Yes, - BlockImportSource::RangeSync, + BlockImportSource::Lookup, || Ok(()), ) .await .unwrap(); + assert_eq!( availability, AvailabilityProcessingStatus::MissingComponents(slot, block_root) @@ -114,6 +115,8 @@ async fn rpc_blobs_with_invalid_header_signature() { .process_rpc_blobs(slot, block_root, blob_sidecars) .await .unwrap_err(); + + println!("{:?}", err); assert!(matches!( err, BlockError::InvalidSignature(InvalidSignature::ProposerSignature) diff --git a/beacon_node/beacon_chain/tests/block_verification.rs b/beacon_node/beacon_chain/tests/block_verification.rs index 2644b74b28e..f27e5069af6 100644 --- a/beacon_node/beacon_chain/tests/block_verification.rs +++ b/beacon_node/beacon_chain/tests/block_verification.rs @@ -1,9 +1,11 @@ #![cfg(not(debug_assertions))] use beacon_chain::block_verification_types::{AsBlock, ExecutedBlock, RpcBlock}; +use beacon_chain::data_availability_checker::{AvailabilityCheckError, AvailableBlockData}; use beacon_chain::data_column_verification::CustodyDataColumn; use beacon_chain::{ AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes, ExecutionPendingBlock, + WhenSlotSkipped, custody_context::NodeCustodyType, test_utils::{ AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType, test_spec, @@ -11,7 +13,7 @@ use beacon_chain::{ }; use beacon_chain::{ BeaconSnapshot, BlockError, ChainConfig, ChainSegmentResult, IntoExecutionPendingBlock, - InvalidSignature, NotifyExecutionLayer, + InvalidSignature, NotifyExecutionLayer, signature_verify_chain_segment, }; use bls::{AggregateSignature, Keypair, Signature}; use fixed_bytes::FixedBytesExtended; @@ -39,6 +41,7 @@ const BLOCK_INDICES: &[usize] = &[0, 1, 32, 64, 68 + 1, 129, CHAIN_SEGMENT_LENGT static KEYPAIRS: LazyLock> = LazyLock::new(|| types::test_utils::generate_deterministic_keypairs(VALIDATOR_COUNT)); +// TODO(#8633): Delete this unnecessary enum and refactor this file to use `AvailableBlockData` instead. 
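+// A sketch of that refactor, assuming `DataSidecars` is generic over
+// `E: EthSpec` (type parameters are elided in this diff) and using the
+// `AvailableBlockData` constructors introduced in this change:
+//
+//   fn data_sidecars_to_block_data<E: EthSpec>(
+//       sidecars: &DataSidecars<E>,
+//   ) -> AvailableBlockData<E> {
+//       match sidecars {
+//           DataSidecars::Blobs(blobs) => AvailableBlockData::new_with_blobs(blobs.clone()),
+//           DataSidecars::DataColumns(columns) => AvailableBlockData::new_with_data_columns(
+//               columns.iter().map(|c| c.as_data_column().clone()).collect(),
+//           ),
+//       }
+//   }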
enum DataSidecars { Blobs(BlobSidecarList), DataColumns(Vec>), @@ -128,32 +131,65 @@ fn get_harness( harness } -fn chain_segment_blocks( +fn chain_segment_blocks( chain_segment: &[BeaconSnapshot], chain_segment_sidecars: &[Option>], -) -> Vec> { + chain: Arc>, +) -> Vec> +where + T: BeaconChainTypes, +{ chain_segment .iter() .zip(chain_segment_sidecars.iter()) .map(|(snapshot, data_sidecars)| { let block = snapshot.beacon_block.clone(); - build_rpc_block(block, data_sidecars) + build_rpc_block(block, data_sidecars, chain.clone()) }) .collect() } -fn build_rpc_block( +fn build_rpc_block( block: Arc>, data_sidecars: &Option>, -) -> RpcBlock { + chain: Arc>, +) -> RpcBlock +where + T: BeaconChainTypes, +{ match data_sidecars { Some(DataSidecars::Blobs(blobs)) => { - RpcBlock::new(None, block, Some(blobs.clone())).unwrap() + let block_data = AvailableBlockData::new_with_blobs(blobs.clone()); + RpcBlock::new( + block, + Some(block_data), + chain.data_availability_checker.custody_context(), + chain.spec.clone(), + ) + .unwrap() } Some(DataSidecars::DataColumns(columns)) => { - RpcBlock::new_with_custody_columns(None, block, columns.clone()).unwrap() + let block_data = AvailableBlockData::new_with_data_columns( + columns + .iter() + .map(|c| c.as_data_column().clone()) + .collect::>(), + ); + RpcBlock::new( + block, + Some(block_data), + chain.data_availability_checker.custody_context(), + chain.spec.clone(), + ) + .unwrap() } - None => RpcBlock::new_without_blobs(None, block), + None => RpcBlock::new( + block, + Some(AvailableBlockData::NoData), + chain.data_availability_checker.custody_context(), + chain.spec.clone(), + ) + .unwrap(), } } @@ -264,9 +300,10 @@ fn update_data_column_signed_header( async fn chain_segment_full_segment() { let harness = get_harness(VALIDATOR_COUNT, NodeCustodyType::Fullnode); let (chain_segment, chain_segment_blobs) = get_chain_segment().await; - let blocks: Vec> = chain_segment_blocks(&chain_segment, &chain_segment_blobs) - .into_iter() - .collect(); + let blocks: Vec> = + chain_segment_blocks(&chain_segment, &chain_segment_blobs, harness.chain.clone()) + .into_iter() + .collect(); harness .chain @@ -300,9 +337,11 @@ async fn chain_segment_full_segment() { #[tokio::test] async fn chain_segment_varying_chunk_size() { let (chain_segment, chain_segment_blobs) = get_chain_segment().await; - let blocks: Vec> = chain_segment_blocks(&chain_segment, &chain_segment_blobs) - .into_iter() - .collect(); + let harness = get_harness(VALIDATOR_COUNT, NodeCustodyType::Fullnode); + let blocks: Vec> = + chain_segment_blocks(&chain_segment, &chain_segment_blobs, harness.chain.clone()) + .into_iter() + .collect(); for chunk_size in &[1, 2, 31, 32, 33] { let harness = get_harness(VALIDATOR_COUNT, NodeCustodyType::Fullnode); @@ -344,9 +383,10 @@ async fn chain_segment_non_linear_parent_roots() { /* * Test with a block removed. */ - let mut blocks: Vec> = chain_segment_blocks(&chain_segment, &chain_segment_blobs) - .into_iter() - .collect(); + let mut blocks: Vec> = + chain_segment_blocks(&chain_segment, &chain_segment_blobs, harness.chain.clone()) + .into_iter() + .collect(); blocks.remove(2); assert!( @@ -364,16 +404,21 @@ async fn chain_segment_non_linear_parent_roots() { /* * Test with a modified parent root. 
*/ - let mut blocks: Vec> = chain_segment_blocks(&chain_segment, &chain_segment_blobs) - .into_iter() - .collect(); + let mut blocks: Vec> = + chain_segment_blocks(&chain_segment, &chain_segment_blobs, harness.chain.clone()) + .into_iter() + .collect(); let (mut block, signature) = blocks[3].as_block().clone().deconstruct(); *block.parent_root_mut() = Hash256::zero(); - blocks[3] = RpcBlock::new_without_blobs( - None, + + blocks[3] = RpcBlock::new( Arc::new(SignedBeaconBlock::from_block(block, signature)), - ); + blocks[3].block_data().cloned(), + harness.chain.data_availability_checker.custody_context(), + harness.spec.clone(), + ) + .unwrap(); assert!( matches!( @@ -401,15 +446,19 @@ async fn chain_segment_non_linear_slots() { * Test where a child is lower than the parent. */ - let mut blocks: Vec> = chain_segment_blocks(&chain_segment, &chain_segment_blobs) - .into_iter() - .collect(); + let mut blocks: Vec> = + chain_segment_blocks(&chain_segment, &chain_segment_blobs, harness.chain.clone()) + .into_iter() + .collect(); let (mut block, signature) = blocks[3].as_block().clone().deconstruct(); *block.slot_mut() = Slot::new(0); - blocks[3] = RpcBlock::new_without_blobs( - None, + blocks[3] = RpcBlock::new( Arc::new(SignedBeaconBlock::from_block(block, signature)), - ); + blocks[3].block_data().cloned(), + harness.chain.data_availability_checker.custody_context(), + harness.spec.clone(), + ) + .unwrap(); assert!( matches!( @@ -427,15 +476,19 @@ async fn chain_segment_non_linear_slots() { * Test where a child is equal to the parent. */ - let mut blocks: Vec> = chain_segment_blocks(&chain_segment, &chain_segment_blobs) - .into_iter() - .collect(); + let mut blocks: Vec> = + chain_segment_blocks(&chain_segment, &chain_segment_blobs, harness.chain.clone()) + .into_iter() + .collect(); let (mut block, signature) = blocks[3].as_block().clone().deconstruct(); *block.slot_mut() = blocks[2].slot(); - blocks[3] = RpcBlock::new_without_blobs( - None, + blocks[3] = RpcBlock::new( Arc::new(SignedBeaconBlock::from_block(block, signature)), - ); + blocks[3].block_data().cloned(), + harness.chain.data_availability_checker.custody_context(), + harness.chain.spec.clone(), + ) + .unwrap(); assert!( matches!( @@ -461,7 +514,9 @@ async fn assert_invalid_signature( let blocks: Vec> = snapshots .iter() .zip(chain_segment_blobs.iter()) - .map(|(snapshot, blobs)| build_rpc_block(snapshot.beacon_block.clone(), blobs)) + .map(|(snapshot, blobs)| { + build_rpc_block(snapshot.beacon_block.clone(), blobs, harness.chain.clone()) + }) .collect(); // Ensure the block will be rejected if imported in a chain segment. @@ -486,7 +541,9 @@ async fn assert_invalid_signature( .iter() .take(block_index) .zip(chain_segment_blobs.iter()) - .map(|(snapshot, blobs)| build_rpc_block(snapshot.beacon_block.clone(), blobs)) + .map(|(snapshot, blobs)| { + build_rpc_block(snapshot.beacon_block.clone(), blobs, harness.chain.clone()) + }) .collect(); // We don't care if this fails, we just call this to ensure that all prior blocks have been // imported prior to this test. 
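+    // The closures above all share one shape: the chain handle is threaded
+    // through purely so `RpcBlock::new` can consult the custody context and
+    // spec, roughly:
+    //
+    //   snapshots
+    //       .iter()
+    //       .zip(chain_segment_blobs.iter())
+    //       .map(|(snapshot, blobs)| {
+    //           build_rpc_block(snapshot.beacon_block.clone(), blobs, harness.chain.clone())
+    //       })
+    //       .collect::<Vec<_>>()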
@@ -503,6 +560,7 @@ async fn assert_invalid_signature( build_rpc_block( snapshots[block_index].beacon_block.clone(), &chain_segment_blobs[block_index], + harness.chain.clone(), ), NotifyExecutionLayer::Yes, BlockImportSource::Lookup, @@ -560,7 +618,9 @@ async fn invalid_signature_gossip_block() { .iter() .take(block_index) .zip(chain_segment_blobs.iter()) - .map(|(snapshot, blobs)| build_rpc_block(snapshot.beacon_block.clone(), blobs)) + .map(|(snapshot, blobs)| { + build_rpc_block(snapshot.beacon_block.clone(), blobs, harness.chain.clone()) + }) .collect(); harness .chain @@ -569,7 +629,13 @@ async fn invalid_signature_gossip_block() { .into_block_error() .expect("should import all blocks prior to the one being tested"); let signed_block = SignedBeaconBlock::from_block(block, junk_signature()); - let rpc_block = RpcBlock::new_without_blobs(None, Arc::new(signed_block)); + let rpc_block = RpcBlock::new( + Arc::new(signed_block), + None, + harness.chain.data_availability_checker.custody_context(), + harness.spec.clone(), + ) + .unwrap(); let process_res = harness .chain .process_block( @@ -611,7 +677,9 @@ async fn invalid_signature_block_proposal() { let blocks: Vec> = snapshots .iter() .zip(chain_segment_blobs.iter()) - .map(|(snapshot, blobs)| build_rpc_block(snapshot.beacon_block.clone(), blobs)) + .map(|(snapshot, blobs)| { + build_rpc_block(snapshot.beacon_block.clone(), blobs, harness.chain.clone()) + }) .collect::>(); // Ensure the block will be rejected if imported in a chain segment. let process_res = harness @@ -928,7 +996,9 @@ async fn invalid_signature_deposit() { let blocks: Vec> = snapshots .iter() .zip(chain_segment_blobs.iter()) - .map(|(snapshot, blobs)| build_rpc_block(snapshot.beacon_block.clone(), blobs)) + .map(|(snapshot, blobs)| { + build_rpc_block(snapshot.beacon_block.clone(), blobs, harness.chain.clone()) + }) .collect(); assert!( !matches!( @@ -1570,7 +1640,13 @@ async fn add_base_block_to_altair_chain() { )); // Ensure that it would be impossible to import via `BeaconChain::process_block`. - let base_rpc_block = RpcBlock::new_without_blobs(None, Arc::new(base_block.clone())); + let base_rpc_block = RpcBlock::new( + Arc::new(base_block.clone()), + None, + harness.chain.data_availability_checker.custody_context(), + harness.spec.clone(), + ) + .unwrap(); assert!(matches!( harness .chain @@ -1594,7 +1670,15 @@ async fn add_base_block_to_altair_chain() { harness .chain .process_chain_segment( - vec![RpcBlock::new_without_blobs(None, Arc::new(base_block))], + vec![ + RpcBlock::new( + Arc::new(base_block), + None, + harness.chain.data_availability_checker.custody_context(), + harness.spec.clone() + ) + .unwrap() + ], NotifyExecutionLayer::Yes, ) .await, @@ -1707,7 +1791,13 @@ async fn add_altair_block_to_base_chain() { )); // Ensure that it would be impossible to import via `BeaconChain::process_block`. 
- let altair_rpc_block = RpcBlock::new_without_blobs(None, Arc::new(altair_block.clone())); + let altair_rpc_block = RpcBlock::new( + Arc::new(altair_block.clone()), + None, + harness.chain.data_availability_checker.custody_context(), + harness.spec.clone(), + ) + .unwrap(); assert!(matches!( harness .chain @@ -1731,7 +1821,15 @@ async fn add_altair_block_to_base_chain() { harness .chain .process_chain_segment( - vec![RpcBlock::new_without_blobs(None, Arc::new(altair_block))], + vec![ + RpcBlock::new( + Arc::new(altair_block), + None, + harness.chain.data_availability_checker.custody_context(), + harness.spec.clone() + ) + .unwrap() + ], NotifyExecutionLayer::Yes ) .await, @@ -1794,7 +1892,13 @@ async fn import_duplicate_block_unrealized_justification() { // Create two verified variants of the block, representing the same block being processed in // parallel. let notify_execution_layer = NotifyExecutionLayer::Yes; - let rpc_block = RpcBlock::new_without_blobs(Some(block_root), block.clone()); + let rpc_block = RpcBlock::new( + block.clone(), + Some(AvailableBlockData::NoData), + harness.chain.data_availability_checker.custody_context(), + harness.spec.clone(), + ) + .unwrap(); let verified_block1 = rpc_block .clone() .into_execution_pending_block(block_root, chain, notify_execution_layer) @@ -1868,3 +1972,271 @@ async fn import_execution_pending_block( } } } + +// Test that `signature_verify_chain_segment` errors with a chain segment of mixed `FullyAvailable` +// and `BlockOnly` RpcBlocks. This situation should never happen in production. +#[tokio::test] +async fn signature_verify_mixed_rpc_block_variants() { + let (snapshots, data_sidecars) = get_chain_segment().await; + let snapshots: Vec<_> = snapshots.into_iter().take(10).collect(); + let data_sidecars: Vec<_> = data_sidecars.into_iter().take(10).collect(); + + let harness = get_harness(VALIDATOR_COUNT, NodeCustodyType::Fullnode); + + let mut chain_segment = Vec::new(); + + for (i, (snapshot, blobs)) in snapshots.iter().zip(data_sidecars.iter()).enumerate() { + let block = snapshot.beacon_block.clone(); + let block_root = snapshot.beacon_block_root; + + // Alternate between FullyAvailable and BlockOnly + let rpc_block = if i % 2 == 0 { + // FullyAvailable - with blobs/columns if needed + build_rpc_block(block, blobs, harness.chain.clone()) + } else { + // BlockOnly - no data + RpcBlock::new( + block, + None, + harness.chain.data_availability_checker.custody_context(), + harness.chain.spec.clone(), + ) + .unwrap() + }; + + chain_segment.push((block_root, rpc_block)); + } + + // This should error because `signature_verify_chain_segment` expects a list + // of `RpcBlock::FullyAvailable`. + assert!(signature_verify_chain_segment(chain_segment.clone(), &harness.chain).is_err()); +} + +// Test that RpcBlock::new() rejects blocks when blob count doesn't match expected. 
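+// The key assertion below: removing one blob from an otherwise complete bundle
+// must fail construction with `AvailabilityCheckError::MissingBlobs`, since the
+// blob count is checked against the block's KZG commitments. Roughly, where
+// `short_blob_data` stands for the one-blob-short list built in the test body:
+//
+//   assert!(matches!(
+//       RpcBlock::new(block, Some(short_blob_data), custody_context, spec),
+//       Err(AvailabilityCheckError::MissingBlobs)
+//   ));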
+#[tokio::test] +async fn rpc_block_construction_fails_with_wrong_blob_count() { + let spec = test_spec::(); + + if !spec.fork_name_at_slot::(Slot::new(0)).deneb_enabled() + || spec.fork_name_at_slot::(Slot::new(0)).fulu_enabled() + { + return; + } + + let harness = BeaconChainHarness::builder(MainnetEthSpec) + .spec(spec.into()) + .keypairs(KEYPAIRS[0..VALIDATOR_COUNT].to_vec()) + .node_custody_type(NodeCustodyType::Fullnode) + .fresh_ephemeral_store() + .mock_execution_layer() + .build(); + + harness.advance_slot(); + + harness + .extend_chain( + E::slots_per_epoch() as usize * 2, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + // Get a block with blobs + for slot in 1..=5 { + let root = harness + .chain + .block_root_at_slot(Slot::new(slot), WhenSlotSkipped::None) + .unwrap() + .unwrap(); + let block = harness.chain.get_block(&root).await.unwrap().unwrap(); + + if let Ok(commitments) = block.message().body().blob_kzg_commitments() + && !commitments.is_empty() + { + let blobs = harness.chain.get_blobs(&root).unwrap().blobs().unwrap(); + + // Create AvailableBlockData with wrong number of blobs (remove one) + let mut wrong_blobs_vec: Vec<_> = blobs.iter().cloned().collect(); + wrong_blobs_vec.pop(); + + let max_blobs = harness.spec.max_blobs_per_block(block.epoch()) as usize; + let wrong_blobs = ssz_types::RuntimeVariableList::new(wrong_blobs_vec, max_blobs) + .expect("should create BlobSidecarList"); + let block_data = AvailableBlockData::new_with_blobs(wrong_blobs); + + // Try to create RpcBlock with wrong blob count + let result = RpcBlock::new( + Arc::new(block), + Some(block_data), + harness.chain.data_availability_checker.custody_context(), + harness.chain.spec.clone(), + ); + + // Should fail with MissingBlobs + assert!( + matches!(result, Err(AvailabilityCheckError::MissingBlobs)), + "RpcBlock construction should fail with wrong blob count, got: {:?}", + result + ); + return; + } + } + + panic!("No block with blobs found"); +} + +// Test that RpcBlock::new() rejects blocks when custody columns are incomplete. 
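+// The completeness rule exercised below is a set difference: every index in
+// `custody_columns_for_epoch` for the block's epoch must appear among the
+// supplied columns, otherwise construction fails with `MissingCustodyColumns`.
+// Roughly:
+//
+//   let mut required = custody_context
+//       .custody_columns_for_epoch(Some(block.epoch()), &spec)
+//       .iter()
+//       .collect::<HashSet<_>>();
+//   for column in &data_columns {
+//       required.remove(&column.index);
+//   }
+//   // construction succeeds only if `required.is_empty()`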
+#[tokio::test] +async fn rpc_block_rejects_missing_custody_columns() { + let spec = test_spec::(); + + if !spec.fork_name_at_slot::(Slot::new(0)).fulu_enabled() { + return; + } + + let harness = BeaconChainHarness::builder(MainnetEthSpec) + .spec(spec.into()) + .keypairs(KEYPAIRS[0..VALIDATOR_COUNT].to_vec()) + .node_custody_type(NodeCustodyType::Fullnode) + .fresh_ephemeral_store() + .mock_execution_layer() + .build(); + + harness.advance_slot(); + + // Extend chain to create some blocks with data columns + harness + .extend_chain( + 5, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + // Get a block with data columns + for slot in 1..=5 { + let root = harness + .chain + .block_root_at_slot(Slot::new(slot), WhenSlotSkipped::None) + .unwrap() + .unwrap(); + let block = harness.chain.get_block(&root).await.unwrap().unwrap(); + + if let Ok(commitments) = block.message().body().blob_kzg_commitments() + && !commitments.is_empty() + { + let columns = harness.chain.get_data_columns(&root).unwrap().unwrap(); + + if columns.len() > 1 { + // Create AvailableBlockData with incomplete columns (remove one) + let mut incomplete_columns: Vec<_> = columns.to_vec(); + incomplete_columns.pop(); + + let block_data = AvailableBlockData::new_with_data_columns(incomplete_columns); + + // Try to create RpcBlock with incomplete custody columns + let result = RpcBlock::new( + Arc::new(block), + Some(block_data), + harness.chain.data_availability_checker.custody_context(), + harness.chain.spec.clone(), + ); + + // Should fail with MissingCustodyColumns + assert!( + matches!(result, Err(AvailabilityCheckError::MissingCustodyColumns)), + "RpcBlock construction should fail with missing custody columns, got: {:?}", + result + ); + return; + } + } + } + + panic!("No block with data columns found"); +} + +// Test that RpcBlock::new() allows construction past the data availability boundary. +// When a block is past the DA boundary, we should be able to construct an RpcBlock +// with NoData even if the block has blob commitments, since columns are not expected. 
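+// Boundary arithmetic, with hypothetical numbers: if
+// `min_epochs_for_data_column_sidecars_requests` were 4096 and the block sat in
+// epoch 10, advancing the clock to epoch 10 + 4096 + 10 moves
+// `data_availability_boundary()` past the block's epoch, so no columns (or
+// blobs) are expected and `NoData` construction must succeed.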
+#[tokio::test] +async fn rpc_block_allows_construction_past_da_boundary() { + let spec = test_spec::(); + + if !spec.fork_name_at_slot::(Slot::new(0)).fulu_enabled() { + return; + } + + let harness = BeaconChainHarness::builder(MainnetEthSpec) + .spec(spec.into()) + .keypairs(KEYPAIRS[0..VALIDATOR_COUNT].to_vec()) + .node_custody_type(NodeCustodyType::Fullnode) + .fresh_ephemeral_store() + .mock_execution_layer() + .build(); + + harness.advance_slot(); + + // Extend chain to create some blocks with blob commitments + harness + .extend_chain( + 5, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + // Find a block with blob commitments + for slot in 1..=5 { + let root = harness + .chain + .block_root_at_slot(Slot::new(slot), WhenSlotSkipped::None) + .unwrap() + .unwrap(); + let block = harness.chain.get_block(&root).await.unwrap().unwrap(); + + if let Ok(commitments) = block.message().body().blob_kzg_commitments() + && !commitments.is_empty() + { + let block_epoch = block.epoch(); + + // Advance the slot clock far into the future, past the DA boundary + // For a block to be past the DA boundary: + // current_epoch - min_epochs_for_data_column_sidecars_requests > block_epoch + let min_epochs_for_data = harness.spec.min_epochs_for_data_column_sidecars_requests; + let future_epoch = block_epoch + min_epochs_for_data + 10; + let future_slot = future_epoch.start_slot(E::slots_per_epoch()); + harness.chain.slot_clock.set_slot(future_slot.as_u64()); + + // Now verify the block is past the DA boundary + let da_boundary = harness + .chain + .data_availability_boundary() + .expect("DA boundary should be set"); + assert!( + block_epoch < da_boundary, + "Block should be past the DA boundary. Block epoch: {}, DA boundary: {}", + block_epoch, + da_boundary + ); + + // Try to create RpcBlock with NoData for a block past DA boundary + // This should succeed since columns are not expected for blocks past DA boundary + let result = RpcBlock::new( + Arc::new(block), + Some(AvailableBlockData::NoData), + harness.chain.data_availability_checker.custody_context(), + harness.chain.spec.clone(), + ); + + assert!( + result.is_ok(), + "RpcBlock construction should succeed for blocks past DA boundary, got: {:?}", + result + ); + return; + } + } + + panic!("No block with blob commitments found"); +} diff --git a/beacon_node/beacon_chain/tests/column_verification.rs b/beacon_node/beacon_chain/tests/column_verification.rs index be9b3b2fa12..ffbc4604657 100644 --- a/beacon_node/beacon_chain/tests/column_verification.rs +++ b/beacon_node/beacon_chain/tests/column_verification.rs @@ -81,7 +81,7 @@ async fn rpc_columns_with_invalid_header_signature() { // Process the block without blobs so that it doesn't become available. 
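+    // As with `rpc_blobs_with_invalid_header_signature` in `blob_verification.rs`,
+    // the block is imported as `RpcBlock::BlockOnly` (data `None`,
+    // `is_available = false`), so processing should report
+    // `AvailabilityProcessingStatus::MissingComponents(slot, block_root)`
+    // instead of importing the block outright.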
harness.advance_slot(); let rpc_block = harness - .build_rpc_block_from_blobs(block_root, signed_block.clone(), None) + .build_rpc_block_from_blobs(signed_block.clone(), None, false) .unwrap(); let availability = harness .chain diff --git a/beacon_node/beacon_chain/tests/payload_invalidation.rs b/beacon_node/beacon_chain/tests/payload_invalidation.rs index 5bd43835e33..2c35b7f5eec 100644 --- a/beacon_node/beacon_chain/tests/payload_invalidation.rs +++ b/beacon_node/beacon_chain/tests/payload_invalidation.rs @@ -685,7 +685,16 @@ async fn invalidates_all_descendants() { assert_eq!(fork_parent_state.slot(), fork_parent_slot); let ((fork_block, _), _fork_post_state) = rig.harness.make_block(fork_parent_state, fork_slot).await; - let fork_rpc_block = RpcBlock::new_without_blobs(None, fork_block.clone()); + let fork_rpc_block = RpcBlock::new( + fork_block.clone(), + None, + rig.harness + .chain + .data_availability_checker + .custody_context(), + rig.harness.chain.spec.clone(), + ) + .unwrap(); let fork_block_root = rig .harness .chain @@ -787,7 +796,16 @@ async fn switches_heads() { let ((fork_block, _), _fork_post_state) = rig.harness.make_block(fork_parent_state, fork_slot).await; let fork_parent_root = fork_block.parent_root(); - let fork_rpc_block = RpcBlock::new_without_blobs(None, fork_block.clone()); + let fork_rpc_block = RpcBlock::new( + fork_block.clone(), + None, + rig.harness + .chain + .data_availability_checker + .custody_context(), + rig.harness.chain.spec.clone(), + ) + .unwrap(); let fork_block_root = rig .harness .chain @@ -1059,7 +1077,16 @@ async fn invalid_parent() { )); // Ensure the block built atop an invalid payload is invalid for import. - let rpc_block = RpcBlock::new_without_blobs(None, block.clone()); + let rpc_block = RpcBlock::new( + block.clone(), + None, + rig.harness + .chain + .data_availability_checker + .custody_context(), + rig.harness.chain.spec.clone(), + ) + .unwrap(); assert!(matches!( rig.harness.chain.process_block(rpc_block.block_root(), rpc_block, NotifyExecutionLayer::Yes, BlockImportSource::Lookup, || Ok(()), @@ -1384,7 +1411,16 @@ async fn recover_from_invalid_head_by_importing_blocks() { } = InvalidHeadSetup::new().await; // Import the fork block, it should become the head. 
- let fork_rpc_block = RpcBlock::new_without_blobs(None, fork_block.clone()); + let fork_rpc_block = RpcBlock::new( + fork_block.clone(), + None, + rig.harness + .chain + .data_availability_checker + .custody_context(), + rig.harness.chain.spec.clone(), + ) + .unwrap(); rig.harness .chain .process_block( diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index 69e1346cfd6..9c5f9c20cca 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -21,7 +21,6 @@ use beacon_chain::{ compute_proposer_duties_from_head, ensure_state_can_determine_proposers_for_epoch, }, custody_context::NodeCustodyType, - data_availability_checker::MaybeAvailableBlock, historical_blocks::HistoricalBlockError, migrate::MigratorConfig, }; @@ -3176,16 +3175,19 @@ async fn weak_subjectivity_sync_test( .expect("should get block") .expect("should get block"); - if let MaybeAvailableBlock::Available(block) = harness - .chain - .data_availability_checker - .verify_kzg_for_rpc_block( + let rpc_block = + harness.build_rpc_block_from_store_blobs(Some(block_root), Arc::new(full_block)); + + match rpc_block { + RpcBlock::FullyAvailable(available_block) => { harness - .build_rpc_block_from_store_blobs(Some(block_root), Arc::new(full_block)), - ) - .expect("should verify kzg") - { - available_blocks.push(block); + .chain + .data_availability_checker + .verify_kzg_for_available_block(&available_block) + .expect("should verify kzg"); + available_blocks.push(available_block); + } + RpcBlock::BlockOnly { .. } => panic!("Should be an available block"), } } @@ -3194,15 +3196,16 @@ async fn weak_subjectivity_sync_test( let mut batch_with_invalid_first_block = available_blocks.iter().map(clone_block).collect::>(); batch_with_invalid_first_block[0] = { - let (block_root, block, data) = clone_block(&available_blocks[0]).deconstruct(); + let (_, block, data) = clone_block(&available_blocks[0]).deconstruct(); let mut corrupt_block = (*block).clone(); *corrupt_block.signature_mut() = Signature::empty(); - AvailableBlock::__new_for_testing( - block_root, + AvailableBlock::new( Arc::new(corrupt_block), data, + beacon_chain.data_availability_checker.custody_context(), Arc::new(spec), ) + .expect("available block") }; // Importing the invalid batch should error. @@ -3703,7 +3706,13 @@ async fn process_blocks_and_attestations_for_unaligned_checkpoint() { assert_eq!(split.block_root, valid_fork_block.parent_root()); assert_ne!(split.state_root, unadvanced_split_state_root); - let invalid_fork_rpc_block = RpcBlock::new_without_blobs(None, invalid_fork_block.clone()); + let invalid_fork_rpc_block = RpcBlock::new( + invalid_fork_block.clone(), + None, + harness.chain.data_availability_checker.custody_context(), + harness.spec.clone(), + ) + .unwrap(); // Applying the invalid block should fail. let err = harness .chain @@ -3719,7 +3728,13 @@ async fn process_blocks_and_attestations_for_unaligned_checkpoint() { assert!(matches!(err, BlockError::WouldRevertFinalizedSlot { .. })); // Applying the valid block should succeed, but it should not become head. 
- let valid_fork_rpc_block = RpcBlock::new_without_blobs(None, valid_fork_block.clone()); + let valid_fork_rpc_block = RpcBlock::new( + valid_fork_block.clone(), + None, + harness.chain.data_availability_checker.custody_context(), + harness.spec.clone(), + ) + .unwrap(); harness .chain .process_block( diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs index 99a00f5f97b..c5e8b8f1dfd 100644 --- a/beacon_node/http_api/src/publish_blocks.rs +++ b/beacon_node/http_api/src/publish_blocks.rs @@ -314,9 +314,19 @@ pub async fn publish_block>( slot = %block.slot(), "Block previously seen" ); + let Ok(rpc_block) = RpcBlock::new( + block.clone(), + None, + chain.data_availability_checker.custody_context(), + chain.spec.clone(), + ) else { + return Err(warp_utils::reject::custom_bad_request( + "Unable to construct rpc block".to_string(), + )); + }; let import_result = Box::pin(chain.process_block( block_root, - RpcBlock::new_without_blobs(Some(block_root), block.clone()), + rpc_block, NotifyExecutionLayer::Yes, BlockImportSource::HttpApi, publish_fn, diff --git a/beacon_node/network/src/network_beacon_processor/sync_methods.rs b/beacon_node/network/src/network_beacon_processor/sync_methods.rs index 6ba8bd4d3e0..4cd92e1e83e 100644 --- a/beacon_node/network/src/network_beacon_processor/sync_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/sync_methods.rs @@ -8,7 +8,6 @@ use crate::sync::{ }; use beacon_chain::block_verification_types::{AsBlock, RpcBlock}; use beacon_chain::data_availability_checker::AvailabilityCheckError; -use beacon_chain::data_availability_checker::MaybeAvailableBlock; use beacon_chain::historical_data_columns::HistoricalDataColumnError; use beacon_chain::{ AvailabilityProcessingStatus, BeaconChainTypes, BlockError, ChainSegmentResult, @@ -717,18 +716,27 @@ impl NetworkBeaconProcessor { downloaded_blocks: Vec>, ) -> (usize, Result<(), ChainSegmentFailed>) { let total_blocks = downloaded_blocks.len(); - let available_blocks = match self + let mut available_blocks = vec![]; + + for downloaded_block in downloaded_blocks { + match downloaded_block { + RpcBlock::FullyAvailable(available_block) => available_blocks.push(available_block), + RpcBlock::BlockOnly { .. } => return ( + 0, + Err(ChainSegmentFailed { + peer_action: None, + message: "Invalid downloaded_blocks segment. All downloaded blocks must be fully available".to_string() + }) + ), + } + } + + match self .chain .data_availability_checker - .verify_kzg_for_rpc_blocks(downloaded_blocks) + .batch_verify_kzg_for_available_blocks(&available_blocks) { - Ok(blocks) => blocks - .into_iter() - .filter_map(|maybe_available| match maybe_available { - MaybeAvailableBlock::Available(block) => Some(block), - MaybeAvailableBlock::AvailabilityPending { .. 
} => None, - }) - .collect::>(), + Ok(blocks) => blocks, Err(e) => match e { AvailabilityCheckError::StoreError(_) => { return ( diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index 7b8554662b8..6fe99679b02 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -399,7 +399,16 @@ impl TestRig { self.network_beacon_processor .send_rpc_beacon_block( block_root, - RpcBlock::new_without_blobs(Some(block_root), self.next_block.clone()), + RpcBlock::new( + self.next_block.clone(), + None, + self._harness + .chain + .data_availability_checker + .custody_context(), + self._harness.spec.clone(), + ) + .unwrap(), std::time::Duration::default(), BlockProcessType::SingleBlock { id: 0 }, ) @@ -411,7 +420,16 @@ impl TestRig { self.network_beacon_processor .send_rpc_beacon_block( block_root, - RpcBlock::new_without_blobs(Some(block_root), self.next_block.clone()), + RpcBlock::new( + self.next_block.clone(), + None, + self._harness + .chain + .data_availability_checker + .custody_context(), + self._harness.spec.clone(), + ) + .unwrap(), std::time::Duration::default(), BlockProcessType::SingleBlock { id: 1 }, ) diff --git a/beacon_node/network/src/sync/block_sidecar_coupling.rs b/beacon_node/network/src/sync/block_sidecar_coupling.rs index 6f563820f7c..df103aa3350 100644 --- a/beacon_node/network/src/sync/block_sidecar_coupling.rs +++ b/beacon_node/network/src/sync/block_sidecar_coupling.rs @@ -1,5 +1,8 @@ use beacon_chain::{ - block_verification_types::RpcBlock, data_column_verification::CustodyDataColumn, get_block_root, + BeaconChainTypes, CustodyContext, + block_verification_types::{AvailableBlockData, RpcBlock}, + data_column_verification::CustodyDataColumn, + get_block_root, }; use lighthouse_network::{ PeerId, @@ -192,19 +195,26 @@ impl RangeBlockComponentsRequest { /// Returns `None` if not all expected requests have completed. /// Returns `Some(Ok(_))` with valid RPC blocks if all data is present and valid. /// Returns `Some(Err(_))` if there are issues coupling blocks with their data. 
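+    /// The custody context and spec are taken as `Arc`s so that coupled blocks
+    /// can be validated via `RpcBlock::new` at construction time. Typical call
+    /// shape (sketch):
+    ///
+    /// ```ignore
+    /// if let Some(result) = range_req.responses(
+    ///     chain.data_availability_checker.custody_context().clone(),
+    ///     chain.spec.clone(),
+    /// ) {
+    ///     // result: Result<Vec<RpcBlock<T>>, CouplingError>
+    /// }
+    /// ```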
- pub fn responses( + pub fn responses( &mut self, - spec: &ChainSpec, - ) -> Option>, CouplingError>> { + custody_context: Arc>, + spec: Arc, + ) -> Option>, CouplingError>> + where + T: BeaconChainTypes, + { let Some(blocks) = self.blocks_request.to_finished() else { return None; }; // Increment the attempt once this function returns the response or errors match &mut self.block_data_request { - RangeBlockDataRequest::NoData => { - Some(Self::responses_with_blobs(blocks.to_vec(), vec![], spec)) - } + RangeBlockDataRequest::NoData => Some(Self::responses_with_blobs( + blocks.to_vec(), + vec![], + custody_context, + spec, + )), RangeBlockDataRequest::Blobs(request) => { let Some(blobs) = request.to_finished() else { return None; @@ -212,6 +222,7 @@ impl RangeBlockComponentsRequest { Some(Self::responses_with_blobs( blocks.to_vec(), blobs.to_vec(), + custody_context, spec, )) } @@ -248,6 +259,8 @@ impl RangeBlockComponentsRequest { column_to_peer_id, expected_custody_columns, *attempt, + custody_context, + spec, ); if let Err(CouplingError::DataColumnPeerFailure { @@ -269,11 +282,15 @@ impl RangeBlockComponentsRequest { } } - fn responses_with_blobs( + fn responses_with_blobs( blocks: Vec>>, blobs: Vec>>, - spec: &ChainSpec, - ) -> Result>, CouplingError> { + custody_context: Arc>, + spec: Arc, + ) -> Result>, CouplingError> + where + T: BeaconChainTypes, + { // There can't be more more blobs than blocks. i.e. sending any blob (empty // included) for a skipped slot is not permitted. let mut responses = Vec::with_capacity(blocks.len()); @@ -315,8 +332,9 @@ impl RangeBlockComponentsRequest { .map_err(|_| { CouplingError::BlobPeerFailure("Blobs returned exceeds max length".to_string()) })?; + let block_data = AvailableBlockData::new_with_blobs(blobs); responses.push( - RpcBlock::new(None, block, Some(blobs)) + RpcBlock::new(block, Some(block_data), &custody_context, spec.clone()) .map_err(|e| CouplingError::BlobPeerFailure(format!("{e:?}")))?, ) } @@ -333,13 +351,18 @@ impl RangeBlockComponentsRequest { Ok(responses) } - fn responses_with_custody_columns( + fn responses_with_custody_columns( blocks: Vec>>, data_columns: DataColumnSidecarList, column_to_peer: HashMap, expects_custody_columns: &[ColumnIndex], attempt: usize, - ) -> Result>, CouplingError> { + custody_context: Arc>, + spec: Arc, + ) -> Result>, CouplingError> + where + T: BeaconChainTypes, + { // Group data columns by block_root and index let mut data_columns_by_block = HashMap::>>>::new(); @@ -415,11 +438,14 @@ impl RangeBlockComponentsRequest { ); } - RpcBlock::new_with_custody_columns(Some(block_root), block, custody_columns) + let block_data = AvailableBlockData::new_with_data_columns(custody_columns.iter().map(|c| c.as_data_column().clone()).collect::>()); + + RpcBlock::new(block, Some(block_data), &custody_context, spec.clone()) .map_err(|e| CouplingError::InternalError(format!("{:?}", e)))? } else { // Block has no data, expects zero columns - RpcBlock::new_without_blobs(Some(block_root), block) + RpcBlock::new(block, Some(AvailableBlockData::NoData), &custody_context, spec.clone()) + .map_err(|e| CouplingError::InternalError(format!("{:?}", e)))? 
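+                    // Both arms funnel through `RpcBlock::new`, so range coupling gets
+                    // the same availability validation as single-block lookups, roughly:
+                    //
+                    //   let data = if block_has_commitments {
+                    //       AvailableBlockData::new_with_data_columns(columns)
+                    //   } else {
+                    //       AvailableBlockData::NoData
+                    //   };
+                    //   RpcBlock::new(block, Some(data), &custody_context, spec.clone())?;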
}); } @@ -459,10 +485,12 @@ impl ByRangeRequest { #[cfg(test)] mod tests { - use super::RangeBlockComponentsRequest; use crate::sync::network_context::MAX_COLUMN_RETRIES; + + use super::RangeBlockComponentsRequest; use beacon_chain::test_utils::{ - NumBlobs, generate_rand_block_and_blobs, generate_rand_block_and_data_columns, test_spec, + NumBlobs, generate_rand_block_and_blobs, generate_rand_block_and_data_columns, + test_custody_context, test_spec, }; use lighthouse_network::{ PeerId, @@ -472,7 +500,7 @@ mod tests { }, }; use rand::SeedableRng; - use std::sync::Arc; + use std::{collections::HashMap, sync::Arc}; use tracing::Span; use types::{Epoch, ForkName, MinimalEthSpec as E, SignedBeaconBlock, test_utils::XorShiftRng}; @@ -512,8 +540,9 @@ mod tests { } fn is_finished(info: &mut RangeBlockComponentsRequest) -> bool { - let spec = test_spec::(); - info.responses(&spec).is_some() + let spec = Arc::new(test_spec::()); + let custody_context = Arc::new(test_custody_context(spec.clone())); + info.responses(custody_context, spec).is_some() } #[test] @@ -534,8 +563,11 @@ mod tests { // Send blocks and complete terminate response info.add_blocks(blocks_req_id, blocks).unwrap(); + let spec = Arc::new(test_spec::()); + let custody_context = Arc::new(test_custody_context(spec.clone())); + // Assert response is finished and RpcBlocks can be constructed - info.responses(&test_spec::()).unwrap().unwrap(); + info.responses(custody_context, spec).unwrap().unwrap(); } #[test] @@ -565,16 +597,25 @@ mod tests { // Expect no blobs returned info.add_blobs(blobs_req_id, vec![]).unwrap(); - // Assert response is finished and RpcBlocks can be constructed, even if blobs weren't returned. - // This makes sure we don't expect blobs here when they have expired. Checking this logic should - // be hendled elsewhere. - info.responses(&test_spec::()).unwrap().unwrap(); + let mut spec = test_spec::(); + spec.deneb_fork_epoch = Some(Epoch::new(0)); + let spec = Arc::new(spec); + let custody_context = Arc::new(test_custody_context(spec.clone())); + // Assert response is finished and RpcBlocks cannot be constructed, because blobs weren't returned. 
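+        // This inverts the old expectation: previously, missing blobs were
+        // tolerated here on the assumption that expiry was checked elsewhere.
+        // With validation moved into `RpcBlock::new`, a Deneb-era block whose
+        // commitments have no matching blobs now fails coupling outright.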
+        let result = info.responses(custody_context, spec).unwrap();
+        assert!(result.is_err())
     }
 
     #[test]
     fn rpc_block_with_custody_columns() {
-        let spec = test_spec::<E>();
-        let expects_custody_columns = vec![1, 2, 3, 4];
+        let mut spec = test_spec::<E>();
+        spec.deneb_fork_epoch = Some(Epoch::new(0));
+        spec.fulu_fork_epoch = Some(Epoch::new(0));
+        let spec = Arc::new(spec);
+        let custody_context = Arc::new(test_custody_context(spec.clone()));
+        let expects_custody_columns = custody_context
+            .custody_columns_for_epoch(None, &spec)
+            .to_vec();
         let mut rng = XorShiftRng::from_seed([42; 16]);
         let blocks = (0..4)
             .map(|_| {
@@ -638,18 +679,23 @@ mod tests {
         }
 
         // All completed construct response
-        info.responses(&spec).unwrap().unwrap();
+        info.responses(custody_context, spec).unwrap().unwrap();
     }
 
     #[test]
     fn rpc_block_with_custody_columns_batched() {
-        let spec = test_spec::<E>();
-        let batched_column_requests = [vec![1_u64, 2], vec![3, 4]];
-        let expects_custody_columns = batched_column_requests
-            .iter()
-            .flatten()
-            .cloned()
-            .collect::<Vec<_>>();
+        let mut spec = test_spec::<E>();
+        spec.deneb_fork_epoch = Some(Epoch::new(0));
+        spec.fulu_fork_epoch = Some(Epoch::new(0));
+        let spec = Arc::new(spec);
+        let custody_context = Arc::new(test_custody_context(spec.clone()));
+        let expected_custody_columns = custody_context
+            .custody_columns_for_epoch(None, &spec)
+            .to_vec();
+        let batched_column_requests = [
+            vec![expected_custody_columns[0], expected_custody_columns[1]],
+            vec![expected_custody_columns[2], expected_custody_columns[3]],
+        ];
         let custody_column_request_ids =
             (0..batched_column_requests.len() as u32).collect::<Vec<_>>();
         let num_of_data_column_requests = custody_column_request_ids.len();
@@ -673,7 +719,7 @@ mod tests {
         let mut info = RangeBlockComponentsRequest::<E>::new(
             blocks_req_id,
             None,
-            Some((columns_req_id.clone(), expects_custody_columns.clone())),
+            Some((columns_req_id.clone(), expected_custody_columns.clone())),
             Span::none(),
         );
@@ -723,14 +769,17 @@ mod tests {
         }
 
         // All completed construct response
-        info.responses(&spec).unwrap().unwrap();
+        info.responses(custody_context, spec).unwrap().unwrap();
     }
 
     #[test]
     fn missing_custody_columns_from_faulty_peers() {
         // GIVEN: A request expecting custody columns from multiple peers
-        let spec = test_spec::<E>();
-        let expected_custody_columns = vec![1, 2, 3, 4];
+        let spec = Arc::new(test_spec::<E>());
+        let custody_context = Arc::new(test_custody_context(spec.clone()));
+        let expected_custody_columns = custody_context
+            .custody_columns_for_epoch(None, &spec)
+            .to_vec();
         let mut rng = XorShiftRng::from_seed([42; 16]);
         let blocks = (0..2)
             .map(|_| {
@@ -792,7 +841,7 @@ mod tests {
         }
 
         // WHEN: Attempting to construct RPC blocks
-        let result = info.responses(&spec).unwrap();
+        let result = info.responses(custody_context, spec).unwrap();
 
         // THEN: Should fail with PeerFailure identifying the faulty peers
         assert!(result.is_err());
@@ -804,8 +853,8 @@ mod tests {
         {
             assert!(error.contains("Peers did not return column"));
             assert_eq!(faulty_peers.len(), 2); // columns 3 and 4 missing
-            assert_eq!(faulty_peers[0].0, 3); // column index 3
-            assert_eq!(faulty_peers[1].0, 4); // column index 4
+            assert_eq!(faulty_peers[0].0, expected_custody_columns[2]); // third expected column
+            assert_eq!(faulty_peers[1].0, expected_custody_columns[3]); // fourth expected column
             assert!(!exceeded_retries); // First attempt, should be false
         } else {
             panic!("Expected PeerFailure error");
@@ -815,8 +864,14 @@ mod tests {
     #[test]
     fn retry_logic_after_peer_failures() {
         // GIVEN: A request expecting custody columns where some peers initially fail
-        let spec = test_spec::<E>();
-        let expected_custody_columns = vec![1, 2];
+        let mut spec = test_spec::<E>();
+        spec.deneb_fork_epoch = Some(Epoch::new(0));
+        spec.fulu_fork_epoch = Some(Epoch::new(0));
+        let spec = Arc::new(spec);
+        let custody_context = Arc::new(test_custody_context(spec.clone()));
+        let expected_custody_columns = custody_context
+            .custody_columns_for_epoch(None, &spec)
+            .to_vec();
         let mut rng = XorShiftRng::from_seed([42; 16]);
         let blocks = (0..2)
             .map(|_| {
@@ -858,23 +913,37 @@ mod tests {
         )
         .unwrap();
 
-        // AND: Only partial custody columns are received (column 1 but not 2)
+        // AND: Only partial custody columns are received (first column but not second)
         let (req1, _) = columns_req_id.first().unwrap();
         info.add_custody_columns(
             *req1,
             blocks
                 .iter()
-                .flat_map(|b| b.1.iter().filter(|d| d.index == 1).cloned())
+                .flat_map(|b| {
+                    b.1.iter()
+                        .filter(|d| d.index == expected_custody_columns[0])
+                        .cloned()
+                })
                 .collect(),
         )
         .unwrap();
 
-        // AND: The missing column request is completed with empty data (peer failure)
+        // AND: The missing column requests are completed with empty data (peer failure)
         let (req2, _) = columns_req_id.get(1).unwrap();
         info.add_custody_columns(*req2, vec![]).unwrap();
 
-        // WHEN: First attempt to get responses fails
-        let result = info.responses(&spec).unwrap();
+        let (req3, _) = columns_req_id.get(2).unwrap();
+        info.add_custody_columns(*req3, vec![]).unwrap();
+
+        let (req4, _) = columns_req_id.get(3).unwrap();
+        info.add_custody_columns(*req4, vec![]).unwrap();
+
+        let result: Result<
+            Vec<RpcBlock<E>>,
+            crate::sync::block_sidecar_coupling::CouplingError,
+        > = info
+            .responses(custody_context.clone(), spec.clone())
+            .unwrap();
         assert!(result.is_err());
 
         // AND: We retry with a new peer for the failed column
@@ -882,7 +951,15 @@ mod tests {
             10 as Id,
             DataColumnsByRangeRequester::ComponentsByRange(components_id),
         );
-        let failed_column_requests = vec![(new_columns_req_id, vec![2])];
+        let failed_column_requests = vec![(new_columns_req_id, vec![expected_custody_columns[1]])];
+        info.reinsert_failed_column_requests(failed_column_requests)
+            .unwrap();
+
+        let failed_column_requests = vec![(new_columns_req_id, vec![expected_custody_columns[2]])];
+        info.reinsert_failed_column_requests(failed_column_requests)
+            .unwrap();
+
+        let failed_column_requests = vec![(new_columns_req_id, vec![expected_custody_columns[3]])];
         info.reinsert_failed_column_requests(failed_column_requests)
             .unwrap();
 
@@ -891,13 +968,21 @@ mod tests {
             new_columns_req_id,
             blocks
                 .iter()
-                .flat_map(|b| b.1.iter().filter(|d| d.index == 2).cloned())
+                .flat_map(|b| {
+                    b.1.iter()
+                        .filter(|d| {
+                            d.index == expected_custody_columns[1]
+                                || d.index == expected_custody_columns[2]
+                                || d.index == expected_custody_columns[3]
+                        })
+                        .cloned()
+                })
                 .collect(),
         )
         .unwrap();
 
         // WHEN: Attempting to get responses again
-        let result = info.responses(&spec).unwrap();
+        let result = info.responses(custody_context, spec).unwrap();
 
         // THEN: Should succeed with complete RPC blocks
         assert!(result.is_ok());
@@ -908,8 +993,14 @@ mod tests {
     #[test]
     fn max_retries_exceeded_behavior() {
         // GIVEN: A request where peers consistently fail to provide required columns
-        let spec = test_spec::<E>();
-        let expected_custody_columns = vec![1, 2];
+        let mut spec = test_spec::<E>();
+        spec.deneb_fork_epoch = Some(Epoch::new(0));
+        spec.fulu_fork_epoch = Some(Epoch::new(0));
+        let spec = Arc::new(spec);
+        let custody_context = Arc::new(test_custody_context(spec.clone()));
+        let expected_custody_columns = custody_context
+            .custody_columns_for_epoch(None, &spec)
+            .to_vec();
         let mut rng = XorShiftRng::from_seed([42; 16]);
         let blocks = (0..1)
             .map(|_| {
@@ -966,9 +1057,19 @@ mod tests {
         let (req2, _) = columns_req_id.get(1).unwrap();
         info.add_custody_columns(*req2, vec![]).unwrap();
 
+        // AND: The third column request completes with empty data (persistent peer failure)
+        let (req3, _) = columns_req_id.get(2).unwrap();
+        info.add_custody_columns(*req3, vec![]).unwrap();
+
+        // AND: The fourth column request completes with empty data (persistent peer failure)
+        let (req4, _) = columns_req_id.get(3).unwrap();
+        info.add_custody_columns(*req4, vec![]).unwrap();
+
         // WHEN: Multiple retry attempts are made (up to max retries)
         for _ in 0..MAX_COLUMN_RETRIES {
-            let result = info.responses(&spec).unwrap();
+            let result = info
+                .responses(custody_context.clone(), spec.clone())
+                .unwrap();
             assert!(result.is_err());
 
             if let Err(super::CouplingError::DataColumnPeerFailure {
@@ -981,7 +1082,7 @@ mod tests {
         }
 
         // AND: One final attempt after exceeding max retries
-        let result = info.responses(&spec).unwrap();
+        let result = info.responses(custody_context, spec).unwrap();
 
         // THEN: Should fail with exceeded_retries = true
         assert!(result.is_err());
@@ -991,8 +1092,13 @@ mod tests {
             exceeded_retries,
         }) = result
         {
-            assert_eq!(faulty_peers.len(), 1); // column 2 missing
-            assert_eq!(faulty_peers[0].0, 2); // column index 2
+            assert_eq!(faulty_peers.len(), 4);
+
+            let mut faulty_peers = faulty_peers.into_iter().collect::<HashMap<_, _>>();
+            for column in expected_custody_columns {
+                faulty_peers.remove(&column);
+            }
+            assert!(faulty_peers.is_empty());
             assert!(exceeded_retries); // Should be true after max retries
         } else {
             panic!("Expected PeerFailure error with exceeded_retries=true");
diff --git a/beacon_node/network/src/sync/network_context.rs b/beacon_node/network/src/sync/network_context.rs
index 069d51764f0..e984618b369 100644
--- a/beacon_node/network/src/sync/network_context.rs
+++ b/beacon_node/network/src/sync/network_context.rs
@@ -776,7 +776,13 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
         }
 
         let range_req = entry.get_mut();
-        if let Some(blocks_result) = range_req.responses(&self.chain.spec) {
+        if let Some(blocks_result) = range_req.responses(
+            self.chain
+                .data_availability_checker
+                .custody_context()
+                .clone(),
+            self.chain.spec.clone(),
+        ) {
             if let Err(CouplingError::DataColumnPeerFailure {
                 error,
                 faulty_peers: _,
@@ -1366,12 +1372,14 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
         if self
             .chain
             .data_availability_checker
+            .custody_context()
             .data_columns_required_for_epoch(epoch)
         {
             ByRangeRequestType::BlocksAndColumns
         } else if self
             .chain
             .data_availability_checker
+            .custody_context()
             .blobs_required_for_epoch(epoch)
         {
             ByRangeRequestType::BlocksAndBlobs
@@ -1605,7 +1613,13 @@ impl<T: BeaconChainTypes> SyncNetworkContext<T> {
             .beacon_processor_if_enabled()
             .ok_or(SendErrorProcessor::ProcessorNotAvailable)?;
 
-        let block = RpcBlock::new_without_blobs(Some(block_root), block);
+        let block = RpcBlock::new(
+            block,
+            None,
+            self.chain.data_availability_checker.custody_context(),
+            self.chain.spec.clone(),
+        )
+        .map_err(|_| SendErrorProcessor::SendError)?;
 
         debug!(block = ?block_root, id, "Sending block for processing");
         // Lookup sync event safety: If `beacon_processor.send_rpc_beacon_block` returns Ok() sync
diff --git a/beacon_node/network/src/sync/tests/lookups.rs b/beacon_node/network/src/sync/tests/lookups.rs
index 715928906ee..3b4bc0c8f7d 100644
--- a/beacon_node/network/src/sync/tests/lookups.rs
+++ b/beacon_node/network/src/sync/tests/lookups.rs
@@ -101,8 +101,6 @@ impl TestRig {
             .network_globals
             .set_sync_state(SyncState::Synced);
 
-        let spec = chain.spec.clone();
-
         // deterministic seed
         let rng_08 = <rand_chacha_03::ChaCha20Rng as rand_chacha_03::rand_core::SeedableRng>::from_seed([0u8; 32]);
         let rng = ChaCha20Rng::from_seed([0u8; 32]);
@@ -128,7 +126,6 @@ impl TestRig {
             ),
             harness,
             fork_name,
-            spec,
         }
     }
@@ -1929,7 +1926,6 @@ mod deneb_only {
         block_verification_types::{AsBlock, RpcBlock},
         data_availability_checker::AvailabilityCheckError,
     };
-    use ssz_types::RuntimeVariableList;
     use std::collections::VecDeque;
 
     struct DenebTester {
@@ -2283,15 +2279,17 @@ mod deneb_only {
         fn parent_block_unknown_parent(mut self) -> Self {
             self.rig.log("parent_block_unknown_parent");
             let block = self.unknown_parent_block.take().unwrap();
-            let max_len = self.rig.spec.max_blobs_per_block(block.epoch()) as usize;
             // Now this block is the one we expect requests from
             self.block = block.clone();
             let block = RpcBlock::new(
-                Some(block.canonical_root()),
                 block,
-                self.unknown_parent_blobs
-                    .take()
-                    .map(|vec| RuntimeVariableList::new(vec, max_len).unwrap()),
+                None,
+                self.rig
+                    .harness
+                    .chain
+                    .data_availability_checker
+                    .custody_context(),
+                self.rig.harness.chain.spec.clone(),
             )
             .unwrap();
             self.rig.parent_block_processed(
diff --git a/beacon_node/network/src/sync/tests/mod.rs b/beacon_node/network/src/sync/tests/mod.rs
index 23c14ff63ef..dcc7e3e49df 100644
--- a/beacon_node/network/src/sync/tests/mod.rs
+++ b/beacon_node/network/src/sync/tests/mod.rs
@@ -16,7 +16,7 @@ use tokio::sync::mpsc;
 use tracing_subscriber::fmt::MakeWriter;
 use tracing_subscriber::layer::SubscriberExt;
 use tracing_subscriber::util::SubscriberInitExt;
-use types::{ChainSpec, ForkName, MinimalEthSpec as E};
+use types::{ForkName, MinimalEthSpec as E};
 
 mod lookups;
 mod range;
@@ -68,7 +68,6 @@ struct TestRig {
     rng_08: rand_chacha_03::ChaCha20Rng,
     rng: ChaCha20Rng,
     fork_name: ForkName,
-    spec: Arc<ChainSpec>,
 }
 
 // Environment variable to read if `fork_from_env` feature is enabled.
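Reviewer note on the constructor migration above: every call site that previously picked between `RpcBlock::new_without_blobs`, `RpcBlock::new_with_custody_columns`, and the blob-list flavour of `RpcBlock::new` now funnels through the single `RpcBlock::new(block, data, custody_context, spec)` entry point, and the second argument is easy to misread. `None` means 'data not coupled yet, let the data availability checker decide' (the `BlockOnly` path used by lookup sync), while `Some(AvailableBlockData::NoData)` asserts the block is already fully available with nothing to fetch. A minimal sketch of the two cases, assuming the signature introduced in this diff; the `make_*` helper names and the concrete `EphemeralHarnessType` test type are illustrative, not part of the diff:

```rust
use std::sync::Arc;

use beacon_chain::BeaconChain;
use beacon_chain::block_verification_types::{AvailableBlockData, RpcBlock};
use beacon_chain::test_utils::EphemeralHarnessType;
use types::{MinimalEthSpec as E, SignedBeaconBlock};

type TestTypes = EphemeralHarnessType<E>;

/// Hypothetical helper mirroring the lookup-sync call sites: `None` defers the
/// availability decision, so the checker may still fetch blobs/columns later.
fn make_pending_rpc_block(
    block: Arc<SignedBeaconBlock<E>>,
    chain: &BeaconChain<TestTypes>,
) -> RpcBlock<E> {
    RpcBlock::new(
        block,
        None, // data unknown; availability resolved by the DA checker
        chain.data_availability_checker.custody_context(),
        chain.spec.clone(),
    )
    .expect("a block-only RpcBlock has no data to fail consistency checks")
}

/// Hypothetical helper mirroring the range-sync call sites: the caller asserts
/// the block carries no data, so it is treated as fully available immediately.
fn make_empty_rpc_block(
    block: Arc<SignedBeaconBlock<E>>,
    chain: &BeaconChain<TestTypes>,
) -> RpcBlock<E> {
    RpcBlock::new(
        block,
        Some(AvailableBlockData::NoData), // fully available, zero blobs/columns
        chain.data_availability_checker.custody_context(),
        chain.spec.clone(),
    )
    .expect("construction only fails if the claimed data is inconsistent")
}
```

The design consequence worth flagging: because `RpcBlock::new` can now reject inconsistent data, formerly infallible call sites gain error handling, e.g. the `map_err(|_| SendErrorProcessor::SendError)?` in the `network_context.rs` hunk above.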
diff --git a/beacon_node/network/src/sync/tests/range.rs b/beacon_node/network/src/sync/tests/range.rs
index cb728a90c1b..9458f5dd616 100644
--- a/beacon_node/network/src/sync/tests/range.rs
+++ b/beacon_node/network/src/sync/tests/range.rs
@@ -5,6 +5,8 @@ use crate::sync::SyncMessage;
 use crate::sync::manager::SLOT_IMPORT_TOLERANCE;
 use crate::sync::network_context::RangeRequestId;
 use crate::sync::range_sync::RangeSyncType;
+use beacon_chain::BeaconChain;
+use beacon_chain::block_verification_types::AvailableBlockData;
 use beacon_chain::data_column_verification::CustodyDataColumn;
 use beacon_chain::test_utils::{AttestationStrategy, BlockStrategy};
 use beacon_chain::{EngineState, NotifyExecutionLayer, block_verification_types::RpcBlock};
@@ -427,7 +429,7 @@ impl TestRig {
                 .chain
                 .process_block(
                     block_root,
-                    build_rpc_block(block.into(), &data_sidecars),
+                    build_rpc_block(block.into(), &data_sidecars, self.harness.chain.clone()),
                     NotifyExecutionLayer::Yes,
                     BlockImportSource::RangeSync,
                     || Ok(()),
@@ -443,16 +445,42 @@ impl TestRig {
 
 fn build_rpc_block(
     block: Arc<SignedBeaconBlock<E>>,
     data_sidecars: &Option<DataSidecars<E>>,
+    chain: Arc<BeaconChain<EphemeralHarnessType<E>>>,
 ) -> RpcBlock<E> {
     match data_sidecars {
         Some(DataSidecars::Blobs(blobs)) => {
-            RpcBlock::new(None, block, Some(blobs.clone())).unwrap()
+            let block_data = AvailableBlockData::new_with_blobs(blobs.clone());
+            RpcBlock::new(
+                block,
+                Some(block_data),
+                chain.data_availability_checker.custody_context(),
+                chain.spec.clone(),
+            )
+            .unwrap()
         }
         Some(DataSidecars::DataColumns(columns)) => {
-            RpcBlock::new_with_custody_columns(None, block, columns.clone()).unwrap()
+            let block_data = AvailableBlockData::new_with_data_columns(
+                columns
+                    .iter()
+                    .map(|c| c.as_data_column().clone())
+                    .collect::<Vec<_>>(),
+            );
+            RpcBlock::new(
+                block,
+                Some(block_data),
+                chain.data_availability_checker.custody_context(),
+                chain.spec.clone(),
+            )
+            .unwrap()
         }
         // Block has no data, expects zero columns
-        None => RpcBlock::new_without_blobs(None, block),
+        None => RpcBlock::new(
+            block,
+            Some(AvailableBlockData::NoData),
+            chain.data_availability_checker.custody_context(),
+            chain.spec.clone(),
+        )
+        .unwrap(),
     }
 }
diff --git a/testing/ef_tests/src/cases/fork_choice.rs b/testing/ef_tests/src/cases/fork_choice.rs
index 93a8805240a..0d3378523aa 100644
--- a/testing/ef_tests/src/cases/fork_choice.rs
+++ b/testing/ef_tests/src/cases/fork_choice.rs
@@ -524,6 +524,7 @@ impl<E: EthSpec> Tester<E> {
         valid: bool,
     ) -> Result<(), Error> {
         let block_root = block.canonical_root();
+
         let mut data_column_success = true;
 
         if let Some(columns) = columns.clone() {
@@ -551,13 +552,24 @@ impl<E: EthSpec> Tester<E> {
         let block = Arc::new(block);
         let result: Result<Result<Hash256, _>, _> = self
-            .block_on_dangerous(self.harness.chain.process_block(
-                block_root,
-                RpcBlock::new_without_blobs(Some(block_root), block.clone()),
-                NotifyExecutionLayer::Yes,
-                BlockImportSource::Lookup,
-                || Ok(()),
-            ))?
+            .block_on_dangerous(
+                self.harness.chain.process_block(
+                    block_root,
+                    RpcBlock::new(
+                        block.clone(),
+                        None,
+                        self.harness
+                            .chain
+                            .data_availability_checker
+                            .custody_context(),
+                        self.harness.chain.spec.clone(),
+                    )
+                    .map_err(|e| Error::InternalError(format!("{:?}", e)))?,
+                    NotifyExecutionLayer::Yes,
+                    BlockImportSource::Lookup,
+                    || Ok(()),
+                ),
+            )?
             .map(|avail: AvailabilityProcessingStatus| avail.try_into());
 
         let success = data_column_success && result.as_ref().is_ok_and(|inner| inner.is_ok());
         if success != valid {
@@ -641,13 +653,24 @@ impl<E: EthSpec> Tester<E> {
         let block = Arc::new(block);
         let result: Result<Result<Hash256, _>, _> = self
-            .block_on_dangerous(self.harness.chain.process_block(
-                block_root,
-                RpcBlock::new_without_blobs(Some(block_root), block.clone()),
-                NotifyExecutionLayer::Yes,
-                BlockImportSource::Lookup,
-                || Ok(()),
-            ))?
+            .block_on_dangerous(
+                self.harness.chain.process_block(
+                    block_root,
+                    RpcBlock::new(
+                        block.clone(),
+                        None,
+                        self.harness
+                            .chain
+                            .data_availability_checker
+                            .custody_context(),
+                        self.harness.chain.spec.clone(),
+                    )
+                    .map_err(|e| Error::InternalError(format!("{:?}", e)))?,
+                    NotifyExecutionLayer::Yes,
+                    BlockImportSource::Lookup,
+                    || Ok(()),
+                ),
+            )?
             .map(|avail: AvailabilityProcessingStatus| avail.try_into());
 
         let success = blob_success && result.as_ref().is_ok_and(|inner| inner.is_ok());
         if success != valid {
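A closing note on the assertion pattern both `fork_choice.rs` hunks preserve: `result.as_ref().is_ok_and(|inner| inner.is_ok())` only counts a block as successfully processed when the outer `process_block` call succeeded and the inner `try_into()` on the returned `AvailabilityProcessingStatus` also succeeded (the `as_ref()` keeps `result` usable for later error reporting). A self-contained illustration of that nested-`Result` check, with plain `u64`/`String` types standing in for the real ones:

```rust
fn main() {
    // Outer Ok, inner Ok: the only combination treated as success.
    let imported: Result<Result<u64, ()>, String> = Ok(Ok(42));
    // Outer Ok, inner Err: processing ran, but the status did not convert
    // (e.g. the block is still missing data components).
    let missing: Result<Result<u64, ()>, String> = Ok(Err(()));
    // Outer Err: block processing itself failed.
    let rejected: Result<Result<u64, ()>, String> = Err("bad block".into());

    assert!(imported.as_ref().is_ok_and(|inner| inner.is_ok()));
    assert!(!missing.as_ref().is_ok_and(|inner| inner.is_ok()));
    assert!(!rejected.as_ref().is_ok_and(|inner| inner.is_ok()));
}
```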