Skip to content

Commit 9354757

Browse files
authored
Add metrics_rs integration for task metrics (#100)
1 parent 503a3b1 commit 9354757

File tree

6 files changed

+561
-212
lines changed

6 files changed

+561
-212
lines changed

src/lib.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,5 +179,14 @@ cfg_rt! {
179179
)]
180180
pub use runtime::metrics_rs_integration::{RuntimeMetricsReporter, RuntimeMetricsReporterBuilder};
181181

182+
#[cfg(all(feature = "rt", feature = "metrics-rs-integration"))]
183+
mod metrics_rs;
182184
mod task;
185+
186+
#[cfg(all(feature = "rt", feature = "metrics-rs-integration"))]
187+
#[cfg_attr(
188+
docsrs,
189+
doc(cfg(all(feature = "rt", feature = "metrics-rs-integration")))
190+
)]
191+
pub use task::metrics_rs_integration::{TaskMetricsReporter, TaskMetricsReporterBuilder};
183192
pub use task::{Instrumented, TaskMetrics, TaskMonitor};

src/metrics_rs.rs

Lines changed: 221 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,221 @@
1+
use std::time::Duration;
2+
3+
pub(crate) const DEFAULT_METRIC_SAMPLING_INTERVAL: Duration = Duration::from_secs(30);
4+
5+
macro_rules! kind_to_type {
6+
(Counter) => {
7+
metrics::Counter
8+
};
9+
(Gauge) => {
10+
metrics::Gauge
11+
};
12+
(PollTimeHistogram) => {
13+
metrics::Histogram
14+
};
15+
}
16+
17+
macro_rules! metric_key {
18+
($transform_fn:ident, $name:ident) => {
19+
$transform_fn(concat!("tokio_", stringify!($name)))
20+
};
21+
}
22+
23+
// calling `trim` since /// inserts spaces into docs
24+
macro_rules! describe_metric_ref {
25+
($transform_fn:ident, $doc:expr, $name:ident: Counter<$unit:ident> []) => {
26+
metrics::describe_counter!(
27+
crate::metrics_rs::metric_key!($transform_fn, $name)
28+
.name()
29+
.to_owned(),
30+
metrics::Unit::$unit,
31+
$doc.trim()
32+
)
33+
};
34+
($transform_fn:ident, $doc:expr, $name:ident: Gauge<$unit:ident> []) => {
35+
metrics::describe_gauge!(
36+
crate::metrics_rs::metric_key!($transform_fn, $name)
37+
.name()
38+
.to_owned(),
39+
metrics::Unit::$unit,
40+
$doc.trim()
41+
)
42+
};
43+
($transform_fn:ident, $doc:expr, $name:ident: PollTimeHistogram<$unit:ident> []) => {
44+
metrics::describe_histogram!(
45+
crate::metrics_rs::metric_key!($transform_fn, $name)
46+
.name()
47+
.to_owned(),
48+
metrics::Unit::$unit,
49+
$doc.trim()
50+
)
51+
};
52+
}
53+
54+
macro_rules! capture_metric_ref {
55+
($transform_fn:ident, $name:ident: Counter []) => {{
56+
let (name, labels) = crate::metrics_rs::metric_key!($transform_fn, $name).into_parts();
57+
metrics::counter!(name, labels)
58+
}};
59+
($transform_fn:ident, $name:ident: Gauge []) => {{
60+
let (name, labels) = crate::metrics_rs::metric_key!($transform_fn, $name).into_parts();
61+
metrics::gauge!(name, labels)
62+
}};
63+
($transform_fn:ident, $name:ident: PollTimeHistogram []) => {{
64+
let (name, labels) = crate::metrics_rs::metric_key!($transform_fn, $name).into_parts();
65+
metrics::histogram!(name, labels)
66+
}};
67+
}
68+
69+
macro_rules! metric_refs {
70+
(
71+
[$struct_name:ident] [$($ignore:ident),* $(,)?] [$metrics_name:ty] [$emit_arg_type:ty] {
72+
stable {
73+
$(
74+
#[doc = $doc:tt]
75+
$name:ident: $kind:tt <$unit:ident> $opts:tt
76+
),*
77+
$(,)?
78+
}
79+
unstable {
80+
$(
81+
#[doc = $unstable_doc:tt]
82+
$unstable_name:ident: $unstable_kind:tt <$unstable_unit:ident> $unstable_opts:tt
83+
),*
84+
$(,)?
85+
}
86+
}
87+
) => {
88+
struct $struct_name {
89+
$(
90+
$name: crate::metrics_rs::kind_to_type!($kind),
91+
)*
92+
$(
93+
#[cfg(tokio_unstable)]
94+
$unstable_name: crate::metrics_rs::kind_to_type!($unstable_kind),
95+
)*
96+
}
97+
98+
impl $struct_name {
99+
fn capture(transform_fn: &mut dyn FnMut(&'static str) -> metrics::Key) -> Self {
100+
Self {
101+
$(
102+
$name: crate::metrics_rs::capture_metric_ref!(transform_fn, $name: $kind $opts),
103+
)*
104+
$(
105+
#[cfg(tokio_unstable)]
106+
$unstable_name: crate::metrics_rs::capture_metric_ref!(transform_fn, $unstable_name: $unstable_kind $unstable_opts),
107+
)*
108+
}
109+
}
110+
111+
fn emit(&self, metrics: $metrics_name, emit_arg: $emit_arg_type) {
112+
$(
113+
crate::metrics_rs::MyMetricOp::op((&self.$name, metrics.$name), emit_arg);
114+
)*
115+
$(
116+
#[cfg(tokio_unstable)]
117+
crate::metrics_rs::MyMetricOp::op((&self.$unstable_name, metrics.$unstable_name), emit_arg);
118+
)*
119+
}
120+
121+
fn describe(transform_fn: &mut dyn FnMut(&'static str) -> metrics::Key) {
122+
$(
123+
crate::metrics_rs::describe_metric_ref!(transform_fn, $doc, $name: $kind<$unit> $opts);
124+
)*
125+
$(
126+
#[cfg(tokio_unstable)]
127+
crate::metrics_rs::describe_metric_ref!(transform_fn, $unstable_doc, $unstable_name: $unstable_kind<$unstable_unit> $unstable_opts);
128+
)*
129+
}
130+
}
131+
132+
#[test]
133+
fn test_no_fields_missing() {
134+
// test that no fields are missing. We can't use exhaustive matching here
135+
// since the metrics structs are #[non_exhaustive], so use a debug impl
136+
let debug = format!("{:#?}", <$metrics_name>::default());
137+
for line in debug.lines() {
138+
if line == format!("{} {{", stringify!($metrics_name)) || line == "}" {
139+
continue
140+
}
141+
$(
142+
let expected = format!(" {}:", stringify!($ignore));
143+
if line.contains(&expected) {
144+
continue
145+
}
146+
);*
147+
$(
148+
let expected = format!(" {}:", stringify!($name));
149+
eprintln!("{}", expected);
150+
if line.contains(&expected) {
151+
continue
152+
}
153+
);*
154+
$(
155+
let expected = format!(" {}:", stringify!($unstable_name));
156+
eprintln!("{}", expected);
157+
if line.contains(&expected) {
158+
continue
159+
}
160+
);*
161+
panic!("missing metric {:?}", line);
162+
}
163+
}
164+
}
165+
}
166+
167+
pub(crate) use capture_metric_ref;
168+
pub(crate) use describe_metric_ref;
169+
pub(crate) use kind_to_type;
170+
pub(crate) use metric_key;
171+
pub(crate) use metric_refs;
172+
173+
pub(crate) trait MyMetricOp<T> {
174+
fn op(self, t: T);
175+
}
176+
177+
impl<T> MyMetricOp<T> for (&metrics::Counter, Duration) {
178+
fn op(self, _: T) {
179+
self.0
180+
.increment(self.1.as_micros().try_into().unwrap_or(u64::MAX));
181+
}
182+
}
183+
184+
impl<T> MyMetricOp<T> for (&metrics::Counter, u64) {
185+
fn op(self, _t: T) {
186+
self.0.increment(self.1);
187+
}
188+
}
189+
190+
impl<T> MyMetricOp<T> for (&metrics::Gauge, Duration) {
191+
fn op(self, _t: T) {
192+
self.0.set(self.1.as_micros() as f64);
193+
}
194+
}
195+
196+
impl<T> MyMetricOp<T> for (&metrics::Gauge, u64) {
197+
fn op(self, _: T) {
198+
self.0.set(self.1 as f64);
199+
}
200+
}
201+
202+
impl<T> MyMetricOp<T> for (&metrics::Gauge, usize) {
203+
fn op(self, _t: T) {
204+
self.0.set(self.1 as f64);
205+
}
206+
}
207+
208+
#[cfg(tokio_unstable)]
209+
impl MyMetricOp<&tokio::runtime::RuntimeMetrics> for (&metrics::Histogram, Vec<u64>) {
210+
fn op(self, tokio: &tokio::runtime::RuntimeMetrics) {
211+
for (i, bucket) in self.1.iter().enumerate() {
212+
let range = tokio.poll_time_histogram_bucket_range(i);
213+
if *bucket > 0 {
214+
// emit using range.start to avoid very large numbers for open bucket
215+
// FIXME: do we want to do something else here?
216+
self.0
217+
.record_many(range.start.as_micros() as f64, *bucket as usize);
218+
}
219+
}
220+
}
221+
}

0 commit comments

Comments
 (0)