Commit cda4d20

Additional checks and support SSZ
1 parent 5ede901 commit cda4d20

File tree: 4 files changed (+137, -25 lines)

beacon_node/http_api/src/lib.rs (+66, -9)
```diff
@@ -35,9 +35,10 @@ use crate::light_client::{get_light_client_bootstrap, get_light_client_updates};
 use crate::produce_block::{produce_blinded_block_v2, produce_block_v2, produce_block_v3};
 use crate::version::fork_versioned_response;
 use beacon_chain::{
-    attestation_verification::VerifiedAttestation, observed_operations::ObservationOutcome,
-    validator_monitor::timestamp_now, AttestationError as AttnError, BeaconChain, BeaconChainError,
-    BeaconChainTypes, WhenSlotSkipped,
+    attestation_verification::VerifiedAttestation, blob_verification::verify_kzg_for_blob_list,
+    observed_operations::ObservationOutcome, validator_monitor::timestamp_now,
+    AttestationError as AttnError, BeaconChain, BeaconChainError, BeaconChainTypes,
+    WhenSlotSkipped,
 };
 use beacon_processor::{work_reprocessing_queue::ReprocessQueueMessage, BeaconProcessorSend};
 pub use block_id::BlockId;
@@ -62,7 +63,7 @@ pub use publish_blocks::{
 use serde::{Deserialize, Serialize};
 use slog::{crit, debug, error, info, warn, Logger};
 use slot_clock::SlotClock;
-use ssz::Encode;
+use ssz::{Decode, Encode};
 pub use state_id::StateId;
 use std::future::Future;
 use std::net::{IpAddr, Ipv4Addr, SocketAddr};
@@ -82,9 +83,9 @@ use tokio_stream::{
 };
 use types::{
     fork_versioned_response::EmptyMetadata, Attestation, AttestationData, AttestationShufflingId,
-    AttesterSlashing, BeaconStateError, CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ForkName,
-    ForkVersionedResponse, Hash256, ProposerPreparationData, ProposerSlashing, RelativeEpoch,
-    SignedAggregateAndProof, SignedBlindedBeaconBlock, SignedBlsToExecutionChange,
+    AttesterSlashing, BeaconStateError, BlobSidecarList, CommitteeCache, ConfigAndPreset, Epoch,
+    EthSpec, ForkName, ForkVersionedResponse, Hash256, ProposerPreparationData, ProposerSlashing,
+    RelativeEpoch, SignedAggregateAndProof, SignedBlindedBeaconBlock, SignedBlsToExecutionChange,
     SignedContributionAndProof, SignedValidatorRegistrationData, SignedVoluntaryExit, Slot,
     SyncCommitteeMessage, SyncContributionData,
 };
```
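The new `Decode` import supports the SSZ endpoint added below, which parses its raw request body with `from_ssz_bytes`. As a minimal round-trip sketch of that API, shown on a plain `Vec<u64>` rather than Lighthouse's blob types (assumes only the `ethereum_ssz` crate, which provides the `ssz` library used here):

```rust
use ssz::{Decode, Encode};

fn main() {
    // Encode a value to SSZ bytes, then decode it back, as the
    // `import_blobs_ssz` handler does for `Vec<BlobSidecarList<E>>`.
    let values: Vec<u64> = vec![1, 2, 3];
    let bytes = values.as_ssz_bytes();
    let decoded = Vec::<u64>::from_ssz_bytes(&bytes).expect("valid SSZ");
    assert_eq!(values, decoded);
}
```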
```diff
@@ -4399,13 +4400,68 @@ pub fn serve<T: BeaconChainTypes>(
     let post_lighthouse_database_import_blobs = database_path
         .and(warp::path("import_blobs"))
         .and(warp::path::end())
+        .and(warp::query::<api_types::ImportBlobsQuery>())
         .and(warp_utils::json::json())
         .and(task_spawner_filter.clone())
         .and(chain_filter.clone())
         .then(
-            |blobs, task_spawner: TaskSpawner<T::EthSpec>, chain: Arc<BeaconChain<T>>| {
+            |query: api_types::ImportBlobsQuery,
+             blob_lists: Vec<BlobSidecarList<T::EthSpec>>,
+             task_spawner: TaskSpawner<T::EthSpec>,
+             chain: Arc<BeaconChain<T>>| {
                 task_spawner.blocking_json_task(Priority::P1, move || {
-                    match chain.store.import_historical_blobs(blobs) {
+                    if query.verify {
+                        for blob_list in &blob_lists {
+                            match verify_kzg_for_blob_list(blob_list.iter(), &chain.kzg) {
+                                Ok(()) => (),
+                                Err(e) => {
+                                    return Err(warp_utils::reject::custom_server_error(format!(
+                                        "{e:?}"
+                                    )))
+                                }
+                            }
+                        }
+                    }
+
+                    match chain.store.import_blobs_batch(blob_lists) {
+                        Ok(()) => Ok(()),
+                        Err(e) => Err(warp_utils::reject::custom_server_error(format!("{e:?}"))),
+                    }
+                })
+            },
+        );
+
+    // POST lighthouse/database/import_blobs_ssz
+    let post_lighthouse_database_import_blobs_ssz = database_path
+        .and(warp::path("import_blobs_ssz"))
+        .and(warp::path::end())
+        .and(warp::query::<api_types::ImportBlobsQuery>())
+        .and(warp::body::bytes())
+        .and(task_spawner_filter.clone())
+        .and(chain_filter.clone())
+        .then(
+            |query: api_types::ImportBlobsQuery,
+             body: Bytes,
+             task_spawner: TaskSpawner<T::EthSpec>,
+             chain: Arc<BeaconChain<T>>| {
+                task_spawner.blocking_json_task(Priority::P1, move || {
+                    let blob_lists = Vec::<BlobSidecarList<T::EthSpec>>::from_ssz_bytes(&body)
+                        .map_err(|e| warp_utils::reject::custom_server_error(format!("{e:?}")))?;
+
+                    if query.verify {
+                        for blob_list in &blob_lists {
+                            match verify_kzg_for_blob_list(blob_list.iter(), &chain.kzg) {
+                                Ok(()) => (),
+                                Err(e) => {
+                                    return Err(warp_utils::reject::custom_server_error(format!(
+                                        "{e:?}"
+                                    )))
+                                }
+                            }
+                        }
+                    }
+
+                    match chain.store.import_blobs_batch(blob_lists) {
                         Ok(()) => Ok(()),
                         Err(e) => Err(warp_utils::reject::custom_server_error(format!("{e:?}"))),
                     }
```
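For reference, a minimal standalone sketch of the query-plus-raw-body filter combination used by the new route, with a plain string reply standing in for the Lighthouse task spawner (assumes only the `warp`, `serde`, `bytes`, and `tokio` crates; the port is a placeholder):

```rust
use serde::Deserialize;
use warp::Filter;

#[derive(Deserialize)]
struct ImportBlobsQuery {
    verify: bool,
}

#[tokio::main]
async fn main() {
    // POST /import_blobs_ssz?verify=true with a raw byte body.
    let route = warp::post()
        .and(warp::path("import_blobs_ssz"))
        .and(warp::path::end())
        .and(warp::query::<ImportBlobsQuery>())
        .and(warp::body::bytes())
        .map(|query: ImportBlobsQuery, body: bytes::Bytes| {
            // A real handler would SSZ-decode `body` and, when `query.verify`
            // is set, run KZG verification before touching the database.
            format!("verify={}, body_len={}", query.verify, body.len())
        });

    warp::serve(route).run(([127, 0, 0, 1], 3030)).await;
}
```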
```diff
@@ -4771,6 +4827,7 @@ pub fn serve<T: BeaconChainTypes>(
         .uor(post_lighthouse_liveness)
         .uor(post_lighthouse_database_reconstruct)
         .uor(post_lighthouse_database_import_blobs)
+        .uor(post_lighthouse_database_import_blobs_ssz)
         .uor(post_lighthouse_block_rewards)
         .uor(post_lighthouse_ui_validator_metrics)
         .uor(post_lighthouse_ui_validator_info)
```

beacon_node/store/src/hot_cold_store.rs (+49, -16)
```diff
@@ -43,8 +43,6 @@ use types::data_column_sidecar::{ColumnIndex, DataColumnSidecar, DataColumnSidec
 use types::*;
 use zstd::{Decoder, Encoder};
 
-const HISTORICAL_BLOB_BATCH_SIZE: usize = 1000;
-
 /// On-disk database that stores finalized states efficiently.
 ///
 /// Stores vector fields like the `block_roots` and `state_roots` separately, and only stores
```
```diff
@@ -868,39 +866,74 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         Ok(())
     }
 
-    /// Import historical blobs.
-    pub fn import_historical_blobs(
+    /// Import a batch of blobs.
+    /// Implements the following checks:
+    /// - Checks that `block_root` is consistent across each `BlobSidecarList`.
+    /// - Checks that `block_root` exists in the database.
+    /// - Checks if a `BlobSidecarList` is already stored for that `block_root`.
+    ///   If it is, ensure it matches the `BlobSidecarList` we are attempting to store.
+    pub fn import_blobs_batch(
         &self,
-        historical_blobs: Vec<(Hash256, BlobSidecarList<E>)>,
+        historical_blobs: Vec<BlobSidecarList<E>>,
     ) -> Result<(), Error> {
         if historical_blobs.is_empty() {
             return Ok(());
         }
 
         let mut total_imported = 0;
 
-        for chunk in historical_blobs.chunks(HISTORICAL_BLOB_BATCH_SIZE) {
-            let mut ops = Vec::with_capacity(chunk.len());
+        let mut ops = vec![];
 
-            for (block_root, blobs) in chunk {
-                // Verify block exists.
-                if !self.block_exists(block_root)? {
-                    warn!(
+        for blob_list in historical_blobs {
+            // Ensure all block_roots in the blob list are the same.
+            let block_root = {
+                let first_block_root = blob_list[0].block_root();
+                if !blob_list
+                    .iter()
+                    .all(|blob| blob.block_root() == first_block_root)
+                {
+                    return Err(Error::InvalidBlobImport(
+                        "Inconsistent block roots".to_string(),
+                    ));
+                }
+                first_block_root
+            };
+
+            // Verify block exists.
+            if !self.block_exists(&block_root)? {
+                warn!(
+                    self.log,
+                    "Aborting blob import; block root does not exist.";
+                    "block_root" => ?block_root,
+                    "num_blobs" => blob_list.len(),
+                );
+                return Err(Error::InvalidBlobImport("Missing block root".to_string()));
+            }
+
+            // Check if a blob_list is already stored for this block root.
+            if let Some(existing_blob_list) = self.get_blobs(&block_root)? {
+                if existing_blob_list == blob_list {
+                    debug!(
                         self.log,
-                        "Skipping import of blobs; block root does not exist.";
+                        "Skipping blob import as identical blob exists";
                         "block_root" => ?block_root,
-                        "num_blobs" => blobs.len(),
+                        "num_blobs" => blob_list.len(),
                     );
                     continue;
                 }
 
-                self.blobs_as_kv_store_ops(block_root, blobs.clone(), &mut ops);
-                total_imported += blobs.len();
+                return Err(Error::InvalidBlobImport(format!(
+                    "Conflicting blobs exist for block root {:?}",
+                    block_root
+                )));
             }
 
-            self.blobs_db.do_atomically(ops)?;
+            self.blobs_as_kv_store_ops(&block_root, blob_list.clone(), &mut ops);
+            total_imported += blob_list.len();
         }
 
+        self.blobs_db.do_atomically(ops)?;
+
         debug!(
             self.log,
             "Imported historical blobs.";
```

common/eth2/src/lighthouse.rs (+17)
```diff
@@ -13,6 +13,7 @@ use crate::{
     },
     BeaconNodeHttpClient, DepositData, Error, Eth1Data, Hash256, Slot,
 };
+use bytes::Bytes;
 use proto_array::core::ProtoArray;
 use serde::{Deserialize, Serialize};
 use ssz::four_byte_option_impl;
```
```diff
@@ -528,6 +529,22 @@ impl BeaconNodeHttpClient {
         self.post_with_response(path, &()).await
     }
 
+    /// `POST lighthouse/database/import_blobs_ssz`
+    pub async fn post_lighthouse_database_import_blobs_ssz(
+        &self,
+        blobs: Bytes,
+    ) -> Result<String, Error> {
+        let mut path = self.server.full.clone();
+
+        path.path_segments_mut()
+            .map_err(|()| Error::InvalidUrl(self.server.clone()))?
+            .push("lighthouse")
+            .push("database")
+            .push("import_blobs_ssz");
+
+        self.post_with_response(path, &blobs).await
+    }
+
     /*
         Analysis endpoints.
     */
```
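A hedged usage sketch for the new client method: SSZ-encode a batch of blob lists and push it through the endpoint. The URL, timeout, and `MainnetEthSpec` are placeholder assumptions, and the client construction follows the existing `eth2` crate helpers (`SensitiveUrl::parse`, `Timeouts::set_all`):

```rust
use bytes::Bytes;
use eth2::{BeaconNodeHttpClient, Timeouts};
use sensitive_url::SensitiveUrl;
use ssz::Encode;
use std::time::Duration;
use types::{BlobSidecarList, MainnetEthSpec};

async fn import_blobs_ssz(
    blob_lists: Vec<BlobSidecarList<MainnetEthSpec>>,
) -> Result<(), eth2::Error> {
    // Placeholder beacon node URL; point this at the target node.
    let url = SensitiveUrl::parse("http://localhost:5052").unwrap();
    let client = BeaconNodeHttpClient::new(url, Timeouts::set_all(Duration::from_secs(30)));

    // `Vec<BlobSidecarList<E>>` encodes straight to SSZ, matching what the
    // server decodes with `from_ssz_bytes` in the handler above.
    let body = Bytes::from(blob_lists.as_ssz_bytes());
    client
        .post_lighthouse_database_import_blobs_ssz(body)
        .await?;
    Ok(())
}
```

One thing worth flagging: as written, the client method does not set the `verify` query parameter, while the server route extracts a required `ImportBlobsQuery`; see the note on that struct below.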

common/eth2/src/types.rs (+5)
```diff
@@ -803,6 +803,11 @@ pub struct LightClientUpdateResponseChunk {
     pub payload: Vec<u8>,
 }
 
+#[derive(Clone, Serialize, Deserialize)]
+pub struct ImportBlobsQuery {
+    pub verify: bool,
+}
+
 #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Hash)]
 pub struct BeaconCommitteeSubscription {
     #[serde(with = "serde_utils::quoted_u64")]
```
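Since `verify` carries no `#[serde(default)]`, the query parameter is effectively mandatory: `warp::query`, which deserializes via `serde_urlencoded`, should reject a request that omits it. A quick standalone check of that behavior (assumes only the `serde` and `serde_urlencoded` crates):

```rust
use serde::Deserialize;

#[derive(Clone, Debug, Deserialize)]
struct ImportBlobsQuery {
    verify: bool,
}

fn main() {
    // `?verify=true` parses as expected...
    let q: ImportBlobsQuery = serde_urlencoded::from_str("verify=true").unwrap();
    assert!(q.verify);

    // ...but an empty query string fails, because `verify` is not optional.
    assert!(serde_urlencoded::from_str::<ImportBlobsQuery>("").is_err());
}
```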
