diff --git a/crates/core/src/delta_datafusion/expr.rs b/crates/core/src/delta_datafusion/expr.rs
index 364c2e1c35..b0b7332c2f 100644
--- a/crates/core/src/delta_datafusion/expr.rs
+++ b/crates/core/src/delta_datafusion/expr.rs
@@ -816,6 +816,7 @@ mod test {
             &table
                 .snapshot()
                 .unwrap()
+                .snapshot()
                 .input_schema()
                 .unwrap()
                 .as_ref()
@@ -926,6 +927,7 @@ mod test {
         let actual_expr = table
             .snapshot()
             .unwrap()
+            .snapshot()
             .parse_predicate_expression(actual, &session.state())
             .unwrap();
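Note: the doubled `.snapshot()` calls in these test hunks are intentional. The mod.rs changes below remove the `DataFusionMixins` impl from `DeltaTableState`, leaving `arrow_schema`, `input_schema`, and `parse_predicate_expression` implemented only on the inner `EagerSnapshot`. A minimal sketch of the resulting call chain, assuming `DeltaTableState::snapshot()` exposes that inner snapshot as the removed delegation suggests (the `table` and `session` values here are hypothetical):

    // The first `snapshot()` yields the DeltaTableState; the second reaches
    // the EagerSnapshot, which is what still implements DataFusionMixins.
    let state: &DeltaTableState = table.snapshot()?;
    let eager: &EagerSnapshot = state.snapshot();
    let schema = eager.arrow_schema()?;
    let predicate = eager.parse_predicate_expression("value > 10", &session.state())?;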
diff --git a/crates/core/src/delta_datafusion/find_files.rs b/crates/core/src/delta_datafusion/find_files.rs
new file mode 100644
index 0000000000..ebfed00f8a
--- /dev/null
+++ b/crates/core/src/delta_datafusion/find_files.rs
@@ -0,0 +1,301 @@
+use std::collections::HashMap;
+use std::fmt::Debug;
+use std::sync::Arc;
+
+use arrow_array::{Array, RecordBatch, StringArray};
+use arrow_schema::{ArrowError, DataType as ArrowDataType, Field, Schema as ArrowSchema};
+use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
+use datafusion::datasource::MemTable;
+use datafusion::execution::context::{SessionContext, SessionState, TaskContext};
+use datafusion::logical_expr::{col, Expr, Volatility};
+use datafusion::physical_plan::filter::FilterExec;
+use datafusion::physical_plan::limit::LocalLimitExec;
+use datafusion::physical_plan::ExecutionPlan;
+use itertools::Itertools;
+
+use crate::delta_datafusion::{
+    df_logical_schema, get_path_column, DeltaScanBuilder, DeltaScanConfigBuilder, PATH_COLUMN,
+};
+use crate::errors::{DeltaResult, DeltaTableError};
+use crate::kernel::{Add, EagerSnapshot};
+use crate::logstore::LogStoreRef;
+
+#[derive(Debug, Hash, Eq, PartialEq)]
+/// Represents the result of the [find_files] function.
+pub(crate) struct FindFiles {
+    /// A list of `Add` objects that match the given predicate
+    pub candidates: Vec<Add>,
+    /// Was a physical read to the datastore required to determine the candidates
+    pub partition_scan: bool,
+}
+
+/// Finds files in a snapshot that match the provided predicate.
+pub(crate) async fn find_files(
+    snapshot: &EagerSnapshot,
+    log_store: LogStoreRef,
+    state: &SessionState,
+    predicate: Option<Expr>,
+) -> DeltaResult<FindFiles> {
+    let current_metadata = snapshot.metadata();
+
+    match &predicate {
+        Some(predicate) => {
+            // Validate the predicate and determine if it only contains partition columns
+            let mut expr_properties = FindFilesExprProperties {
+                partition_only: true,
+                partition_columns: current_metadata.partition_columns().clone(),
+                result: Ok(()),
+            };
+
+            TreeNode::visit(predicate, &mut expr_properties)?;
+            expr_properties.result?;
+
+            if expr_properties.partition_only {
+                let candidates = scan_memory_table(snapshot, predicate).await?;
+                Ok(FindFiles {
+                    candidates,
+                    partition_scan: true,
+                })
+            } else {
+                let candidates =
+                    find_files_scan(snapshot, log_store, state, predicate.to_owned()).await?;
+
+                Ok(FindFiles {
+                    candidates,
+                    partition_scan: false,
+                })
+            }
+        }
+        None => Ok(FindFiles {
+            candidates: snapshot.log_data().iter().map(|f| f.add_action()).collect(),
+            partition_scan: true,
+        }),
+    }
+}
+
+struct FindFilesExprProperties {
+    pub partition_columns: Vec<String>,
+
+    pub partition_only: bool,
+    pub result: DeltaResult<()>,
+}
+
+/// Ensure only expressions that make sense are accepted, check for
+/// non-deterministic functions, and determine if the expression only contains
+/// partition columns
+impl TreeNodeVisitor<'_> for FindFilesExprProperties {
+    type Node = Expr;
+
+    fn f_down(&mut self, expr: &Self::Node) -> datafusion::common::Result<TreeNodeRecursion> {
+        // TODO: We can likely relax the volatility to STABLE. Would require further
+        // research to confirm the same value is generated during the scan and
+        // rewrite phases.
+
+        match expr {
+            Expr::Column(c) => {
+                if !self.partition_columns.contains(&c.name) {
+                    self.partition_only = false;
+                }
+            }
+            Expr::ScalarVariable(_, _)
+            | Expr::Literal(_, _)
+            | Expr::Alias(_)
+            | Expr::BinaryExpr(_)
+            | Expr::Like(_)
+            | Expr::SimilarTo(_)
+            | Expr::Not(_)
+            | Expr::IsNotNull(_)
+            | Expr::IsNull(_)
+            | Expr::IsTrue(_)
+            | Expr::IsFalse(_)
+            | Expr::IsUnknown(_)
+            | Expr::IsNotTrue(_)
+            | Expr::IsNotFalse(_)
+            | Expr::IsNotUnknown(_)
+            | Expr::Negative(_)
+            | Expr::InList { .. }
+            | Expr::Between(_)
+            | Expr::Case(_)
+            | Expr::Cast(_)
+            | Expr::TryCast(_) => (),
+            Expr::ScalarFunction(scalar_function) => {
+                match scalar_function.func.signature().volatility {
+                    Volatility::Immutable => (),
+                    _ => {
+                        self.result = Err(DeltaTableError::Generic(format!(
+                            "Find files predicate contains nondeterministic function {}",
+                            scalar_function.func.name()
+                        )));
+                        return Ok(TreeNodeRecursion::Stop);
+                    }
+                }
+            }
+            _ => {
+                self.result = Err(DeltaTableError::Generic(format!(
+                    "Find files predicate contains unsupported expression {expr}"
+                )));
+                return Ok(TreeNodeRecursion::Stop);
+            }
+        }
+
+        Ok(TreeNodeRecursion::Continue)
+    }
+}
+
+fn join_batches_with_add_actions(
+    batches: Vec<RecordBatch>,
+    mut actions: HashMap<String, Add>,
+    path_column: &str,
+    dict_array: bool,
+) -> DeltaResult<Vec<Add>> {
+    // Given RecordBatches that contain `__delta_rs_path`, perform a hash join
+    // with actions to obtain the original add actions
+
+    let mut files = Vec::with_capacity(batches.iter().map(|batch| batch.num_rows()).sum());
+    for batch in batches {
+        let err = || DeltaTableError::Generic("Unable to obtain Delta-rs path column".to_string());
+
+        let iter: Box<dyn Iterator<Item = Option<&str>>> = if dict_array {
+            let array = get_path_column(&batch, path_column)?;
+            Box::new(array.into_iter())
+        } else {
+            let array = batch
+                .column_by_name(path_column)
+                .ok_or_else(err)?
+                .as_any()
+                .downcast_ref::<StringArray>()
+                .ok_or_else(err)?;
+            Box::new(array.into_iter())
+        };
+
+        for path in iter {
+            let path = path.ok_or(DeltaTableError::Generic(format!(
+                "{path_column} cannot be null"
+            )))?;
+
+            match actions.remove(path) {
+                Some(action) => files.push(action),
+                None => {
+                    return Err(DeltaTableError::Generic(
+                        "Unable to map __delta_rs_path to action.".to_owned(),
+                    ))
+                }
+            }
+        }
+    }
+    Ok(files)
+}
+
+/// Determine which files contain a record that satisfies the predicate
+async fn find_files_scan(
+    snapshot: &EagerSnapshot,
+    log_store: LogStoreRef,
+    state: &SessionState,
+    expression: Expr,
+) -> DeltaResult<Vec<Add>> {
+    let candidate_map: HashMap<String, Add> = snapshot
+        .log_data()
+        .iter()
+        .map(|f| f.add_action())
+        .map(|add| (add.path.clone(), add.to_owned()))
+        .collect();
+
+    let scan_config = DeltaScanConfigBuilder::default()
+        .with_file_column(true)
+        .build(snapshot)?;
+
+    let logical_schema = df_logical_schema(snapshot, &scan_config.file_column_name, None)?;
+
+    // Identify which columns we need to project
+    let mut used_columns = expression
+        .column_refs()
+        .into_iter()
+        .map(|column| logical_schema.index_of(&column.name))
+        .collect::<Result<Vec<usize>, ArrowError>>()?;
+    // Add path column
+    used_columns.push(logical_schema.index_of(scan_config.file_column_name.as_ref().unwrap())?);
+
+    let scan = DeltaScanBuilder::new(snapshot, log_store, state)
+        .with_filter(Some(expression.clone()))
+        .with_projection(Some(&used_columns))
+        .with_scan_config(scan_config)
+        .build()
+        .await?;
+    let scan = Arc::new(scan);
+
+    let config = &scan.config;
+    let input_schema = scan.logical_schema.as_ref().to_owned();
+    let input_dfschema = input_schema.clone().try_into()?;
+
+    let predicate_expr =
+        state.create_physical_expr(Expr::IsTrue(Box::new(expression.clone())), &input_dfschema)?;
+
+    let filter: Arc<dyn ExecutionPlan> =
+        Arc::new(FilterExec::try_new(predicate_expr, scan.clone())?);
+    let limit: Arc<dyn ExecutionPlan> = Arc::new(LocalLimitExec::new(filter, 1));
+
+    let task_ctx = Arc::new(TaskContext::from(state));
+    let path_batches = datafusion::physical_plan::collect(limit, task_ctx).await?;
+
+    join_batches_with_add_actions(
+        path_batches,
+        candidate_map,
+        config.file_column_name.as_ref().unwrap(),
+        true,
+    )
+}
+
+async fn scan_memory_table(snapshot: &EagerSnapshot, predicate: &Expr) -> DeltaResult<Vec<Add>> {
+    let actions = snapshot
+        .log_data()
+        .iter()
+        .map(|f| f.add_action())
+        .collect_vec();
+
+    let batch = snapshot.add_actions_table(true)?;
+    let mut arrays = Vec::new();
+    let mut fields = Vec::new();
+
+    let schema = batch.schema();
+
+    arrays.push(
+        batch
+            .column_by_name("path")
+            .ok_or(DeltaTableError::Generic(
+                "Column with name `path` does not exist".to_owned(),
+            ))?
+            .to_owned(),
+    );
+    fields.push(Field::new(PATH_COLUMN, ArrowDataType::Utf8, false));
+
+    for field in schema.fields() {
+        if field.name().starts_with("partition.") {
+            let name = field.name().strip_prefix("partition.").unwrap();
+
+            arrays.push(batch.column_by_name(field.name()).unwrap().to_owned());
+            fields.push(Field::new(
+                name,
+                field.data_type().to_owned(),
+                field.is_nullable(),
+            ));
+        }
+    }
+
+    let schema = Arc::new(ArrowSchema::new(fields));
+    let batch = RecordBatch::try_new(schema, arrays)?;
+    let mem_table = MemTable::try_new(batch.schema(), vec![vec![batch]])?;
+
+    let ctx = SessionContext::new();
+    let mut df = ctx.read_table(Arc::new(mem_table))?;
+    df = df
+        .filter(predicate.to_owned())?
+        .select(vec![col(PATH_COLUMN)])?;
+    let batches = df.collect().await?;
+
+    let map = actions
+        .into_iter()
+        .map(|action| (action.path.clone(), action))
+        .collect::<HashMap<String, Add>>();
+
+    join_batches_with_add_actions(batches, map, PATH_COLUMN, false)
+}
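The new module keeps the behavior of the code removed from mod.rs below but narrows its visibility to the crate. A minimal crate-internal usage sketch, assuming async context plus a `DeltaTable` named `table` and a DataFusion `SessionContext` named `ctx` (both hypothetical):

    use datafusion::prelude::{col, lit};

    let snapshot = table.snapshot()?.snapshot();
    // Partition-only predicates are answered from log metadata
    // (partition_scan == true); anything else falls back to a physical scan
    // of the candidate files (partition_scan == false).
    let result = find_files(
        snapshot,
        table.log_store(),
        &ctx.state(),
        Some(col("country").eq(lit("DE"))),
    )
    .await?;
    for add in &result.candidates {
        println!("candidate: {} (partition_scan: {})", add.path, result.partition_scan);
    }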
diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs
index a656b235e2..04444a1e8e 100644
--- a/crates/core/src/delta_datafusion/mod.rs
+++ b/crates/core/src/delta_datafusion/mod.rs
@@ -23,7 +23,6 @@
 //! };
 //! ```

-use std::collections::HashMap;
 use std::fmt::Debug;
 use std::sync::Arc;

@@ -32,28 +31,24 @@ use arrow_array::{Array, DictionaryArray, RecordBatch, StringArray, TypedDiction
 use arrow_cast::display::array_value_to_string;
 use arrow_cast::{cast_with_options, CastOptions};
 use arrow_schema::{
-    ArrowError, DataType as ArrowDataType, Field, Schema as ArrowSchema, SchemaRef,
+    DataType as ArrowDataType, Field, Schema as ArrowSchema, SchemaRef,
     SchemaRef as ArrowSchemaRef, TimeUnit,
 };
-use arrow_select::concat::concat_batches;
 use async_trait::async_trait;
 use datafusion::catalog::{Session, TableProviderFactory};
 use datafusion::common::scalar::ScalarValue;
-use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
 use datafusion::common::{
     Column, DFSchema, DataFusionError, Result as DataFusionResult, TableReference, ToDFSchema,
 };
 use datafusion::datasource::physical_plan::wrap_partition_type_in_dict;
 use datafusion::datasource::{MemTable, TableProvider};
-use datafusion::execution::context::{SessionConfig, SessionContext, SessionState, TaskContext};
+use datafusion::execution::context::{SessionConfig, SessionContext, SessionState};
 use datafusion::execution::runtime_env::RuntimeEnv;
 use datafusion::execution::FunctionRegistry;
 use datafusion::logical_expr::logical_plan::CreateExternalTable;
 use datafusion::logical_expr::utils::conjunction;
-use datafusion::logical_expr::{col, Expr, Extension, LogicalPlan, Volatility};
+use datafusion::logical_expr::{Expr, Extension, LogicalPlan};
 use datafusion::physical_optimizer::pruning::PruningPredicate;
-use datafusion::physical_plan::filter::FilterExec;
-use datafusion::physical_plan::limit::LocalLimitExec;
 use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
 use datafusion::physical_plan::{ExecutionPlan, Statistics};
 use datafusion::sql::planner::ParserOptions;
@@ -62,7 +57,6 @@ use datafusion_proto::physical_plan::PhysicalExtensionCodec;
 use delta_kernel::engine::arrow_conversion::TryIntoArrow as _;
 use delta_kernel::table_configuration::TableConfiguration;
 use either::Either;
-use futures::TryStreamExt;
 use itertools::Itertools;
 use url::Url;

@@ -79,10 +73,13 @@ use crate::table::state::DeltaTableState;
 use crate::table::{Constraint, GeneratedColumn};
 use crate::{open_table, open_table_with_storage_options, DeltaTable};

+pub(crate) use find_files::*;
+
 pub(crate) const PATH_COLUMN: &str = "__delta_rs_path";

 pub mod cdf;
 pub mod expr;
+mod find_files;
 pub mod logical;
 pub mod physical;
 pub mod planner;
@@ -189,24 +186,6 @@ impl DataFusionMixins for EagerSnapshot {
     }
 }

-impl DataFusionMixins for DeltaTableState {
-    fn arrow_schema(&self) -> DeltaResult<ArrowSchemaRef> {
-        self.snapshot.arrow_schema()
-    }
-
-    fn input_schema(&self) -> DeltaResult<ArrowSchemaRef> {
-        self.snapshot.input_schema()
-    }
-
-    fn parse_predicate_expression(
-        &self,
-        expr: impl AsRef<str>,
-        df_state: &SessionState,
-    ) -> DeltaResult<Expr> {
-        self.snapshot.parse_predicate_expression(expr, df_state)
-    }
-}
-
 fn _arrow_schema(
     snapshot: &TableConfiguration,
     wrap_partitions: bool,
@@ -484,30 +463,6 @@ pub(crate) fn to_correct_scalar_value(
     }
 }

-pub(crate) async fn execute_plan_to_batch(
-    state: &SessionState,
-    plan: Arc<dyn ExecutionPlan>,
-) -> DeltaResult<RecordBatch> {
-    let data = futures::future::try_join_all(
-        (0..plan.properties().output_partitioning().partition_count()).map(|p| {
-            let plan_copy = plan.clone();
-            let task_context = state.task_ctx().clone();
-            async move {
-                let batch_stream = plan_copy.execute(p, task_context)?;
-
-                let schema = batch_stream.schema();
-
-                let batches = batch_stream.try_collect::<Vec<_>>().await?;
-
-                DataFusionResult::<_>::Ok(concat_batches(&schema, batches.iter())?)
-            }
-        }),
-    )
-    .await?;
-
-    Ok(concat_batches(&plan.schema(), data.iter())?)
-}
-
 /// Responsible for checking batches of data conform to table's invariants, constraints and nullability.
 #[derive(Clone, Default)]
 pub struct DeltaDataChecker {
@@ -813,292 +768,6 @@ impl TableProviderFactory for DeltaTableFactory {
     }
 }

-pub(crate) struct FindFilesExprProperties {
-    pub partition_columns: Vec<String>,
-
-    pub partition_only: bool,
-    pub result: DeltaResult<()>,
-}
-
-/// Ensure only expressions that make sense are accepted, check for
-/// non-deterministic functions, and determine if the expression only contains
-/// partition columns
-impl TreeNodeVisitor<'_> for FindFilesExprProperties {
-    type Node = Expr;
-
-    fn f_down(&mut self, expr: &Self::Node) -> datafusion::common::Result<TreeNodeRecursion> {
-        // TODO: We can likely relax the volatility to STABLE. Would require further
-        // research to confirm the same value is generated during the scan and
-        // rewrite phases.
-
-        match expr {
-            Expr::Column(c) => {
-                if !self.partition_columns.contains(&c.name) {
-                    self.partition_only = false;
-                }
-            }
-            Expr::ScalarVariable(_, _)
-            | Expr::Literal(_, _)
-            | Expr::Alias(_)
-            | Expr::BinaryExpr(_)
-            | Expr::Like(_)
-            | Expr::SimilarTo(_)
-            | Expr::Not(_)
-            | Expr::IsNotNull(_)
-            | Expr::IsNull(_)
-            | Expr::IsTrue(_)
-            | Expr::IsFalse(_)
-            | Expr::IsUnknown(_)
-            | Expr::IsNotTrue(_)
-            | Expr::IsNotFalse(_)
-            | Expr::IsNotUnknown(_)
-            | Expr::Negative(_)
-            | Expr::InList { .. }
-            | Expr::Between(_)
-            | Expr::Case(_)
-            | Expr::Cast(_)
-            | Expr::TryCast(_) => (),
-            Expr::ScalarFunction(scalar_function) => {
-                match scalar_function.func.signature().volatility {
-                    Volatility::Immutable => (),
-                    _ => {
-                        self.result = Err(DeltaTableError::Generic(format!(
-                            "Find files predicate contains nondeterministic function {}",
-                            scalar_function.func.name()
-                        )));
-                        return Ok(TreeNodeRecursion::Stop);
-                    }
-                }
-            }
-            _ => {
-                self.result = Err(DeltaTableError::Generic(format!(
-                    "Find files predicate contains unsupported expression {expr}"
-                )));
-                return Ok(TreeNodeRecursion::Stop);
-            }
-        }
-
-        Ok(TreeNodeRecursion::Continue)
-    }
-}
-
-#[derive(Debug, Hash, Eq, PartialEq)]
-/// Representing the result of the [find_files] function.
-pub struct FindFiles {
-    /// A list of `Add` objects that match the given predicate
-    pub candidates: Vec<Add>,
-    /// Was a physical read to the datastore required to determine the candidates
-    pub partition_scan: bool,
-}
-
-fn join_batches_with_add_actions(
-    batches: Vec<RecordBatch>,
-    mut actions: HashMap<String, Add>,
-    path_column: &str,
-    dict_array: bool,
-) -> DeltaResult<Vec<Add>> {
-    // Given RecordBatches that contains `__delta_rs_path` perform a hash join
-    // with actions to obtain original add actions
-
-    let mut files = Vec::with_capacity(batches.iter().map(|batch| batch.num_rows()).sum());
-    for batch in batches {
-        let err = || DeltaTableError::Generic("Unable to obtain Delta-rs path column".to_string());
-
-        let iter: Box<dyn Iterator<Item = Option<&str>>> = if dict_array {
-            let array = get_path_column(&batch, path_column)?;
-            Box::new(array.into_iter())
-        } else {
-            let array = batch
-                .column_by_name(path_column)
-                .ok_or_else(err)?
-                .as_any()
-                .downcast_ref::<StringArray>()
-                .ok_or_else(err)?;
-            Box::new(array.into_iter())
-        };
-
-        for path in iter {
-            let path = path.ok_or(DeltaTableError::Generic(format!(
-                "{path_column} cannot be null"
-            )))?;
-
-            match actions.remove(path) {
-                Some(action) => files.push(action),
-                None => {
-                    return Err(DeltaTableError::Generic(
-                        "Unable to map __delta_rs_path to action.".to_owned(),
-                    ))
-                }
-            }
-        }
-    }
-    Ok(files)
-}
-
-/// Determine which files contain a record that satisfies the predicate
-pub(crate) async fn find_files_scan(
-    snapshot: &EagerSnapshot,
-    log_store: LogStoreRef,
-    state: &SessionState,
-    expression: Expr,
-) -> DeltaResult<Vec<Add>> {
-    let candidate_map: HashMap<String, Add> = snapshot
-        .log_data()
-        .iter()
-        .map(|f| f.add_action())
-        .map(|add| (add.path.clone(), add.to_owned()))
-        .collect();
-
-    let scan_config = DeltaScanConfigBuilder {
-        include_file_column: true,
-        ..Default::default()
-    }
-    .build(snapshot)?;
-
-    let logical_schema = df_logical_schema(snapshot, &scan_config.file_column_name, None)?;
-
-    // Identify which columns we need to project
-    let mut used_columns = expression
-        .column_refs()
-        .into_iter()
-        .map(|column| logical_schema.index_of(&column.name))
-        .collect::<Result<Vec<usize>, ArrowError>>()?;
-    // Add path column
-    used_columns.push(logical_schema.index_of(scan_config.file_column_name.as_ref().unwrap())?);
-
-    let scan = DeltaScanBuilder::new(snapshot, log_store, state)
-        .with_filter(Some(expression.clone()))
-        .with_projection(Some(&used_columns))
-        .with_scan_config(scan_config)
-        .build()
-        .await?;
-    let scan = Arc::new(scan);
-
-    let config = &scan.config;
-    let input_schema = scan.logical_schema.as_ref().to_owned();
-    let input_dfschema = input_schema.clone().try_into()?;
-
-    let predicate_expr =
-        state.create_physical_expr(Expr::IsTrue(Box::new(expression.clone())), &input_dfschema)?;
-
-    let filter: Arc<dyn ExecutionPlan> =
-        Arc::new(FilterExec::try_new(predicate_expr, scan.clone())?);
-    let limit: Arc<dyn ExecutionPlan> = Arc::new(LocalLimitExec::new(filter, 1));
-
-    let task_ctx = Arc::new(TaskContext::from(state));
-    let path_batches = datafusion::physical_plan::collect(limit, task_ctx).await?;
-
-    join_batches_with_add_actions(
-        path_batches,
-        candidate_map,
-        config.file_column_name.as_ref().unwrap(),
-        true,
-    )
-}
-
-pub(crate) async fn scan_memory_table(
-    log_store: &dyn LogStore,
-    snapshot: &EagerSnapshot,
-    predicate: &Expr,
-) -> DeltaResult<Vec<Add>> {
-    let actions = snapshot
-        .log_data()
-        .iter()
-        .map(|f| f.add_action())
-        .collect_vec();
-
-    let batch = snapshot.add_actions_table(true)?;
-    let mut arrays = Vec::new();
-    let mut fields = Vec::new();
-
-    let schema = batch.schema();
-
-    arrays.push(
-        batch
-            .column_by_name("path")
-            .ok_or(DeltaTableError::Generic(
-                "Column with name `path` does not exist".to_owned(),
-            ))?
-            .to_owned(),
-    );
-    fields.push(Field::new(PATH_COLUMN, ArrowDataType::Utf8, false));
-
-    for field in schema.fields() {
-        if field.name().starts_with("partition.") {
-            let name = field.name().strip_prefix("partition.").unwrap();
-
-            arrays.push(batch.column_by_name(field.name()).unwrap().to_owned());
-            fields.push(Field::new(
-                name,
-                field.data_type().to_owned(),
-                field.is_nullable(),
-            ));
-        }
-    }
-
-    let schema = Arc::new(ArrowSchema::new(fields));
-    let batch = RecordBatch::try_new(schema, arrays)?;
-    let mem_table = MemTable::try_new(batch.schema(), vec![vec![batch]])?;
-
-    let ctx = SessionContext::new();
-    let mut df = ctx.read_table(Arc::new(mem_table))?;
-    df = df
-        .filter(predicate.to_owned())?
-        .select(vec![col(PATH_COLUMN)])?;
-    let batches = df.collect().await?;
-
-    let map = actions
-        .into_iter()
-        .map(|action| (action.path.clone(), action))
-        .collect::<HashMap<String, Add>>();
-
-    join_batches_with_add_actions(batches, map, PATH_COLUMN, false)
-}
-
-/// Finds files in a snapshot that match the provided predicate.
-pub async fn find_files(
-    snapshot: &EagerSnapshot,
-    log_store: LogStoreRef,
-    state: &SessionState,
-    predicate: Option<Expr>,
-) -> DeltaResult<FindFiles> {
-    let current_metadata = snapshot.metadata();
-
-    match &predicate {
-        Some(predicate) => {
-            // Validate the Predicate and determine if it only contains partition columns
-            let mut expr_properties = FindFilesExprProperties {
-                partition_only: true,
-                partition_columns: current_metadata.partition_columns().clone(),
-                result: Ok(()),
-            };
-
-            TreeNode::visit(predicate, &mut expr_properties)?;
-            expr_properties.result?;
-
-            if expr_properties.partition_only {
-                let candidates = scan_memory_table(&log_store, snapshot, predicate).await?;
-                Ok(FindFiles {
-                    candidates,
-                    partition_scan: true,
-                })
-            } else {
-                let candidates =
-                    find_files_scan(snapshot, log_store, state, predicate.to_owned()).await?;
-
-                Ok(FindFiles {
-                    candidates,
-                    partition_scan: false,
-                })
-            }
-        }
-        None => Ok(FindFiles {
-            candidates: snapshot.log_data().iter().map(|f| f.add_action()).collect(),
-            partition_scan: true,
-        }),
-    }
-}
-
 /// A wrapper for sql_parser's ParserOptions to capture sane default table defaults
 pub struct DeltaParserOptions {
     inner: ParserOptions,
@@ -1221,6 +890,7 @@ mod tests {
     use datafusion::logical_expr::lit;
     use datafusion::physical_plan::empty::EmptyExec;
     use datafusion::physical_plan::{visit_execution_plan, ExecutionPlanVisitor, PhysicalExpr};
+    use datafusion::prelude::col;
     use datafusion_proto::physical_plan::AsExecutionPlan;
     use datafusion_proto::protobuf;
     use delta_kernel::path::{LogPathFileType, ParsedLogPath};
diff --git a/crates/core/src/delta_datafusion/table_provider.rs b/crates/core/src/delta_datafusion/table_provider.rs
index 17eb2ce5a5..5f2743acac 100644
--- a/crates/core/src/delta_datafusion/table_provider.rs
+++ b/crates/core/src/delta_datafusion/table_provider.rs
@@ -720,7 +720,7 @@ impl TableProvider for DeltaTable {
     }

     fn schema(&self) -> Arc<ArrowSchema> {
-        self.snapshot().unwrap().arrow_schema().unwrap()
+        self.snapshot().unwrap().snapshot().arrow_schema().unwrap()
     }

     fn table_type(&self) -> TableType {
diff --git a/crates/core/src/kernel/transaction/state.rs b/crates/core/src/kernel/transaction/state.rs
index b7603d7249..f4170baeb7 100644
--- a/crates/core/src/kernel/transaction/state.rs
+++ b/crates/core/src/kernel/transaction/state.rs
@@ -277,17 +277,21 @@ mod tests {
         // parses simple expression
         let parsed = snapshot
+            .snapshot()
             .parse_predicate_expression("value > 10", &state)
             .unwrap();
         let expected = col("value").gt(lit::<i32>(10));
         assert_eq!(parsed, expected);

         // fails for unknown column
-        let parsed = snapshot.parse_predicate_expression("non_existent > 10", &state);
+        let parsed = snapshot
+            .snapshot()
+            .parse_predicate_expression("non_existent > 10", &state);
         assert!(parsed.is_err());

         // parses complex expression
         let parsed = snapshot
+            .snapshot()
             .parse_predicate_expression("value > 10 OR value <= 0", &state)
             .unwrap();
         let expected = col("value")
diff --git a/crates/core/src/operations/load.rs b/crates/core/src/operations/load.rs
index 8bcdf66eb1..cd2dcf9be9 100644
--- a/crates/core/src/operations/load.rs
+++ b/crates/core/src/operations/load.rs
@@ -70,7 +70,7 @@ impl std::future::IntoFuture for LoadBuilder {
                 snapshot: this.snapshot,
             },
         );
-        let schema = table.snapshot()?.arrow_schema()?;
+        let schema = table.snapshot()?.snapshot().arrow_schema()?;
         let projection = this
             .columns
             .map(|cols| {
diff --git a/crates/core/src/operations/merge/filter.rs b/crates/core/src/operations/merge/filter.rs
index d6bf31ed35..b74aab9246 100644
--- a/crates/core/src/operations/merge/filter.rs
+++ b/crates/core/src/operations/merge/filter.rs
@@ -1,19 +1,19 @@
 //! Utility functions to determine early filters for file/partition pruning
-use datafusion::functions_aggregate::expr_fn::{max, min};
 use std::collections::HashMap;
+use std::sync::Arc;

+use arrow::compute::concat_batches;
 use datafusion::common::tree_node::{Transformed, TreeNode};
 use datafusion::common::{ScalarValue, TableReference};
 use datafusion::execution::context::SessionState;
+use datafusion::functions_aggregate::expr_fn::{max, min};
 use datafusion::logical_expr::expr::{InList, Placeholder};
-use datafusion::logical_expr::{lit, Aggregate, BinaryExpr, LogicalPlan, Operator};
-use datafusion::logical_expr::{Between, Expr};
-
+use datafusion::logical_expr::{lit, Aggregate, Between, BinaryExpr, Expr, LogicalPlan, Operator};
+use datafusion::physical_plan::ExecutionPlan;
 use either::{Left, Right};
-
+use futures::TryStreamExt as _;
 use itertools::Itertools;

-use crate::delta_datafusion::execute_plan_to_batch;
 use crate::kernel::EagerSnapshot;
 use crate::{DeltaResult, DeltaTableError};

@@ -395,6 +395,28 @@ pub(crate) async fn try_construct_early_filter(
         }
     }
 }
+
+async fn execute_plan_to_batch(
+    state: &SessionState,
+    plan: Arc<dyn ExecutionPlan>,
+) -> DeltaResult<RecordBatch> {
+    let data = futures::future::try_join_all(
+        (0..plan.properties().output_partitioning().partition_count()).map(|p| {
+            let plan_copy = plan.clone();
+            let task_context = state.task_ctx().clone();
+            async move {
+                let batch_stream = plan_copy.execute(p, task_context)?;
+                let schema = batch_stream.schema();
+                let batches = batch_stream.try_collect::<Vec<_>>().await?;
+                datafusion::error::Result::<_>::Ok(concat_batches(&schema, batches.iter())?)
+            }
+        }),
+    )
+    .await?;

+    Ok(concat_batches(&plan.schema(), data.iter())?)
+}
+
 #[cfg(test)]
 mod tests {
     use crate::operations::merge::tests::setup_table;
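For reference, `execute_plan_to_batch` moves into merge/filter.rs as a private helper because this module is its only remaining caller; it collects every output partition of a physical plan concurrently and concatenates the results into a single RecordBatch. A minimal sketch of driving it from inside this module, using DataFusion's `EmptyExec` as a stand-in plan (this setup is illustrative and not part of the patch):

    use std::sync::Arc;
    use arrow_schema::Schema;
    use datafusion::physical_plan::{empty::EmptyExec, ExecutionPlan};
    use datafusion::prelude::SessionContext;

    let ctx = SessionContext::new();
    let state = ctx.state();
    // EmptyExec produces no rows, so the concatenated result is an empty batch.
    let plan: Arc<dyn ExecutionPlan> = Arc::new(EmptyExec::new(Arc::new(Schema::empty())));
    let batch = execute_plan_to_batch(&state, plan).await?;
    assert_eq!(batch.num_rows(), 0);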