Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 37 additions & 14 deletions benchmarks/datafusion-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ use vortex_bench::conversions::convert_parquet_directory_to_vortex;
use vortex_bench::create_benchmark;
use vortex_bench::create_output_writer;
use vortex_bench::display::DisplayFormat;
use vortex_bench::runner::BenchmarkMode;
use vortex_bench::runner::BenchmarkQueryResult;
use vortex_bench::runner::SqlBenchmarkRunner;
use vortex_bench::runner::filter_queries;
use vortex_bench::setup_logging_and_tracing;
Expand Down Expand Up @@ -95,9 +97,6 @@ struct Args {
#[arg(long, default_value_t = false)]
explain: bool,

#[arg(long, default_value_t = false)]
explain_analyze: bool,

#[arg(long, value_delimiter = ',', value_parser = value_parser!(Format))]
formats: Vec<Format>,

Expand Down Expand Up @@ -162,10 +161,18 @@ async fn main() -> anyhow::Result<()> {
Arc::new(Mutex::new(Vec::new()));
let show_metrics = args.show_metrics;

let mode = if args.explain {
BenchmarkMode::Explain
} else {
BenchmarkMode::Run {
iterations: args.iterations,
}
};

runner
.run_all_async(
&filtered_queries,
args.iterations,
mode,
|format| {
let benchmark = &*benchmark;
async move {
Expand All @@ -187,7 +194,6 @@ async fn main() -> anyhow::Result<()> {
.with_labelset(get_labelset_from_global())
.await?;
let time = timer.elapsed();
let row_count = batches.iter().map(|batch| batch.num_rows()).sum::<usize>();

// Store plan for metrics (only store once per query/format combination)
if show_metrics {
Expand All @@ -201,23 +207,25 @@ async fn main() -> anyhow::Result<()> {
}
}

anyhow::Ok((row_count, Some(time), plan))
anyhow::Ok((Some(time), DataFusionQueryResult(batches)))
}
.with_labelset(labelset),
)
},
)
.await?;

// Print metrics if requested
if show_metrics {
let plans = collected_plans.lock();
print_metrics(plans.as_ref());
}
if !args.explain {
// Print metrics if requested
if show_metrics {
let plans = collected_plans.lock();
print_metrics(plans.as_ref());
}

let benchmark_id = format!("datafusion-{}", benchmark.dataset_name());
let writer = create_output_writer(&args.display_format, args.output_path, &benchmark_id)?;
runner.export_to(&args.display_format, writer)?;
let benchmark_id = format!("datafusion-{}", benchmark.dataset_name());
let writer = create_output_writer(&args.display_format, args.output_path, &benchmark_id)?;
runner.export_to(&args.display_format, writer)?;
}

Ok(())
}
Expand Down Expand Up @@ -379,6 +387,21 @@ async fn register_arrow_tables<B: Benchmark + ?Sized>(
Ok(())
}

/// Wrapper around DataFusion record batches implementing `BenchmarkQueryResult`.
pub struct DataFusionQueryResult(pub Vec<RecordBatch>);

impl BenchmarkQueryResult for DataFusionQueryResult {
fn row_count(&self) -> usize {
self.0.iter().map(|batch| batch.num_rows()).sum()
}

fn display(self) -> String {
datafusion::arrow::util::pretty::pretty_format_batches(&self.0)
.map(|d| d.to_string())
.unwrap_or_else(|e| format!("<error: {e}>"))
}
}

pub async fn execute_query(
ctx: &SessionContext,
query: &str,
Expand Down
31 changes: 31 additions & 0 deletions benchmarks/duckdb-bench/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

//! DuckDB context for benchmarks.

use std::ops::Deref;
use std::path::PathBuf;
use std::time::Duration;
use std::time::Instant;
Expand All @@ -14,9 +15,11 @@ use vortex_bench::Benchmark;
use vortex_bench::Format;
use vortex_bench::IdempotentPath;
use vortex_bench::generate_duckdb_registration_sql;
use vortex_bench::runner::BenchmarkQueryResult;
use vortex_duckdb::duckdb::Config;
use vortex_duckdb::duckdb::Connection;
use vortex_duckdb::duckdb::Database;
use vortex_duckdb::duckdb::QueryResult;

/// DuckDB context for benchmarks.
pub struct DuckClient {
Expand Down Expand Up @@ -189,4 +192,32 @@ impl DuckClient {

Ok(())
}

/// Runs `query` on this client's connection and wraps the outcome in a
/// `DuckQueryResult`.
///
/// The returned duration covers only the `query` call on the connection; the
/// trace log line is emitted before the clock starts.
///
/// # Errors
///
/// Returns the error from the connection's `query` call if it fails.
pub fn execute_query_result(&self, query: &str) -> Result<(Option<Duration>, DuckQueryResult)> {
    trace!("execute duckdb query: {query}");

    // Start the clock immediately before the query and stop it right after,
    // so nothing else is included in the measurement.
    let started = Instant::now();
    let raw_result = self.connection().query(query)?;
    let elapsed = started.elapsed();

    Ok((Some(elapsed), DuckQueryResult(raw_result)))
}
}

/// Newtype over DuckDB's `QueryResult`, so that it can be handed to the
/// benchmark runner as a `BenchmarkQueryResult`.
pub struct DuckQueryResult(pub QueryResult);

impl BenchmarkQueryResult for DuckQueryResult {
    /// Row count reported by the wrapped result, or `0` when that count cannot
    /// be converted to `usize`.
    fn row_count(&self) -> usize {
        let Self(result) = self;
        usize::try_from(result.row_count()).unwrap_or_default()
    }

    /// Concatenates the string rendering of every chunk in the result.
    ///
    /// A chunk that cannot be converted to a string contributes the literal
    /// `<error>` instead of aborting the whole rendering.
    fn display(self) -> String {
        let Self(result) = self;
        result
            .into_iter()
            .map(|chunk| String::try_from(chunk.deref()).unwrap_or_else(|_| "<error>".to_string()))
            .collect()
    }
}
49 changes: 16 additions & 33 deletions benchmarks/duckdb-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

mod validation;

use std::ops::Deref;
use std::path::PathBuf;

use clap::Parser;
Expand All @@ -21,6 +20,7 @@ use vortex_bench::conversions::convert_parquet_directory_to_vortex;
use vortex_bench::create_benchmark;
use vortex_bench::create_output_writer;
use vortex_bench::display::DisplayFormat;
use vortex_bench::runner::BenchmarkMode;
use vortex_bench::runner::SqlBenchmarkRunner;
use vortex_bench::runner::filter_queries;
use vortex_bench::setup_logging_and_tracing;
Expand Down Expand Up @@ -139,33 +139,6 @@ fn main() -> anyhow::Result<()> {
})?;
}

