Skip to content
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
c294b16
Memory & CPU Profiling.
sapatrjv Mar 21, 2026
062e1fb
Enable Memory & CPU profiling for df_engine.
sapatrjv Mar 24, 2026
f79dffd
Merge branch 'main' into dev/sapatr/memprofilingwithdhat
sapatrjv Mar 24, 2026
1ab21be
Enable Memory & CPU Profiling for df_engine
sapatrjv Mar 24, 2026
8c6daa6
Merge branch 'dev/sapatr/memprofilingwithdhat' of https://github.com/…
sapatrjv Mar 24, 2026
d23af0e
Enable Memory & CPU Profiling for df_engine
sapatrjv Mar 24, 2026
ad71e6b
Revert "Enable Memory & CPU Profiling for df_engine"
sapatrjv Mar 24, 2026
7482d31
Enable Memory & CPU Profiling for df_engine
sapatrjv Mar 24, 2026
aebde81
Enable Memory & CPU Profiling for df_engine
sapatrjv Mar 25, 2026
cc9e748
Enable Memory & CPU Profiling for df_engine
sapatrjv Mar 25, 2026
9222922
Merge branch 'main' into dev/sapatr/memprofilingwithdhat
sapatrjv Mar 25, 2026
6568f0d
Enable Memory & CPU Profiling for df_engine
sapatrjv Mar 25, 2026
a87c8fe
Enable Memory & CPU Profiling for df_engine.
sapatrjv Mar 30, 2026
157f5ab
Enable Memory & CPU Profiling for df_engine.
sapatrjv Mar 30, 2026
d73b21e
Enable Memory & CPU Profiling for df_engine
sapatrjv Mar 31, 2026
eed136f
Merge branch 'main' into dev/sapatr/memprofilingwithdhat
sapatrjv Mar 31, 2026
34c417c
Enable Memory & CPU Profiling for df_engine
sapatrjv Mar 31, 2026
0ad845f
Merge branch 'dev/sapatr/memprofilingwithdhat' of https://github.com/…
sapatrjv Mar 31, 2026
30b679a
Enable Memory & CPU Profiling for df_engine
sapatrjv Mar 31, 2026
a7c9240
Enable Memory & CPU Profiling for df_engine
sapatrjv Mar 31, 2026
12b6ba4
Enable Memory & CPU Profiling for df_engine
sapatrjv Mar 31, 2026
6e77031
Enable Memory & CPU Profiling for df_engine
sapatrjv Mar 31, 2026
37891c6
Enable Memory & CPU Profiling for df_engine
sapatrjv Apr 2, 2026
d95c7b7
Enable Memory & CPU Profiling for df_engine
sapatrjv Apr 2, 2026
b46664d
Enable Memory & CPU Profiling for df_engine
sapatrjv Apr 2, 2026
bba7248
Enable Memory & CPU Profiling for df_engine
sapatrjv Apr 2, 2026
64f7ec7
Enable Memory & CPU Profiling for df_engine
sapatrjv Apr 2, 2026
2af5ef7
Enable Memory & CPU Profiling for df_engine
sapatrjv Apr 2, 2026
8c9801b
Apply suggestion from @reyang
reyang Apr 2, 2026
7c28e45
Merge branch 'dev/sapatr/memprofilingwithdhat' of https://github.com/…
sapatrjv Apr 2, 2026
3897c66
Apply suggestions from code review
sapatrjv Apr 2, 2026
1720c5f
Enable Memory & CPU Profiling for df_engine
sapatrjv Apr 2, 2026
72377fb
Enable Memory & CPU Profiling for df_engine
sapatrjv Apr 2, 2026
42c08f2
Enable Memory & CPU Profiling for df_engine
sapatrjv Apr 2, 2026
6d6b315
Apply suggestions from code review
sapatrjv Apr 2, 2026
8856edc
Enable Memory & CPU Profiling for df_engine
sapatrjv Apr 3, 2026
d9e0bfd
Merge branch 'dev/sapatr/memprofilingwithdhat' of https://github.com/…
sapatrjv Apr 3, 2026
66f02a4
Enable Memory & CPU Profiling for df_engine
sapatrjv Apr 3, 2026
a9186cc
Enable Memory & CPU Profiling for df_engine
sapatrjv Apr 3, 2026
e533701
Enable Memory & CPU Profiling for df_engine
sapatrjv Apr 3, 2026
0358fa0
Enable Memory & CPU Profiling for df_engine
sapatrjv Apr 3, 2026
06111a1
Enable Memory & CPU Profiling for df_engine
sapatrjv Apr 3, 2026
ebdf8d4
Enable Memory & CPU Profiling for df_engine
sapatrjv Apr 3, 2026
cd8bf90
Enable Memory & CPU Profiling for df_engine
sapatrjv Apr 3, 2026
1b4185f
Enable Memory & CPU Profiling for df_engine.
sapatrjv Apr 14, 2026
600d336
Merge branch 'main' into dev/sapatr/memprofilingwithdhat
sapatrjv Apr 14, 2026
a6dad10
Merge branch 'main' into dev/sapatr/memprofilingwithdhat
jmacd Apr 15, 2026
658e151
Enable Memory & CPU Profiling for df_engine
sapatrjv Apr 15, 2026
285883f
Merge branch 'dev/sapatr/memprofilingwithdhat' of https://github.com/…
sapatrjv Apr 15, 2026
787bfa5
Remove code coverage on profiling path.
sapatrjv Apr 15, 2026
7c81041
Remove code coverage on profiling path.
sapatrjv Apr 15, 2026
7d2a092
Merge branch 'main' into dev/sapatr/memprofilingwithdhat
lalitb Apr 15, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions rust/otap-dataflow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ serde_json.workspace = true
clap.workspace = true
mimalloc = { workspace = true, optional = true }
sysinfo.workspace = true
dhat = { workspace = true, optional = true }
cfg-if.workspace = true

