Skip to content

Commit 1548992

Browse files
authored
Add process-wide RSS metric to engine metrics (open-telemetry#2153)
Adds a `memory_rss` metric that reports the process-wide Resident Set Size (physical RAM) using the `memory-stats` crate. Unlike the existing jemalloc-based `memory_usage` metric (per-thread heap only), this captures the full process memory footprint, matching what external tools such as `kubectl top pod` or `htop` report. It works on Linux, macOS, and Windows with no feature flags required. The metric is reported at the engine level. If this direction is okay, I will introduce more engine-level metrics (total CPU utilization, etc.). (The OTel Collector already has this metric under a slightly different name, as is the case for every other metric.)

## How are these changes tested?

I ran the engine locally, queried `http://127.0.0.1:8080/metrics?reset=false&format=prometheus`, and looked for the new metric; its value matches what external RSS-tracking tools report.

```txt
# HELP memory_rss Process-wide Resident Set Size — physical RAM currently used by the process. Matches what external tools report (e.g. `kubectl top pod`, `htop`, `ps rss`).
# TYPE memory_rss gauge
memory_rss{set="engine.metrics",process_instance_id="AGOLC2UIVF4SFAEKCW6BLZ5XMM",host_id="",container_id=""} 354533376 1772503088570
```

## Are there any user-facing changes?

A new metric is added; existing metrics are unchanged.

Note: This adds a new dependency (`memory-stats`), which pulls in `libc`, already a dependency. If there are concerns about the external crate, we can hand-roll this ourselves.
1 parent 63a23cf commit 1548992

6 files changed

Lines changed: 191 additions & 0 deletions

File tree

rust/otap-dataflow/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ libmimalloc-sys = { version = "0.1.44", features = ["extended", "v3"] }
106106
tikv-jemallocator = { version = "0.6.1" }
107107
tikv-jemalloc-ctl = { version = "0.6.1" }
108108
memmap2 = "0.9"
109+
memory-stats = "1"
109110
nix = { version = "0.31.0", features = ["process", "signal", "fs"] }
110111
notify = "8.0" # Uses platform-native backend: inotify (Linux), kqueue (macOS), ReadDirectoryChanges (Windows)
111112
num_enum = "0.7"

rust/otap-dataflow/crates/controller/src/lib.rs

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,48 @@ impl<PData: 'static + Clone + Send + Sync + std::fmt::Debug> Controller<PData> {
270270
move |cancellation_token| obs_state_store.run(cancellation_token),
271271
)?;
272272

273+
// Start the engine-wide metrics collection task.
274+
// This samples engine-level metrics (e.g. RSS) on a fixed interval and
275+
// reports them once per engine, rather than duplicating across pipelines.
276+
let engine_entity_key = controller_ctx.register_engine_entity();
277+
let engine_registry = controller_ctx.telemetry_registry();
278+
let engine_reporter = metrics_reporter.clone();
279+
let engine_metrics_handle = spawn_thread_local_task(
280+
"engine-metrics",
281+
admin_tracing_setup.clone(),
282+
move |cancellation_token| async move {
283+
use otap_df_engine::engine_metrics::EngineMetricsMonitor;
284+
use std::time::Duration;
285+
use tokio::time::{MissedTickBehavior, interval};
286+
287+
// TODO: Make this interval configurable via engine config.
288+
const ENGINE_METRICS_INTERVAL: Duration = Duration::from_secs(5);
289+
290+
let mut monitor =
291+
EngineMetricsMonitor::new(engine_registry, engine_entity_key, engine_reporter);
292+
293+
let mut ticker = interval(ENGINE_METRICS_INTERVAL);
294+
ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
295+
296+
loop {
297+
tokio::select! {
298+
_ = cancellation_token.cancelled() => {
299+
return Ok::<(), otap_df_telemetry::error::Error>(());
300+
}
301+
_ = ticker.tick() => {
302+
monitor.update();
303+
if let Err(err) = monitor.report() {
304+
otel_warn!(
305+
"engine.metrics.reporting.fail",
306+
error = err.to_string()
307+
);
308+
}
309+
}
310+
}
311+
}
312+
},
313+
)?;
314+
273315
let mut threads = Vec::new();
274316
let mut ctrl_msg_senders = Vec::new();
275317

