Allow importing of historical blobs via HTTP API #6656

Open · wants to merge 12 commits into base: unstable
4 changes: 4 additions & 0 deletions Cargo.lock

158 changes: 149 additions & 9 deletions beacon_node/http_api/src/lib.rs
@@ -35,9 +35,10 @@ use crate::light_client::{get_light_client_bootstrap, get_light_client_updates};
use crate::produce_block::{produce_blinded_block_v2, produce_block_v2, produce_block_v3};
use crate::version::fork_versioned_response;
use beacon_chain::{
attestation_verification::VerifiedAttestation, observed_operations::ObservationOutcome,
validator_monitor::timestamp_now, AttestationError as AttnError, BeaconChain, BeaconChainError,
BeaconChainTypes, WhenSlotSkipped,
attestation_verification::VerifiedAttestation, blob_verification::verify_kzg_for_blob_list,
observed_operations::ObservationOutcome, validator_monitor::timestamp_now,
AttestationError as AttnError, BeaconChain, BeaconChainError, BeaconChainTypes,
WhenSlotSkipped,
};
use beacon_processor::{work_reprocessing_queue::ReprocessQueueMessage, BeaconProcessorSend};
pub use block_id::BlockId;
@@ -66,7 +67,7 @@ use serde::{Deserialize, Serialize};
use serde_json::Value;
use slog::{crit, debug, error, info, warn, Logger};
use slot_clock::SlotClock;
use ssz::Encode;
use ssz::{Decode, Encode};
pub use state_id::StateId;
use std::future::Future;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
@@ -86,11 +87,12 @@ use tokio_stream::{
};
use types::{
fork_versioned_response::EmptyMetadata, Attestation, AttestationData, AttestationShufflingId,
AttesterSlashing, BeaconStateError, ChainSpec, CommitteeCache, ConfigAndPreset, Epoch, EthSpec,
ForkName, ForkVersionedResponse, Hash256, ProposerPreparationData, ProposerSlashing,
RelativeEpoch, SignedAggregateAndProof, SignedBlindedBeaconBlock, SignedBlsToExecutionChange,
SignedContributionAndProof, SignedValidatorRegistrationData, SignedVoluntaryExit, Slot,
SyncCommitteeMessage, SyncContributionData,
AttesterSlashing, BeaconStateError, BlobSidecar, BlobSidecarList, ChainSpec, CommitteeCache,
ConfigAndPreset, Epoch, EthSpec, ForkName, ForkVersionedResponse, Hash256,
ProposerPreparationData, ProposerSlashing, RelativeEpoch, SignedAggregateAndProof,
SignedBlindedBeaconBlock, SignedBlsToExecutionChange, SignedContributionAndProof,
SignedValidatorRegistrationData, SignedVoluntaryExit, Slot, SyncCommitteeMessage,
SyncContributionData,
};
use validator::pubkey_to_validator_index;
use version::{
@@ -4422,6 +4424,141 @@ pub fn serve<T: BeaconChainTypes>(
},
);

// POST lighthouse/database/import_blobs
let post_lighthouse_database_import_blobs = database_path
.and(warp::path("import_blobs"))
.and(warp::path::end())
.and(warp::query::<api_types::ImportBlobsQuery>())
.and(warp_utils::json::json())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.then(
|query: api_types::ImportBlobsQuery,
blob_lists: Vec<BlobSidecarList<T::EthSpec>>,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_json_task(Priority::P1, move || {
if query.verify == Some(true) {
for blob_list in &blob_lists {
match verify_kzg_for_blob_list(blob_list.iter(), &chain.kzg) {
> Review comment (Member): For completeness, we should also check `verify_blob_sidecar_inclusion_proof` for each blob.

Ok(()) => (),
Err(e) => {
return Err(warp_utils::reject::custom_server_error(format!(
"{e:?}"
)))
}
}
}
}

match chain.store.import_blobs_batch(blob_lists) {
Ok(()) => Ok(()),
Err(e) => Err(warp_utils::reject::custom_server_error(format!("{e:?}"))),
}
})
},
);
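
A hedged usage sketch for this route (not part of the diff): POST a JSON array of `BlobSidecarList`s with `?verify=true`. The host, port, payload file, and use of `reqwest` (with its `blocking` feature) are assumptions.

```rust
// Hypothetical standalone client for `POST lighthouse/database/import_blobs`.
// Assumes a node listening on the default HTTP port and a `blob_lists.json`
// file holding a JSON array of `BlobSidecarList`s.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let body = std::fs::read_to_string("blob_lists.json")?;
    let resp = reqwest::blocking::Client::new()
        .post("http://127.0.0.1:5052/lighthouse/database/import_blobs?verify=true")
        .header("Content-Type", "application/json")
        .body(body)
        .send()?;
    // The handler returns `Ok(())` on success, surfaced as HTTP 200.
    println!("status: {}", resp.status());
    Ok(())
}
```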

// POST lighthouse/database/import_blobs_ssz
let post_lighthouse_database_import_blobs_ssz = database_path
.and(warp::path("import_blobs_ssz"))
.and(warp::path::end())
.and(warp::query::<api_types::ImportBlobsQuery>())
.and(warp::body::bytes())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.then(
|query: api_types::ImportBlobsQuery,
body: Bytes,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_json_task(Priority::P1, move || {
let blob_lists = Vec::<Vec<Arc<BlobSidecar<T::EthSpec>>>>::from_ssz_bytes(
&body,
)
.map_err(|e| warp_utils::reject::custom_server_error(format!("{e:?}")))?;

if blob_lists.is_empty() {
return Err(warp_utils::reject::custom_server_error(
"Blob list must not be empty".to_string(),
));
}

// Build `BlobSidecarList`s from the `Vec<BlobSidecar>`s.
let blob_sidecar_lists: Vec<BlobSidecarList<T::EthSpec>> = blob_lists
.into_iter()
.map(|blob_sidecars| {
let max_blobs_at_epoch =
chain.spec.max_blobs_per_block(blob_sidecars[0].epoch()) as usize;
BlobSidecarList::new(blob_sidecars, max_blobs_at_epoch)
})
.collect::<Result<Vec<_>, _>>()
.map_err(|e| warp_utils::reject::custom_server_error(format!("{e:?}")))?;

if query.verify == Some(true) {
for blob_list in &blob_sidecar_lists {
match verify_kzg_for_blob_list(blob_list.iter(), &chain.kzg) {
Ok(()) => (),
Err(e) => {
return Err(warp_utils::reject::custom_server_error(format!(
"{e:?}"
)))
}
}
}
}

match chain.store.import_blobs_batch(blob_sidecar_lists) {
Ok(()) => Ok(()),
Err(e) => Err(warp_utils::reject::custom_server_error(format!("{e:?}"))),
}
})
},
);
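
The SSZ variant expects the request body to be an SSZ-encoded `Vec<Vec<Arc<BlobSidecar<E>>>>`, mirroring the `from_ssz_bytes` call above. A minimal encoding sketch, assuming `Arc<T>: Encode` (the counterpart of the decode impl the handler relies on):

```rust
use ssz::Encode;
use std::sync::Arc;
use types::{BlobSidecar, MainnetEthSpec};

// Encode a batch of per-block blob sidecars into the request body.
// Each inner `Vec` should be non-empty: the handler indexes
// `blob_sidecars[0]` to look up the per-epoch blob limit.
fn encode_import_body(
    blob_lists: Vec<Vec<Arc<BlobSidecar<MainnetEthSpec>>>>,
) -> Vec<u8> {
    blob_lists.as_ssz_bytes()
}
```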

// GET lighthouse/database/verify_blobs
let get_lighthouse_database_verify_blobs = database_path
.and(warp::path("verify_blobs"))
.and(warp::path::end())
.and(warp::query::<api_types::VerifyBlobsQuery>())
.and(task_spawner_filter.clone())
.and(chain_filter.clone())
.then(
|query: api_types::VerifyBlobsQuery,
task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>| {
task_spawner.blocking_json_task(Priority::P1, move || {
let mut results = Vec::new();
for slot in query.start_slot.as_u64()..=query.end_slot.as_u64() {
if let Ok((root, _, _)) = BlockId::from_slot(Slot::from(slot)).root(&chain)
{
if let Ok(blob_list_res) = chain.store.get_blobs(&root) {
if let Some(blob_list) = blob_list_res.blobs() {
if let Err(e) =
verify_kzg_for_blob_list(blob_list.iter(), &chain.kzg)
{
results.push(format!(
"slot: {slot}, block_root: {root:?}, error: {e:?}"
));
}
}
}
}
}

if results.is_empty() {
Ok(api_types::GenericResponse::from(
"All blobs verified successfully".to_string(),
))
} else {
Ok(api_types::GenericResponse::from(results.join("\n")))
}
})
},
);
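
A request sketch for the range check; the query parameter names follow `VerifyBlobsQuery` as used in the handler, while host and port are assumptions:

```rust
// Ask the node to re-verify KZG proofs for blobs in slots 100..=200 and
// print its report (assumes `reqwest` with the `blocking` feature).
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let url = "http://127.0.0.1:5052/lighthouse/database/verify_blobs?start_slot=100&end_slot=200";
    println!("{}", reqwest::blocking::get(url)?.text()?);
    Ok(())
}
```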

// GET lighthouse/analysis/block_rewards
let get_lighthouse_block_rewards = warp::path("lighthouse")
.and(warp::path("analysis"))
@@ -4721,6 +4858,7 @@ pub fn serve<T: BeaconChainTypes>(
.uor(get_lighthouse_eth1_deposit_cache)
.uor(get_lighthouse_staking)
.uor(get_lighthouse_database_info)
.uor(get_lighthouse_database_verify_blobs)
.uor(get_lighthouse_block_rewards)
.uor(get_lighthouse_attestation_performance)
.uor(
@@ -4783,6 +4921,8 @@
.uor(post_validator_liveness_epoch)
.uor(post_lighthouse_liveness)
.uor(post_lighthouse_database_reconstruct)
.uor(post_lighthouse_database_import_blobs)
.uor(post_lighthouse_database_import_blobs_ssz)
.uor(post_lighthouse_block_rewards)
.uor(post_lighthouse_ui_validator_metrics)
.uor(post_lighthouse_ui_validator_info)
9 changes: 5 additions & 4 deletions beacon_node/store/src/errors.rs
@@ -50,10 +50,6 @@
MissingGenesisState,
MissingSnapshot(Slot),
BlockReplayError(BlockReplayError),
AddPayloadLogicError,
InvalidKey,
InvalidBytes,
InconsistentFork(InconsistentFork),
#[cfg(feature = "leveldb")]
LevelDbError(LevelDBError),
#[cfg(feature = "redb")]
@@ -68,6 +64,11 @@
state_root: Hash256,
slot: Slot,
},
AddPayloadLogicError,
InvalidKey,
InvalidBytes,
InvalidBlobImport(String),
InconsistentFork(InconsistentFork),
Hdiff(hdiff::Error),
ForwardsIterInvalidColumn(DBColumn),
ForwardsIterGap(DBColumn, Slot, Slot),
94 changes: 94 additions & 0 deletions beacon_node/store/src/hot_cold_store.rs
@@ -852,6 +852,100 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
Ok(())
}

/// Import a batch of blobs.
///
/// Implements the following checks:
/// - Checks that `block_root` is consistent across each `BlobSidecarList`.
/// - Checks that `block_root` exists in the database.
/// - Checks whether a `BlobSidecarList` is already stored for that `block_root`.
///   If one is, ensures that it matches the `BlobSidecarList` we are attempting to store.
pub fn import_blobs_batch(
&self,
historical_blob_sidecars: Vec<BlobSidecarList<E>>,
) -> Result<(), Error> {
if historical_blob_sidecars.is_empty() {
return Ok(());
}

let mut total_imported = 0;

let mut ops = vec![];

for blob_sidecar_list in historical_blob_sidecars {
// Ensure all block_roots in the blob list are the same.
let block_root = {
let first_block_root = blob_sidecar_list[0].block_root();
if !blob_sidecar_list
.iter()
.all(|blob_sidecar| blob_sidecar.block_root() == first_block_root)
{
return Err(Error::InvalidBlobImport(
"Inconsistent block roots".to_string(),
));
}
first_block_root
};

// Check that a block is stored for this block_root.
if !self.block_exists(&block_root)? {
warn!(
self.log,
"Aborting blob import; block does not exist.";
"block_root" => ?block_root,
"num_blob_sidecars" => blob_sidecar_list.len(),
);
return Err(Error::InvalidBlobImport("Missing block".to_string()));
}

// Check if a `blob_sidecar_list` is already stored for this block root.
match self.get_blobs(&block_root) {
> Review comment (Member): Let's try adding a log with the time this step takes.

Ok(BlobSidecarListFromRoot::Blobs(existing_blob_sidecar_list)) => {
// If blobs already exist, only proceed if they match exactly.
if existing_blob_sidecar_list == blob_sidecar_list {
debug!(
self.log,
"Skipping blob sidecar import as identical blob exists";
"block_root" => ?block_root,
"num_blob_sidecars" => blob_sidecar_list.len(),
);
continue;
} else {
return Err(Error::InvalidBlobImport(format!(
"Conflicting blobs exist for block root {:?}",
block_root
)));
}
}
Ok(BlobSidecarListFromRoot::NoRoot) => {
// This block has no existing blobs: proceed with import.
self.blobs_as_kv_store_ops(&block_root, blob_sidecar_list.clone(), &mut ops);
total_imported += blob_sidecar_list.len();
}
Ok(BlobSidecarListFromRoot::NoBlobs) => {
// This block should not have any blobs: reject the import.
warn!(
self.log,
"Aborting blob import; blobs should not exist for this block_root.";
"block_root" => ?block_root,
);
return Err(Error::InvalidBlobImport(
"No blobs should exist for this block_root".to_string(),
));
}
Err(e) => return Err(Error::InvalidBlobImport(format!("{e:?}"))),
}
}

self.blobs_db.do_atomically(ops)?;

debug!(
self.log,
"Imported historical blobs.";
"total_imported" => total_imported,
);

Ok(())
}

pub fn blobs_as_kv_store_ops(
&self,
key: &Hash256,
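
On the review comment above about logging how long the duplicate-blob check takes: a self-contained sketch of one way to measure it, where the closure stands in for `self.get_blobs(&block_root)` and any real log fields are a judgment call.

```rust
use std::time::{Duration, Instant};

// Run a closure and report how long it took; useful for wrapping the
// `get_blobs` lookup before emitting a `debug!` log with the elapsed time.
fn timed<T>(f: impl FnOnce() -> T) -> (T, Duration) {
    let start = Instant::now();
    let out = f();
    (out, start.elapsed())
}

fn main() {
    // Stand-in for the storage lookup; replace with the real call.
    let (result, elapsed) = timed(|| 40 + 2);
    println!("result: {result}, elapsed: {elapsed:?}");
}
```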
1 change: 1 addition & 0 deletions common/eth2/Cargo.toml
@@ -6,6 +6,7 @@ edition = { workspace = true }
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
bytes = { workspace = true }
derivative = { workspace = true }
either = { workspace = true }
eth2_keystore = { workspace = true }
21 changes: 21 additions & 0 deletions common/eth2/src/lib.rs
@@ -470,6 +470,27 @@ impl BeaconNodeHttpClient {
ok_or_error(response).await
}

/// Generic POST function supporting arbitrary responses and timeouts, without setting the consensus version header.
async fn post_generic_with_ssz_body<T: Into<Body>, U: IntoUrl>(
&self,
url: U,
body: T,
timeout: Option<Duration>,
) -> Result<Response, Error> {
let mut builder = self.client.post(url);
if let Some(timeout) = timeout {
builder = builder.timeout(timeout);
}
let mut headers = HeaderMap::new();

headers.insert(
"Content-Type",
HeaderValue::from_static("application/octet-stream"),
);
let response = builder.headers(headers).body(body).send().await?;
ok_or_error(response).await
}
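
A sketch of how a caller might wrap this helper for the new endpoint. The method name, `self.server.full`, and the path-building idiom mirror existing methods in this file, but this wrapper itself is not part of the diff.

```rust
// Hypothetical client method for `POST lighthouse/database/import_blobs_ssz`.
pub async fn post_lighthouse_database_import_blobs_ssz(
    &self,
    ssz_body: Vec<u8>,
) -> Result<(), Error> {
    let mut path = self.server.full.clone();
    path.path_segments_mut()
        .map_err(|()| Error::InvalidUrl(self.server.clone()))?
        .push("lighthouse")
        .push("database")
        .push("import_blobs_ssz");
    self.post_generic_with_ssz_body(path, ssz_body, None).await?;
    Ok(())
}
```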

/// `GET beacon/genesis`
///
/// ## Errors