From a76ff569ab7c674daa739fe419aca3fb23f16528 Mon Sep 17 00:00:00 2001
From: Abhi Agarwal
Date: Sat, 11 Oct 2025 13:50:38 -0400
Subject: [PATCH 1/3] Add new benchmark script, harness, and include profiling information

Signed-off-by: Abhi Agarwal
---
 .gitignore                         |   1 -
 Cargo.toml                         |   9 +
 crates/benchmarks/Cargo.toml       |  16 +-
 crates/benchmarks/README.md        |  80 ++--
 crates/benchmarks/benches/merge.rs |  90 ++++
 crates/benchmarks/data/.gitignore  |   2 +
 crates/benchmarks/src/bin/merge.rs | 665 -----------------------------
 crates/benchmarks/src/lib.rs       | 182 ++++++++
 crates/benchmarks/src/main.rs      |  66 +++
 crates/catalog-unity/Cargo.toml    |   2 +-
 crates/core/Cargo.toml             |   4 +-
 crates/gcp/Cargo.toml              |   2 +-
 crates/mount/Cargo.toml            |   2 +-
 crates/test/Cargo.toml             |   2 +-
 14 files changed, 415 insertions(+), 708 deletions(-)
 create mode 100644 crates/benchmarks/benches/merge.rs
 create mode 100644 crates/benchmarks/data/.gitignore
 delete mode 100644 crates/benchmarks/src/bin/merge.rs
 create mode 100644 crates/benchmarks/src/lib.rs
 create mode 100644 crates/benchmarks/src/main.rs

diff --git a/.gitignore b/.gitignore
index 06425842e4..3808335f5c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -22,7 +22,6 @@ __blobstorage__
 .githubchangeloggenerator.cache.log
 .githubchangeloggenerator.cache/
 .githubchangeloggenerator*
-data
 .zed/
 
 # Add all Cargo.lock files except for those in binary crates
diff --git a/Cargo.toml b/Cargo.toml
index 08ab3fddb0..c781dfe102 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -66,6 +66,7 @@ regex = { version = "1" }
 thiserror = { version = "2" }
 url = { version = "2" }
 percent-encoding-rfc3986 = { version = "0.1.3" }
+tempfile = { version = "3" }
 uuid = { version = "1" }
 
 # runtime / async
@@ -101,3 +102,11 @@ Arro3 = "Arro3"
 AKS = "AKS"
 # to avoid using 'type' as a field name.
 tpe = "tpe"
+
+# for better flamegraphs when benchmarking
+[profile.bench]
+debug = true
+
+[profile.profiling]
+inherits = "release"
+debug = true
diff --git a/crates/benchmarks/Cargo.toml b/crates/benchmarks/Cargo.toml
index e166389bae..bc3ea1bd9c 100644
--- a/crates/benchmarks/Cargo.toml
+++ b/crates/benchmarks/Cargo.toml
@@ -7,11 +7,14 @@ license = "Apache-2.0"
 keywords = ["deltalake", "delta", "datalake"]
 description = "Delta-rs Benchmarks"
 edition = "2021"
+publish = false
 
 [dependencies]
-clap = { version = "4", features = [ "derive" ] }
-chrono = { version = "0.4.31", default-features = false, features = ["clock"] }
-tokio = { version = "1", features = ["fs", "macros", "rt", "io-util"] }
+clap = { version = "4", features = ["derive"] }
+tokio = { workspace = true, features = ["fs", "macros", "rt", "io-util"] }
+chrono = { workspace = true }
+url = { workspace = true }
+tempfile = { workspace = true }
 
 # arrow
 arrow = { workspace = true }
@@ -27,3 +30,10 @@ datafusion = { workspace = true }
 path = "../core"
 version = "0"
 features = ["datafusion"]
