Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 10 additions & 7 deletions ffi/src/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,16 @@ pub unsafe extern "C" fn commit(
// TODO: for now this removes the enum, which prevents doing any conflict resolution. We should fix
// this by making the commit function return the enum somehow.
match txn.commit(engine.as_ref()) {
Ok(CommitResult::Committed {
version: v,
post_commit_stats: _,
}) => Ok(v),
Ok(CommitResult::Conflict(_, v)) => Err(delta_kernel::Error::Generic(format!(
"commit conflict at version {v}"
))),
Ok(CommitResult::CommittedTransaction(committed)) => Ok(committed.commit_version()),
Ok(CommitResult::RetryableTransaction(_)) => Err(delta_kernel::Error::unsupported(
"commit failed: retryable transaction not supported in FFI (yet)",
)),
Ok(CommitResult::ConflictedTransaction(conflicted)) => {
Err(delta_kernel::Error::Generic(format!(
"commit conflict at version {}",
conflicted.conflict_version()
)))
}
Err(e) => Err(e),
}
.into_extern_result(&extern_engine)
Expand Down
47 changes: 31 additions & 16 deletions kernel/examples/write-table/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::engine::default::executor::tokio::TokioBackgroundExecutor;
use delta_kernel::engine::default::DefaultEngine;
use delta_kernel::schema::{DataType, SchemaRef, StructField, StructType};
use delta_kernel::transaction::CommitResult;
use delta_kernel::transaction::{CommitResult, RetryableTransaction};
use delta_kernel::{DeltaResult, Engine, Error, Snapshot, SnapshotRef};

/// An example program that writes to a Delta table and creates it if necessary.
Expand Down Expand Up @@ -102,23 +102,38 @@ async fn try_main() -> DeltaResult<()> {
// Add the file metadata to the transaction
txn.add_files(file_metadata);

// Commit the transaction
match txn.commit(&engine)? {
CommitResult::Committed { version, .. } => {
println!("✓ Committed transaction at version {version}");
println!("✓ Successfully wrote {} rows to the table", cli.num_rows);
// Commit the transaction (in a simple retry loop)
let mut retries = 0;
let committed = loop {
if retries > 5 {
return Err(Error::generic(
"Exceeded maximum 5 retries for committing transaction",
));
}
txn = match txn.commit(&engine)? {
CommitResult::CommittedTransaction(committed) => break committed,
CommitResult::ConflictedTransaction(conflicted) => {
let conflicting_version = conflicted.conflict_version();
println!("✗ Failed to write data, transaction conflicted with version: {conflicting_version}");
return Err(Error::generic("Commit failed"));
}
CommitResult::RetryableTransaction(RetryableTransaction { transaction, error }) => {
println!("✗ Failed to commit, retrying... retryable error: {error}");
transaction
}
};
retries += 1;
};

// Read and display the data
read_and_display_data(&url, engine).await?;
println!("✓ Successfully read data from the table");
let version = committed.commit_version();
println!("✓ Committed transaction at version {version}");
println!("✓ Successfully wrote {} rows to the table", cli.num_rows);

Ok(())
}
CommitResult::Conflict(_, conflicting_version) => {
println!("✗ Failed to write data, transaction conflicted with version: {conflicting_version}");
Err(Error::generic("Commit failed"))
}
}
// Read and display the data
read_and_display_data(&url, engine).await?;
println!("✓ Successfully read data from the table");

Ok(())
}

/// Creates a new Delta table or gets an existing one.
Expand Down
160 changes: 133 additions & 27 deletions kernel/src/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,11 @@ impl Transaction {
})
}

/// Consume the transaction and commit it to the table. The result is a [CommitResult] which
/// will include the failed transaction in case of a conflict so the user can retry.
/// Consume the transaction and commit it to the table. The result is a result of
/// [CommitResult] with the following semantics:
/// - Ok(CommitResult) for either success or a recoverable error (includes the failed
/// transaction in case of a conflict so the user can retry, etc.)
/// - Err(Error) indicates a non-retryable error (e.g. logic/validation error).
pub fn commit(self, engine: &dyn Engine) -> DeltaResult<CommitResult> {
// Step 1: Check for duplicate app_ids and generate set transactions (`txn`)
// Note: The commit info must always be the first action in the commit but we generate it in
Expand Down Expand Up @@ -227,22 +230,17 @@ impl Transaction {
let json_handler = engine.json_handler();
match json_handler.write_json_file(&commit_path.location, Box::new(filtered_actions), false)
{
Ok(()) => Ok(CommitResult::Committed {
version: commit_version,
post_commit_stats: PostCommitStats {
commits_since_checkpoint: self
.read_snapshot
.log_segment()
.commits_since_checkpoint()
+ 1,
commits_since_log_compaction: self
.read_snapshot
.log_segment()
.commits_since_log_compaction_or_checkpoint()
+ 1,
},
}),
Err(Error::FileAlreadyExists(_)) => Ok(CommitResult::Conflict(self, commit_version)),
Ok(()) => Ok(CommitResult::CommittedTransaction(
self.into_committed(commit_version),
)),
Err(Error::FileAlreadyExists(_)) => Ok(CommitResult::ConflictedTransaction(
self.into_conflicted(commit_version),
)),
// TODO: we may want to be more or less selective about what is retryable (this is tied
// to the idea of "what kind of Errors should write_json_file return?")
Err(e @ Error::IOError(_)) => {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

woah never seen this syntax!

Ok(CommitResult::RetryableTransaction(self.into_retryable(e)))
}
Err(e) => Err(e),
}
}
Expand Down Expand Up @@ -485,6 +483,38 @@ impl Transaction {
Ok((Box::new(add_actions), None))
}
}

