diff --git a/beacon_node/beacon_chain/src/attestation_verification.rs b/beacon_node/beacon_chain/src/attestation_verification.rs index d69667f3de2..2f7c0be229f 100644 --- a/beacon_node/beacon_chain/src/attestation_verification.rs +++ b/beacon_node/beacon_chain/src/attestation_verification.rs @@ -38,6 +38,7 @@ use crate::{ metrics, observed_aggregates::{ObserveOutcome, ObservedAttestationKey}, observed_attesters::Error as ObservedAttestersError, + single_attestation::single_attestation_to_attestation, BeaconChain, BeaconChainError, BeaconChainTypes, }; use bls::verify_signature_sets; @@ -202,12 +203,6 @@ pub enum Error { /// /// The peer has sent an invalid message. NoCommitteeForSlotAndIndex { slot: Slot, index: CommitteeIndex }, - /// The unaggregated attestation doesn't have only one aggregation bit set. - /// - /// ## Peer scoring - /// - /// The peer has sent an invalid message. - NotExactlyOneAggregationBitSet(usize), /// The attestation doesn't have only one aggregation bit set. /// /// ## Peer scoring @@ -304,9 +299,9 @@ struct IndexedAggregatedAttestation<'a, T: BeaconChainTypes> { /// /// These attestations have *not* undergone signature verification. struct IndexedUnaggregatedAttestation<'a, T: BeaconChainTypes> { - attestation: AttestationRef<'a, T::EthSpec>, + attestation: &'a SingleAttestation, indexed_attestation: IndexedAttestation, - subnet_id: SubnetId, + subnet_id: Option, validator_index: u64, } @@ -323,12 +318,13 @@ impl VerifiedAggregatedAttestation<'_, T> { } } +#[derive(Clone)] /// Wraps an `Attestation` that has been fully verified for propagation on the gossip network. pub struct VerifiedUnaggregatedAttestation<'a, T: BeaconChainTypes> { - attestation: AttestationRef<'a, T::EthSpec>, + attestation: Attestation, + single_attestation: &'a SingleAttestation, indexed_attestation: IndexedAttestation, subnet_id: SubnetId, - validator_index: usize, } impl VerifiedUnaggregatedAttestation<'_, T> { @@ -336,13 +332,8 @@ impl VerifiedUnaggregatedAttestation<'_, T> { self.indexed_attestation } - pub fn single_attestation(&self) -> Option { - Some(SingleAttestation { - committee_index: self.attestation.committee_index()?, - attester_index: self.validator_index as u64, - data: self.attestation.data().clone(), - signature: self.attestation.signature().clone(), - }) + pub fn single_attestation(&self) -> SingleAttestation { + self.single_attestation.clone() } } @@ -386,7 +377,7 @@ impl VerifiedAttestation for VerifiedAggregatedAttestati impl VerifiedAttestation for VerifiedUnaggregatedAttestation<'_, T> { fn attestation(&self) -> AttestationRef { - self.attestation + self.attestation.to_ref() } fn indexed_attestation(&self) -> &IndexedAttestation { @@ -400,6 +391,8 @@ pub enum AttestationSlashInfo<'a, T: BeaconChainTypes, TErr> { SignatureNotChecked(AttestationRef<'a, T::EthSpec>, TErr), /// As for `SignatureNotChecked`, but we know the `IndexedAttestation`. SignatureNotCheckedIndexed(IndexedAttestation, TErr), + /// As for `SignatureNotChecked`, but for the `SingleAttestation`. + SignatureNotCheckedSingle(&'a SingleAttestation, TErr), /// The attestation's signature is invalid, so it will never be slashable. SignatureInvalid(TErr), /// The signature is valid but the attestation is invalid in some other way. @@ -438,6 +431,20 @@ fn process_slash_info( } } } + SignatureNotCheckedSingle(attestation, err) => { + if let Error::UnknownHeadBlock { .. } = err { + if attestation.data.beacon_block_root == attestation.data.target.root { + return err; + } + } + + let fork_name = chain + .spec + .fork_name_at_slot::(attestation.data.slot); + + let indexed_attestation = attestation.to_indexed(fork_name); + (indexed_attestation, true, err) + } SignatureNotCheckedIndexed(indexed, err) => (indexed, true, err), SignatureInvalid(e) => return e, SignatureValid(indexed, err) => (indexed, false, err), @@ -461,6 +468,7 @@ fn process_slash_info( match slash_info { SignatureNotChecked(_, e) | SignatureNotCheckedIndexed(_, e) + | SignatureNotCheckedSingle(_, e) | SignatureInvalid(e) | SignatureValid(_, e) => e, } @@ -561,7 +569,7 @@ impl<'a, T: BeaconChainTypes> IndexedAggregatedAttestation<'a, T> { // // Attestations must be for a known block. If the block is unknown, we simply drop the // attestation and do not delay consideration for later. - let head_block = verify_head_block_is_known(chain, attestation, None)?; + let head_block = verify_head_block_is_known(chain, attestation.data(), None)?; // Check the attestation target root is consistent with the head root. // @@ -570,7 +578,7 @@ impl<'a, T: BeaconChainTypes> IndexedAggregatedAttestation<'a, T> { // // Whilst this attestation *technically* could be used to add value to a block, it is // invalid in the spirit of the protocol. Here we choose safety over profit. - verify_attestation_target_root::(&head_block, attestation)?; + verify_attestation_target_root::(&head_block, attestation.data())?; // Ensure that the attestation has participants. if attestation.is_aggregation_bits_zero() { @@ -813,16 +821,16 @@ impl<'a, T: BeaconChainTypes> VerifiedAggregatedAttestation<'a, T> { impl<'a, T: BeaconChainTypes> IndexedUnaggregatedAttestation<'a, T> { /// Run the checks that happen before an indexed attestation is constructed. pub fn verify_early_checks( - attestation: AttestationRef, + attestation: &'a SingleAttestation, chain: &BeaconChain, ) -> Result<(), Error> { - let attestation_epoch = attestation.data().slot.epoch(T::EthSpec::slots_per_epoch()); + let attestation_epoch = attestation.data.slot.epoch(T::EthSpec::slots_per_epoch()); // Check the attestation's epoch matches its target. - if attestation_epoch != attestation.data().target.epoch { + if attestation_epoch != attestation.data.target.epoch { return Err(Error::InvalidTargetEpoch { - slot: attestation.data().slot, - epoch: attestation.data().target.epoch, + slot: attestation.data.slot, + epoch: attestation.data.target.epoch, }); } @@ -832,61 +840,44 @@ impl<'a, T: BeaconChainTypes> IndexedUnaggregatedAttestation<'a, T> { // We do not queue future attestations for later processing. verify_propagation_slot_range::<_, T::EthSpec>( &chain.slot_clock, - attestation.data(), + &attestation.data, &chain.spec, )?; - // Check to ensure that the attestation is "unaggregated". I.e., it has exactly one - // aggregation bit set. - let num_aggregation_bits = attestation.num_set_aggregation_bits(); - if num_aggregation_bits != 1 { - return Err(Error::NotExactlyOneAggregationBitSet(num_aggregation_bits)); + let fork_name = chain + .spec + .fork_name_at_slot::(attestation.data.slot); + if fork_name.electra_enabled() { + // [New in Electra:EIP7549] + if attestation.data.index != 0 { + return Err(Error::CommitteeIndexNonZero( + attestation.data.index as usize, + )); + } } - // [New in Electra:EIP7549] - verify_committee_index(attestation)?; - // Attestations must be for a known block. If the block is unknown, we simply drop the // attestation and do not delay consideration for later. // // Enforce a maximum skip distance for unaggregated attestations. - let head_block = - verify_head_block_is_known(chain, attestation, chain.config.import_max_skip_slots)?; + let head_block = verify_head_block_is_known( + chain, + &attestation.data, + chain.config.import_max_skip_slots, + )?; // Check the attestation target root is consistent with the head root. - verify_attestation_target_root::(&head_block, attestation)?; + verify_attestation_target_root::(&head_block, &attestation.data)?; Ok(()) } /// Run the checks that apply to the indexed attestation before the signature is checked. pub fn verify_middle_checks( - attestation: AttestationRef, - indexed_attestation: &IndexedAttestation, - committees_per_slot: u64, - subnet_id: Option, + attestation: &'a SingleAttestation, chain: &BeaconChain, - ) -> Result<(u64, SubnetId), Error> { - let expected_subnet_id = SubnetId::compute_subnet_for_attestation::( - attestation, - committees_per_slot, - &chain.spec, - ) - .map_err(BeaconChainError::from)?; - - // If a subnet was specified, ensure that subnet is correct. - if let Some(subnet_id) = subnet_id { - if subnet_id != expected_subnet_id { - return Err(Error::InvalidSubnetId { - received: subnet_id, - expected: expected_subnet_id, - }); - } - }; - - let validator_index = *indexed_attestation - .attesting_indices_first() - .ok_or(Error::NotExactlyOneAggregationBitSet(0))?; + ) -> Result { + let validator_index = attestation.attester_index; /* * The attestation is the first valid attestation received for the participating validator @@ -895,16 +886,16 @@ impl<'a, T: BeaconChainTypes> IndexedUnaggregatedAttestation<'a, T> { if chain .observed_gossip_attesters .read() - .validator_has_been_observed(attestation.data().target.epoch, validator_index as usize) + .validator_has_been_observed(attestation.data.target.epoch, validator_index as usize) .map_err(BeaconChainError::from)? { return Err(Error::PriorAttestationKnown { validator_index, - epoch: attestation.data().target.epoch, + epoch: attestation.data.target.epoch, }); } - Ok((validator_index, expected_subnet_id)) + Ok(validator_index) } /// Returns `Ok(Self)` if the `attestation` is valid to be (re)published on the gossip @@ -913,11 +904,11 @@ impl<'a, T: BeaconChainTypes> IndexedUnaggregatedAttestation<'a, T> { /// `subnet_id` is the subnet from which we received this attestation. This function will /// verify that it was received on the correct subnet. pub fn verify( - attestation: &'a Attestation, + attestation: &'a SingleAttestation, subnet_id: Option, chain: &BeaconChain, ) -> Result { - Self::verify_slashable(attestation.to_ref(), subnet_id, chain) + Self::verify_slashable(attestation, subnet_id, chain) .inspect(|verified_unaggregated| { if let Some(slasher) = chain.slasher.as_ref() { slasher.accept_attestation(verified_unaggregated.indexed_attestation.clone()); @@ -928,31 +919,23 @@ impl<'a, T: BeaconChainTypes> IndexedUnaggregatedAttestation<'a, T> { /// Verify the attestation, producing extra information about whether it might be slashable. pub fn verify_slashable( - attestation: AttestationRef<'a, T::EthSpec>, + attestation: &'a SingleAttestation, subnet_id: Option, chain: &BeaconChain, ) -> Result> { use AttestationSlashInfo::*; if let Err(e) = Self::verify_early_checks(attestation, chain) { - return Err(SignatureNotChecked(attestation, e)); + return Err(SignatureNotCheckedSingle(attestation, e)); } - let (indexed_attestation, committees_per_slot) = - match obtain_indexed_attestation_and_committees_per_slot(chain, attestation) { - Ok(x) => x, - Err(e) => { - return Err(SignatureNotChecked(attestation, e)); - } - }; + let fork_name = chain + .spec + .fork_name_at_slot::(attestation.data.slot); - let (validator_index, expected_subnet_id) = match Self::verify_middle_checks( - attestation, - &indexed_attestation, - committees_per_slot, - subnet_id, - chain, - ) { + let indexed_attestation = attestation.to_indexed(fork_name); + + let validator_index = match Self::verify_middle_checks(attestation, chain) { Ok(t) => t, Err(e) => return Err(SignatureNotCheckedIndexed(indexed_attestation, e)), }; @@ -960,7 +943,7 @@ impl<'a, T: BeaconChainTypes> IndexedUnaggregatedAttestation<'a, T> { Ok(Self { attestation, indexed_attestation, - subnet_id: expected_subnet_id, + subnet_id, validator_index, }) } @@ -977,10 +960,55 @@ impl<'a, T: BeaconChainTypes> IndexedUnaggregatedAttestation<'a, T> { impl<'a, T: BeaconChainTypes> VerifiedUnaggregatedAttestation<'a, T> { /// Run the checks that apply after the signature has been checked. fn verify_late_checks( - attestation: AttestationRef, + attestation: &'a SingleAttestation, validator_index: u64, + subnet_id: Option, chain: &BeaconChain, - ) -> Result<(), Error> { + ) -> Result<(Attestation, SubnetId), Error> { + // Check that the attester is a member of the committee + let (committee_opt, committees_per_slot) = chain.with_committee_cache( + attestation.data.target.root, + attestation.data.slot.epoch(T::EthSpec::slots_per_epoch()), + |committee_cache, _| { + let committee_opt = committee_cache + .get_beacon_committee(attestation.data.slot, attestation.committee_index) + .map(|beacon_committee| beacon_committee.committee.to_vec()); + + Ok((committee_opt, committee_cache.committees_per_slot())) + }, + )?; + + let Some(committee) = committee_opt else { + return Err(Error::NoCommitteeForSlotAndIndex { + slot: attestation.data.slot, + index: attestation.committee_index, + }); + }; + + if !committee.contains(&(attestation.attester_index as usize)) { + return Err(Error::AttesterNotInCommittee { + attester_index: attestation.attester_index, + committee_index: attestation.committee_index, + slot: attestation.data.slot, + }); + } + + let expected_subnet_id = SubnetId::compute_subnet_for_single_attestation::( + attestation, + committees_per_slot, + &chain.spec, + ) + .map_err(BeaconChainError::from)?; + + // If a subnet was specified, ensure that subnet is correct. + if let Some(subnet_id) = subnet_id { + if subnet_id != expected_subnet_id { + return Err(Error::InvalidSubnetId { + received: subnet_id, + expected: expected_subnet_id, + }); + } + }; // Now that the attestation has been fully verified, store that we have received a valid // attestation from this validator. // @@ -990,20 +1018,28 @@ impl<'a, T: BeaconChainTypes> VerifiedUnaggregatedAttestation<'a, T> { if chain .observed_gossip_attesters .write() - .observe_validator(attestation.data().target.epoch, validator_index as usize) + .observe_validator(attestation.data.target.epoch, validator_index as usize) .map_err(BeaconChainError::from)? { return Err(Error::PriorAttestationKnown { validator_index, - epoch: attestation.data().target.epoch, + epoch: attestation.data.target.epoch, }); } - Ok(()) + + let fork_name = chain + .spec + .fork_name_at_slot::(attestation.data.slot); + + let unaggregated_attestation = + single_attestation_to_attestation(attestation, &committee, fork_name)?; + + Ok((unaggregated_attestation, expected_subnet_id)) } /// Verify the `unaggregated_attestation`. pub fn verify( - unaggregated_attestation: &'a Attestation, + unaggregated_attestation: &'a SingleAttestation, subnet_id: Option, chain: &BeaconChain, ) -> Result { @@ -1054,15 +1090,17 @@ impl<'a, T: BeaconChainTypes> VerifiedUnaggregatedAttestation<'a, T> { CheckAttestationSignature::No => (), }; - if let Err(e) = Self::verify_late_checks(attestation, validator_index, chain) { - return Err(SignatureValid(indexed_attestation, e)); - } + let (unaggregated_attestation, subnet_id) = + match Self::verify_late_checks(attestation, validator_index, subnet_id, chain) { + Ok(a) => a, + Err(e) => return Err(SignatureValid(indexed_attestation, e)), + }; Ok(Self { - attestation, + single_attestation: attestation, + attestation: unaggregated_attestation, indexed_attestation, subnet_id, - validator_index: validator_index as usize, }) } @@ -1071,11 +1109,6 @@ impl<'a, T: BeaconChainTypes> VerifiedUnaggregatedAttestation<'a, T> { self.subnet_id } - /// Returns the wrapped `attestation`. - pub fn attestation(&self) -> AttestationRef { - self.attestation - } - /// Returns the wrapped `indexed_attestation`. pub fn indexed_attestation(&self) -> &IndexedAttestation { &self.indexed_attestation @@ -1102,40 +1135,40 @@ impl<'a, T: BeaconChainTypes> VerifiedUnaggregatedAttestation<'a, T> { /// already finalized. fn verify_head_block_is_known( chain: &BeaconChain, - attestation: AttestationRef, + attestation_data: &AttestationData, max_skip_slots: Option, ) -> Result { let block_opt = chain .canonical_head .fork_choice_read_lock() - .get_block(&attestation.data().beacon_block_root) + .get_block(&attestation_data.beacon_block_root) .or_else(|| { chain .early_attester_cache - .get_proto_block(attestation.data().beacon_block_root) + .get_proto_block(attestation_data.beacon_block_root) }); if let Some(block) = block_opt { // Reject any block that exceeds our limit on skipped slots. if let Some(max_skip_slots) = max_skip_slots { - if attestation.data().slot > block.slot + max_skip_slots { + if attestation_data.slot > block.slot + max_skip_slots { return Err(Error::TooManySkippedSlots { head_block_slot: block.slot, - attestation_slot: attestation.data().slot, + attestation_slot: attestation_data.slot, }); } } - if !verify_attestation_is_finalized_checkpoint_or_descendant(attestation.data(), chain) { + if !verify_attestation_is_finalized_checkpoint_or_descendant(attestation_data, chain) { return Err(Error::HeadBlockFinalized { - beacon_block_root: attestation.data().beacon_block_root, + beacon_block_root: attestation_data.beacon_block_root, }); } Ok(block) - } else if chain.is_pre_finalization_block(attestation.data().beacon_block_root)? { + } else if chain.is_pre_finalization_block(attestation_data.beacon_block_root)? { Err(Error::HeadBlockFinalized { - beacon_block_root: attestation.data().beacon_block_root, + beacon_block_root: attestation_data.beacon_block_root, }) } else { // The block is either: @@ -1145,7 +1178,7 @@ fn verify_head_block_is_known( // 2) A post-finalization block that we don't know about yet. We'll queue // the attestation until the block becomes available (or we time out). Err(Error::UnknownHeadBlock { - beacon_block_root: attestation.data().beacon_block_root, + beacon_block_root: attestation_data.beacon_block_root, }) } } @@ -1237,11 +1270,11 @@ pub fn verify_attestation_signature( /// `attestation.data.beacon_block_root`. pub fn verify_attestation_target_root( head_block: &ProtoBlock, - attestation: AttestationRef, + attestation_data: &AttestationData, ) -> Result<(), Error> { // Check the attestation target root. let head_block_epoch = head_block.slot.epoch(E::slots_per_epoch()); - let attestation_epoch = attestation.data().slot.epoch(E::slots_per_epoch()); + let attestation_epoch = attestation_data.slot.epoch(E::slots_per_epoch()); if head_block_epoch > attestation_epoch { // The epoch references an invalid head block from a future epoch. // @@ -1254,7 +1287,7 @@ pub fn verify_attestation_target_root( // Reference: // https://github.com/ethereum/eth2.0-specs/pull/2001#issuecomment-699246659 return Err(Error::InvalidTargetRoot { - attestation: attestation.data().target.root, + attestation: attestation_data.target.root, // It is not clear what root we should expect in this case, since the attestation is // fundamentally invalid. expected: None, @@ -1273,9 +1306,9 @@ pub fn verify_attestation_target_root( }; // Reject any attestation with an invalid target root. - if target_root != attestation.data().target.root { + if target_root != attestation_data.target.root { return Err(Error::InvalidTargetRoot { - attestation: attestation.data().target.root, + attestation: attestation_data.target.root, expected: Some(target_root), }); } diff --git a/beacon_node/beacon_chain/src/attestation_verification/batch.rs b/beacon_node/beacon_chain/src/attestation_verification/batch.rs index 5f856140ba8..266279432ef 100644 --- a/beacon_node/beacon_chain/src/attestation_verification/batch.rs +++ b/beacon_node/beacon_chain/src/attestation_verification/batch.rs @@ -136,7 +136,7 @@ pub fn batch_verify_unaggregated_attestations<'a, T, I>( ) -> Result, Error>>, Error> where T: BeaconChainTypes, - I: Iterator, Option)> + ExactSizeIterator, + I: Iterator)> + ExactSizeIterator, { let mut num_partially_verified = 0; let mut num_failed = 0; diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index 990f4b6099c..ae985251f3c 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -2053,7 +2053,7 @@ impl BeaconChain { AttestationError, > where - I: Iterator, Option)> + ExactSizeIterator, + I: Iterator)> + ExactSizeIterator, { batch_verify_unaggregated_attestations(attestations, self) } @@ -2065,7 +2065,7 @@ impl BeaconChain { /// aggregation bit set. pub fn verify_unaggregated_attestation_for_gossip<'a>( &self, - unaggregated_attestation: &'a Attestation, + unaggregated_attestation: &'a SingleAttestation, subnet_id: Option, ) -> Result, AttestationError> { metrics::inc_counter(&metrics::UNAGGREGATED_ATTESTATION_PROCESSING_REQUESTS); @@ -2081,13 +2081,9 @@ impl BeaconChain { .spec .fork_name_at_slot::(v.attestation().data().slot); if current_fork.electra_enabled() { - // I don't see a situation where this could return None. The upstream unaggregated attestation checks - // should have already verified that this is an attestation with a single committee bit set. - if let Some(single_attestation) = v.single_attestation() { - event_handler.register(EventKind::SingleAttestation(Box::new( - single_attestation, - ))); - } + event_handler.register(EventKind::SingleAttestation(Box::new( + v.single_attestation(), + ))); } } diff --git a/beacon_node/beacon_chain/src/single_attestation.rs b/beacon_node/beacon_chain/src/single_attestation.rs index fa4f98bb070..33a093687e5 100644 --- a/beacon_node/beacon_chain/src/single_attestation.rs +++ b/beacon_node/beacon_chain/src/single_attestation.rs @@ -1,9 +1,13 @@ use crate::attestation_verification::Error; -use types::{Attestation, AttestationElectra, BitList, BitVector, EthSpec, SingleAttestation}; +use types::{ + Attestation, AttestationBase, AttestationElectra, BitList, BitVector, EthSpec, ForkName, + SingleAttestation, +}; pub fn single_attestation_to_attestation( single_attestation: &SingleAttestation, committee: &[usize], + fork_name: ForkName, ) -> Result, Error> { let attester_index = single_attestation.attester_index; let committee_index = single_attestation.committee_index; @@ -24,23 +28,33 @@ pub fn single_attestation_to_attestation( slot, })?; - let mut committee_bits: BitVector = BitVector::default(); - committee_bits - .set(committee_index as usize, true) - .map_err(|e| Error::Invalid(e.into()))?; + if fork_name.electra_enabled() { + let mut committee_bits: BitVector = BitVector::default(); + committee_bits + .set(committee_index as usize, true) + .map_err(|e| Error::Invalid(e.into()))?; - let mut aggregation_bits = - BitList::with_capacity(committee.len()).map_err(|e| Error::Invalid(e.into()))?; - aggregation_bits - .set(aggregation_bit, true) - .map_err(|e| Error::Invalid(e.into()))?; - - // TODO(electra): consider eventually allowing conversion to non-Electra attestations as well - // to maintain invertability (`Attestation` -> `SingleAttestation` -> `Attestation`). - Ok(Attestation::Electra(AttestationElectra { - aggregation_bits, - committee_bits, - data: single_attestation.data.clone(), - signature: single_attestation.signature.clone(), - })) + let mut aggregation_bits = + BitList::with_capacity(committee.len()).map_err(|e| Error::Invalid(e.into()))?; + aggregation_bits + .set(aggregation_bit, true) + .map_err(|e| Error::Invalid(e.into()))?; + Ok(Attestation::Electra(AttestationElectra { + aggregation_bits, + committee_bits, + data: single_attestation.data.clone(), + signature: single_attestation.signature.clone(), + })) + } else { + let mut aggregation_bits = + BitList::with_capacity(committee.len()).map_err(|e| Error::Invalid(e.into()))?; + aggregation_bits + .set(aggregation_bit, true) + .map_err(|e| Error::Invalid(e.into()))?; + Ok(Attestation::Base(AttestationBase { + aggregation_bits, + data: single_attestation.data.clone(), + signature: single_attestation.signature.clone(), + })) + } } diff --git a/beacon_node/beacon_chain/src/test_utils.rs b/beacon_node/beacon_chain/src/test_utils.rs index 3ee8c7ae3f9..441f72da3dd 100644 --- a/beacon_node/beacon_chain/src/test_utils.rs +++ b/beacon_node/beacon_chain/src/test_utils.rs @@ -1123,9 +1123,14 @@ where attn.aggregation_bits .set(aggregation_bit_index, true) .unwrap(); - attn + Attestation::Electra(attn) + } + Attestation::Base(mut attn) => { + attn.aggregation_bits + .set(aggregation_bit_index, true) + .unwrap(); + Attestation::Base(attn) } - Attestation::Base(_) => panic!("Must be an Electra attestation"), }; let aggregation_bits = attestation.get_aggregation_bits(); @@ -1153,8 +1158,10 @@ where let single_attestation = attestation.to_single_attestation_with_attester_index(attester_index as u64)?; + let fork_name = self.spec.fork_name_at_slot::(attestation.data().slot); let attestation: Attestation = - single_attestation_to_attestation(&single_attestation, committee.committee).unwrap(); + single_attestation_to_attestation(&single_attestation, committee.committee, fork_name) + .unwrap(); assert_eq!( single_attestation.committee_index, @@ -2424,7 +2431,11 @@ where }) } - pub fn process_attestations(&self, attestations: HarnessAttestations) { + pub fn process_attestations( + &self, + attestations: HarnessAttestations, + state: &BeaconState, + ) { let num_validators = self.validator_keypairs.len(); let mut unaggregated = Vec::with_capacity(num_validators); // This is an over-allocation, but it should be fine. It won't be *that* memory hungry and @@ -2433,7 +2444,35 @@ where for (unaggregated_attestations, maybe_signed_aggregate) in attestations.iter() { for (attn, subnet) in unaggregated_attestations { - unaggregated.push((attn, Some(*subnet))); + let aggregation_bits = attn.get_aggregation_bits(); + + if aggregation_bits.len() != 1 { + panic!("Must be an unaggregated attestation") + } + + let aggregation_bit = *aggregation_bits.first().unwrap(); + + let committee = state + .get_beacon_committee(attn.data().slot, attn.committee_index().unwrap()) + .unwrap(); + + let attester_index = committee + .committee + .iter() + .enumerate() + .find_map(|(i, &index)| { + if aggregation_bit as usize == i { + return Some(index); + } + None + }) + .unwrap(); + + let single_attestation = attn + .to_single_attestation_with_attester_index(attester_index as u64) + .unwrap(); + + unaggregated.push((single_attestation, Some(*subnet))); } if let Some(a) = maybe_signed_aggregate { @@ -2443,7 +2482,9 @@ where for result in self .chain - .batch_verify_unaggregated_attestations_for_gossip(unaggregated.into_iter()) + .batch_verify_unaggregated_attestations_for_gossip( + unaggregated.iter().map(|(attn, subnet)| (attn, *subnet)), + ) .unwrap() { let verified = result.unwrap(); @@ -2510,7 +2551,7 @@ where ) { let attestations = self.make_attestations(validators, state, state_root, block_hash, block.slot()); - self.process_attestations(attestations); + self.process_attestations(attestations, state); } pub fn sync_committee_sign_block( diff --git a/beacon_node/beacon_chain/tests/attestation_verification.rs b/beacon_node/beacon_chain/tests/attestation_verification.rs index 30eec539fcb..11729f8d8a8 100644 --- a/beacon_node/beacon_chain/tests/attestation_verification.rs +++ b/beacon_node/beacon_chain/tests/attestation_verification.rs @@ -8,24 +8,22 @@ use beacon_chain::test_utils::{MakeAttestationOptions, HARNESS_GENESIS_TIME}; use beacon_chain::{ attestation_verification::Error as AttnError, test_utils::{ - test_spec, AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType, + single_attestation_to_attestation, test_spec, AttestationStrategy, BeaconChainHarness, + BlockStrategy, EphemeralHarnessType, }, BeaconChain, BeaconChainError, BeaconChainTypes, ChainConfig, WhenSlotSkipped, }; use genesis::{interop_genesis_state, DEFAULT_ETH1_BLOCK_HASH}; use int_to_bytes::int_to_bytes32; -use ssz_types::BitVector; -use state_processing::{ - per_block_processing::errors::AttestationValidationError, per_slot_processing, -}; +use state_processing::per_slot_processing; use std::sync::{Arc, LazyLock}; use tree_hash::TreeHash; use types::{ signed_aggregate_and_proof::SignedAggregateAndProofRefMut, test_utils::generate_deterministic_keypair, Address, AggregateSignature, Attestation, - AttestationRef, AttestationRefMut, BeaconStateError, BitList, ChainSpec, Epoch, EthSpec, - FixedBytesExtended, ForkName, Hash256, Keypair, MainnetEthSpec, SecretKey, SelectionProof, - SignedAggregateAndProof, Slot, SubnetId, Unsigned, + AttestationRef, ChainSpec, Epoch, EthSpec, FixedBytesExtended, ForkName, Hash256, Keypair, + MainnetEthSpec, SecretKey, SelectionProof, SignedAggregateAndProof, SingleAttestation, Slot, + SubnetId, Unsigned, }; pub type E = MainnetEthSpec; @@ -122,7 +120,7 @@ fn get_harness_capella_spec( /// Also returns some info about who created it. fn get_valid_unaggregated_attestation( chain: &BeaconChain, -) -> (Attestation, usize, usize, SecretKey, SubnetId) { +) -> (SingleAttestation, SecretKey, SubnetId) { let head = chain.head_snapshot(); let current_slot = chain.slot().expect("should get slot"); @@ -156,8 +154,15 @@ fn get_valid_unaggregated_attestation( ) .expect("should sign attestation"); - let subnet_id = SubnetId::compute_subnet_for_attestation::( - valid_attestation.to_ref(), + let single_attestation = SingleAttestation { + committee_index: valid_attestation.committee_index().unwrap(), + attester_index: validator_index as u64, + data: valid_attestation.data().clone(), + signature: valid_attestation.signature().clone(), + }; + + let subnet_id = SubnetId::compute_subnet_for_single_attestation::( + &single_attestation, head.beacon_state .get_committee_count_at_slot(current_slot) .expect("should get committee count"), @@ -165,13 +170,7 @@ fn get_valid_unaggregated_attestation( ) .expect("should get subnet_id"); - ( - valid_attestation, - validator_index, - validator_committee_index, - validator_sk, - subnet_id, - ) + (single_attestation, validator_sk, subnet_id) } fn get_valid_aggregated_attestation( @@ -275,15 +274,13 @@ struct GossipTester { /* * Valid unaggregated attestation */ - valid_attestation: Attestation, - attester_validator_index: usize, - attester_committee_index: usize, + valid_attestation: SingleAttestation, attester_sk: SecretKey, attestation_subnet_id: SubnetId, /* * Valid unaggregated attestation for batch testing */ - invalid_attestation: Attestation, + invalid_attestation: SingleAttestation, /* * Valid aggregate */ @@ -312,22 +309,33 @@ impl GossipTester { // Advance into a slot where there have not been blocks or attestations produced. harness.advance_slot(); - let ( - valid_attestation, - attester_validator_index, - attester_committee_index, - attester_sk, - attestation_subnet_id, - ) = get_valid_unaggregated_attestation(&harness.chain); + let (valid_attestation, attester_sk, attestation_subnet_id) = + get_valid_unaggregated_attestation(&harness.chain); + + let head = harness.chain.head_snapshot(); + let state = &head.beacon_state; + let committee = state + .get_beacon_committee( + valid_attestation.data.slot, + valid_attestation.committee_index, + ) + .unwrap(); + let fork_name = harness + .chain + .spec + .fork_name_at_slot::(valid_attestation.data.slot); + let valid_aggregate_attestation = + single_attestation_to_attestation(&valid_attestation, committee.committee, fork_name) + .unwrap(); let (valid_aggregate, aggregator_validator_index, aggregator_sk) = - get_valid_aggregated_attestation(&harness.chain, valid_attestation.clone()); + get_valid_aggregated_attestation(&harness.chain, valid_aggregate_attestation.clone()); let mut invalid_attestation = valid_attestation.clone(); - invalid_attestation.data_mut().beacon_block_root = Hash256::repeat_byte(13); + invalid_attestation.data.beacon_block_root = Hash256::repeat_byte(13); let (mut invalid_aggregate, _, _) = - get_valid_aggregated_attestation(&harness.chain, invalid_attestation.clone()); + get_valid_aggregated_attestation(&harness.chain, valid_aggregate_attestation.clone()); match invalid_aggregate.to_mut() { SignedAggregateAndProofRefMut::Base(att) => { @@ -341,8 +349,6 @@ impl GossipTester { Self { harness, valid_attestation, - attester_validator_index, - attester_committee_index, attester_sk, attestation_subnet_id, invalid_attestation, @@ -467,12 +473,12 @@ impl GossipTester { pub fn inspect_unaggregate_err(self, desc: &str, get_attn: G, inspect_err: I) -> Self where - G: Fn(&Self, &mut Attestation, &mut SubnetId), + G: Fn(&Self, &mut SingleAttestation, &mut SubnetId, &ChainSpec), I: Fn(&Self, AttnError), { let mut attn = self.valid_attestation.clone(); let mut subnet_id = self.attestation_subnet_id; - get_attn(&self, &mut attn, &mut subnet_id); + get_attn(&self, &mut attn, &mut subnet_id, &self.harness.spec); /* * Individual verification @@ -912,32 +918,20 @@ async fn unaggregated_gossip_verification() { */ .inspect_unaggregate_err( "attestation with invalid committee index", - |tester, a, _| { - match a.to_mut() { - AttestationRefMut::Base(attn) => { - attn.data.index = tester - .harness - .chain - .head_snapshot() - .beacon_state - .get_committee_count_at_slot(attn.data.slot) - .unwrap(); - } - AttestationRefMut::Electra(attn) => { - let committee_index = tester - .harness - .chain - .head_snapshot() - .beacon_state - .get_committee_count_at_slot(attn.data.slot) - .unwrap(); - // overwrite the existing committee bits before setting - attn.committee_bits = BitVector::default(); - attn.committee_bits.set(committee_index as usize, true).unwrap(); - } - } + |tester, a, _, _| { + let committee_index = tester + .harness + .chain + .head_snapshot() + .beacon_state + .get_committee_count_at_slot(a.data.slot) + .unwrap(); + + a.committee_index = committee_index; + }, + |_, err| { + assert!(matches!(err, AttnError::NoCommitteeForSlotAndIndex { .. })) }, - |_, err| assert!(matches!(err, AttnError::NoCommitteeForSlotAndIndex { .. })), ) /* * The following test ensures: @@ -946,8 +940,8 @@ async fn unaggregated_gossip_verification() { * attestation.data.slot, attestation.data.index) == subnet_id). */ .inspect_unaggregate_err( - "attestation with invalid committee index", - |_, _, subnet_id| *subnet_id = SubnetId::new(42), + "attestation with invalid subnet_id", + |_, _, subnet_id, _| *subnet_id = SubnetId::new(42), |tester, err| { assert!(matches!( err, @@ -969,7 +963,7 @@ async fn unaggregated_gossip_verification() { */ .inspect_unaggregate_err( "attestation from future slot", - |tester, a, _| a.data_mut().slot = tester.slot() + 1, + |tester, a, _, _| a.data.slot = tester.slot() + 1, |tester, err| { assert!(matches!( err, @@ -983,10 +977,10 @@ async fn unaggregated_gossip_verification() { ) .inspect_unaggregate_err( "attestation from past slot", - |tester, a, _| { + |tester, a, _, _| { let too_early_slot = tester.earliest_valid_attestation_slot() - 1; - a.data_mut().slot = too_early_slot; - a.data_mut().target.epoch = too_early_slot.epoch(E::slots_per_epoch()); + a.data.slot = too_early_slot; + a.data.target.epoch = too_early_slot.epoch(E::slots_per_epoch()); }, |tester, err| { let valid_early_slot = tester.earliest_valid_attestation_slot(); @@ -1010,7 +1004,7 @@ async fn unaggregated_gossip_verification() { */ .inspect_unaggregate_err( "attestation with invalid target epoch", - |_, a, _| a.data_mut().target.epoch += 1, + |_, a, _, _| a.data.target.epoch += 1, |_, err| { assert!(matches!( err, @@ -1018,104 +1012,6 @@ async fn unaggregated_gossip_verification() { )) }, ) - /* - * The following two tests ensure: - * - * The attestation is unaggregated -- that is, it has exactly one participating validator - * (len([bit for bit in attestation.aggregation_bits if bit == 0b1]) == 1). - */ - .inspect_unaggregate_err( - "attestation without any aggregation bits set", - |tester, mut a, _| { - match &mut a { - Attestation::Base(ref mut att) => { - att.aggregation_bits - .set(tester.attester_committee_index, false) - .expect("should unset aggregation bit"); - assert_eq!( - att.aggregation_bits.num_set_bits(), - 0, - "test requires no set bits" - ); - } - Attestation::Electra(ref mut att) => { - att.aggregation_bits - .set(tester.attester_committee_index, false) - .expect("should unset aggregation bit"); - assert_eq!( - att.aggregation_bits.num_set_bits(), - 0, - "test requires no set bits" - ); - } - } - }, - |_, err| { - assert!(matches!( - err, - AttnError::NotExactlyOneAggregationBitSet(0) - )) - }, - ) - .inspect_unaggregate_err( - "attestation with two aggregation bits set", - |tester, mut a, _| { - match &mut a { - Attestation::Base(ref mut att) => { - att.aggregation_bits - .set(tester.attester_committee_index + 1, true) - .expect("should set second aggregation bit"); - } - Attestation::Electra(ref mut att) => { - att.aggregation_bits - .set(tester.attester_committee_index + 1, true) - .expect("should set second aggregation bit"); - } - } - }, - |_, err| { - assert!(matches!( - err, - AttnError::NotExactlyOneAggregationBitSet(2) - )) - }, - ) - /* - * The following test ensures: - * - * The number of aggregation bits matches the committee size -- i.e. - * `len(attestation.aggregation_bits) == len(get_beacon_committee(state, data.slot, - * data.index))`. - */ - .inspect_unaggregate_err( - "attestation with invalid bitfield", - |_, mut a, _| { - match &mut a { - Attestation::Base(ref mut att) => { - let bits = att.aggregation_bits.iter().collect::>(); - att.aggregation_bits = BitList::with_capacity(bits.len() + 1).unwrap(); - for (i, bit) in bits.into_iter().enumerate() { - att.aggregation_bits.set(i, bit).unwrap(); - } - } - Attestation::Electra(ref mut att) => { - let bits = att.aggregation_bits.iter().collect::>(); - att.aggregation_bits = BitList::with_capacity(bits.len() + 1).unwrap(); - for (i, bit) in bits.into_iter().enumerate() { - att.aggregation_bits.set(i, bit).unwrap(); - } - } - } - }, - |_, err| { - assert!(matches!( - err, - AttnError::Invalid(AttestationValidationError::BeaconStateError( - BeaconStateError::InvalidBitfield - )) - )) - }, - ) /* * The following test ensures that: * @@ -1123,8 +1019,8 @@ async fn unaggregated_gossip_verification() { */ .inspect_unaggregate_err( "attestation with unknown head block", - |_, a, _| { - a.data_mut().beacon_block_root = Hash256::repeat_byte(42); + |_, a, _, _| { + a.data.beacon_block_root = Hash256::repeat_byte(42); }, |_, err| { assert!(matches!( @@ -1145,8 +1041,8 @@ async fn unaggregated_gossip_verification() { */ .inspect_unaggregate_err( "attestation with invalid target root", - |_, a, _| { - a.data_mut().target.root = Hash256::repeat_byte(42); + |_, a, _, _| { + a.data.target.root = Hash256::repeat_byte(42); }, |_, err| { assert!(matches!( @@ -1162,10 +1058,10 @@ async fn unaggregated_gossip_verification() { */ .inspect_unaggregate_err( "attestation with bad signature", - |tester, a, _| { + |tester, a, _, _| { let mut agg_sig = AggregateSignature::infinity(); agg_sig.add_assign(&tester.attester_sk.sign(Hash256::repeat_byte(42))); - *a.signature_mut() = agg_sig; + a.signature = agg_sig; }, |_, err| { assert!(matches!( @@ -1186,7 +1082,7 @@ async fn unaggregated_gossip_verification() { */ .inspect_unaggregate_err( "attestation that has already been seen", - |_, _, _| {}, + |_, _, _, _| {}, |tester, err| { assert!(matches!( err, @@ -1194,7 +1090,7 @@ async fn unaggregated_gossip_verification() { validator_index, epoch, } - if validator_index == tester.attester_validator_index as u64 && epoch == tester.epoch() + if validator_index == tester.valid_attestation.attester_index && epoch == tester.epoch() )) }, ); @@ -1243,7 +1139,7 @@ async fn attestation_that_skips_epochs() { let state_root = state.update_tree_hash_cache().unwrap(); let (attestation, subnet_id) = harness - .get_unaggregated_attestations( + .get_single_attestations( &AttestationStrategy::AllValidators, &state, state_root, @@ -1256,7 +1152,7 @@ async fn attestation_that_skips_epochs() { .cloned() .expect("should have at least one attestation in committee"); - let block_root = attestation.data().beacon_block_root; + let block_root = attestation.data.beacon_block_root; let block_slot = harness .chain .store @@ -1267,7 +1163,7 @@ async fn attestation_that_skips_epochs() { .slot(); assert!( - attestation.data().slot - block_slot > E::slots_per_epoch() * 2, + attestation.data.slot - block_slot > E::slots_per_epoch() * 2, "the attestation must skip more than two epochs" ); @@ -1357,7 +1253,7 @@ async fn attestation_validator_receive_proposer_reward_and_withdrawals() { // Verifying the attestation triggers an inconsistent state replay. let remaining_attesters = (two_thirds..VALIDATOR_COUNT).collect(); let (attestation, subnet_id) = harness - .get_unaggregated_attestations( + .get_single_attestations( &AttestationStrategy::SomeValidators(remaining_attesters), &state, state_root, @@ -1426,7 +1322,7 @@ async fn attestation_to_finalized_block() { let state_root = state.update_tree_hash_cache().unwrap(); let (attestation, subnet_id) = harness - .get_unaggregated_attestations( + .get_single_attestations( &AttestationStrategy::AllValidators, &state, state_root, @@ -1438,7 +1334,7 @@ async fn attestation_to_finalized_block() { .first() .cloned() .expect("should have at least one attestation in committee"); - assert_eq!(attestation.data().beacon_block_root, earlier_block_root); + assert_eq!(attestation.data.beacon_block_root, earlier_block_root); // Attestation should be rejected for attesting to a pre-finalization block. let res = harness @@ -1481,8 +1377,23 @@ async fn verify_aggregate_for_gossip_doppelganger_detection() { "the test requires a new epoch to avoid already-seen errors" ); - let (valid_attestation, _attester_index, _attester_committee_index, _, _) = - get_valid_unaggregated_attestation(&harness.chain); + let (valid_attestation, _, _) = get_valid_unaggregated_attestation(&harness.chain); + + let head = harness.chain.head_snapshot(); + let state = &head.beacon_state; + let committee = state + .get_beacon_committee( + valid_attestation.data.slot, + valid_attestation.committee_index, + ) + .unwrap(); + let fork_name = harness + .chain + .spec + .fork_name_at_slot::(valid_attestation.data.slot); + let valid_attestation = + single_attestation_to_attestation(&valid_attestation, committee.committee, fork_name) + .unwrap(); let (valid_aggregate, _, _) = get_valid_aggregated_attestation(&harness.chain, valid_attestation); @@ -1540,15 +1451,16 @@ async fn verify_attestation_for_gossip_doppelganger_detection() { "the test requires a new epoch to avoid already-seen errors" ); - let (valid_attestation, index, _attester_committee_index, _, subnet_id) = - get_valid_unaggregated_attestation(&harness.chain); + let (valid_attestation, _, subnet_id) = get_valid_unaggregated_attestation(&harness.chain); + + let index = valid_attestation.attester_index as usize; harness .chain .verify_unaggregated_attestation_for_gossip(&valid_attestation, Some(subnet_id)) .expect("should verify attestation"); - let epoch = valid_attestation.data().target.epoch; + let epoch = valid_attestation.data.target.epoch; assert!(harness.chain.validator_seen_at_epoch(index, epoch)); // Check the correct beacon cache is populated @@ -1612,7 +1524,7 @@ async fn attestation_verification_use_head_state_fork() { let attesters = (0..VALIDATOR_COUNT / 2).collect::>(); let capella_fork = spec.fork_for_name(ForkName::Capella).unwrap(); let committee_attestations = harness - .make_unaggregated_attestations_with_opts( + .make_single_attestations_with_opts( attesters.as_slice(), &state, state_root, @@ -1642,7 +1554,7 @@ async fn attestation_verification_use_head_state_fork() { let attesters = (VALIDATOR_COUNT / 2..VALIDATOR_COUNT).collect::>(); let bellatrix_fork = spec.fork_for_name(ForkName::Bellatrix).unwrap(); let committee_attestations = harness - .make_unaggregated_attestations_with_opts( + .make_single_attestations_with_opts( attesters.as_slice(), &state, state_root, diff --git a/beacon_node/beacon_chain/tests/store_tests.rs b/beacon_node/beacon_chain/tests/store_tests.rs index 51c7f0c289e..5d0d62e3fe4 100644 --- a/beacon_node/beacon_chain/tests/store_tests.rs +++ b/beacon_node/beacon_chain/tests/store_tests.rs @@ -491,7 +491,7 @@ async fn epoch_boundary_state_attestation_processing() { .await; let head = harness.chain.head_snapshot(); - late_attestations.extend(harness.get_unaggregated_attestations( + late_attestations.extend(harness.get_single_attestations( &AttestationStrategy::SomeValidators(late_validators.clone()), &head.beacon_state, head.beacon_state_root(), @@ -511,7 +511,7 @@ async fn epoch_boundary_state_attestation_processing() { for (attestation, subnet_id) in late_attestations.into_iter().flatten() { // load_epoch_boundary_state is idempotent! - let block_root = attestation.data().beacon_block_root; + let block_root = attestation.data.beacon_block_root; let block = store .get_blinded_block(&block_root) .unwrap() @@ -536,7 +536,7 @@ async fn epoch_boundary_state_attestation_processing() { .verify_unaggregated_attestation_for_gossip(&attestation, Some(subnet_id)); let current_slot = harness.chain.slot().expect("should get slot"); - let expected_attestation_slot = attestation.data().slot; + let expected_attestation_slot = attestation.data.slot; // Extra -1 to handle gossip clock disparity. let expected_earliest_permissible_slot = current_slot - E::slots_per_epoch() - 1; @@ -2712,7 +2712,7 @@ async fn process_blocks_and_attestations_for_unaligned_checkpoint() { slot, ); harness.advance_slot(); - harness.process_attestations(attestations); + harness.process_attestations(attestations, &advanced_split_state); } } @@ -2874,8 +2874,8 @@ async fn revert_minority_fork_on_resume() { ); harness1.set_current_slot(slot); harness2.set_current_slot(slot); - harness1.process_attestations(attestations.clone()); - harness2.process_attestations(attestations); + harness1.process_attestations(attestations.clone(), &state); + harness2.process_attestations(attestations, &state); let ((block, blobs), new_state) = harness1.make_block(state, slot).await; @@ -2915,7 +2915,7 @@ async fn revert_minority_fork_on_resume() { slot, ); harness2.set_current_slot(slot); - harness2.process_attestations(attestations); + harness2.process_attestations(attestations, &state2); // Minority chain block (no attesters). let ((block1, blobs1), new_state1) = harness1.make_block(state1, slot).await; diff --git a/beacon_node/beacon_chain/tests/tests.rs b/beacon_node/beacon_chain/tests/tests.rs index c801361fd5f..944f8f8631e 100644 --- a/beacon_node/beacon_chain/tests/tests.rs +++ b/beacon_node/beacon_chain/tests/tests.rs @@ -567,7 +567,7 @@ async fn attestations_with_increasing_slots() { let head = harness.chain.head_snapshot(); let head_state_root = head.beacon_state_root(); - attestations.extend(harness.get_unaggregated_attestations( + attestations.extend(harness.get_single_attestations( &AttestationStrategy::AllValidators, &head.beacon_state, head_state_root, @@ -584,7 +584,7 @@ async fn attestations_with_increasing_slots() { .verify_unaggregated_attestation_for_gossip(&attestation, Some(subnet_id)); let current_slot = harness.chain.slot().expect("should get slot"); - let expected_attestation_slot = attestation.data().slot; + let expected_attestation_slot = attestation.data.slot; let expected_earliest_permissible_slot = current_slot - MinimalEthSpec::slots_per_epoch() - 1; diff --git a/beacon_node/beacon_processor/src/lib.rs b/beacon_node/beacon_processor/src/lib.rs index e864cb1fd91..57ead0b0242 100644 --- a/beacon_node/beacon_processor/src/lib.rs +++ b/beacon_node/beacon_processor/src/lib.rs @@ -63,7 +63,7 @@ use tokio::sync::mpsc; use tokio::sync::mpsc::error::TrySendError; use tracing::{debug, error, trace, warn}; use types::{ - Attestation, BeaconState, ChainSpec, EthSpec, Hash256, RelativeEpoch, SignedAggregateAndProof, + BeaconState, ChainSpec, EthSpec, Hash256, RelativeEpoch, SignedAggregateAndProof, SingleAttestation, Slot, SubnetId, }; use work_reprocessing_queue::{ @@ -549,32 +549,23 @@ pub enum BlockingOrAsync { Blocking(BlockingFn), Async(AsyncFn), } -pub type GossipAttestationBatch = Vec>>; +pub type GossipAttestationBatch = Vec>; /// Indicates the type of work to be performed and therefore its priority and /// queuing specifics. pub enum Work { GossipAttestation { - attestation: Box>>, - process_individual: Box>) + Send + Sync>, - process_batch: Box) + Send + Sync>, - }, - // Attestation requiring conversion before processing. - // - // For now this is a `SingleAttestation`, but eventually we will switch this around so that - // legacy `Attestation`s are converted and the main processing pipeline operates on - // `SingleAttestation`s. - GossipAttestationToConvert { attestation: Box>, process_individual: Box) + Send + Sync>, + process_batch: Box, }, UnknownBlockAttestation { process_fn: BlockingFn, }, GossipAttestationBatch { - attestations: GossipAttestationBatch, - process_batch: Box) + Send + Sync>, + attestations: GossipAttestationBatch, + process_batch: Box, }, GossipAggregate { aggregate: Box>, @@ -702,7 +693,6 @@ impl Work { fn to_type(&self) -> WorkType { match self { Work::GossipAttestation { .. } => WorkType::GossipAttestation, - Work::GossipAttestationToConvert { .. } => WorkType::GossipAttestationToConvert, Work::GossipAttestationBatch { .. } => WorkType::GossipAttestationBatch, Work::GossipAggregate { .. } => WorkType::GossipAggregate, Work::GossipAggregateBatch { .. } => WorkType::GossipAggregateBatch, @@ -1315,9 +1305,6 @@ impl BeaconProcessor { match work { _ if can_spawn => self.spawn_worker(work, idle_tx), Work::GossipAttestation { .. } => attestation_queue.push(work), - Work::GossipAttestationToConvert { .. } => { - attestation_to_convert_queue.push(work) - } // Attestation batches are formed internally within the // `BeaconProcessor`, they are not sent from external services. Work::GossipAttestationBatch { .. } => crit!( @@ -1559,12 +1546,6 @@ impl BeaconProcessor { } => task_spawner.spawn_blocking(move || { process_individual(*attestation); }), - Work::GossipAttestationToConvert { - attestation, - process_individual, - } => task_spawner.spawn_blocking(move || { - process_individual(*attestation); - }), Work::GossipAttestationBatch { attestations, process_batch, diff --git a/beacon_node/http_api/src/lib.rs b/beacon_node/http_api/src/lib.rs index 2eaa33a9648..9ccb2d1cdf5 100644 --- a/beacon_node/http_api/src/lib.rs +++ b/beacon_node/http_api/src/lib.rs @@ -45,7 +45,6 @@ pub use block_id::BlockId; use builder_states::get_next_withdrawals; use bytes::Bytes; use directory::DEFAULT_ROOT_DIR; -use either::Either; use eth2::types::{ self as api_types, BroadcastValidation, ContextDeserialize, EndpointVersion, ForkChoice, ForkChoiceNode, LightClientUpdatesQuery, PublishBlockRequest, ValidatorBalancesRequestBody, @@ -64,7 +63,6 @@ pub use publish_blocks::{ publish_blinded_block, publish_block, reconstruct_block, ProvenancedBlock, }; use serde::{Deserialize, Serialize}; -use serde_json::Value; use slot_clock::SlotClock; use ssz::Encode; pub use state_id::StateId; @@ -87,13 +85,13 @@ use tokio_stream::{ StreamExt, }; use tracing::{debug, error, info, warn}; -use types::AttestationData; use types::{ - Attestation, AttestationShufflingId, AttesterSlashing, BeaconStateError, ChainSpec, Checkpoint, - CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ForkName, Hash256, ProposerPreparationData, - ProposerSlashing, RelativeEpoch, SignedAggregateAndProof, SignedBlindedBeaconBlock, - SignedBlsToExecutionChange, SignedContributionAndProof, SignedValidatorRegistrationData, - SignedVoluntaryExit, Slot, SyncCommitteeMessage, SyncContributionData, + Attestation, AttestationData, AttestationShufflingId, AttesterSlashing, BeaconStateError, + ChainSpec, Checkpoint, CommitteeCache, ConfigAndPreset, Epoch, EthSpec, ForkName, Hash256, + ProposerPreparationData, ProposerSlashing, RelativeEpoch, SignedAggregateAndProof, + SignedBlindedBeaconBlock, SignedBlsToExecutionChange, SignedContributionAndProof, + SignedValidatorRegistrationData, SignedVoluntaryExit, SingleAttestation, Slot, + SyncCommitteeMessage, SyncContributionData, }; use validator::pubkey_to_validator_index; use version::{ @@ -1981,68 +1979,21 @@ pub fn serve( .and(task_spawner_filter.clone()) .and(chain_filter.clone()); - let post_beacon_pool_attestations_v1 = beacon_pool_path - .clone() - .and(warp::path("attestations")) - .and(warp::path::end()) - .and(warp_utils::json::json()) - .and(network_tx_filter.clone()) - .and(reprocess_send_filter.clone()) - .then( - |task_spawner: TaskSpawner, - chain: Arc>, - attestations: Vec>, - network_tx: UnboundedSender>, - reprocess_tx: Option>| async move { - let attestations = attestations.into_iter().map(Either::Left).collect(); - let result = crate::publish_attestations::publish_attestations( - task_spawner, - chain, - attestations, - network_tx, - reprocess_tx, - ) - .await - .map(|()| warp::reply::json(&())); - convert_rejection(result).await - }, - ); - let post_beacon_pool_attestations_v2 = beacon_pool_path_v2 .clone() .and(warp::path("attestations")) .and(warp::path::end()) - .and(warp_utils::json::json::()) + .and(warp_utils::json::json::>()) .and(optional_consensus_version_header_filter) .and(network_tx_filter.clone()) .and(reprocess_send_filter.clone()) .then( |task_spawner: TaskSpawner, chain: Arc>, - payload: Value, - fork_name: Option, + attestations: Vec, + _fork_name: Option, network_tx: UnboundedSender>, reprocess_tx: Option>| async move { - let attestations = - match crate::publish_attestations::deserialize_attestation_payload::( - payload, fork_name, - ) { - Ok(attestations) => attestations, - Err(err) => { - warn!( - error = ?err, - "Unable to deserialize attestation POST request" - ); - return warp::reply::with_status( - warp::reply::json( - &"Unable to deserialize request body".to_string(), - ), - eth2::StatusCode::BAD_REQUEST, - ) - .into_response(); - } - }; - let result = crate::publish_attestations::publish_attestations( task_spawner, chain, @@ -4966,7 +4917,6 @@ pub fn serve( .uor(post_beacon_blinded_blocks) .uor(post_beacon_blocks_v2) .uor(post_beacon_blinded_blocks_v2) - .uor(post_beacon_pool_attestations_v1) .uor(post_beacon_pool_attestations_v2) .uor(post_beacon_pool_attester_slashings) .uor(post_beacon_pool_proposer_slashings) diff --git a/beacon_node/http_api/src/publish_attestations.rs b/beacon_node/http_api/src/publish_attestations.rs index db85b8f205e..3c18a8ec417 100644 --- a/beacon_node/http_api/src/publish_attestations.rs +++ b/beacon_node/http_api/src/publish_attestations.rs @@ -36,16 +36,13 @@ //! attestations and there's no immediate cause for concern. use crate::task_spawner::{Priority, TaskSpawner}; use beacon_chain::{ - single_attestation::single_attestation_to_attestation, validator_monitor::timestamp_now, - AttestationError, BeaconChain, BeaconChainError, BeaconChainTypes, + validator_monitor::timestamp_now, AttestationError, BeaconChain, BeaconChainError, + BeaconChainTypes, }; use beacon_processor::work_reprocessing_queue::{QueuedUnaggregate, ReprocessQueueMessage}; -use either::Either; use eth2::types::Failure; use lighthouse_network::PubsubMessage; use network::NetworkMessage; -use serde_json::Value; -use std::borrow::Cow; use std::sync::Arc; use std::time::Duration; use tokio::sync::{ @@ -53,7 +50,7 @@ use tokio::sync::{ oneshot, }; use tracing::{debug, error, warn}; -use types::{Attestation, EthSpec, ForkName, SingleAttestation}; +use types::SingleAttestation; // Error variants are only used in `Debug` and considered `dead_code` by the compiler. #[derive(Debug)] @@ -65,8 +62,6 @@ pub enum Error { ReprocessDisabled, ReprocessFull, ReprocessTimeout, - InvalidJson(#[allow(dead_code)] serde_json::Error), - FailedConversion(#[allow(dead_code)] Box), } enum PublishAttestationResult { @@ -76,66 +71,24 @@ enum PublishAttestationResult { Failure(Error), } -#[allow(clippy::type_complexity)] -pub fn deserialize_attestation_payload( - payload: Value, - fork_name: Option, -) -> Result, SingleAttestation>>, Error> { - if fork_name.is_some_and(|fork_name| fork_name.electra_enabled()) || fork_name.is_none() { - if fork_name.is_none() { - warn!("No Consensus Version header specified."); - } - - Ok(serde_json::from_value::>(payload) - .map_err(Error::InvalidJson)? - .into_iter() - .map(Either::Right) - .collect()) - } else { - Ok( - serde_json::from_value::>>(payload) - .map_err(Error::InvalidJson)? - .into_iter() - .map(Either::Left) - .collect(), - ) - } -} - fn verify_and_publish_attestation( chain: &Arc>, - either_attestation: &Either, SingleAttestation>, + attestation: &SingleAttestation, seen_timestamp: Duration, network_tx: &UnboundedSender>, ) -> Result<(), Error> { - let attestation = convert_to_attestation(chain, either_attestation)?; let verified_attestation = chain - .verify_unaggregated_attestation_for_gossip(&attestation, None) + .verify_unaggregated_attestation_for_gossip(attestation, None) .map_err(Error::Validation)?; - match either_attestation { - Either::Left(attestation) => { - // Publish. - network_tx - .send(NetworkMessage::Publish { - messages: vec![PubsubMessage::Attestation(Box::new(( - verified_attestation.subnet_id(), - attestation.clone(), - )))], - }) - .map_err(|_| Error::Publication)?; - } - Either::Right(single_attestation) => { - network_tx - .send(NetworkMessage::Publish { - messages: vec![PubsubMessage::SingleAttestation(Box::new(( - verified_attestation.subnet_id(), - single_attestation.clone(), - )))], - }) - .map_err(|_| Error::Publication)?; - } - } + network_tx + .send(NetworkMessage::Publish { + messages: vec![PubsubMessage::Attestation(Box::new(( + verified_attestation.subnet_id(), + attestation.clone(), + )))], + }) + .map_err(|_| Error::Publication)?; // Notify the validator monitor. chain @@ -172,57 +125,10 @@ fn verify_and_publish_attestation( } } -fn convert_to_attestation<'a, T: BeaconChainTypes>( - chain: &Arc>, - attestation: &'a Either, SingleAttestation>, -) -> Result>, Error> { - match attestation { - Either::Left(a) => Ok(Cow::Borrowed(a)), - Either::Right(single_attestation) => { - let conversion_result = chain.with_committee_cache( - single_attestation.data.target.root, - single_attestation - .data - .slot - .epoch(T::EthSpec::slots_per_epoch()), - |committee_cache, _| { - let Some(committee) = committee_cache.get_beacon_committee( - single_attestation.data.slot, - single_attestation.committee_index, - ) else { - return Ok(Err(AttestationError::NoCommitteeForSlotAndIndex { - slot: single_attestation.data.slot, - index: single_attestation.committee_index, - })); - }; - - Ok(single_attestation_to_attestation::( - single_attestation, - committee.committee, - ) - .map(Cow::Owned)) - }, - ); - match conversion_result { - Ok(Ok(attestation)) => Ok(attestation), - Ok(Err(e)) => Err(Error::Validation(e)), - // Map the error returned by `with_committee_cache` for unknown blocks into the - // `UnknownHeadBlock` error that is gracefully handled. - Err(BeaconChainError::MissingBeaconBlock(beacon_block_root)) => { - Err(Error::Validation(AttestationError::UnknownHeadBlock { - beacon_block_root, - })) - } - Err(e) => Err(Error::FailedConversion(Box::new(e))), - } - } - } -} - pub async fn publish_attestations( task_spawner: TaskSpawner, chain: Arc>, - attestations: Vec, SingleAttestation>>, + attestations: Vec, network_tx: UnboundedSender>, reprocess_send: Option>, ) -> Result<(), warp::Rejection> { @@ -230,10 +136,7 @@ pub async fn publish_attestations( // move the `attestations` vec into the blocking task, so this small overhead is unavoidable. let attestation_metadata = attestations .iter() - .map(|att| match att { - Either::Left(att) => (att.data().slot, att.committee_index()), - Either::Right(att) => (att.data.slot, Some(att.committee_index)), - }) + .map(|att| (att.data.slot, Some(att.committee_index))) .collect::>(); // Gossip validate and publish attestations that can be immediately processed. diff --git a/beacon_node/http_api/tests/fork_tests.rs b/beacon_node/http_api/tests/fork_tests.rs index 10e1d015368..dcc6d13ec44 100644 --- a/beacon_node/http_api/tests/fork_tests.rs +++ b/beacon_node/http_api/tests/fork_tests.rs @@ -149,10 +149,41 @@ async fn attestations_across_fork_with_skip_slots() { .flat_map(|(atts, _)| atts.iter().map(|(att, _)| att.clone())) .collect::>(); + let unaggregated_attestations = unaggregated_attestations + .into_iter() + .map(|attn| { + let aggregation_bits = attn.get_aggregation_bits(); + + if aggregation_bits.len() != 1 { + panic!("Must be an unaggregated attestation") + } + + let aggregation_bit = *aggregation_bits.first().unwrap(); + + let committee = fork_state + .get_beacon_committee(attn.data().slot, attn.committee_index().unwrap()) + .unwrap(); + + let attester_index = committee + .committee + .iter() + .enumerate() + .find_map(|(i, &index)| { + if aggregation_bit as usize == i { + return Some(index); + } + None + }) + .unwrap(); + attn.to_single_attestation_with_attester_index(attester_index as u64) + .unwrap() + }) + .collect::>(); + assert!(!unaggregated_attestations.is_empty()); let fork_name = harness.spec.fork_name_at_slot::(fork_slot); client - .post_beacon_pool_attestations_v1(&unaggregated_attestations) + .post_beacon_pool_attestations_v2::(unaggregated_attestations, fork_name) .await .unwrap(); diff --git a/beacon_node/http_api/tests/interactive_tests.rs b/beacon_node/http_api/tests/interactive_tests.rs index 4f3cd6c8285..399474c85c2 100644 --- a/beacon_node/http_api/tests/interactive_tests.rs +++ b/beacon_node/http_api/tests/interactive_tests.rs @@ -5,7 +5,6 @@ use beacon_chain::{ ChainConfig, }; use beacon_processor::work_reprocessing_queue::ReprocessQueueMessage; -use either::Either; use eth2::types::ProduceBlockV3Response; use eth2::types::{DepositContractData, StateId}; use execution_layer::{ForkchoiceState, PayloadAttributes}; @@ -539,7 +538,7 @@ pub async fn proposer_boost_re_org_test( slot_a, num_parent_votes, ); - harness.process_attestations(block_a_parent_votes); + harness.process_attestations(block_a_parent_votes, &state_a); // Attest to block A during slot B. for _ in 0..parent_distance { @@ -553,7 +552,7 @@ pub async fn proposer_boost_re_org_test( slot_b, num_empty_votes, ); - harness.process_attestations(block_a_empty_votes); + harness.process_attestations(block_a_empty_votes, &state_a); let remaining_attesters = all_validators .iter() @@ -586,7 +585,7 @@ pub async fn proposer_boost_re_org_test( slot_b, num_head_votes, ); - harness.process_attestations(block_b_head_votes); + harness.process_attestations(block_b_head_votes, &state_b); let payload_lookahead = harness.chain.config.prepare_payload_lookahead; let fork_choice_lookahead = Duration::from_millis(500); @@ -818,10 +817,10 @@ pub async fn fork_choice_before_proposal() { block_root_c, slot_c, ); - harness.process_attestations(attestations_c); + harness.process_attestations(attestations_c, &state_c); // Apply the attestations to B, but don't re-run fork choice. - harness.process_attestations(attestations_b); + harness.process_attestations(attestations_b, &state_b); // Due to proposer boost, the head should be C during slot C. assert_eq!( @@ -894,7 +893,7 @@ async fn queue_attestations_from_http() { let fork_name = tester.harness.spec.fork_name_at_slot::(attestation_slot); // Make attestations to the block and POST them to the beacon node on a background thread. - let attestation_future = if fork_name.electra_enabled() { + let attestation_future = { let single_attestations = harness .make_single_attestations( &all_validators, @@ -907,30 +906,9 @@ async fn queue_attestations_from_http() { .flat_map(|attestations| attestations.into_iter().map(|(att, _subnet)| att)) .collect::>(); - let attestations = Either::Right(single_attestations); - - tokio::spawn(async move { - client - .post_beacon_pool_attestations_v2::(attestations, fork_name) - .await - .expect("attestations should be processed successfully") - }) - } else { - let attestations = harness - .make_unaggregated_attestations( - &all_validators, - &post_state, - block.0.state_root(), - block_root.into(), - attestation_slot, - ) - .into_iter() - .flat_map(|attestations| attestations.into_iter().map(|(att, _subnet)| att)) - .collect::>(); - tokio::spawn(async move { client - .post_beacon_pool_attestations_v1(&attestations) + .post_beacon_pool_attestations_v2::(single_attestations, fork_name) .await .expect("attestations should be processed successfully") }) diff --git a/beacon_node/http_api/tests/tests.rs b/beacon_node/http_api/tests/tests.rs index 4ad70c34671..c23ab924159 100644 --- a/beacon_node/http_api/tests/tests.rs +++ b/beacon_node/http_api/tests/tests.rs @@ -3,7 +3,6 @@ use beacon_chain::{ test_utils::{AttestationStrategy, BeaconChainHarness, BlockStrategy, EphemeralHarnessType}, BeaconChain, ChainConfig, StateSkipConfig, WhenSlotSkipped, }; -use either::Either; use eth2::{ mixin::{RequestAccept, ResponseForkName, ResponseOptional}, reqwest::RequestBuilder, @@ -1907,18 +1906,46 @@ impl ApiTester { } pub async fn test_post_beacon_pool_attestations_valid(mut self) -> Self { - self.client - .post_beacon_pool_attestations_v1(self.attestations.as_slice()) - .await - .unwrap(); - let fork_name = self .attestations .first() .map(|att| self.chain.spec.fork_name_at_slot::(att.data().slot)) .unwrap(); - let attestations = Either::Left(self.attestations.clone()); + let state = &self.chain.head_snapshot().beacon_state; + + let attestations = self + .attestations + .clone() + .into_iter() + .map(|attn| { + let aggregation_bits = attn.get_aggregation_bits(); + + if aggregation_bits.len() != 1 { + panic!("Must be an unaggregated attestation") + } + + let aggregation_bit = *aggregation_bits.first().unwrap(); + + let committee = state + .get_beacon_committee(attn.data().slot, attn.committee_index().unwrap()) + .unwrap(); + + let attester_index = committee + .committee + .iter() + .enumerate() + .find_map(|(i, &index)| { + if aggregation_bit as usize == i { + return Some(index); + } + None + }) + .unwrap(); + attn.to_single_attestation_with_attester_index(attester_index as u64) + .unwrap() + }) + .collect::>(); self.client .post_beacon_pool_attestations_v2::(attestations, fork_name) @@ -1943,9 +1970,8 @@ impl ApiTester { .map(|att| self.chain.spec.fork_name_at_slot::(att.data.slot)) .unwrap(); - let attestations = Either::Right(self.single_attestations.clone()); self.client - .post_beacon_pool_attestations_v2::(attestations, fork_name) + .post_beacon_pool_attestations_v2::(self.single_attestations.clone(), fork_name) .await .unwrap(); assert!( @@ -1958,18 +1984,87 @@ impl ApiTester { pub async fn test_post_beacon_pool_attestations_invalid_v1(mut self) -> Self { let mut attestations = Vec::new(); + let state = &self.chain.head_snapshot().beacon_state; for attestation in &self.attestations { let mut invalid_attestation = attestation.clone(); invalid_attestation.data_mut().slot += 1; + // Convert valid attestation into valid `SingleAttestation` + let aggregation_bits = attestation.get_aggregation_bits(); + + if aggregation_bits.len() != 1 { + panic!("Must be an unaggregated attestation") + } + + let aggregation_bit = *aggregation_bits.first().unwrap(); + + let committee = state + .get_beacon_committee( + attestation.data().slot, + attestation.committee_index().unwrap(), + ) + .unwrap(); + + let attester_index = committee + .committee + .iter() + .enumerate() + .find_map(|(i, &index)| { + if aggregation_bit as usize == i { + return Some(index); + } + None + }) + .unwrap(); + let attestation = attestation + .to_single_attestation_with_attester_index(attester_index as u64) + .unwrap(); + + // Convert invalid attestation to invalid `SingleAttestation` + let aggregation_bits = invalid_attestation.get_aggregation_bits(); + + if aggregation_bits.len() != 1 { + panic!("Must be an unaggregated attestation") + } + + let aggregation_bit = *aggregation_bits.first().unwrap(); + + let committee = state + .get_beacon_committee( + invalid_attestation.data().slot, + invalid_attestation.committee_index().unwrap(), + ) + .unwrap(); + + let attester_index = committee + .committee + .iter() + .enumerate() + .find_map(|(i, &index)| { + if aggregation_bit as usize == i { + return Some(index); + } + None + }) + .unwrap(); + let invalid_attestation = invalid_attestation + .to_single_attestation_with_attester_index(attester_index as u64) + .unwrap(); + // add both to ensure we only fail on invalid attestations attestations.push(attestation.clone()); attestations.push(invalid_attestation); } + let fork_name = self + .attestations + .first() + .map(|att| self.chain.spec.fork_name_at_slot::(att.data().slot)) + .unwrap(); + let err = self .client - .post_beacon_pool_attestations_v1(attestations.as_slice()) + .post_beacon_pool_attestations_v2::(attestations, fork_name) .await .unwrap_err(); @@ -2011,7 +2106,6 @@ impl ApiTester { .first() .map(|att| self.chain.spec.fork_name_at_slot::(att.data().slot)) .unwrap(); - let attestations = Either::Right(attestations); let err_v2 = self .client .post_beacon_pool_attestations_v2::(attestations, fork_name) @@ -4177,9 +4271,47 @@ impl ApiTester { assert_eq!(result, expected); + let attestations = self + .attestations + .clone() + .into_iter() + .map(|attn| { + let aggregation_bits = attn.get_aggregation_bits(); + + if aggregation_bits.len() != 1 { + panic!("Must be an unaggregated attestation") + } + + let aggregation_bit = *aggregation_bits.first().unwrap(); + + let committee = head_state + .get_beacon_committee(attn.data().slot, attn.committee_index().unwrap()) + .unwrap(); + + let attester_index = committee + .committee + .iter() + .enumerate() + .find_map(|(i, &index)| { + if aggregation_bit as usize == i { + return Some(index); + } + None + }) + .unwrap(); + attn.to_single_attestation_with_attester_index(attester_index as u64) + .unwrap() + }) + .collect::>(); + + let fork_name = self + .chain + .spec + .fork_name_at_slot::(attestations.first().unwrap().data.slot); + // Attest to the current slot self.client - .post_beacon_pool_attestations_v1(self.attestations.as_slice()) + .post_beacon_pool_attestations_v2::(attestations, fork_name) .await .unwrap(); @@ -5916,9 +6048,47 @@ impl ApiTester { assert_eq!(result, expected); + let attestations = self + .attestations + .clone() + .into_iter() + .map(|attn| { + let aggregation_bits = attn.get_aggregation_bits(); + + if aggregation_bits.len() != 1 { + panic!("Must be an unaggregated attestation") + } + + let aggregation_bit = *aggregation_bits.first().unwrap(); + + let committee = head_state + .get_beacon_committee(attn.data().slot, attn.committee_index().unwrap()) + .unwrap(); + + let attester_index = committee + .committee + .iter() + .enumerate() + .find_map(|(i, &index)| { + if aggregation_bit as usize == i { + return Some(index); + } + None + }) + .unwrap(); + attn.to_single_attestation_with_attester_index(attester_index as u64) + .unwrap() + }) + .collect::>(); + + let fork_name = self + .chain + .spec + .fork_name_at_slot::(attestations.first().unwrap().data.slot); + // Attest to the current slot self.client - .post_beacon_pool_attestations_v1(self.attestations.as_slice()) + .post_beacon_pool_attestations_v2::(attestations, fork_name) .await .unwrap(); @@ -5973,8 +6143,47 @@ impl ApiTester { let expected_attestation_len = self.attestations.len(); + let state = self.harness.get_current_state(); + let attestations = self + .attestations + .clone() + .into_iter() + .map(|attn| { + let aggregation_bits = attn.get_aggregation_bits(); + + if aggregation_bits.len() != 1 { + panic!("Must be an unaggregated attestation") + } + + let aggregation_bit = *aggregation_bits.first().unwrap(); + + let committee = state + .get_beacon_committee(attn.data().slot, attn.committee_index().unwrap()) + .unwrap(); + + let attester_index = committee + .committee + .iter() + .enumerate() + .find_map(|(i, &index)| { + if aggregation_bit as usize == i { + return Some(index); + } + None + }) + .unwrap(); + attn.to_single_attestation_with_attester_index(attester_index as u64) + .unwrap() + }) + .collect::>(); + + let fork_name = self + .chain + .spec + .fork_name_at_slot::(attestations.first().unwrap().data.slot); + self.client - .post_beacon_pool_attestations_v1(self.attestations.as_slice()) + .post_beacon_pool_attestations_v2::(attestations, fork_name) .await .unwrap(); @@ -6247,9 +6456,9 @@ impl ApiTester { .chain .spec .fork_name_at_slot::(self.chain.slot().unwrap()); - let attestations = Either::Right(self.single_attestations.clone()); + self.client - .post_beacon_pool_attestations_v2::(attestations, fork_name) + .post_beacon_pool_attestations_v2::(self.single_attestations.clone(), fork_name) .await .unwrap(); diff --git a/beacon_node/lighthouse_network/src/types/pubsub.rs b/beacon_node/lighthouse_network/src/types/pubsub.rs index 880b387250e..21df75a648c 100644 --- a/beacon_node/lighthouse_network/src/types/pubsub.rs +++ b/beacon_node/lighthouse_network/src/types/pubsub.rs @@ -7,8 +7,8 @@ use ssz::{Decode, Encode}; use std::io::{Error, ErrorKind}; use std::sync::Arc; use types::{ - Attestation, AttestationBase, AttesterSlashing, AttesterSlashingBase, AttesterSlashingElectra, - BlobSidecar, DataColumnSidecar, DataColumnSubnetId, EthSpec, ForkContext, ForkName, + AttesterSlashing, AttesterSlashingBase, AttesterSlashingElectra, BlobSidecar, + DataColumnSidecar, DataColumnSubnetId, EthSpec, ForkContext, ForkName, LightClientFinalityUpdate, LightClientOptimisticUpdate, ProposerSlashing, SignedAggregateAndProof, SignedAggregateAndProofBase, SignedAggregateAndProofElectra, SignedBeaconBlock, SignedBeaconBlockAltair, SignedBeaconBlockBase, SignedBeaconBlockBellatrix, @@ -27,10 +27,8 @@ pub enum PubsubMessage { DataColumnSidecar(Box<(DataColumnSubnetId, Arc>)>), /// Gossipsub message providing notification of a Aggregate attestation and associated proof. AggregateAndProofAttestation(Box>), - /// Gossipsub message providing notification of a raw un-aggregated attestation with its subnet id. - Attestation(Box<(SubnetId, Attestation)>), - /// Gossipsub message providing notification of a `SingleAttestation`` with its subnet id. - SingleAttestation(Box<(SubnetId, SingleAttestation)>), + /// Gossipsub message providing notification of a `SingleAttestation` with its subnet id. + Attestation(Box<(SubnetId, SingleAttestation)>), /// Gossipsub message providing notification of a voluntary exit. VoluntaryExit(Box), /// Gossipsub message providing notification of a new proposer slashing. @@ -140,9 +138,6 @@ impl PubsubMessage { PubsubMessage::Attestation(attestation_data) => { GossipKind::Attestation(attestation_data.0) } - PubsubMessage::SingleAttestation(attestation_data) => { - GossipKind::Attestation(attestation_data.0) - } PubsubMessage::VoluntaryExit(_) => GossipKind::VoluntaryExit, PubsubMessage::ProposerSlashing(_) => GossipKind::ProposerSlashing, PubsubMessage::AttesterSlashing(_) => GossipKind::AttesterSlashing, @@ -203,32 +198,12 @@ impl PubsubMessage { ))) } GossipKind::Attestation(subnet_id) => { - match fork_context.from_context_bytes(gossip_topic.fork_digest) { - Some(&fork_name) => { - if fork_name.electra_enabled() { - let single_attestation = - SingleAttestation::from_ssz_bytes(data) - .map_err(|e| format!("{:?}", e))?; - Ok(PubsubMessage::SingleAttestation(Box::new(( - *subnet_id, - single_attestation, - )))) - } else { - let attestation = Attestation::Base( - AttestationBase::from_ssz_bytes(data) - .map_err(|e| format!("{:?}", e))?, - ); - Ok(PubsubMessage::Attestation(Box::new(( - *subnet_id, - attestation, - )))) - } - } - None => Err(format!( - "Unknown gossipsub fork digest: {:?}", - gossip_topic.fork_digest - )), - } + let attestation = SingleAttestation::from_ssz_bytes(data) + .map_err(|e| format!("{:?}", e))?; + Ok(PubsubMessage::Attestation(Box::new(( + *subnet_id, + attestation, + )))) } GossipKind::BeaconBlock => { let beacon_block = @@ -418,7 +393,6 @@ impl PubsubMessage { PubsubMessage::ProposerSlashing(data) => data.as_ssz_bytes(), PubsubMessage::AttesterSlashing(data) => data.as_ssz_bytes(), PubsubMessage::Attestation(data) => data.1.as_ssz_bytes(), - PubsubMessage::SingleAttestation(data) => data.1.as_ssz_bytes(), PubsubMessage::SignedContributionAndProof(data) => data.as_ssz_bytes(), PubsubMessage::SyncCommitteeMessage(data) => data.1.as_ssz_bytes(), PubsubMessage::BlsToExecutionChange(data) => data.as_ssz_bytes(), @@ -457,13 +431,6 @@ impl std::fmt::Display for PubsubMessage { att.message().aggregator_index(), ), PubsubMessage::Attestation(data) => write!( - f, - "Attestation: subnet_id: {}, attestation_slot: {}, attestation_index: {:?}", - *data.0, - data.1.data().slot, - data.1.committee_index(), - ), - PubsubMessage::SingleAttestation(data) => write!( f, "SingleAttestation: subnet_id: {}, attestation_slot: {}, committee_index: {:?}, attester_index: {:?}", *data.0, diff --git a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs index 8757ab43830..9ce65f42dc6 100644 --- a/beacon_node/network/src/network_beacon_processor/gossip_methods.rs +++ b/beacon_node/network/src/network_beacon_processor/gossip_methods.rs @@ -14,7 +14,6 @@ use beacon_chain::{ light_client_finality_update_verification::Error as LightClientFinalityUpdateError, light_client_optimistic_update_verification::Error as LightClientOptimisticUpdateError, observed_operations::ObservationOutcome, - single_attestation::single_attestation_to_attestation, sync_committee_verification::{self, Error as SyncCommitteeError}, validator_monitor::{get_block_delay_ms, get_slot_delay_ms}, AvailabilityProcessingStatus, BeaconChainError, BeaconChainTypes, BlockError, ForkChoiceError, @@ -82,8 +81,8 @@ impl VerifiedAttestation for VerifiedUnaggregate { } /// An attestation that failed validation by the `BeaconChain`. -struct RejectedUnaggregate { - attestation: Box>, +struct RejectedUnaggregate { + attestation: Box, error: AttnError, } @@ -124,16 +123,11 @@ struct RejectedAggregate { /// Data for an aggregated or unaggregated attestation that failed verification. enum FailedAtt { Unaggregate { - attestation: Box>, + attestation: Box, subnet_id: SubnetId, should_import: bool, seen_timestamp: Duration, }, - // This variant is just a dummy variant for now, as SingleAttestation reprocessing is handled - // separately. - SingleUnaggregate { - attestation: Box, - }, Aggregate { attestation: Box>, seen_timestamp: Duration, @@ -148,15 +142,13 @@ impl FailedAtt { pub fn kind(&self) -> &'static str { match self { FailedAtt::Unaggregate { .. } => "unaggregated", - FailedAtt::SingleUnaggregate { .. } => "unaggregated", FailedAtt::Aggregate { .. } => "aggregated", } } pub fn attestation_data(&self) -> &AttestationData { match self { - FailedAtt::Unaggregate { attestation, .. } => attestation.data(), - FailedAtt::SingleUnaggregate { attestation, .. } => &attestation.data, + FailedAtt::Unaggregate { attestation, .. } => &attestation.data, FailedAtt::Aggregate { attestation, .. } => attestation.message().aggregate().data(), } } @@ -208,7 +200,7 @@ impl NetworkBeaconProcessor { self: Arc, message_id: MessageId, peer_id: PeerId, - attestation: Box>, + attestation: Box, subnet_id: SubnetId, should_import: bool, reprocess_tx: Option>, @@ -218,10 +210,14 @@ impl NetworkBeaconProcessor { .chain .verify_unaggregated_attestation_for_gossip(&attestation, Some(subnet_id)) { - Ok(verified_attestation) => Ok(VerifiedUnaggregate { - indexed_attestation: verified_attestation.into_indexed_attestation(), - attestation, - }), + Ok(verified_attestation) => { + let attestation = + Box::new(verified_attestation.attestation().clone_as_attestation()); + Ok(VerifiedUnaggregate { + indexed_attestation: verified_attestation.into_indexed_attestation(), + attestation, + }) + } Err(error) => Err(RejectedUnaggregate { attestation, error }), }; @@ -238,7 +234,7 @@ impl NetworkBeaconProcessor { pub fn process_gossip_attestation_batch( self: Arc, - packages: GossipAttestationBatch, + packages: GossipAttestationBatch, reprocess_tx: Option>, ) { let attestations_and_subnets = packages @@ -275,14 +271,19 @@ impl NetworkBeaconProcessor { #[allow(clippy::needless_collect)] // The clippy suggestion fails the borrow checker. let results = results .into_iter() - .map(|result| result.map(|verified| verified.into_indexed_attestation())) + .map(|result| { + result.map(|verified| { + let attestation = verified.attestation().clone_as_attestation(); + (verified.into_indexed_attestation(), attestation) + }) + }) .collect::>(); for (result, package) in results.into_iter().zip(packages.into_iter()) { let result = match result { - Ok(indexed_attestation) => Ok(VerifiedUnaggregate { + Ok((indexed_attestation, attestation)) => Ok(VerifiedUnaggregate { indexed_attestation, - attestation: package.attestation, + attestation: Box::new(attestation), }), Err(error) => Err(RejectedUnaggregate { attestation: package.attestation, @@ -307,7 +308,7 @@ impl NetworkBeaconProcessor { #[allow(clippy::too_many_arguments)] fn process_gossip_attestation_result( self: &Arc, - result: Result, RejectedUnaggregate>, + result: Result, RejectedUnaggregate>, message_id: MessageId, peer_id: PeerId, subnet_id: SubnetId, @@ -403,147 +404,6 @@ impl NetworkBeaconProcessor { } } - /// Process an unaggregated attestation requiring conversion. - /// - /// This function performs the conversion, and if successfull queues a new message to be - /// processed by `process_gossip_attestation`. If unsuccessful due to block unavailability, - /// a retry message will be pushed to the `reprocess_tx` if it is `Some`. - #[allow(clippy::too_many_arguments)] - pub fn process_gossip_attestation_to_convert( - self: Arc, - message_id: MessageId, - peer_id: PeerId, - single_attestation: Box, - subnet_id: SubnetId, - should_import: bool, - reprocess_tx: Option>, - seen_timestamp: Duration, - ) { - let conversion_result = self.chain.with_committee_cache( - single_attestation.data.target.root, - single_attestation - .data - .slot - .epoch(T::EthSpec::slots_per_epoch()), - |committee_cache, _| { - let slot = single_attestation.data.slot; - let committee_index = single_attestation.committee_index; - let Some(committee) = committee_cache.get_beacon_committee(slot, committee_index) - else { - return Ok(Err(AttnError::NoCommitteeForSlotAndIndex { - slot, - index: committee_index, - })); - }; - - Ok(single_attestation_to_attestation( - &single_attestation, - committee.committee, - )) - }, - ); - - match conversion_result { - Ok(Ok(attestation)) => { - let slot = attestation.data().slot; - if let Err(e) = self.send_unaggregated_attestation( - message_id.clone(), - peer_id, - attestation, - subnet_id, - should_import, - seen_timestamp, - ) { - error!( - error = %e, - %slot, - "Unable to queue converted SingleAttestation" - ); - self.propagate_validation_result( - message_id, - peer_id, - MessageAcceptance::Ignore, - ); - } - } - // Outermost error (from `with_committee_cache`) indicating that the block is not known - // and that this conversion should be retried. - Err(BeaconChainError::MissingBeaconBlock(beacon_block_root)) => { - if let Some(sender) = reprocess_tx { - metrics::inc_counter( - &metrics::BEACON_PROCESSOR_UNAGGREGATED_ATTESTATION_REQUEUED_TOTAL, - ); - // We don't know the block, get the sync manager to handle the block lookup, and - // send the attestation to be scheduled for re-processing. - self.sync_tx - .send(SyncMessage::UnknownBlockHashFromAttestation( - peer_id, - beacon_block_root, - )) - .unwrap_or_else(|_| { - warn!(msg = "UnknownBlockHash", "Failed to send to sync service") - }); - let processor = self.clone(); - // Do not allow this attestation to be re-processed beyond this point. - let reprocess_msg = - ReprocessQueueMessage::UnknownBlockUnaggregate(QueuedUnaggregate { - beacon_block_root, - process_fn: Box::new(move || { - processor.process_gossip_attestation_to_convert( - message_id, - peer_id, - single_attestation, - subnet_id, - should_import, - None, - seen_timestamp, - ) - }), - }); - if sender.try_send(reprocess_msg).is_err() { - error!("Failed to send attestation for re-processing") - } - } else { - // We shouldn't make any further attempts to process this attestation. - // - // Don't downscore the peer since it's not clear if we requested this head - // block from them or not. - self.propagate_validation_result( - message_id, - peer_id, - MessageAcceptance::Ignore, - ); - } - } - Ok(Err(error)) => { - // We already handled reprocessing above so do not attempt it in the error handler. - self.handle_attestation_verification_failure( - peer_id, - message_id, - FailedAtt::SingleUnaggregate { - attestation: single_attestation, - }, - None, - error, - seen_timestamp, - ); - } - Err(error) => { - // We already handled reprocessing above so do not attempt it in the error handler. - self.handle_attestation_verification_failure( - peer_id, - message_id, - FailedAtt::SingleUnaggregate { - attestation: single_attestation, - }, - None, - AttnError::BeaconChainError(Box::new(error)), - seen_timestamp, - ); - } - } - } - /// Process the aggregated attestation received from the gossip network and: /// /// - If it passes gossip propagation criteria, tell the network thread to forward it. @@ -2508,16 +2368,6 @@ impl NetworkBeaconProcessor { }), }) } - FailedAtt::SingleUnaggregate { .. } => { - // This should never happen, as we handle the unknown head block case - // for `SingleAttestation`s separately and should not be able to hit - // an `UnknownHeadBlock` error. - error!( - block_root = ?beacon_block_root, - "Dropping SingleAttestation instead of requeueing" - ); - return; - } FailedAtt::Unaggregate { attestation, subnet_id, @@ -2613,19 +2463,6 @@ impl NetworkBeaconProcessor { "attn_no_committee", ); } - AttnError::NotExactlyOneAggregationBitSet(_) => { - /* - * The unaggregated attestation doesn't have only one signature. - * - * The peer has published an invalid consensus message. - */ - self.propagate_validation_result(message_id, peer_id, MessageAcceptance::Reject); - self.gossip_penalize_peer( - peer_id, - PeerAction::LowToleranceError, - "attn_too_many_agg_bits", - ); - } AttnError::NotExactlyOneCommitteeBitSet(_) => { /* * The attestation doesn't have only one committee bit set. diff --git a/beacon_node/network/src/network_beacon_processor/mod.rs b/beacon_node/network/src/network_beacon_processor/mod.rs index f9390a2c7b8..6b7a754540d 100644 --- a/beacon_node/network/src/network_beacon_processor/mod.rs +++ b/beacon_node/network/src/network_beacon_processor/mod.rs @@ -75,20 +75,21 @@ impl NetworkBeaconProcessor { self.beacon_processor_send.try_send(event) } - /// Create a new `Work` event for some `SingleAttestation`. - pub fn send_single_attestation( + /// Create a new `Work` event for some unaggregated attestation. + pub fn send_unaggregated_attestation( self: &Arc, message_id: MessageId, peer_id: PeerId, - single_attestation: SingleAttestation, + attestation: SingleAttestation, subnet_id: SubnetId, should_import: bool, seen_timestamp: Duration, ) -> Result<(), Error> { + // Define a closure for processing individual attestations. let processor = self.clone(); let process_individual = move |package: GossipAttestationPackage| { let reprocess_tx = processor.reprocess_tx.clone(); - processor.process_gossip_attestation_to_convert( + processor.process_gossip_attestation( package.message_id, package.peer_id, package.attestation, @@ -99,48 +100,6 @@ impl NetworkBeaconProcessor { ) }; - self.try_send(BeaconWorkEvent { - drop_during_sync: true, - work: Work::GossipAttestationToConvert { - attestation: Box::new(GossipAttestationPackage { - message_id, - peer_id, - attestation: Box::new(single_attestation), - subnet_id, - should_import, - seen_timestamp, - }), - process_individual: Box::new(process_individual), - }, - }) - } - - /// Create a new `Work` event for some unaggregated attestation. - pub fn send_unaggregated_attestation( - self: &Arc, - message_id: MessageId, - peer_id: PeerId, - attestation: Attestation, - subnet_id: SubnetId, - should_import: bool, - seen_timestamp: Duration, - ) -> Result<(), Error> { - // Define a closure for processing individual attestations. - let processor = self.clone(); - let process_individual = - move |package: GossipAttestationPackage>| { - let reprocess_tx = processor.reprocess_tx.clone(); - processor.process_gossip_attestation( - package.message_id, - package.peer_id, - package.attestation, - package.subnet_id, - package.should_import, - Some(reprocess_tx), - package.seen_timestamp, - ) - }; - // Define a closure for processing batches of attestations. let processor = self.clone(); let process_batch = move |attestations| { diff --git a/beacon_node/network/src/network_beacon_processor/tests.rs b/beacon_node/network/src/network_beacon_processor/tests.rs index cb9c9764044..1b3ea6c0b2b 100644 --- a/beacon_node/network/src/network_beacon_processor/tests.rs +++ b/beacon_node/network/src/network_beacon_processor/tests.rs @@ -36,9 +36,9 @@ use std::time::Duration; use tokio::sync::mpsc; use types::blob_sidecar::FixedBlobSidecarList; use types::{ - Attestation, AttesterSlashing, BlobSidecar, BlobSidecarList, ChainSpec, DataColumnSidecarList, + AttesterSlashing, BlobSidecar, BlobSidecarList, ChainSpec, DataColumnSidecarList, DataColumnSubnetId, Epoch, Hash256, MainnetEthSpec, ProposerSlashing, SignedAggregateAndProof, - SignedBeaconBlock, SignedVoluntaryExit, Slot, SubnetId, + SignedBeaconBlock, SignedVoluntaryExit, SingleAttestation, Slot, SubnetId, }; type E = MainnetEthSpec; @@ -60,8 +60,8 @@ struct TestRig { next_block: Arc>, next_blobs: Option>, next_data_columns: Option>, - attestations: Vec<(Attestation, SubnetId)>, - next_block_attestations: Vec<(Attestation, SubnetId)>, + attestations: Vec<(SingleAttestation, SubnetId)>, + next_block_attestations: Vec<(SingleAttestation, SubnetId)>, next_block_aggregate_attestations: Vec>, attester_slashing: AttesterSlashing, proposer_slashing: ProposerSlashing, @@ -143,7 +143,7 @@ impl TestRig { let head_state_root = head.beacon_state_root(); let attestations = harness - .get_unaggregated_attestations( + .get_single_attestations( &AttestationStrategy::AllValidators, &head.beacon_state, head_state_root, @@ -160,7 +160,7 @@ impl TestRig { ); let next_block_attestations = harness - .get_unaggregated_attestations( + .get_single_attestations( &AttestationStrategy::AllValidators, &next_state, next_block_tuple.0.state_root(), diff --git a/beacon_node/network/src/router.rs b/beacon_node/network/src/router.rs index 2a7bc597c26..960a1203a66 100644 --- a/beacon_node/network/src/router.rs +++ b/beacon_node/network/src/router.rs @@ -354,17 +354,6 @@ impl Router { timestamp_now(), ), ), - PubsubMessage::SingleAttestation(subnet_attestation) => self - .handle_beacon_processor_send_result( - self.network_beacon_processor.send_single_attestation( - message_id, - peer_id, - subnet_attestation.1, - subnet_attestation.0, - should_process, - timestamp_now(), - ), - ), PubsubMessage::BeaconBlock(block) => self.handle_beacon_processor_send_result( self.network_beacon_processor.send_gossip_beacon_block( message_id, diff --git a/beacon_node/network/src/service.rs b/beacon_node/network/src/service.rs index 77204b455da..b66a57b8a5f 100644 --- a/beacon_node/network/src/service.rs +++ b/beacon_node/network/src/service.rs @@ -539,23 +539,7 @@ impl NetworkService { // the attestation, else we just just propagate the Attestation. let should_process = self.subnet_service.should_process_attestation( Subnet::Attestation(subnet_id), - attestation.data(), - ); - self.send_to_router(RouterMessage::PubsubMessage( - id, - source, - message, - should_process, - )); - } - PubsubMessage::SingleAttestation(ref subnet_and_attestation) => { - let subnet_id = subnet_and_attestation.0; - let single_attestation = &subnet_and_attestation.1; - // checks if we have an aggregator for the slot. If so, we should process - // the attestation, else we just just propagate the Attestation. - let should_process = self.subnet_service.should_process_attestation( - Subnet::Attestation(subnet_id), - &single_attestation.data, + &attestation.data, ); self.send_to_router(RouterMessage::PubsubMessage( id, diff --git a/common/eth2/src/lib.rs b/common/eth2/src/lib.rs index 52cc91ba298..1dd2970c10e 100644 --- a/common/eth2/src/lib.rs +++ b/common/eth2/src/lib.rs @@ -18,7 +18,6 @@ use self::mixin::{RequestAccept, ResponseOptional}; use self::types::{Error as ResponseError, *}; use ::types::beacon_response::ExecutionOptimisticFinalizedBeaconResponse; use derivative::Derivative; -use either::Either; use futures::Stream; use futures_util::StreamExt; use libp2p_identity::PeerId; @@ -1434,29 +1433,10 @@ impl BeaconNodeHttpClient { .map(|opt| opt.map(BeaconResponse::ForkVersioned)) } - /// `POST v1/beacon/pool/attestations` - pub async fn post_beacon_pool_attestations_v1( - &self, - attestations: &[Attestation], - ) -> Result<(), Error> { - let mut path = self.eth_path(V1)?; - - path.path_segments_mut() - .map_err(|()| Error::InvalidUrl(self.server.clone()))? - .push("beacon") - .push("pool") - .push("attestations"); - - self.post_with_timeout(path, &attestations, self.timeouts.attestation) - .await?; - - Ok(()) - } - /// `POST v2/beacon/pool/attestations` pub async fn post_beacon_pool_attestations_v2( &self, - attestations: Either>, Vec>, + attestations: Vec, fork_name: ForkName, ) -> Result<(), Error> { let mut path = self.eth_path(V2)?; @@ -1467,26 +1447,13 @@ impl BeaconNodeHttpClient { .push("pool") .push("attestations"); - match attestations { - Either::Right(attestations) => { - self.post_with_timeout_and_consensus_header( - path, - &attestations, - self.timeouts.attestation, - fork_name, - ) - .await?; - } - Either::Left(attestations) => { - self.post_with_timeout_and_consensus_header( - path, - &attestations, - self.timeouts.attestation, - fork_name, - ) - .await?; - } - }; + self.post_with_timeout_and_consensus_header( + path, + &attestations, + self.timeouts.attestation, + fork_name, + ) + .await?; Ok(()) } diff --git a/consensus/fork_choice/tests/tests.rs b/consensus/fork_choice/tests/tests.rs index 95bdee574d2..8d510d0e896 100644 --- a/consensus/fork_choice/tests/tests.rs +++ b/consensus/fork_choice/tests/tests.rs @@ -15,6 +15,7 @@ use std::fmt; use std::sync::Mutex; use std::time::Duration; use store::MemoryStore; +use types::SingleAttestation; use types::{ test_utils::generate_deterministic_keypair, BeaconBlockRef, BeaconState, ChainSpec, Checkpoint, Epoch, EthSpec, FixedBytesExtended, ForkName, Hash256, IndexedAttestation, MainnetEthSpec, @@ -463,10 +464,17 @@ impl ForkChoiceTest { ) .expect("should sign attestation"); + let single_attestation = SingleAttestation { + attester_index: validator_index as u64, + committee_index: validator_committee_index as u64, + data: attestation.data().clone(), + signature: attestation.signature().clone(), + }; + let mut verified_attestation = self .harness .chain - .verify_unaggregated_attestation_for_gossip(&attestation, Some(subnet_id)) + .verify_unaggregated_attestation_for_gossip(&single_attestation, Some(subnet_id)) .expect("precondition: should gossip verify attestation"); if let MutationDelay::Blocks(slots) = delay { diff --git a/consensus/types/src/attestation.rs b/consensus/types/src/attestation.rs index 286e4622f84..e9a1ab4ceb4 100644 --- a/consensus/types/src/attestation.rs +++ b/consensus/types/src/attestation.rs @@ -1,7 +1,13 @@ -use crate::context_deserialize; +use super::{ + AggregateSignature, AttestationData, BitList, ChainSpec, Domain, EthSpec, Fork, SecretKey, + Signature, SignedRoot, +}; use crate::slot_data::SlotData; +use crate::{context_deserialize, IndexedAttestation}; use crate::{test_utils::TestRandom, Hash256, Slot}; -use crate::{Checkpoint, ContextDeserialize, ForkName}; +use crate::{ + Checkpoint, ContextDeserialize, ForkName, IndexedAttestationBase, IndexedAttestationElectra, +}; use derivative::Derivative; use serde::{Deserialize, Deserializer, Serialize}; use ssz_derive::{Decode, Encode}; @@ -12,11 +18,6 @@ use superstruct::superstruct; use test_random_derive::TestRandom; use tree_hash_derive::TreeHash; -use super::{ - AggregateSignature, AttestationData, BitList, ChainSpec, Domain, EthSpec, Fork, SecretKey, - Signature, SignedRoot, -}; - #[derive(Debug, PartialEq, Clone)] pub enum Error { SszTypesError(ssz_types::Error), @@ -246,10 +247,17 @@ impl Attestation { attester_index: u64, ) -> Result { match self { - Self::Base(_) => Err(Error::IncorrectStateVariant), + Self::Base(attn) => attn.to_single_attestation_with_attester_index(attester_index), Self::Electra(attn) => attn.to_single_attestation_with_attester_index(attester_index), } } + + pub fn get_aggregation_bits(&self) -> Vec { + match self { + Self::Base(attn) => attn.get_aggregation_bits(), + Self::Electra(attn) => attn.get_aggregation_bits(), + } + } } impl AttestationRef<'_, E> { @@ -461,6 +469,26 @@ impl AttestationBase { ) -> Result, ssz::BitfieldError> { self.aggregation_bits.resize::() } + + pub fn get_aggregation_bits(&self) -> Vec { + self.aggregation_bits + .iter() + .enumerate() + .filter_map(|(index, bit)| if bit { Some(index as u64) } else { None }) + .collect() + } + + pub fn to_single_attestation_with_attester_index( + &self, + attester_index: u64, + ) -> Result { + Ok(SingleAttestation { + committee_index: self.data.index, + attester_index, + data: self.data.clone(), + signature: self.signature.clone(), + }) + } } impl SlotData for Attestation { @@ -596,6 +624,24 @@ pub struct SingleAttestation { pub signature: AggregateSignature, } +impl SingleAttestation { + pub fn to_indexed(&self, fork_name: ForkName) -> IndexedAttestation { + if fork_name.electra_enabled() { + IndexedAttestation::Electra(IndexedAttestationElectra { + attesting_indices: vec![self.attester_index].into(), + data: self.data.clone(), + signature: self.signature.clone(), + }) + } else { + IndexedAttestation::Base(IndexedAttestationBase { + attesting_indices: vec![self.attester_index].into(), + data: self.data.clone(), + signature: self.signature.clone(), + }) + } + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/validator_client/validator_services/src/attestation_service.rs b/validator_client/validator_services/src/attestation_service.rs index f7765677065..e4063cd2117 100644 --- a/validator_client/validator_services/src/attestation_service.rs +++ b/validator_client/validator_services/src/attestation_service.rs @@ -1,6 +1,5 @@ use crate::duties_service::{DutiesService, DutyAndProof}; use beacon_node_fallback::{ApiTopic, BeaconNodeFallback}; -use either::Either; use futures::future::join_all; use logging::crit; use slot_clock::SlotClock; @@ -461,40 +460,32 @@ impl AttestationService Some(a), - Err(e) => { - // This shouldn't happen unless BN and VC are out of sync with - // respect to the Electra fork. - error!( - error = ?e, - committee_index = attestation_data.index, - slot = slot.as_u64(), - "type" = "unaggregated", - "Unable to convert to SingleAttestation" - ); - None - } + + let single_attestations = attestations + .iter() + .zip(validator_indices) + .filter_map(|(a, i)| { + match a.to_single_attestation_with_attester_index(*i) { + Ok(a) => Some(a), + Err(e) => { + // This shouldn't happen unless BN and VC are out of sync with + // respect to the Electra fork. + error!( + error = ?e, + committee_index = attestation_data.index, + slot = slot.as_u64(), + "type" = "unaggregated", + "Unable to convert to SingleAttestation" + ); + None } - }) - .collect::>(); + } + }) + .collect::>(); - beacon_node - .post_beacon_pool_attestations_v2::( - Either::Right(single_attestations), - fork_name, - ) - .await - } else { - beacon_node - .post_beacon_pool_attestations_v1(attestations) - .await - } + beacon_node + .post_beacon_pool_attestations_v2::(single_attestations, fork_name) + .await }) .await {