[target.'cfg(not(windows))'.dependencies]
tikv-jemallocator = { workspace = true, optional = true }
Expand Down Expand Up @@ -80,6 +82,7 @@ bitflags = "2.10"
bytes = "1.11.1"
bytemuck = "1.24"
blake3 = "1.5.5"
cfg-if = "1.0.4"
Comment thread
sapatrjv marked this conversation as resolved.
Comment thread
sapatrjv marked this conversation as resolved.
chrono = { version = "0.4", features = ["serde"] }
chrono-tz = { version = "0.10" }
ciborium = "0.2.2"
Expand All @@ -88,6 +91,7 @@ core_affinity = "0.8.3"
criterion = "0.8.0"
data-encoding = "2.9.0"
datafusion = { version = "53.0.0", default-features = false }
dhat = "0.3.3"
fluke-hpack = "0.3.1"
flume = { version = "0.12.0", default-features = false, features = ["async"] }
futures = "0.3.32"
Expand Down Expand Up @@ -239,6 +243,11 @@ aws = ["otap-df-otap/aws"]
unsafe-optimizations = ["unchecked-index", "unchecked-arithmetic"]
unchecked-index = []
unchecked-arithmetic = []
dhat-heap = [
"dep:dhat",
]

Comment thread
sapatrjv marked this conversation as resolved.


# Dev/test tooling (fake data generator).
dev-tools = ["otap-df-core-nodes/dev-tools"]
Expand Down Expand Up @@ -322,6 +331,10 @@ missing_crate_level_docs = "deny"
[profile.release]
debug = "line-tables-only" # minimum required for profiling

[profile.profiling]
inherits = "release"
debug = 2

