Skip to content

Commit 314f0ad

Browse files
committed
Core/RewriteFiles: Fixed dropping delete files that are still required
This fixes a bug where delete files that are still required by the re-written file could be deleted prematurely. This happened because `MergingSnapshotProducer` calculated `minDataSequenceNumber` in a way that could skip data sequence numbers that do exist in the final snapshot. The filtered manifest list could exclude manifests of a certain sequence number because their data files were re-written. With specific orders of operations it was possible to skip the sequence number of manifests that were re-written.
1 parent 10fce27 commit 314f0ad

File tree

3 files changed

+132
-1
lines changed

3 files changed

+132
-1
lines changed

core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java

+12-1
Original file line numberDiff line numberDiff line change
@@ -833,7 +833,17 @@ public List<ManifestFile> apply(TableMetadata base, Snapshot snapshot) {
833833
filterManager.filterManifests(
834834
SnapshotUtil.schemaFor(base, targetBranch()),
835835
snapshot != null ? snapshot.dataManifests(ops.io()) : null);
836-
long minDataSequenceNumber =
836+
837+
long minNewFileSequenceNumber =
838+
addedDataFiles().stream()
839+
.filter(x -> x.dataSequenceNumber() != null)
840+
.mapToLong(ContentFile::dataSequenceNumber)
841+
.reduce(
842+
newDataFilesDataSequenceNumber != null
843+
? newDataFilesDataSequenceNumber
844+
: base.nextSequenceNumber(),
845+
Math::min);
846+
long minExistingDataSequenceNumber =
837847
filtered.stream()
838848
.map(ManifestFile::minSequenceNumber)
839849
.filter(
@@ -842,6 +852,7 @@ public List<ManifestFile> apply(TableMetadata base, Snapshot snapshot) {
842852
!= ManifestWriter
843853
.UNASSIGNED_SEQ) // filter out unassigned in rewritten manifests
844854
.reduce(base.lastSequenceNumber(), Math::min);
855+
long minDataSequenceNumber = Math.min(minNewFileSequenceNumber, minExistingDataSequenceNumber);
845856
deleteFilterManager.dropDeleteFilesOlderThan(minDataSequenceNumber);
846857
List<ManifestFile> filteredDeletes =
847858
deleteFilterManager.filterManifests(

core/src/test/java/org/apache/iceberg/TestBase.java

+8
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,14 @@ public class TestBase {
108108
.withPartitionPath("data_bucket=1") // easy way to set partition data for now
109109
.withRecordCount(1)
110110
.build();
111+
static final DeleteFile FILE_B2_DELETES =
112+
FileMetadata.deleteFileBuilder(SPEC)
113+
.ofEqualityDeletes(1)
114+
.withPath("/path/to/data-b2-deletes.parquet")
115+
.withFileSizeInBytes(10)
116+
.withPartitionPath("data_bucket=0")
117+
.withRecordCount(1)
118+
.build();
111119
static final DataFile FILE_C =
112120
DataFiles.builder(SPEC)
113121
.withPath("/path/to/data-c.parquet")

core/src/test/java/org/apache/iceberg/TestRewriteFiles.java

+112
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@
2929
import java.io.File;
3030
import java.util.Arrays;
3131
import java.util.Collections;
32+
import java.util.Comparator;
3233
import java.util.List;
34+
import java.util.stream.Collectors;
3335
import org.apache.iceberg.exceptions.CommitFailedException;
3436
import org.apache.iceberg.exceptions.ValidationException;
3537
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
@@ -384,6 +386,116 @@ public void testRewriteDataAndAssignOldSequenceNumber() {
384386
assertThat(listManifestFiles()).hasSize(4);
385387
}
386388

389+
@TestTemplate
390+
public void testRewriteDataAndAssignOldSequenceNumbersShouldNotDropDeleteFiles() {
391+
assumeThat(formatVersion)
392+
.as("Sequence number is only supported in iceberg format v2 or later")
393+
.isGreaterThan(1);
394+
assertThat(listManifestFiles()).isEmpty();
395+
396+
commit(table, table.newRowDelta().addRows(FILE_A).addDeletes(FILE_A2_DELETES), branch);
397+
398+
long firstRewriteSequenceNumber = latestSnapshot(table, branch).sequenceNumber();
399+
400+
commit(
401+
table,
402+
table.newRowDelta().addRows(FILE_B).addRows(FILE_B).addDeletes(FILE_B2_DELETES),
403+
branch);
404+
commit(
405+
table,
406+
table.newRowDelta().addRows(FILE_B).addRows(FILE_C).addDeletes(FILE_C2_DELETES),
407+
branch);
408+
409+
long secondRewriteSequenceNumber = latestSnapshot(table, branch).sequenceNumber();
410+
411+
commit(
412+
table,
413+
table
414+
.newRewrite()
415+
.addFile(FILE_D)
416+
.deleteFile(FILE_B)
417+
.deleteFile(FILE_C)
418+
.dataSequenceNumber(secondRewriteSequenceNumber),
419+
branch);
420+
421+
TableMetadata base = readMetadata();
422+
Snapshot baseSnap = latestSnapshot(base, branch);
423+
long baseSnapshotId = baseSnap.snapshotId();
424+
425+
Comparator<ManifestFile> sequenceNumberOrdering =
426+
new Comparator<>() {
427+
@Override
428+
public int compare(ManifestFile o1, ManifestFile o2) {
429+
return (int) (o1.sequenceNumber() - o2.sequenceNumber());
430+
}
431+
};
432+
433+
// FILE_B2_DELETES and FILE_A2_DELETES should not be removed as the rewrite specifies
434+
// `firstRewriteSequenceNumber`
435+
// explicitly which is the same as that of A2_DELETES and before B2_DELETES
436+
437+
// Technically A1_DELETES could be removed since it's an equality delete and should apply on
438+
// data sequences strictly
439+
// smaller, so it's no longer needed. However, MergingSnapshotProducer calls
440+
// dropDeleteFilesOlderThan
441+
// which doesn't consider if the file is an equality delete, if that API is changed the equality
442+
// delete file could be dropped sooner
443+
Snapshot pending =
444+
apply(
445+
table
446+
.newRewrite()
447+
.addFile(FILE_A2)
448+
.deleteFile(FILE_A)
449+
.dataSequenceNumber(firstRewriteSequenceNumber),
450+
branch);
451+
452+
assertThat(pending.allManifests(table.io())).hasSize(6);
453+
454+
long pendingId = pending.snapshotId();
455+
List<ManifestFile> manifestFiles =
456+
pending.allManifests(table.io()).stream()
457+
.sorted(sequenceNumberOrdering.reversed())
458+
.collect(Collectors.toList());
459+
ManifestFile newManifest = manifestFiles.get(0);
460+
validateManifestEntries(newManifest, ids(pendingId), files(FILE_A2), statuses(ADDED));
461+
462+
assertThat(ManifestFiles.read(newManifest, FILE_IO).entries())
463+
.allSatisfy(
464+
entry -> assertThat(entry.dataSequenceNumber()).isEqualTo(firstRewriteSequenceNumber));
465+
assertThat(newManifest.sequenceNumber()).isEqualTo(secondRewriteSequenceNumber + 2);
466+
467+
validateManifestEntries(manifestFiles.get(1), ids(pendingId), files(FILE_A), statuses(DELETED));
468+
469+
validateManifestEntries(
470+
manifestFiles.get(2), ids(baseSnapshotId), files(FILE_D), statuses(ADDED));
471+
472+
validateDeleteManifest(
473+
manifestFiles.get(3),
474+
dataSeqs(3L),
475+
fileSeqs(3L),
476+
ids(baseSnapshotId - 1),
477+
files(FILE_C2_DELETES),
478+
statuses(ADDED));
479+
480+
validateDeleteManifest(
481+
manifestFiles.get(4),
482+
dataSeqs(2L),
483+
fileSeqs(2L),
484+
ids(baseSnapshotId - 2),
485+
files(FILE_B2_DELETES),
486+
statuses(ADDED));
487+
488+
validateDeleteManifest(
489+
manifestFiles.get(5),
490+
dataSeqs(1L),
491+
fileSeqs(1L),
492+
ids(baseSnapshotId - 3),
493+
files(FILE_A2_DELETES),
494+
statuses(ADDED));
495+
496+
assertThat(listManifestFiles()).hasSize(11);
497+
}
498+
387499
@TestTemplate
388500
public void testFailure() {
389501
commit(table, table.newAppend().appendFile(FILE_A), branch);

0 commit comments

Comments
 (0)