Skip to content

Enhance threads stacks collection and sampling frequency on Linux #191

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,5 @@ Cargo.lock
#Cargo.lock

.idea

.devcontainer/
14 changes: 11 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,27 @@ backtrace = { version = "0.3" }
once_cell = "1.9"
libc = "^0.2.66"
log = "0.4"
nix = { version = "0.26", default-features = false, features = ["signal", "fs"] }
nix = { version = "0.26", default-features = false, features = [
"signal",
"fs",
] }
parking_lot = "0.12"
tempfile = "3.1"
thiserror = "1.0"
findshlibs = "0.10"
cfg-if = "1.0"
smallvec = "1.7"

inferno = { version = "0.11", default-features = false, features = ["nameattr"], optional = true }
inferno = { version = "0.11", default-features = false, features = [
"nameattr",
], optional = true }
prost = { version = "0.11", optional = true }
prost-derive = { version = "0.11", optional = true }
protobuf = { version = "2.0", optional = true }
criterion = {version = "0.4", optional = true}
criterion = { version = "0.4", optional = true }

[target.'cfg(target_os = "linux")'.dependencies]
errno = { version = "0.2.8" }

[dependencies.symbolic-demangle]
version = "10.1"
Expand Down
159 changes: 137 additions & 22 deletions src/profiler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@ use std::convert::TryInto;
use std::os::raw::c_int;
use std::time::SystemTime;

#[cfg(not(target_os = "linux"))]
use nix::sys::signal;
use once_cell::sync::Lazy;
use parking_lot::RwLock;
use smallvec::SmallVec;

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

#[cfg(any(
target_arch = "x86_64",
target_arch = "aarch64",
Expand All @@ -32,7 +36,7 @@ pub struct Profiler {
pub(crate) data: Collector<UnresolvedFrames>,
sample_counter: i32,

running: bool,
running: Arc<AtomicBool>,

#[cfg(all(any(
target_arch = "x86_64",
Expand Down Expand Up @@ -137,10 +141,19 @@ impl ProfilerGuardBuilder {
}

match profiler.start() {
Ok(()) => Ok(ProfilerGuard::<'static> {
profiler: &PROFILER,
timer: Some(Timer::new(self.frequency)),
}),
Ok(()) => {
#[cfg(target_os = "linux")]
{
let running_arc = profiler.running.clone();
std::thread::spawn(move || {
fire_rt_signals(running_arc, self.frequency as u64)
});
}
Ok(ProfilerGuard::<'static> {
profiler: &PROFILER,
timer: Some(Timer::new(self.frequency)),
})
}
Err(err) => Err(err),
}
}
Expand Down Expand Up @@ -176,6 +189,7 @@ impl ProfilerGuard<'_> {

