Skip to content

Commit d9307d3

Browse files
committed
Testing the BaseIncrementalChangeloggScan implementation from apache#9888.
This uses the BaseIncrementalChangeloggScan implementation from apache#9888 and the rest of the changes (including tests) from apache#10935 to test the BaseIncrementalChangeloggScan implementation.
1 parent 33259f9 commit d9307d3

File tree

7 files changed

+490
-100
lines changed

7 files changed

+490
-100
lines changed

core/src/main/java/org/apache/iceberg/BaseIncrementalChangelogScan.java

+86-51
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,17 @@
2121
import java.util.ArrayDeque;
2222
import java.util.Collection;
2323
import java.util.Deque;
24+
import java.util.List;
2425
import java.util.Map;
26+
import java.util.Objects;
2527
import java.util.Set;
2628
import java.util.stream.Collectors;
2729
import org.apache.iceberg.ManifestGroup.CreateTasksFunction;
2830
import org.apache.iceberg.ManifestGroup.TaskContext;
2931
import org.apache.iceberg.io.CloseableIterable;
30-
import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
31-
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
32+
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
3233
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
34+
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
3335
import org.apache.iceberg.util.SnapshotUtil;
3436
import org.apache.iceberg.util.TableScanUtil;
3537

@@ -63,21 +65,27 @@ protected CloseableIterable<ChangelogScanTask> doPlanFiles(
6365
return CloseableIterable.empty();
6466
}
6567

66-
Set<Long> changelogSnapshotIds = toSnapshotIds(changelogSnapshots);
67-
68-
Set<ManifestFile> newDataManifests =
69-
FluentIterable.from(changelogSnapshots)
70-
.transformAndConcat(snapshot -> snapshot.dataManifests(table().io()))
71-
.filter(manifest -> changelogSnapshotIds.contains(manifest.snapshotId()))
72-
.toSet();
68+
Set<ManifestFile> newDataManifests = Sets.newHashSet();
69+
Set<ManifestFile> newDeleteManifests = Sets.newHashSet();
70+
Map<Long, Snapshot> addedToChangedSnapshots = Maps.newHashMap();
71+
for (Snapshot snapshot : changelogSnapshots) {
72+
List<ManifestFile> dataManifests = snapshot.dataManifests(table().io());
73+
for (ManifestFile manifest : dataManifests) {
74+
if (!newDataManifests.contains(manifest)) {
75+
addedToChangedSnapshots.put(manifest.snapshotId(), snapshot);
76+
newDataManifests.add(manifest);
77+
}
78+
}
79+
newDeleteManifests.addAll(snapshot.deleteManifests(table().io()));
80+
}
7381

7482
ManifestGroup manifestGroup =
75-
new ManifestGroup(table().io(), newDataManifests, ImmutableList.of())
83+
new ManifestGroup(table().io(), newDataManifests, newDeleteManifests)
7684
.specsById(table().specs())
7785
.caseSensitive(isCaseSensitive())
7886
.select(scanColumns())
7987
.filterData(filter())
80-
.filterManifestEntries(entry -> changelogSnapshotIds.contains(entry.snapshotId()))
88+
.filterManifestEntries(entry -> addedToChangedSnapshots.containsKey(entry.snapshotId()))
8189
.ignoreExisting()
8290
.columnsToKeepStats(columnsToKeepStats());
8391

@@ -89,7 +97,8 @@ protected CloseableIterable<ChangelogScanTask> doPlanFiles(
8997
manifestGroup = manifestGroup.planWith(planExecutor());
9098
}
9199

92-
return manifestGroup.plan(new CreateDataFileChangeTasks(changelogSnapshots));
100+
return manifestGroup.plan(
101+
new CreateDataFileChangeTasks(changelogSnapshots, addedToChangedSnapshots));
93102
}
94103

