diff --git a/ffi/src/transaction/mod.rs b/ffi/src/transaction/mod.rs index 8bb3681b0..7a22d835d 100644 --- a/ffi/src/transaction/mod.rs +++ b/ffi/src/transaction/mod.rs @@ -112,13 +112,16 @@ pub unsafe extern "C" fn commit( // TODO: for now this removes the enum, which prevents doing any conflict resolution. We should fix // this by making the commit function return the enum somehow. match txn.commit(engine.as_ref()) { - Ok(CommitResult::Committed { - version: v, - post_commit_stats: _, - }) => Ok(v), - Ok(CommitResult::Conflict(_, v)) => Err(delta_kernel::Error::Generic(format!( - "commit conflict at version {v}" - ))), + Ok(CommitResult::CommittedTransaction(committed)) => Ok(committed.commit_version()), + Ok(CommitResult::RetryableTransaction(_)) => Err(delta_kernel::Error::unsupported( + "commit failed: retryable transaction not supported in FFI (yet)", + )), + Ok(CommitResult::ConflictedTransaction(conflicted)) => { + Err(delta_kernel::Error::Generic(format!( + "commit conflict at version {}", + conflicted.conflict_version() + ))) + } Err(e) => Err(e), } .into_extern_result(&extern_engine) diff --git a/kernel/examples/write-table/src/main.rs b/kernel/examples/write-table/src/main.rs index 729e9eabd..8e6075ce8 100644 --- a/kernel/examples/write-table/src/main.rs +++ b/kernel/examples/write-table/src/main.rs @@ -19,7 +19,7 @@ use delta_kernel::engine::arrow_data::ArrowEngineData; use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor; use delta_kernel::engine::default::DefaultEngine; use delta_kernel::schema::{DataType, SchemaRef, StructField, StructType}; -use delta_kernel::transaction::CommitResult; +use delta_kernel::transaction::{CommitResult, RetryableTransaction}; use delta_kernel::{DeltaResult, Engine, Error, Snapshot, SnapshotRef}; /// An example program that writes to a Delta table and creates it if necessary. @@ -102,23 +102,38 @@ async fn try_main() -> DeltaResult<()> { // Add the file metadata to the transaction txn.add_files(file_metadata); - // Commit the transaction - match txn.commit(&engine)? { - CommitResult::Committed { version, .. } => { - println!("✓ Committed transaction at version {version}"); - println!("✓ Successfully wrote {} rows to the table", cli.num_rows); + // Commit the transaction (in a simple retry loop) + let mut retries = 0; + let committed = loop { + if retries > 5 { + return Err(Error::generic( + "Exceeded maximum 5 retries for committing transaction", + )); + } + txn = match txn.commit(&engine)? { + CommitResult::CommittedTransaction(committed) => break committed, + CommitResult::ConflictedTransaction(conflicted) => { + let conflicting_version = conflicted.conflict_version(); + println!("✗ Failed to write data, transaction conflicted with version: {conflicting_version}"); + return Err(Error::generic("Commit failed")); + } + CommitResult::RetryableTransaction(RetryableTransaction { transaction, error }) => { + println!("✗ Failed to commit, retrying... retryable error: {error}"); + transaction + } + }; + retries += 1; + }; - // Read and display the data - read_and_display_data(&url, engine).await?; - println!("✓ Successfully read data from the table"); + let version = committed.commit_version(); + println!("✓ Committed transaction at version {version}"); + println!("✓ Successfully wrote {} rows to the table", cli.num_rows); - Ok(()) - } - CommitResult::Conflict(_, conflicting_version) => { - println!("✗ Failed to write data, transaction conflicted with version: {conflicting_version}"); - Err(Error::generic("Commit failed")) - } - } + // Read and display the data + read_and_display_data(&url, engine).await?; + println!("✓ Successfully read data from the table"); + + Ok(()) } /// Creates a new Delta table or gets an existing one. diff --git a/kernel/src/transaction/mod.rs b/kernel/src/transaction/mod.rs index df24c75ef..d5558fc4e 100644 --- a/kernel/src/transaction/mod.rs +++ b/kernel/src/transaction/mod.rs @@ -169,8 +169,11 @@ impl Transaction { }) } - /// Consume the transaction and commit it to the table. The result is a [CommitResult] which - /// will include the failed transaction in case of a conflict so the user can retry. + /// Consume the transaction and commit it to the table. The result is a result of + /// [CommitResult] with the following semantics: + /// - Ok(CommitResult) for either success or a recoverable error (includes the failed + /// transaction in case of a conflict so the user can retry, etc.) + /// - Err(Error) indicates a non-retryable error (e.g. logic/validation error). pub fn commit(self, engine: &dyn Engine) -> DeltaResult { // Step 1: Check for duplicate app_ids and generate set transactions (`txn`) // Note: The commit info must always be the first action in the commit but we generate it in @@ -227,22 +230,17 @@ impl Transaction { let json_handler = engine.json_handler(); match json_handler.write_json_file(&commit_path.location, Box::new(filtered_actions), false) { - Ok(()) => Ok(CommitResult::Committed { - version: commit_version, - post_commit_stats: PostCommitStats { - commits_since_checkpoint: self - .read_snapshot - .log_segment() - .commits_since_checkpoint() - + 1, - commits_since_log_compaction: self - .read_snapshot - .log_segment() - .commits_since_log_compaction_or_checkpoint() - + 1, - }, - }), - Err(Error::FileAlreadyExists(_)) => Ok(CommitResult::Conflict(self, commit_version)), + Ok(()) => Ok(CommitResult::CommittedTransaction( + self.into_committed(commit_version), + )), + Err(Error::FileAlreadyExists(_)) => Ok(CommitResult::ConflictedTransaction( + self.into_conflicted(commit_version), + )), + // TODO: we may want to be more or less selective about what is retryable (this is tied + // to the idea of "what kind of Errors should write_json_file return?") + Err(e @ Error::IOError(_)) => { + Ok(CommitResult::RetryableTransaction(self.into_retryable(e))) + } Err(e) => Err(e), } } @@ -489,6 +487,38 @@ impl Transaction { Ok((Box::new(add_actions), None)) } } + + fn into_committed(self, version: Version) -> CommittedTransaction { + let stats = PostCommitStats { + commits_since_checkpoint: self.read_snapshot.log_segment().commits_since_checkpoint() + + 1, + commits_since_log_compaction: self + .read_snapshot + .log_segment() + .commits_since_log_compaction_or_checkpoint() + + 1, + }; + + CommittedTransaction { + transaction: self, + commit_version: version, + post_commit_stats: stats, + } + } + + fn into_conflicted(self, conflict_version: Version) -> ConflictedTransaction { + ConflictedTransaction { + transaction: self, + conflict_version, + } + } + + fn into_retryable(self, error: Error) -> RetryableTransaction { + RetryableTransaction { + transaction: self, + error, + } + } } /// WriteContext is data derived from a [`Transaction`] that can be provided to writers in order to @@ -536,22 +566,98 @@ pub struct PostCommitStats { pub commits_since_log_compaction: u64, } -/// Result of committing a transaction. +/// The result of attempting to commit this transaction. If the commit was +/// successful/conflicted/retryable, the result is Ok(CommitResult), otherwise, if a nonrecoverable +/// error occurred, the result is Err(Error). +/// +/// The commit result can be one of the following: +/// - [CommittedTransaction]: the transaction was successfully committed. [PostCommitStats] and +/// in the future a post-commit snapshot can be obtained from the committed transaction. +/// - [ConflictedTransaction]: the transaction conflicted with an existing version. This transcation +/// must be rebased before retrying. (currently no rebase APIs exist, caller must create new txn) +/// - [RetryableTransaction]: an IO (retryable) error occurred during the commit. This transaction +/// can be retried without rebasing. #[derive(Debug)] +#[must_use] pub enum CommitResult { /// The transaction was successfully committed. - Committed { - /// the version of the table that was just committed - version: Version, - /// The [`PostCommitStats`] for this transaction - post_commit_stats: PostCommitStats, - }, - /// This transaction conflicted with an existing version (at the version given). The transaction + CommittedTransaction(CommittedTransaction), + /// This transaction conflicted with an existing version (see + /// [ConflictedTransaction::conflict_version]). The transaction /// is returned so the caller can resolve the conflict (along with the version which /// conflicted). // TODO(zach): in order to make the returning of a transaction useful, we need to add APIs to // update the transaction to a new version etc. - Conflict(Transaction, Version), + ConflictedTransaction(ConflictedTransaction), + /// An IO (retryable) error occurred during the commit. + RetryableTransaction(RetryableTransaction), +} + +impl CommitResult { + /// Returns true if the commit was successful. + pub fn is_committed(&self) -> bool { + matches!(self, CommitResult::CommittedTransaction(_)) + } +} + +/// This is the result of a successfully committed [Transaction]. One can retrieve the +/// [PostCommitStats] and [commit version] from this struct. In the future a post-commit snapshot +/// can be obtained as well. +/// +/// [commit version]: Self::commit_version +#[derive(Debug)] +pub struct CommittedTransaction { + // TODO: remove after post-commit snapshot + #[allow(dead_code)] + transaction: Transaction, + /// the version of the table that was just committed + commit_version: Version, + /// The [`PostCommitStats`] for this transaction + post_commit_stats: PostCommitStats, +} + +impl CommittedTransaction { + /// The version of the table that was just sucessfully committed + pub fn commit_version(&self) -> Version { + self.commit_version + } + + /// The [`PostCommitStats`] for this transaction + pub fn post_commit_stats(&self) -> &PostCommitStats { + &self.post_commit_stats + } + + // TODO(#916): post-commit snapshot +} + +/// This is the result of a conflicted [Transaction]. One can retrieve the [conflict version] from +/// this struct. In the future a rebase API will be provided (issue #1389). +/// +/// [conflict version]: Self::conflict_version +#[derive(Debug)] +pub struct ConflictedTransaction { + // TODO: remove after rebase APIs + #[allow(dead_code)] + transaction: Transaction, + conflict_version: Version, +} + +impl ConflictedTransaction { + /// The version attempted commit that yielded a conflict + pub fn conflict_version(&self) -> Version { + self.conflict_version + } +} + +/// A transaction that failed to commit due to a retryable error (e.g. IO error). The transaction +/// can be recovered with `RetryableTransaction::transaction` and retried without rebasing. The +/// associated error can be inspected via `RetryableTransaction::error`. +#[derive(Debug)] +pub struct RetryableTransaction { + /// The transaction that failed to commit due to a retryable error. + pub transaction: Transaction, + /// Transient error that caused the commit to fail. + pub error: Error, } #[cfg(test)] diff --git a/kernel/tests/row_tracking.rs b/kernel/tests/row_tracking.rs index 10b75d260..8a2887673 100644 --- a/kernel/tests/row_tracking.rs +++ b/kernel/tests/row_tracking.rs @@ -217,7 +217,9 @@ async fn test_row_tracking_append() -> DeltaResult<()> { vec![int32_array(vec![4, 5, 6])], ], )?; - write_data_to_table(&table_url, engine.clone(), data).await?; + assert!(write_data_to_table(&table_url, engine.clone(), data) + .await? + .is_committed()); // Verify the commit was written correctly verify_row_tracking_in_commit( @@ -264,7 +266,9 @@ async fn test_row_tracking_single_record_batches() -> DeltaResult<()> { vec![int32_array(vec![3])], ], )?; - write_data_to_table(&table_url, engine.clone(), data).await?; + assert!(write_data_to_table(&table_url, engine.clone(), data) + .await? + .is_committed()); // Verify the commit was written correctly verify_row_tracking_in_commit( @@ -295,7 +299,9 @@ async fn test_row_tracking_large_batch() -> DeltaResult<()> { // Write a large batch with 1000 records let large_batch: Vec = (1..=1000).collect(); let data = generate_data(schema.clone(), [vec![int32_array(large_batch.clone())]])?; - write_data_to_table(&table_url, engine.clone(), data).await?; + assert!(write_data_to_table(&table_url, engine.clone(), data) + .await? + .is_committed()); // Verify the commit was written correctly verify_row_tracking_in_commit( @@ -342,7 +348,9 @@ async fn test_row_tracking_consecutive_transactions() -> DeltaResult<()> { vec![int32_array(vec![4, 5, 6])], ], )?; - write_data_to_table(&table_url, engine.clone(), data_1).await?; + assert!(write_data_to_table(&table_url, engine.clone(), data_1) + .await? + .is_committed()); // Verify first commit verify_row_tracking_in_commit( @@ -357,7 +365,9 @@ async fn test_row_tracking_consecutive_transactions() -> DeltaResult<()> { // Second transaction: write one batch with 2 records // This should read the existing row tracking domain metadata and assign base row IDs starting from 6 let data_2 = generate_data(schema.clone(), [vec![int32_array(vec![7, 8])]])?; - write_data_to_table(&table_url, engine.clone(), data_2).await?; + assert!(write_data_to_table(&table_url, engine.clone(), data_2) + .await? + .is_committed()); // Verify second commit verify_row_tracking_in_commit( @@ -410,7 +420,9 @@ async fn test_row_tracking_three_consecutive_transactions() -> DeltaResult<()> { ], ], )?; - write_data_to_table(&table_url, engine.clone(), data_1).await?; + assert!(write_data_to_table(&table_url, engine.clone(), data_1) + .await? + .is_committed()); verify_row_tracking_in_commit( &store, @@ -429,7 +441,9 @@ async fn test_row_tracking_three_consecutive_transactions() -> DeltaResult<()> { string_array(vec!["g".to_string(), "h".to_string()]), ]], )?; - write_data_to_table(&table_url, engine.clone(), data_2).await?; + assert!(write_data_to_table(&table_url, engine.clone(), data_2) + .await? + .is_committed()); verify_row_tracking_in_commit( &store, @@ -454,7 +468,9 @@ async fn test_row_tracking_three_consecutive_transactions() -> DeltaResult<()> { ], ], )?; - write_data_to_table(&table_url, engine.clone(), data_3).await?; + assert!(write_data_to_table(&table_url, engine.clone(), data_3) + .await? + .is_committed()); verify_row_tracking_in_commit( &store, @@ -490,7 +506,9 @@ async fn test_row_tracking_with_regular_and_empty_adds() -> DeltaResult<()> { vec![int32_array(vec![4, 5, 6])], ], )?; - write_data_to_table(&table_url, engine.clone(), data).await?; + assert!(write_data_to_table(&table_url, engine.clone(), data) + .await? + .is_committed()); // Verify the commit was written correctly verify_row_tracking_in_commit( @@ -536,7 +554,9 @@ async fn test_row_tracking_with_empty_adds() -> DeltaResult<()> { vec![int32_array(Vec::::new())], ], )?; - write_data_to_table(&table_url, engine.clone(), data).await?; + assert!(write_data_to_table(&table_url, engine.clone(), data) + .await? + .is_committed()); // Verify the commit was written correctly // NB: The expected high water mark is a bit unintuitive here, as we are appending empty batches. @@ -579,7 +599,7 @@ async fn test_row_tracking_without_adds() -> DeltaResult<()> { let txn = snapshot.transaction()?; // Commit without adding any add files - txn.commit(engine.as_ref())?; + assert!(txn.commit(engine.as_ref())?.is_committed()); // Fetch and parse the commit let commit_url = table_url.join(&format!("_delta_log/{:020}.json", 1))?; @@ -660,26 +680,46 @@ async fn test_row_tracking_parallel_transactions_conflict() -> DeltaResult<()> { // Commit the first transaction - this should succeed let result1 = txn1.commit(engine1.as_ref())?; match result1 { - CommitResult::Committed { version, .. } => { - assert_eq!(version, 1, "First transaction should commit at version 1"); + CommitResult::CommittedTransaction(committed) => { + assert_eq!( + committed.commit_version(), + 1, + "First transaction should commit at version 1" + ); } - CommitResult::Conflict(_, version) => { - panic!("First transaction should not conflict, got conflict at version {version}"); + CommitResult::ConflictedTransaction(conflicted) => { + panic!( + "First transaction should not conflict, got conflict at version {}", + conflicted.conflict_version() + ); + } + CommitResult::RetryableTransaction(_) => { + panic!("First transaction should not be retryable error"); } } // Commit the second transaction - this should result in a conflict let result2 = txn2.commit(engine2.as_ref())?; match result2 { - CommitResult::Committed { version, .. } => { - panic!("Second transaction should conflict, but got committed at version {version}"); + CommitResult::CommittedTransaction(committed) => { + panic!( + "Second transaction should conflict, but got committed at version {}", + committed.commit_version() + ); } - CommitResult::Conflict(_conflicted_txn, version) => { - assert_eq!(version, 1, "Conflict should be at version 1"); + CommitResult::ConflictedTransaction(conflicted) => { + assert_eq!( + conflicted.conflict_version(), + 1, + "Conflict should be at version 1" + ); // TODO: In the future, we need to resolve conflicts and retry the commit // For now, we just verify that we got the conflict as expected } + CommitResult::RetryableTransaction(_) => { + panic!("Second transaction should not be retryable error"); + } } // Verify that the winning transaction is in the log and that it has the correct metadata @@ -745,7 +785,9 @@ async fn test_no_row_tracking_fields_without_feature() -> DeltaResult<()> { )?; // Write data to the table - write_data_to_table(&table_url, engine.clone(), data).await?; + assert!(write_data_to_table(&table_url, engine.clone(), data) + .await? + .is_committed()); // Verify that the commit does NOT contain row tracking fields let commit_url = table_url.join(&format!("_delta_log/{:020}.json", 1))?; diff --git a/kernel/tests/write.rs b/kernel/tests/write.rs index a46dc5f1b..d4878aae4 100644 --- a/kernel/tests/write.rs +++ b/kernel/tests/write.rs @@ -67,7 +67,7 @@ async fn test_commit_info() -> Result<(), Box> { let txn = snapshot.transaction()?.with_engine_info("default engine"); // commit! - txn.commit(&engine)?; + let _ = txn.commit(&engine)?; let commit1 = store .get(&Path::from(format!( @@ -183,17 +183,14 @@ async fn write_data_and_check_result_and_stats( // commit! match txn.commit(engine.as_ref())? { - CommitResult::Committed { - version, - post_commit_stats, - } => { - assert_eq!(version, expected_since_commit as Version); + CommitResult::CommittedTransaction(committed) => { + assert_eq!(committed.commit_version(), expected_since_commit as Version); assert_eq!( - post_commit_stats.commits_since_checkpoint, + committed.post_commit_stats().commits_since_checkpoint, expected_since_commit ); assert_eq!( - post_commit_stats.commits_since_log_compaction, + committed.post_commit_stats().commits_since_log_compaction, expected_since_commit ); } @@ -219,7 +216,7 @@ async fn test_commit_info_action() -> Result<(), Box> { let snapshot = Snapshot::builder_for(table_url.clone()).build(&engine)?; let txn = snapshot.transaction()?.with_engine_info("default engine"); - txn.commit(&engine)?; + let _ = txn.commit(&engine)?; let commit = store .get(&Path::from(format!( @@ -363,7 +360,7 @@ async fn test_no_add_actions() -> Result<(), Box> { let txn = snapshot.transaction()?.with_engine_info("default engine"); // Commit without adding any add files - txn.commit(&engine)?; + assert!(txn.commit(&engine)?.is_committed()); let commit1 = store .get(&Path::from(format!( @@ -477,7 +474,7 @@ async fn test_append_partitioned() -> Result<(), Box> { } // commit! - txn.commit(engine.as_ref())?; + assert!(txn.commit(engine.as_ref())?.is_committed()); let commit1 = store .get(&Path::from(format!( @@ -655,7 +652,7 @@ async fn test_write_txn_actions() -> Result<(), Box> { .with_transaction_id("app_id2".to_string(), 2); // commit! - txn.commit(&engine)?; + assert!(txn.commit(&engine)?.is_committed()); let snapshot = Snapshot::builder_for(table_url.clone()) .at_version(1) @@ -816,7 +813,7 @@ async fn test_append_timestamp_ntz() -> Result<(), Box> { txn.add_files(add_files_metadata); // Commit the transaction - txn.commit(engine.as_ref())?; + assert!(txn.commit(engine.as_ref())?.is_committed()); // Verify the commit was written correctly let commit1 = store @@ -1023,7 +1020,7 @@ async fn test_append_variant() -> Result<(), Box> { txn.add_files(add_files_metadata); // Commit the transaction - txn.commit(engine.as_ref())?; + assert!(txn.commit(engine.as_ref())?.is_committed()); // Verify the commit was written correctly let commit1_url = tmp_test_dir_url @@ -1195,7 +1192,7 @@ async fn test_shredded_variant_read_rejection() -> Result<(), Box Result<(), Box