Skip to content

Commit c45c19a

Browse files
[runtime] Track RSS (#1457)
1 parent b4585e7 commit c45c19a

7 files changed

Lines changed: 244 additions & 5 deletions

File tree

Cargo.lock

Lines changed: 80 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ io-uring = "0.7.4"
8686
rayon = "1.10.0"
8787
async-lock = "3.4.0"
8888
libc = "0.2.172"
89+
sysinfo = "0.33.0"
8990
zeroize = "1.5.7"
9091
blst = "0.3.13"
9192
p256 = "0.13.2"

runtime/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ rayon = { workspace = true }
2929
async-lock = { workspace = true }
3030
io-uring = { workspace = true, optional = true }
3131
libc = { workspace = true }
32+
sysinfo = { workspace = true }
3233

3334
[features]
3435
default = []

runtime/src/lib.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ cfg_if::cfg_if! {
3939
}
4040
}
4141
mod network;
42+
mod process;
4243
mod storage;
4344
pub mod telemetry;
4445
mod utils;
@@ -1672,6 +1673,37 @@ mod tests {
16721673
test_metrics_label(executor);
16731674
}
16741675

1676+
/// Verifies that the tokio runner eventually reports a positive
/// `runtime_process_rss` value in its encoded metrics.
#[test]
fn test_tokio_process_rss_metric() {
    let executor = tokio::Runner::default();
    executor.start(|context| async move {
        loop {
            // Scan the un-labelled `runtime_process_rss <value>` sample lines
            // (the `# HELP`/`# TYPE` lines start with `#` and are skipped).
            let metrics = context.encode();
            let populated = metrics
                .lines()
                .filter(|line| {
                    line.starts_with("runtime_process_rss")
                        && !line.starts_with("runtime_process_rss{")
                })
                .filter_map(|line| line.split_whitespace().nth(1))
                .any(|value| value.parse::<i64>().expect("Failed to parse RSS value") > 0);
            if populated {
                return;
            }

            // Always yield before polling again: the gauge is populated by a
            // separate task, so retrying without awaiting (as happened when
            // the metric was registered but still 0) would busy-spin.
            context.sleep(Duration::from_millis(100)).await;
        }
    });
}
1706+
16751707
#[test]
16761708
fn test_tokio_telemetry() {
16771709
let executor = tokio::Runner::default();

runtime/src/process/metered.rs

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
//! Process metrics collection.
2+
3+
use prometheus_client::{metrics::gauge::Gauge, registry::Registry};
4+
use std::{future::Future, time::Duration};
5+
use sysinfo::{ProcessRefreshKind, ProcessesToUpdate, System};
6+
7+
/// The interval at which to update process metrics.
8+
const TICK_INTERVAL: Duration = Duration::from_secs(10);
9+
10+
/// Process metrics collector.
11+
pub struct Metrics {
12+
/// Resident set size in bytes.
13+
pub rss: Gauge,
14+
/// Virtual memory size in bytes.
15+
pub virtual_memory: Gauge,
16+
17+
/// Process ID.
18+
pid: sysinfo::Pid,
19+
/// System information handle.
20+
system: System,
21+
}
22+
23+
impl Metrics {
24+
/// Initialize process metrics and register them with the given registry.
25+
pub fn init(registry: &mut Registry) -> Self {
26+
let metrics = Self {
27+
pid: sysinfo::Pid::from_u32(std::process::id()),
28+
rss: Gauge::default(),
29+
virtual_memory: Gauge::default(),
30+
31+
system: System::new(),
32+
};
33+
34+
// Register all metrics
35+
registry.register(
36+
"process_rss",
37+
"Resident set size of the current process",
38+
metrics.rss.clone(),
39+
);
40+
registry.register(
41+
"process_virtual_memory",
42+
"Virtual memory size of the current process",
43+
metrics.virtual_memory.clone(),
44+
);
45+
46+
metrics
47+
}
48+
49+
/// Update all process metrics.
50+
fn update(&mut self) {
51+
// Refresh process information
52+
self.system.refresh_processes_specifics(
53+
ProcessesToUpdate::Some(&[self.pid]),
54+
false,
55+
ProcessRefreshKind::nothing().with_memory(),
56+
);
57+
58+
// If the process exists, update the metrics
59+
if let Some(process) = self.system.process(self.pid) {
60+
self.rss.set(process.memory() as i64);
61+
self.virtual_memory.set(process.virtual_memory() as i64);
62+
}
63+
}
64+
65+
/// Update process metrics periodically.
66+
///
67+
/// This function takes a sleep function as a parameter to allow different runtimes
68+
/// to provide their own implementation.
69+
pub async fn collect<F, Fut>(mut self, sleep_fn: F)
70+
where
71+
F: Fn(Duration) -> Fut,
72+
Fut: Future<Output = ()>,
73+
{
74+
loop {
75+
self.update();
76+
sleep_fn(TICK_INTERVAL).await;
77+
}
78+
}
79+
}
80+
81+
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks that the gauges are populated after an update and that they grow
    /// once the process allocates (and touches) additional memory.
    #[test]
    fn test_process_metrics_init() {
        let mut registry = Registry::default();
        let mut metrics = Metrics::init(&mut registry);

        // Update metrics
        metrics.update();

        // Check that RSS is populated (non-zero for a running process)
        let rss = metrics.rss.get();
        assert!(rss > 0);

        // Check that virtual memory is >= RSS
        let virt = metrics.virtual_memory.get();
        assert!(virt >= rss);

        // Allocate 10MB and write a non-zero byte to every position: a
        // zero-filled allocation may be mapped lazily and never become
        // resident. `black_box` keeps the otherwise unused buffer from being
        // optimized away.
        let buffer = std::hint::black_box(vec![1u8; 10 * 1024 * 1024]);

        // Update metrics
        metrics.update();

        // Check that the metrics are updated
        let new_rss = metrics.rss.get();
        assert!(new_rss > rss, "RSS should be > {rss}");

        // Check that virtual memory is updated
        let new_virt = metrics.virtual_memory.get();
        assert!(new_virt > virt, "Virtual memory should be > {virt}");

        // Keep the allocation alive until after the measurements.
        drop(buffer);
    }
}

