Skip to content

Commit 8318f2b

Browse files
committed
Core: Support IncrementalChangelogScan with deletes
1 parent 2519ab4 commit 8318f2b

File tree

3 files changed

+201
-80
lines changed

3 files changed

+201
-80
lines changed

Diff for: core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java

+86 -51
Original file line number | Diff line number | Diff line change
@@ -21,15 +21,17 @@
2121
import java.util.ArrayDeque;
2222
import java.util.Collection;
2323
import java.util.Deque;
24+
import java.util.List;
2425
import java.util.Map;
26+
import java.util.Objects;
2527
import java.util.Set;
2628
import java.util.stream.Collectors;
2729
import org.apache.iceberg.ManifestGroup.CreateTasksFunction;
2830
import org.apache.iceberg.ManifestGroup.TaskContext;
2931
import org.apache.iceberg.io.CloseableIterable;
30-
import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
31-
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
32+
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
3233
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
34+
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
3335
import org.apache.iceberg.util.SnapshotUtil;
3436
import org.apache.iceberg.util.TableScanUtil;
3537

@@ -63,21 +65,27 @@ protected CloseableIterable<ChangelogScanTask> doPlanFiles(
6365
return CloseableIterable.empty();
6466
}
6567

66-
Set<Long> changelogSnapshotIds = toSnapshotIds(changelogSnapshots);
67-
68-
Set<ManifestFile> newDataManifests =
69-
FluentIterable.from(changelogSnapshots)
70-
.transformAndConcat(snapshot -> snapshot.dataManifests(table().io()))
71-
.filter(manifest -> changelogSnapshotIds.contains(manifest.snapshotId()))
72-
.toSet();
68+
Set<ManifestFile> newDataManifests = Sets.newHashSet();
69+
Set<ManifestFile> newDeleteManifests = Sets.newHashSet();
70+
Map<Long, Snapshot> addedToChangedSnapshots = Maps.newHashMap();
71+
for (Snapshot snapshot : changelogSnapshots) {
72+
List<ManifestFile> dataManifests = snapshot.dataManifests(table().io());
73+
for (ManifestFile manifest : dataManifests) {
74+
if (!newDataManifests.contains(manifest)) {
75+
addedToChangedSnapshots.put(manifest.snapshotId(), snapshot);
76+
newDataManifests.add(manifest);
77+
}
78+
}
79+
newDeleteManifests.addAll(snapshot.deleteManifests(table().io()));
80+
}
7381

7482
ManifestGroup manifestGroup =
75-
new ManifestGroup(table().io(), newDataManifests, ImmutableList.of())
83+
new ManifestGroup(table().io(), newDataManifests, newDeleteManifests)
7684
.specsById(table().specs())
7785
.caseSensitive(isCaseSensitive())
7886
.select(scanColumns())
7987
.filterData(filter())
80-
.filterManifestEntries(entry -> changelogSnapshotIds.contains(entry.snapshotId()))
88+
.filterManifestEntries(entry -> addedToChangedSnapshots.containsKey(entry.snapshotId()))
8189
.ignoreExisting()
8290
.columnsToKeepStats(columnsToKeepStats());
8391

@@ -89,7 +97,8 @@ protected CloseableIterable<ChangelogScanTask> doPlanFiles(
8997
manifestGroup = manifestGroup.planWith(planExecutor());
9098
}
9199

92-
return manifestGroup.plan(new CreateDataFileChangeTasks(changelogSnapshots));
100+
return manifestGroup.plan(
101+
new CreateDataFileChangeTasks(changelogSnapshots, addedToChangedSnapshots));
93102
}
94103