@@ -432,6 +474,7 @@ impl<PData: 'static + Clone + Send + Sync + std::fmt::Debug> Controller<PData> {
432474
}
433475

434476
// All pipelines have finished; shut down the admin HTTP server and metric aggregator gracefully.
477+
engine_metrics_handle.shutdown_and_join()?;
435478
admin_server_handle.shutdown_and_join()?;
436479
metrics_agg_handle.shutdown_and_join()?;
437480
if let Some(handle) = metrics_dispatcher_handle {

rust/otap-dataflow/crates/engine/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ data-encoding = { workspace = true }
4545
prost = { workspace = true }
4646
byte-unit = { workspace = true }
4747
cpu-time = { workspace = true }
48+
memory-stats = { workspace = true }
4849
nix = { workspace = true, features = ["resource"] }
4950

5051
[target.'cfg(not(windows))'.dependencies]

rust/otap-dataflow/crates/engine/src/context.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,28 @@ impl ControllerContext {
160160
thread_id,
161161
)
162162
}
163+
164+
/// Registers the engine-level entity for engine-wide metrics.
165+
///
166+
/// Returns the [`EntityKey`] to pass to
167+
/// [`EngineMetricsMonitor::new`](crate::engine_metrics::EngineMetricsMonitor::new).
168+
#[must_use]
169+
pub fn register_engine_entity(&self) -> EntityKey {
170+
use crate::attributes::ResourceAttributeSet;
171+
172+
self.telemetry_registry_handle
173+
.register_entity(ResourceAttributeSet {
174+
process_instance_id: self.process_instance_id.clone(),
175+
host_id: self.host_id.clone(),
176+
container_id: self.container_id.clone(),
177+
})
178+
}
179+
180+
/// Returns a handle to the telemetry registry.
181+
#[must_use]
182+
pub fn telemetry_registry(&self) -> TelemetryRegistryHandle {
183+
self.telemetry_registry_handle.clone()
184+
}
163185
}
164186

