diff --git a/common/task_executor/src/rayon_pool_provider.rs b/common/task_executor/src/rayon_pool_provider.rs index 8e12f7eaa49..ea8c988fb76 100644 --- a/common/task_executor/src/rayon_pool_provider.rs +++ b/common/task_executor/src/rayon_pool_provider.rs @@ -15,7 +15,7 @@ pub struct RayonPoolProvider { /// By default ~25% of CPUs or a minimum of 1 thread. low_priority_thread_pool: Arc, /// Larger rayon thread pool for high-priority, compute-intensive tasks. - /// By default ~80% of CPUs or a minimum of 1 thread. Citical/highest + /// By default ~80% of CPUs or a minimum of 1 thread. Critical/highest /// priority tasks should use the global pool instead. high_priority_thread_pool: Arc, } diff --git a/testing/web3signer_tests/src/lib.rs b/testing/web3signer_tests/src/lib.rs index b37321617b8..a0067796544 100644 --- a/testing/web3signer_tests/src/lib.rs +++ b/testing/web3signer_tests/src/lib.rs @@ -539,6 +539,56 @@ mod tests { } self } + + /// Assert that a slashable attestation fails to be signed locally (empty result) and is + /// either signed or not by the web3signer rig depending on the value of + /// `web3signer_should_sign`. + /// + /// The batch attestation signing API returns an empty result instead of an error for + /// slashable attestations. + pub async fn assert_slashable_attestation_should_sign( + self, + case_name: &str, + generate_sig: F, + web3signer_should_sign: bool, + ) -> Self + where + F: Fn(PublicKeyBytes, Arc>) -> R, + R: Future>, lighthouse_validator_store::Error>>, + { + for validator_rig in &self.validator_rigs { + let result = + generate_sig(self.validator_pubkey, validator_rig.validator_store.clone()) + .await; + + if !validator_rig.using_web3signer || !web3signer_should_sign { + // For local validators, slashable attestations should return an empty result + // or an error. + match result { + Ok(attestations) => { + assert!( + attestations.is_empty(), + "should not sign slashable {case_name}: expected empty result" + ); + } + Err(ValidatorStoreError::Slashable(_)) => { + // Also acceptable - error indicates slashable + } + Err(e) => { + panic!("unexpected error for slashable {case_name}: {e:?}"); + } + } + } else { + // Web3signer should sign (has its own slashing protection) + let attestations = result.expect("should sign slashable {case_name}"); + assert!( + !attestations.is_empty(), + "web3signer should sign slashable {case_name}" + ); + } + } + self + } } /// Get a generic, arbitrary attestation for signing. @@ -605,12 +655,13 @@ mod tests { }) .await .assert_signatures_match("attestation", |pubkey, validator_store| async move { - let mut attestation = get_attestation(); + let attestation = get_attestation(); validator_store - .sign_attestation(pubkey, 0, &mut attestation, Epoch::new(0)) + .sign_attestations(vec![(pubkey, 0, attestation)]) .await - .unwrap(); - attestation + .unwrap() + .pop() + .unwrap() }) .await .assert_signatures_match("signed_aggregate", |pubkey, validator_store| async move { @@ -820,8 +871,6 @@ mod tests { block }; - let current_epoch = Epoch::new(5); - TestingRig::new( network, slashing_protection_config, @@ -830,42 +879,43 @@ mod tests { ) .await .assert_signatures_match("first_attestation", |pubkey, validator_store| async move { - let mut attestation = first_attestation(); + let attestation = first_attestation(); validator_store - .sign_attestation(pubkey, 0, &mut attestation, current_epoch) + .sign_attestations(vec![(pubkey, 0, attestation)]) .await - .unwrap(); - attestation + .unwrap() + .pop() + .unwrap() }) .await - .assert_slashable_message_should_sign( + .assert_slashable_attestation_should_sign( "double_vote_attestation", move |pubkey, validator_store| async move { - let mut attestation = double_vote_attestation(); + let attestation = double_vote_attestation(); validator_store - .sign_attestation(pubkey, 0, &mut attestation, current_epoch) + .sign_attestations(vec![(pubkey, 0, attestation)]) .await }, slashable_message_should_sign, ) .await - .assert_slashable_message_should_sign( + .assert_slashable_attestation_should_sign( "surrounding_attestation", move |pubkey, validator_store| async move { - let mut attestation = surrounding_attestation(); + let attestation = surrounding_attestation(); validator_store - .sign_attestation(pubkey, 0, &mut attestation, current_epoch) + .sign_attestations(vec![(pubkey, 0, attestation)]) .await }, slashable_message_should_sign, ) .await - .assert_slashable_message_should_sign( + .assert_slashable_attestation_should_sign( "surrounded_attestation", move |pubkey, validator_store| async move { - let mut attestation = surrounded_attestation(); + let attestation = surrounded_attestation(); validator_store - .sign_attestation(pubkey, 0, &mut attestation, current_epoch) + .sign_attestations(vec![(pubkey, 0, attestation)]) .await }, slashable_message_should_sign, diff --git a/validator_client/http_api/src/tests/keystores.rs b/validator_client/http_api/src/tests/keystores.rs index eeb3cd94de0..d248ed77588 100644 --- a/validator_client/http_api/src/tests/keystores.rs +++ b/validator_client/http_api/src/tests/keystores.rs @@ -1099,14 +1099,18 @@ async fn generic_migration_test( check_keystore_import_response(&import_res, all_imported(keystores.len())); // Sign attestations on VC1. - for (validator_index, mut attestation) in first_vc_attestations { + for (validator_index, attestation) in first_vc_attestations { let public_key = keystore_pubkey(&keystores[validator_index]); - let current_epoch = attestation.data().target.epoch; - tester1 + let safe_attestations = tester1 .validator_store - .sign_attestation(public_key, 0, &mut attestation, current_epoch) + .sign_attestations(vec![(public_key, 0, attestation.clone())]) .await .unwrap(); + assert_eq!(safe_attestations.len(), 1); + // Compare data only, ignoring signatures which are added during signing. + assert_eq!(safe_attestations[0].data(), attestation.data()); + // Check that the signature is non-zero. + assert!(!safe_attestations[0].signature().is_infinity()); } // Delete the selected keys from VC1. @@ -1178,16 +1182,28 @@ async fn generic_migration_test( check_keystore_import_response(&import_res, all_imported(import_indices.len())); // Sign attestations on the second VC. - for (validator_index, mut attestation, should_succeed) in second_vc_attestations { + for (validator_index, attestation, should_succeed) in second_vc_attestations { let public_key = keystore_pubkey(&keystores[validator_index]); - let current_epoch = attestation.data().target.epoch; - match tester2 + let result = tester2 .validator_store - .sign_attestation(public_key, 0, &mut attestation, current_epoch) - .await - { - Ok(()) => assert!(should_succeed), - Err(e) => assert!(!should_succeed, "{:?}", e), + .sign_attestations(vec![(public_key, 0, attestation.clone())]) + .await; + match result { + Ok(safe_attestations) => { + if should_succeed { + // Compare data only, ignoring signatures which are added during signing. + assert_eq!(safe_attestations.len(), 1); + assert_eq!(safe_attestations[0].data(), attestation.data()); + // Check that the signature is non-zero. + assert!(!safe_attestations[0].signature().is_infinity()); + } else { + assert!(safe_attestations.is_empty()); + } + } + Err(_) => { + // Doppelganger protected or other error. + assert!(!should_succeed); + } } } }) @@ -1313,10 +1329,10 @@ async fn delete_concurrent_with_signing() { let handle = handle.spawn(async move { for j in 0..num_attestations { - let mut att = make_attestation(j, j + 1); + let att = make_attestation(j, j + 1); for public_key in thread_pubkeys.iter() { let _ = validator_store - .sign_attestation(*public_key, 0, &mut att, Epoch::new(j + 1)) + .sign_attestations(vec![(*public_key, 0, att.clone())]) .await; } } diff --git a/validator_client/lighthouse_validator_store/Cargo.toml b/validator_client/lighthouse_validator_store/Cargo.toml index 01c7616be15..55d5f1cf32e 100644 --- a/validator_client/lighthouse_validator_store/Cargo.toml +++ b/validator_client/lighthouse_validator_store/Cargo.toml @@ -12,6 +12,7 @@ doppelganger_service = { workspace = true } either = { workspace = true } environment = { workspace = true } eth2 = { workspace = true } +futures = { workspace = true } initialized_validators = { workspace = true } logging = { workspace = true } parking_lot = { workspace = true } diff --git a/validator_client/lighthouse_validator_store/src/lib.rs b/validator_client/lighthouse_validator_store/src/lib.rs index 3bea21a05d8..8c79f0c9d4f 100644 --- a/validator_client/lighthouse_validator_store/src/lib.rs +++ b/validator_client/lighthouse_validator_store/src/lib.rs @@ -2,6 +2,7 @@ use account_utils::validator_definitions::{PasswordStorage, ValidatorDefinition} use bls::{PublicKeyBytes, Signature}; use doppelganger_service::DoppelgangerService; use eth2::types::PublishBlockRequest; +use futures::future::join_all; use initialized_validators::InitializedValidators; use logging::crit; use parking_lot::{Mutex, RwLock}; @@ -9,7 +10,7 @@ use serde::{Deserialize, Serialize}; use signing_method::Error as SigningError; use signing_method::{SignableMessage, SigningContext, SigningMethod}; use slashing_protection::{ - InterchangeError, NotSafe, Safe, SlashingDatabase, interchange::Interchange, + CheckSlashability, InterchangeError, NotSafe, Safe, SlashingDatabase, interchange::Interchange, }; use slot_clock::SlotClock; use std::marker::PhantomData; @@ -52,7 +53,7 @@ pub struct Config { /// Number of epochs of slashing protection history to keep. /// /// This acts as a maximum safe-guard against clock drift. -const SLASHING_PROTECTION_HISTORY_EPOCHS: u64 = 512; +const SLASHING_PROTECTION_HISTORY_EPOCHS: u64 = 1; /// Currently used as the default gas limit in execution clients. /// @@ -556,6 +557,140 @@ impl LighthouseValidatorStore { signature, }) } + + /// Sign an attestation without performing any slashing protection checks. + /// + /// THIS METHOD IS DANGEROUS AND SHOULD ONLY BE USED INTERNALLY IMMEDIATELY PRIOR TO A + /// SLASHING PROTECTION CHECK. See `slashing_protect_attestations`. + /// + /// This method DOES perform doppelganger protection checks. + #[instrument(level = "debug", skip_all)] + async fn sign_attestation_no_slashing_protection( + &self, + validator_pubkey: PublicKeyBytes, + validator_committee_position: usize, + attestation: &mut Attestation, + ) -> Result<(), Error> { + // Get the signing method and check doppelganger protection. + let signing_method = self.doppelganger_checked_signing_method(validator_pubkey)?; + + // Sign the attestation. + let signing_epoch = attestation.data().target.epoch; + let signing_context = self.signing_context(Domain::BeaconAttester, signing_epoch); + + let signature = signing_method + .get_signature::>( + SignableMessage::AttestationData(attestation.data()), + signing_context, + &self.spec, + &self.task_executor, + ) + .await?; + attestation + .add_signature(&signature, validator_committee_position) + .map_err(Error::UnableToSignAttestation)?; + + Ok(()) + } + + /// Provide slashing protection for `attestations`, safely updating the slashing protection DB. + /// + /// Return a vec of safe attestations which have passed slashing protection. Unsafe attestations + /// will be dropped and result in warning logs. + /// + /// This method SKIPS slashing protection for web3signer validators that have slashing + /// protection disabled at the Lighthouse layer. It is up to the user to ensure slashing + /// protection is enabled in web3signer instead. + #[instrument(level = "debug", skip_all)] + fn slashing_protect_attestations( + &self, + attestations: Vec<(Attestation, PublicKeyBytes)>, + ) -> Result>, Error> { + let mut safe_attestations = Vec::with_capacity(attestations.len()); + let mut attestations_to_check = Vec::with_capacity(attestations.len()); + + // Split attestations into de-facto safe attestations (checked by web3signer's slashing + // protection) and ones requiring checking against the slashing protection DB. + // + // All attestations are added to `attestation_to_check`, with skipped attestations having + // `CheckSlashability::No`. + for (attestation, validator_pubkey) in &attestations { + let signing_method = self.doppelganger_checked_signing_method(*validator_pubkey)?; + let signing_epoch = attestation.data().target.epoch; + let signing_context = self.signing_context(Domain::BeaconAttester, signing_epoch); + let domain_hash = signing_context.domain_hash(&self.spec); + + let check_slashability = if signing_method + .requires_local_slashing_protection(self.enable_web3signer_slashing_protection) + { + CheckSlashability::Yes + } else { + CheckSlashability::No + }; + attestations_to_check.push(( + attestation.data(), + validator_pubkey, + domain_hash, + check_slashability, + )); + } + + // Batch check the attestations against the slashing protection DB while preserving the + // order so we can zip the results against the original vec. + // + // If the DB transaction fails then we consider the entire batch slashable and discard it. + let results = self + .slashing_protection + .check_and_insert_attestations(&attestations_to_check) + .map_err(Error::Slashable)?; + + for ((attestation, validator_pubkey), slashing_status) in + attestations.into_iter().zip(results.into_iter()) + { + match slashing_status { + Ok(Safe::Valid) => { + safe_attestations.push(attestation); + validator_metrics::inc_counter_vec( + &validator_metrics::SIGNED_ATTESTATIONS_TOTAL, + &[validator_metrics::SUCCESS], + ); + } + Ok(Safe::SameData) => { + warn!("Skipping previously signed attestation"); + validator_metrics::inc_counter_vec( + &validator_metrics::SIGNED_ATTESTATIONS_TOTAL, + &[validator_metrics::SAME_DATA], + ); + } + Err(NotSafe::UnregisteredValidator(pk)) => { + warn!( + msg = "Carefully consider running with --init-slashing-protection (see --help)", + public_key = ?pk, + "Not signing attestation for unregistered validator" + ); + validator_metrics::inc_counter_vec( + &validator_metrics::SIGNED_ATTESTATIONS_TOTAL, + &[validator_metrics::UNREGISTERED], + ); + } + Err(e) => { + warn!( + slot = %attestation.data().slot, + block_root = ?attestation.data().beacon_block_root, + public_key = ?validator_pubkey, + error = ?e, + "Skipping signing of slashable attestation" + ); + validator_metrics::inc_counter_vec( + &validator_metrics::SIGNED_ATTESTATIONS_TOTAL, + &[validator_metrics::SLASHABLE], + ); + } + } + } + + Ok(safe_attestations) + } } impl ValidatorStore for LighthouseValidatorStore { @@ -747,96 +882,71 @@ impl ValidatorStore for LighthouseValidatorS } } - #[instrument(skip_all)] - async fn sign_attestation( - &self, - validator_pubkey: PublicKeyBytes, - validator_committee_position: usize, - attestation: &mut Attestation, - current_epoch: Epoch, - ) -> Result<(), Error> { - // Make sure the target epoch is not higher than the current epoch to avoid potential attacks. - if attestation.data().target.epoch > current_epoch { - return Err(Error::GreaterThanCurrentEpoch { - epoch: attestation.data().target.epoch, - current_epoch, - }); - } + async fn sign_attestations( + self: &Arc, + mut attestations: Vec<(PublicKeyBytes, usize, Attestation)>, + ) -> Result>, Error> { + // Sign all attestations concurrently. + let signing_futures = + attestations + .iter_mut() + .map(|(pubkey, validator_committee_index, attestation)| { + let pubkey = *pubkey; + let validator_committee_index = *validator_committee_index; + async move { + self.sign_attestation_no_slashing_protection( + pubkey, + validator_committee_index, + attestation, + ) + .await + } + }); - // Get the signing method and check doppelganger protection. - let signing_method = self.doppelganger_checked_signing_method(validator_pubkey)?; + // Execute all signing in parallel. + let results: Vec<_> = join_all(signing_futures).await; - // Checking for slashing conditions. - let signing_epoch = attestation.data().target.epoch; - let signing_context = self.signing_context(Domain::BeaconAttester, signing_epoch); - let domain_hash = signing_context.domain_hash(&self.spec); - let slashing_status = if signing_method - .requires_local_slashing_protection(self.enable_web3signer_slashing_protection) + // Collect successfully signed attestations and log errors. + let mut signed_attestations = Vec::with_capacity(attestations.len()); + for (result, (pubkey, _, attestation)) in results.into_iter().zip(attestations.into_iter()) { - self.slashing_protection.check_and_insert_attestation( - &validator_pubkey, - attestation.data(), - domain_hash, - ) - } else { - Ok(Safe::Valid) - }; - - match slashing_status { - // We can safely sign this attestation. - Ok(Safe::Valid) => { - let signature = signing_method - .get_signature::>( - SignableMessage::AttestationData(attestation.data()), - signing_context, - &self.spec, - &self.task_executor, - ) - .await?; - attestation - .add_signature(&signature, validator_committee_position) - .map_err(Error::UnableToSignAttestation)?; - - validator_metrics::inc_counter_vec( - &validator_metrics::SIGNED_ATTESTATIONS_TOTAL, - &[validator_metrics::SUCCESS], - ); - - Ok(()) - } - Ok(Safe::SameData) => { - warn!("Skipping signing of previously signed attestation"); - validator_metrics::inc_counter_vec( - &validator_metrics::SIGNED_ATTESTATIONS_TOTAL, - &[validator_metrics::SAME_DATA], - ); - Err(Error::SameData) - } - Err(NotSafe::UnregisteredValidator(pk)) => { - warn!( - msg = "Carefully consider running with --init-slashing-protection (see --help)", - public_key = format!("{:?}", pk), - "Not signing attestation for unregistered validator" - ); - validator_metrics::inc_counter_vec( - &validator_metrics::SIGNED_ATTESTATIONS_TOTAL, - &[validator_metrics::UNREGISTERED], - ); - Err(Error::Slashable(NotSafe::UnregisteredValidator(pk))) - } - Err(e) => { - crit!( - attestation = format!("{:?}", attestation.data()), - error = format!("{:?}", e), - "Not signing slashable attestation" - ); - validator_metrics::inc_counter_vec( - &validator_metrics::SIGNED_ATTESTATIONS_TOTAL, - &[validator_metrics::SLASHABLE], - ); - Err(Error::Slashable(e)) + match result { + Ok(()) => { + signed_attestations.push((attestation, pubkey)); + } + Err(ValidatorStoreError::UnknownPubkey(pubkey)) => { + warn!( + info = "a validator may have recently been removed from this VC", + ?pubkey, + "Missing pubkey for attestation" + ); + } + Err(e) => { + crit!( + error = ?e, + "Failed to sign attestation" + ); + } } } + + if signed_attestations.is_empty() { + return Ok(vec![]); + } + + // Check slashing protection and insert into database. Use a dedicated blocking thread + // to avoid clogging the async executor with blocking database I/O. + let validator_store = self.clone(); + let safe_attestations = self + .task_executor + .spawn_blocking_handle( + move || validator_store.slashing_protect_attestations(signed_attestations), + "slashing_protect_attestations", + ) + .ok_or(Error::ExecutorError)? + .await + .map_err(|_| Error::ExecutorError)??; + Ok(safe_attestations) } async fn sign_validator_registration_data( diff --git a/validator_client/signing_method/src/lib.rs b/validator_client/signing_method/src/lib.rs index d0d98689526..bf3cc6a17d5 100644 --- a/validator_client/signing_method/src/lib.rs +++ b/validator_client/signing_method/src/lib.rs @@ -10,7 +10,7 @@ use parking_lot::Mutex; use reqwest::{Client, header::ACCEPT}; use std::path::PathBuf; use std::sync::Arc; -use task_executor::TaskExecutor; +use task_executor::{RayonPoolType, TaskExecutor}; use tracing::instrument; use types::*; use url::Url; @@ -181,14 +181,16 @@ impl SigningMethod { let voting_keypair = voting_keypair.clone(); // Spawn a blocking task to produce the signature. This avoids blocking the core // tokio executor. + // + // We are using the Rayon high-priority pool which uses up to 80% of available + // threads. In future we could consider using 90-100% in the VC, seeing as we have + // very little other work to do aside from signing. let signature = executor - .spawn_blocking_handle( - move || voting_keypair.sk.sign(signing_root), - "local_keystore_signer", - ) - .ok_or(Error::ShuttingDown)? + .spawn_blocking_with_rayon_async(RayonPoolType::HighPriority, move || { + voting_keypair.sk.sign(signing_root) + }) .await - .map_err(|e| Error::TokioJoin(e.to_string()))?; + .map_err(|_| Error::ShuttingDown)?; Ok(signature) } SigningMethod::Web3Signer { diff --git a/validator_client/slashing_protection/src/interchange_test.rs b/validator_client/slashing_protection/src/interchange_test.rs index 0dfcda204d7..c5c3df7ea47 100644 --- a/validator_client/slashing_protection/src/interchange_test.rs +++ b/validator_client/slashing_protection/src/interchange_test.rs @@ -135,12 +135,15 @@ impl MultiTestCase { } for (i, att) in test_case.attestations.iter().enumerate() { - match slashing_db.check_and_insert_attestation_signing_root( - &att.pubkey, - att.source_epoch, - att.target_epoch, - SigningRoot::from(att.signing_root), - ) { + match slashing_db.with_transaction(|txn| { + slashing_db.check_and_insert_attestation_signing_root( + &att.pubkey, + att.source_epoch, + att.target_epoch, + SigningRoot::from(att.signing_root), + txn, + ) + }) { Ok(safe) if !att.should_succeed => { panic!( "attestation {} from `{}` succeeded when it should have failed: {:?}", diff --git a/validator_client/slashing_protection/src/lib.rs b/validator_client/slashing_protection/src/lib.rs index f8580e73158..d8039acda63 100644 --- a/validator_client/slashing_protection/src/lib.rs +++ b/validator_client/slashing_protection/src/lib.rs @@ -16,8 +16,8 @@ pub mod interchange { pub use crate::signed_attestation::{InvalidAttestation, SignedAttestation}; pub use crate::signed_block::{InvalidBlock, SignedBlock}; pub use crate::slashing_database::{ - InterchangeError, InterchangeImportOutcome, SUPPORTED_INTERCHANGE_FORMAT_VERSION, - SlashingDatabase, + CheckSlashability, InterchangeError, InterchangeImportOutcome, + SUPPORTED_INTERCHANGE_FORMAT_VERSION, SlashingDatabase, }; use bls::PublicKeyBytes; use rusqlite::Error as SQLError; diff --git a/validator_client/slashing_protection/src/parallel_tests.rs b/validator_client/slashing_protection/src/parallel_tests.rs index e3cc1a0d567..57709e0bf51 100644 --- a/validator_client/slashing_protection/src/parallel_tests.rs +++ b/validator_client/slashing_protection/src/parallel_tests.rs @@ -44,11 +44,14 @@ fn attestation_same_target() { let results = (0..num_attestations) .into_par_iter() .map(|i| { - slashing_db.check_and_insert_attestation( - &pk, - &attestation_data_builder(i, num_attestations), - DEFAULT_DOMAIN, - ) + slashing_db.with_transaction(|txn| { + slashing_db.check_and_insert_attestation( + &pk, + &attestation_data_builder(i, num_attestations), + DEFAULT_DOMAIN, + txn, + ) + }) }) .collect::>(); @@ -73,7 +76,9 @@ fn attestation_surround_fest() { .into_par_iter() .map(|i| { let att = attestation_data_builder(i, 2 * num_attestations - i); - slashing_db.check_and_insert_attestation(&pk, &att, DEFAULT_DOMAIN) + slashing_db.with_transaction(|txn| { + slashing_db.check_and_insert_attestation(&pk, &att, DEFAULT_DOMAIN, txn) + }) }) .collect::>(); diff --git a/validator_client/slashing_protection/src/slashing_database.rs b/validator_client/slashing_protection/src/slashing_database.rs index 67e1234ac57..bb11e50ffb3 100644 --- a/validator_client/slashing_protection/src/slashing_database.rs +++ b/validator_client/slashing_protection/src/slashing_database.rs @@ -38,6 +38,17 @@ pub struct SlashingDatabase { conn_pool: Pool, } +/// Whether to check slashability of a message. +/// +/// The `No` variant MUST only be used if there is another source of slashing protection configured, +/// e.g. web3signer's slashing protection. +#[derive(Debug, Clone, Copy, Default)] +pub enum CheckSlashability { + #[default] + Yes, + No, +} + impl SlashingDatabase { /// Open an existing database at the given `path`, or create one if none exists. pub fn open_or_create(path: &Path) -> Result { @@ -183,7 +194,9 @@ impl SlashingDatabase { U: From, { let mut conn = self.conn_pool.get().map_err(NotSafe::from)?; - let txn = conn.transaction().map_err(NotSafe::from)?; + let txn = conn + .transaction_with_behavior(TransactionBehavior::Exclusive) + .map_err(NotSafe::from)?; let value = f(&txn)?; txn.commit().map_err(NotSafe::from)?; Ok(value) @@ -635,6 +648,43 @@ impl SlashingDatabase { self.check_block_proposal(&txn, validator_pubkey, slot, signing_root) } + #[instrument(name = "db_check_and_insert_attestations", level = "debug", skip_all)] + pub fn check_and_insert_attestations<'a>( + &self, + attestations: &'a [( + &'a AttestationData, + &'a PublicKeyBytes, + Hash256, + CheckSlashability, + )], + ) -> Result>, NotSafe> { + let mut conn = self.conn_pool.get()?; + let txn = conn.transaction_with_behavior(TransactionBehavior::Exclusive)?; + + let mut results = Vec::with_capacity(attestations.len()); + for (attestation, validator_pubkey, domain, check_slashability) in attestations { + match check_slashability { + CheckSlashability::No => { + results.push(Ok(Safe::Valid)); + } + CheckSlashability::Yes => { + let attestation_signing_root = attestation.signing_root(*domain).into(); + results.push(self.check_and_insert_attestation_signing_root( + validator_pubkey, + attestation.source.epoch, + attestation.target.epoch, + attestation_signing_root, + &txn, + )); + } + } + } + + txn.commit()?; + + Ok(results) + } + /// Check an attestation for slash safety, and if it is safe, record it in the database. /// /// The checking and inserting happen atomically and exclusively. We enforce exclusivity @@ -647,6 +697,7 @@ impl SlashingDatabase { validator_pubkey: &PublicKeyBytes, attestation: &AttestationData, domain: Hash256, + txn: &Transaction, ) -> Result { let attestation_signing_root = attestation.signing_root(domain).into(); self.check_and_insert_attestation_signing_root( @@ -654,6 +705,7 @@ impl SlashingDatabase { attestation.source.epoch, attestation.target.epoch, attestation_signing_root, + txn, ) } @@ -664,17 +716,15 @@ impl SlashingDatabase { att_source_epoch: Epoch, att_target_epoch: Epoch, att_signing_root: SigningRoot, + txn: &Transaction, ) -> Result { - let mut conn = self.conn_pool.get()?; - let txn = conn.transaction_with_behavior(TransactionBehavior::Exclusive)?; let safe = self.check_and_insert_attestation_signing_root_txn( validator_pubkey, att_source_epoch, att_target_epoch, att_signing_root, - &txn, + txn, )?; - txn.commit()?; Ok(safe) } diff --git a/validator_client/slashing_protection/src/test_utils.rs b/validator_client/slashing_protection/src/test_utils.rs index 39ede58bb27..28370ba3e89 100644 --- a/validator_client/slashing_protection/src/test_utils.rs +++ b/validator_client/slashing_protection/src/test_utils.rs @@ -1,3 +1,4 @@ +use crate::slashing_database::CheckSlashability; use crate::*; use tempfile::{TempDir, tempdir}; use types::{AttestationData, BeaconBlockHeader, test_utils::generate_deterministic_keypair}; @@ -72,6 +73,12 @@ impl Default for StreamTest { impl StreamTest { pub fn run(&self) { + self.run_solo(); + self.run_batched(); + } + + // Run the test with every attestation processed individually. + pub fn run_solo(&self) { let dir = tempdir().unwrap(); let slashing_db_file = dir.path().join("slashing_protection.sqlite"); let slashing_db = SlashingDatabase::create(&slashing_db_file).unwrap(); @@ -84,7 +91,12 @@ impl StreamTest { for (i, test) in self.cases.iter().enumerate() { assert_eq!( - slashing_db.check_and_insert_attestation(&test.pubkey, &test.data, test.domain), + slashing_db.with_transaction(|txn| slashing_db.check_and_insert_attestation( + &test.pubkey, + &test.data, + test.domain, + txn + )), test.expected, "attestation {} not processed as expected", i @@ -93,6 +105,48 @@ impl StreamTest { roundtrip_database(&dir, &slashing_db, self.registered_validators.is_empty()); } + + // Run the test with all attestations processed by the slashing DB as part of a batch. + pub fn run_batched(&self) { + let dir = tempdir().unwrap(); + let slashing_db_file = dir.path().join("slashing_protection.sqlite"); + let slashing_db = SlashingDatabase::create(&slashing_db_file).unwrap(); + + for pubkey in &self.registered_validators { + slashing_db.register_validator(*pubkey).unwrap(); + } + + check_registration_invariants(&slashing_db, &self.registered_validators); + + let attestations_to_check = self + .cases + .iter() + .map(|test| { + ( + &test.data, + &test.pubkey, + test.domain, + CheckSlashability::Yes, + ) + }) + .collect::>(); + + let results = slashing_db + .check_and_insert_attestations(&attestations_to_check) + .unwrap(); + + assert_eq!(results.len(), self.cases.len()); + + for ((i, test), result) in self.cases.iter().enumerate().zip(results) { + assert_eq!( + result, test.expected, + "attestation {} not processed as expected", + i + ); + } + + roundtrip_database(&dir, &slashing_db, self.registered_validators.is_empty()); + } } impl StreamTest { diff --git a/validator_client/validator_services/src/attestation_service.rs b/validator_client/validator_services/src/attestation_service.rs index 587d4668b8a..44c1745acaa 100644 --- a/validator_client/validator_services/src/attestation_service.rs +++ b/validator_client/validator_services/src/attestation_service.rs @@ -8,7 +8,7 @@ use std::ops::Deref; use std::sync::Arc; use task_executor::TaskExecutor; use tokio::time::{Duration, Instant, sleep, sleep_until}; -use tracing::{Instrument, Span, debug, error, info, info_span, instrument, trace, warn}; +use tracing::{Instrument, debug, error, info, info_span, instrument, trace, warn}; use tree_hash::TreeHash; use types::{Attestation, AttestationData, ChainSpec, CommitteeIndex, EthSpec, Slot}; use validator_store::{Error as ValidatorStoreError, ValidatorStore}; @@ -231,7 +231,7 @@ impl AttestationService AttestationService current_epoch { + return Err(format!( + "Attestation target epoch {} is higher than current epoch {}", + attestation_data.target.epoch, current_epoch + )); + } + + // Create attestations for each validator duty. + let mut attestations_to_sign = Vec::with_capacity(validator_duties.len()); + let mut validator_indices = Vec::with_capacity(validator_duties.len()); + + for duty_and_proof in validator_duties { + let duty = &duty_and_proof.duty; + + // Ensure that the attestation matches the duties. + if !duty.match_attestation_data::(&attestation_data, &self.chain_spec) { + crit!( + validator = ?duty.pubkey, + duty_slot = %duty.slot, + attestation_slot = %attestation_data.slot, + duty_index = duty.committee_index, + attestation_index = attestation_data.index, + "Inconsistent validator duties during signing" + ); + continue; + } - // Ensure that the attestation matches the duties. - if !duty.match_attestation_data::(attestation_data, &self.chain_spec) { + let attestation = match Attestation::empty_for_signing( + duty.committee_index, + duty.committee_length as usize, + attestation_data.slot, + attestation_data.beacon_block_root, + attestation_data.source, + attestation_data.target, + &self.chain_spec, + ) { + Ok(attestation) => attestation, + Err(err) => { crit!( validator = ?duty.pubkey, - duty_slot = %duty.slot, - attestation_slot = %attestation_data.slot, - duty_index = duty.committee_index, - attestation_index = attestation_data.index, - "Inconsistent validator duties during signing" + ?duty, + ?err, + "Invalid validator duties during signing" ); - return None; + continue; } + }; + + attestations_to_sign.push(( + duty.pubkey, + duty.validator_committee_index as usize, + attestation, + )); + validator_indices.push(duty.validator_index); + } - let mut attestation = match Attestation::empty_for_signing( - duty.committee_index, - duty.committee_length as usize, - attestation_data.slot, - attestation_data.beacon_block_root, - attestation_data.source, - attestation_data.target, - &self.chain_spec, - ) { - Ok(attestation) => attestation, - Err(err) => { - crit!( - validator = ?duty.pubkey, - ?duty, - ?err, - "Invalid validator duties during signing" - ); - return None; - } - }; - - match self - .validator_store - .sign_attestation( - duty.pubkey, - duty.validator_committee_index as usize, - &mut attestation, - current_epoch, - ) - .await - { - Ok(()) => Some((attestation, duty.validator_index)), - Err(ValidatorStoreError::UnknownPubkey(pubkey)) => { - // A pubkey can be missing when a validator was recently - // removed via the API. - warn!( - info = "a validator may have recently been removed from this VC", - pubkey = ?pubkey, - validator = ?duty.pubkey, - slot = slot.as_u64(), - "Missing pubkey for attestation" - ); - None - } - Err(e) => { - crit!( - error = ?e, - validator = ?duty.pubkey, - slot = slot.as_u64(), - "Failed to sign attestation" - ); - None - } - } - } - .instrument(Span::current()) - }); + if attestations_to_sign.is_empty() { + warn!("No valid attestations to sign"); + return Ok(()); + } - // Execute all the futures in parallel, collecting any successful results. - let (ref attestations, ref validator_indices): (Vec<_>, Vec<_>) = join_all(signing_futures) - .instrument(info_span!( - "sign_attestations", - count = validator_duties.len() - )) + // Sign and check all attestations (includes slashing protection). + let safe_attestations = self + .validator_store + .sign_attestations(attestations_to_sign) .await - .into_iter() - .flatten() - .unzip(); + .map_err(|e| format!("Failed to sign attestations: {e:?}"))?; - if attestations.is_empty() { + if safe_attestations.is_empty() { warn!("No attestations were published"); return Ok(()); } @@ -480,7 +460,10 @@ impl AttestationService(attestation_data.slot); + let safe_attestations = &safe_attestations; + // Post the attestations to the BN. + let validator_indices_ref = &validator_indices; match self .beacon_nodes .request(ApiTopic::Attestations, |beacon_node| async move { @@ -489,9 +472,9 @@ impl AttestationService Some(a), @@ -517,12 +500,12 @@ impl AttestationService info!( - count = attestations.len(), + count = safe_attestations.len(), validator_indices = ?validator_indices, head_block = ?attestation_data.beacon_block_root, committee_index = attestation_data.index, diff --git a/validator_client/validator_store/src/lib.rs b/validator_client/validator_store/src/lib.rs index 2b472799d24..3f79aa2c6d2 100644 --- a/validator_client/validator_store/src/lib.rs +++ b/validator_client/validator_store/src/lib.rs @@ -22,6 +22,7 @@ pub enum Error { GreaterThanCurrentEpoch { epoch: Epoch, current_epoch: Epoch }, UnableToSignAttestation(AttestationError), SpecificError(T), + ExecutorError, Middleware(String), } @@ -103,13 +104,14 @@ pub trait ValidatorStore: Send + Sync { current_slot: Slot, ) -> impl Future, Error>> + Send; - fn sign_attestation( - &self, - validator_pubkey: PublicKeyBytes, - validator_committee_position: usize, - attestation: &mut Attestation, - current_epoch: Epoch, - ) -> impl Future>> + Send; + /// Sign a batch of `attestations` and apply slashing protection to them. + /// + /// Only successfully signed attestations that pass slashing protection are returned. + #[allow(clippy::type_complexity)] + fn sign_attestations( + self: &Arc, + attestations: Vec<(PublicKeyBytes, usize, Attestation)>, + ) -> impl Future>, Error>> + Send; fn sign_validator_registration_data( &self,