95104
@Override
@@ -105,11 +114,6 @@ private Deque<Snapshot> orderedChangelogSnapshots(Long fromIdExcl, long toIdIncl
105114

106115
for (Snapshot snapshot : SnapshotUtil.ancestorsBetween(table(), toIdIncl, fromIdExcl)) {
107116
if (!snapshot.operation().equals(DataOperations.REPLACE)) {
108-
if (!snapshot.deleteManifests(table().io()).isEmpty()) {
109-
throw new UnsupportedOperationException(
110-
"Delete files are currently not supported in changelog scans");
111-
}
112-
113117
changelogSnapshots.addFirst(snapshot);
114118
}
115119
}
@@ -134,50 +138,81 @@ private static Map<Long, Integer> computeSnapshotOrdinals(Deque<Snapshot> snapsh
134138
}
135139

136140
private static class CreateDataFileChangeTasks implements CreateTasksFunction<ChangelogScanTask> {
137-
private static final DeleteFile[] NO_DELETES = new DeleteFile[0];
138141

139142
private final Map<Long, Integer> snapshotOrdinals;
143+
private final Map<Long, Snapshot> addedToChangedSnapshots;
140144

141-
CreateDataFileChangeTasks(Deque<Snapshot> snapshots) {
145+
CreateDataFileChangeTasks(
146+
Deque<Snapshot> snapshots, Map<Long, Snapshot> addedToChangedSnapshots) {
142147
this.snapshotOrdinals = computeSnapshotOrdinals(snapshots);
148+
this.addedToChangedSnapshots = addedToChangedSnapshots;
143149
}
144150

145151
@Override
146152
public CloseableIterable<ChangelogScanTask> apply(
147153
CloseableIterable<ManifestEntry<DataFile>> entries, TaskContext context) {
148154

149-
return CloseableIterable.transform(
150-
entries,
151-
entry -> {
152-
long commitSnapshotId = entry.snapshotId();
153-
int changeOrdinal = snapshotOrdinals.get(commitSnapshotId);
154-
DataFile dataFile = entry.file().copy(context.shouldKeepStats());
155-
156-
switch (entry.status()) {
157-
case ADDED:
158-
return new BaseAddedRowsScanTask(
159-
changeOrdinal,
160-
commitSnapshotId,
161-
dataFile,
162-
NO_DELETES,
163-
context.schemaAsString(),
164-
context.specAsString(),
165-
context.residuals());
166-
167-
case DELETED:
168-
return new BaseDeletedDataFileScanTask(
169-
changeOrdinal,
170-
commitSnapshotId,
171-
dataFile,
172-
NO_DELETES,
173-
context.schemaAsString(),
174-
context.specAsString(),
175-
context.residuals());
176-
177-
default:
178-
throw new IllegalArgumentException("Unexpected entry status: " + entry.status());
179-
}
180-
});
155+
return CloseableIterable.filter(
156+
CloseableIterable.transform(
157+
entries,
158+
entry -> {
159+
long snapshotId = entry.snapshotId();
160+
Snapshot snapshot = addedToChangedSnapshots.get(snapshotId);
161+
long commitSnapshotId = snapshot.snapshotId();
162+
int changeOrdinal = snapshotOrdinals.get(snapshot.snapshotId());
163+
DataFile dataFile = entry.file().copy(context.shouldKeepStats());
164+
DeleteFile[] deleteFiles = context.deletes().forDataFile(dataFile);
165+
List<DeleteFile> addedDeletes = Lists.newArrayList();
166+
List<DeleteFile> existingDeletes = Lists.newArrayList();
167+
for (DeleteFile file : deleteFiles) {
168+
if (file.dataSequenceNumber() == snapshot.sequenceNumber()) {
169+
addedDeletes.add(file);
170+
} else {
171+
existingDeletes.add(file);
172+
}
173+
}
174+
175+
switch (entry.status()) {
176+
case ADDED:
177+
if (snapshotId == commitSnapshotId) {
178+
return new BaseAddedRowsScanTask(
179+
changeOrdinal,
180+
commitSnapshotId,
181+
dataFile,
182+
addedDeletes.toArray(new DeleteFile[0]),
183+
context.schemaAsString(),
184+
context.specAsString(),
185+
context.residuals());
186+
} else if (deleteFiles.length > 0) {
187+
return new BaseDeletedRowsScanTask(
188+
changeOrdinal,
189+
commitSnapshotId,
190+
dataFile,
191+
addedDeletes.toArray(new DeleteFile[0]),
192+
existingDeletes.toArray(new DeleteFile[0]),
193+
context.schemaAsString(),
194+
context.specAsString(),
195+
context.residuals());
196+
} else {
197+
return null;
198+
}
199+
200+
case DELETED:
201+
return new BaseDeletedDataFileScanTask(
202+
changeOrdinal,
203+
commitSnapshotId,
204+
dataFile,
205+
existingDeletes.toArray(new DeleteFile[0]),
206+
context.schemaAsString(),
207+
context.specAsString(),
208+
context.residuals());
209+
210+
default:
211+
throw new IllegalArgumentException(
212+
"Unexpected entry status: " + entry.status());
213+
}
214+
}),
215+
Objects::nonNull);
181216
}
182217
}
183218
}

Diff for: core/src/test/java/org/apache/iceberg/TestBaseIncrementalChangelogScan.java

+51 -14
Original file line number | Diff line number | Diff line change
@@ -33,7 +33,6 @@
3333
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
3434
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
3535
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
36-
import org.assertj.core.api.Assertions;
3736
import org.junit.jupiter.api.TestTemplate;
3837
import org.junit.jupiter.api.extension.ExtendWith;
3938

@@ -132,6 +131,57 @@ public void testFileDeletes() {
132131
assertThat(t1.existingDeletes()).as("Must be no deletes").isEmpty();
133132
}
134133

134+
@TestTemplate
135+
public void testRowDeletes() {
136+
assumeThat(formatVersion).isEqualTo(2);
137+
138+
table.newFastAppend().appendFile(FILE_A).commit();
139+
Snapshot snap1 = table.currentSnapshot();
140+
141+
table.newRowDelta().addDeletes(FILE_A_DELETES).commit();
142+
Snapshot snap2 = table.currentSnapshot();
143+
144+
table.newRowDelta().addRows(FILE_B).addDeletes(FILE_B_DELETES).commit();
145+
146+
Snapshot snap3 = table.currentSnapshot();
147+
148+
table.newDelete().deleteFile(FILE_B).commit();
149+
Snapshot snap4 = table.currentSnapshot();
150+
151+
IncrementalChangelogScan scan =
152+
newScan().fromSnapshotExclusive(snap1.snapshotId()).toSnapshot(snap4.snapshotId());
153+
154+
List<ChangelogScanTask> tasks = plan(scan);
155+
156+
assertThat(tasks).as("Must have 3 tasks").hasSize(3);
157+
158+
DeletedRowsScanTask t1 = (DeletedRowsScanTask) Iterables.get(tasks, 0);
159+
assertThat(t1.changeOrdinal()).as("Ordinal must match").isEqualTo(0);
160+
assertThat(t1.commitSnapshotId()).as("Snapshot must match").isEqualTo(snap2.snapshotId());
161+
assertThat(t1.file().path()).as("Data file must match").isEqualTo(FILE_A.path());
162+
assertThat(t1.addedDeletes().get(0).path())
163+
.as("Delete file must match")
164+
.isEqualTo(FILE_A_DELETES.path());
165+
assertThat(t1.existingDeletes()).as("Must be no deletes").isEmpty();
166+
167+
AddedRowsScanTask t2 = (AddedRowsScanTask) Iterables.get(tasks, 1);
168+
assertThat(t2.changeOrdinal()).as("Ordinal must match").isEqualTo(1);
169+
assertThat(t2.commitSnapshotId()).as("Snapshot must match").isEqualTo(snap3.snapshotId());
170+
assertThat(t2.file().path()).as("Data file must match").isEqualTo(FILE_B.path());
171+
assertThat(t2.deletes().get(0).path())
172+
.as("Delete file must match")
173+
.isEqualTo(FILE_B_DELETES.path());
174+
175+
DeletedDataFileScanTask t3 = (DeletedDataFileScanTask) Iterables.get(tasks, 2);
176+
assertThat(t3.changeOrdinal()).as("Ordinal must match").isEqualTo(2);
177+
assertThat(t3.commitSnapshotId()).as("Snapshot must match").isEqualTo(snap4.snapshotId());
178+
assertThat(t3.file().path()).as("Data file must match").isEqualTo(FILE_B.path());
179+
assertThat(t3.existingDeletes()).hasSize(1);
180+
assertThat(t3.existingDeletes().get(0).path())
181+
.as("Existing delete file must match")
182+
.isEqualTo(FILE_B_DELETES.path());
183+
}
184+
135185
@TestTemplate
136186
public void testExistingEntriesInNewDataManifestsAreIgnored() {
137187
table
@@ -247,19 +297,6 @@ public void testDataFileRewrites() {
247297
assertThat(t2.deletes()).as("Must be no deletes").isEmpty();
248298
}
249299

250-
@TestTemplate
251-
public void testDeleteFilesAreNotSupported() {
252-
assumeThat(formatVersion).isEqualTo(2);
253-
254-
table.newFastAppend().appendFile(FILE_A2).appendFile(FILE_B).commit();
255-
256-
table.newRowDelta().addDeletes(FILE_A2_DELETES).commit();
257-
258-
Assertions.assertThatThrownBy(() -> plan(newScan()))
259-
.isInstanceOf(UnsupportedOperationException.class)
260-
.hasMessage("Delete files are currently not supported in changelog scans");
261-
}
262-
263300
// plans tasks and reorders them to have deterministic order
264301
private List<ChangelogScanTask> plan(IncrementalChangelogScan scan) {
265302
try (CloseableIterable<ChangelogScanTask> tasks = scan.planFiles()) {

0 commit comments

Comments
 (0)