Skip to content

Commit 47cc99e

Browse files
committed
FIXUP - only push down partial filters into parquet exec
Signed-off-by: Adrian Tanase <[email protected]>
1 parent adb635f commit 47cc99e

File tree

1 file changed

+25
-2
lines changed
  • crates/core/src/delta_datafusion

1 file changed

+25
-2
lines changed

crates/core/src/delta_datafusion/mod.rs

+25-2
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ use datafusion_common::{
6060
use datafusion_expr::execution_props::ExecutionProps;
6161
use datafusion_expr::logical_plan::CreateExternalTable;
6262
use datafusion_expr::simplify::SimplifyContext;
63-
use datafusion_expr::utils::conjunction;
63+
use datafusion_expr::utils::{conjunction, split_conjunction};
6464
use datafusion_expr::{
6565
col, BinaryExpr, Expr, Extension, LogicalPlan, Operator, TableProviderFilterPushDown,
6666
Volatility,
@@ -565,6 +565,8 @@ impl<'a> DeltaScanBuilder<'a> {
565565
let context = SessionContext::new();
566566
let df_schema = logical_schema.clone().to_dfschema()?;
567567

568+
// TODO: extract to method and untangle pushdown filter from the full logical filter
569+
let mut pushdown_filter = None;
568570
let logical_filter = self.filter.map(|expr| {
569571
// Simplify the expression first
570572
let props = ExecutionProps::new();
@@ -573,6 +575,27 @@ impl<'a> DeltaScanBuilder<'a> {
573575
let simplifier = ExprSimplifier::new(simplify_context).with_max_cycles(10);
574576
let simplified = simplifier.simplify(expr).unwrap();
575577

578+
// FIXME: we probably want to do this before simplification
579+
let predicates = split_conjunction(&simplified);
580+
let pushdown_filters =
581+
get_pushdown_filters(&predicates, self.snapshot.metadata().partition_columns.clone());
582+
583+
let filtered_predicates = predicates
584+
.into_iter()
585+
.zip(pushdown_filters.into_iter())
586+
.filter_map(|(filter, pushdown)| {
587+
if pushdown == TableProviderFilterPushDown::Inexact {
588+
Some(filter.clone())
589+
} else {
590+
None
591+
}
592+
});
593+
let pushdown_filter = conjunction(filtered_predicates).map(|e| {
594+
context
595+
.create_physical_expr(e, &df_schema)
596+
.unwrap()
597+
});
598+
576599
context
577600
.create_physical_expr(simplified, &df_schema)
578601
.unwrap()
@@ -728,7 +751,7 @@ impl<'a> DeltaScanBuilder<'a> {
728751
// Sometimes (i.e Merge) we want to prune files that don't make the
729752
// filter and read the entire contents for files that do match the
730753
// filter
731-
if let Some(predicate) = logical_filter {
754+
if let Some(predicate) = pushdown_filter {
732755
if config.enable_parquet_pushdown {
733756
file_source = file_source.with_predicate(Arc::clone(&file_schema), predicate);
734757
}

0 commit comments

Comments
 (0)