1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -22,7 +22,6 @@ __blobstorage__
.githubchangeloggenerator.cache.log
.githubchangeloggenerator.cache/
.githubchangeloggenerator*
data
.zed/

# Add all Cargo.lock files except for those in binary crates
9 changes: 9 additions & 0 deletions Cargo.toml
@@ -66,6 +66,7 @@ regex = { version = "1" }
thiserror = { version = "2" }
url = { version = "2" }
percent-encoding-rfc3986 = { version = "0.1.3" }
tempfile = { version = "3" }
uuid = { version = "1" }

# runtime / async
@@ -101,3 +102,11 @@ Arro3 = "Arro3"
AKS = "AKS"
# to avoid using 'type' as a field name.
tpe = "tpe"

# for better flamegraphs when benchmarking
[profile.bench]
debug = true

[profile.profiling]
inherits = "release"
debug = true
25 changes: 12 additions & 13 deletions crates/benchmarks/Cargo.toml
@@ -7,23 +7,22 @@ license = "Apache-2.0"
keywords = ["deltalake", "delta", "datalake"]
description = "Delta-rs Benchmarks"
edition = "2021"
publish = false

[dependencies]
clap = { version = "4", features = [ "derive" ] }
chrono = { version = "0.4.31", default-features = false, features = ["clock"] }
tokio = { version = "1", features = ["fs", "macros", "rt", "io-util"] }

# arrow
arrow = { workspace = true }
arrow-array = { workspace = true }

# serde
serde_json = { workspace = true }

# datafusion
datafusion = { workspace = true }
clap = { version = "4", features = ["derive"] }
tokio = { workspace = true, features = ["fs", "macros", "rt", "io-util"] }
url = { workspace = true }
tempfile = { workspace = true }

[dependencies.deltalake-core]
path = "../core"
version = "0"
features = ["datafusion"]

[dev-dependencies]
divan = "0.1"

[[bench]]
name = "merge"
harness = false
80 changes: 47 additions & 33 deletions crates/benchmarks/README.md
@@ -4,52 +4,66 @@ The merge benchmarks are similar to the ones used by [Delta Spark](https://githu

## Dataset

Databricks maintains a public S3 bucket of the TPC-DS dataset at various scale factors, where requesters must pay to download. Below is an example of how to list the 1 GB scale factor
To generate the dataset, `duckdb` can be used. Install `duckdb` by following [these instructions](https://duckdb.org/#quickinstall).

Run the following commands:

```bash
❯ duckdb
D CALL dsdgen(sf = 1);
100% ▕██████████████████████████████████████▏ (00:00:05.76 elapsed)
┌─────────┐
│ Success │
│ boolean │
├─────────┤
│ 0 rows │
└─────────┘
D EXPORT DATABASE 'tpcds_parquet' (FORMAT PARQUET);
```
```
aws s3api list-objects --bucket devrel-delta-datasets --request-payer requester --prefix tpcds-2.13/tpcds_sf1_parquet/web_returns/
```

You can generate the TPC-DS dataset yourself by downloading and compiling [the generator](https://www.tpc.org/tpc_documents_current_versions/current_specifications5.asp).
You may need to update the CFLAGS to include `-fcommon` to compile on newer versions of GCC.
This will generate a folder called `tpcds_parquet` containing many parquet files. Place it at `crates/benchmarks/data/tpcds_parquet` (or set `TPCDS_PARQUET_DIR`). Credits to [Xuanwo's Blog](https://xuanwo.io/links/2025/02/duckdb-is-the-best-tpc-data-generator/).
## Running benchmarks

## Commands
These commands can be executed from the root of the benchmark crate. Some commands depend on the TPC-DS dataset being present.
Benchmarks use Divan and time only the merge operation. A temporary Delta table is created per iteration from `web_returns.parquet` and removed afterwards.
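
The per-iteration pattern (build the table outside the timed region, measure only the merge) can be sketched without Divan. This simplified harness is illustrative only, not the crate's actual code; the real benchmarks use Divan's `with_inputs`/`bench_local_values` for the same effect:

```rust
use std::time::{Duration, Instant};

/// Time only `work`, re-running `setup` before each sample so its
/// cost (table creation, temp dirs) is excluded from the measurement.
fn bench<T>(samples: u32, mut setup: impl FnMut() -> T, mut work: impl FnMut(T)) -> Duration {
    let mut total = Duration::ZERO;
    for _ in 0..samples {
        let input = setup(); // not timed
        let start = Instant::now();
        work(input); // timed
        total += start.elapsed();
    }
    total / samples
}

fn main() {
    let avg = bench(
        10,
        || vec![1u64; 1_000],
        |v| {
            std::hint::black_box(v.iter().sum::<u64>());
        },
    );
    println!("avg = {avg:?}");
}
```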

### Convert
Converts a TPC-DS `web_returns` CSV into a Delta table.
Assumes the dataset is pipe-delimited and that records do not have a trailing delimiter.
Environment variables:
- `TPCDS_PARQUET_DIR` (optional): directory containing `web_returns.parquet`. Default: `crates/benchmarks/data/tpcds_parquet`.
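
The directory resolution can be sketched in plain Rust (an illustrative helper, not the actual bench code; the fallback path is relative to the benchmark crate):

```rust
use std::path::PathBuf;

/// Resolve the TPC-DS parquet directory from an optional
/// `TPCDS_PARQUET_DIR` value, falling back to the in-repo default.
fn tpcds_parquet_dir(env_value: Option<String>) -> PathBuf {
    env_value
        .map(PathBuf::from)
        .unwrap_or_else(|| PathBuf::from("data/tpcds_parquet"))
}

fn main() {
    let dir = tpcds_parquet_dir(std::env::var("TPCDS_PARQUET_DIR").ok());
    println!("{}", dir.display());
}
```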

From the repo root:
```
cargo run --release --bin merge -- convert data/tpcds/web_returns.dat data/web_returns
cargo bench -p delta-benchmarks --bench merge
```

### Standard
Execute the standard merge bench suite.
Results can be saved to a Delta table for further analysis.
This table has the following schema:

- `group_id`: groups all tests executed as part of this call; defaults to the timestamp of execution
- `name`: the benchmark name that was executed
- `sample`: the iteration number for a given benchmark name
- `duration_ms`: how long the benchmark took, in milliseconds
- `data`: free field to pack any additional data
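
A row of this results table could be modeled as follows (an illustrative sketch only; the field names follow the schema above, but the crate may use a different representation):

```rust
/// One row of the benchmark-results table described above.
/// Illustrative; not necessarily the crate's actual type.
#[derive(Debug, Clone, PartialEq)]
struct BenchResult {
    group_id: i64,    // groups all tests from one invocation (default: run timestamp)
    name: String,     // the benchmark name that was executed
    sample: u32,      // iteration number for the given benchmark name
    duration_ms: u64, // benchmark duration in milliseconds
    data: String,     // free field for any additional data
}

fn main() {
    let row = BenchResult {
        group_id: 1698636172801,
        name: "delete_only".to_string(),
        sample: 0,
        duration_ms: 1234,
        data: String::new(),
    };
    println!("{row:?}");
}
```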

Filter a specific suite:
```
cargo run --release --bin merge -- standard data/web_returns 1 data/merge_results
cargo bench -p delta-benchmarks --bench merge -- delete_only
cargo bench -p delta-benchmarks --bench merge -- multiple_insert_only
cargo bench -p delta-benchmarks --bench merge -- upsert_file_matched
```

### Compare
Compare the results of two different runs.
Provide the Delta table paths and the `group_id` of each run to obtain the speedup for each test case.
## Profiling script

```
cargo run --release --bin merge -- compare data/benchmarks/ 1698636172801 data/benchmarks/ 1699759539902
```
A simple CLI is available to run a single merge with configurable parameters (useful for profiling or ad-hoc runs). It creates a fresh temporary Delta table per sample from `web_returns.parquet`, times only the merge, and prints duration and metrics.

Run (from repo root):
```bash
cargo run --profile profiling -p delta-benchmarks -- upsert --matched 0.01 --not-matched 0.10
```

### Show
Show all benchmark results from a Delta table.
Options:
- `upsert | delete | insert`: operation to benchmark
- `--matched <fraction>`: fraction of rows that match existing keys (default 0.01)
- `--not-matched <fraction>`: fraction of rows that do not match (default 0.10)
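
A std-only sketch of how these flags could be parsed (the real CLI uses a proper argument parser such as clap; only the flag names and defaults here come from the README):

```rust
/// Parameters accepted by the profiling CLI, per the flags above.
#[derive(Debug, PartialEq)]
struct Params {
    op: String,       // upsert | delete | insert
    matched: f64,     // fraction of rows matching existing keys
    not_matched: f64, // fraction of rows not matching
}

fn parse(args: &[&str]) -> Params {
    // Defaults match the README: --matched 0.01, --not-matched 0.10.
    let mut p = Params {
        op: args[0].to_string(),
        matched: 0.01,
        not_matched: 0.10,
    };
    let mut it = args[1..].iter();
    while let Some(flag) = it.next() {
        match *flag {
            "--matched" => p.matched = it.next().expect("missing value").parse().expect("bad float"),
            "--not-matched" => p.not_matched = it.next().expect("missing value").parse().expect("bad float"),
            other => panic!("unknown flag: {other}"),
        }
    }
    p
}

fn main() {
    let p = parse(&["upsert", "--matched", "0.5", "--not-matched", "0.2"]);
    println!("{p:?}");
}
```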

```
cargo run --release --bin merge -- show data/benchmark
```
### Flamegraphs using `samply`

Using `samply`, you can generate flamegraphs from the profiling script.

To start:

```bash
cargo install samply --locked
cargo build --profile profiling -p delta-benchmarks
samply record ./target/profiling/delta-benchmarks upsert
```
96 changes: 96 additions & 0 deletions crates/benchmarks/benches/merge.rs
@@ -0,0 +1,96 @@
use std::path::PathBuf;

use delta_benchmarks::{
merge_delete, merge_insert, merge_upsert, prepare_source_and_table, MergeOp, MergePerfParams,
};

use divan::{AllocProfiler, Bencher};

fn main() {
divan::main();
}

#[global_allocator]
static ALLOC: AllocProfiler = AllocProfiler::system();

fn bench_merge(bencher: Bencher, op: MergeOp, params: &MergePerfParams) {
let rt = tokio::runtime::Runtime::new().unwrap();
bencher
.with_inputs(|| {
let tmp_dir = tempfile::tempdir().unwrap();
let parquet_dir = PathBuf::from(
std::env::var("TPCDS_PARQUET_DIR")
.unwrap_or_else(|_| "data/tpcds_parquet".to_string()),
);
rt.block_on(async move {
let (source, table) = prepare_source_and_table(params, &tmp_dir, &parquet_dir)
.await
.unwrap();
(source, table, tmp_dir)
})
})
.bench_local_values(|(source, table, tmp_dir)| {
rt.block_on(async move {
let _ = divan::black_box(op(source, table).unwrap().await.unwrap());
});
drop(tmp_dir);
});
}

#[divan::bench(args = [
MergePerfParams {
sample_matched_rows: 0.05,
sample_not_matched_rows: 0.0,
}
])]
fn delete_only(bencher: Bencher, params: &MergePerfParams) {
bench_merge(bencher, merge_delete, params);
}

#[divan::bench(args = [
MergePerfParams {
sample_matched_rows: 0.00,
sample_not_matched_rows: 0.05,
},
MergePerfParams {
sample_matched_rows: 0.00,
sample_not_matched_rows: 0.50,
},
MergePerfParams {
sample_matched_rows: 0.00,
sample_not_matched_rows: 1.0,
},
])]
fn multiple_insert_only(bencher: Bencher, params: &MergePerfParams) {
bench_merge(bencher, merge_insert, params);
}

#[divan::bench(args = [
MergePerfParams {
sample_matched_rows: 0.01,
sample_not_matched_rows: 0.1,
},
MergePerfParams {
sample_matched_rows: 0.1,
sample_not_matched_rows: 0.0,
},
MergePerfParams {
sample_matched_rows: 0.1,
sample_not_matched_rows: 0.01,
},
MergePerfParams {
sample_matched_rows: 0.5,
sample_not_matched_rows: 0.001,
},
MergePerfParams {
sample_matched_rows: 0.99,
sample_not_matched_rows: 0.001,
},
MergePerfParams {
sample_matched_rows: 0.001,
sample_not_matched_rows: 0.001,
},
])]
fn upsert_file_matched(bencher: Bencher, params: &MergePerfParams) {
bench_merge(bencher, merge_upsert, params);
}
2 changes: 2 additions & 0 deletions crates/benchmarks/data/.gitignore
@@ -0,0 +1,2 @@
*
!.gitignore