Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]
## Changed
## Fixed
- Lading will now ignore child processes when polling /proc if the children are
forked but not exec'd.

## [0.25.7]
## Changed
Expand Down
167 changes: 107 additions & 60 deletions lading/src/observer/linux/procfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,13 @@ mod memory;
mod stat;
mod uptime;

use std::{
collections::{VecDeque, hash_map::Entry},
io,
};
use std::{collections::VecDeque, io};

use metrics::gauge;
use metrics::{counter, gauge};
use nix::errno::Errno;
use procfs::process::Process;
use rustc_hash::{FxHashMap, FxHashSet};
use tracing::{error, warn};
use tracing::{error, info, warn};

const BYTES_PER_KIBIBYTE: u64 = 1024;

Expand Down Expand Up @@ -87,13 +84,36 @@ impl Sampler {
let mut processes_found: i32 = 0;
let mut pids_skipped: FxHashSet<i32> = FxHashSet::default();

// Clear process_info at the start of each poll. A process is capable of
// changing its details in key ways between polls.
self.process_info.clear();

// Every sample run we collect all the child processes rooted at the
// parent. As noted by the procfs documentation, this is done by
// dereferencing the `/proc/<pid>/root` symlink.
let mut pids: FxHashSet<i32> = FxHashSet::default();
// We must be sure to initialize the parent process info.
let parent_pid = self.parent.pid();
let parent_info = match initialize_process_info(parent_pid).await {
Ok(Some(info)) => info,
Ok(None) => {
warn!("Could not initialize parent process info, will retry.");
return Ok(());
}
Err(e) => {
warn!("Error initializing parent process info: {:?}", e);
return Ok(());
}
};
self.process_info.insert(parent_pid, parent_info);

let mut processes: VecDeque<Process> = VecDeque::with_capacity(16); // an arbitrary smallish number
processes.push_back(Process::new(self.parent.pid())?);
pids.insert(self.parent.pid());
processes.push_back(Process::new(parent_pid)?);

// We need to track the pids of processes we've already seen to avoid
// duplicate reads of process info. A child pid may be listed multiple
// times; see the loop below for details.
let mut pids: FxHashSet<i32> = FxHashSet::default();
pids.insert(parent_pid);

while let Some(process) = processes.pop_back() {
// Search for child processes. This is done by querying for every
Expand All @@ -102,6 +122,11 @@ impl Sampler {
// id equal to their pid. It's also possible for a pid to list
// itself as a child so we reference the pid hashset above to avoid
// infinite loops.
//
// We take special care to avoid processes that are forked but not
// exec'd. We do this because un-exec'd processes register the heap
// memory of the parent process in their smaps, leading to double
// counting.
if let Ok(tasks) = process.tasks() {
for task in tasks.flatten() {
if let Ok(mut children) = task.children() {
Expand All @@ -111,9 +136,39 @@ impl Sampler {
{
let pid = child.pid();
if !pids.contains(&pid) {
// We have not seen this process and do need to
// record it for child scanning and sampling if
// it proves to be a process.
// This is a new process. We initialize its
// process info and then determine by
// examination of the exe/cmdline if the process
// is fork'd but not exec'd -- meaning we will
// not poll it -- or if it's a process in good
// standing and we'll attempt to poll it.
let child_info = match initialize_process_info(pid).await {
Ok(Some(info)) => info,
Ok(None) => continue,
Err(e) => {
warn!(
"Couldn't initialize process info for pid {}: {:?}",
pid, e
);
continue;
}
};

// Check if this is a forked but not execed
// process.
let parent_info = self
.process_info
.get(&process.pid())
.expect("parent process info should exist");
if child_info.exe == parent_info.exe
&& child_info.cmdline == parent_info.cmdline
{
counter!("process_skipped").increment(1);
info!("Skipping process {pid}: {child_info:?}");
continue;
}

self.process_info.insert(pid, child_info);
processes.push_back(child);
pids.insert(pid);
}
Expand All @@ -137,9 +192,6 @@ impl Sampler {
}
}

// Update the process_info map to only hold processes seen by the current poll call.
self.process_info.retain(|pid, _| pids.contains(pid));

gauge!("total_rss_bytes").set(aggr.rss as f64);
gauge!("total_pss_bytes").set(aggr.pss as f64);
gauge!("processes_found").set(processes_found as f64);
Expand All @@ -157,7 +209,7 @@ impl Sampler {
}

/// Handle a process. Returns true if the process was handled successfully,
/// false if it was skipped for any reason.
/// false if it was skipped for any reason.
#[allow(
clippy::similar_names,
clippy::too_many_lines,
Expand Down Expand Up @@ -188,50 +240,8 @@ impl Sampler {
return Ok(false);
}

// If we haven't seen this process before, initialize its ProcessInfo.
match self.process_info.entry(pid) {
Entry::Occupied(_) => { /* Already initialized */ }
Entry::Vacant(entry) => {
let exe = match proc_exe(pid).await {
Ok(exe) => exe,
Err(e) => {
warn!("Couldn't read exe for pid {}: {:?}", pid, e);
// The pid may have exited since we scanned it or we may not
// have sufficient permission.
return Ok(false);
}
};
let comm = match proc_comm(pid).await {
Ok(comm) => comm,
Err(e) => {
warn!("Couldn't read comm for pid {}: {:?}", pid, e);
// The pid may have exited since we scanned it or we may not
// have sufficient permission.
return Ok(false);
}
};
let cmdline = match proc_cmdline(pid).await {
Ok(cmdline) => cmdline,
Err(e) => {
warn!("Couldn't read cmdline for pid {}: {:?}", pid, e);
// The pid may have exited since we scanned it or we may not
// have sufficient permission.
return Ok(false);
}
};
let pid_s = format!("{pid}");
let stat_sampler = stat::Sampler::new();
entry.insert(ProcessInfo {
cmdline,
exe,
comm,
pid_s,
stat_sampler,
});
}
}

// SAFETY: We've just inserted this pid into the map.
// SAFETY: We've inserted process info into this map when polling for
// child processes.
let pinfo = self
.process_info
.get_mut(&pid)
Expand Down Expand Up @@ -348,6 +358,43 @@ impl Sampler {
}
}

/// Build the [`ProcessInfo`] record for `pid` from its procfs entries.
///
/// The exe, comm and cmdline of the process are read in that order. As soon
/// as one of those reads fails, a warning is logged and `Ok(None)` is
/// returned so the caller can skip the pid; the remaining reads are not
/// attempted. When all three succeed the values are bundled with the pid's
/// string form and a newly constructed `stat::Sampler`.
async fn initialize_process_info(pid: i32) -> Result<Option<ProcessInfo>, Error> {
    // Each lookup logs its own failure and collapses to `None`, letting the
    // `let ... else` guard below it bail out early.
    let exe = proc_exe(pid)
        .await
        .map_err(|e| warn!("Couldn't read exe for pid {}: {:?}", pid, e))
        .ok();
    let Some(exe) = exe else {
        return Ok(None);
    };

    let comm = proc_comm(pid)
        .await
        .map_err(|e| warn!("Couldn't read comm for pid {}: {:?}", pid, e))
        .ok();
    let Some(comm) = comm else {
        return Ok(None);
    };

    let cmdline = proc_cmdline(pid)
        .await
        .map_err(|e| warn!("Couldn't read cmdline for pid {}: {:?}", pid, e))
        .ok();
    let Some(cmdline) = cmdline else {
        return Ok(None);
    };

    Ok(Some(ProcessInfo {
        cmdline,
        exe,
        comm,
        pid_s: pid.to_string(),
        stat_sampler: stat::Sampler::new(),
    }))
}

/// Read `/proc/{pid}/comm`
async fn proc_comm(pid: i32) -> Result<String, Error> {
let comm_path = format!("/proc/{pid}/comm");
Expand Down
Loading