Skip to content

Commit f87c2d7

Browse files
authored
feat(threshold-signer): add metrics & extra libp2p logs (#27)
* feat(dcipher-agents/libp2p): add connection closed & topic unsubscribe logs * feat(dcipher-agents/threshold_signer): add metrics for connections * feat(dcipher-agents/threshold_signer): add metrics for partials * feat(blocklock-agent & randomness-agent): expose threshold signer metrics * feat(threshold-signer): add missing metrics module
1 parent e91a046 commit f87c2d7

5 files changed

Lines changed: 162 additions & 6 deletions

File tree

blocklock-agent/examples/blocklock/healthcheck.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use anyhow::anyhow;
2-
use dcipher_agents::agents::blocklock::metrics::Metrics;
2+
use dcipher_agents::agents::blocklock::metrics::Metrics as BlocklockMetrics;
3+
use dcipher_agents::signer::threshold_signer::metrics::Metrics as ThresholdSignerMetrics;
34
use prometheus::{Encoder, TextEncoder};
45
use std::net::IpAddr;
56
use warp::Filter;
@@ -12,7 +13,7 @@ pub async fn start_api(listen_addr: IpAddr, port: u16) -> anyhow::Result<()> {
1213

1314
let metrics = warp::path!("metrics").map(|| {
1415
let encoder = TextEncoder::new();
15-
let metrics = Metrics::gather();
16+
let metrics = [BlocklockMetrics::gather(), ThresholdSignerMetrics::gather()].concat();
1617
let mut buffer = Vec::new();
1718

1819
match encoder.encode(&metrics, &mut buffer) {

dcipher-agents/src/signer/threshold_signer.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@
33
44
mod aggregation;
55
mod libp2p;
6+
pub mod metrics;
67

78
pub use aggregation::lagrange_points_interpolate_at;
89

910
use crate::ser::EvmSerialize;
1011
use crate::signer::threshold_signer::libp2p::LibP2PNode;
12+
use crate::signer::threshold_signer::metrics::Metrics;
1113
use crate::signer::{AsynchronousSigner, BlsSigner, BlsVerifier};
1214
use ark_ec::{AffineRepr, CurveGroup};
1315
use itertools::Either;
@@ -355,6 +357,7 @@ where
355357

356358
let m = serde_cbor::to_vec(&partial)
357359
.expect("serialization should always work");
360+
Metrics::report_partials_sent(1);
358361
if tx_to_libp2p.send(m).is_err() {
359362
tracing::error!("Failed to send message to signer: channel closed");
360363
}
@@ -398,13 +401,16 @@ where
398401
}
399402
};
400403

404+
Metrics::report_partials_received(1);
405+
401406
// Verify the validity of the partial signature against its pk
402407
let Some(pk) = self.pks.get(usize::from(party_id) - 1) else {
403408
tracing::error!(sender_id = party_id, "Invalid party_id / pks vector");
404409
continue;
405410
};
406411
if !self.signer.verify(&partial.m, partial.sig, *pk) {
407412
tracing::error!(sender_id = party_id, "Received invalid partial signature");
413+
Metrics::report_invalid_partials(1);
408414
continue;
409415
}
410416

dcipher-agents/src/signer/threshold_signer/libp2p.rs

Lines changed: 67 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
//! Libp2p node that can be used to broadcast and receive arbitrary messages using floodsub and a
22
//! peer whitelist.
33
4+
use crate::signer::threshold_signer::metrics::Metrics;
45
use futures_util::StreamExt;
56
use libp2p::allow_block_list::AllowedPeers;
67
use libp2p::floodsub::{FloodsubEvent, FloodsubMessage};
@@ -10,6 +11,7 @@ use libp2p::{
1011
Multiaddr, PeerId, Swarm, allow_block_list, floodsub, noise, swarm::SwarmEvent, tcp, yamux,
1112
};
1213
use std::collections::HashMap;
14+
use std::num::NonZeroU32;
1315
use std::time::Duration;
1416
use thiserror::Error;
1517
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
@@ -223,14 +225,40 @@ impl LibP2PNode {
223225
peer_id,
224226
topic,
225227
})) => {
226-
tracing::debug!(%peer_id, ?topic, "Peer listening to topic");
228+
let short_id = peer_id_to_short_id.get(&peer_id).or_else(|| {
229+
tracing::error!(
230+
incoming_peer_id = %peer_id,
231+
"Failed to convert peer_id to short_id"
232+
);
233+
None
234+
});
235+
236+
tracing::info!(%peer_id, ?short_id, ?topic, "Peer subscribed to topic");
227237
// Once we've received at least one topic subscription from a remote peer, we should
228238
// be able to send messages.
229239
*ready_send_messages = true;
230240
}
231241

242+
SwarmEvent::Behaviour(BehaviourEvent::Floodsub(FloodsubEvent::Unsubscribed {
243+
peer_id,
244+
topic,
245+
})) => {
246+
let short_id = peer_id_to_short_id.get(&peer_id).or_else(|| {
247+
tracing::error!(
248+
incoming_peer_id = %peer_id,
249+
"Failed to convert peer_id to short_id"
250+
);
251+
None
252+
});
253+
254+
tracing::info!(%peer_id, ?short_id, ?topic, "Peer unsubscribed to topic");
255+
}
256+
232257
SwarmEvent::ConnectionEstablished {
233-
peer_id, endpoint, ..
258+
peer_id,
259+
endpoint,
260+
num_established,
261+
..
234262
} => {
235263
let short_id = peer_id_to_short_id.get(&peer_id).or_else(|| {
236264
tracing::error!(
@@ -241,14 +269,51 @@ impl LibP2PNode {
241269
None
242270
});
243271

272+
if num_established == const { NonZeroU32::new(1).unwrap() } {
273+
// First connection established, report new peer connected
274+
Metrics::report_peer_connected();
275+
}
276+
244277
tracing::info!(
245278
incoming_peer_id = %peer_id,
246279
incoming_short_id = ?short_id,
247280
incoming_remote_addr = %endpoint.get_remote_address(),
281+
num_established,
248282
"Libp2p node established connection with peer"
249283
);
250284
}
251285

286+
SwarmEvent::ConnectionClosed {
287+
peer_id,
288+
endpoint,
289+
num_established,
290+
cause,
291+
..
292+
} => {
293+
let short_id = peer_id_to_short_id.get(&peer_id).or_else(|| {
294+
tracing::error!(
295+
incoming_peer_id = %peer_id,
296+
incoming_remote_addr = %endpoint.get_remote_address(),
297+
"Libp2p node closed connection with an unknown peer"
298+
);
299+
None
300+
});
301+
302+
if num_established == 0 {
303+
// No more connections, report disconnect
304+
Metrics::report_peer_disconnected();
305+
}
306+
307+
tracing::info!(
308+
incoming_peer_id = %peer_id,
309+
incoming_short_id = ?short_id,
310+
incoming_remote_addr = %endpoint.get_remote_address(),
311+
remaining_connections = num_established,
312+
?cause,
313+
"Libp2p node closed connection to peer"
314+
);
315+
}
316+
252317
SwarmEvent::NewListenAddr { address, .. } => {
253318
tracing::info!(
254319
"Local node is listening on {}",
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
use prometheus::proto::MetricFamily;
2+
use prometheus::{IntCounter, IntGauge, Registry};
3+
use std::sync::LazyLock;
4+
5+
pub struct Metrics {
6+
registry: Registry,
7+
connected_peers: IntGauge,
8+
partials_sent: IntCounter,
9+
partials_received: IntCounter,
10+
invalid_partials: IntCounter,
11+
}
12+
13+
static METRICS: LazyLock<Metrics> = LazyLock::new(|| {
14+
let registry = Registry::new();
15+
16+
let connected_peers = IntGauge::new("libp2p_connected_peers", "Number of connected peers")
17+
.expect("metrics failed to initialise");
18+
19+
let partials_sent = IntCounter::new(
20+
"partial_signatures_sent",
21+
"Number of partial signatures sent",
22+
)
23+
.expect("metrics failed to initialise");
24+
25+
let partials_received = IntCounter::new(
26+
"partial_signatures_received",
27+
"Number of partial signatures received",
28+
)
29+
.expect("metrics failed to initialise");
30+
31+
let invalid_partials = IntCounter::new(
32+
"invalid_partial_signature_received",
33+
"Number of invalid partial signatures received",
34+
)
35+
.expect("metrics failed to initialise");
36+
37+
registry
38+
.register(Box::new(connected_peers.clone()))
39+
.expect("metrics failed to initialise");
40+
registry
41+
.register(Box::new(partials_sent.clone()))
42+
.expect("metrics failed to initialise");
43+
registry
44+
.register(Box::new(partials_received.clone()))
45+
.expect("metrics failed to initialise");
46+
registry
47+
.register(Box::new(invalid_partials.clone()))
48+
.expect("metrics failed to initialise");
49+
50+
Metrics {
51+
registry,
52+
connected_peers,
53+
partials_received,
54+
partials_sent,
55+
invalid_partials,
56+
}
57+
});
58+
59+
impl Metrics {
60+
pub(super) fn report_peer_connected() {
61+
METRICS.connected_peers.inc();
62+
}
63+
64+
pub(super) fn report_peer_disconnected() {
65+
METRICS.connected_peers.dec();
66+
}
67+
68+
pub(super) fn report_partials_received(count: u64) {
69+
METRICS.partials_received.inc_by(count)
70+
}
71+
72+
pub(super) fn report_partials_sent(count: u64) {
73+
METRICS.partials_sent.inc_by(count)
74+
}
75+
76+
pub(super) fn report_invalid_partials(count: u64) {
77+
METRICS.invalid_partials.inc_by(count)
78+
}
79+
80+
pub fn gather() -> Vec<MetricFamily> {
81+
METRICS.registry.gather()
82+
}
83+
}

randomness-agent/examples/randomness/healthcheck.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use anyhow::anyhow;
2-
use dcipher_agents::agents::randomness::metrics::Metrics;
2+
use dcipher_agents::agents::blocklock::metrics::Metrics as BlocklockMetrics;
3+
use dcipher_agents::signer::threshold_signer::metrics::Metrics as ThresholdSignerMetrics;
34
use prometheus::{Encoder, TextEncoder};
45
use std::net::IpAddr;
56
use warp::Filter;
@@ -12,7 +13,7 @@ pub async fn start_api(listen_addr: IpAddr, port: u16) -> anyhow::Result<()> {
1213

1314
let metrics = warp::path!("metrics").map(|| {
1415
let encoder = TextEncoder::new();
15-
let metrics = Metrics::gather();
16+
let metrics = [BlocklockMetrics::gather(), ThresholdSignerMetrics::gather()].concat();
1617
let mut buffer = Vec::new();
1718

1819
match encoder.encode(&metrics, &mut buffer) {

0 commit comments

Comments
 (0)