-
Couldn't load subscription status.
- Fork 118
refactor!: rearchitect CommitResult
#1343
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
bd8977f
d12b52e
b073d23
e568469
84e877a
c33d9b4
c677c5f
73e7083
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -169,8 +169,11 @@ | |
| }) | ||
| } | ||
|
|
||
| /// Consume the transaction and commit it to the table. The result is a [CommitResult] which | ||
| /// will include the failed transaction in case of a conflict so the user can retry. | ||
| /// Consume the transaction and commit it to the table. The result is a result of | ||
| /// [CommitResult] with the following semantics: | ||
| /// - Ok(CommitResult) for either success or a recoverable error (includes the failed | ||
| /// transaction in case of a conflict so the user can retry, etc.) | ||
| /// - Err(Error) indicates a non-retryable error (e.g. logic/validation error). | ||
| pub fn commit(self, engine: &dyn Engine) -> DeltaResult<CommitResult> { | ||
| // Step 1: Check for duplicate app_ids and generate set transactions (`txn`) | ||
| // Note: The commit info must always be the first action in the commit but we generate it in | ||
|
|
@@ -227,23 +230,14 @@ | |
| let json_handler = engine.json_handler(); | ||
| match json_handler.write_json_file(&commit_path.location, Box::new(filtered_actions), false) | ||
| { | ||
| Ok(()) => Ok(CommitResult::Committed { | ||
| version: commit_version, | ||
| post_commit_stats: PostCommitStats { | ||
| commits_since_checkpoint: self | ||
| .read_snapshot | ||
| .log_segment() | ||
| .commits_since_checkpoint() | ||
| + 1, | ||
| commits_since_log_compaction: self | ||
| .read_snapshot | ||
| .log_segment() | ||
| .commits_since_log_compaction_or_checkpoint() | ||
| + 1, | ||
| }, | ||
| }), | ||
| Err(Error::FileAlreadyExists(_)) => Ok(CommitResult::Conflict(self, commit_version)), | ||
| Err(e) => Err(e), | ||
| Ok(()) => Ok(CommitResult::CommittedTransaction( | ||
| self.into_committed(commit_version), | ||
| )), | ||
| Err(Error::FileAlreadyExists(_)) => Ok(CommitResult::ConflictedTransaction( | ||
| self.into_conflicted(commit_version), | ||
| )), | ||
| // TODO: we may want to be more selective about what is retryable | ||
| Err(e) => Ok(CommitResult::RetryableTransaction(self.into_retryable(e))), | ||
|
||
| } | ||
| } | ||
|
|
||
|
|
@@ -485,6 +479,38 @@ | |
| Ok((Box::new(add_actions), None)) | ||
| } | ||
| } | ||
|
|
||
| fn into_committed(self, version: Version) -> CommittedTransaction { | ||
| let stats = PostCommitStats { | ||
| commits_since_checkpoint: self.read_snapshot.log_segment().commits_since_checkpoint() | ||
| + 1, | ||
| commits_since_log_compaction: self | ||
| .read_snapshot | ||
| .log_segment() | ||
| .commits_since_log_compaction_or_checkpoint() | ||
| + 1, | ||
| }; | ||
|
|
||
| CommittedTransaction { | ||
| transaction: self, | ||
| commit_version: version, | ||
| post_commit_stats: stats, | ||
| } | ||
| } | ||
|
|
||
| fn into_conflicted(self, conflict_version: Version) -> ConflictedTransaction { | ||
| ConflictedTransaction { | ||
| transaction: self, | ||
| conflict_version, | ||
| } | ||
| } | ||
|
|
||
| fn into_retryable(self, error: Error) -> RetryableTransaction { | ||
| RetryableTransaction { | ||
| transaction: self, | ||
| error, | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /// WriteContext is data derived from a [`Transaction`] that can be provided to writers in order to | ||
|
|
@@ -532,22 +558,99 @@ | |
| pub commits_since_log_compaction: u64, | ||
| } | ||
|
|
||
| /// Result of committing a transaction. | ||
| /// The result of attempting to commit this transaction. If the commit was | ||
| /// successful/conflicted/retryable, the result is Ok(CommitResult), otherwise, if a nonrecoverable | ||
| /// error occurred, the result is Err(Error). | ||
| /// | ||
| /// The commit result can be one of the following: | ||
| /// - [CommittedTransaction]: the transaction was successfully committed. [PostCommitStats] and | ||
| /// in the future a post-commit snapshot can be obtained from the committed transaction. | ||
| /// - [ConflictedTransaction]: the transaction conflicted with an existing version. This transcation | ||
| /// must be rebased before retrying. (currently no rebase APIs exist, caller must create new txn) | ||
| /// - [RetryableTransaction]: an IO (retryable) error occurred during the commit. This transaction | ||
| /// can be retried without rebasing. | ||
| #[derive(Debug)] | ||
| #[must_use] | ||
| pub enum CommitResult { | ||
| /// The transaction was successfully committed. | ||
| Committed { | ||
| /// the version of the table that was just committed | ||
| version: Version, | ||
| /// The [`PostCommitStats`] for this transaction | ||
| post_commit_stats: PostCommitStats, | ||
| }, | ||
| /// This transaction conflicted with an existing version (at the version given). The transaction | ||
| CommittedTransaction(CommittedTransaction), | ||
| /// This transaction conflicted with an existing version (see | ||
| /// [ConflictedTransaction::conflict_version]). The transaction | ||
| /// is returned so the caller can resolve the conflict (along with the version which | ||
| /// conflicted). | ||
| // TODO(zach): in order to make the returning of a transaction useful, we need to add APIs to | ||
| // update the transaction to a new version etc. | ||
| Conflict(Transaction, Version), | ||
| ConflictedTransaction(ConflictedTransaction), | ||
| /// An IO (retryable) error occurred during the commit. | ||
| RetryableTransaction(RetryableTransaction), | ||
| } | ||
|
|
||
| impl CommitResult { | ||
| /// Returns true if the commit was successful. | ||
| pub fn is_committed(&self) -> bool { | ||
| matches!(self, CommitResult::CommittedTransaction(_)) | ||
| } | ||
| } | ||
|
|
||
| /// This is the result of a successfully committed [Transaction]. One can retrieve the | ||
| /// [PostCommitStats] and [commit version] from this struct. In the future a post-commit snapshot | ||
| /// can be obtained as well. | ||
| /// | ||
| /// [commit version]: Self::commit_version | ||
| #[derive(Debug)] | ||
| pub struct CommittedTransaction { | ||
| // TODO: remove after post-commit snapshot | ||
| #[allow(dead_code)] | ||
| transaction: Transaction, | ||
| /// the version of the table that was just committed | ||
| commit_version: Version, | ||
| /// The [`PostCommitStats`] for this transaction | ||
| post_commit_stats: PostCommitStats, | ||
| } | ||
|
|
||
| impl CommittedTransaction { | ||
| /// The version of the table that was just sucessfully committed | ||
| pub fn commit_version(&self) -> Version { | ||
| self.commit_version | ||
| } | ||
|
|
||
| /// The [`PostCommitStats`] for this transaction | ||
| pub fn post_commit_stats(&self) -> &PostCommitStats { | ||
| &self.post_commit_stats | ||
| } | ||
|
|
||
| // TODO: post-commit snapshot | ||
zachschuermann marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
|
|
||
| /// This is the result of a [Transaction]. One can retrieve the | ||
| /// [PostCommitStats] and [commit version] from this struct. In the future a post-commit snapshot | ||
| /// can be obtained as well. | ||
| /// | ||
| /// [commit version]: Self::commit_version | ||
zachschuermann marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| #[derive(Debug)] | ||
| pub struct ConflictedTransaction { | ||
| // TODO: remove after rebase APIs | ||
| #[allow(dead_code)] | ||
| transaction: Transaction, | ||
| conflict_version: Version, | ||
| } | ||
|
|
||
| impl ConflictedTransaction { | ||
| /// The version attempted commit that yielded a conflict | ||
| pub fn conflict_version(&self) -> Version { | ||
| self.conflict_version | ||
| } | ||
| } | ||
|
|
||
| /// A transaction that failed to commit due to a retryable error (e.g. IO error). The transaction | ||
| /// can be recovered with `RetryableTransaction::transaction` and retried without rebasing. The | ||
| /// associated error can be inspected via `RetryableTransaction::error`. | ||
| #[derive(Debug)] | ||
| pub struct RetryableTransaction { | ||
| /// The transaction that failed to commit due to a retryable error. | ||
| pub transaction: Transaction, | ||
| /// Transient error that caused the commit to fail. | ||
| pub error: Error, | ||
| } | ||
|
|
||
| #[cfg(test)] | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.