Skip to content

Commit 1850204

Browse files
committed
[CONTINT-4562] Add WSS measurement (Working Set Size)
1 parent 6c6a683 commit 1850204

File tree

5 files changed

+330
-0
lines changed

5 files changed

+330
-0
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lading/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ flate2 = { version = "1.1.1", default-features = false, features = [
3636
] }
3737
futures = "0.3.31"
3838
fuser = { version = "0.15", optional = true }
39+
page_size = { version = "0.6.0" }
3940
heck = { version = "0.5", default-features = false }
4041
http = { workspace = true }
4142
http-body-util = { workspace = true }

lading/src/observer/linux.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
mod cgroup;
22
mod procfs;
3+
mod wss;
34

45
use tracing::error;
56

@@ -12,24 +13,32 @@ pub enum Error {
1213
/// Wrapper for [`procfs::Error`]
1314
#[error("Procfs: {0}")]
1415
Procfs(#[from] procfs::Error),
16+
/// Wrapper for [`wss::Error`]
17+
#[error("WSS: {0}")]
18+
Wss(#[from] wss::Error),
1519
}
1620

1721
#[derive(Debug)]
1822
pub(crate) struct Sampler {
1923
procfs: procfs::Sampler,
2024
cgroup: cgroup::Sampler,
25+
wss: wss::Sampler,
2126
smaps_interval: u8,
27+
last_wss_sample: tokio::time::Instant,
2228
}
2329

2430
impl Sampler {
2531
pub(crate) fn new(parent_pid: i32, labels: Vec<(String, String)>) -> Result<Self, Error> {
2632
let procfs_sampler = procfs::Sampler::new(parent_pid)?;
2733
let cgroup_sampler = cgroup::Sampler::new(parent_pid, labels)?;
34+
let wss_sampler = wss::Sampler::new(parent_pid)?;
2835

2936
Ok(Self {
3037
procfs: procfs_sampler,
3138
cgroup: cgroup_sampler,
39+
wss: wss_sampler,
3240
smaps_interval: 10,
41+
last_wss_sample: tokio::time::Instant::now() - tokio::time::Duration::from_secs(61),
3342
})
3443
}
3544

@@ -45,6 +54,14 @@ impl Sampler {
4554
self.procfs.poll(sample_smaps).await?;
4655
self.cgroup.poll().await?;
4756

57+
// WSS measures the amount of memory that has been accessed since the last poll.
58+
// As a consequence, the poll interval impacts the measure.
59+
// That’s why we need to be sure we don’t poll more often than once per minute.
60+
if self.last_wss_sample.elapsed() > tokio::time::Duration::from_secs(60) {
61+
self.wss.poll().await?;
62+
self.last_wss_sample = tokio::time::Instant::now();
63+
}
64+
4865
Ok(())
4966
}
5067
}

lading/src/observer/linux/wss.rs

Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
use metrics::gauge;
2+
use procfs::process::{MemoryMap, MemoryPageFlags, PageInfo, Pfn, Process};
3+
use std::collections::HashMap;
4+
use std::io::{Read, Seek, SeekFrom, Write};
5+
use tracing::debug;
6+
7+
/// Upper bound for the start address of the memory mappings inspected by
/// `Sampler::poll`: a mapping that begins above this address is skipped,
/// because page idle tracking only covers user memory.
// NOTE(review): presumably the base of the x86_64 kernel direct mapping;
// confirm this value is appropriate for the other architectures lading targets.
const PAGE_OFFSET: u64 = 0xffff_8800_0000_0000;
8+
9+
#[derive(thiserror::Error, Debug)]
10+
pub enum Error {
11+
/// Wrapper for [`std::io::Error`]
12+
#[error("IO error: {0}")]
13+
Io(#[from] std::io::Error),
14+
/// Wrapper for [`procfs::ProcError`]
15+
#[error("Unable to read procfs: {0}")]
16+
Proc(#[from] procfs::ProcError),
17+
}
18+
19+
#[derive(Debug)]
20+
pub(crate) struct Sampler {
21+
parent_pid: i32,
22+
}
23+
24+
impl Sampler {
25+
pub(crate) fn new(parent_pid: i32) -> Result<Self, Error> {
26+
Ok(Self { parent_pid })
27+
}
28+
29+
pub(crate) async fn poll(&mut self) -> Result<(), Error> {
30+
let page_size = page_size::get();
31+
let mut pfn_set = PfnSet::new();
32+
33+
for process in ProcessDescendentsIterator::new(self.parent_pid) {
34+
debug!("Process PID: {}", process.pid());
35+
let mut pagemap = process.pagemap()?;
36+
for MemoryMap {
37+
address: (begin, end),
38+
..
39+
} in process.maps()?
40+
{
41+
if begin > PAGE_OFFSET {
42+
continue; // page idle tracking is user mem only
43+
}
44+
debug!("Memory region: {:#x} — {:#x}", begin, end);
45+
let begin = begin as usize / page_size;
46+
let end = end as usize / page_size;
47+
for page in pagemap.get_range_info(begin..end)? {
48+
if let PageInfo::MemoryPage(memory_page_flags) = page {
49+
if memory_page_flags.contains(MemoryPageFlags::PRESENT) {
50+
pfn_set.insert(memory_page_flags.get_page_frame_number());
51+
}
52+
}
53+
}
54+
}
55+
}
56+
57+
let mut nb_pages = 0;
58+
59+
// See https://www.kernel.org/doc/html/latest/admin-guide/mm/idle_page_tracking.html
60+
let mut page_idle_bitmap = std::fs::OpenOptions::new()
61+
.read(true)
62+
.write(true)
63+
.open("/sys/kernel/mm/page_idle/bitmap")?;
64+
65+
for (pfn_block, pfn_bitset) in pfn_set {
66+
page_idle_bitmap.seek(SeekFrom::Start(pfn_block * 8))?;
67+
68+
let mut buffer = [0; 8];
69+
page_idle_bitmap.read_exact(&mut buffer)?;
70+
let bitset = u64::from_ne_bytes(buffer);
71+
72+
nb_pages += (!bitset & pfn_bitset).count_ones() as usize;
73+
74+
page_idle_bitmap.seek(SeekFrom::Start(pfn_block * 8))?;
75+
page_idle_bitmap.write_all(&pfn_bitset.to_ne_bytes())?;
76+
}
77+
78+
gauge!("total_wss_bytes").set((nb_pages * page_size) as f64);
79+
80+
Ok(())
81+
}
82+
}
83+
84+
struct ProcessDescendentsIterator {
85+
stack: Vec<Process>,
86+
}
87+
88+
impl ProcessDescendentsIterator {
89+
fn new(parent_pid: i32) -> Self {
90+
Self {
91+
stack: vec![
92+
Process::new(parent_pid).expect(format!("process {parent_pid} not found").as_str()),
93+
],
94+
}
95+
}
96+
}
97+
98+
impl Iterator for ProcessDescendentsIterator {
99+
type Item = Process;
100+
101+
fn next(&mut self) -> Option<Self::Item> {
102+
while let Some(process) = self.stack.pop() {
103+
if let Ok(tasks) = process.tasks() {
104+
for task in tasks.flatten() {
105+
if let Ok(children) = task.children() {
106+
for child in children {
107+
if let Ok(c) = Process::new(child as i32) {
108+
self.stack.push(c);
109+
}
110+
}
111+
}
112+
}
113+
}
114+
return Some(process);
115+
}
116+
None
117+
}
118+
}
119+
120+
#[derive(Debug)]
121+
struct PfnSet(HashMap<u64, u64>);
122+
123+
impl PfnSet {
124+
fn new() -> Self {
125+
Self(HashMap::with_capacity(1024))
126+
}
127+
128+
fn insert(&mut self, pfn: Pfn) {
129+
*self.0.entry(pfn.0 / 64).or_default() |= 1 << (pfn.0 % 64);
130+
}
131+
}
132+
133+
impl IntoIterator for PfnSet {
134+
type Item = (u64, u64);
135+
type IntoIter = std::collections::hash_map::IntoIter<u64, u64>;
136+
137+
fn into_iter(self) -> Self::IntoIter {
138+
self.0.into_iter()
139+
}
140+
}
141+
142+
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashSet;
    use std::io::BufRead;
    use std::io::BufReader;
    use std::process::{Command, Stdio};

    /// Spawns a real process tree with the helper script and checks that
    /// `ProcessDescendentsIterator` yields each of its processes exactly once.
    #[test]
    fn process_descendants_iterator() {
        const NB_PROCESSES_PER_LEVEL: usize = 3;
        const NB_LEVELS: u32 = 3;
        // The tree is made of the root plus NB_LEVELS generations of
        // descendants, so the total number of processes is the sum of the
        // NB_LEVELS + 1 first terms of the geometric progression with common
        // ratio of NB_PROCESSES_PER_LEVEL.
        const NB_PROCESSES: usize =
            (NB_PROCESSES_PER_LEVEL.pow(NB_LEVELS + 1) - 1) / (NB_PROCESSES_PER_LEVEL - 1);

        let mut child = Command::new("src/observer/linux/wss/tests/create_process_tree.py")
            .arg(NB_PROCESSES_PER_LEVEL.to_string())
            .arg(NB_LEVELS.to_string())
            .stdout(Stdio::piped())
            .spawn()
            .expect("Failed to create process tree");
        let root_pid = child.id() as i32;

        let mut children_pids = HashSet::with_capacity(NB_PROCESSES);
        children_pids.insert(root_pid);

        // Every descendant prints its own PID; the root does not.
        let mut reader = BufReader::new(child.stdout.take().unwrap());
        for _ in 0..NB_PROCESSES - 1 {
            let mut line = String::new();
            reader.read_line(&mut line).expect("Failed to read line");
            let pid: i32 = line.trim().parse().expect("Failed to parse PID");
            assert!(children_pids.insert(pid));
        }

        // Walk the tree first and tear it down before asserting anything, so
        // that a failed assertion cannot leak the spawned processes.
        let visited_pids: Vec<i32> = ProcessDescendentsIterator::new(root_pid)
            .map(|process| process.pid())
            .collect();

        nix::sys::signal::kill(
            nix::unistd::Pid::from_raw(root_pid),
            nix::sys::signal::Signal::SIGTERM,
        )
        .expect("Failed to kill process tree");

        child
            .wait()
            .expect("Failed to wait for process tree completion");

        for pid in visited_pids {
            assert!(
                children_pids.remove(&pid),
                "ProcessDescendentsIterator returned unexpected PID {pid}"
            );
        }
        assert!(
            children_pids.is_empty(),
            "ProcessDescendentsIterator didn’t return all PIDs: {children_pids:?}"
        );
    }

    /// Checks that page frames are grouped per block of 64 and mapped to the
    /// right bit inside their block.
    #[test]
    fn pfn_set() {
        let input = vec![0, 64, 65, 66, 128, 136, 255];
        let expected = vec![(0, 0x1), (1, 0x7), (2, 0x101), (3, 0x8000_0000_0000_0000)];

        let mut pfn_set = PfnSet::new();
        for i in input {
            pfn_set.insert(Pfn(i));
        }
        let mut output: Vec<_> = pfn_set.into_iter().collect();
        output.sort_by_key(|(k, _)| *k);

        assert_eq!(output, expected);
    }
}
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
#!/usr/bin/env python3
2+
# -*- coding: utf-8 -*-
3+
4+
"""
5+
This script creates a tree of idle processes with a specified depth and breadth.
6+
The processes are spawned using fork and will wait for a signal to terminate.
7+
8+
Usage:
9+
create_process_tree.py <number_of_processes> <depth>
10+
11+
Example:
12+
create_process_tree.py 3 2
13+
14+
This will create a process tree with 3 processes at each level and a depth of 2.
15+
It would look like:
16+
17+
```
18+
$ pstree -a -pg 30881
19+
python3,30881,30881 ./create_process_tree.py 3 2
20+
├─python3,30899,30881 ./create_process_tree.py 3 2
21+
│ ├─python3,30902,30881 ./create_process_tree.py 3 2
22+
│ ├─python3,30903,30881 ./create_process_tree.py 3 2
23+
│ └─python3,30906,30881 ./create_process_tree.py 3 2
24+
├─python3,30900,30881 ./create_process_tree.py 3 2
25+
│ ├─python3,30904,30881 ./create_process_tree.py 3 2
26+
│ ├─python3,30907,30881 ./create_process_tree.py 3 2
27+
│ └─python3,30909,30881 ./create_process_tree.py 3 2
28+
└─python3,30901,30881 ./create_process_tree.py 3 2
29+
├─python3,30905,30881 ./create_process_tree.py 3 2
30+
├─python3,30908,30881 ./create_process_tree.py 3 2
31+
└─python3,30910,30881 ./create_process_tree.py 3 2
32+
```
33+
34+
Each process will print its PID.
35+
36+
The script also includes a signal handler to terminate the whole process group
37+
when a SIGTERM signal is received.
38+
"""
39+
40+
import argparse
41+
import logging
42+
import os
43+
import signal
44+
45+
46+
def spawn_processes(nb: int, depth: int) -> None:
    """
    Spawn a number of processes in a tree structure.

    Each forked child prints its PID on stdout, recursively spawns its own
    children, then blocks until it receives a signal. Only the calling
    process returns from this function.

    Args:
        nb (int): Number of processes to spawn at each level.
        depth (int): Depth of the process tree. Nothing is spawned when it
            is zero or negative.
    """
    # `<= 0` rather than `== 0`: a negative depth would never reach zero and
    # the recursion below would fork without bound.
    if depth <= 0 or nb <= 0:
        return

    for _ in range(nb):
        pid = os.fork()
        if pid == 0:  # Child process
            print(os.getpid(), flush=True)
            logging.info("Child PID: %d, Parent PID: %d", os.getpid(), os.getppid())
            spawn_processes(nb, depth - 1)
            signal.pause()
            # A child must never fall back into its parent's loop: if pause()
            # ever returns, exit instead of forking extra processes.
            os._exit(0)
64+
65+
66+
def handler(signum: int, frame) -> None:
    """
    SIGTERM handler that terminates the whole process group.

    Args:
        signum (int): Signal number (unused).
        frame (signal.FrameType): Current stack frame (unused).
    """
    # Restore the default action first, so the group-wide SIGTERM sent below
    # also terminates this process instead of re-entering this handler.
    signal.signal(signal.SIGTERM, signal.SIG_DFL)

    # NOTE(review): assumes this process is its group leader, i.e. its PID is
    # also the process group ID.
    group_id = os.getpid()
    os.killpg(group_id, signal.SIGTERM)
76+
77+
78+
if __name__ == "__main__":
    # Command line: <nb> <depth> [--log-level LEVEL]
    parser = argparse.ArgumentParser(description="Create a process tree.")
    parser.add_argument("nb", type=int, help="Number of processes to spawn per level")
    parser.add_argument("depth", type=int, help="Depth of the process tree")
    log_levels = ["debug", "info", "warning", "error", "critical"]
    parser.add_argument(
        "--log-level",
        type=str,
        default="warning",
        help="Logging level",
        choices=log_levels,
    )
    args = parser.parse_args()

    level_name = args.log_level.upper()
    logging.basicConfig(level=level_name)

    # Become the leader of a new process group; every forked descendant
    # inherits it, so the SIGTERM handler can signal the whole tree at once.
    os.setpgid(0, 0)

    spawn_processes(args.nb, args.depth)

    # Installed only after forking: descendants keep the default SIGTERM
    # action, while the root relays the signal to the group.
    signal.signal(signal.SIGTERM, handler)
    signal.pause()

0 commit comments

Comments
 (0)