Skip to content

Commit d4158ee

Browse files
committed
Fix wait for fully verified head for sync committee duties in local validator
1 parent 7fc0273 commit d4158ee

2 files changed

Lines changed: 53 additions & 29 deletions

File tree

validator/src/slot_head.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ use std::sync::Arc;
33

44
use anyhow::Result;
55
use bls::{PublicKeyBytes, SignatureBytes};
6+
use eth1_api::ApiController;
7+
use fork_choice_control::Wait;
68
use futures::lock::Mutex;
79
use helper_functions::{
810
accessors, misc, predicates,
@@ -12,6 +14,7 @@ use itertools::Itertools as _;
1214
use log::warn;
1315
use signer::{Signer, SigningMessage, SigningTriple};
1416
use slashing_protection::SlashingProtector;
17+
use tap::Pipe as _;
1518
use types::{
1619
altair::{
1720
containers::{SyncAggregatorSelectionData, SyncCommitteeMessage},
@@ -75,6 +78,17 @@ impl<P: Preset> SlotHead<P> {
7578
self.beacon_state.phase() >= Phase::Altair
7679
}
7780

81+
pub fn is_optimistic<W: Wait>(&self, controller: &ApiController<P, W>) -> Result<bool> {
82+
if !self.optimistic {
83+
return Ok(false);
84+
}
85+
86+
controller
87+
.block_by_root(self.beacon_block_root)?
88+
.is_none_or(|block| block.status.is_optimistic())
89+
.pipe(Ok)
90+
}
91+
7892
pub fn subnet_id(&self, slot: Slot, committee_index: CommitteeIndex) -> Result<SubnetId> {
7993
let committees_per_slot =
8094
accessors::get_committee_count_per_slot(&self.beacon_state, RelativeEpoch::Current);

validator/src/validator.rs

Lines changed: 39 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -743,7 +743,7 @@ impl<P: Preset, W: Wait + Sync> Validator<P, W> {
743743
return Ok(());
744744
}
745745

746-
if slot_head.optimistic {
746+
if self.wait_for_fully_validated_head(slot_head).await.is_err() {
747747
warn!(
748748
"validator cannot produce a block because \
749749
chain head has not been fully verified by an execution engine",
@@ -975,31 +975,7 @@ impl<P: Preset, W: Wait + Sync> Validator<P, W> {
975975
wait_group: &W,
976976
slot_head: &SlotHead<P>,
977977
) -> Result<()> {
978-
const BLOCK_EVENT_WAIT_TIMEOUT: Duration = Duration::from_secs(1);
979-
980-
if slot_head.optimistic
981-
&& timeout(BLOCK_EVENT_WAIT_TIMEOUT, async {
982-
loop {
983-
let block_event =
984-
match self.event_channels.receiver_for(Topic::Block).recv().await {
985-
Ok(Event::Block(block_event)) => block_event,
986-
Ok(_) => continue,
987-
Err(error) => {
988-
debug!("error receiving block event: {error:?}");
989-
continue;
990-
}
991-
};
992-
993-
if block_event.block == slot_head.beacon_block_root
994-
&& !block_event.execution_optimistic
995-
{
996-
break;
997-
}
998-
}
999-
})
1000-
.await
1001-
.is_err()
1002-
{
978+
if self.wait_for_fully_validated_head(slot_head).await.is_err() {
1003979
warn!(
1004980
"validator cannot participate in attestation because \
1005981
chain head has not been fully verified by an execution engine",
@@ -1146,7 +1122,7 @@ impl<P: Preset, W: Wait + Sync> Validator<P, W> {
11461122

11471123
#[expect(clippy::too_many_lines)]
11481124
async fn publish_aggregates_and_proofs(&self, wait_group: &W, slot_head: &SlotHead<P>) {
1149-
if slot_head.optimistic {
1125+
if self.wait_for_fully_validated_head(slot_head).await.is_err() {
11501126
warn!(
11511127
"validators cannot participate in aggregation because \
11521128
chain head has not been fully verified by an execution engine",
@@ -1303,7 +1279,11 @@ impl<P: Preset, W: Wait + Sync> Validator<P, W> {
13031279
return Ok(());
13041280
}
13051281

1306-
if slot_head.optimistic {
1282+
if self
1283+
.wait_for_fully_validated_head(&slot_head)
1284+
.await
1285+
.is_err()
1286+
{
13071287
warn!(
13081288
"validator cannot participate in sync committees because \
13091289
chain head has not been fully verified by an execution engine",
@@ -1360,7 +1340,7 @@ impl<P: Preset, W: Wait + Sync> Validator<P, W> {
13601340
return;
13611341
}
13621342

1363-
if slot_head.optimistic {
1343+
if self.wait_for_fully_validated_head(slot_head).await.is_err() {
13641344
warn!(
13651345
"validator cannot participate in sync committees because \
13661346
chain head has not been fully verified by an execution engine",
@@ -2090,6 +2070,36 @@ impl<P: Preset, W: Wait + Sync> Validator<P, W> {
20902070
validator_statistics.track_collection_metrics().await;
20912071
}
20922072
}
2073+
2074+
async fn wait_for_fully_validated_head(&self, slot_head: &SlotHead<P>) -> Result<()> {
2075+
const BLOCK_EVENT_WAIT_TIMEOUT: Duration = Duration::from_secs(1);
2076+
2077+
if !slot_head.is_optimistic(&self.controller)? {
2078+
return Ok(());
2079+
}
2080+
2081+
timeout(BLOCK_EVENT_WAIT_TIMEOUT, async {
2082+
loop {
2083+
let block_event = match self.event_channels.receiver_for(Topic::Block).recv().await
2084+
{
2085+
Ok(Event::Block(block_event)) => block_event,
2086+
Ok(_) => continue,
2087+
Err(error) => {
2088+
warn!("error receiving block event: {error:?}");
2089+
continue;
2090+
}
2091+
};
2092+
2093+
if block_event.block == slot_head.beacon_block_root
2094+
&& !block_event.execution_optimistic
2095+
{
2096+
break;
2097+
}
2098+
}
2099+
})
2100+
.await
2101+
.map_err(Into::into)
2102+
}
20932103
}
20942104

20952105
// Use `BTreeMap` to make grouping deterministic for snapshot testing.

0 commit comments

Comments
 (0)