Skip to content

Commit 191d539

Browse files
committed
[core] Extract StrictModeChecker from ConflictDetection in Commit
1 parent 4aa3598 commit 191d539

File tree

5 files changed

+159
-106
lines changed

5 files changed

+159
-106
lines changed

paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.apache.paimon.operation.SnapshotDeletion;
4545
import org.apache.paimon.operation.TagDeletion;
4646
import org.apache.paimon.operation.commit.ConflictDetection;
47+
import org.apache.paimon.operation.commit.StrictModeChecker;
4748
import org.apache.paimon.partition.PartitionExpireStrategy;
4849
import org.apache.paimon.schema.SchemaManager;
4950
import org.apache.paimon.schema.TableSchema;
@@ -279,8 +280,13 @@ public FileStoreCommitImpl newCommit(String commitUser, FileStoreTable table) {
279280
newKeyComparator(),
280281
bucketMode(),
281282
options.deletionVectorsEnabled(),
282-
newIndexFileHandler(),
283-
newScan());
283+
newIndexFileHandler());
284+
StrictModeChecker strictModeChecker =
285+
StrictModeChecker.create(
286+
snapshotManager,
287+
commitUser,
288+
this::newScan,
289+
options.commitStrictModeLastSafeSnapshot().orElse(null));
284290
return new FileStoreCommitImpl(
285291
snapshotCommit,
286292
fileIO,
@@ -310,10 +316,10 @@ public FileStoreCommitImpl newCommit(String commitUser, FileStoreTable table) {
310316
options.commitTimeout(),
311317
options.commitMinRetryWait(),
312318
options.commitMaxRetryWait(),
313-
options.commitStrictModeLastSafeSnapshot().orElse(null),
314319
options.rowTrackingEnabled(),
315320
options.commitDiscardDuplicateFiles(),
316-
conflictDetection);
321+
conflictDetection,
322+
strictModeChecker);
317323
}
318324

319325
@Override

paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.apache.paimon.operation.commit.ManifestEntryChanges;
5151
import org.apache.paimon.operation.commit.RetryCommitResult;
5252
import org.apache.paimon.operation.commit.RowTrackingCommitUtils.RowTrackingAssigned;
53+
import org.apache.paimon.operation.commit.StrictModeChecker;
5354
import org.apache.paimon.operation.commit.SuccessCommitResult;
5455
import org.apache.paimon.operation.metrics.CommitMetrics;
5556
import org.apache.paimon.operation.metrics.CommitStats;
@@ -158,10 +159,10 @@ public class FileStoreCommitImpl implements FileStoreCommit {
158159
private final long commitMinRetryWait;
159160
private final long commitMaxRetryWait;
160161
private final int commitMaxRetries;
161-
@Nullable private Long strictModeLastSafeSnapshot;
162162
private final InternalRowPartitionComputer partitionComputer;
163163
private final boolean rowTrackingEnabled;
164164
private final boolean discardDuplicateFiles;
165+
@Nullable private final StrictModeChecker strictModeChecker;
165166
private final ConflictDetection conflictDetection;
166167
private final CommitCleaner commitCleaner;
167168

@@ -198,10 +199,10 @@ public FileStoreCommitImpl(
198199
long commitTimeout,
199200
long commitMinRetryWait,
200201
long commitMaxRetryWait,
201-
@Nullable Long strictModeLastSafeSnapshot,
202202
boolean rowTrackingEnabled,
203203
boolean discardDuplicateFiles,
204-
ConflictDetection conflictDetection) {
204+
ConflictDetection conflictDetection,
205+
@Nullable StrictModeChecker strictModeChecker) {
205206
this.snapshotCommit = snapshotCommit;
206207
this.fileIO = fileIO;
207208
this.schemaManager = schemaManager;
@@ -228,7 +229,6 @@ public FileStoreCommitImpl(
228229
this.commitTimeout = commitTimeout;
229230
this.commitMinRetryWait = commitMinRetryWait;
230231
this.commitMaxRetryWait = commitMaxRetryWait;
231-
this.strictModeLastSafeSnapshot = strictModeLastSafeSnapshot;
232232
this.partitionComputer =
233233
new InternalRowPartitionComputer(
234234
options.partitionDefaultName(),
@@ -241,6 +241,7 @@ public FileStoreCommitImpl(
241241
this.bucketMode = bucketMode;
242242
this.rowTrackingEnabled = rowTrackingEnabled;
243243
this.discardDuplicateFiles = discardDuplicateFiles;
244+
this.strictModeChecker = strictModeChecker;
244245
this.conflictDetection = conflictDetection;
245246
this.commitCleaner = new CommitCleaner(manifestList, manifestFile, indexManifestFile);
246247
}
@@ -873,10 +874,9 @@ CommitResult tryCommitOnce(
873874
}
874875
}
875876

876-
if (strictModeLastSafeSnapshot != null && strictModeLastSafeSnapshot >= 0) {
877-
conflictDetection.commitStrictModeCheck(
878-
strictModeLastSafeSnapshot, newSnapshotId, commitKind, snapshotManager);
879-
strictModeLastSafeSnapshot = newSnapshotId - 1;
877+
if (strictModeChecker != null) {
878+
strictModeChecker.check(newSnapshotId, commitKind);
879+
strictModeChecker.update(newSnapshotId - 1);
880880
}
881881

882882
if (LOG.isDebugEnabled()) {
@@ -1084,8 +1084,8 @@ CommitResult tryCommitOnce(
10841084
commitUser,
10851085
identifier,
10861086
commitKind.name());
1087-
if (strictModeLastSafeSnapshot != null) {
1088-
strictModeLastSafeSnapshot = newSnapshot.id();
1087+
if (strictModeChecker != null) {
1088+
strictModeChecker.update(newSnapshotId);
10891089
}
10901090
final List<SimpleFileEntry> finalBaseFiles = baseDataFiles;
10911091
final List<ManifestEntry> finalDeltaFiles = deltaFiles;

paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java

Lines changed: 1 addition & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
package org.apache.paimon.operation.commit;
2020

21-
import org.apache.paimon.CoreOptions;
2221
import org.apache.paimon.Snapshot;
2322
import org.apache.paimon.Snapshot.CommitKind;
2423
import org.apache.paimon.data.BinaryRow;
@@ -29,17 +28,13 @@
2928
import org.apache.paimon.manifest.FileEntry;
3029
import org.apache.paimon.manifest.FileKind;
3130
import org.apache.paimon.manifest.IndexManifestEntry;
32-
import org.apache.paimon.manifest.ManifestEntry;
3331
import org.apache.paimon.manifest.SimpleFileEntry;
3432
import org.apache.paimon.manifest.SimpleFileEntryWithDV;
35-
import org.apache.paimon.operation.FileStoreScan;
3633
import org.apache.paimon.operation.PartitionExpire;
3734
import org.apache.paimon.table.BucketMode;
38-
import org.apache.paimon.table.source.ScanMode;
3935
import org.apache.paimon.types.RowType;
4036
import org.apache.paimon.utils.FileStorePathFactory;
4137
import org.apache.paimon.utils.Pair;
42-
import org.apache.paimon.utils.SnapshotManager;
4338

4439
import org.slf4j.Logger;
4540
import org.slf4j.LoggerFactory;
@@ -52,7 +47,6 @@
5247
import java.util.Comparator;
5348
import java.util.HashMap;
5449
import java.util.HashSet;
55-
import java.util.Iterator;
5650
import java.util.LinkedHashMap;
5751
import java.util.List;
5852
import java.util.Map;
@@ -77,7 +71,6 @@ public class ConflictDetection {
7771
private final BucketMode bucketMode;
7872
private final boolean deletionVectorsEnabled;
7973
private final IndexFileHandler indexFileHandler;
80-
private final FileStoreScan scan;
8174

8275
private @Nullable PartitionExpire partitionExpire;
8376

@@ -89,8 +82,7 @@ public ConflictDetection(
8982
@Nullable Comparator<InternalRow> keyComparator,
9083
BucketMode bucketMode,
9184
boolean deletionVectorsEnabled,
92-
IndexFileHandler indexFileHandler,
93-
FileStoreScan scan) {
85+
IndexFileHandler indexFileHandler) {
9486
this.tableName = tableName;
9587
this.commitUser = commitUser;
9688
this.partitionType = partitionType;
@@ -99,7 +91,6 @@ public ConflictDetection(
9991
this.bucketMode = bucketMode;
10092
this.deletionVectorsEnabled = deletionVectorsEnabled;
10193
this.indexFileHandler = indexFileHandler;
102-
this.scan = scan;
10394
}
10495

10596
@Nullable
@@ -111,58 +102,6 @@ public void withPartitionExpire(PartitionExpire partitionExpire) {
111102
this.partitionExpire = partitionExpire;
112103
}
113104

114-
public void commitStrictModeCheck(
115-
@Nullable Long strictModeLastSafeSnapshot,
116-
long newSnapshotId,
117-
CommitKind newCommitKind,
118-
SnapshotManager snapshotManager) {
119-
if (strictModeLastSafeSnapshot == null || strictModeLastSafeSnapshot < 0) {
120-
return;
121-
}
122-
123-
for (long id = strictModeLastSafeSnapshot + 1; id < newSnapshotId; id++) {
124-
Snapshot snapshot = snapshotManager.snapshot(id);
125-
if (snapshot.commitUser().equals(commitUser)) {
126-
continue;
127-
}
128-
if (snapshot.commitKind() == CommitKind.COMPACT
129-
|| snapshot.commitKind() == CommitKind.OVERWRITE) {
130-
throw new RuntimeException(
131-
String.format(
132-
"When trying to commit snapshot %d, "
133-
+ "commit user %s has found a %s snapshot (id: %d) by another user %s. "
134-
+ "Giving up committing as %s is set.",
135-
newSnapshotId,
136-
commitUser,
137-
snapshot.commitKind().name(),
138-
id,
139-
snapshot.commitUser(),
140-
CoreOptions.COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT.key()));
141-
}
142-
if (snapshot.commitKind() == CommitKind.APPEND
143-
&& newCommitKind == CommitKind.OVERWRITE) {
144-
Iterator<ManifestEntry> entries =
145-
scan.withSnapshot(snapshot)
146-
.withKind(ScanMode.DELTA)
147-
.onlyReadRealBuckets()
148-
.dropStats()
149-
.readFileIterator();
150-
if (entries.hasNext()) {
151-
throw new RuntimeException(
152-
String.format(
153-
"When trying to commit snapshot %d, "
154-
+ "commit user %s has found a APPEND snapshot (id: %d) by another user %s "
155-
+ "which committed files to fixed bucket. Giving up committing as %s is set.",
156-
newSnapshotId,
157-
commitUser,
158-
id,
159-
snapshot.commitUser(),
160-
CoreOptions.COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT.key()));
161-
}
162-
}
163-
}
164-
}
165-
166105
public void checkNoConflictsOrFail(
167106
Snapshot snapshot,
168107
List<SimpleFileEntry> baseEntries,
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.operation.commit;
20+
21+
import org.apache.paimon.CoreOptions;
22+
import org.apache.paimon.Snapshot;
23+
import org.apache.paimon.Snapshot.CommitKind;
24+
import org.apache.paimon.manifest.ManifestEntry;
25+
import org.apache.paimon.operation.FileStoreScan;
26+
import org.apache.paimon.table.source.ScanMode;
27+
import org.apache.paimon.utils.SnapshotManager;
28+
29+
import javax.annotation.Nullable;
30+
31+
import java.util.Iterator;
32+
import java.util.function.Supplier;
33+
34+
/** A checker to check strict mode based on last safe snapshot. */
35+
public class StrictModeChecker {
36+
37+
private final SnapshotManager snapshotManager;
38+
private final String commitUser;
39+
private final FileStoreScan scan;
40+
41+
private long strictModeLastSafeSnapshot;
42+
43+
public StrictModeChecker(
44+
SnapshotManager snapshotManager,
45+
String commitUser,
46+
FileStoreScan scan,
47+
long strictModeLastSafeSnapshot) {
48+
this.snapshotManager = snapshotManager;
49+
this.commitUser = commitUser;
50+
this.scan = scan;
51+
this.strictModeLastSafeSnapshot = strictModeLastSafeSnapshot;
52+
}
53+
54+
@Nullable
55+
public static StrictModeChecker create(
56+
SnapshotManager snapshotManager,
57+
String commitUser,
58+
Supplier<FileStoreScan> scanSupplier,
59+
@Nullable Long strictModeLastSafeSnapshot) {
60+
if (strictModeLastSafeSnapshot == null) {
61+
return null;
62+
}
63+
return new StrictModeChecker(
64+
snapshotManager, commitUser, scanSupplier.get(), strictModeLastSafeSnapshot);
65+
}
66+
67+
public void check(long newSnapshotId, CommitKind newCommitKind) {
68+
for (long id = strictModeLastSafeSnapshot + 1; id < newSnapshotId; id++) {
69+
Snapshot snapshot = snapshotManager.snapshot(id);
70+
if (snapshot.commitUser().equals(commitUser)) {
71+
continue;
72+
}
73+
if (snapshot.commitKind() == CommitKind.COMPACT
74+
|| snapshot.commitKind() == CommitKind.OVERWRITE) {
75+
throw new RuntimeException(
76+
String.format(
77+
"When trying to commit snapshot %d, "
78+
+ "commit user %s has found a %s snapshot (id: %d) by another user %s. "
79+
+ "Giving up committing as %s is set.",
80+
newSnapshotId,
81+
commitUser,
82+
snapshot.commitKind().name(),
83+
id,
84+
snapshot.commitUser(),
85+
CoreOptions.COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT.key()));
86+
}
87+
if (snapshot.commitKind() == CommitKind.APPEND
88+
&& newCommitKind == CommitKind.OVERWRITE) {
89+
Iterator<ManifestEntry> entries =
90+
scan.withSnapshot(snapshot)
91+
.withKind(ScanMode.DELTA)
92+
.onlyReadRealBuckets()
93+
.dropStats()
94+
.readFileIterator();
95+
if (entries.hasNext()) {
96+
throw new RuntimeException(
97+
String.format(
98+
"When trying to commit snapshot %d, "
99+
+ "commit user %s has found a APPEND snapshot (id: %d) by another user %s "
100+
+ "which committed files to fixed bucket. Giving up committing as %s is set.",
101+
newSnapshotId,
102+
commitUser,
103+
id,
104+
snapshot.commitUser(),
105+
CoreOptions.COMMIT_STRICT_MODE_LAST_SAFE_SNAPSHOT.key()));
106+
}
107+
}
108+
}
109+
}
110+
111+
public void update(long newSafeSnapshot) {
112+
strictModeLastSafeSnapshot = newSafeSnapshot;
113+
}
114+
}

0 commit comments

Comments
 (0)