diff --git a/.gitignore b/.gitignore index 524dafb7..0f12a702 100644 --- a/.gitignore +++ b/.gitignore @@ -19,3 +19,5 @@ Cargo.lock #Cargo.lock .idea + +.devcontainer/ diff --git a/Cargo.toml b/Cargo.toml index 24303834..d4b9aa0f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,7 +25,10 @@ backtrace = { version = "0.3" } once_cell = "1.9" libc = "^0.2.66" log = "0.4" -nix = { version = "0.26", default-features = false, features = ["signal", "fs"] } +nix = { version = "0.26", default-features = false, features = [ + "signal", + "fs", +] } parking_lot = "0.12" tempfile = "3.1" thiserror = "1.0" @@ -33,11 +36,16 @@ findshlibs = "0.10" cfg-if = "1.0" smallvec = "1.7" -inferno = { version = "0.11", default-features = false, features = ["nameattr"], optional = true } +inferno = { version = "0.11", default-features = false, features = [ + "nameattr", +], optional = true } prost = { version = "0.11", optional = true } prost-derive = { version = "0.11", optional = true } protobuf = { version = "2.0", optional = true } -criterion = {version = "0.4", optional = true} +criterion = { version = "0.4", optional = true } + +[target.'cfg(target_os = "linux")'.dependencies] +errno = { version = "0.2.8" } [dependencies.symbolic-demangle] version = "10.1" diff --git a/src/profiler.rs b/src/profiler.rs index c78f0dc2..6e6b3ba5 100644 --- a/src/profiler.rs +++ b/src/profiler.rs @@ -4,11 +4,15 @@ use std::convert::TryInto; use std::os::raw::c_int; use std::time::SystemTime; +#[cfg(not(target_os = "linux"))] use nix::sys::signal; use once_cell::sync::Lazy; use parking_lot::RwLock; use smallvec::SmallVec; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; + #[cfg(any( target_arch = "x86_64", target_arch = "aarch64", @@ -32,7 +36,7 @@ pub struct Profiler { pub(crate) data: Collector, sample_counter: i32, - running: bool, + running: Arc, #[cfg(all(any( target_arch = "x86_64", @@ -137,10 +141,19 @@ impl ProfilerGuardBuilder { } match profiler.start() { - Ok(()) => 
Ok(ProfilerGuard::<'static> { - profiler: &PROFILER, - timer: Some(Timer::new(self.frequency)), - }), + Ok(()) => { + #[cfg(target_os = "linux")] + { + let running_arc = profiler.running.clone(); + std::thread::spawn(move || { + fire_rt_signals(running_arc, self.frequency as u64) + }); + } + Ok(ProfilerGuard::<'static> { + profiler: &PROFILER, + timer: Some(Timer::new(self.frequency)), + }) + } Err(err) => Err(err), } } @@ -176,6 +189,7 @@ impl ProfilerGuard<'_> { impl<'a> Drop for ProfilerGuard<'a> { fn drop(&mut self) { + #[cfg(not(target_os = "linux"))] drop(self.timer.take()); match self.profiler.write().as_mut() { @@ -368,7 +382,7 @@ impl Profiler { Ok(Profiler { data: Collector::new()?, sample_counter: 0, - running: false, + running: Arc::new(AtomicBool::new(false)), #[cfg(all(any( target_arch = "x86_64", @@ -399,11 +413,11 @@ impl Profiler { impl Profiler { pub fn start(&mut self) -> Result<()> { log::info!("starting cpu profiler"); - if self.running { + if self.running.load(Ordering::SeqCst) { Err(Error::Running) } else { self.register_signal_handler()?; - self.running = true; + self.running.store(true, Ordering::SeqCst); Ok(()) } @@ -412,14 +426,18 @@ impl Profiler { fn init(&mut self) -> Result<()> { self.sample_counter = 0; self.data = Collector::new()?; - self.running = false; - + // set current running to false so that the signalling thread + // will stop accordingly + self.running.store(false, Ordering::SeqCst); + // assign a fresh Arc to running so that we can avoid + // race conditions with the signalling thread + self.running = Arc::new(AtomicBool::new(false)); Ok(()) } pub fn stop(&mut self) -> Result<()> { log::info!("stopping cpu profiler"); - if self.running { + if self.running.load(Ordering::SeqCst) { self.unregister_signal_handler()?; self.init()?; @@ -430,22 +448,52 @@ impl Profiler { } fn register_signal_handler(&self) -> Result<()> { - let handler = signal::SigHandler::SigAction(perf_signal_handler); - let sigaction = 
signal::SigAction::new( - handler, - // SA_RESTART will only restart a syscall when it's safe to do so, - // e.g. when it's a blocking read(2) or write(2). See man 7 signal. - signal::SaFlags::SA_SIGINFO | signal::SaFlags::SA_RESTART, - signal::SigSet::empty(), - ); - unsafe { signal::sigaction(signal::SIGPROF, &sigaction) }?; + #[cfg(target_os = "linux")] + { + use std::mem::MaybeUninit; + let mut sigaction: libc::sigaction = unsafe { MaybeUninit::zeroed().assume_init() }; + sigaction.sa_sigaction = perf_signal_handler as usize; + sigaction.sa_flags = libc::SA_RESTART | libc::SA_SIGINFO; + + unsafe { + let res = libc::sigaction( + libc::SIGRTMIN(), + &sigaction as *const _, + std::ptr::null::() as *mut libc::sigaction, + ); + if res == -1 { + return Err(Error::NixError(nix::errno::from_i32(errno::errno().0))); + } + }; + } + + #[cfg(not(target_os = "linux"))] + { + let handler = signal::SigHandler::SigAction(perf_signal_handler); + let sigaction = signal::SigAction::new( + handler, + // SA_RESTART will only restart a syscall when it's safe to do so, + // e.g. when it's a blocking read(2) or write(2). See man 7 signal. + signal::SaFlags::SA_SIGINFO | signal::SaFlags::SA_RESTART, + signal::SigSet::empty(), + ); + unsafe { signal::sigaction(signal::SIGPROF, &sigaction) }?; + } Ok(()) } fn unregister_signal_handler(&self) -> Result<()> { - let handler = signal::SigHandler::SigIgn; - unsafe { signal::signal(signal::SIGPROF, handler) }?; + #[cfg(target_os = "linux")] + unsafe { + let res = libc::signal(libc::SIGRTMIN(), libc::SIG_IGN); + if res == libc::SIG_ERR { + return Err(Error::NixError(nix::errno::from_i32(errno::errno().0))); + } + }; + + #[cfg(not(target_os = "linux"))] + unsafe { signal::signal(signal::SIGPROF, signal::SigHandler::SigIgn) }?; Ok(()) } @@ -465,6 +513,73 @@ impl Profiler { } } +// The kernel thread ID ( the same returned by a call to gettid(2)) +// is not the same thing as the thread ID returned by pthread_self(). 
+// +// Here we fetch a list of kernel thread IDs from /proc/{pid}/task +#[cfg(target_os = "linux")] +fn get_tids(pid_t: u32) -> impl Iterator { + std::fs::read_dir(format!("/proc/{}/task", pid_t)) + .unwrap() + .into_iter() + .filter_map(|entry| entry.map_or(None, |entry| entry.file_name().into_string().ok())) + .filter_map(|tid| tid.parse::().ok()) +} + +#[cfg(target_os = "linux")] +fn fire_rt_signals(running: Arc, frequency: u64) { + use std::{thread, time::Duration}; + + let pid = nix::unistd::Pid::this().as_raw() as u32; + let prof_tid = unsafe { libc::syscall(libc::SYS_gettid) as u32 }; + + let interval = 1e6 as u64 / frequency; + let tv_usec = interval % 1e6 as u64; + + while running.load(Ordering::SeqCst) { + let tids = get_tids(pid); + for tid in tids { + // skip the thread running this function + if tid == prof_tid { + continue; + } + signal_thread(pid, tid, libc::SIGRTMIN()); + } + thread::sleep(Duration::from_micros(tv_usec)); + } +} + +// Sends signal @signum to thread @tid of thread group @pid. +// Returns -1 on failure and sets errno appropriately +// (see man rt_tgsigqueueinfo). Returns 0 on success. +#[cfg(target_os = "linux")] +fn signal_thread(pid: u32, tid: u32, signum: i32) -> isize { + let mut siginfo: libc::siginfo_t = unsafe { std::mem::MaybeUninit::zeroed().assume_init() }; + + siginfo.si_signo = signum; + siginfo.si_code = -2; // NOTE(review): labelled SI_QUEUE, but Linux defines SI_QUEUE as -1 and SI_TIMER as -2 -- confirm intended code + + // NB: we can't use sigqueue() to deliver a signal to a precise + // thread since the kernel is free to deliver such a signal to any thread of that + // process. + // We can't use pthread_sigqueue either, since it expects a pthread_t + // while we've collected kernel thread IDs (tid). + // We use rt_tgsigqueueinfo instead, which sends the signal and data to the + // single thread specified by the combination of tgid, a thread group ID, and tid, + // a thread in that thread group. 
+ // + // see: https://man7.org/linux/man-pages/man2/rt_sigqueueinfo.2.html + unsafe { + libc::syscall( + libc::SYS_rt_tgsigqueueinfo, + pid as usize, + tid as usize, + signum as usize, + &siginfo as *const _ as usize, + ) as isize + } +} + #[cfg(test)] #[cfg(target_os = "linux")] mod tests { diff --git a/src/timer.rs b/src/timer.rs index a6bfa1fe..c4c8aa24 100644 --- a/src/timer.rs +++ b/src/timer.rs @@ -1,6 +1,7 @@ // Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0. use std::os::raw::c_int; +#[cfg(not(target_os = "linux"))] use std::ptr::null_mut; use std::time::{Duration, Instant, SystemTime}; @@ -18,10 +19,12 @@ struct Itimerval { pub it_value: Timeval, } +#[cfg(not(target_os = "linux"))] extern "C" { fn setitimer(which: c_int, new_value: *mut Itimerval, old_value: *mut Itimerval) -> c_int; } +#[cfg(not(target_os = "linux"))] const ITIMER_PROF: c_int = 2; pub struct Timer { @@ -32,23 +35,29 @@ pub struct Timer { impl Timer { pub fn new(frequency: c_int) -> Timer { - let interval = 1e6 as i64 / i64::from(frequency); - let it_interval = Timeval { - tv_sec: interval / 1e6 as i64, - tv_usec: interval % 1e6 as i64, - }; - let it_value = it_interval.clone(); + // we need to set a timer only for non-linux OS + // since for linux we'll have an external thread + // sending real-time signals to the existing threads + #[cfg(not(target_os = "linux"))] + { + let interval = 1e6 as i64 / i64::from(frequency); + let it_interval = Timeval { + tv_sec: interval / 1e6 as i64, + tv_usec: interval % 1e6 as i64, + }; + let it_value = it_interval.clone(); - unsafe { - setitimer( - ITIMER_PROF, - &mut Itimerval { - it_interval, - it_value, - }, - null_mut(), - ) - }; + unsafe { + setitimer( + ITIMER_PROF, + &mut Itimerval { + it_interval, + it_value, + }, + null_mut(), + ) + }; + } Timer { frequency, @@ -68,6 +77,7 @@ impl Timer { } } +#[cfg(not(target_os = "linux"))] impl Drop for Timer { fn drop(&mut self) { let it_interval = Timeval { @@ -75,6 +85,10 @@ impl Drop for 
Timer { tv_usec: 0, }; let it_value = it_interval.clone(); + + // the timer was set only for non-linux OS + // as a consequence, we'll only need to set + // it back to 0 in that case unsafe { setitimer( ITIMER_PROF,