2 changes: 1 addition & 1 deletion crates/core/src/delta_datafusion/cdf/scan.rs
@@ -28,7 +28,7 @@ pub struct DeltaCdfTableProvider
impl DeltaCdfTableProvider {
/// Build a DeltaCDFTableProvider
pub fn try_new(cdf_builder: CdfLoadBuilder) -> DeltaResult<Self> {
- let mut fields = cdf_builder.snapshot.input_schema()?.fields().to_vec();
+ let mut fields = cdf_builder.snapshot.input_schema().fields().to_vec();
for f in ADD_PARTITION_SCHEMA.clone() {
fields.push(f.into());
}
1 change: 0 additions & 1 deletion crates/core/src/delta_datafusion/expr.rs
@@ -818,7 +818,6 @@ mod test {
.unwrap()
.snapshot()
.input_schema()
- .unwrap()
.as_ref()
.to_owned()
.to_dfschema()
89 changes: 57 additions & 32 deletions crates/core/src/delta_datafusion/mod.rs
@@ -55,7 +55,7 @@
use datafusion_proto::logical_plan::LogicalExtensionCodec;
use datafusion_proto::physical_plan::PhysicalExtensionCodec;
use delta_kernel::engine::arrow_conversion::TryIntoArrow as _;
use delta_kernel::table_configuration::TableConfiguration;

Check warning on line 58 in crates/core/src/delta_datafusion/mod.rs (GitHub Actions / check, repeated across multiple jobs): unused import: `delta_kernel::table_configuration::TableConfiguration`

use either::Either;
use itertools::Itertools;
use url::Url;
@@ -117,10 +117,10 @@
/// Convenience trait for calling common methods on snapshot hierarchies
pub trait DataFusionMixins {
/// The physical datafusion schema of a table
- fn arrow_schema(&self) -> DeltaResult<ArrowSchemaRef>;
+ fn read_schema(&self) -> ArrowSchemaRef;

/// Get the table schema as an [`ArrowSchemaRef`]
- fn input_schema(&self) -> DeltaResult<ArrowSchemaRef>;
+ fn input_schema(&self) -> ArrowSchemaRef;

/// Parse an expression string into a datafusion [`Expr`]
fn parse_predicate_expression(
@@ -131,49 +131,77 @@
}

impl DataFusionMixins for Snapshot {
- fn arrow_schema(&self) -> DeltaResult<ArrowSchemaRef> {
- _arrow_schema(self.table_configuration(), true)
+ fn read_schema(&self) -> ArrowSchemaRef {
+ _arrow_schema(
+ self.arrow_schema(),
+ self.metadata().partition_columns(),
+ true,
+ )
}

- fn input_schema(&self) -> DeltaResult<ArrowSchemaRef> {
- _arrow_schema(self.table_configuration(), false)
+ fn input_schema(&self) -> ArrowSchemaRef {
+ _arrow_schema(
+ self.arrow_schema(),
+ self.metadata().partition_columns(),
+ false,
+ )
}

fn parse_predicate_expression(
&self,
expr: impl AsRef<str>,
df_state: &SessionState,
) -> DeltaResult<Expr> {
- let schema = DFSchema::try_from(self.arrow_schema()?.as_ref().to_owned())?;
+ let schema = DFSchema::try_from(self.read_schema().as_ref().to_owned())?;
parse_predicate_expression(&schema, expr, df_state)
}
}

impl DataFusionMixins for LogDataHandler<'_> {
- fn arrow_schema(&self) -> DeltaResult<ArrowSchemaRef> {
- _arrow_schema(self.table_configuration(), true)
+ fn read_schema(&self) -> ArrowSchemaRef {
+ _arrow_schema(
+ Arc::new(
+ self.table_configuration()
+ .schema()
+ .as_ref()
+ .try_into_arrow()
+ .unwrap(),
+ ),
+ self.table_configuration().metadata().partition_columns(),
+ true,
+ )
}

- fn input_schema(&self) -> DeltaResult<ArrowSchemaRef> {
- _arrow_schema(self.table_configuration(), false)
+ fn input_schema(&self) -> ArrowSchemaRef {
+ _arrow_schema(
+ Arc::new(
+ self.table_configuration()
+ .schema()
+ .as_ref()
+ .try_into_arrow()
+ .unwrap(),
+ ),
+ self.table_configuration().metadata().partition_columns(),
+ false,
+ )
}

fn parse_predicate_expression(
&self,
expr: impl AsRef<str>,
df_state: &SessionState,
) -> DeltaResult<Expr> {
- let schema = DFSchema::try_from(self.arrow_schema()?.as_ref().to_owned())?;
+ let schema = DFSchema::try_from(self.read_schema().as_ref().to_owned())?;
parse_predicate_expression(&schema, expr, df_state)
}
}

impl DataFusionMixins for EagerSnapshot {
- fn arrow_schema(&self) -> DeltaResult<ArrowSchemaRef> {
- self.snapshot().arrow_schema()
+ fn read_schema(&self) -> ArrowSchemaRef {
+ self.snapshot().read_schema()
}

- fn input_schema(&self) -> DeltaResult<ArrowSchemaRef> {
+ fn input_schema(&self) -> ArrowSchemaRef {
self.snapshot().input_schema()
}

@@ -187,22 +215,20 @@
}

fn _arrow_schema(
- snapshot: &TableConfiguration,
+ schema: SchemaRef,
+ partition_columns: &[String],
wrap_partitions: bool,
- ) -> DeltaResult<ArrowSchemaRef> {
- let meta = snapshot.metadata();
- let schema = snapshot.schema();
-
+ ) -> ArrowSchemaRef {
let fields = schema
.fields()
- .filter(|f| !meta.partition_columns().contains(&f.name().to_string()))
- .map(|f| f.try_into_arrow())
+ .into_iter()
+ .filter(|f| !partition_columns.contains(&f.name().to_string()))
+ .cloned()
.chain(
// We need stable order between logical and physical schemas, but the order of
// partitioning columns is not always the same in the json schema and the array
- meta.partition_columns().iter().map(|partition_col| {
- let f = schema.field(partition_col).unwrap();
- let field: Field = f.try_into_arrow()?;
+ partition_columns.iter().map(|partition_col| {
+ let field = schema.field_with_name(partition_col).unwrap();
let corrected = if wrap_partitions {
match field.data_type() {
// Only dictionary-encode types that may be large
@@ -218,12 +244,11 @@
} else {
field.data_type().clone()
};
- Ok(field.with_data_type(corrected))
+ Arc::new(field.clone().with_data_type(corrected))
}),
)
- .collect::<Result<Vec<Field>, _>>()?;
-
- Ok(Arc::new(ArrowSchema::new(fields)))
+ .collect::<Vec<_>>();
+ Arc::new(ArrowSchema::new(fields))
}

pub(crate) fn files_matching_predicate<'a>(
@@ -234,8 +259,8 @@
(!filters.is_empty()).then_some(conjunction(filters.iter().cloned()))
{
let expr = SessionContext::new()
- .create_physical_expr(predicate, &log_data.arrow_schema()?.to_dfschema()?)?;
- let pruning_predicate = PruningPredicate::try_new(expr, log_data.arrow_schema()?)?;
+ .create_physical_expr(predicate, &log_data.read_schema().to_dfschema()?)?;
+ let pruning_predicate = PruningPredicate::try_new(expr, log_data.read_schema())?;
let mask = pruning_predicate.prune(&log_data)?;

Ok(Either::Left(log_data.into_iter().zip(mask).filter_map(
@@ -294,7 +319,7 @@
) -> DeltaResult<SchemaRef> {
let input_schema = match schema {
Some(schema) => schema,
- None => snapshot.input_schema()?,
+ None => snapshot.input_schema(),
};
let table_partition_cols = snapshot.metadata().partition_columns();

39 changes: 13 additions & 26 deletions crates/core/src/delta_datafusion/table_provider.rs
@@ -89,22 +89,14 @@ pub struct DeltaDataSink
/// transaction log access, snapshot state, and session configuration.
impl DeltaDataSink {
/// Create a new `DeltaDataSink`
- pub fn new(
- log_store: LogStoreRef,
- snapshot: EagerSnapshot,
- save_mode: SaveMode,
- ) -> datafusion::common::Result<Self> {
- let schema = snapshot
- .arrow_schema()
- .map_err(|e| DataFusionError::External(Box::new(e)))?;
-
- Ok(Self {
+ pub fn new(log_store: LogStoreRef, snapshot: EagerSnapshot, save_mode: SaveMode) -> Self {
+ Self {
log_store,
+ schema: snapshot.read_schema(),
snapshot,
save_mode,
- schema,
metrics: ExecutionPlanMetricsSet::new(),
- })
+ }
}

/// Create a streaming transformed version of the input that converts dictionary columns
@@ -156,20 +148,15 @@ impl DataSink for DeltaDataSink {
data: SendableRecordBatchStream,
_context: &Arc<TaskContext>,
) -> datafusion::common::Result<u64> {
- let target_schema = self
- .snapshot
- .input_schema()
- .map_err(|e| DataFusionError::External(Box::new(e)))?;
+ let target_schema = self.snapshot.input_schema();

let mut stream = self.create_converted_stream(data, target_schema.clone());
let partition_columns = self.snapshot.metadata().partition_columns();
let object_store = self.log_store.object_store(None);
let total_rows_metric = MetricBuilder::new(&self.metrics).counter("total_rows", 0);
let stats_config = WriterStatsConfig::new(DataSkippingNumIndexedCols::AllColumns, None);
let config = WriterConfig::new(
- self.snapshot
- .arrow_schema()
- .map_err(|e| DataFusionError::External(Box::new(e)))?,
+ self.snapshot.read_schema(),
partition_columns.clone(),
None,
None,
@@ -322,7 +309,7 @@ impl DeltaScanConfigBuilder {
/// Build a DeltaScanConfig and ensure no column name conflicts occur during downstream processing
pub fn build(&self, snapshot: &EagerSnapshot) -> DeltaResult<DeltaScanConfig> {
let file_column_name = if self.include_file_column {
- let input_schema = snapshot.input_schema()?;
+ let input_schema = snapshot.input_schema();
let mut column_names: HashSet<&String> = HashSet::new();
for field in input_schema.fields.iter() {
column_names.insert(field.name());
Expand Down Expand Up @@ -439,9 +426,9 @@ impl<'a> DeltaScanBuilder<'a> {
};

let schema = match config.schema.clone() {
- Some(value) => Ok(value),
- None => self.snapshot.arrow_schema(),
- }?;
+ Some(value) => value,
+ None => self.snapshot.read_schema(),
+ };

let logical_schema = df_logical_schema(
self.snapshot,
@@ -720,7 +707,7 @@ impl TableProvider for DeltaTable {
}

fn schema(&self) -> Arc<Schema> {
- self.snapshot().unwrap().snapshot().arrow_schema().unwrap()
+ self.snapshot().unwrap().snapshot().read_schema()
}

fn table_type(&self) -> TableType {
@@ -881,7 +868,7 @@ impl TableProvider for DeltaTableProvider {
};

let data_sink =
- DeltaDataSink::new(self.log_store.clone(), self.snapshot.clone(), save_mode)?;
+ DeltaDataSink::new(self.log_store.clone(), self.snapshot.clone(), save_mode);

Ok(Arc::new(DataSinkExec::new(
input,
@@ -1005,7 +992,7 @@ fn df_logical_schema(
) -> DeltaResult<SchemaRef> {
let input_schema = match schema {
Some(schema) => schema,
- None => snapshot.input_schema()?,
+ None => snapshot.input_schema(),
};
let table_partition_cols = snapshot.metadata().partition_columns();

2 changes: 1 addition & 1 deletion crates/core/src/kernel/snapshot/iterators/scan_row.rs
@@ -51,7 +51,7 @@ where
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
match this.stream.poll_next(cx) {
- Poll::Ready(Some(Ok(batch))) => match parse_stats_column(&this.snapshot, &batch) {
+ Poll::Ready(Some(Ok(batch))) => match parse_stats_column(this.snapshot, &batch) {
Ok(batch) => Poll::Ready(Some(Ok(batch))),
Err(err) => Poll::Ready(Some(Err(err))),
},