diff --git a/src/metrics.rs b/src/metrics.rs
index 7152c3b..b20fef1 100644
--- a/src/metrics.rs
+++ b/src/metrics.rs
@@ -71,6 +71,16 @@ lazy_static! {
     )
     .unwrap();
 
+    /// Total time worker threads are active (not parked), in seconds.
+    pub static ref WORKER_ACTIVE_SECONDS: CounterVec = CounterVec::new(
+        new_opts(
+            "yatp_worker_active_seconds_total",
+            "total time worker threads are active (not parked) in seconds"
+        ),
+        &["name"]
+    )
+    .unwrap();
+
     static ref NAMESPACE: Mutex<Option<String>> = Mutex::new(None);
 }
diff --git a/src/pool/builder.rs b/src/pool/builder.rs
index 29e3be3..a9b24d4 100644
--- a/src/pool/builder.rs
+++ b/src/pool/builder.rs
@@ -1,5 +1,6 @@
 // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
 
+use crate::metrics::WORKER_ACTIVE_SECONDS;
 use crate::pool::spawn::QueueCore;
 use crate::pool::worker::WorkerThread;
 use crate::pool::{CloneRunnerBuilder, Local, Remote, Runner, RunnerBuilder, ThreadPool};
@@ -99,6 +100,9 @@ where
     F::Runner: Runner<TaskCell = T> + Send + 'static,
 {
     let mut threads = Vec::with_capacity(self.builder.sched_config.max_thread_count);
+    let worker_active_seconds = WORKER_ACTIVE_SECONDS
+        .get_metric_with_label_values(&[self.builder.name_prefix.as_str()])
+        .unwrap();
     for (i, local_queue) in self.local_queues.into_iter().enumerate() {
         let runner = factory.build();
         let name = format!("{}-{}", self.builder.name_prefix, i);
@@ -106,7 +110,8 @@ where
         if let Some(size) = self.builder.stack_size {
             builder = builder.stack_size(size)
         }
-        let local = Local::new(i + 1, local_queue, self.core.clone());
+        let mut local = Local::new(i + 1, local_queue, self.core.clone());
+        local.enable_worker_activity(worker_active_seconds.local());
         let thd = WorkerThread::new(local, runner);
         threads.push(
             builder
diff --git a/src/pool/spawn.rs b/src/pool/spawn.rs
index f66f91a..f006bac 100644
--- a/src/pool/spawn.rs
+++ b/src/pool/spawn.rs
@@ -8,10 +8,12 @@ use crate::pool::SchedConfig;
 use crate::queue::{Extras, LocalQueue, Pop, TaskCell, TaskInjector, WithExtras};
 use fail::fail_point;
 use parking_lot_core::{FilterOp, ParkResult, ParkToken, UnparkToken};
+use prometheus::local::LocalCounter;
 use std::sync::{
     atomic::{AtomicUsize, Ordering},
     Arc, Weak,
 };
+use std::time::Instant;
 
 /// An usize is used to trace the threads that are working actively.
 /// To save additional memory and atomic operation, the number and
@@ -25,6 +27,8 @@ use std::sync::{
 const SHUTDOWN_BIT: usize = 1;
 const WORKER_COUNT_SHIFT: usize = 1;
 const WORKER_COUNT_BASE: usize = 2;
+const ACTIVE_TIME_FLUSH_THRESHOLD_SECS: f64 = 1.0;
+const ACTIVE_TIME_SAMPLE_EVERY_TASKS: u32 = 256;
 
 /// Checks if shutdown bit is set.
 pub fn is_shutdown(cnt: usize) -> bool {
@@ -208,8 +212,10 @@ impl<T: TaskCell + Send> Clone for Remote<T> {
 /// Note that implements of Runner assumes `Remote` is `Sync` and `Send`.
 /// So we need to use assert trait to ensure the constraint at compile time
 /// to avoid future breaks.
+#[allow(dead_code)]
 trait AssertSync: Sync {}
 impl<T: TaskCell + Send> AssertSync for Remote<T> {}
+#[allow(dead_code)]
 trait AssertSend: Send {}
 impl<T: TaskCell + Send> AssertSend for Remote<T> {}
 
@@ -240,6 +246,81 @@ impl<T: TaskCell + Send> Clone for WeakRemote<T> {
 impl<T: TaskCell + Send> AssertSync for WeakRemote<T> {}
 impl<T: TaskCell + Send> AssertSend for WeakRemote<T> {}
 
+struct WorkerActivity {
+    counter: LocalCounter,
+    active_since: Option<Instant>,
+    ticks: u32,
+}
+
+impl WorkerActivity {
+    fn new(counter: LocalCounter) -> WorkerActivity {
+        WorkerActivity {
+            counter,
+            active_since: None,
+            ticks: 0,
+        }
+    }
+
+    #[inline]
+    fn on_start(&mut self) {
+        self.active_since = Some(Instant::now());
+    }
+
+    #[inline]
+    fn on_task_complete(&mut self) {
+        self.ticks = self.ticks.wrapping_add(1);
+        if self.ticks == ACTIVE_TIME_SAMPLE_EVERY_TASKS {
+            self.ticks = 0;
+            self.checkpoint();
+        }
+    }
+
+    #[inline]
+    fn on_park(&mut self) {
+        let now = Instant::now();
+        self.add_elapsed(now);
+        self.active_since = None;
+        self.counter.flush();
+    }
+
+    #[inline]
+    fn on_unpark(&mut self) {
+        if self.active_since.is_none() {
+            self.active_since = Some(Instant::now());
+        }
+    }
+
+    #[inline]
+    fn on_end(&mut self) {
+        let now = Instant::now();
+        self.add_elapsed(now);
+        self.active_since = None;
+        self.counter.flush();
+    }
+
+    #[inline]
+    fn checkpoint(&mut self) {
+        if self.active_since.is_none() {
+            return;
+        }
+        let now = Instant::now();
+        self.add_elapsed(now);
+        self.active_since = Some(now);
+        if self.counter.get() >= ACTIVE_TIME_FLUSH_THRESHOLD_SECS {
+            self.counter.flush();
+        }
+    }
+
+    #[inline]
+    fn add_elapsed(&mut self, now: Instant) {
+        if let Some(since) = self.active_since {
+            let elapsed = now.saturating_duration_since(since);
+            if !elapsed.is_zero() {
+                self.counter.inc_by(elapsed.as_secs_f64());
+            }
+        }
+    }
+}
+
 /// Spawns tasks to the associated thread pool.
 ///
@@ -250,6 +331,7 @@ pub struct Local<T> {
     id: usize,
     local_queue: LocalQueue<T>,
     core: Arc<QueueCore<T>>,
+    activity: Option<WorkerActivity>,
 }
 
 impl<T: TaskCell + Send> Local<T> {
@@ -258,6 +340,7 @@ impl<T: TaskCell + Send> Local<T> {
             id,
             local_queue,
             core,
+            activity: None,
         }
     }
 
@@ -297,21 +380,31 @@ impl<T: TaskCell + Send> Local<T> {
     /// If there are no tasks at the moment, it will go to sleep until woken
     /// up by other threads.
     pub(crate) fn pop_or_sleep(&mut self) -> Option<Pop<T>> {
-        let address = &*self.core as *const QueueCore<T> as usize;
+        let Local {
+            id,
+            local_queue,
+            core,
+            activity,
+        } = self;
+        let address = &**core as *const QueueCore<T> as usize;
         let mut task = None;
-        let id = self.id;
+        let id = *id;
         let res = unsafe {
             parking_lot_core::park(
                 address,
                 || {
-                    if !self.core.mark_sleep() {
+                    if !core.mark_sleep() {
                         return false;
                     }
-                    task = self.local_queue.pop();
+                    task = local_queue.pop();
                     task.is_none()
                 },
-                || {},
+                || {
+                    if let Some(activity) = activity.as_mut() {
+                        activity.on_park();
+                    }
+                },
                 |_, _| {},
                 ParkToken(id),
                 None,
@@ -319,7 +412,10 @@ impl<T: TaskCell + Send> Local<T> {
         };
         match res {
             ParkResult::Unparked(_) | ParkResult::Invalid => {
-                self.core.mark_woken();
+                core.mark_woken();
+                if let Some(activity) = activity.as_mut() {
+                    activity.on_unpark();
+                }
                 task
             }
             ParkResult::TimedOut => unreachable!(),
@@ -333,6 +429,28 @@ impl<T: TaskCell + Send> Local<T> {
         fail_point!("need-preempt", |r| { r.unwrap().parse().unwrap() });
         self.local_queue.has_tasks_or_pull()
     }
+
+    pub(crate) fn enable_worker_activity(&mut self, counter: LocalCounter) {
+        self.activity = Some(WorkerActivity::new(counter));
+    }
+
+    pub(crate) fn on_worker_start(&mut self) {
+        if let Some(activity) = self.activity.as_mut() {
+            activity.on_start();
+        }
+    }
+
+    pub(crate) fn on_worker_end(&mut self) {
+        if let Some(activity) = self.activity.as_mut() {
+            activity.on_end();
+        }
+    }
+
+    pub(crate) fn on_task_complete(&mut self) {
+        if let Some(activity) = self.activity.as_mut() {
+            activity.on_task_complete();
+        }
+    }
 }
 
 /// Building remotes and locals from the given queue and configuration.
diff --git a/src/pool/tests.rs b/src/pool/tests.rs
index 9b96737..b649b42 100644
--- a/src/pool/tests.rs
+++ b/src/pool/tests.rs
@@ -1,9 +1,11 @@
 // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.
 
+use crate::metrics::WORKER_ACTIVE_SECONDS;
 use crate::pool::*;
 use crate::task::callback::Handle;
 use futures_timer::Delay;
 use rand::seq::SliceRandom;
+use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::mpsc;
 use std::thread;
 use std::time::*;
@@ -294,3 +296,28 @@ fn test_scale_down_workers() {
 
     pool.shutdown();
 }
+
+#[test]
+fn test_worker_active_seconds_metric() {
+    static ID: AtomicUsize = AtomicUsize::new(0);
+    let name = format!(
+        "test_worker_active_seconds_{}",
+        ID.fetch_add(1, Ordering::SeqCst)
+    );
+    let pool = Builder::new(name.clone())
+        .max_thread_count(1)
+        .build_callback_pool();
+    let (tx, rx) = mpsc::channel();
+    pool.spawn(move |_: &mut Handle<'_>| {
+        thread::sleep(Duration::from_millis(10));
+        tx.send(()).unwrap();
+    });
+    rx.recv_timeout(Duration::from_secs(1)).unwrap();
+    pool.shutdown();
+
+    let value = WORKER_ACTIVE_SECONDS
+        .get_metric_with_label_values(&[name.as_str()])
+        .unwrap()
+        .get();
+    assert!(value > 0.0);
+}
diff --git a/src/pool/worker.rs b/src/pool/worker.rs
index 0af3ef8..10139e6 100644
--- a/src/pool/worker.rs
+++ b/src/pool/worker.rs
@@ -40,17 +40,21 @@ where
     pub fn run(mut self) {
         self.runner.start(&mut self.local);
+        self.local.on_worker_start();
         while !self.local.core().is_shutdown() {
             let task = match self.pop() {
                 Some(t) => t,
                 None => continue,
             };
             self.runner.handle(&mut self.local, task.task_cell);
+            self.local.on_task_complete();
         }
         self.runner.end(&mut self.local);
 
         // Drain all futures in the queue
         while self.local.pop().is_some() {}
+
+        self.local.on_worker_end();
     }
 }