Skip to content

Commit b659564

Browse files
committed
fix
1 parent 03eba2e commit b659564

File tree

1 file changed

+16
-25
lines changed

1 file changed

+16
-25
lines changed

paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java

Lines changed: 16 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ public Optional<RuntimeException> checkConflicts(
149149
List<IndexManifestEntry> deltaIndexEntries,
150150
CommitKind commitKind) {
151151
String baseCommitUser = latestSnapshot.commitUser();
152-
if (checkForDeletionVector()) {
152+
if (deletionVectorsEnabled && bucketMode.equals(BucketMode.BUCKET_UNAWARE)) {
153153
// Enrich dvName in fileEntry to checker for base ADD dv and delta DELETE dv.
154154
// For example:
155155
// If the base file is <ADD baseFile1, ADD dv1>,
@@ -210,23 +210,12 @@ public Optional<RuntimeException> checkConflicts(
210210
return exception;
211211
}
212212

213-
if (rowIdCheckFromSnapshot != null || commitKind == CommitKind.COMPACT) {
214-
exception = assertRowIdRangeConflicts(mergedEntries);
215-
if (exception.isPresent()) {
216-
return exception;
217-
}
218-
}
219-
220-
if (rowIdCheckFromSnapshot != null) {
221-
exception =
222-
assertForRowIdCheckFromSnapshot(
223-
latestSnapshot, deltaEntries, deltaIndexEntries);
224-
if (exception.isPresent()) {
225-
return exception;
226-
}
213+
exception = checkRowIdRangeConflicts(commitKind, mergedEntries);
214+
if (exception.isPresent()) {
215+
return exception;
227216
}
228217

229-
return Optional.empty();
218+
return checkForRowIdFromSnapshot(latestSnapshot, deltaEntries, deltaIndexEntries);
230219
}
231220

232221
private Optional<RuntimeException> checkBucketKeepSame(
@@ -340,10 +329,6 @@ private Function<Throwable, RuntimeException> conflictException(
340329
};
341330
}
342331

343-
private boolean checkForDeletionVector() {
344-
return deletionVectorsEnabled && bucketMode.equals(BucketMode.BUCKET_UNAWARE);
345-
}
346-
347332
private Optional<RuntimeException> checkDeleteInEntries(
348333
Collection<SimpleFileEntry> mergedEntries,
349334
Function<Throwable, RuntimeException> exceptionFunction) {
@@ -356,7 +341,7 @@ private Optional<RuntimeException> checkDeleteInEntries(
356341
tableName);
357342
}
358343
} catch (Throwable e) {
359-
Optional<RuntimeException> exception = assertConflictForPartitionExpire(mergedEntries);
344+
Optional<RuntimeException> exception = checkConflictForPartitionExpire(mergedEntries);
360345
if (exception.isPresent()) {
361346
return exception;
362347
}
@@ -365,7 +350,7 @@ private Optional<RuntimeException> checkDeleteInEntries(
365350
return Optional.empty();
366351
}
367352

368-
private Optional<RuntimeException> assertConflictForPartitionExpire(
353+
private Optional<RuntimeException> checkConflictForPartitionExpire(
369354
Collection<SimpleFileEntry> mergedEntries) {
370355
if (partitionExpire != null && partitionExpire.isValueExpiration()) {
371356
Set<BinaryRow> deletedPartitions = new HashSet<>();
@@ -393,13 +378,16 @@ private Optional<RuntimeException> assertConflictForPartitionExpire(
393378
return Optional.empty();
394379
}
395380

396-
private Optional<RuntimeException> assertForRowIdCheckFromSnapshot(
381+
private Optional<RuntimeException> checkForRowIdFromSnapshot(
397382
Snapshot latestSnapshot,
398383
List<SimpleFileEntry> deltaEntries,
399384
List<IndexManifestEntry> deltaIndexEntries) {
400385
if (!dataEvolutionEnabled) {
401386
return Optional.empty();
402387
}
388+
if (rowIdCheckFromSnapshot == null) {
389+
return Optional.empty();
390+
}
403391

404392
List<BinaryRow> changedPartitions = changedPartitions(deltaEntries, deltaIndexEntries);
405393
// collect history row id ranges
@@ -441,11 +429,14 @@ private Optional<RuntimeException> assertForRowIdCheckFromSnapshot(
441429
return Optional.empty();
442430
}
443431

444-
private Optional<RuntimeException> assertRowIdRangeConflicts(
445-
Collection<SimpleFileEntry> mergedEntries) {
432+
private Optional<RuntimeException> checkRowIdRangeConflicts(
433+
CommitKind commitKind, Collection<SimpleFileEntry> mergedEntries) {
446434
if (!dataEvolutionEnabled) {
447435
return Optional.empty();
448436
}
437+
if (rowIdCheckFromSnapshot == null && commitKind != CommitKind.COMPACT) {
438+
return Optional.empty();
439+
}
449440

450441
List<SimpleFileEntry> entries =
451442
mergedEntries.stream()

0 commit comments

Comments
 (0)