diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index fc12a4c5f35..5bfa8ddd6ff 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -135,11 +135,13 @@ pub struct Timeouts { pub attestation: Duration, pub attester_duties: Duration, pub attestation_subscriptions: Duration, + pub attestation_aggregators: Duration, pub liveness: Duration, pub proposal: Duration, pub proposer_duties: Duration, pub sync_committee_contribution: Duration, pub sync_duties: Duration, + pub sync_aggregators: Duration, pub get_beacon_blocks_ssz: Duration, pub get_debug_beacon_states: Duration, pub get_deposit_snapshot: Duration, @@ -153,11 +155,13 @@ impl Timeouts { attestation: timeout, attester_duties: timeout, attestation_subscriptions: timeout, + attestation_aggregators: timeout, liveness: timeout, proposal: timeout, proposer_duties: timeout, sync_committee_contribution: timeout, sync_duties: timeout, + sync_aggregators: timeout, get_beacon_blocks_ssz: timeout, get_debug_beacon_states: timeout, get_deposit_snapshot: timeout, @@ -2732,6 +2736,42 @@ impl BeaconNodeHttpClient { ) .await } + + /// `POST validator/beacon_committee_selections` + pub async fn post_validator_beacon_committee_selections( + &self, + selections: &[BeaconCommitteeSelection], + ) -> Result>, Error> { + let mut path = self.eth_path(V1)?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? + .push("validator") + .push("beacon_committee_selections"); + + self.post_with_timeout_and_response( + path, + &selections, + self.timeouts.attestation_aggregators, + ) + .await + } + + /// `POST validator/sync_committee_selections` + pub async fn post_validator_sync_committee_selections( + &self, + selections: &[SyncCommitteeSelection], + ) -> Result>, Error> { + let mut path = self.eth_path(V1)?; + + path.path_segments_mut() + .map_err(|()| Error::InvalidUrl(self.server.clone()))? 
+ .push("validator") + .push("sync_committee_selections"); + + self.post_with_timeout_and_response(path, &selections, self.timeouts.sync_aggregators) + .await + } } /// Returns `Ok(response)` if the response is a `200 OK` response. Otherwise, creates an diff --git a/common/eth2/src/types.rs b/common/eth2/src/types.rs index 0f9fa7f2c2d..06c983b1a3d 100644 --- a/common/eth2/src/types.rs +++ b/common/eth2/src/types.rs @@ -930,6 +930,23 @@ pub struct PeerCount { pub disconnecting: u64, } +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct BeaconCommitteeSelection { + #[serde(with = "serde_utils::quoted_u64")] + pub validator_index: u64, + pub slot: Slot, + pub selection_proof: Signature, +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub struct SyncCommitteeSelection { + #[serde(with = "serde_utils::quoted_u64")] + pub validator_index: u64, + pub slot: Slot, + #[serde(with = "serde_utils::quoted_u64")] + pub subcommittee_index: u64, + pub selection_proof: Signature, +} // --------- Server Sent Event Types ----------- #[derive(PartialEq, Debug, Serialize, Deserialize, Clone)] diff --git a/consensus/types/src/selection_proof.rs b/consensus/types/src/selection_proof.rs index c80a00c3d1f..60446ee3c50 100644 --- a/consensus/types/src/selection_proof.rs +++ b/consensus/types/src/selection_proof.rs @@ -3,10 +3,12 @@ use crate::{ }; use ethereum_hashing::hash; use safe_arith::{ArithError, SafeArith}; +use serde::{Deserialize, Serialize}; use ssz::Encode; use std::cmp; -#[derive(arbitrary::Arbitrary, PartialEq, Debug, Clone)] +#[derive(arbitrary::Arbitrary, PartialEq, Debug, Clone, Serialize, Deserialize)] +#[serde(transparent)] pub struct SelectionProof(Signature); impl SelectionProof { diff --git a/consensus/types/src/sync_selection_proof.rs b/consensus/types/src/sync_selection_proof.rs index 4adb90b26e2..61302a9dfb2 100644 --- a/consensus/types/src/sync_selection_proof.rs +++ b/consensus/types/src/sync_selection_proof.rs @@ -7,11 +7,13 
@@ use crate::{ }; use ethereum_hashing::hash; use safe_arith::{ArithError, SafeArith}; +use serde::{Deserialize, Serialize}; use ssz::Encode; use ssz_types::typenum::Unsigned; use std::cmp; -#[derive(arbitrary::Arbitrary, PartialEq, Debug, Clone)] +#[derive(arbitrary::Arbitrary, PartialEq, Debug, Clone, Serialize, Deserialize)] +#[serde(transparent)] pub struct SyncSelectionProof(Signature); impl SyncSelectionProof { diff --git a/validator_client/src/lib.rs b/validator_client/src/lib.rs index a7993dc879c..5dccda20f0c 100644 --- a/validator_client/src/lib.rs +++ b/validator_client/src/lib.rs @@ -4,6 +4,7 @@ mod latency; mod notifier; use crate::cli::ValidatorClient; +use crate::duties_service::SelectionProofConfig; pub use config::Config; use initialized_validators::InitializedValidators; use metrics::set_gauge; @@ -59,11 +60,13 @@ const WAITING_FOR_GENESIS_POLL_TIME: Duration = Duration::from_secs(12); const HTTP_ATTESTATION_TIMEOUT_QUOTIENT: u32 = 4; const HTTP_ATTESTER_DUTIES_TIMEOUT_QUOTIENT: u32 = 4; const HTTP_ATTESTATION_SUBSCRIPTIONS_TIMEOUT_QUOTIENT: u32 = 24; +const HTTP_ATTESTATION_AGGREGATOR_TIMEOUT_QUOTIENT: u32 = 24; // For DVT involving middleware only const HTTP_LIVENESS_TIMEOUT_QUOTIENT: u32 = 4; const HTTP_PROPOSAL_TIMEOUT_QUOTIENT: u32 = 2; const HTTP_PROPOSER_DUTIES_TIMEOUT_QUOTIENT: u32 = 4; const HTTP_SYNC_COMMITTEE_CONTRIBUTION_TIMEOUT_QUOTIENT: u32 = 4; const HTTP_SYNC_DUTIES_TIMEOUT_QUOTIENT: u32 = 4; +const HTTP_SYNC_AGGREGATOR_TIMEOUT_QUOTIENT: u32 = 24; // For DVT involving middleware only const HTTP_GET_BEACON_BLOCK_SSZ_TIMEOUT_QUOTIENT: u32 = 4; const HTTP_GET_DEBUG_BEACON_STATE_QUOTIENT: u32 = 4; const HTTP_GET_DEPOSIT_SNAPSHOT_QUOTIENT: u32 = 4; @@ -72,6 +75,22 @@ const HTTP_DEFAULT_TIMEOUT_QUOTIENT: u32 = 4; const DOPPELGANGER_SERVICE_NAME: &str = "doppelganger"; +/// Compute attestation selection proofs this many slots before they are required. 
+/// +/// At start-up selection proofs will be computed with less lookahead out of necessity. +const SELECTION_PROOF_SLOT_LOOKAHEAD: u64 = 8; + +/// The attestation selection proof lookahead for those running with the --distributed flag. +const SELECTION_PROOF_SLOT_LOOKAHEAD_DVT: u64 = 1; + +/// Fraction of a slot at which attestation selection proof signing should happen (2 means half way). +const SELECTION_PROOF_SCHEDULE_DENOM: u32 = 2; + +/// Number of epochs in advance to compute sync selection proofs when not in `distributed` mode. +pub const AGGREGATION_PRE_COMPUTE_EPOCHS: u64 = 2; +/// Number of slots in advance to compute sync selection proofs when in `distributed` mode. +pub const AGGREGATION_PRE_COMPUTE_SLOTS_DISTRIBUTED: u64 = 1; + type ValidatorStore = LighthouseValidatorStore; #[derive(Clone)] @@ -297,12 +316,15 @@ impl ProductionValidatorClient { attester_duties: slot_duration / HTTP_ATTESTER_DUTIES_TIMEOUT_QUOTIENT, attestation_subscriptions: slot_duration / HTTP_ATTESTATION_SUBSCRIPTIONS_TIMEOUT_QUOTIENT, + attestation_aggregators: slot_duration + / HTTP_ATTESTATION_AGGREGATOR_TIMEOUT_QUOTIENT, liveness: slot_duration / HTTP_LIVENESS_TIMEOUT_QUOTIENT, proposal: slot_duration / HTTP_PROPOSAL_TIMEOUT_QUOTIENT, proposer_duties: slot_duration / HTTP_PROPOSER_DUTIES_TIMEOUT_QUOTIENT, sync_committee_contribution: slot_duration / HTTP_SYNC_COMMITTEE_CONTRIBUTION_TIMEOUT_QUOTIENT, sync_duties: slot_duration / HTTP_SYNC_DUTIES_TIMEOUT_QUOTIENT, + sync_aggregators: slot_duration / HTTP_SYNC_AGGREGATOR_TIMEOUT_QUOTIENT, get_beacon_blocks_ssz: slot_duration / HTTP_GET_BEACON_BLOCK_SSZ_TIMEOUT_QUOTIENT, get_debug_beacon_states: slot_duration / HTTP_GET_DEBUG_BEACON_STATE_QUOTIENT, @@ -442,6 +464,41 @@ impl ProductionValidatorClient { validator_store.prune_slashing_protection_db(slot.epoch(E::slots_per_epoch()), true); } + // Define a config to be passed to duties_service.
+ // The defined config here defaults to using selections_endpoint and parallel_sign (i.e., distributed mode) + // Other DVT applications, e.g., Anchor can pass in different configs to suit different needs. + let attestation_selection_proof_config = if config.distributed { + SelectionProofConfig { + lookahead_slot: SELECTION_PROOF_SLOT_LOOKAHEAD_DVT, + computation_offset: slot_clock.slot_duration() / SELECTION_PROOF_SCHEDULE_DENOM, + selections_endpoint: true, + parallel_sign: true, + } + } else { + SelectionProofConfig { + lookahead_slot: SELECTION_PROOF_SLOT_LOOKAHEAD, + computation_offset: slot_clock.slot_duration() / SELECTION_PROOF_SCHEDULE_DENOM, + selections_endpoint: false, + parallel_sign: false, + } + }; + + let sync_selection_proof_config = if config.distributed { + SelectionProofConfig { + lookahead_slot: AGGREGATION_PRE_COMPUTE_SLOTS_DISTRIBUTED, + computation_offset: Duration::default(), + selections_endpoint: true, + parallel_sign: true, + } + } else { + SelectionProofConfig { + lookahead_slot: E::slots_per_epoch() * AGGREGATION_PRE_COMPUTE_EPOCHS, + computation_offset: Duration::default(), + selections_endpoint: false, + parallel_sign: false, + } + }; + let duties_service = Arc::new( DutiesServiceBuilder::new() .slot_clock(slot_clock.clone()) @@ -450,7 +507,8 @@ impl ProductionValidatorClient { .spec(context.eth2_config.spec.clone()) .executor(context.executor.clone()) .enable_high_validator_count_metrics(config.enable_high_validator_count_metrics) - .distributed(config.distributed) + .attestation_selection_proof_config(attestation_selection_proof_config) + .sync_selection_proof_config(sync_selection_proof_config) .disable_attesting(config.disable_attesting) .build()?, ); diff --git a/validator_client/validator_services/src/duties_service.rs b/validator_client/validator_services/src/duties_service.rs index b4d9bae2732..34607eaa131 100644 --- a/validator_client/validator_services/src/duties_service.rs +++ 
b/validator_client/validator_services/src/duties_service.rs @@ -11,10 +11,14 @@ use crate::sync::poll_sync_committee_duties; use crate::sync::SyncDutiesMap; use beacon_node_fallback::{ApiTopic, BeaconNodeFallback}; use eth2::types::{ - AttesterData, BeaconCommitteeSubscription, DutiesResponse, ProposerData, StateId, ValidatorId, + AttesterData, BeaconCommitteeSelection, BeaconCommitteeSubscription, DutiesResponse, + ProposerData, StateId, ValidatorId, }; -use futures::{stream, StreamExt}; -use parking_lot::RwLock; +use futures::{ + stream::{self, FuturesUnordered}, + StreamExt, +}; +use parking_lot::{RwLock, RwLockWriteGuard}; use safe_arith::{ArithError, SafeArith}; use slot_clock::SlotClock; use std::cmp::min; @@ -32,17 +36,6 @@ use validator_store::{DoppelgangerStatus, Error as ValidatorStoreError, Validato /// Only retain `HISTORICAL_DUTIES_EPOCHS` duties prior to the current epoch. const HISTORICAL_DUTIES_EPOCHS: u64 = 2; -/// Compute attestation selection proofs this many slots before they are required. -/// -/// At start-up selection proofs will be computed with less lookahead out of necessity. -const SELECTION_PROOF_SLOT_LOOKAHEAD: u64 = 8; - -/// The attestation selection proof lookahead for those running with the --distributed flag. -const SELECTION_PROOF_SLOT_LOOKAHEAD_DVT: u64 = 1; - -/// Fraction of a slot at which selection proof signing should happen (2 means half way). -const SELECTION_PROOF_SCHEDULE_DENOM: u32 = 2; - /// Minimum number of validators for which we auto-enable per-validator metrics. /// For validators greater than this value, we need to manually set the `enable-per-validator-metrics` /// flag in the cli to enable collection of per validator metrics. 
@@ -121,18 +114,87 @@ pub struct SubscriptionSlots { duty_slot: Slot, } +#[derive(Copy, Clone, Debug)] +pub struct SelectionProofConfig { + pub lookahead_slot: u64, + pub computation_offset: Duration, // The seconds to compute the selection proof before a slot + pub selections_endpoint: bool, // whether to call the selections endpoint, true for DVT with middleware + pub parallel_sign: bool, // whether to sign the selection proof in parallel, true in distributed mode +} + +impl SelectionProofConfig { + // Create a default associated function to be passed in DutiesServiceBuilder::new() + fn default() -> Self { + Self { + lookahead_slot: 0, + computation_offset: Duration::default(), + selections_endpoint: false, + parallel_sign: false, + } + } +} + /// Create a selection proof for `duty`. /// /// Return `Ok(None)` if the attesting validator is not an aggregator. -async fn make_selection_proof( +async fn make_selection_proof( duty: &AttesterData, validator_store: &S, spec: &ChainSpec, + beacon_nodes: &Arc>, + config: &SelectionProofConfig, ) -> Result, Error> { - let selection_proof = validator_store - .produce_selection_proof(duty.pubkey, duty.slot) - .await - .map_err(Error::FailedToProduceSelectionProof)?; + let selection_proof = if config.selections_endpoint { + let beacon_committee_selection = BeaconCommitteeSelection { + validator_index: duty.validator_index, + slot: duty.slot, + // This is partial selection proof + selection_proof: validator_store + .produce_selection_proof(duty.pubkey, duty.slot) + .await + .map_err(Error::FailedToProduceSelectionProof)? 
+ .into(), + }; + // Call the endpoint /eth/v1/validator/beacon_committee_selections + // by sending the BeaconCommitteeSelection that contains partial selection proof + // The middleware should return BeaconCommitteeSelection that contains full selection proof + let middleware_response = beacon_nodes + .first_success(|beacon_node| { + let selection_data = beacon_committee_selection.clone(); + debug!( + "validator_index" = duty.validator_index, + "slot" = %duty.slot, + "partial selection proof" = ?beacon_committee_selection.selection_proof, + "Sending selection to middleware" + ); + async move { + beacon_node + .post_validator_beacon_committee_selections(&[selection_data]) + .await + } + }) + .await; + + let response_data = &middleware_response + .map_err(|e| { + Error::FailedToProduceSelectionProof(ValidatorStoreError::Middleware(e.to_string())) + })? + .data[0]; + + debug!( + "validator_index" = response_data.validator_index, + "slot" = %response_data.slot, + // The selection proof from middleware response will be a full selection proof + "full selection proof" = ?response_data.selection_proof, + "Received selection from middleware" + ); + SelectionProof::from(response_data.selection_proof.clone()) + } else { + validator_store + .produce_selection_proof(duty.pubkey, duty.slot) + .await + .map_err(Error::FailedToProduceSelectionProof)? + }; selection_proof .is_aggregator(duty.committee_length as usize, spec) @@ -217,8 +279,10 @@ pub struct DutiesServiceBuilder { spec: Option>, //// Whether we permit large validator counts in the metrics. enable_high_validator_count_metrics: bool, - /// If this validator is running in distributed mode. 
- distributed: bool, + /// Create attestation selection proof config + attestation_selection_proof_config: SelectionProofConfig, + /// Create sync selection proof config + sync_selection_proof_config: SelectionProofConfig, disable_attesting: bool, } @@ -237,7 +301,8 @@ impl DutiesServiceBuilder { executor: None, spec: None, enable_high_validator_count_metrics: false, - distributed: false, + attestation_selection_proof_config: SelectionProofConfig::default(), + sync_selection_proof_config: SelectionProofConfig::default(), disable_attesting: false, } } @@ -275,8 +340,19 @@ impl DutiesServiceBuilder { self } - pub fn distributed(mut self, distributed: bool) -> Self { - self.distributed = distributed; + pub fn attestation_selection_proof_config( + mut self, + attestation_selection_proof_config: SelectionProofConfig, + ) -> Self { + self.attestation_selection_proof_config = attestation_selection_proof_config; + self + } + + pub fn sync_selection_proof_config( + mut self, + sync_selection_proof_config: SelectionProofConfig, + ) -> Self { + self.sync_selection_proof_config = sync_selection_proof_config; self } @@ -289,7 +365,7 @@ impl DutiesServiceBuilder { Ok(DutiesService { attesters: Default::default(), proposers: Default::default(), - sync_duties: SyncDutiesMap::new(self.distributed), + sync_duties: SyncDutiesMap::new(self.sync_selection_proof_config), validator_store: self .validator_store .ok_or("Cannot build DutiesService without validator_store")?, @@ -305,7 +381,7 @@ impl DutiesServiceBuilder { .ok_or("Cannot build DutiesService without executor")?, spec: self.spec.ok_or("Cannot build DutiesService without spec")?, enable_high_validator_count_metrics: self.enable_high_validator_count_metrics, - distributed: self.distributed, + selection_proof_config: self.attestation_selection_proof_config, disable_attesting: self.disable_attesting, }) } @@ -332,10 +408,10 @@ pub struct DutiesService { pub executor: TaskExecutor, /// The current chain spec. 
pub spec: Arc, - //// Whether we permit large validator counts in the metrics. + /// Whether we permit large validator counts in the metrics. pub enable_high_validator_count_metrics: bool, - /// If this validator is running in distributed mode. - pub distributed: bool, + /// Pass the config for distributed or non-distributed mode. + pub selection_proof_config: SelectionProofConfig, pub disable_attesting: bool, } @@ -1119,6 +1195,75 @@ async fn post_validator_duties_attester( + attesters: &mut RwLockWriteGuard, + result: Result<(AttesterData, Option), Error>, + dependent_root: Hash256, + current_slot: Slot, +) -> bool { + let (duty, selection_proof) = match result { + Ok(duty_and_proof) => duty_and_proof, + Err(Error::FailedToProduceSelectionProof(ValidatorStoreError::UnknownPubkey(pubkey))) => { + // A pubkey can be missing when a validator was recently removed via the API. + warn!( + info = "A validator may have recently been removed from this VC", + ?pubkey, + "Missing pubkey for duty and proof" + ); + // Do not abort the entire batch for a single failure. + // return true means continue processing duties. + return true; + } + Err(e) => { + error!( + error = ?e, + msg = "may impair attestation duties", + "Failed to produce duty and proof" + ); + return true; + } + }; + + let attester_map = attesters.entry(duty.pubkey).or_default(); + let epoch = duty.slot.epoch(S::E::slots_per_epoch()); + match attester_map.entry(epoch) { + hash_map::Entry::Occupied(mut entry) => { + // No need to update duties for which no proof was computed. + let Some(selection_proof) = selection_proof else { + return true; + }; + + let (existing_dependent_root, existing_duty) = entry.get_mut(); + + if *existing_dependent_root == dependent_root { + // Replace existing proof. + existing_duty.selection_proof = Some(selection_proof); + true + } else { + // Our selection proofs are no longer relevant due to a reorg, abandon this entire background process. 
+ debug!( + reason = "re-org", + "Stopping selection proof background task" + ); + false + } + } + + hash_map::Entry::Vacant(entry) => { + // This probably shouldn't happen, but we have enough info to fill in the entry so we may as well. + let subscription_slots = SubscriptionSlots::new(duty.slot, current_slot); + let duty_and_proof = DutyAndProof { + duty, + selection_proof, + subscription_slots, + }; + entry.insert((dependent_root, duty_and_proof)); + true + } + } +} + /// Compute the attestation selection proofs for the `duties` and add them to the `attesters` map. /// /// Duties are computed in batches each slot. If a re-org is detected then the process will @@ -1138,26 +1283,33 @@ async fn fill_in_selection_proofs(); @@ -1170,87 +1322,69 @@ async fn fill_in_selection_proofs>() - .await; - - // Add to attesters store. - let mut attesters = duties_service.attesters.write(); - for result in duty_and_proof_results { - let (duty, selection_proof) = match result { - Ok(duty_and_proof) => duty_and_proof, - Err(Error::FailedToProduceSelectionProof( - ValidatorStoreError::UnknownPubkey(pubkey), - )) => { - // A pubkey can be missing when a validator was recently - // removed via the API. - warn!( - info = "a validator may have recently been removed from this VC", - ?pubkey, - "Missing pubkey for duty and proof" - ); - // Do not abort the entire batch for a single failure. - continue; - } - Err(e) => { - error!( - error = ?e, - msg = "may impair attestation duties", - "Failed to produce duty and proof" - ); - // Do not abort the entire batch for a single failure. - continue; - } - }; - - let attester_map = attesters.entry(duty.pubkey).or_default(); - let epoch = duty.slot.epoch(S::E::slots_per_epoch()); - match attester_map.entry(epoch) { - hash_map::Entry::Occupied(mut entry) => { - // No need to update duties for which no proof was computed. 
- let Some(selection_proof) = selection_proof else { - continue; - }; - - let (existing_dependent_root, existing_duty) = entry.get_mut(); - - if *existing_dependent_root == dependent_root { - // Replace existing proof. - existing_duty.selection_proof = Some(selection_proof); - } else { - // Our selection proofs are no longer relevant due to a reorg, abandon - // this entire background process. - debug!( - reason = "re-org", - "Stopping selection proof background task" - ); - return; - } + // In distributed case, we want to send all partial selection proofs to the middleware to determine aggregation duties, + // as the middleware will need to have a threshold of partial selection proofs to be able to return the full selection proof + // Thus, sign selection proofs in parallel in distributed case; Otherwise, sign them serially in non-distributed (normal) case + if duties_service.selection_proof_config.parallel_sign { + let mut duty_and_proof_results = relevant_duties + .into_values() + .flatten() + .map(|duty| async { + let opt_selection_proof = make_selection_proof( + &duty, + duties_service.validator_store.as_ref(), + &duties_service.spec, + &duties_service.beacon_nodes, + &duties_service.selection_proof_config, + ) + .await?; + Ok((duty, opt_selection_proof)) + }) + .collect::>(); + + while let Some(result) = duty_and_proof_results.next().await { + let mut attesters = duties_service.attesters.write(); + // if process_duty_and_proof returns false, exit the loop + if !process_duty_and_proof::( + &mut attesters, + result, + dependent_root, + current_slot, + ) { + return; } - hash_map::Entry::Vacant(entry) => { - // This probably shouldn't happen, but we have enough info to fill in the - // entry so we may as well. 
- let subscription_slots = SubscriptionSlots::new(duty.slot, current_slot); - let duty_and_proof = DutyAndProof { - duty, - selection_proof, - subscription_slots, - }; - entry.insert((dependent_root, duty_and_proof)); + } + } else { + // In normal (non-distributed case), sign selection proofs serially + let duty_and_proof_results = stream::iter(relevant_duties.into_values().flatten()) + .then(|duty| async { + let opt_selection_proof = make_selection_proof( + &duty, + duties_service.validator_store.as_ref(), + &duties_service.spec, + &duties_service.beacon_nodes, + &duties_service.selection_proof_config, + ) + .await?; + Ok((duty, opt_selection_proof)) + }) + .collect::>() + .await; + + // Add to attesters store. + let mut attesters = duties_service.attesters.write(); + for result in duty_and_proof_results { + if !process_duty_and_proof::( + &mut attesters, + result, + dependent_root, + current_slot, + ) { + return; } } - } - drop(attesters); + drop(attesters); + }; let time_taken_ms = Duration::from_secs_f64(timer.map_or(0.0, |t| t.stop_and_record())).as_millis(); diff --git a/validator_client/validator_services/src/sync.rs b/validator_client/validator_services/src/sync.rs index c13b70db802..cdda9a66902 100644 --- a/validator_client/validator_services/src/sync.rs +++ b/validator_client/validator_services/src/sync.rs @@ -1,19 +1,16 @@ -use crate::duties_service::{DutiesService, Error}; +use crate::duties_service::{DutiesService, Error, SelectionProofConfig}; +use eth2::types::{Signature, SyncCommitteeSelection}; use futures::future::join_all; +use futures::stream::{FuturesUnordered, StreamExt}; use logging::crit; use parking_lot::{MappedRwLockReadGuard, RwLock, RwLockReadGuard, RwLockWriteGuard}; use slot_clock::SlotClock; use std::collections::{HashMap, HashSet}; use std::sync::Arc; -use tracing::{debug, info, warn}; +use tracing::{debug, error, info, warn}; use types::{ChainSpec, EthSpec, PublicKeyBytes, Slot, SyncDuty, SyncSelectionProof, SyncSubnetId}; use 
validator_store::{DoppelgangerStatus, Error as ValidatorStoreError, ValidatorStore}; -/// Number of epochs in advance to compute selection proofs when not in `distributed` mode. -pub const AGGREGATION_PRE_COMPUTE_EPOCHS: u64 = 2; -/// Number of slots in advance to compute selection proofs when in `distributed` mode. -pub const AGGREGATION_PRE_COMPUTE_SLOTS_DISTRIBUTED: u64 = 1; - /// Top-level data-structure containing sync duty information. /// /// This data is structured as a series of nested `HashMap`s wrapped in `RwLock`s. Fine-grained @@ -30,7 +27,7 @@ pub struct SyncDutiesMap { /// Map from sync committee period to duties for members of that sync committee. committees: RwLock>, /// Whether we are in `distributed` mode and using reduced lookahead for aggregate pre-compute. - distributed: bool, + pub selection_proof_config: SelectionProofConfig, } /// Duties for a single sync committee period. @@ -79,10 +76,10 @@ pub struct SlotDuties { } impl SyncDutiesMap { - pub fn new(distributed: bool) -> Self { + pub fn new(selection_proof_config: SelectionProofConfig) -> Self { Self { committees: RwLock::new(HashMap::new()), - distributed, + selection_proof_config, } } @@ -99,15 +96,6 @@ impl SyncDutiesMap { }) } - /// Number of slots in advance to compute selection proofs - fn aggregation_pre_compute_slots(&self) -> u64 { - if self.distributed { - AGGREGATION_PRE_COMPUTE_SLOTS_DISTRIBUTED - } else { - E::slots_per_epoch() * AGGREGATION_PRE_COMPUTE_EPOCHS - } - } - /// Prepare for pre-computation of selection proofs for `committee_period`. 
/// /// Return the slot up to which proofs should be pre-computed, as well as a vec of @@ -123,7 +111,7 @@ impl SyncDutiesMap { current_slot, first_slot_of_period::(committee_period, spec), ); - let pre_compute_lookahead_slots = self.aggregation_pre_compute_slots::(); + let pre_compute_lookahead_slots = self.selection_proof_config.lookahead_slot; let pre_compute_slot = std::cmp::min( current_slot + pre_compute_lookahead_slots, last_slot_of_period::(committee_period, spec), @@ -377,7 +365,7 @@ pub async fn poll_sync_committee_duties(); + let aggregate_pre_compute_lookahead_slots = sync_duties.selection_proof_config.lookahead_slot; if (current_slot + aggregate_pre_compute_lookahead_slots) .epoch(S::E::slots_per_epoch()) .sync_committee_period(spec)? @@ -498,6 +486,108 @@ pub async fn poll_sync_committee_duties_for_period( + duties_service: &Arc>, + duty: &SyncDuty, + proof_slot: Slot, + subnet_id: SyncSubnetId, +) -> Option { + let sync_selection_proof = duties_service + .validator_store + .produce_sync_selection_proof(&duty.pubkey, proof_slot, subnet_id) + .await; + + let selection_proof = match sync_selection_proof { + Ok(proof) => proof, + Err(ValidatorStoreError::UnknownPubkey(pubkey)) => { + // A pubkey can be missing when a validator was recently removed via the API + debug!( + ?pubkey, + "slot" = %proof_slot, + "Missing pubkey for sync selection proof"); + return None; + } + Err(e) => { + warn!( + "error" = ?e, + "pubkey" = ?duty.pubkey, + "slot" = %proof_slot, + "Unable to sign selection proof" + ); + return None; + } + }; + + // In DVT with middleware, when we want to call the selections endpoint + if duties_service + .sync_duties + .selection_proof_config + .selections_endpoint + { + debug!( + "validator_index" = duty.validator_index, + "slot" = %proof_slot, + "subcommittee_index" = *subnet_id, + // This is partial selection proof + "partial selection proof" = ?Signature::from(selection_proof.clone()), + "Sending sync selection to middleware" + ); + + let 
sync_committee_selection = SyncCommitteeSelection { + validator_index: duty.validator_index, + slot: proof_slot, + subcommittee_index: *subnet_id, + selection_proof: selection_proof.clone().into(), + }; + + // Call the endpoint /eth/v1/validator/sync_committee_selections + // by sending the SyncCommitteeSelection that contains partial sync selection proof + // The middleware should return SyncCommitteeSelection that contains full sync selection proof + let middleware_response = duties_service + .beacon_nodes + .first_success(|beacon_node| { + let selection_data = sync_committee_selection.clone(); + async move { + beacon_node + .post_validator_sync_committee_selections(&[selection_data]) + .await + } + }) + .await; + + match middleware_response { + Ok(response) => { + let response_data = &response.data[0]; + debug!( + "validator_index" = response_data.validator_index, + "slot" = %response_data.slot, + "subcommittee_index" = response_data.subcommittee_index, + // The selection proof from middleware response will be a full selection proof + "full selection proof" = ?response_data.selection_proof, + "Received sync selection from middleware" + ); + + // Convert the response to a SyncSelectionProof + let full_selection_proof = + SyncSelectionProof::from(response_data.selection_proof.clone()); + Some(full_selection_proof) + } + Err(e) => { + error!( + "error" = %e, + %proof_slot, + "Failed to get sync selection proofs from middleware" + ); + None + } + } + } else { + // In non-distributed mode, the selection_proof is already a full selection proof + Some(selection_proof) + } +} + pub async fn fill_in_aggregation_proofs( duties_service: Arc>, pre_compute_duties: &[(Slot, SyncDuty)], @@ -505,127 +595,188 @@ pub async fn fill_in_aggregation_proofs() { - Ok(subnet_ids) => subnet_ids, - Err(e) => { - crit!( - error = ?e, - "Arithmetic error computing subnet IDs" - ); - continue; - } - }; + // For distributed mode + if duties_service + .sync_duties + .selection_proof_config + 
.parallel_sign + { + let mut futures_unordered = FuturesUnordered::new(); + + for (_, duty) in pre_compute_duties { + let subnet_ids = match duty.subnet_ids::() { + Ok(subnet_ids) => subnet_ids, + Err(e) => { + crit!( + "error" = ?e, + "Arithmetic error computing subnet IDs" + ); + continue; + } + }; - // Create futures to produce proofs. - let duties_service_ref = &duties_service; - let futures = subnet_ids.iter().map(|subnet_id| async move { // Construct proof for prior slot. let proof_slot = slot - 1; - let proof = match duties_service_ref - .validator_store - .produce_sync_selection_proof(&duty.pubkey, proof_slot, *subnet_id) - .await - { - Ok(proof) => proof, - Err(ValidatorStoreError::UnknownPubkey(pubkey)) => { - // A pubkey can be missing when a validator was recently - // removed via the API. - debug!( - ?pubkey, - pubkey = ?duty.pubkey, - slot = %proof_slot, - "Missing pubkey for sync selection proof" - ); - return None; + // Calling the make_sync_selection_proof will return a full selection proof + for &subnet_id in &subnet_ids { + let duties_service = duties_service.clone(); + futures_unordered.push(async move { + let result = + make_sync_selection_proof(&duties_service, duty, proof_slot, subnet_id) + .await; + + result.map(|proof| (duty.validator_index, proof_slot, subnet_id, proof)) + }); + } + } + + while let Some(result) = futures_unordered.next().await { + if let Some((validator_index, proof_slot, subnet_id, proof)) = result { + let sync_map = duties_service.sync_duties.committees.read(); + let Some(committee_duties) = sync_map.get(&sync_committee_period) else { + debug!("period" = sync_committee_period, "Missing sync duties"); + continue; + }; + + let validators = committee_duties.validators.read(); + + // Check if the validator is an aggregator + match proof.is_aggregator::() { + Ok(true) => { + if let Some(Some(duty)) = validators.get(&validator_index) { + debug!( + validator_index, + "slot" = %proof_slot, + "subcommittee_index" = *subnet_id, + 
// log full selection proof for debugging + "full selection proof" = ?proof, + "Validator is sync aggregator" + ); + + // Store the proof + duty.aggregation_duties + .proofs + .write() + .insert((proof_slot, subnet_id), proof); + } + } + Ok(false) => {} // Not an aggregator + Err(e) => { + warn!( + validator_index, + %slot, + "error" = ?e, + "Error determining is_aggregator" + ); + } } + } + } + } else { + // For non-distributed mode + debug!( + period = sync_committee_period, + %current_slot, + %pre_compute_slot, + "Calculating sync selection proofs" + ); + + let mut validator_proofs = vec![]; + for (validator_start_slot, duty) in pre_compute_duties { + // Proofs are already known at this slot for this validator. + if slot < *validator_start_slot { + continue; + } + + let subnet_ids = match duty.subnet_ids::() { + Ok(subnet_ids) => subnet_ids, Err(e) => { - warn!( + crit!( error = ?e, - pubkey = ?duty.pubkey, - slot = %proof_slot, - "Unable to sign selection proof" + "Arithmetic error computing subnet IDs" ); - return None; + continue; } }; - match proof.is_aggregator::() { - Ok(true) => { - debug!( - validator_index = duty.validator_index, - slot = %proof_slot, - %subnet_id, - "Validator is sync aggregator" - ); - Some(((proof_slot, *subnet_id), proof)) - } - Ok(false) => None, - Err(e) => { - warn!( - pubkey = ?duty.pubkey, - slot = %proof_slot, - error = ?e, - "Error determining is_aggregator" - ); - None + // Create futures to produce proofs. + let duties_service_ref = &duties_service; + let futures = subnet_ids.iter().map(|subnet_id| async move { + // Construct proof for prior slot. 
+ let proof_slot = slot - 1; + + let proof = + make_sync_selection_proof(duties_service_ref, duty, proof_slot, *subnet_id) + .await; + + match proof { + Some(proof) => match proof.is_aggregator::() { + Ok(true) => { + debug!( + validator_index = duty.validator_index, + slot = %proof_slot, + %subnet_id, + "Validator is sync aggregator" + ); + Some(((proof_slot, *subnet_id), proof)) + } + Ok(false) => None, + Err(e) => { + warn!( + pubkey = ?duty.pubkey, + slot = %proof_slot, + error = ?e, + "Error determining is_aggregator" + ); + None + } + }, + + None => None, } - } - }); + }); - // Execute all the futures in parallel, collecting any successful results. - let proofs = join_all(futures) - .await - .into_iter() - .flatten() - .collect::>(); + // Execute all the futures in parallel, collecting any successful results. + let proofs = join_all(futures) + .await + .into_iter() + .flatten() + .collect::>(); - validator_proofs.push((duty.validator_index, proofs)); - } + validator_proofs.push((duty.validator_index, proofs)); + } - // Add to global storage (we add regularly so the proofs can be used ASAP). - let sync_map = duties_service.sync_duties.committees.read(); - let Some(committee_duties) = sync_map.get(&sync_committee_period) else { - debug!(period = sync_committee_period, "Missing sync duties"); - continue; - }; - let validators = committee_duties.validators.read(); - let num_validators_updated = validator_proofs.len(); + // Add to global storage (we add regularly so the proofs can be used ASAP). 
+ let sync_map = duties_service.sync_duties.committees.read(); + let Some(committee_duties) = sync_map.get(&sync_committee_period) else { + debug!(period = sync_committee_period, "Missing sync duties"); + continue; + }; + let validators = committee_duties.validators.read(); + let num_validators_updated = validator_proofs.len(); + + for (validator_index, proofs) in validator_proofs { + if let Some(Some(duty)) = validators.get(&validator_index) { + duty.aggregation_duties.proofs.write().extend(proofs); + } else { + debug!( + validator_index, + period = sync_committee_period, + "Missing sync duty to update" + ); + } + } - for (validator_index, proofs) in validator_proofs { - if let Some(Some(duty)) = validators.get(&validator_index) { - duty.aggregation_duties.proofs.write().extend(proofs); - } else { + if num_validators_updated > 0 { debug!( - validator_index, - period = sync_committee_period, - "Missing sync duty to update" + %slot, + updated_validators = num_validators_updated, + "Finished computing sync selection proofs" ); } } - - if num_validators_updated > 0 { - debug!( - %slot, - updated_validators = num_validators_updated, - "Finished computing sync selection proofs" - ); - } } } diff --git a/validator_client/validator_store/src/lib.rs b/validator_client/validator_store/src/lib.rs index 9de3a6d66a0..f4ef9416765 100644 --- a/validator_client/validator_store/src/lib.rs +++ b/validator_client/validator_store/src/lib.rs @@ -20,6 +20,7 @@ pub enum Error { GreaterThanCurrentEpoch { epoch: Epoch, current_epoch: Epoch }, UnableToSignAttestation(AttestationError), SpecificError(T), + Middleware(String), } impl From for Error {