
Commit 26f67e9

[Spark][4.0] Rename managed commit to coordinated commits (#3241)
#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [x] Other (RFC)

## Description

Renames Managed Commit to Coordinated Commits and Commit Owner to Commit Coordinator to better express the meaning of the feature. Configs, the table feature name, classes, functions, comments, file names, and directory names have all been updated to reflect the new terminology.

## How was this patch tested?

Existing tests should cover these changes.

## Does this PR introduce _any_ user-facing changes?

Yes, the feature name and config names have changed.
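As a rough illustration of the config rename (a sketch, not code from this PR): the property key `delta.coordinatedCommits.commitCoordinator-preview` is taken from the diff below, while the table path, the coordinator name `dynamodb`, and the example object itself are placeholder assumptions.

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: shows the renamed table property key from this change
// ("delta.managedCommit.commitOwner-preview" -> "delta.coordinatedCommits.commitCoordinator-preview").
// The table path and the coordinator name "dynamodb" are placeholder assumptions.
object CoordinatedCommitsRenameExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("coordinated-commits-rename-example")
      .getOrCreate()

    // Point an existing Delta table at a commit coordinator using the new key.
    spark.sql(
      """ALTER TABLE delta.`/tmp/example_table` SET TBLPROPERTIES (
        |  'delta.coordinatedCommits.commitCoordinator-preview' = 'dynamodb'
        |)""".stripMargin)

    spark.stop()
  }
}
```

Tables that still carry the old `delta.managedCommit.*` keys would need to be updated to the new names, since this rename is a user-facing change to config and feature names.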
1 parent d5d01c9 · commit 26f67e9

File tree

51 files changed (+1959 -1808 lines)


build.sbt (+1 -1)

@@ -401,7 +401,7 @@ lazy val spark = (project in file("spark"))
       "org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided",
       "org.apache.spark" %% "spark-core" % sparkVersion.value % "provided",
       "org.apache.spark" %% "spark-catalyst" % sparkVersion.value % "provided",
-      // For DynamoDBCommitStore
+      // For DynamoDBCommitCoordinator
       "com.amazonaws" % "aws-java-sdk" % "1.12.262" % "provided",

       // Test deps

connectors/standalone/src/main/scala/io/delta/standalone/internal/OptimisticTransactionImpl.scala (+3 -1)

@@ -527,7 +527,9 @@ private[internal] class OptimisticTransactionImpl(
   private def shouldCheckpoint(committedVersion: Long): Boolean = {
     val checkpointingEnabled =
       deltaLog.hadoopConf.getBoolean(StandaloneHadoopConf.CHECKPOINTING_ENABLED, true)
-    checkpointingEnabled && committedVersion != 0 && committedVersion % deltaLog.checkpointInterval == 0
+    checkpointingEnabled &&
+      committedVersion != 0 &&
+      committedVersion % deltaLog.checkpointInterval == 0
   }

   /** Returns the next attempt version given the last attempted version */

protocol_rfcs/README.md (+1 -1)

@@ -20,7 +20,7 @@ Here is the history of all the RFCs propose/accepted/rejected since Feb 6, 2024,
 |:--------------|:---------------------------------------------------------------------------------------------------------------------------------|:----------------------------------------------|:---------------------------------------|
 | 2023-02-02 | [in-commit-timestamps.md](https://github.com/delta-io/delta/blob/master/protocol_rfcs/in-commit-timestamps.md) | https://github.com/delta-io/delta/issues/2532 | In-Commit Timestamps |
 | 2023-02-09 | [type-widening.md](https://github.com/delta-io/delta/blob/master/protocol_rfcs/type-widening.md) | https://github.com/delta-io/delta/issues/2623 | Type Widening |
-| 2023-02-14 | [managed-commits.md](https://github.com/delta-io/delta/blob/master/protocol_rfcs/managed-commits.md) | https://github.com/delta-io/delta/issues/2598 | Managed Commits |
+| 2023-02-14 | [coordinated-commits.md](https://github.com/delta-io/delta/blob/master/protocol_rfcs/coordinated-commits.md) | https://github.com/delta-io/delta/issues/2598 | Coordinated Commits |
 | 2023-02-26 | [column-mapping-usage.tracking.md](https://github.com/delta-io/delta/blob/master/protocol_rfcs/column-mapping-usage-tracking.md) | https://github.com/delta-io/delta/issues/2682 | Column Mapping Usage Tracking |
 | 2023-04-24 | [variant-type.md](https://github.com/delta-io/delta/blob/master/protocol_rfcs/variant-type.md) | https://github.com/delta-io/delta/issues/2864 | Variant Data Type |
 | 2024-04-30 | [collated-string-type.md](https://github.com/delta-io/delta/blob/master/protocol_rfcs/collated-string-type.md) | https://github.com/delta-io/delta/issues/2894 | Collated String Type |

protocol_rfcs/managed-commits.md renamed to protocol_rfcs/coordinated-commits.md (+101 -101)

Large diffs are not rendered by default.

spark/src/main/java/io/delta/dynamodbcommitstore/ManagedCommitUtils.java renamed to spark/src/main/java/io/delta/dynamodbcommitcoordinator/CoordinatedCommitsUtils.java (+17 -17)

@@ -14,24 +14,24 @@
  * limitations under the License.
  */

-package io.delta.dynamodbcommitstore;
+package io.delta.dynamodbcommitcoordinator;

-import org.apache.spark.sql.delta.managedcommit.AbstractMetadata;
-import org.apache.spark.sql.delta.managedcommit.UpdatedActions;
+import org.apache.spark.sql.delta.coordinatedcommits.AbstractMetadata;
+import org.apache.spark.sql.delta.coordinatedcommits.UpdatedActions;
 import org.apache.hadoop.fs.Path;

 import java.util.UUID;

-public class ManagedCommitUtils {
+public class CoordinatedCommitsUtils {

-    private ManagedCommitUtils() {}
+    private CoordinatedCommitsUtils() {}

     /** The subdirectory in which to store the unbackfilled commit files. */
     final static String COMMIT_SUBDIR = "_commits";

-    /** The configuration key for the managed commit owner. */
-    private static final String MANAGED_COMMIT_OWNER_CONF_KEY =
-            "delta.managedCommit.commitOwner-preview";
+    /** The configuration key for the coordinated commits owner. */
+    private static final String COORDINATED_COMMITS_COORDINATOR_CONF_KEY =
+            "delta.coordinatedCommits.commitCoordinator-preview";

     /**
      * Creates a new unbackfilled delta file path for the given commit version.
@@ -55,23 +55,23 @@ public static Path getBackfilledDeltaFilePath(
         return new Path(logPath, String.format("%020d.json", version));
     }

-    private static String getManagedCommitOwner(AbstractMetadata metadata) {
+    private static String getCoordinatedCommitsCoordinator(AbstractMetadata metadata) {
         return metadata
             .getConfiguration()
-            .get(MANAGED_COMMIT_OWNER_CONF_KEY)
+            .get(COORDINATED_COMMITS_COORDINATOR_CONF_KEY)
             .getOrElse(() -> "");
     }

     /**
-     * Returns true if the commit is a managed commit to filesystem conversion.
+     * Returns true if the commit is a coordinated commits to filesystem conversion.
      */
-    public static boolean isManagedCommitToFSConversion(
+    public static boolean isCoordinatedCommitsToFSConversion(
             Long commitVersion,
             UpdatedActions updatedActions) {
-        boolean oldMetadataHasManagedCommit =
-            !getManagedCommitOwner(updatedActions.getOldMetadata()).isEmpty();
-        boolean newMetadataHasManagedCommit =
-            !getManagedCommitOwner(updatedActions.getNewMetadata()).isEmpty();
-        return oldMetadataHasManagedCommit && !newMetadataHasManagedCommit && commitVersion > 0;
+        boolean oldMetadataHasCoordinatedCommits =
+            !getCoordinatedCommitsCoordinator(updatedActions.getOldMetadata()).isEmpty();
+        boolean newMetadataHasCoordinatedCommits =
+            !getCoordinatedCommitsCoordinator(updatedActions.getNewMetadata()).isEmpty();
+        return oldMetadataHasCoordinatedCommits && !newMetadataHasCoordinatedCommits && commitVersion > 0;
     }
 }
