Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion crates/benchmarks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@ edition = "2021"
publish = false

[dependencies]
clap = { version = "4", features = ["derive", "env"] }
tokio = { workspace = true, features = ["fs", "macros", "rt", "io-util"] }
url = { workspace = true }
tempfile = { workspace = true }
anyhow = "1"

[dependencies.deltalake-core]
path = "../core"
Expand All @@ -26,3 +27,11 @@ divan = "0.1"
[[bench]]
name = "merge"
harness = false

[[bench]]
name = "smoke"
harness = false

[[bench]]
name = "tpcds"
harness = false
15 changes: 11 additions & 4 deletions crates/benchmarks/README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# Benchmarks

The merge benchmarks are similar to the ones used by [Delta Spark](https://github.com/delta-io/delta/pull/1835).


Expand Down Expand Up @@ -48,13 +49,19 @@ A simple CLI is available to run a single merge with configurable parameters (us

Run (from repo root):
```bash
cargo run --profile profiling -p delta-benchmarks -- merge --op upsert --matched 0.01 --not-matched 0.10
```

Options:
- `--op <upsert|delete|insert>`: operation to benchmark
- `--matched <fraction>`: fraction of rows that match existing keys (default 0.01)
- `--not-matched <fraction>`: fraction of rows that do not match (default 0.10)
- `--case <name>`: run one of the predefined merge scenarios mirrored from the Delta Spark suite

Run a predefined case with:
```bash
cargo run --release -p delta-benchmarks -- merge --case single_insert_only_filesMatchedFraction_0.05_rowsNotMatchedFraction_0.05
```

### Flamegraphs using `samply`

Expand All @@ -66,4 +73,4 @@ To start,
cargo install samply --locked
cargo build --profile profiling -p delta-benchmarks
samply record ./target/profiling/delta-benchmarks merge --op upsert
```
78 changes: 18 additions & 60 deletions crates/benchmarks/benches/merge.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::path::PathBuf;

use delta_benchmarks::{
merge_delete, merge_insert, merge_upsert, prepare_source_and_table, MergeOp, MergePerfParams,
delete_only_cases, insert_only_cases, prepare_source_and_table, upsert_cases, MergeTestCase,
};

use divan::{AllocProfiler, Bencher};
Expand All @@ -13,7 +13,7 @@ fn main() {
#[global_allocator]
static ALLOC: AllocProfiler = AllocProfiler::system();

fn bench_merge(bencher: Bencher, op: MergeOp, params: &MergePerfParams) {
fn bench_merge_case(bencher: Bencher, case: &MergeTestCase) {
let rt = tokio::runtime::Runtime::new().unwrap();
bencher
.with_inputs(|| {
Expand All @@ -23,74 +23,32 @@ fn bench_merge(bencher: Bencher, op: MergeOp, params: &MergePerfParams) {
.unwrap_or_else(|_| "data/tpcds_parquet".to_string()),
);
rt.block_on(async move {
let (source, table) = prepare_source_and_table(params, &tmp_dir, &parquet_dir)
.await
.unwrap();
(source, table, tmp_dir)
let (source, table) =
prepare_source_and_table(&case.params, &tmp_dir, &parquet_dir)
.await
.expect("prepare inputs");
(case, source, table, tmp_dir)
})
})
.bench_local_values(|(source, table, tmp_dir)| {
.bench_local_values(|(case, source, table, tmp_dir)| {
rt.block_on(async move {
let _ = divan::black_box(op(source, table).unwrap().await.unwrap());
divan::black_box(case.execute(source, table).await.expect("execute merge"));
});
drop(tmp_dir);
});
}

#[divan::bench(args = [
MergePerfParams {
sample_matched_rows: 0.05,
sample_not_matched_rows: 0.0,
}
])]
fn delete_only(bencher: Bencher, params: &MergePerfParams) {
bench_merge(bencher, merge_delete, params);
#[divan::bench(args = insert_only_cases())]
fn insert_only(bencher: Bencher, case: &MergeTestCase) {
bench_merge_case(bencher, case);
}

#[divan::bench(args = [
MergePerfParams {
sample_matched_rows: 0.00,
sample_not_matched_rows: 0.05,
},
MergePerfParams {
sample_matched_rows: 0.00,
sample_not_matched_rows: 0.50,
},
MergePerfParams {
sample_matched_rows: 0.00,
sample_not_matched_rows: 1.0,
},
])]
fn multiple_insert_only(bencher: Bencher, params: &MergePerfParams) {
bench_merge(bencher, merge_insert, params);
#[divan::bench(args = delete_only_cases())]
fn delete_only(bencher: Bencher, case: &MergeTestCase) {
bench_merge_case(bencher, case);
}

#[divan::bench(args = [
MergePerfParams {
sample_matched_rows: 0.01,
sample_not_matched_rows: 0.1,
},
MergePerfParams {
sample_matched_rows: 0.1,
sample_not_matched_rows: 0.0,
},
MergePerfParams {
sample_matched_rows: 0.1,
sample_not_matched_rows: 0.01,
},
MergePerfParams {
sample_matched_rows: 0.5,
sample_not_matched_rows: 0.001,
},
MergePerfParams {
sample_matched_rows: 0.99,
sample_not_matched_rows: 0.001,
},
MergePerfParams {
sample_matched_rows: 0.001,
sample_not_matched_rows: 0.001,
},
])]
fn upsert_file_matched(bencher: Bencher, params: &MergePerfParams) {
bench_merge(bencher, merge_upsert, params);
#[divan::bench(args = upsert_cases())]
fn upsert(bencher: Bencher, case: &MergeTestCase) {
bench_merge_case(bencher, case);
}
35 changes: 35 additions & 0 deletions crates/benchmarks/benches/smoke.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use delta_benchmarks::{run_smoke_once, SmokeParams};
use divan::{AllocProfiler, Bencher};
use url::Url;

/// Divan entry point: discovers and runs every `#[divan::bench]` in this file.
fn main() {
    divan::main();
}

#[global_allocator]
static ALLOC: AllocProfiler = AllocProfiler::system();

type Runtime = tokio::runtime::Runtime;

/// Runs one end-to-end smoke pass (`run_smoke_once`) against a fresh temp table.
///
/// The temp dir and its `file://` URL are built in `with_inputs`, outside the
/// timed region, so only the smoke run itself is measured (previously the URL
/// was constructed inside the measured closure).
fn bench_smoke(bencher: Bencher, params: &SmokeParams) {
    let rt = Runtime::new().expect("create tokio runtime");
    bencher
        .with_inputs(|| {
            let tmp_dir = tempfile::tempdir().expect("create temp dir");
            let table_url = Url::from_directory_path(tmp_dir.path()).expect("tmp dir url");
            (table_url, tmp_dir)
        })
        .bench_local_values(|(table_url, tmp_dir)| {
            rt.block_on(async {
                run_smoke_once(&table_url, params).await.expect("smoke run");
            });
            // Dropping last keeps the table directory alive for the whole run.
            drop(tmp_dir);
        });
}

// Smoke benchmark over increasing row counts; each args entry is one divan case.
#[divan::bench(args = [
    SmokeParams { rows: 2 },
    SmokeParams { rows: 10 },
    SmokeParams { rows: 100 },
    SmokeParams { rows: 1_000 },
])]
fn smoke(bencher: Bencher, params: &SmokeParams) {
    bench_smoke(bencher, params);
}
42 changes: 42 additions & 0 deletions crates/benchmarks/benches/tpcds.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use std::path::PathBuf;

