Skip to content

Commit 606d5c0

Browse files
committed
[core] Overwrite commit should never be conflicted by delete files
1 parent 985b733 commit 606d5c0

File tree

2 files changed

+145
-32
lines changed

2 files changed

+145
-32
lines changed

paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java

Lines changed: 54 additions & 32 deletions
Original file line number | Diff line number | Diff line change
@@ -369,9 +369,7 @@ public int commit(ManifestCommittable committable, boolean checkAppendFiles) {
369369

370370
attempts +=
371371
tryCommit(
372-
appendTableFiles,
373-
appendChangelog,
374-
appendIndexFiles,
372+
provider(appendTableFiles, appendChangelog, appendIndexFiles),
375373
committable.identifier(),
376374
committable.watermark(),
377375
committable.logOffsets(),
@@ -406,9 +404,7 @@ public int commit(ManifestCommittable committable, boolean checkAppendFiles) {
406404

407405
attempts +=
408406
tryCommit(
409-
compactTableFiles,
410-
compactChangelog,
411-
compactIndexFiles,
407+
provider(compactTableFiles, compactChangelog, compactIndexFiles),
412408
committable.identifier(),
413409
committable.watermark(),
414410
committable.logOffsets(),
@@ -577,9 +573,7 @@ public int overwritePartition(
577573
if (!compactTableFiles.isEmpty() || !compactIndexFiles.isEmpty()) {
578574
attempts +=
579575
tryCommit(
580-
compactTableFiles,
581-
emptyList(),
582-
compactIndexFiles,
576+
provider(compactTableFiles, emptyList(), compactIndexFiles),
583577
committable.identifier(),
584578
committable.watermark(),
585579
committable.logOffsets(),
@@ -687,9 +681,7 @@ public FileStoreCommit withMetrics(CommitMetrics metrics) {
687681
public void commitStatistics(Statistics stats, long commitIdentifier) {
688682
String statsFileName = statsFileHandler.writeStats(stats);
689683
tryCommit(
690-
emptyList(),
691-
emptyList(),
692-
emptyList(),
684+
provider(emptyList(), emptyList(), emptyList()),
693685
commitIdentifier,
694686
null,
695687
Collections.emptyMap(),
@@ -830,9 +822,7 @@ private ManifestEntry makeEntry(FileKind kind, CommitMessage commitMessage, Data
830822
}
831823

832824
private int tryCommit(
833-
List<ManifestEntry> tableFiles,
834-
List<ManifestEntry> changelogFiles,
835-
List<IndexManifestEntry> indexFiles,
825+
ChangesProvider changesProvider,
836826
long identifier,
837827
@Nullable Long watermark,
838828
Map<Integer, Long> logOffsets,
@@ -845,12 +835,13 @@ private int tryCommit(
845835
long startMillis = System.currentTimeMillis();
846836
while (true) {
847837
Snapshot latestSnapshot = snapshotManager.latestSnapshot();
838+
CommitChanges changes = changesProvider.provide(latestSnapshot);
848839
CommitResult result =
849840
tryCommitOnce(
850841
retryResult,
851-
tableFiles,
852-
changelogFiles,
853-
indexFiles,
842+
changes.tableFiles,
843+
changes.changelogFiles,
844+
changes.indexFiles,
854845
identifier,
855846
watermark,
856847
logOffsets,
@@ -895,8 +886,23 @@ private int tryOverwritePartition(
895886
@Nullable Long watermark,
896887
Map<Integer, Long> logOffsets,
897888
Map<String, String> properties) {
898-
// collect all files with overwrite
899-
Snapshot latestSnapshot = snapshotManager.latestSnapshot();
889+
return tryCommit(
890+
latestSnapshot ->
891+
overwriteChanges(changes, indexFiles, latestSnapshot, partitionFilter),
892+
identifier,
893+
watermark,
894+
logOffsets,
895+
properties,
896+
CommitKind.OVERWRITE,
897+
mustConflictCheck(),
898+
null);
899+
}
900+
901+
private CommitChanges overwriteChanges(
902+
List<ManifestEntry> changes,
903+
List<IndexManifestEntry> indexFiles,
904+
@Nullable Snapshot latestSnapshot,
905+
@Nullable PartitionPredicate partitionFilter) {
900906
List<ManifestEntry> changesWithOverwrite = new ArrayList<>();
901907
List<IndexManifestEntry> indexChangesWithOverwrite = new ArrayList<>();
902908
if (latestSnapshot != null) {
@@ -931,18 +937,7 @@ private int tryOverwritePartition(
931937
}
932938
changesWithOverwrite.addAll(changes);
933939
indexChangesWithOverwrite.addAll(indexFiles);
934-
935-
return tryCommit(
936-
changesWithOverwrite,
937-
emptyList(),
938-
indexChangesWithOverwrite,
939-
identifier,
940-
watermark,
941-
logOffsets,
942-
properties,
943-
CommitKind.OVERWRITE,
944-
mustConflictCheck(),
945-
null);
940+
return new CommitChanges(changesWithOverwrite, emptyList(), indexChangesWithOverwrite);
946941
}
947942

948943
@VisibleForTesting
@@ -1547,4 +1542,31 @@ public boolean isSuccess() {
15471542
return false;
15481543
}
15491544
}
1545+
1546+
private static ChangesProvider provider(
1547+
List<ManifestEntry> tableFiles,
1548+
List<ManifestEntry> changelogFiles,
1549+
List<IndexManifestEntry> indexFiles) {
1550+
return s -> new CommitChanges(tableFiles, changelogFiles, indexFiles);
1551+
}
1552+
1553+
private interface ChangesProvider {
1554+
CommitChanges provide(@Nullable Snapshot latestSnapshot);
1555+
}
1556+
1557+
private static class CommitChanges {
1558+
1559+
private final List<ManifestEntry> tableFiles;
1560+
private final List<ManifestEntry> changelogFiles;
1561+
private final List<IndexManifestEntry> indexFiles;
1562+
1563+
private CommitChanges(
1564+
List<ManifestEntry> tableFiles,
1565+
List<ManifestEntry> changelogFiles,
1566+
List<IndexManifestEntry> indexFiles) {
1567+
this.tableFiles = tableFiles;
1568+
this.changelogFiles = changelogFiles;
1569+
this.indexFiles = indexFiles;
1570+
}
1571+
}
15501572
}

paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.paimon.table;
2020

2121
import org.apache.paimon.CoreOptions;
22+
import org.apache.paimon.append.AppendCompactTask;
2223
import org.apache.paimon.bucket.DefaultBucketFunction;
2324
import org.apache.paimon.data.BinaryRow;
2425
import org.apache.paimon.data.BinaryString;
@@ -37,6 +38,8 @@
3738
import org.apache.paimon.io.BundleRecords;
3839
import org.apache.paimon.io.DataFileMeta;
3940
import org.apache.paimon.manifest.ManifestFileMeta;
41+
import org.apache.paimon.operation.BaseAppendFileStoreWrite;
42+
import org.apache.paimon.operation.FileStoreWrite;
4043
import org.apache.paimon.options.MemorySize;
4144
import org.apache.paimon.options.Options;
4245
import org.apache.paimon.predicate.Equal;
@@ -57,6 +60,7 @@
5760
import org.apache.paimon.table.sink.CommitMessage;
5861
import org.apache.paimon.table.sink.StreamTableCommit;
5962
import org.apache.paimon.table.sink.StreamTableWrite;
63+
import org.apache.paimon.table.sink.TableWriteImpl;
6064
import org.apache.paimon.table.source.DataSplit;
6165
import org.apache.paimon.table.source.ReadBuilder;
6266
import org.apache.paimon.table.source.ScanMode;
@@ -93,6 +97,7 @@
9397
import java.util.concurrent.Future;
9498
import java.util.concurrent.ThreadLocalRandom;
9599
import java.util.concurrent.atomic.AtomicInteger;
100+
import java.util.concurrent.atomic.AtomicReference;
96101
import java.util.function.Consumer;
97102
import java.util.function.Function;
98103
import java.util.stream.Collectors;
@@ -116,6 +121,92 @@
116121
/** Tests for {@link AppendOnlyFileStoreTable}. */
117122
public class AppendOnlySimpleTableTest extends SimpleTableTestBase {
118123

124+
@Test
125+
public void testOverwriteNeverFail() throws Exception {
126+
FileStoreTable table = createFileStoreTable();
127+
128+
Runnable writeRecord =
129+
() -> {
130+
BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
131+
try (BatchTableWrite write = writeBuilder.newWrite();
132+
BatchTableCommit commit = writeBuilder.newCommit()) {
133+
write.write(rowData(1, 10, 100L));
134+
commit.commit(write.prepareCommit());
135+
} catch (Exception e) {
136+
throw new RuntimeException(e);
137+
}
138+
};
139+
140+
Runnable overwrite =
141+
() -> {
142+
BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder().withOverwrite();
143+
try (BatchTableWrite write = writeBuilder.newWrite();
144+
BatchTableCommit commit = writeBuilder.newCommit()) {
145+
write.write(rowData(1, 10, 100L));
146+
commit.commit(write.prepareCommit());
147+
} catch (Exception e) {
148+
throw new RuntimeException(e);
149+
}
150+
};
151+
152+
Runnable compact =
153+
() -> {
154+
BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder().withOverwrite();
155+
try (BatchTableWrite write = writeBuilder.newWrite();
156+
BatchTableCommit commit = writeBuilder.newCommit()) {
157+
List<DataSplit> splits =
158+
(List) table.newReadBuilder().newScan().plan().splits();
159+
List<DataFileMeta> files =
160+
splits.stream()
161+
.flatMap(s -> s.dataFiles().stream())
162+
.collect(Collectors.toList());
163+
FileStoreWrite fileStoreWrite = ((TableWriteImpl) write).getWrite();
164+
CommitMessage commitMessage =
165+
new AppendCompactTask(splits.get(0).partition(), files)
166+
.doCompact(
167+
table, (BaseAppendFileStoreWrite) fileStoreWrite);
168+
commit.commit(Collections.singletonList(commitMessage));
169+
} catch (Exception e) {
170+
throw new RuntimeException(e);
171+
}
172+
};
173+
174+
AtomicReference<Exception> exception = new AtomicReference<>();
175+
176+
Thread thread1 =
177+
new Thread(
178+
() -> {
179+
for (int i = 0; i < 10; i++) {
180+
try {
181+
writeRecord.run();
182+
overwrite.run();
183+
} catch (Exception e) {
184+
exception.set(e);
185+
}
186+
}
187+
});
188+
189+
Thread thread2 =
190+
new Thread(
191+
() -> {
192+
for (int i = 0; i < 10; i++) {
193+
try {
194+
writeRecord.run();
195+
compact.run();
196+
} catch (Exception ignored) {
197+
}
198+
}
199+
});
200+
201+
thread1.start();
202+
thread2.start();
203+
204+
thread1.join();
205+
thread2.join();
206+
207+
assertThat(exception.get()).isNull();
208+
}
209+
119210
@Test
120211
public void testDiscardDuplicateFiles() throws Exception {
121212
FileStoreTable table =

0 commit comments

Comments
 (0)