Skip to content

Commit 8aba0c4

Browse files
committed
chore: refactor polling of duties in metadata service
1 parent 73e9a82 commit 8aba0c4

File tree

4 files changed

+192
-17
lines changed

4 files changed

+192
-17
lines changed

anchor/common/ssv_types/src/message.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -454,7 +454,7 @@ impl SignedSSVMessage {
454454
}
455455
})
456456
})
457-
.collect();
457+
.collect::<Result<Vec<_>, _>>()?;
458458

459459
// Then convert the Vec of VariableLists to VariableList<VariableList<u8, U256>, U13>
460460
// This can fail if we have more than 13 signatures

anchor/validator_store/src/lib.rs

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -968,6 +968,16 @@ pub enum SpecificError {
968968
AggregatorInfoSlotPassed,
969969
/// Watch channel for AggregationAssignments has been closed
970970
AggregatorInfoChannelClosed,
971+
/// produce_selection_proof called for validator not in VotingAssignments.attesting_committees
972+
ValidatorNotAttesting {
973+
validator_pubkey: PublicKeyBytes,
974+
slot: Slot,
975+
},
976+
/// produce_sync_selection_proof called for validator not in VotingAssignments.sync_validators_by_subnet
977+
ValidatorNotInSyncCommittee {
978+
validator_pubkey: PublicKeyBytes,
979+
slot: Slot,
980+
},
971981
}
972982

