Skip to content

Commit 1eca9af

Browse files
committed
FIXUP - only push down partial filters into parquet exec
Signed-off-by: Adrian Tanase <[email protected]>
1 parent adb635f commit 1eca9af

File tree

1 file changed

+35
-14
lines changed
  • crates/core/src/delta_datafusion

1 file changed

+35
-14
lines changed

crates/core/src/delta_datafusion/mod.rs

+35-14
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ use datafusion_common::{
6060
use datafusion_expr::execution_props::ExecutionProps;
6161
use datafusion_expr::logical_plan::CreateExternalTable;
6262
use datafusion_expr::simplify::SimplifyContext;
63-
use datafusion_expr::utils::conjunction;
63+
use datafusion_expr::utils::{conjunction, split_conjunction};
6464
use datafusion_expr::{
6565
col, BinaryExpr, Expr, Extension, LogicalPlan, Operator, TableProviderFilterPushDown,
6666
Volatility,
@@ -565,18 +565,25 @@ impl<'a> DeltaScanBuilder<'a> {
565565
let context = SessionContext::new();
566566
let df_schema = logical_schema.clone().to_dfschema()?;
567567

568-
let logical_filter = self.filter.map(|expr| {
569-
// Simplify the expression first
570-
let props = ExecutionProps::new();
571-
let simplify_context =
572-
SimplifyContext::new(&props).with_schema(df_schema.clone().into());
573-
let simplifier = ExprSimplifier::new(simplify_context).with_max_cycles(10);
574-
let simplified = simplifier.simplify(expr).unwrap();
575-
576-
context
577-
.create_physical_expr(simplified, &df_schema)
578-
.unwrap()
579-
});
568+
let logical_filter = self.filter.clone()
569+
.map(|expr| simplify_predicate(&context, &df_schema, expr));
570+
let pushdown_filter = self.filter.and_then(|expr| {
571+
let predicates = split_conjunction(&expr);
572+
let pushdown_filters =
573+
get_pushdown_filters(&predicates, self.snapshot.metadata().partition_columns.clone());
574+
575+
let filtered_predicates = predicates
576+
.into_iter()
577+
.zip(pushdown_filters.into_iter())
578+
.filter_map(|(filter, pushdown)| {
579+
if pushdown == TableProviderFilterPushDown::Inexact {
580+
Some(filter.clone())
581+
} else {
582+
None
583+
}
584+
});
585+
conjunction(filtered_predicates)
586+
}).map(|expr| simplify_predicate(&context, &df_schema, expr));
580587

581588
let mut pruning_mask: Option<_> = None;
582589

@@ -728,7 +735,7 @@ impl<'a> DeltaScanBuilder<'a> {
728735
// Sometimes (i.e Merge) we want to prune files that don't make the
729736
// filter and read the entire contents for files that do match the
730737
// filter
731-
if let Some(predicate) = logical_filter {
738+
if let Some(predicate) = pushdown_filter {
732739
if config.enable_parquet_pushdown {
733740
file_source = file_source.with_predicate(Arc::clone(&file_schema), predicate);
734741
}
@@ -771,6 +778,20 @@ impl<'a> DeltaScanBuilder<'a> {
771778
metrics,
772779
})
773780
}
781+
782+
}
783+
784+
fn simplify_predicate(context: &SessionContext, df_schema: &DFSchema, expr: Expr) -> Arc<dyn PhysicalExpr> {
785+
// Simplify the expression first
786+
let props = ExecutionProps::new();
787+
let simplify_context =
788+
SimplifyContext::new(&props).with_schema(df_schema.clone().into());
789+
let simplifier = ExprSimplifier::new(simplify_context).with_max_cycles(10);
790+
let simplified = simplifier.simplify(expr).unwrap();
791+
792+
context
793+
.create_physical_expr(simplified, &df_schema)
794+
.unwrap()
774795
}
775796

776797
fn prune_file_statistics(

0 commit comments

Comments
 (0)