Skip to content

Commit a53f53e

Browse files
committed
[core] ConflictDetection should check delta files for OVERWRITE
1 parent da8e246 commit a53f53e

File tree

3 files changed

+37
-3
lines changed

3 files changed

+37
-3
lines changed

paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -190,10 +190,12 @@ static <T extends FileEntry> void mergeEntries(Iterable<T> entries, Map<Identifi
190190
Identifier identifier = entry.identifier();
191191
switch (entry.kind()) {
192192
case ADD:
193+
T old = map.get(identifier);
193194
checkState(
194-
!map.containsKey(identifier),
195-
"Trying to add file %s which is already added.",
196-
identifier);
195+
old == null,
196+
"Trying to add file %s which is already in the the map: %s",
197+
identifier,
198+
old);
197199
map.put(identifier, entry);
198200
break;
199201
case DELETE:

paimon-core/src/main/java/org/apache/paimon/operation/commit/ConflictDetection.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,15 @@ public void checkNoConflictsOrFail(
137137

138138
Function<Throwable, RuntimeException> conflictException =
139139
conflictException(baseCommitUser, baseEntries, deltaEntries);
140+
141+
try {
142+
// check the delta, it is important not to delete and add the same file. Since scan
143+
// relies on map for deduplication, this may result in the loss of this file
144+
FileEntry.mergeEntries(deltaEntries);
145+
} catch (Throwable e) {
146+
throw conflictException.apply(e);
147+
}
148+
140149
Collection<SimpleFileEntry> mergedEntries;
141150
try {
142151
// merge manifest entries and also check if the files we want to delete are still there

paimon-core/src/test/java/org/apache/paimon/table/AppendOnlySimpleTableTest.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,29 @@
124124
/** Tests for {@link AppendOnlyFileStoreTable}. */
125125
public class AppendOnlySimpleTableTest extends SimpleTableTestBase {
126126

127+
@Test
128+
public void testOverwriteSameFiles() throws Exception {
129+
FileStoreTable table = createFileStoreTable();
130+
131+
// write files
132+
List<CommitMessage> commitMessages;
133+
try (BatchTableWrite write = table.newBatchWriteBuilder().newWrite()) {
134+
write.write(rowData(1, 10, 100L));
135+
commitMessages = write.prepareCommit();
136+
}
137+
138+
// first commit
139+
try (BatchTableCommit commit = table.newBatchWriteBuilder().withOverwrite().newCommit()) {
140+
commit.commit(commitMessages);
141+
}
142+
143+
// second commit should throw exception
144+
try (BatchTableCommit commit = table.newBatchWriteBuilder().withOverwrite().newCommit()) {
145+
assertThatThrownBy(() -> commit.commit(commitMessages))
146+
.hasMessageContaining("File deletion conflicts detected! Give up committing");
147+
}
148+
}
149+
127150
@Test
128151
public void testBucketedAppendTableWriteWithInit() throws Exception {
129152
innerTestBucketedAppendTableWriteInit(true);

0 commit comments

Comments
 (0)