23 changes: 15 additions & 8 deletions crates/core/src/delta_datafusion/expr.rs
@@ -21,11 +21,13 @@
 
 //! Utility functions for Datafusion's Expressions
 use std::fmt::{self, Display, Error, Formatter, Write};
+use std::marker::PhantomData;
 use std::sync::Arc;
 
 use arrow_array::{Array, GenericListArray};
 use arrow_schema::{DataType, Field};
 use chrono::{DateTime, NaiveDate};
+use datafusion::catalog::Session;
 use datafusion::common::Result as DFResult;
 use datafusion::common::{config::ConfigOptions, DFSchema, Result, ScalarValue, TableReference};
 use datafusion::execution::context::SessionState;
@@ -186,30 +188,35 @@ impl ExprPlanner for CustomNestedFunctionPlanner {
 
 pub(crate) struct DeltaContextProvider<'a> {
     state: SessionState,
-    /// Keeping this around just to make use of the 'a lifetime
-    _original: &'a SessionState,
     planners: Vec<Arc<dyn ExprPlanner>>,
+    /// Keeping this around just to make use of the 'a lifetime
+    _phantom: PhantomData<&'a SessionState>,
 }
 
 impl<'a> DeltaContextProvider<'a> {
-    fn new(state: &'a SessionState) -> Self {
+    fn new(session: &'a dyn Session) -> Self {
         // default planners are [CoreFunctionPlanner, NestedFunctionPlanner, FieldAccessPlanner,
         // UserDefinedFunctionPlanner]
         let planners: Vec<Arc<dyn ExprPlanner>> = vec![
             Arc::new(CoreFunctionPlanner::default()),
             Arc::new(CustomNestedFunctionPlanner::default()),
             Arc::new(FieldAccessPlanner),
             Arc::new(datafusion::functions::planner::UserDefinedFunctionPlanner),
+            Arc::new(datafusion::functions::unicode::planner::UnicodeFunctionPlanner),
+            Arc::new(datafusion::functions::datetime::planner::DatetimeFunctionPlanner),
         ];
         // Disable the above for testing
-        //let planners = state.expr_planners();
-        let new_state = SessionStateBuilder::new_from_existing(state.clone())
+        let new_state = session
+            .as_any()
+            .downcast_ref::<SessionState>()
+            .map(|state| SessionStateBuilder::new_from_existing(state.clone()))
+            .unwrap_or_default()
             .with_expr_planners(planners.clone())
             .build();
         DeltaContextProvider {
             planners,
             state: new_state,
-            _original: state,
+            _phantom: PhantomData,
         }
     }
 }
@@ -260,7 +267,7 @@ impl ContextProvider for DeltaContextProvider<'_> {
 pub fn parse_predicate_expression(
     schema: &DFSchema,
     expr: impl AsRef<str>,
-    df_state: &SessionState,
+    session: &dyn Session,
 ) -> DeltaResult<Expr> {
     let dialect = &GenericDialect {};
     let mut tokenizer = Tokenizer::new(dialect, expr.as_ref());
@@ -276,7 +283,7 @@ pub fn parse_predicate_expression(
         source: Box::new(err),
     })?;
 
-    let context_provider = DeltaContextProvider::new(df_state);
+    let context_provider = DeltaContextProvider::new(session);
     let sql_to_rel =
         SqlToRel::new_with_options(&context_provider, DeltaParserOptions::default().into());
 
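
Two Rust idioms in the expr.rs hunks above are worth noting. First, DeltaContextProvider::new now accepts the dyn Session trait object and uses as_any().downcast_ref::<SessionState>() to recover the concrete state when one was supplied, falling back to a default otherwise. Second, because the struct no longer stores the &'a SessionState borrow, a PhantomData<&'a SessionState> field keeps the 'a lifetime parameter in use. A minimal self-contained sketch of both patterns, using stand-in types rather than the DataFusion API:

use std::any::Any;
use std::marker::PhantomData;

// Stand-ins for DataFusion's Session trait and SessionState type.
trait Session: Any {
    fn as_any(&self) -> &dyn Any;
}

#[derive(Clone, Default)]
struct SessionState {
    planners: Vec<String>,
}

impl Session for SessionState {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

struct Provider<'a> {
    state: SessionState,
    // Keeps the 'a parameter alive without holding a real reference.
    _phantom: PhantomData<&'a SessionState>,
}

impl<'a> Provider<'a> {
    fn new(session: &'a dyn Session) -> Self {
        // Recover the concrete state if the caller passed one; otherwise
        // fall back to a default, mirroring the unwrap_or_default() above.
        let state = session
            .as_any()
            .downcast_ref::<SessionState>()
            .cloned()
            .unwrap_or_default();
        Provider {
            state,
            _phantom: PhantomData,
        }
    }
}

fn main() {
    let state = SessionState {
        planners: vec!["core".into()],
    };
    let provider = Provider::new(&state);
    println!("{} planner(s)", provider.state.planners.len());
}
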
17 changes: 9 additions & 8 deletions crates/core/src/delta_datafusion/find_files.rs
@@ -4,9 +4,10 @@ use std::sync::Arc;
 
 use arrow_array::{Array, RecordBatch, StringArray};
 use arrow_schema::{ArrowError, DataType as ArrowDataType, Field, Schema as ArrowSchema};
+use datafusion::catalog::Session;
 use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
 use datafusion::datasource::MemTable;
-use datafusion::execution::context::{SessionContext, SessionState, TaskContext};
+use datafusion::execution::context::{SessionContext, TaskContext};
 use datafusion::logical_expr::{col, Expr, Volatility};
 use datafusion::physical_plan::filter::FilterExec;
 use datafusion::physical_plan::limit::LocalLimitExec;
@@ -33,7 +34,7 @@ pub(crate) struct FindFiles {
 pub(crate) async fn find_files(
     snapshot: &EagerSnapshot,
     log_store: LogStoreRef,
-    state: &SessionState,
+    session: &dyn Session,
     predicate: Option<Expr>,
 ) -> DeltaResult<FindFiles> {
     let current_metadata = snapshot.metadata();
@@ -58,7 +59,7 @@ pub(crate) async fn find_files(
         })
     } else {
         let candidates =
-            find_files_scan(snapshot, log_store, state, predicate.to_owned()).await?;
+            find_files_scan(snapshot, log_store, session, predicate.to_owned()).await?;
 
         Ok(FindFiles {
             candidates,
@@ -190,7 +191,7 @@ fn join_batches_with_add_actions(
 async fn find_files_scan(
     snapshot: &EagerSnapshot,
     log_store: LogStoreRef,
-    state: &SessionState,
+    session: &dyn Session,
     expression: Expr,
 ) -> DeltaResult<Vec<Add>> {
     let candidate_map: HashMap<String, Add> = snapshot
@@ -215,7 +216,7 @@ async fn find_files_scan(
     // Add path column
     used_columns.push(logical_schema.index_of(scan_config.file_column_name.as_ref().unwrap())?);
 
-    let scan = DeltaScanBuilder::new(snapshot, log_store, state)
+    let scan = DeltaScanBuilder::new(snapshot, log_store, session)
         .with_filter(Some(expression.clone()))
         .with_projection(Some(&used_columns))
         .with_scan_config(scan_config)
@@ -227,14 +228,14 @@ async fn find_files_scan(
     let input_schema = scan.logical_schema.as_ref().to_owned();
     let input_dfschema = input_schema.clone().try_into()?;
 
-    let predicate_expr =
-        state.create_physical_expr(Expr::IsTrue(Box::new(expression.clone())), &input_dfschema)?;
+    let predicate_expr = session
+        .create_physical_expr(Expr::IsTrue(Box::new(expression.clone())), &input_dfschema)?;
 
     let filter: Arc<dyn ExecutionPlan> =
         Arc::new(FilterExec::try_new(predicate_expr, scan.clone())?);
     let limit: Arc<dyn ExecutionPlan> = Arc::new(LocalLimitExec::new(filter, 1));
 
-    let task_ctx = Arc::new(TaskContext::from(state));
+    let task_ctx = Arc::new(TaskContext::from(session));
     let path_batches = datafusion::physical_plan::collect(limit, task_ctx).await?;
 
     join_batches_with_add_actions(
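
The signature changes in this file are source-compatible for most callers: a &SessionState argument coerces to &dyn Session automatically, since SessionState implements DataFusion's Session trait. A small stand-alone sketch of that unsizing coercion, with local stand-in types rather than the delta-rs API:

trait Session {
    fn label(&self) -> &'static str;
}

struct SessionState;

impl Session for SessionState {
    fn label(&self) -> &'static str {
        "concrete state"
    }
}

// The new-style signature: any Session implementation works.
fn find_files(session: &dyn Session) {
    println!("scanning with {}", session.label());
}

fn main() {
    let state = SessionState;
    // &SessionState coerces to &dyn Session at the call site, so code that
    // previously passed &state keeps compiling after the signature change.
    find_files(&state);
}
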
19 changes: 9 additions & 10 deletions crates/core/src/delta_datafusion/mod.rs
@@ -42,7 +42,7 @@ use datafusion::common::{
 };
 use datafusion::datasource::physical_plan::wrap_partition_type_in_dict;
 use datafusion::datasource::{MemTable, TableProvider};
-use datafusion::execution::context::{SessionConfig, SessionContext, SessionState};
+use datafusion::execution::context::{SessionConfig, SessionContext};
 use datafusion::execution::runtime_env::RuntimeEnv;
 use datafusion::execution::FunctionRegistry;
 use datafusion::logical_expr::logical_plan::CreateExternalTable;
@@ -55,7 +55,6 @@ use datafusion::sql::planner::ParserOptions;
 use datafusion_proto::logical_plan::LogicalExtensionCodec;
 use datafusion_proto::physical_plan::PhysicalExtensionCodec;
 use delta_kernel::engine::arrow_conversion::TryIntoArrow as _;
-use delta_kernel::table_configuration::TableConfiguration;
 use either::Either;
 use itertools::Itertools;
 use url::Url;
@@ -126,7 +125,7 @@ pub trait DataFusionMixins {
     fn parse_predicate_expression(
         &self,
         expr: impl AsRef<str>,
-        df_state: &SessionState,
+        session: &impl Session,
     ) -> DeltaResult<Expr>;
 }
 
@@ -150,10 +149,10 @@ impl DataFusionMixins for Snapshot {
     fn parse_predicate_expression(
         &self,
         expr: impl AsRef<str>,
-        df_state: &SessionState,
+        session: &impl Session,
     ) -> DeltaResult<Expr> {
         let schema = DFSchema::try_from(self.read_schema().as_ref().to_owned())?;
-        parse_predicate_expression(&schema, expr, df_state)
+        parse_predicate_expression(&schema, expr, session)
     }
 }
 
@@ -189,10 +188,10 @@ impl DataFusionMixins for LogDataHandler<'_> {
     fn parse_predicate_expression(
         &self,
         expr: impl AsRef<str>,
-        df_state: &SessionState,
+        session: &impl Session,
     ) -> DeltaResult<Expr> {
         let schema = DFSchema::try_from(self.read_schema().as_ref().to_owned())?;
-        parse_predicate_expression(&schema, expr, df_state)
+        parse_predicate_expression(&schema, expr, session)
     }
 }
 
@@ -208,9 +207,9 @@ impl DataFusionMixins for EagerSnapshot {
     fn parse_predicate_expression(
         &self,
         expr: impl AsRef<str>,
-        df_state: &SessionState,
+        session: &impl Session,
     ) -> DeltaResult<Expr> {
-        self.snapshot().parse_predicate_expression(expr, df_state)
+        self.snapshot().parse_predicate_expression(expr, session)
     }
 }
 
@@ -303,7 +302,7 @@ impl DeltaTableState {
 
 // each delta table must register a specific object store, since paths are internally
 // handled relative to the table root.
-pub(crate) fn register_store(store: LogStoreRef, env: Arc<RuntimeEnv>) {
+pub(crate) fn register_store(store: LogStoreRef, env: &RuntimeEnv) {
     let object_store_url = store.object_store_url();
     let url: &Url = object_store_url.as_ref();
     env.register_object_store(url, store.object_store(None));
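
register_store now borrows the RuntimeEnv instead of taking an owned Arc<RuntimeEnv>, and the call sites in table_provider.rs below switch to session.runtime_env().as_ref() accordingly. This follows the usual Rust guideline of accepting a borrow when the callee only reads; a minimal sketch with stand-in types, not the actual delta-rs signatures:

use std::sync::Arc;

#[derive(Default)]
struct RuntimeEnv;

impl RuntimeEnv {
    fn register_object_store(&self, url: &str) {
        println!("registered object store for {url}");
    }
}

// Before: register_store(env: Arc<RuntimeEnv>) made every caller clone an Arc.
// After: a shared borrow is enough, since registration only needs &self.
fn register_store(env: &RuntimeEnv) {
    env.register_object_store("s3://bucket/table");
}

fn main() {
    let env = Arc::new(RuntimeEnv::default());
    // Callers holding Arc<RuntimeEnv> pass a &RuntimeEnv via as_ref(),
    // leaving their reference count untouched.
    register_store(env.as_ref());
}
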
6 changes: 3 additions & 3 deletions crates/core/src/delta_datafusion/table_provider.rs
@@ -729,7 +729,7 @@ impl TableProvider for DeltaTable {
         filters: &[Expr],
         limit: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        register_store(self.log_store(), session.runtime_env().clone());
+        register_store(self.log_store(), session.runtime_env().as_ref());
         let filter_expr = conjunction(filters.iter().cloned());
 
         let scan = DeltaScanBuilder::new(self.snapshot()?.snapshot(), self.log_store(), session)
@@ -817,7 +817,7 @@ impl TableProvider for DeltaTableProvider {
         filters: &[Expr],
         limit: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        register_store(self.log_store.clone(), session.runtime_env().clone());
+        register_store(self.log_store.clone(), session.runtime_env().as_ref());
         let filter_expr = conjunction(filters.iter().cloned());
 
         let mut scan = DeltaScanBuilder::new(&self.snapshot, self.log_store.clone(), session)
@@ -855,7 +855,7 @@ impl TableProvider for DeltaTableProvider {
         input: Arc<dyn ExecutionPlan>,
         insert_op: InsertOp,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        register_store(self.log_store.clone(), state.runtime_env().clone());
+        register_store(self.log_store.clone(), state.runtime_env().as_ref());
 
         let save_mode = match insert_op {
             InsertOp::Append => SaveMode::Append,
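
One design detail across these files: the DataFusionMixins trait in mod.rs takes &impl Session (static dispatch, monomorphized per caller), while the TableProvider methods here receive &dyn Session (dynamic dispatch, as DataFusion hands the provider a trait object). Both forms accept the same concrete sessions; a compact sketch of the difference with stand-in types:

trait Session {
    fn id(&self) -> u32;
}

struct SessionState;

impl Session for SessionState {
    fn id(&self) -> u32 {
        7
    }
}

// Static dispatch, resolved at compile time (mod.rs style).
fn parse_with(session: &impl Session) -> u32 {
    session.id()
}

// Dynamic dispatch through a vtable (table_provider.rs style).
fn scan_with(session: &dyn Session) -> u32 {
    session.id()
}

fn main() {
    let state = SessionState;
    assert_eq!(parse_with(&state), scan_with(&state));
    println!("both dispatch paths agree");
}
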
1 change: 0 additions & 1 deletion crates/core/src/lib.rs
@@ -198,7 +198,6 @@ mod tests {
 
     use super::*;
     use crate::table::PeekCommit;
-    use std::collections::HashMap;
 
     #[tokio::test]
     async fn read_delta_2_0_table_without_version() {
33 changes: 18 additions & 15 deletions crates/core/src/operations/constraints.rs
@@ -2,9 +2,9 @@
 
 use std::sync::Arc;
 
+use datafusion::catalog::Session;
 use datafusion::common::ToDFSchema;
-use datafusion::execution::context::SessionState;
-use datafusion::execution::{SendableRecordBatchStream, TaskContext};
+use datafusion::execution::{SendableRecordBatchStream, SessionState};
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::SessionContext;
 use delta_kernel::table_features::WriterFeature;
@@ -36,7 +36,7 @@ pub struct ConstraintBuilder {
     /// Delta object store for handling data files
     log_store: LogStoreRef,
     /// Datafusion session state relevant for executing the input plan
-    state: Option<SessionState>,
+    session: Option<Arc<dyn Session>>,
     /// Additional information to add to the commit
     commit_properties: CommitProperties,
     custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>,
@@ -59,7 +59,7 @@ impl ConstraintBuilder {
             expr: None,
             snapshot,
             log_store,
-            state: None,
+            session: None,
             commit_properties: CommitProperties::default(),
             custom_execute_handler: None,
         }
@@ -76,9 +76,9 @@ impl ConstraintBuilder {
         self
     }
 
-    /// Specify the datafusion session context
-    pub fn with_session_state(mut self, state: SessionState) -> Self {
-        self.state = Some(state);
+    /// The Datafusion session state to use
+    pub fn with_session_state(mut self, session: Arc<dyn Session>) -> Self {
+        self.session = Some(session);
         self
     }
 
@@ -130,18 +130,21 @@ impl std::future::IntoFuture for ConstraintBuilder {
                 )));
             }
 
-            let state = this.state.unwrap_or_else(|| {
-                let session: SessionContext = DeltaSessionContext::default().into();
-                register_store(this.log_store.clone(), session.runtime_env());
-                session.state()
-            });
+            let session = this
+                .session
+                .and_then(|session| session.as_any().downcast_ref::<SessionState>().cloned())
+                .unwrap_or_else(|| {
+                    let session: SessionContext = DeltaSessionContext::default().into();
+                    session.state()
+                });
+            register_store(this.log_store.clone(), session.runtime_env().as_ref());
 
-            let scan = DeltaScanBuilder::new(&this.snapshot, this.log_store.clone(), &state)
+            let scan = DeltaScanBuilder::new(&this.snapshot, this.log_store.clone(), &session)
                 .build()
                 .await?;
 
             let schema = scan.schema().to_dfschema()?;
-            let expr = into_expr(expr, &schema, &state)?;
+            let expr = into_expr(expr, &schema, &session)?;
             let expr_str = fmt_expr_to_sql(&expr)?;
 
             // Checker built here with the one time constraint to check.
@@ -153,7 +156,7 @@ impl std::future::IntoFuture for ConstraintBuilder {
             for p in 0..plan.properties().output_partitioning().partition_count() {
                 let inner_plan = plan.clone();
                 let inner_checker = checker.clone();
-                let task_ctx = Arc::new(TaskContext::from(&state));
+                let task_ctx = Arc::new((&session).into());
                 let mut record_stream: SendableRecordBatchStream =
                     inner_plan.execute(p, task_ctx)?;
                 let handle: tokio::task::JoinHandle<DeltaResult<()>> =
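
For callers, the practical effect in constraints.rs is that with_session_state now takes Arc<dyn Session>; at execute time the builder downcasts back to a concrete SessionState and otherwise falls back to a default DeltaSessionContext. A hedged usage sketch — DeltaOps::add_constraint and with_constraint exist in delta-rs today, while the Arc<dyn Session> argument is what this diff introduces:

use std::sync::Arc;

use datafusion::catalog::Session;
use datafusion::prelude::SessionContext;
use deltalake::{DeltaOps, DeltaTable};

async fn add_positive_id_constraint(
    table: DeltaTable,
) -> Result<DeltaTable, Box<dyn std::error::Error>> {
    // SessionState implements Session, so Arc::new(ctx.state()) coerces
    // to the Arc<dyn Session> the builder now expects.
    let session: Arc<dyn Session> = Arc::new(SessionContext::new().state());
    let table = DeltaOps(table)
        .add_constraint()
        .with_constraint("id_positive", "id > 0")
        .with_session_state(session)
        .await?;
    Ok(table)
}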