Skip to content

Commit d7cf603

Browse files
committed
[core] Rollback 'COMPACT' commit for row-level operations
1 parent 59ad255 commit d7cf603

File tree

10 files changed

+271
-37
lines changed

10 files changed

+271
-37
lines changed

paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.paimon.CoreOptions.ExternalPathStrategy;
2222
import org.apache.paimon.catalog.RenamingSnapshotCommit;
2323
import org.apache.paimon.catalog.SnapshotCommit;
24+
import org.apache.paimon.catalog.TableRollback;
2425
import org.apache.paimon.data.InternalRow;
2526
import org.apache.paimon.format.FileFormat;
2627
import org.apache.paimon.fs.FileIO;
@@ -43,6 +44,7 @@
4344
import org.apache.paimon.operation.PartitionExpire;
4445
import org.apache.paimon.operation.SnapshotDeletion;
4546
import org.apache.paimon.operation.TagDeletion;
47+
import org.apache.paimon.operation.commit.CommitRollback;
4648
import org.apache.paimon.operation.commit.ConflictDetection;
4749
import org.apache.paimon.operation.commit.StrictModeChecker;
4850
import org.apache.paimon.partition.PartitionExpireStrategy;
@@ -287,6 +289,11 @@ public FileStoreCommitImpl newCommit(String commitUser, FileStoreTable table) {
287289
commitUser,
288290
this::newScan,
289291
options.commitStrictModeLastSafeSnapshot().orElse(null));
292+
CommitRollback rollback = null;
293+
TableRollback tableRollback = catalogEnvironment.catalogTableRollback();
294+
if (tableRollback != null) {
295+
rollback = new CommitRollback(tableRollback);
296+
}
290297
return new FileStoreCommitImpl(
291298
snapshotCommit,
292299
fileIO,
@@ -319,7 +326,8 @@ public FileStoreCommitImpl newCommit(String commitUser, FileStoreTable table) {
319326
options.rowTrackingEnabled(),
320327
options.commitDiscardDuplicateFiles(),
321328
conflictDetection,
322-
strictModeChecker);
329+
strictModeChecker,
330+
rollback);
323331
}
324332

325333
@Override
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.catalog;
20+
21+
import org.apache.paimon.table.Instant;
22+
23+
import javax.annotation.Nullable;
24+
25+
/** Rollback table to instant from snapshot. */
26+
public interface TableRollback {
27+
28+
void rollbackTo(Instant instant, @Nullable Long fromSnapshot);
29+
}

paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java

