Skip to content

Duty logic validation for consensus messages #228

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 40 commits into from
Apr 25, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
89f0306
duty logic validation
diegomrsantos Apr 3, 2025
62596d7
add duties tracker
diegomrsantos Apr 15, 2025
e2c6186
fix compilation and simplify
diegomrsantos Apr 15, 2025
9a2ebaf
Merge branch 'unstable' into duty-logic
diegomrsantos Apr 15, 2025
f48e866
add parking_lot
diegomrsantos Apr 15, 2025
43a00db
add proposers duties tracking
diegomrsantos Apr 15, 2025
17075d3
uncomment pre-commit
diegomrsantos Apr 16, 2025
3ccb16a
simplify duties tracking
diegomrsantos Apr 16, 2025
763589b
use dashmap
diegomrsantos Apr 16, 2025
4dc2620
update docs for SyncCommitteePerPeriod
diegomrsantos Apr 16, 2025
1e7b599
remove metrics
diegomrsantos Apr 16, 2025
80ff366
improvements
diegomrsantos Apr 16, 2025
ae6a864
remove functions not used in BeaconNetwork
diegomrsantos Apr 16, 2025
2684a67
remove slots_per_epoch from Validator
diegomrsantos Apr 16, 2025
c63075b
remove extra #
diegomrsantos Apr 17, 2025
f0e9e89
Merge branch 'unstable' into duty-logic
diegomrsantos Apr 17, 2025
efb45e2
remove unnecessary get_slot_start_time
diegomrsantos Apr 17, 2025
36b0a68
propagate error instead of failing silently
diegomrsantos Apr 17, 2025
070d434
improve logging adding gossipsub and ssv msg id
diegomrsantos Apr 17, 2025
9c9d3f1
change log to trace
diegomrsantos Apr 17, 2025
2bd726a
remove unnecessary things from proposers tracking
diegomrsantos Apr 17, 2025
812e09a
better error handling in poll_beacon_proposers
diegomrsantos Apr 17, 2025
31b1a7e
remove estimated_epoch_at_slot
diegomrsantos Apr 17, 2025
d0e43d2
remove estimated_time_at_slot
diegomrsantos Apr 17, 2025
f40dc52
remove is_first_slot_of_epoch
diegomrsantos Apr 17, 2025
439d3b9
remove BeaconNetwork
diegomrsantos Apr 17, 2025
5a98514
Merge branch 'unstable' into duty-logic
diegomrsantos Apr 17, 2025
91db909
add issue to todo
diegomrsantos Apr 17, 2025
de8938d
propagate error in message_lateness
diegomrsantos Apr 18, 2025
7f74742
use into() for ValidatorIndex
diegomrsantos Apr 18, 2025
5f367fa
remove comment
diegomrsantos Apr 18, 2025
db85d16
decrease duplication in partial_signature tests
diegomrsantos Apr 18, 2025
8e7c892
fix constants values
diegomrsantos Apr 18, 2025
ee4cc61
fix test and refactor duty_limit
diegomrsantos Apr 22, 2025
cae7132
Merge branch 'unstable' into duty-logic
diegomrsantos Apr 22, 2025
8797378
rename var and fix typo
diegomrsantos Apr 22, 2025
37ba5a8
add test_early_message_fails_validation and test_late_message_fails_v…
diegomrsantos Apr 22, 2025
ce86f70
make test_validate_ssv_message_consensus_success easier to understand
diegomrsantos Apr 23, 2025
415c9c7
fix duty limit comparison
diegomrsantos Apr 24, 2025
401d245
Merge branch 'unstable' into duty-logic
diegomrsantos Apr 24, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions .githooks/pre-commit
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
#!/bin/sh
echo "Running cargo fmt --all..."
cargo +nightly fmt --all || exit 1

echo "Running cargo clippy --all..."
cargo clippy --all || exit 1

