diff --git a/crates/core/src/delta_datafusion/expr.rs b/crates/core/src/delta_datafusion/expr.rs
index d785615033..efa0a3d907 100644
--- a/crates/core/src/delta_datafusion/expr.rs
+++ b/crates/core/src/delta_datafusion/expr.rs
@@ -49,7 +49,7 @@ use datafusion::sql::sqlparser::parser::Parser;
 use datafusion::sql::sqlparser::tokenizer::Tokenizer;
 use tracing::log::*;
 
-use super::DeltaParserOptions;
+use crate::delta_datafusion::session::DeltaParserOptions;
 use crate::{DeltaResult, DeltaTableError};
 
 /// This struct is like Datafusion's MakeArray but ensures that `element` is used rather than `item
diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs
index f9e21489aa..c53593be74 100644
--- a/crates/core/src/delta_datafusion/mod.rs
+++ b/crates/core/src/delta_datafusion/mod.rs
@@ -26,8 +26,8 @@
 use std::fmt::Debug;
 use std::sync::Arc;
 
-use arrow_array::types::UInt16Type;
-use arrow_array::{Array, DictionaryArray, RecordBatch, StringArray, TypedDictionaryArray};
+use arrow::array::types::UInt16Type;
+use arrow::array::{Array, DictionaryArray, RecordBatch, StringArray, TypedDictionaryArray};
 use arrow_cast::display::array_value_to_string;
 use arrow_cast::{cast_with_options, CastOptions};
 use arrow_schema::{
@@ -41,7 +41,7 @@ use datafusion::common::{
 };
 use datafusion::datasource::physical_plan::wrap_partition_type_in_dict;
 use datafusion::datasource::{MemTable, TableProvider};
-use datafusion::execution::context::{SessionConfig, SessionContext};
+use datafusion::execution::context::SessionContext;
 use datafusion::execution::runtime_env::RuntimeEnv;
 use datafusion::execution::FunctionRegistry;
 use datafusion::logical_expr::logical_plan::CreateExternalTable;
@@ -50,7 +50,6 @@ use datafusion::logical_expr::{Expr, Extension, LogicalPlan};
 use datafusion::physical_optimizer::pruning::PruningPredicate;
 use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
 use datafusion::physical_plan::{ExecutionPlan, Statistics};
-use datafusion::sql::planner::ParserOptions;
 use datafusion_proto::logical_plan::LogicalExtensionCodec;
 use datafusion_proto::physical_plan::PhysicalExtensionCodec;
 use delta_kernel::engine::arrow_conversion::TryIntoArrow as _;
@@ -71,6 +70,7 @@ use crate::table::state::DeltaTableState;
 use crate::table::{Constraint, GeneratedColumn};
 use crate::{open_table, open_table_with_storage_options, DeltaTable};
 
+pub use self::session::*;
 pub(crate) use find_files::*;
 
 pub(crate) const PATH_COLUMN: &str = "__delta_rs_path";
@@ -83,6 +83,7 @@
 pub mod logical;
 pub mod physical;
 pub mod planner;
 mod schema_adapter;
+mod session;
 mod table_provider;
 pub use cdf::scan::DeltaCdfTableProvider;
@@ -125,7 +126,7 @@ pub trait DataFusionMixins {
     fn parse_predicate_expression(
         &self,
         expr: impl AsRef<str>,
-        session: &impl Session,
+        session: &dyn Session,
     ) -> DeltaResult<Expr>;
 }
@@ -149,7 +150,7 @@ impl DataFusionMixins for Snapshot {
     fn parse_predicate_expression(
         &self,
         expr: impl AsRef<str>,
-        session: &impl Session,
+        session: &dyn Session,
     ) -> DeltaResult<Expr> {
         let schema = DFSchema::try_from(self.read_schema().as_ref().to_owned())?;
         parse_predicate_expression(&schema, expr, session)
     }
@@ -188,7 +189,7 @@ impl DataFusionMixins for LogDataHandler<'_> {
     fn parse_predicate_expression(
         &self,
         expr: impl AsRef<str>,
-        session: &impl Session,
+        session: &dyn Session,
     ) -> DeltaResult<Expr> {
         let schema = DFSchema::try_from(self.read_schema().as_ref().to_owned())?;
         parse_predicate_expression(&schema, expr, session)
     }
@@ -207,7 +208,7 @@ impl DataFusionMixins for EagerSnapshot {
     fn parse_predicate_expression(
         &self,
         expr: impl AsRef<str>,
-        session: &impl Session,
+        session: &dyn Session,
     ) -> DeltaResult<Expr> {
         self.snapshot().parse_predicate_expression(expr, session)
     }
@@ -792,67 +793,6 @@ impl TableProviderFactory for DeltaTableFactory {
     }
 }
 
-/// A wrapper for sql_parser's ParserOptions to capture sane default table defaults
-pub struct DeltaParserOptions {
-    inner: ParserOptions,
-}
-
-impl Default for DeltaParserOptions {
-    fn default() -> Self {
-        DeltaParserOptions {
-            inner: ParserOptions {
-                enable_ident_normalization: false,
-                ..ParserOptions::default()
-            },
-        }
-    }
-}
-
-impl From<DeltaParserOptions> for ParserOptions {
-    fn from(value: DeltaParserOptions) -> Self {
-        value.inner
-    }
-}
-
-/// A wrapper for Deltafusion's SessionConfig to capture sane default table defaults
-pub struct DeltaSessionConfig {
-    inner: SessionConfig,
-}
-
-impl Default for DeltaSessionConfig {
-    fn default() -> Self {
-        DeltaSessionConfig {
-            inner: SessionConfig::default()
-                .set_bool("datafusion.sql_parser.enable_ident_normalization", false),
-        }
-    }
-}
-
-impl From<DeltaSessionConfig> for SessionConfig {
-    fn from(value: DeltaSessionConfig) -> Self {
-        value.inner
-    }
-}
-
-/// A wrapper for Deltafusion's SessionContext to capture sane default table defaults
-pub struct DeltaSessionContext {
-    inner: SessionContext,
-}
-
-impl Default for DeltaSessionContext {
-    fn default() -> Self {
-        DeltaSessionContext {
-            inner: SessionContext::new_with_config(DeltaSessionConfig::default().into()),
-        }
-    }
-}
-
-impl From<DeltaSessionContext> for SessionContext {
-    fn from(value: DeltaSessionContext) -> Self {
-        value.inner
-    }
-}
-
 /// A wrapper for Deltafusion's Column to preserve case-sensitivity during string conversion
 pub struct DeltaColumn {
     inner: Column,
 }
@@ -914,7 +854,7 @@ mod tests {
     use datafusion::logical_expr::lit;
     use datafusion::physical_plan::empty::EmptyExec;
     use datafusion::physical_plan::{visit_execution_plan, ExecutionPlanVisitor, PhysicalExpr};
-    use datafusion::prelude::col;
+    use datafusion::prelude::{col, SessionConfig};
     use datafusion_proto::physical_plan::AsExecutionPlan;
     use datafusion_proto::protobuf;
     use delta_kernel::path::{LogPathFileType, ParsedLogPath};
diff --git a/crates/core/src/delta_datafusion/session.rs b/crates/core/src/delta_datafusion/session.rs
new file mode 100644
index 0000000000..0b6f08453e
--- /dev/null
+++ b/crates/core/src/delta_datafusion/session.rs
@@ -0,0 +1,101 @@
+use datafusion::{
+    catalog::Session,
+    common::{exec_datafusion_err, Result as DataFusionResult},
+    execution::{SessionState, SessionStateBuilder},
+    prelude::{SessionConfig, SessionContext},
+    sql::planner::ParserOptions,
+};
+
+use crate::delta_datafusion::planner::DeltaPlanner;
+
+pub fn create_session() -> DeltaSessionContext {
+    DeltaSessionContext::default()
+}
+
+// Given a `Session` reference, get the concrete `SessionState` reference.
+// Note: this may stop working in future versions.
+#[deprecated(
+    since = "0.29.1",
+    note = "Stopgap to get rid of all explicit session state references"
+)]
+pub(crate) fn session_state_from_session(session: &dyn Session) -> DataFusionResult<&SessionState> {
+    session
+        .as_any()
+        .downcast_ref::<SessionState>()
+        .ok_or_else(|| exec_datafusion_err!("Failed to downcast Session to SessionState"))
+}
+
+/// A wrapper for sql_parser's ParserOptions to capture sane table defaults
+pub struct DeltaParserOptions {
+    inner: ParserOptions,
+}
+
+impl Default for DeltaParserOptions {
+    fn default() -> Self {
+        DeltaParserOptions {
+            inner: ParserOptions {
+                enable_ident_normalization: false,
+                ..ParserOptions::default()
+            },
+        }
+    }
+}
+
+impl From<DeltaParserOptions> for ParserOptions {
+    fn from(value: DeltaParserOptions) -> Self {
+        value.inner
+    }
+}
+
+/// A wrapper for DataFusion's SessionConfig to capture sane table defaults
+pub struct DeltaSessionConfig {
+    inner: SessionConfig,
+}
+
+impl Default for DeltaSessionConfig {
+    fn default() -> Self {
+        DeltaSessionConfig {
+            inner: SessionConfig::default()
+                .set_bool("datafusion.sql_parser.enable_ident_normalization", false),
+        }
+    }
+}
+
+impl From<DeltaSessionConfig> for SessionConfig {
+    fn from(value: DeltaSessionConfig) -> Self {
+        value.inner
+    }
+}
+
+/// A wrapper for DataFusion's SessionContext to capture sane table defaults
+pub struct DeltaSessionContext {
+    inner: SessionContext,
+}
+
+impl DeltaSessionContext {
+    pub fn new() -> Self {
+        let ctx = SessionContext::new_with_config(DeltaSessionConfig::default().into());
+        let planner = DeltaPlanner::new();
+        let state = SessionStateBuilder::new_from_existing(ctx.state())
+            .with_query_planner(planner)
+            .build();
+        let inner = SessionContext::new_with_state(state);
+        Self { inner }
+    }
+
+    pub fn into_inner(self) -> SessionContext {
+        self.inner
+    }
+}
+
+impl Default for DeltaSessionContext {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl From<DeltaSessionContext> for SessionContext {
+    fn from(value: DeltaSessionContext) -> Self {
+        value.inner
+    }
+}
diff --git a/crates/core/src/logstore/factories.rs b/crates/core/src/logstore/factories.rs
index d4203b2a9c..6a000a0f2b 100644
--- a/crates/core/src/logstore/factories.rs
+++ b/crates/core/src/logstore/factories.rs
@@ -66,9 +66,9 @@ fn default_parse_url_opts(
 /// Access global registry of object store factories
 pub fn object_store_factories() -> ObjectStoreFactoryRegistry {
     static REGISTRY: OnceLock<ObjectStoreFactoryRegistry> = OnceLock::new();
-    let factory = Arc::new(DefaultObjectStoreFactory::default());
     REGISTRY
         .get_or_init(|| {
+            let factory = Arc::new(DefaultObjectStoreFactory::default());
             let registry = ObjectStoreFactoryRegistry::default();
             registry.insert(Url::parse("memory://").unwrap(), factory.clone());
             registry.insert(Url::parse("file://").unwrap(), factory);
diff --git a/crates/core/src/operations/constraints.rs b/crates/core/src/operations/constraints.rs
index fcd60985ad..5fc4372a42 100644
--- a/crates/core/src/operations/constraints.rs
+++ b/crates/core/src/operations/constraints.rs
@@ -4,9 +4,8 @@ use std::sync::Arc;
 
 use datafusion::catalog::Session;
 use datafusion::common::ToDFSchema;
-use datafusion::execution::{SendableRecordBatchStream, SessionState};
+use datafusion::execution::SendableRecordBatchStream;
 use datafusion::physical_plan::ExecutionPlan;
-use datafusion::prelude::SessionContext;
 use delta_kernel::table_features::WriterFeature;
 use futures::future::BoxFuture;
 use futures::StreamExt;
@@ -14,9 +13,7 @@
 use super::datafusion_utils::into_expr;
 use super::{CustomExecuteHandler, Operation};
 use crate::delta_datafusion::expr::fmt_expr_to_sql;
-use crate::delta_datafusion::{
-    register_store, DeltaDataChecker, DeltaScanBuilder, DeltaSessionContext,
-};
+use crate::delta_datafusion::{create_session, register_store, DeltaDataChecker, DeltaScanBuilder};
 use crate::kernel::transaction::{CommitBuilder, CommitProperties};
 use crate::kernel::{EagerSnapshot, MetadataExt, ProtocolExt as _, ProtocolInner};
 use crate::logstore::LogStoreRef;
@@ -132,19 +129,16 @@ impl std::future::IntoFuture for ConstraintBuilder {
             let session = this
                 .session
-                .and_then(|session| session.as_any().downcast_ref::<SessionState>().cloned())
-                .unwrap_or_else(|| {
-                    let session: SessionContext = DeltaSessionContext::default().into();
-                    session.state()
-                });
+                .unwrap_or_else(|| Arc::new(create_session().into_inner().state()));
 
             register_store(this.log_store.clone(), session.runtime_env().as_ref());
 
-            let scan = DeltaScanBuilder::new(&this.snapshot, this.log_store.clone(), &session)
-                .build()
-                .await?;
+            let scan =
+                DeltaScanBuilder::new(&this.snapshot, this.log_store.clone(), session.as_ref())
+                    .build()
+                    .await?;
 
             let schema = scan.schema().to_dfschema()?;
-            let expr = into_expr(expr, &schema, &session)?;
+            let expr = into_expr(expr, &schema, session.as_ref())?;
             let expr_str = fmt_expr_to_sql(&expr)?;
 
             // Checker built here with the one time constraint to check.
@@ -156,9 +150,8 @@ impl std::future::IntoFuture for ConstraintBuilder {
             for p in 0..plan.properties().output_partitioning().partition_count() {
                 let inner_plan = plan.clone();
                 let inner_checker = checker.clone();
-                let task_ctx = Arc::new((&session).into());
                 let mut record_stream: SendableRecordBatchStream =
-                    inner_plan.execute(p, task_ctx)?;
+                    inner_plan.execute(p, session.task_ctx())?;
                 let handle: tokio::task::JoinHandle<DeltaResult<()>> =
                     tokio::task::spawn(async move {
                         while let Some(maybe_batch) = record_stream.next().await {
diff --git a/crates/core/src/operations/delete.rs b/crates/core/src/operations/delete.rs
index 9799a9d078..c9284cd5a0 100644
--- a/crates/core/src/operations/delete.rs
+++ b/crates/core/src/operations/delete.rs
@@ -23,8 +23,7 @@ use datafusion::common::ScalarValue;
 use datafusion::dataframe::DataFrame;
 use datafusion::datasource::provider_as_source;
 use datafusion::error::Result as DataFusionResult;
-use datafusion::execution::context::{SessionContext, SessionState};
-use datafusion::execution::session_state::SessionStateBuilder;
+use datafusion::execution::context::SessionState;
 use datafusion::logical_expr::{
     lit, Extension, LogicalPlan, LogicalPlanBuilder, UserDefinedLogicalNode,
 };
@@ -47,10 +46,9 @@ use super::Operation;
 use crate::delta_datafusion::expr::fmt_expr_to_sql;
 use crate::delta_datafusion::logical::MetricObserver;
 use crate::delta_datafusion::physical::{find_metric_node, get_metric, MetricObserverExec};
-use crate::delta_datafusion::planner::DeltaPlanner;
 use crate::delta_datafusion::{
-    find_files, register_store, DataFusionMixins, DeltaScanConfigBuilder, DeltaSessionContext,
-    DeltaTableProvider,
+    create_session, find_files, register_store, session_state_from_session, DataFusionMixins,
+    DeltaScanConfigBuilder, DeltaTableProvider,
 };
 use crate::errors::DeltaResult;
 use crate::kernel::transaction::{CommitBuilder, CommitProperties, PROTOCOL};
@@ -170,6 +168,62 @@ impl DeleteBuilder {
     }
 }
 
+impl std::future::IntoFuture for DeleteBuilder {
+    type Output = DeltaResult<(DeltaTable, DeleteMetrics)>;
+    type IntoFuture = BoxFuture<'static, Self::Output>;
+
+    fn into_future(self) -> Self::IntoFuture {
+        let this = self;
+
+        Box::pin(async move {
+            PROTOCOL.check_append_only(&this.snapshot)?;
+            PROTOCOL.can_write_to(&this.snapshot)?;
+
+            let operation_id = this.get_operation_id();
+            this.pre_execute(operation_id).await?;
+
+            let session = this
+                .session
+                .unwrap_or_else(|| Arc::new(create_session().into_inner().state()));
+
+            register_store(this.log_store.clone(), session.runtime_env().as_ref());
+
+            let predicate = match this.predicate {
+                Some(predicate) => match predicate {
+                    Expression::DataFusion(expr) => Some(expr),
+                    Expression::String(s) => Some(
+                        this.snapshot
+                            .parse_predicate_expression(s, session.as_ref())?,
+                    ),
+                },
+                None => None,
+            };
+
+            let (new_snapshot, metrics) = execute(
+                predicate,
+                this.log_store.clone(),
+                this.snapshot,
+                session.as_ref(),
+                this.writer_properties,
+                this.commit_properties,
+                operation_id,
+                this.custom_execute_handler.as_ref(),
+            )
+            .await?;
+
+            Ok((
+                DeltaTable::new_with_state(
+                    this.log_store,
+                    DeltaTableState {
+                        snapshot: new_snapshot,
+                    },
+                ),
+                metrics,
+            ))
+        })
+    }
+}
+
 #[derive(Clone, Debug)]
 pub(crate) struct DeleteMetricExtensionPlanner {}
@@ -210,7 +264,7 @@ impl ExtensionPlanner for DeleteMetricExtensionPlanner {
 async fn execute_non_empty_expr(
     snapshot: &EagerSnapshot,
     log_store: LogStoreRef,
-    session: &SessionState,
+    session: &dyn Session,
     expression: &Expr,
     rewrite: &[Add],
     metrics: &mut DeleteMetrics,
@@ -223,12 +277,6 @@ async fn execute_non_empty_expr(
     let mut actions: Vec<Action> = Vec::new();
     let table_partition_cols = snapshot.metadata().partition_columns().clone();
 
-    let delete_planner = DeltaPlanner::new();
-
-    let state = SessionStateBuilder::new_from_existing(session.clone())
-        .with_query_planner(delete_planner)
-        .build();
-
     let scan_config = DeltaScanConfigBuilder::default()
         .with_file_column(false)
         .with_schema(snapshot.input_schema())
@@ -249,8 +297,6 @@ async fn execute_non_empty_expr(
         }),
     });
 
-    let df = DataFrame::new(state.clone(), source);
-
     let writer_stats_config = WriterStatsConfig::new(
         snapshot.table_properties().num_indexed_cols(),
         snapshot
@@ -263,16 +309,14 @@ async fn execute_non_empty_expr(
     if !partition_scan {
         // Apply the negation of the filter and rewrite files
        let negated_expression = Expr::Not(Box::new(Expr::IsTrue(Box::new(expression.clone()))));
-
-        let filter = df
-            .clone()
+        let filter = LogicalPlanBuilder::from(source.clone())
             .filter(negated_expression)?
-            .create_physical_plan()
-            .await?;
+            .build()?;
+        let filter = session.create_physical_plan(&filter).await?;
 
         let add_actions: Vec<Action> = write_execution_plan(
             Some(snapshot),
-            state.clone(),
+            session,
             filter.clone(),
             table_partition_cols.clone(),
             log_store.object_store(Some(operation_id)),
@@ -300,7 +344,8 @@ async fn execute_non_empty_expr(
     if let Ok(true) = should_write_cdc(snapshot) {
         // Create CDC scan
         let change_type_lit = lit(ScalarValue::Utf8(Some("delete".to_string())));
-        let cdc_filter = df
+        let state = session_state_from_session(session)?;
+        let cdc_filter = DataFrame::new(state.clone(), source)
             .filter(expression.clone())?
             .with_column("_change_type", change_type_lit)?
             .create_physical_plan()
             .await?;
@@ -308,7 +353,7 @@ async fn execute_non_empty_expr(
         let cdc_actions = write_execution_plan_cdc(
             Some(snapshot),
-            state.clone(),
+            session,
             cdc_filter,
             table_partition_cols.clone(),
             log_store.object_store(Some(operation_id)),
@@ -330,7 +375,7 @@ async fn execute(
     predicate: Option<Expr>,
     log_store: LogStoreRef,
     snapshot: EagerSnapshot,
-    session: SessionState,
+    session: &dyn Session,
     writer_properties: Option<WriterProperties>,
     mut commit_properties: CommitProperties,
     operation_id: Uuid,
@@ -344,7 +389,7 @@ async fn execute(
     let mut metrics = DeleteMetrics::default();
 
     let scan_start = Instant::now();
-    let candidates = find_files(&snapshot, log_store.clone(), &session, predicate.clone()).await?;
+    let candidates = find_files(&snapshot, log_store.clone(), session, predicate.clone()).await?;
     metrics.scan_time_ms = Instant::now().duration_since(scan_start).as_millis() as u64;
 
     let predicate = predicate.unwrap_or(lit(true));
@@ -354,7 +399,7 @@ async fn execute(
         let add = execute_non_empty_expr(
             &snapshot,
             log_store.clone(),
-            &session,
+            session,
             &predicate,
             &candidates.candidates,
             &mut metrics,
@@ -422,65 +467,6 @@ async fn execute(
 
     Ok((commit.snapshot().snapshot, metrics))
 }
 
-impl std::future::IntoFuture for DeleteBuilder {
-    type Output = DeltaResult<(DeltaTable, DeleteMetrics)>;
-    type IntoFuture = BoxFuture<'static, Self::Output>;
-
-    fn into_future(self) -> Self::IntoFuture {
-        let this = self;
-
-        Box::pin(async move {
-            PROTOCOL.check_append_only(&this.snapshot)?;
-            PROTOCOL.can_write_to(&this.snapshot)?;
-
-            let operation_id = this.get_operation_id();
-            this.pre_execute(operation_id).await?;
-
-            let session = this
-                .session
-                .and_then(|session| session.as_any().downcast_ref::<SessionState>().cloned())
-                .unwrap_or_else(|| {
-                    let session: SessionContext = DeltaSessionContext::default().into();
-                    session.state()
-                });
-
-            register_store(this.log_store.clone(), session.runtime_env().as_ref());
-
-            let predicate = match this.predicate {
-                Some(predicate) => match predicate {
-                    Expression::DataFusion(expr) => Some(expr),
-                    Expression::String(s) => {
-                        Some(this.snapshot.parse_predicate_expression(s, &session)?)
-                    }
-                },
-                None => None,
-            };
-
-            let (new_snapshot, metrics) = execute(
-                predicate,
-                this.log_store.clone(),
-                this.snapshot,
-                session,
-                this.writer_properties,
-                this.commit_properties,
-                operation_id,
-                this.custom_execute_handler.as_ref(),
-            )
-            .await?;
-
-            Ok((
-                DeltaTable::new_with_state(
-                    this.log_store,
-                    DeltaTableState {
-                        snapshot: new_snapshot,
-                    },
-                ),
-                metrics,
-            ))
-        })
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use crate::kernel::DataType as DeltaDataType;
diff --git a/crates/core/src/operations/load.rs b/crates/core/src/operations/load.rs
index b5e8ca9198..35b3c9f9fe 100644
--- a/crates/core/src/operations/load.rs
+++ b/crates/core/src/operations/load.rs
@@ -2,14 +2,12 @@ use std::sync::Arc;
 
 use datafusion::catalog::Session;
 use datafusion::datasource::TableProvider;
-use datafusion::execution::context::TaskContext;
-use datafusion::execution::{SessionState, SessionStateBuilder};
 use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec;
 use datafusion::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
 use futures::future::BoxFuture;
 
 use super::CustomExecuteHandler;
-use crate::delta_datafusion::{DataFusionMixins as _, DeltaSessionConfig};
+use crate::delta_datafusion::{create_session, DataFusionMixins as _};
 use crate::errors::{DeltaResult, DeltaTableError};
 use crate::kernel::transaction::PROTOCOL;
 use crate::kernel::EagerSnapshot;
@@ -108,17 +106,14 @@ impl std::future::IntoFuture for LoadBuilder {
             let session = this
                 .session
-                .and_then(|session| session.as_any().downcast_ref::<SessionState>().cloned())
-                .unwrap_or_else(|| {
-                    SessionStateBuilder::new()
-                        .with_default_features()
-                        .with_config(DeltaSessionConfig::default().into())
-                        .build()
-                });
-            let scan_plan = table.scan(&session, projection.as_ref(), &[], None).await?;
+                .unwrap_or_else(|| Arc::new(create_session().into_inner().state()));
+
+            let scan_plan = table
+                .scan(session.as_ref(), projection.as_ref(), &[], None)
+                .await?;
+
             let plan = CoalescePartitionsExec::new(scan_plan);
-            let task_ctx = Arc::new(TaskContext::from(&session));
-            let stream = plan.execute(0, task_ctx)?;
+            let stream = plan.execute(0, session.task_ctx())?;
 
             Ok((table, stream))
diff --git a/crates/core/src/operations/merge/filter.rs b/crates/core/src/operations/merge/filter.rs
index b74aab9246..71434648fc 100644
--- a/crates/core/src/operations/merge/filter.rs
+++ b/crates/core/src/operations/merge/filter.rs
@@ -3,9 +3,9 @@ use std::collections::HashMap;
 use std::sync::Arc;
 
 use arrow::compute::concat_batches;
+use datafusion::catalog::Session;
 use datafusion::common::tree_node::{Transformed, TreeNode};
 use datafusion::common::{ScalarValue, TableReference};
-use datafusion::execution::context::SessionState;
 use datafusion::functions_aggregate::expr_fn::{max, min};
 use datafusion::logical_expr::expr::{InList, Placeholder};
 use datafusion::logical_expr::{lit, Aggregate, Between, BinaryExpr, Expr, LogicalPlan, Operator};
@@ -325,7 +325,7 @@ pub(crate) fn generalize_filter(
 pub(crate) async fn try_construct_early_filter(
     join_predicate: Expr,
     table_snapshot: &EagerSnapshot,
-    session_state: &SessionState,
+    session_state: &dyn Session,
     source: &LogicalPlan,
     source_name: &TableReference,
     target_name: &TableReference,
@@ -397,7 +397,7 @@ pub(crate) async fn try_construct_early_filter(
 }
 
 async fn execute_plan_to_batch(
-    state: &SessionState,
+    state: &dyn Session,
     plan: Arc<dyn ExecutionPlan>,
 ) -> DeltaResult<RecordBatch> {
     let data = futures::future::try_join_all(
diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs
index b4b8069c23..acce00aad4 100644
--- a/crates/core/src/operations/merge/mod.rs
+++ b/crates/core/src/operations/merge/mod.rs
@@ -529,7 +529,7 @@ impl MergeOperation {
     fn try_from(
         config: MergeOperationConfig,
         schema: &DFSchema,
-        state: &SessionState,
+        state: &dyn Session,
         target_alias: &Option<TableReference>,
     ) -> DeltaResult<Self> {
         let mut ops = HashMap::with_capacity(config.operations.capacity());
@@ -1402,7 +1402,7 @@ async fn execute(
 
     let (mut actions, write_plan_metrics) = write_execution_plan_v2(
         Some(&snapshot),
-        state.clone(),
+        &state,
         write,
         table_partition_cols.clone(),
         log_store.object_store(Some(operation_id)),
diff --git a/crates/core/src/operations/optimize.rs b/crates/core/src/operations/optimize.rs
index 337734c90e..4242764566 100644
--- a/crates/core/src/operations/optimize.rs
+++ b/crates/core/src/operations/optimize.rs
@@ -25,8 +25,8 @@ use std::fmt;
 use std::sync::Arc;
 use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
 
-use arrow_array::RecordBatch;
-use arrow_schema::SchemaRef as ArrowSchemaRef;
+use arrow::array::RecordBatch;
+use arrow::datatypes::SchemaRef;
 use datafusion::catalog::Session;
 use datafusion::execution::context::SessionState;
 use datafusion::execution::memory_pool::FairSpillPool;
@@ -491,7 +491,7 @@ pub struct MergeTaskParameters {
     /// Parameters passed to optimize operation
     input_parameters: OptimizeInput,
     /// Schema of written files
-    file_schema: ArrowSchemaRef,
+    file_schema: SchemaRef,
     /// Properties passed to parquet writer
     writer_properties: WriterProperties,
     /// Num index cols to collect stats for
diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs
index 4ef0562f27..66c3d531ca 100644
--- a/crates/core/src/operations/update.rs
+++ b/crates/core/src/operations/update.rs
@@ -40,7 +40,6 @@ use datafusion::{
     execution::session_state::SessionStateBuilder,
     physical_plan::{metrics::MetricBuilder, ExecutionPlan},
     physical_planner::{ExtensionPlanner, PhysicalPlanner},
-    prelude::SessionContext,
 };
 use futures::future::BoxFuture;
 use parquet::file::properties::WriterProperties;
@@ -54,26 +53,26 @@ use super::{
     write::execution::{write_execution_plan, write_execution_plan_cdc},
     CustomExecuteHandler, Operation,
 };
-use crate::kernel::transaction::{CommitBuilder, CommitProperties, PROTOCOL};
-use crate::kernel::{Action, Remove};
+use crate::delta_datafusion::{find_files, planner::DeltaPlanner, register_store};
 use crate::logstore::LogStoreRef;
 use crate::operations::cdc::*;
 use crate::protocol::DeltaOperation;
 use crate::table::state::DeltaTableState;
 use crate::{
     delta_datafusion::{
+        create_session,
         expr::fmt_expr_to_sql,
         logical::MetricObserver,
         physical::{find_metric_node, get_metric, MetricObserverExec},
-        DataFusionMixins, DeltaColumn, DeltaScanConfigBuilder, DeltaSessionContext,
+        session_state_from_session, DataFusionMixins, DeltaColumn, DeltaScanConfigBuilder,
         DeltaTableProvider,
     },
+    kernel::{
+        transaction::{CommitBuilder, CommitProperties, PROTOCOL},
+        Action, EagerSnapshot, Remove,
+    },
     table::config::TablePropertiesExt,
 };
-use crate::{
-    delta_datafusion::{find_files, planner::DeltaPlanner, register_store},
-    kernel::EagerSnapshot,
-};
 use crate::{DeltaResult, DeltaTable, DeltaTableError};
 
 /// Custom column name used for marking internal [RecordBatch] rows as updated
@@ -401,7 +400,7 @@ async fn execute(
 
     let add_actions = write_execution_plan(
         Some(&snapshot),
-        session.clone(),
+        &session,
         physical_plan.clone(),
         table_partition_cols.clone(),
         log_store.object_store(Some(operation_id)).clone(),
@@ -463,7 +462,7 @@ async fn execute(
         Ok(df) => {
             let cdc_actions = write_execution_plan_cdc(
                 Some(&snapshot),
-                session,
+                &session,
                 df.create_physical_plan().await?,
                 table_partition_cols,
                 log_store.object_store(Some(operation_id)),
@@ -510,19 +509,16 @@ impl std::future::IntoFuture for UpdateBuilder {
             let session = this
                 .session
-                .and_then(|session| session.as_any().downcast_ref::<SessionState>().cloned())
-                .unwrap_or_else(|| {
-                    let session: SessionContext = DeltaSessionContext::default().into();
-                    session.state()
-                });
+                .unwrap_or_else(|| Arc::new(create_session().into_inner().state()));
 
             register_store(this.log_store.clone(), session.runtime_env().as_ref());
+            let state = session_state_from_session(session.as_ref())?;
 
             let (snapshot, metrics) = execute(
                 this.predicate,
                 this.updates,
                 this.log_store.clone(),
                 this.snapshot,
-                session,
+                state.clone(),
                 this.writer_properties,
                 this.commit_properties,
                 this.safe_cast,
diff --git a/crates/core/src/operations/vacuum.rs b/crates/core/src/operations/vacuum.rs
index 59fbe99cbe..d751f6a7f8 100644
--- a/crates/core/src/operations/vacuum.rs
+++ b/crates/core/src/operations/vacuum.rs
@@ -28,8 +28,7 @@ use std::sync::Arc;
 
 use chrono::{Duration, Utc};
 use futures::future::{ready, BoxFuture};
 use futures::{StreamExt, TryStreamExt};
-use object_store::Error;
-use object_store::{path::Path, ObjectStore};
+use object_store::{path::Path, Error, ObjectStore};
 use serde::Serialize;
 use tracing::*;
diff --git a/crates/core/src/operations/write/execution.rs b/crates/core/src/operations/write/execution.rs
index 38f96dd0d3..5cc9e15cc7 100644
--- a/crates/core/src/operations/write/execution.rs
+++ b/crates/core/src/operations/write/execution.rs
@@ -1,10 +1,12 @@
 use std::sync::Arc;
 use std::vec;
 
-use arrow_schema::SchemaRef as ArrowSchemaRef;
-use datafusion::datasource::provider_as_source;
-use datafusion::execution::context::{SessionState, TaskContext};
-use datafusion::logical_expr::{lit, when, Expr, LogicalPlanBuilder};
+use arrow::compute::concat_batches;
+use arrow::datatypes::Schema;
+use datafusion::catalog::{Session, TableProvider};
+use datafusion::datasource::{provider_as_source, MemTable};
+use datafusion::execution::context::SessionContext;
+use datafusion::logical_expr::{col, lit, when, Expr, LogicalPlanBuilder};
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::DataFrame;
 use delta_kernel::engine::arrow_conversion::TryIntoKernel as _;
@@ -16,26 +18,19 @@ use uuid::Uuid;
 
 use super::writer::{DeltaWriter, WriterConfig};
 use crate::delta_datafusion::expr::fmt_expr_to_sql;
-use crate::delta_datafusion::{find_files, DeltaScanConfigBuilder, DeltaTableProvider};
-use crate::delta_datafusion::{DataFusionMixins, DeltaDataChecker};
+use crate::delta_datafusion::{
+    find_files, session_state_from_session, DataFusionMixins, DeltaDataChecker,
+    DeltaScanConfigBuilder, DeltaTableProvider,
+};
 use crate::errors::DeltaResult;
 use crate::kernel::{Action, Add, AddCDCFile, EagerSnapshot, Remove, StructType, StructTypeExt};
 use crate::logstore::{LogStoreRef, ObjectStoreRef};
-use crate::operations::cdc::should_write_cdc;
+use crate::operations::cdc::{should_write_cdc, CDC_COLUMN_NAME};
+use crate::operations::write::{WriteError, WriterStatsConfig};
 use crate::table::config::TablePropertiesExt as _;
 use crate::table::Constraint as DeltaConstraint;
 use crate::DeltaTableError;
-use arrow::compute::concat_batches;
-use arrow_schema::Schema;
-use datafusion::catalog::TableProvider;
-use datafusion::datasource::MemTable;
-use datafusion::execution::context::SessionContext;
-use datafusion::logical_expr::col;
-
-use crate::operations::cdc::CDC_COLUMN_NAME;
-use crate::operations::write::{WriteError, WriterStatsConfig};
-
 #[derive(Debug, Default)]
 pub(crate) struct WriteExecutionPlanMetrics {
     pub scan_time_ms: u64,
@@ -45,7 +40,7 @@ pub(crate) struct WriteExecutionPlanMetrics {
 #[allow(clippy::too_many_arguments)]
 pub(crate) async fn write_execution_plan_cdc(
     snapshot: Option<&EagerSnapshot>,
-    session: SessionState,
+    session: &dyn Session,
     plan: Arc<dyn ExecutionPlan>,
     partition_columns: Vec<String>,
     object_store: ObjectStoreRef,
@@ -92,7 +87,7 @@ pub(crate) async fn write_execution_plan_cdc(
 #[allow(clippy::too_many_arguments)]
 pub(crate) async fn write_execution_plan(
     snapshot: Option<&EagerSnapshot>,
-    session: SessionState,
+    session: &dyn Session,
     plan: Arc<dyn ExecutionPlan>,
     partition_columns: Vec<String>,
     object_store: ObjectStoreRef,
@@ -122,7 +117,7 @@ pub(crate) async fn write_execution_plan(
 pub(crate) async fn execute_non_empty_expr(
     snapshot: &EagerSnapshot,
     log_store: LogStoreRef,
-    session: SessionState,
+    session: &dyn Session,
     partition_columns: Vec<String>,
     expression: &Expr,
     rewrite: &[Add],
@@ -147,24 +142,20 @@ pub(crate) async fn execute_non_empty_expr(
     );
     let target_provider = provider_as_source(target_provider);
-    let source = LogicalPlanBuilder::scan("target", target_provider.clone(), None)?.build()?;
-    // We don't want to verify the predicate against existing data
-
-    let df = DataFrame::new(session.clone(), source);
+    let source =
+        Arc::new(LogicalPlanBuilder::scan("target", target_provider.clone(), None)?.build()?);
 
     let cdf_df = if !partition_scan {
         // Apply the negation of the filter and rewrite files
         let negated_expression = Expr::Not(Box::new(Expr::IsTrue(Box::new(expression.clone()))));
-
-        let filter = df
-            .clone()
+        let filter = LogicalPlanBuilder::from(source.clone())
             .filter(negated_expression)?
-            .create_physical_plan()
-            .await?;
+            .build()?;
+        let filter = session.create_physical_plan(&filter).await?;
 
         let add_actions: Vec<Action> = write_execution_plan(
             Some(snapshot),
-            session.clone(),
+            session,
             filter,
             partition_columns.clone(),
             log_store.object_store(Some(operation_id)),
@@ -181,6 +172,8 @@ pub(crate) async fn execute_non_empty_expr(
         // Only write when CDC actions when it was not a partition scan, load_cdf can deduce the deletes in that case
         // based on the remove actions if a partition got deleted
         if should_write_cdc(snapshot)? {
+            let state = session_state_from_session(session)?;
+            let df = DataFrame::new(state.clone(), source.as_ref().clone());
             Some(
                 df.filter(expression.clone())?
.with_column(CDC_COLUMN_NAME, lit("delete"))?, @@ -201,7 +194,7 @@ pub(crate) async fn prepare_predicate_actions( predicate: Expr, log_store: LogStoreRef, snapshot: &EagerSnapshot, - session: SessionState, + session: &dyn Session, partition_columns: Vec, writer_properties: Option, deletion_timestamp: i64, @@ -211,7 +204,7 @@ pub(crate) async fn prepare_predicate_actions( let candidates = find_files( snapshot, log_store.clone(), - &session, + session, Some(predicate.clone()), ) .await?; @@ -252,7 +245,7 @@ pub(crate) async fn prepare_predicate_actions( #[allow(clippy::too_many_arguments)] pub(crate) async fn write_execution_plan_v2( snapshot: Option<&EagerSnapshot>, - session: SessionState, + session: &dyn Session, plan: Arc, partition_columns: Vec, object_store: ObjectStoreRef, @@ -265,7 +258,7 @@ pub(crate) async fn write_execution_plan_v2( ) -> DeltaResult<(Vec, WriteExecutionPlanMetrics)> { // We always take the plan Schema since the data may contain Large/View arrow types, // the schema and batches were prior constructed with this in mind. - let schema: ArrowSchemaRef = plan.schema(); + let schema = plan.schema(); let mut checker = if let Some(snapshot) = snapshot { DeltaDataChecker::new(snapshot) } else { @@ -290,7 +283,6 @@ pub(crate) async fn write_execution_plan_v2( for i in 0..plan.properties().output_partitioning().partition_count() { let inner_plan = plan.clone(); let inner_schema = schema.clone(); - let task_ctx = Arc::new(TaskContext::from(&session)); let config = WriterConfig::new( inner_schema.clone(), partition_columns.clone(), @@ -303,7 +295,7 @@ pub(crate) async fn write_execution_plan_v2( let mut writer = DeltaWriter::new(object_store.clone(), config); let checker_stream = checker.clone(); let scan_start = std::time::Instant::now(); - let mut stream = inner_plan.execute(i, task_ctx)?; + let mut stream = inner_plan.execute(i, session.task_ctx())?; let handle: tokio::task::JoinHandle< DeltaResult<(Vec, WriteExecutionPlanMetrics)>, @@ -356,7 +348,6 @@ pub(crate) async fn write_execution_plan_v2( .collect::>(), )); let cdf_schema = schema.clone(); - let task_ctx = Arc::new(TaskContext::from(&session)); let normal_config = WriterConfig::new( write_schema.clone(), partition_columns.clone(), @@ -383,7 +374,7 @@ pub(crate) async fn write_execution_plan_v2( let checker_stream = checker.clone(); let scan_start = std::time::Instant::now(); - let mut stream = inner_plan.execute(i, task_ctx)?; + let mut stream = inner_plan.execute(i, session.task_ctx())?; let session_context = SessionContext::new(); diff --git a/crates/core/src/operations/write/mod.rs b/crates/core/src/operations/write/mod.rs index e4a9880f06..0e8977cbd3 100644 --- a/crates/core/src/operations/write/mod.rs +++ b/crates/core/src/operations/write/mod.rs @@ -23,34 +23,21 @@ //! let table = ops.write(vec![batch]).await?; //! 
```` -pub(crate) mod async_utils; -pub mod configs; -pub(crate) mod execution; -pub(crate) mod generated_columns; -pub(crate) mod metrics; -pub(crate) mod schema_evolution; -pub mod writer; - -use arrow_schema::Schema; -pub use configs::WriterStatsConfig; -use datafusion::execution::SessionStateBuilder; -use delta_kernel::engine::arrow_conversion::TryIntoKernel as _; -use generated_columns::{able_to_gc, add_generated_columns, add_missing_generated_columns}; -use metrics::{SOURCE_COUNT_ID, SOURCE_COUNT_METRIC}; use std::collections::HashMap; use std::str::FromStr; use std::sync::Arc; use std::time::{Instant, SystemTime, UNIX_EPOCH}; use std::vec; -use arrow_array::RecordBatch; +use arrow::array::RecordBatch; +use arrow_schema::Schema; use datafusion::catalog::{Session, TableProvider}; use datafusion::common::{Column, DFSchema, Result, ScalarValue}; use datafusion::datasource::MemTable; use datafusion::execution::context::{SessionContext, SessionState}; use datafusion::logical_expr::{cast, lit, try_cast, Expr, Extension, LogicalPlan}; use datafusion::prelude::DataFrame; -use execution::{prepare_predicate_actions, write_execution_plan_v2}; +use delta_kernel::engine::arrow_conversion::TryIntoKernel as _; use futures::future::BoxFuture; use parquet::file::properties::WriterProperties; use schema_evolution::try_cast_schema; @@ -58,6 +45,10 @@ use serde::{Deserialize, Serialize}; use tracing::log::*; use tracing::Instrument; +pub use self::configs::WriterStatsConfig; +use self::execution::{prepare_predicate_actions, write_execution_plan_v2}; +use self::generated_columns::{able_to_gc, add_generated_columns, add_missing_generated_columns}; +use self::metrics::{SOURCE_COUNT_ID, SOURCE_COUNT_METRIC}; use super::cdc::CDC_COLUMN_NAME; use super::datafusion_utils::Expression; use super::{CreateBuilder, CustomExecuteHandler, Operation}; @@ -66,8 +57,8 @@ use crate::delta_datafusion::expr::parse_predicate_expression; use crate::delta_datafusion::logical::MetricObserver; use crate::delta_datafusion::physical::{find_metric_node, get_metric}; use crate::delta_datafusion::planner::DeltaPlanner; -use crate::delta_datafusion::register_store; -use crate::delta_datafusion::DataFusionMixins; +use crate::delta_datafusion::{create_session, register_store}; +use crate::delta_datafusion::{session_state_from_session, DataFusionMixins}; use crate::errors::{DeltaResult, DeltaTableError}; use crate::kernel::schema::cast::merge_arrow_schema; use crate::kernel::transaction::{CommitBuilder, CommitProperties, TableReference, PROTOCOL}; @@ -79,6 +70,14 @@ use crate::logstore::LogStoreRef; use crate::protocol::{DeltaOperation, SaveMode}; use crate::DeltaTable; +pub(crate) mod async_utils; +pub mod configs; +pub(crate) mod execution; +pub(crate) mod generated_columns; +pub(crate) mod metrics; +pub(crate) mod schema_evolution; +pub mod writer; + #[derive(thiserror::Error, Debug)] pub(crate) enum WriteError { #[error("No data source supplied to write command.")] @@ -446,18 +445,15 @@ impl std::future::IntoFuture for WriteBuilder { let session = this .session - .and_then(|session| session.as_any().downcast_ref::().cloned()) - .map(SessionStateBuilder::new_from_existing) - .unwrap_or_default() - .with_query_planner(write_planner) - .build(); + .unwrap_or_else(|| Arc::new(create_session().into_inner().state())); register_store(this.log_store.clone(), session.runtime_env().as_ref()); + let state = session_state_from_session(session.as_ref())?; let mut schema_drift = false; let mut generated_col_exp = None; let mut missing_gen_col = 
None; let mut source = - DataFrame::new(session.clone(), this.input.unwrap().as_ref().clone()); + DataFrame::new(state.clone(), this.input.unwrap().as_ref().clone()); if let Some(snapshot) = &this.snapshot { if able_to_gc(snapshot)? { let generated_col_expressions = @@ -546,7 +542,7 @@ impl std::future::IntoFuture for WriteBuilder { source, &generated_columns_exp, &missing_generated_col, - &session, + state, )?; } } @@ -559,7 +555,7 @@ impl std::future::IntoFuture for WriteBuilder { }), }); - let mut source = DataFrame::new(session.clone(), source); + let mut source = DataFrame::new(state.clone(), source); let schema = Arc::new(source.schema().as_arrow().clone()); @@ -607,7 +603,7 @@ impl std::future::IntoFuture for WriteBuilder { Expression::DataFusion(expr) => expr, Expression::String(s) => { let df_schema = DFSchema::try_from(schema.as_ref().to_owned())?; - parse_predicate_expression(&df_schema, s, &session)? + parse_predicate_expression(&df_schema, s, session.as_ref())? } }; (Some(fmt_expr_to_sql(&pred)?), Some(pred)) @@ -647,7 +643,7 @@ impl std::future::IntoFuture for WriteBuilder { pred.clone(), this.log_store.clone(), snapshot, - session.clone(), + session.as_ref(), partition_columns.clone(), this.writer_properties.clone(), deletion_timestamp, @@ -687,7 +683,7 @@ impl std::future::IntoFuture for WriteBuilder { // Here we need to validate if the new data conforms to a predicate if one is provided let (add_actions, _) = write_execution_plan_v2( this.snapshot.as_ref(), - session.clone(), + session.as_ref(), source_plan.clone(), partition_columns.clone(), this.log_store.object_store(Some(operation_id)).clone(), diff --git a/crates/core/tests/integration_datafusion.rs b/crates/core/tests/integration_datafusion.rs index e6ded8302a..cce70533bf 100644 --- a/crates/core/tests/integration_datafusion.rs +++ b/crates/core/tests/integration_datafusion.rs @@ -15,7 +15,7 @@ use datafusion::common::scalar::ScalarValue; use datafusion::common::ScalarValue::*; use datafusion::common::{DataFusionError, Result}; use datafusion::datasource::TableProvider; -use datafusion::execution::context::{SessionContext, SessionState, TaskContext}; +use datafusion::execution::context::{SessionContext, TaskContext}; use datafusion::execution::SessionStateBuilder; use datafusion::logical_expr::Expr; use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; @@ -51,6 +51,7 @@ pub fn context_with_delta_table_factory() -> SessionContext { mod local { use super::*; + use datafusion::catalog::Session; use datafusion::common::assert_contains; use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion}; use datafusion::datasource::source::DataSourceExec; @@ -61,6 +62,7 @@ mod local { use datafusion::prelude::SessionConfig; use datafusion::{common::stats::Precision, datasource::provider_as_source}; use delta_kernel::engine::arrow_conversion::TryIntoKernel as _; + use deltalake_core::delta_datafusion::create_session; use deltalake_core::{ delta_datafusion::DeltaLogicalCodec, logstore::default_logstore, writer::JsonWriter, }; @@ -412,7 +414,7 @@ mod local { }; // Build a new context from scratch and deserialize the plan - let ctx = SessionContext::new(); + let ctx = create_session().into_inner(); let state = ctx.state(); let source_scan = Arc::new(logical_plan_from_bytes_with_extension_codec( &source_scan_bytes, @@ -422,7 +424,6 @@ mod local { let schema: StructType = source_scan.schema().as_arrow().try_into_kernel().unwrap(); let fields = schema.fields().cloned(); - 
dbg!(schema.fields().collect_vec().clone()); // Create target Delta Table let target_table = CreateBuilder::new() .with_location("memory:///target") @@ -653,7 +654,7 @@ mod local { async fn get_scan_metrics_with_limit( table: &DeltaTable, - state: &SessionState, + state: &dyn Session, e: &[Expr], limit: Option, ) -> Result { @@ -675,7 +676,7 @@ mod local { async fn get_scan_metrics( table: &DeltaTable, - state: &SessionState, + state: &dyn Session, e: &[Expr], ) -> Result { get_scan_metrics_with_limit(table, state, e, None).await