diff --git a/rust/otap-dataflow/Cargo.toml b/rust/otap-dataflow/Cargo.toml
index 2868645142..ff3d9119f2 100644
--- a/rust/otap-dataflow/Cargo.toml
+++ b/rust/otap-dataflow/Cargo.toml
@@ -40,6 +40,9 @@ thiserror.workspace = true
serde_json.workspace = true
clap.workspace = true
mimalloc = { workspace = true, optional = true }
+sysinfo.workspace = true
+dhat = { workspace = true, optional = true }
+cfg-if.workspace = true
[target.'cfg(not(windows))'.dependencies]
tikv-jemallocator = { workspace = true, optional = true }
@@ -84,6 +87,7 @@ bitflags = "2.10"
bytes = "1.11.1"
bytemuck = "1.24"
blake3 = "1.5.5"
+cfg-if = "1.0.4"
chrono = { version = "0.4", features = ["serde"] }
chrono-tz = { version = "0.10" }
ciborium = "0.2.2"
@@ -92,6 +96,7 @@ core_affinity = "0.8.3"
criterion = "0.8.0"
data-encoding = "2.9.0"
datafusion = { version = "53.0.0", default-features = false }
+dhat = "0.3.3"
fluke-hpack = "0.3.1"
flume = { version = "0.12.0", default-features = false, features = ["async"] }
futures = "0.3.32"
@@ -250,6 +255,9 @@ aws = ["otap-df-otap/aws"]
unsafe-optimizations = ["unchecked-index", "unchecked-arithmetic"]
unchecked-index = []
unchecked-arithmetic = []
+dhat-heap = [
+ "dep:dhat",
+]
# Dev/test tooling (fake data generator).
dev-tools = ["otap-df-core-nodes/dev-tools"]
@@ -270,6 +278,9 @@ condense-attributes-processor = ["otap-df-contrib-nodes/condense-attributes-proc
recordset-kql-processor = ["otap-df-contrib-nodes/recordset-kql-processor"]
resource-validator-processor = ["otap-df-contrib-nodes/resource-validator-processor"]
+[lints.rust]
+unexpected_cfgs = { level = "warn", check-cfg = ['cfg(tarpaulin_include)'] }
+
[workspace.lints.rust]
# General compatibility lints
rust_2018_idioms = { level = "warn", priority = -1 }
@@ -330,6 +341,10 @@ missing_crate_level_docs = "deny"
[profile.release]
debug = "line-tables-only" # minimum required for profiling
+[profile.profiling]
+inherits = "release"
+debug = 2
+
# A more in-depth analysis is necessary to determine the optimal parameters for the release profile.
#[profile.release]
#lto = "thin"
diff --git a/rust/otap-dataflow/PROFILING.md b/rust/otap-dataflow/PROFILING.md
new file mode 100644
index 0000000000..2400d2c7d6
--- /dev/null
+++ b/rust/otap-dataflow/PROFILING.md
@@ -0,0 +1,57 @@
+# Profiling
+
+This section covers CPU and memory profiling for `df_engine`.
+
+**Requirements**:
+
+- [samply](https://github.com/mstange/samply)
+
+**Installation**:
+
+```cmd/pwsh/bash
+cargo install --locked samply
+```
+
+**Build**:
+
+**Build for both CPU and memory profiling**:
+
+```cmd/pwsh/bash
+cargo build --profile profiling --no-default-features --features dhat-heap --workspace
+```
+
+> [!NOTE]
+> In this command, all default features are disabled.
+> Use specific flags to enable individual features.
+
+**Build for only CPU profiling**:
+
+```cmd/pwsh/bash
+cargo build --profile profiling --workspace
+```
+
+**Run**:
+
+**Run with both CPU and memory profiling enabled**:
+
+```pwsh/bash
+samply record ./target/profiling/df_engine --config ./configs/otap-noop.yaml
+```
+
+**Run with only memory profiling enabled**:
+
+```pwsh/bash
+./target/profiling/df_engine --config ./configs/otap-noop.yaml
+```
+
+**Result**:
+
+On graceful shutdown of `df_engine`, it will generate `dhat-heap.json` file
+for memory profiling that needs to be rendered by uploading it to:
+.
+
+CPU profiling output will be automatically rendered on browser.
+
+> [!NOTE]
+> `dhat` needs a clean shutdown to generate `dhat-heap.json` file.
+> In `df_engine` this can be done manually with Ctrl+C.
diff --git a/rust/otap-dataflow/README.md b/rust/otap-dataflow/README.md
index 30de819da9..216c9d37a2 100644
--- a/rust/otap-dataflow/README.md
+++ b/rust/otap-dataflow/README.md
@@ -445,6 +445,10 @@ cargo run --example
docker build --build-context otel-arrow=../../ -f Dockerfile -t df_engine .
```
+## Profiling
+
+[Refer profiling Page](./PROFILING.md)
+
## Contributing
- [Contribution Guidelines](CONTRIBUTING.md)
diff --git a/rust/otap-dataflow/src/main.rs b/rust/otap-dataflow/src/main.rs
index 88e59fd932..fffc52d713 100644
--- a/rust/otap-dataflow/src/main.rs
+++ b/rust/otap-dataflow/src/main.rs
@@ -16,6 +16,7 @@ use otap_df_controller::startup;
// Keep this side-effect import so the crate is linked and its `linkme`
// distributed-slice registrations (core nodes) are visible
// in `OTAP_PIPELINE_FACTORY` at runtime.
+use cfg_if::cfg_if;
use otap_df_core_nodes as _;
use otap_df_otap::OTAP_PIPELINE_FACTORY;
/// Project license text (Apache-2.0), embedded at compile time.
@@ -34,18 +35,70 @@ fn memory_allocator_name() -> &'static str {
}
}
+// -----------------------------------------------------------------------------
+// Feature guard: jemalloc + mimalloc + dhat-heap any two together should fail.
+// -----------------------------------------------------------------------------
#[cfg(all(
- not(windows),
- feature = "jemalloc",
- feature = "mimalloc",
not(any(test, doc)),
- not(clippy)
+ not(clippy),
+ any(
+ all(feature = "dhat-heap", feature = "mimalloc"),
+ all(feature = "dhat-heap", feature = "jemalloc"),
+ all(feature = "jemalloc", feature = "mimalloc"),
+ )
))]
compile_error!(
- "Features `jemalloc` and `mimalloc` are mutually exclusive. \
- To build with mimalloc, use: cargo build --release --no-default-features --features mimalloc"
+ "Allocator features are mutually exclusive. Enable only one allocator: `dhat-heap`, `mimalloc`, `jemalloc`. \
+ Example: \
+ (mimalloc): cargo build --release --no-default-features --features mimalloc. \
+ (jemalloc): cargo build --release --no-default-features --features jemalloc. \
+ (dhat): cargo build --profile profiling --no-default-features --features dhat-heap."
);
+#[cfg(feature = "dhat-heap")]
+use {
+ dhat::Profiler,
+ std::sync::{LazyLock, Mutex},
+};
+
+#[cfg(all(not(clippy), feature = "mimalloc"))]
+use mimalloc::MiMalloc;
+
+#[cfg(all(not(clippy), not(windows), feature = "jemalloc"))]
+use tikv_jemallocator::Jemalloc;
+
+// -----------------------------------------------------------------------------
+// Global allocator selection.
+// -----------------------------------------------------------------------------
+cfg_if! {
+ // dhat (profiling) — wins everywhere when enabled
+ if #[cfg(all(not(tarpaulin_include), feature = "dhat-heap"))] {
+ #[global_allocator]
+ static GLOBAL: dhat::Alloc = dhat::Alloc;
+ static DHAT_PROFILER: LazyLock>> = LazyLock::new(|| Mutex::new(None));
+
+ fn dhat_start() {
+ let mut profiler = DHAT_PROFILER.lock().unwrap();
+ *profiler = Some(dhat::Profiler::new_heap());
+ }
+
+ fn dhat_finish() {
+ let mut profiler = DHAT_PROFILER.lock().unwrap();
+ let _ = profiler.take();
+ }
+
+ // Windows default: mimalloc
+ } else if #[cfg(feature = "mimalloc")] {
+ #[global_allocator]
+ static GLOBAL: MiMalloc = MiMalloc;
+
+ // Linux default: jemalloc
+ } else if #[cfg(all(not(windows), feature = "jemalloc"))] {
+ #[global_allocator]
+ static GLOBAL: Jemalloc = Jemalloc;
+ }
+}
+
// Crypto provider features are mutually exclusive.
// The `not(any(test, doc))` and `not(clippy)` guards mirror the jemalloc/mimalloc
// pattern so that `cargo test --all-features` (used in CI) does not fail.
@@ -82,20 +135,6 @@ compile_error!(
Use --no-default-features to disable the default crypto provider, then enable exactly one."
);
-#[cfg(feature = "mimalloc")]
-use mimalloc::MiMalloc;
-
-#[cfg(all(not(windows), feature = "jemalloc", not(feature = "mimalloc")))]
-use tikv_jemallocator::Jemalloc;
-
-#[cfg(feature = "mimalloc")]
-#[global_allocator]
-static GLOBAL: MiMalloc = MiMalloc;
-
-#[cfg(all(not(windows), feature = "jemalloc", not(feature = "mimalloc")))]
-#[global_allocator]
-static GLOBAL: Jemalloc = Jemalloc;
-
#[derive(Parser, Debug)]
#[command(
author,
@@ -183,6 +222,10 @@ fn parse_core_id_range(s: &str) -> Result {
}
fn main() -> Result<(), Box> {
+ #[cfg(all(not(tarpaulin_include), feature = "dhat-heap"))]
+ {
+ dhat_start();
+ }
// Install the rustls crypto provider selected by the crypto-* feature flag.
// This must happen before any TLS connections (reqwest, tonic, etc.).
otap_df_otap::crypto::install_crypto_provider()
@@ -225,6 +268,11 @@ fn main() -> Result<(), Box> {
let controller = Controller::new(&OTAP_PIPELINE_FACTORY);
let result = controller.run_forever(engine_cfg);
+ #[cfg(all(not(tarpaulin_include), feature = "dhat-heap"))]
+ {
+ dhat_finish();
+ }
+
match result {
Ok(_) => {
println!("Pipeline run successfully");