diff --git a/Cargo.lock b/Cargo.lock index 409002eca..f85c254fe 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -789,6 +789,7 @@ dependencies = [ "database", "eth2", "ethereum_ssz", + "fork", "futures", "hex", "lru 0.16.2", @@ -1387,7 +1388,7 @@ checksum = "0e050f626429857a27ddccb31e0aca21356bfa709c04041aefddac081a8f068a" [[package]] name = "beacon_node_fallback" version = "0.1.0" -source = "git+https://github.com/sigp/lighthouse?rev=58b153cac#58b153cac5a078d849bf8478bd21baedfe98d9ee" +source = "git+https://github.com/shane-moore/lighthouse?rev=35fb9763#35fb9763f40c4e43ecfa8b701d3f6c97db8fdea5" dependencies = [ "bls", "clap", @@ -1487,7 +1488,7 @@ dependencies = [ [[package]] name = "bls" version = "0.2.0" -source = "git+https://github.com/sigp/lighthouse?rev=58b153cac#58b153cac5a078d849bf8478bd21baedfe98d9ee" +source = "git+https://github.com/shane-moore/lighthouse?rev=35fb9763#35fb9763f40c4e43ecfa8b701d3f6c97db8fdea5" dependencies = [ "alloy-primitives 1.5.2", "arbitrary", @@ -2620,7 +2621,7 @@ dependencies = [ [[package]] name = "eip_3076" version = "0.1.0" -source = "git+https://github.com/sigp/lighthouse?rev=58b153cac#58b153cac5a078d849bf8478bd21baedfe98d9ee" +source = "git+https://github.com/shane-moore/lighthouse?rev=35fb9763#35fb9763f40c4e43ecfa8b701d3f6c97db8fdea5" dependencies = [ "bls", "ethereum_serde_utils 0.8.0", @@ -2859,7 +2860,7 @@ dependencies = [ [[package]] name = "eth2" version = "0.1.0" -source = "git+https://github.com/sigp/lighthouse?rev=58b153cac#58b153cac5a078d849bf8478bd21baedfe98d9ee" +source = "git+https://github.com/shane-moore/lighthouse?rev=35fb9763#35fb9763f40c4e43ecfa8b701d3f6c97db8fdea5" dependencies = [ "bls", "context_deserialize", @@ -2885,7 +2886,7 @@ dependencies = [ [[package]] name = "eth2_config" version = "0.2.0" -source = "git+https://github.com/sigp/lighthouse?rev=58b153cac#58b153cac5a078d849bf8478bd21baedfe98d9ee" +source = "git+https://github.com/shane-moore/lighthouse?rev=35fb9763#35fb9763f40c4e43ecfa8b701d3f6c97db8fdea5" dependencies = [ "paste", "types", @@ -2894,7 +2895,7 @@ dependencies = [ [[package]] name = "eth2_interop_keypairs" version = "0.2.0" -source = "git+https://github.com/sigp/lighthouse?rev=58b153cac#58b153cac5a078d849bf8478bd21baedfe98d9ee" +source = "git+https://github.com/shane-moore/lighthouse?rev=35fb9763#35fb9763f40c4e43ecfa8b701d3f6c97db8fdea5" dependencies = [ "bls", "ethereum_hashing 0.8.0", @@ -2907,7 +2908,7 @@ dependencies = [ [[package]] name = "eth2_key_derivation" version = "0.1.0" -source = "git+https://github.com/sigp/lighthouse?rev=58b153cac#58b153cac5a078d849bf8478bd21baedfe98d9ee" +source = "git+https://github.com/shane-moore/lighthouse?rev=35fb9763#35fb9763f40c4e43ecfa8b701d3f6c97db8fdea5" dependencies = [ "bls", "num-bigint-dig", @@ -2919,7 +2920,7 @@ dependencies = [ [[package]] name = "eth2_keystore" version = "0.1.0" -source = "git+https://github.com/sigp/lighthouse?rev=58b153cac#58b153cac5a078d849bf8478bd21baedfe98d9ee" +source = "git+https://github.com/shane-moore/lighthouse?rev=35fb9763#35fb9763f40c4e43ecfa8b701d3f6c97db8fdea5" dependencies = [ "aes", "bls", @@ -2943,7 +2944,7 @@ dependencies = [ [[package]] name = "eth2_network_config" version = "0.2.0" -source = "git+https://github.com/sigp/lighthouse?rev=58b153cac#58b153cac5a078d849bf8478bd21baedfe98d9ee" +source = "git+https://github.com/shane-moore/lighthouse?rev=35fb9763#35fb9763f40c4e43ecfa8b701d3f6c97db8fdea5" dependencies = [ "bytes", "discv5", @@ -3144,7 +3145,7 @@ dependencies = [ [[package]] name = "filesystem" version = "0.1.0" -source = 
"git+https://github.com/sigp/lighthouse?rev=58b153cac#58b153cac5a078d849bf8478bd21baedfe98d9ee" +source = "git+https://github.com/shane-moore/lighthouse?rev=35fb9763#35fb9763f40c4e43ecfa8b701d3f6c97db8fdea5" dependencies = [ "winapi", "windows-acl", @@ -3171,7 +3172,7 @@ dependencies = [ [[package]] name = "fixed_bytes" version = "0.1.0" -source = "git+https://github.com/sigp/lighthouse?rev=58b153cac#58b153cac5a078d849bf8478bd21baedfe98d9ee" +source = "git+https://github.com/shane-moore/lighthouse?rev=35fb9763#35fb9763f40c4e43ecfa8b701d3f6c97db8fdea5" dependencies = [ "alloy-primitives 1.5.2", "safe_arith", @@ -3479,7 +3480,7 @@ dependencies = [ [[package]] name = "graffiti_file" version = "0.1.0" -source = "git+https://github.com/sigp/lighthouse?rev=58b153cac#58b153cac5a078d849bf8478bd21baedfe98d9ee" +source = "git+https://github.com/shane-moore/lighthouse?rev=35fb9763#35fb9763f40c4e43ecfa8b701d3f6c97db8fdea5" dependencies = [ "bls", "serde", @@ -3590,7 +3591,7 @@ dependencies = [ [[package]] name = "health_metrics" version = "0.1.0" -source = "git+https://github.com/sigp/lighthouse?rev=58b153cac#58b153cac5a078d849bf8478bd21baedfe98d9ee" +source = "git+https://github.com/shane-moore/lighthouse?rev=35fb9763#35fb9763f40c4e43ecfa8b701d3f6c97db8fdea5" dependencies = [ "eth2", "metrics", @@ -4108,7 +4109,7 @@ dependencies = [ [[package]] name = "int_to_bytes" version = "0.2.0" -source = "git+https://github.com/sigp/lighthouse?rev=58b153cac#58b153cac5a078d849bf8478bd21baedfe98d9ee" +source = "git+https://github.com/shane-moore/lighthouse?rev=35fb9763#35fb9763f40c4e43ecfa8b701d3f6c97db8fdea5" dependencies = [ "bytes", ] @@ -4273,7 +4274,7 @@ dependencies = [ [[package]] name = "kzg" version = "0.1.0" -source = "git+https://github.com/sigp/lighthouse?rev=58b153cac#58b153cac5a078d849bf8478bd21baedfe98d9ee" +source = "git+https://github.com/shane-moore/lighthouse?rev=35fb9763#35fb9763f40c4e43ecfa8b701d3f6c97db8fdea5" dependencies = [ "arbitrary", "c-kzg", @@ -4812,7 +4813,7 @@ dependencies = [ [[package]] name = "logging" version = "0.2.0" -source = "git+https://github.com/sigp/lighthouse?rev=58b153cac#58b153cac5a078d849bf8478bd21baedfe98d9ee" +source = "git+https://github.com/shane-moore/lighthouse?rev=35fb9763#35fb9763f40c4e43ecfa8b701d3f6c97db8fdea5" dependencies = [ "chrono", "logroller", @@ -4876,7 +4877,7 @@ checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" [[package]] name = "lru_cache" version = "0.1.0" -source = "git+https://github.com/sigp/lighthouse?rev=58b153cac#58b153cac5a078d849bf8478bd21baedfe98d9ee" +source = "git+https://github.com/shane-moore/lighthouse?rev=35fb9763#35fb9763f40c4e43ecfa8b701d3f6c97db8fdea5" dependencies = [ "fnv", ] @@ -4957,7 +4958,7 @@ dependencies = [ [[package]] name = "merkle_proof" version = "0.2.0" -source = "git+https://github.com/sigp/lighthouse?rev=58b153cac#58b153cac5a078d849bf8478bd21baedfe98d9ee" +source = "git+https://github.com/shane-moore/lighthouse?rev=35fb9763#35fb9763f40c4e43ecfa8b701d3f6c97db8fdea5" dependencies = [ "alloy-primitives 1.5.2", "ethereum_hashing 0.8.0", @@ -5057,7 +5058,7 @@ dependencies = [ [[package]] name = "metrics" version = "0.2.0" -source = "git+https://github.com/sigp/lighthouse?rev=58b153cac#58b153cac5a078d849bf8478bd21baedfe98d9ee" +source = "git+https://github.com/shane-moore/lighthouse?rev=35fb9763#35fb9763f40c4e43ecfa8b701d3f6c97db8fdea5" dependencies = [ "prometheus", ] @@ -5317,7 +5318,7 @@ dependencies = [ [[package]] name = "network_utils" version = "0.1.0" -source = 
"git+https://github.com/sigp/lighthouse?rev=58b153cac#58b153cac5a078d849bf8478bd21baedfe98d9ee" +source = "git+https://github.com/shane-moore/lighthouse?rev=35fb9763#35fb9763f40c4e43ecfa8b701d3f6c97db8fdea5" dependencies = [ "discv5", "libp2p-identity", @@ -5884,7 +5885,7 @@ dependencies = [ [[package]] name = "pretty_reqwest_error" version = "0.1.0" -source = "git+https://github.com/sigp/lighthouse?rev=58b153cac#58b153cac5a078d849bf8478bd21baedfe98d9ee" +source = "git+https://github.com/shane-moore/lighthouse?rev=35fb9763#35fb9763f40c4e43ecfa8b701d3f6c97db8fdea5" dependencies = [ "reqwest", "sensitive_url", @@ -6036,7 +6037,7 @@ dependencies = [ [[package]] name = "proto_array" version = "0.2.0" -source = "git+https://github.com/sigp/lighthouse?rev=58b153cac#58b153cac5a078d849bf8478bd21baedfe98d9ee" +source = "git+https://github.com/shane-moore/lighthouse?rev=35fb9763#35fb9763f40c4e43ecfa8b701d3f6c97db8fdea5" dependencies = [ "ethereum_ssz", "ethereum_ssz_derive", @@ -7166,7 +7167,7 @@ checksum = "7a2ae44ef20feb57a68b23d846850f861394c2e02dc425a50098ae8c90267589" [[package]] name = "slashing_protection" version = "0.1.0" -source = "git+https://github.com/sigp/lighthouse?rev=58b153cac#58b153cac5a078d849bf8478bd21baedfe98d9ee" +source = "git+https://github.com/shane-moore/lighthouse?rev=35fb9763#35fb9763f40c4e43ecfa8b701d3f6c97db8fdea5" dependencies = [ "arbitrary", "bls", @@ -7187,7 +7188,7 @@ dependencies = [ [[package]] name = "slot_clock" version = "0.2.0" -source = "git+https://github.com/sigp/lighthouse?rev=58b153cac#58b153cac5a078d849bf8478bd21baedfe98d9ee" +source = "git+https://github.com/shane-moore/lighthouse?rev=35fb9763#35fb9763f40c4e43ecfa8b701d3f6c97db8fdea5" dependencies = [ "metrics", "parking_lot", @@ -7408,7 +7409,7 @@ dependencies = [ [[package]] name = "swap_or_not_shuffle" version = "0.2.0" -source = "git+https://github.com/sigp/lighthouse?rev=58b153cac#58b153cac5a078d849bf8478bd21baedfe98d9ee" +source = "git+https://github.com/shane-moore/lighthouse?rev=35fb9763#35fb9763f40c4e43ecfa8b701d3f6c97db8fdea5" dependencies = [ "alloy-primitives 1.5.2", "ethereum_hashing 0.8.0", @@ -7511,7 +7512,7 @@ checksum = "c63f48baada5c52e65a29eef93ab4f8982681b67f9e8d29c7b05abcfec2b9ffe" [[package]] name = "task_executor" version = "0.1.0" -source = "git+https://github.com/sigp/lighthouse?rev=58b153cac#58b153cac5a078d849bf8478bd21baedfe98d9ee" +source = "git+https://github.com/shane-moore/lighthouse?rev=35fb9763#35fb9763f40c4e43ecfa8b701d3f6c97db8fdea5" dependencies = [ "async-channel 1.9.0", "futures", @@ -7554,7 +7555,7 @@ checksum = "8f50febec83f5ee1df3015341d8bd429f2d1cc62bcba7ea2076759d315084683" [[package]] name = "test_random_derive" version = "0.2.0" -source = "git+https://github.com/sigp/lighthouse?rev=58b153cac#58b153cac5a078d849bf8478bd21baedfe98d9ee" +source = "git+https://github.com/shane-moore/lighthouse?rev=35fb9763#35fb9763f40c4e43ecfa8b701d3f6c97db8fdea5" dependencies = [ "quote", "syn 2.0.112", @@ -8013,7 +8014,7 @@ checksum = "562d481066bde0658276a35467c4af00bdc6ee726305698a55b86e61d7ad82bb" [[package]] name = "types" version = "0.2.1" -source = "git+https://github.com/sigp/lighthouse?rev=58b153cac#58b153cac5a078d849bf8478bd21baedfe98d9ee" +source = "git+https://github.com/shane-moore/lighthouse?rev=35fb9763#35fb9763f40c4e43ecfa8b701d3f6c97db8fdea5" dependencies = [ "alloy-primitives 1.5.2", "alloy-rlp", @@ -8215,7 +8216,7 @@ dependencies = [ [[package]] name = "validator_metrics" version = "0.1.0" -source = 
"git+https://github.com/sigp/lighthouse?rev=58b153cac#58b153cac5a078d849bf8478bd21baedfe98d9ee" +source = "git+https://github.com/shane-moore/lighthouse?rev=35fb9763#35fb9763f40c4e43ecfa8b701d3f6c97db8fdea5" dependencies = [ "metrics", ] @@ -8223,7 +8224,7 @@ dependencies = [ [[package]] name = "validator_services" version = "0.1.0" -source = "git+https://github.com/sigp/lighthouse?rev=58b153cac#58b153cac5a078d849bf8478bd21baedfe98d9ee" +source = "git+https://github.com/shane-moore/lighthouse?rev=35fb9763#35fb9763f40c4e43ecfa8b701d3f6c97db8fdea5" dependencies = [ "beacon_node_fallback", "bls", @@ -8247,7 +8248,7 @@ dependencies = [ [[package]] name = "validator_store" version = "0.1.0" -source = "git+https://github.com/sigp/lighthouse?rev=58b153cac#58b153cac5a078d849bf8478bd21baedfe98d9ee" +source = "git+https://github.com/shane-moore/lighthouse?rev=35fb9763#35fb9763f40c4e43ecfa8b701d3f6c97db8fdea5" dependencies = [ "bls", "eth2", @@ -8848,7 +8849,7 @@ checksum = "f17a85883d4e6d00e8a97c586de764dabcc06133f7f1d55dce5cdc070ad7fe59" [[package]] name = "workspace_members" version = "0.1.0" -source = "git+https://github.com/sigp/lighthouse?rev=58b153cac#58b153cac5a078d849bf8478bd21baedfe98d9ee" +source = "git+https://github.com/shane-moore/lighthouse?rev=35fb9763#35fb9763f40c4e43ecfa8b701d3f6c97db8fdea5" dependencies = [ "cargo_metadata", "quote", diff --git a/Cargo.toml b/Cargo.toml index a8afb4545..91c4cfd53 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -68,23 +68,25 @@ ssv_types = { path = "anchor/common/ssv_types" } subnet_service = { path = "anchor/subnet_service" } version = { path = "anchor/common/version" } -# Lighthouse commit is unstable branch `58b153cac` as of 1/19/2026 -beacon_node_fallback = { git = "https://github.com/sigp/lighthouse", rev = "58b153cac" } -bls = { git = "https://github.com/sigp/lighthouse", rev = "58b153cac" } -eth2 = { git = "https://github.com/sigp/lighthouse", rev = "58b153cac" } -eth2_keystore = { git = "https://github.com/sigp/lighthouse", rev = "58b153cac" } -eth2_network_config = { git = "https://github.com/sigp/lighthouse", rev = "58b153cac" } -health_metrics = { git = "https://github.com/sigp/lighthouse", rev = "58b153cac" } -metrics = { git = "https://github.com/sigp/lighthouse", rev = "58b153cac" } -network_utils = { git = "https://github.com/sigp/lighthouse", rev = "58b153cac" } -slashing_protection = { git = "https://github.com/sigp/lighthouse", rev = "58b153cac" } -slot_clock = { git = "https://github.com/sigp/lighthouse", rev = "58b153cac" } -task_executor = { git = "https://github.com/sigp/lighthouse", rev = "58b153cac" } -types = { git = "https://github.com/sigp/lighthouse", rev = "58b153cac" } -validator_metrics = { git = "https://github.com/sigp/lighthouse", rev = "58b153cac" } -validator_services = { git = "https://github.com/sigp/lighthouse", rev = "58b153cac" } -validator_store = { git = "https://github.com/sigp/lighthouse", rev = "58b153cac" } -workspace_members = { git = "https://github.com/sigp/lighthouse", rev = "58b153cac" } +# TODO: Switch back to sigp/lighthouse after https://github.com/sigp/lighthouse/pull/8684 is merged. 
+# These changes are expected in the Lighthouse 8.1 release per https://github.com/sigp/lighthouse/issues/8681
+# Lighthouse fork branch `watch-attest-sync-duties`, pinned at rev `35fb9763` as of 1/20/2026
+beacon_node_fallback = { git = "https://github.com/shane-moore/lighthouse", rev = "35fb9763" }
+bls = { git = "https://github.com/shane-moore/lighthouse", rev = "35fb9763" }
+eth2 = { git = "https://github.com/shane-moore/lighthouse", rev = "35fb9763" }
+eth2_keystore = { git = "https://github.com/shane-moore/lighthouse", rev = "35fb9763" }
+eth2_network_config = { git = "https://github.com/shane-moore/lighthouse", rev = "35fb9763" }
+health_metrics = { git = "https://github.com/shane-moore/lighthouse", rev = "35fb9763" }
+metrics = { git = "https://github.com/shane-moore/lighthouse", rev = "35fb9763" }
+network_utils = { git = "https://github.com/shane-moore/lighthouse", rev = "35fb9763" }
+slashing_protection = { git = "https://github.com/shane-moore/lighthouse", rev = "35fb9763" }
+slot_clock = { git = "https://github.com/shane-moore/lighthouse", rev = "35fb9763" }
+task_executor = { git = "https://github.com/shane-moore/lighthouse", rev = "35fb9763" }
+types = { git = "https://github.com/shane-moore/lighthouse", rev = "35fb9763" }
+validator_metrics = { git = "https://github.com/shane-moore/lighthouse", rev = "35fb9763" }
+validator_services = { git = "https://github.com/shane-moore/lighthouse", rev = "35fb9763" }
+validator_store = { git = "https://github.com/shane-moore/lighthouse", rev = "35fb9763" }
+workspace_members = { git = "https://github.com/shane-moore/lighthouse", rev = "35fb9763" }
 
 alloy = { version = "1.2.1", features = [
     "sol-types",
diff --git a/anchor/client/src/lib.rs b/anchor/client/src/lib.rs
index 635d0eaaf..f0c823e2f 100644
--- a/anchor/client/src/lib.rs
+++ b/anchor/client/src/lib.rs
@@ -542,6 +542,7 @@ impl Client {
             spec.clone(),
             genesis_validators_root,
             config.impostor.is_none().then_some(key),
+            fork_schedule.clone(),
             config.gas_limit,
             config.builder_boost_factor,
             config.prefer_builder_proposals,
diff --git a/anchor/common/ssv_types/src/consensus.rs b/anchor/common/ssv_types/src/consensus.rs
index 43f2f71ad..7048bc097 100644
--- a/anchor/common/ssv_types/src/consensus.rs
+++ b/anchor/common/ssv_types/src/consensus.rs
@@ -28,7 +28,7 @@ use types::{
     ForkName, Hash256, Slot, SyncCommitteeContribution,
 };
 
-use crate::{ValidatorIndex, message::*};
+use crate::{CommitteeId, ValidatorIndex, message::*, partial_sig::PartialSignatureKind};
 
 // UnsignedSSVMessage
 // ----------------------------------------------
 // |                       |
@@ -861,6 +861,39 @@ impl QbftData for BeaconVote {
     }
 }
 
+/// Identifies a batch of pre-consensus selection proofs for a committee.
+/// All operators compute the same hash for a given `(slot, committee_id)` pair, ensuring
+/// consistent batching across the network.
+///
+/// Unlike `BeaconVote::hash()`, which hashes decided consensus data, this hash is an
+/// artificial correlation identifier, since pre-consensus selection proofs have different
+/// signing roots (attestation vs sync committee selection proofs use different domains).
+#[derive(Debug, Clone)]
+pub struct SelectionProofBatchId {
+    pub slot: Slot,
+    pub committee_id: CommitteeId,
+}
+
+impl SelectionProofBatchId {
+    pub fn new(slot: Slot, committee_id: CommitteeId) -> Self {
+        Self { slot, committee_id }
+    }
+
+    /// Compute deterministic hash for batching correlation.
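+    ///
+    /// Illustrative sketch of the batching property (hypothetical values):
+    /// ```ignore
+    /// let id = SelectionProofBatchId::new(Slot::new(42), committee_id);
+    /// // Every operator derives the same hash for (42, committee_id), so their
+    /// // selection-proof partial signatures correlate into a single batch.
+    /// assert_eq!(id.hash(), SelectionProofBatchId::new(Slot::new(42), committee_id).hash());
+    /// ```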
+ /// + /// The hash includes the SSZ encoding of `PartialSignatureKind::AggregatorCommitteePartialSig` + /// as a domain separator to prevent collision with other hashes in the system. + pub fn hash(&self) -> Hash256 { + let mut hasher = Sha256::new(); + // Domain separator: SSZ encoding of the partial signature kind + hasher.update(PartialSignatureKind::AggregatorCommitteePartialSig.as_ssz_bytes()); + hasher.update(self.slot.as_u64().to_le_bytes()); + hasher.update(self.committee_id.0); + + Hash256::from_slice(&hasher.finalize()) + } +} + pub struct BeaconVoteValidator { slot: Slot, // `None` if slashing protection is disabled via CLI. @@ -1897,4 +1930,75 @@ mod tests { err => panic!("Expected DifferentCheckpoint error, got: {:?}", err), } } + + // ═══════════════════════════════════════════════════════════════════════════════ + // SelectionProofBatchId Tests + // ═══════════════════════════════════════════════════════════════════════════════ + + #[test] + fn test_selection_proof_batch_id_same_inputs_same_hash() { + let slot = Slot::new(12345); + let committee_id = CommitteeId::from([0u8; 32]); + + // Compute hash twice with same inputs + let batch_id1 = SelectionProofBatchId::new(slot, committee_id); + let batch_id2 = SelectionProofBatchId::new(slot, committee_id); + + // Same inputs should produce same hash + assert_eq!(batch_id1.hash(), batch_id2.hash()); + } + + #[test] + fn test_selection_proof_batch_id_different_slots() { + let committee_id = CommitteeId::from([1u8; 32]); + + // Different slots + let slot1 = Slot::new(12345); + let slot2 = Slot::new(12346); + + let batch_id1 = SelectionProofBatchId::new(slot1, committee_id); + let batch_id2 = SelectionProofBatchId::new(slot2, committee_id); + + // Different slots should produce different hashes + assert_ne!(batch_id1.hash(), batch_id2.hash()); + } + + #[test] + fn test_selection_proof_batch_id_different_committees() { + let slot = Slot::new(12345); + + // Different committee IDs + let committee_id1 = CommitteeId::from([1u8; 32]); + let committee_id2 = CommitteeId::from([2u8; 32]); + + let batch_id1 = SelectionProofBatchId::new(slot, committee_id1); + let batch_id2 = SelectionProofBatchId::new(slot, committee_id2); + + // Different committees should produce different hashes + assert_ne!(batch_id1.hash(), batch_id2.hash()); + } + + #[test] + fn test_selection_proof_batch_id_deterministic_across_operators() { + // This test verifies that the hash is deterministic and identical + // across different operators for the same inputs + + let slot = Slot::new(42); + let committee_bytes = [ + 0x12, 0x34, 0x56, 0x78, 0x9a, 0xbc, 0xde, 0xf0, 0x11, 0x22, 0x33, 0x44, 0x55, 0x66, + 0x77, 0x88, 0x99, 0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff, 0x00, 0x10, 0x20, 0x30, 0x40, + 0x50, 0x60, 0x70, 0x80, + ]; + let committee_id = CommitteeId::from(committee_bytes); + + let batch_id = SelectionProofBatchId::new(slot, committee_id); + let expected_hash = batch_id.hash(); + + // Simulate computing on different "operators" (same calculation repeated) + for _ in 0..3 { + let batch_id = SelectionProofBatchId::new(slot, committee_id); + // All operators should get the same hash + assert_eq!(batch_id.hash(), expected_hash); + } + } } diff --git a/anchor/qbft_manager/src/lib.rs b/anchor/qbft_manager/src/lib.rs index e5ee66be3..dede604ea 100644 --- a/anchor/qbft_manager/src/lib.rs +++ b/anchor/qbft_manager/src/lib.rs @@ -311,7 +311,7 @@ pub trait QbftDecidable: QbftData + Send + Sync + 'static { dashmap::Entry::Occupied(entry) => entry.get().clone(), 
dashmap::Entry::Vacant(entry) => { // There is not an instance running yet, store the sender and spawn a new instance - // with the reeiver + // with the receiver let (tx, rx) = mpsc::unbounded_channel(); let span = debug_span!("qbft_instance", instance_id = ?entry.key()); let tx = entry.insert(tx); diff --git a/anchor/validator_store/Cargo.toml b/anchor/validator_store/Cargo.toml index 456173a2b..473df03c6 100644 --- a/anchor/validator_store/Cargo.toml +++ b/anchor/validator_store/Cargo.toml @@ -10,6 +10,7 @@ bls = { workspace = true } database = { workspace = true } eth2 = { workspace = true } ethereum_ssz = { workspace = true } +fork = { workspace = true } futures = { workspace = true } hex = { workspace = true } lru = { workspace = true } diff --git a/anchor/validator_store/src/lib.rs b/anchor/validator_store/src/lib.rs index 965de4aed..d5a8206b5 100644 --- a/anchor/validator_store/src/lib.rs +++ b/anchor/validator_store/src/lib.rs @@ -15,6 +15,7 @@ use std::{ use bls::{PublicKeyBytes, SecretKey, Signature}; use database::{NetworkDatabase, NonUniqueIndex, UniqueIndex}; use eth2::types::{BlockContents, FullBlockContents, PublishBlockRequest}; +use fork::{Fork, ForkSchedule}; use lru::LruCache; use openssl::{ pkey::Private, @@ -37,7 +38,8 @@ use ssv_types::{ consensus::{ BEACON_ROLE_AGGREGATOR, BEACON_ROLE_PROPOSER, BEACON_ROLE_SYNC_COMMITTEE_CONTRIBUTION, BeaconVote, BeaconVoteValidator, Contribution, ContributionWrapper, Contributions, - QbftData, ValidatorConsensusData, ValidatorConsensusDataValidator, ValidatorDuty, + QbftData, SelectionProofBatchId, ValidatorConsensusData, ValidatorConsensusDataValidator, + ValidatorDuty, }, msgid::Role, partial_sig::PartialSignatureKind, @@ -96,7 +98,12 @@ pub struct AnchorValidatorStore { spec: Arc, genesis_validators_root: Hash256, private_key: Option>, - slot_metadata: watch::Sender>>>, + fork_schedule: Arc, + voting_context_tx: watch::Sender>>, + /// Watch channel for VotingAssignments (cached at slot start) + voting_assignments_tx: watch::Sender>>, + /// Watch channel for AggregationAssignments (cached at 2/3 slot) + aggregation_assignments_tx: watch::Sender>>>, gas_limit: u64, // MEV configuration is applied at the operator level and applies to all validators this // operator controls @@ -118,6 +125,7 @@ impl AnchorValidatorStore { spec: Arc, genesis_validators_root: Hash256, private_key: Option>, + fork_schedule: Arc, gas_limit: u64, builder_boost_factor: Option, prefer_builder_proposals: bool, @@ -136,7 +144,10 @@ impl AnchorValidatorStore { spec, genesis_validators_root, private_key, - slot_metadata: watch::channel(None).0, + fork_schedule, + voting_context_tx: watch::channel(None).0, + voting_assignments_tx: watch::channel(None).0, + aggregation_assignments_tx: watch::channel(None).0, gas_limit, builder_boost_factor, prefer_builder_proposals, @@ -195,7 +206,7 @@ impl AnchorValidatorStore { &self, signature_kind: PartialSignatureKind, role: Role, - collection_mode: CollectionMode, + collection_mode: CollectionMode, validator: &ValidatorMetadata, cluster: &Cluster, signing_root: Hash256, @@ -221,30 +232,12 @@ impl AnchorValidatorStore { pubkey: validator.public_key, }, CollectionMode::Committee { - slot_metadata, + num_signatures_to_collect, base_hash, - } => { - let num_signatures_to_collect = state - .metadata() - .get_all_by(&committee_id) - .map(|validator| { - let mut duties = 0; - if let Some(idx) = &validator.index { - if slot_metadata.attesting_validator_indices.contains(idx) { - duties += 1; - } - if 
slot_metadata.sync_validators.contains(idx) { - duties += 1; - } - } - duties - }) - .sum(); - SignatureRequester::Committee { - num_signatures_to_collect, - base_hash, - } - } + } => SignatureRequester::Committee { + num_signatures_to_collect, + base_hash, + }, }; let encrypted_private_key = state .shares() @@ -418,16 +411,19 @@ impl AnchorValidatorStore { Ok(signable_block.to_signed_block(signature)) } - /// Get the [`SlotMetadata`] for the given [`Slot`], waiting for it to become available if + /// Get the [`VotingContext`] for the given [`Slot`], waiting for it to become available if /// necessary. If the requested slot has already passed, an error is returned. /// - /// IMPORTANT: The slot metadata is computed starting at 1/3rd into the slot - so do not try + /// IMPORTANT: The voting context is computed starting at 1/3rd into the slot - so do not try /// to retrieve it if sleeping until then is not tolerable. - async fn get_slot_metadata(&self, slot: Slot) -> Result>, Error> { + async fn get_voting_context(&self, slot: Slot) -> Result, Error> { let Some(metadata) = self - .slot_metadata + .voting_context_tx .subscribe() - .wait_for(|m| m.as_ref().is_some_and(|metadata| metadata.slot >= slot)) + .wait_for(|m| { + m.as_ref() + .is_some_and(|metadata| metadata.voting_assignments.slot >= slot) + }) .await .ok() .and_then(|metadata| metadata.clone()) @@ -436,7 +432,7 @@ impl AnchorValidatorStore { return Err(Error::SpecificError(SpecificError::Metadata)); }; - if metadata.slot == slot { + if metadata.voting_assignments.slot == slot { Ok(metadata.clone()) } else { error!("Got newer metadata - performance issues?"); @@ -444,8 +440,86 @@ impl AnchorValidatorStore { } } - fn update_slot_metadata(&self, metadata: SlotMetadata) { - self.slot_metadata.send_replace(Some(Arc::new(metadata))); + fn update_voting_context(&self, metadata: VotingContext) { + self.voting_context_tx + .send_replace(Some(Arc::new(metadata))); + } + + /// Get validator voting assignments, waiting if not yet available for this slot. + /// + /// This method waits until VotingAssignments for the requested slot becomes available. + /// Returns an error if the requested slot has already passed or if the watch channel is closed. + pub async fn get_voting_assignments( + &self, + slot: Slot, + ) -> Result, Error> { + let Some(voting_assignments) = self + .voting_assignments_tx + .subscribe() + .wait_for(|d| d.as_ref().is_some_and(|info| info.slot >= slot)) + .await + .ok() + .and_then(|info| info.clone()) + else { + return Err(Error::SpecificError(SpecificError::MetadataChannelClosed)); + }; + + if voting_assignments.slot == slot { + Ok(voting_assignments) + } else { + Err(Error::SpecificError(SpecificError::MetadataSlotPassed)) + } + } + + /// Update validator voting assignments (called by MetadataService at slot start). + /// + /// This publishes the VotingAssignments to all subscribers via the watch channel. + pub fn update_voting_assignments(&self, voting_assignments: VotingAssignments) { + self.voting_assignments_tx + .send_replace(Some(Arc::new(voting_assignments))); + } + + /// Get aggregator voting assignments, waiting if not yet available for this slot. + /// + /// This method waits until AggregationAssignments for the requested slot becomes available. + /// Called by `produce_signed_aggregate_and_proof` and `produce_signed_contribution_and_proof` + /// at 2/3 slot. + /// + /// Returns an error if the requested slot has already passed or if the watch channel is closed. 
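+    ///
+    /// Expected call pattern (illustrative sketch; `store` and `validator_index` are
+    /// hypothetical):
+    /// ```ignore
+    /// // Blocks until Phase 3 publishes assignments for `slot` at 2/3 slot.
+    /// let assignments = store.get_aggregation_assignments(slot).await?;
+    /// let is_aggregator = assignments.aggregating_attesters.contains(&validator_index);
+    /// ```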
+    pub async fn get_aggregation_assignments(
+        &self,
+        slot: Slot,
+    ) -> Result<Arc<AggregationAssignments>, Error> {
+        let Some(aggregator_info) = self
+            .aggregation_assignments_tx
+            .subscribe()
+            .wait_for(|a| a.as_ref().is_some_and(|info| info.slot >= slot))
+            .await
+            .ok()
+            .and_then(|info| info.clone())
+        else {
+            return Err(Error::SpecificError(
+                SpecificError::AggregatorInfoChannelClosed,
+            ));
+        };
+
+        if aggregator_info.slot == slot {
+            Ok(aggregator_info)
+        } else {
+            Err(Error::SpecificError(
+                SpecificError::AggregatorInfoSlotPassed,
+            ))
+        }
+    }
+
+    /// Update aggregator voting assignments (called by MetadataService Phase 3 at 2/3 slot).
+    ///
+    /// This publishes the AggregationAssignments to all subscribers via the watch channel.
+    /// At 2/3 slot, selection proofs have been computed by Lighthouse, so
+    /// `DutyAndProof.selection_proof.is_some()` accurately indicates `is_aggregator`.
+    pub fn update_aggregation_assignments(&self, info: AggregationAssignments) {
+        self.aggregation_assignments_tx
+            .send_replace(Some(Arc::new(info)));
+    }
 
     /// Return [`SpecificError::Timeout`] if the given future does not complete at `delay` into the
@@ -568,7 +642,7 @@ impl AnchorValidatorStore {
     fn get_attesting_validators_in_committee(
         &self,
-        metadata: &SlotMetadata,
+        metadata: &VotingContext,
         committee_id: CommitteeId,
     ) -> HashMap {
         let committee_validators = self
@@ -580,7 +654,8 @@ impl AnchorValidatorStore {
            .collect::>();
 
         metadata
-            .attesting_validator_committees
+            .voting_assignments
+            .attesting_committees
             .iter()
             .filter_map(|(&pubkey, &index)| {
                 committee_validators
@@ -668,24 +743,159 @@ fn decrypt_key_share(
         .map_err(|err| error!(?err, validator = %pubkey_bytes, "Invalid secret key decrypted"))
 }
 
-struct SlotMetadata {
-    /// The slot this metadata is about.
-    slot: Slot,
-    /// The BeaconVote we will use as initial QBFT data.
+struct VotingContext {
+    /// Cached voting assignments (computed at slot start, reused here)
+    voting_assignments: Arc<VotingAssignments>,
+    /// The BeaconVote (only available at 1/3 slot from the beacon node)
     beacon_vote: BeaconVote,
-    /// The indices of all our validators that are attesting in this slot.
-    attesting_validator_indices: Vec<ValidatorIndex>,
-    /// The pubkeys of all our validators that are attesting in this slot, mapped to their
-    /// attestation committee index.
-    attesting_validator_committees: HashMap,
-    /// All our validators that are in the sync committee for this slot.
-    sync_validators: Vec<ValidatorIndex>,
-    /// All validators that are aggregator for this slot multiple times, and thus require special
-    /// synchronization.
+}
+
+/// Cached validator voting assignments for a slot.
+///
+/// This struct caches voting assignments computed at slot start and reuses them at 1/3 slot,
+/// eliminating redundant computation. It supports two different counting patterns:
+///
+/// 1. **Committee messages** (attestation + sync): `+1` per sync validator
+/// 2. **Selection proofs** (aggregator committee): `+N` per sync validator (N = subnets)
+#[derive(Debug, Clone)]
+pub struct VotingAssignments {
+    /// The slot these voting assignments are about.
+    pub slot: Slot,
+    /// The indices of validators that are attesting in this slot.
+    pub attesting_validators: Vec<ValidatorIndex>,
+    /// The pubkeys of attesting validators mapped to their attestation committee index.
+    pub attesting_committees: HashMap,
+    /// Sync committee validators mapped to their subnet IDs.
+    /// A validator may participate in multiple subnets.
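+    /// For example (illustrative), `{ 7 -> {2, 5} }` means validator 7 serves sync
+    /// subnets 2 and 5: it yields two selection proofs but only one voting message.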
+    pub sync_validators_by_subnet: HashMap<ValidatorIndex, HashSet<SyncSubnetId>>,
+}
+
+impl VotingAssignments {
+    /// Returns a flat list of all sync validator indices.
+    ///
+    /// Derives this from the keys of `sync_validators_by_subnet`.
+    pub fn sync_validators(&self) -> Vec<ValidatorIndex> {
+        self.sync_validators_by_subnet.keys().copied().collect()
+    }
+
+    /// Counts expected signatures for selection proof collection.
+    ///
+    /// For each validator in the committee:
+    /// - `+1` if the validator is attesting
+    /// - `+N` if the validator is in the sync committee (N = number of subnets)
+    ///
+    /// This counting pattern is used for aggregator committee pre-consensus, where
+    /// each sync validator produces one selection proof per subnet it participates in.
+    pub fn selection_proof_count_for_committee<F>(&self, is_in_committee: F) -> usize
+    where
+        F: Fn(&ValidatorIndex) -> bool,
+    {
+        let mut count = 0;
+
+        // Count attesting validators: +1 each
+        for validator_idx in &self.attesting_validators {
+            if is_in_committee(validator_idx) {
+                count += 1;
+            }
+        }
+
+        // Count sync validators: +N each (N = number of subnets)
+        for (validator_idx, subnets) in &self.sync_validators_by_subnet {
+            if is_in_committee(validator_idx) {
+                count += subnets.len();
+            }
+        }
+
+        count
+    }
+
+    /// Counts expected signatures for voting message collection.
+    ///
+    /// For each validator in the committee:
+    /// - `+1` if the validator is attesting
+    /// - `+1` if the validator is in the sync committee (regardless of subnet count)
+    ///
+    /// This counting pattern is used for post-consensus attestation and sync committee
+    /// voting messages, where each validator produces one message regardless of
+    /// how many subnets it participates in.
+    pub fn voting_message_count_for_committee<F>(&self, is_in_committee: F) -> usize
+    where
+        F: Fn(&ValidatorIndex) -> bool,
+    {
+        let mut count = 0;
+
+        // Count attesting validators: +1 each
+        for validator_idx in &self.attesting_validators {
+            if is_in_committee(validator_idx) {
+                count += 1;
+            }
+        }
+
+        // Count sync validators: +1 each (flat, regardless of subnet count)
+        for validator_idx in self.sync_validators_by_subnet.keys() {
+            if is_in_committee(validator_idx) {
+                count += 1;
+            }
+        }
+
+        count
+    }
+}
+
+/// Aggregator-specific voting assignments, cached at 2/3 slot when selection proofs are known.
+///
+/// This struct is separate from `VotingAssignments` because:
+/// - `VotingAssignments` is cached at slot start, before selection proofs are computed
+/// - `AggregationAssignments` is cached at 2/3 slot, after Lighthouse fills in selection proofs
+///
+/// At 2/3 slot, `DutyAndProof.selection_proof.is_some()` indicates `is_aggregator = true`.
+///
+/// Also tracks multi-subnet sync aggregators. When a validator aggregates for multiple
+/// sync subnets, `produce_signed_contribution_and_proof` is called multiple times (once
+/// per subnet). The waiter ensures all contributions are collected before starting QBFT.
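+///
+/// Worked example (illustrative): a validator aggregating sync subnets `{1, 3}` counts as
+/// two assignments in `sync_aggregator_assignment_count` and appears in
+/// `multi_sync_aggregators`, so both of its contributions are joined before QBFT starts.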
+pub struct AggregationAssignments { + /// The slot this info is for + pub slot: Slot, + + /// Validators that are attestation aggregators (selection_proof.is_some()) + pub aggregating_attesters: HashSet, + + /// Pubkey -> committee_index for aggregating validators + pub aggregator_committees: HashMap, + + /// Sync committee aggregators: validator_index -> set of subnet IDs they aggregate + pub sync_aggregators_by_subnet: HashMap>, + + /// Multi-subnet sync aggregators (validators aggregating > 1 subnet) multi_sync_aggregators: HashMap>, } -struct ContributionWaiter { +impl AggregationAssignments { + /// Total attestation aggregators for a committee + pub fn attestation_aggregator_count(&self, is_in_committee: F) -> usize + where + F: Fn(&ValidatorIndex) -> bool, + { + self.aggregating_attesters + .iter() + .filter(|idx| is_in_committee(idx)) + .count() + } + + /// Total sync aggregator assignments (validator * subnet pairs) for a committee + pub fn sync_aggregator_assignment_count(&self, is_in_committee: F) -> usize + where + F: Fn(&ValidatorIndex) -> bool, + { + self.sync_aggregators_by_subnet + .iter() + .filter(|(idx, _)| is_in_committee(idx)) + .map(|(_, subnets)| subnets.len()) + .sum() + } +} + +pub struct ContributionWaiter { data: RwLock>>, barrier: Barrier, } @@ -719,10 +929,10 @@ pub struct ContributionAndProofSigningData { selection_proof: SyncSelectionProof, } -enum CollectionMode { +enum CollectionMode { SingleValidator, Committee { - slot_metadata: Arc>, + num_signatures_to_collect: usize, base_hash: Hash256, }, } @@ -750,6 +960,25 @@ pub enum SpecificError { KeyShareDecryptionFailed, DataTooLarge(String), ClusterLiquidated, + /// Requested slot has already passed the current cached slot in VotingAssignments + MetadataSlotPassed, + /// Watch channel for VotingAssignments has been closed + MetadataChannelClosed, + /// Requested slot has already passed the current cached slot in AggregationAssignments + AggregatorInfoSlotPassed, + /// Watch channel for AggregationAssignments has been closed + AggregatorInfoChannelClosed, + /// produce_selection_proof called for validator not in VotingAssignments.attesting_committees + ValidatorNotAttesting { + validator_pubkey: PublicKeyBytes, + slot: Slot, + }, + /// produce_sync_selection_proof called for validator not in + /// VotingAssignments.sync_validators_by_subnet + ValidatorNotInSyncCommittee { + validator_pubkey: PublicKeyBytes, + slot: Slot, + }, } impl From for SpecificError { @@ -1016,10 +1245,10 @@ impl ValidatorStore for AnchorValidatorStore { } let (validator, cluster) = self.get_validator_and_cluster(validator_pubkey)?; - let slot_metadata = self.get_slot_metadata(attestation.data().slot).await?; + let voting_context_tx = self.get_voting_context(attestation.data().slot).await?; - let validator_attestation_committees = - self.get_attesting_validators_in_committee(&slot_metadata, cluster.committee_id()); + let validator_attestation_committees = self + .get_attesting_validators_in_committee(&voting_context_tx, cluster.committee_id()); let timer = metrics::start_timer_vec(&metrics::CONSENSUS_TIMES, &[metrics::BEACON_VOTE]); @@ -1070,13 +1299,31 @@ impl ValidatorStore for AnchorValidatorStore { ))?; } + // Calculate signature count for post-consensus committee collection + // Build a set of validator indices in this committee + let committee_validator_indices: HashSet = { + let state = self.database.state(); + state + .metadata() + .get_all_by(&cluster.committee_id()) + .filter_map(|v| v.index) + .collect() + }; + + // Use 
voting_message_count_for_committee for post-consensus (flat counting)
+            let num_signatures_to_collect = voting_context_tx
+                .voting_assignments
+                .voting_message_count_for_committee(|idx| {
+                    committee_validator_indices.contains(idx)
+                });
+
             let signing_root = attestation.data().signing_root(domain_hash);
             let signature = self
                 .collect_signature(
                     PartialSignatureKind::PostConsensus,
                     Role::Committee,
                     CollectionMode::Committee {
-                        slot_metadata,
+                        num_signatures_to_collect,
                         base_hash: data_hash,
                     },
                     &validator,
@@ -1291,8 +1538,71 @@ impl ValidatorStore for AnchorValidatorStore {
         // then.
         let delay = Duration::from_secs(self.spec.seconds_per_slot) * 2 / 3;
 
-        let signature = self
-            .timeout_within_slot(
+        let signature = if self.fork_schedule.active_fork(epoch) >= Fork::Boole {
+            let committee_id = cluster.committee_id();
+            let voting_assignments = self.get_voting_assignments(slot).await?;
+
+            // Defensive check: validator should be in `VotingAssignments` since both this
+            // function call and `VotingAssignments` are derived from `DutiesService`. If not,
+            // there's an inconsistency (e.g., stale cache after poll timeout) and we
+            // should not participate with a wrong `num_signatures_to_collect`.
+            if !voting_assignments
+                .attesting_committees
+                .contains_key(&validator_pubkey)
+            {
+                return Err(SpecificError::ValidatorNotAttesting {
+                    validator_pubkey,
+                    slot,
+                }
+                .into());
+            }
+
+            // Build a set of validator indices in this committee.
+            // This handles divergent operator views, since we only count validators we have
+            // shares for.
+            let committee_validator_indices: HashSet<ValidatorIndex> = {
+                let state = self.database.state();
+                state
+                    .metadata()
+                    .get_all_by(&committee_id)
+                    .filter_map(|v| v.index)
+                    .collect()
+            };
+
+            // Calculate how many selection proofs to collect using the selection proof counting
+            // method.
+            let num_signatures_to_collect = voting_assignments
+                .selection_proof_count_for_committee(|idx| {
+                    committee_validator_indices.contains(idx)
+                });
+
+            // Compute a deterministic base_hash for batching (the same across all operators in a
+            // committee).
+            let batch_id = SelectionProofBatchId::new(slot, committee_id);
+            let base_hash = batch_id.hash();
+
+            let collection_mode = CollectionMode::Committee {
+                num_signatures_to_collect,
+                base_hash,
+            };
+
+            self.timeout_within_slot(
+                slot,
+                delay,
+                self.collect_signature(
+                    PartialSignatureKind::AggregatorCommitteePartialSig,
+                    Role::AggregatorCommittee,
+                    collection_mode,
+                    &validator,
+                    &cluster,
+                    signing_root,
+                    slot,
+                ),
+            )
+            .await?
+        } else {
+            // Single validator collection (original behavior)
+            self.timeout_within_slot(
                 slot,
                 delay,
                 self.collect_signature(
@@ -1305,7 +1615,9 @@
                     slot,
                 ),
             )
-            .await?;
+            .await?
+        };
+
         Ok(signature.into())
     };
@@ -1338,12 +1650,78 @@ impl ValidatorStore for AnchorValidatorStore {
         // then.
         let delay = Duration::from_secs(self.spec.seconds_per_slot) * 2 / 3;
 
-        let signature = self
-            .timeout_within_slot(
+        let signature = if self.fork_schedule.active_fork(epoch) >= Fork::Boole {
+            // Boole fork: Committee-based batching (batches with attestation selection proofs)
+            let committee_id = cluster.committee_id();
+            let voting_assignments = self.get_voting_assignments(slot).await?;
+
+            // Defensive check: validator should be in `VotingAssignments` since both this
+            // function call and `VotingAssignments` are derived from `DutiesService`. 
If not, + // there's an inconsistency (e.g., stale cache after poll timeout) and we + // should not participate with a wrong `num_signatures_to_collect`. + let validator_index = validator.index.ok_or(SpecificError::MissingIndex)?; + if !voting_assignments + .sync_validators_by_subnet + .contains_key(&validator_index) + { + return Err(SpecificError::ValidatorNotInSyncCommittee { + validator_pubkey: *validator_pubkey, + slot, + } + .into()); + } + + // Build a set of validator indices in this committee + // This handles divergent operator views, since we only count validators we have + // shares for + let committee_validator_indices: HashSet = { + let state = self.database.state(); + state + .metadata() + .get_all_by(&committee_id) + .filter_map(|v| v.index) + .collect() + }; + + // Calculate how many selection proofs to collect using the selection proof counting + // method. + let num_signatures_to_collect = voting_assignments + .selection_proof_count_for_committee(|idx| { + committee_validator_indices.contains(idx) + }); + + // Compute deterministic base_hash for batching (SAME as attestation selection + // proofs). This ensures all selection proofs (attestation + sync) batch together + // into one P2P message per committee. + let batch_id = SelectionProofBatchId::new(slot, committee_id); + let base_hash = batch_id.hash(); + + let collection_mode = CollectionMode::Committee { + num_signatures_to_collect, + base_hash, + }; + + self.timeout_within_slot( slot, delay, self.collect_signature( - PartialSignatureKind::ContributionProofs, + PartialSignatureKind::AggregatorCommitteePartialSig, + Role::AggregatorCommittee, + collection_mode, + &validator, + &cluster, + signing_root, + slot, + ), + ) + .await? + } else { + // Single-validator collection (original behavior) + self.timeout_within_slot( + slot, + delay, + self.collect_signature( + PartialSignatureKind::ContributionProofs, // Original Alan-only enum Role::SyncCommittee, CollectionMode::SingleValidator, &validator, @@ -1352,7 +1730,8 @@ impl ValidatorStore for AnchorValidatorStore { slot, ), ) - .await?; + .await? 
+ }; Ok(signature.into()) }; @@ -1375,7 +1754,7 @@ impl ValidatorStore for AnchorValidatorStore { let future = async { let epoch = slot.epoch(E::slots_per_epoch()); let (validator, cluster) = self.get_validator_and_cluster(*validator_pubkey)?; - let metadata = self.get_slot_metadata(slot).await?; + let metadata = self.get_voting_context(slot).await?; let validator_attestation_committees = self.get_attesting_validators_in_committee(&metadata, cluster.committee_id()); @@ -1405,6 +1784,23 @@ impl ValidatorStore for AnchorValidatorStore { Completed::Success(data) => data, }; + // Calculate signature count for post-consensus committee collection + let committee_validator_indices: HashSet = { + let state = self.database.state(); + state + .metadata() + .get_all_by(&cluster.committee_id()) + .filter_map(|v| v.index) + .collect() + }; + + // Use voting_message_count_for_committee for post-consensus (flat counting) + let num_signatures_to_collect = metadata + .voting_assignments + .voting_message_count_for_committee(|idx| { + committee_validator_indices.contains(idx) + }); + let domain = self.get_domain(epoch, Domain::SyncCommittee); let signing_root = data.block_root.signing_root(domain); let signature = self @@ -1412,7 +1808,7 @@ impl ValidatorStore for AnchorValidatorStore { PartialSignatureKind::PostConsensus, Role::Committee, CollectionMode::Committee { - slot_metadata: metadata, + num_signatures_to_collect, base_hash: data.hash(), }, &validator, @@ -1457,9 +1853,13 @@ impl ValidatorStore for AnchorValidatorStore { selection_proof, }; - let metadata = self.get_slot_metadata(slot).await?; + // Get aggregator voting assignments from Phase 3 (published at 2/3 slot) + let aggregator_info = self.get_aggregation_assignments(slot).await?; - let signing_data = match metadata.multi_sync_aggregators.get(&aggregator_pubkey) { + let signing_data = match aggregator_info + .multi_sync_aggregators + .get(&aggregator_pubkey) + { None => vec![signing_data], Some(contribution_waiter) => { let mut data = contribution_waiter.submit_and_wait(signing_data).await; @@ -1710,3 +2110,222 @@ impl SignableBlock for BeaconBlock> { ))) } } + +#[cfg(test)] +mod tests { + use super::*; + + /// Creates a test `VotingAssignments` with the given parameters. 
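+    ///
+    /// E.g. (illustrative) `create_test_voting_assignments(vec![1, 2], vec![(3, vec![0, 1])])`
+    /// yields validators 1 and 2 attesting and validator 3 on sync subnets 0 and 1.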
+ fn create_test_voting_assignments( + attesting_validators: Vec, + sync_validators_by_subnet: Vec<(usize, Vec)>, + ) -> VotingAssignments { + VotingAssignments { + slot: Slot::new(100), + attesting_validators: attesting_validators + .into_iter() + .map(ValidatorIndex) + .collect(), + attesting_committees: HashMap::new(), + sync_validators_by_subnet: sync_validators_by_subnet + .into_iter() + .map(|(idx, subnets)| { + ( + ValidatorIndex(idx), + subnets.into_iter().map(SyncSubnetId::new).collect(), + ) + }) + .collect(), + } + } + + #[test] + fn test_selection_proof_count_with_multi_subnet_validators() { + // Create voting assignments with: + // - Validators 1, 2 attesting + // - Validator 3 in 1 subnet (contributes 1) + // - Validator 4 in 3 subnets (contributes 3) + // - Validator 5 in 2 subnets (contributes 2) + let voting_assignments = create_test_voting_assignments( + vec![1, 2], + vec![(3, vec![0]), (4, vec![0, 1, 2]), (5, vec![0, 1])], + ); + + // All validators in committee + let all_in_committee = |_: &ValidatorIndex| true; + let count = voting_assignments.selection_proof_count_for_committee(all_in_committee); + // 2 attesting + 1 + 3 + 2 sync = 8 + assert_eq!(count, 8); + } + + #[test] + fn test_selection_proof_count_with_filter() { + let voting_assignments = create_test_voting_assignments( + vec![1, 2, 3], + vec![ + (4, vec![0, 1]), // 2 subnets + (5, vec![0, 1, 2]), // 3 subnets + ], + ); + + // Only validators 1, 2, 4 are in the committee + let in_committee = |idx: &ValidatorIndex| matches!(idx.0, 1 | 2 | 4); + let count = voting_assignments.selection_proof_count_for_committee(in_committee); + // 2 attesting (1, 2) + 2 sync subnets (validator 4) = 4 + assert_eq!(count, 4); + } + + #[test] + fn test_committee_message_count_with_multi_subnet_validators() { + // Same setup as selection proof test, but counting should be flat + let voting_assignments = create_test_voting_assignments( + vec![1, 2], + vec![ + (3, vec![0]), + (4, vec![0, 1, 2]), // 3 subnets but counts as 1 + (5, vec![0, 1]), // 2 subnets but counts as 1 + ], + ); + + let all_in_committee = |_: &ValidatorIndex| true; + let count = voting_assignments.voting_message_count_for_committee(all_in_committee); + // 2 attesting + 3 sync validators (flat) = 5 + assert_eq!(count, 5); + } + + #[test] + fn test_committee_message_count_with_filter() { + let voting_assignments = create_test_voting_assignments( + vec![1, 2, 3], + vec![ + (4, vec![0, 1]), // 2 subnets + (5, vec![0, 1, 2]), // 3 subnets + ], + ); + + // Only validators 1, 2, 4 are in the committee + let in_committee = |idx: &ValidatorIndex| matches!(idx.0, 1 | 2 | 4); + let count = voting_assignments.voting_message_count_for_committee(in_committee); + // 2 attesting (1, 2) + 1 sync (validator 4) = 3 + assert_eq!(count, 3); + } + + #[test] + fn test_counting_difference_between_methods() { + // Demonstrate the key difference between the two counting methods + let voting_assignments = create_test_voting_assignments( + vec![1], // 1 attesting validator + vec![ + (2, vec![0, 1, 2, 3]), // Validator in 4 subnets + ], + ); + + let all_in_committee = |_: &ValidatorIndex| true; + + // Selection proof: 1 + 4 = 5 + let selection_count = + voting_assignments.selection_proof_count_for_committee(all_in_committee); + assert_eq!(selection_count, 5); + + // Voting message: 1 + 1 = 2 + let message_count = voting_assignments.voting_message_count_for_committee(all_in_committee); + assert_eq!(message_count, 2); + + // The difference highlights the counting patterns: + // - Selection proofs 
need one proof per subnet per validator + // - Voting messages need one message per validator regardless of subnets + } + + #[test] + fn test_overlapping_attesting_and_sync_validators() { + // Validator can be both attesting and in sync committee + let mut voting_assignments = create_test_voting_assignments( + vec![1, 2], // Validators 1 and 2 attesting + vec![ + (1, vec![0]), // Validator 1 also in sync (1 subnet) + (2, vec![0, 1]), // Validator 2 also in sync (2 subnets) + ], + ); + voting_assignments.attesting_validators = vec![ValidatorIndex(1), ValidatorIndex(2)]; + + let all_in_committee = |_: &ValidatorIndex| true; + + // Selection proof: 2 attesting + (1 + 2) sync = 5 + let selection_count = + voting_assignments.selection_proof_count_for_committee(all_in_committee); + assert_eq!(selection_count, 5); + + // Voting message: 2 attesting + 2 sync = 4 + let message_count = voting_assignments.voting_message_count_for_committee(all_in_committee); + assert_eq!(message_count, 4); + } + + #[tokio::test] + async fn test_validator_voting_assignments_watch_channel_waits_for_update() { + // Test the watch channel behavior directly without creating a full AnchorValidatorStore + let (tx, mut rx) = watch::channel::>>(None); + + // Start a task to wait for slot 5 + let wait_task = tokio::spawn(async move { + loop { + let current = rx.borrow().clone(); + if let Some(voting_assignments) = current + && voting_assignments.slot == Slot::new(5) + { + return Ok::<_, ()>(voting_assignments); + } + // Wait for update + if rx.changed().await.is_err() { + return Err(()); + } + } + }); + + // Give the task time to start waiting + tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; + + // Update with voting assignments for slot 5 + let voting_assignments = VotingAssignments { + slot: Slot::new(5), + attesting_validators: vec![ValidatorIndex(1)], + attesting_committees: HashMap::new(), + sync_validators_by_subnet: HashMap::new(), + }; + tx.send_replace(Some(Arc::new(voting_assignments))); + + // The wait task should now complete successfully + let result = wait_task.await.unwrap(); + assert!(result.is_ok()); + let received = result.unwrap(); + assert_eq!(received.slot, Slot::new(5)); + assert_eq!(received.attesting_validators, vec![ValidatorIndex(1)]); + } + + #[tokio::test] + async fn test_validator_voting_assignments_errors_if_slot_passed() { + // Test the watch channel behavior directly + let (tx, rx) = watch::channel::>>(None); + + // Update with voting assignments for slot 10 (newer than what we'll request) + let voting_assignments = VotingAssignments { + slot: Slot::new(10), + attesting_validators: vec![], + attesting_committees: HashMap::new(), + sync_validators_by_subnet: HashMap::new(), + }; + tx.send_replace(Some(Arc::new(voting_assignments))); + + // Simulate requesting slot 5 (older than cached slot 10) + let current = rx.borrow().clone(); + if let Some(voting_assignments) = current { + if voting_assignments.slot > Slot::new(5) { + // This simulates the error condition we'd return + assert_eq!(voting_assignments.slot, Slot::new(10)); + } else { + panic!("Should have newer slot cached"); + } + } else { + panic!("Should have voting assignments cached"); + } + } +} diff --git a/anchor/validator_store/src/metadata_service.rs b/anchor/validator_store/src/metadata_service.rs index ac5a5962c..8740a7d1a 100644 --- a/anchor/validator_store/src/metadata_service.rs +++ b/anchor/validator_store/src/metadata_service.rs @@ -1,16 +1,24 @@ -use std::{collections::HashMap, sync::Arc, time::Duration}; 
+use std::{ + collections::{HashMap, HashSet}, + sync::Arc, + time::{Duration, Instant}, +}; use beacon_node_fallback::BeaconNodeFallback; use slot_clock::SlotClock; use ssv_types::{ValidatorIndex, consensus::BeaconVote}; use task_executor::TaskExecutor; -use tokio::time::sleep; -use tracing::{error, info, trace}; -use types::{ChainSpec, EthSpec}; +use tokio::{sync::watch, time::sleep}; +use tracing::{error, info, trace, warn}; +use types::{ChainSpec, EthSpec, Slot, SyncSubnetId}; use validator_services::duties_service::DutiesService; -use crate::{AnchorValidatorStore, ContributionWaiter, SlotMetadata}; +use crate::{ + AggregationAssignments, AnchorValidatorStore, ContributionWaiter, VotingAssignments, + VotingContext, metrics, +}; +#[derive(Clone)] pub struct MetadataService { duties_service: Arc, T>>, validator_store: Arc>, @@ -18,6 +26,8 @@ pub struct MetadataService { beacon_nodes: Arc>, executor: TaskExecutor, spec: Arc, + attesters_poll_rx: watch::Receiver, + sync_poll_rx: watch::Receiver, } impl MetadataService { @@ -29,6 +39,9 @@ impl MetadataService { executor: TaskExecutor, spec: Arc, ) -> Self { + let attesters_poll_rx = duties_service.subscribe_to_attesters_poll(); + let sync_poll_rx = duties_service.subscribe_to_sync_poll(); + Self { duties_service, validator_store, @@ -36,6 +49,8 @@ impl MetadataService { beacon_nodes, executor, spec, + attesters_poll_rx, + sync_poll_rx, } } @@ -53,31 +68,272 @@ impl MetadataService { let executor = self.executor.clone(); - let interval_fut = async move { - loop { - if let Some(duration_to_next_slot) = self.slot_clock.duration_to_next_slot() { - sleep(duration_to_next_slot + slot_duration / 3).await; + // ═══════════════════════════════════════════════════════════════════════ + // PHASE 1: VotingAssignments (slot start) + // Caches voting assignments for use by both selection proofs AND voting context. + // Waits for DutiesService to complete polling before reading duties. + // + // RESILIENCE: We wait up to 3.5s for poll signals, but always proceed to + // read from duties cache regardless of poll outcome. This handles: + // - Normal operation: Signals arrive quickly (< 100ms), we read fresh duties + // - Beacon node slow: Signal arrives late (< 3s), we still get fresh duties + // - Beacon node down: Timeout after 3.5s, we read from cache (stale or empty) + // + // The 3.5s timeout is chosen because: + // - Lighthouse BN API timeout is 3 seconds (slot_duration / 4) + // - Phase 2 starts at 4 seconds (1/3 slot) + // - This gives 500ms buffer after BN timeout before Phase 2 deadline + // ═══════════════════════════════════════════════════════════════════════ + let self_clone_phase1 = self.clone(); + executor.spawn( + async move { + let mut attesters_poll_rx = self_clone_phase1.attesters_poll_rx.clone(); + let mut sync_poll_rx = self_clone_phase1.sync_poll_rx.clone(); + let poll_timeout = Duration::from_millis(3500); + + loop { + if let Some(duration_to_next_slot) = + self_clone_phase1.slot_clock.duration_to_next_slot() + { + // Sleep until slot start + sleep(duration_to_next_slot).await; + + let slot: Slot = self_clone_phase1.slot_clock.now().unwrap_or_default(); + let poll_start = Instant::now(); + + // Wait for poll signals with timeout (parallel execution). 
+ let (attesters_ready, sync_ready) = tokio::join!( + async { + tokio::time::timeout( + poll_timeout, + attesters_poll_rx.wait_for(|&s| s >= slot), + ) + .await + .is_ok_and(|r| r.is_ok()) + }, + async { + tokio::time::timeout( + poll_timeout, + sync_poll_rx.wait_for(|&s| s >= slot), + ) + .await + .is_ok_and(|r| r.is_ok()) + } + ); + + let poll_duration = poll_start.elapsed(); + + // Log poll failures + if !attesters_ready { + warn!( + %slot, + poll_duration_ms = poll_duration.as_millis(), + "Attesters poll failed or timed out - will use cached duties" + ); + } + if !sync_ready { + warn!( + %slot, + poll_duration_ms = poll_duration.as_millis(), + "Sync poll failed or timed out - will use cached duties" + ); + } + + // Record poll telemetry + metrics::inc_counter_vec( + &metrics::METADATA_SERVICE_POLL_TOTAL, + &[ + metrics::ATTESTERS, + if attesters_ready { + metrics::SUCCESS + } else { + metrics::FAILED + }, + ], + ); + metrics::inc_counter_vec( + &metrics::METADATA_SERVICE_POLL_TOTAL, + &[ + metrics::SYNC, + if sync_ready { + metrics::SUCCESS + } else { + metrics::FAILED + }, + ], + ); + metrics::observe( + &metrics::METADATA_SERVICE_POLL_DURATION, + poll_duration.as_secs_f64(), + ); + + // Always proceed to build VotingAssignments regardless of poll results. + // Even if polls failed, the duties cache may have: + // - Fresh data from a previous poll this slot + // - Stale data from previous slot/epoch (better than nothing) + // - Empty data (only if poll timed out AND this is a fresh restart) + if let Err(err) = self_clone_phase1.update_voting_assignments() { + error!(err, "Failed to update validator voting assignments"); + } + } else { + error!("Failed to read slot clock"); + sleep(slot_duration).await; + } + } + }, + "voting_assignments_service", + ); + + // ═══════════════════════════════════════════════════════════════════════ + // PHASE 2: VotingContext (1/3 slot) + // Gets cached voting assignments, fetches beacon_vote, builds VotingContext. + // ═══════════════════════════════════════════════════════════════════════ + let self_clone_phase2 = self.clone(); + executor.spawn( + async move { + loop { + if let Some(duration_to_next_slot) = + self_clone_phase2.slot_clock.duration_to_next_slot() + { + // Sleep until 1/3 into slot + sleep(duration_to_next_slot + slot_duration / 3).await; - if let Err(err) = self.update_metadata().await { - error!(err, "Failed to update slot metadata") + if let Err(err) = self_clone_phase2.update_voting_context().await { + error!(err, "Failed to update voting context") + } else { + trace!("Updated voting context"); + } } else { - trace!("Updated slot metadata"); + error!("Failed to read slot clock"); + sleep(slot_duration).await; } - } else { - error!("Failed to read slot clock"); - // If we can't read the slot clock, just wait another slot. - sleep(slot_duration).await; } - } + }, + "voting_context_service", + ); + + // ═══════════════════════════════════════════════════════════════════════ + // PHASE 3: AggregationAssignments (2/3 slot) + // Re-fetches duties_service.attesters() after selection proofs are computed. + // At this point, DutyAndProof.selection_proof.is_some() accurately indicates + // is_aggregator for attestation duties. 
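+        //
+        // Sketch (illustrative) of the check this enables:
+        //   let is_aggregator = duty_and_proof.selection_proof.is_some();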
+        // ═══════════════════════════════════════════════════════════════════════
+        let self_clone_phase3 = self.clone();
+        executor.spawn(
+            async move {
+                loop {
+                    if let Some(duration_to_next_slot) =
+                        self_clone_phase3.slot_clock.duration_to_next_slot()
+                    {
+                        // Sleep until 2/3 into slot
+                        sleep(duration_to_next_slot + slot_duration * 2 / 3).await;
+
+                        if let Err(err) = self_clone_phase3.update_aggregation_assignments() {
+                            error!(err, "Failed to update aggregation assignments");
+                        }
+                    } else {
+                        error!("Failed to read slot clock");
+                        sleep(slot_duration).await;
+                    }
+                }
+            },
+            "aggregation_assignments_service",
+        );
+
+        Ok(())
+    }
+
+    /// Phase 1: Build and publish VotingAssignments at slot start.
+    fn update_voting_assignments(&self) -> Result<(), String> {
+        let slot = self.slot_clock.now().ok_or("Failed to read slot clock")?;
+
+        // Get attestation validators
+        let (attesting_validators, attesting_committees): (Vec<_>, HashMap<_, _>) = self
+            .duties_service
+            .attesters(slot)
+            .into_iter()
+            .map(|duty| {
+                (
+                    ValidatorIndex(duty.duty.validator_index as usize),
+                    (duty.duty.pubkey, duty.duty.committee_index),
+                )
+            })
+            .unzip();
+
+        // Get sync validators by subnet
+        let sync_validators_by_subnet = self
+            .duties_service
+            .sync_duties
+            .get_duties_for_slot::<E>(slot, &self.spec)
+            .as_ref()
+            .map(|sync_duties| {
+                let mut map = HashMap::<ValidatorIndex, HashSet<SyncSubnetId>>::new();
+                sync_duties
+                    .duties
+                    .iter()
+                    .filter_map(|duty| {
+                        SyncSubnetId::compute_subnets_for_sync_committee::<E>(
+                            &duty.validator_sync_committee_indices,
+                        )
+                        .map_err(|e| {
+                            tracing::warn!(
+                                "Failed to compute sync subnets for validator {}: {e:?}",
+                                duty.validator_index
+                            );
+                        })
+                        .ok()
+                        .map(|subnet_ids| {
+                            (ValidatorIndex(duty.validator_index as usize), subnet_ids)
+                        })
+                    })
+                    .for_each(|(validator_index, subnet_ids)| {
+                        map.entry(validator_index).or_default().extend(subnet_ids);
+                    });
+                map
+            })
+            .unwrap_or_default();
+
+        let attester_count = attesting_validators.len();
+        let sync_count = sync_validators_by_subnet.len();
+
+        let voting_assignments = VotingAssignments {
+            slot,
+            attesting_validators,
+            attesting_committees,
+            sync_validators_by_subnet,
         };
 
-        executor.spawn(interval_fut, "metadata_service");
+        self.validator_store
+            .update_voting_assignments(voting_assignments);
+
+        // Record validator count metrics
+        metrics::set_gauge(
+            &metrics::METADATA_SERVICE_ATTESTING_VALIDATORS,
+            attester_count as i64,
+        );
+        metrics::set_gauge(
+            &metrics::METADATA_SERVICE_SYNC_VALIDATORS,
+            sync_count as i64,
+        );
+        if attester_count == 0 && sync_count == 0 {
+            metrics::inc_counter(&metrics::METADATA_SERVICE_EMPTY_ASSIGNMENTS_TOTAL);
+        }
+
+        trace!(%slot, attester_count, sync_count, "Published VotingAssignments at slot start");
 
         Ok(())
     }
 
-    async fn update_metadata(&self) -> Result<(), String> {
+    /// Phase 2: Build and publish VotingContext at 1/3 slot.
+    async fn update_voting_context(&self) -> Result<(), String> {
         let slot = self.slot_clock.now().ok_or("Failed to read slot clock")?;
 
+        let voting_assignments = self
+            .validator_store
+            .get_voting_assignments(slot)
+            .await
+            .map_err(|e| format!("Failed to get cached voting assignments: {:?}", e))?;
+
+        // Fetch beacon_vote from beacon node
         let attestation_data = self
             .beacon_nodes
             .first_success(|beacon_node| async move {
@@ -100,34 +356,70 @@ impl<T: SlotClock + 'static, E: EthSpec> MetadataService<T, E> {
             target: attestation_data.target,
         };
 
-        let (attesting_validator_indices, attesting_validator_committees) = self
+        let voting_context = VotingContext {
+            voting_assignments,
+            beacon_vote,
+        };
+
+        self.validator_store.update_voting_context(voting_context);
+
+        trace!(%slot, "Published VotingContext at 1/3 slot");
+        Ok(())
+    }
+
+    /// Phase 3: Build and publish AggregationAssignments at 2/3 slot.
+    fn update_aggregation_assignments(&self) -> Result<(), String> {
+        let slot = self.slot_clock.now().ok_or("Failed to read slot clock")?;
+
+        // Re-fetch attesters - `selection_proof.is_some()` means is_aggregator
+        let (aggregating_attesters, aggregator_committees) = self
             .duties_service
             .attesters(slot)
             .into_iter()
-            .map(|duty| {
+            .filter(|duty_and_proof| duty_and_proof.selection_proof.is_some())
+            .map(|duty_and_proof| {
                 (
-                    ValidatorIndex(duty.duty.validator_index as usize),
-                    (duty.duty.pubkey, duty.duty.committee_index),
+                    ValidatorIndex(duty_and_proof.duty.validator_index as usize),
+                    (
+                        duty_and_proof.duty.pubkey,
+                        duty_and_proof.duty.committee_index,
+                    ),
                 )
             })
             .unzip();
 
+        // Get sync aggregators from sync duties
         let sync_duties = self
             .duties_service
             .sync_duties
             .get_duties_for_slot::<E>(slot, &self.spec);
 
-        let sync_validators = sync_duties
+        // Build sync_aggregators_by_subnet
+        let sync_aggregators_by_subnet = sync_duties
             .as_ref()
             .map(|duties| {
+                let mut validator_subnets_map =
+                    HashMap::<ValidatorIndex, HashSet<SyncSubnetId>>::new();
                 duties
-                    .duties
+                    .aggregators
                     .iter()
-                    .map(|duty| ValidatorIndex(duty.validator_index as usize))
-                    .collect()
+                    .flat_map(|(subnet_id, aggregators)| {
+                        aggregators.iter().map(move |(validator_index, _, _)| {
+                            (ValidatorIndex(*validator_index as usize), *subnet_id)
+                        })
+                    })
+                    .for_each(|(validator_index, subnet_id)| {
+                        validator_subnets_map
+                            .entry(validator_index)
+                            .or_default()
+                            .insert(subnet_id);
+                    });
+                validator_subnets_map
             })
            .unwrap_or_default();
 
+        // Build multi_sync_aggregators - validators aggregating on multiple subnets
+        // need coordination
         let multi_sync_aggregators = sync_duties
             .map(|duties| {
                 let mut aggregators_by_validator = HashMap::new();
@@ -144,17 +436,18 @@ impl<T: SlotClock + 'static, E: EthSpec> MetadataService<T, E> {
             })
             .unwrap_or_default();
 
-        let metadata = SlotMetadata {
+        let aggregator_info = AggregationAssignments {
             slot,
-            beacon_vote,
-            attesting_validator_indices,
-            attesting_validator_committees,
-            sync_validators,
+            aggregating_attesters,
+            aggregator_committees,
+            sync_aggregators_by_subnet,
             multi_sync_aggregators,
         };
 
-        self.validator_store.update_slot_metadata(metadata);
+        self.validator_store
+            .update_aggregation_assignments(aggregator_info);
 
+        trace!(%slot, "Published AggregationAssignments at 2/3 slot");
         Ok(())
     }
 }
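Note on timing: with mainnet's 12-second slots, the three phases fire at 0s, 4s (`slot_duration / 3`), and 8s (`slot_duration * 2 / 3`), so the 3.5s poll timeout expires after Lighthouse's 3s beacon-node timeout (`slot_duration / 4`) but 500ms before Phase 2 begins. The standalone sketch below illustrates the Phase 1 wait pattern in isolation; the channel wiring and the plain `u64` slot are illustrative stand-ins for the crate's `watch::Receiver<Slot>` subscriptions, not its actual types.

```rust
use std::time::Duration;

use tokio::{sync::watch, time::timeout};

#[tokio::main]
async fn main() {
    // DutiesService stand-ins: watch channels that publish the latest polled slot.
    let (attesters_tx, mut attesters_rx) = watch::channel(0u64);
    let (sync_tx, mut sync_rx) = watch::channel(0u64);
    let slot = 5u64;
    let poll_timeout = Duration::from_millis(3500);

    // Simulate the pollers: the attesters poll completes quickly, the sync poll never does.
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(50)).await;
        let _ = attesters_tx.send(slot);
        // Keep sync_tx alive without sending, so the waiter times out rather than erroring.
        tokio::time::sleep(Duration::from_secs(60)).await;
        drop(sync_tx);
    });

    // Wait for both signals in parallel; each arm collapses timeout and channel
    // errors into a bool, so a dead or slow poller can never block the slot.
    let (attesters_ready, sync_ready) = tokio::join!(
        async {
            timeout(poll_timeout, attesters_rx.wait_for(|&s| s >= slot))
                .await
                .is_ok_and(|r| r.is_ok())
        },
        async {
            timeout(poll_timeout, sync_rx.wait_for(|&s| s >= slot))
                .await
                .is_ok_and(|r| r.is_ok())
        }
    );

    // Proceed either way: stale cached duties beat skipping the slot entirely.
    assert!(attesters_ready);
    assert!(!sync_ready);
    println!("attesters_ready={attesters_ready} sync_ready={sync_ready}");
}
```

The design choice mirrored here is that readiness is advisory: the booleans feed logs and metrics, but the duty read happens unconditionally afterwards.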
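Both `update_voting_assignments` and `update_aggregation_assignments` also lean on a less common form of `unzip`: an iterator of `(index, (key, value))` pairs splits into a `Vec` and a `HashMap` in a single pass, because `unzip` only requires `Default + Extend` on each target collection. A minimal sketch with a hypothetical duty type:

```rust
use std::collections::HashMap;

struct Duty {
    validator_index: u64,
    committee_index: u64,
    pubkey: &'static str, // stand-in for a real BLS public key type
}

fn main() {
    let duties = vec![
        Duty { validator_index: 7, committee_index: 3, pubkey: "0xaa" },
        Duty { validator_index: 9, committee_index: 1, pubkey: "0xbb" },
    ];

    // Vec collects the first tuple element; HashMap collects (pubkey, committee)
    // pairs as (key, value) entries.
    let (indices, committees): (Vec<usize>, HashMap<&str, u64>) = duties
        .into_iter()
        .map(|d| (d.validator_index as usize, (d.pubkey, d.committee_index)))
        .unzip();

    assert_eq!(indices, vec![7, 9]);
    assert_eq!(committees["0xaa"], 3);
    println!("{indices:?} {committees:?}");
}
```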
diff --git a/anchor/validator_store/src/metrics.rs b/anchor/validator_store/src/metrics.rs
index f51540a1d..f6ab7c506 100644
--- a/anchor/validator_store/src/metrics.rs
+++ b/anchor/validator_store/src/metrics.rs
@@ -24,3 +24,56 @@ pub static SIGNED_RANDAO_REVEALS_TOTAL: LazyLock<Result<IntCounterVec>> = LazyLo
         &["status"],
     )
 });
+
+// ═══════════════════════════════════════════════════════════════════════════════
+// MetadataService metrics
+// ═══════════════════════════════════════════════════════════════════════════════
+
+// Poll outcome labels
+pub const ATTESTERS: &str = "attesters";
+pub const SYNC: &str = "sync";
+pub const SUCCESS: &str = "success";
+pub const FAILED: &str = "failed";
+
+/// Count of poll attempts by type (attesters/sync) and outcome (success/failed)
+pub static METADATA_SERVICE_POLL_TOTAL: LazyLock<Result<IntCounterVec>> = LazyLock::new(|| {
+    try_create_int_counter_vec(
+        "anchor_metadata_service_poll_total",
+        "Count of DutiesService poll wait attempts",
+        &["type", "outcome"],
+    )
+});
+
+/// Duration of the poll wait phase (both polls in parallel)
+pub static METADATA_SERVICE_POLL_DURATION: LazyLock<Result<Histogram>> = LazyLock::new(|| {
+    try_create_histogram(
+        "anchor_metadata_service_poll_duration_seconds",
+        "Duration waiting for DutiesService poll signals",
+    )
+});
+
+/// Current count of attesting validators in VotingAssignments
+pub static METADATA_SERVICE_ATTESTING_VALIDATORS: LazyLock<Result<IntGauge>> =
+    LazyLock::new(|| {
+        try_create_int_gauge(
+            "anchor_metadata_service_attesting_validators",
+            "Count of validators with attestation duties this slot",
+        )
+    });
+
+/// Current count of sync committee validators in VotingAssignments
+pub static METADATA_SERVICE_SYNC_VALIDATORS: LazyLock<Result<IntGauge>> = LazyLock::new(|| {
+    try_create_int_gauge(
+        "anchor_metadata_service_sync_validators",
+        "Count of validators with sync committee duties this slot",
+    )
+});
+
+/// Count of slots where VotingAssignments was empty
+pub static METADATA_SERVICE_EMPTY_ASSIGNMENTS_TOTAL: LazyLock<Result<IntCounter>> =
+    LazyLock::new(|| {
+        try_create_int_counter(
+            "anchor_metadata_service_empty_assignments_total",
+            "Count of slots where VotingAssignments had no duties",
+        )
+    });
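For reference, these statics follow the Lighthouse metrics pattern: each `LazyLock` holds the `Result` returned by a `try_create_*` helper, and wrappers like `inc_counter_vec`, `set_gauge`, and `observe` no-op when registration failed. A self-contained analogue using the `prometheus` crate directly is sketched below; the explicit registry wiring is illustrative and is not how this crate registers its metrics.

```rust
use std::sync::LazyLock;

use prometheus::{IntCounterVec, Opts, Registry, TextEncoder};

// Counter with the same name and labels as METADATA_SERVICE_POLL_TOTAL above.
static POLL_TOTAL: LazyLock<IntCounterVec> = LazyLock::new(|| {
    IntCounterVec::new(
        Opts::new(
            "anchor_metadata_service_poll_total",
            "Count of DutiesService poll wait attempts",
        ),
        &["type", "outcome"],
    )
    .expect("valid metric definition")
});

fn main() {
    let registry = Registry::new();
    registry
        .register(Box::new(POLL_TOTAL.clone()))
        .expect("metric registers once");

    // One successful attesters poll and one failed sync poll, as Phase 1 records them.
    POLL_TOTAL.with_label_values(&["attesters", "success"]).inc();
    POLL_TOTAL.with_label_values(&["sync", "failed"]).inc();

    // Render the Prometheus text exposition format.
    let mut out = String::new();
    TextEncoder::new()
        .encode_utf8(&registry.gather(), &mut out)
        .expect("encode metrics");
    println!("{out}");
}
```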