Skip to content

Commit 96d0199

Browse files
committed
[core] Unify conflict detect in FileStoreCommitImpl
1 parent 3136296 commit 96d0199

File tree

5 files changed

+19
-117
lines changed

5 files changed

+19
-117
lines changed

paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java

Lines changed: 13 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@
4646
import org.apache.paimon.operation.commit.CommitResult;
4747
import org.apache.paimon.operation.commit.CommitScanner;
4848
import org.apache.paimon.operation.commit.ConflictDetection;
49-
import org.apache.paimon.operation.commit.ConflictDetection.ConflictCheck;
5049
import org.apache.paimon.operation.commit.ManifestEntryChanges;
5150
import org.apache.paimon.operation.commit.RetryCommitResult;
5251
import org.apache.paimon.operation.commit.RowTrackingCommitUtils.RowTrackingAssigned;
@@ -98,9 +97,6 @@
9897
import static org.apache.paimon.manifest.ManifestEntry.nullableRecordCount;
9998
import static org.apache.paimon.manifest.ManifestEntry.recordCountAdd;
10099
import static org.apache.paimon.manifest.ManifestEntry.recordCountDelete;
101-
import static org.apache.paimon.operation.commit.ConflictDetection.hasConflictChecked;
102-
import static org.apache.paimon.operation.commit.ConflictDetection.mustConflictCheck;
103-
import static org.apache.paimon.operation.commit.ConflictDetection.noConflictCheck;
104100
import static org.apache.paimon.operation.commit.ManifestEntryChanges.changedPartitions;
105101
import static org.apache.paimon.operation.commit.RowTrackingCommitUtils.assignRowTracking;
106102
import static org.apache.paimon.partition.PartitionPredicate.createBinaryPartitions;
@@ -308,9 +304,6 @@ public int commit(ManifestCommittable committable, boolean checkAppendFiles) {
308304
long started = System.nanoTime();
309305
int generatedSnapshot = 0;
310306
int attempts = 0;
311-
Snapshot latestSnapshot = null;
312-
Long safeLatestSnapshotId = null;
313-
List<SimpleFileEntry> baseEntries = new ArrayList<>();
314307

315308
ManifestEntryChanges changes = collectChanges(committable.fileCommittables());
316309
try {
@@ -320,63 +313,16 @@ public int commit(ManifestCommittable committable, boolean checkAppendFiles) {
320313
|| !changes.appendTableFiles.isEmpty()
321314
|| !changes.appendChangelog.isEmpty()
322315
|| !changes.appendIndexFiles.isEmpty()) {
323-
// Optimization for common path.
324-
// Step 1:
325-
// Read manifest entries from changed partitions here and check for conflicts.
326-
// If there are no other jobs committing at the same time,
327-
// we can skip conflict checking in tryCommit method.
328-
// This optimization is mainly used to decrease the number of times we read from
329-
// files.
330-
latestSnapshot = snapshotManager.latestSnapshot();
331316
CommitKind commitKind = CommitKind.APPEND;
332-
ConflictCheck conflictCheck = noConflictCheck();
317+
if (appendCommitCheckConflict) {
318+
checkAppendFiles = true;
319+
}
333320
if (containsFileDeletionOrDeletionVectors(
334321
appendSimpleEntries, changes.appendIndexFiles)) {
335322
commitKind = CommitKind.OVERWRITE;
336-
conflictCheck = mustConflictCheck();
337-
} else if (latestSnapshot != null && appendCommitCheckConflict) {
338-
conflictCheck = mustConflictCheck();
339-
}
340-
341-
boolean discardDuplicate = discardDuplicateFiles && commitKind == CommitKind.APPEND;
342-
if (discardDuplicate) {
343323
checkAppendFiles = true;
344324
}
345325

346-
if (latestSnapshot != null && checkAppendFiles) {
347-
// it is possible that some partitions only have compact changes,
348-
// so we need to contain all changes
349-
baseEntries.addAll(
350-
scanner.readAllEntriesFromChangedPartitions(
351-
latestSnapshot,
352-
changedPartitions(
353-
changes.appendTableFiles,
354-
changes.compactTableFiles,
355-
changes.appendIndexFiles,
356-
changes.compactIndexFiles)));
357-
if (discardDuplicate) {
358-
Set<FileEntry.Identifier> baseIdentifiers =
359-
baseEntries.stream()
360-
.map(FileEntry::identifier)
361-
.collect(Collectors.toSet());
362-
changes.appendTableFiles =
363-
changes.appendTableFiles.stream()
364-
.filter(
365-
entry ->
366-
!baseIdentifiers.contains(
367-
entry.identifier()))
368-
.collect(Collectors.toList());
369-
appendSimpleEntries = SimpleFileEntry.from(changes.appendTableFiles);
370-
}
371-
conflictDetection.checkNoConflictsOrFail(
372-
latestSnapshot,
373-
baseEntries,
374-
appendSimpleEntries,
375-
changes.appendIndexFiles,
376-
commitKind);
377-
safeLatestSnapshotId = latestSnapshot.id();
378-
}
379-
380326
attempts +=
381327
tryCommit(
382328
CommitChangesProvider.provider(
@@ -387,33 +333,14 @@ public int commit(ManifestCommittable committable, boolean checkAppendFiles) {
387333
committable.watermark(),
388334
committable.properties(),
389335
CommitKindProvider.provider(commitKind),
390-
conflictCheck,
336+
checkAppendFiles,
391337
null);
392338
generatedSnapshot += 1;
393339
}
394340

395341
if (!changes.compactTableFiles.isEmpty()
396342
|| !changes.compactChangelog.isEmpty()
397343
|| !changes.compactIndexFiles.isEmpty()) {
398-
// Optimization for common path.
399-
// Step 2:
400-
// Add appendChanges to the manifest entries read above and check for conflicts.
401-
// If there are no other jobs committing at the same time,
402-
// we can skip conflict checking in tryCommit method.
403-
// This optimization is mainly used to decrease the number of times we read from
404-
// files.
405-
if (safeLatestSnapshotId != null) {
406-
baseEntries.addAll(appendSimpleEntries);
407-
conflictDetection.checkNoConflictsOrFail(
408-
latestSnapshot,
409-
baseEntries,
410-
SimpleFileEntry.from(changes.compactTableFiles),
411-
changes.compactIndexFiles,
412-
CommitKind.COMPACT);
413-
// assume this compact commit follows just after the append commit created above
414-
safeLatestSnapshotId += 1;
415-
}
416-
417344
attempts +=
418345
tryCommit(
419346
CommitChangesProvider.provider(
@@ -424,7 +351,7 @@ public int commit(ManifestCommittable committable, boolean checkAppendFiles) {
424351
committable.watermark(),
425352
committable.properties(),
426353
CommitKindProvider.provider(CommitKind.COMPACT),
427-
hasConflictChecked(safeLatestSnapshotId),
354+
true,
428355
null);
429356
generatedSnapshot += 1;
430357
}
@@ -589,7 +516,7 @@ public int overwritePartition(
589516
committable.watermark(),
590517
committable.properties(),
591518
CommitKindProvider.provider(CommitKind.COMPACT),
592-
mustConflictCheck(),
519+
true,
593520
null);
594521
generatedSnapshot += 1;
595522
}
@@ -728,7 +655,7 @@ public void commitStatistics(Statistics stats, long commitIdentifier) {
728655
null,
729656
Collections.emptyMap(),
730657
CommitKindProvider.provider(CommitKind.ANALYZE),
731-
noConflictCheck(),
658+
false,
732659
statsFileName);
733660
}
734661

@@ -755,7 +682,7 @@ private int tryCommit(
755682
@Nullable Long watermark,
756683
Map<String, String> properties,
757684
CommitKindProvider commitKindProvider,
758-
ConflictCheck conflictCheck,
685+
boolean detectConflicts,
759686
@Nullable String statsFileName) {
760687
int retryCount = 0;
761688
RetryCommitResult retryResult = null;
@@ -775,7 +702,7 @@ private int tryCommit(
775702
properties,
776703
commitKind,
777704
latestSnapshot,
778-
conflictCheck,
705+
detectConflicts,
779706
statsFileName);
780707

781708
if (result.isSuccess()) {
@@ -826,7 +753,7 @@ private int tryOverwritePartition(
826753
watermark,
827754
properties,
828755
commitKindProvider,
829-
mustConflictCheck(),
756+
true,
830757
null);
831758
}
832759

@@ -841,7 +768,7 @@ CommitResult tryCommitOnce(
841768
Map<String, String> properties,
842769
CommitKind commitKind,
843770
@Nullable Snapshot latestSnapshot,
844-
ConflictCheck conflictCheck,
771+
boolean detectConflicts,
845772
@Nullable String newStatsFileName) {
846773
long startMillis = System.currentTimeMillis();
847774

@@ -893,12 +820,10 @@ CommitResult tryCommitOnce(
893820

894821
List<SimpleFileEntry> baseDataFiles = new ArrayList<>();
895822
boolean discardDuplicate = discardDuplicateFiles && commitKind == CommitKind.APPEND;
896-
if (latestSnapshot != null
897-
&& (discardDuplicate || conflictCheck.shouldCheck(latestSnapshot.id()))) {
823+
if (latestSnapshot != null && (discardDuplicate || detectConflicts)) {
898824
// latestSnapshotId is different from the snapshot id we've checked for conflicts,
899825
// so we have to check again
900-
List<BinaryRow> changedPartitions =
901-
changedPartitions(deltaFiles, emptyList(), indexFiles, emptyList());
826+
List<BinaryRow> changedPartitions = changedPartitions(deltaFiles, indexFiles);
902827
if (retryResult != null && retryResult.latestSnapshot != null) {
903828
baseDataFiles = new ArrayList<>(retryResult.baseDataFiles);
904829
List<SimpleFileEntry> incremental =

paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -532,21 +532,4 @@ public int hashCode() {
532532
return Objects.hash(partition, bucket, level);
533533
}
534534
}
535-
536-
/** Should do conflict check. */
537-
public interface ConflictCheck {
538-
boolean shouldCheck(long latestSnapshot);
539-
}
540-
541-
public static ConflictCheck hasConflictChecked(@Nullable Long checkedLatestSnapshotId) {
542-
return latestSnapshot -> !Objects.equals(latestSnapshot, checkedLatestSnapshotId);
543-
}
544-
545-
public static ConflictCheck noConflictCheck() {
546-
return latestSnapshot -> false;
547-
}
548-
549-
public static ConflictCheck mustConflictCheck() {
550-
return latestSnapshot -> true;
551-
}
552535
}

paimon-core/src/main/java/org/apache/paimon/operation/commit/ManifestEntryChanges.java

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import org.apache.paimon.manifest.ManifestEntry;
2626
import org.apache.paimon.table.sink.CommitMessage;
2727
import org.apache.paimon.table.sink.CommitMessageImpl;
28-
import org.apache.paimon.utils.ListUtils;
2928

3029
import java.util.ArrayList;
3130
import java.util.HashSet;
@@ -164,15 +163,12 @@ public String toString() {
164163
}
165164

166165
public static List<BinaryRow> changedPartitions(
167-
List<ManifestEntry> appendTableFiles,
168-
List<ManifestEntry> compactTableFiles,
169-
List<IndexManifestEntry> appendIndexFiles,
170-
List<IndexManifestEntry> compactIndexFiles) {
166+
List<ManifestEntry> dataFileChanges, List<IndexManifestEntry> indexFileChanges) {
171167
Set<BinaryRow> changedPartitions = new HashSet<>();
172-
for (ManifestEntry file : ListUtils.union(appendTableFiles, compactTableFiles)) {
168+
for (ManifestEntry file : dataFileChanges) {
173169
changedPartitions.add(file.partition());
174170
}
175-
for (IndexManifestEntry file : ListUtils.union(appendIndexFiles, compactIndexFiles)) {
171+
for (IndexManifestEntry file : indexFileChanges) {
176172
if (file.indexFile().indexType().equals(DELETION_VECTORS_INDEX)) {
177173
changedPartitions.add(file.partition());
178174
}

paimon-core/src/test/java/org/apache/paimon/operation/FileDeletionTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,6 @@
7777
import static org.apache.paimon.operation.FileStoreTestUtils.assertPathNotExists;
7878
import static org.apache.paimon.operation.FileStoreTestUtils.commitData;
7979
import static org.apache.paimon.operation.FileStoreTestUtils.partitionedData;
80-
import static org.apache.paimon.operation.commit.ConflictDetection.mustConflictCheck;
8180
import static org.assertj.core.api.Assertions.assertThat;
8281

8382
/**
@@ -931,7 +930,7 @@ private void cleanBucket(TestFileStore store, BinaryRow partition, int bucket) {
931930
Collections.emptyMap(),
932931
Snapshot.CommitKind.APPEND,
933932
store.snapshotManager().latestSnapshot(),
934-
mustConflictCheck(),
933+
true,
935934
null);
936935
}
937936
}

paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,6 @@
8484
import java.util.stream.Collectors;
8585

8686
import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
87-
import static org.apache.paimon.operation.commit.ConflictDetection.mustConflictCheck;
8887
import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
8988
import static org.apache.paimon.stats.SimpleStats.EMPTY_STATS;
9089
import static org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
@@ -1021,7 +1020,7 @@ public void testCommitTwiceWithDifferentKind() throws Exception {
10211020
Collections.emptyMap(),
10221021
Snapshot.CommitKind.APPEND,
10231022
firstLatest,
1024-
mustConflictCheck(),
1023+
true,
10251024
null);
10261025
// Compact
10271026
commit.tryCommitOnce(
@@ -1034,7 +1033,7 @@ public void testCommitTwiceWithDifferentKind() throws Exception {
10341033
Collections.emptyMap(),
10351034
Snapshot.CommitKind.COMPACT,
10361035
store.snapshotManager().latestSnapshot(),
1037-
mustConflictCheck(),
1036+
true,
10381037
null);
10391038
}
10401039
long id = store.snapshotManager().latestSnapshot().id();

0 commit comments

Comments
 (0)