
Commit 9cfd71b

feat(datafusion): optimize partition pruning, push down full predicates for DF integration
Signed-off-by: Adrian Tanase <[email protected]>
1 parent ddfd9f7

3 files changed: +355 -46 lines
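At its core, the change is about what the TableProvider tells DataFusion it can guarantee: a predicate that touches only partition columns can be answered exactly from the Delta log, so it is now reported as Exact (DataFusion then drops its own re-filter), while anything touching data columns stays Inexact. Below is a minimal, dependency-free sketch of that policy; the `Pushdown` enum and `classify` helper are hypothetical stand-ins for DataFusion's `TableProviderFilterPushDown` and the commit's `get_pushdown_filters`.

// Hypothetical mirror of the Exact/Inexact pushdown contract.
#[derive(Debug, PartialEq)]
enum Pushdown {
    Exact,   // provider guarantees the filter; DataFusion drops its own FilterExec
    Inexact, // provider filters best-effort; DataFusion re-applies the predicate
}

// Classify one predicate: Exact only if it references at least one column
// and every referenced column is a partition column.
fn classify(referenced_cols: &[&str], partition_cols: &[&str]) -> Pushdown {
    if !referenced_cols.is_empty()
        && referenced_cols.iter().all(|c| partition_cols.contains(c))
    {
        Pushdown::Exact
    } else {
        Pushdown::Inexact
    }
}

fn main() {
    let partition_cols = ["date", "region"];
    // `date = '2024-01-01'` only touches a partition column -> Exact
    assert_eq!(classify(&["date"], &partition_cols), Pushdown::Exact);
    // `value > 10` touches a data column -> Inexact
    assert_eq!(classify(&["value"], &partition_cols), Pushdown::Inexact);
}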

crates/core/src/delta_datafusion/mod.rs (+174 -33)
@@ -27,14 +27,17 @@ use std::fmt::{self, Debug};
 use std::sync::Arc;
 
 use arrow_array::types::UInt16Type;
-use arrow_array::{Array, DictionaryArray, RecordBatch, StringArray, TypedDictionaryArray};
+use arrow_array::{
+    Array, BooleanArray, DictionaryArray, RecordBatch, StringArray, TypedDictionaryArray,
+};
 use arrow_cast::display::array_value_to_string;
 use arrow_cast::{cast_with_options, CastOptions};
 use arrow_schema::{
     ArrowError, DataType as ArrowDataType, Field, Schema as ArrowSchema, SchemaRef,
     SchemaRef as ArrowSchemaRef, TimeUnit,
 };
 use arrow_select::concat::concat_batches;
+use arrow_select::filter::filter_record_batch;
 use async_trait::async_trait;
 use chrono::{DateTime, TimeZone, Utc};
 use datafusion::catalog::memory::DataSourceExec;
@@ -59,8 +62,11 @@ use datafusion_common::{
 use datafusion_expr::execution_props::ExecutionProps;
 use datafusion_expr::logical_plan::CreateExternalTable;
 use datafusion_expr::simplify::SimplifyContext;
-use datafusion_expr::utils::conjunction;
-use datafusion_expr::{col, Expr, Extension, LogicalPlan, TableProviderFilterPushDown, Volatility};
+use datafusion_expr::utils::{conjunction, split_conjunction};
+use datafusion_expr::{
+    col, BinaryExpr, Expr, Extension, LogicalPlan, Operator, TableProviderFilterPushDown,
+    Volatility,
+};
 use datafusion_physical_expr::{create_physical_expr, PhysicalExpr};
 use datafusion_physical_plan::filter::FilterExec;
 use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
@@ -86,7 +92,9 @@ use url::Url;
 use crate::delta_datafusion::expr::parse_predicate_expression;
 use crate::delta_datafusion::schema_adapter::DeltaSchemaAdapterFactory;
 use crate::errors::{DeltaResult, DeltaTableError};
-use crate::kernel::{Add, DataCheck, EagerSnapshot, Invariant, Snapshot, StructTypeExt};
+use crate::kernel::{
+    Add, DataCheck, EagerSnapshot, Invariant, LogDataHandler, Snapshot, StructTypeExt,
+};
 use crate::logstore::LogStoreRef;
 use crate::table::builder::ensure_table_uri;
 use crate::table::state::DeltaTableState;
@@ -541,6 +549,16 @@ impl<'a> DeltaScanBuilder<'a> {
             for idx in used_columns {
                 fields.push(logical_schema.field(*idx).to_owned());
             }
+            // Partition filters with Exact pushdown were removed from the projection
+            // by the DataFusion optimizer; add them back so predicate pruning still works.
+            if let Some(expr) = &self.filter {
+                for c in expr.column_refs() {
+                    let idx = logical_schema.index_of(c.name.as_str())?;
+                    if !used_columns.contains(&idx) {
+                        fields.push(logical_schema.field(idx).to_owned());
+                    }
+                }
+            }
             Arc::new(ArrowSchema::new(fields))
         } else {
             logical_schema
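Because columns that appear only in Exact-pushdown filters are dropped from the optimizer's projection, the scan schema has to re-include them before file pruning runs. A standalone sketch of that re-add step against the `arrow_schema` API; the schema and column names here are made up for illustration.

use arrow_schema::{DataType, Field, Schema};

fn main() -> Result<(), arrow_schema::ArrowError> {
    // Hypothetical logical schema: `date` is a partition column, `value` is data.
    let logical = Schema::new(vec![
        Field::new("date", DataType::Utf8, false),
        Field::new("value", DataType::Int64, true),
    ]);

    // The optimizer's projection only kept `value`; the Exact filter on `date`
    // no longer contributes a used column.
    let used_columns = vec![logical.index_of("value")?];
    let filter_columns = ["date"]; // columns referenced by the pushed-down filter

    let mut fields: Vec<Field> = used_columns
        .iter()
        .map(|idx| logical.field(*idx).clone())
        .collect();
    // Re-add filter-only columns so file pruning can still evaluate the predicate.
    for name in filter_columns {
        let idx = logical.index_of(name)?;
        if !used_columns.contains(&idx) {
            fields.push(logical.field(idx).clone());
        }
    }

    let scan_schema = Schema::new(fields);
    assert_eq!(scan_schema.fields().len(), 2);
    Ok(())
}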
@@ -549,32 +567,48 @@ impl<'a> DeltaScanBuilder<'a> {
         let context = SessionContext::new();
         let df_schema = logical_schema.clone().to_dfschema()?;
 
-        let logical_filter = self.filter.map(|expr| {
-            // Simplify the expression first
-            let props = ExecutionProps::new();
-            let simplify_context =
-                SimplifyContext::new(&props).with_schema(df_schema.clone().into());
-            let simplifier = ExprSimplifier::new(simplify_context).with_max_cycles(10);
-            let simplified = simplifier.simplify(expr).unwrap();
+        let logical_filter = self
+            .filter
+            .clone()
+            .map(|expr| simplify_expr(&context, &df_schema, expr));
+        // Only inexact filters should be pushed down to the data source; doing otherwise
+        // makes the stats inexact and disables DataFusion optimizations like AggregateStatistics.
+        let pushdown_filter = self
+            .filter
+            .and_then(|expr| {
+                let predicates = split_conjunction(&expr);
+                let pushdown_filters = get_pushdown_filters(
+                    &predicates,
+                    self.snapshot.metadata().partition_columns.as_slice(),
+                );
 
-            context
-                .create_physical_expr(simplified, &df_schema)
-                .unwrap()
-        });
+                let filtered_predicates = predicates
+                    .into_iter()
+                    .zip(pushdown_filters.into_iter())
+                    .filter_map(|(filter, pushdown)| {
+                        if pushdown == TableProviderFilterPushDown::Inexact {
+                            Some(filter.clone())
+                        } else {
+                            None
+                        }
+                    });
+                conjunction(filtered_predicates)
+            })
+            .map(|expr| simplify_expr(&context, &df_schema, expr));
 
         // Perform Pruning of files to scan
-        let (files, files_scanned, files_pruned) = match self.files {
+        let (files, files_scanned, files_pruned, pruning_mask) = match self.files {
             Some(files) => {
                 let files = files.to_owned();
                 let files_scanned = files.len();
-                (files, files_scanned, 0)
+                (files, files_scanned, 0, None)
             }
             None => {
                 // early return in case we have no push down filters or limit
                 if logical_filter.is_none() && self.limit.is_none() {
                     let files = self.snapshot.file_actions()?;
                     let files_scanned = files.len();
-                    (files, files_scanned, 0)
+                    (files, files_scanned, 0, None)
                 } else {
                     let num_containers = self.snapshot.num_containers();
 
@@ -595,7 +629,7 @@ impl<'a> DeltaScanBuilder<'a> {
                     for (action, keep) in self
                         .snapshot
                         .file_actions_iter()?
-                        .zip(files_to_prune.into_iter())
+                        .zip(files_to_prune.iter().cloned())
                     {
                         // prune file based on predicate pushdown
                         if keep {
@@ -627,7 +661,7 @@
 
                     let files_scanned = files.len();
                     let files_pruned = num_containers - files_scanned;
-                    (files, files_scanned, files_pruned)
+                    (files, files_scanned, files_pruned, Some(files_to_prune))
                 }
             }
         };
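The pushdown side mirrors this split: the full predicate is broken into conjuncts, and only the Inexact ones are reassembled into the physical scan predicate, since the Exact ones are already fully enforced by file pruning. A dependency-free sketch of the zip-and-reassemble step, with strings standing in for `Expr` and a hypothetical `conjunction` helper in place of DataFusion's:

#[derive(Clone, Debug, PartialEq)]
enum Pushdown {
    Exact,
    Inexact,
}

// AND all parts back together; None if nothing survived the filter.
fn conjunction(parts: Vec<String>) -> Option<String> {
    parts.into_iter().reduce(|a, b| format!("({a}) AND ({b})"))
}

fn main() {
    let predicates = vec!["date = '2024-01-01'".to_string(), "value > 10".to_string()];
    let pushdowns = vec![Pushdown::Exact, Pushdown::Inexact];

    let inexact_only: Vec<String> = predicates
        .into_iter()
        .zip(pushdowns)
        .filter_map(|(p, pd)| (pd == Pushdown::Inexact).then_some(p))
        .collect();

    // Only `value > 10` survives; the Exact partition predicate was already
    // applied during file pruning, so re-checking it per row would be redundant.
    assert_eq!(conjunction(inexact_only), Some("value > 10".to_string()));
}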
@@ -684,10 +718,18 @@ impl<'a> DeltaScanBuilder<'a> {
             ));
         }
 
-        let stats = self
-            .snapshot
-            .datafusion_table_statistics()
-            .unwrap_or(Statistics::new_unknown(&schema));
+        // FIXME - where is the correct place to marry file pruning with statistics pruning?
+        // Temporarily re-generating the log handler, just so that we can compute the stats.
+        // Should we update datafusion_table_statistics to optionally take the mask?
+        let stats = if let Some(mask) = pruning_mask {
+            let es = self.snapshot.snapshot();
+            let pruned_stats = prune_file_statistics(&es.files, mask);
+            LogDataHandler::new(&pruned_stats, es.metadata(), es.schema()).statistics()
+        } else {
+            self.snapshot.datafusion_table_statistics()
+        };
+
+        let stats = stats.unwrap_or(Statistics::new_unknown(&schema));
 
         let parquet_options = TableParquetOptions {
             global: self.session.config().options().execution.parquet.clone(),
@@ -700,7 +742,7 @@ impl<'a> DeltaScanBuilder<'a> {
         // Sometimes (i.e. Merge) we want to prune files that don't match the
         // filter and read the entire contents for files that do match the
         // filter
-        if let Some(predicate) = logical_filter {
+        if let Some(predicate) = pushdown_filter {
             if config.enable_parquet_pushdown {
                 file_source = file_source.with_predicate(Arc::clone(&file_schema), predicate);
             }
@@ -746,6 +788,43 @@ impl<'a> DeltaScanBuilder<'a> {
     }
 }
 
+fn simplify_expr(
+    context: &SessionContext,
+    df_schema: &DFSchema,
+    expr: Expr,
+) -> Arc<dyn PhysicalExpr> {
+    // Simplify the expression first
+    let props = ExecutionProps::new();
+    let simplify_context = SimplifyContext::new(&props).with_schema(df_schema.clone().into());
+    let simplifier = ExprSimplifier::new(simplify_context).with_max_cycles(10);
+    let simplified = simplifier.simplify(expr).unwrap();
+
+    context
+        .create_physical_expr(simplified, df_schema)
+        .unwrap()
+}
+
+fn prune_file_statistics(
+    record_batches: &Vec<RecordBatch>,
+    pruning_mask: Vec<bool>,
+) -> Vec<RecordBatch> {
+    let mut filtered_batches = Vec::new();
+    let mut mask_offset = 0;
+
+    for batch in record_batches {
+        let num_rows = batch.num_rows();
+        let batch_mask = &pruning_mask[mask_offset..mask_offset + num_rows];
+        mask_offset += num_rows;
+
+        let boolean_mask = BooleanArray::from(batch_mask.to_vec());
+        let filtered_batch =
+            filter_record_batch(batch, &boolean_mask).expect("Failed to filter RecordBatch");
+        filtered_batches.push(filtered_batch);
+    }
+
+    filtered_batches
+}
+
 // TODO: implement this for Snapshot, not for DeltaTable
 #[async_trait]
 impl TableProvider for DeltaTable {
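`prune_file_statistics` applies the file-pruning mask to the per-file stats batches so that table-level statistics only reflect surviving files. A small standalone demo of the same `filter_record_batch` call on a made-up stats batch, assuming only the `arrow_array` and `arrow_select` crates the diff already imports:

use std::sync::Arc;

use arrow_array::{ArrayRef, BooleanArray, Int64Array, RecordBatch, StringArray};
use arrow_select::filter::filter_record_batch;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // One row per file action; `path` and `num_records` stand in for the real stats columns.
    let paths: ArrayRef = Arc::new(StringArray::from(vec!["a.parquet", "b.parquet", "c.parquet"]));
    let rows: ArrayRef = Arc::new(Int64Array::from(vec![100, 200, 300]));
    let batch = RecordBatch::try_from_iter(vec![("path", paths), ("num_records", rows)])?;

    // true = keep the file, mirroring the commit's `files_to_prune` mask.
    let mask = BooleanArray::from(vec![true, false, true]);
    let pruned = filter_record_batch(&batch, &mask)?;

    // b.parquet's 200 rows no longer inflate the table-level statistics.
    assert_eq!(pruned.num_rows(), 2);
    Ok(())
}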
@@ -793,17 +872,81 @@ impl TableProvider for DeltaTable {
         &self,
         filter: &[&Expr],
     ) -> DataFusionResult<Vec<TableProviderFilterPushDown>> {
-        Ok(filter
-            .iter()
-            .map(|_| TableProviderFilterPushDown::Inexact)
-            .collect())
+        let partition_cols = self.snapshot()?.metadata().partition_columns.as_slice();
+        Ok(get_pushdown_filters(filter, partition_cols))
     }
 
     fn statistics(&self) -> Option<Statistics> {
         self.snapshot().ok()?.datafusion_table_statistics()
     }
 }
 
+fn get_pushdown_filters(
+    filter: &[&Expr],
+    partition_cols: &[String],
+) -> Vec<TableProviderFilterPushDown> {
+    filter
+        .iter()
+        .cloned()
+        .map(|expr| {
+            let applicable = expr_is_exact_predicate_for_cols(partition_cols, expr);
+            if !expr.column_refs().is_empty() && applicable {
+                TableProviderFilterPushDown::Exact
+            } else {
+                TableProviderFilterPushDown::Inexact
+            }
+        })
+        .collect()
+}
+
+// Inspired by datafusion::listing::helpers, but adapted to stats-based pruning only
+fn expr_is_exact_predicate_for_cols(partition_cols: &[String], expr: &Expr) -> bool {
+    let mut is_applicable = true;
+    expr.apply(|expr| match expr {
+        Expr::Column(Column { ref name, .. }) => {
+            is_applicable &= partition_cols.contains(name);
+
+            // TODO: decide if we should constrain this to Utf8 columns (including views, dicts etc)
+
+            if is_applicable {
+                Ok(TreeNodeRecursion::Jump)
+            } else {
+                Ok(TreeNodeRecursion::Stop)
+            }
+        }
+        Expr::BinaryExpr(BinaryExpr { ref op, .. }) => {
+            is_applicable &= matches!(
+                op,
+                Operator::And
+                    | Operator::Or
+                    | Operator::NotEq
+                    | Operator::Eq
+                    | Operator::Gt
+                    | Operator::GtEq
+                    | Operator::Lt
+                    | Operator::LtEq
+            );
+            if is_applicable {
+                Ok(TreeNodeRecursion::Continue)
+            } else {
+                Ok(TreeNodeRecursion::Stop)
+            }
+        }
+        Expr::Literal(_)
+        | Expr::Not(_)
+        | Expr::IsNotNull(_)
+        | Expr::IsNull(_)
+        | Expr::Between(_)
+        | Expr::InList(_) => Ok(TreeNodeRecursion::Continue),
+        _ => {
+            is_applicable = false;
+            Ok(TreeNodeRecursion::Stop)
+        }
+    })
+    .unwrap();
+    is_applicable
+}
+
 /// A Delta table provider that enables additional metadata columns to be included during the scan
 #[derive(Debug)]
 pub struct DeltaTableProvider {
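`expr_is_exact_predicate_for_cols` leans on DataFusion's `TreeNodeRecursion` contract: `Continue` descends into children, `Jump` skips the current node's children, and `Stop` aborts the whole walk. A dependency-free sketch of that contract with a toy `Expr` type and `apply` function (both hypothetical); it rejects a predicate as soon as a non-partition column appears:

enum Recursion {
    Continue,
    Jump,
    Stop,
}

enum Expr {
    Column(String),
    BinaryOp(Box<Expr>, &'static str, Box<Expr>),
    Literal(i64),
}

// Pre-order walk: Continue descends, Jump skips the current node's children,
// Stop aborts the remainder of the traversal (returns false).
fn apply(expr: &Expr, f: &mut impl FnMut(&Expr) -> Recursion) -> bool {
    match f(expr) {
        Recursion::Stop => false,
        Recursion::Jump => true,
        Recursion::Continue => match expr {
            Expr::BinaryOp(l, _, r) => apply(l, f) && apply(r, f),
            _ => true,
        },
    }
}

fn main() {
    let partition_cols = ["date"];
    // date = 5 AND value = 3  ->  not exact, because `value` is not a partition column
    let expr = Expr::BinaryOp(
        Box::new(Expr::BinaryOp(
            Box::new(Expr::Column("date".into())),
            "=",
            Box::new(Expr::Literal(5)),
        )),
        "AND",
        Box::new(Expr::BinaryOp(
            Box::new(Expr::Column("value".into())),
            "=",
            Box::new(Expr::Literal(3)),
        )),
    );

    let mut exact = true;
    apply(&expr, &mut |e| match e {
        Expr::Column(name) => {
            exact &= partition_cols.contains(&name.as_str());
            if exact { Recursion::Jump } else { Recursion::Stop }
        }
        _ => Recursion::Continue,
    });
    assert!(!exact);
}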
@@ -885,10 +1028,8 @@ impl TableProvider for DeltaTableProvider {
8851028
&self,
8861029
filter: &[&Expr],
8871030
) -> DataFusionResult<Vec<TableProviderFilterPushDown>> {
888-
Ok(filter
889-
.iter()
890-
.map(|_| TableProviderFilterPushDown::Inexact)
891-
.collect())
1031+
let partition_cols = self.snapshot.metadata().partition_columns.as_slice();
1032+
Ok(get_pushdown_filters(filter, partition_cols))
8921033
}
8931034

8941035
fn statistics(&self) -> Option<Statistics> {

crates/core/src/kernel/snapshot/log_data.rs (+47 -8)
@@ -891,14 +891,53 @@ mod datafusion {
             arrow_cast::cast(batch.column_by_name("output")?, &ArrowDataType::UInt64).ok()
         }
 
-        // This function is required since DataFusion 35.0, but is implemented as a no-op
-        // https://github.com/apache/arrow-datafusion/blob/ec6abece2dcfa68007b87c69eefa6b0d7333f628/datafusion/core/src/datasource/physical_plan/parquet/page_filter.rs#L550
-        fn contained(
-            &self,
-            _column: &Column,
-            _value: &HashSet<ScalarValue>,
-        ) -> Option<BooleanArray> {
-            None
+        // This function is optional but will optimize partition column pruning
+        fn contained(&self, column: &Column, value: &HashSet<ScalarValue>) -> Option<BooleanArray> {
+            if value.is_empty() || !self.metadata.partition_columns.contains(&column.name) {
+                return None;
+            }
+
+            // Retrieve the partition values for the column
+            let partition_values = self.pick_stats(column, "__dummy__")?;
+
+            let partition_values = partition_values
+                .as_any()
+                .downcast_ref::<StringArray>()
+                .ok_or(DeltaTableError::generic(
+                    "failed to downcast string result to StringArray.",
+                ))
+                .ok()?;
+
+            let mut contains = Vec::with_capacity(partition_values.len());
+
+            // TODO: this was inspired by parquet's BloomFilter pruning; decide if we should
+            // just convert to Vec<String> for a subset of column types and use .contains
+            fn check_scalar(pv: &str, value: &ScalarValue) -> bool {
+                match value {
+                    ScalarValue::Utf8(Some(v))
+                    | ScalarValue::Utf8View(Some(v))
+                    | ScalarValue::LargeUtf8(Some(v)) => pv == v,
+
+                    ScalarValue::Dictionary(_, inner) => check_scalar(pv, inner),
+                    // FIXME: is this a good enough default, or should we sync this with
+                    // expr_applicable_for_cols and bail out with None?
+                    _ => value.to_string() == pv,
+                }
+            }
+
+            for i in 0..partition_values.len() {
+                if partition_values.is_null(i) {
+                    contains.push(false);
+                } else {
+                    contains.push(
+                        value
+                            .iter()
+                            .any(|scalar| check_scalar(partition_values.value(i), scalar)),
+                    );
+                }
+            }
+
+            Some(BooleanArray::from(contains))
         }
     }
 }