diff --git a/Cargo.lock b/Cargo.lock index b98e096718a..f93b158c188 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2085,6 +2085,7 @@ dependencies = [ "clap", "clap_utils", "environment", + "ethereum_ssz", "hex", "serde", "store", diff --git a/database_manager/Cargo.toml b/database_manager/Cargo.toml index 99bef75a72c..c0ea2644326 100644 --- a/database_manager/Cargo.toml +++ b/database_manager/Cargo.toml @@ -11,6 +11,7 @@ clap_utils = { workspace = true } environment = { workspace = true } hex = { workspace = true } serde = { workspace = true } +ethereum_ssz = { workspace = true } store = { workspace = true } strum = { workspace = true } tracing = { workspace = true } diff --git a/database_manager/src/cli.rs b/database_manager/src/cli.rs index c62da1206f1..1a42be2bc00 100644 --- a/database_manager/src/cli.rs +++ b/database_manager/src/cli.rs @@ -4,6 +4,7 @@ use clap_utils::FLAG_HEADER; use serde::{Deserialize, Serialize}; use std::path::PathBuf; use store::hdiff::HierarchyConfig; +use types::Slot; use crate::InspectTarget; @@ -80,6 +81,9 @@ pub enum DatabaseManagerSubcommand { PruneBlobs(PruneBlobs), PruneStates(PruneStates), Compact(Compact), + SetOldestBlobSlot(SetOldestBlobSlot), + InspectBlobs(InspectBlobs), + ImportBlobs(ImportBlobs), } #[derive(Parser, Clone, Deserialize, Serialize, Debug)] @@ -228,3 +232,29 @@ pub struct Compact { )] pub output_dir: Option, } + +#[derive(Parser, Clone, Deserialize, Serialize, Debug)] +#[clap(about = "Manually override the database's view of the oldest blob known.")] +pub struct SetOldestBlobSlot { + #[clap( + long, + value_name = "SLOT", + help = "Slot of the oldest blob in the database.", + display_order = 0 + )] + pub slot: Slot, +} + +#[derive(Parser, Clone, Deserialize, Serialize, Debug)] +#[clap(about = "Produce a summary of blob availability in the database.")] +pub struct InspectBlobs { + #[clap(long, help = "Verify blob data integrity.", display_order = 0)] + pub verify: bool, +} + +#[derive(Parser, Clone, Deserialize, Serialize, Debug)] +#[clap(about = "Import blobs from another node's blob database.")] +pub struct ImportBlobs { + #[clap(long, help = "Path to the database to import", display_order = 0)] + pub source_db: PathBuf, +} diff --git a/database_manager/src/lib.rs b/database_manager/src/lib.rs index f38c28d8b02..a960b485be8 100644 --- a/database_manager/src/lib.rs +++ b/database_manager/src/lib.rs @@ -1,4 +1,5 @@ pub mod cli; + use crate::cli::DatabaseManager; use crate::cli::Migrate; use crate::cli::PruneStates; @@ -14,17 +15,17 @@ use environment::{Environment, RuntimeContext}; use serde::{Deserialize, Serialize}; use std::fs; use std::io::Write; -use std::path::PathBuf; -use store::KeyValueStore; +use std::path::{Path, PathBuf}; use store::{ database::interface::BeaconNodeBackend, errors::Error, + hot_cold_store::HotColdDBError, metadata::{SchemaVersion, CURRENT_SCHEMA_VERSION}, - DBColumn, HotColdDB, + BlobSidecarListFromRoot, DBColumn, HotColdDB, KeyValueStore, KeyValueStoreOp, }; use strum::{EnumString, EnumVariantNames}; -use tracing::{info, warn}; -use types::{BeaconState, EthSpec, Slot}; +use tracing::{debug, info, warn}; +use types::{BeaconState, BlobSidecarList, EthSpec, Hash256, Slot}; fn parse_client_config( cli_args: &ArgMatches, @@ -454,6 +455,184 @@ pub fn prune_states( Ok(()) } +fn set_oldest_blob_slot( + slot: Slot, + client_config: ClientConfig, + runtime_context: &RuntimeContext, +) -> Result<(), Error> { + let spec = &runtime_context.eth2_config.spec; + let hot_path = client_config.get_db_path(); + let cold_path = client_config.get_freezer_db_path(); + let blobs_path = client_config.get_blobs_db_path(); + + let db = HotColdDB::, BeaconNodeBackend>::open( + &hot_path, + &cold_path, + &blobs_path, + |_, _, _| Ok(()), + client_config.store, + spec.clone(), + )?; + + let old_blob_info = db.get_blob_info(); + let mut new_blob_info = old_blob_info.clone(); + new_blob_info.oldest_blob_slot = Some(slot); + + info!( + previous = ?old_blob_info.oldest_blob_slot, + new = ?slot, + "Updating oldest blob slot" + ); + + db.compare_and_set_blob_info_with_write(old_blob_info, new_blob_info) +} + +fn inspect_blobs( + _verify: bool, + client_config: ClientConfig, + runtime_context: &RuntimeContext, +) -> Result<(), Error> { + let spec = &runtime_context.eth2_config.spec; + let hot_path = client_config.get_db_path(); + let cold_path = client_config.get_freezer_db_path(); + let blobs_path = client_config.get_blobs_db_path(); + + let db = HotColdDB::, BeaconNodeBackend>::open( + &hot_path, + &cold_path, + &blobs_path, + |_, _, _| Ok(()), + client_config.store, + spec.clone(), + )?; + + let split = db.get_split_info(); + let oldest_block_slot = db.get_oldest_block_slot(); + let deneb_start_slot = spec + .deneb_fork_epoch + .map_or(Slot::new(0), |epoch| epoch.start_slot(E::slots_per_epoch())); + let start_slot = oldest_block_slot.max(deneb_start_slot); + + if oldest_block_slot > deneb_start_slot { + info!( + start = %deneb_start_slot, + end = %(oldest_block_slot - 1), + "Missing blobs AND blocks" + ); + } + + let mut last_block_root = Hash256::ZERO; + + for res in db.forwards_block_roots_iterator_until(start_slot, split.slot, || { + db.get_advanced_hot_state(split.block_root, split.slot, split.state_root)? + .ok_or(HotColdDBError::MissingSplitState(split.state_root, split.slot).into()) + .map(|(_, split_state)| (split_state, split.block_root)) + })? { + let (block_root, slot) = res?; + + if last_block_root == block_root { + info!("Slot {}: no block", slot); + } else if let BlobSidecarListFromRoot::Blobs(blobs) = db.get_blobs(&block_root)? { + // FIXME(sproul): do verification here + info!("Slot {}: {} blobs stored", slot, blobs.len()); + } else { + // Check whether blobs are expected. + let block = db + .get_blinded_block(&block_root)? + .ok_or(Error::BlockNotFound(block_root))?; + + let num_expected_blobs = block + .message() + .body() + .blob_kzg_commitments() + .map_or(0, |blobs| blobs.len()); + if num_expected_blobs > 0 { + warn!( + "Slot {}: {} blobs missing ({:?})", + slot, num_expected_blobs, block_root + ); + } else { + info!("Slot {}: block with 0 blobs", slot); + } + } + last_block_root = block_root; + } + + Ok(()) +} + +fn import_blobs( + source_path: &Path, + client_config: ClientConfig, + runtime_context: &RuntimeContext, +) -> Result<(), Error> { + let spec = &runtime_context.eth2_config.spec; + let hot_path = client_config.get_db_path(); + let cold_path = client_config.get_freezer_db_path(); + let blobs_path = client_config.get_blobs_db_path(); + + let db = HotColdDB::, BeaconNodeBackend>::open( + &hot_path, + &cold_path, + &blobs_path, + |_, _, _| Ok(()), + client_config.store.clone(), + spec.clone(), + )?; + + let source_db = BeaconNodeBackend::::open(&client_config.store, source_path)?; + + let prev_blob_info = db.get_blob_info(); + let mut oldest_blob_slot = prev_blob_info + .oldest_blob_slot + .unwrap_or(Slot::new(u64::MAX)); + + let mut num_already_known = 0; + let mut num_imported = 0; + + let mut ops = vec![]; + let batch_size = 1024; + + for res in source_db.iter_column(DBColumn::BeaconBlob) { + let (block_root, blob_bytes) = res?; + + if db.get_blobs(&block_root)?.len() > 0 { + num_already_known += 1; + } else { + // FIXME(sproul): max len? + let blobs = BlobSidecarList::::from_ssz_bytes(&blob_bytes, 64)?; + ops.push(KeyValueStoreOp::PutKeyValue( + DBColumn::BeaconBlob, + block_root.to_vec(), + blob_bytes, + )); + + if let Some(blob) = blobs.first() { + oldest_blob_slot = oldest_blob_slot.min(blob.slot()); + debug!("Imported blobs for slot {} ({:?})", blob.slot(), block_root); + } + num_imported += 1; + + if ops.len() >= batch_size { + db.blobs_db.do_atomically(std::mem::take(&mut ops))?; + } + } + } + db.blobs_db.do_atomically(ops)?; + + let mut new_blob_info = prev_blob_info.clone(); + new_blob_info.oldest_blob_slot = Some(oldest_blob_slot); + db.compare_and_set_blob_info_with_write(prev_blob_info, new_blob_info)?; + + info!( + imported = num_imported, + already_known = num_already_known, + "Blobs imported" + ); + + Ok(()) +} + /// Run the database manager, returning an error string if the operation did not succeed. pub fn run( cli_args: &ArgMatches, @@ -512,5 +691,14 @@ pub fn run( let compact_config = parse_compact_config(compact_config)?; compact_db::(compact_config, client_config).map_err(format_err) } + cli::DatabaseManagerSubcommand::SetOldestBlobSlot(blob_slot_config) => { + set_oldest_blob_slot(blob_slot_config.slot, client_config, &context).map_err(format_err) + } + cli::DatabaseManagerSubcommand::InspectBlobs(_) => { + inspect_blobs(false, client_config, &context).map_err(format_err) + } + cli::DatabaseManagerSubcommand::ImportBlobs(config) => { + import_blobs(&config.source_db, client_config, &context).map_err(format_err) + } } }