From bd8977fe788b8e209798ec93940fe762926f38f0 Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Tue, 23 Sep 2025 16:43:40 -0700 Subject: [PATCH 1/7] commit type changes --- ffi/src/transaction/mod.rs | 14 ++-- kernel/examples/write-table/src/main.rs | 6 +- kernel/src/transaction/mod.rs | 93 ++++++++++++++++++------- kernel/tests/row_tracking.rs | 29 +++++--- kernel/tests/write.rs | 11 ++- 5 files changed, 105 insertions(+), 48 deletions(-) diff --git a/ffi/src/transaction/mod.rs b/ffi/src/transaction/mod.rs index 8bb3681b0..95ed5e38c 100644 --- a/ffi/src/transaction/mod.rs +++ b/ffi/src/transaction/mod.rs @@ -112,13 +112,13 @@ pub unsafe extern "C" fn commit( // TODO: for now this removes the enum, which prevents doing any conflict resolution. We should fix // this by making the commit function return the enum somehow. match txn.commit(engine.as_ref()) { - Ok(CommitResult::Committed { - version: v, - post_commit_stats: _, - }) => Ok(v), - Ok(CommitResult::Conflict(_, v)) => Err(delta_kernel::Error::Generic(format!( - "commit conflict at version {v}" - ))), + Ok(CommitResult::CommittedTransaction(committed)) => Ok(committed.version()), + Ok(CommitResult::ConflictedTransaction(conflicted)) => { + Err(delta_kernel::Error::Generic(format!( + "commit conflict at version {}", + conflicted.conflict_version + ))) + } Err(e) => Err(e), } .into_extern_result(&extern_engine) diff --git a/kernel/examples/write-table/src/main.rs b/kernel/examples/write-table/src/main.rs index 729e9eabd..a8ae2bcea 100644 --- a/kernel/examples/write-table/src/main.rs +++ b/kernel/examples/write-table/src/main.rs @@ -104,7 +104,8 @@ async fn try_main() -> DeltaResult<()> { // Commit the transaction match txn.commit(&engine)? { - CommitResult::Committed { version, .. } => { + CommitResult::CommittedTransaction(committed) => { + let version = committed.version(); println!("✓ Committed transaction at version {version}"); println!("✓ Successfully wrote {} rows to the table", cli.num_rows); @@ -114,7 +115,8 @@ async fn try_main() -> DeltaResult<()> { Ok(()) } - CommitResult::Conflict(_, conflicting_version) => { + CommitResult::ConflictedTransaction(conflicted) => { + let conflicting_version = conflicted.conflict_version; println!("✗ Failed to write data, transaction conflicted with version: {conflicting_version}"); Err(Error::generic("Commit failed")) } diff --git a/kernel/src/transaction/mod.rs b/kernel/src/transaction/mod.rs index 982f572f3..65f6be7f5 100644 --- a/kernel/src/transaction/mod.rs +++ b/kernel/src/transaction/mod.rs @@ -19,7 +19,7 @@ use crate::snapshot::SnapshotRef; use crate::utils::current_time_ms; use crate::{ DataType, DeltaResult, Engine, EngineData, Expression, ExpressionRef, IntoEngineData, - RowVisitor, Version, + RowVisitor, Snapshot, Version, }; /// Type alias for an iterator of [`EngineData`] results. @@ -227,22 +227,12 @@ impl Transaction { let json_handler = engine.json_handler(); match json_handler.write_json_file(&commit_path.location, Box::new(filtered_actions), false) { - Ok(()) => Ok(CommitResult::Committed { - version: commit_version, - post_commit_stats: PostCommitStats { - commits_since_checkpoint: self - .read_snapshot - .log_segment() - .commits_since_checkpoint() - + 1, - commits_since_log_compaction: self - .read_snapshot - .log_segment() - .commits_since_log_compaction_or_checkpoint() - + 1, - }, - }), - Err(Error::FileAlreadyExists(_)) => Ok(CommitResult::Conflict(self, commit_version)), + Ok(()) => Ok(CommitResult::CommittedTransaction( + self.into_committed(commit_version), + )), + Err(Error::FileAlreadyExists(_)) => Ok(CommitResult::ConflictedTransaction( + self.into_conflicted(commit_version), + )), Err(e) => Err(e), } } @@ -485,6 +475,31 @@ impl Transaction { Ok((Box::new(add_actions), None)) } } + + fn into_committed(self, version: Version) -> CommittedTransaction { + let stats = PostCommitStats { + commits_since_checkpoint: self.read_snapshot.log_segment().commits_since_checkpoint() + + 1, + commits_since_log_compaction: self + .read_snapshot + .log_segment() + .commits_since_log_compaction_or_checkpoint() + + 1, + }; + + CommittedTransaction { + transaction: self, + commit_version: version, + post_commit_stats: stats, + } + } + + fn into_conflicted(self, conflict_version: Version) -> ConflictedTransaction { + ConflictedTransaction { + transaction: self, + conflict_version, + } + } } /// WriteContext is data derived from a [`Transaction`] that can be provided to writers in order to @@ -536,18 +551,48 @@ pub struct PostCommitStats { #[derive(Debug)] pub enum CommitResult { /// The transaction was successfully committed. - Committed { - /// the version of the table that was just committed - version: Version, - /// The [`PostCommitStats`] for this transaction - post_commit_stats: PostCommitStats, - }, + CommittedTransaction(CommittedTransaction), /// This transaction conflicted with an existing version (at the version given). The transaction /// is returned so the caller can resolve the conflict (along with the version which /// conflicted). // TODO(zach): in order to make the returning of a transaction useful, we need to add APIs to // update the transaction to a new version etc. - Conflict(Transaction, Version), + ConflictedTransaction(ConflictedTransaction), +} + +#[derive(Debug)] +pub struct CommittedTransaction { + transaction: Transaction, + /// the version of the table that was just committed + commit_version: Version, + /// The [`PostCommitStats`] for this transaction + post_commit_stats: PostCommitStats, +} + +impl CommittedTransaction { + /// The version of the table that was just committed + pub fn version(&self) -> Version { + self.commit_version + } + + /// The [`PostCommitStats`] for this transaction + pub fn post_commit_stats(&self) -> &PostCommitStats { + &self.post_commit_stats + } + + /// Compute a new snapshot for the table at the commit version. Note this is generally more + /// efficient than creating a new snapshot from scratch. + pub fn post_commit_snapshot(&self, engine: &dyn Engine) -> DeltaResult { + Snapshot::builder_from(self.transaction.read_snapshot.clone()) + .at_version(self.commit_version) + .build(engine) + } +} + +#[derive(Debug)] +pub struct ConflictedTransaction { + pub transaction: Transaction, + pub conflict_version: Version, } #[cfg(test)] diff --git a/kernel/tests/row_tracking.rs b/kernel/tests/row_tracking.rs index 10b75d260..0621f4990 100644 --- a/kernel/tests/row_tracking.rs +++ b/kernel/tests/row_tracking.rs @@ -660,22 +660,35 @@ async fn test_row_tracking_parallel_transactions_conflict() -> DeltaResult<()> { // Commit the first transaction - this should succeed let result1 = txn1.commit(engine1.as_ref())?; match result1 { - CommitResult::Committed { version, .. } => { - assert_eq!(version, 1, "First transaction should commit at version 1"); + CommitResult::CommittedTransaction(committed) => { + assert_eq!( + committed.version(), + 1, + "First transaction should commit at version 1" + ); } - CommitResult::Conflict(_, version) => { - panic!("First transaction should not conflict, got conflict at version {version}"); + CommitResult::ConflictedTransaction(conflicted) => { + panic!( + "First transaction should not conflict, got conflict at version {}", + conflicted.conflict_version + ); } } // Commit the second transaction - this should result in a conflict let result2 = txn2.commit(engine2.as_ref())?; match result2 { - CommitResult::Committed { version, .. } => { - panic!("Second transaction should conflict, but got committed at version {version}"); + CommitResult::CommittedTransaction(committed) => { + panic!( + "Second transaction should conflict, but got committed at version {}", + committed.version() + ); } - CommitResult::Conflict(_conflicted_txn, version) => { - assert_eq!(version, 1, "Conflict should be at version 1"); + CommitResult::ConflictedTransaction(conflicted) => { + assert_eq!( + conflicted.conflict_version, 1, + "Conflict should be at version 1" + ); // TODO: In the future, we need to resolve conflicts and retry the commit // For now, we just verify that we got the conflict as expected diff --git a/kernel/tests/write.rs b/kernel/tests/write.rs index a46dc5f1b..096728703 100644 --- a/kernel/tests/write.rs +++ b/kernel/tests/write.rs @@ -183,17 +183,14 @@ async fn write_data_and_check_result_and_stats( // commit! match txn.commit(engine.as_ref())? { - CommitResult::Committed { - version, - post_commit_stats, - } => { - assert_eq!(version, expected_since_commit as Version); + CommitResult::CommittedTransaction(committed) => { + assert_eq!(committed.version(), expected_since_commit as Version); assert_eq!( - post_commit_stats.commits_since_checkpoint, + committed.post_commit_stats().commits_since_checkpoint, expected_since_commit ); assert_eq!( - post_commit_stats.commits_since_log_compaction, + committed.post_commit_stats().commits_since_log_compaction, expected_since_commit ); } From d12b52e93db4af83271f5452ded0d305629e739b Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Mon, 29 Sep 2025 13:38:08 -0700 Subject: [PATCH 2/7] clean it up --- ffi/src/transaction/mod.rs | 12 +++--- kernel/examples/write-table/src/main.rs | 49 ++++++++++++++--------- kernel/src/transaction/mod.rs | 43 ++++++++++++++++++-- kernel/tests/row_tracking.rs | 52 +++++++++++++++++++------ kernel/tests/write.rs | 22 ++++++----- 5 files changed, 128 insertions(+), 50 deletions(-) diff --git a/ffi/src/transaction/mod.rs b/ffi/src/transaction/mod.rs index 95ed5e38c..24f6dd3f6 100644 --- a/ffi/src/transaction/mod.rs +++ b/ffi/src/transaction/mod.rs @@ -113,12 +113,12 @@ pub unsafe extern "C" fn commit( // this by making the commit function return the enum somehow. match txn.commit(engine.as_ref()) { Ok(CommitResult::CommittedTransaction(committed)) => Ok(committed.version()), - Ok(CommitResult::ConflictedTransaction(conflicted)) => { - Err(delta_kernel::Error::Generic(format!( - "commit conflict at version {}", - conflicted.conflict_version - ))) - } + Ok(CommitResult::RetryableTransaction(_)) => Err(delta_kernel::Error::unsupported( + "commit failed: retryable transaction not supported in FFI (yet)", + )), + Ok(CommitResult::ConflictedTransaction(conflicted)) => Err(delta_kernel::Error::Generic( + format!("commit conflict at version {}", conflicted.conflict_version), + )), Err(e) => Err(e), } .into_extern_result(&extern_engine) diff --git a/kernel/examples/write-table/src/main.rs b/kernel/examples/write-table/src/main.rs index a8ae2bcea..915658ac9 100644 --- a/kernel/examples/write-table/src/main.rs +++ b/kernel/examples/write-table/src/main.rs @@ -19,7 +19,7 @@ use delta_kernel::engine::arrow_data::ArrowEngineData; use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor; use delta_kernel::engine::default::DefaultEngine; use delta_kernel::schema::{DataType, SchemaRef, StructField, StructType}; -use delta_kernel::transaction::CommitResult; +use delta_kernel::transaction::{CommitResult, RetryableTransaction}; use delta_kernel::{DeltaResult, Engine, Error, Snapshot, SnapshotRef}; /// An example program that writes to a Delta table and creates it if necessary. @@ -102,25 +102,38 @@ async fn try_main() -> DeltaResult<()> { // Add the file metadata to the transaction txn.add_files(file_metadata); - // Commit the transaction - match txn.commit(&engine)? { - CommitResult::CommittedTransaction(committed) => { - let version = committed.version(); - println!("✓ Committed transaction at version {version}"); - println!("✓ Successfully wrote {} rows to the table", cli.num_rows); + // Commit the transaction (in a simple retry loop) + let mut retries = 0; + let committed = loop { + if retries > 5 { + return Err(Error::generic( + "Exceeded maximum 5 retries for committing transaction", + )); + } + txn = match txn.commit(&engine)? { + CommitResult::CommittedTransaction(committed) => break committed, + CommitResult::ConflictedTransaction(conflicted) => { + let conflicting_version = conflicted.conflict_version; + println!("✗ Failed to write data, transaction conflicted with version: {conflicting_version}"); + return Err(Error::generic("Commit failed")); + } + CommitResult::RetryableTransaction(RetryableTransaction { transaction, error }) => { + println!("✗ Failed to commit, retrying... retryable error: {error}"); + transaction + } + }; + retries += 1; + }; - // Read and display the data - read_and_display_data(&url, engine).await?; - println!("✓ Successfully read data from the table"); + let version = committed.version(); + println!("✓ Committed transaction at version {version}"); + println!("✓ Successfully wrote {} rows to the table", cli.num_rows); - Ok(()) - } - CommitResult::ConflictedTransaction(conflicted) => { - let conflicting_version = conflicted.conflict_version; - println!("✗ Failed to write data, transaction conflicted with version: {conflicting_version}"); - Err(Error::generic("Commit failed")) - } - } + // Read and display the data + read_and_display_data(&url, engine).await?; + println!("✓ Successfully read data from the table"); + + Ok(()) } /// Creates a new Delta table or gets an existing one. diff --git a/kernel/src/transaction/mod.rs b/kernel/src/transaction/mod.rs index 65f6be7f5..e6bf16daf 100644 --- a/kernel/src/transaction/mod.rs +++ b/kernel/src/transaction/mod.rs @@ -169,8 +169,11 @@ impl Transaction { }) } - /// Consume the transaction and commit it to the table. The result is a [CommitResult] which - /// will include the failed transaction in case of a conflict so the user can retry. + /// Consume the transaction and commit it to the table. The result is a result of + /// [CommitResult] with the following semantics: + /// - Ok(CommitResult) for either success or a recoverable error (includes the failed + /// transaction in case of a conflict so the user can retry, etc.) + /// - Err(Error) indicates a non-retryable error (e.g. logic/validation error). pub fn commit(self, engine: &dyn Engine) -> DeltaResult { // Step 1: Check for duplicate app_ids and generate set transactions (`txn`) // Note: The commit info must always be the first action in the commit but we generate it in @@ -233,7 +236,8 @@ impl Transaction { Err(Error::FileAlreadyExists(_)) => Ok(CommitResult::ConflictedTransaction( self.into_conflicted(commit_version), )), - Err(e) => Err(e), + // TODO: we may want to be more selective about what is retryable + Err(e) => Ok(CommitResult::RetryableTransaction(self.into_retryable(e))), } } @@ -500,6 +504,13 @@ impl Transaction { conflict_version, } } + + fn into_retryable(self, error: Error) -> RetryableTransaction { + RetryableTransaction { + transaction: self, + error, + } + } } /// WriteContext is data derived from a [`Transaction`] that can be provided to writers in order to @@ -547,8 +558,17 @@ pub struct PostCommitStats { pub commits_since_log_compaction: u64, } -/// Result of committing a transaction. +/// The result of attempting to commit this transaction. +/// +/// The commit result can be one of the following: +/// - CommittedTransaction: the transaction was successfully committed. post-commit stats and +/// post-commit snapshot can be obtained from the committed transaction. +/// - ConflictedTransaction: the transaction conflicted with an existing version. This transcation +/// must be rebased before retrying. +/// - RetryableTransaction: an IO (retryable) error occurred during the commit. This transaction +/// can be retried without rebasing. #[derive(Debug)] +#[must_use] pub enum CommitResult { /// The transaction was successfully committed. CommittedTransaction(CommittedTransaction), @@ -558,6 +578,15 @@ pub enum CommitResult { // TODO(zach): in order to make the returning of a transaction useful, we need to add APIs to // update the transaction to a new version etc. ConflictedTransaction(ConflictedTransaction), + /// An IO (retryable) error occurred during the commit. + RetryableTransaction(RetryableTransaction), +} + +impl CommitResult { + /// Returns true if the commit was successful. + pub fn is_committed(&self) -> bool { + matches!(self, CommitResult::CommittedTransaction(_)) + } } #[derive(Debug)] @@ -595,6 +624,12 @@ pub struct ConflictedTransaction { pub conflict_version: Version, } +#[derive(Debug)] +pub struct RetryableTransaction { + pub transaction: Transaction, + pub error: Error, +} + #[cfg(test)] mod tests { use super::*; diff --git a/kernel/tests/row_tracking.rs b/kernel/tests/row_tracking.rs index 0621f4990..a75e36b96 100644 --- a/kernel/tests/row_tracking.rs +++ b/kernel/tests/row_tracking.rs @@ -217,7 +217,9 @@ async fn test_row_tracking_append() -> DeltaResult<()> { vec![int32_array(vec![4, 5, 6])], ], )?; - write_data_to_table(&table_url, engine.clone(), data).await?; + assert!(write_data_to_table(&table_url, engine.clone(), data) + .await? + .is_committed()); // Verify the commit was written correctly verify_row_tracking_in_commit( @@ -264,7 +266,9 @@ async fn test_row_tracking_single_record_batches() -> DeltaResult<()> { vec![int32_array(vec![3])], ], )?; - write_data_to_table(&table_url, engine.clone(), data).await?; + assert!(write_data_to_table(&table_url, engine.clone(), data) + .await? + .is_committed()); // Verify the commit was written correctly verify_row_tracking_in_commit( @@ -295,7 +299,9 @@ async fn test_row_tracking_large_batch() -> DeltaResult<()> { // Write a large batch with 1000 records let large_batch: Vec = (1..=1000).collect(); let data = generate_data(schema.clone(), [vec![int32_array(large_batch.clone())]])?; - write_data_to_table(&table_url, engine.clone(), data).await?; + assert!(write_data_to_table(&table_url, engine.clone(), data) + .await? + .is_committed()); // Verify the commit was written correctly verify_row_tracking_in_commit( @@ -342,7 +348,9 @@ async fn test_row_tracking_consecutive_transactions() -> DeltaResult<()> { vec![int32_array(vec![4, 5, 6])], ], )?; - write_data_to_table(&table_url, engine.clone(), data_1).await?; + assert!(write_data_to_table(&table_url, engine.clone(), data_1) + .await? + .is_committed()); // Verify first commit verify_row_tracking_in_commit( @@ -357,7 +365,9 @@ async fn test_row_tracking_consecutive_transactions() -> DeltaResult<()> { // Second transaction: write one batch with 2 records // This should read the existing row tracking domain metadata and assign base row IDs starting from 6 let data_2 = generate_data(schema.clone(), [vec![int32_array(vec![7, 8])]])?; - write_data_to_table(&table_url, engine.clone(), data_2).await?; + assert!(write_data_to_table(&table_url, engine.clone(), data_2) + .await? + .is_committed()); // Verify second commit verify_row_tracking_in_commit( @@ -410,7 +420,9 @@ async fn test_row_tracking_three_consecutive_transactions() -> DeltaResult<()> { ], ], )?; - write_data_to_table(&table_url, engine.clone(), data_1).await?; + assert!(write_data_to_table(&table_url, engine.clone(), data_1) + .await? + .is_committed()); verify_row_tracking_in_commit( &store, @@ -429,7 +441,9 @@ async fn test_row_tracking_three_consecutive_transactions() -> DeltaResult<()> { string_array(vec!["g".to_string(), "h".to_string()]), ]], )?; - write_data_to_table(&table_url, engine.clone(), data_2).await?; + assert!(write_data_to_table(&table_url, engine.clone(), data_2) + .await? + .is_committed()); verify_row_tracking_in_commit( &store, @@ -454,7 +468,9 @@ async fn test_row_tracking_three_consecutive_transactions() -> DeltaResult<()> { ], ], )?; - write_data_to_table(&table_url, engine.clone(), data_3).await?; + assert!(write_data_to_table(&table_url, engine.clone(), data_3) + .await? + .is_committed()); verify_row_tracking_in_commit( &store, @@ -490,7 +506,9 @@ async fn test_row_tracking_with_regular_and_empty_adds() -> DeltaResult<()> { vec![int32_array(vec![4, 5, 6])], ], )?; - write_data_to_table(&table_url, engine.clone(), data).await?; + assert!(write_data_to_table(&table_url, engine.clone(), data) + .await? + .is_committed()); // Verify the commit was written correctly verify_row_tracking_in_commit( @@ -536,7 +554,9 @@ async fn test_row_tracking_with_empty_adds() -> DeltaResult<()> { vec![int32_array(Vec::::new())], ], )?; - write_data_to_table(&table_url, engine.clone(), data).await?; + assert!(write_data_to_table(&table_url, engine.clone(), data) + .await? + .is_committed()); // Verify the commit was written correctly // NB: The expected high water mark is a bit unintuitive here, as we are appending empty batches. @@ -579,7 +599,7 @@ async fn test_row_tracking_without_adds() -> DeltaResult<()> { let txn = snapshot.transaction()?; // Commit without adding any add files - txn.commit(engine.as_ref())?; + assert!(txn.commit(engine.as_ref())?.is_committed()); // Fetch and parse the commit let commit_url = table_url.join(&format!("_delta_log/{:020}.json", 1))?; @@ -673,6 +693,9 @@ async fn test_row_tracking_parallel_transactions_conflict() -> DeltaResult<()> { conflicted.conflict_version ); } + CommitResult::RetryableTransaction(_) => { + panic!("First transaction should not be retryable error"); + } } // Commit the second transaction - this should result in a conflict @@ -693,6 +716,9 @@ async fn test_row_tracking_parallel_transactions_conflict() -> DeltaResult<()> { // TODO: In the future, we need to resolve conflicts and retry the commit // For now, we just verify that we got the conflict as expected } + CommitResult::RetryableTransaction(_) => { + panic!("Second transaction should not be retryable error"); + } } // Verify that the winning transaction is in the log and that it has the correct metadata @@ -758,7 +784,9 @@ async fn test_no_row_tracking_fields_without_feature() -> DeltaResult<()> { )?; // Write data to the table - write_data_to_table(&table_url, engine.clone(), data).await?; + assert!(write_data_to_table(&table_url, engine.clone(), data) + .await? + .is_committed()); // Verify that the commit does NOT contain row tracking fields let commit_url = table_url.join(&format!("_delta_log/{:020}.json", 1))?; diff --git a/kernel/tests/write.rs b/kernel/tests/write.rs index 096728703..aea9034cd 100644 --- a/kernel/tests/write.rs +++ b/kernel/tests/write.rs @@ -67,7 +67,7 @@ async fn test_commit_info() -> Result<(), Box> { let txn = snapshot.transaction()?.with_engine_info("default engine"); // commit! - txn.commit(&engine)?; + let _ = txn.commit(&engine)?; let commit1 = store .get(&Path::from(format!( @@ -216,7 +216,7 @@ async fn test_commit_info_action() -> Result<(), Box> { let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?; let txn = snapshot.transaction()?.with_engine_info("default engine"); - txn.commit(&engine)?; + let _ = txn.commit(&engine)?; let commit = store .get(&Path::from(format!( @@ -360,7 +360,7 @@ async fn test_no_add_actions() -> Result<(), Box> { let txn = snapshot.transaction()?.with_engine_info("default engine"); // Commit without adding any add files - txn.commit(&engine)?; + assert!(txn.commit(&engine)?.is_committed()); let commit1 = store .get(&Path::from(format!( @@ -474,7 +474,7 @@ async fn test_append_partitioned() -> Result<(), Box> { } // commit! - txn.commit(engine.as_ref())?; + assert!(txn.commit(engine.as_ref())?.is_committed()); let commit1 = store .get(&Path::from(format!( @@ -652,7 +652,7 @@ async fn test_write_txn_actions() -> Result<(), Box> { .with_transaction_id("app_id2".to_string(), 2); // commit! - txn.commit(&engine)?; + assert!(txn.commit(&engine)?.is_committed()); let snapshot = Snapshot::builder_for(table_url.clone()) .at_version(1) @@ -813,7 +813,7 @@ async fn test_append_timestamp_ntz() -> Result<(), Box> { txn.add_files(add_files_metadata); // Commit the transaction - txn.commit(engine.as_ref())?; + assert!(txn.commit(engine.as_ref())?.is_committed()); // Verify the commit was written correctly let commit1 = store @@ -1020,7 +1020,7 @@ async fn test_append_variant() -> Result<(), Box> { txn.add_files(add_files_metadata); // Commit the transaction - txn.commit(engine.as_ref())?; + assert!(txn.commit(engine.as_ref())?.is_committed()); // Verify the commit was written correctly let commit1_url = tmp_test_dir_url @@ -1192,7 +1192,7 @@ async fn test_shredded_variant_read_rejection() -> Result<(), Box Result<(), Box Date: Mon, 6 Oct 2025 13:25:20 -0700 Subject: [PATCH 3/7] remove post commit snapshot for now --- kernel/src/transaction/mod.rs | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/kernel/src/transaction/mod.rs b/kernel/src/transaction/mod.rs index e6bf16daf..d0de6327f 100644 --- a/kernel/src/transaction/mod.rs +++ b/kernel/src/transaction/mod.rs @@ -19,7 +19,7 @@ use crate::snapshot::SnapshotRef; use crate::utils::current_time_ms; use crate::{ DataType, DeltaResult, Engine, EngineData, Expression, ExpressionRef, IntoEngineData, - RowVisitor, Snapshot, Version, + RowVisitor, Version, }; /// Type alias for an iterator of [`EngineData`] results. @@ -591,6 +591,8 @@ impl CommitResult { #[derive(Debug)] pub struct CommittedTransaction { + // TODO: remove after post-commit snapshot + #[allow(dead_code)] transaction: Transaction, /// the version of the table that was just committed commit_version: Version, @@ -609,13 +611,7 @@ impl CommittedTransaction { &self.post_commit_stats } - /// Compute a new snapshot for the table at the commit version. Note this is generally more - /// efficient than creating a new snapshot from scratch. - pub fn post_commit_snapshot(&self, engine: &dyn Engine) -> DeltaResult { - Snapshot::builder_from(self.transaction.read_snapshot.clone()) - .at_version(self.commit_version) - .build(engine) - } + // TODO: post-commit snapshot } #[derive(Debug)] From e568469019ccaf917b8ab97d17f908bc63c44b17 Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Mon, 6 Oct 2025 16:05:15 -0700 Subject: [PATCH 4/7] cleanup --- ffi/src/transaction/mod.rs | 2 +- kernel/examples/write-table/src/main.rs | 2 +- kernel/src/transaction/mod.rs | 49 +++++++++++++++++++------ kernel/tests/row_tracking.rs | 4 +- kernel/tests/write.rs | 2 +- 5 files changed, 43 insertions(+), 16 deletions(-) diff --git a/ffi/src/transaction/mod.rs b/ffi/src/transaction/mod.rs index 24f6dd3f6..9ca9a0405 100644 --- a/ffi/src/transaction/mod.rs +++ b/ffi/src/transaction/mod.rs @@ -112,7 +112,7 @@ pub unsafe extern "C" fn commit( // TODO: for now this removes the enum, which prevents doing any conflict resolution. We should fix // this by making the commit function return the enum somehow. match txn.commit(engine.as_ref()) { - Ok(CommitResult::CommittedTransaction(committed)) => Ok(committed.version()), + Ok(CommitResult::CommittedTransaction(committed)) => Ok(committed.commit_version()), Ok(CommitResult::RetryableTransaction(_)) => Err(delta_kernel::Error::unsupported( "commit failed: retryable transaction not supported in FFI (yet)", )), diff --git a/kernel/examples/write-table/src/main.rs b/kernel/examples/write-table/src/main.rs index 915658ac9..063e8e377 100644 --- a/kernel/examples/write-table/src/main.rs +++ b/kernel/examples/write-table/src/main.rs @@ -125,7 +125,7 @@ async fn try_main() -> DeltaResult<()> { retries += 1; }; - let version = committed.version(); + let version = committed.commit_version(); println!("✓ Committed transaction at version {version}"); println!("✓ Successfully wrote {} rows to the table", cli.num_rows); diff --git a/kernel/src/transaction/mod.rs b/kernel/src/transaction/mod.rs index d0de6327f..bbf241176 100644 --- a/kernel/src/transaction/mod.rs +++ b/kernel/src/transaction/mod.rs @@ -558,21 +558,24 @@ pub struct PostCommitStats { pub commits_since_log_compaction: u64, } -/// The result of attempting to commit this transaction. +/// The result of attempting to commit this transaction. If the commit was +/// successful/conflicted/retryable, the result is Ok(CommitResult), otherwise, if a nonrecoverable +/// error occurred, the result is Err(Error). /// /// The commit result can be one of the following: -/// - CommittedTransaction: the transaction was successfully committed. post-commit stats and -/// post-commit snapshot can be obtained from the committed transaction. -/// - ConflictedTransaction: the transaction conflicted with an existing version. This transcation -/// must be rebased before retrying. -/// - RetryableTransaction: an IO (retryable) error occurred during the commit. This transaction +/// - [CommittedTransaction]: the transaction was successfully committed. [PostCommitStats] and +/// in the future a post-commit snapshot can be obtained from the committed transaction. +/// - [ConflictedTransaction]: the transaction conflicted with an existing version. This transcation +/// must be rebased before retrying. (currently no rebase APIs exist, caller must create new txn) +/// - [RetryableTransaction]: an IO (retryable) error occurred during the commit. This transaction /// can be retried without rebasing. #[derive(Debug)] #[must_use] pub enum CommitResult { /// The transaction was successfully committed. CommittedTransaction(CommittedTransaction), - /// This transaction conflicted with an existing version (at the version given). The transaction + /// This transaction conflicted with an existing version (see + /// [ConflictedTransaction::conflict_version]). The transaction /// is returned so the caller can resolve the conflict (along with the version which /// conflicted). // TODO(zach): in order to make the returning of a transaction useful, we need to add APIs to @@ -589,6 +592,11 @@ impl CommitResult { } } +/// This is the result of a successfully committed [Transaction]. One can retrieve the +/// [PostCommitStats] and [commit version] from this struct. In the future a post-commit snapshot +/// can be obtained as well. +/// +/// [commit version]: Self::commit_version #[derive(Debug)] pub struct CommittedTransaction { // TODO: remove after post-commit snapshot @@ -601,8 +609,8 @@ pub struct CommittedTransaction { } impl CommittedTransaction { - /// The version of the table that was just committed - pub fn version(&self) -> Version { + /// The version of the table that was just sucessfully committed + pub fn commit_version(&self) -> Version { self.commit_version } @@ -614,15 +622,34 @@ impl CommittedTransaction { // TODO: post-commit snapshot } +/// This is the result of a [Transaction]. One can retrieve the +/// [PostCommitStats] and [commit version] from this struct. In the future a post-commit snapshot +/// can be obtained as well. +/// +/// [commit version]: Self::commit_version #[derive(Debug)] pub struct ConflictedTransaction { - pub transaction: Transaction, - pub conflict_version: Version, + // TODO: remove after rebase APIs + #[allow(dead_code)] + transaction: Transaction, + conflict_version: Version, +} + +impl ConflictedTransaction { + /// The version attempted commit that yielded a conflict + pub fn conflict_version(&self) -> Version { + self.conflict_version + } } +/// A transaction that failed to commit due to a retryable error (e.g. IO error). The transaction +/// can be recovered with `RetryableTransaction::transaction` and retried without rebasing. The +/// associated error can be inspected via `RetryableTransaction::error`. #[derive(Debug)] pub struct RetryableTransaction { + /// The transaction that failed to commit due to a retryable error. pub transaction: Transaction, + /// Transient error that caused the commit to fail. pub error: Error, } diff --git a/kernel/tests/row_tracking.rs b/kernel/tests/row_tracking.rs index a75e36b96..c62ad768b 100644 --- a/kernel/tests/row_tracking.rs +++ b/kernel/tests/row_tracking.rs @@ -682,7 +682,7 @@ async fn test_row_tracking_parallel_transactions_conflict() -> DeltaResult<()> { match result1 { CommitResult::CommittedTransaction(committed) => { assert_eq!( - committed.version(), + committed.commit_version(), 1, "First transaction should commit at version 1" ); @@ -704,7 +704,7 @@ async fn test_row_tracking_parallel_transactions_conflict() -> DeltaResult<()> { CommitResult::CommittedTransaction(committed) => { panic!( "Second transaction should conflict, but got committed at version {}", - committed.version() + committed.commit_version() ); } CommitResult::ConflictedTransaction(conflicted) => { diff --git a/kernel/tests/write.rs b/kernel/tests/write.rs index aea9034cd..d4878aae4 100644 --- a/kernel/tests/write.rs +++ b/kernel/tests/write.rs @@ -184,7 +184,7 @@ async fn write_data_and_check_result_and_stats( // commit! match txn.commit(engine.as_ref())? { CommitResult::CommittedTransaction(committed) => { - assert_eq!(committed.version(), expected_since_commit as Version); + assert_eq!(committed.commit_version(), expected_since_commit as Version); assert_eq!( committed.post_commit_stats().commits_since_checkpoint, expected_since_commit From 84e877a6b9013db81a36775e38f718d54a4f6330 Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Mon, 6 Oct 2025 16:11:59 -0700 Subject: [PATCH 5/7] cleanup --- ffi/src/transaction/mod.rs | 9 ++++++--- kernel/examples/write-table/src/main.rs | 2 +- kernel/tests/row_tracking.rs | 5 +++-- 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/ffi/src/transaction/mod.rs b/ffi/src/transaction/mod.rs index 9ca9a0405..7a22d835d 100644 --- a/ffi/src/transaction/mod.rs +++ b/ffi/src/transaction/mod.rs @@ -116,9 +116,12 @@ pub unsafe extern "C" fn commit( Ok(CommitResult::RetryableTransaction(_)) => Err(delta_kernel::Error::unsupported( "commit failed: retryable transaction not supported in FFI (yet)", )), - Ok(CommitResult::ConflictedTransaction(conflicted)) => Err(delta_kernel::Error::Generic( - format!("commit conflict at version {}", conflicted.conflict_version), - )), + Ok(CommitResult::ConflictedTransaction(conflicted)) => { + Err(delta_kernel::Error::Generic(format!( + "commit conflict at version {}", + conflicted.conflict_version() + ))) + } Err(e) => Err(e), } .into_extern_result(&extern_engine) diff --git a/kernel/examples/write-table/src/main.rs b/kernel/examples/write-table/src/main.rs index 063e8e377..8e6075ce8 100644 --- a/kernel/examples/write-table/src/main.rs +++ b/kernel/examples/write-table/src/main.rs @@ -113,7 +113,7 @@ async fn try_main() -> DeltaResult<()> { txn = match txn.commit(&engine)? { CommitResult::CommittedTransaction(committed) => break committed, CommitResult::ConflictedTransaction(conflicted) => { - let conflicting_version = conflicted.conflict_version; + let conflicting_version = conflicted.conflict_version(); println!("✗ Failed to write data, transaction conflicted with version: {conflicting_version}"); return Err(Error::generic("Commit failed")); } diff --git a/kernel/tests/row_tracking.rs b/kernel/tests/row_tracking.rs index c62ad768b..8a2887673 100644 --- a/kernel/tests/row_tracking.rs +++ b/kernel/tests/row_tracking.rs @@ -690,7 +690,7 @@ async fn test_row_tracking_parallel_transactions_conflict() -> DeltaResult<()> { CommitResult::ConflictedTransaction(conflicted) => { panic!( "First transaction should not conflict, got conflict at version {}", - conflicted.conflict_version + conflicted.conflict_version() ); } CommitResult::RetryableTransaction(_) => { @@ -709,7 +709,8 @@ async fn test_row_tracking_parallel_transactions_conflict() -> DeltaResult<()> { } CommitResult::ConflictedTransaction(conflicted) => { assert_eq!( - conflicted.conflict_version, 1, + conflicted.conflict_version(), + 1, "Conflict should be at version 1" ); From c33d9b4b353ca7d5d89a8787ac9287be7d490be6 Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Thu, 9 Oct 2025 11:07:24 -0700 Subject: [PATCH 6/7] clean --- kernel/src/transaction/mod.rs | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/kernel/src/transaction/mod.rs b/kernel/src/transaction/mod.rs index bbf241176..d722c7763 100644 --- a/kernel/src/transaction/mod.rs +++ b/kernel/src/transaction/mod.rs @@ -236,8 +236,12 @@ impl Transaction { Err(Error::FileAlreadyExists(_)) => Ok(CommitResult::ConflictedTransaction( self.into_conflicted(commit_version), )), - // TODO: we may want to be more selective about what is retryable - Err(e) => Ok(CommitResult::RetryableTransaction(self.into_retryable(e))), + // TODO: we may want to be more or less selective about what is retryable (this is tied + // to the idea of "what kind of Errors should write_json_file return?") + Err(e @ Error::IOError(_)) => { + Ok(CommitResult::RetryableTransaction(self.into_retryable(e))) + } + Err(e) => Err(e), } } @@ -622,11 +626,10 @@ impl CommittedTransaction { // TODO: post-commit snapshot } -/// This is the result of a [Transaction]. One can retrieve the -/// [PostCommitStats] and [commit version] from this struct. In the future a post-commit snapshot -/// can be obtained as well. +/// This is the result of a conflicted [Transaction]. One can retrieve the [conflict version] from +/// this struct. In the future a rebase API will be provided. /// -/// [commit version]: Self::commit_version +/// [conflict version]: Self::conflict_version #[derive(Debug)] pub struct ConflictedTransaction { // TODO: remove after rebase APIs From c677c5f7797fe7d059677a4efd2ad4974c9b9fb9 Mon Sep 17 00:00:00 2001 From: Zach Schuermann Date: Thu, 9 Oct 2025 11:51:23 -0700 Subject: [PATCH 7/7] comments --- kernel/src/transaction/mod.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/kernel/src/transaction/mod.rs b/kernel/src/transaction/mod.rs index d722c7763..c93fdcb61 100644 --- a/kernel/src/transaction/mod.rs +++ b/kernel/src/transaction/mod.rs @@ -169,7 +169,7 @@ impl Transaction { }) } - /// Consume the transaction and commit it to the table. The result is a result of + /// Consume the transaction and commit it to the table. The result is a result of /// [CommitResult] with the following semantics: /// - Ok(CommitResult) for either success or a recoverable error (includes the failed /// transaction in case of a conflict so the user can retry, etc.) @@ -623,11 +623,11 @@ impl CommittedTransaction { &self.post_commit_stats } - // TODO: post-commit snapshot + // TODO(#916): post-commit snapshot } /// This is the result of a conflicted [Transaction]. One can retrieve the [conflict version] from -/// this struct. In the future a rebase API will be provided. +/// this struct. In the future a rebase API will be provided (issue #1389). /// /// [conflict version]: Self::conflict_version #[derive(Debug)]