if args.explain {
for format in &args.formats {
let ctx = DuckClient::new(
&*benchmark,
*format,
args.delete_duckdb_database,
args.threads,
)?;
ctx.register_tables(&*benchmark, *format)?;

for (query_idx, query) in &filtered_queries {
println!("=== Q{query_idx} [{format}] ===");
println!("{query}");
println!();
let result = ctx.connection().query(&format!("EXPLAIN {query}"))?;
for chunk in result {
let chunk_str =
String::try_from(chunk.deref()).unwrap_or_else(|_| "<error>".to_string());
println!("{chunk_str}");
}
println!();
}
}

return Ok(());
}

let mut runner = SqlBenchmarkRunner::new(
&*benchmark,
Engine::DuckDB,
Expand All @@ -176,9 +149,17 @@ fn main() -> anyhow::Result<()> {

let benchmark_name = benchmark.dataset().to_string();

let mode = if args.explain {
BenchmarkMode::Explain
} else {
BenchmarkMode::Run {
iterations: args.iterations,
}
};

runner.run_all(
&filtered_queries,
args.iterations,
mode,
|format| {
let ctx = DuckClient::new(
&*benchmark,
Expand All @@ -200,13 +181,15 @@ fn main() -> anyhow::Result<()> {
if !args.reuse {
ctx.reopen()?;
}
ctx.execute_query(query)
ctx.execute_query_result(query)
},
)?;

let benchmark_id = format!("duckdb-{}", benchmark.dataset_name());
let writer = create_output_writer(&args.display_format, args.output_path, &benchmark_id)?;
runner.export_to(&args.display_format, writer)?;
if !args.explain {
let benchmark_id = format!("duckdb-{}", benchmark.dataset_name());
let writer = create_output_writer(&args.display_format, args.output_path, &benchmark_id)?;
runner.export_to(&args.display_format, writer)?;
}

Ok(())
}
27 changes: 23 additions & 4 deletions benchmarks/lance-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ use vortex_bench::Opts;
use vortex_bench::create_benchmark;
use vortex_bench::create_output_writer;
use vortex_bench::display::DisplayFormat;
use vortex_bench::runner::BenchmarkMode;
use vortex_bench::runner::BenchmarkQueryResult;
use vortex_bench::runner::SqlBenchmarkRunner;
use vortex_bench::runner::filter_queries;
use vortex_bench::setup_logging_and_tracing;
Expand Down Expand Up @@ -99,7 +101,9 @@ async fn main() -> anyhow::Result<()> {
runner
.run_all_async(
&filtered_queries,
args.iterations,
BenchmarkMode::Run {
iterations: args.iterations,
},
|_format| async {
let session = SessionContext::new();
register_lance_tables(&session, &*benchmark).await?;
Expand All @@ -108,10 +112,9 @@ async fn main() -> anyhow::Result<()> {
|_query_idx, session, query| {
Box::pin(async move {
let timer = Instant::now();
let (batches, plan) = execute_query(session, query).await?;
let (batches, _plan) = execute_query(session, query).await?;
let time = timer.elapsed();
let row_count = batches.iter().map(|batch| batch.num_rows()).sum::<usize>();
anyhow::Ok((row_count, Some(time), plan))
anyhow::Ok((Some(time), LanceQueryResult(batches)))
})
},
)
Expand Down Expand Up @@ -148,6 +151,22 @@ async fn register_lance_tables<B: Benchmark + ?Sized>(
Ok(())
}

/// Newtype over the record batches returned by a Lance (DataFusion-backed)
/// query, so that they can be handed to the benchmark runner as a
/// `BenchmarkQueryResult`.
struct LanceQueryResult(Vec<RecordBatch>);

impl BenchmarkQueryResult for LanceQueryResult {
    /// Total number of rows, summed over every wrapped batch.
    fn row_count(&self) -> usize {
        let Self(batches) = self;
        batches
            .iter()
            .fold(0, |total, batch| total + batch.num_rows())
    }

    /// Renders the batches with the Arrow pretty table formatter reachable
    /// through Lance's bundled DataFusion dependency.
    ///
    /// A formatting failure is not propagated; it is embedded in the returned
    /// string as `<error: ...>` instead.
    fn display(self) -> String {
        let Self(batches) = self;
        // The batches are plain Arrow `RecordBatch`es, so Arrow's own pretty
        // printer applies directly.
        match lance::deps::datafusion::arrow::util::pretty::pretty_format_batches(&batches) {
            Ok(table) => table.to_string(),
            Err(e) => format!("<error: {e}>"),
        }
    }
}

pub async fn execute_query(
ctx: &SessionContext,
query: &str,
Expand Down
Loading
Loading