Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/core/src/delta_datafusion/expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use datafusion::sql::sqlparser::parser::Parser;
use datafusion::sql::sqlparser::tokenizer::Tokenizer;
use tracing::log::*;

use super::DeltaParserOptions;
use crate::delta_datafusion::session::DeltaParserOptions;
use crate::{DeltaResult, DeltaTableError};

/// This struct is like Datafusion's MakeArray but ensures that `element` is used rather than `item`
Expand Down
80 changes: 10 additions & 70 deletions crates/core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
use std::fmt::Debug;
use std::sync::Arc;

use arrow_array::types::UInt16Type;
use arrow_array::{Array, DictionaryArray, RecordBatch, StringArray, TypedDictionaryArray};
use arrow::array::types::UInt16Type;
use arrow::array::{Array, DictionaryArray, RecordBatch, StringArray, TypedDictionaryArray};
use arrow_cast::display::array_value_to_string;
use arrow_cast::{cast_with_options, CastOptions};
use arrow_schema::{
Expand All @@ -41,7 +41,7 @@ use datafusion::common::{
};
use datafusion::datasource::physical_plan::wrap_partition_type_in_dict;
use datafusion::datasource::{MemTable, TableProvider};
use datafusion::execution::context::{SessionConfig, SessionContext};
use datafusion::execution::context::SessionContext;
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::execution::FunctionRegistry;
use datafusion::logical_expr::logical_plan::CreateExternalTable;
Expand All @@ -50,7 +50,6 @@ use datafusion::logical_expr::{Expr, Extension, LogicalPlan};
use datafusion::physical_optimizer::pruning::PruningPredicate;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::{ExecutionPlan, Statistics};
use datafusion::sql::planner::ParserOptions;
use datafusion_proto::logical_plan::LogicalExtensionCodec;
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
use delta_kernel::engine::arrow_conversion::TryIntoArrow as _;
Expand All @@ -71,6 +70,7 @@ use crate::table::state::DeltaTableState;
use crate::table::{Constraint, GeneratedColumn};
use crate::{open_table, open_table_with_storage_options, DeltaTable};

pub use self::session::*;
pub(crate) use find_files::*;

pub(crate) const PATH_COLUMN: &str = "__delta_rs_path";
Expand All @@ -83,6 +83,7 @@ pub mod logical;
pub mod physical;
pub mod planner;
mod schema_adapter;
mod session;
mod table_provider;

pub use cdf::scan::DeltaCdfTableProvider;
Expand Down Expand Up @@ -125,7 +126,7 @@ pub trait DataFusionMixins {
fn parse_predicate_expression(
&self,
expr: impl AsRef<str>,
session: &impl Session,
session: &dyn Session,
) -> DeltaResult<Expr>;
}

Expand All @@ -149,7 +150,7 @@ impl DataFusionMixins for Snapshot {
fn parse_predicate_expression(
&self,
expr: impl AsRef<str>,
session: &impl Session,
session: &dyn Session,
) -> DeltaResult<Expr> {
let schema = DFSchema::try_from(self.read_schema().as_ref().to_owned())?;
parse_predicate_expression(&schema, expr, session)
Expand Down Expand Up @@ -188,7 +189,7 @@ impl DataFusionMixins for LogDataHandler<'_> {
fn parse_predicate_expression(
&self,
expr: impl AsRef<str>,
session: &impl Session,
session: &dyn Session,
) -> DeltaResult<Expr> {
let schema = DFSchema::try_from(self.read_schema().as_ref().to_owned())?;
parse_predicate_expression(&schema, expr, session)
Expand All @@ -207,7 +208,7 @@ impl DataFusionMixins for EagerSnapshot {
fn parse_predicate_expression(
&self,
expr: impl AsRef<str>,
session: &impl Session,
session: &dyn Session,
) -> DeltaResult<Expr> {
self.snapshot().parse_predicate_expression(expr, session)
}
Expand Down Expand Up @@ -792,67 +793,6 @@ impl TableProviderFactory for DeltaTableFactory {
}
}

/// A wrapper around the SQL planner's `ParserOptions` that captures sensible defaults for Delta tables.
///
/// The only customization visible here is that identifier normalization is turned off
/// (see the `Default` impl); every other option keeps its stock value.
pub struct DeltaParserOptions {
// The wrapped options; handed back unchanged by the `From` impl below.
inner: ParserOptions,
}

impl Default for DeltaParserOptions {
/// Builds the stock `ParserOptions` with `enable_ident_normalization` set to `false`.
fn default() -> Self {
DeltaParserOptions {
inner: ParserOptions {
enable_ident_normalization: false,
..ParserOptions::default()
},
}
}
}

impl From<DeltaParserOptions> for ParserOptions {
/// Unwraps the wrapper, yielding the inner `ParserOptions`.
fn from(value: DeltaParserOptions) -> Self {
value.inner
}
}

/// A wrapper around DataFusion's `SessionConfig` that captures sensible defaults for Delta tables.
///
/// The default configuration differs from a plain `SessionConfig::default()` only in that
/// `datafusion.sql_parser.enable_ident_normalization` is set to `false`.
pub struct DeltaSessionConfig {
// The wrapped configuration; handed back unchanged by the `From` impl below.
inner: SessionConfig,
}

impl Default for DeltaSessionConfig {
/// Builds the stock `SessionConfig` with identifier normalization switched off.
fn default() -> Self {
DeltaSessionConfig {
inner: SessionConfig::default()
.set_bool("datafusion.sql_parser.enable_ident_normalization", false),
}
}
}

impl From<DeltaSessionConfig> for SessionConfig {
/// Unwraps the wrapper, yielding the inner `SessionConfig`.
fn from(value: DeltaSessionConfig) -> Self {
value.inner
}
}

/// A wrapper around DataFusion's `SessionContext` that captures sensible defaults for Delta tables.
///
/// The default context is created from `DeltaSessionConfig::default()`.
pub struct DeltaSessionContext {
// The wrapped context; handed back unchanged by the `From` impl below.
inner: SessionContext,
}

impl Default for DeltaSessionContext {
/// Creates a `SessionContext` configured with the default `DeltaSessionConfig`.
fn default() -> Self {
DeltaSessionContext {
inner: SessionContext::new_with_config(DeltaSessionConfig::default().into()),
}
}
}

impl From<DeltaSessionContext> for SessionContext {
/// Unwraps the wrapper, yielding the inner `SessionContext`.
fn from(value: DeltaSessionContext) -> Self {
value.inner
}
}

/// A wrapper for DataFusion's Column to preserve case-sensitivity during string conversion
pub struct DeltaColumn {
inner: Column,
Expand Down Expand Up @@ -914,7 +854,7 @@ mod tests {
use datafusion::logical_expr::lit;
use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::{visit_execution_plan, ExecutionPlanVisitor, PhysicalExpr};
use datafusion::prelude::col;
use datafusion::prelude::{col, SessionConfig};
use datafusion_proto::physical_plan::AsExecutionPlan;
use datafusion_proto::protobuf;
use delta_kernel::path::{LogPathFileType, ParsedLogPath};
Expand Down
101 changes: 101 additions & 0 deletions crates/core/src/delta_datafusion/session.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
use datafusion::{
catalog::Session,
common::{exec_datafusion_err, Result as DataFusionResult},
execution::{SessionState, SessionStateBuilder},
prelude::{SessionConfig, SessionContext},
sql::planner::ParserOptions,
};

use crate::delta_datafusion::planner::DeltaPlanner;

/// Creates a [`DeltaSessionContext`] set up with the Delta-specific defaults.
///
/// Convenience entry point; equivalent to constructing the context directly.
pub fn create_session() -> DeltaSessionContext {
    DeltaSessionContext::new()
}

/// Recovers the concrete [`SessionState`] that sits behind a [`Session`] trait object.
///
/// # Errors
///
/// Returns an execution error when `session` is not backed by a `SessionState`.
///
/// Note: this relies on downcasting and may stop working in future versions.
#[deprecated(
    since = "0.29.1",
    note = "Stop gap to get rid of all explicit session state references"
)]
pub(crate) fn session_state_from_session(session: &dyn Session) -> DataFusionResult<&SessionState> {
    match session.as_any().downcast_ref::<SessionState>() {
        Some(state) => Ok(state),
        None => Err(exec_datafusion_err!(
            "Failed to downcast Session to SessionState"
        )),
    }
}

/// Wraps the SQL planner's [`ParserOptions`] so that Delta-friendly defaults are applied.
///
/// Compared to the stock options, only identifier normalization is changed: it is disabled.
pub struct DeltaParserOptions {
    inner: ParserOptions,
}

impl Default for DeltaParserOptions {
    /// Stock parser options with `enable_ident_normalization` turned off.
    fn default() -> Self {
        let inner = ParserOptions {
            enable_ident_normalization: false,
            ..Default::default()
        };
        Self { inner }
    }
}

impl From<DeltaParserOptions> for ParserOptions {
    /// Consumes the wrapper and hands back the options it holds.
    fn from(options: DeltaParserOptions) -> Self {
        let DeltaParserOptions { inner } = options;
        inner
    }
}

/// Wraps DataFusion's [`SessionConfig`] so that Delta-friendly defaults are applied.
///
/// The default value sets `datafusion.sql_parser.enable_ident_normalization` to `false`
/// and leaves every other setting at its stock value.
pub struct DeltaSessionConfig {
    inner: SessionConfig,
}

impl Default for DeltaSessionConfig {
    /// Stock session configuration with identifier normalization turned off.
    fn default() -> Self {
        let stock = SessionConfig::default();
        let inner = stock.set_bool("datafusion.sql_parser.enable_ident_normalization", false);
        Self { inner }
    }
}

impl From<DeltaSessionConfig> for SessionConfig {
    /// Consumes the wrapper and hands back the configuration it holds.
    fn from(config: DeltaSessionConfig) -> Self {
        let DeltaSessionConfig { inner } = config;
        inner
    }
}

/// A wrapper for Deltafusion's SessionContext to capture sane default table defaults
pub struct DeltaSessionContext {
inner: SessionContext,
}

impl DeltaSessionContext {
pub fn new() -> Self {
let ctx = SessionContext::new_with_config(DeltaSessionConfig::default().into());
let planner = DeltaPlanner::new();
let state = SessionStateBuilder::new_from_existing(ctx.state())
.with_query_planner(planner)
.build();
let inner = SessionContext::new_with_state(state);
Self { inner }
}

pub fn into_inner(self) -> SessionContext {
self.inner
}
}

impl Default for DeltaSessionContext {
fn default() -> Self {
Self::new()
}
}

impl From<DeltaSessionContext> for SessionContext {
fn from(value: DeltaSessionContext) -> Self {
value.inner
}
}
2 changes: 1 addition & 1 deletion crates/core/src/logstore/factories.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,9 @@ fn default_parse_url_opts(
/// Access global registry of object store factories
pub fn object_store_factories() -> ObjectStoreFactoryRegistry {
static REGISTRY: OnceLock<ObjectStoreFactoryRegistry> = OnceLock::new();
let factory = Arc::new(DefaultObjectStoreFactory::default());
REGISTRY
.get_or_init(|| {
let factory = Arc::new(DefaultObjectStoreFactory::default());
let registry = ObjectStoreFactoryRegistry::default();
registry.insert(Url::parse("memory://").unwrap(), factory.clone());
registry.insert(Url::parse("file://").unwrap(), factory);
Expand Down
25 changes: 9 additions & 16 deletions crates/core/src/operations/constraints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,16 @@ use std::sync::Arc;

use datafusion::catalog::Session;
use datafusion::common::ToDFSchema;
use datafusion::execution::{SendableRecordBatchStream, SessionState};
use datafusion::execution::SendableRecordBatchStream;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
use delta_kernel::table_features::WriterFeature;
use futures::future::BoxFuture;
use futures::StreamExt;

use super::datafusion_utils::into_expr;
use super::{CustomExecuteHandler, Operation};
use crate::delta_datafusion::expr::fmt_expr_to_sql;
use crate::delta_datafusion::{
register_store, DeltaDataChecker, DeltaScanBuilder, DeltaSessionContext,
};
use crate::delta_datafusion::{create_session, register_store, DeltaDataChecker, DeltaScanBuilder};
use crate::kernel::transaction::{CommitBuilder, CommitProperties};
use crate::kernel::{EagerSnapshot, MetadataExt, ProtocolExt as _, ProtocolInner};
use crate::logstore::LogStoreRef;
Expand Down Expand Up @@ -132,19 +129,16 @@ impl std::future::IntoFuture for ConstraintBuilder {

let session = this
.session
.and_then(|session| session.as_any().downcast_ref::<SessionState>().cloned())
.unwrap_or_else(|| {
let session: SessionContext = DeltaSessionContext::default().into();
session.state()
});
.unwrap_or_else(|| Arc::new(create_session().into_inner().state()));
register_store(this.log_store.clone(), session.runtime_env().as_ref());

let scan = DeltaScanBuilder::new(&this.snapshot, this.log_store.clone(), &session)
.build()
.await?;
let scan =
DeltaScanBuilder::new(&this.snapshot, this.log_store.clone(), session.as_ref())
.build()
.await?;

let schema = scan.schema().to_dfschema()?;
let expr = into_expr(expr, &schema, &session)?;
let expr = into_expr(expr, &schema, session.as_ref())?;
let expr_str = fmt_expr_to_sql(&expr)?;

// Checker built here with the one time constraint to check.
Expand All @@ -156,9 +150,8 @@ impl std::future::IntoFuture for ConstraintBuilder {
for p in 0..plan.properties().output_partitioning().partition_count() {
let inner_plan = plan.clone();
let inner_checker = checker.clone();
let task_ctx = Arc::new((&session).into());
let mut record_stream: SendableRecordBatchStream =
inner_plan.execute(p, task_ctx)?;
inner_plan.execute(p, session.task_ctx())?;
let handle: tokio::task::JoinHandle<DeltaResult<()>> =
tokio::task::spawn(async move {
while let Some(maybe_batch) = record_stream.next().await {
Expand Down
Loading
Loading