Skip to content

Commit f72df2c

Browse files
committed
First try
1 parent 3d06bc2 commit f72df2c

File tree

3 files changed

+68
-4
lines changed

3 files changed

+68
-4
lines changed

beacon_node/http_api/src/lib.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4446,6 +4446,24 @@ pub fn serve<T: BeaconChainTypes>(
44464446
},
44474447
);
44484448

4449+
// POST lighthouse/database/import_blobs
4450+
let post_lighthouse_database_import_blobs = database_path
4451+
.and(warp::path("import_blobs"))
4452+
.and(warp::path::end())
4453+
.and(warp_utils::json::json())
4454+
.and(task_spawner_filter.clone())
4455+
.and(chain_filter.clone())
4456+
.then(
4457+
|blobs, task_spawner: TaskSpawner<T::EthSpec>, chain: Arc<BeaconChain<T>>| {
4458+
task_spawner.blocking_json_task(Priority::P1, move || {
4459+
match chain.store.import_historical_blobs(blobs) {
4460+
Ok(()) => Ok(()),
4461+
Err(e) => Err(warp_utils::reject::custom_server_error(format!("{e:?}"))),
4462+
}
4463+
})
4464+
},
4465+
);
4466+
44494467
// GET lighthouse/analysis/block_rewards
44504468
let get_lighthouse_block_rewards = warp::path("lighthouse")
44514469
.and(warp::path("analysis"))
@@ -4807,6 +4825,7 @@ pub fn serve<T: BeaconChainTypes>(
48074825
.uor(post_validator_liveness_epoch)
48084826
.uor(post_lighthouse_liveness)
48094827
.uor(post_lighthouse_database_reconstruct)
4828+
.uor(post_lighthouse_database_import_blobs)
48104829
.uor(post_lighthouse_block_rewards)
48114830
.uor(post_lighthouse_ui_validator_metrics)
48124831
.uor(post_lighthouse_ui_validator_info)

beacon_node/store/src/errors.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,6 @@ pub enum Error {
5050
MissingGenesisState,
5151
MissingSnapshot(Slot),
5252
BlockReplayError(BlockReplayError),
53-
AddPayloadLogicError,
54-
InvalidKey,
55-
InvalidBytes,
56-
InconsistentFork(InconsistentFork),
5753
#[cfg(feature = "leveldb")]
5854
LevelDbError(LevelDBError),
5955
#[cfg(feature = "redb")]
@@ -68,6 +64,11 @@ pub enum Error {
6864
state_root: Hash256,
6965
slot: Slot,
7066
},
67+
AddPayloadLogicError,
68+
InvalidKey,
69+
InvalidBytes,
70+
InvalidBlobImport(String),
71+
InconsistentFork(InconsistentFork),
7172
Hdiff(hdiff::Error),
7273
ForwardsIterInvalidColumn(DBColumn),
7374
ForwardsIterGap(DBColumn, Slot, Slot),

beacon_node/store/src/hot_cold_store.rs

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ use types::data_column_sidecar::{ColumnIndex, DataColumnSidecar, DataColumnSidec
4141
use types::*;
4242
use zstd::{Decoder, Encoder};
4343

44+
const HISTORICAL_BLOB_BATCH_SIZE: usize = 1000;
45+
4446
/// On-disk database that stores finalized states efficiently.
4547
///
4648
/// Stores vector fields like the `block_roots` and `state_roots` separately, and only stores
@@ -852,6 +854,48 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
852854
Ok(())
853855
}
854856

857+
/// Import historical blobs.
858+
pub fn import_historical_blobs(
859+
&self,
860+
historical_blobs: Vec<(Hash256, BlobSidecarList<E>)>,
861+
) -> Result<(), Error> {
862+
if historical_blobs.is_empty() {
863+
return Ok(());
864+
}
865+
866+
let mut total_imported = 0;
867+
868+
for chunk in historical_blobs.chunks(HISTORICAL_BLOB_BATCH_SIZE) {
869+
let mut ops = Vec::with_capacity(chunk.len());
870+
871+
for (block_root, blobs) in chunk {
872+
// Verify block exists.
873+
if !self.block_exists(block_root)? {
874+
warn!(
875+
self.log,
876+
"Skipping import of blobs; block root does not exist.";
877+
"block_root" => ?block_root,
878+
"num_blobs" => blobs.len(),
879+
);
880+
continue;
881+
}
882+
883+
self.blobs_as_kv_store_ops(block_root, blobs.clone(), &mut ops);
884+
total_imported += blobs.len();
885+
}
886+
887+
self.blobs_db.do_atomically(ops)?;
888+
}
889+
890+
debug!(
891+
self.log,
892+
"Imported historical blobs.";
893+
"total_imported" => total_imported,
894+
);
895+
896+
Ok(())
897+
}
898+
855899
pub fn blobs_as_kv_store_ops(
856900
&self,
857901
key: &Hash256,

0 commit comments

Comments
 (0)