Skip to content

Commit adb635f

Browse files
committed
FIXUP - prune DF stats with the same predicate as files
Signed-off-by: Adrian Tanase <[email protected]>

# Conflicts:
#	crates/core/src/delta_datafusion/mod.rs
1 parent 7ea7975 commit adb635f

File tree

2 files changed

+79
-9
lines changed

2 files changed

+79
-9
lines changed

crates/core/src/delta_datafusion/mod.rs

Lines changed: 45 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,14 +27,17 @@ use std::fmt::{self, Debug};
2727
use std::sync::Arc;
2828

2929
use arrow_array::types::UInt16Type;
30-
use arrow_array::{Array, DictionaryArray, RecordBatch, StringArray, TypedDictionaryArray};
30+
use arrow_array::{
31+
Array, BooleanArray, DictionaryArray, RecordBatch, StringArray, TypedDictionaryArray,
32+
};
3133
use arrow_cast::display::array_value_to_string;
3234
use arrow_cast::{cast_with_options, CastOptions};
3335
use arrow_schema::{
3436
ArrowError, DataType as ArrowDataType, Field, Schema as ArrowSchema, SchemaRef,
3537
SchemaRef as ArrowSchemaRef, TimeUnit,
3638
};
3739
use arrow_select::concat::concat_batches;
40+
use arrow_select::filter::filter_record_batch;
3841
use async_trait::async_trait;
3942
use chrono::{DateTime, TimeZone, Utc};
4043
use datafusion::catalog::{Session, TableProviderFactory};
@@ -87,7 +90,9 @@ use url::Url;
8790
use crate::delta_datafusion::expr::parse_predicate_expression;
8891
use crate::delta_datafusion::schema_adapter::DeltaSchemaAdapterFactory;
8992
use crate::errors::{DeltaResult, DeltaTableError};
90-
use crate::kernel::{Add, DataCheck, EagerSnapshot, Invariant, Snapshot, StructTypeExt};
93+
use crate::kernel::{
94+
Add, DataCheck, EagerSnapshot, Invariant, LogDataHandler, Snapshot, StructTypeExt,
95+
};
9196
use crate::logstore::LogStoreRef;
9297
use crate::table::builder::ensure_table_uri;
9398
use crate::table::state::DeltaTableState;
@@ -573,6 +578,8 @@ impl<'a> DeltaScanBuilder<'a> {
573578
.unwrap()
574579
});
575580

