Skip to content

Commit ceed00f

Browse files
committed
add push down limit scan test
Signed-off-by: Adrian Tanase <[email protected]>
1 parent 6ee079a commit ceed00f

File tree

1 file changed

+40
-2
lines changed

1 file changed

+40
-2
lines changed

crates/core/tests/integration_datafusion.rs

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -285,6 +285,34 @@ mod local {
285285
Ok(())
286286
}
287287

288+
#[tokio::test]
289+
async fn test_files_scanned_pushdown_limit() -> Result<()> {
290+
use datafusion::prelude::*;
291+
let ctx = SessionContext::new();
292+
let state = ctx.state();
293+
let table = open_table("../test/tests/data/delta-0.8.0")
294+
.await?;
295+
296+
// Simple Equality test, we only exercise the limit in this test
297+
let e = col("value").eq(lit(2));
298+
let metrics = get_scan_metrics(&table, &state, &[e.clone()]).await?;
299+
assert_eq!(metrics.num_scanned_files(), 2);
300+
assert_eq!(metrics.num_scanned_files(), metrics.keep_count);
301+
assert_eq!(metrics.skip_count, 0);
302+
303+
let metrics = get_scan_metrics_with_limit(&table, &state, &[e.clone()], Some(1)).await?;
304+
assert_eq!(metrics.num_scanned_files(), 1);
305+
assert_eq!(metrics.num_scanned_files(), metrics.keep_count);
306+
assert_eq!(metrics.skip_count, 1);
307+
308+
let metrics = get_scan_metrics_with_limit(&table, &state, &[e.clone()], Some(3)).await?;
309+
assert_eq!(metrics.num_scanned_files(), 2);
310+
assert_eq!(metrics.num_scanned_files(), metrics.keep_count);
311+
assert_eq!(metrics.skip_count, 0);
312+
313+
Ok(())
314+
}
315+
288316
#[tokio::test]
289317
async fn test_datafusion_write_from_serialized_delta_scan() -> Result<()> {
290318
// Build an execution plan for scanning a DeltaTable and serialize it to bytes.
@@ -513,13 +541,15 @@ mod local {
513541
Ok(())
514542
}
515543

516-
async fn get_scan_metrics(
544+
async fn get_scan_metrics_with_limit(
517545
table: &DeltaTable,
518546
state: &SessionState,
519547
e: &[Expr],
548+
limit: Option<usize>,
520549
) -> Result<ExecutionMetricsCollector> {
521550
let mut metrics = ExecutionMetricsCollector::default();
522-
let scan = table.scan(state, None, e, None).await?;
551+
let scan = table.scan(state, None, e, limit).await?;
552+
println!("scan {:?}", scan);
523553
if scan.properties().output_partitioning().partition_count() > 0 {
524554
let plan = CoalescePartitionsExec::new(scan);
525555
let task_ctx = Arc::new(TaskContext::from(state));
@@ -534,6 +564,14 @@ mod local {
534564
Ok(metrics)
535565
}
536566

567+
async fn get_scan_metrics(
568+
table: &DeltaTable,
569+
state: &SessionState,
570+
e: &[Expr],
571+
) -> Result<ExecutionMetricsCollector> {
572+
get_scan_metrics_with_limit(table, state, e, None).await
573+
}
574+
537575
fn create_all_types_batch(
538576
not_null_rows: usize,
539577
null_rows: usize,

0 commit comments

Comments
 (0)