4 changes: 2 additions & 2 deletions crates/core/src/delta_datafusion/mod.rs

@@ -144,7 +144,7 @@ impl DataFusionMixins for Snapshot {
         expr: impl AsRef<str>,
         df_state: &SessionState,
     ) -> DeltaResult<Expr> {
-        let schema = DFSchema::try_from(self.arrow_schema()?.as_ref().to_owned())?;
+        let schema = DFSchema::try_from(self.arrow_schema().as_ref().to_owned())?;
         parse_predicate_expression(&schema, expr, df_state)
     }
 }
@@ -170,7 +170,7 @@ impl DataFusionMixins for LogDataHandler<'_> {

 impl DataFusionMixins for EagerSnapshot {
     fn arrow_schema(&self) -> DeltaResult<ArrowSchemaRef> {
-        self.snapshot().arrow_schema()
+        Ok(self.snapshot().arrow_schema())
     }

     fn input_schema(&self) -> DeltaResult<ArrowSchemaRef> {
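For context on the pattern in this hunk: the snapshot's `arrow_schema()` accessor is now infallible, while the `DataFusionMixins` trait keeps its `DeltaResult` signature, so the impl simply wraps the cached schema in `Ok`. A minimal sketch of that shape; all types here (`ArrowSchemaRef`, `DeltaResult`, the two structs) are simplified stand-ins, not the real delta-rs definitions:

```rust
use std::sync::Arc;

// Stand-ins for arrow::datatypes::SchemaRef and crate::DeltaResult.
type ArrowSchemaRef = Arc<str>;
type DeltaResult<T> = Result<T, Box<dyn std::error::Error>>;

struct Snapshot {
    schema: ArrowSchemaRef, // cached at construction, so access cannot fail
}

impl Snapshot {
    // Infallible accessor: hands out a cheap Arc clone.
    fn arrow_schema(&self) -> ArrowSchemaRef {
        self.schema.clone()
    }
}

struct EagerSnapshot {
    snapshot: Snapshot,
}

impl EagerSnapshot {
    fn snapshot(&self) -> &Snapshot {
        &self.snapshot
    }
}

trait DataFusionMixins {
    fn arrow_schema(&self) -> DeltaResult<ArrowSchemaRef>;
}

impl DataFusionMixins for EagerSnapshot {
    fn arrow_schema(&self) -> DeltaResult<ArrowSchemaRef> {
        // The inner accessor no longer returns a Result, so the fallible
        // trait surface just wraps the value.
        Ok(self.snapshot().arrow_schema())
    }
}

fn main() -> DeltaResult<()> {
    let eager = EagerSnapshot {
        snapshot: Snapshot { schema: Arc::from("id: long") },
    };
    let schema = DataFusionMixins::arrow_schema(&eager)?;
    println!("{schema}");
    Ok(())
}
```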
28 changes: 9 additions & 19 deletions crates/core/src/delta_datafusion/table_provider.rs

@@ -89,22 +89,14 @@ pub struct DeltaDataSink {
 /// transaction log access, snapshot state, and session configuration.
 impl DeltaDataSink {
     /// Create a new `DeltaDataSink`
-    pub fn new(
-        log_store: LogStoreRef,
-        snapshot: EagerSnapshot,
-        save_mode: SaveMode,
-    ) -> datafusion::common::Result<Self> {
-        let schema = snapshot
-            .arrow_schema()
-            .map_err(|e| DataFusionError::External(Box::new(e)))?;
-
-        Ok(Self {
+    pub fn new(log_store: LogStoreRef, snapshot: EagerSnapshot, save_mode: SaveMode) -> Self {
+        Self {
             log_store,
+            schema: snapshot.arrow_schema(),
             snapshot,
             save_mode,
-            schema,
             metrics: ExecutionPlanMetricsSet::new(),
-        })
+        }
     }

     /// Create a streaming transformed version of the input that converts dictionary columns
@@ -167,9 +159,7 @@ impl DataSink for DeltaDataSink {
         let total_rows_metric = MetricBuilder::new(&self.metrics).counter("total_rows", 0);
         let stats_config = WriterStatsConfig::new(DataSkippingNumIndexedCols::AllColumns, None);
         let config = WriterConfig::new(
-            self.snapshot
-                .arrow_schema()
-                .map_err(|e| DataFusionError::External(Box::new(e)))?,
+            self.snapshot.arrow_schema(),
             partition_columns.clone(),
             None,
             None,
@@ -439,9 +429,9 @@ impl<'a> DeltaScanBuilder<'a> {
         };

         let schema = match config.schema.clone() {
-            Some(value) => Ok(value),
+            Some(value) => value,
             None => self.snapshot.arrow_schema(),
-        }?;
+        };

         let logical_schema = df_logical_schema(
             self.snapshot,
@@ -720,7 +710,7 @@ impl TableProvider for DeltaTable {
     }

     fn schema(&self) -> Arc<Schema> {
-        self.snapshot().unwrap().snapshot().arrow_schema().unwrap()
+        self.snapshot().unwrap().snapshot().arrow_schema()
     }

     fn table_type(&self) -> TableType {
@@ -881,7 +871,7 @@ impl TableProvider for DeltaTableProvider {
         };

         let data_sink =
-            DeltaDataSink::new(self.log_store.clone(), self.snapshot.clone(), save_mode)?;
+            DeltaDataSink::new(self.log_store.clone(), self.snapshot.clone(), save_mode);

         Ok(Arc::new(DataSinkExec::new(
             input,
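One nicety in the `DeltaScanBuilder` hunk: once the fallback arm of the `match` is infallible, the match can produce the schema value directly instead of producing a `Result` that is immediately unwrapped with `?`. A tiny self-contained sketch of the before/after; the names here are illustrative stand-ins, not the real delta-rs API:

```rust
// Hypothetical stand-ins: `configured` plays the role of config.schema,
// `fallback` the role of snapshot.arrow_schema().
fn pick_schema(configured: Option<String>, fallback: impl Fn() -> String) -> String {
    // Before the change, the None arm could fail, so both arms yielded a
    // Result and the whole match ended in `}?;`. Now both arms yield the
    // value itself and the trailing `?` disappears.
    match configured {
        Some(value) => value,
        None => fallback(),
    }
}

fn main() {
    assert_eq!(pick_schema(None, || "id: long".into()), "id: long");
    assert_eq!(pick_schema(Some("id: int".into()), || unreachable!()), "id: int");
}
```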
2 changes: 1 addition & 1 deletion crates/core/src/kernel/snapshot/iterators/scan_row.rs

@@ -51,7 +51,7 @@ where
     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
         let this = self.project();
         match this.stream.poll_next(cx) {
-            Poll::Ready(Some(Ok(batch))) => match parse_stats_column(&this.snapshot, &batch) {
+            Poll::Ready(Some(Ok(batch))) => match parse_stats_column(this.snapshot, &batch) {
                 Ok(batch) => Poll::Ready(Some(Ok(batch))),
                 Err(err) => Poll::Ready(Some(Err(err))),
             },
54 changes: 40 additions & 14 deletions crates/core/src/kernel/snapshot/mod.rs

@@ -18,15 +18,16 @@
 use std::io::{BufRead, BufReader, Cursor};
 use std::sync::{Arc, LazyLock};

+use arrow::array::RecordBatch;
 use arrow::compute::{filter_record_batch, is_not_null};
-use arrow_array::RecordBatch;
+use arrow::datatypes::SchemaRef;
 use delta_kernel::actions::{Remove, Sidecar};
-use delta_kernel::engine::arrow_conversion::TryIntoArrow;
+use delta_kernel::engine::arrow_conversion::TryIntoArrow as _;
 use delta_kernel::engine::arrow_data::ArrowEngineData;
 use delta_kernel::path::{LogPathFileType, ParsedLogPath};
 use delta_kernel::scan::scan_row_schema;
 use delta_kernel::schema::derive_macro_utils::ToDataType;
-use delta_kernel::schema::{SchemaRef, StructField, ToSchema};
+use delta_kernel::schema::{SchemaRef as KernelSchemaRef, StructField, ToSchema};
 use delta_kernel::snapshot::Snapshot as KernelSnapshot;
 use delta_kernel::table_configuration::TableConfiguration;
 use delta_kernel::table_properties::TableProperties;
@@ -40,13 +41,13 @@ use tokio::task::spawn_blocking;

 use super::{Action, CommitInfo, Metadata, Protocol};
 use crate::kernel::arrow::engine_ext::{kernel_to_arrow, ExpressionEvaluatorExt};
+use crate::kernel::snapshot::scan::ScanBuilder;
 use crate::kernel::{StructType, ARROW_HANDLER};
 use crate::logstore::{LogStore, LogStoreExt};
 use crate::{to_kernel_predicate, DeltaResult, DeltaTableConfig, DeltaTableError, PartitionFilter};

 pub use self::log_data::*;
 pub use iterators::*;
-pub use scan::*;
 pub use stream::*;

 mod iterators;
@@ -108,7 +109,13 @@ impl Snapshot {
             }
         };

-        let schema = snapshot.table_configuration().schema();
+        let schema = Arc::new(
+            snapshot
+                .table_configuration()
+                .schema()
+                .as_ref()
+                .try_into_arrow()?,
+        );

         Ok(Self {
             inner: snapshot,
@@ -154,7 +161,13 @@ impl Snapshot {
             .map_err(|e| DeltaTableError::Generic(e.to_string()))??;

         self.inner = snapshot;
-        self.schema = self.inner.table_configuration().schema();
+        self.schema = Arc::new(
+            self.inner
+                .table_configuration()
+                .schema()
+                .as_ref()
+                .try_into_arrow()?,
+        );

         Ok(())
     }
@@ -165,8 +178,12 @@ impl Snapshot {
     }

     /// Get the table schema of the snapshot
-    pub fn schema(&self) -> &StructType {
-        self.schema.as_ref()
+    pub fn schema(&self) -> KernelSchemaRef {
+        self.inner.table_configuration().schema()
     }
+
+    pub fn arrow_schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }

     /// Get the table metadata of the snapshot
@@ -502,10 +519,15 @@ impl EagerSnapshot {
     }

     /// Get the table schema of the snapshot
-    pub fn schema(&self) -> &StructType {
+    pub fn schema(&self) -> KernelSchemaRef {
         self.snapshot.schema()
     }

+    /// Get the table arrow schema of the snapshot
+    pub fn arrow_schema(&self) -> SchemaRef {
+        self.snapshot.arrow_schema()
+    }
+
     /// Get the table metadata of the snapshot
     pub fn metadata(&self) -> &Metadata {
         self.snapshot.metadata()
@@ -579,7 +601,7 @@ impl EagerSnapshot {
         if filters.is_empty() {
             return self.file_views(log_store, None);
         }
-        let predicate = match to_kernel_predicate(filters, self.snapshot.schema()) {
+        let predicate = match to_kernel_predicate(filters, self.snapshot.schema().as_ref()) {
             Ok(predicate) => Arc::new(predicate),
             Err(err) => return Box::pin(once(ready(Err(err)))),
         };
@@ -665,13 +687,17 @@ mod tests {

         let engine = log_store.engine(None);
         let snapshot = KernelSnapshot::builder_for(table_url.clone()).build(engine.as_ref())?;
-        let schema = snapshot.table_configuration().schema();
+        let schema = snapshot
+            .table_configuration()
+            .schema()
+            .as_ref()
+            .try_into_arrow()?;

         Ok((
             Self {
                 inner: snapshot,
                 config: Default::default(),
-                schema,
+                schema: Arc::new(schema),
             },
             log_store,
         ))
@@ -717,7 +743,7 @@ mod tests {

         let schema_string = r#"{"type":"struct","fields":[{"name":"id","type":"long","nullable":true,"metadata":{}}]}"#;
         let expected: StructType = serde_json::from_str(schema_string)?;
-        assert_eq!(snapshot.schema(), &expected);
+        assert_eq!(snapshot.schema().as_ref(), &expected);

         let infos = snapshot
             .commit_infos(&log_store, None)
@@ -776,7 +802,7 @@ mod tests {

         let schema_string = r#"{"type":"struct","fields":[{"name":"id","type":"long","nullable":true,"metadata":{}}]}"#;
         let expected: StructType = serde_json::from_str(schema_string)?;
-        assert_eq!(snapshot.schema(), &expected);
+        assert_eq!(snapshot.schema().as_ref(), &expected);

         let log_store = TestTables::Checkpoints.table_builder()?.build_storage()?;
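The heart of the change is in this file: the snapshot converts the kernel schema to an Arrow schema once, at construction and on `update`, caches it behind an `Arc`, and afterwards serves cheap clones from infallible accessors. `schema()` now returns an owned `KernelSchemaRef` rather than `&StructType`, which is why call sites compare through `.as_ref()`. A condensed model of that design, assuming simplified stand-in types rather than the real delta-kernel/arrow ones:

```rust
use std::sync::Arc;

#[derive(Debug, PartialEq)]
struct StructType(Vec<String>); // stand-in for the kernel schema
struct ArrowSchema(Vec<String>); // stand-in for arrow::datatypes::Schema

type KernelSchemaRef = Arc<StructType>;
type SchemaRef = Arc<ArrowSchema>;

// Stand-in for delta_kernel's TryIntoArrow conversion, which can fail.
fn try_into_arrow(kernel: &StructType) -> Result<ArrowSchema, String> {
    Ok(ArrowSchema(kernel.0.clone()))
}

struct Snapshot {
    kernel_schema: KernelSchemaRef,
    schema: SchemaRef, // converted once, then shared
}

impl Snapshot {
    fn try_new(kernel_schema: KernelSchemaRef) -> Result<Self, String> {
        // The fallible conversion happens exactly once, here...
        let schema = Arc::new(try_into_arrow(kernel_schema.as_ref())?);
        Ok(Self { kernel_schema, schema })
    }

    // ...so both accessors are infallible afterwards and just clone an Arc.
    fn schema(&self) -> KernelSchemaRef {
        self.kernel_schema.clone()
    }

    fn arrow_schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}

fn main() -> Result<(), String> {
    let snapshot = Snapshot::try_new(Arc::new(StructType(vec!["id".into()])))?;
    // schema() returns an owned Arc now, so comparisons against a borrowed
    // StructType go through .as_ref(), as in the updated tests.
    assert_eq!(snapshot.schema().as_ref(), &StructType(vec!["id".into()]));
    let _arrow: SchemaRef = snapshot.arrow_schema();
    Ok(())
}
```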
7 changes: 5 additions & 2 deletions crates/core/src/kernel/snapshot/serde.rs

@@ -4,6 +4,7 @@ use std::sync::Arc;
 use arrow_ipc::reader::FileReader;
 use arrow_ipc::writer::FileWriter;
 use delta_kernel::actions::{Metadata, Protocol};
+use delta_kernel::engine::arrow_conversion::TryIntoArrow as _;
 use delta_kernel::log_segment::{ListedLogFiles, LogSegment};
 use delta_kernel::path::ParsedLogPath;
 use delta_kernel::snapshot::Snapshot as KernelSnapshot;
@@ -196,8 +197,10 @@ impl<'de> Visitor<'de> for SnapshotVisitor {

         let snapshot = KernelSnapshot::new(log_segment, table_configuration);
         let schema = snapshot
-            .metadata()
-            .parse_schema()
+            .table_configuration()
+            .schema()
+            .as_ref()
+            .try_into_arrow()
             .map_err(de::Error::custom)?;

         Ok(Snapshot {
2 changes: 1 addition & 1 deletion crates/core/src/operations/add_column.rs

@@ -96,7 +96,7 @@ impl std::future::IntoFuture for AddColumnBuilder {
             }

             let table_schema = this.snapshot.schema();
-            let new_table_schema = merge_delta_struct(table_schema, fields_right)?;
+            let new_table_schema = merge_delta_struct(table_schema.as_ref(), fields_right)?;

             let current_protocol = this.snapshot.protocol();
6 changes: 3 additions & 3 deletions crates/core/src/operations/create.rs

@@ -418,7 +418,7 @@ mod tests {
             .await
             .unwrap();
         assert_eq!(table.version(), Some(0));
-        assert_eq!(table.snapshot().unwrap().schema(), &table_schema)
+        assert_eq!(table.snapshot().unwrap().schema().as_ref(), &table_schema)
     }

     #[tokio::test]
@@ -442,7 +442,7 @@ mod tests {
             .await
             .unwrap();
         assert_eq!(table.version(), Some(0));
-        assert_eq!(table.snapshot().unwrap().schema(), &table_schema)
+        assert_eq!(table.snapshot().unwrap().schema().as_ref(), &table_schema)
     }

     #[tokio::test]
@@ -479,7 +479,7 @@ mod tests {
             snapshot.protocol().min_writer_version(),
             PROTOCOL.default_writer_version()
         );
-        assert_eq!(snapshot.schema(), &schema);
+        assert_eq!(snapshot.schema().as_ref(), &schema);

         // check we can overwrite default settings via adding actions
         let protocol = ProtocolInner {
3 changes: 1 addition & 2 deletions crates/core/src/operations/load.rs

@@ -7,7 +7,6 @@ use datafusion::physical_plan::{ExecutionPlan, SendableRecordBatchStream};
 use futures::future::BoxFuture;

 use super::CustomExecuteHandler;
-use crate::delta_datafusion::DataFusionMixins;
 use crate::errors::{DeltaResult, DeltaTableError};
 use crate::kernel::transaction::PROTOCOL;
 use crate::kernel::EagerSnapshot;
@@ -70,7 +69,7 @@ impl std::future::IntoFuture for LoadBuilder {
                     snapshot: this.snapshot,
                 },
            );
-            let schema = table.snapshot()?.snapshot().arrow_schema()?;
+            let schema = table.snapshot()?.snapshot().arrow_schema();
            let projection = this
                .columns
                .map(|cols| {
27 changes: 20 additions & 7 deletions crates/core/src/operations/merge/mod.rs

@@ -974,7 +974,7 @@ async fn execute(
         let schema = Arc::new(schema_builder.finish());
         new_schema = Some(schema.clone());
         let schema_struct: StructType = schema.try_into_kernel()?;
-        if &schema_struct != snapshot.schema() {
+        if &schema_struct != snapshot.schema().as_ref() {
             let action = Action::Metadata(new_metadata(
                 &schema_struct,
                 current_metadata.partition_columns(),
@@ -1097,7 +1097,7 @@ async fn execute(
     let mut write_projection_with_cdf = Vec::new();

     let schema = if let Some(schema) = new_schema {
-        &schema.try_into_kernel()?
+        Arc::new(schema.try_into_kernel()?)
     } else {
         snapshot.schema()
     };
@@ -1902,7 +1902,8 @@ mod tests {
            .unwrap();

         // Verify schema nullability is preserved after merge
-        let final_fields: Vec<_> = merged_table.snapshot().unwrap().schema().fields().collect();
+        let schema = merged_table.snapshot().unwrap().schema();
+        let final_fields: Vec<_> = schema.fields().collect();
         assert!(
             !final_fields[0].is_nullable(),
             "id should remain non-nullable after merge"
@@ -2269,7 +2270,10 @@ mod tests {
         ];
         let actual = get_data(&table).await;
         let expected_schema_struct: StructType = Arc::clone(&schema).try_into_kernel().unwrap();
-        assert_eq!(&expected_schema_struct, table.snapshot().unwrap().schema());
+        assert_eq!(
+            &expected_schema_struct,
+            table.snapshot().unwrap().schema().as_ref()
+        );
         assert_batches_sorted_eq!(&expected, &actual);
     }

@@ -2336,7 +2340,10 @@ mod tests {
         ];
         let actual = get_data(&table).await;
         let expected_schema_struct: StructType = Arc::clone(&schema).try_into_kernel().unwrap();
-        assert_eq!(&expected_schema_struct, table.snapshot().unwrap().schema());
+        assert_eq!(
+            &expected_schema_struct,
+            table.snapshot().unwrap().schema().as_ref()
+        );
         assert_batches_sorted_eq!(&expected, &actual);
     }

@@ -3410,7 +3417,10 @@ mod tests {
         ];
         let actual = get_data(&table).await;
         let expected_schema_struct: StructType = schema.as_ref().try_into_kernel().unwrap();
-        assert_eq!(&expected_schema_struct, table.snapshot().unwrap().schema());
+        assert_eq!(
+            &expected_schema_struct,
+            table.snapshot().unwrap().schema().as_ref()
+        );
         assert_batches_sorted_eq!(&expected, &actual);
     }

@@ -4244,7 +4254,10 @@ mod tests {
         ];
         let actual = get_data(&table).await;
         let expected_schema_struct: StructType = source_schema.try_into_kernel().unwrap();
-        assert_eq!(&expected_schema_struct, table.snapshot().unwrap().schema());
+        assert_eq!(
+            &expected_schema_struct,
+            table.snapshot().unwrap().schema().as_ref()
+        );
         assert_batches_sorted_eq!(&expected, &actual);

         let ctx = SessionContext::new();
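A side effect shows up in the nullability test above: because `schema()` now returns an owned `Arc` instead of `&StructType`, the test binds the schema to a local before collecting its fields; borrowing fields straight off the temporary would not compile. A minimal illustration of that borrow-checker interaction, using simplified stand-in types rather than the real API:

```rust
use std::sync::Arc;

struct StructType(Vec<String>);

impl StructType {
    fn fields(&self) -> impl Iterator<Item = &String> {
        self.0.iter()
    }
}

// Stand-in for snapshot.schema(), which now returns an owned Arc.
fn schema() -> Arc<StructType> {
    Arc::new(StructType(vec!["id".into(), "value".into()]))
}

fn main() {
    // Would not compile: the Arc temporary is dropped at the end of the
    // statement while `fields` still borrows from it (E0716).
    // let fields: Vec<_> = schema().fields().collect();

    // Works: bind the Arc first so it outlives the collected borrows.
    let schema = schema();
    let fields: Vec<_> = schema.fields().collect();
    assert_eq!(fields.len(), 2);
}
```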
2 changes: 1 addition & 1 deletion crates/core/src/operations/optimize.rs

@@ -859,7 +859,7 @@ pub async fn create_merge_plan(
         predicate: serde_json::to_string(filters).ok(),
     };
     let file_schema = arrow_schema_without_partitions(
-        &Arc::new(snapshot.schema().try_into_arrow()?),
+        &Arc::new(snapshot.schema().as_ref().try_into_arrow()?),
         partitions_keys,
     );