diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs
index d9ae0e23451..ca663c2e20c 100644
--- a/beacon_node/beacon_processor/src/lib.rs
+++ b/beacon_node/beacon_processor/src/lib.rs
@@ -40,6 +40,7 @@ pub use crate::scheduler::BeaconProcessorQueueLengths;
 use crate::scheduler::work_queue::WorkQueues;
+use crate::work::WorkCategory;
 use crate::work_reprocessing_queue::{
     QueuedBackfillBatch, QueuedColumnReconstruction, QueuedGossipBlock, ReprocessQueueMessage,
 };
@@ -53,26 +54,27 @@ use serde::{Deserialize, Serialize};
 use slot_clock::SlotClock;
 use std::cmp;
 use std::collections::HashSet;
-use std::fmt;
 use std::future::Future;
 use std::pin::Pin;
 use std::sync::Arc;
 use std::task::Context;
 use std::time::{Duration, Instant};
-use strum::IntoStaticStr;
 use task_executor::{RayonPoolType, TaskExecutor};
 use tokio::sync::mpsc;
 use tokio::sync::mpsc::error::TrySendError;
 use tracing::{debug, error, trace, warn};
-use types::{EthSpec, Hash256, SignedAggregateAndProof, SingleAttestation, Slot, SubnetId};
+use types::{EthSpec, Hash256, SignedAggregateAndProof, SubnetId};
 use work_reprocessing_queue::IgnoredRpcBlock;
 use work_reprocessing_queue::{
     QueuedAggregate, QueuedLightClientUpdate, QueuedRpcBlock, QueuedUnaggregate, ReadyWork,
     spawn_reprocess_scheduler,
 };
 
+pub use work::{AsyncFn, BlockingFn, BlockingOrAsync, GossipAttestationBatch, Work, WorkType};
+
 mod metrics;
 pub mod scheduler;
+pub mod work;
 
 /// The maximum size of the channel for work events to the `BeaconProcessor`.
 ///
@@ -108,13 +110,18 @@ const WORKER_TASK_NAME: &str = "beacon_processor_worker";
 const DEFAULT_MAX_GOSSIP_ATTESTATION_BATCH_SIZE: usize = 64;
 const DEFAULT_MAX_GOSSIP_AGGREGATE_BATCH_SIZE: usize = 64;
 
+// Max io bound tasks that can be scheduled at once
+// TODO what number should we pick?
+const MAX_IO_BOUND_WORKERS: usize = 100;
+
 /// Unique IDs used for metrics and testing.
 pub const WORKER_FREED: &str = "worker_freed";
 pub const NOTHING_TO_DO: &str = "nothing_to_do";
 
 #[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
 pub struct BeaconProcessorConfig {
-    pub max_workers: usize,
+    pub max_io_bound_workers: usize,
+    pub max_cpu_bound_workers: usize,
     pub max_work_event_queue_len: usize,
     pub max_scheduled_work_queue_len: usize,
     pub max_gossip_attestation_batch_size: usize,
@@ -125,7 +132,8 @@ pub struct BeaconProcessorConfig {
 impl Default for BeaconProcessorConfig {
     fn default() -> Self {
         Self {
-            max_workers: cmp::max(1, num_cpus::get()),
+            max_io_bound_workers: MAX_IO_BOUND_WORKERS,
+            max_cpu_bound_workers: cmp::max(1, num_cpus::get()),
             max_work_event_queue_len: DEFAULT_MAX_WORK_EVENT_QUEUE_LEN,
             max_scheduled_work_queue_len: DEFAULT_MAX_SCHEDULED_WORK_QUEUE_LEN,
             max_gossip_attestation_batch_size: DEFAULT_MAX_GOSSIP_ATTESTATION_BATCH_SIZE,
@@ -332,206 +340,10 @@ impl BeaconProcessorSend {
     }
 }
 
-pub type AsyncFn = Pin<Box<dyn Future<Output = ()> + Send + Sync>>;
-pub type BlockingFn = Box<dyn FnOnce() + Send + Sync>;
-pub type BlockingFnWithManualSendOnIdle = Box<dyn FnOnce(SendOnDrop) + Send + Sync>;
-pub enum BlockingOrAsync {
-    Blocking(BlockingFn),
-    Async(AsyncFn),
-}
-pub type GossipAttestationBatch = Vec<GossipAttestationPackage<SingleAttestation>>;
-
-/// Indicates the type of work to be performed and therefore its priority and
-/// queuing specifics.
-pub enum Work<E: EthSpec> {
-    GossipAttestation {
-        attestation: Box<GossipAttestationPackage<SingleAttestation>>,
-        process_individual:
-            Box<dyn FnOnce(GossipAttestationPackage<SingleAttestation>) + Send + Sync>,
-        process_batch: Box<dyn FnOnce(GossipAttestationBatch) + Send + Sync>,
-    },
-    UnknownBlockAttestation {
-        process_fn: BlockingFn,
-    },
-    GossipAttestationBatch {
-        attestations: GossipAttestationBatch,
-        process_batch: Box<dyn FnOnce(GossipAttestationBatch) + Send + Sync>,
-    },
-    GossipAggregate {
-        aggregate: Box<GossipAggregatePackage<E>>,
-        process_individual: Box<dyn FnOnce(GossipAggregatePackage<E>) + Send + Sync>,
-        process_batch: Box<dyn FnOnce(Vec<GossipAggregatePackage<E>>) + Send + Sync>,
-    },
-    UnknownBlockAggregate {
-        process_fn: BlockingFn,
-    },
-    UnknownLightClientOptimisticUpdate {
-        parent_root: Hash256,
-        process_fn: BlockingFn,
-    },
-    GossipAggregateBatch {
-        aggregates: Vec<GossipAggregatePackage<E>>,
-        process_batch: Box<dyn FnOnce(Vec<GossipAggregatePackage<E>>) + Send + Sync>,
-    },
-    GossipBlock(AsyncFn),
-    GossipBlobSidecar(AsyncFn),
-    GossipDataColumnSidecar(AsyncFn),
-    DelayedImportBlock {
-        beacon_block_slot: Slot,
-        beacon_block_root: Hash256,
-        process_fn: AsyncFn,
-    },
-    GossipVoluntaryExit(BlockingFn),
-    GossipProposerSlashing(BlockingFn),
-    GossipAttesterSlashing(BlockingFn),
-    GossipSyncSignature(BlockingFn),
-    GossipSyncContribution(BlockingFn),
-    GossipLightClientFinalityUpdate(BlockingFn),
-    GossipLightClientOptimisticUpdate(BlockingFn),
-    RpcBlock {
-        process_fn: AsyncFn,
-    },
-    RpcBlobs {
-        process_fn: AsyncFn,
-    },
-    RpcCustodyColumn(AsyncFn),
-    ColumnReconstruction(AsyncFn),
-    IgnoredRpcBlock {
-        process_fn: BlockingFn,
-    },
-    ChainSegment(AsyncFn),
-    ChainSegmentBackfill(BlockingFn),
-    Status(BlockingFn),
-    BlocksByRangeRequest(AsyncFn),
-    BlocksByRootsRequest(AsyncFn),
-    BlobsByRangeRequest(BlockingFn),
-    BlobsByRootsRequest(BlockingFn),
-    DataColumnsByRootsRequest(BlockingFn),
-    DataColumnsByRangeRequest(BlockingFn),
-    GossipBlsToExecutionChange(BlockingFn),
-    LightClientBootstrapRequest(BlockingFn),
-    LightClientOptimisticUpdateRequest(BlockingFn),
-    LightClientFinalityUpdateRequest(BlockingFn),
-    LightClientUpdatesByRangeRequest(BlockingFn),
-    ApiRequestP0(BlockingOrAsync),
-    ApiRequestP1(BlockingOrAsync),
-    Reprocess(ReprocessQueueMessage),
-}
-
-impl<E: EthSpec> fmt::Debug for Work<E> {
-    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
-        write!(f, "{}", Into::<&'static str>::into(self.to_type()))
-    }
-}
-
-#[derive(IntoStaticStr, PartialEq, Eq, Debug, Clone)]
-#[strum(serialize_all = "snake_case")]
-pub enum WorkType {
-    GossipAttestation,
-    GossipAttestationToConvert,
-    UnknownBlockAttestation,
-    GossipAttestationBatch,
-    GossipAggregate,
-    UnknownBlockAggregate,
-    UnknownLightClientOptimisticUpdate,
-    GossipAggregateBatch,
-    GossipBlock,
-    GossipBlobSidecar,
-    GossipDataColumnSidecar,
-    DelayedImportBlock,
-    GossipVoluntaryExit,
-    GossipProposerSlashing,
-    GossipAttesterSlashing,
-    GossipSyncSignature,
-    GossipSyncContribution,
-    GossipLightClientFinalityUpdate,
-    GossipLightClientOptimisticUpdate,
-    RpcBlock,
-    RpcBlobs,
-    RpcCustodyColumn,
-    ColumnReconstruction,
-    IgnoredRpcBlock,
-    ChainSegment,
-    ChainSegmentBackfill,
-    Status,
-    BlocksByRangeRequest,
-    BlocksByRootsRequest,
-    BlobsByRangeRequest,
-    BlobsByRootsRequest,
-    DataColumnsByRootsRequest,
-    DataColumnsByRangeRequest,
-    GossipBlsToExecutionChange,
-    LightClientBootstrapRequest,
-    LightClientOptimisticUpdateRequest,
-    LightClientFinalityUpdateRequest,
-    LightClientUpdatesByRangeRequest,
-    ApiRequestP0,
-    ApiRequestP1,
-    Reprocess,
-}
-
-impl<E: EthSpec> Work<E> {
-    fn str_id(&self) -> &'static str {
-        self.to_type().into()
-    }
-
-    /// Provides a `&str` that uniquely identifies each enum variant.
-    fn to_type(&self) -> WorkType {
-        match self {
-            Work::GossipAttestation { .. } => WorkType::GossipAttestation,
-            Work::GossipAttestationBatch { .. } => WorkType::GossipAttestationBatch,
-            Work::GossipAggregate { .. } => WorkType::GossipAggregate,
-            Work::GossipAggregateBatch { .. } => WorkType::GossipAggregateBatch,
-            Work::GossipBlock(_) => WorkType::GossipBlock,
-            Work::GossipBlobSidecar(_) => WorkType::GossipBlobSidecar,
-            Work::GossipDataColumnSidecar(_) => WorkType::GossipDataColumnSidecar,
-            Work::DelayedImportBlock { .. } => WorkType::DelayedImportBlock,
-            Work::GossipVoluntaryExit(_) => WorkType::GossipVoluntaryExit,
-            Work::GossipProposerSlashing(_) => WorkType::GossipProposerSlashing,
-            Work::GossipAttesterSlashing(_) => WorkType::GossipAttesterSlashing,
-            Work::GossipSyncSignature(_) => WorkType::GossipSyncSignature,
-            Work::GossipSyncContribution(_) => WorkType::GossipSyncContribution,
-            Work::GossipLightClientFinalityUpdate(_) => WorkType::GossipLightClientFinalityUpdate,
-            Work::GossipLightClientOptimisticUpdate(_) => {
-                WorkType::GossipLightClientOptimisticUpdate
-            }
-            Work::GossipBlsToExecutionChange(_) => WorkType::GossipBlsToExecutionChange,
-            Work::RpcBlock { .. } => WorkType::RpcBlock,
-            Work::RpcBlobs { .. } => WorkType::RpcBlobs,
-            Work::RpcCustodyColumn { .. } => WorkType::RpcCustodyColumn,
-            Work::ColumnReconstruction(_) => WorkType::ColumnReconstruction,
-            Work::IgnoredRpcBlock { .. } => WorkType::IgnoredRpcBlock,
-            Work::ChainSegment { .. } => WorkType::ChainSegment,
-            Work::ChainSegmentBackfill(_) => WorkType::ChainSegmentBackfill,
-            Work::Status(_) => WorkType::Status,
-            Work::BlocksByRangeRequest(_) => WorkType::BlocksByRangeRequest,
-            Work::BlocksByRootsRequest(_) => WorkType::BlocksByRootsRequest,
-            Work::BlobsByRangeRequest(_) => WorkType::BlobsByRangeRequest,
-            Work::BlobsByRootsRequest(_) => WorkType::BlobsByRootsRequest,
-            Work::DataColumnsByRootsRequest(_) => WorkType::DataColumnsByRootsRequest,
-            Work::DataColumnsByRangeRequest(_) => WorkType::DataColumnsByRangeRequest,
-            Work::LightClientBootstrapRequest(_) => WorkType::LightClientBootstrapRequest,
-            Work::LightClientOptimisticUpdateRequest(_) => {
-                WorkType::LightClientOptimisticUpdateRequest
-            }
-            Work::LightClientFinalityUpdateRequest(_) => WorkType::LightClientFinalityUpdateRequest,
-            Work::LightClientUpdatesByRangeRequest(_) => WorkType::LightClientUpdatesByRangeRequest,
-            Work::UnknownBlockAttestation { .. } => WorkType::UnknownBlockAttestation,
-            Work::UnknownBlockAggregate { .. } => WorkType::UnknownBlockAggregate,
-            Work::UnknownLightClientOptimisticUpdate { .. } => {
-                WorkType::UnknownLightClientOptimisticUpdate
-            }
-            Work::ApiRequestP0 { .. } => WorkType::ApiRequestP0,
-            Work::ApiRequestP1 { .. } => WorkType::ApiRequestP1,
-            Work::Reprocess { .. } => WorkType::Reprocess,
-        }
-    }
-}
-
 /// Unifies all the messages processed by the `BeaconProcessor`.
 enum InboundEvent<E: EthSpec> {
-    /// A worker has completed a task and is free.
-    WorkerIdle,
+    /// A worker has completed a task for a specific work category and is free.
+    WorkerIdle(WorkCategory),
     /// There is new work to be done.
     WorkEvent((WorkEvent<E>, Instant)),
     /// A work event that was queued for re-processing has become ready.
@@ -558,8 +370,10 @@ impl<E: EthSpec> Stream for InboundEvents<E> {
         // Always check for idle workers before anything else. This allows us to ensure that a big
         // stream of new events doesn't suppress the processing of existing events.
         match self.idle_rx.poll_recv(cx) {
-            Poll::Ready(Some(_)) => {
-                return Poll::Ready(Some(InboundEvent::WorkerIdle));
+            Poll::Ready(Some(work_type)) => {
+                return Poll::Ready(Some(InboundEvent::WorkerIdle(WorkCategory::get_category(
+                    work_type,
+                ))));
             }
             Poll::Ready(None) => {
                 return Poll::Ready(None);
             }
@@ -603,7 +417,8 @@ pub struct BeaconProcessor<E: EthSpec> {
     pub network_globals: Arc<NetworkGlobals<E>>,
     pub executor: TaskExecutor,
-    pub current_workers: usize,
+    pub current_cpu_bound_workers: usize,
+    pub current_io_bound_workers: usize,
     pub config: BeaconProcessorConfig,
 }
 
@@ -614,7 +429,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
     /// - Performed immediately, if a worker is available.
     /// - Queued for later processing, if no worker is currently available.
     ///
-    /// Only `self.config.max_workers` will ever be spawned at one time. Each worker is a `tokio` task
+    /// Only `self.config.max_cpu_bound_workers + self.config.max_io_bound_workers` will ever be spawned at one time. Each worker is a `tokio` task
     /// started with `spawn_blocking`.
     ///
     /// The optional `work_journal_tx` allows for an outside process to receive a log of all work
@@ -665,8 +480,17 @@ impl<E: EthSpec> BeaconProcessor<E> {
         loop {
             let (work_event, created_timestamp) = match inbound_events.next().await {
-                Some(InboundEvent::WorkerIdle) => {
-                    self.current_workers = self.current_workers.saturating_sub(1);
+                Some(InboundEvent::WorkerIdle(work_category)) => {
+                    match work_category {
+                        WorkCategory::IoBound => {
+                            self.current_io_bound_workers =
+                                self.current_io_bound_workers.saturating_sub(1)
+                        }
+                        WorkCategory::CpuBound => {
+                            self.current_cpu_bound_workers =
+                                self.current_cpu_bound_workers.saturating_sub(1)
+                        }
+                    }
                     (None, Instant::now())
                 }
                 Some(InboundEvent::WorkEvent((event, created_timestamp)))
@@ -747,7 +571,11 @@ impl<E: EthSpec> BeaconProcessor<E> {
                 }
             }
 
-            let can_spawn = self.current_workers < self.config.max_workers;
+            let can_spawn_io_bound =
+                self.current_io_bound_workers < self.config.max_io_bound_workers;
+            let can_spawn_cpu_bound =
+                self.current_cpu_bound_workers < self.config.max_cpu_bound_workers;
+
             let drop_during_sync = work_event
                 .as_ref()
                 .is_some_and(|event| event.drop_during_sync);
                 //
                 // We don't check the `work.drop_during_sync` here. We assume that if it made
                 // it into the queue at any point then we should process it.
-                None if can_spawn => {
-                    // Check for chain segments first, they're the most efficient way to get
-                    // blocks into the system.
-                    let work_event: Option<Work<E>> = if let Some(item) =
-                        work_queues.chain_segment_queue.pop()
-                    {
-                        Some(item)
-                    // Check sync blocks before gossip blocks, since we've already explicitly
-                    // requested these blocks.
-                    } else if let Some(item) = work_queues.rpc_block_queue.pop() {
-                        Some(item)
-                    } else if let Some(item) = work_queues.rpc_blob_queue.pop() {
-                        Some(item)
-                    } else if let Some(item) = work_queues.rpc_custody_column_queue.pop() {
-                        Some(item)
-                    // Check delayed blocks before gossip blocks, the gossip blocks might rely
-                    // on the delayed ones.
-                    } else if let Some(item) = work_queues.delayed_block_queue.pop() {
-                        Some(item)
-                    // Check gossip blocks before gossip attestations, since a block might be
-                    // required to verify some attestations.
-                    } else if let Some(item) = work_queues.gossip_block_queue.pop() {
-                        Some(item)
-                    } else if let Some(item) = work_queues.gossip_blob_queue.pop() {
-                        Some(item)
-                    } else if let Some(item) = work_queues.gossip_data_column_queue.pop() {
-                        Some(item)
-                    } else if let Some(item) = work_queues.column_reconstruction_queue.pop() {
-                        Some(item)
-                    // Check the priority 0 API requests after blocks and blobs, but before attestations.
-                    } else if let Some(item) = work_queues.api_request_p0_queue.pop() {
-                        Some(item)
-                    // Check the aggregates, *then* the unaggregates since we assume that
-                    // aggregates are more valuable to local validators and effectively give us
-                    // more information with less signature verification time.
-                    } else if !work_queues.aggregate_queue.is_empty() {
-                        let batch_size = cmp::min(
-                            work_queues.aggregate_queue.len(),
-                            self.config.max_gossip_aggregate_batch_size,
-                        );
-
-                        if batch_size < 2 {
-                            // One single aggregate is in the queue, process it individually.
-                            work_queues.aggregate_queue.pop()
-                        } else {
-                            // Collect two or more aggregates into a batch, so they can take
-                            // advantage of batch signature verification.
-                            //
-                            // Note: this will convert the `Work::GossipAggregate` item into a
-                            // `Work::GossipAggregateBatch` item.
-                            let mut aggregates = Vec::with_capacity(batch_size);
-                            let mut process_batch_opt = None;
-                            for _ in 0..batch_size {
-                                if let Some(item) = work_queues.aggregate_queue.pop() {
-                                    match item {
-                                        Work::GossipAggregate {
-                                            aggregate,
-                                            process_individual: _,
-                                            process_batch,
-                                        } => {
-                                            aggregates.push(*aggregate);
-                                            if process_batch_opt.is_none() {
-                                                process_batch_opt = Some(process_batch);
-                                            }
-                                        }
-                                        _ => {
-                                            error!("Invalid item in aggregate queue");
-                                        }
-                                    }
-                                }
-                            }
-
-                            if let Some(process_batch) = process_batch_opt {
-                                // Process all aggregates with a single worker.
-                                Some(Work::GossipAggregateBatch {
-                                    aggregates,
-                                    process_batch,
-                                })
-                            } else {
-                                // There is no good reason for this to
-                                // happen, it is a serious logic error.
-                                // Since we only form batches when multiple
-                                // work items exist, we should always have a
-                                // work closure at this point.
-                                crit!("Missing aggregate work");
-                                None
-                            }
-                        }
-                    // Check the unaggregated attestation queue.
-                    //
-                    // Potentially use batching.
-                    } else if !work_queues.attestation_queue.is_empty() {
-                        let batch_size = cmp::min(
-                            work_queues.attestation_queue.len(),
-                            self.config.max_gossip_attestation_batch_size,
-                        );
-
-                        if batch_size < 2 {
-                            // One single attestation is in the queue, process it individually.
-                            work_queues.attestation_queue.pop()
-                        } else {
-                            // Collect two or more attestations into a batch, so they can take
-                            // advantage of batch signature verification.
-                            //
-                            // Note: this will convert the `Work::GossipAttestation` item into a
-                            // `Work::GossipAttestationBatch` item.
-                            let mut attestations = Vec::with_capacity(batch_size);
-                            let mut process_batch_opt = None;
-                            for _ in 0..batch_size {
-                                if let Some(item) = work_queues.attestation_queue.pop() {
-                                    match item {
-                                        Work::GossipAttestation {
-                                            attestation,
-                                            process_individual: _,
-                                            process_batch,
-                                        } => {
-                                            attestations.push(*attestation);
-                                            if process_batch_opt.is_none() {
-                                                process_batch_opt = Some(process_batch);
-                                            }
-                                        }
-                                        _ => error!("Invalid item in attestation queue"),
-                                    }
-                                }
-                            }
-
-                            if let Some(process_batch) = process_batch_opt {
-                                // Process all attestations with a single worker.
-                                Some(Work::GossipAttestationBatch {
-                                    attestations,
-                                    process_batch,
-                                })
-                            } else {
-                                // There is no good reason for this to
-                                // happen, it is a serious logic error.
-                                // Since we only form batches when multiple
-                                // work items exist, we should always have a
-                                // work closure at this point.
-                                crit!("Missing attestations work");
-                                None
-                            }
-                        }
-                    // Convert any gossip attestations that need to be converted.
-                    } else if let Some(item) = work_queues.attestation_to_convert_queue.pop() {
-                        Some(item)
-                    // Check sync committee messages after attestations as their rewards are lesser
-                    // and they don't influence fork choice.
-                    } else if let Some(item) = work_queues.sync_contribution_queue.pop() {
-                        Some(item)
-                    } else if let Some(item) = work_queues.sync_message_queue.pop() {
-                        Some(item)
-                    // Aggregates and unaggregates queued for re-processing are older and we
-                    // care about fresher ones, so check those first.
-                    } else if let Some(item) = work_queues.unknown_block_aggregate_queue.pop() {
-                        Some(item)
-                    } else if let Some(item) = work_queues.unknown_block_attestation_queue.pop()
-                    {
-                        Some(item)
-                    // Check RPC methods next. Status messages are needed for sync so
-                    // prioritize them over syncing requests from other peers (BlocksByRange
-                    // and BlocksByRoot)
-                    } else if let Some(item) = work_queues.status_queue.pop() {
-                        Some(item)
-                    } else if let Some(item) = work_queues.block_brange_queue.pop() {
-                        Some(item)
-                    } else if let Some(item) = work_queues.block_broots_queue.pop() {
-                        Some(item)
-                    } else if let Some(item) = work_queues.blob_brange_queue.pop() {
-                        Some(item)
-                    } else if let Some(item) = work_queues.blob_broots_queue.pop() {
-                        Some(item)
-                    } else if let Some(item) = work_queues.dcbroots_queue.pop() {
-                        Some(item)
-                    } else if let Some(item) = work_queues.dcbrange_queue.pop() {
-                        Some(item)
-                    // Check slashings after all other consensus messages so we prioritize
-                    // following head.
-                    //
-                    // Check attester slashings before proposer slashings since they have the
-                    // potential to slash multiple validators at once.
-                    } else if let Some(item) = work_queues.gossip_attester_slashing_queue.pop()
-                    {
-                        Some(item)
-                    } else if let Some(item) = work_queues.gossip_proposer_slashing_queue.pop()
-                    {
-                        Some(item)
-                    // Check exits and address changes late since our validators don't get
-                    // rewards from them.
-                    } else if let Some(item) = work_queues.gossip_voluntary_exit_queue.pop() {
-                        Some(item)
-                    } else if let Some(item) =
-                        work_queues.gossip_bls_to_execution_change_queue.pop()
-                    {
-                        Some(item)
-                    // Check the priority 1 API requests after we've
-                    // processed all the interesting things from the network
-                    // and things required for us to stay in good repute
-                    // with our P2P peers.
-                    } else if let Some(item) = work_queues.api_request_p1_queue.pop() {
-                        Some(item)
-                    // Handle backfill sync chain segments.
-                    } else if let Some(item) = work_queues.backfill_chain_segment.pop() {
-                        Some(item)
-                    // Handle light client requests.
-                    } else if let Some(item) = work_queues.lc_gossip_finality_update_queue.pop()
-                    {
-                        Some(item)
-                    } else if let Some(item) =
-                        work_queues.lc_gossip_optimistic_update_queue.pop()
-                    {
-                        Some(item)
-                    } else if let Some(item) =
-                        work_queues.unknown_light_client_update_queue.pop()
-                    {
-                        Some(item)
-                    } else if let Some(item) = work_queues.lc_bootstrap_queue.pop() {
-                        Some(item)
-                    } else if let Some(item) = work_queues.lc_rpc_optimistic_update_queue.pop()
-                    {
-                        Some(item)
-                    } else if let Some(item) = work_queues.lc_rpc_finality_update_queue.pop() {
-                        Some(item)
-                    } else if let Some(item) = work_queues.lc_update_range_queue.pop() {
-                        Some(item)
-                    // This statement should always be the final else statement.
-                    } else {
-                        // Let the journal know that a worker is freed and there's nothing else
-                        // for it to do.
-                        if let Some(work_journal_tx) = &work_journal_tx {
-                            // We don't care if this message was successfully sent, we only use the journal
-                            // during testing.
-                            let _ = work_journal_tx.try_send(NOTHING_TO_DO);
-                        }
-                        None
-                    };
-
+                None => {
+                    let work_event = self.get_next_work_item_from_queue(
+                        &mut work_queues,
+                        can_spawn_io_bound,
+                        can_spawn_cpu_bound,
+                        &work_journal_tx,
+                    );
                     if let Some(work_event) = work_event {
                         let work_type = work_event.to_type();
                         self.spawn_worker(work_event, created_timestamp, idle_tx);
                         Some(work_type)
                     } else {
+                        if !can_spawn_cpu_bound && !can_spawn_io_bound {
+                            // There is no new work event and we are unable to spawn a new worker.
+                            //
+                            // I cannot see any good reason why this would happen.
+                            warn!(
+                                msg = "no new work and cannot spawn worker",
+                                "Unexpected gossip processor condition"
+                            );
+                        }
                         None
                     }
                 }
-                // There is no new work event and we are unable to spawn a new worker.
-                //
-                // I cannot see any good reason why this would happen.
-                None => {
-                    warn!(
-                        msg = "no new work and cannot spawn worker",
-                        "Unexpected gossip processor condition"
-                    );
-                    None
-                }
                 // The chain is syncing and this event should be dropped during sync.
                 Some(work_event)
                     if self.network_globals.sync_state.read().is_syncing()
@@ -1044,7 +642,16 @@ impl<E: EthSpec> BeaconProcessor<E> {
                         )
                     }
                 }
-                _ if can_spawn => self.spawn_worker(work, created_timestamp, idle_tx),
+                _ if can_spawn_cpu_bound
+                    && work.get_work_category() == WorkCategory::CpuBound =>
+                {
+                    self.spawn_worker(work, created_timestamp, idle_tx)
+                }
+                _ if can_spawn_io_bound
+                    && work.get_work_category() == WorkCategory::IoBound =>
+                {
+                    self.spawn_worker(work, created_timestamp, idle_tx)
+                }
                 Work::GossipAttestation { .. } => {
                     work_queues.attestation_queue.push(work)
                 }
@@ -1278,6 +885,293 @@ impl<E: EthSpec> BeaconProcessor<E> {
         Ok(())
     }
 
+    fn get_next_work_item_from_queue(
+        &mut self,
+        work_queues: &mut WorkQueues<E>,
+        can_spawn_io_bound: bool,
+        can_spawn_cpu_bound: bool,
+        work_journal_tx: &Option<mpsc::Sender<&'static str>>,
+    ) -> Option<Work<E>> {
+        let can_spawn_predicate = |item: &Work<E>| match item.get_work_category() {
+            WorkCategory::IoBound => can_spawn_io_bound,
+            WorkCategory::CpuBound => can_spawn_cpu_bound,
+        };
+
+        // Check for chain segments first, they're the most efficient way to get
+        // blocks into the system.
+        work_queues
+            .chain_segment_queue
+            .pop_if(can_spawn_predicate)
+            // Check sync blocks before gossip blocks, since we've already explicitly
+            // requested these blocks.
+            .or_else(|| work_queues.rpc_block_queue.pop_if(can_spawn_predicate))
+            .or_else(|| work_queues.rpc_blob_queue.pop_if(can_spawn_predicate))
+            .or_else(|| {
+                work_queues
+                    .rpc_custody_column_queue
+                    .pop_if(can_spawn_predicate)
+            })
+            // Check delayed blocks before gossip blocks, the gossip blocks might rely
+            // on the delayed ones.
+            .or_else(|| work_queues.delayed_block_queue.pop_if(can_spawn_predicate))
+            // Check gossip blocks before gossip attestations, since a block might be
+            // required to verify some attestations.
+            .or_else(|| work_queues.gossip_block_queue.pop_if(can_spawn_predicate))
+            .or_else(|| work_queues.gossip_blob_queue.pop_if(can_spawn_predicate))
+            .or_else(|| {
+                work_queues
+                    .gossip_data_column_queue
+                    .pop_if(can_spawn_predicate)
+            })
+            .or_else(|| {
+                work_queues
+                    .column_reconstruction_queue
+                    .pop_if(can_spawn_predicate)
+            })
+            // Check the priority 0 API requests after blocks and blobs, but before attestations.
+            .or_else(|| work_queues.api_request_p0_queue.pop_if(can_spawn_predicate))
+            // Check the aggregates, *then* the unaggregates since we assume that
+            // aggregates are more valuable to local validators and effectively give us
+            // more information with less signature verification time.
+            .or_else(|| {
+                if !work_queues.aggregate_queue.is_empty() {
+                    let batch_size = cmp::min(
+                        work_queues.aggregate_queue.len(),
+                        self.config.max_gossip_aggregate_batch_size,
+                    );
+
+                    if batch_size < 2 {
+                        // One single aggregate is in the queue, process it individually.
+                        work_queues.aggregate_queue.pop_if(can_spawn_predicate)
+                    } else {
+                        // Collect two or more aggregates into a batch, so they can take
+                        // advantage of batch signature verification.
+                        //
+                        // Note: this will convert the `Work::GossipAggregate` item into a
+                        // `Work::GossipAggregateBatch` item.
+                        let mut aggregates = Vec::with_capacity(batch_size);
+                        let mut process_batch_opt = None;
+                        for _ in 0..batch_size {
+                            if let Some(item) =
+                                work_queues.aggregate_queue.pop_if(can_spawn_predicate)
+                            {
+                                match item {
+                                    Work::GossipAggregate {
+                                        aggregate,
+                                        process_individual: _,
+                                        process_batch,
+                                    } => {
+                                        aggregates.push(*aggregate);
+                                        if process_batch_opt.is_none() {
+                                            process_batch_opt = Some(process_batch);
+                                        }
+                                    }
+                                    _ => {
+                                        error!("Invalid item in aggregate queue");
+                                    }
+                                }
+                            }
+                        }
+
+                        if let Some(process_batch) = process_batch_opt {
+                            // Process all aggregates with a single worker.
+                            Some(Work::GossipAggregateBatch {
+                                aggregates,
+                                process_batch,
+                            })
+                        } else {
+                            // There is no good reason for this to
+                            // happen, it is a serious logic error.
+                            // Since we only form batches when multiple
+                            // work items exist, we should always have a
+                            // work closure at this point.
+                            crit!("Missing aggregate work");
+                            None
+                        }
+                    }
+                } else {
+                    None
+                }
+            })
+            // Check the unaggregated attestation queue.
+            //
+            // Potentially use batching.
+            .or_else(|| {
+                if !work_queues.attestation_queue.is_empty() {
+                    let batch_size = cmp::min(
+                        work_queues.attestation_queue.len(),
+                        self.config.max_gossip_attestation_batch_size,
+                    );
+                    if batch_size < 2 {
+                        // One single attestation is in the queue, process it individually.
+                        work_queues.attestation_queue.pop_if(can_spawn_predicate)
+                    } else {
+                        // Collect two or more attestations into a batch, so they can take
+                        // advantage of batch signature verification.
+                        //
+                        // Note: this will convert the `Work::GossipAttestation` item into a
+                        // `Work::GossipAttestationBatch` item.
+                        let mut attestations = Vec::with_capacity(batch_size);
+                        let mut process_batch_opt = None;
+                        for _ in 0..batch_size {
+                            if let Some(item) =
+                                work_queues.attestation_queue.pop_if(can_spawn_predicate)
+                            {
+                                match item {
+                                    Work::GossipAttestation {
+                                        attestation,
+                                        process_individual: _,
+                                        process_batch,
+                                    } => {
+                                        attestations.push(*attestation);
+                                        if process_batch_opt.is_none() {
+                                            process_batch_opt = Some(process_batch);
+                                        }
+                                    }
+                                    _ => error!("Invalid item in attestation queue"),
+                                }
+                            }
+                        }
+
+                        if let Some(process_batch) = process_batch_opt {
+                            // Process all attestations with a single worker.
+                            Some(Work::GossipAttestationBatch {
+                                attestations,
+                                process_batch,
+                            })
+                        } else {
+                            // There is no good reason for this to
+                            // happen, it is a serious logic error.
+                            // Since we only form batches when multiple
+                            // work items exist, we should always have a
+                            // work closure at this point.
+                            crit!("Missing attestations work");
+                            None
+                        }
+                    }
+                } else {
+                    None
+                }
+            })
+            // Convert any gossip attestations that need to be converted.
+            .or_else(|| {
+                work_queues
+                    .attestation_to_convert_queue
+                    .pop_if(can_spawn_predicate)
+            })
+            // Check sync committee messages after attestations as their rewards are lesser
+            // and they don't influence fork choice.
+            .or_else(|| {
+                work_queues
+                    .sync_contribution_queue
+                    .pop_if(can_spawn_predicate)
+            })
+            .or_else(|| work_queues.sync_message_queue.pop_if(can_spawn_predicate))
+            // Aggregates and unaggregates queued for re-processing are older and we
+            // care about fresher ones, so check those first.
+            .or_else(|| {
+                work_queues
+                    .unknown_block_aggregate_queue
+                    .pop_if(can_spawn_predicate)
+            })
+            .or_else(|| {
+                work_queues
+                    .unknown_block_attestation_queue
+                    .pop_if(can_spawn_predicate)
+            })
+            // Check RPC methods next. Status messages are needed for sync so
+            // prioritize them over syncing requests from other peers (BlocksByRange
+            // and BlocksByRoot)
+            .or_else(|| work_queues.status_queue.pop_if(can_spawn_predicate))
+            .or_else(|| work_queues.block_brange_queue.pop_if(can_spawn_predicate))
+            .or_else(|| work_queues.block_broots_queue.pop_if(can_spawn_predicate))
+            .or_else(|| work_queues.blob_brange_queue.pop_if(can_spawn_predicate))
+            .or_else(|| work_queues.blob_broots_queue.pop_if(can_spawn_predicate))
+            .or_else(|| work_queues.dcbroots_queue.pop_if(can_spawn_predicate))
+            .or_else(|| work_queues.dcbrange_queue.pop_if(can_spawn_predicate))
+            // Check slashings after all other consensus messages so we prioritize
+            // following head.
+            //
+            // Check attester slashings before proposer slashings since they have the
+            // potential to slash multiple validators at once.
+            .or_else(|| {
+                work_queues
+                    .gossip_attester_slashing_queue
+                    .pop_if(can_spawn_predicate)
+            })
+            .or_else(|| {
+                work_queues
+                    .gossip_proposer_slashing_queue
+                    .pop_if(can_spawn_predicate)
+            })
+            // Check exits and address changes late since our validators don't get
+            // rewards from them.
+            .or_else(|| {
+                work_queues
+                    .gossip_voluntary_exit_queue
+                    .pop_if(can_spawn_predicate)
+            })
+            .or_else(|| {
+                work_queues
+                    .gossip_bls_to_execution_change_queue
+                    .pop_if(can_spawn_predicate)
+            })
+            // Check the priority 1 API requests after we've
+            // processed all the interesting things from the network
+            // and things required for us to stay in good repute
+            // with our P2P peers.
+            .or_else(|| work_queues.api_request_p1_queue.pop_if(can_spawn_predicate))
+            // Handle backfill sync chain segments.
+            .or_else(|| {
+                work_queues
+                    .backfill_chain_segment
+                    .pop_if(can_spawn_predicate)
+            })
+            // Handle light client requests.
+            .or_else(|| {
+                work_queues
+                    .lc_gossip_finality_update_queue
+                    .pop_if(can_spawn_predicate)
+            })
+            .or_else(|| {
+                work_queues
+                    .lc_gossip_optimistic_update_queue
+                    .pop_if(can_spawn_predicate)
+            })
+            .or_else(|| {
+                work_queues
+                    .unknown_light_client_update_queue
+                    .pop_if(can_spawn_predicate)
+            })
+            .or_else(|| work_queues.lc_bootstrap_queue.pop_if(can_spawn_predicate))
+            .or_else(|| {
+                work_queues
+                    .lc_rpc_optimistic_update_queue
+                    .pop_if(can_spawn_predicate)
+            })
+            .or_else(|| {
+                work_queues
+                    .lc_rpc_finality_update_queue
+                    .pop_if(can_spawn_predicate)
+            })
+            .or_else(|| {
+                work_queues
+                    .lc_update_range_queue
+                    .pop_if(can_spawn_predicate)
+            })
+            // This should always be the final `or_else` statement.
+            .or_else(|| {
+                // Let the journal know that a worker is freed and there's nothing else
+                // for it to do.
+                if let Some(work_journal_tx) = work_journal_tx {
+                    // We don't care if this message was successfully sent, we only use the journal
+                    // during testing.
+                    let _ = work_journal_tx.try_send(NOTHING_TO_DO);
+                }
+                None
+            })
+    }
+
     /// Spawns a blocking worker thread to process some `Work`.
     ///
     /// Sends a message on `idle_tx` when the work is complete and the task is stopping.
@@ -1320,14 +1214,27 @@ impl<E: EthSpec> BeaconProcessor<E> {
             _worker_timer: worker_timer,
         };
 
-        let worker_id = self.current_workers;
-        self.current_workers = self.current_workers.saturating_add(1);
+        let work_category = work.get_work_category();
+
+        let worker_id = match work_category {
+            WorkCategory::IoBound => {
+                let worker_id = self.current_io_bound_workers;
+                self.current_io_bound_workers = self.current_io_bound_workers.saturating_add(1);
+                worker_id
+            }
+            WorkCategory::CpuBound => {
+                let worker_id = self.current_cpu_bound_workers;
+                self.current_cpu_bound_workers = self.current_cpu_bound_workers.saturating_add(1);
+                worker_id
+            }
+        };
 
         let executor = self.executor.clone();
 
         trace!(
             work = work_id,
             worker = worker_id,
+            ?work_category,
             "Spawning beacon processor worker"
         );
diff --git a/beacon_node/beacon_processor/src/scheduler/work_queue.rs b/beacon_node/beacon_processor/src/scheduler/work_queue.rs
index c6f74961d17..a17a9e4c373 100644
--- a/beacon_node/beacon_processor/src/scheduler/work_queue.rs
+++ b/beacon_node/beacon_processor/src/scheduler/work_queue.rs
@@ -29,6 +29,17 @@ impl<T> FifoQueue<T> {
         }
     }
 
+    pub fn pop_if<F>(&mut self, predicate: F) -> Option<T>
+    where
+        F: FnOnce(&T) -> bool,
+    {
+        if self.queue.front().is_some_and(predicate) {
+            self.queue.pop_front()
+        } else {
+            None
+        }
+    }
+
     /// Add a new item to the queue.
     ///
     /// Drops `item` if the queue is full.
@@ -75,6 +86,18 @@ impl<T> LifoQueue<T> {
         }
     }
 
+    /// Pop the next item if `predicate` evaluates to true.
+    pub fn pop_if<F>(&mut self, predicate: F) -> Option<T>
+    where
+        F: FnOnce(&T) -> bool,
+    {
+        if self.queue.front().is_some_and(predicate) {
+            self.queue.pop_front()
+        } else {
+            None
+        }
+    }
+
     /// Add a new item to the front of the queue.
     ///
     /// If the queue is full, the item at the back of the queue is dropped.
@@ -85,7 +108,7 @@ impl<T> LifoQueue<T> {
         self.queue.push_front(item);
     }
 
-    /// Remove the next item from the queue.
+    /// Remove and return the next item in the queue.
     pub fn pop(&mut self) -> Option<T> {
         self.queue.pop_front()
     }
@@ -255,6 +278,10 @@ pub struct WorkQueues<E: EthSpec> {
     pub api_request_p1_queue: FifoQueue<Work<E>>,
 }
 
+impl<E: EthSpec> WorkQueues<E> {
+    pub fn get_next_work(&mut self) {}
+}
+
 impl<E: EthSpec> WorkQueues<E> {
     pub fn new(queue_lengths: BeaconProcessorQueueLengths) -> Self {
         // Using LIFO queues for attestations since validator profits rely upon getting fresh
diff --git a/beacon_node/beacon_processor/src/work.rs b/beacon_node/beacon_processor/src/work.rs
new file mode 100644
index 00000000000..6208cf18743
--- /dev/null
+++ b/beacon_node/beacon_processor/src/work.rs
@@ -0,0 +1,271 @@
+use crate::{
+    GossipAggregatePackage, GossipAttestationPackage, SendOnDrop,
+    work_reprocessing_queue::ReprocessQueueMessage,
+};
+use std::{
+    fmt::{Debug, Formatter},
+    future::Future,
+    pin::Pin,
+};
+use strum::IntoStaticStr;
+use types::{EthSpec, Hash256, SingleAttestation, Slot};
+
+pub type AsyncFn = Pin<Box<dyn Future<Output = ()> + Send + Sync>>;
+pub type BlockingFn = Box<dyn FnOnce() + Send + Sync>;
+pub type BlockingFnWithManualSendOnIdle = Box<dyn FnOnce(SendOnDrop) + Send + Sync>;
+pub enum BlockingOrAsync {
+    Blocking(BlockingFn),
+    Async(AsyncFn),
+}
+pub type GossipAttestationBatch = Vec<GossipAttestationPackage<SingleAttestation>>;
+
+/// Indicates the type of work to be performed and therefore its priority and
+/// queuing specifics.
+pub enum Work<E: EthSpec> {
+    GossipAttestation {
+        attestation: Box<GossipAttestationPackage<SingleAttestation>>,
+        process_individual:
+            Box<dyn FnOnce(GossipAttestationPackage<SingleAttestation>) + Send + Sync>,
+        process_batch: Box<dyn FnOnce(GossipAttestationBatch) + Send + Sync>,
+    },
+    UnknownBlockAttestation {
+        process_fn: BlockingFn,
+    },
+    GossipAttestationBatch {
+        attestations: GossipAttestationBatch,
+        process_batch: Box<dyn FnOnce(GossipAttestationBatch) + Send + Sync>,
+    },
+    GossipAggregate {
+        aggregate: Box<GossipAggregatePackage<E>>,
+        process_individual: Box<dyn FnOnce(GossipAggregatePackage<E>) + Send + Sync>,
+        process_batch: Box<dyn FnOnce(Vec<GossipAggregatePackage<E>>) + Send + Sync>,
+    },
+    UnknownBlockAggregate {
+        process_fn: BlockingFn,
+    },
+    UnknownLightClientOptimisticUpdate {
+        parent_root: Hash256,
+        process_fn: BlockingFn,
+    },
+    GossipAggregateBatch {
+        aggregates: Vec<GossipAggregatePackage<E>>,
+        process_batch: Box<dyn FnOnce(Vec<GossipAggregatePackage<E>>) + Send + Sync>,
+    },
+    GossipBlock(AsyncFn),
+    GossipBlobSidecar(AsyncFn),
+    GossipDataColumnSidecar(AsyncFn),
+    DelayedImportBlock {
+        beacon_block_slot: Slot,
+        beacon_block_root: Hash256,
+        process_fn: AsyncFn,
+    },
+    GossipVoluntaryExit(BlockingFn),
+    GossipProposerSlashing(BlockingFn),
+    GossipAttesterSlashing(BlockingFn),
+    GossipSyncSignature(BlockingFn),
+    GossipSyncContribution(BlockingFn),
+    GossipLightClientFinalityUpdate(BlockingFn),
+    GossipLightClientOptimisticUpdate(BlockingFn),
+    RpcBlock {
+        process_fn: AsyncFn,
+    },
+    RpcBlobs {
+        process_fn: AsyncFn,
+    },
+    RpcCustodyColumn(AsyncFn),
+    ColumnReconstruction(AsyncFn),
+    IgnoredRpcBlock {
+        process_fn: BlockingFn,
+    },
+    ChainSegment(AsyncFn),
+    ChainSegmentBackfill(BlockingFn),
+    Status(BlockingFn),
+    BlocksByRangeRequest(AsyncFn),
+    BlocksByRootsRequest(AsyncFn),
+    BlobsByRangeRequest(BlockingFn),
+    BlobsByRootsRequest(BlockingFn),
+    DataColumnsByRootsRequest(BlockingFn),
+    DataColumnsByRangeRequest(BlockingFn),
+    GossipBlsToExecutionChange(BlockingFn),
+    LightClientBootstrapRequest(BlockingFn),
+    LightClientOptimisticUpdateRequest(BlockingFn),
+    LightClientFinalityUpdateRequest(BlockingFn),
+    LightClientUpdatesByRangeRequest(BlockingFn),
+    ApiRequestP0(BlockingOrAsync),
+    ApiRequestP1(BlockingOrAsync),
+    Reprocess(ReprocessQueueMessage),
+}
+
+impl<E: EthSpec> Work<E> {
+    pub fn get_work_category(&self) -> WorkCategory {
+        WorkCategory::get_category(self.to_type())
+    }
+}
+
+impl<E: EthSpec> Debug for Work<E> {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "{}", Into::<&'static str>::into(self.to_type()))
+    }
+}
+
+#[derive(IntoStaticStr, PartialEq, Eq, Debug, Clone)]
+#[strum(serialize_all = "snake_case")]
+pub enum WorkType {
+    GossipAttestation,
+    GossipAttestationToConvert,
+    UnknownBlockAttestation,
+    GossipAttestationBatch,
+    GossipAggregate,
+    UnknownBlockAggregate,
+    UnknownLightClientOptimisticUpdate,
+    GossipAggregateBatch,
+    GossipBlock,
+    GossipBlobSidecar,
+    GossipDataColumnSidecar,
+    DelayedImportBlock,
+    GossipVoluntaryExit,
+    GossipProposerSlashing,
+    GossipAttesterSlashing,
+    GossipSyncSignature,
+    GossipSyncContribution,
+    GossipLightClientFinalityUpdate,
+    GossipLightClientOptimisticUpdate,
+    RpcBlock,
+    RpcBlobs,
+    RpcCustodyColumn,
+    ColumnReconstruction,
+    IgnoredRpcBlock,
+    ChainSegment,
+    ChainSegmentBackfill,
+    Status,
+    BlocksByRangeRequest,
+    BlocksByRootsRequest,
+    BlobsByRangeRequest,
+    BlobsByRootsRequest,
+    DataColumnsByRootsRequest,
+    DataColumnsByRangeRequest,
+    GossipBlsToExecutionChange,
+    LightClientBootstrapRequest,
+    LightClientOptimisticUpdateRequest,
+    LightClientFinalityUpdateRequest,
+    LightClientUpdatesByRangeRequest,
+    ApiRequestP0,
+    ApiRequestP1,
+    Reprocess,
+}
+
+#[derive(PartialEq, Eq, Debug)]
+pub enum WorkCategory {
+    // Work that is IO bound, i.e. network requests, disk operations, external services etc.
+    IoBound,
+    // Work that is CPU bound
+    CpuBound,
+}
+
+impl WorkCategory {
+    pub fn get_category(work_type: WorkType) -> Self {
+        match work_type {
+            // IO bound tasks
+            WorkType::GossipAttestationToConvert
+            | WorkType::UnknownLightClientOptimisticUpdate
+            | WorkType::GossipVoluntaryExit
+            | WorkType::GossipProposerSlashing
+            | WorkType::GossipAttesterSlashing
+            | WorkType::GossipSyncSignature
+            | WorkType::GossipSyncContribution
+            | WorkType::GossipLightClientFinalityUpdate
+            | WorkType::GossipLightClientOptimisticUpdate
+            | WorkType::IgnoredRpcBlock
+            | WorkType::Status
+            | WorkType::BlocksByRangeRequest
+            | WorkType::BlocksByRootsRequest
+            | WorkType::BlobsByRangeRequest
+            | WorkType::BlobsByRootsRequest
+            | WorkType::DataColumnsByRootsRequest
+            | WorkType::DataColumnsByRangeRequest
+            | WorkType::GossipBlsToExecutionChange
+            | WorkType::LightClientBootstrapRequest
+            | WorkType::LightClientOptimisticUpdateRequest
+            | WorkType::LightClientFinalityUpdateRequest
+            | WorkType::LightClientUpdatesByRangeRequest
+            | WorkType::ApiRequestP0
+            | WorkType::ApiRequestP1
+            | WorkType::Reprocess => Self::IoBound,
+            // CPU bound tasks
+            WorkType::GossipBlock
+            | WorkType::UnknownBlockAttestation
+            | WorkType::UnknownBlockAggregate
+            | WorkType::GossipAttestation
+            | WorkType::GossipAttestationBatch
+            | WorkType::GossipAggregate
+            | WorkType::GossipAggregateBatch
+            | WorkType::GossipBlobSidecar
+            | WorkType::GossipDataColumnSidecar
+            | WorkType::DelayedImportBlock
+            | WorkType::RpcBlock
+            | WorkType::RpcBlobs
+            | WorkType::RpcCustodyColumn
+            | WorkType::ColumnReconstruction
+            | WorkType::ChainSegment
+            | WorkType::ChainSegmentBackfill => Self::CpuBound,
+        }
+    }
+}
+
+impl<E: EthSpec> Work<E> {
+    pub fn str_id(&self) -> &'static str {
+        self.to_type().into()
+    }
+
+    /// Provides a `&str` that uniquely identifies each enum variant.
+    pub fn to_type(&self) -> WorkType {
+        match self {
+            Work::GossipAttestation { .. } => WorkType::GossipAttestation,
+            Work::GossipAttestationBatch { .. } => WorkType::GossipAttestationBatch,
+            Work::GossipAggregate { .. } => WorkType::GossipAggregate,
+            Work::GossipAggregateBatch { .. } => WorkType::GossipAggregateBatch,
+            Work::GossipBlock(_) => WorkType::GossipBlock,
+            Work::GossipBlobSidecar(_) => WorkType::GossipBlobSidecar,
+            Work::GossipDataColumnSidecar(_) => WorkType::GossipDataColumnSidecar,
+            Work::DelayedImportBlock { .. } => WorkType::DelayedImportBlock,
+            Work::GossipVoluntaryExit(_) => WorkType::GossipVoluntaryExit,
+            Work::GossipProposerSlashing(_) => WorkType::GossipProposerSlashing,
+            Work::GossipAttesterSlashing(_) => WorkType::GossipAttesterSlashing,
+            Work::GossipSyncSignature(_) => WorkType::GossipSyncSignature,
+            Work::GossipSyncContribution(_) => WorkType::GossipSyncContribution,
+            Work::GossipLightClientFinalityUpdate(_) => WorkType::GossipLightClientFinalityUpdate,
+            Work::GossipLightClientOptimisticUpdate(_) => {
+                WorkType::GossipLightClientOptimisticUpdate
+            }
+            Work::GossipBlsToExecutionChange(_) => WorkType::GossipBlsToExecutionChange,
+            Work::RpcBlock { .. } => WorkType::RpcBlock,
+            Work::RpcBlobs { .. } => WorkType::RpcBlobs,
+            Work::RpcCustodyColumn { .. } => WorkType::RpcCustodyColumn,
+            Work::ColumnReconstruction(_) => WorkType::ColumnReconstruction,
+            Work::IgnoredRpcBlock { .. } => WorkType::IgnoredRpcBlock,
+            Work::ChainSegment { .. } => WorkType::ChainSegment,
+            Work::ChainSegmentBackfill(_) => WorkType::ChainSegmentBackfill,
+            Work::Status(_) => WorkType::Status,
+            Work::BlocksByRangeRequest(_) => WorkType::BlocksByRangeRequest,
+            Work::BlocksByRootsRequest(_) => WorkType::BlocksByRootsRequest,
+            Work::BlobsByRangeRequest(_) => WorkType::BlobsByRangeRequest,
+            Work::BlobsByRootsRequest(_) => WorkType::BlobsByRootsRequest,
+            Work::DataColumnsByRootsRequest(_) => WorkType::DataColumnsByRootsRequest,
+            Work::DataColumnsByRangeRequest(_) => WorkType::DataColumnsByRangeRequest,
+            Work::LightClientBootstrapRequest(_) => WorkType::LightClientBootstrapRequest,
+            Work::LightClientOptimisticUpdateRequest(_) => {
+                WorkType::LightClientOptimisticUpdateRequest
+            }
+            Work::LightClientFinalityUpdateRequest(_) => WorkType::LightClientFinalityUpdateRequest,
+            Work::LightClientUpdatesByRangeRequest(_) => WorkType::LightClientUpdatesByRangeRequest,
+            Work::UnknownBlockAttestation { .. } => WorkType::UnknownBlockAttestation,
+            Work::UnknownBlockAggregate { .. } => WorkType::UnknownBlockAggregate,
+            Work::UnknownLightClientOptimisticUpdate { .. } => {
+                WorkType::UnknownLightClientOptimisticUpdate
+            }
+            Work::ApiRequestP0 { .. } => WorkType::ApiRequestP0,
+            Work::ApiRequestP1 { .. } => WorkType::ApiRequestP1,
+            Work::Reprocess { .. } => WorkType::Reprocess,
+        }
+    }
+}
diff --git a/beacon_node/client/src/builder.rs b/beacon_node/client/src/builder.rs
index 694c6fb356a..6986a8db4fc 100644
--- a/beacon_node/client/src/builder.rs
+++ b/beacon_node/client/src/builder.rs
@@ -696,7 +696,8 @@ where
     BeaconProcessor {
         network_globals: network_globals.clone(),
         executor: beacon_processor_context.executor.clone(),
-        current_workers: 0,
+        current_cpu_bound_workers: 0,
+        current_io_bound_workers: 0,
         config: beacon_processor_config,
     }
     .spawn_manager(
diff --git a/beacon_node/http_api/src/test_utils.rs b/beacon_node/http_api/src/test_utils.rs
index 27e2a27d35c..89bd1b6adf1 100644
--- a/beacon_node/http_api/src/test_utils.rs
+++ b/beacon_node/http_api/src/test_utils.rs
@@ -247,11 +247,6 @@ pub async fn create_api_server_with_config(
     *network_globals.sync_state.write() = SyncState::Synced;
 
     let beacon_processor_config = BeaconProcessorConfig {
-        // The number of workers must be greater than one. Tests which use the
-        // builder workflow sometimes require an internal HTTP request in order
-        // to fulfill an already in-flight HTTP request, therefore having only
-        // one worker will result in a deadlock.
-        max_workers: 2,
         ..BeaconProcessorConfig::default()
     };
     let BeaconProcessorChannels {
@@ -263,7 +258,8 @@ pub async fn create_api_server_with_config(
     BeaconProcessor {
         network_globals: network_globals.clone(),
         executor: test_runtime.task_executor.clone(),
-        current_workers: 0,
+        current_cpu_bound_workers: 0,
+        current_io_bound_workers: 0,
         config: beacon_processor_config,
     }
     .spawn_manager(
diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs
index ed04fe7bb97..7337e13eb8d 100644
--- a/beacon_node/network/src/network_beacon_processor/tests.rs
+++ b/beacon_node/network/src/network_beacon_processor/tests.rs
@@ -277,8 +277,9 @@ impl TestRig {
         let beacon_processor = BeaconProcessor {
             network_globals: network_globals.clone(),
+            current_cpu_bound_workers: 0,
+            current_io_bound_workers: 0,
             executor,
-            current_workers: 0,
             config: beacon_processor_config,
         }
         .spawn_manager(
diff --git a/beacon_node/src/config.rs b/beacon_node/src/config.rs
index 26dd3b6642e..db51f24711a 100644
--- a/beacon_node/src/config.rs
+++ b/beacon_node/src/config.rs
@@ -830,10 +830,10 @@ pub fn get_config(
     if let Some(max_workers) =
         clap_utils::parse_optional(cli_args, "beacon-processor-max-workers")?
     {
-        client_config.beacon_processor.max_workers = max_workers;
+        client_config.beacon_processor.max_cpu_bound_workers = max_workers;
     }
 
-    if client_config.beacon_processor.max_workers == 0 {
+    if client_config.beacon_processor.max_cpu_bound_workers == 0 {
         return Err("--beacon-processor-max-workers must be a non-zero value".to_string());
     }
 
diff --git a/lighthouse/tests/beacon_node.rs b/lighthouse/tests/beacon_node.rs
index 207324ea33f..0171f9913f5 100644
--- a/lighthouse/tests/beacon_node.rs
+++ b/lighthouse/tests/beacon_node.rs
@@ -2671,6 +2671,8 @@ fn beacon_processor() {
         .with_config(|config| assert_eq!(config.beacon_processor, <_>::default()));
 
     CommandLineTest::new()
+        // TODO add separate max worker flags for CPU-bound and IO-bound tasks,
+        // and deprecate `--beacon-processor-max-workers`.
        .flag("beacon-processor-max-workers", Some("1"))
        .flag("beacon-processor-work-queue-len", Some("2"))
        .flag("beacon-processor-reprocess-queue-len", Some("3"))
@@ -2682,7 +2684,8 @@ fn beacon_processor() {
             assert_eq!(
                 config.beacon_processor,
                 BeaconProcessorConfig {
-                    max_workers: 1,
+                    max_io_bound_workers: 100,
+                    max_cpu_bound_workers: 1,
                     max_work_event_queue_len: 2,
                     max_scheduled_work_queue_len: 3,
                     max_gossip_attestation_batch_size: 4,
diff --git a/testing/node_test_rig/src/lib.rs b/testing/node_test_rig/src/lib.rs
index e49d11ee1eb..56b42a715c4 100644
--- a/testing/node_test_rig/src/lib.rs
+++ b/testing/node_test_rig/src/lib.rs
@@ -120,7 +120,7 @@ pub fn testing_client_config() -> ClientConfig {
     // Specify a constant count of beacon processor workers. Having this number
     // too low can cause annoying HTTP timeouts, especially on Github runners
    // with 2 logical CPUs.
-    client_config.beacon_processor.max_workers = 4;
+    client_config.beacon_processor.max_cpu_bound_workers = 4;
 
     client_config
 }
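
Reviewer note: a minimal standalone sketch (not part of the diff) of the `pop_if` contract added in `work_queue.rs`. The `Queue` type below is an invented stand-in for the diff's `FifoQueue`/`LifoQueue`. The predicate is only consulted for the element at the front, so when the front item's category has no free workers, nothing is popped from that queue even if a later item would qualify; within a single queue, priority order is preserved at the cost of head-of-line blocking across categories.

```rust
use std::collections::VecDeque;

// Simplified stand-in for the `FifoQueue`/`LifoQueue` in the diff.
struct Queue<T> {
    queue: VecDeque<T>,
}

impl<T> Queue<T> {
    // Same shape as the diff's `pop_if`: pop the front element only if the
    // predicate accepts it.
    fn pop_if<F>(&mut self, predicate: F) -> Option<T>
    where
        F: FnOnce(&T) -> bool,
    {
        if self.queue.front().is_some_and(predicate) {
            self.queue.pop_front()
        } else {
            None
        }
    }
}

fn main() {
    let mut q = Queue {
        queue: VecDeque::from([3, 8]),
    };
    // The front item (3) fails the predicate, so nothing is popped,
    // even though the item behind it (8) would pass.
    assert_eq!(q.pop_if(|x| *x > 5), None);
    // The front item passes, so it is removed and returned.
    assert_eq!(q.pop_if(|x| *x > 2), Some(3));
    println!("remaining: {:?}", q.queue);
}
```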
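Likewise, a hedged sketch of how the `WorkCategory::get_category` partition introduced in `work.rs` could be exercised; the test itself is hypothetical (it is not in the diff and assumes placement inside `work.rs`), but the variant names and derives (`PartialEq`, `Eq`, `Debug` on `WorkCategory`) follow the diff.

```rust
#[cfg(test)]
mod tests {
    use super::{WorkCategory, WorkType};

    #[test]
    fn work_types_are_partitioned_by_category() {
        // Signature-heavy gossip verification and block import stay on the
        // CPU-bound pool, which is sized to the machine's core count...
        assert_eq!(
            WorkCategory::get_category(WorkType::GossipBlock),
            WorkCategory::CpuBound
        );
        assert_eq!(
            WorkCategory::get_category(WorkType::ChainSegment),
            WorkCategory::CpuBound
        );
        // ...while RPC serving and API handling go to the larger IO-bound pool.
        assert_eq!(
            WorkCategory::get_category(WorkType::Status),
            WorkCategory::IoBound
        );
        assert_eq!(
            WorkCategory::get_category(WorkType::ApiRequestP0),
            WorkCategory::IoBound
        );
    }
}
```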
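Finally, a condensed, illustrative sketch of the scheduling invariant the split counters enforce (the `Limits` type here is invented; it stands in for the `BeaconProcessor` fields in the diff): each item is gated only on its own category's capacity, mirroring the `can_spawn_predicate` closure, so up to `max_io_bound_workers + max_cpu_bound_workers` workers may run at once and a saturated IO pool no longer starves CPU-bound work, or vice versa.

```rust
// Illustrative types mirroring the fields used in the diff.
struct Limits {
    current_io_bound_workers: usize,
    max_io_bound_workers: usize,
    current_cpu_bound_workers: usize,
    max_cpu_bound_workers: usize,
}

enum WorkCategory {
    IoBound,
    CpuBound,
}

impl Limits {
    // Mirrors `can_spawn_predicate`: gate each item on the capacity of its
    // own category only.
    fn can_spawn(&self, category: WorkCategory) -> bool {
        match category {
            WorkCategory::IoBound => self.current_io_bound_workers < self.max_io_bound_workers,
            WorkCategory::CpuBound => self.current_cpu_bound_workers < self.max_cpu_bound_workers,
        }
    }
}

fn main() {
    let limits = Limits {
        current_io_bound_workers: 100,
        max_io_bound_workers: 100, // IO pool saturated.
        current_cpu_bound_workers: 2,
        max_cpu_bound_workers: 8, // CPU pool still has headroom.
    };
    // A saturated IO pool no longer blocks CPU-bound work.
    assert!(!limits.can_spawn(WorkCategory::IoBound));
    assert!(limits.can_spawn(WorkCategory::CpuBound));
}
```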