diff --git a/crates/aws/src/logstore/dynamodb_logstore.rs b/crates/aws/src/logstore/dynamodb_logstore.rs index c8b3cb9c16..d43c280e50 100644 --- a/crates/aws/src/logstore/dynamodb_logstore.rs +++ b/crates/aws/src/logstore/dynamodb_logstore.rs @@ -148,7 +148,7 @@ impl S3DynamoDbLogStore { ), } } - unreachable!("for loop yields Ok or Err in body when retyr = MAX_REPAIR_RETRIES") + unreachable!("for loop yields Ok or Err in body when retry = MAX_REPAIR_RETRIES") } fn map_retry_result( diff --git a/crates/benchmarks/src/bin/merge.rs b/crates/benchmarks/src/bin/merge.rs index fe6c6b6510..0e1f7a6f9d 100644 --- a/crates/benchmarks/src/bin/merge.rs +++ b/crates/benchmarks/src/bin/merge.rs @@ -199,7 +199,7 @@ async fn benchmark_merge_tpcds( let table = DeltaTableBuilder::from_uri(table_url)?.load().await?; let provider = DeltaTableProvider::try_new( - table.snapshot()?.clone(), + table.snapshot()?.snapshot().clone(), table.log_store(), DeltaScanConfig { file_column_name: Some("file_path".to_string()), diff --git a/crates/core/src/delta_datafusion/expr.rs b/crates/core/src/delta_datafusion/expr.rs index c506355905..364c2e1c35 100644 --- a/crates/core/src/delta_datafusion/expr.rs +++ b/crates/core/src/delta_datafusion/expr.rs @@ -478,7 +478,7 @@ macro_rules! format_option { }}; } -/// Epoch days from ce calander until 1970-01-01 +/// Epoch days from ce calendar until 1970-01-01 pub const EPOCH_DAYS_FROM_CE: i32 = 719_163; struct ScalarValueFormat<'a> { diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs index 0c2cc7c0f3..a656b235e2 100644 --- a/crates/core/src/delta_datafusion/mod.rs +++ b/crates/core/src/delta_datafusion/mod.rs @@ -309,7 +309,7 @@ pub(crate) fn register_store(store: LogStoreRef, env: Arc) { /// columns must appear at the end of the schema. This is to align with how partition are handled /// at the physical level pub(crate) fn df_logical_schema( - snapshot: &DeltaTableState, + snapshot: &EagerSnapshot, file_column_name: &Option, schema: Option, ) -> DeltaResult { @@ -576,13 +576,13 @@ impl DeltaDataChecker { } /// Create a new DeltaDataChecker - pub fn new(snapshot: &DeltaTableState) -> Self { + pub fn new(snapshot: &EagerSnapshot) -> Self { let invariants = snapshot.schema().get_invariants().unwrap_or_default(); let generated_columns = snapshot .schema() .get_generated_columns() .unwrap_or_default(); - let constraints = snapshot.table_config().get_constraints(); + let constraints = snapshot.table_properties().get_constraints(); let non_nullable_columns = snapshot .schema() .fields() @@ -937,16 +937,17 @@ fn join_batches_with_add_actions( /// Determine which files contain a record that satisfies the predicate pub(crate) async fn find_files_scan( - snapshot: &DeltaTableState, + snapshot: &EagerSnapshot, log_store: LogStoreRef, state: &SessionState, expression: Expr, ) -> DeltaResult> { let candidate_map: HashMap = snapshot - .file_actions_iter(&log_store) - .map_ok(|add| (add.path.clone(), add.to_owned())) - .try_collect() - .await?; + .log_data() + .iter() + .map(|f| f.add_action()) + .map(|add| (add.path.clone(), add.to_owned())) + .collect(); let scan_config = DeltaScanConfigBuilder { include_file_column: true, @@ -997,10 +998,14 @@ pub(crate) async fn find_files_scan( pub(crate) async fn scan_memory_table( log_store: &dyn LogStore, - snapshot: &DeltaTableState, + snapshot: &EagerSnapshot, predicate: &Expr, ) -> DeltaResult> { - let actions = snapshot.file_actions(log_store).await?; + let actions = snapshot + .log_data() + .iter() + .map(|f| f.add_action()) + .collect_vec(); let batch = snapshot.add_actions_table(true)?; let mut arrays = Vec::new(); @@ -1052,7 +1057,7 @@ pub(crate) async fn scan_memory_table( /// Finds files in a snapshot that match the provided predicate. pub async fn find_files( - snapshot: &DeltaTableState, + snapshot: &EagerSnapshot, log_store: LogStoreRef, state: &SessionState, predicate: Option, @@ -1088,7 +1093,7 @@ pub async fn find_files( } } None => Ok(FindFiles { - candidates: snapshot.file_actions(&log_store).await?, + candidates: snapshot.log_data().iter().map(|f| f.add_action()).collect(), partition_scan: true, }), } @@ -1192,7 +1197,7 @@ impl From for Column { } } -/// Create a column, resuing the existing datafusion column +/// Create a column, reusing the existing datafusion column impl From for DeltaColumn { fn from(c: Column) -> Self { DeltaColumn { inner: c } @@ -1517,13 +1522,16 @@ mod tests { let table = crate::open_table(table_url).await.unwrap(); let config = DeltaScanConfigBuilder::new() .with_file_column_name(&"file_source") - .build(table.snapshot().unwrap()) + .build(table.snapshot().unwrap().snapshot()) .unwrap(); let log_store = table.log_store(); - let provider = - DeltaTableProvider::try_new(table.snapshot().unwrap().clone(), log_store, config) - .unwrap(); + let provider = DeltaTableProvider::try_new( + table.snapshot().unwrap().snapshot().clone(), + log_store, + config, + ) + .unwrap(); let ctx = SessionContext::new(); ctx.register_table("test", Arc::new(provider)).unwrap(); @@ -1581,13 +1589,16 @@ mod tests { .unwrap(); let config = DeltaScanConfigBuilder::new() - .build(table.snapshot().unwrap()) + .build(table.snapshot().unwrap().snapshot()) .unwrap(); let log_store = table.log_store(); - let provider = - DeltaTableProvider::try_new(table.snapshot().unwrap().clone(), log_store, config) - .unwrap(); + let provider = DeltaTableProvider::try_new( + table.snapshot().unwrap().snapshot().clone(), + log_store, + config, + ) + .unwrap(); let logical_schema = provider.schema(); let ctx = SessionContext::new(); ctx.register_table("test", Arc::new(provider)).unwrap(); @@ -1645,12 +1656,13 @@ mod tests { .unwrap(); let config = DeltaScanConfigBuilder::new() - .build(table.snapshot().unwrap()) + .build(table.snapshot().unwrap().snapshot()) .unwrap(); let log = table.log_store(); let provider = - DeltaTableProvider::try_new(table.snapshot().unwrap().clone(), log, config).unwrap(); + DeltaTableProvider::try_new(table.snapshot().unwrap().snapshot().clone(), log, config) + .unwrap(); let ctx: SessionContext = DeltaSessionContext::default().into(); ctx.register_table("test", Arc::new(provider)).unwrap(); @@ -1737,12 +1749,13 @@ mod tests { .unwrap(); let config = DeltaScanConfigBuilder::new() - .build(table.snapshot().unwrap()) + .build(table.snapshot().unwrap().snapshot()) .unwrap(); let log = table.log_store(); let provider = - DeltaTableProvider::try_new(table.snapshot().unwrap().clone(), log, config).unwrap(); + DeltaTableProvider::try_new(table.snapshot().unwrap().snapshot().clone(), log, config) + .unwrap(); let ctx: SessionContext = DeltaSessionContext::default().into(); ctx.register_table("test", Arc::new(provider)).unwrap(); @@ -1793,12 +1806,13 @@ mod tests { .unwrap(); let config = DeltaScanConfigBuilder::new() - .build(table.snapshot().unwrap()) + .build(table.snapshot().unwrap().snapshot()) .unwrap(); let log = table.log_store(); let provider = - DeltaTableProvider::try_new(table.snapshot().unwrap().clone(), log, config).unwrap(); + DeltaTableProvider::try_new(table.snapshot().unwrap().snapshot().clone(), log, config) + .unwrap(); let mut cfg = SessionConfig::default(); cfg.options_mut().execution.parquet.pushdown_filters = true; @@ -1889,12 +1903,13 @@ mod tests { .unwrap(); let config = DeltaScanConfigBuilder::new() - .build(table.snapshot().unwrap()) + .build(table.snapshot().unwrap().snapshot()) .unwrap(); let log = table.log_store(); let provider = - DeltaTableProvider::try_new(table.snapshot().unwrap().clone(), log, config).unwrap(); + DeltaTableProvider::try_new(table.snapshot().unwrap().snapshot().clone(), log, config) + .unwrap(); let ctx: SessionContext = DeltaSessionContext::default().into(); ctx.register_table("test", Arc::new(provider)).unwrap(); @@ -1972,11 +1987,15 @@ mod tests { let ctx = SessionContext::new(); let state = ctx.state(); - let scan = DeltaScanBuilder::new(table.snapshot().unwrap(), table.log_store(), &state) - .with_filter(Some(col("a").eq(lit("s")))) - .build() - .await - .unwrap(); + let scan = DeltaScanBuilder::new( + table.snapshot().unwrap().snapshot(), + table.log_store(), + &state, + ) + .with_filter(Some(col("a").eq(lit("s")))) + .build() + .await + .unwrap(); let mut visitor = ParquetVisitor::default(); visit_execution_plan(&scan, &mut visitor).unwrap(); @@ -1997,12 +2016,12 @@ mod tests { let snapshot = table.snapshot().unwrap(); let ctx = SessionContext::new(); let state = ctx.state(); - let scan = DeltaScanBuilder::new(snapshot, table.log_store(), &state) + let scan = DeltaScanBuilder::new(snapshot.snapshot(), table.log_store(), &state) .with_filter(Some(col("a").eq(lit("s")))) .with_scan_config( DeltaScanConfigBuilder::new() .with_parquet_pushdown(false) - .build(snapshot) + .build(snapshot.snapshot()) .unwrap(), ) .build() @@ -2032,7 +2051,7 @@ mod tests { let ctx = SessionContext::new_with_config(config); let state = ctx.state(); - let scan = DeltaScanBuilder::new(snapshot, table.log_store(), &state) + let scan = DeltaScanBuilder::new(snapshot.snapshot(), table.log_store(), &state) .build() .await .unwrap(); @@ -2151,7 +2170,7 @@ mod tests { .unwrap(); let config = DeltaScanConfigBuilder::new() - .build(table.snapshot().unwrap()) + .build(table.snapshot().unwrap().snapshot()) .unwrap(); let (object_store, mut operations) = @@ -2164,7 +2183,7 @@ mod tests { table.log_store().config().clone(), ); let provider = DeltaTableProvider::try_new( - table.snapshot().unwrap().clone(), + table.snapshot().unwrap().snapshot().clone(), Arc::new(log_store), config, ) diff --git a/crates/core/src/delta_datafusion/schema_adapter.rs b/crates/core/src/delta_datafusion/schema_adapter.rs index a1055b23c1..392964870c 100644 --- a/crates/core/src/delta_datafusion/schema_adapter.rs +++ b/crates/core/src/delta_datafusion/schema_adapter.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use crate::kernel::schema::cast_record_batch; use arrow_array::RecordBatch; use arrow_schema::{Schema, SchemaRef}; -use datafusion::common::{not_impl_err, ColumnStatistics}; +use datafusion::common::{not_impl_err, ColumnStatistics, Result}; use datafusion::datasource::schema_adapter::{SchemaAdapter, SchemaAdapterFactory, SchemaMapper}; /// A Schema Adapter Factory which provides casting record batches from parquet to meet @@ -39,10 +39,7 @@ impl SchemaAdapter for DeltaSchemaAdapter { Some(file_schema.fields.find(field.name())?.0) } - fn map_schema( - &self, - file_schema: &Schema, - ) -> datafusion::common::Result<(Arc, Vec)> { + fn map_schema(&self, file_schema: &Schema) -> Result<(Arc, Vec)> { let mut projection = Vec::with_capacity(file_schema.fields().len()); for (file_idx, file_field) in file_schema.fields.iter().enumerate() { @@ -71,7 +68,7 @@ pub(crate) struct SchemaMapping { } impl SchemaMapper for SchemaMapping { - fn map_batch(&self, batch: RecordBatch) -> datafusion::common::Result { + fn map_batch(&self, batch: RecordBatch) -> Result { let record_batch = cast_record_batch(&batch, self.projected_schema.clone(), false, true)?; Ok(record_batch) } @@ -79,7 +76,7 @@ impl SchemaMapper for SchemaMapping { fn map_column_statistics( &self, _file_col_statistics: &[ColumnStatistics], - ) -> datafusion::common::Result> { + ) -> Result> { not_impl_err!("Mapping column statistics is not implemented for DeltaSchemaAdapter") } } diff --git a/crates/core/src/delta_datafusion/table_provider.rs b/crates/core/src/delta_datafusion/table_provider.rs index 9696bfcd7a..df62050704 100644 --- a/crates/core/src/delta_datafusion/table_provider.rs +++ b/crates/core/src/delta_datafusion/table_provider.rs @@ -43,10 +43,11 @@ use datafusion::{ common::{HashMap, HashSet}, datasource::listing::PartitionedFile, logical_expr::{utils::conjunction, TableProviderFilterPushDown}, - prelude::{Expr, SessionContext}, + prelude::Expr, scalar::ScalarValue, }; use futures::TryStreamExt; +use itertools::Itertools; use object_store::ObjectMeta; use serde::{Deserialize, Serialize}; @@ -58,12 +59,12 @@ use crate::delta_datafusion::{ }; use crate::kernel::schema::cast::cast_record_batch; use crate::kernel::transaction::{CommitBuilder, PROTOCOL}; -use crate::kernel::{Action, Add, Remove}; +use crate::kernel::{Action, Add, EagerSnapshot, Remove}; use crate::operations::write::writer::{DeltaWriter, WriterConfig}; use crate::operations::write::WriterStatsConfig; use crate::protocol::{DeltaOperation, SaveMode}; use crate::{ensure_table_uri, DeltaTable}; -use crate::{logstore::LogStoreRef, table::state::DeltaTableState, DeltaResult, DeltaTableError}; +use crate::{logstore::LogStoreRef, DeltaResult, DeltaTableError}; use delta_kernel::table_properties::DataSkippingNumIndexedCols; const PATH_COLUMN: &str = "__delta_rs_path"; @@ -86,7 +87,7 @@ pub struct DeltaDataSink { /// The log store log_store: LogStoreRef, /// The snapshot - snapshot: DeltaTableState, + snapshot: EagerSnapshot, /// The save mode save_mode: SaveMode, /// The schema @@ -105,7 +106,7 @@ impl DeltaDataSink { /// Create a new `DeltaDataSink` pub fn new( log_store: LogStoreRef, - snapshot: DeltaTableState, + snapshot: EagerSnapshot, save_mode: SaveMode, session_state: Arc, ) -> datafusion::common::Result { @@ -213,11 +214,7 @@ impl DataSink for DeltaDataSink { .map_err(|e| DataFusionError::External(Box::new(e)))?; let mut actions = if self.save_mode == SaveMode::Overwrite { - let current_files = self - .snapshot - .file_actions(&*self.log_store) - .await - .map_err(|e| DataFusionError::External(Box::new(e)))?; + let current_files = self.snapshot.log_data().iter().map(|f| f.add_action()); current_files .into_iter() .map(|add| { @@ -339,7 +336,7 @@ impl DeltaScanConfigBuilder { } /// Build a DeltaScanConfig and ensure no column name conflicts occur during downstream processing - pub fn build(&self, snapshot: &DeltaTableState) -> DeltaResult { + pub fn build(&self, snapshot: &EagerSnapshot) -> DeltaResult { let file_column_name = if self.include_file_column { let input_schema = snapshot.input_schema()?; let mut column_names: HashSet<&String> = HashSet::new(); @@ -397,7 +394,7 @@ pub struct DeltaScanConfig { } pub(crate) struct DeltaScanBuilder<'a> { - snapshot: &'a DeltaTableState, + snapshot: &'a EagerSnapshot, log_store: LogStoreRef, filter: Option, session: &'a dyn Session, @@ -409,7 +406,7 @@ pub(crate) struct DeltaScanBuilder<'a> { impl<'a> DeltaScanBuilder<'a> { pub fn new( - snapshot: &'a DeltaTableState, + snapshot: &'a EagerSnapshot, log_store: LogStoreRef, session: &'a dyn Session, ) -> Self { @@ -488,13 +485,12 @@ impl<'a> DeltaScanBuilder<'a> { logical_schema }; - let context = SessionContext::new(); let df_schema = logical_schema.clone().to_dfschema()?; let logical_filter = self .filter .clone() - .map(|expr| simplify_expr(&context, &df_schema, expr)); + .map(|expr| simplify_expr(self.session, &df_schema, expr)); // only inexact filters should be pushed down to the data source, doing otherwise // will make stats inexact and disable datafusion optimizations like AggregateStatistics let pushdown_filter = self @@ -516,7 +512,7 @@ impl<'a> DeltaScanBuilder<'a> { }); conjunction(filtered_predicates) }) - .map(|expr| simplify_expr(&context, &df_schema, expr)); + .map(|expr| simplify_expr(self.session, &df_schema, expr)); // Perform Pruning of files to scan let (files, files_scanned, files_pruned, pruning_mask) = match self.files { @@ -528,7 +524,12 @@ impl<'a> DeltaScanBuilder<'a> { None => { // early return in case we have no push down filters or limit if logical_filter.is_none() && self.limit.is_none() { - let files = self.snapshot.file_actions(&self.log_store).await?; + let files = self + .snapshot + .log_data() + .iter() + .map(|f| f.add_action()) + .collect_vec(); let files_scanned = files.len(); (files, files_scanned, 0, None) } else { @@ -550,9 +551,10 @@ impl<'a> DeltaScanBuilder<'a> { let file_actions: Vec<_> = self .snapshot - .file_actions_iter(&self.log_store) - .try_collect() - .await?; + .log_data() + .iter() + .map(|f| f.add_action()) + .collect(); for (action, keep) in file_actions.into_iter().zip(files_to_prune.iter().cloned()) @@ -649,10 +651,11 @@ impl<'a> DeltaScanBuilder<'a> { // Should we update datafusion_table_statistics to optionally take the mask? let stats = if let Some(mask) = pruning_mask { let es = self.snapshot.snapshot(); - let pruned_stats = filter_record_batch(&es.files, &BooleanArray::from(mask))?; + let pruned_stats = + filter_record_batch(&self.snapshot.files, &BooleanArray::from(mask))?; LogDataHandler::new(&pruned_stats, es.table_configuration()).statistics() } else { - self.snapshot.datafusion_table_statistics() + self.snapshot.log_data().statistics() }; let stats = stats.unwrap_or(Statistics::new_unknown(&schema)); @@ -712,7 +715,8 @@ impl<'a> DeltaScanBuilder<'a> { } } -// TODO: implement this for Snapshot, not for DeltaTable +// TODO: implement this for Snapshot, not for DeltaTable since DeltaTable has unknown load state. +// the unwraps in the schema method are a dead giveaway .. #[async_trait::async_trait] impl TableProvider for DeltaTable { fn as_any(&self) -> &dyn Any { @@ -745,7 +749,7 @@ impl TableProvider for DeltaTable { register_store(self.log_store(), session.runtime_env().clone()); let filter_expr = conjunction(filters.iter().cloned()); - let scan = DeltaScanBuilder::new(self.snapshot()?, self.log_store(), session) + let scan = DeltaScanBuilder::new(self.snapshot()?.snapshot(), self.log_store(), session) .with_projection(projection) .with_limit(limit) .with_filter(filter_expr) @@ -771,7 +775,7 @@ impl TableProvider for DeltaTable { /// A Delta table provider that enables additional metadata columns to be included during the scan #[derive(Debug)] pub struct DeltaTableProvider { - snapshot: DeltaTableState, + snapshot: EagerSnapshot, log_store: LogStoreRef, config: DeltaScanConfig, schema: Arc, @@ -781,7 +785,7 @@ pub struct DeltaTableProvider { impl DeltaTableProvider { /// Build a DeltaTableProvider pub fn try_new( - snapshot: DeltaTableState, + snapshot: EagerSnapshot, log_store: LogStoreRef, config: DeltaScanConfig, ) -> DeltaResult { @@ -856,7 +860,7 @@ impl TableProvider for DeltaTableProvider { } fn statistics(&self) -> Option { - self.snapshot.datafusion_table_statistics() + self.snapshot.log_data().statistics() } /// Insert the data into the delta table @@ -875,9 +879,9 @@ impl TableProvider for DeltaTableProvider { InsertOp::Append => SaveMode::Append, InsertOp::Overwrite => SaveMode::Overwrite, InsertOp::Replace => { - return Err(DataFusionError::Plan(format!( - "Replace operation is not supported for DeltaTableProvider" - ))) + return Err(DataFusionError::Plan( + "Replace operation is not supported for DeltaTableProvider".to_string(), + )) } }; @@ -1004,7 +1008,7 @@ impl ExecutionPlan for DeltaScan { /// columns must appear at the end of the schema. This is to align with how partition are handled /// at the physical level fn df_logical_schema( - snapshot: &DeltaTableState, + snapshot: &EagerSnapshot, file_column_name: &Option, schema: Option, ) -> DeltaResult { @@ -1037,11 +1041,7 @@ fn df_logical_schema( Ok(Arc::new(Schema::new(fields))) } -fn simplify_expr( - context: &SessionContext, - df_schema: &DFSchema, - expr: Expr, -) -> Arc { +fn simplify_expr(context: &dyn Session, df_schema: &DFSchema, expr: Expr) -> Arc { // Simplify the expression first let props = ExecutionProps::new(); let simplify_context = SimplifyContext::new(&props).with_schema(df_schema.clone().into()); @@ -1171,7 +1171,6 @@ fn partitioned_file_from_action( mod tests { use crate::kernel::{DataType, PrimitiveType, StructField, StructType}; use crate::operations::create::CreateBuilder; - use crate::protocol::SaveMode; use crate::{DeltaTable, DeltaTableError}; use arrow::array::{Int64Array, StringArray}; use arrow::datatypes::{DataType as ArrowDataType, Field, Schema}; @@ -1225,10 +1224,10 @@ mod tests { let session_state = create_test_session_state(); let scan_config = DeltaScanConfigBuilder::new() - .build(table.snapshot().unwrap()) + .build(table.snapshot().unwrap().snapshot()) .unwrap(); let table_provider = DeltaTableProvider::try_new( - table.snapshot().unwrap().clone(), + table.snapshot().unwrap().snapshot().clone(), table.log_store(), scan_config, ) diff --git a/crates/core/src/kernel/snapshot/iterators.rs b/crates/core/src/kernel/snapshot/iterators.rs index 3bab97ea61..b27bc297f2 100644 --- a/crates/core/src/kernel/snapshot/iterators.rs +++ b/crates/core/src/kernel/snapshot/iterators.rs @@ -255,7 +255,7 @@ impl LogicalFileView { dv_col.column(*DV_FIELD_INDICES.get(DV_FIELD_STORAGE_TYPE).unwrap()); storage_col .is_valid(self.index) - .then(|| DeletionVectorView { + .then_some(DeletionVectorView { data: dv_col, index: self.index, }) diff --git a/crates/core/src/kernel/snapshot/log_data.rs b/crates/core/src/kernel/snapshot/log_data.rs index 029581ae03..c11bc9c177 100644 --- a/crates/core/src/kernel/snapshot/log_data.rs +++ b/crates/core/src/kernel/snapshot/log_data.rs @@ -531,7 +531,7 @@ mod tests { let json_adds: Vec<_> = table_from_json_stats .snapshot() .unwrap() - .snapshot + .snapshot() .files(&log_store, None) .try_collect() .await @@ -548,7 +548,7 @@ mod tests { let struct_adds: Vec<_> = table_from_struct_stats .snapshot() .unwrap() - .snapshot + .snapshot() .files(&log_store, None) .try_collect() .await @@ -588,7 +588,7 @@ mod tests { let file_stats = table_from_struct_stats .snapshot() .unwrap() - .snapshot + .snapshot() .log_data(); let col_stats = file_stats.statistics(); diff --git a/crates/core/src/kernel/snapshot/mod.rs b/crates/core/src/kernel/snapshot/mod.rs index 25aec85158..c923b498a1 100644 --- a/crates/core/src/kernel/snapshot/mod.rs +++ b/crates/core/src/kernel/snapshot/mod.rs @@ -44,7 +44,7 @@ use crate::kernel::parse::read_removes; use crate::kernel::transaction::CommitData; use crate::kernel::{ActionType, StructType}; use crate::logstore::{LogStore, LogStoreExt}; -use crate::{DeltaResult, DeltaTableConfig, DeltaTableError}; +use crate::{to_kernel_predicate, DeltaResult, DeltaTableConfig, DeltaTableError, PartitionFilter}; pub use self::log_data::*; pub use iterators::*; @@ -652,15 +652,13 @@ impl EagerSnapshot { } }; - let files = match concat_batches(&SCAN_ROW_ARROW_SCHEMA, &files) + match concat_batches(&SCAN_ROW_ARROW_SCHEMA, &files) .map_err(DeltaTableError::from) .and_then(|batch| self.snapshot.inner.parse_stats_column(&batch)) { Ok(files) => files, Err(err) => return Box::pin(futures::stream::once(async { Err(err) })), - }; - - files + } } else { self.files.clone() }; @@ -668,6 +666,22 @@ impl EagerSnapshot { futures::stream::iter(iter).boxed() } + #[deprecated(since = "0.30.0", note = "Use `files` with kernel predicate instead.")] + pub fn file_views_by_partitions( + &self, + log_store: &dyn LogStore, + filters: &[PartitionFilter], + ) -> BoxStream<'_, DeltaResult> { + if filters.is_empty() { + return self.files(log_store, None); + } + let predicate = match to_kernel_predicate(filters, self.snapshot.schema()) { + Ok(predicate) => Arc::new(predicate), + Err(err) => return Box::pin(futures::stream::once(async { Err(err) })), + }; + self.files(log_store, Some(predicate)) + } + /// Iterate over all latest app transactions pub async fn transaction_version( &self, diff --git a/crates/core/src/kernel/transaction/protocol.rs b/crates/core/src/kernel/transaction/protocol.rs index 49aa5a0e5b..72dbf1b039 100644 --- a/crates/core/src/kernel/transaction/protocol.rs +++ b/crates/core/src/kernel/transaction/protocol.rs @@ -9,7 +9,6 @@ use crate::kernel::{ }; use crate::protocol::DeltaOperation; use crate::table::config::TablePropertiesExt as _; -use crate::table::state::DeltaTableState; static READER_V2: LazyLock> = LazyLock::new(|| HashSet::from_iter([ReaderFeature::ColumnMapping])); @@ -95,7 +94,7 @@ impl ProtocolChecker { /// Check can write_timestamp_ntz pub fn check_can_write_timestamp_ntz( &self, - snapshot: &DeltaTableState, + snapshot: &EagerSnapshot, schema: &Schema, ) -> Result<(), TransactionError> { let contains_timestampntz = contains_timestampntz(schema.fields()); diff --git a/crates/core/src/kernel/transaction/state.rs b/crates/core/src/kernel/transaction/state.rs index 004d23dc86..b7603d7249 100644 --- a/crates/core/src/kernel/transaction/state.rs +++ b/crates/core/src/kernel/transaction/state.rs @@ -320,7 +320,7 @@ mod tests { ))); let state = DeltaTableState::from_actions(actions).await.unwrap(); - let files = files_matching_predicate(state.snapshot.log_data(), &[]) + let files = files_matching_predicate(state.snapshot().log_data(), &[]) .unwrap() .collect::>(); assert_eq!(files.len(), 3); @@ -329,7 +329,7 @@ mod tests { .gt(lit::(10)) .or(col("value").lt_eq(lit::(0))); - let files = files_matching_predicate(state.snapshot.log_data(), &[predictate]) + let files = files_matching_predicate(state.snapshot().log_data(), &[predictate]) .unwrap() .collect::>(); assert_eq!(files.len(), 2); diff --git a/crates/core/src/logstore/mod.rs b/crates/core/src/logstore/mod.rs index 983e2e64ca..d505f0dfaa 100644 --- a/crates/core/src/logstore/mod.rs +++ b/crates/core/src/logstore/mod.rs @@ -341,7 +341,7 @@ pub trait LogStore: Send + Sync + AsAny { #[deprecated( since = "0.1.0", - note = "DO NOT USE: Just a stop grap to support lakefs during kernel migration" + note = "DO NOT USE: Just a stop gap to support lakefs during kernel migration" )] fn transaction_url(&self, _operation_id: Uuid, base: &Url) -> DeltaResult { Ok(base.clone()) diff --git a/crates/core/src/operations/add_column.rs b/crates/core/src/operations/add_column.rs index fe00722ea0..ef6c4e66b0 100644 --- a/crates/core/src/operations/add_column.rs +++ b/crates/core/src/operations/add_column.rs @@ -9,16 +9,15 @@ use itertools::Itertools; use super::{CustomExecuteHandler, Operation}; use crate::kernel::schema::merge_delta_struct; use crate::kernel::transaction::{CommitBuilder, CommitProperties}; -use crate::kernel::{MetadataExt, ProtocolExt as _, StructField, StructTypeExt}; +use crate::kernel::{EagerSnapshot, MetadataExt, ProtocolExt as _, StructField, StructTypeExt}; use crate::logstore::LogStoreRef; use crate::protocol::DeltaOperation; -use crate::table::state::DeltaTableState; use crate::{DeltaResult, DeltaTable, DeltaTableError}; /// Add new columns and/or nested fields to a table pub struct AddColumnBuilder { /// A snapshot of the table's state - snapshot: DeltaTableState, + snapshot: EagerSnapshot, /// Fields to add/merge into schema fields: Option>, /// Delta object store for handling data files @@ -39,7 +38,7 @@ impl Operation<()> for AddColumnBuilder { impl AddColumnBuilder { /// Create a new builder - pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self { + pub fn new(log_store: LogStoreRef, snapshot: EagerSnapshot) -> Self { Self { snapshot, log_store, diff --git a/crates/core/src/operations/add_feature.rs b/crates/core/src/operations/add_feature.rs index bfc8b4293d..58a7f37d3b 100644 --- a/crates/core/src/operations/add_feature.rs +++ b/crates/core/src/operations/add_feature.rs @@ -8,17 +8,16 @@ use itertools::Itertools; use super::{CustomExecuteHandler, Operation}; use crate::kernel::transaction::{CommitBuilder, CommitProperties}; -use crate::kernel::{ProtocolExt as _, TableFeatures}; +use crate::kernel::{EagerSnapshot, ProtocolExt as _, TableFeatures}; use crate::logstore::LogStoreRef; use crate::protocol::DeltaOperation; -use crate::table::state::DeltaTableState; use crate::DeltaTable; use crate::{DeltaResult, DeltaTableError}; /// Enable table features for a table pub struct AddTableFeatureBuilder { /// A snapshot of the table's state - snapshot: DeltaTableState, + snapshot: EagerSnapshot, /// Name of the feature name: Vec, /// Allow protocol versions to be increased by setting features @@ -41,7 +40,7 @@ impl super::Operation<()> for AddTableFeatureBuilder { impl AddTableFeatureBuilder { /// Create a new builder - pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self { + pub fn new(log_store: LogStoreRef, snapshot: EagerSnapshot) -> Self { Self { name: vec![], allow_protocol_versions_increase: false, diff --git a/crates/core/src/operations/cdc.rs b/crates/core/src/operations/cdc.rs index ae5959c66e..cf71d6dd44 100644 --- a/crates/core/src/operations/cdc.rs +++ b/crates/core/src/operations/cdc.rs @@ -2,8 +2,8 @@ //! The CDC module contains private tools for managing CDC files //! +use crate::kernel::EagerSnapshot; use crate::table::config::TablePropertiesExt as _; -use crate::table::state::DeltaTableState; use crate::DeltaResult; use datafusion::common::ScalarValue; @@ -65,7 +65,7 @@ impl CDCTracker { /// > For Writer Version 7, all writers must respect the delta.enableChangeDataFeed configuration flag in /// > the metadata of the table only if the feature changeDataFeed exists in the table protocol's /// > writerFeatures. -pub(crate) fn should_write_cdc(snapshot: &DeltaTableState) -> DeltaResult { +pub(crate) fn should_write_cdc(snapshot: &EagerSnapshot) -> DeltaResult { if let Some(features) = &snapshot.protocol().writer_features() { // Features should only exist at writer version 7 but to avoid cases where // the Option> can get filled with an empty set, checking for the value @@ -78,7 +78,7 @@ pub(crate) fn should_write_cdc(snapshot: &DeltaTableState) -> DeltaResult return Ok(false); } } - Ok(snapshot.table_config().enable_change_data_feed()) + Ok(snapshot.table_properties().enable_change_data_feed()) } #[cfg(test)] @@ -113,7 +113,8 @@ mod tests { .await .expect("Failed to make a table"); table.load().await.expect("Failed to reload table"); - let result = should_write_cdc(table.snapshot().unwrap()).expect("Failed to use table"); + let result = + should_write_cdc(table.snapshot().unwrap().snapshot()).expect("Failed to use table"); assert!(!result, "A default table should not create CDC files"); } @@ -137,7 +138,8 @@ mod tests { .expect("failed to make a version 4 table with EnableChangeDataFeed"); table.load().await.expect("Failed to reload table"); - let result = should_write_cdc(table.snapshot().unwrap()).expect("Failed to use table"); + let result = + should_write_cdc(table.snapshot().unwrap().snapshot()).expect("Failed to use table"); assert!( result, "A table with the EnableChangeDataFeed should create CDC files" @@ -163,7 +165,8 @@ mod tests { .expect("failed to make a version 4 table with EnableChangeDataFeed"); table.load().await.expect("Failed to reload table"); - let result = should_write_cdc(table.snapshot().unwrap()).expect("Failed to use table"); + let result = + should_write_cdc(table.snapshot().unwrap().snapshot()).expect("Failed to use table"); assert!( !result, "A v7 table must not write CDC files unless the writer feature is set" @@ -193,7 +196,8 @@ mod tests { .expect("failed to make a version 4 table with EnableChangeDataFeed"); table.load().await.expect("Failed to reload table"); - let result = should_write_cdc(table.snapshot().unwrap()).expect("Failed to use table"); + let result = + should_write_cdc(table.snapshot().unwrap().snapshot()).expect("Failed to use table"); assert!( result, "A v7 table must not write CDC files unless the writer feature is set" diff --git a/crates/core/src/operations/constraints.rs b/crates/core/src/operations/constraints.rs index 10ba729f0f..8581668a78 100644 --- a/crates/core/src/operations/constraints.rs +++ b/crates/core/src/operations/constraints.rs @@ -18,18 +18,17 @@ use crate::delta_datafusion::{ register_store, DeltaDataChecker, DeltaScanBuilder, DeltaSessionContext, }; use crate::kernel::transaction::{CommitBuilder, CommitProperties}; -use crate::kernel::{MetadataExt, ProtocolExt as _, ProtocolInner}; +use crate::kernel::{EagerSnapshot, MetadataExt, ProtocolExt as _, ProtocolInner}; use crate::logstore::LogStoreRef; use crate::operations::datafusion_utils::Expression; use crate::protocol::DeltaOperation; -use crate::table::state::DeltaTableState; use crate::table::Constraint; use crate::{DeltaResult, DeltaTable, DeltaTableError}; /// Build a constraint to add to a table pub struct ConstraintBuilder { /// A snapshot of the table's state - snapshot: DeltaTableState, + snapshot: EagerSnapshot, /// Name of the constraint name: Option, /// Constraint expression @@ -54,7 +53,7 @@ impl super::Operation<()> for ConstraintBuilder { impl ConstraintBuilder { /// Create a new builder - pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self { + pub fn new(log_store: LogStoreRef, snapshot: EagerSnapshot) -> Self { Self { name: None, expr: None, diff --git a/crates/core/src/operations/delete.rs b/crates/core/src/operations/delete.rs index 63c812c683..07123d43de 100644 --- a/crates/core/src/operations/delete.rs +++ b/crates/core/src/operations/delete.rs @@ -53,7 +53,7 @@ use crate::delta_datafusion::{ }; use crate::errors::DeltaResult; use crate::kernel::transaction::{CommitBuilder, CommitProperties, PROTOCOL}; -use crate::kernel::{Action, Add, Remove}; +use crate::kernel::{Action, Add, EagerSnapshot, Remove}; use crate::logstore::LogStoreRef; use crate::operations::write::execution::{write_execution_plan, write_execution_plan_cdc}; use crate::operations::write::WriterStatsConfig; @@ -72,7 +72,7 @@ pub struct DeleteBuilder { /// Which records to delete predicate: Option, /// A snapshot of the table's state - snapshot: DeltaTableState, + snapshot: EagerSnapshot, /// Delta object store for handling data files log_store: LogStoreRef, /// Datafusion session state relevant for executing the input plan @@ -114,7 +114,7 @@ impl super::Operation<()> for DeleteBuilder { impl DeleteBuilder { /// Create a new [`DeleteBuilder`] - pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self { + pub fn new(log_store: LogStoreRef, snapshot: EagerSnapshot) -> Self { Self { predicate: None, snapshot, @@ -189,7 +189,7 @@ impl ExtensionPlanner for DeleteMetricExtensionPlanner { #[allow(clippy::too_many_arguments)] async fn execute_non_empty_expr( - snapshot: &DeltaTableState, + snapshot: &EagerSnapshot, log_store: LogStoreRef, state: &SessionState, expression: &Expr, @@ -235,9 +235,9 @@ async fn execute_non_empty_expr( let df = DataFrame::new(state.clone(), source); let writer_stats_config = WriterStatsConfig::new( - snapshot.table_config().num_indexed_cols(), + snapshot.table_properties().num_indexed_cols(), snapshot - .table_config() + .table_properties() .data_skipping_stats_columns .as_ref() .map(|v| v.iter().map(|v| v.to_string()).collect::>()), @@ -259,7 +259,7 @@ async fn execute_non_empty_expr( filter.clone(), table_partition_cols.clone(), log_store.object_store(Some(operation_id)), - Some(snapshot.table_config().target_file_size().get() as usize), + Some(snapshot.table_properties().target_file_size().get() as usize), None, writer_properties.clone(), writer_stats_config.clone(), @@ -295,7 +295,7 @@ async fn execute_non_empty_expr( cdc_filter, table_partition_cols.clone(), log_store.object_store(Some(operation_id)), - Some(snapshot.table_config().target_file_size().get() as usize), + Some(snapshot.table_properties().target_file_size().get() as usize), None, writer_properties, writer_stats_config, @@ -311,13 +311,13 @@ async fn execute_non_empty_expr( async fn execute( predicate: Option, log_store: LogStoreRef, - snapshot: DeltaTableState, + snapshot: EagerSnapshot, state: SessionState, writer_properties: Option, mut commit_properties: CommitProperties, operation_id: Uuid, handle: Option<&Arc>, -) -> DeltaResult<(DeltaTableState, DeleteMetrics)> { +) -> DeltaResult<(EagerSnapshot, DeleteMetrics)> { if !&snapshot.load_config().require_files { return Err(DeltaTableError::NotInitializedWithFiles("DELETE".into())); } @@ -401,7 +401,7 @@ async fn execute( if let Some(handler) = handle { handler.post_execute(&log_store, operation_id).await?; } - Ok((commit.snapshot(), metrics)) + Ok((commit.snapshot().snapshot, metrics)) } impl std::future::IntoFuture for DeleteBuilder { @@ -412,8 +412,8 @@ impl std::future::IntoFuture for DeleteBuilder { let this = self; Box::pin(async move { - PROTOCOL.check_append_only(&this.snapshot.snapshot)?; - PROTOCOL.can_write_to(&this.snapshot.snapshot)?; + PROTOCOL.check_append_only(&this.snapshot)?; + PROTOCOL.can_write_to(&this.snapshot)?; let operation_id = this.get_operation_id(); this.pre_execute(operation_id).await?; @@ -450,7 +450,12 @@ impl std::future::IntoFuture for DeleteBuilder { .await?; Ok(( - DeltaTable::new_with_state(this.log_store, new_snapshot), + DeltaTable::new_with_state( + this.log_store, + DeltaTableState { + snapshot: new_snapshot, + }, + ), metrics, )) }) diff --git a/crates/core/src/operations/drop_constraints.rs b/crates/core/src/operations/drop_constraints.rs index 26543c0ac3..36d85429cb 100644 --- a/crates/core/src/operations/drop_constraints.rs +++ b/crates/core/src/operations/drop_constraints.rs @@ -6,7 +6,7 @@ use futures::future::BoxFuture; use super::{CustomExecuteHandler, Operation}; use crate::kernel::transaction::{CommitBuilder, CommitProperties}; -use crate::kernel::{Action, MetadataExt}; +use crate::kernel::{Action, EagerSnapshot, MetadataExt}; use crate::logstore::LogStoreRef; use crate::protocol::DeltaOperation; use crate::table::state::DeltaTableState; @@ -16,7 +16,7 @@ use crate::{DeltaResult, DeltaTableError}; /// Remove constraints from the table pub struct DropConstraintBuilder { /// A snapshot of the table's state - snapshot: DeltaTableState, + snapshot: EagerSnapshot, /// Name of the constraint name: Option, /// Raise if constraint doesn't exist @@ -39,7 +39,7 @@ impl super::Operation<()> for DropConstraintBuilder { impl DropConstraintBuilder { /// Create a new builder - pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self { + pub fn new(log_store: LogStoreRef, snapshot: EagerSnapshot) -> Self { Self { name: None, raise_if_not_exists: true, @@ -101,7 +101,12 @@ impl std::future::IntoFuture for DropConstraintBuilder { "Constraint with name '{name}' does not exist." ))); } - return Ok(DeltaTable::new_with_state(this.log_store, this.snapshot)); + return Ok(DeltaTable::new_with_state( + this.log_store, + DeltaTableState { + snapshot: this.snapshot, + }, + )); } metadata = metadata.remove_config_key(&configuration_key)?; diff --git a/crates/core/src/operations/filesystem_check.rs b/crates/core/src/operations/filesystem_check.rs index 10c38e6134..0e2c6a1ea9 100644 --- a/crates/core/src/operations/filesystem_check.rs +++ b/crates/core/src/operations/filesystem_check.rs @@ -29,6 +29,7 @@ use super::CustomExecuteHandler; use super::Operation; use crate::errors::{DeltaResult, DeltaTableError}; use crate::kernel::transaction::{CommitBuilder, CommitProperties}; +use crate::kernel::EagerSnapshot; use crate::kernel::{Action, Add, Remove}; use crate::logstore::LogStoreRef; use crate::protocol::DeltaOperation; @@ -39,7 +40,7 @@ use crate::DeltaTable; /// See this module's documentation for more information pub struct FileSystemCheckBuilder { /// A snapshot of the to-be-checked table's state - snapshot: DeltaTableState, + snapshot: EagerSnapshot, /// Delta object store for handling data files log_store: LogStoreRef, /// Don't remove actions to the table log. Just determine which files can be removed @@ -109,9 +110,9 @@ impl super::Operation<()> for FileSystemCheckBuilder { impl FileSystemCheckBuilder { /// Create a new [`FileSystemCheckBuilder`] - pub fn new(log_store: LogStoreRef, state: DeltaTableState) -> Self { + pub fn new(log_store: LogStoreRef, snapshot: EagerSnapshot) -> Self { FileSystemCheckBuilder { - snapshot: state, + snapshot, log_store, dry_run: false, commit_properties: CommitProperties::default(), @@ -140,9 +141,8 @@ impl FileSystemCheckBuilder { async fn create_fsck_plan(&self) -> DeltaResult { let mut files_relative: HashMap = HashMap::new(); let log_store = self.log_store.clone(); - let mut file_stream = self.snapshot.file_actions_iter(&self.log_store); - while let Some(active) = file_stream.next().await { - let active = active?; + let file_stream = self.snapshot.log_data().iter().map(|f| f.add_action()); + for active in file_stream { if is_absolute_path(&active.path)? { return Err(DeltaTableError::Generic( "Filesystem check does not support absolute paths".to_string(), @@ -178,7 +178,7 @@ impl FileSystemCheckBuilder { impl FileSystemCheckPlan { pub async fn execute( self, - snapshot: &DeltaTableState, + snapshot: &EagerSnapshot, mut commit_properties: CommitProperties, operation_id: Uuid, handle: Option>, @@ -242,7 +242,12 @@ impl std::future::IntoFuture for FileSystemCheckBuilder { let plan = this.create_fsck_plan().await?; if this.dry_run { return Ok(( - DeltaTable::new_with_state(this.log_store, this.snapshot), + DeltaTable::new_with_state( + this.log_store, + DeltaTableState { + snapshot: this.snapshot, + }, + ), FileSystemCheckMetrics { files_removed: plan.files_to_remove.into_iter().map(|f| f.path).collect(), dry_run: true, @@ -251,7 +256,12 @@ impl std::future::IntoFuture for FileSystemCheckBuilder { } if plan.files_to_remove.is_empty() { return Ok(( - DeltaTable::new_with_state(this.log_store, this.snapshot), + DeltaTable::new_with_state( + this.log_store, + DeltaTableState { + snapshot: this.snapshot, + }, + ), FileSystemCheckMetrics { dry_run: false, files_removed: Vec::new(), @@ -272,7 +282,12 @@ impl std::future::IntoFuture for FileSystemCheckBuilder { this.post_execute(operation_id).await?; - let mut table = DeltaTable::new_with_state(this.log_store, this.snapshot); + let mut table = DeltaTable::new_with_state( + this.log_store, + DeltaTableState { + snapshot: this.snapshot, + }, + ); table.update().await?; Ok((table, metrics)) }) diff --git a/crates/core/src/operations/load.rs b/crates/core/src/operations/load.rs index 101d279ae1..8bcdf66eb1 100644 --- a/crates/core/src/operations/load.rs +++ b/crates/core/src/operations/load.rs @@ -10,6 +10,7 @@ use super::CustomExecuteHandler; use crate::delta_datafusion::DataFusionMixins; use crate::errors::{DeltaResult, DeltaTableError}; use crate::kernel::transaction::PROTOCOL; +use crate::kernel::EagerSnapshot; use crate::logstore::LogStoreRef; use crate::table::state::DeltaTableState; use crate::DeltaTable; @@ -17,7 +18,7 @@ use crate::DeltaTable; #[derive(Debug, Clone)] pub struct LoadBuilder { /// A snapshot of the to-be-loaded table's state - snapshot: DeltaTableState, + snapshot: EagerSnapshot, /// Delta object store for handling data files log_store: LogStoreRef, /// A sub-selection of columns to be loaded @@ -35,7 +36,7 @@ impl super::Operation<()> for LoadBuilder { impl LoadBuilder { /// Create a new [`LoadBuilder`] - pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self { + pub fn new(log_store: LogStoreRef, snapshot: EagerSnapshot) -> Self { Self { snapshot, log_store, @@ -58,12 +59,17 @@ impl std::future::IntoFuture for LoadBuilder { let this = self; Box::pin(async move { - PROTOCOL.can_read_from(&this.snapshot.snapshot)?; + PROTOCOL.can_read_from(&this.snapshot)?; if !this.snapshot.load_config().require_files { return Err(DeltaTableError::NotInitializedWithFiles("reading".into())); } - let table = DeltaTable::new_with_state(this.log_store, this.snapshot); + let table = DeltaTable::new_with_state( + this.log_store, + DeltaTableState { + snapshot: this.snapshot, + }, + ); let schema = table.snapshot()?.arrow_schema()?; let projection = this .columns diff --git a/crates/core/src/operations/load_cdf.rs b/crates/core/src/operations/load_cdf.rs index 6dc76fd03b..c311155cc6 100644 --- a/crates/core/src/operations/load_cdf.rs +++ b/crates/core/src/operations/load_cdf.rs @@ -33,9 +33,8 @@ use tracing::log; use crate::delta_datafusion::{register_store, DataFusionMixins}; use crate::errors::DeltaResult; use crate::kernel::transaction::PROTOCOL; -use crate::kernel::{Action, Add, AddCDCFile, CommitInfo}; +use crate::kernel::{Action, Add, AddCDCFile, CommitInfo, EagerSnapshot}; use crate::logstore::{get_actions, LogStoreRef}; -use crate::table::state::DeltaTableState; use crate::DeltaTableError; use crate::{delta_datafusion::cdf::*, kernel::Remove}; @@ -43,7 +42,7 @@ use crate::{delta_datafusion::cdf::*, kernel::Remove}; #[derive(Clone, Debug)] pub struct CdfLoadBuilder { /// A snapshot of the to-be-loaded table's state - pub snapshot: DeltaTableState, + pub snapshot: EagerSnapshot, /// Delta object store for handling data files log_store: LogStoreRef, /// Version to read from @@ -60,7 +59,7 @@ pub struct CdfLoadBuilder { impl CdfLoadBuilder { /// Create a new [`LoadBuilder`] - pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self { + pub fn new(log_store: LogStoreRef, snapshot: EagerSnapshot) -> Self { Self { snapshot, log_store, diff --git a/crates/core/src/operations/merge/filter.rs b/crates/core/src/operations/merge/filter.rs index 64039a0341..d6bf31ed35 100644 --- a/crates/core/src/operations/merge/filter.rs +++ b/crates/core/src/operations/merge/filter.rs @@ -14,7 +14,7 @@ use either::{Left, Right}; use itertools::Itertools; use crate::delta_datafusion::execute_plan_to_batch; -use crate::table::state::DeltaTableState; +use crate::kernel::EagerSnapshot; use crate::{DeltaResult, DeltaTableError}; #[derive(Debug)] @@ -324,7 +324,7 @@ pub(crate) fn generalize_filter( pub(crate) async fn try_construct_early_filter( join_predicate: Expr, - table_snapshot: &DeltaTableState, + table_snapshot: &EagerSnapshot, session_state: &SessionState, source: &LogicalPlan, source_name: &TableReference, @@ -457,7 +457,7 @@ mod tests { let pred = try_construct_early_filter( join_predicate, - table.snapshot().unwrap(), + table.snapshot().unwrap().snapshot(), &ctx.state(), &source, &source_name, @@ -548,7 +548,7 @@ mod tests { let pred = try_construct_early_filter( join_predicate, - table.snapshot().unwrap(), + table.snapshot().unwrap().snapshot(), &ctx.state(), &source, &source_name, @@ -606,7 +606,7 @@ mod tests { let pred = try_construct_early_filter( join_predicate, - table.snapshot().unwrap(), + table.snapshot().unwrap().snapshot(), &ctx.state(), &source, &source_name, @@ -669,7 +669,7 @@ mod tests { let pred = try_construct_early_filter( join_predicate, - table.snapshot().unwrap(), + table.snapshot().unwrap().snapshot(), &ctx.state(), &source_plan, &source_name, @@ -738,7 +738,7 @@ mod tests { let pred = try_construct_early_filter( join_predicate, - table.snapshot().unwrap(), + table.snapshot().unwrap().snapshot(), &ctx.state(), &source_plan, &source_name, @@ -810,7 +810,7 @@ mod tests { let pred = try_construct_early_filter( join_predicate, - table.snapshot().unwrap(), + table.snapshot().unwrap().snapshot(), &ctx.state(), &source_plan, &source_name, diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs index 50bf57f139..f32f39ad6e 100644 --- a/crates/core/src/operations/merge/mod.rs +++ b/crates/core/src/operations/merge/mod.rs @@ -81,7 +81,7 @@ use crate::delta_datafusion::{ }; use crate::kernel::schema::cast::{merge_arrow_field, merge_arrow_schema}; use crate::kernel::transaction::{CommitBuilder, CommitProperties, PROTOCOL}; -use crate::kernel::{new_metadata, Action, StructTypeExt}; +use crate::kernel::{new_metadata, Action, EagerSnapshot, StructTypeExt}; use crate::logstore::LogStoreRef; use crate::operations::cdc::*; use crate::operations::merge::barrier::find_node; @@ -134,7 +134,7 @@ pub struct MergeBuilder { ///Prefix target columns with a user provided prefix target_alias: Option, /// A snapshot of the table's state. AKA the target table in the operation - snapshot: DeltaTableState, + snapshot: EagerSnapshot, /// The source data source: DataFrame, /// Whether the source is a streaming source (if true, stats deducing to prune target is disabled) @@ -168,7 +168,7 @@ impl MergeBuilder { /// Create a new [`MergeBuilder`] pub fn new>( log_store: LogStoreRef, - snapshot: DeltaTableState, + snapshot: EagerSnapshot, predicate: E, source: DataFrame, ) -> Self { @@ -728,7 +728,7 @@ async fn execute( predicate: Expression, mut source: DataFrame, log_store: LogStoreRef, - snapshot: DeltaTableState, + snapshot: EagerSnapshot, state: SessionState, writer_properties: Option, mut commit_properties: CommitProperties, @@ -742,7 +742,7 @@ async fn execute( not_match_source_operations: Vec, operation_id: Uuid, handle: Option<&Arc>, -) -> DeltaResult<(DeltaTableState, MergeMetrics)> { +) -> DeltaResult<(EagerSnapshot, MergeMetrics)> { let mut metrics = MergeMetrics::default(); let exec_start = Instant::now(); // Determining whether we should write change data once so that computation of change data can @@ -1376,9 +1376,9 @@ async fn execute( let table_partition_cols = current_metadata.partition_columns().clone(); let writer_stats_config = WriterStatsConfig::new( - snapshot.table_config().num_indexed_cols(), + snapshot.table_properties().num_indexed_cols(), snapshot - .table_config() + .table_properties() .data_skipping_stats_columns .as_ref() .map(|v| v.iter().map(|v| v.to_string()).collect::>()), @@ -1390,7 +1390,7 @@ async fn execute( write, table_partition_cols.clone(), log_store.object_store(Some(operation_id)), - Some(snapshot.table_config().target_file_size().get() as usize), + Some(snapshot.table_properties().target_file_size().get() as usize), None, writer_properties.clone(), writer_stats_config.clone(), @@ -1462,7 +1462,7 @@ async fn execute( .with_post_commit_hook_handler(handle.cloned()) .build(Some(&snapshot), log_store.clone(), operation) .await?; - Ok((commit.snapshot(), metrics)) + Ok((commit.snapshot().snapshot, metrics)) } fn modify_schema( @@ -1524,7 +1524,7 @@ impl std::future::IntoFuture for MergeBuilder { let this = self; Box::pin(async move { - PROTOCOL.can_write_to(&this.snapshot.snapshot)?; + PROTOCOL.can_write_to(&this.snapshot)?; if !this.snapshot.load_config().require_files { return Err(DeltaTableError::NotInitializedWithFiles("MERGE".into())); @@ -1569,7 +1569,7 @@ impl std::future::IntoFuture for MergeBuilder { } Ok(( - DeltaTable::new_with_state(this.log_store, snapshot), + DeltaTable::new_with_state(this.log_store, DeltaTableState { snapshot }), metrics, )) }) diff --git a/crates/core/src/operations/mod.rs b/crates/core/src/operations/mod.rs index a2a531f08e..c024bc5538 100644 --- a/crates/core/src/operations/mod.rs +++ b/crates/core/src/operations/mod.rs @@ -231,21 +231,22 @@ impl DeltaOps { #[cfg(feature = "datafusion")] #[must_use] pub fn load(self) -> LoadBuilder { - LoadBuilder::new(self.0.log_store, self.0.state.unwrap()) + LoadBuilder::new(self.0.log_store, self.0.state.unwrap().snapshot) } /// Load a table with CDF Enabled #[cfg(feature = "datafusion")] #[must_use] pub fn load_cdf(self) -> CdfLoadBuilder { - CdfLoadBuilder::new(self.0.log_store, self.0.state.unwrap()) + CdfLoadBuilder::new(self.0.log_store, self.0.state.unwrap().snapshot) } /// Write data to Delta table #[cfg(feature = "datafusion")] #[must_use] pub fn write(self, batches: impl IntoIterator) -> WriteBuilder { - WriteBuilder::new(self.0.log_store, self.0.state).with_input_batches(batches) + WriteBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot)) + .with_input_batches(batches) } /// Vacuum stale files from delta table @@ -257,34 +258,34 @@ impl DeltaOps { /// Audit active files with files present on the filesystem #[must_use] pub fn filesystem_check(self) -> FileSystemCheckBuilder { - FileSystemCheckBuilder::new(self.0.log_store, self.0.state.unwrap()) + FileSystemCheckBuilder::new(self.0.log_store, self.0.state.unwrap().snapshot) } /// Audit active files with files present on the filesystem #[cfg(feature = "datafusion")] #[must_use] pub fn optimize<'a>(self) -> OptimizeBuilder<'a> { - OptimizeBuilder::new(self.0.log_store, self.0.state.unwrap()) + OptimizeBuilder::new(self.0.log_store, self.0.state.unwrap().snapshot) } /// Delete data from Delta table #[cfg(feature = "datafusion")] #[must_use] pub fn delete(self) -> DeleteBuilder { - DeleteBuilder::new(self.0.log_store, self.0.state.unwrap()) + DeleteBuilder::new(self.0.log_store, self.0.state.unwrap().snapshot) } /// Update data from Delta table #[cfg(feature = "datafusion")] #[must_use] pub fn update(self) -> UpdateBuilder { - UpdateBuilder::new(self.0.log_store, self.0.state.unwrap()) + UpdateBuilder::new(self.0.log_store, self.0.state.unwrap().snapshot) } /// Restore delta table to a specified version or datetime #[must_use] pub fn restore(self) -> RestoreBuilder { - RestoreBuilder::new(self.0.log_store, self.0.state.unwrap()) + RestoreBuilder::new(self.0.log_store, self.0.state.unwrap().snapshot) } /// Update data from Delta table @@ -297,7 +298,7 @@ impl DeltaOps { ) -> MergeBuilder { MergeBuilder::new( self.0.log_store, - self.0.state.unwrap(), + self.0.state.unwrap().snapshot, predicate.into(), source, ) @@ -307,40 +308,40 @@ impl DeltaOps { #[cfg(feature = "datafusion")] #[must_use] pub fn add_constraint(self) -> ConstraintBuilder { - ConstraintBuilder::new(self.0.log_store, self.0.state.unwrap()) + ConstraintBuilder::new(self.0.log_store, self.0.state.unwrap().snapshot) } /// Enable a table feature for a table #[must_use] pub fn add_feature(self) -> AddTableFeatureBuilder { - AddTableFeatureBuilder::new(self.0.log_store, self.0.state.unwrap()) + AddTableFeatureBuilder::new(self.0.log_store, self.0.state.unwrap().snapshot) } /// Drops constraints from a table #[cfg(feature = "datafusion")] #[must_use] pub fn drop_constraints(self) -> DropConstraintBuilder { - DropConstraintBuilder::new(self.0.log_store, self.0.state.unwrap()) + DropConstraintBuilder::new(self.0.log_store, self.0.state.unwrap().snapshot) } /// Set table properties pub fn set_tbl_properties(self) -> SetTablePropertiesBuilder { - SetTablePropertiesBuilder::new(self.0.log_store, self.0.state.unwrap()) + SetTablePropertiesBuilder::new(self.0.log_store, self.0.state.unwrap().snapshot) } /// Add new columns pub fn add_columns(self) -> AddColumnBuilder { - AddColumnBuilder::new(self.0.log_store, self.0.state.unwrap()) + AddColumnBuilder::new(self.0.log_store, self.0.state.unwrap().snapshot) } /// Update field metadata pub fn update_field_metadata(self) -> UpdateFieldMetadataBuilder { - UpdateFieldMetadataBuilder::new(self.0.log_store, self.0.state.unwrap()) + UpdateFieldMetadataBuilder::new(self.0.log_store, self.0.state.unwrap().snapshot) } /// Update table metadata pub fn update_table_metadata(self) -> UpdateTableMetadataBuilder { - UpdateTableMetadataBuilder::new(self.0.log_store, self.0.state.unwrap()) + UpdateTableMetadataBuilder::new(self.0.log_store, self.0.state.unwrap().snapshot) } } diff --git a/crates/core/src/operations/optimize.rs b/crates/core/src/operations/optimize.rs index b631ee0135..6f7d45af77 100644 --- a/crates/core/src/operations/optimize.rs +++ b/crates/core/src/operations/optimize.rs @@ -50,6 +50,7 @@ use super::{CustomExecuteHandler, Operation}; use crate::delta_datafusion::DeltaTableProvider; use crate::errors::{DeltaResult, DeltaTableError}; use crate::kernel::transaction::{CommitBuilder, CommitProperties, DEFAULT_RETRIES, PROTOCOL}; +use crate::kernel::EagerSnapshot; use crate::kernel::{scalars::ScalarExt, Action, Add, PartitionsExt, Remove}; use crate::logstore::{LogStore, LogStoreRef, ObjectStoreRef}; use crate::protocol::DeltaOperation; @@ -195,7 +196,7 @@ pub enum OptimizeType { /// table's configuration is read. Otherwise a default value is used. pub struct OptimizeBuilder<'a> { /// A snapshot of the to-be-optimized table's state - snapshot: DeltaTableState, + snapshot: EagerSnapshot, /// Delta object store for handling data files log_store: LogStoreRef, /// Filters to select specific table partitions to be optimized @@ -231,7 +232,7 @@ impl super::Operation<()> for OptimizeBuilder<'_> { impl<'a> OptimizeBuilder<'a> { /// Create a new [`OptimizeBuilder`] - pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self { + pub fn new(log_store: LogStoreRef, snapshot: EagerSnapshot) -> Self { Self { snapshot, log_store, @@ -324,7 +325,7 @@ impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> { let this = self; Box::pin(async move { - PROTOCOL.can_write_to(&this.snapshot.snapshot)?; + PROTOCOL.can_write_to(&this.snapshot)?; if !&this.snapshot.load_config().require_files { return Err(DeltaTableError::NotInitializedWithFiles("OPTIMIZE".into())); } @@ -364,7 +365,8 @@ impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> { if let Some(handler) = this.custom_execute_handler { handler.post_execute(&this.log_store, operation_id).await?; } - let mut table = DeltaTable::new_with_state(this.log_store, this.snapshot); + let mut table = + DeltaTable::new_with_state(this.log_store, DeltaTableState::new(this.snapshot)); table.update().await?; Ok((table, metrics)) }) @@ -612,7 +614,7 @@ impl MergePlan { pub async fn execute( mut self, log_store: LogStoreRef, - snapshot: &DeltaTableState, + snapshot: &EagerSnapshot, max_concurrent_tasks: usize, #[allow(unused_variables)] // used behind a feature flag max_spill_size: usize, @@ -713,7 +715,8 @@ impl MergePlan { let mut stream = stream.buffer_unordered(max_concurrent_tasks); - let mut table = DeltaTable::new_with_state(log_store.clone(), snapshot.clone()); + let mut table = + DeltaTable::new_with_state(log_store.clone(), DeltaTableState::new(snapshot.clone())); // Actions buffered so far. These will be flushed either at the end // or when we reach the commit interval. @@ -777,7 +780,7 @@ impl MergePlan { self.task_parameters.input_parameters.clone().into(), ) .await?; - snapshot = commit.snapshot(); + snapshot = commit.snapshot().snapshot; commits_made += 1; } @@ -794,7 +797,7 @@ impl MergePlan { total_metrics.files_removed.min = 0; } - table.state = Some(snapshot); + table.state = Some(DeltaTableState::new(snapshot)); Ok(total_metrics) } @@ -804,14 +807,14 @@ impl MergePlan { pub async fn create_merge_plan( log_store: &dyn LogStore, optimize_type: OptimizeType, - snapshot: &DeltaTableState, + snapshot: &EagerSnapshot, filters: &[PartitionFilter], target_size: Option, writer_properties: WriterProperties, session_config: Option, ) -> Result { let target_size = - target_size.unwrap_or_else(|| snapshot.table_config().target_file_size().get()); + target_size.unwrap_or_else(|| snapshot.table_properties().target_file_size().get()); let partitions_keys = snapshot.metadata().partition_columns(); let (operations, metrics) = match optimize_type { @@ -847,9 +850,9 @@ pub async fn create_merge_plan( input_parameters, file_schema, writer_properties, - num_indexed_cols: snapshot.table_config().num_indexed_cols(), + num_indexed_cols: snapshot.table_properties().num_indexed_cols(), stats_columns: snapshot - .table_config() + .table_properties() .data_skipping_stats_columns .as_ref() .map(|v| v.iter().map(|v| v.to_string()).collect::>()), @@ -902,14 +905,14 @@ impl IntoIterator for MergeBin { async fn build_compaction_plan( log_store: &dyn LogStore, - snapshot: &DeltaTableState, + snapshot: &EagerSnapshot, filters: &[PartitionFilter], target_size: u64, ) -> Result<(OptimizeOperations, Metrics), DeltaTableError> { let mut metrics = Metrics::default(); let mut partition_files: HashMap, Vec)> = HashMap::new(); - let mut file_stream = snapshot.get_active_add_actions_by_partitions(log_store, filters); + let mut file_stream = snapshot.file_views_by_partitions(log_store, filters); while let Some(file) = file_stream.next().await { let file = file?; metrics.total_considered_files += 1; @@ -983,7 +986,7 @@ async fn build_compaction_plan( async fn build_zorder_plan( log_store: &dyn LogStore, zorder_columns: Vec, - snapshot: &DeltaTableState, + snapshot: &EagerSnapshot, partition_keys: &[String], filters: &[PartitionFilter], session_config: Option, @@ -1021,7 +1024,7 @@ async fn build_zorder_plan( let mut metrics = Metrics::default(); let mut partition_files: HashMap, MergeBin)> = HashMap::new(); - let mut file_stream = snapshot.get_active_add_actions_by_partitions(log_store, filters); + let mut file_stream = snapshot.file_views_by_partitions(log_store, filters); while let Some(file) = file_stream.next().await { let file = file?; let partition_values = file diff --git a/crates/core/src/operations/restore.rs b/crates/core/src/operations/restore.rs index cc0942c083..aaf82fc6e6 100644 --- a/crates/core/src/operations/restore.rs +++ b/crates/core/src/operations/restore.rs @@ -28,6 +28,7 @@ use std::time::{SystemTime, UNIX_EPOCH}; use chrono::{DateTime, Utc}; use futures::future::BoxFuture; +use futures::TryStreamExt; use object_store::path::Path; use object_store::ObjectStore; use serde::Serialize; @@ -35,7 +36,7 @@ use uuid::Uuid; use super::{CustomExecuteHandler, Operation}; use crate::kernel::transaction::{CommitBuilder, CommitProperties, TransactionError}; -use crate::kernel::{Action, Add, ProtocolExt as _, ProtocolInner, Remove}; +use crate::kernel::{Action, Add, EagerSnapshot, ProtocolExt as _, ProtocolInner, Remove}; use crate::logstore::LogStoreRef; use crate::protocol::DeltaOperation; use crate::table::state::DeltaTableState; @@ -76,7 +77,7 @@ pub struct RestoreMetrics { /// See this module's documentation for more information pub struct RestoreBuilder { /// A snapshot of the to-be-restored table's state - snapshot: DeltaTableState, + snapshot: EagerSnapshot, /// Delta object store for handling data files log_store: LogStoreRef, /// Version to restore @@ -103,7 +104,7 @@ impl super::Operation<()> for RestoreBuilder { impl RestoreBuilder { /// Create a new [`RestoreBuilder`] - pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self { + pub fn new(log_store: LogStoreRef, snapshot: EagerSnapshot) -> Self { Self { snapshot, log_store, @@ -157,7 +158,7 @@ impl RestoreBuilder { #[allow(clippy::too_many_arguments)] async fn execute( log_store: LogStoreRef, - snapshot: DeltaTableState, + snapshot: EagerSnapshot, version_to_restore: Option, datetime_to_restore: Option>, ignore_missing_files: bool, @@ -198,16 +199,22 @@ async fn execute( let snapshot_restored = table.snapshot()?; let metadata_restored_version = snapshot_restored.metadata(); - let state_to_restore_files = snapshot_restored.file_actions(&log_store).await?; - let latest_state_files = snapshot.file_actions(&log_store).await?; + let state_to_restore_files: Vec<_> = snapshot_restored + .snapshot() + .files(&log_store, None) + .try_collect() + .await?; + let latest_state_files: Vec<_> = snapshot.files(&log_store, None).try_collect().await?; let state_to_restore_files_set = - HashSet::::from_iter(state_to_restore_files.iter().cloned()); - let latest_state_files_set = HashSet::::from_iter(latest_state_files.iter().cloned()); + HashSet::<_>::from_iter(state_to_restore_files.iter().map(|f| f.path().to_string())); + let latest_state_files_set = + HashSet::<_>::from_iter(latest_state_files.iter().map(|f| f.path().to_string())); let files_to_add: Vec = state_to_restore_files - .into_iter() - .filter(|a: &Add| !latest_state_files_set.contains(a)) - .map(|mut a: Add| -> Add { + .iter() + .filter(|a| !latest_state_files_set.contains(&a.path().to_string())) + .map(|f| { + let mut a = f.add_action(); a.data_change = true; a }) @@ -218,21 +225,12 @@ async fn execute( .unwrap() .as_millis() as i64; let files_to_remove: Vec = latest_state_files - .into_iter() - .filter(|a: &Add| !state_to_restore_files_set.contains(a)) - .map(|a: Add| -> Remove { - Remove { - path: a.path.clone(), - deletion_timestamp: Some(deletion_timestamp), - data_change: true, - extended_file_metadata: Some(false), - partition_values: Some(a.partition_values.clone()), - size: Some(a.size), - tags: a.tags, - deletion_vector: a.deletion_vector, - base_row_id: a.base_row_id, - default_row_commit_version: a.default_row_commit_version, - } + .iter() + .filter(|f| !state_to_restore_files_set.contains(&f.path().to_string())) + .map(|f| { + let mut rm = f.remove_action(true); + rm.deletion_timestamp = Some(deletion_timestamp); + rm }) .collect(); @@ -364,7 +362,12 @@ impl std::future::IntoFuture for RestoreBuilder { this.post_execute(operation_id).await?; - let mut table = DeltaTable::new_with_state(this.log_store, this.snapshot); + let mut table = DeltaTable::new_with_state( + this.log_store, + DeltaTableState { + snapshot: this.snapshot, + }, + ); table.update().await?; Ok((table, metrics)) }) diff --git a/crates/core/src/operations/set_tbl_properties.rs b/crates/core/src/operations/set_tbl_properties.rs index 51cb71de54..382f71e787 100644 --- a/crates/core/src/operations/set_tbl_properties.rs +++ b/crates/core/src/operations/set_tbl_properties.rs @@ -7,17 +7,16 @@ use futures::future::BoxFuture; use super::{CustomExecuteHandler, Operation}; use crate::kernel::transaction::{CommitBuilder, CommitProperties}; -use crate::kernel::{Action, MetadataExt as _, ProtocolExt as _}; +use crate::kernel::{Action, EagerSnapshot, MetadataExt as _, ProtocolExt as _}; use crate::logstore::LogStoreRef; use crate::protocol::DeltaOperation; -use crate::table::state::DeltaTableState; use crate::DeltaResult; use crate::DeltaTable; /// Remove constraints from the table pub struct SetTablePropertiesBuilder { /// A snapshot of the table's state - snapshot: DeltaTableState, + snapshot: EagerSnapshot, /// Name of the property properties: HashMap, /// Raise if property doesn't exist @@ -40,7 +39,7 @@ impl super::Operation<()> for SetTablePropertiesBuilder { impl SetTablePropertiesBuilder { /// Create a new builder - pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self { + pub fn new(log_store: LogStoreRef, snapshot: EagerSnapshot) -> Self { Self { properties: HashMap::new(), raise_if_not_exists: true, diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs index 8e6dbeb908..bf95949ba4 100644 --- a/crates/core/src/operations/update.rs +++ b/crates/core/src/operations/update.rs @@ -51,7 +51,6 @@ use super::{ write::execution::{write_execution_plan, write_execution_plan_cdc}, CustomExecuteHandler, Operation, }; -use crate::delta_datafusion::{find_files, planner::DeltaPlanner, register_store}; use crate::kernel::transaction::{CommitBuilder, CommitProperties, PROTOCOL}; use crate::kernel::{Action, Remove}; use crate::logstore::LogStoreRef; @@ -68,6 +67,10 @@ use crate::{ }, table::config::TablePropertiesExt, }; +use crate::{ + delta_datafusion::{find_files, planner::DeltaPlanner, register_store}, + kernel::EagerSnapshot, +}; use crate::{DeltaResult, DeltaTable, DeltaTableError}; /// Custom column name used for marking internal [RecordBatch] rows as updated @@ -85,7 +88,7 @@ pub struct UpdateBuilder { /// How to update columns in a record that match the predicate updates: HashMap, /// A snapshot of the table's state - snapshot: DeltaTableState, + snapshot: EagerSnapshot, /// Delta object store for handling data files log_store: LogStoreRef, /// Datafusion session state relevant for executing the input plan @@ -128,7 +131,7 @@ impl super::Operation<()> for UpdateBuilder { impl UpdateBuilder { /// Create a new ['UpdateBuilder'] - pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self { + pub fn new(log_store: LogStoreRef, snapshot: EagerSnapshot) -> Self { Self { predicate: None, updates: HashMap::new(), @@ -240,14 +243,14 @@ async fn execute( predicate: Option, updates: HashMap, log_store: LogStoreRef, - snapshot: DeltaTableState, + snapshot: EagerSnapshot, state: SessionState, writer_properties: Option, mut commit_properties: CommitProperties, _safe_cast: bool, operation_id: Uuid, handle: Option<&Arc>, -) -> DeltaResult<(DeltaTableState, UpdateMetrics)> { +) -> DeltaResult<(EagerSnapshot, UpdateMetrics)> { // Validate the predicate and update expressions. // // If the predicate is not set, then all files need to be updated. @@ -380,9 +383,9 @@ async fn execute( .drop_columns(&[UPDATE_PREDICATE_COLNAME])?; let physical_plan = updated_df.clone().create_physical_plan().await?; let writer_stats_config = WriterStatsConfig::new( - snapshot.table_config().num_indexed_cols(), + snapshot.table_properties().num_indexed_cols(), snapshot - .table_config() + .table_properties() .data_skipping_stats_columns .as_ref() .map(|v| v.iter().map(|v| v.to_string()).collect::>()), @@ -396,7 +399,7 @@ async fn execute( physical_plan.clone(), table_partition_cols.clone(), log_store.object_store(Some(operation_id)).clone(), - Some(snapshot.table_config().target_file_size().get() as usize), + Some(snapshot.table_properties().target_file_size().get() as usize), None, writer_properties.clone(), writer_stats_config.clone(), @@ -458,7 +461,7 @@ async fn execute( df.create_physical_plan().await?, table_partition_cols, log_store.object_store(Some(operation_id)), - Some(snapshot.table_config().target_file_size().get() as usize), + Some(snapshot.table_properties().target_file_size().get() as usize), None, writer_properties, writer_stats_config, @@ -479,7 +482,7 @@ async fn execute( .build(Some(&snapshot), log_store, operation) .await?; - Ok((commit.snapshot(), metrics)) + Ok((commit.snapshot().snapshot().clone(), metrics)) } impl std::future::IntoFuture for UpdateBuilder { @@ -489,8 +492,8 @@ impl std::future::IntoFuture for UpdateBuilder { fn into_future(self) -> Self::IntoFuture { let this = self; Box::pin(async move { - PROTOCOL.check_append_only(&this.snapshot.snapshot)?; - PROTOCOL.can_write_to(&this.snapshot.snapshot)?; + PROTOCOL.check_append_only(&this.snapshot)?; + PROTOCOL.can_write_to(&this.snapshot)?; if !&this.snapshot.load_config().require_files { return Err(DeltaTableError::NotInitializedWithFiles("UPDATE".into())); @@ -527,7 +530,7 @@ impl std::future::IntoFuture for UpdateBuilder { } Ok(( - DeltaTable::new_with_state(this.log_store, snapshot), + DeltaTable::new_with_state(this.log_store, DeltaTableState::new(snapshot)), metrics, )) }) diff --git a/crates/core/src/operations/update_field_metadata.rs b/crates/core/src/operations/update_field_metadata.rs index 320c626c59..7457d41f9c 100644 --- a/crates/core/src/operations/update_field_metadata.rs +++ b/crates/core/src/operations/update_field_metadata.rs @@ -9,17 +9,16 @@ use itertools::Itertools; use super::{CustomExecuteHandler, Operation}; use crate::kernel::transaction::{CommitBuilder, CommitProperties}; -use crate::kernel::{MetadataExt as _, ProtocolExt as _}; +use crate::kernel::{EagerSnapshot, MetadataExt as _, ProtocolExt as _}; use crate::logstore::LogStoreRef; use crate::protocol::DeltaOperation; -use crate::table::state::DeltaTableState; use crate::DeltaTable; use crate::{DeltaResult, DeltaTableError}; /// Update a field's metadata in a schema. If the key does not exists, the entry is inserted. pub struct UpdateFieldMetadataBuilder { /// A snapshot of the table's state - snapshot: DeltaTableState, + snapshot: EagerSnapshot, /// The name of the field where the metadata may be updated field_name: String, /// HashMap of the metadata to upsert @@ -42,7 +41,7 @@ impl super::Operation<()> for UpdateFieldMetadataBuilder { impl UpdateFieldMetadataBuilder { /// Create a new builder - pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self { + pub fn new(log_store: LogStoreRef, snapshot: EagerSnapshot) -> Self { Self { metadata: HashMap::new(), field_name: String::new(), diff --git a/crates/core/src/operations/update_table_metadata.rs b/crates/core/src/operations/update_table_metadata.rs index ba0240e133..fae0fbdec9 100644 --- a/crates/core/src/operations/update_table_metadata.rs +++ b/crates/core/src/operations/update_table_metadata.rs @@ -7,10 +7,9 @@ use validator::Validate; use super::{CustomExecuteHandler, Operation}; use crate::kernel::transaction::{CommitBuilder, CommitProperties}; -use crate::kernel::{Action, MetadataExt}; +use crate::kernel::{Action, EagerSnapshot, MetadataExt}; use crate::logstore::LogStoreRef; use crate::protocol::DeltaOperation; -use crate::table::state::DeltaTableState; use crate::DeltaTable; use crate::{DeltaResult, DeltaTableError}; @@ -46,7 +45,7 @@ fn validate_at_least_one_field( /// Update table metadata operation pub struct UpdateTableMetadataBuilder { /// A snapshot of the table's state - snapshot: DeltaTableState, + snapshot: EagerSnapshot, /// The metadata update to apply update: Option, /// Delta object store for handling data files @@ -67,7 +66,7 @@ impl super::Operation<()> for UpdateTableMetadataBuilder { impl UpdateTableMetadataBuilder { /// Create a new builder - pub fn new(log_store: LogStoreRef, snapshot: DeltaTableState) -> Self { + pub fn new(log_store: LogStoreRef, snapshot: EagerSnapshot) -> Self { Self { update: None, snapshot, diff --git a/crates/core/src/operations/write/execution.rs b/crates/core/src/operations/write/execution.rs index 2760702b70..f7b90d1f60 100644 --- a/crates/core/src/operations/write/execution.rs +++ b/crates/core/src/operations/write/execution.rs @@ -19,11 +19,10 @@ use crate::delta_datafusion::expr::fmt_expr_to_sql; use crate::delta_datafusion::{find_files, DeltaScanConfigBuilder, DeltaTableProvider}; use crate::delta_datafusion::{DataFusionMixins, DeltaDataChecker}; use crate::errors::DeltaResult; -use crate::kernel::{Action, Add, AddCDCFile, Remove, StructType, StructTypeExt}; +use crate::kernel::{Action, Add, AddCDCFile, EagerSnapshot, Remove, StructType, StructTypeExt}; use crate::logstore::{LogStoreRef, ObjectStoreRef}; use crate::operations::cdc::should_write_cdc; use crate::table::config::TablePropertiesExt as _; -use crate::table::state::DeltaTableState; use crate::table::Constraint as DeltaConstraint; use crate::DeltaTableError; @@ -45,7 +44,7 @@ pub(crate) struct WriteExecutionPlanMetrics { #[allow(clippy::too_many_arguments)] pub(crate) async fn write_execution_plan_cdc( - snapshot: Option<&DeltaTableState>, + snapshot: Option<&EagerSnapshot>, state: SessionState, plan: Arc, partition_columns: Vec, @@ -92,7 +91,7 @@ pub(crate) async fn write_execution_plan_cdc( #[allow(clippy::too_many_arguments)] pub(crate) async fn write_execution_plan( - snapshot: Option<&DeltaTableState>, + snapshot: Option<&EagerSnapshot>, state: SessionState, plan: Arc, partition_columns: Vec, @@ -121,7 +120,7 @@ pub(crate) async fn write_execution_plan( #[allow(clippy::too_many_arguments)] pub(crate) async fn execute_non_empty_expr( - snapshot: &DeltaTableState, + snapshot: &EagerSnapshot, log_store: LogStoreRef, state: SessionState, partition_columns: Vec, @@ -169,7 +168,7 @@ pub(crate) async fn execute_non_empty_expr( filter, partition_columns.clone(), log_store.object_store(Some(operation_id)), - Some(snapshot.table_config().target_file_size().get() as usize), + Some(snapshot.table_properties().target_file_size().get() as usize), None, writer_properties.clone(), writer_stats_config.clone(), @@ -201,7 +200,7 @@ pub(crate) async fn execute_non_empty_expr( pub(crate) async fn prepare_predicate_actions( predicate: Expr, log_store: LogStoreRef, - snapshot: &DeltaTableState, + snapshot: &EagerSnapshot, state: SessionState, partition_columns: Vec, writer_properties: Option, @@ -247,7 +246,7 @@ pub(crate) async fn prepare_predicate_actions( #[allow(clippy::too_many_arguments)] pub(crate) async fn write_execution_plan_v2( - snapshot: Option<&DeltaTableState>, + snapshot: Option<&EagerSnapshot>, state: SessionState, plan: Arc, partition_columns: Vec, diff --git a/crates/core/src/operations/write/generated_columns.rs b/crates/core/src/operations/write/generated_columns.rs index e8781d1d07..69500412e0 100644 --- a/crates/core/src/operations/write/generated_columns.rs +++ b/crates/core/src/operations/write/generated_columns.rs @@ -1,4 +1,4 @@ -use crate::table::state::DeltaTableState; +use crate::kernel::EagerSnapshot; use datafusion::common::ScalarValue; use datafusion::logical_expr::{col, when, ExprSchemable}; use datafusion::prelude::lit; @@ -9,7 +9,7 @@ use tracing::debug; use crate::{kernel::DataCheck, table::GeneratedColumn, DeltaResult}; /// check if the writer version is able to write generated columns -pub fn able_to_gc(snapshot: &DeltaTableState) -> DeltaResult { +pub fn able_to_gc(snapshot: &EagerSnapshot) -> DeltaResult { if let Some(features) = &snapshot.protocol().writer_features() { if snapshot.protocol().min_writer_version() < 4 { return Ok(false); diff --git a/crates/core/src/operations/write/mod.rs b/crates/core/src/operations/write/mod.rs index f264f9e20f..52625bed94 100644 --- a/crates/core/src/operations/write/mod.rs +++ b/crates/core/src/operations/write/mod.rs @@ -71,11 +71,11 @@ use crate::errors::{DeltaResult, DeltaTableError}; use crate::kernel::schema::cast::merge_arrow_schema; use crate::kernel::transaction::{CommitBuilder, CommitProperties, TableReference, PROTOCOL}; use crate::kernel::{ - new_metadata, Action, ActionType, MetadataExt as _, ProtocolExt as _, StructType, StructTypeExt, + new_metadata, Action, ActionType, EagerSnapshot, MetadataExt as _, ProtocolExt as _, + StructType, StructTypeExt, }; use crate::logstore::LogStoreRef; use crate::protocol::{DeltaOperation, SaveMode}; -use crate::table::state::DeltaTableState; use crate::DeltaTable; #[derive(thiserror::Error, Debug)] @@ -130,7 +130,7 @@ impl FromStr for SchemaMode { /// Write data into a DeltaTable pub struct WriteBuilder { /// A snapshot of the to-be-loaded table's state - snapshot: Option, + snapshot: Option, /// Delta object store for handling data files log_store: LogStoreRef, /// The input plan @@ -190,7 +190,7 @@ impl super::Operation<()> for WriteBuilder { impl WriteBuilder { /// Create a new [`WriteBuilder`] - pub fn new(log_store: LogStoreRef, snapshot: Option) -> Self { + pub fn new(log_store: LogStoreRef, snapshot: Option) -> Self { Self { snapshot, log_store, @@ -368,7 +368,7 @@ impl WriteBuilder { match &self.snapshot { Some(snapshot) => { if self.mode == SaveMode::Overwrite { - PROTOCOL.check_append_only(&snapshot.snapshot)?; + PROTOCOL.check_append_only(snapshot)?; if !snapshot.load_config().require_files { return Err(DeltaTableError::NotInitializedWithFiles("WRITE".into())); } @@ -611,7 +611,7 @@ impl std::future::IntoFuture for WriteBuilder { let config = this .snapshot .as_ref() - .map(|snapshot| snapshot.table_config()); + .map(|snapshot| snapshot.table_properties()); let target_file_size = this.target_file_size.or_else(|| { Some(super::get_target_file_size(config, &this.configuration) as usize) @@ -1950,9 +1950,10 @@ mod tests { .logical_plan() .clone(), ); - let writer = WriteBuilder::new(table.log_store.clone(), table.state) - .with_input_execution_plan(plan) - .with_save_mode(SaveMode::Overwrite); + let writer = + WriteBuilder::new(table.log_store.clone(), table.state.map(|f| f.snapshot)) + .with_input_execution_plan(plan) + .with_save_mode(SaveMode::Overwrite); let _ = writer.check_preconditions().await?; Ok(()) diff --git a/crates/core/src/operations/write/writer.rs b/crates/core/src/operations/write/writer.rs index d0577eb563..c31a3dda56 100644 --- a/crates/core/src/operations/write/writer.rs +++ b/crates/core/src/operations/write/writer.rs @@ -491,7 +491,7 @@ mod tests { use crate::logstore::tests::flatten_list_stream as list; use crate::table::config::DEFAULT_NUM_INDEX_COLS; use crate::writer::test_utils::*; - use crate::{ensure_table_uri, DeltaTableBuilder}; + use crate::DeltaTableBuilder; use arrow::array::{Int32Array, StringArray}; use arrow::datatypes::{DataType, Field, Schema as ArrowSchema}; use std::sync::Arc; diff --git a/crates/core/src/table/mod.rs b/crates/core/src/table/mod.rs index 2f1088d522..e001f16406 100644 --- a/crates/core/src/table/mod.rs +++ b/crates/core/src/table/mod.rs @@ -235,7 +235,7 @@ impl DeltaTable { ) -> Result, DeltaTableError> { let infos = self .snapshot()? - .snapshot + .snapshot() .snapshot() .commit_infos(&self.log_store(), limit) .await? @@ -265,7 +265,9 @@ impl DeltaTable { Err(DeltaTableError::NotInitialized) })); }; - state.get_active_add_actions_by_partitions(&self.log_store, filters) + state + .snapshot() + .file_views_by_partitions(&self.log_store, filters) } /// Returns the file list tracked in current table state filtered by provided diff --git a/crates/core/src/table/state.rs b/crates/core/src/table/state.rs index 2a97c2d530..1f74a1ca5f 100644 --- a/crates/core/src/table/state.rs +++ b/crates/core/src/table/state.rs @@ -24,7 +24,7 @@ use crate::kernel::{ use crate::logstore::LogStore; use crate::partitions::PartitionFilter; use crate::table::config::TablePropertiesExt; -use crate::{to_kernel_predicate, DeltaResult, DeltaTableError}; +use crate::{DeltaResult, DeltaTableError}; /// State snapshot currently held by the Delta Table instance. #[derive(Debug, Clone, Serialize, Deserialize)] @@ -34,6 +34,10 @@ pub struct DeltaTableState { } impl DeltaTableState { + pub fn new(snapshot: EagerSnapshot) -> Self { + Self { snapshot } + } + /// Create a new DeltaTableState pub async fn try_new( log_store: &dyn LogStore, @@ -218,21 +222,53 @@ impl DeltaTableState { /// ## Returns /// /// A stream of logical file views that match the partition filters. + #[deprecated( + since = "0.30.0", + note = "Use `.snapshot().files(log_store, predicate)` with a kernel predicate instead." + )] pub fn get_active_add_actions_by_partitions( &self, log_store: &dyn LogStore, filters: &[PartitionFilter], ) -> BoxStream<'_, DeltaResult> { - if filters.is_empty() { - return self.snapshot().files(log_store, None); - } - let predicate = match to_kernel_predicate(filters, self.snapshot.schema()) { - Ok(predicate) => Arc::new(predicate), - Err(err) => return Box::pin(futures::stream::once(async { Err(err) })), - }; - self.snapshot().files(log_store, Some(predicate)) + self.snapshot().file_views_by_partitions(log_store, filters) + } + + /// Get an [arrow::record_batch::RecordBatch] containing add action data. + /// + /// # Arguments + /// + /// * `flatten` - whether to flatten the schema. Partition values columns are + /// given the prefix `partition.`, statistics (null_count, min, and max) are + /// given the prefix `null_count.`, `min.`, and `max.`, and tags the + /// prefix `tags.`. Nested field names are concatenated with `.`. + /// + /// # Data schema + /// + /// Each row represents a file that is a part of the selected tables state. + /// + /// * `path` (String): relative or absolute to a file. + /// * `size_bytes` (Int64): size of file in bytes. + /// * `modification_time` (Millisecond Timestamp): time the file was created. + /// * `null_count.{col_name}` (Int64): number of null values for column in + /// this file. + /// * `num_records.{col_name}` (Int64): number of records for column in + /// this file. + /// * `min.{col_name}` (matches column type): minimum value of column in file + /// (if available). + /// * `max.{col_name}` (matches column type): maximum value of column in file + /// (if available). + /// * `partition.{partition column name}` (matches column type): value of + /// partition the file corresponds to. + pub fn add_actions_table( + &self, + flatten: bool, + ) -> Result { + self.snapshot.add_actions_table(flatten) } +} +impl EagerSnapshot { /// Get an [arrow::record_batch::RecordBatch] containing add action data. /// /// # Arguments @@ -274,7 +310,7 @@ impl DeltaTableState { StructField::not_null("modification_time", DataType::LONG), ]; - let stats_schema = self.snapshot.snapshot().inner.stats_schema()?; + let stats_schema = self.snapshot().inner.stats_schema()?; let num_records_field = stats_schema .field("numRecords") .ok_or_else(|| DeltaTableError::SchemaMismatch { @@ -303,7 +339,7 @@ impl DeltaTableState { fields.push(max_values_field); } - if let Some(partition_schema) = self.snapshot.snapshot().inner.partitions_schema()? { + if let Some(partition_schema) = self.snapshot().inner.partitions_schema()? { fields.push(StructField::nullable( "partition", DataType::try_struct_type(partition_schema.fields().cloned())?, @@ -314,9 +350,9 @@ impl DeltaTableState { let expression = Expression::Struct(expressions); let table_schema = DataType::try_struct_type(fields)?; - let input_schema = self.snapshot.files.schema(); + let input_schema = self.files.schema(); let input_schema = Arc::new(input_schema.as_ref().try_into_kernel()?); - let actions = self.snapshot.files.clone(); + let actions = self.files.clone(); let evaluator = ARROW_HANDLER.new_expression_evaluator(input_schema, expression.into(), table_schema); diff --git a/crates/core/tests/command_optimize.rs b/crates/core/tests/command_optimize.rs index 7d525ec74b..725fa81b5b 100644 --- a/crates/core/tests/command_optimize.rs +++ b/crates/core/tests/command_optimize.rs @@ -294,7 +294,7 @@ async fn test_conflict_for_remove_actions() -> Result<(), Box> { let plan = create_merge_plan( &dt.log_store(), OptimizeType::Compact, - dt.snapshot()?, + dt.snapshot()?.snapshot(), &filter, None, WriterProperties::builder().build(), @@ -317,7 +317,7 @@ async fn test_conflict_for_remove_actions() -> Result<(), Box> { let maybe_metrics = plan .execute( dt.log_store(), - dt.snapshot()?, + dt.snapshot()?.snapshot(), 1, 20, None, @@ -360,7 +360,7 @@ async fn test_no_conflict_for_append_actions() -> Result<(), Box> { let plan = create_merge_plan( &dt.log_store(), OptimizeType::Compact, - dt.snapshot()?, + dt.snapshot()?.snapshot(), &filter, None, WriterProperties::builder().build(), @@ -382,7 +382,7 @@ async fn test_no_conflict_for_append_actions() -> Result<(), Box> { let metrics = plan .execute( dt.log_store(), - dt.snapshot()?, + dt.snapshot()?.snapshot(), 1, 20, None, @@ -423,7 +423,7 @@ async fn test_commit_interval() -> Result<(), Box> { let plan = create_merge_plan( &dt.log_store(), OptimizeType::Compact, - dt.snapshot()?, + dt.snapshot()?.snapshot(), &[], None, WriterProperties::builder().build(), @@ -434,7 +434,7 @@ async fn test_commit_interval() -> Result<(), Box> { let metrics = plan .execute( dt.log_store(), - dt.snapshot()?, + dt.snapshot()?.snapshot(), 1, 20, Some(Duration::from_secs(0)), // this will cause as many commits as num_files_added diff --git a/crates/core/tests/integration_datafusion.rs b/crates/core/tests/integration_datafusion.rs index b371021a1f..5caab81862 100644 --- a/crates/core/tests/integration_datafusion.rs +++ b/crates/core/tests/integration_datafusion.rs @@ -433,7 +433,7 @@ mod local { // Execute write to the target table with the proper state let target_table = WriteBuilder::new( target_table.log_store(), - target_table.snapshot().ok().cloned(), + target_table.snapshot().ok().map(|s| s.snapshot()).cloned(), ) .with_input_execution_plan(source_scan) .with_input_session_state(state) @@ -1406,7 +1406,7 @@ mod local { .logical_plan() .clone(), ); - let write_builder = WriteBuilder::new(log_store, tbl.state); + let write_builder = WriteBuilder::new(log_store, tbl.state.map(|s| s.snapshot().clone())); let _ = write_builder .with_input_execution_plan(plan) .with_save_mode(SaveMode::Overwrite) @@ -1684,9 +1684,9 @@ mod insert_into_tests { )?; let ctx = SessionContext::new(); - let scan_config = DeltaScanConfigBuilder::new().build(table.snapshot()?)?; + let scan_config = DeltaScanConfigBuilder::new().build(table.snapshot()?.snapshot())?; let table_provider: Arc = Arc::new(DeltaTableProvider::try_new( - table.snapshot()?.clone(), + table.snapshot()?.snapshot().clone(), table.log_store(), scan_config, )?); @@ -1807,9 +1807,9 @@ mod insert_into_tests { )?; let ctx = SessionContext::new(); - let scan_config = DeltaScanConfigBuilder::new().build(table.snapshot()?)?; + let scan_config = DeltaScanConfigBuilder::new().build(table.snapshot()?.snapshot())?; let table_provider: Arc = Arc::new(DeltaTableProvider::try_new( - table.snapshot()?.clone(), + table.snapshot()?.snapshot().clone(), table.log_store(), scan_config, )?); @@ -1864,10 +1864,10 @@ mod insert_into_tests { ctx.deregister_table("test_table")?; let refreshed_table = deltalake_core::open_table(url::Url::parse(&table_uri)?).await?; let refreshed_scan_config = - DeltaScanConfigBuilder::new().build(refreshed_table.snapshot()?)?; + DeltaScanConfigBuilder::new().build(refreshed_table.snapshot()?.snapshot())?; let refreshed_table_provider: Arc = Arc::new(DeltaTableProvider::try_new( - refreshed_table.snapshot()?.clone(), + refreshed_table.snapshot()?.snapshot().clone(), refreshed_table.log_store(), refreshed_scan_config, )?); @@ -1947,9 +1947,9 @@ mod insert_into_tests { )?; let ctx = SessionContext::new(); - let scan_config = DeltaScanConfigBuilder::new().build(table.snapshot()?)?; + let scan_config = DeltaScanConfigBuilder::new().build(table.snapshot()?.snapshot())?; let table_provider: Arc = Arc::new(DeltaTableProvider::try_new( - table.snapshot()?.clone(), + table.snapshot()?.snapshot().clone(), table.log_store(), scan_config, )?); @@ -1986,9 +1986,10 @@ mod insert_into_tests { ctx.deregister_table("test_table")?; let final_table = deltalake_core::open_table(url::Url::parse(&table_uri)?).await?; - let final_scan_config = DeltaScanConfigBuilder::new().build(final_table.snapshot()?)?; + let final_scan_config = + DeltaScanConfigBuilder::new().build(final_table.snapshot()?.snapshot())?; let final_table_provider: Arc = Arc::new(DeltaTableProvider::try_new( - final_table.snapshot()?.clone(), + final_table.snapshot()?.snapshot().clone(), final_table.log_store(), final_scan_config, )?); @@ -2069,9 +2070,9 @@ mod insert_into_tests { RecordBatch::try_new(table_schema.clone(), vec![Arc::new(names), Arc::new(ages)])?; let ctx = SessionContext::new(); - let scan_config = DeltaScanConfigBuilder::new().build(table.snapshot()?)?; + let scan_config = DeltaScanConfigBuilder::new().build(table.snapshot()?.snapshot())?; let table_provider: Arc = Arc::new(DeltaTableProvider::try_new( - table.snapshot()?.clone(), + table.snapshot()?.snapshot().clone(), table.log_store(), scan_config, )?); diff --git a/python/src/lib.rs b/python/src/lib.rs index be0d0d1f23..702591c8b6 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -27,7 +27,8 @@ use deltalake::errors::DeltaTableError; use deltalake::kernel::scalars::ScalarExt; use deltalake::kernel::transaction::{CommitBuilder, CommitProperties, TableReference}; use deltalake::kernel::{ - Action, Add, LogicalFileView, MetadataExt as _, StructDataExt as _, StructType, Transaction, + Action, Add, EagerSnapshot, LogicalFileView, MetadataExt as _, StructDataExt as _, StructType, + Transaction, }; use deltalake::lakefs::LakeFSCustomExecuteHandler; use deltalake::logstore::LogStoreRef; @@ -154,9 +155,10 @@ impl RawDeltaTable { self.with_table(|t| Ok(t.log_store().object_store(None).clone())) } - fn cloned_state(&self) -> PyResult { + fn cloned_state(&self) -> PyResult { self.with_table(|t| { t.snapshot() + .map(|snapshot| snapshot.snapshot()) .cloned() .map_err(PythonError::from) .map_err(PyErr::from) @@ -1184,7 +1186,7 @@ impl RawDeltaTable { let adds: Vec<_> = rt() .block_on(async { state - .get_active_add_actions_by_partitions(&log_store, &converted_filters) + .file_views_by_partitions(&log_store, &converted_filters) .try_collect() .await }) @@ -1200,7 +1202,7 @@ impl RawDeltaTable { *col, add.partition_values() .and_then(|v| { - v.index_of(*col).and_then(|idx| v.value(idx).cloned()) + v.index_of(col).and_then(|idx| v.value(idx).cloned()) }) .map(|v| v.serialize()), ) @@ -1255,10 +1257,7 @@ impl RawDeltaTable { let add_actions: Vec<_> = rt() .block_on(async { state - .get_active_add_actions_by_partitions( - &log_store, - &converted_filters, - ) + .file_views_by_partitions(&log_store, &converted_filters) .try_collect() .await }) @@ -1689,7 +1688,7 @@ impl RawDeltaTable { let mut builder = WriteBuilder::new( self.log_store()?, - self.with_table(|t| Ok(t.state.clone()))?, + self.with_table(|t| Ok(t.state.as_ref().map(|s| s.snapshot().clone())))?, // Take the Option since it might be the first write, // triggered through `write_to_deltalake` ) @@ -1870,7 +1869,7 @@ fn set_writer_properties(writer_properties: PyWriterProperties) -> DeltaResult