runtime/src/process/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
//! Process implementations.
2+
3+
pub mod metered;

runtime/src/tokio/runtime.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ use crate::{
1010
network::iouring::{Config as IoUringNetworkConfig, Network as IoUringNetwork},
1111
};
1212
use crate::{
13-
network::metered::Network as MeteredNetwork, signal::Signal,
14-
storage::metered::Storage as MeteredStorage, telemetry::metrics::task::Label,
13+
network::metered::Network as MeteredNetwork, process::metered::Metrics as MeteredProcess,
14+
signal::Signal, storage::metered::Storage as MeteredStorage, telemetry::metrics::task::Label,
1515
utils::signal::Stopper, Clock, Error, Handle, SinkOf, StreamOf, METRICS_PREFIX,
1616
};
1717
use commonware_macros::select;
@@ -253,6 +253,14 @@ impl crate::Runner for Runner {
253253
.build()
254254
.expect("failed to create Tokio runtime");
255255

256+
// Collect process metrics.
257+
//
258+
// We prefer to collect process metrics outside of `Context` because
259+
// we are using `runtime_registry` rather than the one provided by `Context`.
260+
let process = MeteredProcess::init(runtime_registry);
261+
runtime.spawn(process.collect(tokio::time::sleep));
262+
263+
// Initialize storage
256264
cfg_if::cfg_if! {
257265
if #[cfg(feature = "iouring-storage")] {
258266
let iouring_registry = runtime_registry.sub_registry_with_prefix("iouring_storage");
@@ -274,6 +282,7 @@ impl crate::Runner for Runner {
274282
}
275283
}
276284

285+
// Initialize network
277286
cfg_if::cfg_if! {
278287
if #[cfg(feature = "iouring-network")] {
279288
let iouring_registry = runtime_registry.sub_registry_with_prefix("iouring_network");

0 commit comments

Comments
 (0)