Skip to content

Commit 95087a4

Browse files
committed
[core] Introduce 'commit.discard-duplicate-files' to make append safe
1 parent 966cdd0 commit 95087a4

File tree

5 files changed

+155
-23
lines changed

5 files changed

+155
-23
lines changed

docs/layouts/shortcodes/generated/core_configuration.html

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,12 @@
188188
<td>String</td>
189189
<td>A list of commit callback classes to be called after a successful commit. Class names are connected with comma (example: com.test.CallbackA,com.sample.CallbackB).</td>
190190
</tr>
191+
<tr>
192+
<td><h5>commit.discard-duplicate-files</h5></td>
193+
<td style="word-wrap: break-word;">false</td>
194+
<td>Boolean</td>
195+
<td>Whether to discard duplicate files in commit.</td>
196+
</tr>
191197
<tr>
192198
<td><h5>commit.force-compact</h5></td>
193199
<td style="word-wrap: break-word;">false</td>

paimon-api/src/main/java/org/apache/paimon/CoreOptions.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2020,6 +2020,12 @@ public InlineElement getDescription() {
20202020
.withDescription(
20212021
"Write blob field using blob descriptor rather than blob bytes.");
20222022

2023+
public static final ConfigOption<Boolean> COMMIT_DISCARD_DUPLICATE_FILES =
2024+
key("commit.discard-duplicate-files")
2025+
.booleanType()
2026+
.defaultValue(false)
2027+
.withDescription("Whether to discard duplicate files in commit.");
2028+
20232029
private final Options options;
20242030

20252031
public CoreOptions(Map<String, String> options) {
@@ -2919,6 +2925,10 @@ public Map<String, String> tagCallbacks() {
29192925
return callbacks(TAG_CALLBACKS, TAG_CALLBACK_PARAM);
29202926
}
29212927

2928+
public boolean commitDiscardDuplicateFiles() {
2929+
return options.get(COMMIT_DISCARD_DUPLICATE_FILES);
2930+
}
2931+
29222932
private Map<String, String> callbacks(
29232933
ConfigOption<String> callbacks, ConfigOption<String> callbackParam) {
29242934
Map<String, String> result = new HashMap<>();

paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,7 @@ public FileStoreCommitImpl newCommit(String commitUser, FileStoreTable table) {
309309
options.commitMaxRetryWait(),
310310
options.commitStrictModeLastSafeSnapshot().orElse(null),
311311
options.rowTrackingEnabled(),
312+
options.commitDiscardDuplicateFiles(),
312313
conflictDetection);
313314
}
314315

paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java

Lines changed: 59 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ public class FileStoreCommitImpl implements FileStoreCommit {
149149
@Nullable private Long strictModeLastSafeSnapshot;
150150
private final InternalRowPartitionComputer partitionComputer;
151151
private final boolean rowTrackingEnabled;
152+
private final boolean discardDuplicateFiles;
152153
private final ConflictDetection conflictDetection;
153154

154155
private boolean ignoreEmptyCommit;
@@ -185,6 +186,7 @@ public FileStoreCommitImpl(
185186
long commitMaxRetryWait,
186187
@Nullable Long strictModeLastSafeSnapshot,
187188
boolean rowTrackingEnabled,
189+
boolean discardDuplicateFiles,
188190
ConflictDetection conflictDetection) {
189191
this.snapshotCommit = snapshotCommit;
190192
this.fileIO = fileIO;
@@ -228,6 +230,7 @@ public FileStoreCommitImpl(
228230
this.statsFileHandler = statsFileHandler;
229231
this.bucketMode = bucketMode;
230232
this.rowTrackingEnabled = rowTrackingEnabled;
233+
this.discardDuplicateFiles = discardDuplicateFiles;
231234
this.conflictDetection = conflictDetection;
232235
}
233236

@@ -284,6 +287,10 @@ public int commit(ManifestCommittable committable, boolean checkAppendFiles) {
284287
LOG.debug("Ready to commit\n{}", committable);
285288
}
286289

290+
if (discardDuplicateFiles) {
291+
checkAppendFiles = true;
292+
}
293+
287294
long started = System.nanoTime();
288295
int generatedSnapshot = 0;
289296
int attempts = 0;
@@ -336,6 +343,20 @@ public int commit(ManifestCommittable committable, boolean checkAppendFiles) {
336343
appendTableFiles,
337344
compactTableFiles,
338345
appendIndexFiles)));
346+
if (discardDuplicateFiles && commitKind == CommitKind.APPEND) {
347+
Set<FileEntry.Identifier> baseIdentifiers =
348+
baseEntries.stream()
349+
.map(FileEntry::identifier)
350+
.collect(Collectors.toSet());
351+
appendTableFiles =
352+
appendTableFiles.stream()
353+
.filter(
354+
entry ->
355+
!baseIdentifiers.contains(
356+
entry.identifier()))
357+
.collect(Collectors.toList());
358+
appendSimpleEntries = SimpleFileEntry.from(appendTableFiles);
359+
}
339360
conflictDetection.checkNoConflictsOrFail(
340361
latestSnapshot,
341362
baseEntries,
@@ -1003,30 +1024,46 @@ CommitResult tryCommitOnce(
10031024
}
10041025

10051026
List<SimpleFileEntry> baseDataFiles = new ArrayList<>();
1006-
if (latestSnapshot != null && conflictCheck.shouldCheck(latestSnapshot.id())) {
1007-
// latestSnapshotId is different from the snapshot id we've checked for conflicts,
1008-
// so we have to check again
1009-
List<BinaryRow> changedPartitions =
1010-
changedPartitions(deltaFiles, Collections.emptyList(), indexFiles);
1011-
if (retryResult != null && retryResult.latestSnapshot != null) {
1012-
baseDataFiles = new ArrayList<>(retryResult.baseDataFiles);
1013-
List<SimpleFileEntry> incremental =
1014-
readIncrementalChanges(
1015-
retryResult.latestSnapshot, latestSnapshot, changedPartitions);
1016-
if (!incremental.isEmpty()) {
1017-
baseDataFiles.addAll(incremental);
1018-
baseDataFiles = new ArrayList<>(FileEntry.mergeEntries(baseDataFiles));
1027+
if (latestSnapshot != null) {
1028+
boolean checkDuplicate = discardDuplicateFiles && commitKind == CommitKind.APPEND;
1029+
boolean checkConflict = conflictCheck.shouldCheck(latestSnapshot.id());
1030+
if (checkDuplicate || checkConflict) {
1031+
// latestSnapshotId is different from the snapshot id we've checked for conflicts,
1032+
// so we have to check again
1033+
List<BinaryRow> changedPartitions =
1034+
changedPartitions(deltaFiles, Collections.emptyList(), indexFiles);
1035+
if (retryResult != null && retryResult.latestSnapshot != null) {
1036+
baseDataFiles = new ArrayList<>(retryResult.baseDataFiles);
1037+
List<SimpleFileEntry> incremental =
1038+
readIncrementalChanges(
1039+
retryResult.latestSnapshot, latestSnapshot, changedPartitions);
1040+
if (!incremental.isEmpty()) {
1041+
baseDataFiles.addAll(incremental);
1042+
baseDataFiles = new ArrayList<>(FileEntry.mergeEntries(baseDataFiles));
1043+
}
1044+
} else {
1045+
baseDataFiles =
1046+
readAllEntriesFromChangedPartitions(latestSnapshot, changedPartitions);
1047+
}
1048+
if (checkDuplicate) {
1049+
Set<FileEntry.Identifier> baseIdentifiers =
1050+
baseDataFiles.stream()
1051+
.map(FileEntry::identifier)
1052+
.collect(Collectors.toSet());
1053+
deltaFiles =
1054+
deltaFiles.stream()
1055+
.filter(entry -> !baseIdentifiers.contains(entry.identifier()))
1056+
.collect(Collectors.toList());
1057+
}
1058+
if (checkConflict) {
1059+
conflictDetection.checkNoConflictsOrFail(
1060+
latestSnapshot,
1061+
baseDataFiles,
1062+
SimpleFileEntry.from(deltaFiles),
1063+
indexFiles,
1064+
commitKind);
10191065
}
1020-
} else {
1021-
baseDataFiles =
1022-
readAllEntriesFromChangedPartitions(latestSnapshot, changedPartitions);
10231066
}
1024-
conflictDetection.checkNoConflictsOrFail(
1025-
latestSnapshot,
1026-
baseDataFiles,
1027-
SimpleFileEntry.from(deltaFiles),
1028-
indexFiles,
1029-
commitKind);
10301067
}
10311068

10321069
Snapshot newSnapshot;

paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java

Lines changed: 79 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,9 @@
5050
import org.apache.paimon.schema.SchemaManager;
5151
import org.apache.paimon.schema.SchemaUtils;
5252
import org.apache.paimon.schema.TableSchema;
53+
import org.apache.paimon.table.sink.BatchTableCommit;
5354
import org.apache.paimon.table.sink.BatchTableWrite;
55+
import org.apache.paimon.table.sink.BatchWriteBuilder;
5456
import org.apache.paimon.table.sink.CommitMessage;
5557
import org.apache.paimon.table.sink.StreamTableCommit;
5658
import org.apache.paimon.table.sink.StreamTableWrite;
@@ -85,6 +87,10 @@
8587
import java.util.PriorityQueue;
8688
import java.util.Random;
8789
import java.util.UUID;
90+
import java.util.concurrent.ExecutorService;
91+
import java.util.concurrent.Executors;
92+
import java.util.concurrent.Future;
93+
import java.util.concurrent.ThreadLocalRandom;
8894
import java.util.concurrent.atomic.AtomicInteger;
8995
import java.util.function.Consumer;
9096
import java.util.function.Function;
@@ -104,12 +110,84 @@
104110
import static org.apache.paimon.predicate.SortValue.SortDirection.DESCENDING;
105111
import static org.assertj.core.api.Assertions.assertThat;
106112
import static org.assertj.core.api.Assertions.assertThatThrownBy;
113+
import static org.junit.jupiter.api.Assertions.assertTrue;
107114

108115
/** Tests for {@link AppendOnlyFileStoreTable}. */
109116
public class AppendOnlySimpleTableTest extends SimpleTableTestBase {
110117

111118
@Test
112-
public void testMultipleWriters() throws Exception {
119+
public void testDiscardDuplicateFiles() throws Exception {
120+
FileStoreTable table =
121+
createFileStoreTable(
122+
options -> options.set(CoreOptions.COMMIT_DISCARD_DUPLICATE_FILES, true));
123+
BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
124+
List<CommitMessage> commitMessages;
125+
try (BatchTableWrite write = writeBuilder.newWrite()) {
126+
write.write(rowData(1, 10, 100L));
127+
commitMessages = write.prepareCommit();
128+
}
129+
Runnable doCommit =
130+
() -> {
131+
try (BatchTableCommit commit = writeBuilder.newCommit()) {
132+
commit.commit(commitMessages);
133+
} catch (Exception e) {
134+
throw new RuntimeException(e);
135+
}
136+
};
137+
138+
doCommit.run();
139+
doCommit.run();
140+
List<Split> splits = table.newReadBuilder().newScan().plan().splits();
141+
assertThat(splits.size()).isEqualTo(1);
142+
assertThat(splits.get(0).convertToRawFiles()).map(List::size).get().isEqualTo(1);
143+
}
144+
145+
@Test
146+
public void testDiscardDuplicateFilesMultiThread() throws Exception {
147+
FileStoreTable table =
148+
createFileStoreTable(
149+
options -> options.set(CoreOptions.COMMIT_DISCARD_DUPLICATE_FILES, true));
150+
BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
151+
List<List<CommitMessage>> messages = new ArrayList<>();
152+
for (int i = 0; i < 10; i++) {
153+
try (BatchTableWrite write = writeBuilder.newWrite()) {
154+
write.write(rowData(1, 10, 100L));
155+
messages.add(write.prepareCommit());
156+
}
157+
}
158+
Runnable doCommit =
159+
() -> {
160+
ThreadLocalRandom rnd = ThreadLocalRandom.current();
161+
for (int i = 0; i < 10; i++) {
162+
try (BatchTableCommit commit = writeBuilder.newCommit()) {
163+
commit.commit(messages.get(rnd.nextInt(messages.size())));
164+
} catch (Exception e) {
165+
throw new RuntimeException(e);
166+
}
167+
}
168+
};
169+
170+
Runnable asserter =
171+
() -> {
172+
List<Split> splits = table.newReadBuilder().newScan().plan().splits();
173+
assertThat(splits.size()).isEqualTo(1);
174+
assertTrue(splits.get(0).convertToRawFiles().get().size() <= 10);
175+
};
176+
177+
// test multiple threads
178+
ExecutorService pool = Executors.newCachedThreadPool();
179+
List<Future<?>> futures = new ArrayList<>();
180+
for (int i = 0; i < 10; i++) {
181+
futures.add(pool.submit(doCommit));
182+
}
183+
for (Future<?> future : futures) {
184+
future.get();
185+
}
186+
asserter.run();
187+
}
188+
189+
@Test
190+
public void testDynamicBucketNoSelector() throws Exception {
113191
assertThat(
114192
createFileStoreTable(options -> options.set("bucket", "-1"))
115193
.newBatchWriteBuilder()

0 commit comments

Comments
 (0)