From 8f6315d80d10d53a046a224920ff8150dac22c24 Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Fri, 12 Sep 2025 18:36:52 +0000 Subject: [PATCH 01/14] wip --- ffi/src/transaction/mod.rs | 48 +++++++-- kernel/examples/write-table/src/main.rs | 5 +- kernel/src/engine/default/mod.rs | 8 +- kernel/src/engine/default/parquet.rs | 34 +++--- kernel/src/row_tracking.rs | 97 +++++------------- kernel/src/transaction/mod.rs | 131 +++++++++++++++++------- kernel/tests/row_tracking.rs | 16 +-- kernel/tests/write.rs | 22 ++-- 8 files changed, 200 insertions(+), 161 deletions(-) diff --git a/ffi/src/transaction/mod.rs b/ffi/src/transaction/mod.rs index 7a22d835d..69cee51d6 100644 --- a/ffi/src/transaction/mod.rs +++ b/ffi/src/transaction/mod.rs @@ -94,6 +94,23 @@ pub unsafe extern "C" fn add_files( txn.add_files(write_metadata); } +/// +/// Mark the transaction as having data changes or not (these are recorded at the file level). +/// +/// When set here writers should not provide a data_change column in EngineData. +/// +/// # Safety +/// +/// Caller is responsible for passing a valid handle. CONSUMES TRANSACTION and commit info +#[no_mangle] +pub unsafe extern "C" fn with_data_change( + txn: Handle, + data_change: bool, +) -> Handle { + let txn = unsafe { txn.into_inner() }; + Box::new(txn.with_data_change(data_change)).into() +} + /// Attempt to commit a transaction to the table. Returns version number if successful. /// Returns error if the commit fails. /// @@ -136,11 +153,11 @@ mod tests { use delta_kernel::arrow::ffi::to_ffi; use delta_kernel::arrow::json::reader::ReaderBuilder; use delta_kernel::arrow::record_batch::RecordBatch; - use delta_kernel::engine::arrow_conversion::TryIntoArrow as _; + + use delta_kernel::engine::arrow_conversion::TryIntoArrow; use delta_kernel::engine::arrow_data::ArrowEngineData; use delta_kernel::parquet::arrow::arrow_writer::ArrowWriter; use delta_kernel::parquet::file::properties::WriterProperties; - use delta_kernel::transaction::add_files_schema; use delta_kernel_ffi::engine_data::get_engine_data; use delta_kernel_ffi::engine_data::ArrowFFIData; @@ -194,25 +211,25 @@ mod tests { fn create_file_metadata( path: &str, num_rows: i64, + metadata_schema: ArrowSchema, ) -> Result> { - let schema: ArrowSchema = add_files_schema().as_ref().try_into_arrow()?; - let current_time: i64 = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap() .as_millis() as i64; let file_metadata = format!( - r#"{{"path":"{path}", "partitionValues": {{}}, "size": {num_rows}, "modificationTime": {current_time}, "dataChange": true, "stats": {{"numRecords": {num_rows}}}}}"#, + r#"{{"path":"{path}", "partitionValues": {{}}, "size": {num_rows}, "modificationTime": {current_time}, "stats": {{"numRecords": {num_rows}}}}}"#, ); - create_arrow_ffi_from_json(schema, file_metadata.as_str()) + create_arrow_ffi_from_json(metadata_schema, file_metadata.as_str()) } fn write_parquet_file( delta_path: &str, file_path: &str, batch: &RecordBatch, + metadata_schema: ArrowSchema, ) -> Result> { // WriterProperties can be used to set Parquet file options let props = WriterProperties::builder().build(); @@ -226,7 +243,7 @@ mod tests { // writer must be closed to write footer let res = writer.close().unwrap(); - create_file_metadata(file_path, res.num_rows) + create_file_metadata(file_path, res.num_rows, metadata_schema) } #[tokio::test] @@ -257,9 +274,10 @@ mod tests { let engine = get_default_engine(table_path_str); // Start the transaction - let txn = ok_or_panic(unsafe { + let mut txn = 
ok_or_panic(unsafe { transaction(kernel_string_slice!(table_path_str), engine.shallow_copy()) }); + unsafe { txn = with_data_change(txn.shallow_copy(), true) }; // Add engine info let engine_info = "default_engine"; @@ -314,7 +332,19 @@ mod tests { ]) .unwrap(); - let file_info = write_parquet_file(table_path_str, "my_file.parquet", &batch)?; + let file_info = write_parquet_file( + table_path_str, + "my_file.parquet", + &batch, + unsafe { + txn_with_engine_info + .shallow_copy() + .as_ref() + .add_files_schema() + } + .as_ref() + .try_into_arrow()?, + )?; let file_info_engine_data = ok_or_panic(unsafe { get_engine_data( diff --git a/kernel/examples/write-table/src/main.rs b/kernel/examples/write-table/src/main.rs index 8e6075ce8..d721393e7 100644 --- a/kernel/examples/write-table/src/main.rs +++ b/kernel/examples/write-table/src/main.rs @@ -91,12 +91,13 @@ async fn try_main() -> DeltaResult<()> { let mut txn = snapshot .transaction()? .with_operation("INSERT".to_string()) - .with_engine_info("default_engine/write-table-example"); + .with_engine_info("default_engine/write-table-example") + .with_data_change(true); // Write the data using the engine let write_context = Arc::new(txn.get_write_context()); let file_metadata = engine - .write_parquet(&sample_data, write_context.as_ref(), HashMap::new(), true) + .write_parquet(&sample_data, write_context.as_ref(), HashMap::new()) .await?; // Add the file metadata to the transaction diff --git a/kernel/src/engine/default/mod.rs b/kernel/src/engine/default/mod.rs index 051a6a32c..73331fdf9 100644 --- a/kernel/src/engine/default/mod.rs +++ b/kernel/src/engine/default/mod.rs @@ -98,7 +98,6 @@ impl DefaultEngine { data: &ArrowEngineData, write_context: &WriteContext, partition_values: HashMap, - data_change: bool, ) -> DeltaResult> { let transform = write_context.logical_to_physical(); let input_schema = Schema::try_from_arrow(data.record_batch().schema())?; @@ -110,12 +109,7 @@ impl DefaultEngine { ); let physical_data = logical_to_physical_expr.evaluate(data)?; self.parquet - .write_parquet_file( - write_context.target_dir(), - physical_data, - partition_values, - data_change, - ) + .write_parquet_file(write_context.target_dir(), physical_data, partition_values) .await } } diff --git a/kernel/src/engine/default/parquet.rs b/kernel/src/engine/default/parquet.rs index fa77b3294..9161161d3 100644 --- a/kernel/src/engine/default/parquet.rs +++ b/kernel/src/engine/default/parquet.rs @@ -5,7 +5,7 @@ use std::ops::Range; use std::sync::Arc; use crate::arrow::array::builder::{MapBuilder, MapFieldNames, StringBuilder}; -use crate::arrow::array::{BooleanArray, Int64Array, RecordBatch, StringArray, StructArray}; +use crate::arrow::array::{Int64Array, RecordBatch, StringArray, StructArray}; use crate::arrow::datatypes::{DataType, Field}; use crate::parquet::arrow::arrow_reader::{ ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReaderBuilder, @@ -28,7 +28,6 @@ use crate::engine::arrow_utils::{ use crate::engine::default::executor::TaskExecutor; use crate::engine::parquet_row_group_skipping::ParquetRowGroupSkipping; use crate::schema::SchemaRef; -use crate::transaction::add_files_schema; use crate::{ DeltaResult, EngineData, Error, FileDataReadResultIterator, FileMeta, ParquetHandler, PredicateRef, @@ -67,7 +66,6 @@ impl DataFileMetadata { fn as_record_batch( &self, partition_values: &HashMap, - data_change: bool, ) -> DeltaResult> { let DataFileMetadata { file_meta: @@ -99,7 +97,6 @@ impl DataFileMetadata { .try_into() .map_err(|_| 
Error::generic("Failed to convert parquet metadata 'size' to i64"))?; let size = Arc::new(Int64Array::from(vec![size])); - let data_change = Arc::new(BooleanArray::from(vec![data_change])); let modification_time = Arc::new(Int64Array::from(vec![*last_modified])); let stats = Arc::new(StructArray::try_new_with_length( vec![Field::new("numRecords", DataType::Int64, true)].into(), @@ -109,15 +106,12 @@ impl DataFileMetadata { )?); Ok(Box::new(ArrowEngineData::new(RecordBatch::try_new( - Arc::new(add_files_schema().as_ref().try_into_arrow()?), - vec![ - path, - partitions, - size, - modification_time, - data_change, - stats, - ], + Arc::new( + crate::transaction::BASE_ADD_FILES_SCHEMA + .as_ref() + .try_into_arrow()?, + ), + vec![path, partitions, size, modification_time, stats], )?))) } } @@ -192,16 +186,18 @@ impl DefaultParquetHandler { /// metadata as an EngineData batch which matches the [add file metadata] schema (where `` /// is a generated UUIDv4). /// - /// [add file metadata]: crate::transaction::add_files_schema + /// Note that the schema does not contain the dataChange column. Users of the default engine + /// must add this column by calling [`crate::transaction::Transaction::with_data_change`]. + /// + /// [add file metadata]: crate::transaction::Transaction::add_files_schema pub async fn write_parquet_file( &self, path: &url::Url, data: Box, partition_values: HashMap, - data_change: bool, ) -> DeltaResult> { let parquet_metadata = self.write_parquet(path, data).await?; - parquet_metadata.as_record_batch(&partition_values, data_change) + parquet_metadata.as_record_batch(&partition_values) } } @@ -504,14 +500,13 @@ mod tests { let file_metadata = FileMeta::new(location.clone(), last_modified, size); let data_file_metadata = DataFileMetadata::new(file_metadata, num_records); let partition_values = HashMap::from([("partition1".to_string(), "a".to_string())]); - let data_change = true; let actual = data_file_metadata - .as_record_batch(&partition_values, data_change) + .as_record_batch(&partition_values) .unwrap(); let actual = ArrowEngineData::try_from_engine_data(actual).unwrap(); let schema = Arc::new( - crate::transaction::add_files_schema() + crate::transaction::BASE_ADD_FILES_SCHEMA .as_ref() .try_into_arrow() .unwrap(), @@ -544,7 +539,6 @@ mod tests { Arc::new(partition_values), Arc::new(Int64Array::from(vec![size as i64])), Arc::new(Int64Array::from(vec![last_modified])), - Arc::new(BooleanArray::from(vec![data_change])), Arc::new(stats_struct), ], ) diff --git a/kernel/src/row_tracking.rs b/kernel/src/row_tracking.rs index eea1ca264..e099f72dd 100644 --- a/kernel/src/row_tracking.rs +++ b/kernel/src/row_tracking.rs @@ -6,7 +6,6 @@ use crate::actions::domain_metadata::domain_metadata_configuration; use crate::actions::DomainMetadata; use crate::engine_data::{GetData, RowVisitor, TypedGetData as _}; use crate::schema::{ColumnName, ColumnNamesAndTypes, DataType}; -use crate::transaction::add_files_schema; use crate::utils::require; use crate::{DeltaResult, Engine, Error, Snapshot}; @@ -70,7 +69,8 @@ impl TryFrom for DomainMetadata { /// A row visitor that iterates over preliminary [`Add`] actions as returned by the engine and /// computes a base row ID for each action. -/// It expects to visit engine data conforming to the schema returned by [`add_files_schema()`]. +/// It expects to visit engine data with a nested field 'stats.numRecords' which is +/// part of a Delta add action. /// /// This visitor is only required for the row tracking write path. 
The read path will be completely /// implemented via expressions. @@ -88,11 +88,6 @@ impl RowTrackingVisitor { /// Default value for an absent high water mark const DEFAULT_HIGH_WATER_MARK: i64 = -1; - /// Field index of "numRecords" in the [`add_files_schema()`] - /// - /// We verify this hard-coded index in a test. - const NUM_RECORDS_FIELD_INDEX: usize = 5; - pub(crate) fn new(row_id_high_water_mark: Option, num_batches: Option) -> Self { // A table might not have a row ID high water mark yet, so we model the input as an Option Self { @@ -104,14 +99,19 @@ impl RowTrackingVisitor { impl RowVisitor for RowTrackingVisitor { fn selected_column_names_and_types(&self) -> (&'static [ColumnName], &'static [DataType]) { - static NAMES_AND_TYPES: LazyLock = - LazyLock::new(|| add_files_schema().leaves(None)); + static NAMES_AND_TYPES: LazyLock = LazyLock::new(|| { + ( + vec![ColumnName::new(["stats", "numRecords"])], + vec![DataType::LONG], + ) + .into() + }); NAMES_AND_TYPES.as_ref() } fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> { require!( - getters.len() == add_files_schema().num_fields(), + getters.len() == 1, Error::generic(format!( "Wrong number of RowTrackingVisitor getters: {}", getters.len() @@ -123,14 +123,12 @@ impl RowVisitor for RowTrackingVisitor { let mut current_hwm = self.row_id_high_water_mark; for i in 0..row_count { - let num_records: i64 = getters[Self::NUM_RECORDS_FIELD_INDEX] - .get_opt(i, "numRecords")? - .ok_or_else(|| { - Error::InternalError( - "numRecords must be present in Add actions when row tracking is enabled." - .to_string(), - ) - })?; + let num_records: i64 = getters[0].get_opt(i, "numRecords")?.ok_or_else(|| { + Error::InternalError( + "numRecords must be present in Add actions when row tracking is enabled." 
+ .to_string(), + ) + })?; batch_base_row_ids.push(current_hwm + 1); current_hwm += num_records; } @@ -168,46 +166,15 @@ mod tests { } } - fn create_getters<'a>( - num_records_mock: &'a MockGetData, - unit_mock: &'a (), - ) -> Vec<&'a dyn GetData<'a>> { - let schema = add_files_schema(); - let mut getters: Vec<&'a dyn GetData<'a>> = Vec::new(); - - for i in 0..schema.num_fields() { - if i == RowTrackingVisitor::NUM_RECORDS_FIELD_INDEX { - getters.push(num_records_mock); - } else { - getters.push(unit_mock); - } - } - getters - } - - #[test] - fn test_num_records_field_index() { - // Verify that the correct numRecords field index is hard-coded in the RowTrackingVisitor - let num_records_field_index = add_files_schema() - .leaves(None) - .as_ref() - .0 - .iter() - .position(|name| name.path().last() == Some(&"numRecords".to_string())) - .expect("numRecords field not found"); - - assert_eq!( - num_records_field_index, - RowTrackingVisitor::NUM_RECORDS_FIELD_INDEX - ); + fn create_getters<'a>(num_records_mock: &'a MockGetData) -> Vec<&'a dyn GetData<'a>> { + vec![num_records_mock] } #[test] fn test_visit_basic_functionality() -> DeltaResult<()> { let mut visitor = RowTrackingVisitor::new(None, Some(1)); let num_records_mock = MockGetData::new(vec![Some(10), Some(5), Some(20)]); - let unit_mock = (); - let getters = create_getters(&num_records_mock, &unit_mock); + let getters = create_getters(&num_records_mock); visitor.visit(3, &getters)?; @@ -225,8 +192,7 @@ mod tests { fn test_visit_with_negative_high_water_mark() -> DeltaResult<()> { let mut visitor = RowTrackingVisitor::new(Some(-5), Some(1)); let num_records_mock = MockGetData::new(vec![Some(3), Some(2)]); - let unit_mock = (); - let getters = create_getters(&num_records_mock, &unit_mock); + let getters = create_getters(&num_records_mock); visitor.visit(2, &getters)?; @@ -244,8 +210,7 @@ mod tests { fn test_visit_with_zero_records() -> DeltaResult<()> { let mut visitor = RowTrackingVisitor::new(Some(10), Some(1)); let num_records_mock = MockGetData::new(vec![Some(0), Some(0), Some(5)]); - let unit_mock = (); - let getters = create_getters(&num_records_mock, &unit_mock); + let getters = create_getters(&num_records_mock); visitor.visit(3, &getters)?; @@ -263,8 +228,7 @@ mod tests { fn test_visit_empty_batch() -> DeltaResult<()> { let mut visitor = RowTrackingVisitor::new(Some(42), None); let num_records_mock = MockGetData::new(vec![]); - let unit_mock = (); - let getters = create_getters(&num_records_mock, &unit_mock); + let getters = create_getters(&num_records_mock); visitor.visit(0, &getters)?; @@ -279,16 +243,15 @@ mod tests { #[test] fn test_visit_multiple_batches() -> DeltaResult<()> { let mut visitor = RowTrackingVisitor::new(Some(0), Some(2)); - let unit_mock = (); // First batch let num_records_mock1 = MockGetData::new(vec![Some(10), Some(5)]); - let getters1 = create_getters(&num_records_mock1, &unit_mock); + let getters1 = create_getters(&num_records_mock1); visitor.visit(2, &getters1)?; // Second batch let num_records_mock2 = MockGetData::new(vec![Some(3), Some(7), Some(2)]); - let getters2 = create_getters(&num_records_mock2, &unit_mock); + let getters2 = create_getters(&num_records_mock2); visitor.visit(3, &getters2)?; // Check that we have two batches @@ -309,8 +272,7 @@ mod tests { #[test] fn test_visit_wrong_getter_count() -> DeltaResult<()> { let mut visitor = RowTrackingVisitor::new(Some(0), None); - let unit_mock = (); - let wrong_getters: Vec<&dyn GetData<'_>> = vec![&unit_mock]; // Only one getter instead of expected 
count
+        let wrong_getters: Vec<&dyn GetData<'_>> = vec![]; // Only one getter instead of expected count
 
         let result = visitor.visit(1, &wrong_getters);
         assert_result_error_with_message(result, "Wrong number of RowTrackingVisitor getters");
@@ -322,8 +284,7 @@
     fn test_visit_missing_num_records() -> DeltaResult<()> {
         let mut visitor = RowTrackingVisitor::new(Some(0), None);
         let num_records_mock = MockGetData::new(vec![None]); // Missing numRecords
-        let unit_mock = ();
-        let getters = create_getters(&num_records_mock, &unit_mock);
+        let getters = create_getters(&num_records_mock);
 
         let result = visitor.visit(1, &getters);
         assert_result_error_with_message(
@@ -339,10 +300,8 @@
         let visitor = RowTrackingVisitor::new(Some(0), None);
         let (names, types) = visitor.selected_column_names_and_types();
 
-        // Should return the same as add_files_schema().leaves(None)
-        let expected = add_files_schema().leaves(None);
-        assert_eq!(names, expected.as_ref().0);
-        assert_eq!(types, expected.as_ref().1);
+        assert_eq!(names, (vec![ColumnName::new(["stats", "numRecords"])]));
+        assert_eq!(types, vec![DataType::LONG]);
     }
 
     #[test]
diff --git a/kernel/src/transaction/mod.rs b/kernel/src/transaction/mod.rs
index 79aa658eb..b1e82fd44 100644
--- a/kernel/src/transaction/mod.rs
+++ b/kernel/src/transaction/mod.rs
@@ -26,7 +26,7 @@ use crate::{
 type EngineDataResultIterator<'a> =
     Box<dyn Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send + 'a>;
 
-/// The minimal (i.e., mandatory) fields in an add action.
+/// The static instance referenced by [`add_files_schema`] that doesn't contain the dataChange column.
 pub(crate) static MANDATORY_ADD_FILE_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
     Arc::new(StructType::new_unchecked(vec![
         StructField::not_null("path", DataType::STRING),
         StructField::not_null(
@@ -36,17 +36,20 @@
         ),
         StructField::not_null("size", DataType::LONG),
         StructField::not_null("modificationTime", DataType::LONG),
-        StructField::not_null("dataChange", DataType::BOOLEAN),
     ]))
 });
 
 /// Returns a reference to the mandatory fields in an add action.
+///
+/// Note this does not include "dataChange", which is a required field but
+/// should be set at the transaction level. Getting the full schema
+/// can be done with [`Transaction::add_files_schema`].
 pub(crate) fn mandatory_add_file_schema() -> &'static SchemaRef {
     &MANDATORY_ADD_FILE_SCHEMA
 }
 
 /// The static instance referenced by [`add_files_schema`].
-pub(crate) static ADD_FILES_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
+pub(crate) static BASE_ADD_FILES_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
     let stats = StructField::nullable(
         "stats",
         DataType::struct_type_unchecked(vec![StructField::nullable("numRecords", DataType::LONG)]),
@@ -57,22 +60,20 @@
     ))
 });
 
-/// The schema that the [`Engine`]'s [`ParquetHandler`] is expected to use when reporting information about
-/// a Parquet write operation back to Kernel.
-///
-/// Concretely, it is the expected schema for [`EngineData`] passed to [`add_files`], as it is the base
-/// for constructing an add_file (and soon remove_file) action. Each row represents metadata about a
-/// file to be added to the table. Kernel takes this information and extends it to the full add_file
-/// action schema, adding additional fields (e.g., baseRowID) as necessary.
-///
-/// For now, Kernel only supports the number of records as a file statistic.
-/// This will change in a future release.
-///
-/// [`add_files`]: crate::transaction::Transaction::add_files
-/// [`ParquetHandler`]: crate::ParquetHandler
-pub fn add_files_schema() -> &'static SchemaRef {
-    &ADD_FILES_SCHEMA
-}
+static DATA_CHANGE_COLUMN: LazyLock<StructField> =
+    LazyLock::new(|| StructField::not_null("dataChange", DataType::BOOLEAN));
+
+/// The static instance referenced by [`add_files_schema`] that contains the dataChange column.
+static ADD_FILES_SCHEMA_WITH_DATA_CHANGE: LazyLock<SchemaRef> = LazyLock::new(|| {
+    let mut fields = BASE_ADD_FILES_SCHEMA.fields().collect::<Vec<_>>();
+    let len = fields.len();
+    let insert_position = fields
+        .iter()
+        .position(|f| f.name() == "modificationTime")
+        .unwrap_or(len);
+    fields.insert(insert_position + 1, &DATA_CHANGE_COLUMN);
+    Arc::new(StructType::new_unchecked(fields.into_iter().cloned()))
+});
 
 // NOTE: The following two methods are a workaround for the fact that we do not have a proper SchemaBuilder yet.
 // See https://github.com/delta-io/delta-kernel-rs/issues/1284
@@ -81,7 +82,7 @@
 /// The stats column is of type string as required by the spec.
 ///
 /// Note that this method is only useful to extend an Add action schema.
-fn with_stats_col(schema: &SchemaRef) -> SchemaRef {
+pub(crate) fn with_stats_col(schema: &SchemaRef) -> SchemaRef {
     let fields = schema
         .fields()
         .cloned()
@@ -129,6 +130,8 @@
     // keep all timestamps within the same commit consistent.
     commit_timestamp: i64,
     domain_metadatas: Vec<DomainMetadata>,
+    // Whether this transaction contains any logical data changes.
+    data_change: bool,
 }
 
 impl std::fmt::Debug for Transaction {
@@ -166,6 +169,7 @@
             set_transactions: vec![],
             commit_timestamp,
             domain_metadatas: vec![],
+            data_change: false,
         })
     }
 
@@ -256,6 +260,17 @@
         }
     }
 
+    /// Set the data change flag.
+    ///
+    /// Data change might be set to false in the following scenarios:
+    /// 1. Operations that only change metadata (e.g. backfilling statistics)
+    /// 2. Operations that make no logical changes to the contents of the table (i.e. rows are only moved
+    ///    from old files to new ones; the OPTIMIZE command is one example of this type of optimization).
+    pub fn with_data_change(mut self, data_change: bool) -> Self {
+        self.data_change = data_change;
+        self
+    }
+
     /// Set the operation that this transaction is performing. This string will be persisted in the
     /// commit and visible to anyone who describes the table history.
     pub fn with_operation(mut self, operation: String) -> Self {
@@ -340,6 +355,26 @@
             .map(|dm| dm.into_engine_data(get_log_domain_metadata_schema().clone(), engine)))
     }
 
+    /// The schema that the [`Engine`]'s [`ParquetHandler`] is expected to use when reporting information about
+    /// a Parquet write operation back to Kernel.
+    ///
+    /// Concretely, it is the expected schema for [`EngineData`] passed to [`add_files`], as it is the base
+    /// for constructing an add_file. Each row represents metadata about a
+    /// file to be added to the table. Kernel takes this information and extends it to the full add_file
+    /// action schema, adding additional fields (e.g., baseRowID) as necessary.
+    ///
+    /// For now, Kernel only supports the number of records as a file statistic.
+    /// This will change in a future release.
+    ///
+    /// Note: While currently static, in the future the schema might change depending on
+    /// options set on the transaction or features enabled on the table.
+ /// + /// [`add_files`]: crate::transaction::Transaction::add_files + /// [`ParquetHandler`]: crate::ParquetHandler + pub fn add_files_schema(&self) -> &'static SchemaRef { + &BASE_ADD_FILES_SCHEMA + } + // Generate the logical-to-physical transform expression which must be evaluated on every data // chunk before writing. At the moment, this is a transaction-wide expression. fn generate_logical_to_physical(&self) -> Expression { @@ -378,7 +413,7 @@ impl Transaction { /// add/append/insert data (files) to the table. Note that this API can be called multiple times /// to add multiple batches. /// - /// The expected schema for `add_metadata` is given by [`add_files_schema`]. + /// The expected schema for `add_metadata` is given by [`Transaction::add_files_schema`]. pub fn add_files(&mut self, add_metadata: Box) { self.add_files_metadata.push(add_metadata); } @@ -397,6 +432,7 @@ impl Transaction { add_files_metadata: I, input_schema: SchemaRef, output_schema: SchemaRef, + data_change: bool, ) -> impl Iterator>> + 'a where I: Iterator> + Send + 'a, @@ -406,16 +442,22 @@ impl Transaction { add_files_metadata.map(move |add_files_batch| { // Convert stats to a JSON string and nest the add action in a top-level struct - let adds_expr = Expression::struct_from([Expression::transform( - Transform::new_top_level().with_replaced_field( - "stats", - Expression::unary(ToJson, Expression::column(["stats"])).into(), - ), - )]); + let transform = Expression::transform( + Transform::new_top_level() + .with_inserted_field( + Some("modificationTime"), + Expression::literal(data_change).into(), + ) + .with_replaced_field( + "stats", + Expression::unary(ToJson, Expression::column(["stats"])).into(), + ), + ); + let adds_expr = Expression::struct_from([transform]); let adds_evaluator = evaluation_handler.new_expression_evaluator( input_schema.clone(), Arc::new(adds_expr), - output_schema.clone().into(), + as_log_add_schema(output_schema.clone()).into(), ); adds_evaluator.evaluate(add_files_batch?.deref()) }) @@ -472,13 +514,13 @@ impl Transaction { }, ); + // Generate add actions including row tracking metadata let add_actions = build_add_actions( engine, extended_add_files, - with_row_tracking_cols(add_files_schema()), - as_log_add_schema(with_row_tracking_cols(&with_stats_col( - mandatory_add_file_schema(), - ))), + with_row_tracking_cols(self.add_files_schema()), + with_row_tracking_cols(&with_stats_col(&ADD_FILES_SCHEMA_WITH_DATA_CHANGE.clone())), + self.data_change, ); // Generate a row tracking domain metadata based on the final high water mark @@ -491,8 +533,9 @@ impl Transaction { let add_actions = build_add_actions( engine, self.add_files_metadata.iter().map(|a| Ok(a.deref())), - add_files_schema().clone(), - as_log_add_schema(with_stats_col(mandatory_add_file_schema())), + self.add_files_schema().clone(), + with_stats_col(&ADD_FILES_SCHEMA_WITH_DATA_CHANGE.clone()), + self.data_change, ); Ok((Box::new(add_actions), None)) @@ -674,13 +717,27 @@ pub struct RetryableTransaction { #[cfg(test)] mod tests { use super::*; + use crate::engine::sync::SyncEngine; use crate::schema::MapType; + use crate::Snapshot; + use std::path::PathBuf; // TODO: create a finer-grained unit tests for transactions (issue#1091) #[test] - fn test_add_files_schema() { - let schema = add_files_schema(); + + fn test_add_files_schema() -> Result<(), Box> { + let engine = SyncEngine::new(); + let path = + std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/")).unwrap(); + let url = 
url::Url::from_directory_path(path).unwrap(); + let snapshot = Snapshot::builder_for(url) + .at_version(1) + .build(&engine) + .unwrap(); + let txn = snapshot.transaction()?.with_engine_info("default engine"); + + let schema = txn.add_files_schema(); let expected = StructType::new_unchecked(vec![ StructField::not_null("path", DataType::STRING), StructField::not_null( @@ -689,7 +746,6 @@ mod tests { ), StructField::not_null("size", DataType::LONG), StructField::not_null("modificationTime", DataType::LONG), - StructField::not_null("dataChange", DataType::BOOLEAN), StructField::nullable( "stats", DataType::struct_type_unchecked(vec![StructField::nullable( @@ -699,5 +755,6 @@ mod tests { ), ]); assert_eq!(*schema, expected.into()); + Ok(()) } } diff --git a/kernel/tests/row_tracking.rs b/kernel/tests/row_tracking.rs index 8a2887673..d8830a280 100644 --- a/kernel/tests/row_tracking.rs +++ b/kernel/tests/row_tracking.rs @@ -57,7 +57,7 @@ async fn write_data_to_table( data: Vec, ) -> DeltaResult { let snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?; - let mut txn = snapshot.transaction()?; + let mut txn = snapshot.transaction()?.with_data_change(true); // Write data out by spawning async tasks to simulate executors let write_context = Arc::new(txn.get_write_context()); @@ -66,7 +66,7 @@ async fn write_data_to_table( let write_context = write_context.clone(); tokio::task::spawn(async move { engine - .write_parquet(&data, write_context.as_ref(), HashMap::new(), true) + .write_parquet(&data, write_context.as_ref(), HashMap::new()) .await }) }); @@ -639,8 +639,14 @@ async fn test_row_tracking_parallel_transactions_conflict() -> DeltaResult<()> { let snapshot2 = Snapshot::builder_for(table_url.clone()).build(engine2.as_ref())?; // Create two transactions from the same snapshot (simulating parallel transactions) - let mut txn1 = snapshot1.transaction()?.with_engine_info("transaction 1"); - let mut txn2 = snapshot2.transaction()?.with_engine_info("transaction 2"); + let mut txn1 = snapshot1 + .transaction()? + .with_engine_info("transaction 1") + .with_data_change(true); + let mut txn2 = snapshot2 + .transaction()? 
+ .with_engine_info("transaction 2") + .with_data_change(true); // Prepare data for both transactions let data1 = RecordBatch::try_new( @@ -661,7 +667,6 @@ async fn test_row_tracking_parallel_transactions_conflict() -> DeltaResult<()> { &ArrowEngineData::new(data1), write_context1.as_ref(), HashMap::new(), - true, ) .await?; @@ -670,7 +675,6 @@ async fn test_row_tracking_parallel_transactions_conflict() -> DeltaResult<()> { &ArrowEngineData::new(data2), write_context2.as_ref(), HashMap::new(), - true, ) .await?; diff --git a/kernel/tests/write.rs b/kernel/tests/write.rs index ad6297056..1ceee4742 100644 --- a/kernel/tests/write.rs +++ b/kernel/tests/write.rs @@ -148,7 +148,7 @@ async fn write_data_and_check_result_and_stats( expected_since_commit: u64, ) -> Result<(), Box> { let snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?; - let mut txn = snapshot.transaction()?; + let mut txn = snapshot.transaction()?.with_data_change(true); // create two new arrow record batches to append let append_data = [[1, 2, 3], [4, 5, 6]].map(|data| -> DeltaResult<_> { @@ -171,7 +171,6 @@ async fn write_data_and_check_result_and_stats( data.as_ref().unwrap(), write_context.as_ref(), HashMap::new(), - true, ) .await }) @@ -435,7 +434,10 @@ async fn test_append_partitioned() -> Result<(), Box> { setup_test_tables(table_schema.clone(), &[partition_col], None, "test_table").await? { let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?; - let mut txn = snapshot.transaction()?.with_engine_info("default engine"); + let mut txn = snapshot + .transaction()? + .with_engine_info("default engine") + .with_data_change(true); // create two new arrow record batches to append let append_data = [[1, 2, 3], [4, 5, 6]].map(|data| -> DeltaResult<_> { @@ -463,7 +465,6 @@ async fn test_append_partitioned() -> Result<(), Box> { data.as_ref().unwrap(), write_context.as_ref(), HashMap::from([(partition_col.to_string(), partition_val.to_string())]), - true, ) .await }) @@ -602,7 +603,6 @@ async fn test_append_invalid_schema() -> Result<(), Box> data.as_ref().unwrap(), write_context.as_ref(), HashMap::new(), - true, ) .await }) @@ -780,7 +780,10 @@ async fn test_append_timestamp_ntz() -> Result<(), Box> { .await?; let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?; - let mut txn = snapshot.transaction()?.with_engine_info("default engine"); + let mut txn = snapshot + .transaction()? + .with_engine_info("default engine") + .with_data_change(true); // Create Arrow data with TIMESTAMP_NTZ values including edge cases // These are microseconds since Unix epoch @@ -807,7 +810,6 @@ async fn test_append_timestamp_ntz() -> Result<(), Box> { &ArrowEngineData::new(data.clone()), write_context.as_ref(), HashMap::new(), - true, ) .await?; @@ -915,7 +917,7 @@ async fn test_append_variant() -> Result<(), Box> { .await?; let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?; - let mut txn = snapshot.transaction()?; + let mut txn = snapshot.transaction()?.with_data_change(true); // First value corresponds to the variant value "1". Third value corresponds to the variant // representing the JSON Object {"a":2}. 
@@ -1014,7 +1016,6 @@ async fn test_append_variant() -> Result<(), Box> { write_context.target_dir(), Box::new(ArrowEngineData::new(data.clone())), HashMap::new(), - true, ) .await?; @@ -1125,7 +1126,7 @@ async fn test_shredded_variant_read_rejection() -> Result<(), Box Result<(), Box Date: Sat, 27 Sep 2025 00:31:46 +0000 Subject: [PATCH 02/14] fix nits --- kernel/src/transaction/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kernel/src/transaction/mod.rs b/kernel/src/transaction/mod.rs index b1e82fd44..3c3320613 100644 --- a/kernel/src/transaction/mod.rs +++ b/kernel/src/transaction/mod.rs @@ -82,7 +82,7 @@ static ADD_FILES_SCHEMA_WITH_DATA_CHANGE: LazyLock = LazyLock::new(|| /// The stats column is of type string as required by the spec. /// /// Note that this method is only useful to extend an Add action schema. -pub(crate) fn with_stats_col(schema: &SchemaRef) -> SchemaRef { +fn with_stats_col(schema: &SchemaRef) -> SchemaRef { let fields = schema .fields() .cloned() @@ -169,7 +169,7 @@ impl Transaction { set_transactions: vec![], commit_timestamp, domain_metadatas: vec![], - data_change: false, + data_change: true, }) } From 0b77071c3b0d05c4495fa75122056ab7462440b7 Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Sat, 27 Sep 2025 00:37:17 +0000 Subject: [PATCH 03/14] test with_data_change=false --- kernel/tests/write.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/kernel/tests/write.rs b/kernel/tests/write.rs index 1ceee4742..6349f6026 100644 --- a/kernel/tests/write.rs +++ b/kernel/tests/write.rs @@ -437,7 +437,7 @@ async fn test_append_partitioned() -> Result<(), Box> { let mut txn = snapshot .transaction()? .with_engine_info("default engine") - .with_data_change(true); + .with_data_change(false); // create two new arrow record batches to append let append_data = [[1, 2, 3], [4, 5, 6]].map(|data| -> DeltaResult<_> { @@ -525,7 +525,7 @@ async fn test_append_partitioned() -> Result<(), Box> { }, "size": size, "modificationTime": 0, - "dataChange": true, + "dataChange": false, "stats": "{\"numRecords\":3}" } }), @@ -537,7 +537,7 @@ async fn test_append_partitioned() -> Result<(), Box> { }, "size": size, "modificationTime": 0, - "dataChange": true, + "dataChange": false, "stats": "{\"numRecords\":3}" } }), From 6153c54890aa16888d1db9e95b0cf6a6cd80961d Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Tue, 30 Sep 2025 05:10:11 +0000 Subject: [PATCH 04/14] don't consume --- ffi/src/transaction/mod.rs | 14 +++++++------- kernel/src/transaction/mod.rs | 6 ++++++ 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/ffi/src/transaction/mod.rs b/ffi/src/transaction/mod.rs index 69cee51d6..aa2daae66 100644 --- a/ffi/src/transaction/mod.rs +++ b/ffi/src/transaction/mod.rs @@ -104,11 +104,11 @@ pub unsafe extern "C" fn add_files( /// Caller is responsible for passing a valid handle. CONSUMES TRANSACTION and commit info #[no_mangle] pub unsafe extern "C" fn with_data_change( - txn: Handle, + mut txn: Handle, data_change: bool, -) -> Handle { - let txn = unsafe { txn.into_inner() }; - Box::new(txn.with_data_change(data_change)).into() +) { + let underlying_txn = unsafe { txn.as_mut() }; + underlying_txn.set_data_change(data_change); } /// Attempt to commit a transaction to the table. Returns version number if successful. 
@@ -274,10 +274,10 @@ mod tests { let engine = get_default_engine(table_path_str); // Start the transaction - let mut txn = ok_or_panic(unsafe { + let txn = ok_or_panic(unsafe { transaction(kernel_string_slice!(table_path_str), engine.shallow_copy()) }); - unsafe { txn = with_data_change(txn.shallow_copy(), true) }; + unsafe { with_data_change(txn.shallow_copy(), false) }; // Add engine info let engine_info = "default_engine"; @@ -395,7 +395,7 @@ mod tests { "partitionValues": {}, "size": 0, "modificationTime": 0, - "dataChange": true, + "dataChange": false, "stats": "{\"numRecords\":5}" } }), diff --git a/kernel/src/transaction/mod.rs b/kernel/src/transaction/mod.rs index 3c3320613..e26d3bbf2 100644 --- a/kernel/src/transaction/mod.rs +++ b/kernel/src/transaction/mod.rs @@ -271,6 +271,12 @@ impl Transaction { self } + /// Same as [`with_data_change`] but set the value directly instead of + /// using a fluent API. + pub fn set_data_change(&mut self, data_change: bool) { + self.data_change = data_change; + } + /// Set the operation that this transaction is performing. This string will be persisted in the /// commit and visible to anyone who describes the table history. pub fn with_operation(mut self, operation: String) -> Self { From eb0708dd87984d1e9d2c52fd23b2492bbf62b3bd Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Tue, 30 Sep 2025 05:11:38 +0000 Subject: [PATCH 05/14] update contract --- ffi/src/transaction/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ffi/src/transaction/mod.rs b/ffi/src/transaction/mod.rs index aa2daae66..dbdbea7d0 100644 --- a/ffi/src/transaction/mod.rs +++ b/ffi/src/transaction/mod.rs @@ -101,9 +101,9 @@ pub unsafe extern "C" fn add_files( /// /// # Safety /// -/// Caller is responsible for passing a valid handle. CONSUMES TRANSACTION and commit info +/// Caller is responsible for passing a valid handle. #[no_mangle] -pub unsafe extern "C" fn with_data_change( +pub unsafe extern "C" fn set_data_change( mut txn: Handle, data_change: bool, ) { From c84fd57a04f8c18c9232d0b8009f43f00465785d Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Tue, 30 Sep 2025 05:11:57 +0000 Subject: [PATCH 06/14] update name --- ffi/src/transaction/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ffi/src/transaction/mod.rs b/ffi/src/transaction/mod.rs index dbdbea7d0..c55da776d 100644 --- a/ffi/src/transaction/mod.rs +++ b/ffi/src/transaction/mod.rs @@ -277,7 +277,7 @@ mod tests { let txn = ok_or_panic(unsafe { transaction(kernel_string_slice!(table_path_str), engine.shallow_copy()) }); - unsafe { with_data_change(txn.shallow_copy(), false) }; + unsafe { set_data_change(txn.shallow_copy(), false) }; // Add engine info let engine_info = "default_engine"; From 058d56867ba58887dcd3ffbfd703f316565203f6 Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Tue, 30 Sep 2025 05:14:01 +0000 Subject: [PATCH 07/14] update contract again --- ffi/src/transaction/mod.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/ffi/src/transaction/mod.rs b/ffi/src/transaction/mod.rs index c55da776d..5305fb5a1 100644 --- a/ffi/src/transaction/mod.rs +++ b/ffi/src/transaction/mod.rs @@ -97,8 +97,6 @@ pub unsafe extern "C" fn add_files( /// /// Mark the transaction as having data changes or not (these are recorded at the file level). /// -/// When set here writers should not provide a data_change column in EngineData. -/// /// # Safety /// /// Caller is responsible for passing a valid handle. 
From 76cf43ce297caeab61632cceff64ac4cb3d366a6 Mon Sep 17 00:00:00 2001
From: Micah Kornfield
Date: Tue, 30 Sep 2025 05:16:19 +0000
Subject: [PATCH 08/14] fix fmt

---
 ffi/src/transaction/mod.rs | 7 ++-----
 1 file changed, 2 insertions(+), 5 deletions(-)

diff --git a/ffi/src/transaction/mod.rs b/ffi/src/transaction/mod.rs
index 5305fb5a1..53c226617 100644
--- a/ffi/src/transaction/mod.rs
+++ b/ffi/src/transaction/mod.rs
@@ -99,12 +99,9 @@
 ///
 /// # Safety
 ///
-/// Caller is responsible for passing a valid handle. 
+/// Caller is responsible for passing a valid handle.
 #[no_mangle]
-pub unsafe extern "C" fn set_data_change(
-    mut txn: Handle<ExclusiveTransaction>,
-    data_change: bool,
-) {
+pub unsafe extern "C" fn set_data_change(mut txn: Handle<ExclusiveTransaction>, data_change: bool) {
     let underlying_txn = unsafe { txn.as_mut() };
     underlying_txn.set_data_change(data_change);
 }

From ba4761930b431f36965c6f4817f3677486f2e841 Mon Sep 17 00:00:00 2001
From: Micah Kornfield
Date: Tue, 30 Sep 2025 05:17:27 +0000
Subject: [PATCH 09/14] fix docs

---
 kernel/src/transaction/mod.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/kernel/src/transaction/mod.rs b/kernel/src/transaction/mod.rs
index e26d3bbf2..7acdedce8 100644
--- a/kernel/src/transaction/mod.rs
+++ b/kernel/src/transaction/mod.rs
@@ -271,7 +271,7 @@ impl Transaction {
         self
     }
 
-    /// Same as [`with_data_change`] but set the value directly instead of
+    /// Same as [`Transaction::with_data_change`] but set the value directly instead of
     /// using a fluent API.
     pub fn set_data_change(&mut self, data_change: bool) {
         self.data_change = data_change;

From 91c80610965b11d0f45885855f9659f07234ba38 Mon Sep 17 00:00:00 2001
From: Micah Kornfield
Date: Thu, 9 Oct 2025 17:24:07 +0000
Subject: [PATCH 10/14] address feedback

---
 ffi/src/transaction/mod.rs           | 16 +++++++---------
 kernel/src/engine/default/parquet.rs |  4 ++--
 kernel/src/transaction/mod.rs        |  9 +++++++--
 3 files changed, 16 insertions(+), 13 deletions(-)

diff --git a/ffi/src/transaction/mod.rs b/ffi/src/transaction/mod.rs
index 53c226617..c8e710fc3 100644
--- a/ffi/src/transaction/mod.rs
+++ b/ffi/src/transaction/mod.rs
@@ -326,19 +326,17 @@ mod tests {
             ),
         ])
         .unwrap();
-
+        let parquet_schema = unsafe {
+            txn_with_engine_info
+                .shallow_copy()
+                .as_ref()
+                .add_files_schema()
+        };
         let file_info = write_parquet_file(
             table_path_str,
             "my_file.parquet",
             &batch,
-            unsafe {
-                txn_with_engine_info
-                    .shallow_copy()
-                    .as_ref()
-                    .add_files_schema()
-            }
-            .as_ref()
-            .try_into_arrow()?,
+            parquet_schema.as_ref().try_into_arrow()?,
         )?;
 
         let file_info_engine_data = ok_or_panic(unsafe {
diff --git a/kernel/src/engine/default/parquet.rs b/kernel/src/engine/default/parquet.rs
index 9161161d3..ac63877ec 100644
--- a/kernel/src/engine/default/parquet.rs
+++ b/kernel/src/engine/default/parquet.rs
@@ -186,8 +186,8 @@ impl DefaultParquetHandler {
     /// metadata as an EngineData batch which matches the [add file metadata] schema (where `<uuid>`
     /// is a generated UUIDv4).
     ///
-    /// Note that the schema does not contain the dataChange column. Users of the default engine
-    /// must add this column by calling [`crate::transaction::Transaction::with_data_change`].
+    /// Note that the schema does not contain the dataChange column. In order to set the `data_change`
+    /// flag, use [`crate::transaction::Transaction::with_data_change`].
/// /// [add file metadata]: crate::transaction::Transaction::add_files_schema pub async fn write_parquet_file( diff --git a/kernel/src/transaction/mod.rs b/kernel/src/transaction/mod.rs index 7acdedce8..54d94b7b7 100644 --- a/kernel/src/transaction/mod.rs +++ b/kernel/src/transaction/mod.rs @@ -21,6 +21,7 @@ use crate::{ DataType, DeltaResult, Engine, EngineData, Expression, ExpressionRef, IntoEngineData, RowVisitor, Version, }; +use delta_kernel_derive::internal_api; /// Type alias for an iterator of [`EngineData`] results. type EngineDataResultIterator<'a> = @@ -262,6 +263,9 @@ impl Transaction { /// Set the data change flag. /// + /// True indicates this commit is a "data changing" commit. False indicates table data was + /// reorganized but not materially modified. + /// /// Data change might be set to false in the following scenarios: /// 1. Operations that only change metadata (e.g. backfilling statistics) /// 2. Operations that make no logical changes to the contents of the table (i.e. rows are only moved @@ -273,7 +277,8 @@ impl Transaction { /// Same as [`Transaction::with_data_change`] but set the value directly instead of /// using a fluent API. - pub fn set_data_change(&mut self, data_change: bool) { + #[internal_api] + pub(crate)fn set_data_change(&mut self, data_change: bool) { self.data_change = data_change; } @@ -367,7 +372,7 @@ impl Transaction { /// Concretely, it is the expected schema for [`EngineData`] passed to [`add_files`], as it is the base /// for constructing an add_file. Each row represents metadata about a /// file to be added to the table. Kernel takes this information and extends it to the full add_file - /// action schema, adding additional fields (e.g., baseRowID) as necessary. + /// action schema, adding internal fields (e.g., baseRowID) as necessary. /// /// For now, Kernel only supports the number of records as a file statistic. /// This will change in a future release. From 66e58f04f971f0056dbd90c5fc28cba3763d72e6 Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Thu, 9 Oct 2025 17:25:14 +0000 Subject: [PATCH 11/14] format --- kernel/src/transaction/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kernel/src/transaction/mod.rs b/kernel/src/transaction/mod.rs index 54d94b7b7..0b9b6047a 100644 --- a/kernel/src/transaction/mod.rs +++ b/kernel/src/transaction/mod.rs @@ -278,7 +278,7 @@ impl Transaction { /// Same as [`Transaction::with_data_change`] but set the value directly instead of /// using a fluent API. #[internal_api] - pub(crate)fn set_data_change(&mut self, data_change: bool) { + pub(crate) fn set_data_change(&mut self, data_change: bool) { self.data_change = data_change; } From f08306f4c8192875c1140285b559796299e4096b Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Thu, 9 Oct 2025 17:30:57 +0000 Subject: [PATCH 12/14] marker for dead code --- kernel/src/transaction/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/kernel/src/transaction/mod.rs b/kernel/src/transaction/mod.rs index 0b9b6047a..a3c039718 100644 --- a/kernel/src/transaction/mod.rs +++ b/kernel/src/transaction/mod.rs @@ -278,6 +278,7 @@ impl Transaction { /// Same as [`Transaction::with_data_change`] but set the value directly instead of /// using a fluent API. 
#[internal_api] + #[allow(dead_code)] // used in FFI pub(crate) fn set_data_change(&mut self, data_change: bool) { self.data_change = data_change; } From f431f59872867e378f77be0b8cebb535acf13e3c Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Thu, 9 Oct 2025 21:41:26 +0000 Subject: [PATCH 13/14] remove arg --- kernel/tests/write.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/kernel/tests/write.rs b/kernel/tests/write.rs index 6349f6026..7f9e5821c 100644 --- a/kernel/tests/write.rs +++ b/kernel/tests/write.rs @@ -1436,7 +1436,6 @@ async fn generate_and_add_data_file( &ArrowEngineData::new(data), write_context.as_ref(), HashMap::new(), - true, ) .await?; txn.add_files(file_meta); From a9159f03f6260388d7bbf9f4fd2ca06f8db21f8f Mon Sep 17 00:00:00 2001 From: Micah Kornfield Date: Wed, 15 Oct 2025 19:57:10 +0000 Subject: [PATCH 14/14] address comments --- kernel/src/row_tracking.rs | 2 +- kernel/tests/write.rs | 13 +++++++++---- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/kernel/src/row_tracking.rs b/kernel/src/row_tracking.rs index e099f72dd..d4f56b3dd 100644 --- a/kernel/src/row_tracking.rs +++ b/kernel/src/row_tracking.rs @@ -272,7 +272,7 @@ mod tests { #[test] fn test_visit_wrong_getter_count() -> DeltaResult<()> { let mut visitor = RowTrackingVisitor::new(Some(0), None); - let wrong_getters: Vec<&dyn GetData<'_>> = vec![]; // Only one getter instead of expected count + let wrong_getters: Vec<&dyn GetData<'_>> = vec![]; // No getters instead of expected count let result = visitor.visit(1, &wrong_getters); assert_result_error_with_message(result, "Wrong number of RowTrackingVisitor getters"); diff --git a/kernel/tests/write.rs b/kernel/tests/write.rs index 7f9e5821c..a8b228cd4 100644 --- a/kernel/tests/write.rs +++ b/kernel/tests/write.rs @@ -780,10 +780,7 @@ async fn test_append_timestamp_ntz() -> Result<(), Box> { .await?; let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?; - let mut txn = snapshot - .transaction()? - .with_engine_info("default engine") - .with_data_change(true); + let mut txn = snapshot.transaction()?.with_engine_info("default engine"); // Create Arrow data with TIMESTAMP_NTZ values including edge cases // These are microseconds since Unix epoch @@ -834,6 +831,14 @@ async fn test_append_timestamp_ntz() -> Result<(), Box> { // Check that the add action exists assert!(parsed_commits[1].get("add").is_some()); + // Ensure default of data change is true. + assert!(parsed_commits[1] + .get("add") + .unwrap() + .get("dataChange") + .unwrap() + .as_bool() + .unwrap()); // Verify the data can be read back correctly test_read(&ArrowEngineData::new(data), &table_url, engine)?;
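
[Reviewer note — not part of any patch] End-to-end sketch of the API after this series,
mirroring kernel/examples/write-table as updated in patch 01. table_url, engine, and
sample_data are assumed to be set up as in that example; treat this as a sketch, not a
definitive snippet:

    let snapshot = Snapshot::builder_for(table_url.clone()).build(engine.as_ref())?;
    let mut txn = snapshot
        .transaction()?
        .with_operation("INSERT".to_string())
        .with_engine_info("default_engine/write-table-example")
        // The data-change flag now lives on the transaction (default: true after
        // patch 02) and is stamped onto every add action at commit time.
        .with_data_change(true);

    let write_context = Arc::new(txn.get_write_context());
    // write_parquet no longer takes a per-file data_change argument; the metadata it
    // returns matches txn.add_files_schema(), which has no dataChange column.
    let file_metadata = engine
        .write_parquet(&sample_data, write_context.as_ref(), HashMap::new())
        .await?;
    txn.add_files(file_metadata);
    let _ = txn.commit(engine.as_ref())?;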