
Commit 7ea7975

feat(datafusion): file pruning based on pushdown limit for partition cols filters
Signed-off-by: Adrian Tanase <[email protected]>
1 parent: 2e60024

2 files changed (+90, -20 lines)

crates/core/src/delta_datafusion/mod.rs (+51, -18)
```diff
@@ -47,7 +47,7 @@ use datafusion::execution::context::{SessionConfig, SessionContext, SessionState
 use datafusion::execution::runtime_env::RuntimeEnv;
 use datafusion::execution::FunctionRegistry;
 use datafusion::optimizer::simplify_expressions::ExprSimplifier;
-use datafusion::physical_optimizer::pruning::PruningPredicate;
+use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
 use datafusion_common::scalar::ScalarValue;
 use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
 use datafusion_common::{
@@ -581,31 +581,64 @@ impl<'a> DeltaScanBuilder<'a> {
                 (files, files_scanned, 0)
             }
             None => {
-                if let Some(predicate) = &logical_filter {
-                    let pruning_predicate =
-                        PruningPredicate::try_new(predicate.clone(), logical_schema.clone())?;
-                    let files_to_prune = pruning_predicate.prune(self.snapshot)?;
-                    let mut files_pruned = 0usize;
-                    let files = self
+                // early return in case we have no push down filters or limit
+                if logical_filter.is_none() && self.limit.is_none() {
+                    let files = self.snapshot.file_actions()?;
+                    let files_scanned = files.len();
+                    (files, files_scanned, 0)
+                } else {
+                    let num_containers = self.snapshot.num_containers();
+
+                    let files_to_prune = if let Some(predicate) = &logical_filter {
+                        let pruning_predicate =
+                            PruningPredicate::try_new(predicate.clone(), logical_schema.clone())?;
+                        pruning_predicate.prune(self.snapshot)?
+                    } else {
+                        vec![true; num_containers]
+                    };
+
+                    // needed to enforce limit and deal with missing statistics
+                    // rust port of https://github.com/delta-io/delta/pull/1495
+                    let mut pruned_without_stats = vec![];
+                    let mut rows_collected = 0;
+                    let mut files = vec![];
+
+                    for (action, keep) in self
                         .snapshot
                         .file_actions_iter()?
                         .zip(files_to_prune.into_iter())
-                        .filter_map(|(action, keep)| {
-                            if keep {
-                                Some(action.to_owned())
+                    {
+                        // prune file based on predicate pushdown
+                        if keep {
+                            // prune file based on limit pushdown
+                            if let Some(limit) = self.limit {
+                                if let Some(stats) = action.get_stats()? {
+                                    if rows_collected <= limit as i64 {
+                                        rows_collected += stats.num_records;
+                                        files.push(action.to_owned());
+                                    } else {
+                                        break;
+                                    }
+                                } else {
+                                    // some files are missing stats; skipping but storing them
+                                    // in a list in case we can't reach the target limit
+                                    pruned_without_stats.push(action.to_owned());
+                                }
                             } else {
-                                files_pruned += 1;
-                                None
+                                files.push(action.to_owned());
                             }
-                        })
-                        .collect::<Vec<_>>();
+                        }
+                    }
+
+                    if let Some(limit) = self.limit {
+                        if rows_collected < limit as i64 {
+                            files.extend(pruned_without_stats);
+                        }
+                    }
 
                     let files_scanned = files.len();
+                    let files_pruned = num_containers - files_scanned;
                     (files, files_scanned, files_pruned)
-                } else {
-                    let files = self.snapshot.file_actions()?;
-                    let files_scanned = files.len();
-                    (files, files_scanned, 0)
                 }
             }
         };
```

crates/core/tests/integration_datafusion.rs (+39, -2)
```diff
@@ -285,6 +285,34 @@ mod local {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_files_scanned_pushdown_limit() -> Result<()> {
+        use datafusion::prelude::*;
+        let ctx = SessionContext::new();
+        let state = ctx.state();
+        let table = open_table("../test/tests/data/delta-0.8.0")
+            .await?;
+
+        // Simple Equality test, we only exercise the limit in this test
+        let e = col("value").eq(lit(2));
+        let metrics = get_scan_metrics(&table, &state, &[e.clone()]).await?;
+        assert_eq!(metrics.num_scanned_files(), 2);
+        assert_eq!(metrics.num_scanned_files(), metrics.keep_count);
+        assert_eq!(metrics.skip_count, 0);
+
+        let metrics = get_scan_metrics_with_limit(&table, &state, &[e.clone()], Some(1)).await?;
+        assert_eq!(metrics.num_scanned_files(), 1);
+        assert_eq!(metrics.num_scanned_files(), metrics.keep_count);
+        assert_eq!(metrics.skip_count, 1);
+
+        let metrics = get_scan_metrics_with_limit(&table, &state, &[e.clone()], Some(3)).await?;
+        assert_eq!(metrics.num_scanned_files(), 2);
+        assert_eq!(metrics.num_scanned_files(), metrics.keep_count);
+        assert_eq!(metrics.skip_count, 0);
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn test_datafusion_write_from_serialized_delta_scan() -> Result<()> {
         // Build an execution plan for scanning a DeltaTable and serialize it to bytes.
@@ -513,13 +541,14 @@ mod local {
         Ok(())
     }
 
-    async fn get_scan_metrics(
+    async fn get_scan_metrics_with_limit(
         table: &DeltaTable,
         state: &SessionState,
         e: &[Expr],
+        limit: Option<usize>,
     ) -> Result<ExecutionMetricsCollector> {
         let mut metrics = ExecutionMetricsCollector::default();
-        let scan = table.scan(state, None, e, None).await?;
+        let scan = table.scan(state, None, e, limit).await?;
         if scan.properties().output_partitioning().partition_count() > 0 {
             let plan = CoalescePartitionsExec::new(scan);
             let task_ctx = Arc::new(TaskContext::from(state));
@@ -534,6 +563,14 @@ mod local {
         Ok(metrics)
     }
 
+    async fn get_scan_metrics(
+        table: &DeltaTable,
+        state: &SessionState,
+        e: &[Expr],
+    ) -> Result<ExecutionMetricsCollector> {
+        get_scan_metrics_with_limit(table, state, e, None).await
+    }
+
     fn create_all_types_batch(
         not_null_rows: usize,
         null_rows: usize,
```
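The test exercises the new behaviour through the `get_scan_metrics_with_limit` helper, which simply forwards the limit to `DeltaTable::scan`. A minimal sketch of the same call outside the test harness might look like the following; the `deltalake_core::open_table` import path and the table location are assumptions based on the test's location in `crates/core`, while the `scan` call mirrors the one in the diff.

```rust
// `TableProvider` must be in scope so the `scan` trait method resolves.
use datafusion::datasource::TableProvider;
use datafusion::prelude::SessionContext;
// Assumed crate path for the `open_table` helper used by the test module above.
use deltalake_core::open_table;

async fn limit_only_scan() -> Result<(), Box<dyn std::error::Error>> {
    let ctx = SessionContext::new();
    let state = ctx.state();
    let table = open_table("../test/tests/data/delta-0.8.0").await?;
    // No filter expressions; with a limit of 1 the scan should stop adding
    // files once per-file statistics cover at least one row.
    let _plan = table.scan(&state, None, &[], Some(1)).await?;
    Ok(())
}
```

On the delta-0.8.0 test table this should behave like the `Some(1)` case asserted above, scanning one file and skipping the other, provided the first file's statistics already cover the requested row.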
