diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index fcd7be791da..51bc34bffac 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -7,7 +7,9 @@ use crate::attester_cache::{AttesterCache, AttesterCacheKey}; use crate::beacon_block_streamer::{BeaconBlockStreamer, CheckEarlyAttesterCache}; use crate::beacon_proposer_cache::compute_proposer_duties_from_head; use crate::beacon_proposer_cache::BeaconProposerCache; -use crate::blob_verification::{GossipBlobError, GossipVerifiedBlob}; +use crate::blob_verification::{ + GossipBlobError, GossipVerifiedBlob, GossipVerifiedDataColumnSidecar, +}; use crate::block_times_cache::BlockTimesCache; use crate::block_verification::POS_PANDA_BANNER; use crate::block_verification::{ @@ -2070,6 +2072,19 @@ impl BeaconChain { }) } + pub fn verify_data_column_sidecar_for_gossip( + self: &Arc, + data_column_sidecar: Arc>, + subnet_id: u64, + ) -> Result, GossipBlobError> { + metrics::inc_counter(&metrics::BLOBS_COLUMN_SIDECAR_PROCESSING_REQUESTS); + let _timer = metrics::start_timer(&metrics::DATA_COLUMN_SIDECAR_GOSSIP_VERIFICATION_TIMES); + GossipVerifiedDataColumnSidecar::new(data_column_sidecar, subnet_id, self).map(|v| { + metrics::inc_counter(&metrics::DATA_COLUMNS_SIDECAR_PROCESSING_SUCCESSES); + v + }) + } + pub fn verify_blob_sidecar_for_gossip( self: &Arc, blob_sidecar: Arc>, @@ -2885,6 +2900,20 @@ impl BeaconChain { self.remove_notified(&block_root, r) } + pub fn process_gossip_data_column( + self: &Arc, + gossip_verified_data_column: GossipVerifiedDataColumnSidecar, + ) { + let data_column = gossip_verified_data_column.as_data_column(); + // TODO(das) send to DA checker + info!( + self.log, + "Processed gossip data column"; + "index" => data_column.index, + "slot" => data_column.slot().as_u64() + ); + } + /// Cache the blobs in the processing cache, process it, then evict it from the cache if it was /// imported or errors. pub async fn process_rpc_blobs( diff --git a/beacon_node/beacon_chain/src/blob_verification.rs b/beacon_node/beacon_chain/src/blob_verification.rs index f2d150d72bf..ffc64f9d10e 100644 --- a/beacon_node/beacon_chain/src/blob_verification.rs +++ b/beacon_node/beacon_chain/src/blob_verification.rs @@ -17,7 +17,8 @@ use ssz_types::VariableList; use tree_hash::TreeHash; use types::blob_sidecar::BlobIdentifier; use types::{ - BeaconStateError, BlobSidecar, CloneConfig, EthSpec, Hash256, SignedBeaconBlockHeader, Slot, + BeaconStateError, BlobSidecar, CloneConfig, DataColumnSidecar, EthSpec, Hash256, + SignedBeaconBlockHeader, Slot, }; /// An error occurred while validating a gossip blob. @@ -184,6 +185,33 @@ pub type GossipVerifiedBlobList = VariableList< <::EthSpec as EthSpec>::MaxBlobsPerBlock, >; +#[derive(Debug)] +pub struct GossipVerifiedDataColumnSidecar { + data_column_sidecar: Arc>, +} + +impl GossipVerifiedDataColumnSidecar { + pub fn new( + column_sidecar: Arc>, + subnet_id: u64, + chain: &BeaconChain, + ) -> Result> { + let header = column_sidecar.signed_block_header.clone(); + // We only process slashing info if the gossip verification failed + // since we do not process the blob any further in that case. + validate_data_column_sidecar_for_gossip(column_sidecar, subnet_id, chain).map_err(|e| { + process_block_slash_info::<_, GossipBlobError>( + chain, + BlockSlashInfo::from_early_error_blob(header, e), + ) + }) + } + + pub fn as_data_column(&self) -> &Arc> { + &self.data_column_sidecar + } +} + /// A wrapper around a `BlobSidecar` that indicates it has been approved for re-gossiping on /// the p2p network. #[derive(Debug)] @@ -647,6 +675,17 @@ pub fn validate_blob_sidecar_for_gossip( }) } +pub fn validate_data_column_sidecar_for_gossip( + data_column_sidecar: Arc>, + _subnet: u64, + _chain: &BeaconChain, +) -> Result, GossipBlobError> { + // TODO(das): validate kzg commitments, cell proofs etc + Ok(GossipVerifiedDataColumnSidecar { + data_column_sidecar: data_column_sidecar.clone(), + }) +} + /// Returns the canonical root of the given `blob`. /// /// Use this function to ensure that we report the blob hashing time Prometheus metric. diff --git a/beacon_node/beacon_chain/src/errors.rs b/beacon_node/beacon_chain/src/errors.rs index 9c1ba06f853..010488a558b 100644 --- a/beacon_node/beacon_chain/src/errors.rs +++ b/beacon_node/beacon_chain/src/errors.rs @@ -220,6 +220,7 @@ pub enum BeaconChainError { InconsistentFork(InconsistentFork), ProposerHeadForkChoiceError(fork_choice::Error), UnableToPublish, + UnableToBuildColumnSidecar(String), AvailabilityCheckError(AvailabilityCheckError), LightClientError(LightClientError), UnsupportedFork, diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index ad095b37b51..4a2187be610 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -1014,14 +1014,26 @@ lazy_static! { "beacon_blobs_sidecar_processing_requests_total", "Count of all blob sidecars submitted for processing" ); + pub static ref BLOBS_COLUMN_SIDECAR_PROCESSING_REQUESTS: Result = try_create_int_counter( + "beacon_blobs_column_sidecar_processing_requests_total", + "Count of all data column sidecars submitted for processing" + ); pub static ref BLOBS_SIDECAR_PROCESSING_SUCCESSES: Result = try_create_int_counter( "beacon_blobs_sidecar_processing_successes_total", "Number of blob sidecars verified for gossip" ); + pub static ref DATA_COLUMNS_SIDECAR_PROCESSING_SUCCESSES: Result = try_create_int_counter( + "beacon_blobs_column_sidecar_processing_successes_total", + "Number of data column sidecars verified for gossip" + ); pub static ref BLOBS_SIDECAR_GOSSIP_VERIFICATION_TIMES: Result = try_create_histogram( "beacon_blobs_sidecar_gossip_verification_seconds", "Full runtime of blob sidecars gossip verification" ); + pub static ref DATA_COLUMN_SIDECAR_GOSSIP_VERIFICATION_TIMES: Result = try_create_histogram( + "beacon_blobs_column_sidecar_gossip_verification_seconds", + "Full runtime of data column sidecars gossip verification" + ); pub static ref BLOB_SIDECAR_INCLUSION_PROOF_VERIFICATION: Result = try_create_histogram( "blob_sidecar_inclusion_proof_verification_seconds", "Time taken to verify blob sidecar inclusion proof" diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index 045b06a1e72..8b1c127d300 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -109,6 +109,10 @@ const MAX_GOSSIP_BLOCK_QUEUE_LEN: usize = 1_024; /// will be stored before we start dropping them. const MAX_GOSSIP_BLOB_QUEUE_LEN: usize = 1_024; +/// The maximum number of queued `DataColumnSidecar` objects received on gossip that +/// will be stored before we start dropping them. +const MAX_GOSSIP_DATA_COL_QUEUE_LEN: usize = 1_024; + /// The maximum number of queued `SignedBeaconBlock` objects received prior to their slot (but /// within acceptable clock disparity) that will be queued before we start dropping them. const MAX_DELAYED_BLOCK_QUEUE_LEN: usize = 1_024; @@ -224,6 +228,7 @@ pub const GOSSIP_AGGREGATE: &str = "gossip_aggregate"; pub const GOSSIP_AGGREGATE_BATCH: &str = "gossip_aggregate_batch"; pub const GOSSIP_BLOCK: &str = "gossip_block"; pub const GOSSIP_BLOBS_SIDECAR: &str = "gossip_blobs_sidecar"; +pub const GOSSIP_BLOBS_COLUMN_SIDECAR: &str = "gossip_blobs_column_sidecar"; pub const DELAYED_IMPORT_BLOCK: &str = "delayed_import_block"; pub const GOSSIP_VOLUNTARY_EXIT: &str = "gossip_voluntary_exit"; pub const GOSSIP_PROPOSER_SLASHING: &str = "gossip_proposer_slashing"; @@ -590,6 +595,7 @@ pub enum Work { }, GossipBlock(AsyncFn), GossipBlobSidecar(AsyncFn), + GossipDataColumnSidecar(AsyncFn), DelayedImportBlock { beacon_block_slot: Slot, beacon_block_root: Hash256, @@ -640,6 +646,7 @@ impl Work { Work::GossipAggregateBatch { .. } => GOSSIP_AGGREGATE_BATCH, Work::GossipBlock(_) => GOSSIP_BLOCK, Work::GossipBlobSidecar(_) => GOSSIP_BLOBS_SIDECAR, + Work::GossipDataColumnSidecar(_) => GOSSIP_BLOBS_COLUMN_SIDECAR, Work::DelayedImportBlock { .. } => DELAYED_IMPORT_BLOCK, Work::GossipVoluntaryExit(_) => GOSSIP_VOLUNTARY_EXIT, Work::GossipProposerSlashing(_) => GOSSIP_PROPOSER_SLASHING, @@ -809,6 +816,7 @@ impl BeaconProcessor { let mut backfill_chain_segment = FifoQueue::new(MAX_CHAIN_SEGMENT_QUEUE_LEN); let mut gossip_block_queue = FifoQueue::new(MAX_GOSSIP_BLOCK_QUEUE_LEN); let mut gossip_blob_queue = FifoQueue::new(MAX_GOSSIP_BLOB_QUEUE_LEN); + let mut gossip_data_column_queue = FifoQueue::new(MAX_GOSSIP_DATA_COL_QUEUE_LEN); let mut delayed_block_queue = FifoQueue::new(MAX_DELAYED_BLOCK_QUEUE_LEN); let mut status_queue = FifoQueue::new(MAX_STATUS_QUEUE_LEN); @@ -964,6 +972,8 @@ impl BeaconProcessor { self.spawn_worker(item, idle_tx); } else if let Some(item) = gossip_blob_queue.pop() { self.spawn_worker(item, idle_tx); + } else if let Some(item) = gossip_data_column_queue.pop() { + self.spawn_worker(item, idle_tx); // Check the priority 0 API requests after blocks and blobs, but before attestations. } else if let Some(item) = api_request_p0_queue.pop() { self.spawn_worker(item, idle_tx); @@ -1206,6 +1216,9 @@ impl BeaconProcessor { Work::GossipBlobSidecar { .. } => { gossip_blob_queue.push(work, work_id, &self.log) } + Work::GossipDataColumnSidecar { .. } => { + gossip_data_column_queue.push(work, work_id, &self.log) + } Work::DelayedImportBlock { .. } => { delayed_block_queue.push(work, work_id, &self.log) } @@ -1304,6 +1317,10 @@ impl BeaconProcessor { &metrics::BEACON_PROCESSOR_GOSSIP_BLOB_QUEUE_TOTAL, gossip_blob_queue.len() as i64, ); + metrics::set_gauge( + &metrics::BEACON_PROCESSOR_GOSSIP_DATA_COLUMN_QUEUE_TOTAL, + gossip_data_column_queue.len() as i64, + ); metrics::set_gauge( &metrics::BEACON_PROCESSOR_RPC_BLOCK_QUEUE_TOTAL, rpc_block_queue.len() as i64, @@ -1455,11 +1472,11 @@ impl BeaconProcessor { task_spawner.spawn_async(process_fn) } Work::IgnoredRpcBlock { process_fn } => task_spawner.spawn_blocking(process_fn), - Work::GossipBlock(work) | Work::GossipBlobSidecar(work) => { - task_spawner.spawn_async(async move { - work.await; - }) - } + Work::GossipBlock(work) + | Work::GossipBlobSidecar(work) + | Work::GossipDataColumnSidecar(work) => task_spawner.spawn_async(async move { + work.await; + }), Work::BlobsByRangeRequest(process_fn) | Work::BlobsByRootsRequest(process_fn) => { task_spawner.spawn_blocking(process_fn) } diff --git a/beacon_node/beacon_processor/src/metrics.rs b/beacon_node/beacon_processor/src/metrics.rs index fa7d7d7b9a3..bcd422b357d 100644 --- a/beacon_node/beacon_processor/src/metrics.rs +++ b/beacon_node/beacon_processor/src/metrics.rs @@ -51,6 +51,11 @@ lazy_static::lazy_static! { "beacon_processor_gossip_blob_queue_total", "Count of blobs from gossip waiting to be verified." ); + // Gossip data column sidecars. + pub static ref BEACON_PROCESSOR_GOSSIP_DATA_COLUMN_QUEUE_TOTAL: Result = try_create_int_gauge( + "beacon_processor_gossip_data_column_queue_total", + "Count of data column sidecars from gossip waiting to be verified." + ); // Gossip Exits. pub static ref BEACON_PROCESSOR_EXIT_QUEUE_TOTAL: Result = try_create_int_gauge( "beacon_processor_exit_queue_total", diff --git a/beacon_node/http_api/src/publish_blocks.rs b/beacon_node/http_api/src/publish_blocks.rs index 8b85c2ac951..67e5d00f8c6 100644 --- a/beacon_node/http_api/src/publish_blocks.rs +++ b/beacon_node/http_api/src/publish_blocks.rs @@ -19,9 +19,9 @@ use std::time::Duration; use tokio::sync::mpsc::UnboundedSender; use tree_hash::TreeHash; use types::{ - AbstractExecPayload, BeaconBlockRef, BlobSidecarList, EthSpec, ExecPayload, ExecutionBlockHash, - ForkName, FullPayload, FullPayloadMerge, Hash256, SignedBeaconBlock, SignedBlindedBeaconBlock, - VariableList, + AbstractExecPayload, BeaconBlockRef, BlobSidecarList, DataColumnSidecar, DataColumnSubnetId, + EthSpec, ExecPayload, ExecutionBlockHash, ForkName, FullPayload, FullPayloadMerge, Hash256, + SignedBeaconBlock, SignedBlindedBeaconBlock, VariableList, }; use warp::http::StatusCode; use warp::{reply::Response, Rejection, Reply}; @@ -88,6 +88,24 @@ pub async fn publish_block { let mut pubsub_messages = vec![PubsubMessage::BeaconBlock(block.clone())]; if let Some(blob_sidecars) = blobs_opt { + // Build and publish column sidecars + let col_sidecars = DataColumnSidecar::random_from_blob_sidecars(&blob_sidecars) + .map_err(|e| { + BeaconChainError::UnableToBuildColumnSidecar(format!("{e:?}")) + })?; + + for (col_index, col_sidecar) in col_sidecars.into_iter().enumerate() { + let subnet_id = + DataColumnSubnetId::try_from_column_index::(col_index) + .map_err(|e| { + BeaconChainError::UnableToBuildColumnSidecar(format!("{e:?}")) + })?; + pubsub_messages.push(PubsubMessage::DataColumnSidecar(Box::new(( + subnet_id, + Arc::new(col_sidecar), + )))); + } + // Publish blob sidecars for (blob_index, blob) in blob_sidecars.into_iter().enumerate() { pubsub_messages.push(PubsubMessage::BlobSidecar(Box::new(( blob_index as u64, diff --git a/beacon_node/lighthouse_network/src/discovery/mod.rs b/beacon_node/lighthouse_network/src/discovery/mod.rs index 829124e1233..ebb2bfcccfb 100644 --- a/beacon_node/lighthouse_network/src/discovery/mod.rs +++ b/beacon_node/lighthouse_network/src/discovery/mod.rs @@ -513,6 +513,8 @@ impl Discovery { ) .map_err(|e| format!("{:?}", e))?; } + // TODO(das) discovery to be implemented at a later phase. Initially we just use a large peer count. + Subnet::DataColumn(_) => return Ok(()), } // replace the global version @@ -832,6 +834,7 @@ impl Discovery { let query_str = match query.subnet { Subnet::Attestation(_) => "attestation", Subnet::SyncCommittee(_) => "sync_committee", + Subnet::DataColumn(_) => "data_column", }; if let Some(v) = metrics::get_int_counter( diff --git a/beacon_node/lighthouse_network/src/discovery/subnet_predicate.rs b/beacon_node/lighthouse_network/src/discovery/subnet_predicate.rs index f79ff8daf69..0b35465233a 100644 --- a/beacon_node/lighthouse_network/src/discovery/subnet_predicate.rs +++ b/beacon_node/lighthouse_network/src/discovery/subnet_predicate.rs @@ -33,6 +33,8 @@ where Subnet::SyncCommittee(s) => sync_committee_bitfield .as_ref() .map_or(false, |b| b.get(*s.deref() as usize).unwrap_or(false)), + // TODO(das) discovery to be implemented at a later phase. Initially we just use a large peer count. + Subnet::DataColumn(_) => false, }); if !predicate { diff --git a/beacon_node/lighthouse_network/src/peer_manager/mod.rs b/beacon_node/lighthouse_network/src/peer_manager/mod.rs index 4316c0d07e1..76fe7a7d7b3 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/mod.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/mod.rs @@ -1073,6 +1073,10 @@ impl PeerManager { .or_default() .insert(id); } + // TODO(das) to be implemented. We're not pruning data column peers yet + // because data column topics are subscribed as core topics until we + // implement recomputing data column subnets. + Subnet::DataColumn(_) => {} } } } diff --git a/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs b/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs index 44c54511ddc..6e0c00e42b8 100644 --- a/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs +++ b/beacon_node/lighthouse_network/src/peer_manager/peerdb/peer_info.rs @@ -94,6 +94,8 @@ impl PeerInfo { .syncnets() .map_or(false, |s| s.get(**id as usize).unwrap_or(false)) } + // TODO(das) Add data column nets bitfield + Subnet::DataColumn(_) => return false, } } false diff --git a/beacon_node/lighthouse_network/src/service/gossip_cache.rs b/beacon_node/lighthouse_network/src/service/gossip_cache.rs index 5dc0d29ff5b..d0927283da7 100644 --- a/beacon_node/lighthouse_network/src/service/gossip_cache.rs +++ b/beacon_node/lighthouse_network/src/service/gossip_cache.rs @@ -194,6 +194,7 @@ impl GossipCache { let expire_timeout = match topic.kind() { GossipKind::BeaconBlock => self.beacon_block, GossipKind::BlobSidecar(_) => self.blob_sidecar, + GossipKind::DataColumnSidecar(_) => self.blob_sidecar, GossipKind::BeaconAggregateAndProof => self.aggregates, GossipKind::Attestation(_) => self.attestation, GossipKind::VoluntaryExit => self.voluntary_exit, diff --git a/beacon_node/lighthouse_network/src/service/mod.rs b/beacon_node/lighthouse_network/src/service/mod.rs index 2b20c76cf4b..545e58d28ed 100644 --- a/beacon_node/lighthouse_network/src/service/mod.rs +++ b/beacon_node/lighthouse_network/src/service/mod.rs @@ -226,6 +226,7 @@ impl Network { let max_topics = ctx.chain_spec.attestation_subnet_count as usize + SYNC_COMMITTEE_SUBNET_COUNT as usize + ctx.chain_spec.blob_sidecar_subnet_count as usize + + ctx.chain_spec.data_column_sidecar_subnet_count as usize + BASE_CORE_TOPICS.len() + ALTAIR_CORE_TOPICS.len() + CAPELLA_CORE_TOPICS.len() @@ -239,6 +240,7 @@ impl Network { ctx.chain_spec.attestation_subnet_count, SYNC_COMMITTEE_SUBNET_COUNT, ctx.chain_spec.blob_sidecar_subnet_count, + ctx.chain_spec.data_column_sidecar_subnet_count, ), // during a fork we subscribe to both the old and new topics max_subscribed_topics: max_topics * 4, diff --git a/beacon_node/lighthouse_network/src/service/utils.rs b/beacon_node/lighthouse_network/src/service/utils.rs index 34dec1ca6c0..217469eac8a 100644 --- a/beacon_node/lighthouse_network/src/service/utils.rs +++ b/beacon_node/lighthouse_network/src/service/utils.rs @@ -20,7 +20,9 @@ use std::io::prelude::*; use std::path::Path; use std::sync::Arc; use std::time::Duration; -use types::{ChainSpec, EnrForkId, EthSpec, ForkContext, SubnetId, SyncSubnetId}; +use types::{ + ChainSpec, DataColumnSubnetId, EnrForkId, EthSpec, ForkContext, SubnetId, SyncSubnetId, +}; pub const NETWORK_KEY_FILENAME: &str = "key"; /// The maximum simultaneous libp2p connections per peer. @@ -232,6 +234,7 @@ pub(crate) fn create_whitelist_filter( attestation_subnet_count: u64, sync_committee_subnet_count: u64, blob_sidecar_subnet_count: u64, + data_column_subnet_count: u64, ) -> gossipsub::WhitelistSubscriptionFilter { let mut possible_hashes = HashSet::new(); for fork_digest in possible_fork_digests { @@ -260,6 +263,9 @@ pub(crate) fn create_whitelist_filter( for id in 0..blob_sidecar_subnet_count { add(BlobSidecar(id)); } + for id in 0..data_column_subnet_count { + add(DataColumnSidecar(DataColumnSubnetId::new(id))); + } } gossipsub::WhitelistSubscriptionFilter(possible_hashes) } diff --git a/beacon_node/lighthouse_network/src/types/pubsub.rs b/beacon_node/lighthouse_network/src/types/pubsub.rs index 60fe3748265..e9bccf59617 100644 --- a/beacon_node/lighthouse_network/src/types/pubsub.rs +++ b/beacon_node/lighthouse_network/src/types/pubsub.rs @@ -9,12 +9,12 @@ use std::boxed::Box; use std::io::{Error, ErrorKind}; use std::sync::Arc; use types::{ - Attestation, AttesterSlashing, BlobSidecar, EthSpec, ForkContext, ForkName, - LightClientFinalityUpdate, LightClientOptimisticUpdate, ProposerSlashing, - SignedAggregateAndProof, SignedBeaconBlock, SignedBeaconBlockAltair, SignedBeaconBlockBase, - SignedBeaconBlockCapella, SignedBeaconBlockDeneb, SignedBeaconBlockMerge, - SignedBlsToExecutionChange, SignedContributionAndProof, SignedVoluntaryExit, SubnetId, - SyncCommitteeMessage, SyncSubnetId, + Attestation, AttesterSlashing, BlobSidecar, DataColumnSidecar, DataColumnSubnetId, EthSpec, + ForkContext, ForkName, LightClientFinalityUpdate, LightClientOptimisticUpdate, + ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedBeaconBlockAltair, + SignedBeaconBlockBase, SignedBeaconBlockCapella, SignedBeaconBlockDeneb, + SignedBeaconBlockMerge, SignedBlsToExecutionChange, SignedContributionAndProof, + SignedVoluntaryExit, SubnetId, SyncCommitteeMessage, SyncSubnetId, }; #[derive(Debug, Clone, PartialEq)] @@ -23,6 +23,8 @@ pub enum PubsubMessage { BeaconBlock(Arc>), /// Gossipsub message providing notification of a [`BlobSidecar`] along with the subnet id where it was received. BlobSidecar(Box<(u64, Arc>)>), + /// Gossipsub message providing notification of a [`DataColumnSidecar`] along with the subnet id where it was received. + DataColumnSidecar(Box<(DataColumnSubnetId, Arc>)>), /// Gossipsub message providing notification of a Aggregate attestation and associated proof. AggregateAndProofAttestation(Box>), /// Gossipsub message providing notification of a raw un-aggregated attestation with its shard id. @@ -119,6 +121,9 @@ impl PubsubMessage { PubsubMessage::BlobSidecar(blob_sidecar_data) => { GossipKind::BlobSidecar(blob_sidecar_data.0) } + PubsubMessage::DataColumnSidecar(column_sidecar_data) => { + GossipKind::DataColumnSidecar(column_sidecar_data.0) + } PubsubMessage::AggregateAndProofAttestation(_) => GossipKind::BeaconAggregateAndProof, PubsubMessage::Attestation(attestation_data) => { GossipKind::Attestation(attestation_data.0) @@ -226,6 +231,30 @@ impl PubsubMessage { )), } } + GossipKind::DataColumnSidecar(subnet_id) => { + match fork_context.from_context_bytes(gossip_topic.fork_digest) { + Some(ForkName::Deneb) => { + let col_sidecar = Arc::new( + DataColumnSidecar::from_ssz_bytes(data) + .map_err(|e| format!("{:?}", e))?, + ); + Ok(PubsubMessage::DataColumnSidecar(Box::new(( + *subnet_id, + col_sidecar, + )))) + } + Some( + ForkName::Base + | ForkName::Altair + | ForkName::Merge + | ForkName::Capella, + ) + | None => Err(format!( + "data_column_sidecar topic invalid for given fork digest {:?}", + gossip_topic.fork_digest + )), + } + } GossipKind::VoluntaryExit => { let voluntary_exit = SignedVoluntaryExit::from_ssz_bytes(data) .map_err(|e| format!("{:?}", e))?; @@ -295,6 +324,7 @@ impl PubsubMessage { match &self { PubsubMessage::BeaconBlock(data) => data.as_ssz_bytes(), PubsubMessage::BlobSidecar(data) => data.1.as_ssz_bytes(), + PubsubMessage::DataColumnSidecar(data) => data.1.as_ssz_bytes(), PubsubMessage::AggregateAndProofAttestation(data) => data.as_ssz_bytes(), PubsubMessage::VoluntaryExit(data) => data.as_ssz_bytes(), PubsubMessage::ProposerSlashing(data) => data.as_ssz_bytes(), @@ -324,6 +354,12 @@ impl std::fmt::Display for PubsubMessage { data.1.slot(), data.1.index, ), + PubsubMessage::DataColumnSidecar(data) => write!( + f, + "DataColumnSidecar: slot: {}, column index: {}", + data.1.slot(), + data.1.index, + ), PubsubMessage::AggregateAndProofAttestation(att) => write!( f, "Aggregate and Proof: slot: {}, index: {}, aggregator_index: {}", diff --git a/beacon_node/lighthouse_network/src/types/subnet.rs b/beacon_node/lighthouse_network/src/types/subnet.rs index 50d28542bec..e814feefc70 100644 --- a/beacon_node/lighthouse_network/src/types/subnet.rs +++ b/beacon_node/lighthouse_network/src/types/subnet.rs @@ -1,6 +1,6 @@ use serde::Serialize; use std::time::Instant; -use types::{SubnetId, SyncSubnetId}; +use types::{DataColumnSubnetId, SubnetId, SyncSubnetId}; /// Represents a subnet on an attestation or sync committee `SubnetId`. /// @@ -12,6 +12,8 @@ pub enum Subnet { Attestation(SubnetId), /// Represents a gossipsub sync committee subnet and the metadata `syncnets` field. SyncCommittee(SyncSubnetId), + /// Represents a gossipsub data column subnet and the metadata `blbcolnets` field. + DataColumn(DataColumnSubnetId), } /// A subnet to discover peers on along with the instant after which it's no longer useful. diff --git a/beacon_node/lighthouse_network/src/types/topics.rs b/beacon_node/lighthouse_network/src/types/topics.rs index b774905174f..dc1479e9f6a 100644 --- a/beacon_node/lighthouse_network/src/types/topics.rs +++ b/beacon_node/lighthouse_network/src/types/topics.rs @@ -1,7 +1,7 @@ use libp2p::gossipsub::{IdentTopic as Topic, TopicHash}; use serde::{Deserialize, Serialize}; use strum::AsRefStr; -use types::{ChainSpec, EthSpec, ForkName, SubnetId, SyncSubnetId}; +use types::{ChainSpec, DataColumnSubnetId, EthSpec, ForkName, SubnetId, SyncSubnetId}; use crate::Subnet; @@ -14,6 +14,7 @@ pub const BEACON_BLOCK_TOPIC: &str = "beacon_block"; pub const BEACON_AGGREGATE_AND_PROOF_TOPIC: &str = "beacon_aggregate_and_proof"; pub const BEACON_ATTESTATION_PREFIX: &str = "beacon_attestation_"; pub const BLOB_SIDECAR_PREFIX: &str = "blob_sidecar_"; +pub const DATA_COLUMN_SIDECAR_PREFIX: &str = "data_column_sidecar_"; pub const VOLUNTARY_EXIT_TOPIC: &str = "voluntary_exit"; pub const PROPOSER_SLASHING_TOPIC: &str = "proposer_slashing"; pub const ATTESTER_SLASHING_TOPIC: &str = "attester_slashing"; @@ -100,6 +101,8 @@ pub enum GossipKind { BeaconAggregateAndProof, /// Topic for publishing BlobSidecars. BlobSidecar(u64), + /// Topic for publishing DataColumnSidecars. + DataColumnSidecar(DataColumnSubnetId), /// Topic for publishing raw attestations on a particular subnet. #[strum(serialize = "beacon_attestation")] Attestation(SubnetId), @@ -132,6 +135,9 @@ impl std::fmt::Display for GossipKind { GossipKind::BlobSidecar(blob_index) => { write!(f, "{}{}", BLOB_SIDECAR_PREFIX, blob_index) } + GossipKind::DataColumnSidecar(column_index) => { + write!(f, "{}{}", DATA_COLUMN_SIDECAR_PREFIX, **column_index) + } x => f.write_str(x.as_ref()), } } @@ -219,6 +225,7 @@ impl GossipTopic { match self.kind() { GossipKind::Attestation(subnet_id) => Some(Subnet::Attestation(*subnet_id)), GossipKind::SyncCommitteeMessage(subnet_id) => Some(Subnet::SyncCommittee(*subnet_id)), + GossipKind::DataColumnSidecar(subnet_id) => Some(Subnet::DataColumn(*subnet_id)), _ => None, } } @@ -257,6 +264,9 @@ impl std::fmt::Display for GossipTopic { GossipKind::BlobSidecar(blob_index) => { format!("{}{}", BLOB_SIDECAR_PREFIX, blob_index) } + GossipKind::DataColumnSidecar(index) => { + format!("{}{}", DATA_COLUMN_SIDECAR_PREFIX, *index) + } GossipKind::BlsToExecutionChange => BLS_TO_EXECUTION_CHANGE_TOPIC.into(), GossipKind::LightClientFinalityUpdate => LIGHT_CLIENT_FINALITY_UPDATE.into(), GossipKind::LightClientOptimisticUpdate => LIGHT_CLIENT_OPTIMISTIC_UPDATE.into(), @@ -277,6 +287,7 @@ impl From for GossipKind { match subnet_id { Subnet::Attestation(s) => GossipKind::Attestation(s), Subnet::SyncCommittee(s) => GossipKind::SyncCommitteeMessage(s), + Subnet::DataColumn(s) => GossipKind::DataColumnSidecar(s), } } } @@ -300,6 +311,10 @@ fn subnet_topic_index(topic: &str) -> Option { ))); } else if let Some(index) = topic.strip_prefix(BLOB_SIDECAR_PREFIX) { return Some(GossipKind::BlobSidecar(index.parse::().ok()?)); + } else if let Some(index) = topic.strip_prefix(DATA_COLUMN_SIDECAR_PREFIX) { + return Some(GossipKind::DataColumnSidecar(DataColumnSubnetId::new( + index.parse::().ok()?, + ))); } None } diff --git a/beacon_node/network/src/metrics.rs b/beacon_node/network/src/metrics.rs index 0509ed1ea7d..11e02f5f3e7 100644 --- a/beacon_node/network/src/metrics.rs +++ b/beacon_node/network/src/metrics.rs @@ -71,6 +71,10 @@ lazy_static! { "beacon_processor_gossip_blob_verified_total", "Total number of gossip blob verified for propagation." ); + pub static ref BEACON_PROCESSOR_GOSSIP_DATA_COLUMN_SIDECAR_VERIFIED_TOTAL: Result = try_create_int_counter( + "beacon_processor_gossip_data_column_verified_total", + "Total number of gossip data column sidecar verified for propagation." + ); // Gossip Exits. pub static ref BEACON_PROCESSOR_EXIT_VERIFIED_TOTAL: Result = try_create_int_counter( "beacon_processor_exit_verified_total", @@ -283,6 +287,12 @@ lazy_static! { // [0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1, 0.2, 0.5] decimal_buckets(-3,-1) ); + pub static ref BEACON_DATA_COLUMN_GOSSIP_PROPAGATION_VERIFICATION_DELAY_TIME: Result = try_create_histogram_with_buckets( + "beacon_data_column_gossip_propagation_verification_delay_time", + "Duration between when the data column sidecar is received over gossip and when it is verified for propagation.", + // [0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1, 0.2, 0.5] + decimal_buckets(-3,-1) + ); pub static ref BEACON_BLOB_GOSSIP_SLOT_START_DELAY_TIME: Result = try_create_histogram_with_buckets( "beacon_blob_gossip_slot_start_delay_time", "Duration between when the blob is received over gossip and the start of the slot it belongs to.", @@ -292,6 +302,15 @@ lazy_static! { // [0.1, 0.2, 0.5, 1, 2, 5, 10, 20, 50] //decimal_buckets(-1,2) ); + pub static ref BEACON_DATA_COLUMN_GOSSIP_SLOT_START_DELAY_TIME: Result = try_create_histogram_with_buckets( + "beacon_data_column_gossip_slot_start_delay_time", + "Duration between when the data column sidecar is received over gossip and the start of the slot it belongs to.", + // Create a custom bucket list for greater granularity in block delay + Ok(vec![0.1, 0.2, 0.3,0.4,0.5,0.75,1.0,1.25,1.5,1.75,2.0,2.5,3.0,3.5,4.0,5.0,6.0,7.0,8.0,9.0,10.0,15.0,20.0]) + // NOTE: Previous values, which we may want to switch back to. + // [0.1, 0.2, 0.5, 1, 2, 5, 10, 20, 50] + //decimal_buckets(-1,2) + ); pub static ref BEACON_BLOB_RPC_SLOT_START_DELAY_TIME: Result = try_create_histogram_with_buckets( "beacon_blob_rpc_slot_start_delay_time", "Duration between when a blob is received over rpc and the start of the slot it belongs to.", @@ -306,6 +325,10 @@ lazy_static! { "beacon_blob_last_delay", "Keeps track of the last blob's delay from the start of the slot" ); + pub static ref BEACON_DATA_COLUMN_LAST_DELAY: Result = try_create_int_gauge( + "beacon_data_column_last_delay", + "Keeps track of the last data column sidecar's delay from the start of the slot" + ); pub static ref BEACON_BLOB_GOSSIP_ARRIVED_LATE_TOTAL: Result = try_create_int_counter( "beacon_blob_gossip_arrived_late_total", diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 9d9b196e9be..2362b3d672f 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -4,7 +4,9 @@ use crate::{ service::NetworkMessage, sync::SyncMessage, }; -use beacon_chain::blob_verification::{GossipBlobError, GossipVerifiedBlob}; +use beacon_chain::blob_verification::{ + GossipBlobError, GossipVerifiedBlob, GossipVerifiedDataColumnSidecar, +}; use beacon_chain::block_verification_types::AsBlock; use beacon_chain::store::Error; use beacon_chain::{ @@ -31,9 +33,9 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH}; use store::hot_cold_store::HotColdDBError; use tokio::sync::mpsc; use types::{ - Attestation, AttesterSlashing, BlobSidecar, EthSpec, Hash256, IndexedAttestation, - LightClientFinalityUpdate, LightClientOptimisticUpdate, ProposerSlashing, - SignedAggregateAndProof, SignedBeaconBlock, SignedBlsToExecutionChange, + Attestation, AttesterSlashing, BlobSidecar, DataColumnSidecar, DataColumnSubnetId, EthSpec, + Hash256, IndexedAttestation, LightClientFinalityUpdate, LightClientOptimisticUpdate, + ProposerSlashing, SignedAggregateAndProof, SignedBeaconBlock, SignedBlsToExecutionChange, SignedContributionAndProof, SignedVoluntaryExit, Slot, SubnetId, SyncCommitteeMessage, SyncSubnetId, }; @@ -599,6 +601,75 @@ impl NetworkBeaconProcessor { } } + pub async fn process_gossip_data_column_sidecar( + self: &Arc, + message_id: MessageId, + peer_id: PeerId, + _peer_client: Client, + subnet_id: DataColumnSubnetId, + column_sidecar: Arc>, + seen_duration: Duration, + ) { + let slot = column_sidecar.slot(); + let root = column_sidecar.block_root(); + let index = column_sidecar.index; + let delay = get_slot_delay_ms(seen_duration, slot, &self.chain.slot_clock); + // Log metrics to track delay from other nodes on the network. + metrics::observe_duration( + &metrics::BEACON_DATA_COLUMN_GOSSIP_SLOT_START_DELAY_TIME, + delay, + ); + metrics::set_gauge( + &metrics::BEACON_DATA_COLUMN_LAST_DELAY, + delay.as_millis() as i64, + ); + match self + .chain + .verify_data_column_sidecar_for_gossip(column_sidecar, *subnet_id) + { + Ok(gossip_verified_data_column) => { + metrics::inc_counter( + &metrics::BEACON_PROCESSOR_GOSSIP_DATA_COLUMN_SIDECAR_VERIFIED_TOTAL, + ); + + debug!( + self.log, + "Successfully verified gossip data column sidecar"; + "slot" => %slot, + "root" => %root, + "index" => %index, + ); + + self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Accept); + + // Log metrics to keep track of propagation delay times. + if let Some(duration) = SystemTime::now() + .duration_since(UNIX_EPOCH) + .ok() + .and_then(|now| now.checked_sub(seen_duration)) + { + metrics::observe_duration( + &metrics::BEACON_DATA_COLUMN_GOSSIP_PROPAGATION_VERIFICATION_DELAY_TIME, + duration, + ); + } + self.process_gossip_verified_data_column( + peer_id, + gossip_verified_data_column, + seen_duration, + ) + .await + } + Err(err) => { + error!( + self.log, + "Internal error when verifying data column sidecar"; + "error" => ?err, + ) + } + } + } + #[allow(clippy::too_many_arguments)] pub async fn process_gossip_blob( self: &Arc, @@ -796,6 +867,16 @@ impl NetworkBeaconProcessor { } } + pub async fn process_gossip_verified_data_column( + self: &Arc, + _peer_id: PeerId, + verified_data_column: GossipVerifiedDataColumnSidecar, + // This value is not used presently, but it might come in handy for debugging. + _seen_duration: Duration, + ) { + self.chain.process_gossip_data_column(verified_data_column); + } + /// Process the beacon block received from the gossip network and: /// /// - If it passes gossip propagation criteria, tell the network thread to forward it. diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index 67fc2fabb1e..42cab254412 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -229,6 +229,36 @@ impl NetworkBeaconProcessor { }) } + /// Create a new `Work` event for some data column sidecar. + pub fn send_gossip_data_column_sidecar( + self: &Arc, + message_id: MessageId, + peer_id: PeerId, + peer_client: Client, + subnet_id: DataColumnSubnetId, + column_sidecar: Arc>, + seen_timestamp: Duration, + ) -> Result<(), Error> { + let processor = self.clone(); + let process_fn = async move { + processor + .process_gossip_data_column_sidecar( + message_id, + peer_id, + peer_client, + subnet_id, + column_sidecar, + seen_timestamp, + ) + .await + }; + + self.try_send(BeaconWorkEvent { + drop_during_sync: false, + work: Work::GossipDataColumnSidecar(Box::pin(process_fn)), + }) + } + /// Create a new `Work` event for some sync committee signature. pub fn send_gossip_sync_signature( self: &Arc, diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index f56a3b7445e..924e9355d8c 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -308,6 +308,20 @@ impl Router { ), ) } + PubsubMessage::DataColumnSidecar(data) => { + let (subnet_id, column_sidecar) = *data; + self.handle_beacon_processor_send_result( + self.network_beacon_processor + .send_gossip_data_column_sidecar( + message_id, + peer_id, + self.network_globals.client(&peer_id), + subnet_id, + column_sidecar, + timestamp_now(), + ), + ) + } PubsubMessage::VoluntaryExit(exit) => { debug!(self.log, "Received a voluntary exit"; "peer_id" => %peer_id); self.handle_beacon_processor_send_result( diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 01a7e1f9896..2186f8ac896 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -34,8 +34,8 @@ use task_executor::ShutdownReason; use tokio::sync::mpsc; use tokio::time::Sleep; use types::{ - ChainSpec, EthSpec, ForkContext, Slot, SubnetId, SyncCommitteeSubscription, SyncSubnetId, - Unsigned, ValidatorSubscription, + ChainSpec, DataColumnSubnetId, EthSpec, ForkContext, Slot, SubnetId, SyncCommitteeSubscription, + SyncSubnetId, Unsigned, ValidatorSubscription, }; mod tests; @@ -753,6 +753,29 @@ impl NetworkService { } } + if !self.subscribe_all_subnets { + for column_subnet in + DataColumnSubnetId::compute_subnets_for_data_column::( + self.network_globals.local_enr().node_id().raw().into(), + &self.beacon_chain.spec, + ) + { + for fork_digest in self.required_gossip_fork_digests() { + let gossip_kind = Subnet::DataColumn(column_subnet).into(); + let topic = GossipTopic::new( + gossip_kind, + GossipEncoding::default(), + fork_digest, + ); + if self.libp2p.subscribe(topic.clone()) { + subscribed_topics.push(topic); + } else { + warn!(self.log, "Could not subscribe to topic"; "topic" => %topic); + } + } + } + } + // If we are to subscribe to all subnets we do it here if self.subscribe_all_subnets { for subnet_id in 0..<::EthSpec as EthSpec>::SubnetBitfieldLength::to_u64() { @@ -786,6 +809,23 @@ impl NetworkService { } } } + // Subscribe to all data column subnets + for column_subnet in 0..T::EthSpec::data_column_subnet_count() as u64 { + for fork_digest in self.required_gossip_fork_digests() { + let gossip_kind = + Subnet::DataColumn(DataColumnSubnetId::new(column_subnet)).into(); + let topic = GossipTopic::new( + gossip_kind, + GossipEncoding::default(), + fork_digest, + ); + if self.libp2p.subscribe(topic.clone()) { + subscribed_topics.push(topic); + } else { + warn!(self.log, "Could not subscribe to topic"; "topic" => %topic); + } + } + } } if !subscribed_topics.is_empty() { diff --git a/consensus/types/presets/gnosis/deneb.yaml b/consensus/types/presets/gnosis/deneb.yaml index d2d7d0abed3..bef51470e89 100644 --- a/consensus/types/presets/gnosis/deneb.yaml +++ b/consensus/types/presets/gnosis/deneb.yaml @@ -12,3 +12,12 @@ MAX_BLOB_COMMITMENTS_PER_BLOCK: 4096 MAX_BLOBS_PER_BLOCK: 6 # `floorlog2(BLOB_KZG_COMMITMENTS_GINDEX) + 1 + ceillog2(MAX_BLOB_COMMITMENTS_PER_BLOCK)` = 4 + 1 + 12 = 17 KZG_COMMITMENT_INCLUSION_PROOF_DEPTH: 17 + +# EIP-7594 (temporary in Deneb for the purpose of prototyping) +# --------------------------------------------------------------- +# `uint64(2**6)` (= 64) +FIELD_ELEMENTS_PER_CELL: 64 +# uint64(floorlog2(get_generalized_index(BeaconBlockBody, 'blob_kzg_commitments')) +KZG_COMMITMENTS_INCLUSION_PROOF_DEPTH: 4 +# `uint64((FIELD_ELEMENTS_PER_BLOB * 2) // FIELD_ELEMENTS_PER_CELL)` (= 128) +NUMBER_OF_COLUMNS: 128 \ No newline at end of file diff --git a/consensus/types/presets/mainnet/deneb.yaml b/consensus/types/presets/mainnet/deneb.yaml index 6d2fb4abde9..0b2f28853ec 100644 --- a/consensus/types/presets/mainnet/deneb.yaml +++ b/consensus/types/presets/mainnet/deneb.yaml @@ -10,3 +10,12 @@ MAX_BLOB_COMMITMENTS_PER_BLOCK: 4096 MAX_BLOBS_PER_BLOCK: 6 # `floorlog2(BLOB_KZG_COMMITMENTS_GINDEX) + 1 + ceillog2(MAX_BLOB_COMMITMENTS_PER_BLOCK)` = 4 + 1 + 12 = 17 KZG_COMMITMENT_INCLUSION_PROOF_DEPTH: 17 + +# EIP-7594 (temporary in Deneb for the purpose of prototyping) +# --------------------------------------------------------------- +# `uint64(2**6)` (= 64) +FIELD_ELEMENTS_PER_CELL: 64 +# uint64(floorlog2(get_generalized_index(BeaconBlockBody, 'blob_kzg_commitments')) +KZG_COMMITMENTS_INCLUSION_PROOF_DEPTH: 4 +# `uint64((FIELD_ELEMENTS_PER_BLOB * 2) // FIELD_ELEMENTS_PER_CELL)` (= 128) +NUMBER_OF_COLUMNS: 128 \ No newline at end of file diff --git a/consensus/types/presets/minimal/deneb.yaml b/consensus/types/presets/minimal/deneb.yaml index be2b9fadfa5..8bb3e0b66bd 100644 --- a/consensus/types/presets/minimal/deneb.yaml +++ b/consensus/types/presets/minimal/deneb.yaml @@ -10,3 +10,12 @@ MAX_BLOB_COMMITMENTS_PER_BLOCK: 16 MAX_BLOBS_PER_BLOCK: 6 # [customized] `floorlog2(BLOB_KZG_COMMITMENTS_GINDEX) + 1 + ceillog2(MAX_BLOB_COMMITMENTS_PER_BLOCK)` = 4 + 1 + 4 = 9 KZG_COMMITMENT_INCLUSION_PROOF_DEPTH: 9 + +# EIP-7594 (temporary in Deneb for the purpose of prototyping) +# --------------------------------------------------------------- +# `uint64(2**6)` (= 64) +FIELD_ELEMENTS_PER_CELL: 64 +# uint64(floorlog2(get_generalized_index(BeaconBlockBody, 'blob_kzg_commitments')) +KZG_COMMITMENTS_INCLUSION_PROOF_DEPTH: 4 +# `uint64((FIELD_ELEMENTS_PER_BLOB * 2) // FIELD_ELEMENTS_PER_CELL)` (= 128) +NUMBER_OF_COLUMNS: 128 \ No newline at end of file diff --git a/consensus/types/src/chain_spec.rs b/consensus/types/src/chain_spec.rs index b2120fb0406..109d706b800 100644 --- a/consensus/types/src/chain_spec.rs +++ b/consensus/types/src/chain_spec.rs @@ -168,6 +168,11 @@ pub struct ChainSpec { pub deneb_fork_version: [u8; 4], pub deneb_fork_epoch: Option, + /* + * DAS params + */ + pub custody_requirement: u64, + /* * Networking */ @@ -197,6 +202,7 @@ pub struct ChainSpec { pub max_request_blob_sidecars: u64, pub min_epochs_for_blob_sidecars_requests: u64, pub blob_sidecar_subnet_count: u64, + pub data_column_sidecar_subnet_count: u64, /* * Networking Derived @@ -681,6 +687,11 @@ impl ChainSpec { deneb_fork_version: [0x04, 0x00, 0x00, 0x00], deneb_fork_epoch: None, + /* + * DAS params + */ + custody_requirement: 1, + /* * Network specific */ @@ -710,6 +721,7 @@ impl ChainSpec { max_request_blob_sidecars: default_max_request_blob_sidecars(), min_epochs_for_blob_sidecars_requests: default_min_epochs_for_blob_sidecars_requests(), blob_sidecar_subnet_count: default_blob_sidecar_subnet_count(), + data_column_sidecar_subnet_count: default_data_column_sidecar_subnet_count(), /* * Derived Deneb Specific @@ -941,7 +953,10 @@ impl ChainSpec { */ deneb_fork_version: [0x04, 0x00, 0x00, 0x64], deneb_fork_epoch: None, - + /* + * DAS params + */ + custody_requirement: 1, /* * Network specific */ @@ -971,6 +986,7 @@ impl ChainSpec { max_request_blob_sidecars: default_max_request_blob_sidecars(), min_epochs_for_blob_sidecars_requests: default_min_epochs_for_blob_sidecars_requests(), blob_sidecar_subnet_count: default_blob_sidecar_subnet_count(), + data_column_sidecar_subnet_count: default_data_column_sidecar_subnet_count(), /* * Derived Deneb Specific @@ -1151,6 +1167,9 @@ pub struct Config { #[serde(default = "default_blob_sidecar_subnet_count")] #[serde(with = "serde_utils::quoted_u64")] blob_sidecar_subnet_count: u64, + #[serde(default = "default_data_column_sidecar_subnet_count")] + #[serde(with = "serde_utils::quoted_u64")] + data_column_sidecar_subnet_count: u64, } fn default_bellatrix_fork_version() -> [u8; 4] { @@ -1256,6 +1275,10 @@ const fn default_blob_sidecar_subnet_count() -> u64 { 6 } +const fn default_data_column_sidecar_subnet_count() -> u64 { + 32 +} + const fn default_epochs_per_subnet_subscription() -> u64 { 256 } @@ -1418,6 +1441,7 @@ impl Config { max_request_blob_sidecars: spec.max_request_blob_sidecars, min_epochs_for_blob_sidecars_requests: spec.min_epochs_for_blob_sidecars_requests, blob_sidecar_subnet_count: spec.blob_sidecar_subnet_count, + data_column_sidecar_subnet_count: spec.data_column_sidecar_subnet_count, } } @@ -1482,6 +1506,7 @@ impl Config { max_request_blob_sidecars, min_epochs_for_blob_sidecars_requests, blob_sidecar_subnet_count, + data_column_sidecar_subnet_count, } = self; if preset_base != T::spec_name().to_string().as_str() { @@ -1539,6 +1564,7 @@ impl Config { max_request_blob_sidecars, min_epochs_for_blob_sidecars_requests, blob_sidecar_subnet_count, + data_column_sidecar_subnet_count, // We need to re-derive any values that might have changed in the config. max_blocks_by_root_request: max_blocks_by_root_request_common(max_request_blocks), diff --git a/consensus/types/src/data_column_sidecar.rs b/consensus/types/src/data_column_sidecar.rs new file mode 100644 index 00000000000..310c13a5e94 --- /dev/null +++ b/consensus/types/src/data_column_sidecar.rs @@ -0,0 +1,216 @@ +use crate::beacon_block_body::KzgCommitments; +use crate::test_utils::TestRandom; +use crate::{BlobSidecarList, EthSpec, Hash256, KzgProofs, SignedBeaconBlockHeader, Slot}; +use derivative::Derivative; +use rand::rngs::StdRng; +use rand::{Rng, SeedableRng}; +use safe_arith::ArithError; +use serde::{Deserialize, Serialize}; +use ssz_derive::{Decode, Encode}; +use ssz_types::Error as SszError; +use ssz_types::{FixedVector, VariableList}; +use test_random_derive::TestRandom; +use tree_hash::TreeHash; +use tree_hash_derive::TreeHash; + +pub type ColumnIndex = u64; +pub type Cell = FixedVector::FieldElementsPerCell>; +pub type DataColumn = VariableList, ::MaxBlobsPerBlock>; + +#[derive( + Debug, + Clone, + Serialize, + Deserialize, + Encode, + Decode, + TreeHash, + TestRandom, + Derivative, + arbitrary::Arbitrary, +)] +#[serde(bound = "T: EthSpec")] +#[arbitrary(bound = "T: EthSpec")] +#[derivative(PartialEq, Eq, Hash(bound = "T: EthSpec"))] +pub struct DataColumnSidecar { + #[serde(with = "serde_utils::quoted_u64")] + pub index: ColumnIndex, + #[serde(with = "ssz_types::serde_utils::list_of_hex_fixed_vec")] + pub column: DataColumn, + /// All of the KZG commitments and proofs associated with the block, used for verifying sample cells. + pub kzg_commitments: KzgCommitments, + pub kzg_proofs: KzgProofs, + pub signed_block_header: SignedBeaconBlockHeader, + /// An inclusion proof, proving the inclusion of `blob_kzg_commitments` in `BeaconBlockBody`. + pub kzg_commitments_inclusion_proof: FixedVector, +} + +impl DataColumnSidecar { + pub fn random_from_blob_sidecars( + blob_sidecars: &BlobSidecarList, + ) -> Result>, DataColumnSidecarError> { + if blob_sidecars.is_empty() { + return Ok(vec![]); + } + + let first_blob_sidecar = blob_sidecars + .first() + .ok_or(DataColumnSidecarError::MissingBlobSidecars)?; + let slot = first_blob_sidecar.slot(); + + // Proof for kzg commitments in `BeaconBlockBody` + let body_proof_start = first_blob_sidecar + .kzg_commitment_inclusion_proof + .len() + .saturating_sub(T::kzg_commitments_inclusion_proof_depth()); + let kzg_commitments_inclusion_proof: FixedVector< + Hash256, + T::KzgCommitmentsInclusionProofDepth, + > = first_blob_sidecar + .kzg_commitment_inclusion_proof + .get(body_proof_start..) + .ok_or(DataColumnSidecarError::KzgCommitmentInclusionProofOutOfBounds)? + .to_vec() + .into(); + + let mut rng = StdRng::seed_from_u64(slot.as_u64()); + let num_of_blobs = blob_sidecars.len(); + + (0..T::number_of_columns()) + .map(|col_index| { + Ok(DataColumnSidecar { + index: col_index as u64, + column: Self::generate_column_data(&mut rng, num_of_blobs, col_index)?, + kzg_commitments: blob_sidecars + .iter() + .map(|b| b.kzg_commitment) + .collect::>() + .into(), + kzg_proofs: blob_sidecars + .iter() + .map(|b| b.kzg_proof) + .collect::>() + .into(), + signed_block_header: first_blob_sidecar.signed_block_header.clone(), + kzg_commitments_inclusion_proof: kzg_commitments_inclusion_proof.clone(), + }) + }) + .collect::, _>>() + } + + fn generate_column_data( + rng: &mut StdRng, + num_of_blobs: usize, + index: usize, + ) -> Result, DataColumnSidecarError> { + let mut dummy_cell_data = Cell::::default(); + // Prefix with column index + let prefix = index.to_le_bytes(); + dummy_cell_data + .get_mut(..prefix.len()) + .ok_or(DataColumnSidecarError::DataColumnIndexOutOfBounds)? + .copy_from_slice(&prefix); + // Fill the rest of the vec with random values + rng.fill( + dummy_cell_data + .get_mut(prefix.len()..) + .ok_or(DataColumnSidecarError::DataColumnIndexOutOfBounds)?, + ); + + let column = DataColumn::::new(vec![dummy_cell_data; num_of_blobs])?; + Ok(column) + } + + pub fn slot(&self) -> Slot { + self.signed_block_header.message.slot + } + + pub fn block_root(&self) -> Hash256 { + self.signed_block_header.message.tree_hash_root() + } +} + +#[derive(Debug)] +pub enum DataColumnSidecarError { + ArithError(ArithError), + MissingBlobSidecars, + KzgCommitmentInclusionProofOutOfBounds, + DataColumnIndexOutOfBounds, + SszError(SszError), +} + +impl From for DataColumnSidecarError { + fn from(e: ArithError) -> Self { + Self::ArithError(e) + } +} + +impl From for DataColumnSidecarError { + fn from(e: SszError) -> Self { + Self::SszError(e) + } +} + +#[cfg(test)] +mod test { + use crate::beacon_block::EmptyBlock; + use crate::beacon_block_body::KzgCommitments; + use crate::eth_spec::EthSpec; + use crate::{ + BeaconBlock, BeaconBlockDeneb, Blob, BlobSidecar, BlobSidecarList, ChainSpec, + DataColumnSidecar, MainnetEthSpec, SignedBeaconBlock, + }; + use bls::Signature; + use kzg::{KzgCommitment, KzgProof}; + use std::sync::Arc; + + #[test] + fn test_random_from_blob_sidecars() { + type E = MainnetEthSpec; + let num_of_blobs = 6; + let spec = E::default_spec(); + let blob_sidecars: BlobSidecarList = create_test_blob_sidecars(num_of_blobs, &spec); + + let column_sidecars = DataColumnSidecar::random_from_blob_sidecars(&blob_sidecars).unwrap(); + + assert_eq!(column_sidecars.len(), E::number_of_columns()); + + for (idx, col_sidecar) in column_sidecars.iter().enumerate() { + assert_eq!(col_sidecar.index, idx as u64); + assert_eq!(col_sidecar.kzg_commitments.len(), num_of_blobs); + // ensure column sidecars are prefixed with column index (for verification purpose in prototype only) + let prefix_len = 8; // column index (u64) is stored as the first 8 bytes + let cell = col_sidecar.column.first().unwrap(); + let col_index_prefix = u64::from_le_bytes(cell[0..prefix_len].try_into().unwrap()); + assert_eq!(col_index_prefix, idx as u64) + } + } + + fn create_test_blob_sidecars( + num_of_blobs: usize, + spec: &ChainSpec, + ) -> BlobSidecarList { + let mut block = BeaconBlock::Deneb(BeaconBlockDeneb::empty(spec)); + let mut body = block.body_mut(); + let blob_kzg_commitments = body.blob_kzg_commitments_mut().unwrap(); + *blob_kzg_commitments = + KzgCommitments::::new(vec![KzgCommitment::empty_for_testing(); num_of_blobs]) + .unwrap(); + + let signed_block = SignedBeaconBlock::from_block(block, Signature::empty()); + + (0..num_of_blobs) + .map(|index| { + BlobSidecar::new( + index, + Blob::::default(), + &signed_block, + KzgProof::empty(), + ) + .map(Arc::new) + }) + .collect::, _>>() + .unwrap() + .into() + } +} diff --git a/consensus/types/src/data_column_subnet_id.rs b/consensus/types/src/data_column_subnet_id.rs new file mode 100644 index 00000000000..5a42b323895 --- /dev/null +++ b/consensus/types/src/data_column_subnet_id.rs @@ -0,0 +1,170 @@ +//! Identifies each data column subnet by an integer identifier. +use crate::{ChainSpec, EthSpec}; +use ethereum_types::U256; +use safe_arith::{ArithError, SafeArith}; +use serde::{Deserialize, Serialize}; +use std::fmt::{self, Display}; +use std::ops::{Deref, DerefMut}; + +const DATA_COLUMN_SUBNET_COUNT: u64 = 64; + +lazy_static! { + static ref DATA_COLUMN_SUBNET_ID_TO_STRING: Vec = { + let mut v = Vec::with_capacity(DATA_COLUMN_SUBNET_COUNT as usize); + + for i in 0..DATA_COLUMN_SUBNET_COUNT { + v.push(i.to_string()); + } + v + }; +} + +#[derive(arbitrary::Arbitrary, Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)] +#[serde(transparent)] +pub struct DataColumnSubnetId(#[serde(with = "serde_utils::quoted_u64")] u64); + +pub fn data_column_subnet_id_to_string(i: u64) -> &'static str { + if i < DATA_COLUMN_SUBNET_COUNT { + DATA_COLUMN_SUBNET_ID_TO_STRING + .get(i as usize) + .expect("index below DATA_COLUMN_SUBNET_COUNT") + } else { + "data column subnet id out of range" + } +} + +impl DataColumnSubnetId { + pub fn new(id: u64) -> Self { + id.into() + } + + pub fn try_from_column_index(column_index: usize) -> Result { + let id = column_index.safe_rem(T::data_column_subnet_count())? as u64; + Ok(id.into()) + } + + #[allow(clippy::arithmetic_side_effects)] + /// Compute required subnets to subscribe to given the node id. + /// TODO(das): Add epoch param + /// TODO(das): Add num of subnets (from ENR) + pub fn compute_subnets_for_data_column( + node_id: U256, + spec: &ChainSpec, + ) -> impl Iterator { + let num_of_column_subnets = T::data_column_subnet_count() as u64; + (0..spec.custody_requirement) + .map(move |i| { + let node_offset = (node_id % U256::from(num_of_column_subnets)).as_u64(); + node_offset.saturating_add(i) % num_of_column_subnets + }) + .map(DataColumnSubnetId::new) + } +} + +impl Display for DataColumnSubnetId { + fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + write!(f, "{}", self.0) + } +} + +impl Deref for DataColumnSubnetId { + type Target = u64; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl DerefMut for DataColumnSubnetId { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +impl From for DataColumnSubnetId { + fn from(x: u64) -> Self { + Self(x) + } +} + +impl Into for DataColumnSubnetId { + fn into(self) -> u64 { + self.0 + } +} + +impl Into for &DataColumnSubnetId { + fn into(self) -> u64 { + self.0 + } +} + +impl AsRef for DataColumnSubnetId { + fn as_ref(&self) -> &str { + data_column_subnet_id_to_string(self.0) + } +} + +#[derive(Debug)] +pub enum Error { + ArithError(ArithError), +} + +impl From for Error { + fn from(e: ArithError) -> Self { + Error::ArithError(e) + } +} + +#[cfg(test)] +mod test { + use crate::data_column_subnet_id::DataColumnSubnetId; + use crate::ChainSpec; + + #[test] + fn test_compute_subnets_for_data_column() { + let node_ids = [ + "0", + "88752428858350697756262172400162263450541348766581994718383409852729519486397", + "18732750322395381632951253735273868184515463718109267674920115648614659369468", + "27726842142488109545414954493849224833670205008410190955613662332153332462900", + "39755236029158558527862903296867805548949739810920318269566095185775868999998", + "31899136003441886988955119620035330314647133604576220223892254902004850516297", + "58579998103852084482416614330746509727562027284701078483890722833654510444626", + "28248042035542126088870192155378394518950310811868093527036637864276176517397", + "60930578857433095740782970114409273483106482059893286066493409689627770333527", + "103822458477361691467064888613019442068586830412598673713899771287914656699997", + ] + .into_iter() + .map(|v| ethereum_types::U256::from_dec_str(v).unwrap()) + .collect::>(); + + let expected_subnets = vec![ + vec![0], + vec![29], + vec![28], + vec![20], + vec![30], + vec![9], + vec![18], + vec![21], + vec![23], + vec![29], + ]; + + let spec = ChainSpec::mainnet(); + + for x in 0..node_ids.len() { + let computed_subnets = DataColumnSubnetId::compute_subnets_for_data_column::< + crate::MainnetEthSpec, + >(node_ids[x], &spec); + + assert_eq!( + expected_subnets[x], + computed_subnets + .map(DataColumnSubnetId::into) + .collect::>() + ); + } + } +} diff --git a/consensus/types/src/eth_spec.rs b/consensus/types/src/eth_spec.rs index 17baad9c4c7..00bf41e8a73 100644 --- a/consensus/types/src/eth_spec.rs +++ b/consensus/types/src/eth_spec.rs @@ -109,8 +109,16 @@ pub trait EthSpec: type MaxBlobsPerBlock: Unsigned + Clone + Sync + Send + Debug + PartialEq + Unpin; type MaxBlobCommitmentsPerBlock: Unsigned + Clone + Sync + Send + Debug + PartialEq + Unpin; type FieldElementsPerBlob: Unsigned + Clone + Sync + Send + Debug + PartialEq; + type FieldElementsPerCell: Unsigned + Clone + Sync + Send + Debug + PartialEq; type BytesPerFieldElement: Unsigned + Clone + Sync + Send + Debug + PartialEq; type KzgCommitmentInclusionProofDepth: Unsigned + Clone + Sync + Send + Debug + PartialEq; + /* + * New in PeerDAS + */ + type DataColumnSubnetCount: Unsigned + Clone + Sync + Send + Debug + PartialEq; + type DataColumnCount: Unsigned + Clone + Sync + Send + Debug + PartialEq; + type MaxBytesPerColumn: Unsigned + Clone + Sync + Send + Debug + PartialEq; + type KzgCommitmentsInclusionProofDepth: Unsigned + Clone + Sync + Send + Debug + PartialEq; /* * Derived values (set these CAREFULLY) */ @@ -269,14 +277,36 @@ pub trait EthSpec: Self::FieldElementsPerBlob::to_usize() } + /// Returns the `FIELD_ELEMENTS_PER_CELL` constant for this specification. + fn field_elements_per_cell() -> usize { + Self::FieldElementsPerCell::to_usize() + } + /// Returns the `BYTES_PER_BLOB` constant for this specification. fn bytes_per_blob() -> usize { Self::BytesPerBlob::to_usize() } + /// Returns the `KZG_COMMITMENT_INCLUSION_PROOF_DEPTH` preset for this specification. fn kzg_proof_inclusion_proof_depth() -> usize { Self::KzgCommitmentInclusionProofDepth::to_usize() } + + fn number_of_columns() -> usize { + Self::DataColumnCount::to_usize() + } + + fn data_column_subnet_count() -> usize { + Self::DataColumnSubnetCount::to_usize() + } + + fn max_bytes_per_column() -> usize { + Self::MaxBytesPerColumn::to_usize() + } + + fn kzg_commitments_inclusion_proof_depth() -> usize { + Self::KzgCommitmentsInclusionProofDepth::to_usize() + } } /// Macro to inherit some type values from another EthSpec. @@ -320,8 +350,16 @@ impl EthSpec for MainnetEthSpec { type MaxBlobCommitmentsPerBlock = U4096; type BytesPerFieldElement = U32; type FieldElementsPerBlob = U4096; + type FieldElementsPerCell = U64; type BytesPerBlob = U131072; type KzgCommitmentInclusionProofDepth = U17; + type DataColumnSubnetCount = U32; + type DataColumnCount = U128; + // Column samples are entire columns in 1D DAS. + // max data size = extended_blob_bytes * max_blobs_per_block / num_of_columns + // 256kb * 32 / 128 = 64kb + type MaxBytesPerColumn = U65536; + type KzgCommitmentsInclusionProofDepth = U4; // inclusion of the whole list of commitments type SyncSubcommitteeSize = U128; // 512 committee size / 4 sync committee subnet count type MaxPendingAttestations = U4096; // 128 max attestations * 32 slots per epoch type SlotsPerEth1VotingPeriod = U2048; // 64 epochs * 32 slots per epoch @@ -353,9 +391,15 @@ impl EthSpec for MinimalEthSpec { type SlotsPerEth1VotingPeriod = U32; // 4 epochs * 8 slots per epoch type MaxWithdrawalsPerPayload = U4; type FieldElementsPerBlob = U4096; + type FieldElementsPerCell = U64; type BytesPerBlob = U131072; type MaxBlobCommitmentsPerBlock = U16; type KzgCommitmentInclusionProofDepth = U9; + // DAS spec values copied from `MainnetEthSpec` + type DataColumnSubnetCount = U32; + type DataColumnCount = U128; + type MaxBytesPerColumn = U65536; + type KzgCommitmentsInclusionProofDepth = U4; params_from_eth_spec!(MainnetEthSpec { JustificationBitsLength, @@ -427,9 +471,15 @@ impl EthSpec for GnosisEthSpec { type MaxBlobsPerBlock = U6; type MaxBlobCommitmentsPerBlock = U4096; type FieldElementsPerBlob = U4096; + type FieldElementsPerCell = U64; type BytesPerFieldElement = U32; type BytesPerBlob = U131072; type KzgCommitmentInclusionProofDepth = U17; + // DAS spec values copied from `MainnetEthSpec` + type DataColumnSubnetCount = U32; + type DataColumnCount = U128; + type MaxBytesPerColumn = U65536; + type KzgCommitmentsInclusionProofDepth = U4; fn default_spec() -> ChainSpec { ChainSpec::gnosis() diff --git a/consensus/types/src/lib.rs b/consensus/types/src/lib.rs index b07b497a2ae..3c3e18d9297 100644 --- a/consensus/types/src/lib.rs +++ b/consensus/types/src/lib.rs @@ -99,6 +99,8 @@ pub mod slot_data; pub mod sqlite; pub mod blob_sidecar; +pub mod data_column_sidecar; +pub mod data_column_subnet_id; pub mod light_client_header; pub mod non_zero_usize; pub mod runtime_var_list; @@ -127,6 +129,8 @@ pub use crate::chain_spec::{ChainSpec, Config, Domain}; pub use crate::checkpoint::Checkpoint; pub use crate::config_and_preset::{ConfigAndPreset, ConfigAndPresetCapella, ConfigAndPresetDeneb}; pub use crate::contribution_and_proof::ContributionAndProof; +pub use crate::data_column_sidecar::DataColumnSidecar; +pub use crate::data_column_subnet_id::DataColumnSubnetId; pub use crate::deposit::{Deposit, DEPOSIT_TREE_DEPTH}; pub use crate::deposit_data::DepositData; pub use crate::deposit_message::DepositMessage; diff --git a/consensus/types/src/preset.rs b/consensus/types/src/preset.rs index 63a372ea1c9..fe8cc94f818 100644 --- a/consensus/types/src/preset.rs +++ b/consensus/types/src/preset.rs @@ -214,6 +214,13 @@ pub struct DenebPreset { pub max_blob_commitments_per_block: u64, #[serde(with = "serde_utils::quoted_u64")] pub field_elements_per_blob: u64, + // EIP-7594 DAS presets - to be moved to the next fork + #[serde(with = "serde_utils::quoted_u64")] + pub field_elements_per_cell: u64, + #[serde(with = "serde_utils::quoted_u64")] + pub kzg_commitments_inclusion_proof_depth: u64, + #[serde(with = "serde_utils::quoted_u64")] + pub number_of_columns: u64, } impl DenebPreset { @@ -222,6 +229,10 @@ impl DenebPreset { max_blobs_per_block: T::max_blobs_per_block() as u64, max_blob_commitments_per_block: T::max_blob_commitments_per_block() as u64, field_elements_per_blob: T::field_elements_per_blob() as u64, + field_elements_per_cell: T::field_elements_per_cell() as u64, + kzg_commitments_inclusion_proof_depth: T::kzg_commitments_inclusion_proof_depth() + as u64, + number_of_columns: T::number_of_columns() as u64, } } }