impl<'a> Drop for ProfilerGuard<'a> {
fn drop(&mut self) {
#[cfg(not(target_os = "linux"))]
drop(self.timer.take());

match self.profiler.write().as_mut() {
Expand Down Expand Up @@ -368,7 +382,7 @@ impl Profiler {
Ok(Profiler {
data: Collector::new()?,
sample_counter: 0,
running: false,
running: Arc::new(AtomicBool::new(false)),

#[cfg(all(any(
target_arch = "x86_64",
Expand Down Expand Up @@ -399,11 +413,11 @@ impl Profiler {
impl Profiler {
pub fn start(&mut self) -> Result<()> {
log::info!("starting cpu profiler");
if self.running {
if self.running.load(Ordering::SeqCst) {
Err(Error::Running)
} else {
self.register_signal_handler()?;
self.running = true;
self.running.store(true, Ordering::SeqCst);

Ok(())
}
Expand All @@ -412,14 +426,18 @@ impl Profiler {
fn init(&mut self) -> Result<()> {
self.sample_counter = 0;
self.data = Collector::new()?;
self.running = false;

// set current running to false so that the signalling thread
// will stop accordingly
self.running.store(false, Ordering::SeqCst);
// assign a fresh Arc to running so that we can avoid
// race conditions with the signalling thread
self.running = Arc::new(AtomicBool::new(false));
Ok(())
}

pub fn stop(&mut self) -> Result<()> {
log::info!("stopping cpu profiler");
if self.running {
if self.running.load(Ordering::SeqCst) {
self.unregister_signal_handler()?;
self.init()?;

Expand All @@ -430,22 +448,52 @@ impl Profiler {
}

fn register_signal_handler(&self) -> Result<()> {
let handler = signal::SigHandler::SigAction(perf_signal_handler);
let sigaction = signal::SigAction::new(
handler,
// SA_RESTART will only restart a syscall when it's safe to do so,
// e.g. when it's a blocking read(2) or write(2). See man 7 signal.
signal::SaFlags::SA_SIGINFO | signal::SaFlags::SA_RESTART,
signal::SigSet::empty(),
);
unsafe { signal::sigaction(signal::SIGPROF, &sigaction) }?;
#[cfg(target_os = "linux")]
{
use std::mem::MaybeUninit;
let mut sigaction: libc::sigaction = unsafe { MaybeUninit::zeroed().assume_init() };
sigaction.sa_sigaction = perf_signal_handler as usize;
sigaction.sa_flags = libc::SA_RESTART | libc::SA_SIGINFO;

unsafe {
let res = libc::sigaction(
libc::SIGRTMIN(),
&sigaction as *const _,
std::ptr::null::<libc::sigaction>() as *mut libc::sigaction,
);
if res == -1 {
return Err(Error::NixError(nix::errno::from_i32(errno::errno().0)));
}
};
}

#[cfg(not(target_os = "linux"))]
{
let handler = signal::SigHandler::SigAction(perf_signal_handler);
let sigaction = signal::SigAction::new(
handler,
// SA_RESTART will only restart a syscall when it's safe to do so,
// e.g. when it's a blocking read(2) or write(2). See man 7 signal.
signal::SaFlags::SA_SIGINFO | signal::SaFlags::SA_RESTART,
signal::SigSet::empty(),
);
unsafe { signal::sigaction(signal::SIGPROF, &sigaction) }?;
}

Ok(())
}

fn unregister_signal_handler(&self) -> Result<()> {
let handler = signal::SigHandler::SigIgn;
unsafe { signal::signal(signal::SIGPROF, handler) }?;
#[cfg(target_os = "linux")]
unsafe {
let res = libc::signal(libc::SIGRTMIN(), libc::SIG_IGN);
if res == libc::SIG_ERR {
return Err(Error::NixError(nix::errno::from_i32(errno::errno().0)));
}
};

#[cfg(not(target_os = "linux"))]
unsafe { signal::signal(signal::SIGPROF, signal::SigHandler::SigIgn) }?;

Ok(())
}
Expand All @@ -465,6 +513,73 @@ impl Profiler {
}
}

/// Lists the kernel thread IDs of the process `pid_t`.
///
/// A kernel thread ID (the value returned by `gettid(2)`) is not the same
/// thing as the thread ID returned by `pthread_self()`, so the IDs are read
/// from the entry names of `/proc/{pid}/task`.
///
/// Entries that cannot be read, or whose names do not parse as a `u32`, are
/// skipped. If the task directory itself cannot be opened (for example the
/// process does not exist), the returned iterator is empty rather than
/// panicking the calling thread.
#[cfg(target_os = "linux")]
fn get_tids(pid_t: u32) -> impl Iterator<Item = u32> {
    std::fs::read_dir(format!("/proc/{}/task", pid_t))
        // `Result::into_iter` yields the directory handle on success and
        // nothing on failure; flattening then walks its entries.
        .into_iter()
        .flatten()
        .filter_map(|entry| entry.ok()?.file_name().into_string().ok())
        .filter_map(|tid| tid.parse::<u32>().ok())
}

/// Periodically sends `SIGRTMIN` to every thread of the current process
/// except the one running this function, until `running` becomes `false`.
///
/// * `running` - shared flag; the loop exits once it is observed as `false`.
/// * `frequency` - number of signalling rounds per second. A value of `0`
///   means there is nothing to sample, so the function returns immediately.
///
/// The thread list is re-read on every round, so threads created or
/// destroyed while profiling are picked up on the next round.
#[cfg(target_os = "linux")]
fn fire_rt_signals(running: Arc<AtomicBool>, frequency: u64) {
    use std::{thread, time::Duration};

    const MICROS_PER_SEC: u64 = 1_000_000;

    // Guard against a division by zero below.
    if frequency == 0 {
        return;
    }

    let pid = std::process::id();
    // SAFETY: `gettid` takes no arguments and cannot fail.
    let prof_tid = unsafe { libc::syscall(libc::SYS_gettid) as u32 };

    // Sleep for the whole period. Reducing it modulo one second would turn a
    // 1 Hz request into a zero-length sleep, i.e. a busy loop of signals.
    // The lower bound keeps a non-zero pause for frequencies above 1 MHz.
    let interval = Duration::from_micros((MICROS_PER_SEC / frequency).max(1));

    while running.load(Ordering::SeqCst) {
        // Skip the thread running this function.
        for tid in get_tids(pid).filter(|&tid| tid != prof_tid) {
            // The result is deliberately ignored: a thread may exit between
            // being listed and being signalled.
            signal_thread(pid, tid, libc::SIGRTMIN());
        }
        thread::sleep(interval);
    }
}

/// Sends signal `signum` to thread `tid` of the thread group (process) `pid`.
///
/// Returns 0 on success. On failure returns -1 and sets `errno`
/// appropriately (see `man rt_tgsigqueueinfo`).
#[cfg(target_os = "linux")]
fn signal_thread(pid: u32, tid: u32, signum: i32) -> isize {
    // `si_code` of a signal queued from user space. On Linux SI_QUEUE is -1;
    // -2 would be SI_TIMER.
    const SI_QUEUE: libc::c_int = -1;

    // SAFETY: `siginfo_t` is a plain C struct for which the all-zero bit
    // pattern is a valid value.
    let mut siginfo: libc::siginfo_t = unsafe { std::mem::zeroed() };

    siginfo.si_signo = signum;
    siginfo.si_code = SI_QUEUE;

    // NB: sigqueue() cannot be used to deliver a signal to a precise thread,
    // since the kernel is free to deliver such a signal to any thread of the
    // process.
    // pthread_sigqueue() cannot be used either, since it expects a pthread_t
    // while we have collected kernel thread IDs (tid).
    // rt_tgsigqueueinfo is used instead: it sends the signal and data to the
    // single thread specified by the combination of tgid, a thread group ID,
    // and tid, a thread in that thread group.
    //
    // see: https://man7.org/linux/man-pages/man2/rt_sigqueueinfo.2.html
    //
    // SAFETY: `siginfo` is a live, initialised `siginfo_t` for the whole
    // duration of the call, and the kernel only reads from the pointer.
    unsafe {
        libc::syscall(
            libc::SYS_rt_tgsigqueueinfo,
            pid as usize,
            tid as usize,
            signum as usize,
            &siginfo as *const _ as usize,
        ) as isize
    }
}

#[cfg(test)]
#[cfg(target_os = "linux")]
mod tests {
Expand Down
46 changes: 30 additions & 16 deletions src/timer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

use std::os::raw::c_int;
#[cfg(not(target_os = "linux"))]
use std::ptr::null_mut;
use std::time::{Duration, Instant, SystemTime};

Expand All @@ -18,10 +19,12 @@ struct Itimerval {
pub it_value: Timeval,
}

#[cfg(not(target_os = "linux"))]
extern "C" {
fn setitimer(which: c_int, new_value: *mut Itimerval, old_value: *mut Itimerval) -> c_int;
}

#[cfg(not(target_os = "linux"))]
const ITIMER_PROF: c_int = 2;

pub struct Timer {
Expand All @@ -32,23 +35,29 @@ pub struct Timer {

impl Timer {
pub fn new(frequency: c_int) -> Timer {
let interval = 1e6 as i64 / i64::from(frequency);
let it_interval = Timeval {
tv_sec: interval / 1e6 as i64,
tv_usec: interval % 1e6 as i64,
};
let it_value = it_interval.clone();
// we need to set a timer only for non-linux OS
// since for linux we'll have an external thread
// sending real-time signals to the existing threads
#[cfg(not(target_os = "linux"))]
{
let interval = 1e6 as i64 / i64::from(frequency);
let it_interval = Timeval {
tv_sec: interval / 1e6 as i64,
tv_usec: interval % 1e6 as i64,
};
let it_value = it_interval.clone();

unsafe {
setitimer(
ITIMER_PROF,
&mut Itimerval {
it_interval,
it_value,
},
null_mut(),
)
};
unsafe {
setitimer(
ITIMER_PROF,
&mut Itimerval {
it_interval,
it_value,
},
null_mut(),
)
};
}

Timer {
frequency,
Expand All @@ -68,13 +77,18 @@ impl Timer {
}
}

#[cfg(not(target_os = "linux"))]
impl Drop for Timer {
fn drop(&mut self) {
let it_interval = Timeval {
tv_sec: 0,
tv_usec: 0,
};
let it_value = it_interval.clone();

// the timer was set only for non-linux OS
// as a consequence, we'll only need to set
// it back to 0 in that case
unsafe {
setitimer(
ITIMER_PROF,
Expand Down