Skip to content

Commit 35fb976

Browse files
committed
subscribe to attest and sync duties polling completion
1 parent 7f06500 commit 35fb976

File tree

2 files changed

+41
-6
lines changed

2 files changed

+41
-6
lines changed

validator_client/validator_services/src/duties_service.rs

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,10 @@ use std::sync::Arc;
2828
use std::sync::atomic::{AtomicBool, Ordering};
2929
use std::time::Duration;
3030
use task_executor::TaskExecutor;
31-
use tokio::{sync::mpsc::Sender, time::sleep};
31+
use tokio::{
32+
sync::{mpsc::Sender, watch},
33+
time::sleep,
34+
};
3235
use tracing::{debug, error, info, warn};
3336
use types::{ChainSpec, Epoch, EthSpec, Hash256, SelectionProof, Slot};
3437
use validator_metrics::{ATTESTATION_DUTY, get_int_gauge, set_int_gauge};
@@ -373,6 +376,9 @@ impl<S, T> DutiesServiceBuilder<S, T> {
373376
}
374377

375378
pub fn build(self) -> Result<DutiesService<S, T>, String> {
379+
let (attesters_poll_tx, _) = watch::channel(Slot::default());
380+
let (sync_poll_tx, _) = watch::channel(Slot::default());
381+
376382
Ok(DutiesService {
377383
attesters: Default::default(),
378384
proposers: Default::default(),
@@ -394,6 +400,8 @@ impl<S, T> DutiesServiceBuilder<S, T> {
394400
enable_high_validator_count_metrics: self.enable_high_validator_count_metrics,
395401
selection_proof_config: self.attestation_selection_proof_config,
396402
disable_attesting: self.disable_attesting,
403+
attesters_poll_tx,
404+
sync_poll_tx,
397405
})
398406
}
399407
}
@@ -424,6 +432,10 @@ pub struct DutiesService<S, T> {
424432
/// Pass the config for distributed or non-distributed mode.
425433
pub selection_proof_config: SelectionProofConfig,
426434
pub disable_attesting: bool,
435+
/// Watch channel sender for signaling when attestation duty polling completes.
436+
pub(crate) attesters_poll_tx: watch::Sender<Slot>,
437+
/// Watch channel sender for signaling when sync committee duty polling completes.
438+
pub(crate) sync_poll_tx: watch::Sender<Slot>,
427439
}
428440

429441
impl<S: ValidatorStore, T: SlotClock + 'static> DutiesService<S, T> {
@@ -527,6 +539,16 @@ impl<S: ValidatorStore, T: SlotClock + 'static> DutiesService<S, T> {
527539
self.enable_high_validator_count_metrics
528540
|| self.total_validator_count() <= VALIDATOR_METRICS_MIN_COUNT
529541
}
542+
543+
/// Subscribe to notifications when attestation duty polling completes.
544+
pub fn subscribe_to_attesters_poll(&self) -> watch::Receiver<Slot> {
545+
self.attesters_poll_tx.subscribe()
546+
}
547+
548+
/// Subscribe to notifications when sync committee duty polling completes.
549+
pub fn subscribe_to_sync_poll(&self) -> watch::Receiver<Slot> {
550+
self.sync_poll_tx.subscribe()
551+
}
530552
}
531553

532554
/// Start the service that periodically polls the beacon node for validator duties. This will start
@@ -974,12 +996,19 @@ async fn poll_beacon_attesters_for_epoch<S: ValidatorStore + 'static, T: SlotClo
974996
local_indices: &[u64],
975997
local_pubkeys: &HashSet<PublicKeyBytes>,
976998
) -> Result<(), Error<S::Error>> {
999+
let current_slot = duties_service
1000+
.slot_clock
1001+
.now_or_genesis()
1002+
.unwrap_or_default();
1003+
9771004
// No need to bother the BN if we don't have any validators.
9781005
if local_indices.is_empty() {
9791006
debug!(
9801007
%epoch,
9811008
"No validators, not downloading duties"
9821009
);
1010+
// Signal that sync committee duty polling is complete for this slot.
1011+
duties_service.attesters_poll_tx.send_replace(current_slot);
9831012
return Ok(());
9841013
}
9851014

@@ -1020,8 +1049,10 @@ async fn poll_beacon_attesters_for_epoch<S: ValidatorStore + 'static, T: SlotClo
10201049
.collect::<Vec<_>>()
10211050
};
10221051

1052+
// No validators have conflicting (epoch, dependent_root) values or missing duties for the epoch.
10231053
if validators_to_update.is_empty() {
1024-
// No validators have conflicting (epoch, dependent_root) values or missing duties for the epoch.
1054+
// Signal that sync committee duty polling is complete for this slot.
1055+
duties_service.attesters_poll_tx.send_replace(current_slot);
10251056
return Ok(());
10261057
}
10271058

@@ -1065,10 +1096,6 @@ async fn poll_beacon_attesters_for_epoch<S: ValidatorStore + 'static, T: SlotClo
10651096
// Update the duties service with the new `DutyAndProof` messages.
10661097
let mut attesters = duties_service.attesters.write();
10671098
let mut already_warned = Some(());
1068-
let current_slot = duties_service
1069-
.slot_clock
1070-
.now_or_genesis()
1071-
.unwrap_or_default();
10721099
for duty in &new_duties {
10731100
let attester_map = attesters.entry(duty.pubkey).or_default();
10741101

@@ -1115,6 +1142,9 @@ async fn poll_beacon_attesters_for_epoch<S: ValidatorStore + 'static, T: SlotClo
11151142
}
11161143
drop(attesters);
11171144

1145+
// Signal that attestation duty polling is complete for this slot.
1146+
duties_service.attesters_poll_tx.send_replace(current_slot);
1147+
11181148
// Spawn the background task to compute selection proofs.
11191149
let subservice = duties_service.clone();
11201150
duties_service.executor.spawn(

validator_client/validator_services/src/sync.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,8 @@ pub async fn poll_sync_committee_duties<S: ValidatorStore + 'static, T: SlotCloc
285285
.altair_fork_epoch
286286
.is_none_or(|altair_epoch| current_epoch < altair_epoch)
287287
{
288+
// Signal that sync committee duty polling is complete for this slot.
289+
duties_service.sync_poll_tx.send_replace(current_slot);
288290
return Ok(());
289291
}
290292

@@ -323,6 +325,9 @@ pub async fn poll_sync_committee_duties<S: ValidatorStore + 'static, T: SlotCloc
323325
sync_duties.prune(current_sync_committee_period);
324326
}
325327

328+
// Signal that sync committee duty polling is complete for this slot.
329+
duties_service.sync_poll_tx.send_replace(current_slot);
330+
326331
// Pre-compute aggregator selection proofs for the current period.
327332
let (current_pre_compute_slot, new_pre_compute_duties) = sync_duties
328333
.prepare_for_aggregator_pre_compute::<S::E>(

0 commit comments

Comments
 (0)