Skip to content

Commit 76954b8

Browse files
committed
Core/RewriteFiles: Fixed dropping delete files that are still required
This fixes a bug where delete files that are still required by the re-written file could be deleted prematurely. This happened because `MergingSnapshotProducer` calculated `minDataSequenceNumber` in a way that could skip data sequence numbers that do exist in the final snapshot. The filtered manifest list could exclude manifests of a certain sequence number because their data files were re-written. With specific orders of operations it was possible to skip the sequence number of manifests that were re-written.
1 parent fea6989 commit 76954b8

File tree

3 files changed

+115
-10
lines changed

3 files changed

+115
-10
lines changed

core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java

+28-10
Original file line numberDiff line numberDiff line change
@@ -915,16 +915,9 @@ public List<ManifestFile> apply(TableMetadata base, Snapshot snapshot) {
915915
filterManager.filterManifests(
916916
SnapshotUtil.schemaFor(base, targetBranch()),
917917
snapshot != null ? snapshot.dataManifests(ops().io()) : null);
918-
long minDataSequenceNumber =
919-
filtered.stream()
920-
.map(ManifestFile::minSequenceNumber)
921-
.filter(
922-
seq ->
923-
seq
924-
!= ManifestWriter
925-
.UNASSIGNED_SEQ) // filter out unassigned in rewritten manifests
926-
.reduce(base.lastSequenceNumber(), Math::min);
927-
deleteFilterManager.dropDeleteFilesOlderThan(minDataSequenceNumber);
918+
919+
long newMinSequenceNumber = minDataSequenceNumber(base, filtered);
920+
deleteFilterManager.dropDeleteFilesOlderThan(newMinSequenceNumber);
928921
List<ManifestFile> filteredDeletes =
929922
deleteFilterManager.filterManifests(
930923
SnapshotUtil.schemaFor(base, targetBranch()),
@@ -955,6 +948,31 @@ public List<ManifestFile> apply(TableMetadata base, Snapshot snapshot) {
955948
return manifests;
956949
}
957950

951+
private long minDataSequenceNumber(TableMetadata base, List<ManifestFile> keptManifests) {
952+
long minAddedDataSequenceNumber =
953+
addedDataFiles().stream()
954+
.map(ContentFile::dataSequenceNumber)
955+
.filter(Objects::nonNull)
956+
.filter(seq -> seq >= 0)
957+
.reduce(base.nextSequenceNumber(), Math::min);
958+
959+
long minExistingDataSequenceNumber =
960+
keptManifests.stream()
961+
.map(ManifestFile::minSequenceNumber)
962+
.filter(
963+
seq ->
964+
seq
965+
!= ManifestWriter
966+
.UNASSIGNED_SEQ) // filter out unassigned in rewritten manifests
967+
.reduce(base.lastSequenceNumber(), Math::min);
968+
969+
return Math.min(
970+
Math.min(Integer.MAX_VALUE, minExistingDataSequenceNumber),
971+
newDataFilesDataSequenceNumber != null
972+
? newDataFilesDataSequenceNumber
973+
: Integer.MAX_VALUE);
974+
}
975+
958976
@Override
959977
public Object updateEvent() {
960978
long snapshotId = snapshotId();

core/src/test/java/org/apache/iceberg/TestBase.java

+8
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,14 @@ public class TestBase {
122122
.withPartitionPath("data_bucket=1") // easy way to set partition data for now
123123
.withRecordCount(1)
124124
.build();
125+
static final DeleteFile FILE_B_EQ_DELETES =
126+
FileMetadata.deleteFileBuilder(SPEC)
127+
.ofEqualityDeletes(1)
128+
.withPath("/path/to/data-b2-deletes.parquet")
129+
.withFileSizeInBytes(10)
130+
.withPartitionPath("data_bucket=0")
131+
.withRecordCount(1)
132+
.build();
125133
static final DeleteFile FILE_B_DV =
126134
FileMetadata.deleteFileBuilder(SPEC)
127135
.ofPositionDeletes()

core/src/test/java/org/apache/iceberg/TestRewriteFiles.java

+79
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@
2929
import java.io.File;
3030
import java.util.Arrays;
3131
import java.util.Collections;
32+
import java.util.Comparator;
3233
import java.util.List;
34+
import java.util.stream.Collectors;
3335
import org.apache.iceberg.exceptions.CommitFailedException;
3436
import org.apache.iceberg.exceptions.ValidationException;
3537
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
@@ -387,6 +389,83 @@ public void testRewriteDataAndAssignOldSequenceNumber() {
387389
assertThat(listManifestFiles()).hasSize(4);
388390
}
389391

392+
@TestTemplate
393+
public void testRewriteDataAndAssignOldSequenceNumbersShouldNotDropDeleteFiles() {
394+
assumeThat(formatVersion)
395+
.as("Sequence number is only supported in iceberg format v2 or later")
396+
.isGreaterThan(1);
397+
assertThat(listManifestFiles()).isEmpty();
398+
399+
commit(table, table.newRowDelta().addRows(FILE_A).addDeletes(FILE_A2_DELETES), branch);
400+
401+
long firstCommitSequenceNumber = latestSnapshot(table, branch).sequenceNumber();
402+
403+
commit(table, table.newRowDelta().addRows(FILE_B).addDeletes(FILE_B_EQ_DELETES), branch);
404+
405+
long secondCommitSequenceNumber = latestSnapshot(table, branch).sequenceNumber();
406+
407+
TableMetadata base = readMetadata();
408+
Snapshot baseSnap = latestSnapshot(base, branch);
409+
long baseSnapshotId = baseSnap.snapshotId();
410+
411+
// FILE_B_EQ_DELETES and FILE_A2_DELETES should not be removed as the rewrite specifies
412+
// `firstRewriteSequenceNumber` explicitly which is the same as that of FILE_A2_DELETES and before
413+
// FILE_B_EQ_DELETES
414+
415+
// Technically FILE_A2_DELETES could be removed since it's an equality delete and should apply
416+
// on data sequences strictly smaller, so it's no longer needed.
417+
// However, MergingSnapshotProducer calls dropDeleteFilesOlderThan which doesn't consider if the
418+
// file is an equality delete, if that API is changed the equality delete file could be dropped
419+
// sooner
420+
Snapshot pending =
421+
apply(
422+
table
423+
.newRewrite()
424+
.addFile(FILE_A2)
425+
.deleteFile(FILE_A)
426+
.dataSequenceNumber(firstCommitSequenceNumber),
427+
branch);
428+
429+
assertThat(pending.allManifests(table.io())).hasSize(5);
430+
431+
long pendingId = pending.snapshotId();
432+
List<ManifestFile> manifestFiles =
433+
pending.allManifests(table.io()).stream()
434+
.sorted(Comparator.comparingLong(ManifestFile::sequenceNumber).reversed())
435+
.collect(Collectors.toList());
436+
437+
validateManifest(
438+
manifestFiles.get(0),
439+
dataSeqs(1L),
440+
fileSeqs(secondCommitSequenceNumber + 1),
441+
ids(pendingId),
442+
files(FILE_A2),
443+
statuses(ADDED));
444+
445+
validateManifestEntries(manifestFiles.get(1), ids(pendingId), files(FILE_A), statuses(DELETED));
446+
447+
validateManifestEntries(
448+
manifestFiles.get(2), ids(baseSnapshotId), files(FILE_B), statuses(ADDED));
449+
450+
validateDeleteManifest(
451+
manifestFiles.get(3),
452+
dataSeqs(2L),
453+
fileSeqs(2L),
454+
ids(baseSnapshotId),
455+
files(FILE_B_EQ_DELETES),
456+
statuses(ADDED));
457+
458+
validateDeleteManifest(
459+
manifestFiles.get(4),
460+
dataSeqs(1L),
461+
fileSeqs(1L),
462+
ids(baseSnapshotId - 1),
463+
files(FILE_A2_DELETES),
464+
statuses(ADDED));
465+
466+
assertThat(listManifestFiles()).hasSize(6);
467+
}
468+
390469
@TestTemplate
391470
public void testFailure() {
392471
commit(table, table.newAppend().appendFile(FILE_A), branch);

0 commit comments

Comments
 (0)