Skip to content

Commit 517b2f0

Browse files
[actor] Add Backoff Metrics (#3802)
1 parent a342e15 commit 517b2f0

21 files changed

Lines changed: 321 additions & 147 deletions

File tree

.github/workflows/publish.yml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -60,11 +60,6 @@ jobs:
6060
continue-on-error: true
6161
env:
6262
CARGO_REGISTRY_TOKEN: ${{ secrets.CARGO_REGISTRY_TOKEN }}
63-
- name: Publish actor
64-
run: cargo publish --manifest-path actor/Cargo.toml
65-
continue-on-error: true
66-
env:
67-
CARGO_REGISTRY_TOKEN: ${{ secrets.CARGO_REGISTRY_TOKEN }}
6863
- name: Publish math
6964
run: cargo publish --manifest-path math/Cargo.toml
7065
continue-on-error: true
@@ -85,6 +80,11 @@ jobs:
8580
continue-on-error: true
8681
env:
8782
CARGO_REGISTRY_TOKEN: ${{ secrets.CARGO_REGISTRY_TOKEN }}
83+
- name: Publish actor
84+
run: cargo publish --manifest-path actor/Cargo.toml
85+
continue-on-error: true
86+
env:
87+
CARGO_REGISTRY_TOKEN: ${{ secrets.CARGO_REGISTRY_TOKEN }}
8888
- name: Publish storage
8989
run: cargo publish --manifest-path storage/Cargo.toml
9090
continue-on-error: true

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

actor/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ workspace = true
1616
[dependencies]
1717
cfg-if.workspace = true
1818
commonware-macros.workspace = true
19+
commonware-runtime.workspace = true
1920
crossbeam-queue.workspace = true
2021
futures-util.workspace = true
2122
loom = { workspace = true, features = ["futures"], optional = true }

actor/src/benches/mailbox.rs

Lines changed: 51 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
11
use commonware_actor::{mailbox, Feedback};
2+
use commonware_runtime::{
3+
telemetry::metrics::{Metric, Registered, Registration},
4+
Metrics, Name, Supervisor,
5+
};
26
use commonware_utils::NZUsize;
37
use criterion::{criterion_group, BatchSize, Criterion, Throughput};
48
use futures::pin_mut;
@@ -15,6 +19,44 @@ const PRODUCERS: usize = 4;
1519
const PRODUCER_MESSAGES: usize = 16 * 1024;
1620
const REPLACE_CAPACITY: usize = 1024;
1721

22+
#[derive(Clone, Copy, Debug, Default)]
23+
struct NoopMetrics;
24+
25+
impl Supervisor for NoopMetrics {
26+
fn name(&self) -> Name {
27+
Name::default()
28+
}
29+
30+
fn child(&self, _label: &'static str) -> Self {
31+
Self
32+
}
33+
34+
fn with_attribute(self, _key: &'static str, _value: impl std::fmt::Display) -> Self {
35+
self
36+
}
37+
}
38+
39+
impl Metrics for NoopMetrics {
40+
fn register<N: Into<String>, H: Into<String>, M: Metric>(
41+
&self,
42+
_name: N,
43+
_help: H,
44+
metric: M,
45+
) -> Registered<M> {
46+
Registered::with_registration(metric, Registration::from(()))
47+
}
48+
49+
fn encode(&self) -> String {
50+
String::new()
51+
}
52+
}
53+
54+
fn new<T: mailbox::Policy>(
55+
capacity: std::num::NonZeroUsize,
56+
) -> (mailbox::Sender<T>, mailbox::Receiver<T>) {
57+
mailbox::new(NoopMetrics, capacity)
58+
}
59+
1860
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
1961
enum Policy {
2062
Drop,
@@ -77,7 +119,7 @@ fn bench_enqueue_ready(c: &mut Criterion) {
77119

78120
group.bench_function(format!("capacity={CAPACITY}"), |b| {
79121
b.iter_batched(
80-
|| mailbox::new::<Message>(NZUsize!(CAPACITY)),
122+
|| new::<Message>(NZUsize!(CAPACITY)),
81123
|(sender, _receiver)| {
82124
for _ in 0..MESSAGES {
83125
let result = sender.enqueue(black_box(Message::drop()));
@@ -99,7 +141,7 @@ fn bench_try_recv_ready(c: &mut Criterion) {
99141
group.bench_function(format!("capacity={CAPACITY}"), |b| {
100142
b.iter_batched(
101143
|| {
102-
let (sender, receiver) = mailbox::new::<Message>(NZUsize!(CAPACITY));
144+
let (sender, receiver) = new::<Message>(NZUsize!(CAPACITY));
103145
for _ in 0..MESSAGES {
104146
assert_eq!(sender.enqueue(Message::drop()), Feedback::Ok);
105147
}
@@ -124,7 +166,7 @@ fn bench_try_recv_overflow(c: &mut Criterion) {
124166
group.bench_function(format!("capacity={CAPACITY} overflow={MESSAGES}"), |b| {
125167
b.iter_batched(
126168
|| {
127-
let (sender, receiver) = mailbox::new::<Message>(NZUsize!(CAPACITY));
169+
let (sender, receiver) = new::<Message>(NZUsize!(CAPACITY));
128170
for _ in 0..CAPACITY {
129171
assert_eq!(sender.enqueue(Message::drop()), Feedback::Ok);
130172
}
@@ -151,7 +193,7 @@ fn bench_round_trip_ready(c: &mut Criterion) {
151193

152194
group.bench_function(format!("capacity={CAPACITY}"), |b| {
153195
b.iter_batched(
154-
|| mailbox::new::<Message>(NZUsize!(CAPACITY)),
196+
|| new::<Message>(NZUsize!(CAPACITY)),
155197
|(sender, mut receiver)| {
156198
for _ in 0..MESSAGES {
157199
let result = sender.enqueue(black_box(Message::drop()));
@@ -173,7 +215,7 @@ fn bench_recv_waiting(c: &mut Criterion) {
173215

174216
group.bench_function("capacity=1", |b| {
175217
b.iter_batched(
176-
|| mailbox::new::<Message>(NZUsize!(1)),
218+
|| new::<Message>(NZUsize!(1)),
177219
|(sender, mut receiver)| {
178220
futures::executor::block_on(async {
179221
for _ in 0..MESSAGES {
@@ -207,7 +249,7 @@ fn bench_overflow_drop(c: &mut Criterion) {
207249
group.bench_function("capacity=1", |b| {
208250
b.iter_batched(
209251
|| {
210-
let (sender, receiver) = mailbox::new::<Message>(NZUsize!(1));
252+
let (sender, receiver) = new::<Message>(NZUsize!(1));
211253
assert_eq!(sender.enqueue(Message::drop()), Feedback::Ok);
212254
(sender, receiver)
213255
},
@@ -232,7 +274,7 @@ fn bench_overflow_spill(c: &mut Criterion) {
232274
group.bench_function("capacity=1", |b| {
233275
b.iter_batched(
234276
|| {
235-
let (sender, receiver) = mailbox::new::<Message>(NZUsize!(1));
277+
let (sender, receiver) = new::<Message>(NZUsize!(1));
236278
assert_eq!(sender.enqueue(Message::drop()), Feedback::Ok);
237279
(sender, receiver)
238280
},
@@ -251,7 +293,7 @@ fn bench_overflow_spill(c: &mut Criterion) {
251293
}
252294

253295
fn replace_queue(newest: bool) -> (mailbox::Sender<Message>, mailbox::Receiver<Message>) {
254-
let (sender, receiver) = mailbox::new::<Message>(NZUsize!(REPLACE_CAPACITY));
296+
let (sender, receiver) = new::<Message>(NZUsize!(REPLACE_CAPACITY));
255297

256298
for _ in 0..REPLACE_CAPACITY {
257299
assert_eq!(sender.enqueue(Message::drop()), Feedback::Ok);
@@ -300,7 +342,7 @@ fn bench_concurrent_enqueue(c: &mut Criterion) {
300342

301343
group.bench_function(format!("producers={PRODUCERS} capacity={total}"), |b| {
302344
b.iter(|| {
303-
let (sender, _receiver) = mailbox::new::<Message>(NZUsize!(total));
345+
let (sender, _receiver) = new::<Message>(NZUsize!(total));
304346

305347
std::thread::scope(|scope| {
306348
for _ in 0..PRODUCERS {

actor/src/mailbox.rs

Lines changed: 81 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,10 @@
4040
//! however, are not globally ordered and may be observed in any interleaving.
4141
4242
use crate::Feedback;
43+
use commonware_runtime::{
44+
telemetry::metrics::{Counter, MetricsExt as _},
45+
Metrics,
46+
};
4347
use std::{
4448
collections::VecDeque,
4549
fmt,
@@ -368,6 +372,7 @@ impl Drop for Mutation<'_> {
368372
struct State<T: Policy> {
369373
ready: Ready<T>,
370374
overflow: OverflowState<T>,
375+
backoff: Counter,
371376
closed: AtomicBool,
372377
senders: AtomicUsize,
373378
waker: AtomicWaker,
@@ -435,6 +440,11 @@ impl<T: Policy> Sender<T> {
435440
self.state.closed.load(Ordering::Acquire)
436441
});
437442

443+
// Record any backoff.
444+
if feedback == Feedback::Backoff {
445+
self.state.backoff.inc();
446+
}
447+
438448
// Wake on any handled enqueue because a receiver may have skipped
439449
// refill while this overflow mutation was active. By the time we wake,
440450
// the mutation has published its overflow state. Spurious wakes are
@@ -530,10 +540,11 @@ impl<T: Policy> Drop for Receiver<T> {
530540
}
531541

532542
/// Create a new bounded mailbox.
533-
pub fn new<T: Policy>(capacity: NonZeroUsize) -> (Sender<T>, Receiver<T>) {
543+
pub fn new<T: Policy>(metrics: impl Metrics, capacity: NonZeroUsize) -> (Sender<T>, Receiver<T>) {
534544
let state = Arc::new(State {
535545
ready: Ready::new(capacity.get()),
536546
overflow: OverflowState::new(),
547+
backoff: metrics.counter("backoff", "number of enqueue calls that requested backoff"),
537548
closed: AtomicBool::new(false),
538549
senders: AtomicUsize::new(1),
539550
waker: AtomicWaker::new(),
@@ -546,10 +557,52 @@ pub fn new<T: Policy>(capacity: NonZeroUsize) -> (Sender<T>, Receiver<T>) {
546557
)
547558
}
548559

560+
#[cfg(test)]
561+
mod mocks {
562+
use commonware_runtime::{
563+
telemetry::metrics::{Metric, Registered, Registration},
564+
Metrics as RuntimeMetrics, Name, Supervisor,
565+
};
566+
use std::fmt;
567+
568+
#[derive(Clone, Copy, Debug, Default)]
569+
pub(super) struct Metrics;
570+
571+
impl Supervisor for Metrics {
572+
fn name(&self) -> Name {
573+
Name::default()
574+
}
575+
576+
fn child(&self, _label: &'static str) -> Self {
577+
Self
578+
}
579+
580+
fn with_attribute(self, _key: &'static str, _value: impl fmt::Display) -> Self {
581+
self
582+
}
583+
}
584+
585+
impl RuntimeMetrics for Metrics {
586+
fn register<N: Into<String>, H: Into<String>, M: Metric>(
587+
&self,
588+
_name: N,
589+
_help: H,
590+
metric: M,
591+
) -> Registered<M> {
592+
Registered::with_registration(metric, Registration::from(()))
593+
}
594+
595+
fn encode(&self) -> String {
596+
String::new()
597+
}
598+
}
599+
}
600+
549601
#[cfg(all(test, not(feature = "loom")))]
550602
mod tests {
551-
use super::*;
603+
use super::{mocks, *};
552604
use commonware_macros::test_async;
605+
use commonware_runtime::{deterministic, Runner as _, Supervisor};
553606
use commonware_utils::{channel::oneshot, NZUsize};
554607
use futures::{
555608
pin_mut,
@@ -562,6 +615,10 @@ mod tests {
562615
Arc,
563616
};
564617

618+
fn new<T: Policy>(capacity: NonZeroUsize) -> (Sender<T>, Receiver<T>) {
619+
super::new(mocks::Metrics, capacity)
620+
}
621+
565622
#[derive(Debug, PartialEq, Eq)]
566623
enum Message {
567624
Update(u64),
@@ -706,6 +763,23 @@ mod tests {
706763
assert_eq!(receiver.try_recv(), Ok(Message::Buffered(2)));
707764
}
708765

766+
#[test]
767+
fn backoff_metric_counts_backoff_feedback() {
768+
let executor = deterministic::Runner::default();
769+
executor.start(|context| async move {
770+
let (sender, _receiver) = super::new(context.child("mailbox"), NZUsize!(1));
771+
assert_eq!(sender.enqueue(Message::Vote(1)), Feedback::Ok);
772+
assert_eq!(sender.enqueue(Message::Buffered(2)), Feedback::Backoff);
773+
assert_eq!(sender.enqueue(Message::Buffered(3)), Feedback::Backoff);
774+
775+
let buffer = context.encode();
776+
assert!(
777+
buffer.contains("mailbox_backoff_total 2"),
778+
"missing backoff count in metrics: {buffer}"
779+
);
780+
});
781+
}
782+
709783
#[test]
710784
fn try_recv_drains_buffered_messages_after_senders_drop() {
711785
let (sender, mut receiver) = new(NZUsize!(1));
@@ -990,7 +1064,7 @@ mod tests {
9901064

9911065
#[cfg(all(test, feature = "loom"))]
9921066
mod loom_tests {
993-
use super::*;
1067+
use super::{mocks, *};
9941068
use commonware_utils::NZUsize;
9951069
use futures::pin_mut;
9961070
use loom::{
@@ -1005,6 +1079,10 @@ mod loom_tests {
10051079
task::{RawWaker, RawWakerVTable, Waker},
10061080
};
10071081

1082+
fn new<T: Policy>(capacity: NonZeroUsize) -> (Sender<T>, Receiver<T>) {
1083+
super::new(mocks::Metrics, capacity)
1084+
}
1085+
10081086
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
10091087
enum Message {
10101088
Drop(u8),

broadcast/src/buffered/engine.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,8 @@ where
117117
/// Creates a new engine with the given context and configuration.
118118
/// Returns the engine and a mailbox for sending messages to the engine.
119119
pub fn new(context: E, cfg: Config<P, M::Cfg, D>) -> (Self, Mailbox<P, M>) {
120-
let (mailbox_sender, mailbox_receiver) = mailbox::new(cfg.mailbox_size);
120+
let (mailbox_sender, mailbox_receiver) =
121+
mailbox::new(context.child("mailbox"), cfg.mailbox_size);
121122
let mailbox = Mailbox::<P, M>::new(mailbox_sender);
122123

123124
let metrics = metrics::Metrics::init(&context);

consensus/src/aggregation/mocks/reporter.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use commonware_actor::{
1212
use commonware_codec::{Decode, DecodeExt, Encode};
1313
use commonware_cryptography::{certificate::Scheme, Digest};
1414
use commonware_parallel::Sequential;
15-
use commonware_runtime::{spawn_cell, ContextCell, Handle, Spawner};
15+
use commonware_runtime::{spawn_cell, ContextCell, Handle, Metrics, Spawner};
1616
use commonware_utils::{channel::oneshot, NZUsize};
1717
use rand_core::CryptoRngCore;
1818
use std::collections::{btree_map::Entry, BTreeMap, HashSet, VecDeque};
@@ -63,12 +63,12 @@ pub struct Reporter<R: CryptoRngCore, S: Scheme, D: Digest> {
6363

6464
impl<R, S, D> Reporter<R, S, D>
6565
where
66-
R: CryptoRngCore,
66+
R: CryptoRngCore + Metrics,
6767
S: scheme::Scheme<D>,
6868
D: Digest,
6969
{
7070
pub fn new(rng: R, scheme: S) -> (Self, Mailbox<S, D>) {
71-
let (sender, receiver) = mailbox::new(NZUsize!(1024));
71+
let (sender, receiver) = mailbox::new(rng.child("mailbox"), NZUsize!(1024));
7272
(
7373
Self {
7474
context: ContextCell::new(rng),

consensus/src/marshal/coding/shards/engine.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -369,7 +369,7 @@ where
369369
/// Create a new [`Engine`] with the given configuration.
370370
pub fn new(context: E, config: Config<P, S, X, D, C, H, B, T>) -> (Self, Mailbox<B, C, H, P>) {
371371
let metrics = ShardMetrics::new(&context);
372-
let (sender, mailbox) = mailbox::new(config.mailbox_size);
372+
let (sender, mailbox) = mailbox::new(context.child("mailbox"), config.mailbox_size);
373373
(
374374
Self {
375375
context: ContextCell::new(context),

consensus/src/marshal/core/actor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,7 @@ where
335335
let _ = processed_height.try_set(last_processed_height.get());
336336

337337
// Initialize mailbox
338-
let (sender, mailbox) = mailbox::new(config.mailbox_size);
338+
let (sender, mailbox) = mailbox::new(context.child("mailbox"), config.mailbox_size);
339339
(
340340
Self {
341341
context: ContextCell::new(context),

consensus/src/marshal/resolver/p2p.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,10 +61,10 @@ where
6161
R: Receiver<PublicKey = P>,
6262
P: PublicKey,
6363
{
64-
let (sender, receiver) = mailbox::new(config.mailbox_size);
64+
let (sender, receiver) = mailbox::new(context.child("handler"), config.mailbox_size);
6565
let handler = handler::Handler::new(sender);
6666
let (resolver_engine, resolver) = p2p::Engine::new(
67-
context,
67+
context.child("resolver"),
6868
p2p::Config {
6969
peer_provider: config.peer_provider,
7070
blocker: config.blocker,

0 commit comments

Comments
 (0)