Commit ddfd9f7
feat(datafusion): file pruning based on pushdown limit for partition cols filters
Signed-off-by: Adrian Tanase <[email protected]>
1 parent: 3d2bbef · commit: ddfd9f7

File tree: 2 files changed, +90 -20 lines
crates/core/src/delta_datafusion/mod.rs (+51, -18)
@@ -49,7 +49,7 @@ use datafusion::execution::context::{SessionConfig, SessionContext, SessionState
 use datafusion::execution::runtime_env::RuntimeEnv;
 use datafusion::execution::FunctionRegistry;
 use datafusion::optimizer::simplify_expressions::ExprSimplifier;
-use datafusion::physical_optimizer::pruning::PruningPredicate;
+use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
 use datafusion_common::scalar::ScalarValue;
 use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
 use datafusion_common::{
@@ -570,31 +570,64 @@ impl<'a> DeltaScanBuilder<'a> {
                 (files, files_scanned, 0)
             }
             None => {
-                if let Some(predicate) = &logical_filter {
-                    let pruning_predicate =
-                        PruningPredicate::try_new(predicate.clone(), logical_schema.clone())?;
-                    let files_to_prune = pruning_predicate.prune(self.snapshot)?;
-                    let mut files_pruned = 0usize;
-                    let files = self
+                // early return in case we have no push down filters or limit
+                if logical_filter.is_none() && self.limit.is_none() {
+                    let files = self.snapshot.file_actions()?;
+                    let files_scanned = files.len();
+                    (files, files_scanned, 0)
+                } else {
+                    let num_containers = self.snapshot.num_containers();
+
+                    let files_to_prune = if let Some(predicate) = &logical_filter {
+                        let pruning_predicate =
+                            PruningPredicate::try_new(predicate.clone(), logical_schema.clone())?;
+                        pruning_predicate.prune(self.snapshot)?
+                    } else {
+                        vec![true; num_containers]
+                    };
+
+                    // needed to enforce limit and deal with missing statistics
+                    // rust port of https://github.com/delta-io/delta/pull/1495
+                    let mut pruned_without_stats = vec![];
+                    let mut rows_collected = 0;
+                    let mut files = vec![];
+
+                    for (action, keep) in self
                         .snapshot
                         .file_actions_iter()?
                         .zip(files_to_prune.into_iter())
-                        .filter_map(|(action, keep)| {
-                            if keep {
-                                Some(action.to_owned())
+                    {
+                        // prune file based on predicate pushdown
+                        if keep {
+                            // prune file based on limit pushdown
+                            if let Some(limit) = self.limit {
+                                if let Some(stats) = action.get_stats()? {
+                                    if rows_collected <= limit as i64 {
+                                        rows_collected += stats.num_records;
+                                        files.push(action.to_owned());
+                                    } else {
+                                        break;
+                                    }
+                                } else {
+                                    // some files are missing stats; skipping but storing them
+                                    // in a list in case we can't reach the target limit
+                                    pruned_without_stats.push(action.to_owned());
+                                }
                             } else {
-                                files_pruned += 1;
-                                None
+                                files.push(action.to_owned());
                             }
-                        })
-                        .collect::<Vec<_>>();
+                        }
+                    }
+
+                    if let Some(limit) = self.limit {
+                        if rows_collected < limit as i64 {
+                            files.extend(pruned_without_stats);
+                        }
+                    }
 
                     let files_scanned = files.len();
+                    let files_pruned = num_containers - files_scanned;
                     (files, files_scanned, files_pruned)
-                } else {
-                    let files = self.snapshot.file_actions()?;
-                    let files_scanned = files.len();
-                    (files, files_scanned, 0)
                 }
             }
         };
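The new None branch combines predicate pruning with limit pushdown: files kept by the pruning predicate are admitted until their recorded row counts cover the limit, files without statistics are set aside, and the set-aside files are only re-added when the limit could not be covered otherwise. A minimal standalone sketch of that selection loop follows; CandidateFile and select_files are hypothetical stand-ins for the snapshot's Add actions and the loop above, not part of the delta-rs API.

// Sketch of the limit-pushdown selection loop, assuming the `keep` flags were
// already produced by PruningPredicate::prune and `num_records` mirrors the
// per-file statistic returned by get_stats().
struct CandidateFile {
    path: String,
    num_records: Option<i64>, // None models a file with missing statistics
}

fn select_files(candidates: Vec<(CandidateFile, bool)>, limit: Option<usize>) -> Vec<CandidateFile> {
    let mut pruned_without_stats = Vec::new();
    let mut rows_collected: i64 = 0;
    let mut files = Vec::new();

    for (file, keep) in candidates {
        // prune on the pushed-down predicate first
        if !keep {
            continue;
        }
        match limit {
            Some(limit) => match file.num_records {
                // keep adding files until the collected row count covers the limit
                Some(n) if rows_collected <= limit as i64 => {
                    rows_collected += n;
                    files.push(file);
                }
                Some(_) => break,
                // missing statistics: hold the file back in case the limit
                // cannot be reached from the files that do have statistics
                None => pruned_without_stats.push(file),
            },
            // no limit pushed down: keep everything the predicate kept
            None => files.push(file),
        }
    }

    // fall back to also scanning the statistics-less files if needed
    if let Some(limit) = limit {
        if rows_collected < limit as i64 {
            files.extend(pruned_without_stats);
        }
    }
    files
}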

crates/core/tests/integration_datafusion.rs (+39, -2)
@@ -188,6 +188,34 @@ mod local {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_files_scanned_pushdown_limit() -> Result<()> {
+        use datafusion::prelude::*;
+        let ctx = SessionContext::new();
+        let state = ctx.state();
+        let table = open_table("../test/tests/data/delta-0.8.0")
+            .await?;
+
+        // Simple Equality test, we only exercise the limit in this test
+        let e = col("value").eq(lit(2));
+        let metrics = get_scan_metrics(&table, &state, &[e.clone()]).await?;
+        assert_eq!(metrics.num_scanned_files(), 2);
+        assert_eq!(metrics.num_scanned_files(), metrics.keep_count);
+        assert_eq!(metrics.skip_count, 0);
+
+        let metrics = get_scan_metrics_with_limit(&table, &state, &[e.clone()], Some(1)).await?;
+        assert_eq!(metrics.num_scanned_files(), 1);
+        assert_eq!(metrics.num_scanned_files(), metrics.keep_count);
+        assert_eq!(metrics.skip_count, 1);
+
+        let metrics = get_scan_metrics_with_limit(&table, &state, &[e.clone()], Some(3)).await?;
+        assert_eq!(metrics.num_scanned_files(), 2);
+        assert_eq!(metrics.num_scanned_files(), metrics.keep_count);
+        assert_eq!(metrics.skip_count, 0);
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn test_datafusion_write_from_serialized_delta_scan() -> Result<()> {
         // Build an execution plan for scanning a DeltaTable and serialize it to bytes.
@@ -416,13 +444,14 @@ mod local {
         Ok(())
     }
 
-    async fn get_scan_metrics(
+    async fn get_scan_metrics_with_limit(
         table: &DeltaTable,
         state: &SessionState,
        e: &[Expr],
+        limit: Option<usize>,
     ) -> Result<ExecutionMetricsCollector> {
         let mut metrics = ExecutionMetricsCollector::default();
-        let scan = table.scan(state, None, e, None).await?;
+        let scan = table.scan(state, None, e, limit).await?;
         if scan.properties().output_partitioning().partition_count() > 0 {
             let plan = CoalescePartitionsExec::new(scan);
             let task_ctx = Arc::new(TaskContext::from(state));
@@ -437,6 +466,14 @@ mod local {
         Ok(metrics)
     }
 
+    async fn get_scan_metrics(
+        table: &DeltaTable,
+        state: &SessionState,
+        e: &[Expr],
+    ) -> Result<ExecutionMetricsCollector> {
+        get_scan_metrics_with_limit(table, state, e, None).await
+    }
+
     fn create_all_types_batch(
         not_null_rows: usize,
         null_rows: usize,
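For context, the limit exercised by the test reaches the scan builder through the fourth argument of DeltaTable::scan, exactly as get_scan_metrics_with_limit does above. A rough usage sketch, assuming the deltalake meta-crate's open_table re-export, the DataFusion TableProvider trait in scope, and a Tokio runtime:

use datafusion::datasource::TableProvider;
use datafusion::prelude::{col, lit, SessionContext};
use deltalake::open_table;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let ctx = SessionContext::new();
    let state = ctx.state();
    // same test fixture as test_files_scanned_pushdown_limit
    let table = open_table("../test/tests/data/delta-0.8.0").await?;

    // the fourth argument is the pushed-down limit; with Some(1) the scan
    // stops adding files once their statistics cover a single row
    let _plan = table
        .scan(&state, None, &[col("value").eq(lit(2))], Some(1))
        .await?;
    Ok(())
}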
