Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,16 @@ lazy_static! {
)
.unwrap();

/// Total time worker threads are active (not parked), in seconds.
pub static ref WORKER_ACTIVE_SECONDS: CounterVec = CounterVec::new(
new_opts(
"yatp_worker_active_seconds_total",
"total time worker threads are active (not parked) in seconds"
),
&["name"]
)
.unwrap();

static ref NAMESPACE: Mutex<Option<String>> = Mutex::new(None);
}

Expand Down
7 changes: 6 additions & 1 deletion src/pool/builder.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

use crate::metrics::WORKER_ACTIVE_SECONDS;
use crate::pool::spawn::QueueCore;
use crate::pool::worker::WorkerThread;
use crate::pool::{CloneRunnerBuilder, Local, Remote, Runner, RunnerBuilder, ThreadPool};
Expand Down Expand Up @@ -99,14 +100,18 @@ where
F::Runner: Runner<TaskCell = T> + Send + 'static,
{
let mut threads = Vec::with_capacity(self.builder.sched_config.max_thread_count);
let worker_active_seconds = WORKER_ACTIVE_SECONDS
.get_metric_with_label_values(&[self.builder.name_prefix.as_str()])
.unwrap();
for (i, local_queue) in self.local_queues.into_iter().enumerate() {
let runner = factory.build();
let name = format!("{}-{}", self.builder.name_prefix, i);
let mut builder = thread::Builder::new().name(name);
if let Some(size) = self.builder.stack_size {
builder = builder.stack_size(size)
}
let local = Local::new(i + 1, local_queue, self.core.clone());
let mut local = Local::new(i + 1, local_queue, self.core.clone());
local.enable_worker_activity(worker_active_seconds.local());
let thd = WorkerThread::new(local, runner);
threads.push(
builder
Expand Down
130 changes: 124 additions & 6 deletions src/pool/spawn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@ use crate::pool::SchedConfig;
use crate::queue::{Extras, LocalQueue, Pop, TaskCell, TaskInjector, WithExtras};
use fail::fail_point;
use parking_lot_core::{FilterOp, ParkResult, ParkToken, UnparkToken};
use prometheus::local::LocalCounter;
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc, Weak,
};
use std::time::Instant;

/// An usize is used to trace the threads that are working actively.
/// To save additional memory and atomic operation, the number and
Expand All @@ -25,6 +27,8 @@ use std::sync::{
const SHUTDOWN_BIT: usize = 1;
const WORKER_COUNT_SHIFT: usize = 1;
const WORKER_COUNT_BASE: usize = 2;
const ACTIVE_TIME_FLUSH_THRESHOLD_SECS: f64 = 1.0;
const ACTIVE_TIME_SAMPLE_EVERY_TASKS: u32 = 256;

/// Checks if shutdown bit is set.
pub fn is_shutdown(cnt: usize) -> bool {
Expand Down Expand Up @@ -208,8 +212,10 @@ impl<T> Clone for Remote<T> {
/// Note that implements of Runner assumes `Remote` is `Sync` and `Send`.
/// So we need to use assert trait to ensure the constraint at compile time
/// to avoid future breaks.
#[allow(dead_code)]
trait AssertSync: Sync {}
impl<T: Send> AssertSync for Remote<T> {}
#[allow(dead_code)]
trait AssertSend: Send {}
impl<T: Send> AssertSend for Remote<T> {}

Expand Down Expand Up @@ -240,6 +246,81 @@ impl<T> Clone for WeakRemote<T> {

impl<T: Send> AssertSync for WeakRemote<T> {}
impl<T: Send> AssertSend for WeakRemote<T> {}
struct WorkerActivity {
counter: LocalCounter,
active_since: Option<Instant>,
ticks: u32,
}

impl WorkerActivity {
fn new(counter: LocalCounter) -> WorkerActivity {
WorkerActivity {
counter,
active_since: None,
ticks: 0,
}
}

#[inline]
fn on_start(&mut self) {
self.active_since = Some(Instant::now());
}

#[inline]
fn on_task_complete(&mut self) {
self.ticks = self.ticks.wrapping_add(1);
if self.ticks == ACTIVE_TIME_SAMPLE_EVERY_TASKS {
self.ticks = 0;
self.checkpoint();
}
}

#[inline]
fn on_park(&mut self) {
let now = Instant::now();
self.add_elapsed(now);
self.active_since = None;
self.counter.flush();
}

#[inline]
fn on_unpark(&mut self) {
if self.active_since.is_none() {
self.active_since = Some(Instant::now());
}
}

#[inline]
fn on_end(&mut self) {
let now = Instant::now();
self.add_elapsed(now);
self.active_since = None;
self.counter.flush();
}

#[inline]
fn checkpoint(&mut self) {
if self.active_since.is_none() {
return;
}
let now = Instant::now();
self.add_elapsed(now);
self.active_since = Some(now);
if self.counter.get() >= ACTIVE_TIME_FLUSH_THRESHOLD_SECS {
self.counter.flush();
}
}

#[inline]
fn add_elapsed(&mut self, now: Instant) {
if let Some(since) = self.active_since {
let elapsed = now.saturating_duration_since(since);
if !elapsed.is_zero() {
self.counter.inc_by(elapsed.as_secs_f64());
}
}
}
}

/// Spawns tasks to the associated thread pool.
///
Expand All @@ -250,6 +331,7 @@ pub struct Local<T> {
id: usize,
local_queue: LocalQueue<T>,
core: Arc<QueueCore<T>>,
activity: Option<WorkerActivity>,
}

impl<T: TaskCell + Send> Local<T> {
Expand All @@ -258,6 +340,7 @@ impl<T: TaskCell + Send> Local<T> {
id,
local_queue,
core,
activity: None,
}
}

Expand Down Expand Up @@ -297,29 +380,42 @@ impl<T: TaskCell + Send> Local<T> {
/// If there are no tasks at the moment, it will go to sleep until woken
/// up by other threads.
pub(crate) fn pop_or_sleep(&mut self) -> Option<Pop<T>> {
let address = &*self.core as *const QueueCore<T> as usize;
let Local {
id,
local_queue,
core,
activity,
} = self;
let address = &**core as *const QueueCore<T> as usize;
let mut task = None;
let id = self.id;
let id = *id;

let res = unsafe {
parking_lot_core::park(
address,
|| {
if !self.core.mark_sleep() {
if !core.mark_sleep() {
return false;
}
task = self.local_queue.pop();
task = local_queue.pop();
task.is_none()
},
|| {},
|| {
if let Some(activity) = activity.as_mut() {
activity.on_park();
}
},
|_, _| {},
ParkToken(id),
None,
)
};
match res {
ParkResult::Unparked(_) | ParkResult::Invalid => {
self.core.mark_woken();
core.mark_woken();
if let Some(activity) = activity.as_mut() {
activity.on_unpark();
}
task
}
ParkResult::TimedOut => unreachable!(),
Expand All @@ -333,6 +429,28 @@ impl<T: TaskCell + Send> Local<T> {
fail_point!("need-preempt", |r| { r.unwrap().parse().unwrap() });
self.local_queue.has_tasks_or_pull()
}

pub(crate) fn enable_worker_activity(&mut self, counter: LocalCounter) {
self.activity = Some(WorkerActivity::new(counter));
}

pub(crate) fn on_worker_start(&mut self) {
if let Some(activity) = self.activity.as_mut() {
activity.on_start();
}
}

pub(crate) fn on_worker_end(&mut self) {
if let Some(activity) = self.activity.as_mut() {
activity.on_end();
}
}

pub(crate) fn on_task_complete(&mut self) {
if let Some(activity) = self.activity.as_mut() {
activity.on_task_complete();
}
}
}

/// Building remotes and locals from the given queue and configuration.
Expand Down
27 changes: 27 additions & 0 deletions src/pool/tests.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

use crate::metrics::WORKER_ACTIVE_SECONDS;
use crate::pool::*;
use crate::task::callback::Handle;
use futures_timer::Delay;
use rand::seq::SliceRandom;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc;
use std::thread;
use std::time::*;
Expand Down Expand Up @@ -294,3 +296,28 @@ fn test_scale_down_workers() {

pool.shutdown();
}

#[test]
fn test_worker_active_seconds_metric() {
static ID: AtomicUsize = AtomicUsize::new(0);
let name = format!(
"test_worker_active_seconds_{}",
ID.fetch_add(1, Ordering::SeqCst)
);
let pool = Builder::new(name.clone())
.max_thread_count(1)
.build_callback_pool();
let (tx, rx) = mpsc::channel();
pool.spawn(move |_: &mut Handle<'_>| {
thread::sleep(Duration::from_millis(10));
tx.send(()).unwrap();
});
rx.recv_timeout(Duration::from_secs(1)).unwrap();
pool.shutdown();

let value = WORKER_ACTIVE_SECONDS
.get_metric_with_label_values(&[name.as_str()])
.unwrap()
.get();
assert!(value > 0.0);
}
4 changes: 4 additions & 0 deletions src/pool/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,21 @@ where

pub fn run(mut self) {
self.runner.start(&mut self.local);
self.local.on_worker_start();
while !self.local.core().is_shutdown() {
let task = match self.pop() {
Some(t) => t,
None => continue,
};
self.runner.handle(&mut self.local, task.task_cell);
self.local.on_task_complete();
}
self.runner.end(&mut self.local);

// Drain all futures in the queue
while self.local.pop().is_some() {}

self.local.on_worker_end();
}
}

Expand Down
Loading