Commits
63 commits
ea45814
using events api for eager start attestation tasks
hopinheimer Aug 18, 2025
6f20083
merge `origin/unstable` into `hopinheimer/eager-send-attestation`
hopinheimer Aug 18, 2025
46fe220
minor nits
hopinheimer Aug 18, 2025
de7226f
clippy chill vibes
hopinheimer Aug 18, 2025
5f2a101
missed something
hopinheimer Aug 18, 2025
ae16705
linty
hopinheimer Aug 18, 2025
c13d0c3
implemented head monitoring service
hopinheimer Sep 19, 2025
8405f01
Merge branch 'unstable' of github.com:sigp/lighthouse into eager-send…
hopinheimer Sep 19, 2025
65e04fb
merge from feature branch
hopinheimer Sep 19, 2025
a1d36e6
fixing some linting issues
hopinheimer Sep 19, 2025
a489d32
comments and removing unwanted code
hopinheimer Sep 20, 2025
2b974db
clippy change
hopinheimer Sep 20, 2025
5600aef
Merge branch 'unstable' into eager-send-attestation
hopinheimer Oct 15, 2025
b65fc30
same data attestation bug solved
hopinheimer Oct 16, 2025
9024931
Merge branch 'unstable' into eager-send-attestation
hopinheimer Oct 20, 2025
b25703f
fixing dangling conditions and amaking head_monitor_service optional
hopinheimer Oct 20, 2025
e68548b
Merge branch 'eager-send-attestation' of github.com:hopinheimer/light…
hopinheimer Oct 20, 2025
c4d851c
fmt
hopinheimer Oct 20, 2025
daf5b2e
update comment
hopinheimer Oct 20, 2025
c49b8eb
Merge branch 'unstable' into eager-send-attestation
hopinheimer Oct 25, 2025
0e35ee5
massive refact
hopinheimer Oct 28, 2025
29867d2
fixes and linting
hopinheimer Oct 29, 2025
fd43876
remove unused code
hopinheimer Oct 31, 2025
b054a10
changes
hopinheimer Nov 1, 2025
eb057d0
Merge branch 'unstable' into eager-send-attestation
hopinheimer Nov 2, 2025
91e5980
Merge branch 'unstable' into eager-send-attestation
hopinheimer Nov 5, 2025
2e0c78c
Merge branch 'unstable' into eager-send-attestation
hopinheimer Nov 10, 2025
32eed9a
addressing comments
hopinheimer Nov 10, 2025
dac9f00
removing unwanted logs
hopinheimer Nov 11, 2025
bf1471c
fmt
hopinheimer Nov 11, 2025
9794737
Merge branch 'unstable' into eager-send-attestation
hopinheimer Nov 11, 2025
b20ded5
Merge branch 'unstable' into eager-send-attestation
hopinheimer Nov 12, 2025
39b9a58
fixing a unwanted service starting bug
hopinheimer Nov 14, 2025
2e6a8f9
Merge branch 'unstable' into eager-send-attestation
hopinheimer Nov 26, 2025
07d9a12
making the service feature flagged
hopinheimer Nov 26, 2025
a82960b
cleaned up some logic
hopinheimer Nov 26, 2025
6bddeeb
fix
hopinheimer Nov 26, 2025
409d937
updating docs
hopinheimer Nov 26, 2025
0d8160f
addressing comments
hopinheimer Nov 27, 2025
4ea159b
Resolve merge conflicts
eserilev Dec 1, 2025
70e6d18
testsss
hopinheimer Dec 1, 2025
84c8291
fmt
hopinheimer Dec 1, 2025
f174615
Merge branch 'eager-send-attestation' of github.com:hopinheimer/light…
hopinheimer Dec 1, 2025
468fe8a
Merge branch 'unstable' of https://github.com/sigp/lighthouse into ea…
eserilev Dec 1, 2025
bdf62d9
Merge branch 'eager-send-attestation' of https://github.com/hopinheim…
eserilev Dec 3, 2025
94af503
Resolve merge conflicts
eserilev Dec 3, 2025
8b45d57
addressing comments
hopinheimer Dec 17, 2025
ce91eca
Merge branch 'unstable' into eager-send-attestation
hopinheimer Dec 17, 2025
eabbdd4
resolving unstable changes
hopinheimer Dec 17, 2025
dfe2935
clippy
hopinheimer Dec 17, 2025
7b7ab09
update cli docs for default behaviour
hopinheimer Dec 17, 2025
435a93c
Merge branch 'unstable' into eager-send-attestation
eserilev Jan 14, 2026
00962ea
Merge remote-tracking branch 'origin/unstable' into eager-send-attest…
michaelsproul Jan 15, 2026
c2092eb
Small tweaks
michaelsproul Jan 15, 2026
b388efe
Flip flag polarity and make it work
michaelsproul Jan 15, 2026
4bfafc1
Update book
michaelsproul Jan 15, 2026
face04f
Add eth2 events feature
michaelsproul Jan 15, 2026
b30d710
addressing comments
hopinheimer Jan 15, 2026
28dbe33
linting
hopinheimer Jan 15, 2026
8cfa532
Merge branch 'unstable' of github.com:sigp/lighthouse into eager-send…
hopinheimer Jan 16, 2026
516ccd7
fixing unfavorable condition
hopinheimer Jan 19, 2026
3d4dfe0
Merge branch 'unstable' into eager-send-attestation
hopinheimer Jan 19, 2026
4b4d0d1
Merge branch 'unstable' into eager-send-attestation
hopinheimer Jan 21, 2026
7 changes: 7 additions & 0 deletions book/src/help_vc.md
@@ -205,6 +205,13 @@ Flags:
--distributed
Enables functionality required for running the validator in a
distributed validator cluster.
--enable-beacon-head-monitor
Enable the beacon head monitor so fallback head updates trigger duties
when a lagging primary is detected. This keeps the attestation
service responsive when using multiple beacon nodes, but it relies on
the fallback service streaming head events which may increase network
usage. When not enabled (default), duties are only triggered on slot
boundaries and ignore fallback head changes.
--enable-doppelganger-protection
If this flag is set, Lighthouse will delay startup for three epochs
and monitor for messages on the network by any of the validators
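(Illustrative usage of the `--enable-beacon-head-monitor` flag documented above, assuming Lighthouse's existing `--beacon-nodes` flag for listing endpoints: `lighthouse vc --beacon-nodes http://primary:5052,http://fallback:5052 --enable-beacon-head-monitor`.)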
144 changes: 144 additions & 0 deletions validator_client/beacon_node_fallback/src/beacon_head_monitor.rs
@@ -0,0 +1,144 @@
use crate::BeaconNodeFallback;
use eth2::types::{EventKind, EventTopic, SseHead};
use futures::StreamExt;
use slot_clock::SlotClock;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{info, warn};
use types::EthSpec;

type CacheHashMap = HashMap<usize, SseHead>;

// This is used to send the index derived from `CandidateBeaconNode` to the
// `AttestationService` for further processing.
#[derive(Debug)]
pub struct HeadEvent {
pub beacon_node_index: usize,
}

/// Cache to maintain the latest head received from each of the beacon nodes
/// in the `BeaconNodeFallback`.
#[derive(Debug)]
pub struct BeaconHeadCache {
cache: RwLock<CacheHashMap>,
}
Comment on lines +23 to +26
Is this cache really needed? Can't we just send the slot of the new head to the attestation service via the HeadEvent above, and the attestation service checks if it is still relevant for it?

I think it's useful to have the head monitor handle this, as it might eventually be useful for things other than just notifying the attestation service.

E.g. when proposing a block we might want to query the head monitor, or the attestation consensus system might want to query the cached head of each BN to work out whether consensus exists (this is being worked on by Eitan in #8445)
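For reference, a minimal sketch of the alternative floated in the first comment above, assuming a hypothetical event type that carries the head slot (not part of this diff):

```rust
use types::Slot;

// Hypothetical event shape: carry the head slot so the attestation service can
// decide relevance itself, without the head monitor keeping a per-node cache.
#[derive(Debug)]
pub struct SlotBearingHeadEvent {
    pub beacon_node_index: usize,
    pub slot: Slot,
}

// Sketch of the receiving side: ignore events that are already stale.
fn is_relevant(event: &SlotBearingHeadEvent, current_slot: Slot) -> bool {
    event.slot >= current_slot
}
```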


impl BeaconHeadCache {
pub fn new() -> Self {
Self {
cache: RwLock::new(HashMap::new()),
}
}

pub async fn get(&self, beacon_node_index: usize) -> Option<SseHead> {
self.cache.read().await.get(&beacon_node_index).cloned()
}

pub async fn insert(&self, beacon_node_index: usize, head: SseHead) {
self.cache.write().await.insert(beacon_node_index, head);
}

pub async fn is_latest(&self, head: &SseHead) -> bool {
let cache = self.cache.read().await;
cache
.values()
.all(|cache_head| head.slot >= cache_head.slot)
}

pub async fn purge_cache(&self) {
self.cache.write().await.clear();
}
}

impl Default for BeaconHeadCache {
fn default() -> Self {
Self::new()
}
}

// Runs a non-terminating loop to update the `BeaconHeadCache` with the latest head received
// from the candidate beacon nodes. Head events are streamed from each beacon node so that
// attestation duties can start as soon as the latest head is received from any of them,
// rather than waiting until the 1/3 mark of the slot.
//
// The cache and the candidate BN list are refreshed/purged to avoid dangling references
// that arise from `update_candidates_list`.
//
// Starts the service that perpetually streams head events from the connected beacon nodes.
pub async fn poll_head_event_from_beacon_nodes<E: EthSpec, T: SlotClock + 'static>(
beacon_nodes: Arc<BeaconNodeFallback<T>>,
) -> Result<(), String> {
let head_cache = beacon_nodes
.beacon_head_cache
.clone()
.expect("Unable to start head monitor without beacon_head_cache");
let head_monitor_send = beacon_nodes
.head_monitor_send
.clone()
.expect("Unable to start head monitor without head_monitor_send");

info!("Starting head monitoring service");
let candidates = {
let candidates_guard = beacon_nodes.candidates.read().await;
candidates_guard.clone()
};
let mut tasks = vec![];

for candidate in candidates.iter() {
let head_event_stream = candidate
.beacon_node
.get_events::<E>(&[EventTopic::Head])
.await;

let mut head_event_stream = match head_event_stream {
Ok(stream) => stream,
Err(e) => {
warn!("failed to get head event stream: {:?}", e);
continue;
}
};

let sender_tx = head_monitor_send.clone();
let head_cache_ref = head_cache.clone();

let stream_fut = async move {
while let Some(event_result) = head_event_stream.next().await {
if let Ok(EventKind::Head(head)) = event_result {
head_cache_ref.insert(candidate.index, head.clone()).await;

if !head_cache_ref.is_latest(&head).await {
continue;
}

if sender_tx
.send(HeadEvent {
beacon_node_index: candidate.index,
})
.await
.is_err()
{
warn!("Head monitoring service channel closed");
}
}
}
};

tasks.push(stream_fut);
}

if tasks.is_empty() {
head_cache.purge_cache().await;
return Err(
"No beacon nodes available for head event streaming, retry in sometime".to_string(),
);
}

futures::future::join_all(tasks).await;

drop(candidates);
head_cache.purge_cache().await;

Ok(())
}
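As context for how the channel fed by this service might be consumed (the receiver name and the duty-triggering function below are assumptions, not part of this diff), a rough consumer loop could look like:

```rust
use tokio::sync::mpsc;

use crate::beacon_head_monitor::HeadEvent;

// Illustrative consumer: each `HeadEvent` means some beacon node reported a head
// that is the latest seen across all nodes, so attestation work can start early.
async fn consume_head_events(mut head_monitor_rx: mpsc::Receiver<HeadEvent>) {
    while let Some(event) = head_monitor_rx.recv().await {
        // The index identifies which candidate produced the head; the attestation
        // service could prefer this node for its follow-up requests.
        start_attestation_duties(event.beacon_node_index).await;
    }
}

// Placeholder for the real attestation logic.
async fn start_attestation_duties(_preferred_beacon_node: usize) {}
```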
77 changes: 76 additions & 1 deletion validator_client/beacon_node_fallback/src/lib.rs
@@ -2,7 +2,10 @@
//! "fallback" behaviour; it will try a request on all of the nodes until one or none of them
//! succeed.

pub mod beacon_head_monitor;
pub mod beacon_node_health;

use beacon_head_monitor::{BeaconHeadCache, HeadEvent, poll_head_event_from_beacon_nodes};
use beacon_node_health::{
BeaconNodeHealth, BeaconNodeSyncDistanceTiers, ExecutionEngineHealth, IsOptimistic,
SyncDistanceTier, check_node_health,
@@ -22,7 +25,11 @@ use std::time::{Duration, Instant};
use std::vec::Vec;
use strum::EnumVariantNames;
use task_executor::TaskExecutor;
use tokio::{sync::RwLock, time::sleep};

use tokio::{
sync::{RwLock, mpsc},
time::sleep,
};
use tracing::{debug, error, warn};
use types::{ChainSpec, Config as ConfigSpec, EthSpec, Slot};
use validator_metrics::{ENDPOINT_ERRORS, ENDPOINT_REQUESTS, inc_counter_vec};
@@ -68,6 +75,24 @@ pub fn start_fallback_updater_service<T: SlotClock + 'static, E: EthSpec>(
return Err("Cannot start fallback updater without slot clock");
}

let beacon_nodes_ref = beacon_nodes.clone();

// The presence of `head_monitor_send` doubles as the signal for whether the head
// monitoring service should be started.
if beacon_nodes_ref.head_monitor_send.is_some() {
let head_monitor_future = async move {
loop {
if let Err(err) =
poll_head_event_from_beacon_nodes::<E, T>(beacon_nodes_ref.clone()).await
{
warn!(error=?err, "Head service failed");
}
}
};

executor.spawn(head_monitor_future, "head_monitoring");
}

let future = async move {
loop {
beacon_nodes.update_all_candidates::<E>().await;
@@ -380,6 +405,8 @@ pub struct BeaconNodeFallback<T> {
pub candidates: Arc<RwLock<Vec<CandidateBeaconNode>>>,
distance_tiers: BeaconNodeSyncDistanceTiers,
slot_clock: Option<T>,
beacon_head_cache: Option<Arc<BeaconHeadCache>>,
head_monitor_send: Option<Arc<mpsc::Sender<HeadEvent>>>,
broadcast_topics: Vec<ApiTopic>,
spec: Arc<ChainSpec>,
}
@@ -396,6 +423,8 @@ impl<T: SlotClock> BeaconNodeFallback<T> {
candidates: Arc::new(RwLock::new(candidates)),
distance_tiers,
slot_clock: None,
beacon_head_cache: None,
head_monitor_send: None,
broadcast_topics,
spec,
}
@@ -410,6 +439,15 @@ impl<T: SlotClock> BeaconNodeFallback<T> {
self.slot_clock = Some(slot_clock);
}

/// Sets the head monitor channel used to stream events from all the beacon nodes that the
/// validator client is connected to in the `BeaconNodeFallback`. This also initializes the
/// `beacon_head_cache`, under the assumption that the cache will always be needed when
/// `head_monitor_send` is set.
pub fn set_head_send(&mut self, head_monitor_send: Arc<mpsc::Sender<HeadEvent>>) {
self.head_monitor_send = Some(head_monitor_send);
self.beacon_head_cache = Some(Arc::new(BeaconHeadCache::new()));
}

/// The count of candidates, regardless of their state.
pub async fn num_total(&self) -> usize {
self.candidates.read().await.len()
@@ -493,6 +531,10 @@ impl<T: SlotClock> BeaconNodeFallback<T> {
let mut candidates = self.candidates.write().await;
*candidates = new_candidates;

if let Some(cache) = &self.beacon_head_cache {
cache.purge_cache().await;
}

Ok(new_list)
}

@@ -646,6 +688,39 @@ impl<T: SlotClock> BeaconNodeFallback<T> {
Err(Errors(errors))
}

/// Try `func` on a specific beacon node by index first, then fall back to the normal order.
/// Returns immediately if the preferred node succeeds, otherwise falls back to `first_success`.
/// This guards against potential race conditions that may arise.
pub async fn first_success_from_index<F, O, Err, R>(
&self,
preferred_index: Option<usize>,
func: F,
) -> Result<O, Errors<Err>>
where
F: Fn(BeaconNodeHttpClient) -> R + Clone,
R: Future<Output = Result<O, Err>>,
Err: Debug,
{
// Try the preferred beacon node first if it exists
if let Some(preferred_idx) = preferred_index
&& let candidates = self.candidates.read().await
&& let Some(preferred_candidate) = candidates.iter().find(|c| c.index == preferred_idx)
{
let preferred_node = preferred_candidate.beacon_node.clone();
drop(candidates);

match Self::run_on_candidate(preferred_node, &func).await {
Ok(val) => return Ok(val),
Err(_) => {
return self.first_success(func).await;
}
}
}

// Fall back to normal first_success behavior
self.first_success(func).await
}

/// Run the future `func` on `candidate` while reporting metrics.
async fn run_on_candidate<F, R, Err, O>(
candidate: BeaconNodeHttpClient,
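As a usage illustration of `first_success_from_index` (not part of the diff; the wrapper function name is a placeholder and the endpoint call is only illustrative), a caller holding a preferred index taken from a `HeadEvent` might do:

```rust
use beacon_node_fallback::BeaconNodeFallback;
use slot_clock::SlotClock;

// Sketch: try the beacon node that reported the new head first, then fall back
// to the normal ordering if it fails or if no preference exists.
async fn query_with_preferred<T: SlotClock>(
    beacon_nodes: &BeaconNodeFallback<T>,
    preferred_index: Option<usize>,
) -> bool {
    beacon_nodes
        .first_success_from_index(preferred_index, |client| async move {
            // Any request works here; a real caller would publish its attestations.
            client.get_node_version().await
        })
        .await
        .is_ok()
}
```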
11 changes: 11 additions & 0 deletions validator_client/src/cli.rs
@@ -466,6 +466,17 @@ pub struct ValidatorClient {
)]
pub beacon_nodes_sync_tolerances: Vec<u64>,

#[clap(
long,
help = "Enable the beacon head monitor so fallback head updates trigger duties when a lagging primary is detected. \
This line has too many spaces in it, we could probably flow it onto the next line,


This keeps the attestation service responsive when using multiple beacon nodes, but it relies on the \
fallback service streaming head events which may increase network usage. \
When not enabled (default), duties are only triggered on slot boundaries and ignore fallback head changes.",
display_order = 0,
help_heading = FLAG_HEADER
)]
pub enable_beacon_head_monitor: bool,

#[clap(
long,
help = "Disable Lighthouse's slashing protection for all web3signer keys. This can \
5 changes: 5 additions & 0 deletions validator_client/src/config.rs
@@ -80,6 +80,9 @@ pub struct Config {
pub broadcast_topics: Vec<ApiTopic>,
/// Enables a service which attempts to measure latency between the VC and BNs.
pub enable_latency_measurement_service: bool,
/// Enables the beacon head monitor that reacts to fallback head updates.
#[serde(default)]
pub enable_beacon_head_monitor: bool,
/// Defines the number of validators per `validator/register_validator` request sent to the BN.
pub validator_registration_batch_size: usize,
/// Whether we are running with distributed network support.
@@ -129,6 +132,7 @@ impl Default for Config {
builder_registration_timestamp_override: None,
broadcast_topics: vec![ApiTopic::Subscriptions],
enable_latency_measurement_service: true,
enable_beacon_head_monitor: false,
validator_registration_batch_size: 500,
distributed: false,
initialized_validators: <_>::default(),
@@ -368,6 +372,7 @@ impl Config {
config.validator_store.builder_boost_factor = validator_client_config.builder_boost_factor;
config.enable_latency_measurement_service =
!validator_client_config.disable_latency_measurement_service;
config.enable_beacon_head_monitor = validator_client_config.enable_beacon_head_monitor;

config.validator_registration_batch_size =
validator_client_config.validator_registration_batch_size;
25 changes: 21 additions & 4 deletions validator_client/src/lib.rs
@@ -9,10 +9,12 @@ use metrics::set_gauge;
use monitoring_api::{MonitoringHttpClient, ProcessType};
use sensitive_url::SensitiveUrl;
use slashing_protection::{SLASHING_PROTECTION_FILENAME, SlashingDatabase};
use tokio::sync::Mutex;

use account_utils::validator_definitions::ValidatorDefinitions;
use beacon_node_fallback::{
BeaconNodeFallback, CandidateBeaconNode, start_fallback_updater_service,
BeaconNodeFallback, CandidateBeaconNode, beacon_head_monitor::HeadEvent,
start_fallback_updater_service,
};
use clap::ArgMatches;
use doppelganger_service::DoppelgangerService;
@@ -72,6 +74,8 @@ pub const AGGREGATION_PRE_COMPUTE_EPOCHS: u64 = 2;
/// Number of slots in advance to compute sync selection proofs when in `distributed` mode.
pub const AGGREGATION_PRE_COMPUTE_SLOTS_DISTRIBUTED: u64 = 1;

const MAX_HEAD_EVENT_QUEUE_LEN: usize = 1_024;

type ValidatorStore<E> = LighthouseValidatorStore<SystemTimeSlotClock, E>;

#[derive(Clone)]
@@ -386,6 +390,17 @@ impl<E: EthSpec> ProductionValidatorClient<E> {
beacon_nodes.set_slot_clock(slot_clock.clone());
proposer_nodes.set_slot_clock(slot_clock.clone());

// Only the beacon_nodes are used for attestation duties, so the proposer_nodes do not
// need a head monitor sender.
let head_monitor_rx = if config.enable_beacon_head_monitor {
let (head_monitor_tx, head_receiver) =
mpsc::channel::<HeadEvent>(MAX_HEAD_EVENT_QUEUE_LEN);
beacon_nodes.set_head_send(Arc::new(head_monitor_tx));
Some(Arc::new(Mutex::new(head_receiver)))
} else {
None
};

let beacon_nodes = Arc::new(beacon_nodes);
start_fallback_updater_service::<_, E>(context.executor.clone(), beacon_nodes.clone())?;

@@ -495,15 +510,17 @@ impl<E: EthSpec> ProductionValidatorClient<E> {

let block_service = block_service_builder.build()?;

let attestation_service = AttestationServiceBuilder::new()
let attestation_builder = AttestationServiceBuilder::new()
.duties_service(duties_service.clone())
.slot_clock(slot_clock.clone())
.validator_store(validator_store.clone())
.beacon_nodes(beacon_nodes.clone())
.executor(context.executor.clone())
.head_monitor_rx(head_monitor_rx)
.chain_spec(context.eth2_config.spec.clone())
.disable(config.disable_attesting)
.build()?;
.disable(config.disable_attesting);

let attestation_service = attestation_builder.build()?;

let preparation_service = PreparationServiceBuilder::new()
.slot_clock(slot_clock.clone())