Lines changed: 36 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.apache.paimon.operation.commit.CommitCleaner;
4545
import org.apache.paimon.operation.commit.CommitKindProvider;
4646
import org.apache.paimon.operation.commit.CommitResult;
47+
import org.apache.paimon.operation.commit.CommitRollback;
4748
import org.apache.paimon.operation.commit.CommitScanner;
4849
import org.apache.paimon.operation.commit.ConflictDetection;
4950
import org.apache.paimon.operation.commit.ManifestEntryChanges;
@@ -139,6 +140,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
139140
private final ManifestFile manifestFile;
140141
private final ManifestList manifestList;
141142
private final IndexManifestFile indexManifestFile;
143+
@Nullable private final CommitRollback rollback;
142144
private final CommitScanner scanner;
143145
private final int numBucket;
144146
private final MemorySize manifestTargetSize;
@@ -196,7 +198,8 @@ public FileStoreCommitImpl(
196198
boolean rowTrackingEnabled,
197199
boolean discardDuplicateFiles,
198200
ConflictDetection conflictDetection,
199-
@Nullable StrictModeChecker strictModeChecker) {
201+
@Nullable StrictModeChecker strictModeChecker,
202+
@Nullable CommitRollback rollback) {
200203
this.snapshotCommit = snapshotCommit;
201204
this.fileIO = fileIO;
202205
this.schemaManager = schemaManager;
@@ -210,6 +213,7 @@ public FileStoreCommitImpl(
210213
this.manifestFile = manifestFileFactory.create();
211214
this.manifestList = manifestListFactory.create();
212215
this.indexManifestFile = indexManifestFileFactory.create();
216+
this.rollback = rollback;
213217
this.scanner = new CommitScanner(scan, indexManifestFile, options);
214218
this.numBucket = numBucket;
215219
this.manifestTargetSize = manifestTargetSize;
@@ -314,10 +318,13 @@ public int commit(ManifestCommittable committable, boolean checkAppendFiles) {
314318
if (appendCommitCheckConflict) {
315319
checkAppendFiles = true;
316320
}
321+
322+
boolean allowRollback = false;
317323
if (containsFileDeletionOrDeletionVectors(
318324
appendSimpleEntries, changes.appendIndexFiles)) {
319325
commitKind = CommitKind.OVERWRITE;
320326
checkAppendFiles = true;
327+
allowRollback = true;
321328
}
322329

323330
attempts +=
@@ -330,6 +337,7 @@ public int commit(ManifestCommittable committable, boolean checkAppendFiles) {
330337
committable.watermark(),
331338
committable.properties(),
332339
CommitKindProvider.provider(commitKind),
340+
allowRollback,
333341
checkAppendFiles,
334342
null);
335343
generatedSnapshot += 1;
@@ -348,6 +356,7 @@ public int commit(ManifestCommittable committable, boolean checkAppendFiles) {
348356
committable.watermark(),
349357
committable.properties(),
350358
CommitKindProvider.provider(CommitKind.COMPACT),
359+
false,
351360
true,
352361
null);
353362
generatedSnapshot += 1;
@@ -513,6 +522,7 @@ public int overwritePartition(
513522
committable.watermark(),
514523
committable.properties(),
515524
CommitKindProvider.provider(CommitKind.COMPACT),
525+
false,
516526
true,
517527
null);
518528
generatedSnapshot += 1;
@@ -653,6 +663,7 @@ public void commitStatistics(Statistics stats, long commitIdentifier) {
653663
Collections.emptyMap(),
654664
CommitKindProvider.provider(CommitKind.ANALYZE),
655665
false,
666+
false,
656667
statsFileName);
657668
}
658669

@@ -679,6 +690,7 @@ private int tryCommit(
679690
@Nullable Long watermark,
680691
Map<String, String> properties,
681692
CommitKindProvider commitKindProvider,
693+
boolean allowRollback,
682694
boolean detectConflicts,
683695
@Nullable String statsFileName) {
684696
int retryCount = 0;
@@ -698,6 +710,7 @@ private int tryCommit(
698710
watermark,
699711
properties,
700712
commitKind,
713+
allowRollback,
701714
latestSnapshot,
702715
detectConflicts,
703716
statsFileName);
@@ -750,6 +763,7 @@ private int tryOverwritePartition(
750763
watermark,
751764
properties,
752765
commitKindProvider,
766+
false,
753767
true,
754768
null);
755769
}
@@ -764,6 +778,7 @@ CommitResult tryCommitOnce(
764778
@Nullable Long watermark,
765779
Map<String, String> properties,
766780
CommitKind commitKind,
781+
boolean allowRollback,
767782
@Nullable Snapshot latestSnapshot,
768783
boolean detectConflicts,
769784
@Nullable String newStatsFileName) {
@@ -821,7 +836,9 @@ CommitResult tryCommitOnce(
821836
// latestSnapshotId is different from the snapshot id we've checked for conflicts,
822837
// so we have to check again
823838
List<BinaryRow> changedPartitions = changedPartitions(deltaFiles, indexFiles);
824-
if (retryResult != null && retryResult.latestSnapshot != null) {
839+
if (retryResult != null
840+
&& retryResult.latestSnapshot != null
841+
&& retryResult.baseDataFiles != null) {
825842
baseDataFiles = new ArrayList<>(retryResult.baseDataFiles);
826843
List<SimpleFileEntry> incremental =
827844
scanner.readIncrementalChanges(
@@ -845,12 +862,21 @@ CommitResult tryCommitOnce(
845862
.filter(entry -> !baseIdentifiers.contains(entry.identifier()))
846863
.collect(Collectors.toList());
847864
}
848-
conflictDetection.checkNoConflictsOrFail(
849-
latestSnapshot,
850-
baseDataFiles,
851-
SimpleFileEntry.from(deltaFiles),
852-
indexFiles,
853-
commitKind);
865+
Optional<RuntimeException> exception =
866+
conflictDetection.checkConflicts(
867+
latestSnapshot,
868+
baseDataFiles,
869+
SimpleFileEntry.from(deltaFiles),
870+
indexFiles,
871+
commitKind);
872+
if (exception.isPresent()) {
873+
if (allowRollback && rollback != null) {
874+
if (rollback.tryToRollback(latestSnapshot)) {
875+
return RetryCommitResult.ofEmpty(exception.get());
876+
}
877+
}
878+
throw exception.get();
879+
}
854880
}
855881

856882
Snapshot newSnapshot;
@@ -979,7 +1005,7 @@ CommitResult tryCommitOnce(
9791005
} catch (Exception e) {
9801006
// commit exception, not sure about the situation and should not clean up the files
9811007
LOG.warn("Retry commit for exception.", e);
982-
return new RetryCommitResult(latestSnapshot, baseDataFiles, e);
1008+
return RetryCommitResult.ofContext(latestSnapshot, baseDataFiles, e);
9831009
}
9841010

9851011
if (!success) {
@@ -996,7 +1022,7 @@ CommitResult tryCommitOnce(
9961022
commitTime);
9971023
commitCleaner.cleanUpNoReuseTmpManifests(
9981024
baseManifestList, mergeBeforeManifests, mergeAfterManifests);
999-
return new RetryCommitResult(latestSnapshot, baseDataFiles, null);
1025+
return RetryCommitResult.ofContext(latestSnapshot, baseDataFiles, null);
10001026
}
10011027

10021028
LOG.info(
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.operation.commit;
20+
21+
import org.apache.paimon.Snapshot;
22+
import org.apache.paimon.catalog.TableRollback;
23+
import org.apache.paimon.table.Instant;
24+
25+
/** Commit rollback to rollback 'COMPACT' commits for resolving conflicts. */
26+
public class CommitRollback {
27+
28+
private final TableRollback tableRollback;
29+
30+
public CommitRollback(TableRollback tableRollback) {
31+
this.tableRollback = tableRollback;
32+
}
33+
34+
public boolean tryToRollback(Snapshot latestSnapshot) {
35+
if (latestSnapshot.commitKind() == Snapshot.CommitKind.COMPACT) {
36+
long latest = latestSnapshot.id();
37+
try {
38+
tableRollback.rollbackTo(Instant.snapshot(latest - 1), latest);
39+
return true;
40+
} catch (Exception ignored) {
41+
}
42+
}
43+
return false;
44+
}
45+
}

0 commit comments

Comments
 (0)