diff --git a/ffi/src/transaction/mod.rs b/ffi/src/transaction/mod.rs index 7a22d835d..c8e710fc3 100644 --- a/ffi/src/transaction/mod.rs +++ b/ffi/src/transaction/mod.rs @@ -94,6 +94,18 @@ pub unsafe extern "C" fn add_files( txn.add_files(write_metadata); } +/// +/// Mark the transaction as having data changes or not (these are recorded at the file level). +/// +/// # Safety +/// +/// Caller is responsible for passing a valid handle. +#[no_mangle] +pub unsafe extern "C" fn set_data_change(mut txn: Handle, data_change: bool) { + let underlying_txn = unsafe { txn.as_mut() }; + underlying_txn.set_data_change(data_change); +} + /// Attempt to commit a transaction to the table. Returns version number if successful. /// Returns error if the commit fails. /// @@ -136,11 +148,11 @@ mod tests { use delta_kernel::arrow::ffi::to_ffi; use delta_kernel::arrow::json::reader::ReaderBuilder; use delta_kernel::arrow::record_batch::RecordBatch; - use delta_kernel::engine::arrow_conversion::TryIntoArrow as _; + + use delta_kernel::engine::arrow_conversion::TryIntoArrow; use delta_kernel::engine::arrow_data::ArrowEngineData; use delta_kernel::parquet::arrow::arrow_writer::ArrowWriter; use delta_kernel::parquet::file::properties::WriterProperties; - use delta_kernel::transaction::add_files_schema; use delta_kernel_ffi::engine_data::get_engine_data; use delta_kernel_ffi::engine_data::ArrowFFIData; @@ -194,25 +206,25 @@ mod tests { fn create_file_metadata( path: &str, num_rows: i64, + metadata_schema: ArrowSchema, ) -> Result> { - let schema: ArrowSchema = add_files_schema().as_ref().try_into_arrow()?; - let current_time: i64 = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap() .as_millis() as i64; let file_metadata = format!( - r#"{{"path":"{path}", "partitionValues": {{}}, "size": {num_rows}, "modificationTime": {current_time}, "dataChange": true, "stats": {{"numRecords": {num_rows}}}}}"#, + r#"{{"path":"{path}", "partitionValues": {{}}, "size": {num_rows}, "modificationTime": {current_time}, "stats": {{"numRecords": {num_rows}}}}}"#, ); - create_arrow_ffi_from_json(schema, file_metadata.as_str()) + create_arrow_ffi_from_json(metadata_schema, file_metadata.as_str()) } fn write_parquet_file( delta_path: &str, file_path: &str, batch: &RecordBatch, + metadata_schema: ArrowSchema, ) -> Result> { // WriterProperties can be used to set Parquet file options let props = WriterProperties::builder().build(); @@ -226,7 +238,7 @@ mod tests { // writer must be closed to write footer let res = writer.close().unwrap(); - create_file_metadata(file_path, res.num_rows) + create_file_metadata(file_path, res.num_rows, metadata_schema) } #[tokio::test] @@ -260,6 +272,7 @@ mod tests { let txn = ok_or_panic(unsafe { transaction(kernel_string_slice!(table_path_str), engine.shallow_copy()) }); + unsafe { set_data_change(txn.shallow_copy(), false) }; // Add engine info let engine_info = "default_engine"; @@ -313,8 +326,18 @@ mod tests { ), ]) .unwrap(); - - let file_info = write_parquet_file(table_path_str, "my_file.parquet", &batch)?; + let parquet_schema = unsafe { + txn_with_engine_info + .shallow_copy() + .as_ref() + .add_files_schema() + }; + let file_info = write_parquet_file( + table_path_str, + "my_file.parquet", + &batch, + parquet_schema.as_ref().try_into_arrow()?, + )?; let file_info_engine_data = ok_or_panic(unsafe { get_engine_data( @@ -365,7 +388,7 @@ mod tests { "partitionValues": {}, "size": 0, "modificationTime": 0, - "dataChange": true, + "dataChange": false, "stats": "{\"numRecords\":5}" } 
}), diff --git a/kernel/examples/write-table/src/main.rs b/kernel/examples/write-table/src/main.rs index 8e6075ce8..d721393e7 100644 --- a/kernel/examples/write-table/src/main.rs +++ b/kernel/examples/write-table/src/main.rs @@ -91,12 +91,13 @@ async fn try_main() -> DeltaResult<()> { let mut txn = snapshot .transaction()? .with_operation("INSERT".to_string()) - .with_engine_info("default_engine/write-table-example"); + .with_engine_info("default_engine/write-table-example") + .with_data_change(true); // Write the data using the engine let write_context = Arc::new(txn.get_write_context()); let file_metadata = engine - .write_parquet(&sample_data, write_context.as_ref(), HashMap::new(), true) + .write_parquet(&sample_data, write_context.as_ref(), HashMap::new()) .await?; // Add the file metadata to the transaction diff --git a/kernel/src/engine/default/mod.rs b/kernel/src/engine/default/mod.rs index 051a6a32c..73331fdf9 100644 --- a/kernel/src/engine/default/mod.rs +++ b/kernel/src/engine/default/mod.rs @@ -98,7 +98,6 @@ impl DefaultEngine { data: &ArrowEngineData, write_context: &WriteContext, partition_values: HashMap, - data_change: bool, ) -> DeltaResult> { let transform = write_context.logical_to_physical(); let input_schema = Schema::try_from_arrow(data.record_batch().schema())?; @@ -110,12 +109,7 @@ impl DefaultEngine { ); let physical_data = logical_to_physical_expr.evaluate(data)?; self.parquet - .write_parquet_file( - write_context.target_dir(), - physical_data, - partition_values, - data_change, - ) + .write_parquet_file(write_context.target_dir(), physical_data, partition_values) .await } } diff --git a/kernel/src/engine/default/parquet.rs b/kernel/src/engine/default/parquet.rs index fa77b3294..ac63877ec 100644 --- a/kernel/src/engine/default/parquet.rs +++ b/kernel/src/engine/default/parquet.rs @@ -5,7 +5,7 @@ use std::ops::Range; use std::sync::Arc; use crate::arrow::array::builder::{MapBuilder, MapFieldNames, StringBuilder}; -use crate::arrow::array::{BooleanArray, Int64Array, RecordBatch, StringArray, StructArray}; +use crate::arrow::array::{Int64Array, RecordBatch, StringArray, StructArray}; use crate::arrow::datatypes::{DataType, Field}; use crate::parquet::arrow::arrow_reader::{ ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder, @@ -28,7 +28,6 @@ use crate::engine::arrow_utils::{ use crate::engine::default::executor::TaskExecutor; use crate::engine::parquet_row_group_skipping::ParquetRowGroupSkipping; use crate::schema::SchemaRef; -use crate::transaction::add_files_schema; use crate::{ DeltaResult, EngineData, Error, FileDataReadResultIterator, FileMeta, ParquetHandler, PredicateRef, @@ -67,7 +66,6 @@ impl DataFileMetadata { fn as_record_batch( &self, partition_values: &HashMap, - data_change: bool, ) -> DeltaResult> { let DataFileMetadata { file_meta: @@ -99,7 +97,6 @@ impl DataFileMetadata { .try_into() .map_err(|_| Error::generic("Failed to convert parquet metadata 'size' to i64"))?; let size = Arc::new(Int64Array::from(vec![size])); - let data_change = Arc::new(BooleanArray::from(vec![data_change])); let modification_time = Arc::new(Int64Array::from(vec![*last_modified])); let stats = Arc::new(StructArray::try_new_with_length( vec![Field::new("numRecords", DataType::Int64, true)].into(), @@ -109,15 +106,12 @@ impl DataFileMetadata { )?); Ok(Box::new(ArrowEngineData::new(RecordBatch::try_new( - Arc::new(add_files_schema().as_ref().try_into_arrow()?), - vec![ - path, - partitions, - size, - modification_time, - data_change, - stats, - ], + 
Arc::new( + crate::transaction::BASE_ADD_FILES_SCHEMA + .as_ref() + .try_into_arrow()?, + ), + vec![path, partitions, size, modification_time, stats], )?))) } } @@ -192,16 +186,18 @@ impl DefaultParquetHandler { /// metadata as an EngineData batch which matches the [add file metadata] schema (where `` /// is a generated UUIDv4). /// - /// [add file metadata]: crate::transaction::add_files_schema + /// Note that the schema does not contain the dataChange column. In order to set `data_change` flag, + /// use [`crate::transaction::Transaction::with_data_change`]. + /// + /// [add file metadata]: crate::transaction::Transaction::add_files_schema pub async fn write_parquet_file( &self, path: &url::Url, data: Box, partition_values: HashMap, - data_change: bool, ) -> DeltaResult> { let parquet_metadata = self.write_parquet(path, data).await?; - parquet_metadata.as_record_batch(&partition_values, data_change) + parquet_metadata.as_record_batch(&partition_values) } } @@ -504,14 +500,13 @@ mod tests { let file_metadata = FileMeta::new(location.clone(), last_modified, size); let data_file_metadata = DataFileMetadata::new(file_metadata, num_records); let partition_values = HashMap::from([("partition1".to_string(), "a".to_string())]); - let data_change = true; let actual = data_file_metadata - .as_record_batch(&partition_values, data_change) + .as_record_batch(&partition_values) .unwrap(); let actual = ArrowEngineData::try_from_engine_data(actual).unwrap(); let schema = Arc::new( - crate::transaction::add_files_schema() + crate::transaction::BASE_ADD_FILES_SCHEMA .as_ref() .try_into_arrow() .unwrap(), @@ -544,7 +539,6 @@ mod tests { Arc::new(partition_values), Arc::new(Int64Array::from(vec![size as i64])), Arc::new(Int64Array::from(vec![last_modified])), - Arc::new(BooleanArray::from(vec![data_change])), Arc::new(stats_struct), ], ) diff --git a/kernel/src/row_tracking.rs b/kernel/src/row_tracking.rs index eea1ca264..d4f56b3dd 100644 --- a/kernel/src/row_tracking.rs +++ b/kernel/src/row_tracking.rs @@ -6,7 +6,6 @@ use crate::actions::domain_metadata::domain_metadata_configuration; use crate::actions::DomainMetadata; use crate::engine_data::{GetData, RowVisitor, TypedGetData as _}; use crate::schema::{ColumnName, ColumnNamesAndTypes, DataType}; -use crate::transaction::add_files_schema; use crate::utils::require; use crate::{DeltaResult, Engine, Error, Snapshot}; @@ -70,7 +69,8 @@ impl TryFrom for DomainMetadata { /// A row visitor that iterates over preliminary [`Add`] actions as returned by the engine and /// computes a base row ID for each action. -/// It expects to visit engine data conforming to the schema returned by [`add_files_schema()`]. +/// It expects to visit engine data with a nested field 'stats.numRecords' which is +/// part of a Delta add action. /// /// This visitor is only required for the row tracking write path. The read path will be completely /// implemented via expressions. @@ -88,11 +88,6 @@ impl RowTrackingVisitor { /// Default value for an absent high water mark const DEFAULT_HIGH_WATER_MARK: i64 = -1; - /// Field index of "numRecords" in the [`add_files_schema()`] - /// - /// We verify this hard-coded index in a test. 
- const NUM_RECORDS_FIELD_INDEX: usize = 5; - pub(crate) fn new(row_id_high_water_mark: Option, num_batches: Option) -> Self { // A table might not have a row ID high water mark yet, so we model the input as an Option Self { @@ -104,14 +99,19 @@ impl RowTrackingVisitor { impl RowVisitor for RowTrackingVisitor { fn selected_column_names_and_types(&self) -> (&'static [ColumnName], &'static [DataType]) { - static NAMES_AND_TYPES: LazyLock = - LazyLock::new(|| add_files_schema().leaves(None)); + static NAMES_AND_TYPES: LazyLock = LazyLock::new(|| { + ( + vec![ColumnName::new(["stats", "numRecords"])], + vec![DataType::LONG], + ) + .into() + }); NAMES_AND_TYPES.as_ref() } fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> { require!( - getters.len() == add_files_schema().num_fields(), + getters.len() == 1, Error::generic(format!( "Wrong number of RowTrackingVisitor getters: {}", getters.len() @@ -123,14 +123,12 @@ impl RowVisitor for RowTrackingVisitor { let mut current_hwm = self.row_id_high_water_mark; for i in 0..row_count { - let num_records: i64 = getters[Self::NUM_RECORDS_FIELD_INDEX] - .get_opt(i, "numRecords")? - .ok_or_else(|| { - Error::InternalError( - "numRecords must be present in Add actions when row tracking is enabled." - .to_string(), - ) - })?; + let num_records: i64 = getters[0].get_opt(i, "numRecords")?.ok_or_else(|| { + Error::InternalError( + "numRecords must be present in Add actions when row tracking is enabled." + .to_string(), + ) + })?; batch_base_row_ids.push(current_hwm + 1); current_hwm += num_records; } @@ -168,46 +166,15 @@ mod tests { } } - fn create_getters<'a>( - num_records_mock: &'a MockGetData, - unit_mock: &'a (), - ) -> Vec<&'a dyn GetData<'a>> { - let schema = add_files_schema(); - let mut getters: Vec<&'a dyn GetData<'a>> = Vec::new(); - - for i in 0..schema.num_fields() { - if i == RowTrackingVisitor::NUM_RECORDS_FIELD_INDEX { - getters.push(num_records_mock); - } else { - getters.push(unit_mock); - } - } - getters - } - - #[test] - fn test_num_records_field_index() { - // Verify that the correct numRecords field index is hard-coded in the RowTrackingVisitor - let num_records_field_index = add_files_schema() - .leaves(None) - .as_ref() - .0 - .iter() - .position(|name| name.path().last() == Some(&"numRecords".to_string())) - .expect("numRecords field not found"); - - assert_eq!( - num_records_field_index, - RowTrackingVisitor::NUM_RECORDS_FIELD_INDEX - ); + fn create_getters<'a>(num_records_mock: &'a MockGetData) -> Vec<&'a dyn GetData<'a>> { + vec![num_records_mock] } #[test] fn test_visit_basic_functionality() -> DeltaResult<()> { let mut visitor = RowTrackingVisitor::new(None, Some(1)); let num_records_mock = MockGetData::new(vec![Some(10), Some(5), Some(20)]); - let unit_mock = (); - let getters = create_getters(&num_records_mock, &unit_mock); + let getters = create_getters(&num_records_mock); visitor.visit(3, &getters)?; @@ -225,8 +192,7 @@ mod tests { fn test_visit_with_negative_high_water_mark() -> DeltaResult<()> { let mut visitor = RowTrackingVisitor::new(Some(-5), Some(1)); let num_records_mock = MockGetData::new(vec![Some(3), Some(2)]); - let unit_mock = (); - let getters = create_getters(&num_records_mock, &unit_mock); + let getters = create_getters(&num_records_mock); visitor.visit(2, &getters)?; @@ -244,8 +210,7 @@ mod tests { fn test_visit_with_zero_records() -> DeltaResult<()> { let mut visitor = RowTrackingVisitor::new(Some(10), Some(1)); let num_records_mock = MockGetData::new(vec![Some(0), 
Some(0), Some(5)]); - let unit_mock = (); - let getters = create_getters(&num_records_mock, &unit_mock); + let getters = create_getters(&num_records_mock); visitor.visit(3, &getters)?; @@ -263,8 +228,7 @@ mod tests { fn test_visit_empty_batch() -> DeltaResult<()> { let mut visitor = RowTrackingVisitor::new(Some(42), None); let num_records_mock = MockGetData::new(vec![]); - let unit_mock = (); - let getters = create_getters(&num_records_mock, &unit_mock); + let getters = create_getters(&num_records_mock); visitor.visit(0, &getters)?; @@ -279,16 +243,15 @@ mod tests { #[test] fn test_visit_multiple_batches() -> DeltaResult<()> { let mut visitor = RowTrackingVisitor::new(Some(0), Some(2)); - let unit_mock = (); // First batch let num_records_mock1 = MockGetData::new(vec![Some(10), Some(5)]); - let getters1 = create_getters(&num_records_mock1, &unit_mock); + let getters1 = create_getters(&num_records_mock1); visitor.visit(2, &getters1)?; // Second batch let num_records_mock2 = MockGetData::new(vec![Some(3), Some(7), Some(2)]); - let getters2 = create_getters(&num_records_mock2, &unit_mock); + let getters2 = create_getters(&num_records_mock2); visitor.visit(3, &getters2)?; // Check that we have two batches @@ -309,8 +272,7 @@ mod tests { #[test] fn test_visit_wrong_getter_count() -> DeltaResult<()> { let mut visitor = RowTrackingVisitor::new(Some(0), None); - let unit_mock = (); - let wrong_getters: Vec<&dyn GetData<'_>> = vec![&unit_mock]; // Only one getter instead of expected count + let wrong_getters: Vec<&dyn GetData<'_>> = vec![]; // No getters instead of expected count let result = visitor.visit(1, &wrong_getters); assert_result_error_with_message(result, "Wrong number of RowTrackingVisitor getters"); @@ -322,8 +284,7 @@ mod tests { fn test_visit_missing_num_records() -> DeltaResult<()> { let mut visitor = RowTrackingVisitor::new(Some(0), None); let num_records_mock = MockGetData::new(vec![None]); // Missing numRecords - let unit_mock = (); - let getters = create_getters(&num_records_mock, &unit_mock); + let getters = create_getters(&num_records_mock); let result = visitor.visit(1, &getters); assert_result_error_with_message( @@ -339,10 +300,8 @@ mod tests { let visitor = RowTrackingVisitor::new(Some(0), None); let (names, types) = visitor.selected_column_names_and_types(); - // Should return the same as add_files_schema().leaves(None) - let expected = add_files_schema().leaves(None); - assert_eq!(names, expected.as_ref().0); - assert_eq!(types, expected.as_ref().1); + assert_eq!(names, (vec![ColumnName::new(["stats", "numRecords"])])); + assert_eq!(types, vec![DataType::LONG]); } #[test] diff --git a/kernel/src/transaction/mod.rs b/kernel/src/transaction/mod.rs index 79aa658eb..a3c039718 100644 --- a/kernel/src/transaction/mod.rs +++ b/kernel/src/transaction/mod.rs @@ -21,12 +21,13 @@ use crate::{ DataType, DeltaResult, Engine, EngineData, Expression, ExpressionRef, IntoEngineData, RowVisitor, Version, }; +use delta_kernel_derive::internal_api; /// Type alias for an iterator of [`EngineData`] results. type EngineDataResultIterator<'a> = Box>> + Send + 'a>; -/// The minimal (i.e., mandatory) fields in an add action. +/// The static instance referenced by [`add_files_schema`] that doesn't contain the dataChange column. 
pub(crate) static MANDATORY_ADD_FILE_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| { Arc::new(StructType::new_unchecked(vec![ StructField::not_null("path", DataType::STRING), @@ -36,17 +37,20 @@ ), StructField::not_null("size", DataType::LONG), StructField::not_null("modificationTime", DataType::LONG), - StructField::not_null("dataChange", DataType::BOOLEAN), ])) }); /// Returns a reference to the mandatory fields in an add action. +/// +/// Note this does not include "dataChange", which is a required field but +/// should be set at the transaction level. Getting the full schema +/// can be done with [`Transaction::add_files_schema`]. pub(crate) fn mandatory_add_file_schema() -> &'static SchemaRef { &MANDATORY_ADD_FILE_SCHEMA } /// The static instance referenced by [`add_files_schema`]. -pub(crate) static ADD_FILES_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| { +pub(crate) static BASE_ADD_FILES_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| { let stats = StructField::nullable( "stats", DataType::struct_type_unchecked(vec![StructField::nullable("numRecords", DataType::LONG)]), @@ -57,22 +61,20 @@ )) }); -/// The schema that the [`Engine`]'s [`ParquetHandler`] is expected to use when reporting information about -/// a Parquet write operation back to Kernel. -/// -/// Concretely, it is the expected schema for [`EngineData`] passed to [`add_files`], as it is the base -/// for constructing an add_file (and soon remove_file) action. Each row represents metadata about a -/// file to be added to the table. Kernel takes this information and extends it to the full add_file -/// action schema, adding additional fields (e.g., baseRowID) as necessary. -/// -/// For now, Kernel only supports the number of records as a file statistic. -/// This will change in a future release. -/// -/// [`add_files`]: crate::transaction::Transaction::add_files -/// [`ParquetHandler`]: crate::ParquetHandler -pub fn add_files_schema() -> &'static SchemaRef { - &ADD_FILES_SCHEMA -} +static DATA_CHANGE_COLUMN: LazyLock<StructField> = + LazyLock::new(|| StructField::not_null("dataChange", DataType::BOOLEAN)); + +/// The [`Transaction::add_files_schema`] schema extended with the dataChange column. +static ADD_FILES_SCHEMA_WITH_DATA_CHANGE: LazyLock<SchemaRef> = LazyLock::new(|| { + let mut fields = BASE_ADD_FILES_SCHEMA.fields().collect::<Vec<_>>(); + let len = fields.len(); + let insert_position = fields + .iter() + .position(|f| f.name() == "modificationTime") + .unwrap_or(len); + fields.insert(insert_position + 1, &DATA_CHANGE_COLUMN); + Arc::new(StructType::new_unchecked(fields.into_iter().cloned())) +}); // NOTE: The following two methods are a workaround for the fact that we do not have a proper SchemaBuilder yet. // See https://github.com/delta-io/delta-kernel-rs/issues/1284 @@ -129,6 +131,8 @@ pub struct Transaction { // keep all timestamps within the same commit consistent. commit_timestamp: i64, domain_metadatas: Vec<DomainMetadata>, + // Whether this transaction contains any logical data changes. + data_change: bool, } impl std::fmt::Debug for Transaction { @@ -166,6 +170,7 @@ impl Transaction { set_transactions: vec![], commit_timestamp, domain_metadatas: vec![], + data_change: true, }) } @@ -256,6 +261,28 @@ impl Transaction { } } + /// Set the data change flag. + /// + /// True indicates this commit is a "data changing" commit. False indicates table data was + /// reorganized but not materially modified.
+ /// + /// Data change might be set to false in the following scenarios: + /// 1. Operations that only change metadata (e.g. backfilling statistics) + /// 2. Operations that make no logical changes to the contents of the table (i.e. rows are only moved + /// from old files to new ones; OPTIMIZE commands are one example of this type of optimization). + pub fn with_data_change(mut self, data_change: bool) -> Self { + self.data_change = data_change; + self + } + + /// Same as [`Transaction::with_data_change`] but sets the value directly instead of + /// using a fluent API. + #[internal_api] + #[allow(dead_code)] // used in FFI + pub(crate) fn set_data_change(&mut self, data_change: bool) { + self.data_change = data_change; + } + /// Set the operation that this transaction is performing. This string will be persisted in the /// commit and visible to anyone who describes the table history. pub fn with_operation(mut self, operation: String) -> Self { @@ -340,6 +367,26 @@ impl Transaction { .map(|dm| dm.into_engine_data(get_log_domain_metadata_schema().clone(), engine))) } + /// The schema that the [`Engine`]'s [`ParquetHandler`] is expected to use when reporting information about + /// a Parquet write operation back to Kernel. + /// + /// Concretely, it is the expected schema for [`EngineData`] passed to [`add_files`], as it is the base + /// for constructing an add_file. Each row represents metadata about a + /// file to be added to the table. Kernel takes this information and extends it to the full add_file + /// action schema, adding internal fields (e.g., baseRowID) as necessary. + /// + /// For now, Kernel only supports the number of records as a file statistic. + /// This will change in a future release. + /// + /// Note: While currently static, in the future the schema might change depending on + /// options set on the transaction or features enabled on the table. + /// + /// [`add_files`]: crate::transaction::Transaction::add_files + /// [`ParquetHandler`]: crate::ParquetHandler + pub fn add_files_schema(&self) -> &'static SchemaRef { + &BASE_ADD_FILES_SCHEMA + } + // Generate the logical-to-physical transform expression which must be evaluated on every data // chunk before writing. At the moment, this is a transaction-wide expression. fn generate_logical_to_physical(&self) -> Expression { @@ -378,7 +425,7 @@ impl Transaction { /// add/append/insert data (files) to the table. Note that this API can be called multiple times /// to add multiple batches. /// - /// The expected schema for `add_metadata` is given by [`add_files_schema`]. + /// The expected schema for `add_metadata` is given by [`Transaction::add_files_schema`].
pub fn add_files(&mut self, add_metadata: Box) { self.add_files_metadata.push(add_metadata); } @@ -397,6 +444,7 @@ impl Transaction { add_files_metadata: I, input_schema: SchemaRef, output_schema: SchemaRef, + data_change: bool, ) -> impl Iterator>> + 'a where I: Iterator> + Send + 'a, @@ -406,16 +454,22 @@ impl Transaction { add_files_metadata.map(move |add_files_batch| { // Convert stats to a JSON string and nest the add action in a top-level struct - let adds_expr = Expression::struct_from([Expression::transform( - Transform::new_top_level().with_replaced_field( - "stats", - Expression::unary(ToJson, Expression::column(["stats"])).into(), - ), - )]); + let transform = Expression::transform( + Transform::new_top_level() + .with_inserted_field( + Some("modificationTime"), + Expression::literal(data_change).into(), + ) + .with_replaced_field( + "stats", + Expression::unary(ToJson, Expression::column(["stats"])).into(), + ), + ); + let adds_expr = Expression::struct_from([transform]); let adds_evaluator = evaluation_handler.new_expression_evaluator( input_schema.clone(), Arc::new(adds_expr), - output_schema.clone().into(), + as_log_add_schema(output_schema.clone()).into(), ); adds_evaluator.evaluate(add_files_batch?.deref()) }) @@ -472,13 +526,13 @@ impl Transaction { }, ); + // Generate add actions including row tracking metadata let add_actions = build_add_actions( engine, extended_add_files, - with_row_tracking_cols(add_files_schema()), - as_log_add_schema(with_row_tracking_cols(&with_stats_col( - mandatory_add_file_schema(), - ))), + with_row_tracking_cols(self.add_files_schema()), + with_row_tracking_cols(&with_stats_col(&ADD_FILES_SCHEMA_WITH_DATA_CHANGE.clone())), + self.data_change, ); // Generate a row tracking domain metadata based on the final high water mark @@ -491,8 +545,9 @@ impl Transaction { let add_actions = build_add_actions( engine, self.add_files_metadata.iter().map(|a| Ok(a.deref())), - add_files_schema().clone(), - as_log_add_schema(with_stats_col(mandatory_add_file_schema())), + self.add_files_schema().clone(), + with_stats_col(&ADD_FILES_SCHEMA_WITH_DATA_CHANGE.clone()), + self.data_change, ); Ok((Box::new(add_actions), None)) @@ -674,13 +729,27 @@ pub struct RetryableTransaction { #[cfg(test)] mod tests { use super::*; + use crate::engine::sync::SyncEngine; use crate::schema::MapType; + use crate::Snapshot; + use std::path::PathBuf; // TODO: create a finer-grained unit tests for transactions (issue#1091) #[test] - fn test_add_files_schema() { - let schema = add_files_schema(); + + fn test_add_files_schema() -> Result<(), Box> { + let engine = SyncEngine::new(); + let path = + std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/")).unwrap(); + let url = url::Url::from_directory_path(path).unwrap(); + let snapshot = Snapshot::builder_for(url) + .at_version(1) + .build(&engine) + .unwrap(); + let txn = snapshot.transaction()?.with_engine_info("default engine"); + + let schema = txn.add_files_schema(); let expected = StructType::new_unchecked(vec![ StructField::not_null("path", DataType::STRING), StructField::not_null( @@ -689,7 +758,6 @@ mod tests { ), StructField::not_null("size", DataType::LONG), StructField::not_null("modificationTime", DataType::LONG), - StructField::not_null("dataChange", DataType::BOOLEAN), StructField::nullable( "stats", DataType::struct_type_unchecked(vec![StructField::nullable( @@ -699,5 +767,6 @@ mod tests { ), ]); assert_eq!(*schema, expected.into()); + Ok(()) } } diff --git a/kernel/tests/row_tracking.rs 
b/kernel/tests/row_tracking.rs index 8a2887673..d8830a280 100644 --- a/kernel/tests/row_tracking.rs +++ b/kernel/tests/row_tracking.rs @@ -57,7 +57,7 @@ async fn write_data_to_table( data: Vec, ) -> DeltaResult { let snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?; - let mut txn = snapshot.transaction()?; + let mut txn = snapshot.transaction()?.with_data_change(true); // Write data out by spawning async tasks to simulate executors let write_context = Arc::new(txn.get_write_context()); @@ -66,7 +66,7 @@ async fn write_data_to_table( let write_context = write_context.clone(); tokio::task::spawn(async move { engine - .write_parquet(&data, write_context.as_ref(), HashMap::new(), true) + .write_parquet(&data, write_context.as_ref(), HashMap::new()) .await }) }); @@ -639,8 +639,14 @@ async fn test_row_tracking_parallel_transactions_conflict() -> DeltaResult<()> { let snapshot2 = Snapshot::builder_for(table_url.clone()).build(engine2.as_ref())?; // Create two transactions from the same snapshot (simulating parallel transactions) - let mut txn1 = snapshot1.transaction()?.with_engine_info("transaction 1"); - let mut txn2 = snapshot2.transaction()?.with_engine_info("transaction 2"); + let mut txn1 = snapshot1 + .transaction()? + .with_engine_info("transaction 1") + .with_data_change(true); + let mut txn2 = snapshot2 + .transaction()? + .with_engine_info("transaction 2") + .with_data_change(true); // Prepare data for both transactions let data1 = RecordBatch::try_new( @@ -661,7 +667,6 @@ async fn test_row_tracking_parallel_transactions_conflict() -> DeltaResult<()> { &ArrowEngineData::new(data1), write_context1.as_ref(), HashMap::new(), - true, ) .await?; @@ -670,7 +675,6 @@ async fn test_row_tracking_parallel_transactions_conflict() -> DeltaResult<()> { &ArrowEngineData::new(data2), write_context2.as_ref(), HashMap::new(), - true, ) .await?; diff --git a/kernel/tests/write.rs b/kernel/tests/write.rs index ad6297056..a8b228cd4 100644 --- a/kernel/tests/write.rs +++ b/kernel/tests/write.rs @@ -148,7 +148,7 @@ async fn write_data_and_check_result_and_stats( expected_since_commit: u64, ) -> Result<(), Box> { let snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?; - let mut txn = snapshot.transaction()?; + let mut txn = snapshot.transaction()?.with_data_change(true); // create two new arrow record batches to append let append_data = [[1, 2, 3], [4, 5, 6]].map(|data| -> DeltaResult<_> { @@ -171,7 +171,6 @@ async fn write_data_and_check_result_and_stats( data.as_ref().unwrap(), write_context.as_ref(), HashMap::new(), - true, ) .await }) @@ -435,7 +434,10 @@ async fn test_append_partitioned() -> Result<(), Box> { setup_test_tables(table_schema.clone(), &[partition_col], None, "test_table").await? { let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?; - let mut txn = snapshot.transaction()?.with_engine_info("default engine"); + let mut txn = snapshot + .transaction()? 
+ .with_engine_info("default engine") + .with_data_change(false); // create two new arrow record batches to append let append_data = [[1, 2, 3], [4, 5, 6]].map(|data| -> DeltaResult<_> { @@ -463,7 +465,6 @@ async fn test_append_partitioned() -> Result<(), Box> { data.as_ref().unwrap(), write_context.as_ref(), HashMap::from([(partition_col.to_string(), partition_val.to_string())]), - true, ) .await }) @@ -524,7 +525,7 @@ async fn test_append_partitioned() -> Result<(), Box> { }, "size": size, "modificationTime": 0, - "dataChange": true, + "dataChange": false, "stats": "{\"numRecords\":3}" } }), @@ -536,7 +537,7 @@ async fn test_append_partitioned() -> Result<(), Box> { }, "size": size, "modificationTime": 0, - "dataChange": true, + "dataChange": false, "stats": "{\"numRecords\":3}" } }), @@ -602,7 +603,6 @@ async fn test_append_invalid_schema() -> Result<(), Box> data.as_ref().unwrap(), write_context.as_ref(), HashMap::new(), - true, ) .await }) @@ -807,7 +807,6 @@ async fn test_append_timestamp_ntz() -> Result<(), Box> { &ArrowEngineData::new(data.clone()), write_context.as_ref(), HashMap::new(), - true, ) .await?; @@ -832,6 +831,14 @@ async fn test_append_timestamp_ntz() -> Result<(), Box> { // Check that the add action exists assert!(parsed_commits[1].get("add").is_some()); + // Ensure default of data change is true. + assert!(parsed_commits[1] + .get("add") + .unwrap() + .get("dataChange") + .unwrap() + .as_bool() + .unwrap()); // Verify the data can be read back correctly test_read(&ArrowEngineData::new(data), &table_url, engine)?; @@ -915,7 +922,7 @@ async fn test_append_variant() -> Result<(), Box> { .await?; let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?; - let mut txn = snapshot.transaction()?; + let mut txn = snapshot.transaction()?.with_data_change(true); // First value corresponds to the variant value "1". Third value corresponds to the variant // representing the JSON Object {"a":2}. @@ -1014,7 +1021,6 @@ async fn test_append_variant() -> Result<(), Box> { write_context.target_dir(), Box::new(ArrowEngineData::new(data.clone())), HashMap::new(), - true, ) .await?; @@ -1125,7 +1131,7 @@ async fn test_shredded_variant_read_rejection() -> Result<(), Box Result<(), Box