diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 6a3f1e1..dbe0b67 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -75,10 +75,18 @@ jobs: run: cargo hack check --each-feature --no-dev-deps --all test-versions: + name: test-version (${{ matrix.name }}) needs: check runs-on: ubuntu-latest + strategy: + matrix: + include: + - rustflags: "--cfg tokio_unstable -Dwarnings" + name: "tokio-unstable" + - rustflags: "-Dwarnings" + name: "stable" env: - RUSTFLAGS: --cfg tokio_unstable -Dwarnings + RUSTFLAGS: ${{ matrix.rustflags }} steps: - uses: actions/checkout@master - uses: actions-rs/toolchain@v1 diff --git a/Cargo.toml b/Cargo.toml index e8faa2f..3c001b0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,7 +26,7 @@ rt = ["tokio"] tokio-stream = "0.1.11" futures-util = "0.3.19" pin-project-lite = "0.2.7" -tokio = { version = "1.41.0", features = ["rt", "time", "net"], optional = true } +tokio = { version = "1.45.1", features = ["rt", "time", "net"], optional = true } metrics = { version = "0.24", optional = true } [dev-dependencies] @@ -36,7 +36,7 @@ futures = "0.3.21" num_cpus = "1.13.1" serde = { version = "1.0.136", features = ["derive"] } serde_json = "1.0.79" -tokio = { version = "1.41.0", features = ["full", "rt", "time", "macros", "test-util"] } +tokio = { version = "1.45.1", features = ["full", "rt", "time", "macros", "test-util"] } metrics-util = { version = "0.19", features = ["debugging"] } metrics = { version = "0.24" } metrics-exporter-prometheus = { version = "0.16", features = ["uds-listener"] } diff --git a/README.md b/README.md index 6df0172..173725a 100644 --- a/README.md +++ b/README.md @@ -146,7 +146,7 @@ loop { ## Getting Started With Runtime Metrics -This unstable functionality requires `tokio_unstable`, and the `rt` crate +Not all runtime metrics are stable. Using unstable metrics requires `tokio_unstable`, and the `rt` crate feature. 
To enable `tokio_unstable`, the `--cfg` `tokio_unstable` must be passed to `rustc` when compiling. You can do this by setting the `RUSTFLAGS` environment variable before compiling your application; e.g.: @@ -170,6 +170,17 @@ More information about where cargo looks for configuration files can be found Missing this configuration file during compilation will cause tokio-metrics to not work, and alternating between building with and without this configuration file included will cause full rebuilds of your project. +### Stable Runtime Metrics + +- **[`workers_count`]** +- **[`total_park_count`]** +- **[`max_park_count`]** +- **[`min_park_count`]** +- **[`total_busy_duration`]** +- **[`max_busy_duration`]** +- **[`min_busy_duration`]** +- **[`global_queue_depth`]** + ### Collecting Runtime Metrics directly The `rt` feature of `tokio-metrics` is on by default; simply check that you do diff --git a/examples/axum.rs b/examples/axum.rs index 5ac41cc..5591dc9 100644 --- a/examples/axum.rs +++ b/examples/axum.rs @@ -40,9 +40,9 @@ async fn main() { let intervals = root_intervals.zip(create_user_intervals); for (root_route, (create_user_route, create_user_insert)) in intervals { - println!("root_route = {:#?}", root_route); - println!("create_user_route = {:#?}", create_user_route); - println!("create_user_insert = {:#?}", create_user_insert); + println!("root_route = {root_route:#?}"); + println!("create_user_route = {create_user_route:#?}"); + println!("create_user_insert = {create_user_insert:#?}"); tokio::time::sleep(metrics_frequency).await; } }); diff --git a/examples/runtime.rs b/examples/runtime.rs index 7834710..45d0c94 100644 --- a/examples/runtime.rs +++ b/examples/runtime.rs @@ -11,7 +11,7 @@ async fn main() -> Result<(), Box> { tokio::spawn(async move { for interval in runtime_monitor.intervals() { // pretty-print the metric interval - println!("{:?}", interval); + println!("{interval:?}"); // wait 500ms tokio::time::sleep(Duration::from_millis(500)).await; } diff 
--git a/examples/stream.rs b/examples/stream.rs index d2928bd..eef0294 100644 --- a/examples/stream.rs +++ b/examples/stream.rs @@ -12,7 +12,7 @@ async fn main() -> Result<(), Box> { tokio::spawn(async move { for deltas in metrics_monitor.intervals() { // pretty-print the metric deltas - println!("{:?}", deltas); + println!("{deltas:?}"); // wait 500ms tokio::time::sleep(Duration::from_millis(500)).await; } diff --git a/examples/task.rs b/examples/task.rs index 914a214..9d995da 100644 --- a/examples/task.rs +++ b/examples/task.rs @@ -10,7 +10,7 @@ async fn main() -> Result<(), Box> { tokio::spawn(async move { for deltas in metrics_monitor.intervals() { // pretty-print the metric deltas - println!("{:?}", deltas); + println!("{deltas:?}"); // wait 500ms tokio::time::sleep(Duration::from_millis(500)).await; } diff --git a/src/lib.rs b/src/lib.rs index b2ab1a9..c45be13 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -56,11 +56,11 @@ //! ``` #![cfg_attr( - all(tokio_unstable, feature = "rt"), + feature = "rt", doc = r##" -### Monitoring runtime metrics (unstable) +### Monitoring runtime metrics [Monitor][RuntimeMonitor] key [metrics][RuntimeMetrics] of a tokio runtime. -**This functionality requires `tokio_unstable` and the crate feature `rt`.** +**This functionality requires crate feature `rt` and some metrics require `tokio_unstable`.** In the below example, a [`RuntimeMonitor`] is [constructed][RuntimeMonitor::new] and three tasks are spawned and awaited; meanwhile, a fourth task prints [metrics][RuntimeMetrics] @@ -104,7 +104,7 @@ async fn do_work() { } ``` -### Monitoring and publishing runtime metrics (unstable) +### Monitoring and publishing runtime metrics If the `metrics-rs-integration` feature is additionally enabled, this crate allows publishing runtime metrics externally via [metrics-rs](metrics) exporters. @@ -156,8 +156,8 @@ async fn main() { macro_rules! 
cfg_rt { ($($item:item)*) => { $( - #[cfg(all(tokio_unstable, feature = "rt"))] - #[cfg_attr(docsrs, doc(cfg(all(tokio_unstable, feature = "rt"))))] + #[cfg(feature = "rt")] + #[cfg_attr(docsrs, doc(cfg(feature = "rt")))] $item )* }; @@ -172,10 +172,10 @@ cfg_rt! { }; } -#[cfg(all(tokio_unstable, feature = "rt", feature = "metrics-rs-integration"))] +#[cfg(all(feature = "rt", feature = "metrics-rs-integration"))] #[cfg_attr( docsrs, - doc(cfg(all(tokio_unstable, feature = "rt", feature = "metrics-rs-integration"))) + doc(cfg(all(feature = "rt", feature = "metrics-rs-integration"))) )] pub use runtime::metrics_rs_integration::{RuntimeMetricsReporter, RuntimeMetricsReporterBuilder}; diff --git a/src/runtime.rs b/src/runtime.rs index ea5002e..e67a01d 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -51,1179 +51,1265 @@ pub struct RuntimeMonitor { runtime: runtime::RuntimeMetrics, } -/// Key runtime metrics. -#[non_exhaustive] -#[derive(Default, Debug, Clone)] -pub struct RuntimeMetrics { - /// The number of worker threads used by the runtime. - /// - /// This metric is static for a runtime. - /// - /// This metric is always equal to [`tokio::runtime::RuntimeMetrics::num_workers`]. - /// When using the `current_thread` runtime, the return value is always `1`. - /// - /// The number of workers is set by configuring - /// [`worker_threads`][`tokio::runtime::Builder::worker_threads`] with - /// [`tokio::runtime::Builder`], or by parameterizing [`tokio::main`]. 
- /// - /// ##### Examples - /// In the below example, the number of workers is set by parameterizing [`tokio::main`]: - /// ``` - /// use tokio::runtime::Handle; - /// - /// #[tokio::main(flavor = "multi_thread", worker_threads = 10)] - /// async fn main() { - /// let handle = tokio::runtime::Handle::current(); - /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle); - /// let mut intervals = monitor.intervals(); - /// let mut next_interval = || intervals.next().unwrap(); - /// - /// assert_eq!(next_interval().workers_count, 10); - /// } - /// ``` - /// - /// [`tokio::main`]: https://docs.rs/tokio/latest/tokio/attr.main.html - /// - /// When using the `current_thread` runtime, the return value is always `1`; e.g.: - /// ``` - /// use tokio::runtime::Handle; - /// - /// #[tokio::main(flavor = "current_thread")] - /// async fn main() { - /// let handle = tokio::runtime::Handle::current(); - /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle); - /// let mut intervals = monitor.intervals(); - /// let mut next_interval = || intervals.next().unwrap(); - /// - /// assert_eq!(next_interval().workers_count, 1); - /// } - /// ``` - /// - /// This metric is always equal to [`tokio::runtime::RuntimeMetrics::num_workers`]; e.g.: - /// ``` - /// use tokio::runtime::Handle; - /// - /// #[tokio::main] - /// async fn main() { - /// let handle = Handle::current(); - /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle); - /// let mut intervals = monitor.intervals(); - /// let mut next_interval = || intervals.next().unwrap(); - /// - /// assert_eq!(next_interval().workers_count, handle.metrics().num_workers()); - /// } - /// ``` - pub workers_count: usize, - - /// The number of times worker threads parked. - /// - /// The worker park count increases by one each time the worker parks the thread waiting for - /// new inbound events to process. This usually means the worker has processed all pending work - /// and is currently idle. 
- /// - /// ##### Definition - /// This metric is derived from the sum of [`tokio::runtime::RuntimeMetrics::worker_park_count`] - /// across all worker threads. - /// - /// ##### See also - /// - [`RuntimeMetrics::max_park_count`] - /// - [`RuntimeMetrics::min_park_count`] - /// - /// ##### Examples - /// ``` - /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)] - /// async fn main() { - /// let handle = tokio::runtime::Handle::current(); - /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle); - /// let mut intervals = monitor.intervals(); - /// let mut next_interval = || intervals.next().unwrap(); - /// - /// let interval = next_interval(); // end of interval 1 - /// assert_eq!(interval.total_park_count, 0); - /// - /// induce_parks().await; - /// - /// let interval = next_interval(); // end of interval 2 - /// assert!(interval.total_park_count >= 1); // usually 1 or 2 parks - /// } - /// - /// async fn induce_parks() { - /// let _ = tokio::time::timeout(std::time::Duration::ZERO, async { - /// loop { tokio::task::yield_now().await; } - /// }).await; - /// } - /// ``` - pub total_park_count: u64, - - /// The maximum number of times any worker thread parked. - /// - /// ##### Definition - /// This metric is derived from the maximum of - /// [`tokio::runtime::RuntimeMetrics::worker_park_count`] across all worker threads. - /// - /// ##### See also - /// - [`RuntimeMetrics::total_park_count`] - /// - [`RuntimeMetrics::min_park_count`] - pub max_park_count: u64, - - /// The minimum number of times any worker thread parked. - /// - /// ##### Definition - /// This metric is derived from the maximum of - /// [`tokio::runtime::RuntimeMetrics::worker_park_count`] across all worker threads. - /// - /// ##### See also - /// - [`RuntimeMetrics::total_park_count`] - /// - [`RuntimeMetrics::max_park_count`] - pub min_park_count: u64, - - /// The average duration of a single invocation of poll on a task. 
- /// - /// This average is an exponentially-weighted moving average of the duration - /// of task polls on all runtime workers. - /// - /// ##### Examples - /// ``` - /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)] - /// async fn main() { - /// let handle = tokio::runtime::Handle::current(); - /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle); - /// let mut intervals = monitor.intervals(); - /// let mut next_interval = || intervals.next().unwrap(); - /// - /// let interval = next_interval(); - /// println!("mean task poll duration is {:?}", interval.mean_poll_duration); - /// } - /// ``` - pub mean_poll_duration: Duration, - - /// The average duration of a single invocation of poll on a task on the - /// worker with the lowest value. - /// - /// This average is an exponentially-weighted moving average of the duration - /// of task polls on the runtime worker with the lowest value. - /// - /// ##### Examples - /// ``` - /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)] - /// async fn main() { - /// let handle = tokio::runtime::Handle::current(); - /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle); - /// let mut intervals = monitor.intervals(); - /// let mut next_interval = || intervals.next().unwrap(); - /// - /// let interval = next_interval(); - /// println!("min mean task poll duration is {:?}", interval.mean_poll_duration_worker_min); - /// } - /// ``` - pub mean_poll_duration_worker_min: Duration, - - /// The average duration of a single invocation of poll on a task on the - /// worker with the highest value. - /// - /// This average is an exponentially-weighted moving average of the duration - /// of task polls on the runtime worker with the highest value. 
- /// - /// ##### Examples - /// ``` - /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)] - /// async fn main() { - /// let handle = tokio::runtime::Handle::current(); - /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle); - /// let mut intervals = monitor.intervals(); - /// let mut next_interval = || intervals.next().unwrap(); - /// - /// let interval = next_interval(); - /// println!("max mean task poll duration is {:?}", interval.mean_poll_duration_worker_max); - /// } - /// ``` - pub mean_poll_duration_worker_max: Duration, - - /// A histogram of task polls since the previous probe grouped by poll - /// times. - /// - /// This metric must be explicitly enabled when creating the runtime with - /// [`enable_metrics_poll_time_histogram`][tokio::runtime::Builder::enable_metrics_poll_time_histogram]. - /// Bucket sizes are fixed and configured at the runtime level. See - /// configuration options on - /// [`runtime::Builder`][tokio::runtime::Builder::enable_metrics_poll_time_histogram]. - /// - /// ##### Examples - /// ``` - /// use tokio::runtime::HistogramConfiguration; - /// use std::time::Duration; - /// - /// let config = HistogramConfiguration::linear(Duration::from_micros(50), 12); - /// - /// let rt = tokio::runtime::Builder::new_multi_thread() - /// .enable_metrics_poll_time_histogram() - /// .metrics_poll_time_histogram_configuration(config) - /// .build() - /// .unwrap(); - /// - /// rt.block_on(async { - /// let handle = tokio::runtime::Handle::current(); - /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle); - /// let mut intervals = monitor.intervals(); - /// let mut next_interval = || intervals.next().unwrap(); - /// - /// let interval = next_interval(); - /// println!("poll count histogram {:?}", interval.poll_time_histogram); - /// }); - /// ``` - pub poll_time_histogram: Vec, - - /// The number of times worker threads unparked but performed no work before parking again. 
- /// - /// The worker no-op count increases by one each time the worker unparks the thread but finds - /// no new work and goes back to sleep. This indicates a false-positive wake up. - /// - /// ##### Definition - /// This metric is derived from the sum of [`tokio::runtime::RuntimeMetrics::worker_noop_count`] - /// across all worker threads. - /// - /// ##### Examples - /// Unfortunately, there isn't a great way to reliably induce no-op parks, as they occur as - /// false-positive events under concurrency. - /// - /// The below example triggers fewer than two parks in the single-threaded runtime: - /// ``` - /// #[tokio::main(flavor = "current_thread")] - /// async fn main() { - /// let handle = tokio::runtime::Handle::current(); - /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle); - /// let mut intervals = monitor.intervals(); - /// let mut next_interval = || intervals.next().unwrap(); - /// - /// assert_eq!(next_interval().total_park_count, 0); - /// - /// async { - /// tokio::time::sleep(std::time::Duration::from_millis(1)).await; - /// }.await; - /// - /// assert!(next_interval().total_park_count > 0); - /// } - /// ``` - /// - /// The below example triggers fewer than two parks in the multi-threaded runtime: - /// ``` - /// #[tokio::main(flavor = "multi_thread")] - /// async fn main() { - /// let handle = tokio::runtime::Handle::current(); - /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle); - /// let mut intervals = monitor.intervals(); - /// let mut next_interval = || intervals.next().unwrap(); - /// - /// assert_eq!(next_interval().total_noop_count, 0); - /// - /// async { - /// tokio::time::sleep(std::time::Duration::from_millis(1)).await; - /// }.await; - /// - /// assert!(next_interval().total_noop_count > 0); - /// } - /// ``` - pub total_noop_count: u64, - - /// The maximum number of times any worker thread unparked but performed no work before parking - /// again. 
- /// - /// ##### Definition - /// This metric is derived from the maximum of - /// [`tokio::runtime::RuntimeMetrics::worker_noop_count`] across all worker threads. - /// - /// ##### See also - /// - [`RuntimeMetrics::total_noop_count`] - /// - [`RuntimeMetrics::min_noop_count`] - pub max_noop_count: u64, - - /// The minimum number of times any worker thread unparked but performed no work before parking - /// again. - /// - /// ##### Definition - /// This metric is derived from the minimum of - /// [`tokio::runtime::RuntimeMetrics::worker_noop_count`] across all worker threads. - /// - /// ##### See also - /// - [`RuntimeMetrics::total_noop_count`] - /// - [`RuntimeMetrics::max_noop_count`] - pub min_noop_count: u64, - - /// The number of tasks worker threads stole from another worker thread. - /// - /// The worker steal count increases by the amount of stolen tasks each time the worker - /// has processed its scheduled queue and successfully steals more pending tasks from another - /// worker. - /// - /// This metric only applies to the **multi-threaded** runtime and will always return `0` when - /// using the current thread runtime. - /// - /// ##### Definition - /// This metric is derived from the sum of [`tokio::runtime::RuntimeMetrics::worker_steal_count`] for - /// all worker threads. 
- /// - /// ##### See also - /// - [`RuntimeMetrics::min_steal_count`] - /// - [`RuntimeMetrics::max_steal_count`] - /// - /// ##### Examples - /// In the below example, a blocking channel is used to backup one worker thread: - /// ``` - /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)] - /// async fn main() { - /// let handle = tokio::runtime::Handle::current(); - /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle); - /// let mut intervals = monitor.intervals(); - /// let mut next_interval = || intervals.next().unwrap(); - /// - /// let interval = next_interval(); // end of first sampling interval - /// assert_eq!(interval.total_steal_count, 0); - /// assert_eq!(interval.min_steal_count, 0); - /// assert_eq!(interval.max_steal_count, 0); - /// - /// // induce a steal - /// async { - /// let (tx, rx) = std::sync::mpsc::channel(); - /// // Move to the runtime. - /// tokio::spawn(async move { - /// // Spawn the task that sends to the channel - /// tokio::spawn(async move { - /// tx.send(()).unwrap(); - /// }); - /// // Spawn a task that bumps the previous task out of the "next - /// // scheduled" slot. - /// tokio::spawn(async {}); - /// // Blocking receive on the channel. 
- /// rx.recv().unwrap(); - /// flush_metrics().await; - /// }).await.unwrap(); - /// flush_metrics().await; - /// }.await; - /// - /// let interval = { flush_metrics().await; next_interval() }; // end of interval 2 - /// println!("total={}; min={}; max={}", interval.total_steal_count, interval.min_steal_count, interval.max_steal_count); - /// - /// let interval = { flush_metrics().await; next_interval() }; // end of interval 3 - /// println!("total={}; min={}; max={}", interval.total_steal_count, interval.min_steal_count, interval.max_steal_count); - /// } - /// - /// async fn flush_metrics() { - /// let _ = tokio::time::sleep(std::time::Duration::ZERO).await; - /// } - /// ``` - pub total_steal_count: u64, - - /// The maximum number of tasks any worker thread stole from another worker thread. - /// - /// ##### Definition - /// This metric is derived from the maximum of [`tokio::runtime::RuntimeMetrics::worker_steal_count`] - /// across all worker threads. - /// - /// ##### See also - /// - [`RuntimeMetrics::total_steal_count`] - /// - [`RuntimeMetrics::min_steal_count`] - pub max_steal_count: u64, - - /// The minimum number of tasks any worker thread stole from another worker thread. - /// - /// ##### Definition - /// This metric is derived from the minimum of [`tokio::runtime::RuntimeMetrics::worker_steal_count`] - /// across all worker threads. - /// - /// ##### See also - /// - [`RuntimeMetrics::total_steal_count`] - /// - [`RuntimeMetrics::max_steal_count`] - pub min_steal_count: u64, - - /// The number of times worker threads stole tasks from another worker thread. - /// - /// The worker steal operations increases by one each time the worker has processed its - /// scheduled queue and successfully steals more pending tasks from another worker. - /// - /// This metric only applies to the **multi-threaded** runtime and will always return `0` when - /// using the current thread runtime. 
- /// - /// ##### Definition - /// This metric is derived from the sum of [`tokio::runtime::RuntimeMetrics::worker_steal_operations`] - /// for all worker threads. - /// - /// ##### See also - /// - [`RuntimeMetrics::min_steal_operations`] - /// - [`RuntimeMetrics::max_steal_operations`] - /// - /// ##### Examples - /// In the below example, a blocking channel is used to backup one worker thread: - /// ``` - /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)] - /// async fn main() { - /// let handle = tokio::runtime::Handle::current(); - /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle); - /// let mut intervals = monitor.intervals(); - /// let mut next_interval = || intervals.next().unwrap(); - /// - /// let interval = next_interval(); // end of first sampling interval - /// assert_eq!(interval.total_steal_operations, 0); - /// assert_eq!(interval.min_steal_operations, 0); - /// assert_eq!(interval.max_steal_operations, 0); - /// - /// // induce a steal - /// async { - /// let (tx, rx) = std::sync::mpsc::channel(); - /// // Move to the runtime. - /// tokio::spawn(async move { - /// // Spawn the task that sends to the channel - /// tokio::spawn(async move { - /// tx.send(()).unwrap(); - /// }); - /// // Spawn a task that bumps the previous task out of the "next - /// // scheduled" slot. - /// tokio::spawn(async {}); - /// // Blocking receive on the channe. 
- /// rx.recv().unwrap(); - /// flush_metrics().await; - /// }).await.unwrap(); - /// flush_metrics().await; - /// }.await; - /// - /// let interval = { flush_metrics().await; next_interval() }; // end of interval 2 - /// println!("total={}; min={}; max={}", interval.total_steal_operations, interval.min_steal_operations, interval.max_steal_operations); - /// - /// let interval = { flush_metrics().await; next_interval() }; // end of interval 3 - /// println!("total={}; min={}; max={}", interval.total_steal_operations, interval.min_steal_operations, interval.max_steal_operations); - /// } - /// - /// async fn flush_metrics() { - /// let _ = tokio::time::sleep(std::time::Duration::ZERO).await; - /// } - /// ``` - pub total_steal_operations: u64, - - /// The maximum number of times any worker thread stole tasks from another worker thread. - /// - /// ##### Definition - /// This metric is derived from the maximum of [`tokio::runtime::RuntimeMetrics::worker_steal_operations`] - /// across all worker threads. - /// - /// ##### See also - /// - [`RuntimeMetrics::total_steal_operations`] - /// - [`RuntimeMetrics::min_steal_operations`] - pub max_steal_operations: u64, - - /// The minimum number of times any worker thread stole tasks from another worker thread. - /// - /// ##### Definition - /// This metric is derived from the minimum of [`tokio::runtime::RuntimeMetrics::worker_steal_operations`] - /// across all worker threads. - /// - /// ##### See also - /// - [`RuntimeMetrics::total_steal_operations`] - /// - [`RuntimeMetrics::max_steal_operations`] - pub min_steal_operations: u64, - - /// The number of tasks scheduled from **outside** of the runtime. - /// - /// The remote schedule count increases by one each time a task is woken from **outside** of - /// the runtime. This usually means that a task is spawned or notified from a non-runtime - /// thread and must be queued using the Runtime's global queue, which tends to be slower. 
- /// - /// ##### Definition - /// This metric is derived from [`tokio::runtime::RuntimeMetrics::remote_schedule_count`]. - /// - /// ##### Examples - /// In the below example, a remote schedule is induced by spawning a system thread, then - /// spawning a tokio task from that system thread: - /// ``` - /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)] - /// async fn main() { - /// let handle = tokio::runtime::Handle::current(); - /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle); - /// let mut intervals = monitor.intervals(); - /// let mut next_interval = || intervals.next().unwrap(); - /// - /// let interval = next_interval(); // end of first sampling interval - /// assert_eq!(interval.num_remote_schedules, 0); - /// - /// // spawn a non-runtime thread - /// std::thread::spawn(move || { - /// // spawn two tasks from this non-runtime thread - /// async move { - /// handle.spawn(async {}).await; - /// handle.spawn(async {}).await; - /// } - /// }).join().unwrap().await; - /// - /// let interval = next_interval(); // end of second sampling interval - /// assert_eq!(interval.num_remote_schedules, 2); - /// - /// let interval = next_interval(); // end of third sampling interval - /// assert_eq!(interval.num_remote_schedules, 0); - /// } - /// ``` - pub num_remote_schedules: u64, - - /// The number of tasks scheduled from worker threads. - /// - /// The local schedule count increases by one each time a task is woken from **inside** of the - /// runtime. This usually means that a task is spawned or notified from within a runtime thread - /// and will be queued on the worker-local queue. - /// - /// ##### Definition - /// This metric is derived from the sum of - /// [`tokio::runtime::RuntimeMetrics::worker_local_schedule_count`] across all worker threads. 
- /// - /// ##### See also - /// - [`RuntimeMetrics::min_local_schedule_count`] - /// - [`RuntimeMetrics::max_local_schedule_count`] - /// - /// ##### Examples - /// ###### With `current_thread` runtime - /// In the below example, two tasks are spawned from the context of a third tokio task: - /// ``` - /// #[tokio::main(flavor = "current_thread")] - /// async fn main() { - /// let handle = tokio::runtime::Handle::current(); - /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle); - /// let mut intervals = monitor.intervals(); - /// let mut next_interval = || intervals.next().unwrap(); - /// - /// let interval = { flush_metrics().await; next_interval() }; // end interval 2 - /// assert_eq!(interval.total_local_schedule_count, 0); - /// - /// let task = async { - /// tokio::spawn(async {}); // local schedule 1 - /// tokio::spawn(async {}); // local schedule 2 - /// }; - /// - /// let handle = tokio::spawn(task); // local schedule 3 - /// - /// let interval = { flush_metrics().await; next_interval() }; // end interval 2 - /// assert_eq!(interval.total_local_schedule_count, 3); - /// - /// let _ = handle.await; - /// - /// let interval = { flush_metrics().await; next_interval() }; // end interval 3 - /// assert_eq!(interval.total_local_schedule_count, 0); - /// } - /// - /// async fn flush_metrics() { - /// tokio::task::yield_now().await; - /// } - /// ``` - /// - /// ###### With `multi_thread` runtime - /// In the below example, 100 tasks are spawned: - /// ``` - /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)] - /// async fn main() { - /// let handle = tokio::runtime::Handle::current(); - /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle); - /// let mut intervals = monitor.intervals(); - /// let mut next_interval = || intervals.next().unwrap(); - /// - /// let interval = next_interval(); // end of interval 1 - /// assert_eq!(interval.total_local_schedule_count, 0); - /// - /// use std::sync::atomic::{AtomicBool, Ordering}; - /// 
static SPINLOCK: AtomicBool = AtomicBool::new(true); - /// - /// // block the other worker thread - /// tokio::spawn(async { - /// while SPINLOCK.load(Ordering::SeqCst) {} - /// }); - /// - /// // FIXME: why does this need to be in a `spawn`? - /// let _ = tokio::spawn(async { - /// // spawn 100 tasks - /// for _ in 0..100 { - /// tokio::spawn(async {}); - /// } - /// // this spawns 1 more task - /// flush_metrics().await; - /// }).await; - /// - /// // unblock the other worker thread - /// SPINLOCK.store(false, Ordering::SeqCst); - /// - /// let interval = { flush_metrics().await; next_interval() }; // end of interval 2 - /// assert_eq!(interval.total_local_schedule_count, 100 + 1); - /// } - /// - /// async fn flush_metrics() { - /// let _ = tokio::time::sleep(std::time::Duration::ZERO).await; - /// } - /// ``` - pub total_local_schedule_count: u64, - - /// The maximum number of tasks scheduled from any one worker thread. - /// - /// ##### Definition - /// This metric is derived from the maximum of - /// [`tokio::runtime::RuntimeMetrics::worker_local_schedule_count`] for all worker threads. - /// - /// ##### See also - /// - [`RuntimeMetrics::total_local_schedule_count`] - /// - [`RuntimeMetrics::min_local_schedule_count`] - pub max_local_schedule_count: u64, - - /// The minimum number of tasks scheduled from any one worker thread. - /// - /// ##### Definition - /// This metric is derived from the minimum of - /// [`tokio::runtime::RuntimeMetrics::worker_local_schedule_count`] for all worker threads. - /// - /// ##### See also - /// - [`RuntimeMetrics::total_local_schedule_count`] - /// - [`RuntimeMetrics::max_local_schedule_count`] - pub min_local_schedule_count: u64, - - /// The number of times worker threads saturated their local queues. - /// - /// The worker steal count increases by one each time the worker attempts to schedule a task - /// locally, but its local queue is full. When this happens, half of the - /// local queue is moved to the global queue. 
- /// - /// This metric only applies to the **multi-threaded** scheduler. - /// - /// ##### Definition - /// This metric is derived from the sum of - /// [`tokio::runtime::RuntimeMetrics::worker_overflow_count`] across all worker threads. - /// - /// ##### See also - /// - [`RuntimeMetrics::min_overflow_count`] - /// - [`RuntimeMetrics::max_overflow_count`] - /// - /// ##### Examples - /// ``` - /// #[tokio::main(flavor = "multi_thread", worker_threads = 1)] - /// async fn main() { - /// let handle = tokio::runtime::Handle::current(); - /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle); - /// let mut intervals = monitor.intervals(); - /// let mut next_interval = || intervals.next().unwrap(); - /// - /// let interval = next_interval(); // end of interval 1 - /// assert_eq!(interval.total_overflow_count, 0); - /// - /// use std::sync::atomic::{AtomicBool, Ordering}; - /// - /// // spawn a ton of tasks - /// let _ = tokio::spawn(async { - /// // we do this in a `tokio::spawn` because it is impossible to - /// // overflow the main task - /// for _ in 0..300 { - /// tokio::spawn(async {}); - /// } - /// }).await; - /// - /// let interval = { flush_metrics().await; next_interval() }; // end of interval 2 - /// assert_eq!(interval.total_overflow_count, 1); - /// } - /// - /// async fn flush_metrics() { - /// let _ = tokio::time::sleep(std::time::Duration::from_millis(1)).await; - /// } - /// ``` - pub total_overflow_count: u64, - - /// The maximum number of times any one worker saturated its local queue. - /// - /// ##### Definition - /// This metric is derived from the maximum of - /// [`tokio::runtime::RuntimeMetrics::worker_overflow_count`] across all worker threads. - /// - /// ##### See also - /// - [`RuntimeMetrics::total_overflow_count`] - /// - [`RuntimeMetrics::min_overflow_count`] - pub max_overflow_count: u64, - - /// The minimum number of times any one worker saturated its local queue. 
- /// - /// ##### Definition - /// This metric is derived from the maximum of - /// [`tokio::runtime::RuntimeMetrics::worker_overflow_count`] across all worker threads. - /// - /// ##### See also - /// - [`RuntimeMetrics::total_overflow_count`] - /// - [`RuntimeMetrics::max_overflow_count`] - pub min_overflow_count: u64, - - /// The number of tasks that have been polled across all worker threads. - /// - /// The worker poll count increases by one each time a worker polls a scheduled task. - /// - /// ##### Definition - /// This metric is derived from the sum of - /// [`tokio::runtime::RuntimeMetrics::worker_poll_count`] across all worker threads. - /// - /// ##### See also - /// - [`RuntimeMetrics::min_polls_count`] - /// - [`RuntimeMetrics::max_polls_count`] - /// - /// ##### Examples - /// In the below example, 42 tasks are spawned and polled: - /// ``` - /// #[tokio::main(flavor = "current_thread")] - /// async fn main() { - /// let handle = tokio::runtime::Handle::current(); - /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle); - /// let mut intervals = monitor.intervals(); - /// let mut next_interval = || intervals.next().unwrap(); - /// - /// let interval = { flush_metrics().await; next_interval() }; // end of interval 1 - /// assert_eq!(interval.total_polls_count, 0); - /// assert_eq!(interval.min_polls_count, 0); - /// assert_eq!(interval.max_polls_count, 0); - /// - /// const N: u64 = 42; - /// - /// for _ in 0..N { - /// let _ = tokio::spawn(async {}).await; - /// } - /// - /// let interval = { flush_metrics().await; next_interval() }; // end of interval 2 - /// assert_eq!(interval.total_polls_count, N); - /// assert_eq!(interval.min_polls_count, N); - /// assert_eq!(interval.max_polls_count, N); - /// } - /// - /// async fn flush_metrics() { - /// let _ = tokio::task::yield_now().await; - /// } - /// ``` - pub total_polls_count: u64, - - /// The maximum number of tasks that have been polled in any worker thread. 
- /// - /// ##### Definition - /// This metric is derived from the maximum of - /// [`tokio::runtime::RuntimeMetrics::worker_poll_count`] across all worker threads. - /// - /// ##### See also - /// - [`RuntimeMetrics::total_polls_count`] - /// - [`RuntimeMetrics::min_polls_count`] - pub max_polls_count: u64, - - /// The minimum number of tasks that have been polled in any worker thread. - /// - /// ##### Definition - /// This metric is derived from the minimum of - /// [`tokio::runtime::RuntimeMetrics::worker_poll_count`] across all worker threads. - /// - /// ##### See also - /// - [`RuntimeMetrics::total_polls_count`] - /// - [`RuntimeMetrics::max_polls_count`] - pub min_polls_count: u64, - - /// The amount of time worker threads were busy. - /// - /// The worker busy duration increases whenever the worker is spending time processing work. - /// Using this value can indicate the total load of workers. - /// - /// ##### Definition - /// This metric is derived from the sum of - /// [`tokio::runtime::RuntimeMetrics::worker_total_busy_duration`] across all worker threads. 
- /// - /// ##### See also - /// - [`RuntimeMetrics::min_busy_duration`] - /// - [`RuntimeMetrics::max_busy_duration`] - /// - /// ##### Examples - /// In the below example, tasks spend a total of 3s busy: - /// ``` - /// use tokio::time::Duration; - /// - /// fn main() { - /// let start = tokio::time::Instant::now(); - /// - /// let rt = tokio::runtime::Builder::new_current_thread() - /// .enable_all() - /// .build() - /// .unwrap(); - /// - /// let handle = rt.handle(); - /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle); - /// let mut intervals = monitor.intervals(); - /// let mut next_interval = || intervals.next().unwrap(); - /// - /// let delay_1s = Duration::from_secs(1); - /// let delay_3s = Duration::from_secs(3); - /// - /// rt.block_on(async { - /// // keep the main task busy for 1s - /// spin_for(delay_1s); - /// - /// // spawn a task and keep it busy for 2s - /// let _ = tokio::spawn(async move { - /// spin_for(delay_3s); - /// }).await; - /// }); - /// - /// // flush metrics - /// drop(rt); - /// - /// let elapsed = start.elapsed(); - /// - /// let interval = next_interval(); // end of interval 2 - /// assert!(interval.total_busy_duration >= delay_1s + delay_3s); - /// assert!(interval.total_busy_duration <= elapsed); - /// } - /// - /// fn time(task: F) -> Duration - /// where - /// F: Fn() -> () - /// { - /// let start = tokio::time::Instant::now(); - /// task(); - /// start.elapsed() - /// } - /// - /// /// Block the current thread for a given `duration`. 
- /// fn spin_for(duration: Duration) { - /// let start = tokio::time::Instant::now(); - /// while start.elapsed() <= duration {} - /// } - /// ``` - /// - /// Busy times may not accumulate as the above example suggests (FIXME: Why?); e.g., if we - /// remove the three second delay, the time spent busy falls to mere microseconds: - /// ```should_panic - /// use tokio::time::Duration; - /// - /// fn main() { - /// let rt = tokio::runtime::Builder::new_current_thread() - /// .enable_all() - /// .build() - /// .unwrap(); - /// - /// let handle = rt.handle(); - /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle); - /// let mut intervals = monitor.intervals(); - /// let mut next_interval = || intervals.next().unwrap(); - /// - /// let delay_1s = Duration::from_secs(1); - /// - /// let elapsed = time(|| rt.block_on(async { - /// // keep the main task busy for 1s - /// spin_for(delay_1s); - /// })); - /// - /// // flush metrics - /// drop(rt); - /// - /// let interval = next_interval(); // end of interval 2 - /// assert!(interval.total_busy_duration >= delay_1s); // FAIL - /// assert!(interval.total_busy_duration <= elapsed); - /// } - /// - /// fn time(task: F) -> Duration - /// where - /// F: Fn() -> () - /// { - /// let start = tokio::time::Instant::now(); - /// task(); - /// start.elapsed() - /// } - /// - /// /// Block the current thread for a given `duration`. - /// fn spin_for(duration: Duration) { - /// let start = tokio::time::Instant::now(); - /// while start.elapsed() <= duration {} - /// } - /// ``` - pub total_busy_duration: Duration, - - /// The maximum amount of time a worker thread was busy. - /// - /// ##### Definition - /// This metric is derived from the maximum of - /// [`tokio::runtime::RuntimeMetrics::worker_total_busy_duration`] across all worker threads. 
- /// - /// ##### See also - /// - [`RuntimeMetrics::total_busy_duration`] - /// - [`RuntimeMetrics::min_busy_duration`] - pub max_busy_duration: Duration, - - /// The minimum amount of time a worker thread was busy. - /// - /// ##### Definition - /// This metric is derived from the minimum of - /// [`tokio::runtime::RuntimeMetrics::worker_total_busy_duration`] across all worker threads. - /// - /// ##### See also - /// - [`RuntimeMetrics::total_busy_duration`] - /// - [`RuntimeMetrics::max_busy_duration`] - pub min_busy_duration: Duration, - - /// The number of tasks currently scheduled in the runtime's global queue. - /// - /// Tasks that are spawned or notified from a non-runtime thread are scheduled using the - /// runtime's global queue. This metric returns the **current** number of tasks pending in - /// the global queue. As such, the returned value may increase or decrease as new tasks are - /// scheduled and processed. - /// - /// ##### Definition - /// This metric is derived from [`tokio::runtime::RuntimeMetrics::global_queue_depth`]. 
- /// - /// ##### Example - /// ``` - /// # let current_thread = tokio::runtime::Builder::new_current_thread() - /// # .enable_all() - /// # .build() - /// # .unwrap(); - /// # - /// # let multi_thread = tokio::runtime::Builder::new_multi_thread() - /// # .worker_threads(2) - /// # .enable_all() - /// # .build() - /// # .unwrap(); - /// # - /// # for runtime in [current_thread, multi_thread] { - /// let handle = runtime.handle().clone(); - /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle); - /// let mut intervals = monitor.intervals(); - /// let mut next_interval = || intervals.next().unwrap(); - /// - /// let interval = next_interval(); // end of interval 1 - /// assert_eq!(interval.num_remote_schedules, 0); - /// - /// // spawn a system thread outside of the runtime - /// std::thread::spawn(move || { - /// // spawn two tasks from this non-runtime thread - /// handle.spawn(async {}); - /// handle.spawn(async {}); - /// }).join().unwrap(); - /// - /// // flush metrics - /// drop(runtime); - /// - /// let interval = next_interval(); // end of interval 2 - /// assert_eq!(interval.num_remote_schedules, 2); - /// # } - /// ``` - pub global_queue_depth: usize, - - /// The total number of tasks currently scheduled in workers' local queues. - /// - /// Tasks that are spawned or notified from within a runtime thread are scheduled using that - /// worker's local queue. This metric returns the **current** number of tasks pending in all - /// workers' local queues. As such, the returned value may increase or decrease as new tasks - /// are scheduled and processed. - /// - /// ##### Definition - /// This metric is derived from [`tokio::runtime::RuntimeMetrics::worker_local_queue_depth`]. 
- /// - /// ##### See also - /// - [`RuntimeMetrics::min_local_queue_depth`] - /// - [`RuntimeMetrics::max_local_queue_depth`] - /// - /// ##### Example - /// - /// ###### With `current_thread` runtime - /// The below example spawns 100 tasks: - /// ``` - /// #[tokio::main(flavor = "current_thread")] - /// async fn main() { - /// const N: usize = 100; - /// - /// let handle = tokio::runtime::Handle::current(); - /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle); - /// let mut intervals = monitor.intervals(); - /// let mut next_interval = || intervals.next().unwrap(); - /// - /// let interval = next_interval(); // end of interval 1 - /// assert_eq!(interval.total_local_queue_depth, 0); - /// - /// - /// for _ in 0..N { - /// tokio::spawn(async {}); - /// } - /// let interval = next_interval(); // end of interval 2 - /// assert_eq!(interval.total_local_queue_depth, N); - /// } - /// ``` - /// - /// ###### With `multi_thread` runtime - /// The below example spawns 100 tasks: - /// ``` - /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)] - /// async fn main() { - /// const N: usize = 100; - /// - /// let handle = tokio::runtime::Handle::current(); - /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle); - /// let mut intervals = monitor.intervals(); - /// let mut next_interval = || intervals.next().unwrap(); - /// - /// let interval = next_interval(); // end of interval 1 - /// assert_eq!(interval.total_local_queue_depth, 0); - /// - /// use std::sync::atomic::{AtomicBool, Ordering}; - /// static SPINLOCK_A: AtomicBool = AtomicBool::new(true); - /// - /// // block the other worker thread - /// tokio::spawn(async { - /// while SPINLOCK_A.load(Ordering::SeqCst) {} - /// }); - /// - /// static SPINLOCK_B: AtomicBool = AtomicBool::new(true); - /// - /// let _ = tokio::spawn(async { - /// for _ in 0..N { - /// tokio::spawn(async { - /// while SPINLOCK_B.load(Ordering::SeqCst) {} - /// }); - /// } - /// }).await; - /// - /// // unblock the 
other worker thread - /// SPINLOCK_A.store(false, Ordering::SeqCst); - /// - /// let interval = next_interval(); // end of interval 2 - /// assert_eq!(interval.total_local_queue_depth, N - 1); - /// - /// SPINLOCK_B.store(false, Ordering::SeqCst); - /// } - /// ``` - pub total_local_queue_depth: usize, - - /// The maximum number of tasks currently scheduled any worker's local queue. - /// - /// ##### Definition - /// This metric is derived from the maximum of - /// [`tokio::runtime::RuntimeMetrics::worker_local_queue_depth`] across all worker threads. - /// - /// ##### See also - /// - [`RuntimeMetrics::total_local_queue_depth`] - /// - [`RuntimeMetrics::min_local_queue_depth`] - pub max_local_queue_depth: usize, - - /// The minimum number of tasks currently scheduled any worker's local queue. - /// - /// ##### Definition - /// This metric is derived from the minimum of - /// [`tokio::runtime::RuntimeMetrics::worker_local_queue_depth`] across all worker threads. - /// - /// ##### See also - /// - [`RuntimeMetrics::total_local_queue_depth`] - /// - [`RuntimeMetrics::max_local_queue_depth`] - pub min_local_queue_depth: usize, - - /// The number of tasks currently waiting to be executed in the runtime's blocking threadpool. - /// - /// ##### Definition - /// This metric is derived from [`tokio::runtime::RuntimeMetrics::blocking_queue_depth`]. - pub blocking_queue_depth: usize, - - /// The current number of alive tasks in the runtime. - /// - /// ##### Definition - /// This metric is derived from [`tokio::runtime::RuntimeMetrics::num_alive_tasks`]. - pub live_tasks_count: usize, - - /// The number of additional threads spawned by the runtime. - /// - /// ##### Definition - /// This metric is derived from [`tokio::runtime::RuntimeMetrics::num_blocking_threads`]. - pub blocking_threads_count: usize, - - /// The number of idle threads, which have spawned by the runtime for `spawn_blocking` calls. 
- /// - /// ##### Definition - /// This metric is derived from [`tokio::runtime::RuntimeMetrics::num_idle_blocking_threads`]. - pub idle_blocking_threads_count: usize, - - /// Total amount of time elapsed since observing runtime metrics. - pub elapsed: Duration, - - /// Returns the number of times that tasks have been forced to yield back to the scheduler after exhausting their task budgets. - /// - /// This count starts at zero when the runtime is created and increases by one each time a task yields due to exhausting its budget. - /// - /// The counter is monotonically increasing. It is never decremented or reset to zero. - /// - /// ##### Definition - /// This metric is derived from [`tokio::runtime::RuntimeMetrics::budget_forced_yield_count`]. - pub budget_forced_yield_count: u64, +macro_rules! define_runtime_metrics { + ( + stable { + $( + $(#[$($attributes:tt)*])* + $vis:vis $name:ident: $ty:ty + ),* + $(,)? + } + unstable { + $( + $(#[$($unstable_attributes:tt)*])* + $unstable_vis:vis $unstable_name:ident: $unstable_ty:ty + ),* + $(,)? + } + ) => { + /// Key runtime metrics. + #[non_exhaustive] + #[derive(Default, Debug, Clone)] + pub struct RuntimeMetrics { + $( + $(#[$($attributes)*])* + #[cfg_attr(docsrs, doc(cfg(feature = "rt")))] + $vis $name: $ty, + )* + $( + $(#[$($unstable_attributes)*])* + #[cfg(tokio_unstable)] + #[cfg_attr(docsrs, doc(cfg(all(feature = "rt", tokio_unstable))))] + $unstable_vis $unstable_name: $unstable_ty, + )* + } + }; +} - /// Returns the number of ready events processed by the runtime’s I/O driver. - /// - /// ##### Definition - /// This metric is derived from [`tokio::runtime::RuntimeMetrics::io_driver_ready_count`]. - pub io_driver_ready_count: u64, +define_runtime_metrics! { + stable { + /// The number of worker threads used by the runtime. + /// + /// This metric is static for a runtime. + /// + /// This metric is always equal to [`tokio::runtime::RuntimeMetrics::num_workers`]. 
+ /// When using the `current_thread` runtime, the return value is always `1`. + /// + /// The number of workers is set by configuring + /// [`worker_threads`][`tokio::runtime::Builder::worker_threads`] with + /// [`tokio::runtime::Builder`], or by parameterizing [`tokio::main`]. + /// + /// ##### Examples + /// In the below example, the number of workers is set by parameterizing [`tokio::main`]: + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main(flavor = "multi_thread", worker_threads = 10)] + /// async fn main() { + /// let handle = tokio::runtime::Handle::current(); + /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle); + /// let mut intervals = monitor.intervals(); + /// let mut next_interval = || intervals.next().unwrap(); + /// + /// assert_eq!(next_interval().workers_count, 10); + /// } + /// ``` + /// + /// [`tokio::main`]: https://docs.rs/tokio/latest/tokio/attr.main.html + /// + /// When using the `current_thread` runtime, the return value is always `1`; e.g.: + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main(flavor = "current_thread")] + /// async fn main() { + /// let handle = tokio::runtime::Handle::current(); + /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle); + /// let mut intervals = monitor.intervals(); + /// let mut next_interval = || intervals.next().unwrap(); + /// + /// assert_eq!(next_interval().workers_count, 1); + /// } + /// ``` + /// + /// This metric is always equal to [`tokio::runtime::RuntimeMetrics::num_workers`]; e.g.: + /// ``` + /// use tokio::runtime::Handle; + /// + /// #[tokio::main] + /// async fn main() { + /// let handle = Handle::current(); + /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle); + /// let mut intervals = monitor.intervals(); + /// let mut next_interval = || intervals.next().unwrap(); + /// + /// assert_eq!(next_interval().workers_count, handle.metrics().num_workers()); + /// } + /// ``` + pub workers_count: usize, + + /// The number of 
times worker threads parked.
+    ///
+    /// The worker park count increases by one each time the worker parks the thread waiting for
+    /// new inbound events to process. This usually means the worker has processed all pending work
+    /// and is currently idle.
+    ///
+    /// ##### Definition
+    /// This metric is derived from the sum of [`tokio::runtime::RuntimeMetrics::worker_park_count`]
+    /// across all worker threads.
+    ///
+    /// ##### See also
+    /// - [`RuntimeMetrics::max_park_count`]
+    /// - [`RuntimeMetrics::min_park_count`]
+    ///
+    /// ##### Examples
+    /// ```
+    /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
+    /// async fn main() {
+    ///     let handle = tokio::runtime::Handle::current();
+    ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
+    ///     let mut intervals = monitor.intervals();
+    ///     let mut next_interval = || intervals.next().unwrap();
+    ///
+    ///     let interval = next_interval(); // end of interval 1
+    ///     assert_eq!(interval.total_park_count, 0);
+    ///
+    ///     induce_parks().await;
+    ///
+    ///     let interval = next_interval(); // end of interval 2
+    ///     assert!(interval.total_park_count >= 1); // usually 1 or 2 parks
+    /// }
+    ///
+    /// async fn induce_parks() {
+    ///     let _ = tokio::time::timeout(std::time::Duration::ZERO, async {
+    ///         loop { tokio::task::yield_now().await; }
+    ///     }).await;
+    /// }
+    /// ```
+    pub total_park_count: u64,
+
+    /// The maximum number of times any worker thread parked.
+    ///
+    /// ##### Definition
+    /// This metric is derived from the maximum of
+    /// [`tokio::runtime::RuntimeMetrics::worker_park_count`] across all worker threads.
+    ///
+    /// ##### See also
+    /// - [`RuntimeMetrics::total_park_count`]
+    /// - [`RuntimeMetrics::min_park_count`]
+    pub max_park_count: u64,
+
+    /// The minimum number of times any worker thread parked.
+    ///
+    /// ##### Definition
+    /// This metric is derived from the minimum of
+    /// [`tokio::runtime::RuntimeMetrics::worker_park_count`] across all worker threads.
+ /// + /// ##### See also + /// - [`RuntimeMetrics::total_park_count`] + /// - [`RuntimeMetrics::max_park_count`] + pub min_park_count: u64, + + /// The amount of time worker threads were busy. + /// + /// The worker busy duration increases whenever the worker is spending time processing work. + /// Using this value can indicate the total load of workers. + /// + /// ##### Definition + /// This metric is derived from the sum of + /// [`tokio::runtime::RuntimeMetrics::worker_total_busy_duration`] across all worker threads. + /// + /// ##### See also + /// - [`RuntimeMetrics::min_busy_duration`] + /// - [`RuntimeMetrics::max_busy_duration`] + /// + /// ##### Examples + /// In the below example, tasks spend a total of 3s busy: + /// ``` + /// use tokio::time::Duration; + /// + /// fn main() { + /// let start = tokio::time::Instant::now(); + /// + /// let rt = tokio::runtime::Builder::new_current_thread() + /// .enable_all() + /// .build() + /// .unwrap(); + /// + /// let handle = rt.handle(); + /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle); + /// let mut intervals = monitor.intervals(); + /// let mut next_interval = || intervals.next().unwrap(); + /// + /// let delay_1s = Duration::from_secs(1); + /// let delay_3s = Duration::from_secs(3); + /// + /// rt.block_on(async { + /// // keep the main task busy for 1s + /// spin_for(delay_1s); + /// + /// // spawn a task and keep it busy for 2s + /// let _ = tokio::spawn(async move { + /// spin_for(delay_3s); + /// }).await; + /// }); + /// + /// // flush metrics + /// drop(rt); + /// + /// let elapsed = start.elapsed(); + /// + /// let interval = next_interval(); // end of interval 2 + /// assert!(interval.total_busy_duration >= delay_1s + delay_3s); + /// assert!(interval.total_busy_duration <= elapsed); + /// } + /// + /// fn time(task: F) -> Duration + /// where + /// F: Fn() -> () + /// { + /// let start = tokio::time::Instant::now(); + /// task(); + /// start.elapsed() + /// } + /// + /// /// Block the 
current thread for a given `duration`. + /// fn spin_for(duration: Duration) { + /// let start = tokio::time::Instant::now(); + /// while start.elapsed() <= duration {} + /// } + /// ``` + /// + /// Busy times may not accumulate as the above example suggests (FIXME: Why?); e.g., if we + /// remove the three second delay, the time spent busy falls to mere microseconds: + /// ```should_panic + /// use tokio::time::Duration; + /// + /// fn main() { + /// let rt = tokio::runtime::Builder::new_current_thread() + /// .enable_all() + /// .build() + /// .unwrap(); + /// + /// let handle = rt.handle(); + /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle); + /// let mut intervals = monitor.intervals(); + /// let mut next_interval = || intervals.next().unwrap(); + /// + /// let delay_1s = Duration::from_secs(1); + /// + /// let elapsed = time(|| rt.block_on(async { + /// // keep the main task busy for 1s + /// spin_for(delay_1s); + /// })); + /// + /// // flush metrics + /// drop(rt); + /// + /// let interval = next_interval(); // end of interval 2 + /// assert!(interval.total_busy_duration >= delay_1s); // FAIL + /// assert!(interval.total_busy_duration <= elapsed); + /// } + /// + /// fn time(task: F) -> Duration + /// where + /// F: Fn() -> () + /// { + /// let start = tokio::time::Instant::now(); + /// task(); + /// start.elapsed() + /// } + /// + /// /// Block the current thread for a given `duration`. + /// fn spin_for(duration: Duration) { + /// let start = tokio::time::Instant::now(); + /// while start.elapsed() <= duration {} + /// } + /// ``` + pub total_busy_duration: Duration, + + /// The maximum amount of time a worker thread was busy. + /// + /// ##### Definition + /// This metric is derived from the maximum of + /// [`tokio::runtime::RuntimeMetrics::worker_total_busy_duration`] across all worker threads. 
+ /// + /// ##### See also + /// - [`RuntimeMetrics::total_busy_duration`] + /// - [`RuntimeMetrics::min_busy_duration`] + pub max_busy_duration: Duration, + + /// The minimum amount of time a worker thread was busy. + /// + /// ##### Definition + /// This metric is derived from the minimum of + /// [`tokio::runtime::RuntimeMetrics::worker_total_busy_duration`] across all worker threads. + /// + /// ##### See also + /// - [`RuntimeMetrics::total_busy_duration`] + /// - [`RuntimeMetrics::max_busy_duration`] + pub min_busy_duration: Duration, + + /// The number of tasks currently scheduled in the runtime's global queue. + /// + /// Tasks that are spawned or notified from a non-runtime thread are scheduled using the + /// runtime's global queue. This metric returns the **current** number of tasks pending in + /// the global queue. As such, the returned value may increase or decrease as new tasks are + /// scheduled and processed. + /// + /// ##### Definition + /// This metric is derived from [`tokio::runtime::RuntimeMetrics::global_queue_depth`]. 
+ /// + /// ##### Example + /// ``` + /// # let current_thread = tokio::runtime::Builder::new_current_thread() + /// # .enable_all() + /// # .build() + /// # .unwrap(); + /// # + /// # let multi_thread = tokio::runtime::Builder::new_multi_thread() + /// # .worker_threads(2) + /// # .enable_all() + /// # .build() + /// # .unwrap(); + /// # + /// # for runtime in [current_thread, multi_thread] { + /// let handle = runtime.handle().clone(); + /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle); + /// let mut intervals = monitor.intervals(); + /// let mut next_interval = || intervals.next().unwrap(); + /// + /// let interval = next_interval(); // end of interval 1 + /// # #[cfg(tokio_unstable)] + /// assert_eq!(interval.num_remote_schedules, 0); + /// + /// // spawn a system thread outside of the runtime + /// std::thread::spawn(move || { + /// // spawn two tasks from this non-runtime thread + /// handle.spawn(async {}); + /// handle.spawn(async {}); + /// }).join().unwrap(); + /// + /// // flush metrics + /// drop(runtime); + /// + /// let interval = next_interval(); // end of interval 2 + /// # #[cfg(tokio_unstable)] + /// assert_eq!(interval.num_remote_schedules, 2); + /// # } + /// ``` + pub global_queue_depth: usize, + + /// Total amount of time elapsed since observing runtime metrics. + pub elapsed: Duration, + } + unstable { + /// The average duration of a single invocation of poll on a task. + /// + /// This average is an exponentially-weighted moving average of the duration + /// of task polls on all runtime workers. 
+ /// + /// ##### Examples + /// ``` + /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)] + /// async fn main() { + /// let handle = tokio::runtime::Handle::current(); + /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle); + /// let mut intervals = monitor.intervals(); + /// let mut next_interval = || intervals.next().unwrap(); + /// + /// let interval = next_interval(); + /// println!("mean task poll duration is {:?}", interval.mean_poll_duration); + /// } + /// ``` + pub mean_poll_duration: Duration, + + /// The average duration of a single invocation of poll on a task on the + /// worker with the lowest value. + /// + /// This average is an exponentially-weighted moving average of the duration + /// of task polls on the runtime worker with the lowest value. + /// + /// ##### Examples + /// ``` + /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)] + /// async fn main() { + /// let handle = tokio::runtime::Handle::current(); + /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle); + /// let mut intervals = monitor.intervals(); + /// let mut next_interval = || intervals.next().unwrap(); + /// + /// let interval = next_interval(); + /// println!("min mean task poll duration is {:?}", interval.mean_poll_duration_worker_min); + /// } + /// ``` + pub mean_poll_duration_worker_min: Duration, + + /// The average duration of a single invocation of poll on a task on the + /// worker with the highest value. + /// + /// This average is an exponentially-weighted moving average of the duration + /// of task polls on the runtime worker with the highest value. 
+    ///
+    /// ##### Examples
+    /// ```
+    /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
+    /// async fn main() {
+    ///     let handle = tokio::runtime::Handle::current();
+    ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
+    ///     let mut intervals = monitor.intervals();
+    ///     let mut next_interval = || intervals.next().unwrap();
+    ///
+    ///     let interval = next_interval();
+    ///     println!("max mean task poll duration is {:?}", interval.mean_poll_duration_worker_max);
+    /// }
+    /// ```
+    pub mean_poll_duration_worker_max: Duration,
+
+    /// A histogram of task polls since the previous probe grouped by poll
+    /// times.
+    ///
+    /// This metric must be explicitly enabled when creating the runtime with
+    /// [`enable_metrics_poll_time_histogram`][tokio::runtime::Builder::enable_metrics_poll_time_histogram].
+    /// Bucket sizes are fixed and configured at the runtime level. See
+    /// configuration options on
+    /// [`runtime::Builder`][tokio::runtime::Builder::enable_metrics_poll_time_histogram].
+    ///
+    /// ##### Examples
+    /// ```
+    /// use tokio::runtime::HistogramConfiguration;
+    /// use std::time::Duration;
+    ///
+    /// let config = HistogramConfiguration::linear(Duration::from_micros(50), 12);
+    ///
+    /// let rt = tokio::runtime::Builder::new_multi_thread()
+    ///     .enable_metrics_poll_time_histogram()
+    ///     .metrics_poll_time_histogram_configuration(config)
+    ///     .build()
+    ///     .unwrap();
+    ///
+    /// rt.block_on(async {
+    ///     let handle = tokio::runtime::Handle::current();
+    ///     let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
+    ///     let mut intervals = monitor.intervals();
+    ///     let mut next_interval = || intervals.next().unwrap();
+    ///
+    ///     let interval = next_interval();
+    ///     println!("poll count histogram {:?}", interval.poll_time_histogram);
+    /// });
+    /// ```
+    pub poll_time_histogram: Vec<u64>,
+
+    /// The number of times worker threads unparked but performed no work before parking again.
+ /// + /// The worker no-op count increases by one each time the worker unparks the thread but finds + /// no new work and goes back to sleep. This indicates a false-positive wake up. + /// + /// ##### Definition + /// This metric is derived from the sum of [`tokio::runtime::RuntimeMetrics::worker_noop_count`] + /// across all worker threads. + /// + /// ##### Examples + /// Unfortunately, there isn't a great way to reliably induce no-op parks, as they occur as + /// false-positive events under concurrency. + /// + /// The below example triggers fewer than two parks in the single-threaded runtime: + /// ``` + /// #[tokio::main(flavor = "current_thread")] + /// async fn main() { + /// let handle = tokio::runtime::Handle::current(); + /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle); + /// let mut intervals = monitor.intervals(); + /// let mut next_interval = || intervals.next().unwrap(); + /// + /// assert_eq!(next_interval().total_park_count, 0); + /// + /// async { + /// tokio::time::sleep(std::time::Duration::from_millis(1)).await; + /// }.await; + /// + /// assert!(next_interval().total_park_count > 0); + /// } + /// ``` + /// + /// The below example triggers fewer than two parks in the multi-threaded runtime: + /// ``` + /// #[tokio::main(flavor = "multi_thread")] + /// async fn main() { + /// let handle = tokio::runtime::Handle::current(); + /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle); + /// let mut intervals = monitor.intervals(); + /// let mut next_interval = || intervals.next().unwrap(); + /// + /// assert_eq!(next_interval().total_noop_count, 0); + /// + /// async { + /// tokio::time::sleep(std::time::Duration::from_millis(1)).await; + /// }.await; + /// + /// assert!(next_interval().total_noop_count > 0); + /// } + /// ``` + pub total_noop_count: u64, + + /// The maximum number of times any worker thread unparked but performed no work before parking + /// again. 
+ /// + /// ##### Definition + /// This metric is derived from the maximum of + /// [`tokio::runtime::RuntimeMetrics::worker_noop_count`] across all worker threads. + /// + /// ##### See also + /// - [`RuntimeMetrics::total_noop_count`] + /// - [`RuntimeMetrics::min_noop_count`] + pub max_noop_count: u64, + + /// The minimum number of times any worker thread unparked but performed no work before parking + /// again. + /// + /// ##### Definition + /// This metric is derived from the minimum of + /// [`tokio::runtime::RuntimeMetrics::worker_noop_count`] across all worker threads. + /// + /// ##### See also + /// - [`RuntimeMetrics::total_noop_count`] + /// - [`RuntimeMetrics::max_noop_count`] + pub min_noop_count: u64, + + /// The number of tasks worker threads stole from another worker thread. + /// + /// The worker steal count increases by the amount of stolen tasks each time the worker + /// has processed its scheduled queue and successfully steals more pending tasks from another + /// worker. + /// + /// This metric only applies to the **multi-threaded** runtime and will always return `0` when + /// using the current thread runtime. + /// + /// ##### Definition + /// This metric is derived from the sum of [`tokio::runtime::RuntimeMetrics::worker_steal_count`] for + /// all worker threads. 
+ /// + /// ##### See also + /// - [`RuntimeMetrics::min_steal_count`] + /// - [`RuntimeMetrics::max_steal_count`] + /// + /// ##### Examples + /// In the below example, a blocking channel is used to backup one worker thread: + /// ``` + /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)] + /// async fn main() { + /// let handle = tokio::runtime::Handle::current(); + /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle); + /// let mut intervals = monitor.intervals(); + /// let mut next_interval = || intervals.next().unwrap(); + /// + /// let interval = next_interval(); // end of first sampling interval + /// assert_eq!(interval.total_steal_count, 0); + /// assert_eq!(interval.min_steal_count, 0); + /// assert_eq!(interval.max_steal_count, 0); + /// + /// // induce a steal + /// async { + /// let (tx, rx) = std::sync::mpsc::channel(); + /// // Move to the runtime. + /// tokio::spawn(async move { + /// // Spawn the task that sends to the channel + /// tokio::spawn(async move { + /// tx.send(()).unwrap(); + /// }); + /// // Spawn a task that bumps the previous task out of the "next + /// // scheduled" slot. + /// tokio::spawn(async {}); + /// // Blocking receive on the channel. 
+ /// rx.recv().unwrap(); + /// flush_metrics().await; + /// }).await.unwrap(); + /// flush_metrics().await; + /// }.await; + /// + /// let interval = { flush_metrics().await; next_interval() }; // end of interval 2 + /// println!("total={}; min={}; max={}", interval.total_steal_count, interval.min_steal_count, interval.max_steal_count); + /// + /// let interval = { flush_metrics().await; next_interval() }; // end of interval 3 + /// println!("total={}; min={}; max={}", interval.total_steal_count, interval.min_steal_count, interval.max_steal_count); + /// } + /// + /// async fn flush_metrics() { + /// let _ = tokio::time::sleep(std::time::Duration::ZERO).await; + /// } + /// ``` + pub total_steal_count: u64, + + /// The maximum number of tasks any worker thread stole from another worker thread. + /// + /// ##### Definition + /// This metric is derived from the maximum of [`tokio::runtime::RuntimeMetrics::worker_steal_count`] + /// across all worker threads. + /// + /// ##### See also + /// - [`RuntimeMetrics::total_steal_count`] + /// - [`RuntimeMetrics::min_steal_count`] + pub max_steal_count: u64, + + /// The minimum number of tasks any worker thread stole from another worker thread. + /// + /// ##### Definition + /// This metric is derived from the minimum of [`tokio::runtime::RuntimeMetrics::worker_steal_count`] + /// across all worker threads. + /// + /// ##### See also + /// - [`RuntimeMetrics::total_steal_count`] + /// - [`RuntimeMetrics::max_steal_count`] + pub min_steal_count: u64, + + /// The number of times worker threads stole tasks from another worker thread. + /// + /// The worker steal operations increases by one each time the worker has processed its + /// scheduled queue and successfully steals more pending tasks from another worker. + /// + /// This metric only applies to the **multi-threaded** runtime and will always return `0` when + /// using the current thread runtime. 
+ ///
+ /// ##### Definition
+ /// This metric is derived from the sum of [`tokio::runtime::RuntimeMetrics::worker_steal_operations`]
+ /// for all worker threads.
+ ///
+ /// ##### See also
+ /// - [`RuntimeMetrics::min_steal_operations`]
+ /// - [`RuntimeMetrics::max_steal_operations`]
+ ///
+ /// ##### Examples
+ /// In the below example, a blocking channel is used to backup one worker thread:
+ /// ```
+ /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)]
+ /// async fn main() {
+ /// let handle = tokio::runtime::Handle::current();
+ /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
+ /// let mut intervals = monitor.intervals();
+ /// let mut next_interval = || intervals.next().unwrap();
+ ///
+ /// let interval = next_interval(); // end of first sampling interval
+ /// assert_eq!(interval.total_steal_operations, 0);
+ /// assert_eq!(interval.min_steal_operations, 0);
+ /// assert_eq!(interval.max_steal_operations, 0);
+ ///
+ /// // induce a steal
+ /// async {
+ /// let (tx, rx) = std::sync::mpsc::channel();
+ /// // Move to the runtime.
+ /// tokio::spawn(async move {
+ /// // Spawn the task that sends to the channel
+ /// tokio::spawn(async move {
+ /// tx.send(()).unwrap();
+ /// });
+ /// // Spawn a task that bumps the previous task out of the "next
+ /// // scheduled" slot.
+ /// tokio::spawn(async {});
+ /// // Blocking receive on the channel.
+ /// rx.recv().unwrap(); + /// flush_metrics().await; + /// }).await.unwrap(); + /// flush_metrics().await; + /// }.await; + /// + /// let interval = { flush_metrics().await; next_interval() }; // end of interval 2 + /// println!("total={}; min={}; max={}", interval.total_steal_operations, interval.min_steal_operations, interval.max_steal_operations); + /// + /// let interval = { flush_metrics().await; next_interval() }; // end of interval 3 + /// println!("total={}; min={}; max={}", interval.total_steal_operations, interval.min_steal_operations, interval.max_steal_operations); + /// } + /// + /// async fn flush_metrics() { + /// let _ = tokio::time::sleep(std::time::Duration::ZERO).await; + /// } + /// ``` + pub total_steal_operations: u64, + + /// The maximum number of times any worker thread stole tasks from another worker thread. + /// + /// ##### Definition + /// This metric is derived from the maximum of [`tokio::runtime::RuntimeMetrics::worker_steal_operations`] + /// across all worker threads. + /// + /// ##### See also + /// - [`RuntimeMetrics::total_steal_operations`] + /// - [`RuntimeMetrics::min_steal_operations`] + pub max_steal_operations: u64, + + /// The minimum number of times any worker thread stole tasks from another worker thread. + /// + /// ##### Definition + /// This metric is derived from the minimum of [`tokio::runtime::RuntimeMetrics::worker_steal_operations`] + /// across all worker threads. + /// + /// ##### See also + /// - [`RuntimeMetrics::total_steal_operations`] + /// - [`RuntimeMetrics::max_steal_operations`] + pub min_steal_operations: u64, + + /// The number of tasks scheduled from **outside** of the runtime. + /// + /// The remote schedule count increases by one each time a task is woken from **outside** of + /// the runtime. This usually means that a task is spawned or notified from a non-runtime + /// thread and must be queued using the Runtime's global queue, which tends to be slower. 
+ /// + /// ##### Definition + /// This metric is derived from [`tokio::runtime::RuntimeMetrics::remote_schedule_count`]. + /// + /// ##### Examples + /// In the below example, a remote schedule is induced by spawning a system thread, then + /// spawning a tokio task from that system thread: + /// ``` + /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)] + /// async fn main() { + /// let handle = tokio::runtime::Handle::current(); + /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle); + /// let mut intervals = monitor.intervals(); + /// let mut next_interval = || intervals.next().unwrap(); + /// + /// let interval = next_interval(); // end of first sampling interval + /// assert_eq!(interval.num_remote_schedules, 0); + /// + /// // spawn a non-runtime thread + /// std::thread::spawn(move || { + /// // spawn two tasks from this non-runtime thread + /// async move { + /// handle.spawn(async {}).await; + /// handle.spawn(async {}).await; + /// } + /// }).join().unwrap().await; + /// + /// let interval = next_interval(); // end of second sampling interval + /// assert_eq!(interval.num_remote_schedules, 2); + /// + /// let interval = next_interval(); // end of third sampling interval + /// assert_eq!(interval.num_remote_schedules, 0); + /// } + /// ``` + pub num_remote_schedules: u64, + + /// The number of tasks scheduled from worker threads. + /// + /// The local schedule count increases by one each time a task is woken from **inside** of the + /// runtime. This usually means that a task is spawned or notified from within a runtime thread + /// and will be queued on the worker-local queue. + /// + /// ##### Definition + /// This metric is derived from the sum of + /// [`tokio::runtime::RuntimeMetrics::worker_local_schedule_count`] across all worker threads. 
+ /// + /// ##### See also + /// - [`RuntimeMetrics::min_local_schedule_count`] + /// - [`RuntimeMetrics::max_local_schedule_count`] + /// + /// ##### Examples + /// ###### With `current_thread` runtime + /// In the below example, two tasks are spawned from the context of a third tokio task: + /// ``` + /// #[tokio::main(flavor = "current_thread")] + /// async fn main() { + /// let handle = tokio::runtime::Handle::current(); + /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle); + /// let mut intervals = monitor.intervals(); + /// let mut next_interval = || intervals.next().unwrap(); + /// + /// let interval = { flush_metrics().await; next_interval() }; // end interval 2 + /// assert_eq!(interval.total_local_schedule_count, 0); + /// + /// let task = async { + /// tokio::spawn(async {}); // local schedule 1 + /// tokio::spawn(async {}); // local schedule 2 + /// }; + /// + /// let handle = tokio::spawn(task); // local schedule 3 + /// + /// let interval = { flush_metrics().await; next_interval() }; // end interval 2 + /// assert_eq!(interval.total_local_schedule_count, 3); + /// + /// let _ = handle.await; + /// + /// let interval = { flush_metrics().await; next_interval() }; // end interval 3 + /// assert_eq!(interval.total_local_schedule_count, 0); + /// } + /// + /// async fn flush_metrics() { + /// tokio::task::yield_now().await; + /// } + /// ``` + /// + /// ###### With `multi_thread` runtime + /// In the below example, 100 tasks are spawned: + /// ``` + /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)] + /// async fn main() { + /// let handle = tokio::runtime::Handle::current(); + /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle); + /// let mut intervals = monitor.intervals(); + /// let mut next_interval = || intervals.next().unwrap(); + /// + /// let interval = next_interval(); // end of interval 1 + /// assert_eq!(interval.total_local_schedule_count, 0); + /// + /// use std::sync::atomic::{AtomicBool, Ordering}; + /// 
static SPINLOCK: AtomicBool = AtomicBool::new(true); + /// + /// // block the other worker thread + /// tokio::spawn(async { + /// while SPINLOCK.load(Ordering::SeqCst) {} + /// }); + /// + /// // FIXME: why does this need to be in a `spawn`? + /// let _ = tokio::spawn(async { + /// // spawn 100 tasks + /// for _ in 0..100 { + /// tokio::spawn(async {}); + /// } + /// // this spawns 1 more task + /// flush_metrics().await; + /// }).await; + /// + /// // unblock the other worker thread + /// SPINLOCK.store(false, Ordering::SeqCst); + /// + /// let interval = { flush_metrics().await; next_interval() }; // end of interval 2 + /// assert_eq!(interval.total_local_schedule_count, 100 + 1); + /// } + /// + /// async fn flush_metrics() { + /// let _ = tokio::time::sleep(std::time::Duration::ZERO).await; + /// } + /// ``` + pub total_local_schedule_count: u64, + + /// The maximum number of tasks scheduled from any one worker thread. + /// + /// ##### Definition + /// This metric is derived from the maximum of + /// [`tokio::runtime::RuntimeMetrics::worker_local_schedule_count`] for all worker threads. + /// + /// ##### See also + /// - [`RuntimeMetrics::total_local_schedule_count`] + /// - [`RuntimeMetrics::min_local_schedule_count`] + pub max_local_schedule_count: u64, + + /// The minimum number of tasks scheduled from any one worker thread. + /// + /// ##### Definition + /// This metric is derived from the minimum of + /// [`tokio::runtime::RuntimeMetrics::worker_local_schedule_count`] for all worker threads. + /// + /// ##### See also + /// - [`RuntimeMetrics::total_local_schedule_count`] + /// - [`RuntimeMetrics::max_local_schedule_count`] + pub min_local_schedule_count: u64, + + /// The number of times worker threads saturated their local queues. + /// + /// The worker steal count increases by one each time the worker attempts to schedule a task + /// locally, but its local queue is full. When this happens, half of the + /// local queue is moved to the global queue. 
+ /// + /// This metric only applies to the **multi-threaded** scheduler. + /// + /// ##### Definition + /// This metric is derived from the sum of + /// [`tokio::runtime::RuntimeMetrics::worker_overflow_count`] across all worker threads. + /// + /// ##### See also + /// - [`RuntimeMetrics::min_overflow_count`] + /// - [`RuntimeMetrics::max_overflow_count`] + /// + /// ##### Examples + /// ``` + /// #[tokio::main(flavor = "multi_thread", worker_threads = 1)] + /// async fn main() { + /// let handle = tokio::runtime::Handle::current(); + /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle); + /// let mut intervals = monitor.intervals(); + /// let mut next_interval = || intervals.next().unwrap(); + /// + /// let interval = next_interval(); // end of interval 1 + /// assert_eq!(interval.total_overflow_count, 0); + /// + /// use std::sync::atomic::{AtomicBool, Ordering}; + /// + /// // spawn a ton of tasks + /// let _ = tokio::spawn(async { + /// // we do this in a `tokio::spawn` because it is impossible to + /// // overflow the main task + /// for _ in 0..300 { + /// tokio::spawn(async {}); + /// } + /// }).await; + /// + /// let interval = { flush_metrics().await; next_interval() }; // end of interval 2 + /// assert_eq!(interval.total_overflow_count, 1); + /// } + /// + /// async fn flush_metrics() { + /// let _ = tokio::time::sleep(std::time::Duration::from_millis(1)).await; + /// } + /// ``` + pub total_overflow_count: u64, + + /// The maximum number of times any one worker saturated its local queue. + /// + /// ##### Definition + /// This metric is derived from the maximum of + /// [`tokio::runtime::RuntimeMetrics::worker_overflow_count`] across all worker threads. + /// + /// ##### See also + /// - [`RuntimeMetrics::total_overflow_count`] + /// - [`RuntimeMetrics::min_overflow_count`] + pub max_overflow_count: u64, + + /// The minimum number of times any one worker saturated its local queue. 
+ ///
+ /// ##### Definition
+ /// This metric is derived from the minimum of
+ /// [`tokio::runtime::RuntimeMetrics::worker_overflow_count`] across all worker threads.
+ ///
+ /// ##### See also
+ /// - [`RuntimeMetrics::total_overflow_count`]
+ /// - [`RuntimeMetrics::max_overflow_count`]
+ pub min_overflow_count: u64,
+
+ /// The number of tasks that have been polled across all worker threads.
+ ///
+ /// The worker poll count increases by one each time a worker polls a scheduled task.
+ ///
+ /// ##### Definition
+ /// This metric is derived from the sum of
+ /// [`tokio::runtime::RuntimeMetrics::worker_poll_count`] across all worker threads.
+ ///
+ /// ##### See also
+ /// - [`RuntimeMetrics::min_polls_count`]
+ /// - [`RuntimeMetrics::max_polls_count`]
+ ///
+ /// ##### Examples
+ /// In the below example, 42 tasks are spawned and polled:
+ /// ```
+ /// #[tokio::main(flavor = "current_thread")]
+ /// async fn main() {
+ /// let handle = tokio::runtime::Handle::current();
+ /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle);
+ /// let mut intervals = monitor.intervals();
+ /// let mut next_interval = || intervals.next().unwrap();
+ ///
+ /// let interval = { flush_metrics().await; next_interval() }; // end of interval 1
+ /// assert_eq!(interval.total_polls_count, 0);
+ /// assert_eq!(interval.min_polls_count, 0);
+ /// assert_eq!(interval.max_polls_count, 0);
+ ///
+ /// const N: u64 = 42;
+ ///
+ /// for _ in 0..N {
+ /// let _ = tokio::spawn(async {}).await;
+ /// }
+ ///
+ /// let interval = { flush_metrics().await; next_interval() }; // end of interval 2
+ /// assert_eq!(interval.total_polls_count, N);
+ /// assert_eq!(interval.min_polls_count, N);
+ /// assert_eq!(interval.max_polls_count, N);
+ /// }
+ ///
+ /// async fn flush_metrics() {
+ /// let _ = tokio::task::yield_now().await;
+ /// }
+ /// ```
+ pub total_polls_count: u64,
+
+ /// The maximum number of tasks that have been polled in any worker thread.
+ /// + /// ##### Definition + /// This metric is derived from the maximum of + /// [`tokio::runtime::RuntimeMetrics::worker_poll_count`] across all worker threads. + /// + /// ##### See also + /// - [`RuntimeMetrics::total_polls_count`] + /// - [`RuntimeMetrics::min_polls_count`] + pub max_polls_count: u64, + + /// The minimum number of tasks that have been polled in any worker thread. + /// + /// ##### Definition + /// This metric is derived from the minimum of + /// [`tokio::runtime::RuntimeMetrics::worker_poll_count`] across all worker threads. + /// + /// ##### See also + /// - [`RuntimeMetrics::total_polls_count`] + /// - [`RuntimeMetrics::max_polls_count`] + pub min_polls_count: u64, + + /// The total number of tasks currently scheduled in workers' local queues. + /// + /// Tasks that are spawned or notified from within a runtime thread are scheduled using that + /// worker's local queue. This metric returns the **current** number of tasks pending in all + /// workers' local queues. As such, the returned value may increase or decrease as new tasks + /// are scheduled and processed. + /// + /// ##### Definition + /// This metric is derived from [`tokio::runtime::RuntimeMetrics::worker_local_queue_depth`]. 
+ /// + /// ##### See also + /// - [`RuntimeMetrics::min_local_queue_depth`] + /// - [`RuntimeMetrics::max_local_queue_depth`] + /// + /// ##### Example + /// + /// ###### With `current_thread` runtime + /// The below example spawns 100 tasks: + /// ``` + /// #[tokio::main(flavor = "current_thread")] + /// async fn main() { + /// const N: usize = 100; + /// + /// let handle = tokio::runtime::Handle::current(); + /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle); + /// let mut intervals = monitor.intervals(); + /// let mut next_interval = || intervals.next().unwrap(); + /// + /// let interval = next_interval(); // end of interval 1 + /// assert_eq!(interval.total_local_queue_depth, 0); + /// + /// + /// for _ in 0..N { + /// tokio::spawn(async {}); + /// } + /// let interval = next_interval(); // end of interval 2 + /// assert_eq!(interval.total_local_queue_depth, N); + /// } + /// ``` + /// + /// ###### With `multi_thread` runtime + /// The below example spawns 100 tasks: + /// ``` + /// #[tokio::main(flavor = "multi_thread", worker_threads = 2)] + /// async fn main() { + /// const N: usize = 100; + /// + /// let handle = tokio::runtime::Handle::current(); + /// let monitor = tokio_metrics::RuntimeMonitor::new(&handle); + /// let mut intervals = monitor.intervals(); + /// let mut next_interval = || intervals.next().unwrap(); + /// + /// let interval = next_interval(); // end of interval 1 + /// assert_eq!(interval.total_local_queue_depth, 0); + /// + /// use std::sync::atomic::{AtomicBool, Ordering}; + /// static SPINLOCK_A: AtomicBool = AtomicBool::new(true); + /// + /// // block the other worker thread + /// tokio::spawn(async { + /// while SPINLOCK_A.load(Ordering::SeqCst) {} + /// }); + /// + /// static SPINLOCK_B: AtomicBool = AtomicBool::new(true); + /// + /// let _ = tokio::spawn(async { + /// for _ in 0..N { + /// tokio::spawn(async { + /// while SPINLOCK_B.load(Ordering::SeqCst) {} + /// }); + /// } + /// }).await; + /// + /// // unblock the 
other worker thread
+ /// SPINLOCK_A.store(false, Ordering::SeqCst);
+ ///
+ /// let interval = next_interval(); // end of interval 2
+ /// assert_eq!(interval.total_local_queue_depth, N - 1);
+ ///
+ /// SPINLOCK_B.store(false, Ordering::SeqCst);
+ /// }
+ /// ```
+ pub total_local_queue_depth: usize,
+
+ /// The maximum number of tasks currently scheduled in any worker's local queue.
+ ///
+ /// ##### Definition
+ /// This metric is derived from the maximum of
+ /// [`tokio::runtime::RuntimeMetrics::worker_local_queue_depth`] across all worker threads.
+ ///
+ /// ##### See also
+ /// - [`RuntimeMetrics::total_local_queue_depth`]
+ /// - [`RuntimeMetrics::min_local_queue_depth`]
+ pub max_local_queue_depth: usize,
+
+ /// The minimum number of tasks currently scheduled in any worker's local queue.
+ ///
+ /// ##### Definition
+ /// This metric is derived from the minimum of
+ /// [`tokio::runtime::RuntimeMetrics::worker_local_queue_depth`] across all worker threads.
+ ///
+ /// ##### See also
+ /// - [`RuntimeMetrics::total_local_queue_depth`]
+ /// - [`RuntimeMetrics::max_local_queue_depth`]
+ pub min_local_queue_depth: usize,
+
+ /// The number of tasks currently waiting to be executed in the runtime's blocking threadpool.
+ ///
+ /// ##### Definition
+ /// This metric is derived from [`tokio::runtime::RuntimeMetrics::blocking_queue_depth`].
+ pub blocking_queue_depth: usize,
+
+ /// The current number of alive tasks in the runtime.
+ ///
+ /// ##### Definition
+ /// This metric is derived from [`tokio::runtime::RuntimeMetrics::num_alive_tasks`].
+ pub live_tasks_count: usize,
+
+ /// The number of additional threads spawned by the runtime.
+ ///
+ /// ##### Definition
+ /// This metric is derived from [`tokio::runtime::RuntimeMetrics::num_blocking_threads`].
+ pub blocking_threads_count: usize,
+
+ /// The number of idle threads, which have been spawned by the runtime for `spawn_blocking` calls.
+ /// + /// ##### Definition + /// This metric is derived from [`tokio::runtime::RuntimeMetrics::num_idle_blocking_threads`]. + pub idle_blocking_threads_count: usize, + + /// Returns the number of times that tasks have been forced to yield back to the scheduler after exhausting their task budgets. + /// + /// This count starts at zero when the runtime is created and increases by one each time a task yields due to exhausting its budget. + /// + /// The counter is monotonically increasing. It is never decremented or reset to zero. + /// + /// ##### Definition + /// This metric is derived from [`tokio::runtime::RuntimeMetrics::budget_forced_yield_count`]. + pub budget_forced_yield_count: u64, + + /// Returns the number of ready events processed by the runtime’s I/O driver. + /// + /// ##### Definition + /// This metric is derived from [`tokio::runtime::RuntimeMetrics::io_driver_ready_count`]. + pub io_driver_ready_count: u64, + } } -/// Snapshot of per-worker metrics -#[derive(Debug)] -struct Worker { - worker: usize, - total_park_count: u64, - total_noop_count: u64, - total_steal_count: u64, - total_steal_operations: u64, - total_local_schedule_count: u64, - total_overflow_count: u64, - total_polls_count: u64, - total_busy_duration: Duration, - poll_time_histogram: Vec, +macro_rules! define_semi_stable { + ( + $(#[$($attributes:tt)*])* + $vis:vis struct $name:ident { + stable { + $($stable_name:ident: $stable_ty:ty),* + $(,)? + } + $(,)? + unstable { + $($unstable_name:ident: $unstable_ty:ty),* + $(,)? + } + } + ) => { + $(#[$($attributes)*])* + $vis struct $name { + $( + $stable_name: $stable_ty, + )* + $( + #[cfg(tokio_unstable)] + #[cfg_attr(docsrs, doc(cfg(all(feature = "rt", tokio_unstable))))] + $unstable_name: $unstable_ty, + )* + } + }; } -/// Iterator returned by [`RuntimeMonitor::intervals`]. -/// -/// See that method's documentation for more details. 
-#[derive(Debug)] -pub struct RuntimeIntervals { - runtime: runtime::RuntimeMetrics, - started_at: Instant, - workers: Vec, +define_semi_stable! { + /// Snapshot of per-worker metrics + #[derive(Debug, Default)] + struct Worker { + stable { + worker: usize, + total_park_count: u64, + total_busy_duration: Duration, + } + unstable { + total_noop_count: u64, + total_steal_count: u64, + total_steal_operations: u64, + total_local_schedule_count: u64, + total_overflow_count: u64, + total_polls_count: u64, + poll_time_histogram: Vec, + } + } +} - // Number of tasks scheduled from *outside* of the runtime - num_remote_schedules: u64, - budget_forced_yield_count: u64, - io_driver_ready_count: u64, +define_semi_stable! { + /// Iterator returned by [`RuntimeMonitor::intervals`]. + /// + /// See that method's documentation for more details. + #[derive(Debug)] + pub struct RuntimeIntervals { + stable { + runtime: runtime::RuntimeMetrics, + started_at: Instant, + workers: Vec, + } + unstable { + // Number of tasks scheduled from *outside* of the runtime + num_remote_schedules: u64, + budget_forced_yield_count: u64, + io_driver_ready_count: u64, + } + } } impl RuntimeIntervals { fn probe(&mut self) -> RuntimeMetrics { let now = Instant::now(); - let num_remote_schedules = self.runtime.remote_schedule_count(); - let budget_forced_yields = self.runtime.budget_forced_yield_count(); - let io_driver_ready_events = self.runtime.io_driver_ready_count(); - let mut metrics = RuntimeMetrics { workers_count: self.runtime.num_workers(), elapsed: now - self.started_at, global_queue_depth: self.runtime.global_queue_depth(), - num_remote_schedules: num_remote_schedules - self.num_remote_schedules, min_park_count: u64::MAX, - min_noop_count: u64::MAX, - min_steal_count: u64::MAX, - min_local_schedule_count: u64::MAX, - min_overflow_count: u64::MAX, - min_polls_count: u64::MAX, min_busy_duration: Duration::from_secs(1000000000), - min_local_queue_depth: usize::MAX, - 
mean_poll_duration_worker_min: Duration::MAX, - poll_time_histogram: vec![0; self.runtime.poll_time_histogram_num_buckets()], - budget_forced_yield_count: budget_forced_yields - self.budget_forced_yield_count, - io_driver_ready_count: io_driver_ready_events - self.io_driver_ready_count, ..Default::default() }; - self.num_remote_schedules = num_remote_schedules; + #[cfg(tokio_unstable)] + { + let num_remote_schedules = self.runtime.remote_schedule_count(); + let budget_forced_yields = self.runtime.budget_forced_yield_count(); + let io_driver_ready_events = self.runtime.io_driver_ready_count(); + + metrics.num_remote_schedules = num_remote_schedules - self.num_remote_schedules; + metrics.min_noop_count = u64::MAX; + metrics.min_steal_count = u64::MAX; + metrics.min_local_schedule_count = u64::MAX; + metrics.min_overflow_count = u64::MAX; + metrics.min_polls_count = u64::MAX; + metrics.min_local_queue_depth = usize::MAX; + metrics.mean_poll_duration_worker_min = Duration::MAX; + metrics.poll_time_histogram = vec![0; self.runtime.poll_time_histogram_num_buckets()]; + metrics.budget_forced_yield_count = + budget_forced_yields - self.budget_forced_yield_count; + metrics.io_driver_ready_count = io_driver_ready_events - self.io_driver_ready_count; + + self.num_remote_schedules = num_remote_schedules; + self.budget_forced_yield_count = budget_forced_yields; + self.io_driver_ready_count = io_driver_ready_events; + } self.started_at = now; - self.budget_forced_yield_count = budget_forced_yields; - self.io_driver_ready_count = io_driver_ready_events; for worker in &mut self.workers { worker.probe(&self.runtime, &mut metrics); } - if metrics.total_polls_count == 0 { - debug_assert_eq!(metrics.mean_poll_duration, Duration::default()); + #[cfg(tokio_unstable)] + { + if metrics.total_polls_count == 0 { + debug_assert_eq!(metrics.mean_poll_duration, Duration::default()); - metrics.mean_poll_duration_worker_max = Duration::default(); - metrics.mean_poll_duration_worker_min = 
Duration::default(); + metrics.mean_poll_duration_worker_max = Duration::default(); + metrics.mean_poll_duration_worker_min = Duration::default(); + } } metrics @@ -1304,8 +1390,12 @@ impl RuntimeMonitor { runtime: self.runtime.clone(), started_at, workers, + + #[cfg(tokio_unstable)] num_remote_schedules: self.runtime.remote_schedule_count(), + #[cfg(tokio_unstable)] budget_forced_yield_count: self.runtime.budget_forced_yield_count(), + #[cfg(tokio_unstable)] io_driver_ready_count: self.runtime.io_driver_ready_count(), } } @@ -1313,24 +1403,30 @@ impl RuntimeMonitor { impl Worker { fn new(worker: usize, rt: &runtime::RuntimeMetrics) -> Worker { - let poll_time_histogram = if rt.poll_time_histogram_enabled() { - vec![0; rt.poll_time_histogram_num_buckets()] - } else { - vec![] - }; - - Worker { + #[allow(unused_mut, clippy::needless_update)] + let mut wrk = Worker { worker, total_park_count: rt.worker_park_count(worker), - total_noop_count: rt.worker_noop_count(worker), - total_steal_count: rt.worker_steal_count(worker), - total_steal_operations: rt.worker_steal_operations(worker), - total_local_schedule_count: rt.worker_local_schedule_count(worker), - total_overflow_count: rt.worker_overflow_count(worker), - total_polls_count: rt.worker_poll_count(worker), total_busy_duration: rt.worker_total_busy_duration(worker), - poll_time_histogram, - } + ..Default::default() + }; + + #[cfg(tokio_unstable)] + { + let poll_time_histogram = if rt.poll_time_histogram_enabled() { + vec![0; rt.poll_time_histogram_num_buckets()] + } else { + vec![] + }; + wrk.total_noop_count = rt.worker_noop_count(worker); + wrk.total_steal_count = rt.worker_steal_count(worker); + wrk.total_steal_operations = rt.worker_steal_operations(worker); + wrk.total_local_schedule_count = rt.worker_local_schedule_count(worker); + wrk.total_overflow_count = rt.worker_overflow_count(worker); + wrk.total_polls_count = rt.worker_poll_count(worker); + wrk.poll_time_histogram = poll_time_histogram; + }; + wrk } fn 
probe(&mut self, rt: &runtime::RuntimeMetrics, metrics: &mut RuntimeMetrics) { @@ -1352,51 +1448,12 @@ impl Worker { }}; } - let mut worker_polls_count = self.total_polls_count; - let total_polls_count = metrics.total_polls_count; - metric!( total_park_count, max_park_count, min_park_count, worker_park_count ); - metric!( - total_noop_count, - max_noop_count, - min_noop_count, - worker_noop_count - ); - metric!( - total_steal_count, - max_steal_count, - min_steal_count, - worker_steal_count - ); - metric!( - total_steal_operations, - max_steal_operations, - min_steal_operations, - worker_steal_operations - ); - metric!( - total_local_schedule_count, - max_local_schedule_count, - min_local_schedule_count, - worker_local_schedule_count - ); - metric!( - total_overflow_count, - max_overflow_count, - min_overflow_count, - worker_overflow_count - ); - metric!( - total_polls_count, - max_polls_count, - min_polls_count, - worker_poll_count - ); metric!( total_busy_duration, max_busy_duration, @@ -1404,71 +1461,115 @@ impl Worker { worker_total_busy_duration ); - // Get the number of polls since last probe - worker_polls_count = self.total_polls_count - worker_polls_count; + #[cfg(tokio_unstable)] + { + let mut worker_polls_count = self.total_polls_count; + let total_polls_count = metrics.total_polls_count; + + metric!( + total_noop_count, + max_noop_count, + min_noop_count, + worker_noop_count + ); + metric!( + total_steal_count, + max_steal_count, + min_steal_count, + worker_steal_count + ); + metric!( + total_steal_operations, + max_steal_operations, + min_steal_operations, + worker_steal_operations + ); + metric!( + total_local_schedule_count, + max_local_schedule_count, + min_local_schedule_count, + worker_local_schedule_count + ); + metric!( + total_overflow_count, + max_overflow_count, + min_overflow_count, + worker_overflow_count + ); + metric!( + total_polls_count, + max_polls_count, + min_polls_count, + worker_poll_count + ); + + // Get the number of polls since 
last probe + worker_polls_count = self.total_polls_count - worker_polls_count; + + // Update the mean task poll duration if there were polls + if worker_polls_count > 0 { + let val = rt.worker_mean_poll_time(self.worker); + + if val > metrics.mean_poll_duration_worker_max { + metrics.mean_poll_duration_worker_max = val; + } - // Update the mean task poll duration if there were polls - if worker_polls_count > 0 { - let val = rt.worker_mean_poll_time(self.worker); + if val < metrics.mean_poll_duration_worker_min { + metrics.mean_poll_duration_worker_min = val; + } - if val > metrics.mean_poll_duration_worker_max { - metrics.mean_poll_duration_worker_max = val; - } + // First, scale the current value down + let ratio = total_polls_count as f64 / metrics.total_polls_count as f64; + let mut mean = metrics.mean_poll_duration.as_nanos() as f64 * ratio; - if val < metrics.mean_poll_duration_worker_min { - metrics.mean_poll_duration_worker_min = val; - } + // Add the scaled current worker's mean poll duration + let ratio = worker_polls_count as f64 / metrics.total_polls_count as f64; + mean += val.as_nanos() as f64 * ratio; - // First, scale the current value down - let ratio = total_polls_count as f64 / metrics.total_polls_count as f64; - let mut mean = metrics.mean_poll_duration.as_nanos() as f64 * ratio; + metrics.mean_poll_duration = Duration::from_nanos(mean as u64); + } - // Add the scaled current worker's mean poll duration - let ratio = worker_polls_count as f64 / metrics.total_polls_count as f64; - mean += val.as_nanos() as f64 * ratio; + // Update the histogram counts if there were polls since last count + if worker_polls_count > 0 { + for (bucket, cell) in metrics.poll_time_histogram.iter_mut().enumerate() { + let new = rt.poll_time_histogram_bucket_count(self.worker, bucket); + let delta = new - self.poll_time_histogram[bucket]; + self.poll_time_histogram[bucket] = new; - metrics.mean_poll_duration = Duration::from_nanos(mean as u64); - } - - // Update the 
histogram counts if there were polls since last count - if worker_polls_count > 0 { - for (bucket, cell) in metrics.poll_time_histogram.iter_mut().enumerate() { - let new = rt.poll_time_histogram_bucket_count(self.worker, bucket); - let delta = new - self.poll_time_histogram[bucket]; - self.poll_time_histogram[bucket] = new; - - *cell += delta; + *cell += delta; + } } - } - // Local scheduled tasks is an absolute value - let local_scheduled_tasks = rt.worker_local_queue_depth(self.worker); - metrics.total_local_queue_depth += local_scheduled_tasks; + // Local scheduled tasks is an absolute value + let local_scheduled_tasks = rt.worker_local_queue_depth(self.worker); + metrics.total_local_queue_depth += local_scheduled_tasks; - if local_scheduled_tasks > metrics.max_local_queue_depth { - metrics.max_local_queue_depth = local_scheduled_tasks; - } + if local_scheduled_tasks > metrics.max_local_queue_depth { + metrics.max_local_queue_depth = local_scheduled_tasks; + } - if local_scheduled_tasks < metrics.min_local_queue_depth { - metrics.min_local_queue_depth = local_scheduled_tasks; - } + if local_scheduled_tasks < metrics.min_local_queue_depth { + metrics.min_local_queue_depth = local_scheduled_tasks; + } - // Blocking queue depth is an absolute value too - metrics.blocking_queue_depth = rt.blocking_queue_depth(); + // Blocking queue depth is an absolute value too + metrics.blocking_queue_depth = rt.blocking_queue_depth(); - #[allow(deprecated)] - { - // use the deprecated active_tasks_count here to support slightly older versions of Tokio, - // it's the same. - metrics.live_tasks_count = rt.active_tasks_count(); + #[allow(deprecated)] + { + // use the deprecated active_tasks_count here to support slightly older versions of Tokio, + // it's the same. 
+ metrics.live_tasks_count = rt.active_tasks_count(); + } + metrics.blocking_threads_count = rt.num_blocking_threads(); + metrics.idle_blocking_threads_count = rt.num_idle_blocking_threads(); } - metrics.blocking_threads_count = rt.num_blocking_threads(); - metrics.idle_blocking_threads_count = rt.num_idle_blocking_threads(); } } impl RuntimeMetrics { /// Returns the ratio of the [`RuntimeMetrics::total_polls_count`] to the [`RuntimeMetrics::total_noop_count`]. + #[cfg(tokio_unstable)] pub fn mean_polls_per_park(&self) -> f64 { let total_park_count = self.total_park_count - self.total_noop_count; if total_park_count == 0 { diff --git a/src/runtime/metrics_rs_integration.rs b/src/runtime/metrics_rs_integration.rs index 6b7692e..45685e4 100644 --- a/src/runtime/metrics_rs_integration.rs +++ b/src/runtime/metrics_rs_integration.rs @@ -14,11 +14,11 @@ use super::{RuntimeIntervals, RuntimeMetrics, RuntimeMonitor}; /// can use the [`with_metrics_transformer`] function to customize the metric names. /// /// ### Usage -/// +/// /// To upload metrics via [metrics-rs], you need to set up a reporter, which /// is actually what exports the metrics outside of the program. You must set /// up the reporter before you call [`describe_and_run`]. -/// +/// /// You can find exporters within the [metrics-rs] docs. One such reporter /// is the [metrics_exporter_prometheus] reporter, which makes metrics visible /// through Prometheus. 
@@ -53,7 +53,7 @@ use super::{RuntimeIntervals, RuntimeMetrics, RuntimeMonitor}; /// .unwrap(); /// } /// ``` -/// +/// /// [`describe_and_run`]: RuntimeMetricsReporterBuilder::describe_and_run /// [`with_metrics_transformer`]: RuntimeMetricsReporterBuilder::with_metrics_transformer /// [metrics-rs]: metrics @@ -66,9 +66,9 @@ pub struct RuntimeMetricsReporterBuilder { impl fmt::Debug for RuntimeMetricsReporterBuilder { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("RuntimeMetricsReporterBuilder") - .field("interval", &self.interval) - // skip metrics_transformer field - .finish() + .field("interval", &self.interval) + // skip metrics_transformer field + .finish() } } @@ -105,7 +105,7 @@ impl RuntimeMetricsReporterBuilder { /// Set a custom "metrics transformer", which is used during `build` to transform the metric /// names into metric keys, for example to add dimensions. The string metric names used by this reporter /// all start with `tokio_`. The default transformer is just [`metrics::Key::from_static_name`] - /// + /// /// For example, to attach a dimension named "application" with value "my_app", and to replace /// `tokio_` with `my_app_` /// ``` @@ -126,7 +126,10 @@ impl RuntimeMetricsReporterBuilder { /// ); /// } /// ``` - pub fn with_metrics_transformer(mut self, transformer: impl FnMut(&'static str) -> metrics::Key + Send + 'static) -> Self { + pub fn with_metrics_transformer( + mut self, + transformer: impl FnMut(&'static str) -> metrics::Key + Send + 'static, + ) -> Self { self.metrics_transformer = Box::new(transformer); self } @@ -134,7 +137,7 @@ impl RuntimeMetricsReporterBuilder { /// Build the [`RuntimeMetricsReporter`] for the current Tokio runtime. This function will capture /// the [`Counter`]s, [`Gauge`]s and [`Histogram`]s from the current [metrics-rs] reporter, /// so if you are using [`with_local_recorder`], you should wrap this function and [`describe`] with it. 
- /// + /// /// For example: /// ``` /// # use std::sync::Arc; @@ -144,7 +147,7 @@ impl RuntimeMetricsReporterBuilder { /// let builder = tokio_metrics::RuntimeMetricsReporterBuilder::default(); /// let recorder = Arc::new(metrics_util::debugging::DebuggingRecorder::new()); /// let metrics_reporter = metrics::with_local_recorder(&recorder, || builder.describe().build()); - /// + /// /// // no need to wrap `run()`, since the metrics are already captured /// tokio::task::spawn(metrics_reporter.run()); /// } @@ -166,7 +169,7 @@ impl RuntimeMetricsReporterBuilder { /// the [`Counter`]s, [`Gauge`]s and [`Histogram`]s from the current [metrics-rs] reporter, /// so if you are using [`with_local_recorder`], you should wrap this function and [`describe`] /// with it. - /// + /// /// [`Counter`]: metrics::Counter /// [`Gauge`]: metrics::Counter /// [`Histogram`]: metrics::Counter @@ -188,7 +191,7 @@ impl RuntimeMetricsReporterBuilder { /// which makes them easier to use. However, some reporters don't support /// describing the same metric name more than once, so it is generally a good /// idea to only call this function once per metric reporter. - /// + /// /// [`describe_counter`]: metrics::describe_counter /// [metrics-rs]: metrics pub fn describe(mut self) -> Self { @@ -203,13 +206,13 @@ impl RuntimeMetricsReporterBuilder { /// describing the same metric name more than once. If you are emitting multiple /// metrics via a single reporter, try to call [`describe`] once and [`run`] for each /// runtime metrics reporter. - /// + /// /// ### Working with a custom reporter /// /// If you want to set a local metrics reporter, you shouldn't be calling this method, /// but you should instead call `.describe().build()` within [`with_local_recorder`] and then /// call `run` (see the docs on [`build`]). - /// + /// /// [describing]: Self::describe /// [`describe`]: Self::describe /// [`build`]: Self::build. @@ -243,71 +246,102 @@ pub struct RuntimeMetricsReporter { } macro_rules! 
kind_to_type { - (Counter) => (metrics::Counter); - (Gauge) => (metrics::Gauge); - (Histogram) => (metrics::Histogram); + (Counter) => { + metrics::Counter + }; + (Gauge) => { + metrics::Gauge + }; + (Histogram) => { + metrics::Histogram + }; } macro_rules! metric_key { - ($transform_fn:ident, $name:ident) => ($transform_fn(concat!("tokio_", stringify!($name)))) + ($transform_fn:ident, $name:ident) => { + $transform_fn(concat!("tokio_", stringify!($name))) + }; } // calling `trim` since /// inserts spaces into docs macro_rules! describe_metric_ref { - ($transform_fn:ident, $doc:expr, $name:ident: Counter<$unit:ident> []) => ( - metrics::describe_counter!(metric_key!($transform_fn, $name).name().to_owned(), metrics::Unit::$unit, $doc.trim()) - ); - ($transform_fn:ident, $doc:expr, $name:ident: Gauge<$unit:ident> []) => ( - metrics::describe_gauge!(metric_key!($transform_fn, $name).name().to_owned(), metrics::Unit::$unit, $doc.trim()) - ); - ($transform_fn:ident, $doc:expr, $name:ident: Histogram<$unit:ident> []) => ( - metrics::describe_histogram!(metric_key!($transform_fn, $name).name().to_owned(), metrics::Unit::$unit, $doc.trim()) - ); + ($transform_fn:ident, $doc:expr, $name:ident: Counter<$unit:ident> []) => { + metrics::describe_counter!( + metric_key!($transform_fn, $name).name().to_owned(), + metrics::Unit::$unit, + $doc.trim() + ) + }; + ($transform_fn:ident, $doc:expr, $name:ident: Gauge<$unit:ident> []) => { + metrics::describe_gauge!( + metric_key!($transform_fn, $name).name().to_owned(), + metrics::Unit::$unit, + $doc.trim() + ) + }; + ($transform_fn:ident, $doc:expr, $name:ident: Histogram<$unit:ident> []) => { + metrics::describe_histogram!( + metric_key!($transform_fn, $name).name().to_owned(), + metrics::Unit::$unit, + $doc.trim() + ) + }; } macro_rules! 
capture_metric_ref { - ($transform_fn:ident, $name:ident: Counter []) => ( - { - let (name, labels) = metric_key!($transform_fn, $name).into_parts(); - metrics::counter!(name, labels) - } - ); - ($transform_fn:ident, $name:ident: Gauge []) => ( - { - let (name, labels) = metric_key!($transform_fn, $name).into_parts(); - metrics::gauge!(name, labels) - } - ); - ($transform_fn:ident, $name:ident: Histogram []) => ( - { - let (name, labels) = metric_key!($transform_fn, $name).into_parts(); - metrics::histogram!(name, labels) - } - ); + ($transform_fn:ident, $name:ident: Counter []) => {{ + let (name, labels) = metric_key!($transform_fn, $name).into_parts(); + metrics::counter!(name, labels) + }}; + ($transform_fn:ident, $name:ident: Gauge []) => {{ + let (name, labels) = metric_key!($transform_fn, $name).into_parts(); + metrics::gauge!(name, labels) + }}; + ($transform_fn:ident, $name:ident: Histogram []) => {{ + let (name, labels) = metric_key!($transform_fn, $name).into_parts(); + metrics::histogram!(name, labels) + }}; } macro_rules! metric_refs { ( [$struct_name:ident] [$($ignore:ident),* $(,)?] { - $( - #[doc = $doc:tt] - $name:ident: $kind:tt <$unit:ident> $opts:tt - ),* - $(,)? + stable { + $( + #[doc = $doc:tt] + $name:ident: $kind:tt <$unit:ident> $opts:tt + ),* + $(,)? + } + unstable { + $( + #[doc = $unstable_doc:tt] + $unstable_name:ident: $unstable_kind:tt <$unstable_unit:ident> $unstable_opts:tt + ),* + $(,)? 
+ } } ) => { struct $struct_name { $( - $name: kind_to_type!($kind) - ),* + $name: kind_to_type!($kind), + )* + $( + #[cfg(tokio_unstable)] + $unstable_name: kind_to_type!($unstable_kind), + )* } impl $struct_name { fn capture(transform_fn: &mut dyn FnMut(&'static str) -> metrics::Key) -> Self { Self { $( - $name: capture_metric_ref!(transform_fn, $name: $kind $opts) - ),* + $name: capture_metric_ref!(transform_fn, $name: $kind $opts), + )* + $( + #[cfg(tokio_unstable)] + $unstable_name: capture_metric_ref!(transform_fn, $unstable_name: $unstable_kind $unstable_opts), + )* } } @@ -315,12 +349,20 @@ macro_rules! metric_refs { $( MyMetricOp::op((&self.$name, metrics.$name), tokio); )* + $( + #[cfg(tokio_unstable)] + MyMetricOp::op((&self.$unstable_name, metrics.$unstable_name), tokio); + )* } fn describe(transform_fn: &mut dyn FnMut(&'static str) -> metrics::Key) { $( describe_metric_ref!(transform_fn, $doc, $name: $kind<$unit> $opts); )* + $( + #[cfg(tokio_unstable)] + describe_metric_ref!(transform_fn, $unstable_doc, $unstable_name: $unstable_kind<$unstable_unit> $unstable_opts); + )* } } @@ -346,6 +388,13 @@ macro_rules! metric_refs { continue } );* + $( + let expected = format!(" {}:", stringify!($unstable_name)); + eprintln!("{}", expected); + if line.contains(&expected) { + continue + } + );* panic!("missing metric {:?}", line); } } @@ -354,86 +403,90 @@ macro_rules! metric_refs { metric_refs! 
{ [RuntimeMetricRefs] [elapsed] { - /// The number of worker threads used by the runtime - workers_count: Gauge [], - /// The number of times worker threads parked - max_park_count: Gauge [], - /// The minimum number of times any worker thread parked - min_park_count: Gauge [], - /// The number of times worker threads parked - total_park_count: Gauge [], - /// The average duration of a single invocation of poll on a task - mean_poll_duration: Gauge [], - /// The average duration of a single invocation of poll on a task on the worker with the lowest value - mean_poll_duration_worker_min: Gauge [], - /// The average duration of a single invocation of poll on a task on the worker with the highest value - mean_poll_duration_worker_max: Gauge [], - /// A histogram of task polls since the previous probe grouped by poll times - poll_time_histogram: Histogram [], - /// The number of times worker threads unparked but performed no work before parking again - total_noop_count: Counter [], - /// The maximum number of times any worker thread unparked but performed no work before parking again - max_noop_count: Counter [], - /// The minimum number of times any worker thread unparked but performed no work before parking again - min_noop_count: Counter [], - /// The number of tasks worker threads stole from another worker thread - total_steal_count: Counter [], - /// The maximum number of tasks any worker thread stole from another worker thread. 
- max_steal_count: Counter [], - /// The minimum number of tasks any worker thread stole from another worker thread - min_steal_count: Counter [], - /// The number of times worker threads stole tasks from another worker thread - total_steal_operations: Counter [], - /// The maximum number of times any worker thread stole tasks from another worker thread - max_steal_operations: Counter [], - /// The minimum number of times any worker thread stole tasks from another worker thread - min_steal_operations: Counter [], - /// The number of tasks scheduled from **outside** of the runtime - num_remote_schedules: Counter [], - /// The number of tasks scheduled from worker threads - total_local_schedule_count: Counter [], - /// The maximum number of tasks scheduled from any one worker thread - max_local_schedule_count: Counter [], - /// The minimum number of tasks scheduled from any one worker thread - min_local_schedule_count: Counter [], - /// The number of times worker threads saturated their local queues - total_overflow_count: Counter [], - /// The maximum number of times any one worker saturated its local queue - max_overflow_count: Counter [], - /// The minimum number of times any one worker saturated its local queue - min_overflow_count: Counter [], - /// The number of tasks that have been polled across all worker threads - total_polls_count: Counter [], - /// The maximum number of tasks that have been polled in any worker thread - max_polls_count: Counter [], - /// The minimum number of tasks that have been polled in any worker thread - min_polls_count: Counter [], - /// The amount of time worker threads were busy - total_busy_duration: Counter [], - /// The maximum amount of time a worker thread was busy - max_busy_duration: Counter [], - /// The minimum amount of time a worker thread was busy - min_busy_duration: Counter [], - /// The number of tasks currently scheduled in the runtime's global queue - global_queue_depth: Gauge [], - /// The total number of tasks 
currently scheduled in workers' local queues - total_local_queue_depth: Gauge [], - /// The maximum number of tasks currently scheduled any worker's local queue - max_local_queue_depth: Gauge [], - /// The minimum number of tasks currently scheduled any worker's local queue - min_local_queue_depth: Gauge [], - /// The number of tasks currently waiting to be executed in the runtime's blocking threadpool. - blocking_queue_depth: Gauge [], - /// The current number of alive tasks in the runtime. - live_tasks_count: Gauge [], - /// The number of additional threads spawned by the runtime. - blocking_threads_count: Gauge [], - /// The number of idle threads, which have spawned by the runtime for `spawn_blocking` calls. - idle_blocking_threads_count: Gauge [], - /// Returns the number of times that tasks have been forced to yield back to the scheduler after exhausting their task budgets - budget_forced_yield_count: Counter [], - /// Returns the number of ready events processed by the runtime’s I/O driver - io_driver_ready_count: Counter [], + stable { + /// The number of worker threads used by the runtime + workers_count: Gauge [], + /// The maximum number of times any worker thread parked + max_park_count: Gauge [], + /// The minimum number of times any worker thread parked + min_park_count: Gauge [], + /// The number of times worker threads parked + total_park_count: Gauge [], + /// The amount of time worker threads were busy + total_busy_duration: Counter [], + /// The maximum amount of time a worker thread was busy + max_busy_duration: Counter [], + /// The minimum amount of time a worker thread was busy + min_busy_duration: Counter [], + /// The number of tasks currently scheduled in the runtime's global queue + global_queue_depth: Gauge [], + } + unstable { + /// The average duration of a single invocation of poll on a task + mean_poll_duration: Gauge [], + /// The average duration of a single invocation of poll on a task on the worker with the lowest value + 
mean_poll_duration_worker_min: Gauge [], + /// The average duration of a single invocation of poll on a task on the worker with the highest value + mean_poll_duration_worker_max: Gauge [], + /// A histogram of task polls since the previous probe grouped by poll times + poll_time_histogram: Histogram [], + /// The number of times worker threads unparked but performed no work before parking again + total_noop_count: Counter [], + /// The maximum number of times any worker thread unparked but performed no work before parking again + max_noop_count: Counter [], + /// The minimum number of times any worker thread unparked but performed no work before parking again + min_noop_count: Counter [], + /// The number of tasks worker threads stole from another worker thread + total_steal_count: Counter [], + /// The maximum number of tasks any worker thread stole from another worker thread. + max_steal_count: Counter [], + /// The minimum number of tasks any worker thread stole from another worker thread + min_steal_count: Counter [], + /// The number of times worker threads stole tasks from another worker thread + total_steal_operations: Counter [], + /// The maximum number of times any worker thread stole tasks from another worker thread + max_steal_operations: Counter [], + /// The minimum number of times any worker thread stole tasks from another worker thread + min_steal_operations: Counter [], + /// The number of tasks scheduled from **outside** of the runtime + num_remote_schedules: Counter [], + /// The number of tasks scheduled from worker threads + total_local_schedule_count: Counter [], + /// The maximum number of tasks scheduled from any one worker thread + max_local_schedule_count: Counter [], + /// The minimum number of tasks scheduled from any one worker thread + min_local_schedule_count: Counter [], + /// The number of times worker threads saturated their local queues + total_overflow_count: Counter [], + /// The maximum number of times any one worker saturated 
its local queue + max_overflow_count: Counter [], + /// The minimum number of times any one worker saturated its local queue + min_overflow_count: Counter [], + /// The number of tasks that have been polled across all worker threads + total_polls_count: Counter [], + /// The maximum number of tasks that have been polled in any worker thread + max_polls_count: Counter [], + /// The minimum number of tasks that have been polled in any worker thread + min_polls_count: Counter [], + /// The total number of tasks currently scheduled in workers' local queues + total_local_queue_depth: Gauge [], + /// The maximum number of tasks currently scheduled in any worker's local queue + max_local_queue_depth: Gauge [], + /// The minimum number of tasks currently scheduled in any worker's local queue + min_local_queue_depth: Gauge [], + /// The number of tasks currently waiting to be executed in the runtime's blocking threadpool. + blocking_queue_depth: Gauge [], + /// The current number of alive tasks in the runtime. + live_tasks_count: Gauge [], + /// The number of additional threads spawned by the runtime. + blocking_threads_count: Gauge [], + /// The number of idle threads, which have been spawned by the runtime for `spawn_blocking` calls. 
+ idle_blocking_threads_count: Gauge [], + /// Returns the number of times that tasks have been forced to yield back to the scheduler after exhausting their task budgets + budget_forced_yield_count: Counter [], + /// Returns the number of ready events processed by the runtime’s I/O driver + io_driver_ready_count: Counter [], + } } } trait MyMetricOp { @@ -442,7 +495,8 @@ trait MyMetricOp { impl MyMetricOp for (&metrics::Counter, Duration) { fn op(self, _tokio: &tokio::runtime::RuntimeMetrics) { - self.0.increment(self.1.as_micros().try_into().unwrap_or(u64::MAX)); + self.0 + .increment(self.1.as_micros().try_into().unwrap_or(u64::MAX)); } } @@ -470,6 +524,7 @@ impl MyMetricOp for (&metrics::Gauge, usize) { } } +#[cfg(tokio_unstable)] impl MyMetricOp for (&metrics::Histogram, Vec) { fn op(self, tokio: &tokio::runtime::RuntimeMetrics) { for (i, bucket) in self.1.iter().enumerate() { @@ -477,7 +532,8 @@ impl MyMetricOp for (&metrics::Histogram, Vec) { if *bucket > 0 { // emit using range.start to avoid very large numbers for open bucket // FIXME: do we want to do something else here? - self.0.record_many(range.start.as_micros() as f64, *bucket as usize); + self.0 + .record_many(range.start.as_micros() as f64, *bucket as usize); } } } @@ -486,17 +542,19 @@ impl MyMetricOp for (&metrics::Histogram, Vec) { impl fmt::Debug for RuntimeMetricsReporter { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("RuntimeMetricsReporter") - .field("interval", &self.interval) - // skip intervals field - .finish() + .field("interval", &self.interval) + // skip intervals field + .finish() } } -impl RuntimeMetricsReporter -{ +impl RuntimeMetricsReporter { /// Collect and publish metrics once to the configured [metrics_rs](metrics) reporter. 
pub fn run_once(&mut self) { - let metrics = self.intervals.next().expect("RuntimeIntervals::next never returns None"); + let metrics = self + .intervals + .next() + .expect("RuntimeIntervals::next never returns None"); self.emitter.emit(metrics, &self.intervals.runtime); }