
Remove reprocess channel #7437


Merged
merged 13 commits on Jun 20, 2025
44 changes: 28 additions & 16 deletions beacon_node/beacon_processor/src/lib.rs
@@ -47,6 +47,7 @@ use lighthouse_network::{MessageId, NetworkGlobals, PeerId};
use logging::crit;
use logging::TimeLatch;
use parking_lot::Mutex;
pub use scheduler::work_reprocessing_queue;
use serde::{Deserialize, Serialize};
use slot_clock::SlotClock;
use std::cmp;
@@ -73,7 +74,7 @@ use work_reprocessing_queue::{
use work_reprocessing_queue::{IgnoredRpcBlock, QueuedSamplingRequest};

mod metrics;
pub mod work_reprocessing_queue;
pub mod scheduler;

/// The maximum size of the channel for work events to the `BeaconProcessor`.
///
@@ -264,22 +265,16 @@ impl Default for BeaconProcessorConfig {
pub struct BeaconProcessorChannels<E: EthSpec> {
pub beacon_processor_tx: BeaconProcessorSend<E>,
pub beacon_processor_rx: mpsc::Receiver<WorkEvent<E>>,
pub work_reprocessing_tx: mpsc::Sender<ReprocessQueueMessage>,
pub work_reprocessing_rx: mpsc::Receiver<ReprocessQueueMessage>,
}

impl<E: EthSpec> BeaconProcessorChannels<E> {
pub fn new(config: &BeaconProcessorConfig) -> Self {
let (beacon_processor_tx, beacon_processor_rx) =
mpsc::channel(config.max_work_event_queue_len);
let (work_reprocessing_tx, work_reprocessing_rx) =
mpsc::channel(config.max_scheduled_work_queue_len);

Self {
beacon_processor_tx: BeaconProcessorSend(beacon_processor_tx),
beacon_processor_rx,
work_reprocessing_rx,
work_reprocessing_tx,
}
}
}
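
For readers scanning the flattened diff, this is the state the constructor is left in once the reprocessing sender/receiver pair is dropped (a sketch reconstructed from the unchanged lines above, not copied verbatim from the repository):

```rust
// Sketch of the post-change channel bundle, reconstructed from the unchanged lines of this
// hunk: the dedicated reprocessing sender/receiver pair is gone, leaving only the main
// work-event channel. `EthSpec`, `WorkEvent`, `BeaconProcessorSend`, and
// `BeaconProcessorConfig` are the items already defined or imported in this file.
use tokio::sync::mpsc;

pub struct BeaconProcessorChannels<E: EthSpec> {
    pub beacon_processor_tx: BeaconProcessorSend<E>,
    pub beacon_processor_rx: mpsc::Receiver<WorkEvent<E>>,
}

impl<E: EthSpec> BeaconProcessorChannels<E> {
    pub fn new(config: &BeaconProcessorConfig) -> Self {
        let (beacon_processor_tx, beacon_processor_rx) =
            mpsc::channel(config.max_work_event_queue_len);

        Self {
            beacon_processor_tx: BeaconProcessorSend(beacon_processor_tx),
            beacon_processor_rx,
        }
    }
}
```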
@@ -638,6 +633,7 @@ pub enum Work<E: EthSpec> {
LightClientUpdatesByRangeRequest(BlockingFn),
ApiRequestP0(BlockingOrAsync),
ApiRequestP1(BlockingOrAsync),
Reprocess(ReprocessQueueMessage),
}

impl<E: EthSpec> fmt::Debug for Work<E> {
@@ -692,6 +688,7 @@ pub enum WorkType {
LightClientUpdatesByRangeRequest,
ApiRequestP0,
ApiRequestP1,
Reprocess,
}

impl<E: EthSpec> Work<E> {
@@ -750,6 +747,7 @@ impl<E: EthSpec> Work<E> {
}
Work::ApiRequestP0 { .. } => WorkType::ApiRequestP0,
Work::ApiRequestP1 { .. } => WorkType::ApiRequestP1,
Work::Reprocess { .. } => WorkType::Reprocess,
}
}
}
@@ -774,7 +772,7 @@ struct InboundEvents<E: EthSpec> {
/// Used by upstream processes to send new work to the `BeaconProcessor`.
event_rx: mpsc::Receiver<WorkEvent<E>>,
/// Used internally for queuing work ready to be re-processed.
reprocess_work_rx: mpsc::Receiver<ReadyWork>,
ready_work_rx: mpsc::Receiver<ReadyWork>,
}

impl<E: EthSpec> Stream for InboundEvents<E> {
@@ -795,7 +793,7 @@ impl<E: EthSpec> Stream for InboundEvents<E> {

// Poll for delayed blocks before polling for new work. It might be the case that a delayed
// block is required to successfully process some new work.
match self.reprocess_work_rx.poll_recv(cx) {
match self.ready_work_rx.poll_recv(cx) {
Poll::Ready(Some(ready_work)) => {
return Poll::Ready(Some(InboundEvent::ReprocessingWork(ready_work.into())));
}
@@ -846,8 +844,6 @@ impl<E: EthSpec> BeaconProcessor<E> {
pub fn spawn_manager<S: SlotClock + 'static>(
mut self,
event_rx: mpsc::Receiver<WorkEvent<E>>,
work_reprocessing_tx: mpsc::Sender<ReprocessQueueMessage>,
work_reprocessing_rx: mpsc::Receiver<ReprocessQueueMessage>,
work_journal_tx: Option<mpsc::Sender<&'static str>>,
slot_clock: S,
maximum_gossip_clock_disparity: Duration,
@@ -935,9 +931,13 @@ impl<E: EthSpec> BeaconProcessor<E> {
// receive them back once they are ready (`ready_work_rx`).
let (ready_work_tx, ready_work_rx) =
mpsc::channel::<ReadyWork>(self.config.max_scheduled_work_queue_len);

let (reprocess_work_tx, reprocess_work_rx) =
mpsc::channel::<ReprocessQueueMessage>(self.config.max_scheduled_work_queue_len);

spawn_reprocess_scheduler(
ready_work_tx,
work_reprocessing_rx,
reprocess_work_rx,
&self.executor,
Arc::new(slot_clock),
maximum_gossip_clock_disparity,
@@ -951,7 +951,7 @@
let mut inbound_events = InboundEvents {
idle_rx,
event_rx,
reprocess_work_rx: ready_work_rx,
ready_work_rx,
};

let enable_backfill_rate_limiting = self.config.enable_backfill_rate_limiting;
@@ -965,7 +965,7 @@
Some(InboundEvent::WorkEvent(event)) if enable_backfill_rate_limiting => {
match QueuedBackfillBatch::try_from(event) {
Ok(backfill_batch) => {
match work_reprocessing_tx
match reprocess_work_tx
.try_send(ReprocessQueueMessage::BackfillSync(backfill_batch))
{
Err(e) => {
@@ -1027,8 +1027,10 @@ impl<E: EthSpec> BeaconProcessor<E> {
.unwrap_or(WORKER_FREED);

// We don't care if this message was successfully sent, we only use the journal
// during testing.
let _ = work_journal_tx.try_send(id);
// during testing. We also ignore reprocess messages to ensure our test cases can pass.
if id != "reprocess" {
let _ = work_journal_tx.try_send(id);
}
}

let can_spawn = self.current_workers < self.config.max_workers;
@@ -1318,6 +1320,14 @@ impl<E: EthSpec> BeaconProcessor<E> {
let work_type = work.to_type();

match work {
Work::Reprocess(work_event) => {
if let Err(e) = reprocess_work_tx.try_send(work_event) {
error!(
error = ?e,
"Failed to reprocess work event"
)
}
}
_ if can_spawn => self.spawn_worker(work, idle_tx),
Work::GossipAttestation { .. } => attestation_queue.push(work),
// Attestation batches are formed internally within the
@@ -1488,6 +1498,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
WorkType::LightClientUpdatesByRangeRequest => lc_update_range_queue.len(),
WorkType::ApiRequestP0 => api_request_p0_queue.len(),
WorkType::ApiRequestP1 => api_request_p1_queue.len(),
WorkType::Reprocess => 0,
};
metrics::observe_vec(
&metrics::BEACON_PROCESSOR_QUEUE_LENGTH,
@@ -1639,6 +1650,7 @@ impl<E: EthSpec> BeaconProcessor<E> {
| Work::LightClientUpdatesByRangeRequest(process_fn) => {
task_spawner.spawn_blocking(process_fn)
}
Work::Reprocess(_) => {}
};
}
}
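
With the dedicated channel gone, any producer that previously held a `work_reprocessing_tx` now wraps its message in `Work::Reprocess` and submits it over the ordinary work-event channel; the manager loop above forwards such events into the internal `reprocess_work_tx` it owns. A minimal sketch of that pattern (the helper function and its error handling are illustrative, not taken from the PR):

```rust
use beacon_processor::{
    work_reprocessing_queue::ReprocessQueueMessage, BeaconProcessorSend, Work, WorkEvent,
};
use types::EthSpec;

/// Illustrative helper: enqueue a message for the reprocessing queue by submitting it as a
/// regular work event. The manager loop routes `Work::Reprocess` straight into the internal
/// reprocessing channel it now owns.
fn queue_for_reprocessing<E: EthSpec>(
    beacon_processor_send: &BeaconProcessorSend<E>,
    msg: ReprocessQueueMessage,
) -> Result<(), &'static str> {
    beacon_processor_send
        .try_send(WorkEvent {
            // Whether the event may be dropped during sync depends on the call site; the
            // light-client path in this PR uses `true`, the attestation path uses `false`.
            drop_during_sync: false,
            work: Work::Reprocess(msg),
        })
        .map_err(|_| "beacon processor channel full or closed")
}
```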
1 change: 1 addition & 0 deletions beacon_node/beacon_processor/src/scheduler/mod.rs
@@ -0,0 +1 @@
pub mod work_reprocessing_queue;
9 changes: 1 addition & 8 deletions beacon_node/client/src/builder.rs
@@ -546,7 +546,6 @@ where
network_senders: None,
network_globals: None,
beacon_processor_send: None,
beacon_processor_reprocess_send: None,
eth1_service: Some(genesis_service.eth1_service.clone()),
sse_logging_components: runtime_context.sse_logging_components.clone(),
});
@@ -638,7 +637,6 @@ where
context.executor,
libp2p_registry.as_mut(),
beacon_processor_channels.beacon_processor_tx.clone(),
beacon_processor_channels.work_reprocessing_tx.clone(),
)
.await
.map_err(|e| format!("Failed to start network: {:?}", e))?;
@@ -777,9 +775,6 @@ where
network_globals: self.network_globals.clone(),
eth1_service: self.eth1_service.clone(),
beacon_processor_send: Some(beacon_processor_channels.beacon_processor_tx.clone()),
beacon_processor_reprocess_send: Some(
beacon_processor_channels.work_reprocessing_tx.clone(),
),
sse_logging_components: runtime_context.sse_logging_components.clone(),
});

@@ -843,8 +838,6 @@ where
}
.spawn_manager(
beacon_processor_channels.beacon_processor_rx,
beacon_processor_channels.work_reprocessing_tx.clone(),
beacon_processor_channels.work_reprocessing_rx,
None,
beacon_chain.slot_clock.clone(),
beacon_chain.spec.maximum_gossip_clock_disparity(),
@@ -918,7 +911,7 @@ where
compute_light_client_updates(
&inner_chain,
light_client_server_rv,
beacon_processor_channels.work_reprocessing_tx,
beacon_processor_channels.beacon_processor_tx,
)
.await
},
12 changes: 9 additions & 3 deletions beacon_node/client/src/compute_light_client_updates.rs
@@ -1,8 +1,8 @@
use beacon_chain::{BeaconChain, BeaconChainTypes, LightClientProducerEvent};
use beacon_processor::work_reprocessing_queue::ReprocessQueueMessage;
use beacon_processor::{BeaconProcessorSend, Work, WorkEvent};
use futures::channel::mpsc::Receiver;
use futures::StreamExt;
use tokio::sync::mpsc::Sender;
use tracing::error;

// Each `LightClientProducerEvent` is ~200 bytes. With the light_client server producing only recent
@@ -14,7 +14,7 @@ pub(crate) const LIGHT_CLIENT_SERVER_CHANNEL_CAPACITY: usize = 32;
pub async fn compute_light_client_updates<T: BeaconChainTypes>(
chain: &BeaconChain<T>,
mut light_client_server_rv: Receiver<LightClientProducerEvent<T::EthSpec>>,
reprocess_tx: Sender<ReprocessQueueMessage>,
beacon_processor_send: BeaconProcessorSend<T::EthSpec>,
) {
// Should only receive events for recent blocks, import_block filters by blocks close to clock.
//
@@ -31,7 +31,13 @@ pub async fn compute_light_client_updates<T: BeaconChainTypes>(
});

let msg = ReprocessQueueMessage::NewLightClientOptimisticUpdate { parent_root };
if reprocess_tx.try_send(msg).is_err() {
if beacon_processor_send
.try_send(WorkEvent {
drop_during_sync: true,
work: Work::Reprocess(msg),
})
.is_err()
{
error!(%parent_root,"Failed to inform light client update")
};
}
14 changes: 3 additions & 11 deletions beacon_node/http_api/src/lib.rs
@@ -40,7 +40,7 @@ use beacon_chain::{
validator_monitor::timestamp_now, AttestationError as AttnError, BeaconChain, BeaconChainError,
BeaconChainTypes, WhenSlotSkipped,
};
use beacon_processor::{work_reprocessing_queue::ReprocessQueueMessage, BeaconProcessorSend};
use beacon_processor::BeaconProcessorSend;
pub use block_id::BlockId;
use builder_states::get_next_withdrawals;
use bytes::Bytes;
@@ -130,7 +130,6 @@ pub struct Context<T: BeaconChainTypes> {
pub network_senders: Option<NetworkSenders<T::EthSpec>>,
pub network_globals: Option<Arc<NetworkGlobals<T::EthSpec>>>,
pub beacon_processor_send: Option<BeaconProcessorSend<T::EthSpec>>,
pub beacon_processor_reprocess_send: Option<Sender<ReprocessQueueMessage>>,
pub eth1_service: Option<eth1::Service>,
pub sse_logging_components: Option<SSELoggingComponents>,
}
@@ -554,11 +553,6 @@ pub fn serve<T: BeaconChainTypes>(
.filter(|_| config.enable_beacon_processor);
let task_spawner_filter =
warp::any().map(move || TaskSpawner::new(beacon_processor_send.clone()));
let beacon_processor_reprocess_send = ctx
.beacon_processor_reprocess_send
.clone()
.filter(|_| config.enable_beacon_processor);
let reprocess_send_filter = warp::any().map(move || beacon_processor_reprocess_send.clone());

let duplicate_block_status_code = ctx.config.duplicate_block_status_code;

@@ -1986,20 +1980,18 @@ pub fn serve<T: BeaconChainTypes>(
.and(warp_utils::json::json::<Vec<SingleAttestation>>())
.and(optional_consensus_version_header_filter)
.and(network_tx_filter.clone())
.and(reprocess_send_filter.clone())
.then(
|task_spawner: TaskSpawner<T::EthSpec>,
chain: Arc<BeaconChain<T>>,
attestations: Vec<SingleAttestation>,
_fork_name: Option<ForkName>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
reprocess_tx: Option<Sender<ReprocessQueueMessage>>| async move {
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>| async move {
let result = crate::publish_attestations::publish_attestations(
task_spawner,
chain,
attestations,
network_tx,
reprocess_tx,
true,
)
.await
.map(|()| warp::reply::json(&()));
19 changes: 12 additions & 7 deletions beacon_node/http_api/src/publish_attestations.rs
@@ -40,15 +40,13 @@ use beacon_chain::{
BeaconChainTypes,
};
use beacon_processor::work_reprocessing_queue::{QueuedUnaggregate, ReprocessQueueMessage};
use beacon_processor::{Work, WorkEvent};
use eth2::types::Failure;
use lighthouse_network::PubsubMessage;
use network::NetworkMessage;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{
mpsc::{Sender, UnboundedSender},
oneshot,
};
use tokio::sync::{mpsc::UnboundedSender, oneshot};
use tracing::{debug, error, warn};
use types::SingleAttestation;

@@ -130,7 +128,7 @@ pub async fn publish_attestations<T: BeaconChainTypes>(
chain: Arc<BeaconChain<T>>,
attestations: Vec<SingleAttestation>,
network_tx: UnboundedSender<NetworkMessage<T::EthSpec>>,
reprocess_send: Option<Sender<ReprocessQueueMessage>>,
allow_reprocess: bool,
) -> Result<(), warp::Rejection> {
// Collect metadata about attestations which we'll use to report failures. We need to
// move the `attestations` vec into the blocking task, so this small overhead is unavoidable.
@@ -142,6 +140,7 @@
// Gossip validate and publish attestations that can be immediately processed.
let seen_timestamp = timestamp_now();
let mut prelim_results = task_spawner
.clone()
.blocking_task(Priority::P0, move || {
Ok(attestations
.into_iter()
@@ -156,7 +155,7 @@
Err(Error::Validation(AttestationError::UnknownHeadBlock {
beacon_block_root,
})) => {
let Some(reprocess_tx) = &reprocess_send else {
if !allow_reprocess {
return PublishAttestationResult::Failure(Error::ReprocessDisabled);
};
// Re-process.
@@ -180,7 +179,13 @@
beacon_block_root,
process_fn: Box::new(reprocess_fn),
});
if reprocess_tx.try_send(reprocess_msg).is_err() {
if task_spawner
.try_send(WorkEvent {
drop_during_sync: false,
work: Work::Reprocess(reprocess_msg),
})
.is_err()
{
PublishAttestationResult::Failure(Error::ReprocessFull)
} else {
PublishAttestationResult::Reprocessing(rx)
27 changes: 27 additions & 0 deletions beacon_node/http_api/src/task_spawner.rs
@@ -30,6 +30,7 @@ impl Priority {
}

/// Spawns tasks on the `BeaconProcessor` or directly on the tokio executor.
#[derive(Clone)]
pub struct TaskSpawner<E: EthSpec> {
/// Used to send tasks to the `BeaconProcessor`. The tokio executor will be
/// used if this is `None`.
@@ -155,6 +156,32 @@ impl<E: EthSpec> TaskSpawner<E> {
.and_then(|x| x)
}
}

pub fn try_send(&self, work_event: WorkEvent<E>) -> Result<(), warp::Rejection> {
if let Some(beacon_processor_send) = &self.beacon_processor_send {
let error_message = match beacon_processor_send.try_send(work_event) {
Ok(()) => None,
Err(TrySendError::Full(_)) => {
Some("The task was dropped. The server is overloaded.")
}
Err(TrySendError::Closed(_)) => {
Some("The task was dropped. The server is shutting down.")
}
};

if let Some(error_message) = error_message {
return Err(warp_utils::reject::custom_server_error(
error_message.to_string(),
));
};

Ok(())
} else {
Err(warp_utils::reject::custom_server_error(
"The beacon processor is unavailable".to_string(),
))
}
}
}

/// Send a task to the beacon processor and await execution.
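
A short usage sketch for the new `try_send` helper (hypothetical handler code; the `crate::task_spawner` import path and the surrounding context are assumptions, not part of the PR): a route that used to carry its own reprocess sender can now push a `Work::Reprocess` event through the `TaskSpawner` it already has.

```rust
use beacon_processor::{work_reprocessing_queue::ReprocessQueueMessage, Work, WorkEvent};
use types::{EthSpec, Hash256};

use crate::task_spawner::TaskSpawner; // assumed path inside the http_api crate

/// Hypothetical handler fragment: queue a light-client optimistic update for reprocessing
/// via the beacon processor instead of a dedicated reprocess channel.
fn queue_optimistic_update<E: EthSpec>(
    task_spawner: &TaskSpawner<E>,
    parent_root: Hash256,
) -> Result<(), warp::Rejection> {
    let msg = ReprocessQueueMessage::NewLightClientOptimisticUpdate { parent_root };
    task_spawner.try_send(WorkEvent {
        drop_during_sync: false,
        work: Work::Reprocess(msg),
    })
}
```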