crates/core/src/delta_datafusion/cdf/scan.rs (25 changes: 7 additions & 18 deletions)

@@ -1,12 +1,9 @@
 use std::any::Any;
 use std::sync::Arc;
 
-use arrow_schema::{Schema, SchemaRef};
-use async_trait::async_trait;
-use datafusion::catalog::Session;
-use datafusion::catalog::TableProvider;
-use datafusion::common::{exec_datafusion_err, Column, DFSchema, Result as DataFusionResult};
-use datafusion::execution::SessionState;
+use arrow::datatypes::{Schema, SchemaRef};
+use datafusion::catalog::{Session, TableProvider};
+use datafusion::common::{Column, DFSchema, Result as DataFusionResult};
 use datafusion::logical_expr::utils::conjunction;
 use datafusion::logical_expr::{Expr, TableProviderFilterPushDown, TableType};
 use datafusion::physical_expr::PhysicalExpr;
@@ -15,20 +12,13 @@ use datafusion::physical_plan::limit::GlobalLimitExec;
 use datafusion::physical_plan::projection::ProjectionExec;
 use datafusion::physical_plan::ExecutionPlan;
 
-use crate::DeltaTableError;
 use crate::{
     delta_datafusion::DataFusionMixins, operations::load_cdf::CdfLoadBuilder, DeltaResult,
+    DeltaTableError,
 };
 
 use super::ADD_PARTITION_SCHEMA;
 
-fn session_state_from_session(session: &dyn Session) -> DataFusionResult<&SessionState> {
-    session
-        .as_any()
-        .downcast_ref::<SessionState>()
-        .ok_or_else(|| exec_datafusion_err!("Failed to downcast Session to SessionState"))
-}
-
 #[derive(Debug)]
 pub struct DeltaCdfTableProvider {
     cdf_builder: CdfLoadBuilder,
@@ -49,7 +39,7 @@ impl DeltaCdfTableProvider {
     }
 }
 
-#[async_trait]
+#[async_trait::async_trait]
 impl TableProvider for DeltaCdfTableProvider {
     fn as_any(&self) -> &dyn Any {
         self
@@ -70,18 +60,17 @@ impl TableProvider for DeltaCdfTableProvider {
         filters: &[Expr],
         limit: Option<usize>,
     ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
-        let session_state = session_state_from_session(session)?;
         let schema: DFSchema = self.schema().try_into()?;
 
         let mut plan = if let Some(filter_expr) = conjunction(filters.iter().cloned()) {
            let physical_expr = session.create_physical_expr(filter_expr, &schema)?;
            let plan = self
                .cdf_builder
-                .build(session_state, Some(&physical_expr))
+                .build(session, Some(&physical_expr))
                .await?;
            Arc::new(FilterExec::try_new(physical_expr, plan)?)
        } else {
-            self.cdf_builder.build(session_state, None).await?
+            self.cdf_builder.build(session, None).await?
        };
 
        let df_schema: DFSchema = plan.schema().try_into()?;
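For reference, the deleted `session_state_from_session` helper existed only to bridge DataFusion's `&dyn Session` trait object back to a concrete `SessionState`. A minimal before/after sketch of the pattern, built only from the DataFusion items visible in this diff:

```rust
use std::sync::Arc;

use datafusion::catalog::Session;
use datafusion::common::{exec_datafusion_err, Result};
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::execution::SessionState;

// Before: recover the concrete SessionState from the trait object,
// failing at runtime if the caller passed any other Session impl.
fn session_state_from_session(session: &dyn Session) -> Result<&SessionState> {
    session
        .as_any()
        .downcast_ref::<SessionState>()
        .ok_or_else(|| exec_datafusion_err!("Failed to downcast Session to SessionState"))
}

// After: stay on the trait object and use what `Session` itself exposes,
// e.g. the runtime environment needed for object-store registration.
fn runtime_from_session(session: &dyn Session) -> Arc<RuntimeEnv> {
    session.runtime_env().clone()
}
```

With `runtime_env` (and `create_physical_expr`, already used unchanged in `scan` above) available on the trait itself, the fallible downcast and its runtime failure mode disappear.
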
crates/core/src/delta_datafusion/table_provider.rs (33 changes: 7 additions & 26 deletions)

@@ -3,13 +3,10 @@ use std::borrow::Cow;
 use std::fmt;
 use std::sync::Arc;
 
-use async_trait::async_trait;
-use futures::StreamExt;
-
+use arrow::array::BooleanArray;
 use arrow::compute::filter_record_batch;
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use arrow::error::ArrowError;
-use arrow_array::BooleanArray;
 use chrono::{DateTime, TimeZone, Utc};
 use datafusion::catalog::memory::DataSourceExec;
 use datafusion::catalog::TableProvider;
@@ -25,7 +22,6 @@ use datafusion::datasource::sink::{DataSink, DataSinkExec};
 use datafusion::datasource::TableType;
 use datafusion::error::DataFusionError;
 use datafusion::execution::context::ExecutionProps;
-use datafusion::execution::context::SessionState;
 use datafusion::execution::{SendableRecordBatchStream, TaskContext};
 use datafusion::logical_expr::dml::InsertOp;
 use datafusion::logical_expr::simplify::SimplifyContext;
@@ -46,6 +42,8 @@ use datafusion::{
     prelude::Expr,
     scalar::ScalarValue,
 };
+use delta_kernel::table_properties::DataSkippingNumIndexedCols;
+use futures::StreamExt as _;
 use itertools::Itertools;
 use object_store::ObjectMeta;
 use serde::{Deserialize, Serialize};
@@ -63,20 +61,9 @@ use crate::operations::write::WriterStatsConfig;
 use crate::protocol::{DeltaOperation, SaveMode};
 use crate::{ensure_table_uri, DeltaTable};
 use crate::{logstore::LogStoreRef, DeltaResult, DeltaTableError};
-use delta_kernel::table_properties::DataSkippingNumIndexedCols;
 
 const PATH_COLUMN: &str = "__delta_rs_path";
 
-/// Get the session state from the session
-fn session_state_from_session(session: &dyn Session) -> Result<&SessionState> {
-    session
-        .as_any()
-        .downcast_ref::<SessionState>()
-        .ok_or_else(|| {
-            DataFusionError::Plan("Failed to downcast Session to SessionState".to_string())
-        })
-}
-
 /// DataSink implementation for delta lake
 /// This uses DataSinkExec to handle the insert operation
 /// Implements writing streams of RecordBatches to delta.
@@ -106,7 +93,6 @@ impl DeltaDataSink {
         log_store: LogStoreRef,
         snapshot: EagerSnapshot,
         save_mode: SaveMode,
-        session_state: Arc<SessionState>,
     ) -> datafusion::common::Result<Self> {
         let schema = snapshot
             .arrow_schema()
@@ -149,7 +135,7 @@
 /// This is used to write the data to the delta table
 /// It implements the `DataSink` trait and is used by the `DataSinkExec` node
 /// to write the data to the delta table
-#[async_trait]
+#[async_trait::async_trait]
 impl DataSink for DeltaDataSink {
     fn as_any(&self) -> &dyn Any {
         self
@@ -882,8 +868,7 @@ impl TableProvider for DeltaTableProvider {
         input: Arc<dyn ExecutionPlan>,
         insert_op: InsertOp,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        let session_state = session_state_from_session(state)?.clone();
-        register_store(self.log_store.clone(), session_state.runtime_env().clone());
+        register_store(self.log_store.clone(), state.runtime_env().clone());
 
         let save_mode = match insert_op {
             InsertOp::Append => SaveMode::Append,
@@ -895,12 +880,8 @@
             }
         };
 
-        let data_sink = DeltaDataSink::new(
-            self.log_store.clone(),
-            self.snapshot.clone(),
-            save_mode,
-            Arc::new(session_state),
-        )?;
+        let data_sink =
+            DeltaDataSink::new(self.log_store.clone(), self.snapshot.clone(), save_mode)?;
 
         Ok(Arc::new(DataSinkExec::new(
             input,
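The narrowed `DeltaDataSink::new` signature is internal: the sink is constructed inside `TableProvider::insert_into`, so write paths through DataFusion look the same to callers. A rough sketch of how an INSERT reaches the sink (the table name and values are illustrative; the provider is assumed to be the `DeltaTableProvider` from this file, constructed elsewhere):

```rust
use std::sync::Arc;

use datafusion::catalog::TableProvider;
use datafusion::prelude::SessionContext;

// `provider` is assumed to be a DeltaTableProvider built elsewhere;
// its construction is omitted in this sketch.
async fn append_example(provider: Arc<dyn TableProvider>) -> datafusion::common::Result<()> {
    let ctx = SessionContext::new();
    ctx.register_table("demo", provider)?;
    // The INSERT plans through TableProvider::insert_into, which now builds
    // the DeltaDataSink without threading an Arc<SessionState> through.
    ctx.sql("INSERT INTO demo VALUES (1, 'a')").await?.collect().await?;
    Ok(())
}
```
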
crates/core/src/operations/load_cdf.rs (6 changes: 3 additions & 3 deletions)

@@ -16,13 +16,13 @@ use std::time::SystemTime;
 use arrow_array::RecordBatch;
 use arrow_schema::{ArrowError, Field, Schema};
 use chrono::{DateTime, Utc};
+use datafusion::catalog::Session;
 use datafusion::common::config::TableParquetOptions;
 use datafusion::common::ScalarValue;
 use datafusion::datasource::memory::DataSourceExec;
 use datafusion::datasource::physical_plan::{
     FileGroup, FileScanConfigBuilder, FileSource, ParquetSource,
 };
-use datafusion::execution::SessionState;
 use datafusion::physical_expr::{expressions, PhysicalExpr};
 use datafusion::physical_plan::projection::ProjectionExec;
 use datafusion::physical_plan::union::UnionExec;
@@ -325,13 +325,13 @@ impl CdfLoadBuilder {
     /// Executes the scan
     pub(crate) async fn build(
         &self,
-        session_sate: &SessionState,
+        session: &dyn Session,
         filters: Option<&Arc<dyn PhysicalExpr>>,
     ) -> DeltaResult<Arc<dyn ExecutionPlan>> {
         PROTOCOL.can_read_from(&self.snapshot)?;
 
         let (cdc, add, remove) = self.determine_files_to_read().await?;
-        register_store(self.log_store.clone(), session_sate.runtime_env().clone());
+        register_store(self.log_store.clone(), session.runtime_env().clone());
 
         let partition_values = self.snapshot.metadata().partition_columns().clone();
         let schema = self.snapshot.input_schema()?;
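`CdfLoadBuilder::build` is `pub(crate)`, so this signature change (which also fixes the misspelled `session_sate` parameter) stays inside the crate. A sketch of an in-crate call site, with builder construction omitted: since `SessionState` implements `Session`, existing callers only need to let the reference coerce to `&dyn Session`.

```rust
use datafusion::prelude::SessionContext;

use crate::operations::load_cdf::CdfLoadBuilder;
use crate::DeltaResult;

async fn execute_cdf(builder: CdfLoadBuilder) -> DeltaResult<()> {
    let ctx = SessionContext::new();
    let state = ctx.state();
    // &SessionState coerces to &dyn Session, so SessionState callers keep
    // working while custom Session implementations become usable too.
    let _plan = builder.build(&state, None).await?;
    Ok(())
}
```
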