fn into_committed(self, version: Version) -> CommittedTransaction {
let stats = PostCommitStats {
commits_since_checkpoint: self.read_snapshot.log_segment().commits_since_checkpoint()
+ 1,
commits_since_log_compaction: self
.read_snapshot
.log_segment()
.commits_since_log_compaction_or_checkpoint()
+ 1,
};

CommittedTransaction {
transaction: self,
commit_version: version,
post_commit_stats: stats,
}
}

fn into_conflicted(self, conflict_version: Version) -> ConflictedTransaction {
ConflictedTransaction {
transaction: self,
conflict_version,
}
}

fn into_retryable(self, error: Error) -> RetryableTransaction {
RetryableTransaction {
transaction: self,
error,
}
}
}

/// WriteContext is data derived from a [`Transaction`] that can be provided to writers in order to
Expand Down Expand Up @@ -532,22 +562,98 @@ pub struct PostCommitStats {
pub commits_since_log_compaction: u64,
}

/// Result of committing a transaction.
/// The result of attempting to commit this transaction. If the commit was
/// successful/conflicted/retryable, the result is Ok(CommitResult), otherwise, if a nonrecoverable
/// error occurred, the result is Err(Error).
///
/// The commit result can be one of the following:
/// - [CommittedTransaction]: the transaction was successfully committed. [PostCommitStats] and
/// in the future a post-commit snapshot can be obtained from the committed transaction.
/// - [ConflictedTransaction]: the transaction conflicted with an existing version. This transcation
/// must be rebased before retrying. (currently no rebase APIs exist, caller must create new txn)
/// - [RetryableTransaction]: an IO (retryable) error occurred during the commit. This transaction
/// can be retried without rebasing.
#[derive(Debug)]
#[must_use]
pub enum CommitResult {
/// The transaction was successfully committed.
Committed {
/// the version of the table that was just committed
version: Version,
/// The [`PostCommitStats`] for this transaction
post_commit_stats: PostCommitStats,
},
/// This transaction conflicted with an existing version (at the version given). The transaction
CommittedTransaction(CommittedTransaction),
/// This transaction conflicted with an existing version (see
/// [ConflictedTransaction::conflict_version]). The transaction
/// is returned so the caller can resolve the conflict (along with the version which
/// conflicted).
// TODO(zach): in order to make the returning of a transaction useful, we need to add APIs to
// update the transaction to a new version etc.
Conflict(Transaction, Version),
ConflictedTransaction(ConflictedTransaction),
/// An IO (retryable) error occurred during the commit.
RetryableTransaction(RetryableTransaction),
}

impl CommitResult {
/// Returns true if the commit was successful.
pub fn is_committed(&self) -> bool {
matches!(self, CommitResult::CommittedTransaction(_))
}
}

/// This is the result of a successfully committed [Transaction]. One can retrieve the
/// [PostCommitStats] and [commit version] from this struct. In the future a post-commit snapshot
/// can be obtained as well.
///
/// [commit version]: Self::commit_version
#[derive(Debug)]
pub struct CommittedTransaction {
// TODO: remove after post-commit snapshot
#[allow(dead_code)]
transaction: Transaction,
/// the version of the table that was just committed
commit_version: Version,
/// The [`PostCommitStats`] for this transaction
post_commit_stats: PostCommitStats,
}

impl CommittedTransaction {
/// The version of the table that was just sucessfully committed
pub fn commit_version(&self) -> Version {
self.commit_version
}

/// The [`PostCommitStats`] for this transaction
pub fn post_commit_stats(&self) -> &PostCommitStats {
&self.post_commit_stats
}

// TODO: post-commit snapshot
}

/// This is the result of a conflicted [Transaction]. One can retrieve the [conflict version] from
/// this struct. In the future a rebase API will be provided.
///
/// [conflict version]: Self::conflict_version
#[derive(Debug)]
pub struct ConflictedTransaction {
// TODO: remove after rebase APIs
#[allow(dead_code)]
transaction: Transaction,
conflict_version: Version,
}

impl ConflictedTransaction {
/// The version attempted commit that yielded a conflict
pub fn conflict_version(&self) -> Version {
self.conflict_version
}
}

/// A transaction that failed to commit due to a retryable error (e.g. IO error). The transaction
/// can be recovered with `RetryableTransaction::transaction` and retried without rebasing. The
/// associated error can be inspected via `RetryableTransaction::error`.
#[derive(Debug)]
pub struct RetryableTransaction {
/// The transaction that failed to commit due to a retryable error.
pub transaction: Transaction,
/// Transient error that caused the commit to fail.
pub error: Error,
}

#[cfg(test)]
Expand Down
Loading
Loading