# A more in-depth analysis is necessary to determine the optimal parameters for the release profile.
#[profile.release]
#lto = "thin"
Expand Down
68 changes: 68 additions & 0 deletions rust/otap-dataflow/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,74 @@
docker build --build-context otel-arrow=../../ -f Dockerfile -t df_engine .
```

## Profiling
Comment thread
sapatrjv marked this conversation as resolved.

This section covers memory and CPU profiling for df_engine, using the dhat-rs
and samply profilers respectively.

**Requirements**:

- dhat-rs <https://docs.rs/dhat/latest/dhat/>
- samply <https://github.com/mstange/samply>

**Installation**:

```cmd/pwsh/bash
cargo install --locked samply
Comment thread
sapatrjv marked this conversation as resolved.
Outdated
```

Note: dhat-rs is a library crate designed for heap profiling in Rust programs.
Hence it does not require explicit installation.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is not explicitly required, why would we want to document it under the "Requirements" section?


**Build**:

**Build for both CPU & Memory profiling**:
Comment thread
sapatrjv marked this conversation as resolved.
Outdated

```cmd/pwsh/bash
cargo build --profile profiling --no-default-features --features dhat-heap --workspace
```

**Build for only CPU profiling**:

```cmd/pwsh/bash
cargo build --profile profiling --workspace
```

**Run**:

**Run with both Memory & CPU profiling enabled**:
Comment thread
reyang marked this conversation as resolved.
Outdated

```cmd/pwsh
samply record .\target\profiling\df_engine.exe --config .\configs\otap-noop.yaml
```

Check failure on line 426 in rust/otap-dataflow/README.md

View workflow job for this annotation

GitHub Actions / markdownlint

Fenced code blocks should be surrounded by blank lines

rust/otap-dataflow/README.md:426 MD031/blanks-around-fences Fenced code blocks should be surrounded by blank lines [Context: "```"] https://github.com/DavidAnson/markdownlint/blob/v0.40.0/doc/md031.md
or

```bash
samply record ./target/profiling/df_engine --config ./configs/otap-noop.yaml
```

**Run with only Memory profiling enabled**:
Comment thread
sapatrjv marked this conversation as resolved.
Outdated

```cmd/pwsh
Comment thread
sapatrjv marked this conversation as resolved.
Outdated
.\target\profiling\df_engine.exe --config .\configs\otap-noop.yaml
```

Check failure on line 437 in rust/otap-dataflow/README.md

View workflow job for this annotation

GitHub Actions / markdownlint

Fenced code blocks should be surrounded by blank lines

rust/otap-dataflow/README.md:437 MD031/blanks-around-fences Fenced code blocks should be surrounded by blank lines [Context: "```"] https://github.com/DavidAnson/markdownlint/blob/v0.40.0/doc/md031.md
or

```bash
./target/profiling/df_engine --config ./configs/otap-noop.yaml
Comment thread
reyang marked this conversation as resolved.
Outdated
```

**Result**:

On successful termination of df_engine.exe, it will generate a dhat-heap.json file
Comment thread
sapatrjv marked this conversation as resolved.
Outdated
for memory profiling, which needs to be rendered by uploading it to:
Comment thread
sapatrjv marked this conversation as resolved.
Outdated
<https://nnethercote.github.io/dh_view/>.

CPU profiling output will be automatically rendered in the browser.
Comment thread
sapatrjv marked this conversation as resolved.
Outdated

Note: dhat needs a clean shutdown to generate the dhat-heap.json file. In
df_engine, this can be done manually with Ctrl-C.
Comment thread
sapatrjv marked this conversation as resolved.
Outdated
Comment thread
sapatrjv marked this conversation as resolved.
Outdated

## Contributing

- [Contribution Guidelines](CONTRIBUTING.md)
Expand Down
88 changes: 69 additions & 19 deletions rust/otap-dataflow/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,73 @@ use otap_df_core_nodes as _;
use otap_df_otap::OTAP_PIPELINE_FACTORY;
use std::path::PathBuf;
use sysinfo::System;
use cfg_if::cfg_if;

// -----------------------------------------------------------------------------
// Feature guard: enabling any two of jemalloc, mimalloc, and dhat-heap together must fail to compile (non-Windows only)
Comment thread
sapatrjv marked this conversation as resolved.
Outdated
// -----------------------------------------------------------------------------
#[cfg(all(
not(windows),
feature = "jemalloc",
feature = "mimalloc",
not(any(test, doc)),
not(clippy)
not(clippy),
any(
all(feature = "dhat-heap", feature = "mimalloc"),
all(feature = "dhat-heap", feature = "jemalloc"),
all(feature = "jemalloc", feature = "mimalloc"),
)
))]
compile_error!(
"Features `jemalloc` and `mimalloc` are mutually exclusive. \
To build with mimalloc, use: cargo build --release --no-default-features --features mimalloc"
"Allocator features are mutually exclusive. Enable only one allocator: `dhat-heap`, `mimalloc`, `jemalloc`. \
Example: \
(mimalloc): cargo build --release --no-default-features --features mimalloc. \
(jemalloc): cargo build --release --no-default-features --features jemalloc. \
(dhat): cargo build --profile profiling --no-default-features --features dhat-heap."
);

