Skip to content

Commit f9c3035

Browse files
committed
feat(datafusion): file pruning based on pushdown limit for partition cols filters
1 parent 58d07b9 commit f9c3035

File tree

1 file changed

+51
-18
lines changed
  • crates/core/src/delta_datafusion

1 file changed

+51
-18
lines changed

crates/core/src/delta_datafusion/mod.rs

Lines changed: 51 additions & 18 deletions
Original file line number | Diff line number | Diff line change
@@ -47,7 +47,7 @@ use datafusion::execution::context::{SessionConfig, SessionContext, SessionState
4747
use datafusion::execution::runtime_env::RuntimeEnv;
4848
use datafusion::execution::FunctionRegistry;
4949
use datafusion::optimizer::simplify_expressions::ExprSimplifier;
50-
use datafusion::physical_optimizer::pruning::PruningPredicate;
50+
use datafusion::physical_optimizer::pruning::{PruningPredicate, PruningStatistics};
5151
use datafusion_common::scalar::ScalarValue;
5252
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
5353
use datafusion_common::{
@@ -573,31 +573,64 @@ impl<'a> DeltaScanBuilder<'a> {
573573
(files, files_scanned, 0)
574574
}
575575
None => {
576-
if let Some(predicate) = &logical_filter {
577-
let pruning_predicate =
578-
PruningPredicate::try_new(predicate.clone(), logical_schema.clone())?;
579-
let files_to_prune = pruning_predicate.prune(self.snapshot)?;
580-
let mut files_pruned = 0usize;
581-
let files = self
576+
// early return in case we have no push down filters or limit
577+
if logical_filter.is_none() && self.limit.is_none() {
578+
let files = self.snapshot.file_actions()?;
579+
let files_scanned = files.len();
580+
(files, files_scanned, 0)
581+
} else {
582+
let num_containers = self.snapshot.num_containers();
583+
584+
let files_to_prune = if let Some(predicate) = &logical_filter {
585+
let pruning_predicate =
586+
PruningPredicate::try_new(predicate.clone(), logical_schema.clone())?;
587+
pruning_predicate.prune(self.snapshot)?
588+
} else {
589+
vec![true; num_containers]
590+
};
591+
592+
// needed to enforce limit and deal with missing statistics
593+
// rust port of https://github.com/delta-io/delta/pull/1495
594+
let mut pruned_without_stats = vec![];
595+
let mut rows_collected = 0;
596+
let mut files = vec![];
597+
598+
for (action, keep) in self
582599
.snapshot
583600
.file_actions_iter()?
584601
.zip(files_to_prune.into_iter())
585-
.filter_map(|(action, keep)| {
586-
if keep {
587-
Some(action.to_owned())
602+
{
603+
// prune file based on predicate pushdown
604+
if keep {
605+
// prune file based on limit pushdown
606+
if let Some(limit) = self.limit {
607+
if let Some(stats) = action.get_stats()? {
608+
if rows_collected <= limit as i64 {
609+
rows_collected += stats.num_records;
610+
files.push(action.to_owned());
611+
} else {
612+
break;
613+
}
614+
} else {
615+
// some files are missing stats; skipping but storing them
616+
// in a list in case we can't reach the target limit
617+
pruned_without_stats.push(action.to_owned());
618+
}
588619
} else {
589-
files_pruned += 1;
590-
None
620+
files.push(action.to_owned());
591621
}
592-
})
593-
.collect::<Vec<_>>();
622+
}
623+
}
624+
625+
if let Some(limit) = self.limit {
626+
if rows_collected < limit as i64 {
627+
files.extend(pruned_without_stats);
628+
}
629+
}
594630

595631
let files_scanned = files.len();
632+
let files_pruned = num_containers - files_scanned;
596633
(files, files_scanned, files_pruned)
597-
} else {
598-
let files = self.snapshot.file_actions()?;
599-
let files_scanned = files.len();
600-
(files, files_scanned, 0)
601634
}
602635
}
603636
};

0 commit comments

Comments
 (0)