95104
@Override
@@ -105,11 +114,6 @@ private Deque<Snapshot> orderedChangelogSnapshots(Long fromIdExcl, long toIdIncl
105114

106115
for (Snapshot snapshot : SnapshotUtil.ancestorsBetween(table(), toIdIncl, fromIdExcl)) {
107116
if (!snapshot.operation().equals(DataOperations.REPLACE)) {
108-
if (!snapshot.deleteManifests(table().io()).isEmpty()) {
109-
throw new UnsupportedOperationException(
110-
"Delete files are currently not supported in changelog scans");
111-
}
112-
113117
changelogSnapshots.addFirst(snapshot);
114118
}
115119
}
@@ -134,50 +138,81 @@ private static Map<Long, Integer> computeSnapshotOrdinals(Deque<Snapshot> snapsh
134138
}
135139

136140
private static class CreateDataFileChangeTasks implements CreateTasksFunction<ChangelogScanTask> {
137-
private static final DeleteFile[] NO_DELETES = new DeleteFile[0];
138141

139142
private final Map<Long, Integer> snapshotOrdinals;
143+
private final Map<Long, Snapshot> addedToChangedSnapshots;
140144

141-
CreateDataFileChangeTasks(Deque<Snapshot> snapshots) {
145+
CreateDataFileChangeTasks(
146+
Deque<Snapshot> snapshots, Map<Long, Snapshot> addedToChangedSnapshots) {
142147
this.snapshotOrdinals = computeSnapshotOrdinals(snapshots);
148+
this.addedToChangedSnapshots = addedToChangedSnapshots;
143149
}
144150

145151
@Override
146152
public CloseableIterable<ChangelogScanTask> apply(
147153
CloseableIterable<ManifestEntry<DataFile>> entries, TaskContext context) {
148154

149-
return CloseableIterable.transform(
150-
entries,
151-
entry -> {
152-
long commitSnapshotId = entry.snapshotId();
153-
int changeOrdinal = snapshotOrdinals.get(commitSnapshotId);
154-
DataFile dataFile = entry.file().copy(context.shouldKeepStats());
155-
156-
switch (entry.status()) {
157-
case ADDED:
158-
return new BaseAddedRowsScanTask(
159-
changeOrdinal,
160-
commitSnapshotId,
161-
dataFile,
162-
NO_DELETES,
163-
context.schemaAsString(),
164-
context.specAsString(),
165-
context.residuals());
166-
167-
case DELETED:
168-
return new BaseDeletedDataFileScanTask(
169-
changeOrdinal,
170-
commitSnapshotId,
171-
dataFile,
172-
NO_DELETES,
173-
context.schemaAsString(),
174-
context.specAsString(),
175-
context.residuals());
176-
177-
default:
178-
throw new IllegalArgumentException("Unexpected entry status: " + entry.status());
179-
}
180-
});
155+
return CloseableIterable.filter(
156+
CloseableIterable.transform(
157+
entries,
158+
entry -> {
159+
long snapshotId = entry.snapshotId();
160+
Snapshot snapshot = addedToChangedSnapshots.get(snapshotId);
161+
long commitSnapshotId = snapshot.snapshotId();
162+
int changeOrdinal = snapshotOrdinals.get(snapshot.snapshotId());
163+
DataFile dataFile = entry.file().copy(context.shouldKeepStats());
164+
DeleteFile[] deleteFiles = context.deletes().forDataFile(dataFile);
165+
List<DeleteFile> addedDeletes = Lists.newArrayList();
166+
List<DeleteFile> existingDeletes = Lists.newArrayList();
167+
for (DeleteFile file : deleteFiles) {
168+
if (file.dataSequenceNumber() == snapshot.sequenceNumber()) {
169+
addedDeletes.add(file);
170+
} else {
171+
existingDeletes.add(file);
172+
}
173+
}
174+
175+
switch (entry.status()) {
176+
case ADDED:
177+
if (snapshotId == commitSnapshotId) {
178+
return new BaseAddedRowsScanTask(
179+
changeOrdinal,
180+
commitSnapshotId,
181+
dataFile,
182+
addedDeletes.toArray(new DeleteFile[0]),
183+
context.schemaAsString(),
184+
context.specAsString(),
185+
context.residuals());
186+
} else if (deleteFiles.length > 0) {
187+
return new BaseDeletedRowsScanTask(
188+
changeOrdinal,
189+
commitSnapshotId,
190+
dataFile,
191+
addedDeletes.toArray(new DeleteFile[0]),
192+
existingDeletes.toArray(new DeleteFile[0]),
193+
context.schemaAsString(),
194+
context.specAsString(),
195+
context.residuals());
196+
} else {
197+
return null;
198+
}
199+
200+
case DELETED:
201+
return new BaseDeletedDataFileScanTask(
202+
changeOrdinal,
203+
commitSnapshotId,
204+
dataFile,
205+
existingDeletes.toArray(new DeleteFile[0]),
206+
context.schemaAsString(),
207+
context.specAsString(),
208+
context.residuals());
209+
210+
default:
211+
throw new IllegalArgumentException(
212+
"Unexpected entry status: " + entry.status());
213+
}
214+
}),
215+
Objects::nonNull);
181216
}
182217
}
183218
}

core/src/test/java/org/apache/iceberg/TestBaseIncrementalChangelogScan.java

-15
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@
2121
import static org.apache.iceberg.TableProperties.MANIFEST_MERGE_ENABLED;
2222
import static org.apache.iceberg.TableProperties.MANIFEST_MIN_MERGE_COUNT;
2323
import static org.assertj.core.api.Assertions.assertThat;
24-
import static org.assertj.core.api.Assertions.assertThatThrownBy;
25-
import static org.assertj.core.api.Assumptions.assumeThat;
2624

2725
import java.io.IOException;
2826
import java.util.Comparator;
@@ -247,19 +245,6 @@ public void testDataFileRewrites() {
247245
assertThat(t2.deletes()).as("Must be no deletes").isEmpty();
248246
}
249247

250-
@TestTemplate
251-
public void testDeleteFilesAreNotSupported() {
252-
assumeThat(formatVersion).isEqualTo(2);
253-
254-
table.newFastAppend().appendFile(FILE_A2).appendFile(FILE_B).commit();
255-
256-
table.newRowDelta().addDeletes(FILE_A2_DELETES).commit();
257-
258-
assertThatThrownBy(() -> plan(newScan()))
259-
.isInstanceOf(UnsupportedOperationException.class)
260-
.hasMessage("Delete files are currently not supported in changelog scans");
261-
}
262-
263248
// plans tasks and reorders them to have deterministic order
264249
private List<ChangelogScanTask> plan(IncrementalChangelogScan scan) {
265250
try (CloseableIterable<ChangelogScanTask> tasks = scan.planFiles()) {

data/src/main/java/org/apache/iceberg/data/DeleteFilter.java

+11-11
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ public void incrementDeleteCount() {
125125
counter.increment();
126126
}
127127

128-
Accessor<StructLike> posAccessor() {
128+
protected Accessor<StructLike> posAccessor() {
129129
return posAccessor;
130130
}
131131

@@ -197,31 +197,31 @@ record -> deleteSet.contains(projectRow.wrap(asStructLike(record)));
197197
}
198198

199199
public CloseableIterable<T> findEqualityDeleteRows(CloseableIterable<T> records) {
200-
// Predicate to test whether a row has been deleted by equality deletions.
201-
Predicate<T> deletedRows = applyEqDeletes().stream().reduce(Predicate::or).orElse(t -> false);
202-
203-
return CloseableIterable.filter(records, deletedRows);
200+
return CloseableIterable.filter(records, isEqDeleted());
204201
}
205202

206203
private CloseableIterable<T> applyEqDeletes(CloseableIterable<T> records) {
207-
Predicate<T> isEqDeleted = applyEqDeletes().stream().reduce(Predicate::or).orElse(t -> false);
208-
209-
return createDeleteIterable(records, isEqDeleted);
204+
return createDeleteIterable(records, isEqDeleted());
210205
}
211206

212207
protected void markRowDeleted(T item) {
213208
throw new UnsupportedOperationException(
214209
this.getClass().getName() + " does not implement markRowDeleted");
215210
}
216211

217-
public Predicate<T> eqDeletedRowFilter() {
212+
// Predicate to test whether a row has been deleted by equality deletes
213+
public Predicate<T> isEqDeleted() {
218214
if (eqDeleteRows == null) {
219-
eqDeleteRows =
220-
applyEqDeletes().stream().map(Predicate::negate).reduce(Predicate::and).orElse(t -> true);
215+
eqDeleteRows = applyEqDeletes().stream().reduce(Predicate::or).orElse(t -> false);
221216
}
222217
return eqDeleteRows;
223218
}
224219

220+
// Predicate to test whether a row has not been deleted by equality deletes
221+
public Predicate<T> eqDeletedRowFilter() {
222+
return isEqDeleted().negate();
223+
}
224+
225225
public PositionDeleteIndex deletedRowPositions() {
226226
if (deleteRowPositions == null && !posDeletes.isEmpty()) {
227227
this.deleteRowPositions = deleteLoader().loadPositionDeletes(posDeletes, filePath);

0 commit comments

Comments
 (0)