Skip to content

Commit 054745a

Browse files
author
Ariel Ben-Yehuda
committed
test it as well
1 parent 2b8ba07 commit 054745a

File tree

3 files changed

+124
-10
lines changed

3 files changed

+124
-10
lines changed

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ num_cpus = "1.13.1"
3737
serde = { version = "1.0.136", features = ["derive"] }
3838
serde_json = "1.0.79"
3939
tokio = { version = "1.41.0", features = ["full", "rt", "time", "macros", "test-util"] }
40+
metrics-util = { version = "0.19", features = ["debugging"] }
41+
metrics = { version = "0.24" }
4042

4143
[[example]]
4244
name = "runtime"

src/runtime/metrics_integration.rs

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
11
use std::{fmt, time::Duration};
22

3+
use tokio::runtime::Handle;
4+
35
use super::{RuntimeIntervals, RuntimeMetrics, RuntimeMonitor};
46

57
/// A reporter builder
68
pub struct RuntimeMetricsReporterBuilder {
79
interval: Duration,
8-
metrics_transformer: Box<dyn FnMut(&'static str) -> metrics::Key>,
10+
metrics_transformer: Box<dyn FnMut(&'static str) -> metrics::Key + Send>,
911
}
1012

1113
impl fmt::Debug for RuntimeMetricsReporterBuilder {
@@ -35,7 +37,12 @@ impl RuntimeMetricsReporterBuilder {
3537
}
3638

3739
/// Build the reporter
38-
pub fn build(mut self, monitor: RuntimeMonitor) -> RuntimeMetricsReporter {
40+
pub fn build(self) -> RuntimeMetricsReporter {
41+
self.build_with_monitor(RuntimeMonitor::new(&Handle::current()))
42+
}
43+
44+
/// Build the reporter with a specific [`RuntimeMonitor`]
45+
pub fn build_with_monitor(mut self, monitor: RuntimeMonitor) -> RuntimeMetricsReporter {
3946
RuntimeMetricsReporter {
4047
interval: self.interval,
4148
intervals: monitor.intervals(),
@@ -50,13 +57,13 @@ impl RuntimeMetricsReporterBuilder {
5057
}
5158

5259
/// Run the reporter, describing the metrics beforehand
53-
pub async fn describe_and_run(self, monitor: RuntimeMonitor) {
54-
self.describe().build(monitor).run().await;
60+
pub async fn describe_and_run(self) {
61+
self.describe().build().run().await;
5562
}
5663

5764
/// Run the reporter, not describing the metrics beforehand
58-
pub async fn run_without_describing(self, monitor: RuntimeMonitor) {
59-
self.build(monitor).run().await;
65+
pub async fn run_without_describing(self) {
66+
self.build().run().await;
6067
}
6168
}
6269

@@ -77,15 +84,16 @@ macro_rules! metric_key {
7784
($transform_fn:ident, $name:ident) => ($transform_fn(concat!("tokio_", stringify!($name))))
7885
}
7986

87+
// calling `trim` since /// inserts spaces into docs
8088
macro_rules! describe_metric_ref {
8189
($transform_fn:ident, $doc:expr, $name:ident: Counter<$unit:ident> []) => (
82-
metrics::describe_counter!(metric_key!($transform_fn, $name).name().to_owned(), metrics::Unit::$unit, $doc)
90+
metrics::describe_counter!(metric_key!($transform_fn, $name).name().to_owned(), metrics::Unit::$unit, $doc.trim())
8391
);
8492
($transform_fn:ident, $doc:expr, $name:ident: Gauge<$unit:ident> []) => (
85-
metrics::describe_gauge!(metric_key!($transform_fn, $name).name().to_owned(), metrics::Unit::$unit, $doc)
93+
metrics::describe_gauge!(metric_key!($transform_fn, $name).name().to_owned(), metrics::Unit::$unit, $doc.trim())
8694
);
8795
($transform_fn:ident, $doc:expr, $name:ident: Histogram<$unit:ident> []) => (
88-
metrics::describe_histogram!(metric_key!($transform_fn, $name).name().to_owned(), metrics::Unit::$unit, $doc)
96+
metrics::describe_histogram!(metric_key!($transform_fn, $name).name().to_owned(), metrics::Unit::$unit, $doc.trim())
8997
);
9098
}
9199

@@ -260,7 +268,11 @@ impl MyMetricOp for (&metrics::Histogram, Vec<u64>) {
260268
fn op(self, tokio: &tokio::runtime::RuntimeMetrics) {
261269
for (i, bucket) in self.1.iter().enumerate() {
262270
let range = tokio.poll_time_histogram_bucket_range(i);
263-
self.0.record_many(((range.start + range.end).as_micros() / 2) as f64, *bucket as usize);
271+
if *bucket > 0 {
272+
// emit using range.start to avoid very large numbers for open bucket
273+
// FIXME: do we want to do something else here
274+
self.0.record_many(range.start.as_micros() as f64, *bucket as usize);
275+
}
264276
}
265277
}
266278
}

tests/auto_metrics.rs

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
macro_rules! cfg_rt {
2+
($($item:item)*) => {
3+
$(
4+
#[cfg(all(tokio_unstable, feature = "rt"))]
5+
#[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "rt"))))]
6+
$item
7+
)*
8+
};
9+
}
10+
11+
cfg_rt! {
12+
#[cfg(feature = "metrics-integration")]
13+
#[test]
14+
fn main() {
15+
use metrics_util::debugging::DebugValue;
16+
use std::{sync::Arc, time::Duration};
17+
use tokio::runtime::{HistogramConfiguration, LogHistogram};
18+
use tokio_metrics::RuntimeMetricsReporterBuilder;
19+
20+
let worker_threads = 10;
21+
22+
let config = HistogramConfiguration::log(LogHistogram::default());
23+
24+
let rt = tokio::runtime::Builder::new_multi_thread()
25+
.enable_time()
26+
.enable_metrics_poll_time_histogram()
27+
.metrics_poll_time_histogram_configuration(config)
28+
.worker_threads(worker_threads)
29+
.build()
30+
.unwrap();
31+
32+
rt.block_on(async {
33+
let recorder = Arc::new(metrics_util::debugging::DebuggingRecorder::new());
34+
metrics::set_global_recorder(recorder.clone()).unwrap();
35+
tokio::task::spawn(RuntimeMetricsReporterBuilder::default().with_interval(Duration::from_millis(100)).describe_and_run());
36+
let mut done = false;
37+
for _ in 0..1000 {
38+
tokio::time::sleep(Duration::from_millis(10)).await;
39+
let snapshot = recorder.snapshotter().snapshot().into_vec();
40+
if let Some(metric) = snapshot.iter().find(|metrics| {
41+
metrics.0.key().name() == "tokio_workers_count"
42+
}) {
43+
done = true;
44+
match metric {
45+
(_, Some(metrics::Unit::Count), Some(s), DebugValue::Gauge(count))
46+
if &s[..] == "The number of worker threads used by the runtime" =>
47+
{
48+
assert_eq!(count.into_inner() as usize, worker_threads);
49+
}
50+
_ => panic!("bad {metric:?}"),
51+
}
52+
break;
53+
}
54+
}
55+
assert!(done, "metric not found");
56+
tokio::task::spawn(async {
57+
// spawn a thread with a long poll time, let's see we can find it
58+
std::thread::sleep(std::time::Duration::from_millis(100));
59+
}).await.unwrap();
60+
let mut long_polls_found = 0;
61+
for _ in 0..15 {
62+
tokio::time::sleep(Duration::from_millis(100)).await;
63+
let snapshot = recorder.snapshotter().snapshot().into_vec();
64+
if let Some(metric) = snapshot.iter().find(|metrics| {
65+
metrics.0.key().name() == "tokio_poll_time_histogram"
66+
}) {
67+
match metric {
68+
(_, Some(metrics::Unit::Microseconds), Some(s), DebugValue::Histogram(hist))
69+
if &s[..] == "A histogram of task polls since the previous probe grouped by poll times" =>
70+
{
71+
for entry in hist {
72+
// look for a poll of around 100 milliseconds
73+
// the default bucket for 100 milliseconds is between 100 and 100/1.25 = 80
74+
if entry.into_inner() >= 80e3 && entry.into_inner() <= 250e3 {
75+
long_polls_found += 1;
76+
}
77+
}
78+
}
79+
_ => panic!("bad {metric:?}"),
80+
}
81+
}
82+
let metric = snapshot.iter().find(|metrics| {
83+
metrics.0.key().name() == "tokio_total_polls_count"
84+
}).unwrap();
85+
match metric {
86+
(_, Some(metrics::Unit::Count), Some(s), DebugValue::Counter(count))
87+
if &s[..] == "The number of tasks that have been polled across all worker threads" && *count > 0 =>
88+
{
89+
}
90+
_ => panic!("bad {metric:?}"),
91+
}
92+
if long_polls_found > 0 {
93+
break
94+
}
95+
}
96+
// check that we found exactly 1 poll in the 100ms region
97+
assert_eq!(long_polls_found, 1);
98+
});
99+
}
100+
}

0 commit comments

Comments
 (0)