
Fix for #7212: Call engine_getBlobs as soon as a valid data_column_sidecar is received from gossip #7329

Open · wants to merge 2 commits into base: unstable
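The hashes handed to engine_getBlobsV2 in this PR are the EIP-4844 versioned hashes of the block's KZG commitments, computed by kzg_commitment_to_versioned_hash in the diff below. As a reference point, here is a self-contained sketch of that derivation, assuming the sha2 crate; it is not the Lighthouse implementation.

use sha2::{Digest, Sha256};

const VERSIONED_HASH_VERSION_KZG: u8 = 0x01;

/// EIP-4844 versioned hash: the SHA-256 digest of the 48-byte KZG commitment,
/// with the first byte overwritten by the 0x01 version prefix.
fn kzg_commitment_to_versioned_hash(commitment: &[u8; 48]) -> [u8; 32] {
    let mut hash = [0u8; 32];
    hash.copy_from_slice(&Sha256::digest(commitment));
    hash[0] = VERSIONED_HASH_VERSION_KZG;
    hash
}

fn main() {
    let commitment = [0u8; 48];
    let versioned_hash = kzg_commitment_to_versioned_hash(&commitment);
    assert_eq!(versioned_hash[0], VERSIONED_HASH_VERSION_KZG);
}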
169 changes: 168 additions & 1 deletion beacon_node/beacon_chain/src/fetch_blobs.rs
@@ -9,7 +9,8 @@
//! supernodes.

use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob};
use crate::kzg_utils::blobs_to_data_column_sidecars;
use crate::data_column_verification::KzgVerifiedDataColumn;
use crate::kzg_utils::{blobs_to_data_column_sidecars, blobs_to_data_column_sidecars_with_column};
use crate::observed_data_sidecars::DoNotObserve;
use crate::{
metrics, AvailabilityProcessingStatus, BeaconChain, BeaconChainError, BeaconChainTypes,
@@ -363,3 +364,169 @@ fn build_blob_sidecars<E: EthSpec>(
}
Ok(fixed_blob_sidecar_list)
}

pub async fn fetch_and_process_engine_blobs_with_column<T: BeaconChainTypes>(
chain: Arc<BeaconChain<T>>,
block_root: Hash256,
data_column: KzgVerifiedDataColumn<T::EthSpec>,
custody_columns: HashSet<ColumnIndex>,
publish_fn: impl Fn(BlobsOrDataColumns<T>) + Send + 'static,
) -> Result<Option<AvailabilityProcessingStatus>, FetchEngineBlobError> {
let versioned_hashes = data_column
.as_data_column()
.kzg_commitments
.iter()
.map(kzg_commitment_to_versioned_hash)
.collect::<Vec<_>>();

debug!(
num_expected_blobs = versioned_hashes.len(),
"Fetching blobs from the EL"
);

fetch_and_process_blobs_v2_with_column(
chain,
block_root,
data_column,
versioned_hashes,
custody_columns,
publish_fn,
)
.await
}

async fn fetch_and_process_blobs_v2_with_column<T: BeaconChainTypes>(
chain: Arc<BeaconChain<T>>,
block_root: Hash256,
data_column: KzgVerifiedDataColumn<T::EthSpec>,
versioned_hashes: Vec<VersionedHash>,
custody_columns_indices: HashSet<ColumnIndex>,
publish_fn: impl Fn(BlobsOrDataColumns<T>) + Send + 'static,
) -> Result<Option<AvailabilityProcessingStatus>, FetchEngineBlobError> {
let num_expected_blobs = versioned_hashes.len();
let execution_layer = chain
.execution_layer
.as_ref()
.ok_or(FetchEngineBlobError::ExecutionLayerMissing)?;

metrics::observe(&metrics::BLOBS_FROM_EL_EXPECTED, num_expected_blobs as f64);
debug!(num_expected_blobs, "Fetching blobs from the EL");
let response = execution_layer
.get_blobs_v2(versioned_hashes)
.await
.inspect_err(|_| {
inc_counter(&metrics::BLOBS_FROM_EL_ERROR_TOTAL);
})
.map_err(FetchEngineBlobError::RequestFailed)?;

let (blobs, proofs): (Vec<_>, Vec<_>) = response
.into_iter()
.filter_map(|blob_and_proof_opt| {
blob_and_proof_opt.map(|blob_and_proof| {
let BlobAndProofV2 { blob, proofs } = blob_and_proof;
(blob, proofs)
})
})
.unzip();

let num_fetched_blobs = blobs.len();
metrics::observe(&metrics::BLOBS_FROM_EL_RECEIVED, num_fetched_blobs as f64);

// A partial blob response isn't useful for PeerDAS, so we don't bother building and publishing data columns.
if num_fetched_blobs != num_expected_blobs {
debug!(
info = "Unable to compute data columns",
num_fetched_blobs, num_expected_blobs, "Not all blobs fetched from the EL"
);
inc_counter(&metrics::BLOBS_FROM_EL_MISS_TOTAL);
return Ok(None);
} else {
inc_counter(&metrics::BLOBS_FROM_EL_HIT_TOTAL);
}

if chain
.canonical_head
.fork_choice_read_lock()
.contains_block(&block_root)
{
// Avoid computing columns if block has already been imported.
debug!(
info = "block has already been imported",
"Ignoring EL blobs response"
);
return Ok(None);
}

let custody_columns = compute_and_publish_data_columns_with_column(
&chain,
data_column.clone(),
blobs,
proofs,
custody_columns_indices,
publish_fn,
)
.await?;

debug!(num_fetched_blobs, "Processing engine blobs");

let availability_processing_status = chain
.process_engine_blobs(
data_column.to_data_column().slot(),
block_root,
EngineGetBlobsOutput::CustodyColumns(custody_columns),
)
.await
.map_err(FetchEngineBlobError::BlobProcessingError)?;

Ok(Some(availability_processing_status))
}

/// Offload the data column computation to a blocking task to avoid holding up the async runtime.
async fn compute_and_publish_data_columns_with_column<T: BeaconChainTypes>(
chain: &Arc<BeaconChain<T>>,
data_column: KzgVerifiedDataColumn<T::EthSpec>,
blobs: Vec<Blob<T::EthSpec>>,
proofs: Vec<KzgProofs<T::EthSpec>>,
custody_columns_indices: HashSet<ColumnIndex>,
publish_fn: impl Fn(BlobsOrDataColumns<T>) + Send + 'static,
) -> Result<DataColumnSidecarList<T::EthSpec>, FetchEngineBlobError> {
let chain_cloned = chain.clone();
chain
.spawn_blocking_handle(
move || {
let mut timer = metrics::start_timer_vec(
&metrics::DATA_COLUMN_SIDECAR_COMPUTATION,
&[&blobs.len().to_string()],
);

let blob_refs = blobs.iter().collect::<Vec<_>>();
let cell_proofs = proofs.into_iter().flatten().collect();
let data_columns_result = blobs_to_data_column_sidecars_with_column(
&blob_refs,
cell_proofs,
data_column.as_data_column(),
&chain_cloned.kzg,
&chain_cloned.spec,
)
.discard_timer_on_break(&mut timer);
drop(timer);

// This filtering ensures we only import and publish the custody columns.
// `DataAvailabilityChecker` requires a strict match on custody columns count to
// consider a block available.
let custody_columns = data_columns_result
.map(|mut data_columns| {
data_columns.retain(|col| custody_columns_indices.contains(&col.index));
data_columns
})
.map_err(FetchEngineBlobError::DataColumnSidecarError)?;

publish_fn(BlobsOrDataColumns::DataColumns(custody_columns.clone()));
Ok(custody_columns)
},
"compute_and_publish_data_columns",
)
.await
.map_err(FetchEngineBlobError::BeaconChainError)
.and_then(|r| r)
}
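The fetch path above applies an all-or-nothing rule: under PeerDAS every data column contains one cell from every blob in the block, so a partial engine_getBlobsV2 response cannot be turned into columns and is discarded. A self-contained distillation of that rule, using a stand-in type rather than the real BlobAndProofV2:

// Stand-in type for illustration; the real one is BlobAndProofV2 in the diff above.
struct BlobAndProof {
    blob: Vec<u8>,
    cell_proofs: Vec<[u8; 48]>,
}

/// Keep the EL response only if every requested blob was returned; otherwise the
/// block's data columns cannot be built and the response is dropped.
fn split_complete_response(
    response: Vec<Option<BlobAndProof>>,
    num_expected: usize,
) -> Option<(Vec<Vec<u8>>, Vec<Vec<[u8; 48]>>)> {
    let (blobs, proofs): (Vec<_>, Vec<_>) = response
        .into_iter()
        .flatten()
        .map(|bp| (bp.blob, bp.cell_proofs))
        .unzip();
    (blobs.len() == num_expected).then_some((blobs, proofs))
}

fn main() {
    let blob = || Some(BlobAndProof { blob: vec![0; 4], cell_proofs: vec![[0; 48]; 128] });
    assert!(split_complete_response(vec![blob(), blob()], 2).is_some());
    assert!(split_complete_response(vec![blob(), None], 2).is_none());
}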
50 changes: 50 additions & 0 deletions beacon_node/beacon_chain/src/kzg_utils.rs
@@ -228,6 +228,56 @@ pub fn blobs_to_data_column_sidecars<E: EthSpec>(
.map_err(DataColumnSidecarError::BuildSidecarFailed)
}

pub fn blobs_to_data_column_sidecars_with_column<E: EthSpec>(
blobs: &[&Blob<E>],
cell_proofs: Vec<KzgProof>,
data_column: &DataColumnSidecar<E>,
kzg: &Kzg,
spec: &ChainSpec,
) -> Result<DataColumnSidecarList<E>, DataColumnSidecarError> {
if blobs.is_empty() {
return Ok(vec![]);
}

let kzg_commitments = data_column.kzg_commitments.clone();
let kzg_commitments_inclusion_proof = data_column.kzg_commitments_inclusion_proof.clone();
let signed_block_header = data_column.signed_block_header.clone();

let proof_chunks = cell_proofs
.chunks_exact(spec.number_of_columns as usize)
.collect::<Vec<_>>();

// NOTE: assumes blob sidecars are ordered by index
let blob_cells_and_proofs_vec = blobs
.into_par_iter()
.zip(proof_chunks.into_par_iter())
.map(|(blob, proofs)| {
let blob = blob
.as_ref()
.try_into()
.expect("blob should have a guaranteed size due to FixedVector");

kzg.compute_cells(blob).map(|cells| {
(
cells,
proofs
.try_into()
.expect("proof chunks should have exactly `number_of_columns` proofs"),
)
})
})
.collect::<Result<Vec<_>, KzgError>>()?;

build_data_column_sidecars(
kzg_commitments,
kzg_commitments_inclusion_proof,
signed_block_header,
blob_cells_and_proofs_vec,
spec,
)
.map_err(DataColumnSidecarError::BuildSidecarFailed)
}

pub fn compute_cells<E: EthSpec>(blobs: &[&Blob<E>], kzg: &Kzg) -> Result<Vec<KzgCell>, KzgError> {
let cells_vec = blobs
.into_par_iter()
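blobs_to_data_column_sidecars_with_column assumes the flattened cell_proofs vector is blob-major: the proofs for blob b occupy one contiguous chunk of number_of_columns entries, so chunks_exact recovers the per-blob groups as long as blobs and proofs are supplied in the same order. A small self-contained illustration of that layout, using plain index pairs instead of real KZG proofs:

/// Index of the cell proof for (blob b, column c) in a blob-major flattened
/// proof vector, mirroring the flatten + chunks_exact(number_of_columns)
/// pairing used in the diff above.
fn proof_index(blob: usize, column: usize, number_of_columns: usize) -> usize {
    blob * number_of_columns + column
}

fn main() {
    let number_of_columns: usize = 128; // Fulu/PeerDAS column count
    let num_blobs: usize = 3;

    // Flattened proofs: blob 0's proofs first, then blob 1's, then blob 2's.
    let cell_proofs: Vec<(usize, usize)> = (0..num_blobs)
        .flat_map(|b| (0..number_of_columns).map(move |c| (b, c)))
        .collect();

    // chunks_exact(number_of_columns) yields exactly one chunk per blob,
    // and chunk b holds blob b's proofs in column order.
    for (b, chunk) in cell_proofs.chunks_exact(number_of_columns).enumerate() {
        assert_eq!(chunk[0], (b, 0));
        assert_eq!(chunk[number_of_columns - 1], (b, number_of_columns - 1));
        assert_eq!(chunk[5], cell_proofs[proof_index(b, 5, number_of_columns)]);
    }
}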
@@ -6,7 +6,7 @@ use crate::{
};
use beacon_chain::blob_verification::{GossipBlobError, GossipVerifiedBlob};
use beacon_chain::block_verification_types::AsBlock;
use beacon_chain::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn};
use beacon_chain::data_column_verification::{GossipDataColumnError, GossipVerifiedDataColumn, KzgVerifiedDataColumn};
use beacon_chain::store::Error;
use beacon_chain::{
attestation_verification::{self, Error as AttnError, VerifiedAttestation},
@@ -1132,6 +1132,23 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
let data_column_slot = verified_data_column.slot();
let data_column_index = verified_data_column.id().index;

let publish_blobs = true;
let self_clone = self.clone();
let column_clone = KzgVerifiedDataColumn::from_verified(verified_data_column.clone_data_column());

self.executor.spawn(
async move {
self_clone
.fetch_engine_blobs_and_publish_with_column(
column_clone,
block_root,
publish_blobs,
)
.await
},
"fetch_blobs_gossip"
);

let result = self
.chain
.process_gossip_data_columns(vec![verified_data_column], || Ok(()))
79 changes: 77 additions & 2 deletions beacon_node/network/src/network_beacon_processor/mod.rs
@@ -3,9 +3,9 @@ use crate::sync::SamplingId;
use crate::{service::NetworkMessage, sync::manager::SyncMessage};
use beacon_chain::blob_verification::{GossipBlobError, GossipVerifiedBlob};
use beacon_chain::block_verification_types::RpcBlock;
use beacon_chain::data_column_verification::{observe_gossip_data_column, GossipDataColumnError};
use beacon_chain::data_column_verification::{observe_gossip_data_column, GossipDataColumnError, KzgVerifiedDataColumn};
use beacon_chain::fetch_blobs::{
fetch_and_process_engine_blobs, BlobsOrDataColumns, FetchEngineBlobError,
fetch_and_process_engine_blobs, fetch_and_process_engine_blobs_with_column, BlobsOrDataColumns, FetchEngineBlobError
};
use beacon_chain::observed_data_sidecars::DoNotObserve;
use beacon_chain::{
@@ -912,6 +912,81 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
}
}

pub async fn fetch_engine_blobs_and_publish_with_column(
self: &Arc<Self>,
data_column: KzgVerifiedDataColumn<T::EthSpec>,
block_root: Hash256,
publish_blobs: bool,
) {
let custody_columns = self.network_globals.sampling_columns.clone();
let self_cloned = self.clone();
let publish_fn = move |blobs_or_data_column| {
if publish_blobs {
match blobs_or_data_column {
BlobsOrDataColumns::Blobs(blobs) => {
self_cloned.publish_blobs_gradually(blobs, block_root);
}
BlobsOrDataColumns::DataColumns(columns) => {
self_cloned.publish_data_columns_gradually(columns, block_root);
}
};
}
};

match fetch_and_process_engine_blobs_with_column(
self.chain.clone(),
block_root,
data_column.clone(),
custody_columns,
publish_fn,
)
.instrument(tracing::info_span!(
"",
service = "fetch_engine_blobs",
block_root = format!("{:?}", block_root)
))
.await
{
Ok(Some(availability)) => match availability {
AvailabilityProcessingStatus::Imported(_) => {
debug!(
result = "imported block and custody columns",
%block_root,
"Block components retrieved from EL"
);
self.chain.recompute_head_at_current_slot().await;
}
AvailabilityProcessingStatus::MissingComponents(_, _) => {
debug!(
%block_root,
"Still missing blobs after engine blobs processed successfully"
);
}
},
Ok(None) => {
debug!(
%block_root,
"Fetch blobs completed without import"
);
}
Err(FetchEngineBlobError::BlobProcessingError(BlockError::DuplicateFullyImported(
..,
))) => {
debug!(
%block_root,
"Fetch blobs duplicate import"
);
}
Err(e) => {
error!(
error = ?e,
%block_root,
"Error fetching or processing blobs from EL"
);
}
}
}

/// Attempt to reconstruct all data columns if the following conditions are satisfied:
/// - Our custody requirement is all columns
/// - We have >= 50% of columns, but not all columns
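Regarding the reconstruction doc comment just above: reconstruction only applies to supernodes that must custody every column and that already hold at least half of them, since any 50% of the extended columns is enough to recover the rest. A self-contained sketch of that precondition; the parameter names are illustrative, not actual Lighthouse fields:

/// Illustrative check mirroring the conditions listed in the doc comment above;
/// the names here are not the real Lighthouse fields.
fn should_attempt_reconstruction(
    custody_column_count: usize,
    received_column_count: usize,
    total_columns: usize,
) -> bool {
    // Our custody requirement is all columns (i.e. we are a supernode).
    let custodies_all_columns = custody_column_count == total_columns;
    // We have >= 50% of columns (enough to reconstruct), but not yet all of them.
    let can_and_needs_reconstruction =
        2 * received_column_count >= total_columns && received_column_count < total_columns;
    custodies_all_columns && can_and_needs_reconstruction
}

fn main() {
    assert!(should_attempt_reconstruction(128, 64, 128));
    assert!(!should_attempt_reconstruction(128, 128, 128)); // already have every column
    assert!(!should_attempt_reconstruction(8, 64, 128)); // not a supernode
    assert!(!should_attempt_reconstruction(128, 63, 128)); // below the 50% threshold
}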