Skip to content

Commit 3c0f7be

Browse files
committed
Update blobs_manager
1 parent 054f0d7 commit 3c0f7be

File tree

15 files changed

+456
-35
lines changed

15 files changed

+456
-35
lines changed

Cargo.lock

Lines changed: 4 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

beacon_node/http_api/src/lib.rs

Lines changed: 69 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ use tokio_stream::{
8686
};
8787
use types::{
8888
fork_versioned_response::EmptyMetadata, Attestation, AttestationData, AttestationShufflingId,
89-
AttesterSlashing, BeaconStateError, BlobSidecarList, ChainSpec, CommitteeCache,
89+
AttesterSlashing, BeaconStateError, BlobSidecar, BlobSidecarList, ChainSpec, CommitteeCache,
9090
ConfigAndPreset, Epoch, EthSpec, ForkName, ForkVersionedResponse, Hash256,
9191
ProposerPreparationData, ProposerSlashing, RelativeEpoch, SignedAggregateAndProof,
9292
SignedBlindedBeaconBlock, SignedBlsToExecutionChange, SignedContributionAndProof,
@@ -4448,6 +4448,8 @@ pub fn serve<T: BeaconChainTypes>(
44484448
},
44494449
);
44504450

4451+
// POST lighthouse/database/verify_blobs
4452+
44514453
// POST lighthouse/database/import_blobs
44524454
let post_lighthouse_database_import_blobs = database_path
44534455
.and(warp::path("import_blobs"))
@@ -4462,7 +4464,7 @@ pub fn serve<T: BeaconChainTypes>(
44624464
task_spawner: TaskSpawner<T::EthSpec>,
44634465
chain: Arc<BeaconChain<T>>| {
44644466
task_spawner.blocking_json_task(Priority::P1, move || {
4465-
if query.verify {
4467+
if query.verify == Some(true) {
44664468
for blob_list in &blob_lists {
44674469
match verify_kzg_for_blob_list(blob_list.iter(), &chain.kzg) {
44684470
Ok(()) => (),
@@ -4497,11 +4499,30 @@ pub fn serve<T: BeaconChainTypes>(
44974499
task_spawner: TaskSpawner<T::EthSpec>,
44984500
chain: Arc<BeaconChain<T>>| {
44994501
task_spawner.blocking_json_task(Priority::P1, move || {
4500-
let blob_lists = Vec::<BlobSidecarList<T::EthSpec>>::from_ssz_bytes(&body)
4502+
let blob_lists = Vec::<Vec<Arc<BlobSidecar<T::EthSpec>>>>::from_ssz_bytes(
4503+
&body,
4504+
)
4505+
.map_err(|e| warp_utils::reject::custom_server_error(format!("{e:?}")))?;
4506+
4507+
if blob_lists.is_empty() {
4508+
return Err(warp_utils::reject::custom_server_error(
4509+
"Blob list must not be empty".to_string(),
4510+
));
4511+
}
4512+
4513+
// Build `BlobSidecarList`s from the `Vec<BlobSidecar>`s.
4514+
let blob_sidecar_lists: Vec<BlobSidecarList<T::EthSpec>> = blob_lists
4515+
.into_iter()
4516+
.map(|blob_sidecars| {
4517+
let max_blobs_at_epoch =
4518+
chain.spec.max_blobs_per_block(blob_sidecars[0].epoch()) as usize;
4519+
BlobSidecarList::new(blob_sidecars, max_blobs_at_epoch)
4520+
})
4521+
.collect::<Result<Vec<_>, _>>()
45014522
.map_err(|e| warp_utils::reject::custom_server_error(format!("{e:?}")))?;
45024523

4503-
if query.verify {
4504-
for blob_list in &blob_lists {
4524+
if query.verify == Some(true) {
4525+
for blob_list in &blob_sidecar_lists {
45054526
match verify_kzg_for_blob_list(blob_list.iter(), &chain.kzg) {
45064527
Ok(()) => (),
45074528
Err(e) => {
@@ -4513,14 +4534,55 @@ pub fn serve<T: BeaconChainTypes>(
45134534
}
45144535
}
45154536

4516-
match chain.store.import_blobs_batch(blob_lists) {
4537+
match chain.store.import_blobs_batch(blob_sidecar_lists) {
45174538
Ok(()) => Ok(()),
45184539
Err(e) => Err(warp_utils::reject::custom_server_error(format!("{e:?}"))),
45194540
}
45204541
})
45214542
},
45224543
);
45234544

4545+
// GET lighthouse/database/verify_blobs
4546+
let get_lighthouse_database_verify_blobs = database_path
4547+
.and(warp::path("verify_blobs"))
4548+
.and(warp::path::end())
4549+
.and(warp::query::<api_types::VerifyBlobsQuery>())
4550+
.and(task_spawner_filter.clone())
4551+
.and(chain_filter.clone())
4552+
.then(
4553+
|query: api_types::VerifyBlobsQuery,
4554+
task_spawner: TaskSpawner<T::EthSpec>,
4555+
chain: Arc<BeaconChain<T>>| {
4556+
task_spawner.blocking_json_task(Priority::P1, move || {
4557+
let mut results = Vec::new();
4558+
for slot in query.start_slot.as_u64()..=query.end_slot.as_u64() {
4559+
if let Ok((root, _, _)) = BlockId::from_slot(Slot::from(slot)).root(&chain)
4560+
{
4561+
if let Ok(blob_list_res) = chain.store.get_blobs(&root) {
4562+
if let Some(blob_list) = blob_list_res.blobs() {
4563+
if let Err(e) =
4564+
verify_kzg_for_blob_list(blob_list.iter(), &chain.kzg)
4565+
{
4566+
results.push(format!(
4567+
"slot: {slot}, block_root: {root:?}, error: {e:?}"
4568+
));
4569+
}
4570+
}
4571+
}
4572+
}
4573+
}
4574+
4575+
if results.is_empty() {
4576+
Ok(api_types::GenericResponse::from(
4577+
"All blobs verified successfully".to_string(),
4578+
))
4579+
} else {
4580+
Ok(api_types::GenericResponse::from(results.join("\n")))
4581+
}
4582+
})
4583+
},
4584+
);
4585+
45244586
// GET lighthouse/analysis/block_rewards
45254587
let get_lighthouse_block_rewards = warp::path("lighthouse")
45264588
.and(warp::path("analysis"))
@@ -4820,6 +4882,7 @@ pub fn serve<T: BeaconChainTypes>(
48204882
.uor(get_lighthouse_eth1_deposit_cache)
48214883
.uor(get_lighthouse_staking)
48224884
.uor(get_lighthouse_database_info)
4885+
.uor(get_lighthouse_database_verify_blobs)
48234886
.uor(get_lighthouse_block_rewards)
48244887
.uor(get_lighthouse_attestation_performance)
48254888
.uor(

beacon_node/store/src/hot_cold_store.rs

Lines changed: 42 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -860,23 +860,23 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
860860
/// If it is, ensure it matches the `BlobSidecarList` we are attempting to store.
861861
pub fn import_blobs_batch(
862862
&self,
863-
historical_blobs: Vec<BlobSidecarList<E>>,
863+
historical_blob_sidecars: Vec<BlobSidecarList<E>>,
864864
) -> Result<(), Error> {
865-
if historical_blobs.is_empty() {
865+
if historical_blob_sidecars.is_empty() {
866866
return Ok(());
867867
}
868868

869869
let mut total_imported = 0;
870870

871871
let mut ops = vec![];
872872

873-
for blob_list in historical_blobs {
873+
for blob_sidecar_list in historical_blob_sidecars {
874874
// Ensure all block_roots in the blob list are the same.
875875
let block_root = {
876-
let first_block_root = blob_list[0].block_root();
877-
if !blob_list
876+
let first_block_root = blob_sidecar_list[0].block_root();
877+
if !blob_sidecar_list
878878
.iter()
879-
.all(|blob| blob.block_root() == first_block_root)
879+
.all(|blob_sidecar| blob_sidecar.block_root() == first_block_root)
880880
{
881881
return Err(Error::InvalidBlobImport(
882882
"Inconsistent block roots".to_string(),
@@ -885,37 +885,54 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
885885
first_block_root
886886
};
887887

888-
// Verify block exists.
888+
// Check block is stored for this block_root.
889889
if !self.block_exists(&block_root)? {
890890
warn!(
891891
self.log,
892-
"Aborting blob import; block root does not exist.";
892+
"Aborting blob import; block does not exist.";
893893
"block_root" => ?block_root,
894-
"num_blobs" => blob_list.len(),
894+
"num_blob_sidecars" => blob_sidecar_list.len(),
895895
);
896-
return Err(Error::InvalidBlobImport("Missing block root".to_string()));
896+
return Err(Error::InvalidBlobImport("Missing block".to_string()));
897897
}
898898

899-
// Check if a blob_list is already stored for this block root.
900-
if let Some(existing_blob_list) = self.get_blobs(&block_root)? {
901-
if existing_blob_list == blob_list {
902-
debug!(
899+
// Check if a `blob_sidecar_list` is already stored for this block root.
900+
match self.get_blobs(&block_root) {
901+
Ok(BlobSidecarListFromRoot::Blobs(existing_blob_sidecar_list)) => {
902+
// If blobs already exist, only proceed if they match exactly.
903+
if existing_blob_sidecar_list == blob_sidecar_list {
904+
debug!(
905+
self.log,
906+
"Skipping blob sidecar import as identical blob exists";
907+
"block_root" => ?block_root,
908+
"num_blob_sidecars" => blob_sidecar_list.len(),
909+
);
910+
continue;
911+
} else {
912+
return Err(Error::InvalidBlobImport(format!(
913+
"Conflicting blobs exist for block root {:?}",
914+
block_root
915+
)));
916+
}
917+
}
918+
Ok(BlobSidecarListFromRoot::NoRoot) => {
919+
// This block has no existing blobs: proceed with import.
920+
self.blobs_as_kv_store_ops(&block_root, blob_sidecar_list.clone(), &mut ops);
921+
total_imported += blob_sidecar_list.len();
922+
}
923+
Ok(BlobSidecarListFromRoot::NoBlobs) => {
924+
// This block should not have any blobs: reject the import.
925+
warn!(
903926
self.log,
904-
"Skipping blob import as identical blob exists";
927+
"Aborting blob import; blobs should not exist for this block_root.";
905928
"block_root" => ?block_root,
906-
"num_blobs" => blob_list.len(),
907929
);
908-
continue;
930+
return Err(Error::InvalidBlobImport(
931+
"No blobs should exist for this block_root".to_string(),
932+
));
909933
}
910-
911-
return Err(Error::InvalidBlobImport(format!(
912-
"Conflicting blobs exist for block root {:?}",
913-
block_root
914-
)));
934+
Err(e) => return Err(Error::InvalidBlobImport(format!("{e:?}"))),
915935
}
916-
917-
self.blobs_as_kv_store_ops(&block_root, blob_list.clone(), &mut ops);
918-
total_imported += blob_list.len();
919936
}
920937

921938
self.blobs_db.do_atomically(ops)?;

common/eth2/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ edition = { workspace = true }
66
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
77

88
[dependencies]
9+
bytes = { workspace = true }
910
derivative = { workspace = true }
1011
either = { workspace = true }
1112
eth2_keystore = { workspace = true }

common/eth2/src/lib.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -470,6 +470,27 @@ impl BeaconNodeHttpClient {
470470
ok_or_error(response).await
471471
}
472472

473+
/// Generic POST function supporting arbitrary responses and timeouts without setting Consensus Version.
474+
async fn post_generic_with_ssz_body<T: Into<Body>, U: IntoUrl>(
475+
&self,
476+
url: U,
477+
body: T,
478+
timeout: Option<Duration>,
479+
) -> Result<Response, Error> {
480+
let mut builder = self.client.post(url);
481+
if let Some(timeout) = timeout {
482+
builder = builder.timeout(timeout);
483+
}
484+
let mut headers = HeaderMap::new();
485+
486+
headers.insert(
487+
"Content-Type",
488+
HeaderValue::from_static("application/octet-stream"),
489+
);
490+
let response = builder.headers(headers).body(body).send().await?;
491+
ok_or_error(response).await
492+
}
493+
473494
/// `GET beacon/genesis`
474495
///
475496
/// ## Errors

common/eth2/src/lighthouse.rs

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ pub use block_packing_efficiency::{
2929
};
3030
pub use block_rewards::{AttestationRewards, BlockReward, BlockRewardMeta, BlockRewardsQuery};
3131
pub use lighthouse_network::{types::SyncState, PeerInfo};
32+
use reqwest::Response;
3233
pub use standard_block_rewards::StandardBlockReward;
3334
pub use sync_committee_rewards::SyncCommitteeReward;
3435

@@ -411,16 +412,44 @@ impl BeaconNodeHttpClient {
411412
pub async fn post_lighthouse_database_import_blobs_ssz(
412413
&self,
413414
blobs: Bytes,
414-
) -> Result<String, Error> {
415+
skip_verification: bool,
416+
) -> Result<Response, Error> {
415417
let mut path = self.server.full.clone();
418+
let verify = !skip_verification;
416419

417420
path.path_segments_mut()
418421
.map_err(|()| Error::InvalidUrl(self.server.clone()))?
419422
.push("lighthouse")
420423
.push("database")
421424
.push("import_blobs_ssz");
422425

423-
self.post_with_response(path, &blobs).await
426+
if skip_verification {
427+
path.query_pairs_mut()
428+
.append_pair("verify", &verify.to_string());
429+
}
430+
431+
self.post_generic_with_ssz_body(path, blobs, None).await
432+
}
433+
434+
/// `POST lighthouse/database/verify_blobs`
435+
pub async fn get_lighthouse_database_verify_blobs(
436+
&self,
437+
start_slot: Slot,
438+
end_slot: Slot,
439+
) -> Result<String, Error> {
440+
let mut path = self.server.full.clone();
441+
442+
path.path_segments_mut()
443+
.map_err(|()| Error::InvalidUrl(self.server.clone()))?
444+
.push("lighthouse")
445+
.push("database")
446+
.push("verify_blobs");
447+
448+
path.query_pairs_mut()
449+
.append_pair("start_slot", &start_slot.to_string())
450+
.append_pair("end_slot", &end_slot.to_string());
451+
452+
self.get(path).await
424453
}
425454

426455
/*

common/eth2/src/types.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -813,7 +813,13 @@ pub struct LightClientUpdateResponseChunk {
813813

814814
#[derive(Clone, Serialize, Deserialize)]
815815
pub struct ImportBlobsQuery {
816-
pub verify: bool,
816+
pub verify: Option<bool>,
817+
}
818+
819+
#[derive(Debug, Deserialize)]
820+
pub struct VerifyBlobsQuery {
821+
pub start_slot: Slot,
822+
pub end_slot: Slot,
817823
}
818824

819825
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Hash)]

database_manager/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,10 @@ beacon_node = { workspace = true }
99
clap = { workspace = true }
1010
clap_utils = { workspace = true }
1111
environment = { workspace = true }
12+
eth2 = { workspace = true }
13+
ethereum_ssz = { workspace = true }
1214
hex = { workspace = true }
15+
sensitive_url = { workspace = true }
1316
serde = { workspace = true }
1417
slog = { workspace = true }
1518
store = { workspace = true }

0 commit comments

Comments
 (0)