Skip to content

Batch verify KZG proofs for getBlobsV2 #7582

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3699,7 +3699,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
data_columns.iter().map(|c| c.as_data_column()),
)?;
self.data_availability_checker
.put_gossip_verified_data_columns(block_root, data_columns)?
.put_kzg_verified_custody_data_columns(block_root, data_columns)?
}
};

Expand Down
16 changes: 14 additions & 2 deletions beacon_node/beacon_chain/src/data_availability_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,9 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
custody_columns: DataColumnSidecarList<T::EthSpec>,
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError> {
// Attributes fault to the specific peer that sent an invalid column
let kzg_verified_columns = KzgVerifiedDataColumn::from_batch(custody_columns, &self.kzg)
.map_err(AvailabilityCheckError::InvalidColumn)?;
let kzg_verified_columns =
KzgVerifiedDataColumn::from_batch_with_scoring(custody_columns, &self.kzg)
.map_err(AvailabilityCheckError::InvalidColumn)?;

let verified_custody_columns = kzg_verified_columns
.into_iter()
Expand Down Expand Up @@ -285,6 +286,17 @@ impl<T: BeaconChainTypes> DataAvailabilityChecker<T> {
.put_kzg_verified_data_columns(block_root, custody_columns)
}

/// Insert data columns that have already been KZG-verified and filtered to
/// this node's custody set into the availability cache.
///
/// Delegates directly to the availability cache; returns its `Availability`
/// verdict for the block, or an `AvailabilityCheckError` on failure.
pub fn put_kzg_verified_custody_data_columns<I>(
    &self,
    block_root: Hash256,
    custody_columns: I,
) -> Result<Availability<T::EthSpec>, AvailabilityCheckError>
where
    I: IntoIterator<Item = KzgVerifiedCustodyDataColumn<T::EthSpec>>,
{
    self.availability_cache
        .put_kzg_verified_data_columns(block_root, custody_columns)
}

/// Check if we have all the blobs for a block. Returns `Availability` which has information
/// about whether all components have been received or more are required.
pub fn put_pending_executed_block(
Expand Down
11 changes: 11 additions & 0 deletions beacon_node/beacon_chain/src/data_column_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,17 @@ impl<E: EthSpec> KzgVerifiedDataColumn<E> {
/// Verify a batch of data column sidecars against their KZG commitments in a
/// single pass.
///
/// On success, every input column is wrapped as a `KzgVerifiedDataColumn`.
/// On failure the whole batch is rejected with the underlying `KzgError`;
/// no attempt is made to identify which individual column was invalid.
pub fn from_batch(
    data_columns: Vec<Arc<DataColumnSidecar<E>>>,
    kzg: &Kzg,
) -> Result<Vec<Self>, KzgError> {
    verify_kzg_for_data_column_list(data_columns.iter(), kzg)?;
    let verified = data_columns
        .into_iter()
        .map(|data| Self { data })
        .collect();
    Ok(verified)
}

pub fn from_batch_with_scoring(
data_columns: Vec<Arc<DataColumnSidecar<E>>>,
kzg: &Kzg,
) -> Result<Vec<Self>, Vec<(ColumnIndex, KzgError)>> {
verify_kzg_for_data_column_list_with_scoring(data_columns.iter(), kzg)?;
Ok(data_columns
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob};
use crate::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn};
use crate::data_column_verification::KzgVerifiedDataColumn;
use crate::fetch_blobs::{EngineGetBlobsOutput, FetchEngineBlobError};
use crate::observed_block_producers::ProposalKey;
use crate::observed_data_sidecars::DoNotObserve;
use crate::{AvailabilityProcessingStatus, BeaconChain, BeaconChainTypes};
use execution_layer::json_structures::{BlobAndProofV1, BlobAndProofV2};
use kzg::Kzg;
use kzg::{Error as KzgError, Kzg};
#[cfg(test)]
use mockall::automock;
use std::collections::HashSet;
use std::sync::Arc;
use task_executor::TaskExecutor;
use types::{BlobSidecar, ChainSpec, DataColumnSidecar, Hash256, Slot};
use types::{BlobSidecar, ChainSpec, ColumnIndex, DataColumnSidecar, Hash256, Slot};

/// An adapter to the `BeaconChain` functionalities to remove `BeaconChain` from direct dependency to enable testing fetch blobs logic.
pub(crate) struct FetchBlobsBeaconAdapter<T: BeaconChainTypes> {
Expand Down Expand Up @@ -75,12 +77,28 @@ impl<T: BeaconChainTypes> FetchBlobsBeaconAdapter<T> {
GossipVerifiedBlob::<T, DoNotObserve>::new(blob.clone(), blob.index, &self.chain)
}

pub(crate) fn verify_data_column_for_gossip(
pub(crate) fn verify_data_columns_kzg(
&self,
data_column: Arc<DataColumnSidecar<T::EthSpec>>,
) -> Result<GossipVerifiedDataColumn<T, DoNotObserve>, GossipDataColumnError> {
let index = data_column.index;
GossipVerifiedDataColumn::<T, DoNotObserve>::new(data_column, index, &self.chain)
data_columns: Vec<Arc<DataColumnSidecar<T::EthSpec>>>,
) -> Result<Vec<KzgVerifiedDataColumn<T::EthSpec>>, KzgError> {
KzgVerifiedDataColumn::from_batch(data_columns, &self.chain.kzg)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

KzgVerifiedDataColumn::from_batch actually does up to two rounds of verification: if batch verification fails, it attempts individual verification to identify bad columns — the purpose is to identify which peer sent the invalid columns.

In our case here, we don't really need to know exactly which columns fail, so we can just run a single pass with verify_kzg_for_data_column_list

}

/// Return the set of column indexes already observed on gossip for the given
/// proposer/slot key, if any.
///
/// Clones the set out of the shared observation cache so the read lock is not
/// held by the caller.
pub(crate) fn known_for_proposal(
    &self,
    proposal_key: ProposalKey,
) -> Option<HashSet<ColumnIndex>> {
    let observed = self.chain.observed_column_sidecars.read();
    observed.known_for_proposal(&proposal_key).cloned()
}

/// Column indexes currently cached by the data availability checker for
/// `block_root`, or `None` if the block is not in the cache.
pub(crate) fn cached_data_column_indexes(&self, block_root: &Hash256) -> Option<Vec<u64>> {
    let availability_checker = &self.chain.data_availability_checker;
    availability_checker.cached_data_column_indexes(block_root)
}

pub(crate) async fn process_engine_blobs(
Expand Down
80 changes: 37 additions & 43 deletions beacon_node/beacon_chain/src/fetch_blobs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ mod fetch_blobs_beacon_adapter;
mod tests;

use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob};
use crate::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn};
use crate::block_verification_types::AsBlock;
use crate::data_column_verification::KzgVerifiedCustodyDataColumn;
#[cfg_attr(test, double)]
use crate::fetch_blobs::fetch_blobs_beacon_adapter::FetchBlobsBeaconAdapter;
use crate::kzg_utils::blobs_to_data_column_sidecars;
use crate::observed_block_producers::ProposalKey;
use crate::observed_data_sidecars::DoNotObserve;
use crate::{
metrics, AvailabilityProcessingStatus, BeaconChain, BeaconChainError, BeaconChainTypes,
Expand Down Expand Up @@ -46,7 +48,7 @@ use types::{
pub enum EngineGetBlobsOutput<T: BeaconChainTypes> {
Blobs(Vec<GossipVerifiedBlob<T, DoNotObserve>>),
/// A filtered list of custody data columns to be imported into the `DataAvailabilityChecker`.
CustodyColumns(Vec<GossipVerifiedDataColumn<T, DoNotObserve>>),
CustodyColumns(Vec<KzgVerifiedCustodyDataColumn<T::EthSpec>>),
}

#[derive(Debug)]
Expand All @@ -59,7 +61,7 @@ pub enum FetchEngineBlobError {
ExecutionLayerMissing,
InternalError(String),
GossipBlob(GossipBlobError),
GossipDataColumn(GossipDataColumnError),
KzgError(kzg::Error),
RequestFailed(ExecutionLayerError),
RuntimeShutdown,
TokioJoin(tokio::task::JoinError),
Expand Down Expand Up @@ -293,6 +295,7 @@ async fn fetch_and_process_blobs_v2<T: BeaconChainTypes>(
let chain_adapter = Arc::new(chain_adapter);
let custody_columns_to_import = compute_custody_columns_to_import(
&chain_adapter,
block_root,
block.clone(),
blobs,
proofs,
Expand Down Expand Up @@ -326,11 +329,12 @@ async fn fetch_and_process_blobs_v2<T: BeaconChainTypes>(
/// Offload the data column computation to a blocking task to avoid holding up the async runtime.
async fn compute_custody_columns_to_import<T: BeaconChainTypes>(
chain_adapter: &Arc<FetchBlobsBeaconAdapter<T>>,
block_root: Hash256,
block: Arc<SignedBeaconBlock<T::EthSpec, FullPayload<T::EthSpec>>>,
blobs: Vec<Blob<T::EthSpec>>,
proofs: Vec<KzgProofs<T::EthSpec>>,
custody_columns_indices: HashSet<ColumnIndex>,
) -> Result<Vec<GossipVerifiedDataColumn<T, DoNotObserve>>, FetchEngineBlobError> {
) -> Result<Vec<KzgVerifiedCustodyDataColumn<T::EthSpec>>, FetchEngineBlobError> {
let kzg = chain_adapter.kzg().clone();
let spec = chain_adapter.spec().clone();
let chain_adapter_cloned = chain_adapter.clone();
Expand All @@ -353,57 +357,47 @@ async fn compute_custody_columns_to_import<T: BeaconChainTypes>(
// This filtering ensures we only import and publish the custody columns.
// `DataAvailabilityChecker` requires a strict match on custody columns count to
// consider a block available.
let custody_columns = data_columns_result
let mut custody_columns = data_columns_result
.map(|mut data_columns| {
data_columns.retain(|col| custody_columns_indices.contains(&col.index));
data_columns
})
.map_err(FetchEngineBlobError::DataColumnSidecarError)?;

// Gossip verify data columns before publishing. This prevents blobs with invalid
// Only consider columns that are not already observed on gossip.
if let Some(observed_columns) = chain_adapter_cloned.known_for_proposal(
ProposalKey::new(block.message().proposer_index(), block.slot()),
) {
custody_columns.retain(|col| !observed_columns.contains(&col.index));
if custody_columns.is_empty() {
return Ok(vec![]);
}
}

// Only consider columns that are not already known to data availability.
if let Some(known_columns) =
chain_adapter_cloned.cached_data_column_indexes(&block_root)
{
custody_columns.retain(|col| !known_columns.contains(&col.index));
if custody_columns.is_empty() {
return Ok(vec![]);
}
}

// KZG verify data columns before publishing. This prevents blobs with invalid
// KZG proofs from the EL making it into the data availability checker. We do not
// immediately add these blobs to the observed blobs/columns cache because we want
// to allow blobs/columns to arrive on gossip and be accepted (and propagated) while
// we are waiting to publish. Just before publishing we will observe the blobs/columns
// and only proceed with publishing if they are not yet seen.
// TODO(das): we may want to just perform kzg proof verification here, since the
// `DataColumnSidecar` and inclusion proof is computed just above and is unnecessary
// to verify them.
let columns_to_import_and_publish = custody_columns
.into_iter()
.filter_map(|col| {
match chain_adapter_cloned.verify_data_column_for_gossip(col) {
Ok(verified) => Some(Ok(verified)),
Err(e) => match e {
// Ignore already seen data columns
GossipDataColumnError::PriorKnown { .. }
| GossipDataColumnError::PriorKnownUnpublished => None,
GossipDataColumnError::BeaconChainError(_)
| GossipDataColumnError::ProposalSignatureInvalid
| GossipDataColumnError::UnknownValidator(_)
| GossipDataColumnError::IsNotLaterThanParent { .. }
| GossipDataColumnError::InvalidKzgProof(_)
| GossipDataColumnError::InvalidSubnetId { .. }
| GossipDataColumnError::FutureSlot { .. }
| GossipDataColumnError::PastFinalizedSlot { .. }
| GossipDataColumnError::PubkeyCacheTimeout
| GossipDataColumnError::ProposerIndexMismatch { .. }
| GossipDataColumnError::ParentUnknown { .. }
| GossipDataColumnError::NotFinalizedDescendant { .. }
| GossipDataColumnError::InvalidInclusionProof
| GossipDataColumnError::InvalidColumnIndex(_)
| GossipDataColumnError::UnexpectedDataColumn
| GossipDataColumnError::InconsistentCommitmentsLength { .. }
| GossipDataColumnError::InconsistentProofsLength { .. } => {
Some(Err(e))
}
},
}
})
.collect::<Result<Vec<_>, _>>()
.map_err(FetchEngineBlobError::GossipDataColumn)?;
let verified = chain_adapter_cloned
.verify_data_columns_kzg(custody_columns)
.map_err(FetchEngineBlobError::KzgError)?;

Ok(columns_to_import_and_publish)
Ok(verified
.into_iter()
.map(KzgVerifiedCustodyDataColumn::from_asserted_custody)
.collect())
},
"compute_custody_columns_to_import",
)
Expand Down
24 changes: 13 additions & 11 deletions beacon_node/beacon_chain/src/fetch_blobs/tests.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn};
use crate::data_column_verification::KzgVerifiedDataColumn;
use crate::fetch_blobs::fetch_blobs_beacon_adapter::MockFetchBlobsBeaconAdapter;
use crate::fetch_blobs::{
fetch_and_process_engine_blobs_inner, EngineGetBlobsOutput, FetchEngineBlobError,
Expand Down Expand Up @@ -156,14 +156,8 @@ mod get_blobs_v2 {
mock_fork_choice_contains_block(&mut mock_adapter, vec![]);
// All data columns already seen on gossip
mock_adapter
.expect_verify_data_column_for_gossip()
.returning(|c| {
Err(GossipDataColumnError::PriorKnown {
proposer: c.block_proposer_index(),
slot: c.slot(),
index: c.index,
})
});
.expect_known_for_proposal()
.returning(|_| Some(hashset![0, 1, 2]));
// No blobs should be processed
mock_adapter.expect_process_engine_blobs().times(0);

Expand Down Expand Up @@ -198,9 +192,17 @@ mod get_blobs_v2 {
// All blobs returned, fork choice doesn't contain block
mock_get_blobs_v2_response(&mut mock_adapter, Some(blobs_and_proofs));
mock_fork_choice_contains_block(&mut mock_adapter, vec![]);
mock_adapter.expect_known_for_proposal().returning(|_| None);
mock_adapter
.expect_verify_data_column_for_gossip()
.returning(|c| Ok(GossipVerifiedDataColumn::__new_for_testing(c)));
.expect_cached_data_column_indexes()
.returning(|_| None);
mock_adapter
.expect_verify_data_columns_kzg()
.returning(|c| {
Ok(c.into_iter()
.map(KzgVerifiedDataColumn::__new_for_testing)
.collect())
});
mock_process_engine_blobs_result(
&mut mock_adapter,
Ok(AvailabilityProcessingStatus::Imported(block_root)),
Expand Down
4 changes: 4 additions & 0 deletions beacon_node/beacon_chain/src/observed_data_sidecars.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ impl<T: ObservableDataSidecar> ObservedDataSidecars<T> {
Ok(is_known)
}

/// Borrow the set of sidecar indexes already observed for the given
/// proposer/slot key, or `None` if nothing has been seen for that proposal.
pub fn known_for_proposal(&self, proposal_key: &ProposalKey) -> Option<&HashSet<u64>> {
    self.items.get(proposal_key)
}

fn sanitize_data_sidecar(&self, data_sidecar: &T) -> Result<(), Error> {
if data_sidecar.index() >= T::max_num_of_items(&self.spec, data_sidecar.slot()) as u64 {
return Err(Error::InvalidDataIndex(data_sidecar.index()));
Expand Down
2 changes: 1 addition & 1 deletion beacon_node/network/src/network_beacon_processor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -853,7 +853,7 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}
EngineGetBlobsOutput::CustodyColumns(columns) => {
self_cloned.publish_data_columns_gradually(
columns.into_iter().map(|c| c.clone_data_column()).collect(),
columns.into_iter().map(|c| c.clone_arc()).collect(),
block_root,
);
}
Expand Down