581+
let mut pruning_mask: Option<_> = None;
582+
576583
// Perform Pruning of files to scan
577584
let (files, files_scanned, files_pruned) = match self.files {
578585
Some(files) => {
@@ -592,7 +599,9 @@ impl<'a> DeltaScanBuilder<'a> {
592599
let files_to_prune = if let Some(predicate) = &logical_filter {
593600
let pruning_predicate =
594601
PruningPredicate::try_new(predicate.clone(), logical_schema.clone())?;
595-
pruning_predicate.prune(self.snapshot)?
602+
let mask = pruning_predicate.prune(self.snapshot)?;
603+
pruning_mask = Some(mask.clone());
604+
mask
596605
} else {
597606
vec![true; num_containers]
598607
};
@@ -695,10 +704,18 @@ impl<'a> DeltaScanBuilder<'a> {
695704
));
696705
}
697706

698-
let stats = self
699-
.snapshot
700-
.datafusion_table_statistics()
701-
.unwrap_or(Statistics::new_unknown(&schema));
707+
// FIXME - where is the correct place to marry file pruning with statistics pruning?
708+
// Temporarily re-generating the log handler, just so that we can compute the stats.
709+
// Should we update datafusion_table_statistics to optionally take the mask?
710+
let stats = if let Some(mask) = pruning_mask {
711+
let es = self.snapshot.snapshot();
712+
let pruned_stats = prune_file_statistics(&es.files, mask);
713+
LogDataHandler::new(&pruned_stats, es.metadata(), es.schema()).statistics()
714+
} else {
715+
self.snapshot.datafusion_table_statistics()
716+
};
717+
718+
let stats = stats.unwrap_or(Statistics::new_unknown(&schema));
702719

703720
let parquet_options = TableParquetOptions {
704721
global: self.session.config().options().execution.parquet.clone(),
@@ -756,6 +773,27 @@ impl<'a> DeltaScanBuilder<'a> {
756773
}
757774
}
758775

776+
fn prune_file_statistics(
777+
record_batches: &Vec<RecordBatch>,
778+
pruning_mask: Vec<bool>,
779+
) -> Vec<RecordBatch> {
780+
let mut filtered_batches = Vec::new();
781+
let mut mask_offset = 0;
782+
783+
for batch in record_batches {
784+
let num_rows = batch.num_rows();
785+
let batch_mask = &pruning_mask[mask_offset..mask_offset + num_rows];
786+
mask_offset += num_rows;
787+
788+
let boolean_mask = BooleanArray::from(batch_mask.to_vec());
789+
let filtered_batch =
790+
filter_record_batch(batch, &boolean_mask).expect("Failed to filter RecordBatch");
791+
filtered_batches.push(filtered_batch);
792+
}
793+
794+
filtered_batches
795+
}
796+
759797
// TODO: implement this for Snapshot, not for DeltaTable
760798
#[async_trait]
761799
impl TableProvider for DeltaTable {

crates/core/tests/integration_datafusion.rs

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,14 @@ use url::Url;
4141
mod local {
4242
use super::*;
4343
use datafusion::datasource::source::DataSourceExec;
44+
use datafusion::prelude::SessionConfig;
4445
use datafusion::{common::stats::Precision, datasource::provider_as_source};
46+
use datafusion_common::assert_contains;
4547
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
4648
use datafusion_expr::{
4749
LogicalPlan, LogicalPlanBuilder, TableProviderFilterPushDown, TableScan,
4850
};
51+
use datafusion_physical_plan::displayable;
4952
use deltalake_core::{
5053
delta_datafusion::DeltaLogicalCodec, logstore::default_logstore, writer::JsonWriter,
5154
};
@@ -212,6 +215,36 @@ mod local {
212215
}
213216
}
214217

218+
#[tokio::test]
async fn test_datafusion_optimize_stats_partitioned_pushdown() -> Result<()> {
    // Verifies that a COUNT(*) with a partition predicate is folded to a
    // constant in the physical plan — i.e. the aggregate is answered from
    // (pruned) table statistics rather than by scanning files.
    // NOTE(review): relies on the http_requests fixture containing exactly
    // 1437 rows with date > '2023-04-13' — update the literals if the fixture changes.
    let config = SessionConfig::new().with_target_partitions(2);
    let ctx = SessionContext::new_with_config(config);
    let table = open_table("../test/tests/data/http_requests").await?;
    ctx.register_table("http_requests", Arc::new(table.clone()))?;

    let sql = "SELECT COUNT(*) as num_events FROM http_requests WHERE date > '2023-04-13'";
    let df = ctx.sql(sql).await?;
    let plan = df.clone().create_physical_plan().await?;

    // convert to explain plan form
    let display = displayable(plan.as_ref()).indent(true).to_string();

    // A PlaceholderRowExec under a constant projection means no scan node
    // remains: the count was resolved entirely at plan time.
    assert_contains!(
        &display,
        "ProjectionExec: expr=[1437 as num_events]\n PlaceholderRowExec"
    );

    // Executing the plan must agree with the statically-folded value.
    let batches = df.collect().await?;
    let batch = &batches[0];

    assert_eq!(
        batch.column(0).as_ref(),
        Arc::new(Int64Array::from(vec![1437])).as_ref(),
    );

    Ok(())
}
247+
215248
#[tokio::test]
216249
async fn test_datafusion_query_partitioned_pushdown() -> Result<()> {
217250
let ctx = SessionContext::new();
@@ -290,8 +323,7 @@ mod local {
290323
use datafusion::prelude::*;
291324
let ctx = SessionContext::new();
292325
let state = ctx.state();
293-
let table = open_table("../test/tests/data/delta-0.8.0")
294-
.await?;
326+
let table = open_table("../test/tests/data/delta-0.8.0").await?;
295327

296328
// Simple Equality test, we only exercise the limit in this test
297329
let e = col("value").eq(lit(2));

0 commit comments

Comments (0)