#[cfg(feature = "dhat-heap")]
Comment thread
sapatrjv marked this conversation as resolved.
use {
std::sync::{LazyLock, Mutex},
dhat::Profiler,
};

#[cfg(feature = "mimalloc")]
use mimalloc::MiMalloc;

#[cfg(all(not(windows), feature = "jemalloc"))]
use tikv_jemallocator::Jemalloc;

// -----------------------------------------------------------------------------
// Global allocator selection.
// -----------------------------------------------------------------------------
cfg_if! {
// dhat (profiling) — wins everywhere when enabled
if #[cfg(feature = "dhat-heap")] {
#[global_allocator]
static GLOBAL: dhat::Alloc = dhat::Alloc;
static DHAT_PROFILER: LazyLock<Mutex<Option<Profiler>>> = LazyLock::new(|| Mutex::new(None));

/// Starts dhat heap profiling.
///
/// Locks `DHAT_PROFILER`, creates a new heap profiler and stores it in the
/// slot so that it stays alive until `dhat_finish` takes it out again.
///
/// Panics if the `DHAT_PROFILER` mutex is poisoned.
fn dhat_start() {
    let mut slot = DHAT_PROFILER.lock().unwrap();
    let heap_profiler = dhat::Profiler::new_heap();
    *slot = Some(heap_profiler);
}

fn dhat_finish() {
// Dropping the Profiler is what triggers dhat-heap.json generation. [1](https://outlook.office365.com/owa/?ItemID=AAMkADYwZTk3ZGM5LTU2ZjItNDM2NC1hZmQ0LWY5NjhmZmM5M2NiNwBGAAAAAAC7q57xFfmuSb3iOsuh% 2fVOABwAlpRXD5QvATJ55erMTPKHzAAAAH3TYAABzDoBtHPxlRIZv17%2bj%2fBx2AAhyk6imAAA%3d&exvsurl=1&viewmodel=ReadMessageItem)
Comment thread
sapatrjv marked this conversation as resolved.
Outdated
let mut profiler = DHAT_PROFILER.lock().unwrap();
let _ = profiler.take();
}
Comment thread
sapatrjv marked this conversation as resolved.
Outdated

// Windows default: mimalloc
} else if #[cfg(feature = "mimalloc")] {
#[global_allocator]
static GLOBAL: MiMalloc = MiMalloc;

// Linux default: jemalloc
} else if #[cfg(all(not(windows), feature = "jemalloc"))] {
#[global_allocator]
static GLOBAL: Jemalloc = Jemalloc;
}
}

// Crypto provider features are mutually exclusive.
// The `not(any(test, doc))` and `not(clippy)` guards mirror the jemalloc/mimalloc
// pattern so that `cargo test --all-features` (used in CI) does not fail.
Expand Down Expand Up @@ -72,19 +126,6 @@ compile_error!(
Use --no-default-features to disable the default crypto provider, then enable exactly one."
);

#[cfg(feature = "mimalloc")]
use mimalloc::MiMalloc;

#[cfg(all(not(windows), feature = "jemalloc", not(feature = "mimalloc")))]
use tikv_jemallocator::Jemalloc;

#[cfg(feature = "mimalloc")]
#[global_allocator]
static GLOBAL: MiMalloc = MiMalloc;

#[cfg(all(not(windows), feature = "jemalloc", not(feature = "mimalloc")))]
#[global_allocator]
static GLOBAL: Jemalloc = Jemalloc;

#[derive(Parser, Debug)]
#[command(
Expand Down Expand Up @@ -291,6 +332,10 @@ fn validate_engine_components(
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
#[cfg(feature = "dhat-heap")]
{
dhat_start()
Comment thread
sapatrjv marked this conversation as resolved.
Outdated
}
// Install the rustls crypto provider selected by the crypto-* feature flag.
// This must happen before any TLS connections (reqwest, tonic, etc.).
otap_df_otap::crypto::install_crypto_provider()
Expand Down Expand Up @@ -318,6 +363,11 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {

let controller = Controller::new(&OTAP_PIPELINE_FACTORY);
let result = controller.run_forever(engine_cfg);
#[cfg(feature = "dhat-heap")]
{
dhat_finish()
}

match result {
Ok(_) => {
println!("Pipeline run successfully");
Expand Down
Loading