Skip to content

Duty logic validation #228

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 33 commits into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
89f0306
duty logic validation
diegomrsantos Apr 3, 2025
62596d7
add duties tracker
diegomrsantos Apr 15, 2025
e2c6186
fix compilation and simplify
diegomrsantos Apr 15, 2025
9a2ebaf
Merge branch 'unstable' into duty-logic
diegomrsantos Apr 15, 2025
f48e866
add parking_lot
diegomrsantos Apr 15, 2025
43a00db
add proposers duties tracking
diegomrsantos Apr 15, 2025
17075d3
uncomment pre-commit
diegomrsantos Apr 16, 2025
3ccb16a
simplify duties tracking
diegomrsantos Apr 16, 2025
763589b
use dashmap
diegomrsantos Apr 16, 2025
4dc2620
update docs for SyncCommitteePerPeriod
diegomrsantos Apr 16, 2025
1e7b599
remove metrics
diegomrsantos Apr 16, 2025
80ff366
improvements
diegomrsantos Apr 16, 2025
ae6a864
remove functions not used in BeaconNetwork
diegomrsantos Apr 16, 2025
2684a67
remove slots_per_epoch from Validator
diegomrsantos Apr 16, 2025
c63075b
remove extra #
diegomrsantos Apr 17, 2025
f0e9e89
Merge branch 'unstable' into duty-logic
diegomrsantos Apr 17, 2025
efb45e2
remove unnecessary get_slot_start_time
diegomrsantos Apr 17, 2025
36b0a68
propagate error instead of failing silently
diegomrsantos Apr 17, 2025
070d434
improve logging adding gossipsub and ssv msg id
diegomrsantos Apr 17, 2025
9c9d3f1
change log to trace
diegomrsantos Apr 17, 2025
2bd726a
remove unnecessary things from proposers tracking
diegomrsantos Apr 17, 2025
812e09a
better error handling in poll_beacon_proposers
diegomrsantos Apr 17, 2025
31b1a7e
remove estimated_epoch_at_slot
diegomrsantos Apr 17, 2025
d0e43d2
remove estimated_time_at_slot
diegomrsantos Apr 17, 2025
f40dc52
remove is_first_slot_of_epoch
diegomrsantos Apr 17, 2025
439d3b9
remove BeaconNetwork
diegomrsantos Apr 17, 2025
5a98514
Merge branch 'unstable' into duty-logic
diegomrsantos Apr 17, 2025
91db909
add issue to todo
diegomrsantos Apr 17, 2025
de8938d
propagate error in message_lateness
diegomrsantos Apr 18, 2025
7f74742
use into() for ValidatorIndex
diegomrsantos Apr 18, 2025
5f367fa
remove comment
diegomrsantos Apr 18, 2025
db85d16
decrease duplication in partial_signature tests
diegomrsantos Apr 18, 2025
8e7c892
fix constants values
diegomrsantos Apr 18, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 12 additions & 1 deletion anchor/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use eth2::{
use keygen::{encryption::decrypt, run_keygen, Keygen};
use message_receiver::NetworkMessageReceiver;
use message_sender::{impostor::ImpostorMessageSender, MessageSender, NetworkMessageSender};
use message_validator::Validator;
use message_validator::{DutiesTracker, Validator};
use network::Network;
use openssl::{pkey::Private, rsa::Rsa};
use parking_lot::RwLock;
Expand Down Expand Up @@ -381,9 +381,20 @@ impl Client {
// Network sender/receiver
let (network_tx, network_rx) = mpsc::channel::<(SubnetId, Vec<u8>)>(9001);

let duties_tracker = Arc::new(DutiesTracker::new(
beacon_nodes.clone(),
spec.clone(),
E::slots_per_epoch(),
slot_clock.clone(),
database.watch(),
));
duties_tracker.clone().start(executor.clone());

let message_validator = Arc::new(Validator::new(
database.watch(),
E::slots_per_epoch(),
spec.epochs_per_sync_committee_period.as_u64(),
duties_tracker.clone(),
slot_clock.clone(),
));

Expand Down
6 changes: 6 additions & 0 deletions anchor/common/ssv_types/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ pub struct ClusterMember {
#[ssz(struct_behaviour = "transparent")]
pub struct ValidatorIndex(pub usize);

impl From<ValidatorIndex> for u64 {
fn from(value: ValidatorIndex) -> Self {
value.0 as u64
}
}

/// General Metadata about a Validator
#[derive(Debug, Clone)]
pub struct ValidatorMetadata {
Expand Down
8 changes: 8 additions & 0 deletions anchor/database/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,4 +368,12 @@ impl NetworkState {
validator_indices: validator_index.map(|idx| vec![idx]).unwrap_or(vec![]),
})
}

pub fn validator_indices(&self) -> Vec<u64> {
self.multi_state
.validator_metadata
.values()
.filter_map(|metadata| metadata.index.map(|idx| idx.into()))
.collect()
}
}
30 changes: 16 additions & 14 deletions anchor/message_receiver/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::Arc;
use database::{NetworkState, NonUniqueIndex, UniqueIndex};
use gossipsub::{Message, MessageAcceptance, MessageId};
use libp2p::PeerId;
use message_validator::{ValidatedMessage, ValidatedSSVMessage, Validator};
use message_validator::{DutiesProvider, ValidatedMessage, ValidatedSSVMessage, Validator};
use qbft_manager::QbftManager;
use signature_collector::SignatureCollectorManager;
use slot_clock::SlotClock;
Expand All @@ -22,23 +22,23 @@ pub struct Outcome {
}

/// A message receiver that passes messages to responsible managers.
pub struct NetworkMessageReceiver<S: SlotClock> {
pub struct NetworkMessageReceiver<S: SlotClock, D: DutiesProvider> {
processor: processor::Senders,
qbft_manager: Arc<QbftManager>,
signature_collector: Arc<SignatureCollectorManager>,
network_state_rx: watch::Receiver<NetworkState>,
outcome_tx: mpsc::Sender<Outcome>,
validator: Arc<Validator<S>>,
validator: Arc<Validator<S, D>>,
}

impl<S: SlotClock + 'static> NetworkMessageReceiver<S> {
impl<S: SlotClock + 'static, D: DutiesProvider> NetworkMessageReceiver<S, D> {
pub fn new(
processor: processor::Senders,
qbft_manager: Arc<QbftManager>,
signature_collector: Arc<SignatureCollectorManager>,
network_state_rx: watch::Receiver<NetworkState>,
outcome_tx: mpsc::Sender<Outcome>,
validator: Arc<Validator<S>>,
validator: Arc<Validator<S, D>>,
) -> Arc<Self> {
Arc::new(Self {
processor,
Expand All @@ -51,7 +51,9 @@ impl<S: SlotClock + 'static> NetworkMessageReceiver<S> {
}
}

impl<S: SlotClock + 'static> MessageReceiver for Arc<NetworkMessageReceiver<S>> {
impl<S: SlotClock + 'static, D: DutiesProvider> MessageReceiver
for Arc<NetworkMessageReceiver<S, D>>
{
fn receive(
&self,
propagation_source: PeerId,
Expand Down Expand Up @@ -92,12 +94,12 @@ impl<S: SlotClock + 'static> MessageReceiver for Arc<NetworkMessageReceiver<S>>
} = match result {
Ok(message) => message,
Err(failure) => {
debug!(?failure, "Validation failure");
debug!(gosspisub_message_id = ?message_id, ?failure, "Validation failure");
return;
}
};

let msg_id = signed_ssv_message.ssv_message().msg_id();
let msg_id = signed_ssv_message.ssv_message().msg_id().clone();

match msg_id.duty_executor() {
Some(DutyExecutor::Validator(validator)) => {
Expand All @@ -109,7 +111,7 @@ impl<S: SlotClock + 'static> MessageReceiver for Arc<NetworkMessageReceiver<S>>
.is_none()
{
// We are not a signer for this validator, return without passing.
trace!(?validator, ?msg_id, "Not interested");
trace!(gosspisub_message_id = ?message_id, ssv_msg_id = ?msg_id, ?validator, "Not interested");
return;
}
}
Expand All @@ -123,19 +125,19 @@ impl<S: SlotClock + 'static> MessageReceiver for Arc<NetworkMessageReceiver<S>>
let committee = state.clusters().get_all_by(&committee_id);
// We only need to check one cluster, as all clusters will have the same set
// of operators.
let is_member = committee
let is_member = committee.clone()
.and_then(|mut v| v.pop())
.map(|c| c.cluster_members.contains(&own_id))
.unwrap_or(false);

if is_member {
// We are not a member for this committee, return without passing.
trace!(?committee_id, ?msg_id, "Not interested");
trace!(gosspisub_message_id = ?message_id, ssv_msg_id = ?msg_id, ?committee, "Not interested");
return;
}
}
None => {
error!(?msg_id, "Invalid message ID");
error!(gosspisub_message_id = ?message_id, ssv_msg_id = ?msg_id, "Invalid message ID");
return;
}
}
Expand All @@ -146,15 +148,15 @@ impl<S: SlotClock + 'static> MessageReceiver for Arc<NetworkMessageReceiver<S>>
.qbft_manager
.receive_data(signed_ssv_message, qbft_message)
{
error!(?err, "Unable to receive QBFT message");
error!(gosspisub_message_id = ?message_id, ssv_msg_id = ?msg_id, ?err, "Unable to receive QBFT message");
}
}
ValidatedSSVMessage::PartialSignatureMessages(messages) => {
if let Err(err) = receiver
.signature_collector
.receive_partial_signatures(messages)
{
error!(?err, "Unable to receive partial signature message");
error!(gosspisub_message_id = ?message_id, ssv_msg_id = ?msg_id, ?err, "Unable to receive partial signature message");
}
}
}
Expand Down
12 changes: 6 additions & 6 deletions anchor/message_sender/src/network.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::sync::Arc;

use message_validator::Validator;
use message_validator::{DutiesProvider, Validator};
use openssl::{
error::ErrorStack,
hash::MessageDigest,
Expand All @@ -22,16 +22,16 @@ use crate::{Error, MessageCallback, MessageSender};
const SIGNER_NAME: &str = "message_sign_and_send";
const SENDER_NAME: &str = "message_send";

pub struct NetworkMessageSender<S: SlotClock> {
pub struct NetworkMessageSender<S: SlotClock, D: DutiesProvider> {
processor: processor::Senders,
network_tx: mpsc::Sender<(SubnetId, Vec<u8>)>,
private_key: PKey<Private>,
operator_id: OperatorId,
validator: Option<Arc<Validator<S>>>,
validator: Option<Arc<Validator<S, D>>>,
subnet_count: usize,
}

impl<S: SlotClock + 'static> MessageSender for Arc<NetworkMessageSender<S>> {
impl<S: SlotClock + 'static, D: DutiesProvider> MessageSender for Arc<NetworkMessageSender<S, D>> {
fn sign_and_send(
&self,
message: UnsignedSSVMessage,
Expand Down Expand Up @@ -94,13 +94,13 @@ impl<S: SlotClock + 'static> MessageSender for Arc<NetworkMessageSender<S>> {
}
}

impl<S: SlotClock> NetworkMessageSender<S> {
impl<S: SlotClock, D: DutiesProvider> NetworkMessageSender<S, D> {
pub fn new(
processor: processor::Senders,
network_tx: mpsc::Sender<(SubnetId, Vec<u8>)>,
private_key: Rsa<Private>,
operator_id: OperatorId,
validator: Option<Arc<Validator<S>>>,
validator: Option<Arc<Validator<S, D>>>,
subnet_count: usize,
) -> Result<Arc<Self>, String> {
let private_key = PKey::from_rsa(private_key)
Expand Down
8 changes: 7 additions & 1 deletion anchor/message_validator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,26 @@ edition = { workspace = true }
authors = ["Sigma Prime <[email protected]>"]

[dependencies]
beacon_node_fallback = { workspace = true }
bls = { workspace = true }
dashmap = { workspace = true }
database = { workspace = true }
eth2 = { workspace = true }
ethereum_ssz = { workspace = true }
gossipsub = { workspace = true }
hex = { workspace = true }
openssl = { workspace = true }
parking_lot = { workspace = true }
processor = { workspace = true }
safe_arith = { workspace = true }
sha2 = { workspace = true }
slot_clock = { workspace = true }
ssv_types = { workspace = true }
task_executor = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
types = { workspace = true }

[dev-dependencies]
bls = { workspace = true }
types = { workspace = true }
Loading
Loading