From 81feae8c925c5c6eb1c4f73f7eaffd4e639e6855 Mon Sep 17 00:00:00 2001 From: Abhi Agarwal Date: Thu, 2 Oct 2025 16:20:27 -0400 Subject: [PATCH 1/6] Unify `SessionState` API Signed-off-by: Abhi Agarwal --- crates/core/src/operations/constraints.rs | 2 +- crates/core/src/operations/load.rs | 27 +++++++++++++++------ crates/core/src/operations/load_cdf.rs | 9 +++++++ crates/core/src/operations/optimize.rs | 4 +-- crates/core/src/operations/write/mod.rs | 7 ++++++ crates/core/tests/integration_datafusion.rs | 2 +- 6 files changed, 40 insertions(+), 11 deletions(-) diff --git a/crates/core/src/operations/constraints.rs b/crates/core/src/operations/constraints.rs index 8581668a78..40b05ead91 100644 --- a/crates/core/src/operations/constraints.rs +++ b/crates/core/src/operations/constraints.rs @@ -76,7 +76,7 @@ impl ConstraintBuilder { self } - /// Specify the datafusion session context + /// The Datafusion session state to use pub fn with_session_state(mut self, state: SessionState) -> Self { self.state = Some(state); self diff --git a/crates/core/src/operations/load.rs b/crates/core/src/operations/load.rs index 68268cecfd..cc24c355e4 100644 --- a/crates/core/src/operations/load.rs +++ b/crates/core/src/operations/load.rs @@ -1,13 +1,14 @@ use std::sync::Arc; use datafusion::datasource::TableProvider; -use datafusion::execution::context::{SessionContext, TaskContext}; +use datafusion::execution::context::{TaskContext}; +use datafusion::execution::{SessionState, SessionStateBuilder}; use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion::physical_plan::{ExecutionPlan, SendableRecordBatchStream}; use futures::future::BoxFuture; use super::CustomExecuteHandler; -use crate::delta_datafusion::DataFusionMixins as _; +use crate::delta_datafusion::{DataFusionMixins as _, DeltaSessionConfig}; use crate::errors::{DeltaResult, DeltaTableError}; use crate::kernel::transaction::PROTOCOL; use crate::kernel::EagerSnapshot; @@ -23,6 +24,8 @@ pub struct LoadBuilder { log_store: LogStoreRef, /// A sub-selection of columns to be loaded columns: Option<Vec<String>>, + /// Datafusion session state relevant for executing the input plan + state: Option<SessionState>, } impl super::Operation<()> for LoadBuilder { @@ -41,6 +44,7 @@ impl LoadBuilder { snapshot, log_store, columns: None, + state: None, } } @@ -49,6 +53,12 @@ impl LoadBuilder { self.columns = Some(columns.into_iter().map(|s| s.into()).collect()); self } + + /// The Datafusion session state to use + pub fn with_session_state(mut self, state: SessionState) -> Self { + self.state = Some(state); + self + } } impl std::future::IntoFuture for LoadBuilder { @@ -86,12 +96,15 @@ impl std::future::IntoFuture for LoadBuilder { }) .transpose()?; - let ctx = SessionContext::new(); - let scan_plan = table - .scan(&ctx.state(), projection.as_ref(), &[], None) - .await?; + let state = this.state.unwrap_or_else(|| { + SessionStateBuilder::new() + .with_default_features() + .with_config(DeltaSessionConfig::default().into()) + .build() + }); + let scan_plan = table.scan(&state, projection.as_ref(), &[], None).await?; let plan = CoalescePartitionsExec::new(scan_plan); - let task_ctx = Arc::new(TaskContext::from(&ctx.state())); + let task_ctx = Arc::new(TaskContext::from(&state)); let stream = plan.execute(0, task_ctx)?; Ok((table, stream)) diff --git a/crates/core/src/operations/load_cdf.rs b/crates/core/src/operations/load_cdf.rs index d013477ce9..9e24bb97bd 100644 --- a/crates/core/src/operations/load_cdf.rs +++ b/crates/core/src/operations/load_cdf.rs @@ -55,6
+55,8 @@ pub struct CdfLoadBuilder { ending_timestamp: Option<DateTime<Utc>>, /// Enable ending version or timestamp exceeding the last commit allow_out_of_range: bool, + /// Datafusion session state relevant for executing the input plan + state: Option<SessionState>, } impl CdfLoadBuilder { @@ -68,6 +70,7 @@ impl CdfLoadBuilder { starting_timestamp: None, ending_timestamp: None, allow_out_of_range: false, + state: None, } } @@ -101,6 +104,12 @@ impl CdfLoadBuilder { self } + /// The Datafusion session state to use + pub fn with_session_state(mut self, state: SessionState) -> Self { + self.state = Some(state); + self + } + async fn calculate_earliest_version(&self) -> DeltaResult<i64> { let ts = self.starting_timestamp.unwrap_or(DateTime::UNIX_EPOCH); for v in 0..self.snapshot.version() { diff --git a/crates/core/src/operations/optimize.rs b/crates/core/src/operations/optimize.rs index 94f6114733..d3ced85e54 100644 --- a/crates/core/src/operations/optimize.rs +++ b/crates/core/src/operations/optimize.rs @@ -317,8 +317,8 @@ impl<'a> OptimizeBuilder<'a> { self } - /// A session state accompanying a given input plan, containing e.g. registered object stores - pub fn with_input_session_state(mut self, state: SessionState) -> Self { + /// The Datafusion session state to use + pub fn with_session_state(mut self, state: SessionState) -> Self { self.state = Some(state); self } diff --git a/crates/core/src/operations/write/mod.rs b/crates/core/src/operations/write/mod.rs index 765c45c3df..68d1d79a8a 100644 --- a/crates/core/src/operations/write/mod.rs +++ b/crates/core/src/operations/write/mod.rs @@ -247,11 +247,18 @@ impl WriteBuilder { } /// A session state accompanying a given input plan, containing e.g. registered object stores + #[deprecated(since = "0.29.0", note = "Use `with_session_state` instead")] pub fn with_input_session_state(mut self, state: SessionState) -> Self { self.state = Some(state); self } + /// The Datafusion session state to use + pub fn with_session_state(mut self, state: SessionState) -> Self { + self.state = Some(state); + self + } + /// Specify the target file size for data files written to the delta table.
pub fn with_target_file_size(mut self, target_file_size: usize) -> Self { self.target_file_size = Some(target_file_size); diff --git a/crates/core/tests/integration_datafusion.rs b/crates/core/tests/integration_datafusion.rs index 5caab81862..7b8ded6164 100644 --- a/crates/core/tests/integration_datafusion.rs +++ b/crates/core/tests/integration_datafusion.rs @@ -436,7 +436,7 @@ mod local { target_table.snapshot().ok().map(|s| s.snapshot()).cloned(), ) .with_input_execution_plan(source_scan) - .with_input_session_state(state) + .with_session_state(state) .await?; ctx.register_table("target", Arc::new(target_table))?; From 974cea906530cb9e27d1ebfaf22b51f43ea06706 Mon Sep 17 00:00:00 2001 From: Abhi Agarwal Date: Sun, 5 Oct 2025 17:18:47 -0400 Subject: [PATCH 2/6] Move to `dyn Session` Signed-off-by: Abhi Agarwal --- crates/core/src/delta_datafusion/expr.rs | 23 +++++---- .../core/src/delta_datafusion/find_files.rs | 7 +-- crates/core/src/delta_datafusion/mod.rs | 16 +++---- .../src/delta_datafusion/table_provider.rs | 6 +-- crates/core/src/operations/constraints.rs | 27 ++++++----- crates/core/src/operations/delete.rs | 46 +++++++++++------- crates/core/src/operations/load.rs | 33 +++++++++---- crates/core/src/operations/load_cdf.rs | 22 +++++++-- crates/core/src/operations/merge/mod.rs | 25 +++++----- crates/core/src/operations/mod.rs | 11 ++--- crates/core/src/operations/optimize.rs | 47 +++++++++---------- crates/core/src/operations/update.rs | 25 +++++----- crates/core/src/operations/write/mod.rs | 31 ++++++------ crates/core/tests/command_optimize.rs | 17 ++++--- crates/core/tests/integration_datafusion.rs | 2 +- 15 files changed, 197 insertions(+), 141 deletions(-) diff --git a/crates/core/src/delta_datafusion/expr.rs b/crates/core/src/delta_datafusion/expr.rs index cbfb1e8547..d785615033 100644 --- a/crates/core/src/delta_datafusion/expr.rs +++ b/crates/core/src/delta_datafusion/expr.rs @@ -21,11 +21,13 @@ //! 
Utility functions for Datafusion's Expressions use std::fmt::{self, Display, Error, Formatter, Write}; +use std::marker::PhantomData; use std::sync::Arc; use arrow_array::{Array, GenericListArray}; use arrow_schema::{DataType, Field}; use chrono::{DateTime, NaiveDate}; +use datafusion::catalog::Session; use datafusion::common::Result as DFResult; use datafusion::common::{config::ConfigOptions, DFSchema, Result, ScalarValue, TableReference}; use datafusion::execution::context::SessionState; @@ -186,30 +188,35 @@ impl ExprPlanner for CustomNestedFunctionPlanner { pub(crate) struct DeltaContextProvider<'a> { state: SessionState, - /// Keeping this around just to make use of the 'a lifetime - _original: &'a SessionState, planners: Vec<Arc<dyn ExprPlanner>>, + /// Keeping this around just to make use of the 'a lifetime + _phantom: PhantomData<&'a SessionState>, } impl<'a> DeltaContextProvider<'a> { - fn new(state: &'a SessionState) -> Self { + fn new(session: &'a dyn Session) -> Self { // default planners are [CoreFunctionPlanner, NestedFunctionPlanner, FieldAccessPlanner, // UserDefinedFunctionPlanner] let planners: Vec<Arc<dyn ExprPlanner>> = vec![ Arc::new(CoreFunctionPlanner::default()), Arc::new(CustomNestedFunctionPlanner::default()), Arc::new(FieldAccessPlanner), - Arc::new(datafusion::functions::planner::UserDefinedFunctionPlanner), + Arc::new(datafusion::functions::unicode::planner::UnicodeFunctionPlanner), + Arc::new(datafusion::functions::datetime::planner::DatetimeFunctionPlanner), ]; // Disable the above for testing //let planners = state.expr_planners(); - let new_state = SessionStateBuilder::new_from_existing(state.clone()) + let new_state = session + .as_any() + .downcast_ref::<SessionState>() + .map(|state| SessionStateBuilder::new_from_existing(state.clone())) + .unwrap_or_default() .with_expr_planners(planners.clone()) .build(); DeltaContextProvider { planners, state: new_state, - _original: state, + _phantom: PhantomData, } } } @@ -260,7 +267,7 @@ impl ContextProvider for DeltaContextProvider<'_> { pub fn parse_predicate_expression( schema: &DFSchema, expr: impl AsRef<str>, - df_state: &SessionState, + session: &dyn Session, ) -> DeltaResult<Expr> { let dialect = &GenericDialect {}; let mut tokenizer = Tokenizer::new(dialect, expr.as_ref()); @@ -276,7 +283,7 @@ pub fn parse_predicate_expression( source: Box::new(err), })?; - let context_provider = DeltaContextProvider::new(df_state); + let context_provider = DeltaContextProvider::new(session); let sql_to_rel = SqlToRel::new_with_options(&context_provider, DeltaParserOptions::default().into()); diff --git a/crates/core/src/delta_datafusion/find_files.rs b/crates/core/src/delta_datafusion/find_files.rs index ebfed00f8a..7c5174863a 100644 --- a/crates/core/src/delta_datafusion/find_files.rs +++ b/crates/core/src/delta_datafusion/find_files.rs @@ -5,8 +5,9 @@ use std::sync::Arc; use arrow_array::{Array, RecordBatch, StringArray}; use arrow_schema::{ArrowError, DataType as ArrowDataType, Field, Schema as ArrowSchema}; use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor}; +use datafusion::catalog::Session; use datafusion::datasource::MemTable; -use datafusion::execution::context::{SessionContext, SessionState, TaskContext}; +use datafusion::execution::context::{SessionContext, TaskContext}; use datafusion::logical_expr::{col, Expr, Volatility}; use datafusion::physical_plan::filter::FilterExec; use datafusion::physical_plan::limit::LocalLimitExec; @@ -33,7 +34,7 @@ pub(crate) struct FindFiles { pub(crate) async fn find_files( snapshot: &EagerSnapshot, log_store:
LogStoreRef, - state: &SessionState, + state: &dyn Session, predicate: Option<Expr>, ) -> DeltaResult<FindFiles> { let current_metadata = snapshot.metadata(); @@ -190,7 +191,7 @@ fn join_batches_with_add_actions( async fn find_files_scan( snapshot: &EagerSnapshot, log_store: LogStoreRef, - state: &SessionState, + state: &dyn Session, expression: Expr, ) -> DeltaResult<Vec<Add>> { let candidate_map: HashMap<String, Add> = snapshot diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs index 8f47e86457..0b54d8d6f7 100644 --- a/crates/core/src/delta_datafusion/mod.rs +++ b/crates/core/src/delta_datafusion/mod.rs @@ -126,7 +126,7 @@ pub trait DataFusionMixins { fn parse_predicate_expression( &self, expr: impl AsRef<str>, - df_state: &SessionState, + session: &impl Session, ) -> DeltaResult<Expr>; } @@ -150,10 +150,10 @@ impl DataFusionMixins for Snapshot { fn parse_predicate_expression( &self, expr: impl AsRef<str>, - df_state: &SessionState, + session: &impl Session, ) -> DeltaResult<Expr> { let schema = DFSchema::try_from(self.read_schema().as_ref().to_owned())?; - parse_predicate_expression(&schema, expr, df_state) + parse_predicate_expression(&schema, expr, session) } } @@ -189,10 +189,10 @@ impl DataFusionMixins for LogDataHandler<'_> { fn parse_predicate_expression( &self, expr: impl AsRef<str>, - df_state: &SessionState, + session: &impl Session, ) -> DeltaResult<Expr> { let schema = DFSchema::try_from(self.read_schema().as_ref().to_owned())?; - parse_predicate_expression(&schema, expr, df_state) + parse_predicate_expression(&schema, expr, session) } } @@ -208,9 +208,9 @@ impl DataFusionMixins for EagerSnapshot { fn parse_predicate_expression( &self, expr: impl AsRef<str>, - df_state: &SessionState, + session: &impl Session, ) -> DeltaResult<Expr> { - self.snapshot().parse_predicate_expression(expr, df_state) + self.snapshot().parse_predicate_expression(expr, session) } } @@ -303,7 +303,7 @@ impl DeltaTableState { // each delta table must register a specific object store, since paths are internally // handled relative to the table root.
-pub(crate) fn register_store(store: LogStoreRef, env: Arc<RuntimeEnv>) { +pub(crate) fn register_store(store: LogStoreRef, env: &RuntimeEnv) { let object_store_url = store.object_store_url(); let url: &Url = object_store_url.as_ref(); env.register_object_store(url, store.object_store(None)); diff --git a/crates/core/src/delta_datafusion/table_provider.rs b/crates/core/src/delta_datafusion/table_provider.rs index d4c6becdc8..f375cb80fe 100644 --- a/crates/core/src/delta_datafusion/table_provider.rs +++ b/crates/core/src/delta_datafusion/table_provider.rs @@ -729,7 +729,7 @@ impl TableProvider for DeltaTable { filters: &[Expr], limit: Option<usize>, ) -> Result<Arc<dyn ExecutionPlan>> { - register_store(self.log_store(), session.runtime_env().clone()); + register_store(self.log_store(), session.runtime_env().as_ref()); let filter_expr = conjunction(filters.iter().cloned()); let scan = DeltaScanBuilder::new(self.snapshot()?.snapshot(), self.log_store(), session) @@ -817,7 +817,7 @@ impl TableProvider for DeltaTableProvider { filters: &[Expr], limit: Option<usize>, ) -> Result<Arc<dyn ExecutionPlan>> { - register_store(self.log_store.clone(), session.runtime_env().clone()); + register_store(self.log_store.clone(), session.runtime_env().as_ref()); let filter_expr = conjunction(filters.iter().cloned()); let mut scan = DeltaScanBuilder::new(&self.snapshot, self.log_store.clone(), session) @@ -855,7 +855,7 @@ impl TableProvider for DeltaTableProvider { input: Arc<dyn ExecutionPlan>, insert_op: InsertOp, ) -> Result<Arc<dyn ExecutionPlan>> { - register_store(self.log_store.clone(), state.runtime_env().clone()); + register_store(self.log_store.clone(), state.runtime_env().as_ref()); let save_mode = match insert_op { InsertOp::Append => SaveMode::Append, diff --git a/crates/core/src/operations/constraints.rs b/crates/core/src/operations/constraints.rs index 40b05ead91..786a4cbc23 100644 --- a/crates/core/src/operations/constraints.rs +++ b/crates/core/src/operations/constraints.rs @@ -2,9 +2,9 @@ use std::sync::Arc; +use datafusion::catalog::Session; use datafusion::common::ToDFSchema; -use datafusion::execution::context::SessionState; -use datafusion::execution::{SendableRecordBatchStream, TaskContext}; +use datafusion::execution::{SendableRecordBatchStream, SessionState, TaskContext}; use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::SessionContext; use delta_kernel::table_features::WriterFeature; @@ -36,7 +36,7 @@ pub struct ConstraintBuilder { /// Delta object store for handling data files log_store: LogStoreRef, /// Datafusion session state relevant for executing the input plan - state: Option<SessionState>, + state: Option<Arc<dyn Session>>, /// Additional information to add to the commit commit_properties: CommitProperties, custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>, @@ -77,7 +77,7 @@ impl ConstraintBuilder { } /// The Datafusion session state to use - pub fn with_session_state(mut self, state: SessionState) -> Self { + pub fn with_session_state(mut self, state: Arc<dyn Session>) -> Self { self.state = Some(state); self } @@ -130,18 +130,21 @@ impl std::future::IntoFuture for ConstraintBuilder { ))); } - let state = this.state.unwrap_or_else(|| { - let session: SessionContext = DeltaSessionContext::default().into(); - register_store(this.log_store.clone(), session.runtime_env()); - session.state() - }); + let session = this + .state + .and_then(|state| state.as_any().downcast_ref::<SessionState>().cloned()) + .unwrap_or_else(|| { + let session: SessionContext = DeltaSessionContext::default().into(); + session.state() + }); + register_store(this.log_store.clone(), session.runtime_env().as_ref()); - let scan = DeltaScanBuilder::new(&this.snapshot,
this.log_store.clone(), &state) + let scan = DeltaScanBuilder::new(&this.snapshot, this.log_store.clone(), &session) .build() .await?; let schema = scan.schema().to_dfschema()?; - let expr = into_expr(expr, &schema, &state)?; + let expr = into_expr(expr, &schema, &session)?; let expr_str = fmt_expr_to_sql(&expr)?; // Checker built here with the one time constraint to check. @@ -153,7 +156,7 @@ impl std::future::IntoFuture for ConstraintBuilder { for p in 0..plan.properties().output_partitioning().partition_count() { let inner_plan = plan.clone(); let inner_checker = checker.clone(); - let task_ctx = Arc::new(TaskContext::from(&state)); + let task_ctx= Arc::new((&session).into()); let mut record_stream: SendableRecordBatchStream = inner_plan.execute(p, task_ctx)?; let handle: tokio::task::JoinHandle<DeltaResult<()>> = diff --git a/crates/core/src/operations/delete.rs b/crates/core/src/operations/delete.rs index f5005e0ac9..e5609c9a40 100644 --- a/crates/core/src/operations/delete.rs +++ b/crates/core/src/operations/delete.rs @@ -18,6 +18,7 @@ //! ```` use async_trait::async_trait; +use datafusion::catalog::Session; use datafusion::common::ScalarValue; use datafusion::dataframe::DataFrame; use datafusion::datasource::provider_as_source; @@ -68,6 +69,7 @@ const SOURCE_COUNT_METRIC: &str = "num_source_rows"; /// Delete Records from the Delta Table. /// See this module's documentation for more information +#[derive(Clone)] pub struct DeleteBuilder { /// Which records to delete predicate: Option<Expression>, @@ -76,7 +78,7 @@ pub struct DeleteBuilder { /// Delta object store for handling data files log_store: LogStoreRef, /// Datafusion session state relevant for executing the input plan - state: Option<SessionState>, + state: Option<Arc<dyn Session>>, /// Properties passed to underlying parquet writer for when files are rewritten writer_properties: Option<WriterProperties>, /// Commit properties and configuration @@ -84,6 +86,17 @@ pub struct DeleteBuilder { custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>, } +impl std::fmt::Debug for DeleteBuilder { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("DeleteBuilder") + .field("predicate", &self.predicate) + .field("snapshot", &self.snapshot) + .field("log_store", &self.log_store) + .field("commit_properties", &self.commit_properties) + .finish() + } +} + #[derive(Default, Debug, Serialize)] /// Metrics for the Delete Operation pub struct DeleteMetrics { @@ -133,7 +146,7 @@ impl DeleteBuilder { } /// The Datafusion session state to use - pub fn with_session_state(mut self, state: SessionState) -> Self { + pub fn with_session_state(mut self, state: Arc<dyn Session>) -> Self { self.state = Some(state); self } @@ -197,7 +210,7 @@ impl ExtensionPlanner for DeleteMetricExtensionPlanner { async fn execute_non_empty_expr( snapshot: &EagerSnapshot, log_store: LogStoreRef, - state: &SessionState, + session: &SessionState, expression: &Expr, rewrite: &[Add], metrics: &mut DeleteMetrics, @@ -212,7 +225,7 @@ async fn execute_non_empty_expr( let delete_planner = DeltaPlanner::new(); - let state = SessionStateBuilder::new_from_existing(state.clone()) + let state = SessionStateBuilder::new_from_existing(session.clone()) .with_query_planner(delete_planner) .build(); @@ -316,7 +329,7 @@ async fn execute( predicate: Option<Expr>, log_store: LogStoreRef, snapshot: EagerSnapshot, - state: SessionState, + session: SessionState, writer_properties: Option<WriterProperties>, mut commit_properties: CommitProperties, operation_id: Uuid, @@ -330,7 +343,7 @@ async fn execute( let mut metrics = DeleteMetrics::default(); let scan_start = Instant::now(); - let
candidates = find_files(&snapshot, log_store.clone(), &state, predicate.clone()).await?; + let candidates = find_files(&snapshot, log_store.clone(), &session, predicate.clone()).await?; metrics.scan_time_ms = Instant::now().duration_since(scan_start).as_millis() as u64; let predicate = predicate.unwrap_or(lit(true)); @@ -340,7 +353,7 @@ let add = execute_non_empty_expr( &snapshot, log_store.clone(), - &state, + &session, &predicate, &candidates.candidates, &mut metrics, @@ -422,20 +435,21 @@ impl std::future::IntoFuture for DeleteBuilder { let operation_id = this.get_operation_id(); this.pre_execute(operation_id).await?; - let state = this.state.unwrap_or_else(|| { - let session: SessionContext = DeltaSessionContext::default().into(); - - // If a user provides their own their DF state then they must register the store themselves - register_store(this.log_store.clone(), session.runtime_env()); + let session = this + .state + .and_then(|state| state.as_any().downcast_ref::<SessionState>().cloned()) + .unwrap_or_else(|| { + let session: SessionContext = DeltaSessionContext::default().into(); + session.state() + }); - session.state() - }); + register_store(this.log_store.clone(), session.runtime_env().as_ref()); let predicate = match this.predicate { Some(predicate) => match predicate { Expression::DataFusion(expr) => Some(expr), Expression::String(s) => { - Some(this.snapshot.parse_predicate_expression(s, &state)?) + Some(this.snapshot.parse_predicate_expression(s, &session)?) } }, None => None, @@ -445,7 +459,7 @@ predicate, this.log_store.clone(), this.snapshot, - state, + session, this.writer_properties, this.commit_properties, operation_id, diff --git a/crates/core/src/operations/load.rs b/crates/core/src/operations/load.rs index cc24c355e4..de20213465 100644 --- a/crates/core/src/operations/load.rs +++ b/crates/core/src/operations/load.rs @@ -1,7 +1,8 @@ use std::sync::Arc; +use datafusion::catalog::Session; use datafusion::datasource::TableProvider; -use datafusion::execution::context::{TaskContext}; +use datafusion::execution::context::TaskContext; use datafusion::execution::{SessionState, SessionStateBuilder}; use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion::physical_plan::{ExecutionPlan, SendableRecordBatchStream}; @@ -16,7 +17,7 @@ use crate::logstore::LogStoreRef; use crate::table::state::DeltaTableState; use crate::DeltaTable; -#[derive(Debug, Clone)] +#[derive(Clone)] pub struct LoadBuilder { /// A snapshot of the to-be-loaded table's state snapshot: EagerSnapshot, @@ -25,7 +26,16 @@ pub struct LoadBuilder { /// A sub-selection of columns to be loaded columns: Option<Vec<String>>, /// Datafusion session state relevant for executing the input plan - state: Option<SessionState>, + state: Option<Arc<dyn Session>>, } +impl std::fmt::Debug for LoadBuilder { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("LoadBuilder") + .field("snapshot", &self.snapshot) + .field("log_store", &self.log_store) + .finish() + } } impl super::Operation<()> for LoadBuilder { @@ -55,7 +65,7 @@ impl LoadBuilder { } /// The Datafusion session state to use - pub fn with_session_state(mut self, state: SessionState) -> Self { + pub fn with_session_state(mut self, state: Arc<dyn Session>) -> Self { self.state = Some(state); self } } @@ -96,12 +106,15 @@ impl std::future::IntoFuture for LoadBuilder { }) .transpose()?; - let state = this.state.unwrap_or_else(|| { - SessionStateBuilder::new() - .with_default_features()
.with_config(DeltaSessionConfig::default().into()) - .build() - }); + let state = this + .state + .and_then(|state| state.as_any().downcast_ref::<SessionState>().cloned()) + .unwrap_or_else(|| { + SessionStateBuilder::new() + .with_default_features() + .with_config(DeltaSessionConfig::default().into()) + .build() + }); let scan_plan = table.scan(&state, projection.as_ref(), &[], None).await?; let plan = CoalescePartitionsExec::new(scan_plan); let task_ctx = Arc::new(TaskContext::from(&state)); let stream = plan.execute(0, task_ctx)?; Ok((table, stream)) diff --git a/crates/core/src/operations/load_cdf.rs b/crates/core/src/operations/load_cdf.rs index 9e24bb97bd..d2e232d708 100644 --- a/crates/core/src/operations/load_cdf.rs +++ b/crates/core/src/operations/load_cdf.rs @@ -39,7 +39,7 @@ use crate::DeltaTableError; use crate::{delta_datafusion::cdf::*, kernel::Remove}; /// Builder for create a read of change data feeds for delta tables -#[derive(Clone, Debug)] +#[derive(Clone)] pub struct CdfLoadBuilder { /// A snapshot of the to-be-loaded table's state pub snapshot: EagerSnapshot, @@ -56,7 +56,21 @@ pub struct CdfLoadBuilder { /// Enable ending version or timestamp exceeding the last commit allow_out_of_range: bool, /// Datafusion session state relevant for executing the input plan - state: Option<SessionState>, + state: Option<Arc<dyn Session>>, } +impl std::fmt::Debug for CdfLoadBuilder { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("CdfLoadBuilder") + .field("snapshot", &self.snapshot) + .field("log_store", &self.log_store) + .field("starting_version", &self.starting_version) + .field("ending_version", &self.ending_version) + .field("starting_timestamp", &self.starting_timestamp) + .field("ending_timestamp", &self.ending_timestamp) + .field("allow_out_of_range", &self.allow_out_of_range) + .finish() + } } impl CdfLoadBuilder { @@ -105,7 +119,7 @@ } /// The Datafusion session state to use - pub fn with_session_state(mut self, state: SessionState) -> Self { + pub fn with_session_state(mut self, state: Arc<dyn Session>) -> Self { self.state = Some(state); self } @@ -340,7 +354,7 @@ impl CdfLoadBuilder { PROTOCOL.can_read_from(&self.snapshot)?; let (cdc, add, remove) = self.determine_files_to_read().await?; - register_store(self.log_store.clone(), session.runtime_env().clone()); + register_store(self.log_store.clone(), session.runtime_env().as_ref()); let partition_values = self.snapshot.metadata().partition_columns().clone(); let schema = self.snapshot.input_schema(); diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs index d22f2b6088..a27fac61b1 100644 --- a/crates/core/src/operations/merge/mod.rs +++ b/crates/core/src/operations/merge/mod.rs @@ -35,11 +35,11 @@ use std::time::Instant; use arrow_schema::{DataType, Field, SchemaBuilder}; use async_trait::async_trait; +use datafusion::catalog::Session; use datafusion::common::tree_node::{Transformed, TreeNode}; use datafusion::common::{plan_err, Column, DFSchema, ExprSchema, ScalarValue, TableReference}; use datafusion::datasource::provider_as_source; use datafusion::error::Result as DataFusionResult; -use datafusion::execution::context::SessionConfig; use datafusion::execution::session_state::SessionStateBuilder; use datafusion::logical_expr::build_join_schema; use datafusion::logical_expr::execution_props::ExecutionProps; @@ -77,7 +77,7 @@ use crate::delta_datafusion::physical::{find_metric_node, get_metric, MetricObse use crate::delta_datafusion::planner::DeltaPlanner; use crate::delta_datafusion::{ register_store, DataFusionMixins, DeltaColumn,
DeltaScan, DeltaScanConfigBuilder, - DeltaSessionConfig, DeltaTableProvider, + DeltaSessionContext, DeltaTableProvider, }; use crate::kernel::schema::cast::{merge_arrow_field, merge_arrow_schema}; use crate::kernel::transaction::{CommitBuilder, CommitProperties, PROTOCOL}; @@ -144,7 +144,7 @@ pub struct MergeBuilder { /// Delta object store for handling data files log_store: LogStoreRef, /// Datafusion session state relevant for executing the input plan - state: Option<SessionState>, + state: Option<Arc<dyn Session>>, /// Properties passed to underlying parquet writer for when files are rewritten writer_properties: Option<WriterProperties>, /// Additional information to add to the commit @@ -384,7 +384,7 @@ impl MergeBuilder { } /// The Datafusion session state to use - pub fn with_session_state(mut self, state: SessionState) -> Self { + pub fn with_session_state(mut self, state: Arc<dyn Session>) -> Self { self.state = Some(state); self } @@ -1540,15 +1540,16 @@ impl std::future::IntoFuture for MergeBuilder { let operation_id = this.get_operation_id(); this.pre_execute(operation_id).await?; - let state = this.state.unwrap_or_else(|| { - let config: SessionConfig = DeltaSessionConfig::default().into(); - let session = SessionContext::new_with_config(config); + let state = this + .state + .map(|state| state.as_any().downcast_ref::<SessionState>().cloned()) + .flatten() + .unwrap_or_else(|| { + let session: SessionContext = DeltaSessionContext::default().into(); + session.state() + }); - // If a user provides their own their DF state then they must register the store themselves - register_store(this.log_store.clone(), session.runtime_env()); - - session.state() - }); + register_store(this.log_store.clone(), state.runtime_env().as_ref()); let (snapshot, metrics) = execute( this.predicate, diff --git a/crates/core/src/operations/mod.rs b/crates/core/src/operations/mod.rs index af4148abca..3d2eafc2be 100644 --- a/crates/core/src/operations/mod.rs +++ b/crates/core/src/operations/mod.rs @@ -423,8 +423,7 @@ pub(crate) fn get_target_file_size( #[cfg(feature = "datafusion")] mod datafusion_utils { - use datafusion::common::DFSchema; - use datafusion::execution::context::SessionState; + use datafusion::{catalog::Session, common::DFSchema}; use datafusion::logical_expr::Expr; use crate::{delta_datafusion::expr::parse_predicate_expression, DeltaResult}; @@ -458,21 +457,21 @@ mod datafusion_utils { pub(crate) fn into_expr( expr: Expression, schema: &DFSchema, - df_state: &SessionState, + session: &dyn Session, ) -> DeltaResult<Expr> { match expr { Expression::DataFusion(expr) => Ok(expr), - Expression::String(s) => parse_predicate_expression(schema, s, df_state), + Expression::String(s) => parse_predicate_expression(schema, s, session), } } pub(crate) fn maybe_into_expr( expr: Option<Expression>, schema: &DFSchema, - df_state: &SessionState, + session: &dyn Session, ) -> DeltaResult<Option<Expr>> { Ok(match expr { - Some(predicate) => Some(into_expr(predicate, schema, df_state)?), + Some(predicate) => Some(into_expr(predicate, schema, session)?), None => None, }) } diff --git a/crates/core/src/operations/optimize.rs b/crates/core/src/operations/optimize.rs index d3ced85e54..95e5071a0c 100644 --- a/crates/core/src/operations/optimize.rs +++ b/crates/core/src/operations/optimize.rs @@ -27,6 +27,7 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use arrow_array::RecordBatch; use arrow_schema::SchemaRef as ArrowSchemaRef; +use datafusion::catalog::Session; use datafusion::execution::context::SessionState; use datafusion::execution::memory_pool::FairSpillPool; use
datafusion::execution::runtime_env::RuntimeEnvBuilder; @@ -50,7 +51,7 @@ use uuid::Uuid; use super::write::writer::{PartitionWriter, PartitionWriterConfig}; use super::{CustomExecuteHandler, Operation}; -use crate::delta_datafusion::DeltaTableProvider; +use crate::delta_datafusion::{DeltaSessionContext, DeltaTableProvider}; use crate::errors::{DeltaResult, DeltaTableError}; use crate::kernel::transaction::{CommitBuilder, CommitProperties, DEFAULT_RETRIES, PROTOCOL}; use crate::kernel::EagerSnapshot; @@ -219,7 +220,7 @@ pub struct OptimizeBuilder<'a> { /// Optimize type optimize_type: OptimizeType, /// Datafusion session state relevant for executing the input plan - state: Option<SessionState>, + state: Option<Arc<dyn Session>>, min_commit_interval: Option<Duration>, custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>, } @@ -318,7 +319,7 @@ impl<'a> OptimizeBuilder<'a> { } /// The Datafusion session state to use - pub fn with_session_state(mut self, state: SessionState) -> Self { + pub fn with_session_state(mut self, state: Arc<dyn Session>) -> Self { self.state = Some(state); self } @@ -345,6 +346,20 @@ impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> { .set_created_by(format!("delta-rs version {}", crate_version())) .build() }); + let state = this + .state + .and_then(|state| state.as_any().downcast_ref::<SessionState>().cloned()) + .unwrap_or_else(|| { + let memory_pool = FairSpillPool::new(this.max_spill_size); + let runtime = RuntimeEnvBuilder::new() + .with_memory_pool(Arc::new(memory_pool)) + .build_arc() + .unwrap(); + SessionStateBuilder::new() + .with_default_features() + .with_runtime_env(runtime) + .build() + }); let plan = create_merge_plan( &this.log_store, this.optimize_type, @@ -352,7 +367,7 @@ this.filters, this.target_size.to_owned(), writer_properties, - this.state, + state, ) .await?; @@ -361,7 +376,6 @@ this.log_store.clone(), &this.snapshot, this.max_concurrent_tasks, - this.max_spill_size, this.min_commit_interval, this.commit_properties.clone(), operation_id, @@ -448,7 +462,7 @@ enum OptimizeOperations { ZOrder( Vec<String>, HashMap<String, (IndexMap<String, Scalar>, MergeBin)>, - Box<Option<SessionState>>, + Box<SessionState>, ), // TODO: Sort } @@ -623,8 +637,6 @@ impl MergePlan { log_store: LogStoreRef, snapshot: &EagerSnapshot, max_concurrent_tasks: usize, - #[allow(unused_variables)] // used behind a feature flag - max_spill_size: usize, min_commit_interval: Option<Duration>, commit_properties: CommitProperties, operation_id: Uuid, @@ -676,22 +688,9 @@ impl MergePlan { OptimizeOperations::ZOrder(zorder_columns, bins, state) => { debug!("Starting zorder with the columns: {zorder_columns:?} {bins:?}"); - let state = if let Some(state) = *state { - state - } else { - let memory_pool = FairSpillPool::new(max_spill_size); - let runtime = RuntimeEnvBuilder::new() - .with_memory_pool(Arc::new(memory_pool)) - .build_arc()?; - SessionStateBuilder::new() - .with_default_features() - .with_runtime_env(runtime) - .build() - }; - let exec_context = Arc::new(zorder::ZOrderExecContext::new( zorder_columns, - state, + *state, object_store, )?); let task_parameters = self.task_parameters.clone(); @@ -831,7 +830,7 @@ pub async fn create_merge_plan( filters: &[PartitionFilter], target_size: Option<u64>, writer_properties: WriterProperties, - state: Option<SessionState>, + state: SessionState, ) -> Result<MergePlan, DeltaTableError> { let target_size = target_size.unwrap_or_else(|| snapshot.table_properties().target_file_size().get()); @@ -1009,7 +1008,7 @@ async fn build_zorder_plan( snapshot: &EagerSnapshot, partition_keys: &[String], filters: &[PartitionFilter], - state:
Option<SessionState>, + state: SessionState, ) -> Result<(OptimizeOperations, Metrics), DeltaTableError> { if zorder_columns.is_empty() { return Err(DeltaTableError::Generic( diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs index 9cfb9a9326..1af4631941 100644 --- a/crates/core/src/operations/update.rs +++ b/crates/core/src/operations/update.rs @@ -25,11 +25,14 @@ use std::{ }; use async_trait::async_trait; -use datafusion::common::{Column, ScalarValue}; use datafusion::error::Result as DataFusionResult; use datafusion::logical_expr::{ case, col, lit, when, Expr, Extension, LogicalPlan, LogicalPlanBuilder, UserDefinedLogicalNode, }; +use datafusion::{ + catalog::Session, + common::{Column, ScalarValue}, +}; use datafusion::{ dataframe::DataFrame, datasource::provider_as_source, @@ -92,7 +95,7 @@ pub struct UpdateBuilder { /// Delta object store for handling data files log_store: LogStoreRef, /// Datafusion session state relevant for executing the input plan - state: Option<SessionState>, + state: Option<Arc<dyn Session>>, /// Properties passed to underlying parquet writer for when files are rewritten writer_properties: Option<WriterProperties>, /// Additional information to add to the commit @@ -162,7 +165,7 @@ impl UpdateBuilder { } /// The Datafusion session state to use - pub fn with_session_state(mut self, state: SessionState) -> Self { + pub fn with_session_state(mut self, state: Arc<dyn Session>) -> Self { self.state = Some(state); self } @@ -506,14 +509,14 @@ impl std::future::IntoFuture for UpdateBuilder { let operation_id = this.get_operation_id(); this.pre_execute(operation_id).await?; - let state = this.state.unwrap_or_else(|| { - let session: SessionContext = DeltaSessionContext::default().into(); - - // If a user provides their own their DF state then they must register the store themselves - register_store(this.log_store.clone(), session.runtime_env()); - - session.state() - }); + let state = this + .state + .and_then(|state| state.as_any().downcast_ref::<SessionState>().cloned()) + .unwrap_or_else(|| { + let session: SessionContext = DeltaSessionContext::default().into(); + session.state() + }); + register_store(this.log_store.clone(), state.runtime_env().as_ref()); let (snapshot, metrics) = execute( this.predicate, diff --git a/crates/core/src/operations/write/mod.rs b/crates/core/src/operations/write/mod.rs index 68d1d79a8a..4600049acb 100644 --- a/crates/core/src/operations/write/mod.rs +++ b/crates/core/src/operations/write/mod.rs @@ -44,7 +44,7 @@ use std::time::{Instant, SystemTime, UNIX_EPOCH}; use std::vec; use arrow_array::RecordBatch; -use datafusion::catalog::TableProvider; +use datafusion::catalog::{Session, TableProvider}; use datafusion::common::{Column, DFSchema, Result, ScalarValue}; use datafusion::datasource::MemTable; use datafusion::execution::context::{SessionContext, SessionState}; @@ -136,7 +136,7 @@ pub struct WriteBuilder { /// The input plan input: Option<Arc<LogicalPlan>>, /// Datafusion session state relevant for executing the input plan - state: Option<SessionState>, + state: Option<Arc<dyn Session>>, /// SaveMode defines how to treat data already written to table location mode: SaveMode, /// Column names for table partitioning @@ -249,12 +249,12 @@ impl WriteBuilder { /// A session state accompanying a given input plan, containing e.g.
registered object stores #[deprecated(since = "0.29.0", note = "Use `with_session_state` instead")] pub fn with_input_session_state(mut self, state: SessionState) -> Self { - self.state = Some(state); + self.state = Some(Arc::new(state)); self } /// The Datafusion session state to use - pub fn with_session_state(mut self, state: SessionState) -> Self { + pub fn with_session_state(mut self, state: Arc<dyn Session>) -> Self { self.state = Some(state); self } @@ -440,19 +440,16 @@ impl std::future::IntoFuture for WriteBuilder { let partition_columns = this.get_partition_columns()?; - let state = match this.state { - Some(state) => SessionStateBuilder::new_from_existing(state.clone()) - .with_query_planner(write_planner.clone()) - .build(), - None => { - let state = SessionStateBuilder::new() - .with_default_features() - .with_query_planner(write_planner) - .build(); - register_store(this.log_store.clone(), state.runtime_env().clone()); - state - } - }; + let state = this + .state + .map(|state| state.as_any().downcast_ref::<SessionState>().cloned()) + .flatten() + .map(|state| SessionStateBuilder::new_from_existing(state)) + .unwrap_or_default() + .with_query_planner(write_planner) + .build(); + register_store(this.log_store.clone(), state.runtime_env().as_ref()); + let mut schema_drift = false; let mut generated_col_exp = None; let mut missing_gen_col = None; diff --git a/crates/core/tests/command_optimize.rs b/crates/core/tests/command_optimize.rs index 725fa81b5b..da3a00b429 100644 --- a/crates/core/tests/command_optimize.rs +++ b/crates/core/tests/command_optimize.rs @@ -4,6 +4,8 @@ use std::{error::Error, sync::Arc}; use arrow_array::{Int32Array, RecordBatch, StringArray}; use arrow_schema::{DataType as ArrowDataType, Field, Schema as ArrowSchema}; use arrow_select::concat::concat_batches; +use datafusion::prelude::SessionContext; +use deltalake_core::delta_datafusion::DeltaSessionContext; use deltalake_core::ensure_table_uri; use deltalake_core::errors::DeltaTableError; use deltalake_core::kernel::transaction::{CommitBuilder, CommitProperties}; @@ -289,6 +291,8 @@ async fn test_conflict_for_remove_actions() -> Result<(), Box<dyn Error>> { let version = dt.version().unwrap(); + let df_context: SessionContext = DeltaSessionContext::default().into(); + //create the merge plan, remove a file, and execute the plan.
let filter = vec![PartitionFilter::try_from(("date", "=", "2022-05-22"))?]; let plan = create_merge_plan( @@ -298,7 +302,7 @@ async fn test_conflict_for_remove_actions() -> Result<(), Box<dyn Error>> { &filter, None, WriterProperties::builder().build(), - None, + df_context.state(), ) .await?; @@ -319,7 +323,6 @@ async fn test_conflict_for_remove_actions() -> Result<(), Box<dyn Error>> { dt.log_store(), dt.snapshot()?.snapshot(), 1, - 20, None, CommitProperties::default(), Uuid::new_v4(), @@ -356,6 +359,8 @@ async fn test_no_conflict_for_append_actions() -> Result<(), Box<dyn Error>> { let version = dt.version().unwrap(); + let df_context: SessionContext = DeltaSessionContext::default().into(); + let filter = vec![PartitionFilter::try_from(("date", "=", "2022-05-22"))?]; let plan = create_merge_plan( &dt.log_store(), @@ -364,7 +369,7 @@ async fn test_no_conflict_for_append_actions() -> Result<(), Box<dyn Error>> { &filter, None, WriterProperties::builder().build(), - None, + df_context.state(), ) .await?; @@ -384,7 +389,6 @@ async fn test_no_conflict_for_append_actions() -> Result<(), Box<dyn Error>> { dt.log_store(), dt.snapshot()?.snapshot(), 1, - 20, None, CommitProperties::default(), Uuid::new_v4(), @@ -420,6 +424,8 @@ async fn test_commit_interval() -> Result<(), Box<dyn Error>> { let version = dt.version().unwrap(); + let context: SessionContext = DeltaSessionContext::default().into(); + let plan = create_merge_plan( &dt.log_store(), OptimizeType::Compact, @@ -427,7 +433,7 @@ async fn test_commit_interval() -> Result<(), Box<dyn Error>> { &[], None, WriterProperties::builder().build(), - None, + context.state(), ) .await?; @@ -436,7 +442,6 @@ async fn test_commit_interval() -> Result<(), Box<dyn Error>> { dt.log_store(), dt.snapshot()?.snapshot(), 1, - 20, Some(Duration::from_secs(0)), // this will cause as many commits as num_files_added CommitProperties::default(), Uuid::new_v4(), diff --git a/crates/core/tests/integration_datafusion.rs b/crates/core/tests/integration_datafusion.rs index 7b8ded6164..e6ded8302a 100644 --- a/crates/core/tests/integration_datafusion.rs +++ b/crates/core/tests/integration_datafusion.rs @@ -436,7 +436,7 @@ mod local { target_table.snapshot().ok().map(|s| s.snapshot()).cloned(), ) .with_input_execution_plan(source_scan) - .with_session_state(state) + .with_session_state(Arc::new(state)) .await?; ctx.register_table("target", Arc::new(target_table))?; From 5c22d6e7828a85a82f241d58efc4db0b5844c92e Mon Sep 17 00:00:00 2001 From: Abhi Agarwal Date: Sun, 5 Oct 2025 17:19:03 -0400 Subject: [PATCH 3/6] fmt Signed-off-by: Abhi Agarwal --- crates/core/src/operations/constraints.rs | 2 +- crates/core/src/operations/mod.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/core/src/operations/constraints.rs b/crates/core/src/operations/constraints.rs index 786a4cbc23..c0d4487ffc 100644 --- a/crates/core/src/operations/constraints.rs +++ b/crates/core/src/operations/constraints.rs @@ -156,7 +156,7 @@ impl std::future::IntoFuture for ConstraintBuilder { for p in 0..plan.properties().output_partitioning().partition_count() { let inner_plan = plan.clone(); let inner_checker = checker.clone(); - let task_ctx= Arc::new((&session).into()); + let task_ctx = Arc::new((&session).into()); let mut record_stream: SendableRecordBatchStream = inner_plan.execute(p, task_ctx)?; let handle: tokio::task::JoinHandle<DeltaResult<()>> = diff --git a/crates/core/src/operations/mod.rs b/crates/core/src/operations/mod.rs index 3d2eafc2be..cb6e9dcb83 100644 --- a/crates/core/src/operations/mod.rs +++ b/crates/core/src/operations/mod.rs @@ -423,8 +423,8 @@ pub(crate) fn
get_target_file_size( #[cfg(feature = "datafusion")] mod datafusion_utils { - use datafusion::{catalog::Session, common::DFSchema}; use datafusion::logical_expr::Expr; + use datafusion::{catalog::Session, common::DFSchema}; use crate::{delta_datafusion::expr::parse_predicate_expression, DeltaResult}; From 6f805a28ffc79eaaf8b31c6c0e93aad375816091 Mon Sep 17 00:00:00 2001 From: Abhi Agarwal Date: Thu, 9 Oct 2025 11:36:50 -0400 Subject: [PATCH 4/6] Unused imports Signed-off-by: Abhi Agarwal --- crates/core/src/delta_datafusion/find_files.rs | 2 +- crates/core/src/delta_datafusion/mod.rs | 3 +-- crates/core/src/lib.rs | 1 - crates/core/src/operations/constraints.rs | 2 +- crates/core/src/operations/merge/mod.rs | 3 +-- crates/core/src/operations/optimize.rs | 2 +- crates/core/src/operations/write/mod.rs | 5 ++--- 7 files changed, 7 insertions(+), 11 deletions(-) diff --git a/crates/core/src/delta_datafusion/find_files.rs b/crates/core/src/delta_datafusion/find_files.rs index 7c5174863a..8561612282 100644 --- a/crates/core/src/delta_datafusion/find_files.rs +++ b/crates/core/src/delta_datafusion/find_files.rs @@ -4,8 +4,8 @@ use std::sync::Arc; use arrow_array::{Array, RecordBatch, StringArray}; use arrow_schema::{ArrowError, DataType as ArrowDataType, Field, Schema as ArrowSchema}; -use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor}; use datafusion::catalog::Session; +use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor}; use datafusion::datasource::MemTable; use datafusion::execution::context::{SessionContext, TaskContext}; use datafusion::logical_expr::{col, Expr, Volatility}; diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs index 0b54d8d6f7..88ff36f266 100644 --- a/crates/core/src/delta_datafusion/mod.rs +++ b/crates/core/src/delta_datafusion/mod.rs @@ -42,7 +42,7 @@ use datafusion::common::{ }; use datafusion::datasource::physical_plan::wrap_partition_type_in_dict; use datafusion::datasource::{MemTable, TableProvider}; -use datafusion::execution::context::{SessionConfig, SessionContext, SessionState}; +use datafusion::execution::context::{SessionConfig, SessionContext}; use datafusion::execution::runtime_env::RuntimeEnv; use datafusion::execution::FunctionRegistry; use datafusion::logical_expr::logical_plan::CreateExternalTable; @@ -55,7 +55,6 @@ use datafusion::sql::planner::ParserOptions; use datafusion_proto::logical_plan::LogicalExtensionCodec; use datafusion_proto::physical_plan::PhysicalExtensionCodec; use delta_kernel::engine::arrow_conversion::TryIntoArrow as _; -use delta_kernel::table_configuration::TableConfiguration; use either::Either; use itertools::Itertools; use url::Url; diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 58d0958e0a..e6599dc1d5 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -198,7 +198,6 @@ mod tests { use super::*; use crate::table::PeekCommit; - use std::collections::HashMap; #[tokio::test] async fn read_delta_2_0_table_without_version() { diff --git a/crates/core/src/operations/constraints.rs b/crates/core/src/operations/constraints.rs index c0d4487ffc..e048f3db1e 100644 --- a/crates/core/src/operations/constraints.rs +++ b/crates/core/src/operations/constraints.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use datafusion::catalog::Session; use datafusion::common::ToDFSchema; -use datafusion::execution::{SendableRecordBatchStream, SessionState, TaskContext}; +use datafusion::execution::{SendableRecordBatchStream, 
SessionState}; use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::SessionContext; use delta_kernel::table_features::WriterFeature; diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs index a27fac61b1..636a103b2b 100644 --- a/crates/core/src/operations/merge/mod.rs +++ b/crates/core/src/operations/merge/mod.rs @@ -1542,8 +1542,7 @@ impl std::future::IntoFuture for MergeBuilder { let state = this .state - .map(|state| state.as_any().downcast_ref::<SessionState>().cloned()) - .flatten() + .and_then(|state| state.as_any().downcast_ref::<SessionState>().cloned()) .unwrap_or_else(|| { let session: SessionContext = DeltaSessionContext::default().into(); session.state() }); diff --git a/crates/core/src/operations/optimize.rs b/crates/core/src/operations/optimize.rs index 95e5071a0c..437f32eb6f 100644 --- a/crates/core/src/operations/optimize.rs +++ b/crates/core/src/operations/optimize.rs @@ -51,7 +51,7 @@ use uuid::Uuid; use super::write::writer::{PartitionWriter, PartitionWriterConfig}; use super::{CustomExecuteHandler, Operation}; -use crate::delta_datafusion::{DeltaSessionContext, DeltaTableProvider}; +use crate::delta_datafusion::DeltaTableProvider; use crate::errors::{DeltaResult, DeltaTableError}; use crate::kernel::transaction::{CommitBuilder, CommitProperties, DEFAULT_RETRIES, PROTOCOL}; use crate::kernel::EagerSnapshot; diff --git a/crates/core/src/operations/write/mod.rs b/crates/core/src/operations/write/mod.rs index 4600049acb..9e1636cb8d 100644 --- a/crates/core/src/operations/write/mod.rs +++ b/crates/core/src/operations/write/mod.rs @@ -442,9 +442,8 @@ impl std::future::IntoFuture for WriteBuilder { let state = this .state - .map(|state| state.as_any().downcast_ref::<SessionState>().cloned()) - .flatten() - .map(|state| SessionStateBuilder::new_from_existing(state)) + .and_then(|state| state.as_any().downcast_ref::<SessionState>().cloned()) + .map(SessionStateBuilder::new_from_existing) .unwrap_or_default() .with_query_planner(write_planner) .build(); From 4adcecf9b19dc8a80c9c39e3a8fde5a19ce32c18 Mon Sep 17 00:00:00 2001 From: Abhi Agarwal Date: Thu, 9 Oct 2025 11:45:16 -0400 Subject: [PATCH 5/6] rename `state` -> `session` Signed-off-by: Abhi Agarwal --- .../core/src/delta_datafusion/find_files.rs | 14 +++---- crates/core/src/operations/constraints.rs | 12 +++--- crates/core/src/operations/delete.rs | 12 +++--- crates/core/src/operations/load.rs | 18 ++++----- crates/core/src/operations/load_cdf.rs | 8 ++-- crates/core/src/operations/optimize.rs | 28 +++++++------- crates/core/src/operations/update.rs | 38 +++++++++---------- crates/core/src/operations/write/execution.rs | 29 ++++++++------ crates/core/src/operations/write/mod.rs | 30 +++++++-------- 9 files changed, 96 insertions(+), 93 deletions(-) diff --git a/crates/core/src/delta_datafusion/find_files.rs b/crates/core/src/delta_datafusion/find_files.rs index 8561612282..bbf591f0e0 100644 --- a/crates/core/src/delta_datafusion/find_files.rs +++ b/crates/core/src/delta_datafusion/find_files.rs @@ -34,7 +34,7 @@ pub(crate) struct FindFiles { pub(crate) async fn find_files( snapshot: &EagerSnapshot, log_store: LogStoreRef, - state: &dyn Session, + session: &dyn Session, predicate: Option<Expr>, ) -> DeltaResult<FindFiles> { let current_metadata = snapshot.metadata(); @@ -59,7 +59,7 @@ pub(crate) async fn find_files( }) } else { let candidates = - find_files_scan(snapshot, log_store, state, predicate.to_owned()).await?; + find_files_scan(snapshot, log_store, session, predicate.to_owned()).await?; Ok(FindFiles { candidates, @@ -191,7
fn join_batches_with_add_actions( async fn find_files_scan( snapshot: &EagerSnapshot, log_store: LogStoreRef, - state: &dyn Session, + session: &dyn Session, expression: Expr, ) -> DeltaResult<Vec<Add>> { let candidate_map: HashMap<String, Add> = snapshot @@ -216,7 +216,7 @@ async fn find_files_scan( // Add path column used_columns.push(logical_schema.index_of(scan_config.file_column_name.as_ref().unwrap())?); - let scan = DeltaScanBuilder::new(snapshot, log_store, state) + let scan = DeltaScanBuilder::new(snapshot, log_store, session) .with_filter(Some(expression.clone())) .with_projection(Some(&used_columns)) .with_scan_config(scan_config) @@ -228,14 +228,14 @@ async fn find_files_scan( let input_schema = scan.logical_schema.as_ref().to_owned(); let input_dfschema = input_schema.clone().try_into()?; - let predicate_expr = - state.create_physical_expr(Expr::IsTrue(Box::new(expression.clone())), &input_dfschema)?; + let predicate_expr = session + .create_physical_expr(Expr::IsTrue(Box::new(expression.clone())), &input_dfschema)?; let filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(predicate_expr, scan.clone())?); let limit: Arc<dyn ExecutionPlan> = Arc::new(LocalLimitExec::new(filter, 1)); - let task_ctx = Arc::new(TaskContext::from(state)); + let task_ctx = Arc::new(TaskContext::from(session)); let path_batches = datafusion::physical_plan::collect(limit, task_ctx).await?; join_batches_with_add_actions( diff --git a/crates/core/src/operations/constraints.rs b/crates/core/src/operations/constraints.rs index e048f3db1e..fcd60985ad 100644 --- a/crates/core/src/operations/constraints.rs +++ b/crates/core/src/operations/constraints.rs @@ -36,7 +36,7 @@ pub struct ConstraintBuilder { /// Delta object store for handling data files log_store: LogStoreRef, /// Datafusion session state relevant for executing the input plan - state: Option<Arc<dyn Session>>, + session: Option<Arc<dyn Session>>, /// Additional information to add to the commit commit_properties: CommitProperties, custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>, @@ -59,7 +59,7 @@ impl ConstraintBuilder { expr: None, snapshot, log_store, - state: None, + session: None, commit_properties: CommitProperties::default(), custom_execute_handler: None, } @@ -77,8 +77,8 @@ impl ConstraintBuilder { } /// The Datafusion session state to use - pub fn with_session_state(mut self, state: Arc<dyn Session>) -> Self { - self.state = Some(state); + pub fn with_session_state(mut self, session: Arc<dyn Session>) -> Self { + self.session = Some(session); self } @@ -131,8 +131,8 @@ impl std::future::IntoFuture for ConstraintBuilder { } let session = this - .state - .and_then(|state| state.as_any().downcast_ref::<SessionState>().cloned()) + .session + .and_then(|session| session.as_any().downcast_ref::<SessionState>().cloned()) .unwrap_or_else(|| { let session: SessionContext = DeltaSessionContext::default().into(); session.state() diff --git a/crates/core/src/operations/delete.rs b/crates/core/src/operations/delete.rs index e5609c9a40..d7e1aa46b6 100644 --- a/crates/core/src/operations/delete.rs +++ b/crates/core/src/operations/delete.rs @@ -78,7 +78,7 @@ pub struct DeleteBuilder { /// Delta object store for handling data files log_store: LogStoreRef, /// Datafusion session state relevant for executing the input plan - state: Option<Arc<dyn Session>>, + session: Option<Arc<dyn Session>>, /// Properties passed to underlying parquet writer for when files are rewritten writer_properties: Option<WriterProperties>, /// Commit properties and configuration @@ -132,7 +132,7 @@ impl DeleteBuilder { predicate: None, snapshot, log_store, - state: None, + session: None, commit_properties: CommitProperties::default(), writer_properties: None, custom_execute_handler:
None, } } /// The Datafusion session state to use - pub fn with_session_state(mut self, state: Arc<dyn Session>) -> Self { - self.state = Some(state); + pub fn with_session_state(mut self, session: Arc<dyn Session>) -> Self { + self.session = Some(session); self } @@ -436,8 +436,8 @@ impl std::future::IntoFuture for DeleteBuilder { this.pre_execute(operation_id).await?; let session = this - .state - .and_then(|state| state.as_any().downcast_ref::<SessionState>().cloned()) + .session + .and_then(|session| session.as_any().downcast_ref::<SessionState>().cloned()) .unwrap_or_else(|| { let session: SessionContext = DeltaSessionContext::default().into(); session.state() diff --git a/crates/core/src/operations/load.rs b/crates/core/src/operations/load.rs index de20213465..b5e8ca9198 100644 --- a/crates/core/src/operations/load.rs +++ b/crates/core/src/operations/load.rs @@ -26,7 +26,7 @@ pub struct LoadBuilder { /// A sub-selection of columns to be loaded columns: Option<Vec<String>>, /// Datafusion session state relevant for executing the input plan - state: Option<Arc<dyn Session>>, + session: Option<Arc<dyn Session>>, } impl std::fmt::Debug for LoadBuilder { @@ -54,7 +54,7 @@ impl LoadBuilder { snapshot, log_store, columns: None, - state: None, + session: None, } } @@ -65,8 +65,8 @@ impl LoadBuilder { } /// The Datafusion session state to use - pub fn with_session_state(mut self, state: Arc<dyn Session>) -> Self { - self.state = Some(state); + pub fn with_session_state(mut self, session: Arc<dyn Session>) -> Self { + self.session = Some(session); self } } @@ -106,18 +106,18 @@ impl std::future::IntoFuture for LoadBuilder { }) .transpose()?; - let state = this - .state - .and_then(|state| state.as_any().downcast_ref::<SessionState>().cloned()) + let session = this - .session - .and_then(|session| session.as_any().downcast_ref::<SessionState>().cloned()) .unwrap_or_else(|| { SessionStateBuilder::new() .with_default_features() .with_config(DeltaSessionConfig::default().into()) .build() }); - let scan_plan = table.scan(&state, projection.as_ref(), &[], None).await?; + let scan_plan = table.scan(&session, projection.as_ref(), &[], None).await?; let plan = CoalescePartitionsExec::new(scan_plan); - let task_ctx = Arc::new(TaskContext::from(&state)); + let task_ctx = Arc::new(TaskContext::from(&session)); let stream = plan.execute(0, task_ctx)?; Ok((table, stream)) diff --git a/crates/core/src/operations/load_cdf.rs b/crates/core/src/operations/load_cdf.rs index d2e232d708..798b8abee1 100644 --- a/crates/core/src/operations/load_cdf.rs +++ b/crates/core/src/operations/load_cdf.rs @@ -56,7 +56,7 @@ pub struct CdfLoadBuilder { /// Enable ending version or timestamp exceeding the last commit allow_out_of_range: bool, /// Datafusion session
diff --git a/crates/core/src/operations/optimize.rs b/crates/core/src/operations/optimize.rs
index 437f32eb6f..840ca96762 100644
--- a/crates/core/src/operations/optimize.rs
+++ b/crates/core/src/operations/optimize.rs
@@ -220,7 +220,7 @@ pub struct OptimizeBuilder<'a> {
     /// Optimize type
     optimize_type: OptimizeType,
     /// Datafusion session state relevant for executing the input plan
-    state: Option<Arc<dyn Session>>,
+    session: Option<Arc<dyn Session>>,
     min_commit_interval: Option<Duration>,
     custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>,
 }
@@ -249,7 +249,7 @@ impl<'a> OptimizeBuilder<'a> {
             max_spill_size: 20 * 1024 * 1024 * 1024, // 20 GB.
             optimize_type: OptimizeType::Compact,
             min_commit_interval: None,
-            state: None,
+            session: None,
             custom_execute_handler: None,
         }
     }
@@ -319,8 +319,8 @@ impl<'a> OptimizeBuilder<'a> {
     }
 
     /// The Datafusion session state to use
-    pub fn with_session_state(mut self, state: Arc<dyn Session>) -> Self {
-        self.state = Some(state);
+    pub fn with_session_state(mut self, session: Arc<dyn Session>) -> Self {
+        self.session = Some(session);
         self
     }
 }
@@ -346,9 +346,9 @@ impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> {
                     .set_created_by(format!("delta-rs version {}", crate_version()))
                     .build()
             });
-            let state = this
-                .state
-                .and_then(|state| state.as_any().downcast_ref::<SessionState>().cloned())
+            let session = this
+                .session
+                .and_then(|session| session.as_any().downcast_ref::<SessionState>().cloned())
                 .unwrap_or_else(|| {
                     let memory_pool = FairSpillPool::new(this.max_spill_size);
                     let runtime = RuntimeEnvBuilder::new()
@@ -367,7 +367,7 @@ impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> {
                 this.filters,
                 this.target_size.to_owned(),
                 writer_properties,
-                state,
+                session,
             )
             .await?;
 
@@ -830,7 +830,7 @@ pub async fn create_merge_plan(
     filters: &[PartitionFilter],
     target_size: Option<u64>,
     writer_properties: WriterProperties,
-    state: SessionState,
+    session: SessionState,
 ) -> Result<MergePlan, DeltaTableError> {
     let target_size = target_size
         .unwrap_or_else(|| snapshot.table_properties().target_file_size().get());
@@ -847,7 +847,7 @@ pub async fn create_merge_plan(
             snapshot,
             partitions_keys,
             filters,
-            state,
+            session,
         )
         .await?
     }
@@ -1008,7 +1008,7 @@ async fn build_zorder_plan(
     snapshot: &EagerSnapshot,
     partition_keys: &[String],
     filters: &[PartitionFilter],
-    state: SessionState,
+    session: SessionState,
 ) -> Result<(OptimizeOperations, Metrics), DeltaTableError> {
     if zorder_columns.is_empty() {
         return Err(DeltaTableError::Generic(
@@ -1065,7 +1065,7 @@ async fn build_zorder_plan(
         debug!("partition_files inside the zorder plan: {partition_files:?}");
     }
 
-    let operation = OptimizeOperations::ZOrder(zorder_columns, partition_files, Box::new(state));
+    let operation = OptimizeOperations::ZOrder(zorder_columns, partition_files, Box::new(session));
 
     Ok((operation, metrics))
 }
@@ -1127,12 +1127,12 @@ pub(super) mod zorder {
     impl ZOrderExecContext {
         pub fn new(
             columns: Vec<String>,
-            session_state: SessionState,
+            session: SessionState,
             object_store_ref: ObjectStoreRef,
         ) -> Result<Self, DataFusionError> {
             let columns = columns.into();
 
-            let ctx = SessionContext::new_with_state(session_state);
+            let ctx = SessionContext::new_with_state(session);
             ctx.register_udf(ScalarUDF::from(datafusion::ZOrderUDF));
             ctx.register_object_store(&Url::parse("delta-rs://").unwrap(), object_store_ref);
             Ok(Self { columns, ctx })
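
A sketch of supplying a caller-owned session to the optimize path instead of relying on the `FairSpillPool` default built above. The 4 GiB budget and the helper name `spill_bounded_session` are illustrative assumptions, not values from this patch:

    use std::sync::Arc;

    use datafusion::catalog::Session;
    use datafusion::execution::memory_pool::FairSpillPool;
    use datafusion::execution::runtime_env::RuntimeEnvBuilder;
    use datafusion::execution::SessionStateBuilder;

    // Build a SessionState with a bounded spill pool, mirroring the default
    // the builder constructs when no session is provided.
    fn spill_bounded_session(max_spill_size: usize) -> Arc<dyn Session> {
        let memory_pool = FairSpillPool::new(max_spill_size);
        let runtime = RuntimeEnvBuilder::new()
            .with_memory_pool(Arc::new(memory_pool))
            .build_arc()
            .expect("runtime env");
        Arc::new(
            SessionStateBuilder::new()
                .with_default_features()
                .with_runtime_env(runtime)
                .build(),
        )
    }

The result can be handed to `OptimizeBuilder::with_session_state` like any other `Arc<dyn Session>`.
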
diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs
index 1af4631941..5c5e2c74d6 100644
--- a/crates/core/src/operations/update.rs
+++ b/crates/core/src/operations/update.rs
@@ -95,7 +95,7 @@ pub struct UpdateBuilder {
     /// Delta object store for handling data files
     log_store: LogStoreRef,
     /// Datafusion session state relevant for executing the input plan
-    state: Option<Arc<dyn Session>>,
+    session: Option<Arc<dyn Session>>,
     /// Properties passed to underlying parquet writer for when files are rewritten
     writer_properties: Option<WriterProperties>,
     /// Additional information to add to the commit
@@ -140,7 +140,7 @@ impl UpdateBuilder {
             updates: HashMap::new(),
             snapshot,
             log_store,
-            state: None,
+            session: None,
             writer_properties: None,
             commit_properties: CommitProperties::default(),
             safe_cast: false,
@@ -165,8 +165,8 @@ impl UpdateBuilder {
     }
 
     /// The Datafusion session state to use
-    pub fn with_session_state(mut self, state: Arc<dyn Session>) -> Self {
-        self.state = Some(state);
+    pub fn with_session_state(mut self, session: Arc<dyn Session>) -> Self {
+        self.session = Some(session);
         self
     }
 
@@ -253,7 +253,7 @@ async fn execute(
    updates: HashMap<Column, Expression>,
     log_store: LogStoreRef,
     snapshot: EagerSnapshot,
-    state: SessionState,
+    session: SessionState,
     writer_properties: Option<WriterProperties>,
     mut commit_properties: CommitProperties,
     _safe_cast: bool,
@@ -273,7 +273,7 @@ async fn execute(
     // NOTE: The optimize_projections rule is being temporarily disabled because it errors with
     // our schemas for Lists due to issues discussed
     // [here](https://github.com/delta-io/delta-rs/pull/2886#issuecomment-2481550560)
-    let rules: Vec<Arc<dyn OptimizerRule + Send + Sync>> = state
+    let rules: Vec<Arc<dyn OptimizerRule + Send + Sync>> = session
         .optimizers()
         .iter()
         .filter(|rule| {
@@ -281,13 +281,11 @@
         })
         .cloned()
         .collect();
-    let state = SessionStateBuilder::from(state)
-        .with_optimizer_rules(rules)
-        .build();
 
     let update_planner = DeltaPlanner::new();
 
-    let state = SessionStateBuilder::new_from_existing(state)
+    let session = SessionStateBuilder::from(session)
+        .with_optimizer_rules(rules)
         .with_query_planner(update_planner)
         .build();
 
@@ -301,7 +299,7 @@
     let predicate = match predicate {
         Some(predicate) => match predicate {
             Expression::DataFusion(expr) => Some(expr),
-            Expression::String(s) => Some(snapshot.parse_predicate_expression(s, &state)?),
+            Expression::String(s) => Some(snapshot.parse_predicate_expression(s, &session)?),
         },
         None => None,
     };
@@ -311,7 +309,7 @@
         .map(|(key, expr)| match expr {
             Expression::DataFusion(e) => Ok((key.name, e)),
             Expression::String(s) => snapshot
-                .parse_predicate_expression(s, &state)
+                .parse_predicate_expression(s, &session)
                 .map(|e| (key.name, e)),
         })
         .collect::<Result<HashMap<String, Expr>, _>>()?;
@@ -320,7 +318,7 @@
     let table_partition_cols = current_metadata.partition_columns().clone();
 
     let scan_start = Instant::now();
-    let candidates = find_files(&snapshot, log_store.clone(), &state, predicate.clone()).await?;
+    let candidates = find_files(&snapshot, log_store.clone(), &session, predicate.clone()).await?;
     metrics.scan_time_ms = Instant::now().duration_since(scan_start).as_millis() as u64;
 
     if candidates.candidates.is_empty() {
@@ -344,7 +342,7 @@
     let target_provider = provider_as_source(target_provider);
     let plan = LogicalPlanBuilder::scan("target", target_provider.clone(), None)?.build()?;
 
-    let df = DataFrame::new(state.clone(), plan);
+    let df = DataFrame::new(session.clone(), plan);
 
     // Take advantage of how null counts are tracked in arrow arrays use the
     // null count to track how many records do NOT satisfy the predicate. The
@@ -364,7 +362,7 @@ async fn execute(
             enable_pushdown: false,
         }),
     });
-    let df_with_predicate_and_metrics = DataFrame::new(state.clone(), plan_with_metrics);
+    let df_with_predicate_and_metrics = DataFrame::new(session.clone(), plan_with_metrics);
 
     let expressions: Vec<Expr> = df_with_predicate_and_metrics
         .schema()
@@ -402,7 +400,7 @@ async fn execute(
 
     let add_actions = write_execution_plan(
         Some(&snapshot),
-        state.clone(),
+        session.clone(),
         physical_plan.clone(),
         table_partition_cols.clone(),
         log_store.object_store(Some(operation_id)).clone(),
@@ -509,9 +507,9 @@ impl std::future::IntoFuture for UpdateBuilder {
             let operation_id = this.get_operation_id();
             this.pre_execute(operation_id).await?;
 
-            let state = this
-                .state
-                .and_then(|state| state.as_any().downcast_ref::<SessionState>().cloned())
+            let session = this
+                .session
+                .and_then(|session| session.as_any().downcast_ref::<SessionState>().cloned())
                 .unwrap_or_else(|| {
                     let session: SessionContext = DeltaSessionContext::default().into();
                     session.state()
@@ -523,7 +521,7 @@
                 this.updates,
                 this.log_store.clone(),
                 this.snapshot,
-                state,
+                session,
                 this.writer_properties,
                 this.commit_properties,
                 this.safe_cast,
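
The rule filtering above suggests a caller-side equivalent: rebuilding a session without a specific optimizer rule before passing it in. A sketch under the assumption that the rule is excluded by name (the exact filter predicate is not visible in this hunk):

    use std::sync::Arc;

    use datafusion::execution::{SessionState, SessionStateBuilder};
    use datafusion::optimizer::OptimizerRule;
    use datafusion::prelude::SessionContext;

    // Drop the `optimize_projections` rule, the same shape of filtering the
    // update path applies internally.
    fn without_optimize_projections(ctx: &SessionContext) -> SessionState {
        let state = ctx.state();
        let rules: Vec<Arc<dyn OptimizerRule + Send + Sync>> = state
            .optimizers()
            .iter()
            .filter(|rule| rule.name() != "optimize_projections")
            .cloned()
            .collect();
        SessionStateBuilder::from(state)
            .with_optimizer_rules(rules)
            .build()
    }
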
diff --git a/crates/core/src/operations/write/execution.rs b/crates/core/src/operations/write/execution.rs
index 0ffb5d2aba..011904a9b0 100644
--- a/crates/core/src/operations/write/execution.rs
+++ b/crates/core/src/operations/write/execution.rs
@@ -92,7 +92,7 @@ pub(crate) async fn write_execution_plan_cdc(
 #[allow(clippy::too_many_arguments)]
 pub(crate) async fn write_execution_plan(
     snapshot: Option<&EagerSnapshot>,
-    state: SessionState,
+    session: SessionState,
     plan: Arc<dyn ExecutionPlan>,
     partition_columns: Vec<String>,
     object_store: ObjectStoreRef,
@@ -103,7 +103,7 @@
 ) -> DeltaResult<Vec<Action>> {
     let (actions, _) = write_execution_plan_v2(
         snapshot,
-        state,
+        session,
         plan,
         partition_columns,
         object_store,
@@ -122,7 +122,7 @@
 pub(crate) async fn execute_non_empty_expr(
     snapshot: &EagerSnapshot,
     log_store: LogStoreRef,
-    state: SessionState,
+    session: SessionState,
     partition_columns: Vec<String>,
     expression: &Expr,
     rewrite: &[Add],
@@ -150,7 +150,7 @@ pub(crate) async fn execute_non_empty_expr(
     let source = LogicalPlanBuilder::scan("target", target_provider.clone(), None)?.build()?;
 
     // We don't want to verify the predicate against existing data
-    let df = DataFrame::new(state.clone(), source);
+    let df = DataFrame::new(session.clone(), source);
 
     let cdf_df = if !partition_scan {
         // Apply the negation of the filter and rewrite files
@@ -164,7 +164,7 @@
 
         let add_actions: Vec<Action> = write_execution_plan(
             Some(snapshot),
-            state.clone(),
+            session.clone(),
             filter,
             partition_columns.clone(),
             log_store.object_store(Some(operation_id)),
@@ -201,20 +201,25 @@ pub(crate) async fn prepare_predicate_actions(
     predicate: Expr,
     log_store: LogStoreRef,
     snapshot: &EagerSnapshot,
-    state: SessionState,
+    session: SessionState,
     partition_columns: Vec<String>,
     writer_properties: Option<WriterProperties>,
     deletion_timestamp: i64,
     writer_stats_config: WriterStatsConfig,
     operation_id: Uuid,
 ) -> DeltaResult<(Vec<Action>, Option<DataFrame>)> {
-    let candidates =
-        find_files(snapshot, log_store.clone(), &state, Some(predicate.clone())).await?;
+    let candidates = find_files(
+        snapshot,
+        log_store.clone(),
+        &session,
+        Some(predicate.clone()),
+    )
+    .await?;
 
     let (mut actions, cdf_df) = execute_non_empty_expr(
         snapshot,
         log_store,
-        state,
+        session,
         partition_columns,
         &predicate,
         &candidates.candidates,
@@ -247,7 +252,7 @@ pub(crate) async fn prepare_predicate_actions(
 #[allow(clippy::too_many_arguments)]
 pub(crate) async fn write_execution_plan_v2(
     snapshot: Option<&EagerSnapshot>,
-    state: SessionState,
+    session: SessionState,
     plan: Arc<dyn ExecutionPlan>,
     partition_columns: Vec<String>,
     object_store: ObjectStoreRef,
@@ -285,7 +290,7 @@ pub(crate) async fn write_execution_plan_v2(
     for i in 0..plan.properties().output_partitioning().partition_count() {
         let inner_plan = plan.clone();
         let inner_schema = schema.clone();
-        let task_ctx = Arc::new(TaskContext::from(&state));
+        let task_ctx = Arc::new(TaskContext::from(&session));
         let config = WriterConfig::new(
             inner_schema.clone(),
             partition_columns.clone(),
@@ -351,7 +356,7 @@ pub(crate) async fn write_execution_plan_v2(
                 .collect::<Vec<_>>(),
         ));
         let cdf_schema = schema.clone();
-        let task_ctx = Arc::new(TaskContext::from(&state));
+        let task_ctx = Arc::new(TaskContext::from(&session));
         let normal_config = WriterConfig::new(
             write_schema.clone(),
             partition_columns.clone(),
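
The write path below keeps `with_input_session_state` behind a deprecation notice. A hedged migration sketch for callers, assuming the `deltalake` facade re-exports and with batch construction elided:

    use std::sync::Arc;

    use datafusion::prelude::SessionContext;
    use deltalake::arrow::record_batch::RecordBatch;
    use deltalake::{DeltaOps, DeltaResult, DeltaTable};

    // before: .with_input_session_state(ctx.state())
    // after:  .with_session_state(Arc::new(ctx.state()))
    async fn write_with_session(
        table: DeltaTable,
        batches: Vec<RecordBatch>,
    ) -> DeltaResult<DeltaTable> {
        let ctx = SessionContext::new();
        DeltaOps(table)
            .write(batches)
            .with_session_state(Arc::new(ctx.state()))
            .await
    }
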
diff --git a/crates/core/src/operations/write/mod.rs b/crates/core/src/operations/write/mod.rs
index 9e1636cb8d..8e003014ab 100644
--- a/crates/core/src/operations/write/mod.rs
+++ b/crates/core/src/operations/write/mod.rs
@@ -136,7 +136,7 @@ pub struct WriteBuilder {
     /// The input plan
     input: Option<Arc<LogicalPlan>>,
     /// Datafusion session state relevant for executing the input plan
-    state: Option<Arc<dyn Session>>,
+    session: Option<Arc<dyn Session>>,
     /// SaveMode defines how to treat data already written to table location
     mode: SaveMode,
     /// Column names for table partitioning
@@ -195,7 +195,7 @@ impl WriteBuilder {
             snapshot,
             log_store,
             input: None,
-            state: None,
+            session: None,
             mode: SaveMode::Append,
             partition_columns: None,
             predicate: None,
@@ -249,13 +249,13 @@ impl WriteBuilder {
     /// A session state accompanying a given input plan, containing e.g. registered object stores
     #[deprecated(since = "0.29.0", note = "Use `with_session_state` instead")]
     pub fn with_input_session_state(mut self, state: SessionState) -> Self {
-        self.state = Some(Arc::new(state));
+        self.session = Some(Arc::new(state));
         self
     }
 
     /// The Datafusion session state to use
-    pub fn with_session_state(mut self, state: Arc<dyn Session>) -> Self {
-        self.state = Some(state);
+    pub fn with_session_state(mut self, session: Arc<dyn Session>) -> Self {
+        self.session = Some(session);
         self
     }
 
@@ -440,19 +440,19 @@ impl std::future::IntoFuture for WriteBuilder {
 
             let partition_columns = this.get_partition_columns()?;
 
-            let state = this
-                .state
-                .and_then(|state| state.as_any().downcast_ref::<SessionState>().cloned())
+            let session = this
+                .session
+                .and_then(|session| session.as_any().downcast_ref::<SessionState>().cloned())
                 .map(SessionStateBuilder::new_from_existing)
                 .unwrap_or_default()
                 .with_query_planner(write_planner)
                 .build();
-            register_store(this.log_store.clone(), state.runtime_env().as_ref());
+            register_store(this.log_store.clone(), session.runtime_env().as_ref());
 
             let mut schema_drift = false;
             let mut generated_col_exp = None;
             let mut missing_gen_col = None;
-            let mut source = DataFrame::new(state.clone(), this.input.unwrap().as_ref().clone());
+            let mut source = DataFrame::new(session.clone(), this.input.unwrap().as_ref().clone());
             if let Some(snapshot) = &this.snapshot {
                 if able_to_gc(snapshot)? {
                     let generated_col_expressions = snapshot.schema().get_generated_columns()?;
@@ -540,7 +540,7 @@ impl std::future::IntoFuture for WriteBuilder {
                         source,
                         &generated_columns_exp,
                         &missing_generated_col,
-                        &state,
+                        &session,
                     )?;
                 }
             }
@@ -553,7 +553,7 @@
                 }),
             });
 
-            let mut source = DataFrame::new(state.clone(), source);
+            let mut source = DataFrame::new(session.clone(), source);
 
             let schema = Arc::new(source.schema().as_arrow().clone());
 
@@ -601,7 +601,7 @@
                     Expression::DataFusion(expr) => expr,
                     Expression::String(s) => {
                         let df_schema = DFSchema::try_from(schema.as_ref().to_owned())?;
-                        parse_predicate_expression(&df_schema, s, &state)?
+                        parse_predicate_expression(&df_schema, s, &session)?
                     }
                 };
                 (Some(fmt_expr_to_sql(&pred)?), Some(pred))
@@ -641,7 +641,7 @@
                         pred.clone(),
                         this.log_store.clone(),
                         snapshot,
-                        state.clone(),
+                        session.clone(),
                        partition_columns.clone(),
                         this.writer_properties.clone(),
                         deletion_timestamp,
@@ -681,7 +681,7 @@
             // Here we need to validate if the new data conforms to a predicate if one is provided
             let (add_actions, _) = write_execution_plan_v2(
                 this.snapshot.as_ref(),
-                state.clone(),
+                session.clone(),
                 source_plan.clone(),
                 partition_columns.clone(),
                 this.log_store.object_store(Some(operation_id)).clone(),

From 16f288c0fbab52ca3d65aca7e87c0b3451e1e1ab Mon Sep 17 00:00:00 2001
From: Abhi Agarwal
Date: Thu, 9 Oct 2025 13:38:12 -0400
Subject: [PATCH 6/6] Fix a few stragglers

Signed-off-by: Abhi Agarwal
---
 crates/core/src/operations/update.rs          | 4 ++--
 crates/core/src/operations/write/execution.rs | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs
index 5c5e2c74d6..dbc2c65196 100644
--- a/crates/core/src/operations/update.rs
+++ b/crates/core/src/operations/update.rs
@@ -462,7 +462,7 @@ async fn execute(
         Ok(df) => {
             let cdc_actions = write_execution_plan_cdc(
                 Some(&snapshot),
-                state,
+                session,
                 df.create_physical_plan().await?,
                 table_partition_cols,
                 log_store.object_store(Some(operation_id)),
@@ -514,7 +514,7 @@ impl std::future::IntoFuture for UpdateBuilder {
                     let session: SessionContext = DeltaSessionContext::default().into();
                     session.state()
                 });
-            register_store(this.log_store.clone(), state.runtime_env().as_ref());
+            register_store(this.log_store.clone(), session.runtime_env().as_ref());
 
             let (snapshot, metrics) = execute(
                 this.predicate,
diff --git a/crates/core/src/operations/write/execution.rs b/crates/core/src/operations/write/execution.rs
index 011904a9b0..38f96dd0d3 100644
--- a/crates/core/src/operations/write/execution.rs
+++ b/crates/core/src/operations/write/execution.rs
@@ -45,7 +45,7 @@ pub(crate) struct WriteExecutionPlanMetrics {
 #[allow(clippy::too_many_arguments)]
 pub(crate) async fn write_execution_plan_cdc(
     snapshot: Option<&EagerSnapshot>,
-    state: SessionState,
+    session: SessionState,
     plan: Arc<dyn ExecutionPlan>,
     partition_columns: Vec<String>,
     object_store: ObjectStoreRef,
@@ -58,7 +58,7 @@
 
     Ok(write_execution_plan(
         snapshot,
-        state,
+        session,
         plan,
         partition_columns,
         cdc_store,