echo "Running cargo sort workspace..."
cargo sort --workspace || exit 1
##!/bin/sh
#echo "Running cargo fmt --all..."
#cargo +nightly fmt --all || exit 1
#
#echo "Running cargo clippy --all..."
#cargo clippy --all || exit 1
#
#echo "Running cargo sort workspace..."
#cargo sort --workspace || exit 1
8 changes: 6 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 16 additions & 2 deletions anchor/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use eth2::{
use keygen::{encryption::decrypt, run_keygen, Keygen};
use message_receiver::NetworkMessageReceiver;
use message_sender::NetworkMessageSender;
use message_validator::Validator;
use message_validator::{BeaconNetwork, DutiesTracker, Validator};
use network::Network;
use openssl::{pkey::Private, rsa::Rsa};
use parking_lot::RwLock;
Expand Down Expand Up @@ -372,10 +372,24 @@ impl Client {
// Network sender/receiver
let (network_tx, network_rx) = mpsc::channel::<(SubnetId, Vec<u8>)>(9001);

let duties_tracker = Arc::new(DutiesTracker::new(
beacon_nodes.clone(),
spec.clone(),
E::slots_per_epoch(),
slot_clock.clone(),
executor.clone(),
database.watch(),
));

let message_validator = Arc::new(Validator::new(
database.watch(),
E::slots_per_epoch(),
slot_clock.clone(),
BeaconNetwork::new(
slot_clock.clone(),
E::slots_per_epoch(),
spec.epochs_per_sync_committee_period.as_u64(),
),
duties_tracker.clone(),
));

let network_message_sender = NetworkMessageSender::new(
Expand Down
6 changes: 6 additions & 0 deletions anchor/common/ssv_types/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ pub struct ClusterMember {
#[ssz(struct_behaviour = "transparent")]
pub struct ValidatorIndex(pub usize);

impl From<ValidatorIndex> for u64 {
fn from(value: ValidatorIndex) -> Self {
value.0 as u64
}
}

/// General Metadata about a Validator
#[derive(Debug, Clone)]
pub struct ValidatorMetadata {
Expand Down
8 changes: 8 additions & 0 deletions anchor/database/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,4 +365,12 @@ impl NetworkState {
validator_indices: validator_index.map(|idx| vec![idx]).unwrap_or(vec![]),
})
}

pub fn validator_indices(&self) -> Vec<u64> {
self.multi_state
.validator_metadata
.values()
.filter_map(|metadata| metadata.index.map(|idx| idx.0 as u64))
.collect()
}
}
14 changes: 8 additions & 6 deletions anchor/message_receiver/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::Arc;
use database::{NetworkState, UniqueIndex};
use gossipsub::{Message, MessageAcceptance, MessageId};
use libp2p::PeerId;
use message_validator::{ValidatedMessage, ValidatedSSVMessage, Validator};
use message_validator::{DutiesProvider, ValidatedMessage, ValidatedSSVMessage, Validator};
use qbft_manager::QbftManager;
use signature_collector::SignatureCollectorManager;
use slot_clock::SlotClock;
Expand All @@ -22,23 +22,23 @@ pub struct Outcome {
}

/// A message receiver that passes messages to responsible managers.
pub struct NetworkMessageReceiver<S: SlotClock> {
pub struct NetworkMessageReceiver<S: SlotClock, D: DutiesProvider> {
processor: processor::Senders,
qbft_manager: Arc<QbftManager>,
signature_collector: Arc<SignatureCollectorManager>,
network_state_rx: watch::Receiver<NetworkState>,
outcome_tx: mpsc::Sender<Outcome>,
validator: Arc<Validator<S>>,
validator: Arc<Validator<S, D>>,
}

impl<S: SlotClock + 'static> NetworkMessageReceiver<S> {
impl<S: SlotClock + 'static, D: DutiesProvider> NetworkMessageReceiver<S, D> {
pub fn new(
processor: processor::Senders,
qbft_manager: Arc<QbftManager>,
signature_collector: Arc<SignatureCollectorManager>,
network_state_rx: watch::Receiver<NetworkState>,
outcome_tx: mpsc::Sender<Outcome>,
validator: Arc<Validator<S>>,
validator: Arc<Validator<S, D>>,
) -> Arc<Self> {
Arc::new(Self {
processor,
Expand All @@ -51,7 +51,9 @@ impl<S: SlotClock + 'static> NetworkMessageReceiver<S> {
}
}

impl<S: SlotClock + 'static> MessageReceiver for Arc<NetworkMessageReceiver<S>> {
impl<S: SlotClock + 'static, D: DutiesProvider> MessageReceiver
for Arc<NetworkMessageReceiver<S, D>>
{
fn receive(
&self,
propagation_source: PeerId,
Expand Down
12 changes: 6 additions & 6 deletions anchor/message_sender/src/network.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::sync::Arc;

use message_validator::Validator;
use message_validator::{DutiesProvider, Validator};
use openssl::{
error::ErrorStack,
hash::MessageDigest,
Expand All @@ -22,16 +22,16 @@ use crate::{Error, MessageCallback, MessageSender};
const SIGNER_NAME: &str = "message_sign_and_send";
const SENDER_NAME: &str = "message_send";

pub struct NetworkMessageSender<S: SlotClock> {
pub struct NetworkMessageSender<S: SlotClock, D: DutiesProvider> {
processor: processor::Senders,
network_tx: mpsc::Sender<(SubnetId, Vec<u8>)>,
private_key: PKey<Private>,
operator_id: OperatorId,
validator: Option<Arc<Validator<S>>>,
validator: Option<Arc<Validator<S, D>>>,
subnet_count: usize,
}

impl<S: SlotClock + 'static> MessageSender for Arc<NetworkMessageSender<S>> {
impl<S: SlotClock + 'static, D: DutiesProvider> MessageSender for Arc<NetworkMessageSender<S, D>> {
fn sign_and_send(
&self,
message: UnsignedSSVMessage,
Expand Down Expand Up @@ -94,13 +94,13 @@ impl<S: SlotClock + 'static> MessageSender for Arc<NetworkMessageSender<S>> {
}
}

impl<S: SlotClock> NetworkMessageSender<S> {
impl<S: SlotClock, D: DutiesProvider> NetworkMessageSender<S, D> {
pub fn new(
processor: processor::Senders,
network_tx: mpsc::Sender<(SubnetId, Vec<u8>)>,
private_key: Rsa<Private>,
operator_id: OperatorId,
validator: Option<Arc<Validator<S>>>,
validator: Option<Arc<Validator<S, D>>>,
subnet_count: usize,
) -> Result<Arc<Self>, String> {
let private_key = PKey::from_rsa(private_key)
Expand Down
6 changes: 6 additions & 0 deletions anchor/message_validator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,26 @@ edition = { workspace = true }
authors = ["Sigma Prime <[email protected]>"]

[dependencies]
beacon_node_fallback = { workspace = true }
bls = { workspace = true }
dashmap = { workspace = true }
database = { workspace = true }
eth2 = { workspace = true }
ethereum_ssz = { workspace = true }
gossipsub = { workspace = true }
hex = { workspace = true }
openssl = { workspace = true }
parking_lot = { workspace = true }
processor = { workspace = true }
safe_arith = { workspace = true }
sha2 = { workspace = true }
slot_clock = { workspace = true }
ssv_types = { workspace = true }
task_executor = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
types = { workspace = true }

[dev-dependencies]
bls = { workspace = true }
Expand Down
121 changes: 121 additions & 0 deletions anchor/message_validator/src/beacon_network.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
use std::time::{Duration, SystemTime};

use slot_clock::SlotClock;
use types::{Epoch, Slot};

/// Wrapper around SlotClock to provide beacon chain network functionality
#[derive(Clone)]
pub struct BeaconNetwork<S: SlotClock> {
slot_clock: S,
slots_per_epoch: u64,
epochs_per_sync_committee_period: u64,
}

impl<S: SlotClock> BeaconNetwork<S> {
/// Create a new BeaconNetwork
pub fn new(slot_clock: S, slots_per_epoch: u64, epochs_per_sync_committee_period: u64) -> Self {
Self {
slot_clock,
slots_per_epoch,
epochs_per_sync_committee_period,
}
}

/// Returns the slot clock
pub fn slot_clock(&self) -> &S {
&self.slot_clock
}

/// Returns the slot duration
pub fn slot_duration(&self) -> Duration {
self.slot_clock.slot_duration()
}

/// Returns the number of slots per epoch
pub fn slots_per_epoch(&self) -> u64 {
self.slots_per_epoch
}

/// Estimates the current slot
pub fn estimated_current_slot(&self) -> Slot {
self.slot_clock.now().unwrap_or_default()
}

/// Estimates the slot at the given time
pub fn estimated_slot_at_time(&self, time: SystemTime) -> Slot {
let since_unix = time
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or_default();

self.slot_clock.slot_of(since_unix).unwrap_or_default()
}

/// Estimates the time at the given slot
pub fn estimated_time_at_slot(&self, slot: Slot) -> SystemTime {
let duration = self.slot_clock.start_of(slot).unwrap_or_default();
SystemTime::UNIX_EPOCH + duration
}

/// Estimates the current epoch
pub fn estimated_current_epoch(&self) -> Epoch {
self.estimated_epoch_at_slot(self.estimated_current_slot())
}

/// Estimates the epoch at the given slot
pub fn estimated_epoch_at_slot(&self, slot: Slot) -> Epoch {
Epoch::new(slot.as_u64() / self.slots_per_epoch)
}

/// Returns the first slot at the given epoch
pub fn first_slot_at_epoch(&self, epoch: u64) -> Slot {
Slot::new(epoch * self.slots_per_epoch)
}

/// Returns the start time of the given epoch
pub fn epoch_start_time(&self, epoch: u64) -> SystemTime {
self.estimated_time_at_slot(self.first_slot_at_epoch(epoch))
}

/// Returns the start time of the given slot
pub fn get_slot_start_time(&self, slot: Slot) -> SystemTime {
self.estimated_time_at_slot(slot)
}

/// Returns the end time of the given slot
pub fn get_slot_end_time(&self, slot: Slot) -> SystemTime {
self.estimated_time_at_slot(slot + 1)
}

/// Checks if the given slot is the first slot of its epoch
pub fn is_first_slot_of_epoch(&self, slot: Slot) -> bool {
slot.as_u64() % self.slots_per_epoch == 0
}

/// Returns the first slot of the given epoch
pub fn get_epoch_first_slot(&self, epoch: u64) -> Slot {
self.first_slot_at_epoch(epoch)
}

/// Returns the number of epochs per sync committee period
pub fn epochs_per_sync_committee_period(&self) -> u64 {
self.epochs_per_sync_committee_period
}

/// Estimates the sync committee period at the given epoch
pub fn estimated_sync_committee_period_at_epoch(&self, epoch: Epoch) -> u64 {
epoch.as_u64() / self.epochs_per_sync_committee_period
}

/// Returns the first epoch of the given sync committee period
pub fn first_epoch_of_sync_period(&self, period: u64) -> u64 {
period * self.epochs_per_sync_committee_period
}

/// Returns the last slot of the given sync committee period
pub fn last_slot_of_sync_period(&self, period: u64) -> Slot {
let last_epoch = self.first_epoch_of_sync_period(period + 1) - 1;
// If we are in the sync committee that ends at slot x we do not generate a message
// during slot x-1 as it will never be included, hence -2.
self.get_epoch_first_slot(last_epoch + 1) - 2
}
}
Loading
Loading