165187
impl PipelineContext {
rust/otap-dataflow/crates/engine/src/engine_metrics.rs

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
//! Engine-level metrics for the OTAP dataflow engine.
5+
//!
6+
//! Unlike per-pipeline metrics (which are sampled on each pipeline thread),
7+
//! engine metrics are emitted **once per engine instance** by a dedicated
8+
//! background task spawned by the controller.
9+
//!
10+
//! **Metrics**
11+
//! - `memory_rss` (`ObserveUpDownCounter<u64>`, `{By}`):
12+
//! Process-wide Resident Set Size — physical memory currently held in RAM.
13+
//! Matches what external tools report (e.g. `kubectl top pod`, `htop`, `ps rss`).
14+
15+
use otap_df_telemetry::instrument::ObserveUpDownCounter;
16+
use otap_df_telemetry::metrics::MetricSet;
17+
use otap_df_telemetry::registry::{EntityKey, TelemetryRegistryHandle};
18+
use otap_df_telemetry::reporter::MetricsReporter;
19+
use otap_df_telemetry_macros::metric_set;
20+
21+
/// Engine-wide metrics emitted once per engine instance.
22+
#[metric_set(name = "engine.metrics")]
23+
#[derive(Debug, Default, Clone)]
24+
pub struct EngineMetrics {
25+
// NOTE(review): the doc comment on this field reads the same as the `# HELP`
// description exported for `memory_rss`; presumably the `metric_set` macro
// uses it as the metric description, so treat edits to it as user-visible.
// Confirm against the macro before rewording.
/// Process-wide Resident Set Size — physical RAM currently used by the process.
26+
/// Matches what external tools report (e.g. `kubectl top pod`, `htop`, `ps rss`).
27+
#[metric(unit = "{By}")]
28+
pub memory_rss: ObserveUpDownCounter<u64>,
29+
}
30+
31+
/// Monitors and reports engine-wide metrics.
32+
///
33+
/// Created by the controller and driven by a periodic timer in a dedicated
34+
/// background task. Call [`update`](Self::update) to sample current values
35+
/// and [`report`](Self::report) to flush them to the metrics pipeline.
36+
pub struct EngineMetricsMonitor {
37+
/// Registered [`EngineMetrics`] set that `update` writes samples into.
metrics: MetricSet<EngineMetrics>,
38+
/// Reporter that `report` hands `metrics` to.
reporter: MetricsReporter,
39+
/// Registry handle kept so that `Drop` can unregister the metric set.
registry: TelemetryRegistryHandle,
40+
}
41+
42+
impl EngineMetricsMonitor {
43+
/// Creates a new engine metrics monitor.
44+
///
45+
/// The caller must have already registered the engine entity via
46+
/// [`ControllerContext::register_engine_entity`](crate::context::ControllerContext::register_engine_entity).
47+
#[must_use]
48+
pub fn new(
49+
registry: TelemetryRegistryHandle,
50+
entity_key: EntityKey,
51+
reporter: MetricsReporter,
52+
) -> Self {
53+
let metrics = registry.register_metric_set_for_entity::<EngineMetrics>(entity_key);
54+
Self {
55+
metrics,
56+
reporter,
57+
registry,
58+
}
59+
}
60+
61+
/// Samples current engine-wide metrics (RSS, etc.).
62+
pub fn update(&mut self) {
63+
self.metrics.memory_rss.observe(get_rss_bytes());
64+
}
65+
66+
/// Flushes sampled metrics to the reporting pipeline.
67+
///
68+
/// Returns an error only if the metrics channel is permanently closed.
69+
/// A full channel is silently tolerated (non-blocking, try-send semantics).
70+
pub fn report(&mut self) -> Result<(), otap_df_telemetry::error::Error> {
71+
self.reporter.report(&mut self.metrics)
72+
}
73+
}
74+
75+
/// Returns the current process-wide RSS (Resident Set Size) in bytes.
76+
fn get_rss_bytes() -> u64 {
77+
memory_stats::memory_stats()
78+
.map(|stats| stats.physical_mem as u64)
79+
.unwrap_or(0)
80+
}
81+
82+
impl Drop for EngineMetricsMonitor {
83+
fn drop(&mut self) {
84+
let _ = self
85+
.registry
86+
.unregister_metric_set(self.metrics.metric_set_key());
87+
}
88+
}
89+
90+
#[cfg(test)]
mod tests {
    use super::*;
    use crate::context::ControllerContext;
    use otap_df_telemetry::registry::TelemetryRegistryHandle;

    /// A single `update` must leave a positive RSS sample in the metric set.
    #[test]
    fn engine_metrics_reports_nonzero_rss() {
        let registry_handle = TelemetryRegistryHandle::new();
        let controller_ctx = ControllerContext::new(registry_handle.clone());
        let engine_entity = controller_ctx.register_engine_entity();
        // Named binding keeps the receiver alive until the end of the test.
        let (_receiver, reporter) = MetricsReporter::create_new_and_receiver(16);

        let mut monitor = EngineMetricsMonitor::new(registry_handle, engine_entity, reporter);
        monitor.update();

        let sampled_rss = monitor.metrics.memory_rss.get();
        assert!(
            sampled_rss > 0,
            "memory_rss should report non-zero process RSS"
        );
    }

    /// `report` must succeed after an `update` while the receiver is still alive.
    #[test]
    fn engine_metrics_report_succeeds() {
        let registry_handle = TelemetryRegistryHandle::new();
        let controller_ctx = ControllerContext::new(registry_handle.clone());
        let engine_entity = controller_ctx.register_engine_entity();
        // Named binding keeps the receiver alive until the end of the test.
        let (_receiver, reporter) = MetricsReporter::create_new_and_receiver(16);

        let mut monitor = EngineMetricsMonitor::new(registry_handle, engine_entity, reporter);
        monitor.update();

        let report_outcome = monitor.report();
        assert!(report_outcome.is_ok());
    }
}

rust/otap-dataflow/crates/engine/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ pub mod config;
5858
pub mod context;
5959
pub mod control;
6060
pub mod effect_handler;
61+
pub mod engine_metrics;
6162
pub mod entity_context;
6263
pub mod local;
6364
pub mod node;

0 commit comments

Comments
 (0)