diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index 51c7f0c289e..cf3674c587e 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -3069,9 +3069,10 @@ async fn deneb_prune_blobs_happy_case() { let store = get_store(&db_path); if store.get_chain_spec().is_peer_das_scheduled() { - // TODO(fulu): add prune tests for Fulu / PeerDAS data columns. + // Blob pruning no longer needed since Fulu / PeerDAS return; } + let Some(deneb_fork_epoch) = store.get_chain_spec().deneb_fork_epoch else { // No-op prior to Deneb. return; @@ -3120,9 +3121,10 @@ async fn deneb_prune_blobs_no_finalization() { let store = get_store(&db_path); if store.get_chain_spec().is_peer_das_scheduled() { - // TODO(fulu): add prune tests for Fulu / PeerDAS data columns. + // Blob pruning no longer needed since Fulu / PeerDAS return; } + let Some(deneb_fork_epoch) = store.get_chain_spec().deneb_fork_epoch else { // No-op prior to Deneb. return; @@ -3179,29 +3181,39 @@ async fn deneb_prune_blobs_no_finalization() { /// Check that blob pruning does not fail trying to prune across the fork boundary. #[tokio::test] -async fn deneb_prune_blobs_fork_boundary() { - let deneb_fork_epoch = Epoch::new(4); +async fn prune_blobs_across_fork_boundary() { let mut spec = ForkName::Capella.make_genesis_spec(E::default_spec()); + + let deneb_fork_epoch = Epoch::new(4); spec.deneb_fork_epoch = Some(deneb_fork_epoch); let deneb_fork_slot = deneb_fork_epoch.start_slot(E::slots_per_epoch()); + let electra_fork_epoch = Epoch::new(8); + spec.electra_fork_epoch = Some(electra_fork_epoch); + + let fulu_fork_epoch = Epoch::new(12); + spec.fulu_fork_epoch = Some(fulu_fork_epoch); + let db_path = tempdir().unwrap(); let store = get_store_generic(&db_path, StoreConfig::default(), spec); let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT); - let num_blocks = E::slots_per_epoch() * 7; + let blocks_to_deneb_finalization = E::slots_per_epoch() * 7; + let blocks_to_electra_finalization = E::slots_per_epoch() * 4; + let blocks_to_fulu_finalization = E::slots_per_epoch() * 4; - // Finalize to epoch 5. + // Extend the chain to epoch 7 + // Finalize to epoch 5 (Deneb). harness .extend_chain( - num_blocks as usize, + blocks_to_deneb_finalization as usize, BlockStrategy::OnCanonicalHead, AttestationStrategy::AllValidators, ) .await; - // Finalization should be at epoch 5. + // Finalization should be at epoch 5 (Deneb). let finalized_epoch = Epoch::new(5); let finalized_slot = finalized_epoch.start_slot(E::slots_per_epoch()); assert_eq!( @@ -3240,6 +3252,116 @@ async fn deneb_prune_blobs_fork_boundary() { assert_eq!(store.get_blob_info().oldest_blob_slot, Some(pruned_slot)); check_blob_existence(&harness, Slot::new(0), pruned_slot - 1, false); check_blob_existence(&harness, pruned_slot, harness.head_slot(), true); + + // Extend the chain to epoch 11 + // Finalize to epoch 9 (Electra) + harness.advance_slot(); + harness + .extend_chain( + blocks_to_electra_finalization as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + // Finalization should be at epoch 9 (Electra). + let finalized_epoch = Epoch::new(9); + let finalized_slot = finalized_epoch.start_slot(E::slots_per_epoch()); + assert_eq!( + harness.get_current_state().finalized_checkpoint().epoch, + finalized_epoch + ); + assert_eq!(store.get_split_slot(), finalized_slot); + + // All blobs since last pruning during Deneb should still be available. + assert_eq!(store.get_blob_info().oldest_blob_slot, Some(pruned_slot)); + + let electra_first_slot = electra_fork_epoch.start_slot(E::slots_per_epoch()); + // Check that blobs exist from the pruned slot to electra + check_blob_existence(&harness, pruned_slot, electra_first_slot - 1, true); + + // Trigger pruning on Electra + let pruned_slot = (electra_fork_epoch + 1).start_slot(E::slots_per_epoch()); + + store.try_prune_blobs(true, finalized_epoch).unwrap(); + assert_eq!(store.get_blob_info().oldest_blob_slot, Some(finalized_slot)); + check_blob_existence(&harness, Slot::new(0), pruned_slot - 1, false); + check_blob_existence(&harness, pruned_slot, harness.head_slot(), true); + + // Check that blobs have been pruned up to the pruned slot + check_blob_existence(&harness, Slot::new(0), pruned_slot - 1, false); + // Check that blobs exist from electra to the current head + check_blob_existence(&harness, electra_first_slot, harness.head_slot(), true); + + // Extend the chain to epoch 15 + // Finalize to epoch 13 (Fulu) + harness.advance_slot(); + harness + .extend_chain( + blocks_to_fulu_finalization as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + // Finalization should be at epoch 13 (Fulu). + let finalized_epoch = Epoch::new(13); + let finalized_slot = finalized_epoch.start_slot(E::slots_per_epoch()); + assert_eq!( + harness.get_current_state().finalized_checkpoint().epoch, + finalized_epoch + ); + assert_eq!(store.get_split_slot(), finalized_slot); + + // All blobs since last pruning during Electra should still be available. + assert_eq!(store.get_blob_info().oldest_blob_slot, Some(pruned_slot)); + + let fulu_first_slot = fulu_fork_epoch.start_slot(E::slots_per_epoch()); + // Check that blobs have been pruned up to the pruned slot + check_blob_existence(&harness, Slot::new(0), pruned_slot - 1, false); + // Check that blobs exist from the pruned slot to Fulu + check_blob_existence(&harness, pruned_slot, fulu_first_slot - 1, true); + // Check that blobs do not exist from Fulu to the current head + check_blob_existence(&harness, fulu_first_slot, harness.head_slot(), false); + + // Attempt pruning with at different epochs. No pruning should occur for epochs + // preceding Fulu, as we have already triggered pruning pre-Fulu. Pruning should occur + // for epochs after Fulu. + assert!(fulu_fork_epoch < finalized_epoch); + for data_availability_boundary in [ + Epoch::new(7), + electra_fork_epoch, + Epoch::new(9), + Epoch::new(11), + fulu_fork_epoch, + Epoch::new(15), + ] { + store + .try_prune_blobs(true, data_availability_boundary) + .unwrap(); + + let oldest_slot = data_availability_boundary.start_slot(E::slots_per_epoch()); + + if data_availability_boundary < fulu_fork_epoch { + // Pre Fulu fork epochs + // Check oldest blob slot is not updated. + assert!(store.get_blob_info().oldest_blob_slot >= Some(oldest_slot)); + check_blob_existence(&harness, Slot::new(0), oldest_slot - 1, false); + // Blobs should exist + check_blob_existence(&harness, oldest_slot, harness.head_slot(), true); + } else { + // Fulu fork epochs + // Pruning should have been triggered + assert!(store.get_blob_info().oldest_blob_slot <= Some(oldest_slot)); + // Oldest blost slot should never be greater than the first fulu slot + let fulu_first_slot = fulu_fork_epoch.start_slot(E::slots_per_epoch()); + assert!(store.get_blob_info().oldest_blob_slot <= Some(fulu_first_slot)); + // Blobs should not exist post-Fulu + check_blob_existence(&harness, oldest_slot, harness.head_slot(), false); + // Data columns should exist post-Fulu + check_data_column_existence(&harness, oldest_slot, harness.head_slot(), true); + }; + } } /// Check that blob pruning prunes blobs older than the data availability boundary with margin @@ -3268,9 +3390,10 @@ async fn deneb_prune_blobs_margin_test(margin: u64) { let store = get_store_generic(&db_path, config, test_spec::()); if store.get_chain_spec().is_peer_das_scheduled() { - // TODO(fulu): add prune tests for Fulu / PeerDAS data columns. + // Blob pruning no longer needed since Fulu / PeerDAS return; } + let Some(deneb_fork_epoch) = store.get_chain_spec().deneb_fork_epoch else { // No-op prior to Deneb. return; @@ -3380,6 +3503,309 @@ fn check_blob_existence( } } +/// Check that blob pruning prunes data columns older than the data availability boundary. +#[tokio::test] +async fn fulu_prune_data_columns_happy_case() { + let db_path = tempdir().unwrap(); + let store = get_store(&db_path); + + if !store.get_chain_spec().is_peer_das_scheduled() { + // No-op if PeerDAS not scheduled. + return; + } + let Some(fulu_fork_epoch) = store.get_chain_spec().fulu_fork_epoch else { + // No-op prior to Fulu. + return; + }; + let fulu_fork_slot = fulu_fork_epoch.start_slot(E::slots_per_epoch()); + + let num_blocks_produced = E::slots_per_epoch() * 8; + let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT); + + harness + .extend_chain( + num_blocks_produced as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + // Prior to manual pruning with an artifically low data availability boundary all data columns + // should be stored. + assert_eq!( + store.get_data_column_info().oldest_data_column_slot, + Some(fulu_fork_slot) + ); + check_data_column_existence(&harness, Slot::new(1), harness.head_slot(), true); + + // Trigger pruning of data columns older than epoch 2. + let data_availability_boundary = Epoch::new(2); + store + .try_prune_blobs(true, data_availability_boundary) + .unwrap(); + + // Check oldest data column slot is updated accordingly and prior data columns have been + // deleted. + let oldest_data_column_slot = store + .get_data_column_info() + .oldest_data_column_slot + .unwrap(); + assert_eq!( + oldest_data_column_slot, + data_availability_boundary.start_slot(E::slots_per_epoch()) + ); + check_data_column_existence(&harness, Slot::new(0), oldest_data_column_slot - 1, false); + check_data_column_existence(&harness, oldest_data_column_slot, harness.head_slot(), true); +} + +/// Check that blob pruning does not prune data columns without finalization. +#[tokio::test] +async fn fulu_prune_data_columns_no_finalization() { + let db_path = tempdir().unwrap(); + let store = get_store(&db_path); + + if !store.get_chain_spec().is_peer_das_scheduled() { + // No-op if PeerDAS not scheduled. + return; + } + let Some(fulu_fork_epoch) = store.get_chain_spec().fulu_fork_epoch else { + // No-op prior to Fulu. + return; + }; + let fulu_fork_slot = fulu_fork_epoch.start_slot(E::slots_per_epoch()); + + let initial_num_blocks = E::slots_per_epoch() * 5; + let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT); + + // Finalize to epoch 3. + harness + .extend_chain( + initial_num_blocks as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + // Extend the chain for another few epochs without attestations. + let unfinalized_num_blocks = E::slots_per_epoch() * 3; + harness.advance_slot(); + harness + .extend_chain( + unfinalized_num_blocks as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::SomeValidators(vec![]), + ) + .await; + + // Finalization should be at epoch 3. + let finalized_slot = Slot::new(E::slots_per_epoch() * 3); + assert_eq!(harness.get_current_state().finalized_checkpoint().epoch, 3); + assert_eq!(store.get_split_slot(), finalized_slot); + + // All data columns should still be available. + assert_eq!( + store.get_data_column_info().oldest_data_column_slot, + Some(fulu_fork_slot) + ); + check_data_column_existence(&harness, Slot::new(0), harness.head_slot(), true); + + // Attempt pruning of data columns older than epoch 4, which is newer than finalization. + let data_availability_boundary = Epoch::new(4); + store + .try_prune_blobs(true, data_availability_boundary) + .unwrap(); + + // Check oldest data column slot is only updated to finalization, and NOT to the DAB. + let oldest_data_column_slot = store + .get_data_column_info() + .oldest_data_column_slot + .unwrap(); + assert_eq!(oldest_data_column_slot, finalized_slot); + check_data_column_existence(&harness, Slot::new(0), finalized_slot - 1, false); + check_data_column_existence(&harness, finalized_slot, harness.head_slot(), true); +} + +/// Check that data column pruning does not fail trying to prune across the fork boundary. +#[tokio::test] +async fn fulu_prune_data_columns_fork_boundary() { + let mut spec = ForkName::Electra.make_genesis_spec(E::default_spec()); + let fulu_fork_epoch = Epoch::new(4); + spec.fulu_fork_epoch = Some(fulu_fork_epoch); + let fulu_fork_slot = fulu_fork_epoch.start_slot(E::slots_per_epoch()); + + let db_path = tempdir().unwrap(); + let store = get_store_generic(&db_path, StoreConfig::default(), spec); + + if !store.get_chain_spec().is_peer_das_scheduled() { + // No-op if PeerDAS not scheduled. + panic!("PeerDAS not scheduled"); + //return; + } + + let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT); + + let num_blocks = E::slots_per_epoch() * 7; + + // Finalize to epoch 5. + harness + .extend_chain( + num_blocks as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + // Finalization should be at epoch 5. + let finalized_epoch = Epoch::new(5); + let finalized_slot = finalized_epoch.start_slot(E::slots_per_epoch()); + assert_eq!( + harness.get_current_state().finalized_checkpoint().epoch, + finalized_epoch + ); + assert_eq!(store.get_split_slot(), finalized_slot); + + // All data columns should still be available. + assert_eq!( + store.get_data_column_info().oldest_data_column_slot, + Some(fulu_fork_slot) + ); + check_data_column_existence(&harness, Slot::new(0), harness.head_slot(), true); + + // Attempt pruning with data availability epochs that precede the fork epoch. + // No pruning should occur. + assert!(fulu_fork_epoch < finalized_epoch); + for data_availability_boundary in [Epoch::new(0), Epoch::new(3), fulu_fork_epoch] { + store + .try_prune_blobs(true, data_availability_boundary) + .unwrap(); + + // Check oldest data column slot is not updated. + assert_eq!( + store.get_data_column_info().oldest_data_column_slot, + Some(fulu_fork_slot) + ); + } + // All data columns should still be available. + check_data_column_existence(&harness, Slot::new(0), harness.head_slot(), true); + + // Prune one epoch past the fork. + let pruned_slot = (fulu_fork_epoch + 1).start_slot(E::slots_per_epoch()); + store.try_prune_blobs(true, fulu_fork_epoch + 1).unwrap(); + assert_eq!( + store.get_data_column_info().oldest_data_column_slot, + Some(pruned_slot) + ); + check_data_column_existence(&harness, Slot::new(0), pruned_slot - 1, false); + check_data_column_existence(&harness, pruned_slot, harness.head_slot(), true); +} + +/// Check that blob pruning prunes data columns older than the data availability boundary with +/// margin applied. +#[tokio::test] +async fn fulu_prune_data_columns_margin1() { + fulu_prune_data_columns_margin_test(1).await; +} + +#[tokio::test] +async fn fulu_prune_data_columns_margin3() { + fulu_prune_data_columns_margin_test(3).await; +} + +#[tokio::test] +async fn fulu_prune_data_columns_margin4() { + fulu_prune_data_columns_margin_test(4).await; +} + +async fn fulu_prune_data_columns_margin_test(margin: u64) { + let config = StoreConfig { + blob_prune_margin_epochs: margin, + ..StoreConfig::default() + }; + let db_path = tempdir().unwrap(); + let store = get_store_generic(&db_path, config, test_spec::()); + + if !store.get_chain_spec().is_peer_das_scheduled() { + // No-op if PeerDAS not scheduled. + return; + } + let Some(fulu_fork_epoch) = store.get_chain_spec().fulu_fork_epoch else { + // No-op prior to Fulu. + return; + }; + let fulu_fork_slot = fulu_fork_epoch.start_slot(E::slots_per_epoch()); + + let num_blocks_produced = E::slots_per_epoch() * 8; + let harness = get_harness(store.clone(), LOW_VALIDATOR_COUNT); + + harness + .extend_chain( + num_blocks_produced as usize, + BlockStrategy::OnCanonicalHead, + AttestationStrategy::AllValidators, + ) + .await; + + // Prior to manual pruning with an artifically low data availability boundary all blobs should + // be stored. + assert_eq!( + store.get_data_column_info().oldest_data_column_slot, + Some(fulu_fork_slot) + ); + check_data_column_existence(&harness, Slot::new(1), harness.head_slot(), true); + + // Trigger blob pruning of blobs older than epoch 6 - margin (6 is the minimum, due to + // finalization). + let data_availability_boundary = Epoch::new(6); + let effective_data_availability_boundary = + data_availability_boundary - store.get_config().blob_prune_margin_epochs; + assert!( + effective_data_availability_boundary > 0, + "must be > 0 because epoch 0 won't get pruned alone" + ); + store + .try_prune_blobs(true, data_availability_boundary) + .unwrap(); + + // Check oldest blob slot is updated accordingly and prior blobs have been deleted. + let oldest_data_column_slot = store + .get_data_column_info() + .oldest_data_column_slot + .unwrap(); + assert_eq!( + oldest_data_column_slot, + effective_data_availability_boundary.start_slot(E::slots_per_epoch()) + ); + check_data_column_existence(&harness, Slot::new(0), oldest_data_column_slot - 1, false); + check_data_column_existence(&harness, oldest_data_column_slot, harness.head_slot(), true); +} + +/// Check tat there are data column sidecars (or not) at every slot in the range. +fn check_data_column_existence( + harness: &TestHarness, + start_slot: Slot, + end_slot: Slot, + should_exist: bool, +) { + let mut columns_seen = 0; + for (block_root, slot) in harness + .chain + .forwards_iter_block_roots_until(start_slot, end_slot) + .unwrap() + .map(Result::unwrap) + { + if let Some(columns) = harness.chain.store.get_data_columns(&block_root).unwrap() { + assert!(should_exist, "columns at slot {slot} exist but should not"); + columns_seen += columns.len(); + } else { + // We don't actually store empty columns, so unfortunately we can't assert anything + // meaningful here (like asserting that the column should not exist). + } + } + if should_exist { + assert_ne!(columns_seen, 0, "expected non-zero number of columns"); + } +} + #[tokio::test] async fn prune_historic_states() { let num_blocks_produced = E::slots_per_epoch() * 5; diff --git a/beacon_node/store/src/database/leveldb_impl.rs b/beacon_node/store/src/database/leveldb_impl.rs index 81d6d1d4bd2..365d5e38b6a 100644 --- a/beacon_node/store/src/database/leveldb_impl.rs +++ b/beacon_node/store/src/database/leveldb_impl.rs @@ -287,7 +287,8 @@ impl LevelDB { ) -> Result<(), Error> { let mut leveldb_batch = Writebatch::new(); let iter = self.db.iter(self.read_options()); - + let start_key = BytesKey::from_vec(column.as_bytes().to_vec()); + iter.seek(&start_key); iter.take_while(move |(key, _)| key.matches_column(column)) .for_each(|(key, value)| { if f(&value).unwrap_or(false) { diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs index 1663ec7b4d4..44890057d88 100644 --- a/beacon_node/store/src/hot_cold_store.rs +++ b/beacon_node/store/src/hot_cold_store.rs @@ -2688,13 +2688,14 @@ impl, Cold: ItemStore> HotColdDB self.try_prune_blobs(force, min_data_availability_boundary) } - /// Try to prune blobs older than the data availability boundary. + /// Try to prune blobs and data columns older than the data availability boundary. /// /// Blobs from the epoch `data_availability_boundary - blob_prune_margin_epochs` are retained. /// This epoch is an _exclusive_ endpoint for the pruning process. /// - /// This function only supports pruning blobs older than the split point, which is older than - /// (or equal to) finalization. Pruning blobs newer than finalization is not supported. + /// This function only supports pruning blobs and data columns older than the split point, + /// which is older than (or equal to) finalization. Pruning blobs and data columns newer than + /// finalization is not supported. /// /// This function also assumes that the split is stationary while it runs. It should only be /// run from the migrator thread (where `migrate_database` runs) or the database manager. @@ -2718,6 +2719,7 @@ impl, Cold: ItemStore> HotColdDB } let blob_info = self.get_blob_info(); + let data_column_info = self.get_data_column_info(); let Some(oldest_blob_slot) = blob_info.oldest_blob_slot else { error!("Slot of oldest blob is not known"); return Err(HotColdDBError::BlobPruneLogicError.into()); @@ -2816,13 +2818,7 @@ impl, Cold: ItemStore> HotColdDB } drop(block_cache); - let new_blob_info = BlobInfo { - oldest_blob_slot: Some(end_slot + 1), - blobs_db: blob_info.blobs_db, - }; - - let op = self.compare_and_set_blob_info(blob_info, new_blob_info)?; - self.do_atomically_with_block_and_blobs_cache(vec![StoreOp::KeyValueOp(op)])?; + self.update_blob_or_data_column_info(start_epoch, end_slot, blob_info, data_column_info)?; debug!("Blob pruning complete"); @@ -2889,6 +2885,31 @@ impl, Cold: ItemStore> HotColdDB Ok(()) } + + fn update_blob_or_data_column_info( + &self, + start_epoch: Epoch, + end_slot: Slot, + blob_info: BlobInfo, + data_column_info: DataColumnInfo, + ) -> Result<(), Error> { + let op = if self.spec.is_peer_das_enabled_for_epoch(start_epoch) { + let new_data_column_info = DataColumnInfo { + oldest_data_column_slot: Some(end_slot + 1), + }; + self.compare_and_set_data_column_info(data_column_info, new_data_column_info)? + } else { + let new_blob_info = BlobInfo { + oldest_blob_slot: Some(end_slot + 1), + blobs_db: blob_info.blobs_db, + }; + self.compare_and_set_blob_info(blob_info, new_blob_info)? + }; + + self.do_atomically_with_block_and_blobs_cache(vec![StoreOp::KeyValueOp(op)])?; + + Ok(()) + } } /// Advance the split point of the store, copying new finalized states to the freezer.