diff --git a/Cargo.lock b/Cargo.lock index b275172a..939acec2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5241,17 +5241,22 @@ dependencies = [ name = "message_validator" version = "0.1.0" dependencies = [ + "beacon_node_fallback", "bls", "dashmap", "database", + "eth2", "ethereum_ssz", "hex", "libp2p-gossipsub", "openssl", + "parking_lot", "processor", + "safe_arith", "sha2 0.10.8", "slot_clock", "ssv_types", + "task_executor", "thiserror 2.0.12", "tokio", "tracing", diff --git a/anchor/client/src/lib.rs b/anchor/client/src/lib.rs index fe26b015..b1fe812c 100644 --- a/anchor/client/src/lib.rs +++ b/anchor/client/src/lib.rs @@ -28,7 +28,7 @@ use eth2::{ use keygen::{encryption::decrypt, run_keygen, Keygen}; use message_receiver::NetworkMessageReceiver; use message_sender::{impostor::ImpostorMessageSender, MessageSender, NetworkMessageSender}; -use message_validator::Validator; +use message_validator::{DutiesTracker, Validator}; use network::Network; use openssl::{pkey::Private, rsa::Rsa}; use parking_lot::RwLock; @@ -381,9 +381,20 @@ impl Client { // Network sender/receiver let (network_tx, network_rx) = mpsc::channel::<(SubnetId, Vec)>(9001); + let duties_tracker = Arc::new(DutiesTracker::new( + beacon_nodes.clone(), + spec.clone(), + E::slots_per_epoch(), + slot_clock.clone(), + database.watch(), + )); + duties_tracker.clone().start(executor.clone()); + let message_validator = Arc::new(Validator::new( database.watch(), E::slots_per_epoch(), + spec.epochs_per_sync_committee_period.as_u64(), + duties_tracker.clone(), slot_clock.clone(), )); diff --git a/anchor/common/ssv_types/src/cluster.rs b/anchor/common/ssv_types/src/cluster.rs index 73e200a0..22c87f6e 100644 --- a/anchor/common/ssv_types/src/cluster.rs +++ b/anchor/common/ssv_types/src/cluster.rs @@ -70,6 +70,12 @@ pub struct ClusterMember { #[ssz(struct_behaviour = "transparent")] pub struct ValidatorIndex(pub usize); +impl From for u64 { + fn from(value: ValidatorIndex) -> Self { + value.0 as u64 + } +} + /// General Metadata about a Validator #[derive(Debug, Clone)] pub struct ValidatorMetadata { diff --git a/anchor/database/src/state.rs b/anchor/database/src/state.rs index 8d8832e7..1d03a635 100644 --- a/anchor/database/src/state.rs +++ b/anchor/database/src/state.rs @@ -368,4 +368,12 @@ impl NetworkState { validator_indices: validator_index.map(|idx| vec![idx]).unwrap_or(vec![]), }) } + + pub fn validator_indices(&self) -> Vec { + self.multi_state + .validator_metadata + .values() + .filter_map(|metadata| metadata.index.map(|idx| idx.into())) + .collect() + } } diff --git a/anchor/message_receiver/src/manager.rs b/anchor/message_receiver/src/manager.rs index 7b4ae013..6b427305 100644 --- a/anchor/message_receiver/src/manager.rs +++ b/anchor/message_receiver/src/manager.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use database::{NetworkState, NonUniqueIndex, UniqueIndex}; use gossipsub::{Message, MessageAcceptance, MessageId}; use libp2p::PeerId; -use message_validator::{ValidatedMessage, ValidatedSSVMessage, Validator}; +use message_validator::{DutiesProvider, ValidatedMessage, ValidatedSSVMessage, Validator}; use qbft_manager::QbftManager; use signature_collector::SignatureCollectorManager; use slot_clock::SlotClock; @@ -22,23 +22,23 @@ pub struct Outcome { } /// A message receiver that passes messages to responsible managers. -pub struct NetworkMessageReceiver { +pub struct NetworkMessageReceiver { processor: processor::Senders, qbft_manager: Arc, signature_collector: Arc, network_state_rx: watch::Receiver, outcome_tx: mpsc::Sender, - validator: Arc>, + validator: Arc>, } -impl NetworkMessageReceiver { +impl NetworkMessageReceiver { pub fn new( processor: processor::Senders, qbft_manager: Arc, signature_collector: Arc, network_state_rx: watch::Receiver, outcome_tx: mpsc::Sender, - validator: Arc>, + validator: Arc>, ) -> Arc { Arc::new(Self { processor, @@ -51,7 +51,9 @@ impl NetworkMessageReceiver { } } -impl MessageReceiver for Arc> { +impl MessageReceiver + for Arc> +{ fn receive( &self, propagation_source: PeerId, @@ -92,12 +94,12 @@ impl MessageReceiver for Arc> } = match result { Ok(message) => message, Err(failure) => { - debug!(?failure, "Validation failure"); + debug!(gosspisub_message_id = ?message_id, ?failure, "Validation failure"); return; } }; - let msg_id = signed_ssv_message.ssv_message().msg_id(); + let msg_id = signed_ssv_message.ssv_message().msg_id().clone(); match msg_id.duty_executor() { Some(DutyExecutor::Validator(validator)) => { @@ -109,7 +111,7 @@ impl MessageReceiver for Arc> .is_none() { // We are not a signer for this validator, return without passing. - trace!(?validator, ?msg_id, "Not interested"); + trace!(gosspisub_message_id = ?message_id, ssv_msg_id = ?msg_id, ?validator, "Not interested"); return; } } @@ -123,19 +125,19 @@ impl MessageReceiver for Arc> let committee = state.clusters().get_all_by(&committee_id); // We only need to check one cluster, as all clusters will have the same set // of operators. - let is_member = committee + let is_member = committee.clone() .and_then(|mut v| v.pop()) .map(|c| c.cluster_members.contains(&own_id)) .unwrap_or(false); if is_member { // We are not a member for this committee, return without passing. - trace!(?committee_id, ?msg_id, "Not interested"); + trace!(gosspisub_message_id = ?message_id, ssv_msg_id = ?msg_id, ?committee, "Not interested"); return; } } None => { - error!(?msg_id, "Invalid message ID"); + error!(gosspisub_message_id = ?message_id, ssv_msg_id = ?msg_id, "Invalid message ID"); return; } } @@ -146,7 +148,7 @@ impl MessageReceiver for Arc> .qbft_manager .receive_data(signed_ssv_message, qbft_message) { - error!(?err, "Unable to receive QBFT message"); + error!(gosspisub_message_id = ?message_id, ssv_msg_id = ?msg_id, ?err, "Unable to receive QBFT message"); } } ValidatedSSVMessage::PartialSignatureMessages(messages) => { @@ -154,7 +156,7 @@ impl MessageReceiver for Arc> .signature_collector .receive_partial_signatures(messages) { - error!(?err, "Unable to receive partial signature message"); + error!(gosspisub_message_id = ?message_id, ssv_msg_id = ?msg_id, ?err, "Unable to receive partial signature message"); } } } diff --git a/anchor/message_sender/src/network.rs b/anchor/message_sender/src/network.rs index 2b5ed0db..a5f7e442 100644 --- a/anchor/message_sender/src/network.rs +++ b/anchor/message_sender/src/network.rs @@ -1,6 +1,6 @@ use std::sync::Arc; -use message_validator::Validator; +use message_validator::{DutiesProvider, Validator}; use openssl::{ error::ErrorStack, hash::MessageDigest, @@ -22,16 +22,16 @@ use crate::{Error, MessageCallback, MessageSender}; const SIGNER_NAME: &str = "message_sign_and_send"; const SENDER_NAME: &str = "message_send"; -pub struct NetworkMessageSender { +pub struct NetworkMessageSender { processor: processor::Senders, network_tx: mpsc::Sender<(SubnetId, Vec)>, private_key: PKey, operator_id: OperatorId, - validator: Option>>, + validator: Option>>, subnet_count: usize, } -impl MessageSender for Arc> { +impl MessageSender for Arc> { fn sign_and_send( &self, message: UnsignedSSVMessage, @@ -94,13 +94,13 @@ impl MessageSender for Arc> { } } -impl NetworkMessageSender { +impl NetworkMessageSender { pub fn new( processor: processor::Senders, network_tx: mpsc::Sender<(SubnetId, Vec)>, private_key: Rsa, operator_id: OperatorId, - validator: Option>>, + validator: Option>>, subnet_count: usize, ) -> Result, String> { let private_key = PKey::from_rsa(private_key) diff --git a/anchor/message_validator/Cargo.toml b/anchor/message_validator/Cargo.toml index 0425a26d..1db0d3f8 100644 --- a/anchor/message_validator/Cargo.toml +++ b/anchor/message_validator/Cargo.toml @@ -5,20 +5,26 @@ edition = { workspace = true } authors = ["Sigma Prime "] [dependencies] +beacon_node_fallback = { workspace = true } +bls = { workspace = true } dashmap = { workspace = true } database = { workspace = true } +eth2 = { workspace = true } ethereum_ssz = { workspace = true } gossipsub = { workspace = true } hex = { workspace = true } openssl = { workspace = true } +parking_lot = { workspace = true } processor = { workspace = true } +safe_arith = { workspace = true } sha2 = { workspace = true } slot_clock = { workspace = true } ssv_types = { workspace = true } +task_executor = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } tracing = { workspace = true } +types = { workspace = true } [dev-dependencies] bls = { workspace = true } -types = { workspace = true } diff --git a/anchor/message_validator/src/consensus_message.rs b/anchor/message_validator/src/consensus_message.rs index da8ad67b..b3719ba7 100644 --- a/anchor/message_validator/src/consensus_message.rs +++ b/anchor/message_validator/src/consensus_message.rs @@ -1,27 +1,27 @@ -use std::{ - convert::Into, - time::{Duration, SystemTime, UNIX_EPOCH}, -}; +use std::{convert::Into, sync::Arc, time::Duration}; use slot_clock::SlotClock; use ssv_types::{ consensus::{QbftMessage, QbftMessageType}, message::SignedSSVMessage, msgid::Role, - CommitteeInfo, IndexSet, OperatorId, Round, Slot, VariableList, + CommitteeInfo, IndexSet, OperatorId, Round, Slot, ValidatorIndex, VariableList, }; use ssz::Decode; +use ValidationFailure::EarlySlotMessage; use crate::{ - compute_quorum_size, consensus_state::ConsensusState, hash_data, verify_message_signatures, + compute_quorum_size, + consensus_state::{ConsensusState, OperatorState}, + duties::DutiesProvider, + hash_data, slot_start_time, sync_committee_period, verify_message_signatures, ValidatedSSVMessage, ValidationContext, ValidationFailure, }; pub(crate) fn validate_consensus_message( - validation_context: ValidationContext, + validation_context: ValidationContext, consensus_state: &mut ConsensusState, - slots_per_epoch: u64, - slot_clock: impl SlotClock, + duty_provider: Arc, ) -> Result { // Decode message to QbftMessage let consensus_message = match QbftMessage::from_ssz_bytes( @@ -38,11 +38,13 @@ pub(crate) fn validate_consensus_message( validation_context.committee_info, )?; - validate_qbft_logic( + validate_qbft_logic(&validation_context, &consensus_message, consensus_state)?; + + validate_qbft_message_by_duty_logic( &validation_context, &consensus_message, consensus_state, - slot_clock, + duty_provider, )?; verify_message_signatures( @@ -53,7 +55,7 @@ pub(crate) fn validate_consensus_message( consensus_state.update( validation_context.signed_ssv_message, &consensus_message, - slots_per_epoch, + validation_context.slots_per_epoch, ); // Return the validated message @@ -169,10 +171,9 @@ pub(crate) fn validate_justifications( #[allow(clippy::comparison_chain)] pub(crate) fn validate_qbft_logic( - validation_context: &ValidationContext, + validation_context: &ValidationContext, consensus_message: &QbftMessage, consensus_state: &mut ConsensusState, - slot_clock: impl SlotClock, ) -> Result<(), ValidationFailure> { let signed_ssv_message = validation_context.signed_ssv_message; @@ -247,12 +248,7 @@ pub(crate) fn validate_qbft_logic( // Rule: Round must be within allowed spread from current time if signers.len() == 1 { - validate_round_in_allowed_spread( - consensus_message, - validation_context.role, - validation_context.received_at, - slot_clock, - )?; + validate_round_in_allowed_spread(consensus_message, validation_context)?; } Ok(()) @@ -284,19 +280,16 @@ fn round_robin_proposer( /// Validate that the message round is within the allowed spread fn validate_round_in_allowed_spread( consensus_message: &QbftMessage, - role: Role, - received_at: SystemTime, - slot_clock: impl SlotClock, + validation_context: &ValidationContext, ) -> Result<(), ValidationFailure> { // Get the slot let slot = Slot::new(consensus_message.height); - let slot_start_time = match slot_clock.start_of(slot) { - Some(time) => UNIX_EPOCH + time, - None => return Err(ValidationFailure::SlotStartTimeNotFound), - }; + let slot_start_time = slot_start_time(slot, validation_context.slot_clock.clone()) + .map_err(|_| ValidationFailure::SlotStartTimeNotFound { slot })?; - let (since_slot_start, estimated_round) = if received_at > slot_start_time { - let duration = received_at + let (since_slot_start, estimated_round) = if validation_context.received_at > slot_start_time { + let duration = validation_context + .received_at .duration_since(slot_start_time) .unwrap_or_default(); (duration, current_estimated_round(duration)) @@ -311,10 +304,13 @@ fn validate_round_in_allowed_spread( if consensus_message.round < lowest_allowed || consensus_message.round > highest_allowed.into() { return Err(ValidationFailure::EstimatedRoundNotInAllowedSpread { - got: format!("{} ({} role)", consensus_message.round, role), + got: format!( + "{} ({} role)", + consensus_message.round, validation_context.role + ), want: format!( "between {} and {} ({} role) / {:?} passed", - lowest_allowed, highest_allowed, role, since_slot_start + lowest_allowed, highest_allowed, validation_context.role, since_slot_start ), }); } @@ -355,11 +351,263 @@ fn current_estimated_round(since_slot_start: Duration) -> Round { (QUICK_TIMEOUT_THRESHOLD + FIRST_ROUND + delta_slow).into() } +/// clockErrorTolerance is the maximum amount of clock error we expect to see between nodes. +const CLOCK_ERROR_TOLERANCE: Duration = Duration::from_millis(50); +/// lateMessageMargin is the duration past a message's TTL in which it is still considered valid. +const LATE_MESSAGE_MARGIN: Duration = Duration::from_secs(3); +const LATE_SLOT_ALLOWANCE: u64 = 2; + +/// Validates QBFT messages based on beacon chain duties +pub(crate) fn validate_qbft_message_by_duty_logic( + validation_context: &ValidationContext, + consensus_message: &QbftMessage, + consensus_state: &mut ConsensusState, + duty_provider: Arc, +) -> Result<(), ValidationFailure> { + let role = validation_context.role; + let signed_ssv_message = validation_context.signed_ssv_message; + + // Rule: Height must not be "old". I.e., signer must not have already advanced to a later slot. + if role != Role::Committee { + for &signer in signed_ssv_message.operator_ids() { + let signer_state = consensus_state.get_or_create_operator(&signer); + let max_slot = signer_state.max_slot(); + if max_slot > consensus_message.height { + return Err(ValidationFailure::SlotAlreadyAdvanced { + got: consensus_message.height, + want: max_slot.as_u64(), + }); + } + } + } + + let msg_slot = Slot::new(consensus_message.height); + let randao_msg = false; // Default to false as in the Go code + + validate_beacon_duty( + validation_context, + msg_slot, + randao_msg, + duty_provider.clone(), + )?; + + // Rule: current slot(height) must be between duty's starting slot and: + // - duty's starting slot + 34 (committee and aggregation) + // - duty's starting slot + 3 (other types) + validate_slot_time(msg_slot, validation_context)?; + + // Rule: valid number of duties per epoch + for &signer in signed_ssv_message.operator_ids() { + let signer_state = consensus_state.get_or_create_operator(&signer); + validate_duty_count( + validation_context, + msg_slot, + signer_state, + duty_provider.clone(), + )?; + } + + Ok(()) +} + +/// Validates if a validator is assigned to a specific duty +pub(crate) fn validate_beacon_duty( + validation_context: &ValidationContext, + slot: Slot, + randao_msg: bool, + duty_provider: Arc, +) -> Result<(), ValidationFailure> { + let role = validation_context.role; + let epoch = slot.epoch(validation_context.slots_per_epoch); + // Rule: For a proposal duty message, check if the validator is assigned to it + if role == Role::Proposer { + // Tolerate missing duties for RANDAO signatures during the first slot of an epoch, + // while duties are still being fetched from the Beacon node. + + let is_first_slot_of_epoch = epoch.start_slot(validation_context.slots_per_epoch) == slot; + + if randao_msg + && is_first_slot_of_epoch + && validation_context.slot_clock.now().unwrap_or_default() <= slot + && !duty_provider.is_epoch_known_for_proposers(epoch) + { + return Ok(()); + } + + // Non-committee roles always have one validator index + let validator_index = validation_context + .committee_info + .validator_indices + .first() + .copied() + .unwrap_or_default(); + if !duty_provider.is_validator_proposer_at_slot(slot, validator_index) { + return Err(ValidationFailure::NoDuty); + } + } + + // Rule: For a sync committee duty message, check if the validator is assigned + if role == Role::SyncCommittee { + let period = + sync_committee_period(epoch, validation_context.epochs_per_sync_committee_period)?; + let validator_index = validation_context + .committee_info + .validator_indices + .first() + .copied() + .unwrap_or_default(); + if !duty_provider.is_validator_in_sync_committee(period, validator_index) { + return Err(ValidationFailure::NoDuty); + } + } + + Ok(()) +} + +/// Validates that the message's slot timing is correct +pub(crate) fn validate_slot_time( + msg_slot: Slot, + validation_context: &ValidationContext, +) -> Result<(), ValidationFailure> { + // Check if the message is too early + let earliness = message_earliness(msg_slot, validation_context)?; + if earliness > CLOCK_ERROR_TOLERANCE { + return Err(EarlySlotMessage { + got: format!("early by {:?}", earliness), + }); + } + + // Check if the message is too late + let lateness = message_lateness(msg_slot, validation_context)?; + if lateness > CLOCK_ERROR_TOLERANCE { + return Err(ValidationFailure::LateSlotMessage { + got: format!("late by {:?}", lateness), + }); + } + + Ok(()) +} + +/// Returns how early a message is compared to its slot start time. +/// Returns a zero duration if the message is on time or late. +fn message_earliness( + slot: Slot, + validation_context: &ValidationContext, +) -> Result { + let slot_start = slot_start_time(slot, validation_context.slot_clock.clone()) + .map_err(|_| ValidationFailure::SlotStartTimeNotFound { slot })?; + Ok(slot_start + .duration_since(validation_context.received_at) + .unwrap_or_default()) +} + +/// Returns how late a message is compared to its deadline based on role. +/// If the message was received before the deadline, it returns 0. +/// If the message was received after the deadline, it returns the duration by which it was late. +fn message_lateness( + slot: Slot, + validation_context: &ValidationContext, +) -> Result { + let ttl = match validation_context.role { + Role::Proposer | Role::SyncCommittee => 1 + LATE_SLOT_ALLOWANCE, + Role::Committee | Role::Aggregator => { + validation_context.slots_per_epoch + LATE_SLOT_ALLOWANCE + } + // No lateness check for these roles + Role::ValidatorRegistration | Role::VoluntaryExit => return Ok(Duration::from_secs(0)), + }; + + let deadline = slot_start_time(slot + ttl, validation_context.slot_clock.clone()) + .map_err(|_| ValidationFailure::SlotStartTimeNotFound { slot })? + .checked_add(LATE_MESSAGE_MARGIN) + .ok_or(ValidationFailure::UnexpectedFailure { + msg: "Unexpected overflow calculating message deadline".to_string(), + })?; + + Ok(validation_context + .received_at + .duration_since(deadline) + .unwrap_or_default()) +} + +/// Validates the duty count for a specific message and operator +pub(crate) fn validate_duty_count( + validation_context: &ValidationContext, + slot: Slot, + signer_state: &mut OperatorState, + duty_provider: Arc, +) -> Result<(), ValidationFailure> { + let (limit, should_check) = duty_limit( + validation_context, + slot, + &validation_context.committee_info.validator_indices, + duty_provider, + )?; + + if should_check { + // Get current duty count for this signer + let epoch = slot.epoch(validation_context.slots_per_epoch); + let duty_count = signer_state.get_duty_count(epoch); + + if duty_count >= limit { + return Err(ValidationFailure::ExcessiveDutyCount { + got: duty_count, + limit, + }); + } + } + + Ok(()) +} + +/// Determines duty limit based on role and validator indices +fn duty_limit( + validation_context: &ValidationContext, + slot: Slot, + validator_indices: &[ValidatorIndex], + duty_provider: Arc, +) -> Result<(u64, bool), ValidationFailure> { + match validation_context.role { + Role::VoluntaryExit => { + // TODO For voluntary exit, check the stored duties https://github.com/sigp/anchor/issues/277 + // This would need to be adapted to use the actual duty store + Ok((2, true)) + } + Role::Aggregator | Role::ValidatorRegistration => Ok((2, true)), + Role::Committee => { + let validator_index_count = validator_indices.len() as u64; + let slots_per_epoch_val = validation_context.slots_per_epoch; + + // Skip duty search if validators * 2 exceeds slots per epoch + if validator_index_count < slots_per_epoch_val / 2 { + let epoch = slot.epoch(validation_context.slots_per_epoch); + let period = sync_committee_period( + epoch, + validation_context.epochs_per_sync_committee_period, + )?; + + // Check if at least one validator is in the sync committee + for &index in validator_indices { + if duty_provider.is_validator_in_sync_committee(period, index) { + return Ok((slots_per_epoch_val, true)); + } + } + } + Ok(( + std::cmp::min(slots_per_epoch_val, 2 * validator_index_count), + true, + )) + } + _ => Ok((0, false)), + } +} + #[cfg(test)] mod tests { + use std::time::{SystemTime, UNIX_EPOCH}; + use bls::{Hash256, PublicKeyBytes}; use openssl::hash::MessageDigest; - use slot_clock::ManualSlotClock; use ssv_types::{ consensus::{QbftMessage, QbftMessageType}, domain_type::DomainType, @@ -435,6 +683,29 @@ mod tests { } } + struct MockDutiesProvider {} + impl DutiesProvider for MockDutiesProvider { + fn is_validator_in_sync_committee( + &self, + _committee_period: u64, + _validator_index: ValidatorIndex, + ) -> bool { + true + } + + fn is_epoch_known_for_proposers(&self, _epoch: Epoch) -> bool { + true + } + + fn is_validator_proposer_at_slot( + &self, + _slot: Slot, + _validator_index: ValidatorIndex, + ) -> bool { + true + } + } + // Helper for creating SignedSSVMessage with a QbftMessage fn create_signed_consensus_message( qbft_message: QbftMessage, @@ -548,17 +819,19 @@ mod tests { role: Role::Committee, received_at: SystemTime::now(), operators_pk: &[public_key], + slots_per_epoch: 32, + epochs_per_sync_committee_period: 256, + slot_clock: ManualSlotClock::new( + Slot::new(0), + SystemTime::now().duration_since(UNIX_EPOCH).unwrap(), + Duration::from_secs(1), + ), }; let result = validate_ssv_message( validation_context, &mut ConsensusState::new(2), - 32, - ManualSlotClock::new( - Slot::new(0), - SystemTime::now().duration_since(UNIX_EPOCH).unwrap(), - Duration::from_secs(1), - ), + Arc::new(MockDutiesProvider {}), ); match result { @@ -598,17 +871,19 @@ mod tests { role: Role::Committee, received_at: SystemTime::now(), operators_pk: &generate_random_rsa_public_keys(signed_msg.operator_ids().len()), + slots_per_epoch: 32, + epochs_per_sync_committee_period: 256, + slot_clock: ManualSlotClock::new( + Slot::new(0), + SystemTime::now().duration_since(UNIX_EPOCH).unwrap(), + Duration::from_secs(1), + ), }; let result = validate_ssv_message( validation_context, &mut ConsensusState::new(2), - 32, - ManualSlotClock::new( - Slot::new(0), - SystemTime::now().duration_since(UNIX_EPOCH).unwrap(), - Duration::from_secs(1), - ), + Arc::new(MockDutiesProvider {}), ); assert_validation_error( @@ -1042,6 +1317,8 @@ mod tests { rsa::Rsa, sign::Signer, }; + use slot_clock::ManualSlotClock; + use types::Epoch; #[test] fn test_verify_message_signatures_success() { diff --git a/anchor/message_validator/src/consensus_state.rs b/anchor/message_validator/src/consensus_state.rs index 0da29de0..700d0f1f 100644 --- a/anchor/message_validator/src/consensus_state.rs +++ b/anchor/message_validator/src/consensus_state.rs @@ -99,6 +99,19 @@ impl OperatorState { } } + /// Retrieves the maximum slot number processed for this operator. + pub(crate) fn max_slot(&self) -> Slot { + self.max_slot + } + + pub(crate) fn get_duty_count(&self, epoch: Epoch) -> u64 { + match epoch { + e if e == self.max_epoch => self.last_epoch_duties, + e if e == self.max_epoch - 1 => self.prev_epoch_duties, + _ => 0, // unused because messages from too old epochs must be rejected in advance + } + } + /// Retrieves the SignerState for a given slot, if it exists. /// /// The state is stored in a circular buffer and is accessed using modulo arithmetic. diff --git a/anchor/message_validator/src/duties/duties_tracker.rs b/anchor/message_validator/src/duties/duties_tracker.rs new file mode 100644 index 00000000..5ad28fd9 --- /dev/null +++ b/anchor/message_validator/src/duties/duties_tracker.rs @@ -0,0 +1,340 @@ +use std::{future::Future, sync::Arc}; + +use beacon_node_fallback::BeaconNodeFallback; +use database::NetworkState; +use safe_arith::ArithError; +use slot_clock::SlotClock; +use ssv_types::ValidatorIndex; +use task_executor::TaskExecutor; +use thiserror::Error; +use tokio::{sync::watch, time::sleep}; +use tracing::{debug, error, info, trace, warn}; +use types::{ChainSpec, Epoch, Slot}; + +use crate::duties::{Duties, DutiesProvider}; + +/// Only retain `HISTORICAL_DUTIES_EPOCHS` duties prior to the current epoch. +const HISTORICAL_DUTIES_EPOCHS: u64 = 2; + +#[derive(Error, Debug)] +pub enum Error { + #[error("Unable to read the slot clock")] + UnableToReadSlotClock, + #[error("Arithmetic error")] + Arith(#[allow(dead_code)] ArithError), + #[error("Failed to poll proposers: {0}")] + FailedToPollProposers(String), +} + +pub struct DutiesTracker { + /// Duties data structures + duties: Duties, + /// The beacon node fallback clients + beacon_nodes: Arc>, + /// The chain spec + spec: Arc, + /// The number of slots per epoch + slots_per_epoch: u64, + /// The slot clock. + slot_clock: T, + /// The network state receiver. + network_state_rx: watch::Receiver, +} + +impl DutiesTracker { + pub fn new( + beacon_nodes: Arc>, + spec: Arc, + slots_per_epoch: u64, + slot_clock: T, + network_state_rx: watch::Receiver, + ) -> Self { + Self { + duties: Duties::new(), + beacon_nodes, + spec, + slots_per_epoch, + slot_clock, + network_state_rx, + } + } + + async fn poll_sync_committee_duties(&self) -> Result<(), Error> { + let sync_duties = &self.duties.sync_duties; + let spec = &self.spec; + let current_slot = self.slot_clock.now().ok_or(Error::UnableToReadSlotClock)?; + let current_epoch = current_slot.epoch(self.slots_per_epoch); + + // If the Altair fork is yet to be activated, do not attempt to poll for duties. + if spec + .altair_fork_epoch + .is_none_or(|altair_epoch| current_epoch < altair_epoch) + { + return Ok(()); + } + + let current_sync_committee_period = current_epoch + .sync_committee_period(spec) + .map_err(Error::Arith)?; + let next_sync_committee_period = current_sync_committee_period + 1; + + // avoid holding the borrow across .await points + let validator_indices = { + let network_state = self.network_state_rx.borrow(); + network_state.validator_indices() + }; + + // If duties aren't known for the current period, poll for them. + if !sync_duties.all_duties_known(current_sync_committee_period, &validator_indices) { + self.poll_sync_committee_duties_for_period( + validator_indices.as_slice(), + current_sync_committee_period, + ) + .await?; + + // Prune previous duties. + sync_duties.prune(current_sync_committee_period); + } + + // If we're past the point in the current period where we should determine duties for the + // next period and they are not yet known, then poll. + if current_epoch.as_u64() % spec.epochs_per_sync_committee_period.as_u64() + >= epoch_offset(spec) + && !sync_duties.all_duties_known(next_sync_committee_period, &validator_indices) + { + self.poll_sync_committee_duties_for_period( + &validator_indices, + next_sync_committee_period, + ) + .await?; + + // Prune (this is the main code path for updating duties, so we should almost always hit + // this prune). + sync_duties.prune(current_sync_committee_period); + } + + Ok(()) + } + + async fn poll_sync_committee_duties_for_period( + &self, + local_indices: &[u64], + sync_committee_period: u64, + ) -> Result<(), Error> { + // no local validators don't need to poll for sync committee + if local_indices.is_empty() { + debug!( + sync_committee_period, + "No validators, not polling for sync committee duties" + ); + return Ok(()); + } + + debug!( + sync_committee_period, + num_validators = local_indices.len(), + "Fetching sync committee duties" + ); + + let period_start_epoch = self.spec.epochs_per_sync_committee_period * sync_committee_period; + + let duties_response = self + .beacon_nodes + .first_success(|beacon_node| async move { + beacon_node + .post_validator_duties_sync(period_start_epoch, local_indices) + .await + }) + .await; + + let duties = match duties_response { + Ok(res) => res.data, + Err(e) => { + warn!( + sync_committee_period, + error = %e, + "Failed to download sync committee duties" + ); + return Ok(()); + } + }; + + debug!(count = duties.len(), "Fetched sync duties from BN"); + + // Get or create the HashSet for this committee period + let mut validators = self + .duties + .sync_duties + .committees + .entry(sync_committee_period) + .or_default(); + + // Insert only validators that have duties + for duty in duties { + info!( + validator_index = duty.validator_index, + sync_committee_period, "Validator in sync committee" + ); + + // Insert the validator index + validators.insert(duty.validator_index); + } + + Ok(()) + } + + /// Download the proposer duties for the current epoch. + async fn poll_beacon_proposers(&self) -> Result<(), Error> { + let current_slot = self.slot_clock.now().ok_or(Error::UnableToReadSlotClock)?; + let current_epoch = current_slot.epoch(self.slots_per_epoch); + + let download_result = self + .beacon_nodes + .first_success(|beacon_node| async move { + beacon_node + .get_validator_duties_proposer(current_epoch) + .await + }) + .await; + + let result = match download_result { + Ok(response) => { + // avoid holding the borrow across .await points + let validator_indices = { + let network_state = self.network_state_rx.borrow(); + network_state.validator_indices() + }; + + let relevant_duties = response + .data + .into_iter() + .filter(|proposer_duty| { + validator_indices.contains(&proposer_duty.validator_index) + }) + .collect::>(); + + trace!( + num_relevant_duties = relevant_duties.len(), + "Downloaded proposer duties" + ); + + self.duties + .proposers + .write() + .insert(current_epoch, relevant_duties); + Ok(()) + } + // Don't return early here, we"ll try again later + Err(e) => Err(Error::FailedToPollProposers(e.to_string())), + }; + + // Prune old duties. + self.duties + .proposers + .write() + .retain(|&epoch, _| epoch + HISTORICAL_DUTIES_EPOCHS >= current_epoch); + + result + } + + pub fn start(self: Arc, executor: TaskExecutor) { + let self_clone = self.clone(); + self_clone.spawn_polling_task( + |tracker| { + let tracker = tracker.clone(); + async move { tracker.poll_sync_committee_duties().await } + }, + "Failed to poll sync committee duties", + "sync_committee_tracker", + executor.clone(), + ); + + self.spawn_polling_task( + |tracker| { + let tracker = tracker.clone(); + async move { tracker.poll_beacon_proposers().await } + }, + "Failed to poll beacon proposers", + "proposers_tracker", + executor, + ); + } + + fn spawn_polling_task( + self: Arc, + poll_fn: F, + error_msg: &'static str, + task_name: &'static str, + executor: TaskExecutor, + ) where + F: Fn(Arc) -> Fut + Send + 'static, + Fut: Future> + Send + 'static, + { + let duties_tracker = self.clone(); + executor.spawn( + async move { + loop { + if let Err(e) = poll_fn(duties_tracker.clone()).await { + error!( + error = ?e, + error_msg + ); + } + + trace!(sync_committee = ?duties_tracker.duties.sync_duties); + + // Wait until the next slot before polling again. + // This doesn't mean that the beacon node will get polled every slot + // as the sync duties service will return early if it deems it already has + // enough information. + if let Some(duration) = duties_tracker.slot_clock.duration_to_next_slot() { + sleep(duration).await; + } else { + // Just sleep for one slot if we are unable to read the system clock, this + // gives us an opportunity for the clock to + // eventually come good. + sleep(duties_tracker.slot_clock.slot_duration()).await; + continue; + } + } + }, + task_name, + ); + } +} + +impl DutiesProvider for DutiesTracker { + fn is_validator_in_sync_committee( + &self, + committee_period: u64, + validator_index: ValidatorIndex, + ) -> bool { + self.duties + .sync_duties + .is_validator_in_sync_committee(committee_period, validator_index.into()) + } + + fn is_epoch_known_for_proposers(&self, epoch: Epoch) -> bool { + self.duties.proposers.read().contains_key(&epoch) + } + + fn is_validator_proposer_at_slot(&self, slot: Slot, validator_index: ValidatorIndex) -> bool { + let epoch = slot.epoch(self.slots_per_epoch); + let validator_index: u64 = validator_index.into(); + self.duties + .proposers + .read() + .get(&epoch) + .map(|proposers| { + proposers.iter().any(|proposer_data| { + proposer_data.slot == slot && proposer_data.validator_index == validator_index + }) + }) + .unwrap_or_default() + } +} + +/// Number of epochs to wait from the start of the period before actually fetching duties. +fn epoch_offset(spec: &ChainSpec) -> u64 { + spec.epochs_per_sync_committee_period.as_u64() / 2 +} diff --git a/anchor/message_validator/src/duties/mod.rs b/anchor/message_validator/src/duties/mod.rs new file mode 100644 index 00000000..46a7692e --- /dev/null +++ b/anchor/message_validator/src/duties/mod.rs @@ -0,0 +1,103 @@ +use std::collections::{HashMap, HashSet}; + +use dashmap::DashMap; +use eth2::types::ProposerData; +use parking_lot::RwLock; +use ssv_types::ValidatorIndex; +use types::{Epoch, Slot}; + +pub mod duties_tracker; + +/// Top-level data-structure containing sync duty information. +/// +/// This data is structured using a `DashMap` which provides concurrent read/write access +/// with fine-grained locking at the entry level. This allows multiple threads to access +/// different entries without blocking each other. +/// +/// Key benefits of using DashMap over RwLock: +/// 1. Fine-grained locking at the individual entry level rather than the entire map +/// 2. Better performance in concurrent scenarios with many readers and occasional writers +/// 3. Simpler code that doesn't require explicit lock acquisition +/// +/// The structure only stores validators that actually have sync committee duties, which +/// helps reduce memory usage compared to storing all validators and marking some as not +/// having duties. +#[derive(Debug)] +pub struct SyncCommitteePerPeriod { + /// Map from sync committee period to validators that are members of that sync committee. + /// Only validators with actual duties are stored in the HashSet for each period. + committees: DashMap>, +} + +impl SyncCommitteePerPeriod { + fn new() -> Self { + Self { + committees: DashMap::new(), + } + } + + /// Check if duties are already known for all of the given validators for `committee_period`. + fn all_duties_known(&self, committee_period: u64, validator_indices: &[u64]) -> bool { + self.committees + .get(&committee_period) + .is_some_and(|validators| { + validator_indices + .iter() + .all(|index| validators.contains(index)) + }) + } + + /// Prune duties for past sync committee periods from the map. + fn prune(&self, current_sync_committee_period: u64) { + self.committees + .retain(|period, _| *period >= current_sync_committee_period) + } + + pub fn is_validator_in_sync_committee( + &self, + committee_period: u64, + validator_index: u64, + ) -> bool { + self.committees + .get(&committee_period) + .is_some_and(|validator_indices| validator_indices.contains(&validator_index)) + } +} + +type ProposerMap = HashMap>; + +#[derive(Debug)] +pub struct Duties { + /// Maps an epoch to all *local* proposers in this epoch. Notably, this does not contain + /// proposals for any validators which are not registered locally. + pub proposers: RwLock, + /// Map from validator index to sync committee duties. + pub sync_duties: SyncCommitteePerPeriod, +} + +impl Duties { + pub fn new() -> Self { + Self { + proposers: RwLock::new(HashMap::new()), + sync_duties: SyncCommitteePerPeriod::new(), + } + } +} + +impl Default for Duties { + fn default() -> Self { + Self::new() + } +} + +pub trait DutiesProvider: Sync + Send + 'static { + fn is_validator_in_sync_committee( + &self, + committee_period: u64, + validator_index: ValidatorIndex, + ) -> bool; + + fn is_epoch_known_for_proposers(&self, epoch: Epoch) -> bool; + + fn is_validator_proposer_at_slot(&self, slot: Slot, validator_index: ValidatorIndex) -> bool; +} diff --git a/anchor/message_validator/src/lib.rs b/anchor/message_validator/src/lib.rs index 46f050fc..21ce30a6 100644 --- a/anchor/message_validator/src/lib.rs +++ b/anchor/message_validator/src/lib.rs @@ -1,9 +1,13 @@ mod consensus_message; mod consensus_state; +mod duties; mod message_counts; mod partial_signature; -use std::time::SystemTime; +use std::{ + sync::Arc, + time::{SystemTime, UNIX_EPOCH}, +}; use dashmap::{mapref::one::RefMut, DashMap}; use database::NetworkState; @@ -14,6 +18,7 @@ use openssl::{ rsa::Rsa, sign::Verifier, }; +use safe_arith::SafeArith; use sha2::{Digest, Sha256}; use slot_clock::SlotClock; use ssv_types::{ @@ -26,7 +31,9 @@ use ssv_types::{ use ssz::{Decode, Encode}; use tokio::sync::watch::Receiver; use tracing::{error, trace}; +use types::{Epoch, Slot}; +pub use crate::duties::{duties_tracker::DutiesTracker, DutiesProvider}; use crate::{ consensus_message::validate_consensus_message, consensus_state::ConsensusState, partial_signature::validate_partial_signature_message, @@ -40,9 +47,16 @@ pub enum ValidationFailure { UnknownValidator, ValidatorLiquidated, ValidatorNotAttesting, - EarlySlotMessage, - LateSlotMessage, - SlotAlreadyAdvanced, + EarlySlotMessage { + got: String, + }, + LateSlotMessage { + got: String, + }, + SlotAlreadyAdvanced { + got: u64, + want: u64, + }, RoundAlreadyAdvanced { got: u64, want: u64, @@ -123,10 +137,20 @@ pub enum ValidationFailure { TooManyPartialSignatureMessages, EncodeOperators, FailedToGetMaxRound, - SlotStartTimeNotFound, + SlotStartTimeNotFound { + slot: Slot, + }, SignatureVerificationFailed { reason: String, }, + ExcessiveDutyCount { + got: u64, + limit: u64, + }, + SyncCommitteePeriodCalculationFailure, + UnexpectedFailure { + msg: String, + }, } impl From<&ValidationFailure> for MessageAcceptance { @@ -137,9 +161,9 @@ impl From<&ValidationFailure> for MessageAcceptance { | ValidationFailure::UnknownValidator | ValidationFailure::ValidatorLiquidated | ValidationFailure::ValidatorNotAttesting - | ValidationFailure::EarlySlotMessage - | ValidationFailure::LateSlotMessage - | ValidationFailure::SlotAlreadyAdvanced + | ValidationFailure::EarlySlotMessage { .. } + | ValidationFailure::LateSlotMessage { .. } + | ValidationFailure::SlotAlreadyAdvanced { .. } | ValidationFailure::RoundAlreadyAdvanced { .. } | ValidationFailure::DecidedWithSameSigners | ValidationFailure::PubSubDataTooBig(_) @@ -184,31 +208,40 @@ pub enum Error { Processor(#[from] ::processor::Error), } -struct ValidationContext<'a> { +struct ValidationContext<'a, S> { pub signed_ssv_message: &'a SignedSSVMessage, pub role: Role, // Small value type can remain owned pub committee_info: &'a CommitteeInfo, pub received_at: SystemTime, // Small value type pub operators_pk: &'a [Rsa], + pub slots_per_epoch: u64, + pub epochs_per_sync_committee_period: u64, + pub slot_clock: S, } -pub struct Validator { +pub struct Validator { network_state_rx: Receiver, consensus_state_map: DashMap, slots_per_epoch: u64, + epochs_per_sync_committee_period: u64, + duties_provider: Arc, slot_clock: S, } -impl Validator { +impl Validator { pub fn new( network_state_rx: Receiver, slots_per_epoch: u64, + epochs_per_sync_committee_period: u64, + duties_provider: Arc, slot_clock: S, ) -> Self { Self { network_state_rx, consensus_state_map: DashMap::new(), slots_per_epoch, + epochs_per_sync_committee_period, + duties_provider, slot_clock, } } @@ -261,13 +294,15 @@ impl Validator { committee_info: &committee_info, received_at: SystemTime::now(), operators_pk: &operators_pks, + slots_per_epoch: self.slots_per_epoch, + epochs_per_sync_committee_period: self.epochs_per_sync_committee_period, + slot_clock: self.slot_clock.clone(), }; validate_ssv_message( validation_context, consensus_state.value_mut(), - self.slots_per_epoch, - self.slot_clock.clone(), + self.duties_provider.clone(), ) .map(|validated| ValidatedMessage::new(signed_ssv_message.clone(), validated)) } @@ -295,20 +330,16 @@ impl Validator { } fn validate_ssv_message( - validation_context: ValidationContext, + validation_context: ValidationContext, consensus_state: &mut ConsensusState, - slots_per_epoch: u64, - slot_clock: impl SlotClock, + duty_provider: Arc, ) -> Result { let ssv_message = validation_context.signed_ssv_message.ssv_message(); match ssv_message.msg_type() { - MsgType::SSVConsensusMsgType => validate_consensus_message( - validation_context, - consensus_state, - slots_per_epoch, - slot_clock, - ), + MsgType::SSVConsensusMsgType => { + validate_consensus_message(validation_context, consensus_state, duty_provider) + } MsgType::SSVPartialSignatureMsgType => { validate_partial_signature_message(validation_context) } @@ -370,6 +401,28 @@ fn verify_message_signatures( Ok(()) } +#[derive(thiserror::Error, Debug)] +pub enum TimeError { + #[error("clock start-of-slot overflow for slot {0}")] + Overflow(Slot), +} + +pub fn slot_start_time(slot: Slot, slot_clock: impl SlotClock) -> Result { + let dur = slot_clock.start_of(slot).ok_or(TimeError::Overflow(slot))?; + Ok(UNIX_EPOCH + dur) +} + +/// Compute the sync committee period for an epoch. +pub fn sync_committee_period( + epoch: Epoch, + epochs_per_sync_committee_period: u64, +) -> Result { + Ok(epoch + .safe_div(epochs_per_sync_committee_period) + .map_err(|_| ValidationFailure::SyncCommitteePeriodCalculationFailure)? + .as_u64()) +} + pub(crate) fn compute_quorum_size(committee_size: usize) -> usize { let f = get_f(committee_size); f * 2 + 1 diff --git a/anchor/message_validator/src/partial_signature.rs b/anchor/message_validator/src/partial_signature.rs index 2a15cc54..0cb2bf98 100644 --- a/anchor/message_validator/src/partial_signature.rs +++ b/anchor/message_validator/src/partial_signature.rs @@ -1,3 +1,4 @@ +use slot_clock::SlotClock; use ssv_types::{ msgid::Role, partial_sig::{PartialSignatureKind, PartialSignatureMessages}, @@ -7,7 +8,7 @@ use ssz::Decode; use crate::{verify_message_signature, ValidatedSSVMessage, ValidationContext, ValidationFailure}; pub(crate) fn validate_partial_signature_message( - validation_context: ValidationContext, + validation_context: ValidationContext, ) -> Result { // Decode message directly to PartialSignatureMessages let messages = match PartialSignatureMessages::from_ssz_bytes( @@ -43,7 +44,7 @@ pub(crate) fn validate_partial_signature_message( } fn validate_partial_signature_message_semantics( - validation_context: &ValidationContext, + validation_context: &ValidationContext, partial_signature_messages: &PartialSignatureMessages, ) -> Result<(), ValidationFailure> { // Rule: Partial Signature message must have 1 signer @@ -120,7 +121,7 @@ fn partial_signature_type_matches_role(kind: PartialSignatureKind, role: Role) - #[cfg(test)] mod tests { - use std::time::SystemTime; + use std::time::{Duration, SystemTime, UNIX_EPOCH}; use bls::{Hash256, Signature}; use openssl::{ @@ -129,6 +130,7 @@ mod tests { rsa::Rsa, sign::Signer, }; + use slot_clock::{ManualSlotClock, SlotClock}; use ssv_types::{ message::{MsgType, SSVMessage, SignedSSVMessage, RSA_SIGNATURE_SIZE}, partial_sig::PartialSignatureMessage, @@ -216,6 +218,29 @@ mod tests { (private_key, public_key) } + // Helper function to create a ValidationContext for testing + fn create_test_validation_context<'a>( + signed_msg: &'a SignedSSVMessage, + committee_info: &'a crate::CommitteeInfo, + role: Role, + operators_pk: &'a [Rsa], + ) -> ValidationContext<'a, ManualSlotClock> { + ValidationContext { + signed_ssv_message: signed_msg, + committee_info, + role, + received_at: SystemTime::now(), + operators_pk, + slots_per_epoch: 32, + epochs_per_sync_committee_period: 256, + slot_clock: ManualSlotClock::new( + Slot::new(0), + SystemTime::now().duration_since(UNIX_EPOCH).unwrap(), + Duration::from_secs(1), + ), + } + } + #[test] fn test_partial_signature_message_with_invalid_type_for_role() { let committee_info = create_committee_info(FOUR_NODE_COMMITTEE); @@ -228,13 +253,9 @@ mod tests { None, ); - let validation_context = ValidationContext { - signed_ssv_message: &signed_msg, - committee_info: &committee_info, - role: Role::Committee, - received_at: SystemTime::now(), - operators_pk: &generate_random_rsa_public_keys(signed_msg.operator_ids().len()), - }; + let binding = generate_random_rsa_public_keys(signed_msg.operator_ids().len()); + let validation_context = + create_test_validation_context(&signed_msg, &committee_info, Role::Committee, &binding); let result = validate_partial_signature_message(validation_context); @@ -273,13 +294,9 @@ mod tests { let signed_msg = SignedSSVMessage::new(signatures, signers, ssv_msg, vec![]) .expect("SignedSSVMessage should be created"); - let validation_context = ValidationContext { - signed_ssv_message: &signed_msg, - committee_info: &committee_info, - role: Role::Proposer, - received_at: SystemTime::now(), - operators_pk: &generate_random_rsa_public_keys(signed_msg.operator_ids().len()), - }; + let binding = generate_random_rsa_public_keys(signed_msg.operator_ids().len()); + let validation_context = + create_test_validation_context(&signed_msg, &committee_info, Role::Proposer, &binding); let result = validate_partial_signature_message(validation_context); @@ -305,13 +322,9 @@ mod tests { None, ); - let validation_context = ValidationContext { - signed_ssv_message: &signed_msg, - committee_info: &committee_info, - role: Role::Proposer, - received_at: SystemTime::now(), - operators_pk: &generate_random_rsa_public_keys(signed_msg.operator_ids().len()), - }; + let binding = generate_random_rsa_public_keys(signed_msg.operator_ids().len()); + let validation_context = + create_test_validation_context(&signed_msg, &committee_info, Role::Proposer, &binding); let result = validate_partial_signature_message(validation_context); @@ -337,13 +350,9 @@ mod tests { None, ); - let validation_context = ValidationContext { - signed_ssv_message: &signed_msg, - committee_info: &committee_info, - role: Role::Proposer, - received_at: SystemTime::now(), - operators_pk: &generate_random_rsa_public_keys(signed_msg.operator_ids().len()), - }; + let binding = generate_random_rsa_public_keys(signed_msg.operator_ids().len()); + let validation_context = + create_test_validation_context(&signed_msg, &committee_info, Role::Proposer, &binding); let result = validate_partial_signature_message(validation_context); @@ -369,13 +378,9 @@ mod tests { None, ); - let validation_context = ValidationContext { - signed_ssv_message: &signed_msg, - committee_info: &committee_info, - role: Role::Proposer, - received_at: SystemTime::now(), - operators_pk: &generate_random_rsa_public_keys(signed_msg.operator_ids().len()), - }; + let binding = generate_random_rsa_public_keys(signed_msg.operator_ids().len()); + let validation_context = + create_test_validation_context(&signed_msg, &committee_info, Role::Proposer, &binding); let result = validate_partial_signature_message(validation_context); @@ -399,13 +404,9 @@ mod tests { Some(private_key), ); - let validation_context = ValidationContext { - signed_ssv_message: &signed_msg, - committee_info: &committee_info, - role: Role::Proposer, - received_at: SystemTime::now(), - operators_pk: &[public_key], - }; + let binding = [public_key]; + let validation_context = + create_test_validation_context(&signed_msg, &committee_info, Role::Proposer, &binding); let result = validate_partial_signature_message(validation_context); @@ -441,13 +442,13 @@ mod tests { None, ); - let validation_context = ValidationContext { - signed_ssv_message: &signed_msg, - committee_info: &committee_info, - role: Role::Proposer, // Not a committee role, so validator index is checked - received_at: SystemTime::now(), - operators_pk: &generate_random_rsa_public_keys(signed_msg.operator_ids().len()), - }; + let binding = generate_random_rsa_public_keys(signed_msg.operator_ids().len()); + let validation_context = create_test_validation_context( + &signed_msg, + &committee_info, + Role::Proposer, // Not a committee role, so validator index is checked + &binding, + ); let result = validate_partial_signature_message(validation_context); @@ -478,13 +479,13 @@ mod tests { Some(private_key), ); - let validation_context = ValidationContext { - signed_ssv_message: &signed_msg, - committee_info: &committee_info, - role: Role::Committee, // Committee role, so validator index is not checked - received_at: SystemTime::now(), - operators_pk: &[public_key], - }; + let binding = [public_key]; + let validation_context = create_test_validation_context( + &signed_msg, + &committee_info, + Role::Committee, // Committee role, so validator index is not checked + &binding, + ); let result = validate_partial_signature_message(validation_context);