diff --git a/crates/core/src/kernel/arrow/extract.rs b/crates/core/src/kernel/arrow/extract.rs index 90bb2bab2c..92a563170e 100644 --- a/crates/core/src/kernel/arrow/extract.rs +++ b/crates/core/src/kernel/arrow/extract.rs @@ -2,10 +2,7 @@ use std::sync::Arc; -use arrow_array::{ - Array, ArrowNativeTypeOp, ArrowNumericType, BooleanArray, ListArray, MapArray, PrimitiveArray, - RecordBatch, StringArray, StructArray, -}; +use arrow_array::{Array, ListArray, MapArray, RecordBatch, StructArray}; use arrow_schema::{ArrowError, DataType}; use crate::{DeltaResult, DeltaTableError}; @@ -74,17 +71,6 @@ pub(crate) fn extract_column<'a>( extract_column(maparr.entries(), next_path, remaining_path_steps) } else { Ok(child) - // if maparr.entries().num_columns() != 2 { - // return Err(ArrowError::SchemaError(format!( - // "Map {path_step} has {} columns, expected 2", - // maparr.entries().num_columns() - // ))); - // } - // if next_path_step == *maparr.entries().column_names().first().unwrap() { - // Ok(maparr.entries().column(0)) - // } else { - // Ok(maparr.entries().column(1)) - // } } } DataType::List(_) => { @@ -122,41 +108,3 @@ fn cast_column_as<'a, T: Array + 'static>( "{name} is not of expected type." ))) } - -#[inline] -pub(crate) fn read_str(arr: &StringArray, idx: usize) -> DeltaResult<&str> { - read_str_opt(arr, idx).ok_or(DeltaTableError::Generic("missing value".into())) -} - -#[inline] -pub(crate) fn read_str_opt(arr: &StringArray, idx: usize) -> Option<&str> { - arr.is_valid(idx).then(|| arr.value(idx)) -} - -#[inline] -pub(crate) fn read_primitive(arr: &PrimitiveArray, idx: usize) -> DeltaResult -where - T: ArrowNumericType, - T::Native: ArrowNativeTypeOp, -{ - read_primitive_opt(arr, idx).ok_or(DeltaTableError::Generic("missing value".into())) -} - -#[inline] -pub(crate) fn read_primitive_opt(arr: &PrimitiveArray, idx: usize) -> Option -where - T: ArrowNumericType, - T::Native: ArrowNativeTypeOp, -{ - arr.is_valid(idx).then(|| arr.value(idx)) -} - -#[inline] -pub(crate) fn read_bool(arr: &BooleanArray, idx: usize) -> DeltaResult { - read_bool_opt(arr, idx).ok_or(DeltaTableError::Generic("missing value".into())) -} - -#[inline] -pub(crate) fn read_bool_opt(arr: &BooleanArray, idx: usize) -> Option { - arr.is_valid(idx).then(|| arr.value(idx)) -} diff --git a/crates/core/src/kernel/models/fields.rs b/crates/core/src/kernel/models/fields.rs index b4c198f276..6ae23cc1fc 100644 --- a/crates/core/src/kernel/models/fields.rs +++ b/crates/core/src/kernel/models/fields.rs @@ -3,26 +3,6 @@ use std::sync::LazyLock; use delta_kernel::schema::{ArrayType, DataType, MapType, StructField, StructType}; -use super::ActionType; - -impl ActionType { - /// Returns the type of the corresponding field in the delta log schema - pub(crate) fn schema_field(&self) -> &StructField { - match self { - Self::Metadata => &METADATA_FIELD, - Self::Protocol => &PROTOCOL_FIELD, - Self::CommitInfo => &COMMIT_INFO_FIELD, - Self::Add => &ADD_FIELD, - Self::Remove => &REMOVE_FIELD, - Self::Cdc => &CDC_FIELD, - Self::Txn => &TXN_FIELD, - Self::DomainMetadata => &DOMAIN_METADATA_FIELD, - Self::CheckpointMetadata => &CHECKPOINT_METADATA_FIELD, - Self::Sidecar => &SIDECAR_FIELD, - } - } -} - // https://github.com/delta-io/delta/blob/master/PROTOCOL.md#change-metadata static METADATA_FIELD: LazyLock = LazyLock::new(|| { StructField::new( @@ -210,33 +190,6 @@ static DOMAIN_METADATA_FIELD: LazyLock = LazyLock::new(|| { true, ) }); -// https://github.com/delta-io/delta/blob/master/PROTOCOL.md#checkpoint-metadata -static CHECKPOINT_METADATA_FIELD: LazyLock = LazyLock::new(|| { - StructField::new( - "checkpointMetadata", - StructType::try_new(vec![ - StructField::new("flavor", DataType::STRING, false), - tags_field(), - ]) - .expect("Failed to construct StructType for CHECKPOINT_METADATA_FIELD"), - true, - ) -}); -// https://github.com/delta-io/delta/blob/master/PROTOCOL.md#sidecar-file-information -static SIDECAR_FIELD: LazyLock = LazyLock::new(|| { - StructField::new( - "sidecar", - StructType::try_new(vec![ - StructField::new("path", DataType::STRING, false), - StructField::new("sizeInBytes", DataType::LONG, true), - StructField::new("modificationTime", DataType::LONG, false), - StructField::new("type", DataType::STRING, false), - tags_field(), - ]) - .expect("Failed to construct StructType for SIDECAR_FIELD"), - true, - ) -}); #[allow(unused)] static LOG_SCHEMA: LazyLock = LazyLock::new(|| { diff --git a/crates/core/src/kernel/models/mod.rs b/crates/core/src/kernel/models/mod.rs index fdc422a5aa..e5bc1db1c5 100644 --- a/crates/core/src/kernel/models/mod.rs +++ b/crates/core/src/kernel/models/mod.rs @@ -12,31 +12,6 @@ pub(crate) mod fields; pub use actions::*; -#[derive(Debug, Hash, PartialEq, Eq, Clone, Serialize, Deserialize)] -/// The type of action that was performed on the table -pub enum ActionType { - /// modify the data in a table by adding individual logical files - Add, - /// add a file containing only the data that was changed as part of the transaction - Cdc, - /// additional provenance information about what higher-level operation was being performed - CommitInfo, - /// contains a configuration (string-string map) for a named metadata domain - DomainMetadata, - /// changes the current metadata of the table - Metadata, - /// increase the version of the Delta protocol that is required to read or write a given table - Protocol, - /// modify the data in a table by removing individual logical files - Remove, - /// Transactional information - Txn, - /// Checkpoint metadata - CheckpointMetadata, - /// Sidecar - Sidecar, -} - #[derive(Debug, PartialEq, Eq, Clone, Deserialize, Serialize)] #[serde(rename_all = "camelCase")] #[allow(missing_docs)] @@ -109,19 +84,3 @@ impl From for Action { Self::DomainMetadata(a) } } - -impl Action { - /// Get the action type - pub fn action_type(&self) -> ActionType { - match self { - Self::Add(_) => ActionType::Add, - Self::Remove(_) => ActionType::Remove, - Self::Cdc(_) => ActionType::Cdc, - Self::Metadata(_) => ActionType::Metadata, - Self::Protocol(_) => ActionType::Protocol, - Self::Txn(_) => ActionType::Txn, - Self::CommitInfo(_) => ActionType::CommitInfo, - Self::DomainMetadata(_) => ActionType::DomainMetadata, - } - } -} diff --git a/crates/core/src/kernel/snapshot/iterators.rs b/crates/core/src/kernel/snapshot/iterators.rs index b27bc297f2..37eba21a81 100644 --- a/crates/core/src/kernel/snapshot/iterators.rs +++ b/crates/core/src/kernel/snapshot/iterators.rs @@ -19,6 +19,10 @@ use crate::kernel::scalars::ScalarExt; use crate::kernel::{Add, DeletionVectorDescriptor, Remove}; use crate::{DeltaResult, DeltaTableError}; +pub use self::tombstones::TombstoneView; + +mod tombstones; + const FIELD_NAME_PATH: &str = "path"; const FIELD_NAME_SIZE: &str = "size"; const FIELD_NAME_MODIFICATION_TIME: &str = "modificationTime"; diff --git a/crates/core/src/kernel/snapshot/iterators/tombstones.rs b/crates/core/src/kernel/snapshot/iterators/tombstones.rs new file mode 100644 index 0000000000..448fd346e6 --- /dev/null +++ b/crates/core/src/kernel/snapshot/iterators/tombstones.rs @@ -0,0 +1,58 @@ +use std::{borrow::Cow, sync::LazyLock}; + +use arrow::{ + array::{AsArray, RecordBatch}, + datatypes::Int64Type, +}; +use delta_kernel::{actions::Remove, schema::ToSchema}; +use percent_encoding::percent_decode_str; + +use crate::kernel::snapshot::iterators::get_string_value; + +#[derive(Clone)] +pub struct TombstoneView { + data: RecordBatch, + index: usize, +} + +impl TombstoneView { + /// Creates a new view into the specified file entry. + pub(crate) fn new(data: RecordBatch, index: usize) -> Self { + Self { data, index } + } + + /// Returns the file path with URL decoding applied. + pub fn path(&self) -> Cow<'_, str> { + static FIELD_INDEX: LazyLock = + LazyLock::new(|| Remove::to_schema().field_with_index("path").unwrap().0); + let raw = get_string_value(self.data.column(*FIELD_INDEX), self.index) + .expect("valid string field"); + percent_decode_str(raw).decode_utf8_lossy() + } + + pub fn deletion_timestamp(&self) -> Option { + static FIELD_INDEX: LazyLock = LazyLock::new(|| { + Remove::to_schema() + .field_with_index("deletionTimestamp") + .unwrap() + .0 + }); + self.data + .column(*FIELD_INDEX) + .as_primitive_opt::() + .map(|a| a.value(self.index)) + } + + pub fn data_change(&self) -> bool { + static FIELD_INDEX: LazyLock = LazyLock::new(|| { + Remove::to_schema() + .field_with_index("dataChange") + .unwrap() + .0 + }); + self.data + .column(*FIELD_INDEX) + .as_boolean() + .value(self.index) + } +} diff --git a/crates/core/src/kernel/snapshot/log_data.rs b/crates/core/src/kernel/snapshot/log_data.rs index c11bc9c177..dfae669154 100644 --- a/crates/core/src/kernel/snapshot/log_data.rs +++ b/crates/core/src/kernel/snapshot/log_data.rs @@ -14,19 +14,6 @@ pub(crate) trait PartitionsExt { fn hive_partition_path(&self) -> String; } -impl PartitionsExt for IndexMap<&str, Scalar> { - fn hive_partition_path(&self) -> String { - let fields = self - .iter() - .map(|(k, v)| { - let encoded = v.serialize_encoded(); - format!("{k}={encoded}") - }) - .collect::>(); - fields.join("/") - } -} - impl PartitionsExt for IndexMap { fn hive_partition_path(&self) -> String { let fields = self diff --git a/crates/core/src/kernel/snapshot/mod.rs b/crates/core/src/kernel/snapshot/mod.rs index 4934a16939..507a4f95c6 100644 --- a/crates/core/src/kernel/snapshot/mod.rs +++ b/crates/core/src/kernel/snapshot/mod.rs @@ -18,17 +18,19 @@ use std::io::{BufRead, BufReader, Cursor}; use std::sync::{Arc, LazyLock}; -use arrow::compute::concat_batches; +use arrow::compute::{concat_batches, filter_record_batch, is_not_null}; use arrow_array::RecordBatch; +use delta_kernel::actions::{Remove, Sidecar}; use delta_kernel::engine::arrow_conversion::TryIntoArrow; use delta_kernel::engine::arrow_data::ArrowEngineData; use delta_kernel::path::{LogPathFileType, ParsedLogPath}; use delta_kernel::scan::scan_row_schema; -use delta_kernel::schema::SchemaRef; +use delta_kernel::schema::derive_macro_utils::ToDataType; +use delta_kernel::schema::{SchemaRef, StructField, ToSchema}; use delta_kernel::snapshot::Snapshot as KernelSnapshot; use delta_kernel::table_configuration::TableConfiguration; use delta_kernel::table_properties::TableProperties; -use delta_kernel::{PredicateRef, Version}; +use delta_kernel::{EvaluationHandler, Expression, ExpressionEvaluator, PredicateRef, Version}; use futures::stream::BoxStream; use futures::{StreamExt, TryStreamExt}; use itertools::Itertools; @@ -36,12 +38,11 @@ use object_store::path::Path; use object_store::ObjectStore; use tokio::task::spawn_blocking; -use super::{Action, CommitInfo, Metadata, Protocol, Remove}; -use crate::kernel::arrow::engine_ext::{ScanExt, SnapshotExt}; -use crate::kernel::parse::read_removes; +use super::{Action, CommitInfo, Metadata, Protocol}; +use crate::kernel::arrow::engine_ext::{ExpressionEvaluatorExt, ScanExt, SnapshotExt}; #[cfg(test)] use crate::kernel::transaction::CommitData; -use crate::kernel::{ActionType, StructType}; +use crate::kernel::{StructType, ARROW_HANDLER}; use crate::logstore::{LogStore, LogStoreExt}; use crate::{to_kernel_predicate, DeltaResult, DeltaTableConfig, DeltaTableError, PartitionFilter}; @@ -51,7 +52,6 @@ pub use stream::*; mod iterators; mod log_data; -pub(crate) mod parse; pub(crate) mod replay; mod serde; mod stream; @@ -355,16 +355,29 @@ impl Snapshot { pub(crate) fn tombstones( &self, log_store: &dyn LogStore, - ) -> BoxStream<'_, DeltaResult> { + ) -> BoxStream<'_, DeltaResult> { static TOMBSTONE_SCHEMA: LazyLock> = LazyLock::new(|| { Arc::new( StructType::try_new(vec![ - ActionType::Remove.schema_field().clone(), - ActionType::Sidecar.schema_field().clone(), + StructField::nullable("remove", Remove::to_data_type()), + StructField::nullable("sidecar", Sidecar::to_data_type()), ]) .expect("Failed to create a StructType somehow"), ) }); + static TOMBSTONE_EVALUATOR: LazyLock> = LazyLock::new(|| { + let expression = Expression::struct_from( + Remove::to_schema() + .fields() + .map(|field| Expression::column(["remove", field.name()])), + ) + .into(); + ARROW_HANDLER.new_expression_evaluator( + TOMBSTONE_SCHEMA.clone(), + expression, + Remove::to_data_type(), + ) + }); // TODO: which capacity to choose let mut builder = RecordBatchReceiverStreamBuilder::new(100); @@ -400,10 +413,15 @@ impl Snapshot { builder .build() - .map(|maybe_batch| maybe_batch.and_then(|batch| read_removes(&batch))) - .map_ok(|removes| { - futures::stream::iter(removes.into_iter().map(Ok::<_, DeltaTableError>)) + .map(|maybe_batch| { + maybe_batch.and_then(|batch| { + let filtered = filter_record_batch(&batch, &is_not_null(batch.column(0))?)?; + let tombstones = TOMBSTONE_EVALUATOR.evaluate_arrow(filtered)?; + Ok((0..tombstones.num_rows()) + .map(move |idx| TombstoneView::new(tombstones.clone(), idx))) + }) }) + .map_ok(|removes| futures::stream::iter(removes.map(Ok::<_, DeltaTableError>))) .try_flatten() .boxed() } diff --git a/crates/core/src/kernel/snapshot/parse.rs b/crates/core/src/kernel/snapshot/parse.rs deleted file mode 100644 index c2f3df5eec..0000000000 --- a/crates/core/src/kernel/snapshot/parse.rs +++ /dev/null @@ -1,115 +0,0 @@ -//! Utilities for converting Arrow arrays into Delta data structures. -use arrow_array::{ - Array, BooleanArray, Int32Array, Int64Array, MapArray, StringArray, StructArray, -}; -use percent_encoding::percent_decode_str; - -use crate::kernel::arrow::extract::{self as ex, ProvidesColumnByName}; -use crate::kernel::{DeletionVectorDescriptor, Remove}; -use crate::{DeltaResult, DeltaTableError}; - -pub(super) fn read_removes(array: &dyn ProvidesColumnByName) -> DeltaResult> { - let mut result = Vec::new(); - - if let Some(arr) = ex::extract_and_cast_opt::(array, "remove") { - // Stop early if all values are null - if arr.null_count() == arr.len() { - return Ok(result); - } - - let path = ex::extract_and_cast::(arr, "path")?; - let data_change = ex::extract_and_cast::(arr, "dataChange")?; - let deletion_timestamp = ex::extract_and_cast::(arr, "deletionTimestamp")?; - - let extended_file_metadata = - ex::extract_and_cast_opt::(arr, "extendedFileMetadata"); - let pvs = ex::extract_and_cast_opt::(arr, "partitionValues"); - let size = ex::extract_and_cast_opt::(arr, "size"); - let tags = ex::extract_and_cast_opt::(arr, "tags"); - let dv = ex::extract_and_cast_opt::(arr, "deletionVector"); - - let get_dv: Box Option> = if let Some(d) = dv { - let storage_type = ex::extract_and_cast::(d, "storageType")?; - let path_or_inline_dv = ex::extract_and_cast::(d, "pathOrInlineDv")?; - let offset = ex::extract_and_cast::(d, "offset")?; - let size_in_bytes = ex::extract_and_cast::(d, "sizeInBytes")?; - let cardinality = ex::extract_and_cast::(d, "cardinality")?; - - // Column might exist but have nullability set for the whole array, so we just return Nones - if d.null_count() == d.len() { - Box::new(|_| None) - } else { - Box::new(|idx: usize| { - d.is_valid(idx) - .then(|| { - if ex::read_str(storage_type, idx).is_ok() { - Some(DeletionVectorDescriptor { - storage_type: std::str::FromStr::from_str( - ex::read_str(storage_type, idx).ok()?, - ) - .ok()?, - path_or_inline_dv: ex::read_str(path_or_inline_dv, idx) - .ok()? - .to_string(), - offset: ex::read_primitive_opt(offset, idx), - size_in_bytes: ex::read_primitive(size_in_bytes, idx).ok()?, - cardinality: ex::read_primitive(cardinality, idx).ok()?, - }) - } else { - None - } - }) - .flatten() - }) - } - } else { - Box::new(|_| None) - }; - - for i in 0..arr.len() { - if arr.is_valid(i) { - let path_ = ex::read_str(path, i)?; - let path_ = percent_decode_str(path_) - .decode_utf8() - .map_err(|_| DeltaTableError::Generic("illegal path encoding".into()))? - .to_string(); - result.push(Remove { - path: path_, - data_change: ex::read_bool(data_change, i)?, - deletion_timestamp: ex::read_primitive_opt(deletion_timestamp, i), - extended_file_metadata: extended_file_metadata - .and_then(|e| ex::read_bool_opt(e, i)), - size: size.and_then(|s| ex::read_primitive_opt(s, i)), - partition_values: pvs - .and_then(|pv| collect_map(&pv.value(i)).map(|m| m.collect())), - tags: tags.and_then(|t| collect_map(&t.value(i)).map(|m| m.collect())), - deletion_vector: get_dv(i), - base_row_id: None, - default_row_commit_version: None, - }); - } - } - } - - Ok(result) -} - -pub(crate) fn collect_map( - val: &StructArray, -) -> Option)> + '_> { - let keys = val - .column(0) - .as_ref() - .as_any() - .downcast_ref::()?; - let values = val - .column(1) - .as_ref() - .as_any() - .downcast_ref::()?; - Some( - keys.iter() - .zip(values.iter()) - .filter_map(|(k, v)| k.map(|kv| (kv.to_string(), v.map(|vv| vv.to_string())))), - ) -} diff --git a/crates/core/src/kernel/snapshot/replay.rs b/crates/core/src/kernel/snapshot/replay.rs index 9393791f39..623ba2ecbe 100644 --- a/crates/core/src/kernel/snapshot/replay.rs +++ b/crates/core/src/kernel/snapshot/replay.rs @@ -10,7 +10,6 @@ use delta_kernel::schema::DataType; use delta_kernel::schema::PrimitiveType; use tracing::log::*; -use super::parse::collect_map; use crate::kernel::arrow::extract::{self as ex}; use crate::kernel::StructType; use crate::{DeltaResult, DeltaTableError}; @@ -316,3 +315,21 @@ mod tests { Ok(()) } } + +fn collect_map(val: &StructArray) -> Option)> + '_> { + let keys = val + .column(0) + .as_ref() + .as_any() + .downcast_ref::()?; + let values = val + .column(1) + .as_ref() + .as_any() + .downcast_ref::()?; + Some( + keys.iter() + .zip(values.iter()) + .filter_map(|(k, v)| k.map(|kv| (kv.to_string(), v.map(|vv| vv.to_string())))), + ) +} diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 1c159121fc..58d0958e0a 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -193,6 +193,7 @@ pub fn crate_version() -> &'static str { #[cfg(test)] mod tests { + use futures::TryStreamExt as _; use itertools::Itertools; use super::*; @@ -222,9 +223,9 @@ mod tests { .snapshot() .unwrap() .all_tombstones(&table.log_store()) + .try_collect::>() .await - .unwrap() - .collect_vec(); + .unwrap(); assert_eq!(tombstones.len(), 4); // assert!(tombstones.contains(&crate::kernel::Remove { // path: "part-00000-512e1537-8aaa-4193-b8b4-bef3de0de409-c000.snappy.parquet".to_string(), @@ -358,22 +359,16 @@ mod tests { .snapshot() .unwrap() .all_tombstones(&table.log_store()) + .try_collect::>() .await - .unwrap() - .collect_vec(); + .unwrap(); assert_eq!(tombstones.len(), 1); - assert!(tombstones.contains(&crate::kernel::Remove { - path: "part-00001-911a94a2-43f6-4acb-8620-5e68c2654989-c000.snappy.parquet".to_string(), - deletion_timestamp: Some(1615043776198), - data_change: true, - extended_file_metadata: Some(true), - partition_values: Some(HashMap::new()), - size: Some(445), - base_row_id: None, - default_row_commit_version: None, - deletion_vector: None, - tags: Some(HashMap::new()), - })); + let tombstone = tombstones.first().unwrap(); + assert_eq!( + tombstone.path(), + "part-00001-911a94a2-43f6-4acb-8620-5e68c2654989-c000.snappy.parquet" + ); + assert_eq!(tombstone.deletion_timestamp(), Some(1615043776198)); } #[tokio::test] diff --git a/crates/core/src/operations/vacuum.rs b/crates/core/src/operations/vacuum.rs index 8fc796cfbd..6a0f4efd00 100644 --- a/crates/core/src/operations/vacuum.rs +++ b/crates/core/src/operations/vacuum.rs @@ -26,7 +26,7 @@ use std::fmt::Debug; use std::sync::Arc; use chrono::{Duration, Utc}; -use futures::future::BoxFuture; +use futures::future::{ready, BoxFuture}; use futures::{StreamExt, TryStreamExt}; use object_store::Error; use object_store::{path::Path, ObjectStore}; @@ -505,18 +505,16 @@ async fn get_stale_files( store: &dyn LogStore, ) -> DeltaResult> { let tombstone_retention_timestamp = now_timestamp_millis - retention_period.num_milliseconds(); - Ok(snapshot + snapshot .all_tombstones(store) - .await? - .collect::>() - .into_iter() - .filter(|tombstone| { + .try_filter(|tombstone| { // if the file has a creation time before the `tombstone_retention_timestamp` // then it's considered as a stale file - tombstone.deletion_timestamp.unwrap_or(0) < tombstone_retention_timestamp + ready(tombstone.deletion_timestamp().unwrap_or(0) < tombstone_retention_timestamp) }) - .map(|tombstone| tombstone.path) - .collect::>()) + .map_ok(|tombstone| tombstone.path().to_string()) + .try_collect::>() + .await } #[cfg(test)] diff --git a/crates/core/src/operations/write/mod.rs b/crates/core/src/operations/write/mod.rs index 910a12ca4d..ff10a42d9a 100644 --- a/crates/core/src/operations/write/mod.rs +++ b/crates/core/src/operations/write/mod.rs @@ -71,8 +71,8 @@ use crate::errors::{DeltaResult, DeltaTableError}; use crate::kernel::schema::cast::merge_arrow_schema; use crate::kernel::transaction::{CommitBuilder, CommitProperties, TableReference, PROTOCOL}; use crate::kernel::{ - new_metadata, Action, ActionType, EagerSnapshot, MetadataExt as _, ProtocolExt as _, - StructType, StructTypeExt, + new_metadata, Action, EagerSnapshot, MetadataExt as _, ProtocolExt as _, StructType, + StructTypeExt, }; use crate::logstore::LogStoreRef; use crate::protocol::{DeltaOperation, SaveMode}; @@ -669,7 +669,7 @@ impl std::future::IntoFuture for WriteBuilder { } metrics.num_removed_files = actions .iter() - .filter(|a| a.action_type() == ActionType::Remove) + .filter(|a| matches!(a, Action::Remove(_))) .count(); } diff --git a/crates/core/src/table/state.rs b/crates/core/src/table/state.rs index 1f74a1ca5f..726e07c022 100644 --- a/crates/core/src/table/state.rs +++ b/crates/core/src/table/state.rs @@ -9,7 +9,7 @@ use delta_kernel::schema::StructField; use delta_kernel::table_properties::TableProperties; use delta_kernel::{EvaluationHandler, Expression}; use futures::stream::BoxStream; -use futures::{StreamExt, TryStreamExt}; +use futures::{future::ready, StreamExt as _, TryStreamExt as _}; use object_store::path::Path; use serde::{Deserialize, Serialize}; @@ -18,8 +18,8 @@ use crate::kernel::arrow::engine_ext::{ExpressionEvaluatorExt, SnapshotExt}; #[cfg(test)] use crate::kernel::Action; use crate::kernel::{ - Add, DataType, EagerSnapshot, LogDataHandler, LogicalFileView, Metadata, Protocol, Remove, - StructType, ARROW_HANDLER, + Add, DataType, EagerSnapshot, LogDataHandler, LogicalFileView, Metadata, Protocol, StructType, + TombstoneView, ARROW_HANDLER, }; use crate::logstore::LogStore; use crate::partitions::PartitionFilter; @@ -132,34 +132,31 @@ impl DeltaTableState { } /// Full list of tombstones (remove actions) representing files removed from table state). - pub async fn all_tombstones( + pub fn all_tombstones( &self, log_store: &dyn LogStore, - ) -> DeltaResult> { - Ok(self - .snapshot - .snapshot() - .tombstones(log_store) - .try_collect::>() - .await? - .into_iter()) + ) -> BoxStream<'_, DeltaResult> { + self.snapshot.snapshot().tombstones(log_store) } /// List of unexpired tombstones (remove actions) representing files removed from table state. /// The retention period is set by `deletedFileRetentionDuration` with default value of 1 week. - pub async fn unexpired_tombstones( + #[deprecated( + since = "0.30.0", + note = "Use `all_tombstones` instead and filter by retention timestamp." + )] + pub fn unexpired_tombstones( &self, log_store: &dyn LogStore, - ) -> DeltaResult> { + ) -> BoxStream<'_, DeltaResult> { let retention_timestamp = Utc::now().timestamp_millis() - self .table_config() .deleted_file_retention_duration() .as_millis() as i64; - let tombstones = self.all_tombstones(log_store).await?.collect::>(); - Ok(tombstones - .into_iter() - .filter(move |t| t.deletion_timestamp.unwrap_or(0) > retention_timestamp)) + self.all_tombstones(log_store) + .try_filter(move |t| ready(t.deletion_timestamp().unwrap_or(0) > retention_timestamp)) + .boxed() } /// Full list of add actions representing all parquet files that are part of the current diff --git a/crates/core/src/test_utils/factories/actions.rs b/crates/core/src/test_utils/factories/actions.rs index 773606c3c6..7caaa7a679 100644 --- a/crates/core/src/test_utils/factories/actions.rs +++ b/crates/core/src/test_utils/factories/actions.rs @@ -1,6 +1,7 @@ use std::collections::HashMap; -use arrow_array::*; +use arrow::array::AsArray as _; +use arrow::datatypes::{Int32Type, Int64Type}; use chrono::Utc; use delta_kernel::schema::{DataType, PrimitiveType}; use delta_kernel::table_features::{ReaderFeature, WriterFeature}; @@ -10,7 +11,6 @@ use object_store::ObjectMeta; use serde_json::json; use super::{get_parquet_bytes, DataFactory, FileStats}; -use crate::kernel::arrow::extract::{self as ex}; use crate::kernel::transaction::PROTOCOL; use crate::kernel::{partitions_schema, ProtocolInner}; use crate::kernel::{Add, Metadata, Protocol, Remove, StructType}; @@ -53,16 +53,21 @@ impl ActionFactory { .map(|f| { let value = match f.data_type() { DataType::Primitive(PrimitiveType::String) => { - let arr = - ex::extract_and_cast::(&batch, f.name()).unwrap(); + let arr = batch.column_by_name(f.name()).unwrap().as_string::(); Some(arr.value(0).to_string()) } DataType::Primitive(PrimitiveType::Integer) => { - let arr = ex::extract_and_cast::(&batch, f.name()).unwrap(); + let arr = batch + .column_by_name(f.name()) + .unwrap() + .as_primitive::(); Some(arr.value(0).to_string()) } DataType::Primitive(PrimitiveType::Long) => { - let arr = ex::extract_and_cast::(&batch, f.name()).unwrap(); + let arr = batch + .column_by_name(f.name()) + .unwrap() + .as_primitive::(); Some(arr.value(0).to_string()) } _ => unimplemented!(), diff --git a/crates/core/tests/checkpoint_writer.rs b/crates/core/tests/checkpoint_writer.rs index 06753aff8c..e3faef5a46 100644 --- a/crates/core/tests/checkpoint_writer.rs +++ b/crates/core/tests/checkpoint_writer.rs @@ -432,6 +432,7 @@ mod checkpoints_with_tombstones { use deltalake_core::kernel::*; use deltalake_core::table::config::TableProperty; use deltalake_core::*; + use futures::TryStreamExt as _; use pretty_assertions::assert_eq; use std::collections::{HashMap, HashSet}; @@ -481,10 +482,14 @@ mod checkpoints_with_tombstones { .snapshot() .unwrap() .all_tombstones(&table.log_store()) + .map_ok(|t| t.path().to_string()) + .try_collect::>() .await - .unwrap() - .collect::>(), + .unwrap(), removes1 + .iter() + .map(|t| t.path.clone()) + .collect::>() ); checkpoints::create_checkpoint(&table, None).await.unwrap(); @@ -502,9 +507,11 @@ mod checkpoints_with_tombstones { .snapshot() .unwrap() .all_tombstones(&table.log_store()) + .map_ok(|t| t.path().to_string()) + .try_collect::>() .await .unwrap() - .count(), + .len(), 0 ); // stale removes are deleted from the state } diff --git a/crates/core/tests/command_filesystem_check.rs b/crates/core/tests/command_filesystem_check.rs index caa107d622..8f96797e84 100644 --- a/crates/core/tests/command_filesystem_check.rs +++ b/crates/core/tests/command_filesystem_check.rs @@ -1,8 +1,9 @@ -use std::collections::HashSet; +use std::collections::HashMap; use deltalake_core::Path; use deltalake_core::{errors::DeltaTableError, DeltaOps}; use deltalake_test::utils::*; +use futures::TryStreamExt as _; use serial_test::serial; #[tokio::test] @@ -42,10 +43,11 @@ async fn test_filesystem_check(context: &IntegrationContext) -> TestResult { let remove = table .snapshot()? .all_tombstones(&table.log_store()) - .await? - .collect::>(); + .map_ok(|t| (t.path().to_string(), t)) + .try_collect::>() + .await?; let remove = remove.get(file).unwrap(); - assert!(remove.data_change); + assert!(remove.data_change()); // An additional run should return an empty list of orphaned actions let op = DeltaOps::from(table); @@ -89,10 +91,11 @@ async fn test_filesystem_check_partitioned() -> TestResult { let remove = table .snapshot()? .all_tombstones(&table.log_store()) - .await? - .collect::>(); + .map_ok(|t| (t.path().to_string(), t)) + .try_collect::>() + .await?; let remove = remove.get(file).unwrap(); - assert!(remove.data_change); + assert!(remove.data_change()); Ok(()) } diff --git a/crates/core/tests/command_restore.rs b/crates/core/tests/command_restore.rs index cbcaae864d..0878885dcc 100644 --- a/crates/core/tests/command_restore.rs +++ b/crates/core/tests/command_restore.rs @@ -6,6 +6,7 @@ use deltalake_core::kernel::{DataType, PrimitiveType, StructField}; use deltalake_core::logstore::commit_uri_from_version; use deltalake_core::protocol::SaveMode; use deltalake_core::{ensure_table_uri, DeltaOps, DeltaTable}; +use futures::TryStreamExt; use itertools::Itertools; use rand::Rng; use std::error::Error; @@ -185,9 +186,10 @@ async fn test_restore_file_missing() -> Result<(), Box> { .table .snapshot()? .all_tombstones(&context.table.log_store()) + .try_collect::>() .await? { - let p = tmp_dir.path().join(file.clone().path); + let p = tmp_dir.path().join(file.path().to_string()); fs::remove_file(p).unwrap(); } @@ -214,9 +216,10 @@ async fn test_restore_allow_file_missing() -> Result<(), Box> { .table .snapshot()? .all_tombstones(&context.table.log_store()) + .try_collect::>() .await? { - let p = tmp_dir.path().join(file.clone().path); + let p = tmp_dir.path().join(file.path().to_string()); fs::remove_file(p).unwrap(); } diff --git a/crates/core/tests/integration.rs b/crates/core/tests/integration.rs index 4036ee8c12..2cc4b3f5ed 100644 --- a/crates/core/tests/integration.rs +++ b/crates/core/tests/integration.rs @@ -1,6 +1,7 @@ use deltalake_test::read::read_table_paths; use deltalake_test::utils::*; use deltalake_test::{test_concurrent_writes, test_read_tables}; +use futures::TryStreamExt as _; use object_store::path::Path; use serial_test::serial; @@ -71,10 +72,10 @@ async fn test_action_reconciliation() { .snapshot() .unwrap() .all_tombstones(&table.log_store()) + .map_ok(|r| r.path().to_string()) + .try_collect::>() .await - .unwrap() - .map(|r| r.path.clone()) - .collect::>(), + .unwrap(), vec![a.path.clone()] ); } diff --git a/crates/test/src/read.rs b/crates/test/src/read.rs index 88766d4df2..990f124af1 100644 --- a/crates/test/src/read.rs +++ b/crates/test/src/read.rs @@ -1,4 +1,5 @@ use deltalake_core::DeltaTableBuilder; +use futures::TryStreamExt as _; use object_store::path::Path; use crate::utils::{IntegrationContext, TestResult, TestTables}; @@ -56,21 +57,21 @@ async fn read_simple_table(integration: &IntegrationContext) -> TestResult { ); let tombstones = snapshot .all_tombstones(&table.log_store()) - .await? - .collect::>(); + .try_collect::>() + .await?; assert_eq!(tombstones.len(), 31); - assert!(tombstones.contains(&deltalake_core::kernel::Remove { - path: "part-00006-63ce9deb-bc0f-482d-b9a1-7e717b67f294-c000.snappy.parquet".to_string(), - deletion_timestamp: Some(1587968596250), - data_change: true, - extended_file_metadata: None, - deletion_vector: None, - base_row_id: None, - default_row_commit_version: None, - size: None, - partition_values: Some(Default::default()), - tags: Some(Default::default()), - })); + let paths = tombstones + .iter() + .map(|tombstone| tombstone.path().to_string()) + .collect::>(); + assert!(paths.contains( + &"part-00006-63ce9deb-bc0f-482d-b9a1-7e717b67f294-c000.snappy.parquet".to_string() + )); + let deletion_ts = tombstones + .iter() + .map(|tombstone| tombstone.deletion_timestamp()) + .collect::>(); + assert!(deletion_ts.contains(&Some(1587968596250))); Ok(()) } @@ -101,21 +102,21 @@ async fn read_simple_table_with_version(integration: &IntegrationContext) -> Tes ); let tombstones = snapshot .all_tombstones(&table.log_store()) - .await? - .collect::>(); + .try_collect::>() + .await?; assert_eq!(tombstones.len(), 29); - assert!(tombstones.contains(&deltalake_core::kernel::Remove { - path: "part-00006-63ce9deb-bc0f-482d-b9a1-7e717b67f294-c000.snappy.parquet".to_string(), - deletion_timestamp: Some(1587968596250), - data_change: true, - tags: Some(Default::default()), - partition_values: Some(Default::default()), - base_row_id: None, - default_row_commit_version: None, - size: None, - deletion_vector: None, - extended_file_metadata: None, - })); + let paths = tombstones + .iter() + .map(|tombstone| tombstone.path().to_string()) + .collect::>(); + assert!(paths.contains( + &"part-00006-63ce9deb-bc0f-482d-b9a1-7e717b67f294-c000.snappy.parquet".to_string() + )); + let deletion_ts = tombstones + .iter() + .map(|tombstone| tombstone.deletion_timestamp()) + .collect::>(); + assert!(deletion_ts.contains(&Some(1587968596250))); Ok(()) } diff --git a/python/tests/test_vacuum.py b/python/tests/test_vacuum.py index 074967bb52..af4a7e5d65 100644 --- a/python/tests/test_vacuum.py +++ b/python/tests/test_vacuum.py @@ -146,3 +146,24 @@ def test_vacuum_keep_versions(): "part-00003-53f42606-6cda-4f13-8d07-599a21197296-c000.snappy.parquet", "part-00006-46f2ff20-eb5d-4dda-8498-7bfb2940713b-c000.snappy.parquet", } + + +# https://github.com/delta-io/delta-rs/issues/3745 +@pytest.mark.pyarrow +def test_issue_3745(tmp_path: pathlib.Path): + import pyarrow as pa + + data = pa.Table.from_pydict( + { + "x": pa.array(list(range(100)), type=pa.int32()), + } + ) + write_deltalake(table_or_uri=tmp_path, data=data, mode="append") + + table = DeltaTable(tmp_path) + table.create_checkpoint() + + write_deltalake(table_or_uri=tmp_path, data=data, mode="append") + + table = DeltaTable(tmp_path) + table.vacuum()