Skip to content

Commit 0c39b32

Browse files
committed
fix
1 parent b659564 commit 0c39b32

File tree

3 files changed

+37
-26
lines changed

3 files changed

+37
-26
lines changed

paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -275,18 +275,20 @@ public FileStoreCommitImpl newCommit(String commitUser, FileStoreTable table) {
275275
if (snapshotCommit == null) {
276276
snapshotCommit = new RenamingSnapshotCommit(snapshotManager, Lock.empty());
277277
}
278-
ConflictDetection conflictDetection =
279-
new ConflictDetection(
280-
tableName,
281-
commitUser,
282-
partitionType,
283-
pathFactory(),
284-
newKeyComparator(),
285-
bucketMode(),
286-
options.deletionVectorsEnabled(),
287-
options.dataEvolutionEnabled(),
288-
newIndexFileHandler(),
289-
snapshotManager);
278+
ConflictDetection.Factory conflictDetectFactory =
279+
scanner ->
280+
new ConflictDetection(
281+
tableName,
282+
commitUser,
283+
partitionType,
284+
pathFactory(),
285+
newKeyComparator(),
286+
bucketMode(),
287+
options.deletionVectorsEnabled(),
288+
options.dataEvolutionEnabled(),
289+
newIndexFileHandler(),
290+
snapshotManager,
291+
scanner);
290292
StrictModeChecker strictModeChecker =
291293
StrictModeChecker.create(
292294
snapshotManager,
@@ -329,7 +331,7 @@ public FileStoreCommitImpl newCommit(String commitUser, FileStoreTable table) {
329331
options.commitMaxRetryWait(),
330332
options.rowTrackingEnabled(),
331333
options.commitDiscardDuplicateFiles(),
332-
conflictDetection,
334+
conflictDetectFactory,
333335
strictModeChecker,
334336
rollback);
335337
}

paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ public FileStoreCommitImpl(
197197
long commitMaxRetryWait,
198198
boolean rowTrackingEnabled,
199199
boolean discardDuplicateFiles,
200-
ConflictDetection conflictDetection,
200+
ConflictDetection.Factory conflictDetectFactory,
201201
@Nullable StrictModeChecker strictModeChecker,
202202
@Nullable CommitRollback rollback) {
203203
this.snapshotCommit = snapshotCommit;
@@ -239,8 +239,7 @@ public FileStoreCommitImpl(
239239
this.rowTrackingEnabled = rowTrackingEnabled;
240240
this.discardDuplicateFiles = discardDuplicateFiles;
241241
this.strictModeChecker = strictModeChecker;
242-
conflictDetection.setCommitScanner(scanner);
243-
this.conflictDetection = conflictDetection;
242+
this.conflictDetection = conflictDetectFactory.create(scanner);
244243
this.commitCleaner = new CommitCleaner(manifestList, manifestFile, indexManifestFile);
245244
}
246245

paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.apache.paimon.Snapshot;
2222
import org.apache.paimon.Snapshot.CommitKind;
23+
import org.apache.paimon.annotation.VisibleForTesting;
2324
import org.apache.paimon.data.BinaryRow;
2425
import org.apache.paimon.data.InternalRow;
2526
import org.apache.paimon.index.DeletionVectorMeta;
@@ -82,8 +83,8 @@ public class ConflictDetection {
8283
private final boolean dataEvolutionEnabled;
8384
private final IndexFileHandler indexFileHandler;
8485
private final SnapshotManager snapshotManager;
86+
private final CommitScanner commitScanner;
8587

86-
private @Nullable CommitScanner scanner;
8788
private @Nullable PartitionExpire partitionExpire;
8889
private @Nullable Long rowIdCheckFromSnapshot = null;
8990

@@ -97,7 +98,8 @@ public ConflictDetection(
9798
boolean deletionVectorsEnabled,
9899
boolean dataEvolutionEnabled,
99100
IndexFileHandler indexFileHandler,
100-
SnapshotManager snapshotManager) {
101+
SnapshotManager snapshotManager,
102+
CommitScanner commitScanner) {
101103
this.tableName = tableName;
102104
this.commitUser = commitUser;
103105
this.partitionType = partitionType;
@@ -108,16 +110,13 @@ public ConflictDetection(
108110
this.dataEvolutionEnabled = dataEvolutionEnabled;
109111
this.indexFileHandler = indexFileHandler;
110112
this.snapshotManager = snapshotManager;
113+
this.commitScanner = commitScanner;
111114
}
112115

113116
public void setRowIdCheckFromSnapshot(@Nullable Long rowIdCheckFromSnapshot) {
114117
this.rowIdCheckFromSnapshot = rowIdCheckFromSnapshot;
115118
}
116119

117-
public void setCommitScanner(CommitScanner scanner) {
118-
this.scanner = scanner;
119-
}
120-
121120
@Nullable
122121
public Comparator<InternalRow> keyComparator() {
123122
return keyComparator;
@@ -378,7 +377,8 @@ private Optional<RuntimeException> checkConflictForPartitionExpire(
378377
return Optional.empty();
379378
}
380379

381-
private Optional<RuntimeException> checkForRowIdFromSnapshot(
380+
@VisibleForTesting
381+
Optional<RuntimeException> checkForRowIdFromSnapshot(
382382
Snapshot latestSnapshot,
383383
List<SimpleFileEntry> deltaEntries,
384384
List<IndexManifestEntry> deltaIndexEntries) {
@@ -401,14 +401,18 @@ private Optional<RuntimeException> checkForRowIdFromSnapshot(
401401
}
402402

403403
// check history row id ranges
404-
long checkNextRowId = snapshotManager.snapshot(rowIdCheckFromSnapshot).nextRowId();
404+
Long checkNextRowId = snapshotManager.snapshot(rowIdCheckFromSnapshot).nextRowId();
405+
checkState(
406+
checkNextRowId != null,
407+
"Next row id cannot be null for snapshot %s.",
408+
rowIdCheckFromSnapshot);
405409
for (long i = rowIdCheckFromSnapshot + 1; i <= latestSnapshot.id(); i++) {
406410
Snapshot snapshot = snapshotManager.snapshot(i);
407411
if (snapshot.commitKind() == CommitKind.COMPACT) {
408412
continue;
409413
}
410414
List<ManifestEntry> changes =
411-
scanner.readIncrementalEntries(snapshot, changedPartitions);
415+
commitScanner.readIncrementalEntries(snapshot, changedPartitions);
412416
for (ManifestEntry entry : changes) {
413417
DataFileMeta file = entry.file();
414418
long firstRowId = file.nonNullFirstRowId();
@@ -429,7 +433,8 @@ private Optional<RuntimeException> checkForRowIdFromSnapshot(
429433
return Optional.empty();
430434
}
431435

432-
private Optional<RuntimeException> checkRowIdRangeConflicts(
436+
@VisibleForTesting
437+
Optional<RuntimeException> checkRowIdRangeConflicts(
433438
CommitKind commitKind, Collection<SimpleFileEntry> mergedEntries) {
434439
if (!dataEvolutionEnabled) {
435440
return Optional.empty();
@@ -694,4 +699,9 @@ public int hashCode() {
694699
return Objects.hash(partition, bucket, level);
695700
}
696701
}
702+
703+
/** Factory to create {@link ConflictDetection}. */
704+
public interface Factory {
705+
ConflictDetection create(CommitScanner scanner);
706+
}
697707
}

0 commit comments

Comments
 (0)