diff --git a/Cargo.toml b/Cargo.toml index 3cab8bdd06..7fc95e9d9e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -83,7 +83,7 @@ tracing-subscriber = { version = "0.3", features = ["env-filter"] } num_cpus = { version = "1" } [workspace.metadata.typos] -files.extend-exclude = ["CHANGELOG.md"] +files.extend-exclude = ["CHANGELOG.md", "crates/benchmarks/queries/tpcds/*.sql"] default.extend-ignore-re = [ # Custom ignore regex patterns: https://github.com/crate-ci/typos/blob/master/docs/reference.md#example-configurations "(?s)//\\s*spellchecker:ignore-next-line[^\\n]*\\n[^\\n]*", diff --git a/crates/core/src/delta_datafusion/cdf/scan.rs b/crates/core/src/delta_datafusion/cdf/scan.rs index fb85f2ca5a..4a3ea9afb6 100644 --- a/crates/core/src/delta_datafusion/cdf/scan.rs +++ b/crates/core/src/delta_datafusion/cdf/scan.rs @@ -28,7 +28,15 @@ pub struct DeltaCdfTableProvider { impl DeltaCdfTableProvider { /// Build a DeltaCDFTableProvider pub fn try_new(cdf_builder: CdfLoadBuilder) -> DeltaResult { - let mut fields = cdf_builder.snapshot.input_schema().fields().to_vec(); + let mut fields = cdf_builder + .snapshot + .as_ref() + .ok_or(DeltaTableError::generic( + "expected initialized snapshot for DeltaCdfTableProvider", + ))? + .input_schema() + .fields() + .to_vec(); for f in ADD_PARTITION_SCHEMA.clone() { fields.push(f.into()); } diff --git a/crates/core/src/delta_datafusion/table_provider.rs b/crates/core/src/delta_datafusion/table_provider.rs index 3e6a4f89b0..452aa798ce 100644 --- a/crates/core/src/delta_datafusion/table_provider.rs +++ b/crates/core/src/delta_datafusion/table_provider.rs @@ -626,7 +626,7 @@ impl<'a> DeltaScanBuilder<'a> { let mut pruned_batches = Vec::new(); let mut mask_offset = 0; - for batch in &self.snapshot.files { + for batch in self.snapshot.files()? { let batch_size = batch.num_rows(); let batch_mask = &mask[mask_offset..mask_offset + batch_size]; let batch_mask_array = BooleanArray::from(batch_mask.to_vec()); diff --git a/crates/core/src/kernel/snapshot/mod.rs b/crates/core/src/kernel/snapshot/mod.rs index ef42577e9e..b858dcda86 100644 --- a/crates/core/src/kernel/snapshot/mod.rs +++ b/crates/core/src/kernel/snapshot/mod.rs @@ -463,7 +463,25 @@ impl Snapshot { pub struct EagerSnapshot { snapshot: Snapshot, // logical files in the snapshot - pub(crate) files: Vec, + files: Vec, +} + +pub(crate) async fn resolve_snapshot( + log_store: &dyn LogStore, + maybe_snapshot: Option, + require_files: bool, +) -> DeltaResult { + if let Some(snapshot) = maybe_snapshot { + if require_files { + snapshot.with_files(log_store).await + } else { + Ok(snapshot) + } + } else { + let mut config = DeltaTableConfig::default(); + config.require_files = require_files; + EagerSnapshot::try_new(log_store, config, None).await + } } impl EagerSnapshot { @@ -474,15 +492,36 @@ impl EagerSnapshot { version: Option, ) -> DeltaResult { let snapshot = Snapshot::try_new(log_store, config.clone(), version).await?; + Self::try_new_with_snapshot(log_store, snapshot).await + } - let files = match config.require_files { + pub(crate) async fn try_new_with_snapshot( + log_store: &dyn LogStore, + snapshot: Snapshot, + ) -> DeltaResult { + let files = match snapshot.load_config().require_files { true => snapshot.files(log_store, None).try_collect().await?, false => vec![], }; - Ok(Self { snapshot, files }) } + pub(crate) async fn with_files(mut self, log_store: &dyn LogStore) -> DeltaResult { + if self.snapshot.config.require_files { + return Ok(self); + } + self.snapshot.config.require_files = true; + Self::try_new_with_snapshot(log_store, self.snapshot).await + } + + pub(crate) fn files(&self) -> DeltaResult<&[RecordBatch]> { + if self.snapshot.config.require_files { + Ok(&self.files) + } else { + Err(DeltaTableError::NotInitializedWithFiles("files".into())) + } + } + /// Update the snapshot to the given version pub(crate) async fn update( &mut self, @@ -588,6 +627,12 @@ impl EagerSnapshot { log_store: &dyn LogStore, predicate: Option, ) -> BoxStream<'_, DeltaResult> { + if !self.snapshot.load_config().require_files { + return Box::pin(once(ready(Err(DeltaTableError::NotInitializedWithFiles( + "file_views".into(), + ))))); + } + self.snapshot .files_from( log_store, diff --git a/crates/core/src/operations/add_column.rs b/crates/core/src/operations/add_column.rs index a535b62c8a..e789cf64ce 100644 --- a/crates/core/src/operations/add_column.rs +++ b/crates/core/src/operations/add_column.rs @@ -9,7 +9,9 @@ use itertools::Itertools; use super::{CustomExecuteHandler, Operation}; use crate::kernel::schema::merge_delta_struct; use crate::kernel::transaction::{CommitBuilder, CommitProperties}; -use crate::kernel::{EagerSnapshot, MetadataExt, ProtocolExt as _, StructField, StructTypeExt}; +use crate::kernel::{ + resolve_snapshot, EagerSnapshot, MetadataExt, ProtocolExt as _, StructField, StructTypeExt, +}; use crate::logstore::LogStoreRef; use crate::protocol::DeltaOperation; use crate::{DeltaResult, DeltaTable, DeltaTableError}; @@ -17,7 +19,7 @@ use crate::{DeltaResult, DeltaTable, DeltaTableError}; /// Add new columns and/or nested fields to a table pub struct AddColumnBuilder { /// A snapshot of the table's state - snapshot: EagerSnapshot, + snapshot: Option, /// Fields to add/merge into schema fields: Option>, /// Delta object store for handling data files @@ -27,7 +29,7 @@ pub struct AddColumnBuilder { custom_execute_handler: Option>, } -impl Operation<()> for AddColumnBuilder { +impl Operation for AddColumnBuilder { fn log_store(&self) -> &LogStoreRef { &self.log_store } @@ -38,7 +40,7 @@ impl Operation<()> for AddColumnBuilder { impl AddColumnBuilder { /// Create a new builder - pub fn new(log_store: LogStoreRef, snapshot: EagerSnapshot) -> Self { + pub(crate) fn new(log_store: LogStoreRef, snapshot: Option) -> Self { Self { snapshot, log_store, @@ -75,7 +77,9 @@ impl std::future::IntoFuture for AddColumnBuilder { let this = self; Box::pin(async move { - let mut metadata = this.snapshot.metadata().clone(); + let snapshot = resolve_snapshot(&this.log_store, this.snapshot.clone(), false).await?; + + let mut metadata = snapshot.metadata().clone(); let fields = match this.fields.clone() { Some(v) => v, None => return Err(DeltaTableError::Generic("No fields provided".to_string())), @@ -95,10 +99,10 @@ impl std::future::IntoFuture for AddColumnBuilder { )); } - let table_schema = this.snapshot.schema(); + let table_schema = snapshot.schema(); let new_table_schema = merge_delta_struct(table_schema.as_ref(), fields_right)?; - let current_protocol = this.snapshot.protocol(); + let current_protocol = snapshot.protocol(); let new_protocol = current_protocol .clone() @@ -121,7 +125,7 @@ impl std::future::IntoFuture for AddColumnBuilder { .with_actions(actions) .with_operation_id(operation_id) .with_post_commit_hook_handler(this.get_custom_execute_handler()) - .build(Some(&this.snapshot), this.log_store.clone(), operation) + .build(Some(&snapshot), this.log_store.clone(), operation) .await?; this.post_execute(operation_id).await?; diff --git a/crates/core/src/operations/add_feature.rs b/crates/core/src/operations/add_feature.rs index 58a7f37d3b..d396ca8286 100644 --- a/crates/core/src/operations/add_feature.rs +++ b/crates/core/src/operations/add_feature.rs @@ -8,7 +8,7 @@ use itertools::Itertools; use super::{CustomExecuteHandler, Operation}; use crate::kernel::transaction::{CommitBuilder, CommitProperties}; -use crate::kernel::{EagerSnapshot, ProtocolExt as _, TableFeatures}; +use crate::kernel::{resolve_snapshot, EagerSnapshot, ProtocolExt as _, TableFeatures}; use crate::logstore::LogStoreRef; use crate::protocol::DeltaOperation; use crate::DeltaTable; @@ -17,7 +17,7 @@ use crate::{DeltaResult, DeltaTableError}; /// Enable table features for a table pub struct AddTableFeatureBuilder { /// A snapshot of the table's state - snapshot: EagerSnapshot, + snapshot: Option, /// Name of the feature name: Vec, /// Allow protocol versions to be increased by setting features @@ -29,7 +29,7 @@ pub struct AddTableFeatureBuilder { custom_execute_handler: Option>, } -impl super::Operation<()> for AddTableFeatureBuilder { +impl super::Operation for AddTableFeatureBuilder { fn log_store(&self) -> &LogStoreRef { &self.log_store } @@ -40,7 +40,7 @@ impl super::Operation<()> for AddTableFeatureBuilder { impl AddTableFeatureBuilder { /// Create a new builder - pub fn new(log_store: LogStoreRef, snapshot: EagerSnapshot) -> Self { + pub(crate) fn new(log_store: LogStoreRef, snapshot: Option) -> Self { Self { name: vec![], allow_protocol_versions_increase: false, @@ -92,6 +92,8 @@ impl std::future::IntoFuture for AddTableFeatureBuilder { let this = self; Box::pin(async move { + let snapshot = resolve_snapshot(&this.log_store, this.snapshot.clone(), false).await?; + let name = if this.name.is_empty() { return Err(DeltaTableError::Generic("No features provided".to_string())); } else { @@ -107,7 +109,7 @@ impl std::future::IntoFuture for AddTableFeatureBuilder { let reader_features = reader_features.into_iter().flatten().collect_vec(); let writer_features = writer_features.into_iter().flatten().collect_vec(); - let mut protocol = this.snapshot.protocol().clone(); + let mut protocol = snapshot.protocol().clone(); if !this.allow_protocol_versions_increase { if !reader_features.is_empty() @@ -135,7 +137,7 @@ impl std::future::IntoFuture for AddTableFeatureBuilder { .with_actions(actions) .with_operation_id(operation_id) .with_post_commit_hook_handler(this.get_custom_execute_handler()) - .build(Some(&this.snapshot), this.log_store.clone(), operation) + .build(Some(&snapshot), this.log_store.clone(), operation) .await?; this.post_execute(operation_id).await?; diff --git a/crates/core/src/operations/constraints.rs b/crates/core/src/operations/constraints.rs index 5fc4372a42..0512f043e6 100644 --- a/crates/core/src/operations/constraints.rs +++ b/crates/core/src/operations/constraints.rs @@ -15,7 +15,9 @@ use super::{CustomExecuteHandler, Operation}; use crate::delta_datafusion::expr::fmt_expr_to_sql; use crate::delta_datafusion::{create_session, register_store, DeltaDataChecker, DeltaScanBuilder}; use crate::kernel::transaction::{CommitBuilder, CommitProperties}; -use crate::kernel::{EagerSnapshot, MetadataExt, ProtocolExt as _, ProtocolInner}; +use crate::kernel::{ + resolve_snapshot, EagerSnapshot, MetadataExt, ProtocolExt as _, ProtocolInner, +}; use crate::logstore::LogStoreRef; use crate::operations::datafusion_utils::Expression; use crate::protocol::DeltaOperation; @@ -25,7 +27,7 @@ use crate::{DeltaResult, DeltaTable, DeltaTableError}; /// Build a constraint to add to a table pub struct ConstraintBuilder { /// A snapshot of the table's state - snapshot: EagerSnapshot, + snapshot: Option, /// Name of the constraint name: Option, /// Constraint expression @@ -39,7 +41,7 @@ pub struct ConstraintBuilder { custom_execute_handler: Option>, } -impl super::Operation<()> for ConstraintBuilder { +impl super::Operation for ConstraintBuilder { fn log_store(&self) -> &LogStoreRef { &self.log_store } @@ -50,7 +52,7 @@ impl super::Operation<()> for ConstraintBuilder { impl ConstraintBuilder { /// Create a new builder - pub fn new(log_store: LogStoreRef, snapshot: EagerSnapshot) -> Self { + pub(crate) fn new(log_store: LogStoreRef, snapshot: Option) -> Self { Self { name: None, expr: None, @@ -101,11 +103,8 @@ impl std::future::IntoFuture for ConstraintBuilder { let this = self; Box::pin(async move { - if !this.snapshot.load_config().require_files { - return Err(DeltaTableError::NotInitializedWithFiles( - "ADD CONSTRAINTS".into(), - )); - } + let snapshot = resolve_snapshot(&this.log_store, this.snapshot.clone(), true).await?; + let operation_id = this.get_operation_id(); this.pre_execute(operation_id).await?; @@ -118,7 +117,7 @@ impl std::future::IntoFuture for ConstraintBuilder { .expr .ok_or_else(|| DeltaTableError::Generic("No Expression provided".to_string()))?; - let mut metadata = this.snapshot.metadata().clone(); + let mut metadata = snapshot.metadata().clone(); let configuration_key = format!("delta.constraints.{name}"); if metadata.configuration().contains_key(&configuration_key) { @@ -132,10 +131,9 @@ impl std::future::IntoFuture for ConstraintBuilder { .unwrap_or_else(|| Arc::new(create_session().into_inner().state())); register_store(this.log_store.clone(), session.runtime_env().as_ref()); - let scan = - DeltaScanBuilder::new(&this.snapshot, this.log_store.clone(), session.as_ref()) - .build() - .await?; + let scan = DeltaScanBuilder::new(&snapshot, this.log_store.clone(), session.as_ref()) + .build() + .await?; let schema = scan.schema().to_dfschema()?; let expr = into_expr(expr, &schema, session.as_ref())?; @@ -175,7 +173,7 @@ impl std::future::IntoFuture for ConstraintBuilder { metadata = metadata.add_config_key(format!("delta.constraints.{name}"), expr_str.clone())?; - let old_protocol = this.snapshot.protocol(); + let old_protocol = snapshot.protocol(); let protocol = ProtocolInner { min_reader_version: if old_protocol.min_reader_version() > 1 { old_protocol.min_reader_version() @@ -213,7 +211,7 @@ impl std::future::IntoFuture for ConstraintBuilder { .with_actions(actions) .with_operation_id(operation_id) .with_post_commit_hook_handler(this.custom_execute_handler.clone()) - .build(Some(&this.snapshot), this.log_store.clone(), operation) + .build(Some(&snapshot), this.log_store.clone(), operation) .await?; if let Some(handler) = this.custom_execute_handler { diff --git a/crates/core/src/operations/convert_to_delta.rs b/crates/core/src/operations/convert_to_delta.rs index 0138130416..fb7a843873 100644 --- a/crates/core/src/operations/convert_to_delta.rs +++ b/crates/core/src/operations/convert_to_delta.rs @@ -168,7 +168,7 @@ impl Default for ConvertToDeltaBuilder { } } -impl super::Operation<()> for ConvertToDeltaBuilder { +impl super::Operation for ConvertToDeltaBuilder { fn log_store(&self) -> &LogStoreRef { self.log_store .as_ref() diff --git a/crates/core/src/operations/create.rs b/crates/core/src/operations/create.rs index 122fc854a8..4267b8729b 100644 --- a/crates/core/src/operations/create.rs +++ b/crates/core/src/operations/create.rs @@ -68,7 +68,7 @@ pub struct CreateBuilder { custom_execute_handler: Option>, } -impl super::Operation<()> for CreateBuilder { +impl super::Operation for CreateBuilder { fn log_store(&self) -> &LogStoreRef { self.log_store .as_ref() diff --git a/crates/core/src/operations/delete.rs b/crates/core/src/operations/delete.rs index c9284cd5a0..119b057fff 100644 --- a/crates/core/src/operations/delete.rs +++ b/crates/core/src/operations/delete.rs @@ -52,7 +52,7 @@ use crate::delta_datafusion::{ }; use crate::errors::DeltaResult; use crate::kernel::transaction::{CommitBuilder, CommitProperties, PROTOCOL}; -use crate::kernel::{Action, Add, EagerSnapshot, Remove}; +use crate::kernel::{resolve_snapshot, Action, Add, EagerSnapshot, Remove}; use crate::logstore::LogStoreRef; use crate::operations::write::execution::{write_execution_plan, write_execution_plan_cdc}; use crate::operations::write::WriterStatsConfig; @@ -72,7 +72,7 @@ pub struct DeleteBuilder { /// Which records to delete predicate: Option, /// A snapshot of the table's state - snapshot: EagerSnapshot, + snapshot: Option, /// Delta object store for handling data files log_store: LogStoreRef, /// Datafusion session state relevant for executing the input plan @@ -114,7 +114,7 @@ pub struct DeleteMetrics { pub rewrite_time_ms: u64, } -impl super::Operation<()> for DeleteBuilder { +impl super::Operation for DeleteBuilder { fn log_store(&self) -> &LogStoreRef { &self.log_store } @@ -125,7 +125,7 @@ impl super::Operation<()> for DeleteBuilder { impl DeleteBuilder { /// Create a new [`DeleteBuilder`] - pub fn new(log_store: LogStoreRef, snapshot: EagerSnapshot) -> Self { + pub(crate) fn new(log_store: LogStoreRef, snapshot: Option) -> Self { Self { predicate: None, snapshot, @@ -176,8 +176,9 @@ impl std::future::IntoFuture for DeleteBuilder { let this = self; Box::pin(async move { - PROTOCOL.check_append_only(&this.snapshot)?; - PROTOCOL.can_write_to(&this.snapshot)?; + let snapshot = resolve_snapshot(&this.log_store, this.snapshot.clone(), true).await?; + PROTOCOL.check_append_only(&snapshot)?; + PROTOCOL.can_write_to(&snapshot)?; let operation_id = this.get_operation_id(); this.pre_execute(operation_id).await?; @@ -191,10 +192,9 @@ impl std::future::IntoFuture for DeleteBuilder { let predicate = match this.predicate { Some(predicate) => match predicate { Expression::DataFusion(expr) => Some(expr), - Expression::String(s) => Some( - this.snapshot - .parse_predicate_expression(s, session.as_ref())?, - ), + Expression::String(s) => { + Some(snapshot.parse_predicate_expression(s, session.as_ref())?) + } }, None => None, }; @@ -202,7 +202,7 @@ impl std::future::IntoFuture for DeleteBuilder { let (new_snapshot, metrics) = execute( predicate, this.log_store.clone(), - this.snapshot, + snapshot, session.as_ref(), this.writer_properties, this.commit_properties, diff --git a/crates/core/src/operations/drop_constraints.rs b/crates/core/src/operations/drop_constraints.rs index 36d85429cb..9e20208769 100644 --- a/crates/core/src/operations/drop_constraints.rs +++ b/crates/core/src/operations/drop_constraints.rs @@ -5,8 +5,8 @@ use std::sync::Arc; use futures::future::BoxFuture; use super::{CustomExecuteHandler, Operation}; -use crate::kernel::transaction::{CommitBuilder, CommitProperties}; -use crate::kernel::{Action, EagerSnapshot, MetadataExt}; +use crate::kernel::transaction::{CommitBuilder, CommitProperties, PROTOCOL}; +use crate::kernel::{resolve_snapshot, Action, EagerSnapshot, MetadataExt}; use crate::logstore::LogStoreRef; use crate::protocol::DeltaOperation; use crate::table::state::DeltaTableState; @@ -16,7 +16,7 @@ use crate::{DeltaResult, DeltaTableError}; /// Remove constraints from the table pub struct DropConstraintBuilder { /// A snapshot of the table's state - snapshot: EagerSnapshot, + snapshot: Option, /// Name of the constraint name: Option, /// Raise if constraint doesn't exist @@ -28,7 +28,7 @@ pub struct DropConstraintBuilder { custom_execute_handler: Option>, } -impl super::Operation<()> for DropConstraintBuilder { +impl super::Operation for DropConstraintBuilder { fn log_store(&self) -> &LogStoreRef { &self.log_store } @@ -39,7 +39,7 @@ impl super::Operation<()> for DropConstraintBuilder { impl DropConstraintBuilder { /// Create a new builder - pub fn new(log_store: LogStoreRef, snapshot: EagerSnapshot) -> Self { + pub(crate) fn new(log_store: LogStoreRef, snapshot: Option) -> Self { Self { name: None, raise_if_not_exists: true, @@ -84,6 +84,9 @@ impl std::future::IntoFuture for DropConstraintBuilder { let this = self; Box::pin(async move { + let snapshot = resolve_snapshot(&this.log_store, this.snapshot.clone(), false).await?; + PROTOCOL.can_write_to(&snapshot)?; + let name = this .name .clone() @@ -92,7 +95,7 @@ impl std::future::IntoFuture for DropConstraintBuilder { let operation_id = this.get_operation_id(); this.pre_execute(operation_id).await?; - let mut metadata = this.snapshot.metadata().clone(); + let mut metadata = snapshot.metadata().clone(); let configuration_key = format!("delta.constraints.{name}"); if !metadata.configuration().contains_key(&configuration_key) { @@ -103,9 +106,7 @@ impl std::future::IntoFuture for DropConstraintBuilder { } return Ok(DeltaTable::new_with_state( this.log_store, - DeltaTableState { - snapshot: this.snapshot, - }, + DeltaTableState::new(snapshot), )); } @@ -118,7 +119,7 @@ impl std::future::IntoFuture for DropConstraintBuilder { .with_operation_id(operation_id) .with_post_commit_hook_handler(this.get_custom_execute_handler()) .with_actions(actions) - .build(Some(&this.snapshot), this.log_store.clone(), operation) + .build(Some(&snapshot), this.log_store.clone(), operation) .await?; this.post_execute(operation_id).await?; diff --git a/crates/core/src/operations/filesystem_check.rs b/crates/core/src/operations/filesystem_check.rs index 956f77e35e..b3591c6cc2 100644 --- a/crates/core/src/operations/filesystem_check.rs +++ b/crates/core/src/operations/filesystem_check.rs @@ -29,6 +29,7 @@ use uuid::Uuid; use super::CustomExecuteHandler; use super::Operation; use crate::errors::{DeltaResult, DeltaTableError}; +use crate::kernel::resolve_snapshot; use crate::kernel::transaction::{CommitBuilder, CommitProperties}; use crate::kernel::EagerSnapshot; use crate::kernel::{Action, Add, Remove}; @@ -41,7 +42,7 @@ use crate::DeltaTable; /// See this module's documentation for more information pub struct FileSystemCheckBuilder { /// A snapshot of the to-be-checked table's state - snapshot: EagerSnapshot, + snapshot: Option, /// Delta object store for handling data files log_store: LogStoreRef, /// Don't remove actions to the table log. Just determine which files can be removed @@ -100,7 +101,7 @@ fn is_absolute_path(path: &str) -> DeltaResult { } } -impl super::Operation<()> for FileSystemCheckBuilder { +impl super::Operation for FileSystemCheckBuilder { fn log_store(&self) -> &LogStoreRef { &self.log_store } @@ -111,7 +112,7 @@ impl super::Operation<()> for FileSystemCheckBuilder { impl FileSystemCheckBuilder { /// Create a new [`FileSystemCheckBuilder`] - pub fn new(log_store: LogStoreRef, snapshot: EagerSnapshot) -> Self { + pub(crate) fn new(log_store: LogStoreRef, snapshot: Option) -> Self { FileSystemCheckBuilder { snapshot, log_store, @@ -139,10 +140,10 @@ impl FileSystemCheckBuilder { self } - async fn create_fsck_plan(&self) -> DeltaResult { + async fn create_fsck_plan(&self, snapshot: &EagerSnapshot) -> DeltaResult { let mut files_relative: HashMap = HashMap::new(); let log_store = self.log_store.clone(); - let file_stream = self.snapshot.log_data().into_iter().map(|f| f.add_action()); + let file_stream = snapshot.log_data().into_iter().map(|f| f.add_action()); for active in file_stream { if is_absolute_path(&active.path)? { return Err(DeltaTableError::Generic( @@ -249,15 +250,12 @@ impl std::future::IntoFuture for FileSystemCheckBuilder { let this = self; Box::pin(async move { - let plan = this.create_fsck_plan().await?; + let snapshot = resolve_snapshot(&this.log_store, this.snapshot.clone(), true).await?; + + let plan = this.create_fsck_plan(&snapshot).await?; if this.dry_run { return Ok(( - DeltaTable::new_with_state( - this.log_store, - DeltaTableState { - snapshot: this.snapshot, - }, - ), + DeltaTable::new_with_state(this.log_store, DeltaTableState::new(snapshot)), FileSystemCheckMetrics { files_removed: plan.files_to_remove.into_iter().map(|f| f.path).collect(), dry_run: true, @@ -266,12 +264,7 @@ impl std::future::IntoFuture for FileSystemCheckBuilder { } if plan.files_to_remove.is_empty() { return Ok(( - DeltaTable::new_with_state( - this.log_store, - DeltaTableState { - snapshot: this.snapshot, - }, - ), + DeltaTable::new_with_state(this.log_store, DeltaTableState::new(snapshot)), FileSystemCheckMetrics { dry_run: false, files_removed: Vec::new(), @@ -283,7 +276,7 @@ impl std::future::IntoFuture for FileSystemCheckBuilder { let metrics = plan .execute( - &this.snapshot, + &snapshot, this.commit_properties.clone(), operation_id, this.get_custom_execute_handler(), @@ -292,12 +285,8 @@ impl std::future::IntoFuture for FileSystemCheckBuilder { this.post_execute(operation_id).await?; - let mut table = DeltaTable::new_with_state( - this.log_store, - DeltaTableState { - snapshot: this.snapshot, - }, - ); + let mut table = + DeltaTable::new_with_state(this.log_store, DeltaTableState::new(snapshot)); table.update().await?; Ok((table, metrics)) }) diff --git a/crates/core/src/operations/load.rs b/crates/core/src/operations/load.rs index 35b3c9f9fe..097ddd3e90 100644 --- a/crates/core/src/operations/load.rs +++ b/crates/core/src/operations/load.rs @@ -10,7 +10,7 @@ use super::CustomExecuteHandler; use crate::delta_datafusion::{create_session, DataFusionMixins as _}; use crate::errors::{DeltaResult, DeltaTableError}; use crate::kernel::transaction::PROTOCOL; -use crate::kernel::EagerSnapshot; +use crate::kernel::{resolve_snapshot, EagerSnapshot}; use crate::logstore::LogStoreRef; use crate::table::state::DeltaTableState; use crate::DeltaTable; @@ -18,7 +18,7 @@ use crate::DeltaTable; #[derive(Clone)] pub struct LoadBuilder { /// A snapshot of the to-be-loaded table's state - snapshot: EagerSnapshot, + snapshot: Option, /// Delta object store for handling data files log_store: LogStoreRef, /// A sub-selection of columns to be loaded @@ -36,7 +36,7 @@ impl std::fmt::Debug for LoadBuilder { } } -impl super::Operation<()> for LoadBuilder { +impl super::Operation for LoadBuilder { fn log_store(&self) -> &LogStoreRef { &self.log_store } @@ -47,7 +47,7 @@ impl super::Operation<()> for LoadBuilder { impl LoadBuilder { /// Create a new [`LoadBuilder`] - pub fn new(log_store: LogStoreRef, snapshot: EagerSnapshot) -> Self { + pub(crate) fn new(log_store: LogStoreRef, snapshot: Option) -> Self { Self { snapshot, log_store, @@ -77,18 +77,10 @@ impl std::future::IntoFuture for LoadBuilder { let this = self; Box::pin(async move { - PROTOCOL.can_read_from(&this.snapshot)?; - if !this.snapshot.load_config().require_files { - return Err(DeltaTableError::NotInitializedWithFiles("reading".into())); - } - - let table = DeltaTable::new_with_state( - this.log_store, - DeltaTableState { - snapshot: this.snapshot, - }, - ); - let schema = table.snapshot()?.snapshot().read_schema(); + let snapshot = resolve_snapshot(&this.log_store, this.snapshot, true).await?; + PROTOCOL.can_read_from(&snapshot)?; + + let schema = snapshot.read_schema(); let projection = this .columns .map(|cols| { @@ -108,6 +100,7 @@ impl std::future::IntoFuture for LoadBuilder { .session .unwrap_or_else(|| Arc::new(create_session().into_inner().state())); + let table = DeltaTable::new_with_state(this.log_store, DeltaTableState::new(snapshot)); let scan_plan = table .scan(session.as_ref(), projection.as_ref(), &[], None) .await?; diff --git a/crates/core/src/operations/load_cdf.rs b/crates/core/src/operations/load_cdf.rs index 12f14eee2c..4ae0dea874 100644 --- a/crates/core/src/operations/load_cdf.rs +++ b/crates/core/src/operations/load_cdf.rs @@ -33,7 +33,7 @@ use tracing::log; use crate::delta_datafusion::{register_store, DataFusionMixins}; use crate::errors::DeltaResult; use crate::kernel::transaction::PROTOCOL; -use crate::kernel::{Action, Add, AddCDCFile, CommitInfo, EagerSnapshot}; +use crate::kernel::{resolve_snapshot, Action, Add, AddCDCFile, CommitInfo, EagerSnapshot}; use crate::logstore::{get_actions, LogStoreRef}; use crate::DeltaTableError; use crate::{delta_datafusion::cdf::*, kernel::Remove}; @@ -42,7 +42,7 @@ use crate::{delta_datafusion::cdf::*, kernel::Remove}; #[derive(Clone)] pub struct CdfLoadBuilder { /// A snapshot of the to-be-loaded table's state - pub snapshot: EagerSnapshot, + pub(crate) snapshot: Option, /// Delta object store for handling data files log_store: LogStoreRef, /// Version to read from @@ -74,8 +74,8 @@ impl std::fmt::Debug for CdfLoadBuilder { } impl CdfLoadBuilder { - /// Create a new [`LoadBuilder`] - pub fn new(log_store: LogStoreRef, snapshot: EagerSnapshot) -> Self { + /// Create a new [`CdfLoadBuilder`] + pub(crate) fn new(log_store: LogStoreRef, snapshot: Option) -> Self { Self { snapshot, log_store, @@ -124,9 +124,9 @@ impl CdfLoadBuilder { self } - async fn calculate_earliest_version(&self) -> DeltaResult { + async fn calculate_earliest_version(&self, snapshot: &EagerSnapshot) -> DeltaResult { let ts = self.starting_timestamp.unwrap_or(DateTime::UNIX_EPOCH); - for v in 0..self.snapshot.version() { + for v in 0..snapshot.version() { if let Ok(Some(bytes)) = self.log_store.read_commit_entry(v).await { if let Ok(actions) = get_actions(v, &bytes) { if actions.iter().any(|action| { @@ -148,6 +148,7 @@ impl CdfLoadBuilder { /// than I have right now. I plan to extend the checks once we have a stable state of the initial implementation. async fn determine_files_to_read( &self, + snapshot: &EagerSnapshot, ) -> DeltaResult<( Vec>, Vec>, @@ -159,7 +160,7 @@ impl CdfLoadBuilder { let start = if let Some(s) = self.starting_version { s } else { - self.calculate_earliest_version().await? + self.calculate_earliest_version(snapshot).await? }; let mut change_files: Vec> = vec![]; @@ -351,13 +352,14 @@ impl CdfLoadBuilder { session: &dyn Session, filters: Option<&Arc>, ) -> DeltaResult> { - PROTOCOL.can_read_from(&self.snapshot)?; + let snapshot = resolve_snapshot(&self.log_store, self.snapshot.clone(), true).await?; + PROTOCOL.can_read_from(&snapshot)?; - let (cdc, add, remove) = self.determine_files_to_read().await?; + let (cdc, add, remove) = self.determine_files_to_read(&snapshot).await?; register_store(self.log_store.clone(), session.runtime_env().as_ref()); - let partition_values = self.snapshot.metadata().partition_columns().clone(); - let schema = self.snapshot.input_schema(); + let partition_values = snapshot.metadata().partition_columns().clone(); + let schema = snapshot.input_schema(); let schema_fields: Vec> = schema .fields() .into_iter() diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs index 3b60eeae0c..2674b32ca4 100644 --- a/crates/core/src/operations/merge/mod.rs +++ b/crates/core/src/operations/merge/mod.rs @@ -81,7 +81,7 @@ use crate::delta_datafusion::{ }; use crate::kernel::schema::cast::{merge_arrow_field, merge_arrow_schema}; use crate::kernel::transaction::{CommitBuilder, CommitProperties, PROTOCOL}; -use crate::kernel::{new_metadata, Action, EagerSnapshot, StructTypeExt}; +use crate::kernel::{new_metadata, resolve_snapshot, Action, EagerSnapshot, StructTypeExt}; use crate::logstore::LogStoreRef; use crate::operations::cdc::*; use crate::operations::merge::barrier::find_node; @@ -134,7 +134,7 @@ pub struct MergeBuilder { ///Prefix target columns with a user provided prefix target_alias: Option, /// A snapshot of the table's state. AKA the target table in the operation - snapshot: EagerSnapshot, + snapshot: Option, /// The source data source: DataFrame, /// Whether the source is a streaming source (if true, stats deducing to prune target is disabled) @@ -155,7 +155,7 @@ pub struct MergeBuilder { custom_execute_handler: Option>, } -impl super::Operation<()> for MergeBuilder { +impl super::Operation for MergeBuilder { fn log_store(&self) -> &LogStoreRef { &self.log_store } @@ -168,7 +168,7 @@ impl MergeBuilder { /// Create a new [`MergeBuilder`] pub fn new>( log_store: LogStoreRef, - snapshot: EagerSnapshot, + snapshot: Option, predicate: E, source: DataFrame, ) -> Self { @@ -1540,11 +1540,9 @@ impl std::future::IntoFuture for MergeBuilder { let this = self; Box::pin(async move { - PROTOCOL.can_write_to(&this.snapshot)?; + let snapshot = resolve_snapshot(&this.log_store, this.snapshot.clone(), true).await?; - if !this.snapshot.load_config().require_files { - return Err(DeltaTableError::NotInitializedWithFiles("MERGE".into())); - } + PROTOCOL.can_write_to(&snapshot)?; let operation_id = this.get_operation_id(); this.pre_execute(operation_id).await?; @@ -1563,7 +1561,7 @@ impl std::future::IntoFuture for MergeBuilder { this.predicate, this.source, this.log_store.clone(), - this.snapshot, + snapshot, state, this.writer_properties, this.commit_properties, diff --git a/crates/core/src/operations/mod.rs b/crates/core/src/operations/mod.rs index cb6e9dcb83..20284447a6 100644 --- a/crates/core/src/operations/mod.rs +++ b/crates/core/src/operations/mod.rs @@ -6,41 +6,36 @@ //! the operations' behaviors and will return an updated table potentially in conjunction //! with a [data stream][datafusion::physical_plan::SendableRecordBatchStream], //! if the operation returns data as well. -use async_trait::async_trait; -use delta_kernel::table_properties::{DataSkippingNumIndexedCols, TableProperties}; use std::collections::HashMap; use std::sync::Arc; -use update_field_metadata::UpdateFieldMetadataBuilder; -use uuid::Uuid; -use add_feature::AddTableFeatureBuilder; #[cfg(feature = "datafusion")] -use arrow_array::RecordBatch; +use arrow::array::RecordBatch; +use async_trait::async_trait; #[cfg(feature = "datafusion")] pub use datafusion::physical_plan::common::collect as collect_sendable_stream; +use delta_kernel::table_properties::{DataSkippingNumIndexedCols, TableProperties}; +use url::Url; +use uuid::Uuid; -use self::add_column::AddColumnBuilder; -use self::create::CreateBuilder; -use self::filesystem_check::FileSystemCheckBuilder; -#[cfg(feature = "datafusion")] -use self::optimize::OptimizeBuilder; -use self::restore::RestoreBuilder; -use self::set_tbl_properties::SetTablePropertiesBuilder; -use self::update_table_metadata::UpdateTableMetadataBuilder; -use self::vacuum::VacuumBuilder; +use self::{ + add_column::AddColumnBuilder, add_feature::AddTableFeatureBuilder, create::CreateBuilder, + filesystem_check::FileSystemCheckBuilder, restore::RestoreBuilder, + set_tbl_properties::SetTablePropertiesBuilder, + update_field_metadata::UpdateFieldMetadataBuilder, + update_table_metadata::UpdateTableMetadataBuilder, vacuum::VacuumBuilder, +}; #[cfg(feature = "datafusion")] use self::{ constraints::ConstraintBuilder, datafusion_utils::Expression, delete::DeleteBuilder, drop_constraints::DropConstraintBuilder, load::LoadBuilder, load_cdf::CdfLoadBuilder, - merge::MergeBuilder, update::UpdateBuilder, write::WriteBuilder, + merge::MergeBuilder, optimize::OptimizeBuilder, update::UpdateBuilder, write::WriteBuilder, }; use crate::errors::{DeltaResult, DeltaTableError}; use crate::logstore::LogStoreRef; -use crate::table::builder::ensure_table_uri; -use crate::table::builder::DeltaTableBuilder; +use crate::table::builder::{ensure_table_uri, DeltaTableBuilder}; use crate::table::config::{TablePropertiesExt as _, DEFAULT_NUM_INDEX_COLS}; use crate::DeltaTable; -use url::Url; pub mod add_column; pub mod add_feature; @@ -101,7 +96,7 @@ pub trait CustomExecuteHandler: Send + Sync { #[allow(unused)] /// The [Operation] trait defines common behaviors that all operations builders /// should have consistent -pub(crate) trait Operation: std::future::IntoFuture { +pub(crate) trait Operation: std::future::IntoFuture { fn log_store(&self) -> &LogStoreRef; fn get_custom_execute_handler(&self) -> Option>; async fn pre_execute(&self, operation_id: Uuid) -> DeltaResult<()> { @@ -231,14 +226,14 @@ impl DeltaOps { #[cfg(feature = "datafusion")] #[must_use] pub fn load(self) -> LoadBuilder { - LoadBuilder::new(self.0.log_store, self.0.state.unwrap().snapshot) + LoadBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot)) } /// Load a table with CDF Enabled #[cfg(feature = "datafusion")] #[must_use] pub fn load_cdf(self) -> CdfLoadBuilder { - CdfLoadBuilder::new(self.0.log_store, self.0.state.unwrap().snapshot) + CdfLoadBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot)) } /// Write data to Delta table @@ -252,40 +247,40 @@ impl DeltaOps { /// Vacuum stale files from delta table #[must_use] pub fn vacuum(self) -> VacuumBuilder { - VacuumBuilder::new(self.0.log_store, self.0.state.unwrap().snapshot) + VacuumBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot)) } /// Audit active files with files present on the filesystem #[must_use] pub fn filesystem_check(self) -> FileSystemCheckBuilder { - FileSystemCheckBuilder::new(self.0.log_store, self.0.state.unwrap().snapshot) + FileSystemCheckBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot)) } /// Audit active files with files present on the filesystem #[cfg(feature = "datafusion")] #[must_use] pub fn optimize<'a>(self) -> OptimizeBuilder<'a> { - OptimizeBuilder::new(self.0.log_store, self.0.state.unwrap().snapshot) + OptimizeBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot)) } /// Delete data from Delta table #[cfg(feature = "datafusion")] #[must_use] pub fn delete(self) -> DeleteBuilder { - DeleteBuilder::new(self.0.log_store, self.0.state.unwrap().snapshot) + DeleteBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot)) } /// Update data from Delta table #[cfg(feature = "datafusion")] #[must_use] pub fn update(self) -> UpdateBuilder { - UpdateBuilder::new(self.0.log_store, self.0.state.unwrap().snapshot) + UpdateBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot)) } /// Restore delta table to a specified version or datetime #[must_use] pub fn restore(self) -> RestoreBuilder { - RestoreBuilder::new(self.0.log_store, self.0.state.unwrap().snapshot) + RestoreBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot)) } /// Update data from Delta table @@ -298,7 +293,7 @@ impl DeltaOps { ) -> MergeBuilder { MergeBuilder::new( self.0.log_store, - self.0.state.unwrap().snapshot, + self.0.state.map(|s| s.snapshot), predicate.into(), source, ) @@ -308,40 +303,40 @@ impl DeltaOps { #[cfg(feature = "datafusion")] #[must_use] pub fn add_constraint(self) -> ConstraintBuilder { - ConstraintBuilder::new(self.0.log_store, self.0.state.unwrap().snapshot) + ConstraintBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot)) } /// Enable a table feature for a table #[must_use] pub fn add_feature(self) -> AddTableFeatureBuilder { - AddTableFeatureBuilder::new(self.0.log_store, self.0.state.unwrap().snapshot) + AddTableFeatureBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot)) } /// Drops constraints from a table #[cfg(feature = "datafusion")] #[must_use] pub fn drop_constraints(self) -> DropConstraintBuilder { - DropConstraintBuilder::new(self.0.log_store, self.0.state.unwrap().snapshot) + DropConstraintBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot)) } /// Set table properties pub fn set_tbl_properties(self) -> SetTablePropertiesBuilder { - SetTablePropertiesBuilder::new(self.0.log_store, self.0.state.unwrap().snapshot) + SetTablePropertiesBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot)) } /// Add new columns pub fn add_columns(self) -> AddColumnBuilder { - AddColumnBuilder::new(self.0.log_store, self.0.state.unwrap().snapshot) + AddColumnBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot)) } /// Update field metadata pub fn update_field_metadata(self) -> UpdateFieldMetadataBuilder { - UpdateFieldMetadataBuilder::new(self.0.log_store, self.0.state.unwrap().snapshot) + UpdateFieldMetadataBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot)) } /// Update table metadata pub fn update_table_metadata(self) -> UpdateTableMetadataBuilder { - UpdateTableMetadataBuilder::new(self.0.log_store, self.0.state.unwrap().snapshot) + UpdateTableMetadataBuilder::new(self.0.log_store, self.0.state.map(|s| s.snapshot)) } } diff --git a/crates/core/src/operations/optimize.rs b/crates/core/src/operations/optimize.rs index 20947f0c9c..c57bd55905 100644 --- a/crates/core/src/operations/optimize.rs +++ b/crates/core/src/operations/optimize.rs @@ -54,7 +54,7 @@ use super::{CustomExecuteHandler, Operation}; use crate::delta_datafusion::DeltaTableProvider; use crate::errors::{DeltaResult, DeltaTableError}; use crate::kernel::transaction::{CommitBuilder, CommitProperties, DEFAULT_RETRIES, PROTOCOL}; -use crate::kernel::EagerSnapshot; +use crate::kernel::{resolve_snapshot, EagerSnapshot}; use crate::kernel::{scalars::ScalarExt, Action, Add, PartitionsExt, Remove}; use crate::logstore::{LogStore, LogStoreRef, ObjectStoreRef}; use crate::protocol::DeltaOperation; @@ -200,7 +200,7 @@ pub enum OptimizeType { /// table's configuration is read. Otherwise a default value is used. pub struct OptimizeBuilder<'a> { /// A snapshot of the to-be-optimized table's state - snapshot: EagerSnapshot, + snapshot: Option, /// Delta object store for handling data files log_store: LogStoreRef, /// Filters to select specific table partitions to be optimized @@ -225,7 +225,7 @@ pub struct OptimizeBuilder<'a> { custom_execute_handler: Option>, } -impl super::Operation<()> for OptimizeBuilder<'_> { +impl super::Operation for OptimizeBuilder<'_> { fn log_store(&self) -> &LogStoreRef { &self.log_store } @@ -236,7 +236,7 @@ impl super::Operation<()> for OptimizeBuilder<'_> { impl<'a> OptimizeBuilder<'a> { /// Create a new [`OptimizeBuilder`] - pub fn new(log_store: LogStoreRef, snapshot: EagerSnapshot) -> Self { + pub(crate) fn new(log_store: LogStoreRef, snapshot: Option) -> Self { Self { snapshot, log_store, @@ -333,10 +333,9 @@ impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> { let this = self; Box::pin(async move { - PROTOCOL.can_write_to(&this.snapshot)?; - if !&this.snapshot.load_config().require_files { - return Err(DeltaTableError::NotInitializedWithFiles("OPTIMIZE".into())); - } + let snapshot = resolve_snapshot(&this.log_store, this.snapshot.clone(), true).await?; + PROTOCOL.can_write_to(&snapshot)?; + let operation_id = this.get_operation_id(); this.pre_execute(operation_id).await?; @@ -363,7 +362,7 @@ impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> { let plan = create_merge_plan( &this.log_store, this.optimize_type, - &this.snapshot, + &snapshot, this.filters, this.target_size.to_owned(), writer_properties, @@ -374,7 +373,7 @@ impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> { let metrics = plan .execute( this.log_store.clone(), - &this.snapshot, + &snapshot, this.max_concurrent_tasks, this.min_commit_interval, this.commit_properties.clone(), @@ -387,7 +386,7 @@ impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> { handler.post_execute(&this.log_store, operation_id).await?; } let mut table = - DeltaTable::new_with_state(this.log_store, DeltaTableState::new(this.snapshot)); + DeltaTable::new_with_state(this.log_store, DeltaTableState::new(snapshot)); table.update().await?; Ok((table, metrics)) }) diff --git a/crates/core/src/operations/restore.rs b/crates/core/src/operations/restore.rs index ef92afcfe2..8b8a17402b 100644 --- a/crates/core/src/operations/restore.rs +++ b/crates/core/src/operations/restore.rs @@ -36,7 +36,9 @@ use uuid::Uuid; use super::{CustomExecuteHandler, Operation}; use crate::kernel::transaction::{CommitBuilder, CommitProperties, TransactionError}; -use crate::kernel::{Action, Add, EagerSnapshot, ProtocolExt as _, ProtocolInner, Remove}; +use crate::kernel::{ + resolve_snapshot, Action, Add, EagerSnapshot, ProtocolExt as _, ProtocolInner, Remove, +}; use crate::logstore::LogStoreRef; use crate::protocol::DeltaOperation; use crate::table::state::DeltaTableState; @@ -77,7 +79,7 @@ pub struct RestoreMetrics { /// See this module's documentation for more information pub struct RestoreBuilder { /// A snapshot of the to-be-restored table's state - snapshot: EagerSnapshot, + snapshot: Option, /// Delta object store for handling data files log_store: LogStoreRef, /// Version to restore @@ -93,7 +95,7 @@ pub struct RestoreBuilder { custom_execute_handler: Option>, } -impl super::Operation<()> for RestoreBuilder { +impl super::Operation for RestoreBuilder { fn log_store(&self) -> &LogStoreRef { &self.log_store } @@ -104,7 +106,7 @@ impl super::Operation<()> for RestoreBuilder { impl RestoreBuilder { /// Create a new [`RestoreBuilder`] - pub fn new(log_store: LogStoreRef, snapshot: EagerSnapshot) -> Self { + pub(crate) fn new(log_store: LogStoreRef, snapshot: Option) -> Self { Self { snapshot, log_store, @@ -345,12 +347,14 @@ impl std::future::IntoFuture for RestoreBuilder { let this = self; Box::pin(async move { + let snapshot = resolve_snapshot(&this.log_store, this.snapshot.clone(), true).await?; + let operation_id = this.get_operation_id(); this.pre_execute(operation_id).await?; let metrics = execute( this.log_store.clone(), - this.snapshot.clone(), + snapshot.clone(), this.version_to_restore, this.datetime_to_restore, this.ignore_missing_files, @@ -362,12 +366,8 @@ impl std::future::IntoFuture for RestoreBuilder { this.post_execute(operation_id).await?; - let mut table = DeltaTable::new_with_state( - this.log_store, - DeltaTableState { - snapshot: this.snapshot, - }, - ); + let mut table = + DeltaTable::new_with_state(this.log_store, DeltaTableState::new(snapshot)); table.update().await?; Ok((table, metrics)) }) diff --git a/crates/core/src/operations/set_tbl_properties.rs b/crates/core/src/operations/set_tbl_properties.rs index 382f71e787..0e5302000b 100644 --- a/crates/core/src/operations/set_tbl_properties.rs +++ b/crates/core/src/operations/set_tbl_properties.rs @@ -7,7 +7,7 @@ use futures::future::BoxFuture; use super::{CustomExecuteHandler, Operation}; use crate::kernel::transaction::{CommitBuilder, CommitProperties}; -use crate::kernel::{Action, EagerSnapshot, MetadataExt as _, ProtocolExt as _}; +use crate::kernel::{resolve_snapshot, Action, EagerSnapshot, MetadataExt as _, ProtocolExt as _}; use crate::logstore::LogStoreRef; use crate::protocol::DeltaOperation; use crate::DeltaResult; @@ -16,7 +16,7 @@ use crate::DeltaTable; /// Remove constraints from the table pub struct SetTablePropertiesBuilder { /// A snapshot of the table's state - snapshot: EagerSnapshot, + snapshot: Option, /// Name of the property properties: HashMap, /// Raise if property doesn't exist @@ -28,7 +28,7 @@ pub struct SetTablePropertiesBuilder { custom_execute_handler: Option>, } -impl super::Operation<()> for SetTablePropertiesBuilder { +impl super::Operation for SetTablePropertiesBuilder { fn log_store(&self) -> &LogStoreRef { &self.log_store } @@ -39,7 +39,7 @@ impl super::Operation<()> for SetTablePropertiesBuilder { impl SetTablePropertiesBuilder { /// Create a new builder - pub fn new(log_store: LogStoreRef, snapshot: EagerSnapshot) -> Self { + pub(crate) fn new(log_store: LogStoreRef, snapshot: Option) -> Self { Self { properties: HashMap::new(), raise_if_not_exists: true, @@ -84,12 +84,14 @@ impl std::future::IntoFuture for SetTablePropertiesBuilder { let this = self; Box::pin(async move { + let snapshot = resolve_snapshot(&this.log_store, this.snapshot.clone(), false).await?; + let operation_id = this.get_operation_id(); this.pre_execute(operation_id).await?; - let mut metadata = this.snapshot.metadata().clone(); + let mut metadata = snapshot.metadata().clone(); - let current_protocol = this.snapshot.protocol(); + let current_protocol = snapshot.protocol(); let properties = this.properties; let new_protocol = current_protocol @@ -115,11 +117,7 @@ impl std::future::IntoFuture for SetTablePropertiesBuilder { .with_actions(actions.clone()) .with_operation_id(operation_id) .with_post_commit_hook_handler(this.custom_execute_handler.clone()) - .build( - Some(&this.snapshot), - this.log_store.clone(), - operation.clone(), - ) + .build(Some(&snapshot), this.log_store.clone(), operation.clone()) .await?; if let Some(handler) = this.custom_execute_handler { diff --git a/crates/core/src/operations/update.rs b/crates/core/src/operations/update.rs index 66c3d531ca..8ab6683352 100644 --- a/crates/core/src/operations/update.rs +++ b/crates/core/src/operations/update.rs @@ -53,7 +53,6 @@ use super::{ write::execution::{write_execution_plan, write_execution_plan_cdc}, CustomExecuteHandler, Operation, }; -use crate::delta_datafusion::{find_files, planner::DeltaPlanner, register_store}; use crate::logstore::LogStoreRef; use crate::operations::cdc::*; use crate::protocol::DeltaOperation; @@ -73,6 +72,10 @@ use crate::{ }, table::config::TablePropertiesExt, }; +use crate::{ + delta_datafusion::{find_files, planner::DeltaPlanner, register_store}, + kernel::resolve_snapshot, +}; use crate::{DeltaResult, DeltaTable, DeltaTableError}; /// Custom column name used for marking internal [RecordBatch] rows as updated @@ -90,7 +93,7 @@ pub struct UpdateBuilder { /// How to update columns in a record that match the predicate updates: HashMap, /// A snapshot of the table's state - snapshot: EagerSnapshot, + snapshot: Option, /// Delta object store for handling data files log_store: LogStoreRef, /// Datafusion session state relevant for executing the input plan @@ -122,7 +125,7 @@ pub struct UpdateMetrics { pub scan_time_ms: u64, } -impl super::Operation<()> for UpdateBuilder { +impl super::Operation for UpdateBuilder { fn log_store(&self) -> &LogStoreRef { &self.log_store } @@ -133,7 +136,7 @@ impl super::Operation<()> for UpdateBuilder { impl UpdateBuilder { /// Create a new ['UpdateBuilder'] - pub fn new(log_store: LogStoreRef, snapshot: EagerSnapshot) -> Self { + pub(crate) fn new(log_store: LogStoreRef, snapshot: Option) -> Self { Self { predicate: None, updates: HashMap::new(), @@ -497,10 +500,11 @@ impl std::future::IntoFuture for UpdateBuilder { fn into_future(self) -> Self::IntoFuture { let this = self; Box::pin(async move { - PROTOCOL.check_append_only(&this.snapshot)?; - PROTOCOL.can_write_to(&this.snapshot)?; + let snapshot = resolve_snapshot(&this.log_store, this.snapshot.clone(), true).await?; + PROTOCOL.check_append_only(&snapshot)?; + PROTOCOL.can_write_to(&snapshot)?; - if !&this.snapshot.load_config().require_files { + if !&snapshot.load_config().require_files { return Err(DeltaTableError::NotInitializedWithFiles("UPDATE".into())); } @@ -517,7 +521,7 @@ impl std::future::IntoFuture for UpdateBuilder { this.predicate, this.updates, this.log_store.clone(), - this.snapshot, + snapshot, state.clone(), this.writer_properties, this.commit_properties, diff --git a/crates/core/src/operations/update_field_metadata.rs b/crates/core/src/operations/update_field_metadata.rs index 7457d41f9c..fac9163ee2 100644 --- a/crates/core/src/operations/update_field_metadata.rs +++ b/crates/core/src/operations/update_field_metadata.rs @@ -9,7 +9,7 @@ use itertools::Itertools; use super::{CustomExecuteHandler, Operation}; use crate::kernel::transaction::{CommitBuilder, CommitProperties}; -use crate::kernel::{EagerSnapshot, MetadataExt as _, ProtocolExt as _}; +use crate::kernel::{resolve_snapshot, EagerSnapshot, MetadataExt as _, ProtocolExt as _}; use crate::logstore::LogStoreRef; use crate::protocol::DeltaOperation; use crate::DeltaTable; @@ -18,7 +18,7 @@ use crate::{DeltaResult, DeltaTableError}; /// Update a field's metadata in a schema. If the key does not exists, the entry is inserted. pub struct UpdateFieldMetadataBuilder { /// A snapshot of the table's state - snapshot: EagerSnapshot, + snapshot: Option, /// The name of the field where the metadata may be updated field_name: String, /// HashMap of the metadata to upsert @@ -30,7 +30,7 @@ pub struct UpdateFieldMetadataBuilder { custom_execute_handler: Option>, } -impl super::Operation<()> for UpdateFieldMetadataBuilder { +impl super::Operation for UpdateFieldMetadataBuilder { fn log_store(&self) -> &LogStoreRef { &self.log_store } @@ -41,7 +41,7 @@ impl super::Operation<()> for UpdateFieldMetadataBuilder { impl UpdateFieldMetadataBuilder { /// Create a new builder - pub fn new(log_store: LogStoreRef, snapshot: EagerSnapshot) -> Self { + pub(crate) fn new(log_store: LogStoreRef, snapshot: Option) -> Self { Self { metadata: HashMap::new(), field_name: String::new(), @@ -86,10 +86,12 @@ impl std::future::IntoFuture for UpdateFieldMetadataBuilder { let this = self; Box::pin(async move { + let snapshot = resolve_snapshot(&this.log_store, this.snapshot.clone(), false).await?; + let operation_id = this.get_operation_id(); this.pre_execute(operation_id).await?; - let table_schema = this.snapshot.schema(); + let table_schema = snapshot.schema(); // Check if the field exists in the schema. Otherwise, no need to continue the // operation @@ -134,9 +136,9 @@ impl std::future::IntoFuture for UpdateFieldMetadataBuilder { } }))?; - let mut metadata = this.snapshot.metadata().clone(); + let mut metadata = snapshot.metadata().clone(); - let current_protocol = this.snapshot.protocol(); + let current_protocol = snapshot.protocol(); let new_protocol = current_protocol .clone() .apply_column_metadata_to_protocol(&updated_table_schema)? @@ -158,7 +160,7 @@ impl std::future::IntoFuture for UpdateFieldMetadataBuilder { .with_actions(actions) .with_operation_id(operation_id) .with_post_commit_hook_handler(this.get_custom_execute_handler()) - .build(Some(&this.snapshot), this.log_store.clone(), operation) + .build(Some(&snapshot), this.log_store.clone(), operation) .await?; this.post_execute(operation_id).await?; diff --git a/crates/core/src/operations/update_table_metadata.rs b/crates/core/src/operations/update_table_metadata.rs index fae0fbdec9..1def386a66 100644 --- a/crates/core/src/operations/update_table_metadata.rs +++ b/crates/core/src/operations/update_table_metadata.rs @@ -7,7 +7,7 @@ use validator::Validate; use super::{CustomExecuteHandler, Operation}; use crate::kernel::transaction::{CommitBuilder, CommitProperties}; -use crate::kernel::{Action, EagerSnapshot, MetadataExt}; +use crate::kernel::{resolve_snapshot, Action, EagerSnapshot, MetadataExt}; use crate::logstore::LogStoreRef; use crate::protocol::DeltaOperation; use crate::DeltaTable; @@ -45,7 +45,7 @@ fn validate_at_least_one_field( /// Update table metadata operation pub struct UpdateTableMetadataBuilder { /// A snapshot of the table's state - snapshot: EagerSnapshot, + snapshot: Option, /// The metadata update to apply update: Option, /// Delta object store for handling data files @@ -55,7 +55,7 @@ pub struct UpdateTableMetadataBuilder { custom_execute_handler: Option>, } -impl super::Operation<()> for UpdateTableMetadataBuilder { +impl super::Operation for UpdateTableMetadataBuilder { fn log_store(&self) -> &LogStoreRef { &self.log_store } @@ -66,7 +66,7 @@ impl super::Operation<()> for UpdateTableMetadataBuilder { impl UpdateTableMetadataBuilder { /// Create a new builder - pub fn new(log_store: LogStoreRef, snapshot: EagerSnapshot) -> Self { + pub(crate) fn new(log_store: LogStoreRef, snapshot: Option) -> Self { Self { update: None, snapshot, @@ -104,6 +104,8 @@ impl std::future::IntoFuture for UpdateTableMetadataBuilder { let this = self; Box::pin(async move { + let snapshot = resolve_snapshot(&this.log_store, this.snapshot.clone(), false).await?; + let operation_id = this.get_operation_id(); this.pre_execute(operation_id).await?; @@ -114,7 +116,7 @@ impl std::future::IntoFuture for UpdateTableMetadataBuilder { .validate() .map_err(|e| DeltaTableError::MetadataError(format!("{e}")))?; - let mut metadata = this.snapshot.metadata().clone(); + let mut metadata = snapshot.metadata().clone(); if let Some(name) = &update.name { metadata = metadata.with_name(name.clone())?; @@ -133,11 +135,7 @@ impl std::future::IntoFuture for UpdateTableMetadataBuilder { .with_actions(actions) .with_operation_id(operation_id) .with_post_commit_hook_handler(this.custom_execute_handler.clone()) - .build( - Some(&this.snapshot), - this.log_store.clone(), - operation.clone(), - ) + .build(Some(&snapshot), this.log_store.clone(), operation.clone()) .await?; if let Some(handler) = this.custom_execute_handler { diff --git a/crates/core/src/operations/vacuum.rs b/crates/core/src/operations/vacuum.rs index d751f6a7f8..a78542d291 100644 --- a/crates/core/src/operations/vacuum.rs +++ b/crates/core/src/operations/vacuum.rs @@ -35,7 +35,7 @@ use tracing::*; use super::{CustomExecuteHandler, Operation}; use crate::errors::{DeltaResult, DeltaTableError}; use crate::kernel::transaction::{CommitBuilder, CommitProperties}; -use crate::kernel::EagerSnapshot; +use crate::kernel::{resolve_snapshot, EagerSnapshot}; use crate::logstore::{LogStore, LogStoreRef}; use crate::protocol::DeltaOperation; use crate::table::config::TablePropertiesExt as _; @@ -93,7 +93,7 @@ pub enum VacuumMode { /// See this module's documentation for more information pub struct VacuumBuilder { /// A snapshot of the to-be-vacuumed table's state - snapshot: EagerSnapshot, + snapshot: Option, /// Delta object store for handling data files log_store: LogStoreRef, /// Period of stale files allowed. @@ -113,7 +113,7 @@ pub struct VacuumBuilder { custom_execute_handler: Option>, } -impl super::Operation<()> for VacuumBuilder { +impl super::Operation for VacuumBuilder { fn log_store(&self) -> &LogStoreRef { &self.log_store } @@ -154,7 +154,7 @@ pub struct VacuumEndOperationMetrics { /// Methods to specify various vacuum options and to execute the operation impl VacuumBuilder { /// Create a new [`VacuumBuilder`] - pub fn new(log_store: LogStoreRef, snapshot: EagerSnapshot) -> Self { + pub(crate) fn new(log_store: LogStoreRef, snapshot: Option) -> Self { VacuumBuilder { snapshot, log_store, @@ -221,13 +221,16 @@ impl VacuumBuilder { } /// Determine which files can be deleted. Does not actually perform the deletion - async fn create_vacuum_plan(&self) -> Result { + async fn create_vacuum_plan( + &self, + snapshot: &EagerSnapshot, + ) -> Result { if self.mode == VacuumMode::Full { info!("Vacuum configured to run with 'VacuumMode::Full'. It will scan for orphaned parquet files in the Delta table directory and remove those as well!"); } let min_retention = Duration::milliseconds( - self.snapshot + snapshot .table_properties() .deleted_file_retention_duration() .as_millis() as i64, @@ -273,15 +276,9 @@ impl VacuumBuilder { _ => HashSet::new(), }; - let expired_tombstones = get_stale_files( - &self.snapshot, - retention_period, - now_millis, - &self.log_store, - ) - .await?; - let valid_files: HashSet<_> = self - .snapshot + let expired_tombstones = + get_stale_files(&snapshot, retention_period, now_millis, &self.log_store).await?; + let valid_files: HashSet<_> = snapshot .file_views(self.log_store.as_ref(), None) .map_ok(|f| f.object_store_path()) .try_collect() @@ -293,7 +290,7 @@ impl VacuumBuilder { let list_span = info_span!("list_files", operation = "vacuum"); let mut all_files = list_span.in_scope(|| object_store.list(None)); - let partition_columns = self.snapshot.metadata().partition_columns(); + let partition_columns = snapshot.metadata().partition_columns(); let mut file_count = 0; while let Some(obj_meta) = all_files.next().await { @@ -365,27 +362,26 @@ impl std::future::IntoFuture for VacuumBuilder { fn into_future(self) -> Self::IntoFuture { let this = self; Box::pin(async move { - if !&this.snapshot.load_config().require_files { - return Err(DeltaTableError::NotInitializedWithFiles("VACUUM".into())); - } + let snapshot = resolve_snapshot(&this.log_store, this.snapshot.clone(), true).await?; + let plan = this.create_vacuum_plan(&snapshot).await?; - let plan = this.create_vacuum_plan().await?; if this.dry_run { return Ok(( - DeltaTable::new_with_state(this.log_store, DeltaTableState::new(this.snapshot)), + DeltaTable::new_with_state(this.log_store, DeltaTableState::new(snapshot)), VacuumMetrics { files_deleted: plan.files_to_delete.iter().map(|f| f.to_string()).collect(), dry_run: true, }, )); } + let operation_id = this.get_operation_id(); this.pre_execute(operation_id).await?; let result = plan .execute( this.log_store.clone(), - &this.snapshot, + &snapshot, this.commit_properties.clone(), operation_id, this.get_custom_execute_handler(), @@ -400,7 +396,7 @@ impl std::future::IntoFuture for VacuumBuilder { metrics, ), None => ( - DeltaTable::new_with_state(this.log_store, DeltaTableState::new(this.snapshot)), + DeltaTable::new_with_state(this.log_store, DeltaTableState::new(snapshot)), Default::default(), ), }) @@ -562,7 +558,7 @@ mod tests { let table = open_table(table_uri).await?; let (_table, result) = - VacuumBuilder::new(table.log_store(), table.snapshot()?.snapshot.clone()) + VacuumBuilder::new(table.log_store(), Some(table.snapshot()?.snapshot.clone())) .with_retention_period(Duration::hours(0)) .with_dry_run(true) .with_mode(VacuumMode::Lite) @@ -573,7 +569,7 @@ mod tests { assert!(result.files_deleted.is_empty()); let (_table, result) = - VacuumBuilder::new(table.log_store(), table.snapshot()?.snapshot.clone()) + VacuumBuilder::new(table.log_store(), Some(table.snapshot()?.snapshot.clone())) .with_retention_period(Duration::hours(0)) .with_dry_run(true) .with_mode(VacuumMode::Full) @@ -607,7 +603,7 @@ mod tests { // First, vacuum without keeping any particular versions let (_table, result) = - VacuumBuilder::new(table.log_store(), table.snapshot()?.snapshot.clone()) + VacuumBuilder::new(table.log_store(), Some(table.snapshot()?.snapshot.clone())) .with_retention_period(Duration::hours(0)) .with_dry_run(true) .with_mode(VacuumMode::Full) @@ -619,7 +615,7 @@ mod tests { // Next, vacuum with specific versions retained let (_table, result) = - VacuumBuilder::new(table.log_store(), table.snapshot()?.snapshot.clone()) + VacuumBuilder::new(table.log_store(), Some(table.snapshot()?.snapshot.clone())) .with_retention_period(Duration::hours(0)) .with_keep_versions(&versions_to_keep) .with_dry_run(true) @@ -646,7 +642,7 @@ mod tests { // First, vacuum without keeping any particular versions let (_table, result) = - VacuumBuilder::new(table.log_store(), table.snapshot()?.snapshot.clone()) + VacuumBuilder::new(table.log_store(), Some(table.snapshot()?.snapshot.clone())) .with_retention_period(Duration::hours(0)) .with_dry_run(true) .with_mode(VacuumMode::Full) @@ -658,7 +654,7 @@ mod tests { // Next, vacuum with specific versions retained let (_table, result) = - VacuumBuilder::new(table.log_store(), table.snapshot()?.snapshot.clone()) + VacuumBuilder::new(table.log_store(), Some(table.snapshot()?.snapshot.clone())) .with_retention_period(Duration::hours(0)) .with_keep_versions(&versions_to_keep) .with_dry_run(true) @@ -726,7 +722,7 @@ mod tests { let (mut table, result) = VacuumBuilder::new( table.log_store(), - table.snapshot().unwrap().snapshot.clone(), + Some(table.snapshot().unwrap().snapshot.clone()), ) .with_retention_period(Duration::hours(0)) .with_keep_versions(&[2, 3]) @@ -762,7 +758,7 @@ mod tests { let result = VacuumBuilder::new( table.log_store(), - table.snapshot().unwrap().snapshot.clone(), + Some(table.snapshot().unwrap().snapshot.clone()), ) .with_retention_period(Duration::hours(1)) .with_dry_run(true) @@ -777,7 +773,7 @@ mod tests { let (table, result) = VacuumBuilder::new( table.log_store(), - table.snapshot().unwrap().snapshot.clone(), + Some(table.snapshot().unwrap().snapshot.clone()), ) .with_retention_period(Duration::hours(0)) .with_dry_run(true) @@ -791,7 +787,7 @@ mod tests { let (table, result) = VacuumBuilder::new( table.log_store(), - table.snapshot().unwrap().snapshot.clone(), + Some(table.snapshot().unwrap().snapshot.clone()), ) .with_retention_period(Duration::hours(169)) .with_dry_run(true) @@ -810,7 +806,7 @@ mod tests { let empty: Vec = Vec::new(); let (_table, result) = VacuumBuilder::new( table.log_store(), - table.snapshot().unwrap().snapshot.clone(), + Some(table.snapshot().unwrap().snapshot.clone()), ) .with_retention_period(Duration::hours(retention_hours as i64)) .with_dry_run(true) @@ -892,7 +888,7 @@ mod tests { // The recent file should NOT be deleted because it's too new let (_table, result) = VacuumBuilder::new( table.log_store(), - table.snapshot().unwrap().snapshot.clone(), + Some(table.snapshot().unwrap().snapshot.clone()), ) .with_retention_period(Duration::days(7)) .with_dry_run(true) diff --git a/crates/core/src/operations/write/mod.rs b/crates/core/src/operations/write/mod.rs index 41facb1d5b..40026c02b1 100644 --- a/crates/core/src/operations/write/mod.rs +++ b/crates/core/src/operations/write/mod.rs @@ -178,7 +178,7 @@ pub struct WriteMetrics { pub execution_time_ms: u64, } -impl super::Operation<()> for WriteBuilder { +impl super::Operation for WriteBuilder { fn log_store(&self) -> &LogStoreRef { &self.log_store } diff --git a/crates/core/src/table/state.rs b/crates/core/src/table/state.rs index 87a0f7614b..542d476458 100644 --- a/crates/core/src/table/state.rs +++ b/crates/core/src/table/state.rs @@ -346,7 +346,7 @@ impl EagerSnapshot { ARROW_HANDLER.new_expression_evaluator(input_schema, expression.into(), table_schema); let results = self - .files + .files()? .iter() .map(|file| evaluator.evaluate_arrow(file.clone())) .collect::, _>>()?; diff --git a/python/src/datafusion.rs b/python/src/datafusion.rs index 79fa6d395a..cc37b06e20 100644 --- a/python/src/datafusion.rs +++ b/python/src/datafusion.rs @@ -138,7 +138,6 @@ mod tests { // A dummy LazyBatchGenerator implementation for testing #[derive(Debug)] struct TestBatchGenerator { - schema: Arc, data: Vec, current_index: usize, } @@ -150,9 +149,8 @@ mod tests { } impl TestBatchGenerator { - fn new(schema: Arc, data: Vec) -> Self { + fn new(data: Vec) -> Self { Self { - schema, data, current_index: 0, } @@ -164,13 +162,11 @@ mod tests { let id_array = Int32Array::from(vec![1, 2, 3, 4, 5]); let name_array = StringArray::from(vec!["Alice", "Bob", "Carol", "Dave", "Eve"]); - let batch = RecordBatch::try_new( - schema.clone(), - vec![Arc::new(id_array), Arc::new(name_array)], - ) - .unwrap(); + let batch = + RecordBatch::try_new(schema, vec![Arc::new(id_array), Arc::new(name_array)]) + .unwrap(); - Arc::new(RwLock::new(TestBatchGenerator::new(schema, vec![batch]))) + Arc::new(RwLock::new(TestBatchGenerator::new(vec![batch]))) } } diff --git a/python/src/error.rs b/python/src/error.rs index 14cc589f1c..eca8624988 100644 --- a/python/src/error.rs +++ b/python/src/error.rs @@ -1,9 +1,9 @@ use arrow_schema::ArrowError; use deltalake::datafusion::error::DataFusionError; use deltalake::{errors::DeltaTableError, ObjectStoreError}; -use pyo3::exceptions::PyRuntimeError; use pyo3::exceptions::{ - PyException, PyFileNotFoundError, PyIOError, PyNotImplementedError, PyValueError, + PyException, PyFileNotFoundError, PyIOError, PyNotImplementedError, PyRuntimeError, + PyValueError, }; use pyo3::{create_exception, PyErr}; use std::error::Error; @@ -15,6 +15,10 @@ create_exception!(_internal, DeltaProtocolError, DeltaError); create_exception!(_internal, CommitFailedError, DeltaError); create_exception!(_internal, SchemaMismatchError, DeltaError); +pub(crate) fn to_rt_err(msg: impl ToString) -> PyErr { + PyRuntimeError::new_err(msg.to_string()) +} + fn inner_to_py_err(err: DeltaTableError) -> PyErr { match err { DeltaTableError::NotATable(msg) => TableNotFoundError::new_err(msg), diff --git a/python/src/lib.rs b/python/src/lib.rs index 894973ccff..10c40a51be 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -33,23 +33,10 @@ use deltalake::kernel::{ use deltalake::lakefs::LakeFSCustomExecuteHandler; use deltalake::logstore::LogStoreRef; use deltalake::logstore::{IORuntime, ObjectStoreRef}; -use deltalake::operations::add_column::AddColumnBuilder; -use deltalake::operations::add_feature::AddTableFeatureBuilder; -use deltalake::operations::constraints::ConstraintBuilder; use deltalake::operations::convert_to_delta::{ConvertToDeltaBuilder, PartitionStrategy}; -use deltalake::operations::delete::DeleteBuilder; -use deltalake::operations::drop_constraints::DropConstraintBuilder; -use deltalake::operations::filesystem_check::FileSystemCheckBuilder; -use deltalake::operations::load_cdf::CdfLoadBuilder; -use deltalake::operations::optimize::{OptimizeBuilder, OptimizeType}; -use deltalake::operations::restore::RestoreBuilder; -use deltalake::operations::set_tbl_properties::SetTablePropertiesBuilder; -use deltalake::operations::update::UpdateBuilder; -use deltalake::operations::update_field_metadata::UpdateFieldMetadataBuilder; -use deltalake::operations::update_table_metadata::{ - TableMetadataUpdate, UpdateTableMetadataBuilder, -}; -use deltalake::operations::vacuum::{VacuumBuilder, VacuumMode}; +use deltalake::operations::optimize::OptimizeType; +use deltalake::operations::update_table_metadata::TableMetadataUpdate; +use deltalake::operations::vacuum::VacuumMode; use deltalake::operations::write::WriteBuilder; use deltalake::operations::CustomExecuteHandler; use deltalake::parquet::basic::{Compression, Encoding}; @@ -76,12 +63,11 @@ use std::future::IntoFuture; use std::str::FromStr; use std::sync::{Arc, Mutex}; use std::time; -use std::time::{SystemTime, UNIX_EPOCH}; use uuid::Uuid; use writer::maybe_lazy_cast_reader; -use crate::error::{DeltaError, DeltaProtocolError, PythonError}; +use crate::error::{to_rt_err, DeltaError, DeltaProtocolError, PythonError}; use crate::features::TableFeatures; use crate::filesystem::FsConfig; use crate::merge::PyMergeBuilder; @@ -497,15 +483,9 @@ impl RawDeltaTable { keep_versions: Option>, ) -> PyResult> { let (table, metrics) = py.allow_threads(|| { - let snapshot = match self._table.lock() { - Ok(table) => table - .snapshot() - .cloned() - .map_err(PythonError::from) - .map_err(PyErr::from), - Err(e) => Err(PyRuntimeError::new_err(e.to_string())), - }?; - let mut cmd = VacuumBuilder::new(self.log_store()?, snapshot.snapshot().clone()) + let table = self._table.lock().map_err(to_rt_err)?.clone(); + let mut cmd = DeltaOps(table) + .vacuum() .with_enforce_retention_duration(enforce_retention_duration) .with_dry_run(dry_run); @@ -560,8 +540,8 @@ impl RawDeltaTable { post_commithook_properties: Option, ) -> PyResult { let (table, metrics) = py.allow_threads(|| { - let mut cmd = UpdateBuilder::new(self.log_store()?, self.cloned_state()?) - .with_safe_cast(safe_cast); + let table = self._table.lock().map_err(to_rt_err)?.clone(); + let mut cmd = DeltaOps(table).update().with_safe_cast(safe_cast); if let Some(writer_props) = writer_properties { cmd = cmd.with_writer_properties( @@ -619,8 +599,11 @@ impl RawDeltaTable { post_commithook_properties: Option, ) -> PyResult { let (table, metrics) = py.allow_threads(|| { - let mut cmd = OptimizeBuilder::new(self.log_store()?, self.cloned_state()?) + let table = self._table.lock().map_err(to_rt_err)?.clone(); + let mut cmd = DeltaOps(table) + .optimize() .with_max_concurrent_tasks(max_concurrent_tasks.unwrap_or_else(num_cpus::get)); + if let Some(size) = target_size { cmd = cmd.with_target_size(size); } @@ -683,10 +666,13 @@ impl RawDeltaTable { post_commithook_properties: Option, ) -> PyResult { let (table, metrics) = py.allow_threads(|| { - let mut cmd = OptimizeBuilder::new(self.log_store()?, self.cloned_state()?) + let table = self._table.lock().map_err(to_rt_err)?.clone(); + let mut cmd = DeltaOps(table.clone()) + .optimize() .with_max_concurrent_tasks(max_concurrent_tasks.unwrap_or_else(num_cpus::get)) .with_max_spill_size(max_spill_size) .with_type(OptimizeType::ZOrder(z_order_columns)); + if let Some(size) = target_size { cmd = cmd.with_target_size(size); } @@ -732,7 +718,8 @@ impl RawDeltaTable { post_commithook_properties: Option, ) -> PyResult<()> { let table = py.allow_threads(|| { - let mut cmd = AddColumnBuilder::new(self.log_store()?, self.cloned_state()?); + let table = self._table.lock().map_err(to_rt_err)?.clone(); + let mut cmd = DeltaOps(table).add_columns(); let new_fields = fields .iter() @@ -771,7 +758,9 @@ impl RawDeltaTable { post_commithook_properties: Option, ) -> PyResult<()> { let table = py.allow_threads(|| { - let mut cmd = AddTableFeatureBuilder::new(self.log_store()?, self.cloned_state()?) + let table = self._table.lock().map_err(to_rt_err)?.clone(); + let mut cmd = DeltaOps(table) + .add_feature() .with_features(feature) .with_allow_protocol_versions_increase(allow_protocol_versions_increase); @@ -802,7 +791,8 @@ impl RawDeltaTable { post_commithook_properties: Option, ) -> PyResult<()> { let table = py.allow_threads(|| { - let mut cmd = ConstraintBuilder::new(self.log_store()?, self.cloned_state()?); + let table = self._table.lock().map_err(to_rt_err)?.clone(); + let mut cmd = DeltaOps(table).add_constraint(); for (col_name, expression) in constraints { cmd = cmd.with_constraint(col_name.clone(), expression.clone()); @@ -836,7 +826,9 @@ impl RawDeltaTable { post_commithook_properties: Option, ) -> PyResult<()> { let table = py.allow_threads(|| { - let mut cmd = DropConstraintBuilder::new(self.log_store()?, self.cloned_state()?) + let table = self._table.lock().map_err(to_rt_err)?.clone(); + let mut cmd = DeltaOps(table) + .drop_constraints() .with_constraint(name) .with_raise_if_not_exists(raise_if_not_exists); @@ -880,33 +872,34 @@ impl RawDeltaTable { allow_out_of_range: bool, ) -> PyResult { let ctx = SessionContext::new(); - let mut cdf_read = CdfLoadBuilder::new(self.log_store()?, self.cloned_state()?); + let table = self._table.lock().map_err(to_rt_err)?.clone(); + let mut cmd = DeltaOps(table).load_cdf(); if let Some(sv) = starting_version { - cdf_read = cdf_read.with_starting_version(sv); + cmd = cmd.with_starting_version(sv); } if let Some(ev) = ending_version { - cdf_read = cdf_read.with_ending_version(ev); + cmd = cmd.with_ending_version(ev); } if let Some(st) = starting_timestamp { let starting_ts: DateTime = DateTime::::from_str(&st) .map_err(|pe| PyValueError::new_err(pe.to_string()))? .to_utc(); - cdf_read = cdf_read.with_starting_timestamp(starting_ts); + cmd = cmd.with_starting_timestamp(starting_ts); } if let Some(et) = ending_timestamp { let ending_ts = DateTime::::from_str(&et) .map_err(|pe| PyValueError::new_err(pe.to_string()))? .to_utc(); - cdf_read = cdf_read.with_ending_timestamp(ending_ts); + cmd = cmd.with_ending_timestamp(ending_ts); } if allow_out_of_range { - cdf_read = cdf_read.with_allow_out_of_range(); + cmd = cmd.with_allow_out_of_range(); } let table_provider: Arc = - Arc::new(DeltaCdfTableProvider::try_new(cdf_read).map_err(PythonError::from)?); + Arc::new(DeltaCdfTableProvider::try_new(cmd).map_err(PythonError::from)?); let table_name: String = "source".to_string(); @@ -1017,7 +1010,8 @@ impl RawDeltaTable { protocol_downgrade_allowed: bool, commit_properties: Option, ) -> PyResult { - let mut cmd = RestoreBuilder::new(self.log_store()?, self.cloned_state()?); + let table = self._table.lock().map_err(to_rt_err)?.clone(); + let mut cmd = DeltaOps(table).restore(); if let Some(val) = target { if let Ok(version) = val.extract::() { cmd = cmd.with_version_to_restore(version) @@ -1509,7 +1503,8 @@ impl RawDeltaTable { post_commithook_properties: Option, ) -> PyResult { let (table, metrics) = py.allow_threads(|| { - let mut cmd = DeleteBuilder::new(self.log_store()?, self.cloned_state()?); + let table = self._table.lock().map_err(to_rt_err)?.clone(); + let mut cmd = DeltaOps(table).delete(); if let Some(predicate) = predicate { cmd = cmd.with_predicate(predicate); } @@ -1543,7 +1538,9 @@ impl RawDeltaTable { raise_if_not_exists: bool, commit_properties: Option, ) -> PyResult<()> { - let mut cmd = SetTablePropertiesBuilder::new(self.log_store()?, self.cloned_state()?) + let table = self._table.lock().map_err(to_rt_err)?.clone(); + let mut cmd = DeltaOps(table) + .set_tbl_properties() .with_properties(properties) .with_raise_if_not_exists(raise_if_not_exists); @@ -1572,8 +1569,8 @@ impl RawDeltaTable { name: Some(name), description: None, }; - let mut cmd = UpdateTableMetadataBuilder::new(self.log_store()?, self.cloned_state()?) - .with_update(update); + let table = self._table.lock().map_err(to_rt_err)?.clone(); + let mut cmd = DeltaOps(table).update_table_metadata().with_update(update); if let Some(commit_properties) = maybe_create_commit_properties(commit_properties, None) { cmd = cmd.with_commit_properties(commit_properties); @@ -1600,8 +1597,8 @@ impl RawDeltaTable { name: None, description: Some(description), }; - let mut cmd = UpdateTableMetadataBuilder::new(self.log_store()?, self.cloned_state()?) - .with_update(update); + let table = self._table.lock().map_err(to_rt_err)?.clone(); + let mut cmd = DeltaOps(table).update_table_metadata().with_update(update); if let Some(commit_properties) = maybe_create_commit_properties(commit_properties, None) { cmd = cmd.with_commit_properties(commit_properties); @@ -1627,8 +1624,8 @@ impl RawDeltaTable { commit_properties: Option, post_commithook_properties: Option, ) -> PyResult { - let mut cmd = FileSystemCheckBuilder::new(self.log_store()?, self.cloned_state()?) - .with_dry_run(dry_run); + let table = self._table.lock().map_err(to_rt_err)?.clone(); + let mut cmd = DeltaOps(table).filesystem_check().with_dry_run(dry_run); if let Some(commit_properties) = maybe_create_commit_properties(commit_properties, post_commithook_properties) @@ -1669,14 +1666,16 @@ impl RawDeltaTable { post_commithook_properties: Option, ) -> PyResult<()> { let table = py.allow_threads(|| { - let mut cmd = UpdateFieldMetadataBuilder::new(self.log_store()?, self.cloned_state()?); - - cmd = cmd.with_field_name(field_name).with_metadata( - metadata - .iter() - .map(|(k, v)| (k.clone(), MetadataValue::String(v.clone()))) - .collect(), - ); + let table = self._table.lock().map_err(to_rt_err)?.clone(); + let mut cmd = DeltaOps(table) + .update_field_metadata() + .with_field_name(field_name) + .with_metadata( + metadata + .iter() + .map(|(k, v)| (k.clone(), MetadataValue::String(v.clone()))) + .collect(), + ); if let Some(commit_properties) = maybe_create_commit_properties(commit_properties, post_commithook_properties) diff --git a/python/src/merge.rs b/python/src/merge.rs index 1a8d37ae2f..101269061c 100644 --- a/python/src/merge.rs +++ b/python/src/merge.rs @@ -79,7 +79,7 @@ impl PyMergeBuilder { ctx.read_table(table_provider).unwrap() }; - let mut cmd = MergeBuilder::new(log_store, snapshot, predicate, source_df) + let mut cmd = MergeBuilder::new(log_store, Some(snapshot), predicate, source_df) .with_safe_cast(safe_cast) .with_streaming(streamed_exec);