+
+[dev-dependencies]
+divan = "0.1"
+
+[[bench]]
+name = "merge"
+harness = false
diff --git a/crates/benchmarks/README.md b/crates/benchmarks/README.md
index 959a857977..523ecd1161 100644
--- a/crates/benchmarks/README.md
+++ b/crates/benchmarks/README.md
@@ -4,52 +4,66 @@ The merge benchmarks are similar to the ones used by [Delta Spark](https://githu
 ## Dataset
 
-Databricks maintains a public S3 bucket of the TPC-DS dataset with various factor where requesters must pay to download this dataset. Below is an example of how to list the 1gb scale factor
+To generate the database, `duckdb` can be used.
Install `duckdb` by following [these instructions](https://duckdb.org/#quickinstall).
+Run the following commands:
+
+```bash
+❯ duckdb
+D CALL dsdgen(sf = 1);
+100% ▕██████████████████████████████████████▏ (00:00:05.76 elapsed)
+┌─────────┐
+│ Success │
+│ boolean │
+├─────────┤
+│ 0 rows │
+└─────────┘
+D EXPORT DATABASE 'tpcds_parquet' (FORMAT PARQUET);
 ```
-aws s3api list-objects --bucket devrel-delta-datasets --request-payer requester --prefix tpcds-2.13/tpcds_sf1_parquet/web_returns/
 ```
-You can generate the TPC-DS dataset yourself by downloading and compiling [the generator](https://www.tpc.org/tpc_documents_current_versions/current_specifications5.asp)
-You may need to update the CFLAGS to include `-fcommon` to compile on newer versions of GCC.
+This will generate a folder called `tpcds_parquet` containing many parquet files. Place it at `crates/benchmarks/data/tpcds_parquet` (or set `TPCDS_PARQUET_DIR`). Credits to [Xuanwo's Blog](https://xuanwo.io/links/2025/02/duckdb-is-the-best-tpc-data-generator/).
+
+## Running benchmarks
 
-## Commands
-These commands can be executed from the root of the benchmark crate. Some commands depend on the existence of the TPC-DS Dataset existing.
+Benchmarks use Divan and time only the merge operation. A temporary Delta table is created per iteration from `web_returns.parquet` and removed afterwards.
 
-### Convert
-Converts a TPC-DS web_returns csv into a Delta table
-Assumes the dataset is pipe delimited and records do not have a trailing delimiter
+Environment variables:
+- `TPCDS_PARQUET_DIR` (optional): directory containing `web_returns.parquet`. Default: `crates/benchmarks/data/tpcds_parquet`.
+From the repo root:
 ```
- cargo run --release --bin merge -- convert data/tpcds/web_returns.dat data/web_returns
+cargo bench -p delta-benchmarks --bench merge
 ```
 
-### Standard
-Execute the standard merge bench suite.
-Results can be saved to a delta table for further analysis.
-This table has the following schema:
-
-group_id: Used to group all tests that executed as a part of this call. Default value is the timestamp of execution
-name: The benchmark name that was executed
-sample: The iteration number for a given benchmark name
-duration_ms: How long the benchmark took in ms
-data: Free field to pack any additional data
-
+Filter a specific suite:
 ```
- cargo run --release --bin merge -- standard data/web_returns 1 data/merge_results
+cargo bench -p delta-benchmarks --bench merge -- delete_only
+cargo bench -p delta-benchmarks --bench merge -- multiple_insert_only
+cargo bench -p delta-benchmarks --bench merge -- upsert_file_matched
 ```
 
-### Compare
-Compare the results of two different runs.
-The a Delta table paths and the `group_id` of each run and obtain the speedup for each test case
+## Profiling script
 
-```
- cargo run --release --bin merge -- compare data/benchmarks/ 1698636172801 data/benchmarks/ 1699759539902
+A simple CLI is available to run a single merge with configurable parameters (useful for profiling or ad-hoc runs). It creates a fresh temporary Delta table per sample from `web_returns.parquet`, times only the merge, and prints duration and metrics.
+ +Run (from repo root): +```bash +cargo run --profile profiling -p delta-benchmarks -- upsert --matched 0.01 --not-matched 0.10 ``` -### Show -Show all benchmarks results from a delta table +Options: +- `upsert | delete | insert`: operation to benchmark +- `--matched `: fraction of rows that match existing keys (default 0.01) +- `--not-matched `: fraction of rows that do not match (default 0.10) -``` - cargo run --release --bin merge -- show data/benchmark -``` +### Flamegraphs using `samply` + +Using `samply`, you can generate flamegraphs from the profile script. + +To start, + +```bash +cargo install samply --locked +cargo build --profile profiling -p delta-benchmarks +samply record ./target/profiling/delta-benchmarks upsert +``` \ No newline at end of file diff --git a/crates/benchmarks/benches/merge.rs b/crates/benchmarks/benches/merge.rs new file mode 100644 index 0000000000..4851d934f8 --- /dev/null +++ b/crates/benchmarks/benches/merge.rs @@ -0,0 +1,90 @@ +use std::path::PathBuf; + +use delta_benchmarks::{ + merge_delete, merge_insert, merge_upsert, prepare_source_and_table, MergeOp, MergePerfParams, +}; + +use divan::{AllocProfiler, Bencher}; + +fn main() { + divan::main(); +} + +#[global_allocator] +static ALLOC: AllocProfiler = AllocProfiler::system(); + +fn bench_merge(bencher: Bencher, op: MergeOp, params: &MergePerfParams) { + bencher.bench_local(move || { + let params = params.clone(); + let rt = tokio::runtime::Runtime::new().unwrap(); + let parquet_dir = PathBuf::from( + std::env::var("TPCDS_PARQUET_DIR").unwrap_or_else(|_| "data/tpcds_parquet".to_string()), + ); + rt.block_on(async move { + let tmp_dir = tempfile::tempdir().unwrap(); + let (source, table) = prepare_source_and_table(¶ms, &tmp_dir, &parquet_dir) + .await + .unwrap(); + + let _ = divan::black_box(op(source, table).unwrap().await.unwrap()); + }) + }); +} + +#[divan::bench(args = [ + MergePerfParams { + sample_matched_rows: 0.05, + sample_not_matched_rows: 0.0, + } +])] +fn delete_only(bencher: Bencher, params: &MergePerfParams) { + bench_merge(bencher, merge_delete, params); +} + +#[divan::bench(args = [ + MergePerfParams { + sample_matched_rows: 0.00, + sample_not_matched_rows: 0.05, + }, + MergePerfParams { + sample_matched_rows: 0.00, + sample_not_matched_rows: 0.50, + }, + MergePerfParams { + sample_matched_rows: 0.00, + sample_not_matched_rows: 1.0, + }, +])] +fn multiple_insert_only(bencher: Bencher, params: &MergePerfParams) { + bench_merge(bencher, merge_insert, params); +} + +#[divan::bench(args = [ + MergePerfParams { + sample_matched_rows: 0.01, + sample_not_matched_rows: 0.1, + }, + MergePerfParams { + sample_matched_rows: 0.1, + sample_not_matched_rows: 0.0, + }, + MergePerfParams { + sample_matched_rows: 0.1, + sample_not_matched_rows: 0.01, + }, + MergePerfParams { + sample_matched_rows: 0.5, + sample_not_matched_rows: 0.001, + }, + MergePerfParams { + sample_matched_rows: 0.99, + sample_not_matched_rows: 0.001, + }, + MergePerfParams { + sample_matched_rows: 0.001, + sample_not_matched_rows: 0.001, + }, +])] +fn upsert_file_matched(bencher: Bencher, params: &MergePerfParams) { + bench_merge(bencher, merge_upsert, params); +} diff --git a/crates/benchmarks/data/.gitignore b/crates/benchmarks/data/.gitignore new file mode 100644 index 0000000000..c96a04f008 --- /dev/null +++ b/crates/benchmarks/data/.gitignore @@ -0,0 +1,2 @@ +* +!.gitignore \ No newline at end of file diff --git a/crates/benchmarks/src/bin/merge.rs b/crates/benchmarks/src/bin/merge.rs deleted file mode 100644 index 
0e1f7a6f9d..0000000000 --- a/crates/benchmarks/src/bin/merge.rs +++ /dev/null @@ -1,665 +0,0 @@ -use std::{ - sync::Arc, - time::{SystemTime, UNIX_EPOCH}, -}; - -use arrow::datatypes::Schema as ArrowSchema; -use arrow_array::{RecordBatch, StringArray, UInt32Array}; -use chrono::Duration; -use clap::{command, Args, Parser, Subcommand}; -use datafusion::common::DataFusionError; -use datafusion::functions::expr_fn::random; -use datafusion::logical_expr::{cast, col, lit}; -use datafusion::{datasource::MemTable, prelude::DataFrame}; -use deltalake_core::protocol::SaveMode; -use deltalake_core::{ - arrow::{ - self, - datatypes::{DataType, Field}, - }, - datafusion::prelude::{CsvReadOptions, SessionContext}, - delta_datafusion::{DeltaScanConfig, DeltaTableProvider}, - ensure_table_uri, - operations::merge::{MergeBuilder, MergeMetrics}, - DeltaOps, DeltaTable, DeltaTableBuilder, DeltaTableError, ObjectStore, Path, -}; -use serde_json::json; -use tokio::time::Instant; - -/* Convert web_returns dataset from TPC DS's datagen utility into a Delta table - This table will be partitioned on `wr_returned_date_sk` -*/ -pub async fn convert_tpcds_web_returns(input_path: String, table_path: String) -> Result<(), ()> { - let ctx = SessionContext::new(); - - let schema = ArrowSchema::new(vec![ - Field::new("wr_returned_date_sk", DataType::Int64, true), - Field::new("wr_returned_time_sk", DataType::Int64, true), - Field::new("wr_item_sk", DataType::Int64, false), - Field::new("wr_refunded_customer_sk", DataType::Int64, true), - Field::new("wr_refunded_cdemo_sk", DataType::Int64, true), - Field::new("wr_refunded_hdemo_sk", DataType::Int64, true), - Field::new("wr_refunded_addr_sk", DataType::Int64, true), - Field::new("wr_returning_customer_sk", DataType::Int64, true), - Field::new("wr_returning_cdemo_sk", DataType::Int64, true), - Field::new("wr_returning_hdemo_sk", DataType::Int64, true), - Field::new("wr_returning_addr_sk", DataType::Int64, true), - Field::new("wr_web_page_sk", DataType::Int64, true), - Field::new("wr_reason_sk", DataType::Int64, true), - Field::new("wr_order_number", DataType::Int64, false), - Field::new("wr_return_quantity", DataType::Int32, true), - Field::new("wr_return_amt", DataType::Decimal128(7, 2), true), - Field::new("wr_return_tax", DataType::Decimal128(7, 2), true), - Field::new("wr_return_amt_inc_tax", DataType::Decimal128(7, 2), true), - Field::new("wr_fee", DataType::Decimal128(7, 2), true), - Field::new("wr_return_ship_cost", DataType::Decimal128(7, 2), true), - Field::new("wr_refunded_cash", DataType::Decimal128(7, 2), true), - Field::new("wr_reversed_charge", DataType::Decimal128(7, 2), true), - Field::new("wr_account_credit", DataType::Decimal128(7, 2), true), - Field::new("wr_net_loss", DataType::Decimal128(7, 2), true), - ]); - - let table = ctx - .read_csv( - input_path, - CsvReadOptions { - has_header: false, - delimiter: b'|', - file_extension: ".dat", - schema: Some(&schema), - ..Default::default() - }, - ) - .await - .unwrap(); - - let table_url = ensure_table_uri(&table_path).unwrap(); - DeltaOps::try_from_uri(table_url) - .await - .unwrap() - .write(table.collect().await.unwrap()) - .with_partition_columns(vec!["wr_returned_date_sk"]) - .await - .unwrap(); - - Ok(()) -} - -fn merge_upsert(source: DataFrame, table: DeltaTable) -> Result { - DeltaOps(table) - .merge(source, "source.wr_item_sk = target.wr_item_sk and source.wr_order_number = target.wr_order_number") - .with_source_alias("source") - .with_target_alias("target") - .when_matched_update(|update| { - 
update - .update("wr_returned_date_sk", "source.wr_returned_date_sk") - .update("wr_returned_time_sk", "source.wr_returned_time_sk") - .update("wr_item_sk", "source.wr_item_sk") - .update("wr_refunded_customer_sk", "source.wr_refunded_customer_sk") - .update("wr_refunded_cdemo_sk", "source.wr_refunded_cdemo_sk") - .update("wr_refunded_hdemo_sk", "source.wr_refunded_hdemo_sk") - .update("wr_refunded_addr_sk", "source.wr_refunded_addr_sk") - .update("wr_returning_customer_sk", "source.wr_returning_customer_sk") - .update("wr_returning_cdemo_sk", "source.wr_returning_cdemo_sk") - .update("wr_returning_hdemo_sk", "source.wr_returning_hdemo_sk") - .update("wr_returning_addr_sk", "source.wr_returning_addr_sk") - .update("wr_web_page_sk", "source.wr_web_page_sk") - .update("wr_reason_sk", "source.wr_reason_sk") - .update("wr_order_number", "source.wr_order_number") - .update("wr_return_quantity", "source.wr_return_quantity") - .update("wr_return_amt", "source.wr_return_amt") - .update("wr_return_tax", "source.wr_return_tax") - .update("wr_return_amt_inc_tax", "source.wr_return_amt_inc_tax") - .update("wr_fee", "source.wr_fee") - .update("wr_return_ship_cost", "source.wr_return_ship_cost") - .update("wr_refunded_cash", "source.wr_refunded_cash") - .update("wr_reversed_charge", "source.wr_reversed_charge") - .update("wr_account_credit", "source.wr_account_credit") - .update("wr_net_loss", "source.wr_net_loss") - })? - .when_not_matched_insert(|insert| { - insert - .set("wr_returned_date_sk", "source.wr_returned_date_sk") - .set("wr_returned_time_sk", "source.wr_returned_time_sk") - .set("wr_item_sk", "source.wr_item_sk") - .set("wr_refunded_customer_sk", "source.wr_refunded_customer_sk") - .set("wr_refunded_cdemo_sk", "source.wr_refunded_cdemo_sk") - .set("wr_refunded_hdemo_sk", "source.wr_refunded_hdemo_sk") - .set("wr_refunded_addr_sk", "source.wr_refunded_addr_sk") - .set("wr_returning_customer_sk", "source.wr_returning_customer_sk") - .set("wr_returning_cdemo_sk", "source.wr_returning_cdemo_sk") - .set("wr_returning_hdemo_sk", "source.wr_returning_hdemo_sk") - .set("wr_returning_addr_sk", "source.wr_returning_addr_sk") - .set("wr_web_page_sk", "source.wr_web_page_sk") - .set("wr_reason_sk", "source.wr_reason_sk") - .set("wr_order_number", "source.wr_order_number") - .set("wr_return_quantity", "source.wr_return_quantity") - .set("wr_return_amt", "source.wr_return_amt") - .set("wr_return_tax", "source.wr_return_tax") - .set("wr_return_amt_inc_tax", "source.wr_return_amt_inc_tax") - .set("wr_fee", "source.wr_fee") - .set("wr_return_ship_cost", "source.wr_return_ship_cost") - .set("wr_refunded_cash", "source.wr_refunded_cash") - .set("wr_reversed_charge", "source.wr_reversed_charge") - .set("wr_account_credit", "source.wr_account_credit") - .set("wr_net_loss", "source.wr_net_loss") - }) -} - -fn merge_insert(source: DataFrame, table: DeltaTable) -> Result { - DeltaOps(table) - .merge(source, "source.wr_item_sk = target.wr_item_sk and source.wr_order_number = target.wr_order_number") - .with_source_alias("source") - .with_target_alias("target") - .when_not_matched_insert(|insert| { - insert - .set("wr_returned_date_sk", "source.wr_returned_date_sk") - .set("wr_returned_time_sk", "source.wr_returned_time_sk") - .set("wr_item_sk", "source.wr_item_sk") - .set("wr_refunded_customer_sk", "source.wr_refunded_customer_sk") - .set("wr_refunded_cdemo_sk", "source.wr_refunded_cdemo_sk") - .set("wr_refunded_hdemo_sk", "source.wr_refunded_hdemo_sk") - .set("wr_refunded_addr_sk", "source.wr_refunded_addr_sk") - 
.set("wr_returning_customer_sk", "source.wr_returning_customer_sk") - .set("wr_returning_cdemo_sk", "source.wr_returning_cdemo_sk") - .set("wr_returning_hdemo_sk", "source.wr_returning_hdemo_sk") - .set("wr_returning_addr_sk", "source.wr_returning_addr_sk") - .set("wr_web_page_sk", "source.wr_web_page_sk") - .set("wr_reason_sk", "source.wr_reason_sk") - .set("wr_order_number", "source.wr_order_number") - .set("wr_return_quantity", "source.wr_return_quantity") - .set("wr_return_amt", "source.wr_return_amt") - .set("wr_return_tax", "source.wr_return_tax") - .set("wr_return_amt_inc_tax", "source.wr_return_amt_inc_tax") - .set("wr_fee", "source.wr_fee") - .set("wr_return_ship_cost", "source.wr_return_ship_cost") - .set("wr_refunded_cash", "source.wr_refunded_cash") - .set("wr_reversed_charge", "source.wr_reversed_charge") - .set("wr_account_credit", "source.wr_account_credit") - .set("wr_net_loss", "source.wr_net_loss") - }) -} - -fn merge_delete(source: DataFrame, table: DeltaTable) -> Result { - DeltaOps(table) - .merge(source, "source.wr_item_sk = target.wr_item_sk and source.wr_order_number = target.wr_order_number") - .with_source_alias("source") - .with_target_alias("target") - .when_matched_delete(|delete| { - delete - }) -} - -async fn benchmark_merge_tpcds( - path: String, - parameters: MergePerfParams, - merge: fn(DataFrame, DeltaTable) -> Result, -) -> Result<(core::time::Duration, MergeMetrics), DataFusionError> { - let table_url = ensure_table_uri(&path)?; - let table = DeltaTableBuilder::from_uri(table_url)?.load().await?; - - let provider = DeltaTableProvider::try_new( - table.snapshot()?.snapshot().clone(), - table.log_store(), - DeltaScanConfig { - file_column_name: Some("file_path".to_string()), - ..Default::default() - }, - ) - .unwrap(); - - let ctx = SessionContext::new(); - ctx.register_table("t1", Arc::new(provider))?; - - let files = ctx - .sql("select file_path as file from t1 group by file") - .await? - .with_column("r", random())? - .filter(col("r").lt_eq(lit(parameters.sample_files)))?; - - let file_sample = files.collect_partitioned().await?; - let schema = file_sample.first().unwrap().first().unwrap().schema(); - let mem_table = Arc::new(MemTable::try_new(schema, file_sample)?); - ctx.register_table("file_sample", mem_table)?; - let file_sample_count = ctx.table("file_sample").await?.count().await?; - - let row_sample = ctx.table("t1").await?.join( - ctx.table("file_sample").await?, - datafusion::common::JoinType::Inner, - &["file_path"], - &["file"], - None, - )?; - - let matched = row_sample - .clone() - .filter(random().lt_eq(lit(parameters.sample_matched_rows)))?; - - let rand = cast(random() * lit(u32::MAX), DataType::Int64); - let not_matched = row_sample - .filter(random().lt_eq(lit(parameters.sample_not_matched_rows)))? - .with_column("wr_item_sk", rand.clone())? - .with_column("wr_order_number", rand)?; - - let source = matched.union(not_matched)?; - - let start = Instant::now(); - let (table, metrics) = merge(source, table)?.await?; - let end = Instant::now(); - - let duration = end.duration_since(start); - - println!("File sample count: {file_sample_count}"); - println!("{metrics:?}"); - println!("Seconds: {}", duration.as_secs_f32()); - - // Clean up and restore to original state. 
- let (table, _) = DeltaOps(table).restore().with_version_to_restore(0).await?; - let (table, _) = DeltaOps(table) - .vacuum() - .with_retention_period(Duration::seconds(0)) - .with_enforce_retention_duration(false) - .await?; - table - .object_store() - .delete(&Path::parse("_delta_log/00000000000000000001.json")?) - .await?; - table - .object_store() - .delete(&Path::parse("_delta_log/00000000000000000002.json")?) - .await?; - table - .object_store() - .delete(&Path::parse("_delta_log/00000000000000000003.json")?) - .await?; - let _ = table - .object_store() - .delete(&Path::parse("_delta_log/00000000000000000004.json")?) - .await; - - Ok((duration, metrics)) -} - -#[derive(Subcommand, Debug)] -enum Command { - Convert(Convert), - Bench(BenchArg), - Standard(Standard), - Compare(Compare), - Show(Show), -} - -#[derive(Debug, Args)] -struct Convert { - tpcds_path: String, - delta_path: String, -} - -#[derive(Debug, Args)] -struct Standard { - delta_path: String, - samples: Option, - output_path: Option, - group_id: Option, -} - -#[derive(Debug, Args)] -struct Compare { - before_path: String, - before_group_id: String, - after_path: String, - after_group_id: String, -} - -#[derive(Debug, Args)] -struct Show { - path: String, -} - -#[derive(Debug, Args)] -struct BenchArg { - table_path: String, - #[command(subcommand)] - name: MergeBench, -} - -struct Bench { - name: String, - op: fn(DataFrame, DeltaTable) -> Result, - params: MergePerfParams, -} - -impl Bench { - fn new( - name: S, - op: fn(DataFrame, DeltaTable) -> Result, - params: MergePerfParams, - ) -> Self { - Bench { - name: name.to_string(), - op, - params, - } - } -} - -#[derive(Debug, Args, Clone)] -struct MergePerfParams { - pub sample_files: f32, - pub sample_matched_rows: f32, - pub sample_not_matched_rows: f32, -} - -#[derive(Debug, Clone, Subcommand)] -enum MergeBench { - Upsert(MergePerfParams), - Delete(MergePerfParams), - Insert(MergePerfParams), -} - -#[derive(Parser, Debug)] -#[command(about)] -struct MergePrefArgs { - #[command(subcommand)] - command: Command, -} - -#[tokio::main] -async fn main() { - type MergeOp = fn(DataFrame, DeltaTable) -> Result; - match MergePrefArgs::parse().command { - Command::Convert(Convert { - tpcds_path, - delta_path, - }) => { - convert_tpcds_web_returns(tpcds_path, delta_path) - .await - .unwrap(); - } - - Command::Bench(BenchArg { table_path, name }) => { - let (merge_op, params): (MergeOp, MergePerfParams) = match name { - MergeBench::Upsert(params) => (merge_upsert, params), - MergeBench::Delete(params) => (merge_delete, params), - MergeBench::Insert(params) => (merge_insert, params), - }; - - benchmark_merge_tpcds(table_path, params, merge_op) - .await - .unwrap(); - } - Command::Standard(Standard { - delta_path, - samples, - output_path, - group_id, - }) => { - let benches = vec![Bench::new( - "delete_only_fileMatchedFraction_0.05_rowMatchedFraction_0.05", - merge_delete, - MergePerfParams { - sample_files: 0.05, - sample_matched_rows: 0.05, - sample_not_matched_rows: 0.0, - }, - ), - Bench::new( - "multiple_insert_only_fileMatchedFraction_0.05_rowNotMatchedFraction_0.05", - merge_insert, - MergePerfParams { - sample_files: 0.05, - sample_matched_rows: 0.00, - sample_not_matched_rows: 0.05, - }, - ), - Bench::new( - "multiple_insert_only_fileMatchedFraction_0.05_rowNotMatchedFraction_0.50", - merge_insert, - MergePerfParams { - sample_files: 0.05, - sample_matched_rows: 0.00, - sample_not_matched_rows: 0.50, - }, - ), - Bench::new( - 
"multiple_insert_only_fileMatchedFraction_0.05_rowNotMatchedFraction_1.0", - merge_insert, - MergePerfParams { - sample_files: 0.05, - sample_matched_rows: 0.00, - sample_not_matched_rows: 1.0, - }, - ), - Bench::new( - "upsert_fileMatchedFraction_0.05_rowMatchedFraction_0.01_rowNotMatchedFraction_0.1", - merge_upsert, - MergePerfParams { - sample_files: 0.05, - sample_matched_rows: 0.01, - sample_not_matched_rows: 0.1, - }, - ), - Bench::new( - "upsert_fileMatchedFraction_0.05_rowMatchedFraction_0.0_rowNotMatchedFraction_0.1", - merge_upsert, - MergePerfParams { - sample_files: 0.05, - sample_matched_rows: 0.00, - sample_not_matched_rows: 0.1, - }, - ), - Bench::new( - "upsert_fileMatchedFraction_0.05_rowMatchedFraction_0.1_rowNotMatchedFraction_0.0", - merge_upsert, - MergePerfParams { - sample_files: 0.05, - sample_matched_rows: 0.1, - sample_not_matched_rows: 0.0, - }, - ), - Bench::new( - "upsert_fileMatchedFraction_0.05_rowMatchedFraction_0.1_rowNotMatchedFraction_0.01", - merge_upsert, - MergePerfParams { - sample_files: 0.05, - sample_matched_rows: 0.1, - sample_not_matched_rows: 0.01, - }, - ), - Bench::new( - "upsert_fileMatchedFraction_0.05_rowMatchedFraction_0.5_rowNotMatchedFraction_0.001", - merge_upsert, - MergePerfParams { - sample_files: 0.05, - sample_matched_rows: 0.5, - sample_not_matched_rows: 0.001, - }, - ), - Bench::new( - "upsert_fileMatchedFraction_0.05_rowMatchedFraction_0.99_rowNotMatchedFraction_0.001", - merge_upsert, - MergePerfParams { - sample_files: 0.05, - sample_matched_rows: 0.99, - sample_not_matched_rows: 0.001, - }, - ), - Bench::new( - "upsert_fileMatchedFraction_0.05_rowMatchedFraction_1.0_rowNotMatchedFraction_0.001", - merge_upsert, - MergePerfParams { - sample_files: 0.05, - sample_matched_rows: 1.0, - sample_not_matched_rows: 0.001, - }, - ), - Bench::new( - "upsert_fileMatchedFraction_0.5_rowMatchedFraction_0.001_rowNotMatchedFraction_0.001", - merge_upsert, - MergePerfParams { - sample_files: 0.5, - sample_matched_rows: 0.001, - sample_not_matched_rows: 0.001, - }, - ), - Bench::new( - "upsert_fileMatchedFraction_1.0_rowMatchedFraction_0.001_rowNotMatchedFraction_0.001", - merge_upsert, - MergePerfParams { - sample_files: 1.0, - sample_matched_rows: 0.001, - sample_not_matched_rows: 0.001, - }, - ) - ]; - - let num_samples = samples.unwrap_or(1); - let group_id = group_id.unwrap_or( - SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap() - .as_millis() - .to_string(), - ); - let output = output_path.unwrap_or("data/benchmarks".into()); - - let mut group_ids = vec![]; - let mut name = vec![]; - let mut samples = vec![]; - let mut duration_ms = vec![]; - let mut data = vec![]; - - for bench in benches { - for sample in 0..num_samples { - println!("Test: {} Sample: {sample}", bench.name); - let res = - benchmark_merge_tpcds(delta_path.clone(), bench.params.clone(), bench.op) - .await - .unwrap(); - - group_ids.push(group_id.clone()); - name.push(bench.name.clone()); - samples.push(sample); - duration_ms.push(res.0.as_millis() as u32); - data.push(json!(res.1).to_string()); - } - } - - let schema = Arc::new(ArrowSchema::new(vec![ - Field::new("group_id", DataType::Utf8, false), - Field::new("name", DataType::Utf8, false), - Field::new("sample", DataType::UInt32, false), - Field::new("duration_ms", DataType::UInt32, false), - Field::new("data", DataType::Utf8, true), - ])); - - let batch = RecordBatch::try_new( - schema, - vec![ - Arc::new(StringArray::from(group_ids)), - Arc::new(StringArray::from(name)), - 
Arc::new(UInt32Array::from(samples)), - Arc::new(UInt32Array::from(duration_ms)), - Arc::new(StringArray::from(data)), - ], - ) - .unwrap(); - - let output_url = ensure_table_uri(&output).unwrap(); - DeltaOps::try_from_uri(output_url) - .await - .unwrap() - .write(vec![batch]) - .with_save_mode(SaveMode::Append) - .await - .unwrap(); - } - Command::Compare(Compare { - before_path, - before_group_id, - after_path, - after_group_id, - }) => { - let before_url = ensure_table_uri(&before_path).unwrap(); - let before_table = DeltaTableBuilder::from_uri(before_url) - .unwrap() - .load() - .await - .unwrap(); - let after_url = ensure_table_uri(&after_path).unwrap(); - let after_table = DeltaTableBuilder::from_uri(after_url) - .unwrap() - .load() - .await - .unwrap(); - - let ctx = SessionContext::new(); - ctx.register_table("before", Arc::new(before_table)) - .unwrap(); - ctx.register_table("after", Arc::new(after_table)).unwrap(); - - let before_stats = ctx - .sql(&format!( - " - select name as before_name, - avg(cast(duration_ms as float)) as before_duration_avg - from before where group_id = {before_group_id} - group by name - ", - )) - .await - .unwrap(); - - let after_stats = ctx - .sql(&format!( - " - select name as after_name, - avg(cast(duration_ms as float)) as after_duration_avg - from after where group_id = {after_group_id} - group by name - ", - )) - .await - .unwrap(); - - before_stats - .join( - after_stats, - datafusion::common::JoinType::Inner, - &["before_name"], - &["after_name"], - None, - ) - .unwrap() - .select(vec![ - col("before_name").alias("name"), - col("before_duration_avg"), - col("after_duration_avg"), - (col("before_duration_avg") / (col("after_duration_avg"))), - ]) - .unwrap() - .sort(vec![col("name").sort(true, true)]) - .unwrap() - .show() - .await - .unwrap(); - } - Command::Show(Show { path }) => { - let table_url = ensure_table_uri(&path).unwrap(); - let stats = DeltaTableBuilder::from_uri(table_url) - .unwrap() - .load() - .await - .unwrap(); - let ctx = SessionContext::new(); - ctx.register_table("stats", Arc::new(stats)).unwrap(); - - ctx.sql("select * from stats") - .await - .unwrap() - .show() - .await - .unwrap(); - } - } -} diff --git a/crates/benchmarks/src/lib.rs b/crates/benchmarks/src/lib.rs new file mode 100644 index 0000000000..5277a9da26 --- /dev/null +++ b/crates/benchmarks/src/lib.rs @@ -0,0 +1,182 @@ +use std::path::Path; + +use datafusion::logical_expr::{cast, lit}; +use datafusion::prelude::DataFrame; +use datafusion::prelude::ParquetReadOptions; +use deltalake_core::kernel::engine::arrow_conversion::TryIntoKernel; +use deltalake_core::kernel::{StructField, StructType}; +use deltalake_core::operations::merge::MergeBuilder; +use deltalake_core::DeltaResult; +use deltalake_core::{datafusion::prelude::SessionContext, DeltaOps, DeltaTable, DeltaTableError}; +use tempfile::TempDir; +use url::Url; + +pub type MergeOp = fn(DataFrame, DeltaTable) -> Result; + +#[derive(Debug, Clone)] +pub struct MergePerfParams { + pub sample_matched_rows: f32, + pub sample_not_matched_rows: f32, +} + +pub fn merge_upsert(source: DataFrame, table: DeltaTable) -> Result { + deltalake_core::DeltaOps(table) + .merge(source, "source.wr_item_sk = target.wr_item_sk and source.wr_order_number = target.wr_order_number") + .with_source_alias("source") + .with_target_alias("target") + .when_matched_update(|update| { + update + .update("wr_returned_date_sk", "source.wr_returned_date_sk") + .update("wr_returned_time_sk", "source.wr_returned_time_sk") + .update("wr_item_sk", 
"source.wr_item_sk") + .update("wr_refunded_customer_sk", "source.wr_refunded_customer_sk") + .update("wr_refunded_cdemo_sk", "source.wr_refunded_cdemo_sk") + .update("wr_refunded_hdemo_sk", "source.wr_refunded_hdemo_sk") + .update("wr_refunded_addr_sk", "source.wr_refunded_addr_sk") + .update("wr_returning_customer_sk", "source.wr_returning_customer_sk") + .update("wr_returning_cdemo_sk", "source.wr_returning_cdemo_sk") + .update("wr_returning_hdemo_sk", "source.wr_returning_hdemo_sk") + .update("wr_returning_addr_sk", "source.wr_returning_addr_sk") + .update("wr_web_page_sk", "source.wr_web_page_sk") + .update("wr_reason_sk", "source.wr_reason_sk") + .update("wr_order_number", "source.wr_order_number") + .update("wr_return_quantity", "source.wr_return_quantity") + .update("wr_return_amt", "source.wr_return_amt") + .update("wr_return_tax", "source.wr_return_tax") + .update("wr_return_amt_inc_tax", "source.wr_return_amt_inc_tax") + .update("wr_fee", "source.wr_fee") + .update("wr_return_ship_cost", "source.wr_return_ship_cost") + .update("wr_refunded_cash", "source.wr_refunded_cash") + .update("wr_reversed_charge", "source.wr_reversed_charge") + .update("wr_account_credit", "source.wr_account_credit") + .update("wr_net_loss", "source.wr_net_loss") + })? + .when_not_matched_insert(|insert| { + insert + .set("wr_returned_date_sk", "source.wr_returned_date_sk") + .set("wr_returned_time_sk", "source.wr_returned_time_sk") + .set("wr_item_sk", "source.wr_item_sk") + .set("wr_refunded_customer_sk", "source.wr_refunded_customer_sk") + .set("wr_refunded_cdemo_sk", "source.wr_refunded_cdemo_sk") + .set("wr_refunded_hdemo_sk", "source.wr_refunded_hdemo_sk") + .set("wr_refunded_addr_sk", "source.wr_refunded_addr_sk") + .set("wr_returning_customer_sk", "source.wr_returning_customer_sk") + .set("wr_returning_cdemo_sk", "source.wr_returning_cdemo_sk") + .set("wr_returning_hdemo_sk", "source.wr_returning_hdemo_sk") + .set("wr_returning_addr_sk", "source.wr_returning_addr_sk") + .set("wr_web_page_sk", "source.wr_web_page_sk") + .set("wr_reason_sk", "source.wr_reason_sk") + .set("wr_order_number", "source.wr_order_number") + .set("wr_return_quantity", "source.wr_return_quantity") + .set("wr_return_amt", "source.wr_return_amt") + .set("wr_return_tax", "source.wr_return_tax") + .set("wr_return_amt_inc_tax", "source.wr_return_amt_inc_tax") + .set("wr_fee", "source.wr_fee") + .set("wr_return_ship_cost", "source.wr_return_ship_cost") + .set("wr_refunded_cash", "source.wr_refunded_cash") + .set("wr_reversed_charge", "source.wr_reversed_charge") + .set("wr_account_credit", "source.wr_account_credit") + .set("wr_net_loss", "source.wr_net_loss") + }) +} + +pub fn merge_insert(source: DataFrame, table: DeltaTable) -> Result { + deltalake_core::DeltaOps(table) + .merge(source, "source.wr_item_sk = target.wr_item_sk and source.wr_order_number = target.wr_order_number") + .with_source_alias("source") + .with_target_alias("target") + .when_not_matched_insert(|insert| { + insert + .set("wr_returned_date_sk", "source.wr_returned_date_sk") + .set("wr_returned_time_sk", "source.wr_returned_time_sk") + .set("wr_item_sk", "source.wr_item_sk") + .set("wr_refunded_customer_sk", "source.wr_refunded_customer_sk") + .set("wr_refunded_cdemo_sk", "source.wr_refunded_cdemo_sk") + .set("wr_refunded_hdemo_sk", "source.wr_refunded_hdemo_sk") + .set("wr_refunded_addr_sk", "source.wr_refunded_addr_sk") + .set("wr_returning_customer_sk", "source.wr_returning_customer_sk") + .set("wr_returning_cdemo_sk", "source.wr_returning_cdemo_sk") + 
.set("wr_returning_hdemo_sk", "source.wr_returning_hdemo_sk") + .set("wr_returning_addr_sk", "source.wr_returning_addr_sk") + .set("wr_web_page_sk", "source.wr_web_page_sk") + .set("wr_reason_sk", "source.wr_reason_sk") + .set("wr_order_number", "source.wr_order_number") + .set("wr_return_quantity", "source.wr_return_quantity") + .set("wr_return_amt", "source.wr_return_amt") + .set("wr_return_tax", "source.wr_return_tax") + .set("wr_return_amt_inc_tax", "source.wr_return_amt_inc_tax") + .set("wr_fee", "source.wr_fee") + .set("wr_return_ship_cost", "source.wr_return_ship_cost") + .set("wr_refunded_cash", "source.wr_refunded_cash") + .set("wr_reversed_charge", "source.wr_reversed_charge") + .set("wr_account_credit", "source.wr_account_credit") + .set("wr_net_loss", "source.wr_net_loss") + }) +} + +pub fn merge_delete(source: DataFrame, table: DeltaTable) -> Result { + deltalake_core::DeltaOps(table) + .merge(source, "source.wr_item_sk = target.wr_item_sk and source.wr_order_number = target.wr_order_number") + .with_source_alias("source") + .with_target_alias("target") + .when_matched_delete(|delete| delete) +} + +/// Prepare source DataFrame and target Delta table from DuckDB-generated TPC-DS parquet. +/// Creates a temporary Delta table from web_returns.parquet as the target. +/// Returns (source_df, target_table) for benchmarking. +pub async fn prepare_source_and_table( + params: &MergePerfParams, + tmp_dir: &TempDir, + parquet_dir: &Path, +) -> DeltaResult<(DataFrame, DeltaTable)> { + let ctx = SessionContext::new(); + + let parquet_path = parquet_dir + .join("web_returns.parquet") + .to_str() + .unwrap() + .to_owned(); + + let parquet_df = ctx + .read_parquet(&parquet_path, ParquetReadOptions::default()) + .await?; + let temp_table_url = Url::from_directory_path(tmp_dir).unwrap(); + + let schema = parquet_df.schema(); + let delta_schema: StructType = schema.as_arrow().try_into_kernel().unwrap(); + + let batches = parquet_df.collect().await?; + let fields: Vec = delta_schema.fields().cloned().collect(); + let table = DeltaOps::try_from_uri(temp_table_url) + .await? + .create() + .with_columns(fields) + .await?; + + let table = DeltaOps(table).write(batches).await?; + + // Now prepare source DataFrame with sampling + let source = ctx + .read_parquet(&parquet_path, ParquetReadOptions::default()) + .await?; + + // Split matched and not-matched portions + let matched = source + .clone() + .filter(datafusion::functions::expr_fn::random().lt_eq(lit(params.sample_matched_rows)))?; + + let rand = cast( + datafusion::functions::expr_fn::random() * lit(u32::MAX), + datafusion::arrow::datatypes::DataType::Int64, + ); + let not_matched = source + .filter( + datafusion::functions::expr_fn::random().lt_eq(lit(params.sample_not_matched_rows)), + )? + .with_column("wr_item_sk", rand.clone())? 
+ .with_column("wr_order_number", rand)?; + + let source = matched.union(not_matched)?; + Ok((source, table)) +} diff --git a/crates/benchmarks/src/main.rs b/crates/benchmarks/src/main.rs new file mode 100644 index 0000000000..fa7112d8b2 --- /dev/null +++ b/crates/benchmarks/src/main.rs @@ -0,0 +1,66 @@ +use std::{path::PathBuf, time::Instant}; + +use clap::{Parser, ValueEnum}; + +use delta_benchmarks::{ + merge_delete, merge_insert, merge_upsert, prepare_source_and_table, MergeOp, MergePerfParams, +}; + +#[derive(Copy, Clone, Debug, ValueEnum)] +enum OpKind { + Upsert, + Delete, + Insert, +} + +#[derive(Parser, Debug)] +#[command(about = "Run a merge benchmark with configurable parameters")] +struct Cli { + /// Operation to benchmark + #[arg(value_enum)] + op: OpKind, + + /// Fraction of rows that match an existing key (0.0-1.0) + #[arg(long, default_value_t = 0.01)] + matched: f32, + + /// Fraction of rows that do not match (0.0-1.0) + #[arg(long, default_value_t = 0.10)] + not_matched: f32, +} + +#[tokio::main] +async fn main() { + let cli = Cli::parse(); + + let op_fn: MergeOp = match cli.op { + OpKind::Upsert => merge_upsert, + OpKind::Delete => merge_delete, + OpKind::Insert => merge_insert, + }; + + let params = MergePerfParams { + sample_matched_rows: cli.matched, + sample_not_matched_rows: cli.not_matched, + }; + + let tmp_dir = tempfile::tempdir().expect("create tmp dir"); + + let parquet_dir = PathBuf::from( + std::env::var("TPCDS_PARQUET_DIR") + .unwrap_or_else(|_| "crates/benchmarks/data/tpcds_parquet".to_string()), + ); + + let (source, table) = prepare_source_and_table(¶ms, &tmp_dir, &parquet_dir) + .await + .expect("prepare inputs"); + + let start = Instant::now(); + let (_table, metrics) = op_fn(source, table) + .expect("build merge") + .await + .expect("execute merge"); + let elapsed = start.elapsed(); + + println!("duration_ms={} metrics={:?}", elapsed.as_millis(), metrics) +} diff --git a/crates/catalog-unity/Cargo.toml b/crates/catalog-unity/Cargo.toml index fd251dc04b..23afd3dc46 100644 --- a/crates/catalog-unity/Cargo.toml +++ b/crates/catalog-unity/Cargo.toml @@ -34,7 +34,7 @@ moka = { version = "0.12", optional = true, features = ["future"] } [dev-dependencies] tokio = { version = "1", features = ["macros", "rt-multi-thread"] } -tempfile = "3" +tempfile = { workspace = true } httpmock = { version = "0.8.0-alpha.1" } tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 578162468b..97e3e00f0b 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -69,7 +69,7 @@ tokio = { workspace = true, features = [ # caching foyer = { version = "0.20.0", optional = true, features = ["serde"] } -tempfile = { version = "3.19.1", optional = true } +tempfile = { workspace = true, optional = true } # other deps (these should be organized and pulled into workspace.dependencies as necessary) cfg-if = "1" @@ -98,7 +98,7 @@ pretty_assertions = "1.2.1" pretty_env_logger = "0.5.0" rstest = { version = "0.26.1" } serial_test = "3" -tempfile = "3" +tempfile = { workspace = true } tokio = { version = "1", features = ["macros", "rt-multi-thread"] } tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/crates/gcp/Cargo.toml b/crates/gcp/Cargo.toml index 06067802dd..975faa71fb 100644 --- a/crates/gcp/Cargo.toml +++ b/crates/gcp/Cargo.toml @@ -31,7 +31,7 @@ deltalake-test = { path = "../test" } pretty_env_logger = "0.5.0" rand = "0.8" serde_json = { workspace = true } -tempfile 
= "3" +tempfile = { workspace = true } [features] integration_test = [] diff --git a/crates/mount/Cargo.toml b/crates/mount/Cargo.toml index 4f6b872fb5..af2457bf13 100644 --- a/crates/mount/Cargo.toml +++ b/crates/mount/Cargo.toml @@ -31,7 +31,7 @@ deltalake-test = { path = "../test" } pretty_env_logger = "0.5.0" rand = "0.8" serde_json = { workspace = true } -tempfile = "3" +tempfile = { workspace = true } fs_extra = "1.3.0" [features] diff --git a/crates/test/Cargo.toml b/crates/test/Cargo.toml index 4aee45354e..6c0b076c47 100644 --- a/crates/test/Cargo.toml +++ b/crates/test/Cargo.toml @@ -28,7 +28,7 @@ url = { workspace = true } dotenvy = "0" fs_extra = "1.3.0" futures = { version = "0.3" } -tempfile = "3" +tempfile = { workspace = true } tokio = { version = "1", features = ["macros", "rt-multi-thread"] } [features] From 9900fd9ab7a63bc030621405e7539f2ba09da2ed Mon Sep 17 00:00:00 2001 From: Abhi Agarwal Date: Sat, 11 Oct 2025 14:43:01 -0400 Subject: [PATCH 2/3] Remove unused imports Signed-off-by: Abhi Agarwal --- crates/benchmarks/Cargo.toml | 10 ---------- crates/benchmarks/src/lib.rs | 24 +++++++++++++----------- 2 files changed, 13 insertions(+), 21 deletions(-) diff --git a/crates/benchmarks/Cargo.toml b/crates/benchmarks/Cargo.toml index bc3ea1bd9c..352290b443 100644 --- a/crates/benchmarks/Cargo.toml +++ b/crates/benchmarks/Cargo.toml @@ -16,16 +16,6 @@ chrono = { workspace = true } url = { workspace = true } tempfile = { workspace = true } -# arrow -arrow = { workspace = true } -arrow-array = { workspace = true } - -# serde -serde_json = { workspace = true } - -# datafusion -datafusion = { workspace = true } - [dependencies.deltalake-core] path = "../core" version = "0" diff --git a/crates/benchmarks/src/lib.rs b/crates/benchmarks/src/lib.rs index 5277a9da26..f8533e4cd2 100644 --- a/crates/benchmarks/src/lib.rs +++ b/crates/benchmarks/src/lib.rs @@ -1,13 +1,17 @@ use std::path::Path; -use datafusion::logical_expr::{cast, lit}; -use datafusion::prelude::DataFrame; -use datafusion::prelude::ParquetReadOptions; +use deltalake_core::datafusion::functions::expr_fn; use deltalake_core::kernel::engine::arrow_conversion::TryIntoKernel; use deltalake_core::kernel::{StructField, StructType}; use deltalake_core::operations::merge::MergeBuilder; -use deltalake_core::DeltaResult; -use deltalake_core::{datafusion::prelude::SessionContext, DeltaOps, DeltaTable, DeltaTableError}; +use deltalake_core::{arrow, DeltaResult}; +use deltalake_core::{ + datafusion::{ + logical_expr::{cast, lit}, + prelude::{DataFrame, ParquetReadOptions, SessionContext}, + }, + DeltaOps, DeltaTable, DeltaTableError, +}; use tempfile::TempDir; use url::Url; @@ -164,16 +168,14 @@ pub async fn prepare_source_and_table( // Split matched and not-matched portions let matched = source .clone() - .filter(datafusion::functions::expr_fn::random().lt_eq(lit(params.sample_matched_rows)))?; + .filter(expr_fn::random().lt_eq(lit(params.sample_matched_rows)))?; let rand = cast( - datafusion::functions::expr_fn::random() * lit(u32::MAX), - datafusion::arrow::datatypes::DataType::Int64, + expr_fn::random() * lit(u32::MAX), + arrow::datatypes::DataType::Int64, ); let not_matched = source - .filter( - datafusion::functions::expr_fn::random().lt_eq(lit(params.sample_not_matched_rows)), - )? + .filter(expr_fn::random().lt_eq(lit(params.sample_not_matched_rows)))? .with_column("wr_item_sk", rand.clone())? 
.with_column("wr_order_number", rand)?; From 74f41108a7b0b08874440c13c62bf107bb894cd2 Mon Sep 17 00:00:00 2001 From: Abhi Agarwal Date: Sat, 11 Oct 2025 15:06:45 -0400 Subject: [PATCH 3/3] Make sure divan is only benchmarking the merge operation and not the setup Signed-off-by: Abhi Agarwal --- crates/benchmarks/Cargo.toml | 1 - crates/benchmarks/benches/merge.rs | 32 ++++++++++++++++++------------ 2 files changed, 19 insertions(+), 14 deletions(-) diff --git a/crates/benchmarks/Cargo.toml b/crates/benchmarks/Cargo.toml index 352290b443..58c7aa11fd 100644 --- a/crates/benchmarks/Cargo.toml +++ b/crates/benchmarks/Cargo.toml @@ -12,7 +12,6 @@ publish = false [dependencies] clap = { version = "4", features = ["derive"] } tokio = { workspace = true, features = ["fs", "macros", "rt", "io-util"] } -chrono = { workspace = true } url = { workspace = true } tempfile = { workspace = true } diff --git a/crates/benchmarks/benches/merge.rs b/crates/benchmarks/benches/merge.rs index 4851d934f8..5ef6660c4b 100644 --- a/crates/benchmarks/benches/merge.rs +++ b/crates/benchmarks/benches/merge.rs @@ -14,21 +14,27 @@ fn main() { static ALLOC: AllocProfiler = AllocProfiler::system(); fn bench_merge(bencher: Bencher, op: MergeOp, params: &MergePerfParams) { - bencher.bench_local(move || { - let params = params.clone(); - let rt = tokio::runtime::Runtime::new().unwrap(); - let parquet_dir = PathBuf::from( - std::env::var("TPCDS_PARQUET_DIR").unwrap_or_else(|_| "data/tpcds_parquet".to_string()), - ); - rt.block_on(async move { + let rt = tokio::runtime::Runtime::new().unwrap(); + bencher + .with_inputs(|| { let tmp_dir = tempfile::tempdir().unwrap(); - let (source, table) = prepare_source_and_table(¶ms, &tmp_dir, &parquet_dir) - .await - .unwrap(); - - let _ = divan::black_box(op(source, table).unwrap().await.unwrap()); + let parquet_dir = PathBuf::from( + std::env::var("TPCDS_PARQUET_DIR") + .unwrap_or_else(|_| "data/tpcds_parquet".to_string()), + ); + rt.block_on(async move { + let (source, table) = prepare_source_and_table(params, &tmp_dir, &parquet_dir) + .await + .unwrap(); + (source, table, tmp_dir) + }) }) - }); + .bench_local_values(|(source, table, tmp_dir)| { + rt.block_on(async move { + let _ = divan::black_box(op(source, table).unwrap().await.unwrap()); + }); + drop(tmp_dir); + }); } #[divan::bench(args = [