use delta_benchmarks::{register_tpcds_tables, tpcds_query, tpcds_query_names};
use divan::{AllocProfiler, Bencher};

/// Divan entry point: discovers and runs every `#[divan::bench]` in this file.
fn main() {
    divan::main();
}

#[global_allocator]
static ALLOC: AllocProfiler = AllocProfiler::system();

/// Benchmarks execution of one named TPC-DS query against tables registered
/// from local parquet data.
///
/// Parquet fixtures come from `TPCDS_PARQUET_DIR` (default
/// `data/tpcds_parquet`). Table registration happens once, outside the timed
/// region; only `ctx.sql(..)` + `collect()` for each statement are measured.
#[divan::bench(args = tpcds_query_names())]
fn tpcds_query_execution(bencher: Bencher, name: &'static str) {
    let rt = tokio::runtime::Runtime::new().unwrap();
    // A query file may hold several statements; split on ';' and drop blanks.
    let statements: Vec<_> = tpcds_query(name)
        .expect("query must exist")
        .split(';')
        .filter(|s| !s.trim().is_empty())
        .collect();

    let tmp_dir = tempfile::tempdir().unwrap();
    let parquet_dir = PathBuf::from(
        std::env::var("TPCDS_PARQUET_DIR").unwrap_or_else(|_| "data/tpcds_parquet".to_string()),
    );

    let ctx = rt.block_on(async {
        register_tpcds_tables(&tmp_dir, &parquet_dir)
            .await
            .expect("failed to register TPC-DS tables")
    });

    bencher.bench_local(|| {
        rt.block_on(async {
            for sql in statements.iter() {
                let df = ctx.sql(sql).await.expect("failed to create dataframe");
                divan::black_box(df.collect().await.expect("failed to execute query"));
            }
        });
    });
    // Keep the registered tables' backing directory alive until benching ends.
    drop(tmp_dir);
}
1 change: 1 addition & 0 deletions crates/benchmarks/queries/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
TPC-DS SQL is vendored from [datafusion-benchmarks](https://github.com/apache/datafusion-benchmarks) repository.
26 changes: 26 additions & 0 deletions crates/benchmarks/queries/tpcds/q1.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
-- SQLBench-DS query 1 derived from TPC-DS query 1 under the terms of the TPC Fair Use Policy.
-- TPC-DS queries are Copyright 2021 Transaction Processing Performance Council.
-- This query was generated at scale factor 1.
-- Summary: customers of Tennessee stores whose total 1999 store returns
-- exceed 1.2x the average customer total return for the same store.
with customer_total_return as
(select sr_customer_sk as ctr_customer_sk
,sr_store_sk as ctr_store_sk
,sum(SR_RETURN_AMT_INC_TAX) as ctr_total_return
from store_returns
,date_dim
where sr_returned_date_sk = d_date_sk
and d_year =1999
group by sr_customer_sk
,sr_store_sk)
select c_customer_id
from customer_total_return ctr1
,store
,customer
-- correlated subquery: per-store average of customer total returns
where ctr1.ctr_total_return > (select avg(ctr_total_return)*1.2
from customer_total_return ctr2
where ctr1.ctr_store_sk = ctr2.ctr_store_sk)
and s_store_sk = ctr1.ctr_store_sk
and s_state = 'TN'
and ctr1.ctr_customer_sk = c_customer_sk
order by c_customer_id
LIMIT 100;

60 changes: 60 additions & 0 deletions crates/benchmarks/queries/tpcds/q10.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
-- SQLBench-DS query 10 derived from TPC-DS query 10 under the terms of the TPC Fair Use Policy.
-- TPC-DS queries are Copyright 2021 Transaction Processing Performance Council.
-- This query was generated at scale factor 1.
-- Summary: demographic-bucket counts for customers in five target counties who
-- bought in-store during months 3-6 of 2002 and also bought via web or catalog.
select
 cd_gender,
 cd_marital_status,
 cd_education_status,
 count(*) cnt1,
 cd_purchase_estimate,
 count(*) cnt2,
 cd_credit_rating,
 count(*) cnt3,
 cd_dep_count,
 count(*) cnt4,
 cd_dep_employed_count,
 count(*) cnt5,
 cd_dep_college_count,
 count(*) cnt6
from
 customer c,customer_address ca,customer_demographics
where
 c.c_current_addr_sk = ca.ca_address_sk and
 ca_county in ('Clinton County','Platte County','Franklin County','Louisa County','Harmon County') and
 cd_demo_sk = c.c_current_cdemo_sk and
 exists (select *
 from store_sales,date_dim
 where c.c_customer_sk = ss_customer_sk and
 ss_sold_date_sk = d_date_sk and
 d_year = 2002 and
 d_moy between 3 and 3+3) and
 (exists (select *
 from web_sales,date_dim
 where c.c_customer_sk = ws_bill_customer_sk and
 ws_sold_date_sk = d_date_sk and
 d_year = 2002 and
 d_moy between 3 ANd 3+3) or
 exists (select *
 from catalog_sales,date_dim
 where c.c_customer_sk = cs_ship_customer_sk and
 cs_sold_date_sk = d_date_sk and
 d_year = 2002 and
 d_moy between 3 and 3+3))
group by cd_gender,
 cd_marital_status,
 cd_education_status,
 cd_purchase_estimate,
 cd_credit_rating,
 cd_dep_count,
 cd_dep_employed_count,
 cd_dep_college_count
order by cd_gender,
 cd_marital_status,
 cd_education_status,
 cd_purchase_estimate,
 cd_credit_rating,
 cd_dep_count,
 cd_dep_employed_count,
 cd_dep_college_count
LIMIT 100;

Loading
Loading