Skip to content

Commit 054f0d7

Browse files
committed
Additional checks and support SSZ
1 parent f72df2c commit 054f0d7

File tree

4 files changed

+140
-27
lines changed

4 files changed

+140
-27
lines changed

beacon_node/http_api/src/lib.rs

Lines changed: 69 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,10 @@ use crate::light_client::{get_light_client_bootstrap, get_light_client_updates};
3434
use crate::produce_block::{produce_blinded_block_v2, produce_block_v2, produce_block_v3};
3535
use crate::version::fork_versioned_response;
3636
use beacon_chain::{
37-
attestation_verification::VerifiedAttestation, observed_operations::ObservationOutcome,
38-
validator_monitor::timestamp_now, AttestationError as AttnError, BeaconChain, BeaconChainError,
39-
BeaconChainTypes, WhenSlotSkipped,
37+
attestation_verification::VerifiedAttestation, blob_verification::verify_kzg_for_blob_list,
38+
observed_operations::ObservationOutcome, validator_monitor::timestamp_now,
39+
AttestationError as AttnError, BeaconChain, BeaconChainError, BeaconChainTypes,
40+
WhenSlotSkipped,
4041
};
4142
use beacon_processor::{work_reprocessing_queue::ReprocessQueueMessage, BeaconProcessorSend};
4243
pub use block_id::BlockId;
@@ -65,7 +66,7 @@ use serde::{Deserialize, Serialize};
6566
use serde_json::Value;
6667
use slog::{crit, debug, error, info, warn, Logger};
6768
use slot_clock::SlotClock;
68-
use ssz::Encode;
69+
use ssz::{Decode, Encode};
6970
pub use state_id::StateId;
7071
use std::future::Future;
7172
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
@@ -85,11 +86,12 @@ use tokio_stream::{
8586
};
8687
use types::{
8788
fork_versioned_response::EmptyMetadata, Attestation, AttestationData, AttestationShufflingId,
88-
AttesterSlashing, BeaconStateError, ChainSpec, CommitteeCache, ConfigAndPreset, Epoch, EthSpec,
89-
ForkName, ForkVersionedResponse, Hash256, ProposerPreparationData, ProposerSlashing,
90-
RelativeEpoch, SignedAggregateAndProof, SignedBlindedBeaconBlock, SignedBlsToExecutionChange,
91-
SignedContributionAndProof, SignedValidatorRegistrationData, SignedVoluntaryExit, Slot,
92-
SyncCommitteeMessage, SyncContributionData,
89+
AttesterSlashing, BeaconStateError, BlobSidecarList, ChainSpec, CommitteeCache,
90+
ConfigAndPreset, Epoch, EthSpec, ForkName, ForkVersionedResponse, Hash256,
91+
ProposerPreparationData, ProposerSlashing, RelativeEpoch, SignedAggregateAndProof,
92+
SignedBlindedBeaconBlock, SignedBlsToExecutionChange, SignedContributionAndProof,
93+
SignedValidatorRegistrationData, SignedVoluntaryExit, Slot, SyncCommitteeMessage,
94+
SyncContributionData,
9395
};
9496
use validator::pubkey_to_validator_index;
9597
use version::{
@@ -4450,13 +4452,68 @@ pub fn serve<T: BeaconChainTypes>(
44504452
let post_lighthouse_database_import_blobs = database_path
44514453
.and(warp::path("import_blobs"))
44524454
.and(warp::path::end())
4455+
.and(warp::query::<api_types::ImportBlobsQuery>())
44534456
.and(warp_utils::json::json())
44544457
.and(task_spawner_filter.clone())
44554458
.and(chain_filter.clone())
44564459
.then(
4457-
|blobs, task_spawner: TaskSpawner<T::EthSpec>, chain: Arc<BeaconChain<T>>| {
4460+
|query: api_types::ImportBlobsQuery,
4461+
blob_lists: Vec<BlobSidecarList<T::EthSpec>>,
4462+
task_spawner: TaskSpawner<T::EthSpec>,
4463+
chain: Arc<BeaconChain<T>>| {
44584464
task_spawner.blocking_json_task(Priority::P1, move || {
4459-
match chain.store.import_historical_blobs(blobs) {
4465+
if query.verify {
4466+
for blob_list in &blob_lists {
4467+
match verify_kzg_for_blob_list(blob_list.iter(), &chain.kzg) {
4468+
Ok(()) => (),
4469+
Err(e) => {
4470+
return Err(warp_utils::reject::custom_server_error(format!(
4471+
"{e:?}"
4472+
)))
4473+
}
4474+
}
4475+
}
4476+
}
4477+
4478+
match chain.store.import_blobs_batch(blob_lists) {
4479+
Ok(()) => Ok(()),
4480+
Err(e) => Err(warp_utils::reject::custom_server_error(format!("{e:?}"))),
4481+
}
4482+
})
4483+
},
4484+
);
4485+
4486+
// POST lighthouse/database/import_blobs_ssz
4487+
let post_lighthouse_database_import_blobs_ssz = database_path
4488+
.and(warp::path("import_blobs_ssz"))
4489+
.and(warp::path::end())
4490+
.and(warp::query::<api_types::ImportBlobsQuery>())
4491+
.and(warp::body::bytes())
4492+
.and(task_spawner_filter.clone())
4493+
.and(chain_filter.clone())
4494+
.then(
4495+
|query: api_types::ImportBlobsQuery,
4496+
body: Bytes,
4497+
task_spawner: TaskSpawner<T::EthSpec>,
4498+
chain: Arc<BeaconChain<T>>| {
4499+
task_spawner.blocking_json_task(Priority::P1, move || {
4500+
let blob_lists = Vec::<BlobSidecarList<T::EthSpec>>::from_ssz_bytes(&body)
4501+
.map_err(|e| warp_utils::reject::custom_server_error(format!("{e:?}")))?;
4502+
4503+
if query.verify {
4504+
for blob_list in &blob_lists {
4505+
match verify_kzg_for_blob_list(blob_list.iter(), &chain.kzg) {
4506+
Ok(()) => (),
4507+
Err(e) => {
4508+
return Err(warp_utils::reject::custom_server_error(format!(
4509+
"{e:?}"
4510+
)))
4511+
}
4512+
}
4513+
}
4514+
}
4515+
4516+
match chain.store.import_blobs_batch(blob_lists) {
44604517
Ok(()) => Ok(()),
44614518
Err(e) => Err(warp_utils::reject::custom_server_error(format!("{e:?}"))),
44624519
}
@@ -4826,6 +4883,7 @@ pub fn serve<T: BeaconChainTypes>(
48264883
.uor(post_lighthouse_liveness)
48274884
.uor(post_lighthouse_database_reconstruct)
48284885
.uor(post_lighthouse_database_import_blobs)
4886+
.uor(post_lighthouse_database_import_blobs_ssz)
48294887
.uor(post_lighthouse_block_rewards)
48304888
.uor(post_lighthouse_ui_validator_metrics)
48314889
.uor(post_lighthouse_ui_validator_info)

beacon_node/store/src/hot_cold_store.rs

Lines changed: 49 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,6 @@ use types::data_column_sidecar::{ColumnIndex, DataColumnSidecar, DataColumnSidec
4141
use types::*;
4242
use zstd::{Decoder, Encoder};
4343

44-
const HISTORICAL_BLOB_BATCH_SIZE: usize = 1000;
45-
4644
/// On-disk database that stores finalized states efficiently.
4745
///
4846
/// Stores vector fields like the `block_roots` and `state_roots` separately, and only stores
@@ -854,39 +852,74 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
854852
Ok(())
855853
}
856854

857-
/// Import historical blobs.
858-
pub fn import_historical_blobs(
855+
/// Import a batch of blobs.
856+
/// Implements the following checks:
857+
/// - Checks that `block_root` is consistent across each `BlobSidecarList`.
858+
/// - Checks that `block_root` exists in the database.
859+
/// - Checks if a `BlobSidecarList` is already stored for that `block_root`.
860+
/// If it is, ensure it matches the `BlobSidecarList` we are attempting to store.
861+
pub fn import_blobs_batch(
859862
&self,
860-
historical_blobs: Vec<(Hash256, BlobSidecarList<E>)>,
863+
historical_blobs: Vec<BlobSidecarList<E>>,
861864
) -> Result<(), Error> {
862865
if historical_blobs.is_empty() {
863866
return Ok(());
864867
}
865868

866869
let mut total_imported = 0;
867870

868-
for chunk in historical_blobs.chunks(HISTORICAL_BLOB_BATCH_SIZE) {
869-
let mut ops = Vec::with_capacity(chunk.len());
871+
let mut ops = vec![];
870872

871-
for (block_root, blobs) in chunk {
872-
// Verify block exists.
873-
if !self.block_exists(block_root)? {
874-
warn!(
873+
for blob_list in historical_blobs {
874+
// Ensure all block_roots in the blob list are the same.
875+
let block_root = {
876+
let first_block_root = blob_list[0].block_root();
877+
if !blob_list
878+
.iter()
879+
.all(|blob| blob.block_root() == first_block_root)
880+
{
881+
return Err(Error::InvalidBlobImport(
882+
"Inconsistent block roots".to_string(),
883+
));
884+
}
885+
first_block_root
886+
};
887+
888+
// Verify block exists.
889+
if !self.block_exists(&block_root)? {
890+
warn!(
891+
self.log,
892+
"Aborting blob import; block root does not exist.";
893+
"block_root" => ?block_root,
894+
"num_blobs" => blob_list.len(),
895+
);
896+
return Err(Error::InvalidBlobImport("Missing block root".to_string()));
897+
}
898+
899+
// Check if a blob_list is already stored for this block root.
900+
if let Some(existing_blob_list) = self.get_blobs(&block_root)? {
901+
if existing_blob_list == blob_list {
902+
debug!(
875903
self.log,
876-
"Skipping import of blobs; block root does not exist.";
904+
"Skipping blob import as identical blob exists";
877905
"block_root" => ?block_root,
878-
"num_blobs" => blobs.len(),
906+
"num_blobs" => blob_list.len(),
879907
);
880908
continue;
881909
}
882910

883-
self.blobs_as_kv_store_ops(block_root, blobs.clone(), &mut ops);
884-
total_imported += blobs.len();
911+
return Err(Error::InvalidBlobImport(format!(
912+
"Conflicting blobs exist for block root {:?}",
913+
block_root
914+
)));
885915
}
886916

887-
self.blobs_db.do_atomically(ops)?;
917+
self.blobs_as_kv_store_ops(&block_root, blob_list.clone(), &mut ops);
918+
total_imported += blob_list.len();
888919
}
889920

921+
self.blobs_db.do_atomically(ops)?;
922+
890923
debug!(
891924
self.log,
892925
"Imported historical blobs.";

common/eth2/src/lighthouse.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use crate::{
1313
},
1414
BeaconNodeHttpClient, DepositData, Error, Eth1Data, Hash256, Slot,
1515
};
16+
use bytes::Bytes;
1617
use proto_array::core::ProtoArray;
1718
use serde::{Deserialize, Serialize};
1819
use ssz::four_byte_option_impl;
@@ -406,6 +407,22 @@ impl BeaconNodeHttpClient {
406407
self.post_with_response(path, &()).await
407408
}
408409

410+
/// `POST lighthouse/database/import_blobs_ssz`
411+
pub async fn post_lighthouse_database_import_blobs_ssz(
412+
&self,
413+
blobs: Bytes,
414+
) -> Result<String, Error> {
415+
let mut path = self.server.full.clone();
416+
417+
path.path_segments_mut()
418+
.map_err(|()| Error::InvalidUrl(self.server.clone()))?
419+
.push("lighthouse")
420+
.push("database")
421+
.push("import_blobs_ssz");
422+
423+
self.post_with_response(path, &blobs).await
424+
}
425+
409426
/*
410427
Analysis endpoints.
411428
*/

common/eth2/src/types.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -811,6 +811,11 @@ pub struct LightClientUpdateResponseChunk {
811811
pub payload: Vec<u8>,
812812
}
813813

814+
#[derive(Clone, Serialize, Deserialize)]
815+
pub struct ImportBlobsQuery {
816+
pub verify: bool,
817+
}
818+
814819
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Hash)]
815820
pub struct BeaconCommitteeSubscription {
816821
#[serde(with = "serde_utils::quoted_u64")]

0 commit comments

Comments
 (0)