973983
impl From<CollectionError> for SpecificError {
@@ -1531,6 +1541,21 @@ impl<T: SlotClock, E: EthSpec> ValidatorStore for AnchorValidatorStore<T, E> {
15311541
let committee_id = cluster.committee_id();
15321542
let voting_assignments = self.get_voting_assignments(slot).await?;
15331543

1544+
// Defensive check: validator should be in `VotingAssignments` since both this
1545+
// function call and `VotingAssignments` are derived from `DutiesService`. If not,
1546+
// there's an inconsistency (e.g., stale cache after poll timeout) and we
1547+
// should not participate with a wrong `num_signatures_to_collect`.
1548+
if !voting_assignments
1549+
.attesting_committees
1550+
.contains_key(&validator_pubkey)
1551+
{
1552+
return Err(SpecificError::ValidatorNotAttesting {
1553+
validator_pubkey,
1554+
slot,
1555+
}
1556+
.into());
1557+
}
1558+
15341559
// Build a set of validator indices in this committee
15351560
// This handles divergent operator views, since we only count validators we have
15361561
// shares
@@ -1629,6 +1654,24 @@ impl<T: SlotClock, E: EthSpec> ValidatorStore for AnchorValidatorStore<T, E> {
16291654
let committee_id = cluster.committee_id();
16301655
let voting_assignments = self.get_voting_assignments(slot).await?;
16311656

1657+
// Defensive check: validator should be in `VotingAssignments` since both this
1658+
// function call and `VotingAssignments` are derived from `DutiesService`. If not,
1659+
// there's an inconsistency (e.g., stale cache after poll timeout) and we
1660+
// should not participate with a wrong `num_signatures_to_collect`.
1661+
let validator_index = validator
1662+
.index
1663+
.ok_or(SpecificError::MissingIndex)?;
1664+
if !voting_assignments
1665+
.sync_validators_by_subnet
1666+
.contains_key(&validator_index)
1667+
{
1668+
return Err(SpecificError::ValidatorNotInSyncCommittee {
1669+
validator_pubkey: *validator_pubkey,
1670+
slot,
1671+
}
1672+
.into());
1673+
}
1674+
16321675
// Build a set of validator indices in this committee
16331676
// This handles divergent operator views, since we only count validators we have
16341677
// shares for

anchor/validator_store/src/metadata_service.rs

Lines changed: 95 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,21 @@
11
use std::{
22
collections::{HashMap, HashSet},
33
sync::Arc,
4-
time::Duration,
4+
time::{Duration, Instant},
55
};
66

77
use beacon_node_fallback::BeaconNodeFallback;
88
use slot_clock::SlotClock;
99
use ssv_types::{ValidatorIndex, consensus::BeaconVote};
1010
use task_executor::TaskExecutor;
1111
use tokio::{sync::watch, time::sleep};
12-
use tracing::{error, info, trace};
12+
use tracing::{error, info, trace, warn};
1313
use types::{ChainSpec, EthSpec, Slot, SyncSubnetId};
1414
use validator_services::duties_service::DutiesService;
1515

1616
use crate::{
1717
AggregationAssignments, AnchorValidatorStore, ContributionWaiter, VotingAssignments,
18-
VotingContext,
18+
VotingContext, metrics,
1919
};
2020

2121
#[derive(Clone)]
@@ -72,12 +72,24 @@ impl<E: EthSpec, T: SlotClock + 'static> MetadataService<E, T> {
7272
// PHASE 1: VotingAssignments (slot start)
7373
// Caches voting assignments for use by both selection proofs AND voting context.
7474
// Waits for DutiesService to complete polling before reading duties.
75+
//
76+
// RESILIENCE: We wait up to 3.5s for poll signals, but always proceed to
77+
// read from duties cache regardless of poll outcome. This handles:
78+
// - Normal operation: Signals arrive quickly (< 100ms), we read fresh duties
79+
// - Beacon node slow: Signal arrives late (< 3s), we still get fresh duties
80+
// - Beacon node down: Timeout after 3.5s, we read from cache (stale or empty)
81+
//
82+
// The 3.5s timeout is chosen because:
83+
// - Lighthouse BN API timeout is 3 seconds (slot_duration / 4)
84+
// - Phase 2 starts at 4 seconds (1/3 slot)
85+
// - This gives 500ms buffer after BN timeout before Phase 2 deadline
7586
// ═══════════════════════════════════════════════════════════════════════
7687
let self_clone_phase1 = self.clone();
7788
executor.spawn(
7889
async move {
7990
let mut attesters_poll_rx = self_clone_phase1.attesters_poll_rx.clone();
8091
let mut sync_poll_rx = self_clone_phase1.sync_poll_rx.clone();
92+
let poll_timeout = Duration::from_millis(3500);
8193

8294
loop {
8395
if let Some(duration_to_next_slot) =
@@ -87,20 +99,71 @@ impl<E: EthSpec, T: SlotClock + 'static> MetadataService<E, T> {
8799
sleep(duration_to_next_slot).await;
88100

89101
let slot: Slot = self_clone_phase1.slot_clock.now().unwrap_or_default();
90-
91-
// Wait for BOTH DutiesService polls to complete for this slot.
92-
// This guarantees duties are cached before we read them.
93-
// If poll channels are closed, skip this slot to avoid signing with stale
94-
// data.
95-
if attesters_poll_rx.wait_for(|&s| s >= slot).await.is_err() {
96-
error!(%slot, "Attesters poll channel closed, skipping slot");
97-
continue;
102+
let poll_start = Instant::now();
103+
104+
// Wait for poll signals with timeout (parallel execution).
105+
let (attesters_ready, sync_ready) = tokio::join!(
106+
async {
107+
tokio::time::timeout(
108+
poll_timeout,
109+
attesters_poll_rx.wait_for(|&s| s >= slot),
110+
)
111+
.await
112+
.is_ok_and(|r| r.is_ok())
113+
},
114+
async {
115+
tokio::time::timeout(
116+
poll_timeout,
117+
sync_poll_rx.wait_for(|&s| s >= slot),
118+
)
119+
.await
120+
.is_ok_and(|r| r.is_ok())
121+
}
122+
);
123+
124+
let poll_duration = poll_start.elapsed();
125+
126+
// Log poll failures
127+
if !attesters_ready {
128+
warn!(
129+
%slot,
130+
poll_duration_ms = poll_duration.as_millis(),
131+
"Attesters poll failed or timed out - will use cached duties"
132+
);
98133
}
99-
if sync_poll_rx.wait_for(|&s| s >= slot).await.is_err() {
100-
error!(%slot, "Sync poll channel closed, skipping slot");
101-
continue;
134+
if !sync_ready {
135+
warn!(
136+
%slot,
137+
poll_duration_ms = poll_duration.as_millis(),
138+
"Sync poll failed or timed out - will use cached duties"
139+
);
102140
}
103141

142+
// Record poll telemetry
143+
metrics::inc_counter_vec(
144+
&metrics::METADATA_SERVICE_POLL_TOTAL,
145+
&[
146+
metrics::ATTESTERS,
147+
if attesters_ready { metrics::SUCCESS } else { metrics::FAILED },
148+
],
149+
);
150+
metrics::inc_counter_vec(
151+
&metrics::METADATA_SERVICE_POLL_TOTAL,
152+
&[
153+
metrics::SYNC,
154+
if sync_ready { metrics::SUCCESS } else { metrics::FAILED },
155+
],
156+
);
157+
metrics::observe(
158+
&metrics::METADATA_SERVICE_POLL_DURATION,
159+
poll_duration.as_secs_f64(),
160+
);
161+
162+
// Always proceed to build VotingAssignments regardless of poll results.
163+
// Even if polls failed, the duties cache may have:
164+
// - Fresh data from a previous poll this slot
165+
// - Stale data from previous slot/epoch (better than nothing)
166+
// - Empty data (only if poll timed out AND this is a fresh restart)
104167
if let Err(err) = self_clone_phase1.update_voting_assignments() {
105168
error!(err, "Failed to update validator voting assignments");
106169
}
@@ -177,7 +240,7 @@ impl<E: EthSpec, T: SlotClock + 'static> MetadataService<E, T> {
177240
let slot = self.slot_clock.now().ok_or("Failed to read slot clock")?;
178241

179242
// Get attestation validators
180-
let (attesting_validators, attesting_committees) = self
243+
let (attesting_validators, attesting_committees): (Vec<_>, HashMap<_, _>) = self
181244
.duties_service
182245
.attesters(slot)
183246
.into_iter()
@@ -222,6 +285,9 @@ impl<E: EthSpec, T: SlotClock + 'static> MetadataService<E, T> {
222285
})
223286
.unwrap_or_default();
224287

288+
let attester_count = attesting_validators.len();
289+
let sync_count = sync_validators_by_subnet.len();
290+
225291
let voting_assignments = VotingAssignments {
226292
slot,
227293
attesting_validators,
@@ -232,7 +298,20 @@ impl<E: EthSpec, T: SlotClock + 'static> MetadataService<E, T> {
232298
self.validator_store
233299
.update_voting_assignments(voting_assignments);
234300

235-
trace!(%slot, "Published VotingAssignments at slot start");
301+
// Record validator count metrics
302+
metrics::set_gauge(
303+
&metrics::METADATA_SERVICE_ATTESTING_VALIDATORS,
304+
attester_count as i64,
305+
);
306+
metrics::set_gauge(
307+
&metrics::METADATA_SERVICE_SYNC_VALIDATORS,
308+
sync_count as i64,
309+
);
310+
if attester_count == 0 && sync_count == 0 {
311+
metrics::inc_counter(&metrics::METADATA_SERVICE_EMPTY_ASSIGNMENTS_TOTAL);
312+
}
313+
314+
trace!(%slot, attester_count, sync_count, "Published VotingAssignments at slot start");
236315
Ok(())
237316
}
238317

anchor/validator_store/src/metrics.rs

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,3 +24,56 @@ pub static SIGNED_RANDAO_REVEALS_TOTAL: LazyLock<Result<IntCounterVec>> = LazyLo
2424
&["status"],
2525
)
2626
});
27+
28+
// ═══════════════════════════════════════════════════════════════════════════════
29+
// MetadataService metrics
30+
// ═══════════════════════════════════════════════════════════════════════════════
31+
32+
// Poll outcome labels
33+
pub const ATTESTERS: &str = "attesters";
34+
pub const SYNC: &str = "sync";
35+
pub const SUCCESS: &str = "success";
36+
pub const FAILED: &str = "failed";
37+
38+
/// Count of poll attempts by type (attesters/sync) and outcome (success/failed)
39+
pub static METADATA_SERVICE_POLL_TOTAL: LazyLock<Result<IntCounterVec>> = LazyLock::new(|| {
40+
try_create_int_counter_vec(
41+
"anchor_metadata_service_poll_total",
42+
"Count of DutiesService poll wait attempts",
43+
&["type", "outcome"],
44+
)
45+
});
46+
47+
/// Duration of poll wait phase (both polls in parallel)
48+
pub static METADATA_SERVICE_POLL_DURATION: LazyLock<Result<Histogram>> = LazyLock::new(|| {
49+
try_create_histogram(
50+
"anchor_metadata_service_poll_duration_seconds",
51+
"Duration waiting for DutiesService poll signals",
52+
)
53+
});
54+
55+
/// Current count of attesting validators in VotingAssignments
56+
pub static METADATA_SERVICE_ATTESTING_VALIDATORS: LazyLock<Result<IntGauge>> =
57+
LazyLock::new(|| {
58+
try_create_int_gauge(
59+
"anchor_metadata_service_attesting_validators",
60+
"Count of validators with attestation duties this slot",
61+
)
62+
});
63+
64+
/// Current count of sync committee validators in VotingAssignments
65+
pub static METADATA_SERVICE_SYNC_VALIDATORS: LazyLock<Result<IntGauge>> = LazyLock::new(|| {
66+
try_create_int_gauge(
67+
"anchor_metadata_service_sync_validators",
68+
"Count of validators with sync committee duties this slot",
69+
)
70+
});
71+
72+
/// Count of slots where VotingAssignments was empty
73+
pub static METADATA_SERVICE_EMPTY_ASSIGNMENTS_TOTAL: LazyLock<Result<IntCounter>> =
74+
LazyLock::new(|| {
75+
try_create_int_counter(
76+
"anchor_metadata_service_empty_assignments_total",
77+
"Count of slots where VotingAssignments had no duties",
78+
)
79+
});

0 commit comments

Comments
 (0)