diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs
index 990f4b6099c..fbf16ed0ba2 100644
--- a/beacon_node/beacon_chain/src/beacon_chain.rs
+++ b/beacon_node/beacon_chain/src/beacon_chain.rs
@@ -7168,6 +7168,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
         let end_slot = start_slot.saturating_add(count);
         let mut roots = vec![];
 
+        // Explicitly handle count == 0 for readability, since this is a public function.
+        if count == 0 {
+            return roots;
+        }
+
         for (root, slot) in block_roots_iter {
             if slot < end_slot && slot >= start_slot {
                 roots.push(root);
diff --git a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs
index 7c3c854ed89..3d75076eece 100644
--- a/beacon_node/network/src/network_beacon_processor/rpc_methods.rs
+++ b/beacon_node/network/src/network_beacon_processor/rpc_methods.rs
@@ -3,6 +3,7 @@ use crate::network_beacon_processor::{NetworkBeaconProcessor, FUTURE_SLOT_TOLERA
 use crate::service::NetworkMessage;
 use crate::status::ToStatusMessage;
 use crate::sync::SyncMessage;
+use beacon_chain::block_verification_types::AsBlock;
 use beacon_chain::{BeaconChainError, BeaconChainTypes, WhenSlotSkipped};
 use itertools::{process_results, Itertools};
 use lighthouse_network::rpc::methods::{
@@ -279,7 +280,38 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
         let mut send_blob_count = 0;
         let mut blob_list_results = HashMap::new();
 
+        let mut is_fulu_by_root = HashMap::new();
         for id in request.blob_ids.as_slice() {
+            let BlobIdentifier {
+                block_root: root,
+                index,
+            } = id;
+
+            if !is_fulu_by_root.contains_key(root) {
+                let epoch = if let Some(block) = self
+                    .chain
+                    .data_availability_checker
+                    .get_execution_valid_block(root)
+                {
+                    block.message().epoch()
+                } else if let Some(block) = self.chain.early_attester_cache.get_block(*root) {
+                    block.message().epoch()
+                } else if let Some(block) = self.chain.store.get_cached_block(root) {
+                    block.message().epoch()
+                } else if let Ok(Some(block)) = self.chain.store.get_blinded_block(root) {
+                    block.message().epoch()
+                } else {
+                    // If the block isn't in any cache or the store, we can't determine its epoch.
+                    continue;
+                };
+
+                is_fulu_by_root.insert(*root, self.chain.spec.is_peer_das_enabled_for_epoch(epoch));
+            }
+
+            if *is_fulu_by_root.get(root).unwrap() {
+                continue;
+            }
+
             // First attempt to get the blobs from the RPC cache.
             if let Ok(Some(blob)) = self.chain.data_availability_checker.get_blob(id) {
                 self.send_response(
@@ -289,11 +321,6 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
                 );
                 send_blob_count += 1;
             } else {
-                let BlobIdentifier {
-                    block_root: root,
-                    index,
-                } = id;
-
                 let blob_list_result = match blob_list_results.entry(root) {
                     Entry::Vacant(entry) => {
                         entry.insert(self.chain.get_blobs_checking_early_attester_cache(root))
@@ -882,8 +909,16 @@ impl<T: BeaconChainTypes> NetworkBeaconProcessor<T> {
             "Received BlobsByRange Request"
         );
 
+        if req.count == 0 {
+            return Err((RpcErrorResponse::InvalidRequest, "Request count is zero"));
+        }
+
+        let mut req = req;
         let request_start_slot = Slot::from(req.start_slot);
+        let request_end_slot = Slot::from(req.start_slot + req.count - 1);
+        let request_end_epoch = request_end_slot.epoch(T::EthSpec::slots_per_epoch());
 
+        // Check Deneb is enabled. Blobs are only available from Deneb onwards.
         let data_availability_boundary_slot = match self.chain.data_availability_boundary() {
             Some(boundary) => boundary.start_slot(T::EthSpec::slots_per_epoch()),
             None => {
@@ -919,6 +954,19 @@
             };
         }
 
+        // Check whether Fulu/PeerDAS is in the range. Blobs are not served from Fulu onwards.
+        if self
+            .chain
+            .spec
+            .is_peer_das_enabled_for_epoch(request_end_epoch)
+        {
+            // The Fulu fork epoch is guaranteed to be `Some` by the check above.
+            let fulu_epoch = self.chain.spec.fulu_fork_epoch.unwrap();
+            let fulu_start_slot = fulu_epoch.start_slot(T::EthSpec::slots_per_epoch());
+            // See the justification for the formula in PR https://github.com/sigp/lighthouse/pull/7328
+            req.count = fulu_start_slot.as_u64().saturating_sub(req.start_slot);
+        }
+
         let block_roots =
             self.get_block_roots_for_slot_range(req.start_slot, req.count, "BlobsByRange")?;
diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs
index 87a5a772941..5569ddf2cc2 100644
--- a/beacon_node/network/src/network_beacon_processor/tests.rs
+++ b/beacon_node/network/src/network_beacon_processor/tests.rs
@@ -20,7 +20,7 @@ use beacon_chain::{BeaconChain, WhenSlotSkipped};
 use beacon_processor::{work_reprocessing_queue::*, *};
 use gossipsub::MessageAcceptance;
 use itertools::Itertools;
-use lighthouse_network::rpc::methods::{BlobsByRangeRequest, MetaDataV3};
+use lighthouse_network::rpc::methods::{BlobsByRangeRequest, BlobsByRootRequest, MetaDataV3};
 use lighthouse_network::rpc::InboundRequestId;
 use lighthouse_network::{
     discv5::enr::{self, CombinedKey},
@@ -34,9 +34,9 @@ use std::iter::Iterator;
 use std::sync::Arc;
 use std::time::Duration;
 use tokio::sync::mpsc;
-use types::blob_sidecar::FixedBlobSidecarList;
+use types::{blob_sidecar::FixedBlobSidecarList, BlobIdentifier, ChainSpec, RuntimeVariableList};
 use types::{
-    Attestation, AttesterSlashing, BlobSidecar, BlobSidecarList, ChainSpec, DataColumnSidecarList,
+    Attestation, AttesterSlashing, BlobSidecar, BlobSidecarList, DataColumnSidecarList,
     DataColumnSubnetId, Epoch, EthSpec, ForkName, Hash256, MainnetEthSpec, ProposerSlashing,
     SignedAggregateAndProof, SignedBeaconBlock, SignedVoluntaryExit, Slot, SubnetId,
 };
@@ -93,6 +93,17 @@ impl TestRig {
         Self::new_parametric(chain_length, BeaconProcessorConfig::default(), spec).await
     }
 
+    pub async fn new_parametric_with_custom_fulu_epoch(
+        chain_length: u64,
+        beacon_processor_config: BeaconProcessorConfig,
+        fulu_epoch: Epoch,
+    ) -> Self {
+        let mut spec = test_spec::<E>();
+        spec.shard_committee_period = 2;
+        spec.fulu_fork_epoch = Some(fulu_epoch);
+        Self::new_parametric(chain_length, beacon_processor_config, spec).await
+    }
+
     pub async fn new_parametric(
         chain_length: u64,
         beacon_processor_config: BeaconProcessorConfig,
@@ -428,15 +439,22 @@ impl TestRig {
         }
     }
 
-    pub fn enqueue_blobs_by_range_request(&self, count: u64) {
+    pub fn enqueue_blobs_by_root_request(&self, blob_ids: RuntimeVariableList<BlobIdentifier>) {
+        self.network_beacon_processor
+            .send_blobs_by_roots_request(
+                PeerId::random(),
+                InboundRequestId::new_unchecked(42, 24),
+                BlobsByRootRequest { blob_ids },
+            )
+            .unwrap();
+    }
+
+    pub fn enqueue_blobs_by_range_request(&self, start_slot: u64, count: u64) {
         self.network_beacon_processor
             .send_blobs_by_range_request(
                 PeerId::random(),
                 InboundRequestId::new_unchecked(42, 24),
-                BlobsByRangeRequest {
-                    start_slot: 0,
-                    count,
-                },
+                BlobsByRangeRequest { start_slot, count },
             )
             .unwrap();
     }
@@ -1310,22 +1328,103 @@ async fn test_backfill_sync_processing_rate_limiting_disabled() {
         .await;
 }
 
+async fn test_blobs_by_range(rig: &mut TestRig, start_slot: u64, slot_count: u64) {
+    rig.enqueue_blobs_by_range_request(start_slot, slot_count);
+
+    let mut blob_count = 0;
+    for slot in start_slot..(start_slot + slot_count) {
+        let root = rig
+            .chain
+            .block_root_at_slot(Slot::new(slot), WhenSlotSkipped::None)
+            .unwrap();
+        let slot_blob_count = root
+            .map(|root| {
+                rig.chain
+                    .get_blobs(&root)
+                    .map(|list| list.len())
+                    .unwrap_or(0)
+            })
+            .unwrap_or(0);
+        if rig
+            .chain
+            .spec
+            .is_peer_das_enabled_for_epoch(Slot::new(slot).epoch(SLOTS_PER_EPOCH))
+        {
+            // If peer DAS is enabled, we expect the slot to have 0 blobs.
+            assert_eq!(slot_blob_count, 0);
+        }
+        blob_count += slot_blob_count;
+    }
+
+    let mut actual_count = 0;
+    while let Some(next) = rig.network_rx.recv().await {
+        if let NetworkMessage::SendResponse {
+            peer_id: _,
+            response: Response::BlobsByRange(blob),
+            inbound_request_id: _,
+        } = next
+        {
+            if blob.is_some() {
+                actual_count += 1;
+            } else {
+                break;
+            }
+        } else {
+            panic!("unexpected message {:?}", next);
+        }
+    }
+    assert_eq!(blob_count, actual_count);
+}
+
 #[tokio::test]
-async fn test_blobs_by_range() {
-    if test_spec::<E>().deneb_fork_epoch.is_none() {
+async fn test_blobs_by_range_pre_fulu() {
+    if test_spec::<E>().deneb_fork_epoch.is_none() || test_spec::<E>().is_peer_das_scheduled() {
         return;
     };
 
-    let mut rig = TestRig::new(64).await;
-    let slot_count = 32;
-    rig.enqueue_blobs_by_range_request(slot_count);
+    let rig_slot = 2 * SLOTS_PER_EPOCH;
+    let mut rig = TestRig::new(rig_slot).await;
+
+    // Test blobs by range.
+    test_blobs_by_range(&mut rig, 0, 32).await;
+    // Test a duplicated request.
+    test_blobs_by_range(&mut rig, 0, 32).await;
+    // Test more random ranges.
+    test_blobs_by_range(&mut rig, 7, 23).await;
+    test_blobs_by_range(&mut rig, 0, 64).await;
+    test_blobs_by_range(&mut rig, 14, 64).await;
+}
+
+#[tokio::test]
+async fn test_blobs_by_range_fulu() {
+    if !test_spec::<E>().is_peer_das_scheduled() {
+        return;
+    }
+
+    let fulu_epoch = Slot::new(2 * SLOTS_PER_EPOCH).epoch(SLOTS_PER_EPOCH);
+
+    let mut rig = TestRig::new_parametric_with_custom_fulu_epoch(
+        SLOTS_PER_EPOCH * 5,
+        BeaconProcessorConfig::default(),
+        fulu_epoch,
+    )
+    .await;
+    // Test that blobs are no longer served from the Fulu epoch onwards.
+    test_blobs_by_range(&mut rig, 0, 96).await;
+    test_blobs_by_range(&mut rig, 32, 96).await;
+    test_blobs_by_range(&mut rig, 64, 128).await;
+    test_blobs_by_range(&mut rig, 93, 121).await;
+}
+
+async fn test_blobs_by_root(rig: &mut TestRig, slots_and_indices: &[(u64, u64)]) {
     let mut blob_count = 0;
-    for slot in 0..slot_count {
+    let mut blob_ids = RuntimeVariableList::empty(slots_and_indices.len());
+    for (slot, index) in slots_and_indices {
         let root = rig
             .chain
-            .block_root_at_slot(Slot::new(slot), WhenSlotSkipped::None)
+            .block_root_at_slot(Slot::new(*slot), WhenSlotSkipped::None)
             .unwrap();
-        blob_count += root
+        let slot_blob_count = root
             .map(|root| {
                 rig.chain
                     .get_blobs(&root)
@@ -1333,12 +1432,30 @@ async fn test_blobs_by_range() {
                     .unwrap_or(0)
             })
             .unwrap_or(0);
+        if rig
+            .chain
+            .spec
+            .is_peer_das_enabled_for_epoch(Slot::new(*slot).epoch(SLOTS_PER_EPOCH))
+        {
+            // If peer DAS is enabled, we expect the slot to have 0 blobs.
+            assert_eq!(slot_blob_count, 0);
+        } else if slot_blob_count > 0 {
+            blob_count += 1;
+        }
+        let blob_id = BlobIdentifier {
+            block_root: root.unwrap(),
+            index: *index,
+        };
+        blob_ids.push(blob_id).unwrap();
     }
+
+    rig.enqueue_blobs_by_root_request(blob_ids);
+
     let mut actual_count = 0;
     while let Some(next) = rig.network_rx.recv().await {
         if let NetworkMessage::SendResponse {
             peer_id: _,
-            response: Response::BlobsByRange(blob),
+            response: Response::BlobsByRoot(blob),
             inbound_request_id: _,
         } = next
         {
@@ -1353,3 +1470,25 @@ async fn test_blobs_by_range() {
         }
     }
     assert_eq!(blob_count, actual_count);
 }
+
+#[tokio::test]
+async fn test_blobs_by_root_fulu() {
+    if !test_spec::<E>().is_peer_das_scheduled() {
+        return;
+    }
+
+    let fulu_epoch = Slot::new(2 * SLOTS_PER_EPOCH).epoch(SLOTS_PER_EPOCH);
+
+    let mut rig = TestRig::new_parametric_with_custom_fulu_epoch(
+        SLOTS_PER_EPOCH * 5,
+        BeaconProcessorConfig::default(),
+        fulu_epoch,
+    )
+    .await;
+
+    // Test blobs by root. Fulu slots should not have blobs.
+    test_blobs_by_root(&mut rig, &[(0, 0)]).await;
+    test_blobs_by_root(&mut rig, &[(3, 0), (4, 0), (5, 0)]).await;
+    test_blobs_by_root(&mut rig, &[(32, 0), (65, 0), (120, 0)]).await;
+    test_blobs_by_root(&mut rig, &[(93, 0), (9, 0), (25, 0)]).await;
+}
diff --git a/beacon_node/store/src/hot_cold_store.rs b/beacon_node/store/src/hot_cold_store.rs
index d4b68357b2d..4e953136ab4 100644
--- a/beacon_node/store/src/hot_cold_store.rs
+++ b/beacon_node/store/src/hot_cold_store.rs
@@ -2028,6 +2028,10 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
         })
     }
 
+    pub fn get_cached_block(&self, block_root: &Hash256) -> Option<SignedBeaconBlock<E>> {
+        self.block_cache.lock().get_block(block_root).cloned()
+    }
+
     /// Fetch all columns for a given block from the store.
     pub fn get_data_columns(
         &self,
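
Editor's note: the following is a minimal, standalone sketch of the BlobsByRange count clamping added in rpc_methods.rs above, not part of the patch. It assumes 32 slots per epoch and a Fulu fork epoch of 2; plain u64 values stand in for `Slot`, the helper name `clamp_blobs_by_range_count` is illustrative only, and the epoch-based `is_peer_das_enabled_for_epoch` check is reduced to a slot comparison, which is equivalent because forks activate on epoch boundaries.

// Sketch of the clamping formula: serve only the pre-Fulu part of the range.
fn clamp_blobs_by_range_count(start_slot: u64, count: u64, fulu_start_slot: u64) -> u64 {
    // `count == 0` is rejected earlier in the handler, so the subtraction is safe.
    let request_end_slot = start_slot + count - 1;
    if request_end_slot >= fulu_start_slot {
        // Same formula as the patch; saturates to zero when the whole range is post-Fulu.
        fulu_start_slot.saturating_sub(start_slot)
    } else {
        count
    }
}

fn main() {
    let fulu_start_slot = 2 * 32; // Fulu epoch 2, 32 slots per epoch (assumed).

    // Range entirely before Fulu: the count is left untouched.
    assert_eq!(clamp_blobs_by_range_count(0, 32, fulu_start_slot), 32);
    // Range straddling the fork: truncated at the Fulu start slot (slots 32..64 served).
    assert_eq!(clamp_blobs_by_range_count(32, 96, fulu_start_slot), 32);
    // Range entirely after Fulu: no blobs are served at all.
    assert_eq!(clamp_blobs_by_range_count(96, 8, fulu_start_slot), 0);
}

Because `saturating_sub` is used, a request starting at or after the Fulu start slot collapses to a count of zero, so `get_block_roots_for_slot_range` returns no roots and no blobs are sent; the explicit `count == 0` early return added in beacon_chain.rs covers exactly that case.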