Skip to content

Commit f663c6d

Browse files
committed
More tests. Fix a bug.
1 parent bbcf147 commit f663c6d

File tree

3 files changed

+165
-12
lines changed

3 files changed

+165
-12
lines changed

Diff for: data/src/main/java/org/apache/iceberg/data/DeleteFilter.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ public void incrementDeleteCount() {
125125
counter.increment();
126126
}
127127

128-
Accessor<StructLike> posAccessor() {
128+
protected Accessor<StructLike> posAccessor() {
129129
return posAccessor;
130130
}
131131

Diff for: spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseReader.java

+7-1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.stream.Stream;
3232
import org.apache.avro.generic.GenericData;
3333
import org.apache.avro.util.Utf8;
34+
import org.apache.iceberg.Accessor;
3435
import org.apache.iceberg.ContentFile;
3536
import org.apache.iceberg.ContentScanTask;
3637
import org.apache.iceberg.DeleteFile;
@@ -285,11 +286,16 @@ protected void markRowDeleted(InternalRow row) {
285286
*/
286287
public CloseableIterable<InternalRow> filterDeleted(CloseableIterable<InternalRow> rows) {
287288
PositionDeleteIndex deletedRowPositions = deletedRowPositions();
289+
Accessor<StructLike> posAccessor = posAccessor();
290+
288291
Filter<InternalRow> deletedFilter =
289292
new Filter<InternalRow>() {
290293
@Override
291294
protected boolean shouldKeep(InternalRow row) {
292-
boolean isPosDeleted = deletedRowPositions.isDeleted(pos(row));
295+
boolean isPosDeleted =
296+
(deletedRowPositions != null && posAccessor != null)
297+
? deletedRowPositions.isDeleted(pos(row))
298+
: false;
293299
boolean isEqDeleted = isEqDeleted().test(row);
294300
return isPosDeleted || isEqDeleted;
295301
}

Diff for: spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestChangelogReader.java

+157-10
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.iceberg.ChangelogOperation;
3131
import org.apache.iceberg.ChangelogScanTask;
3232
import org.apache.iceberg.DataFile;
33+
import org.apache.iceberg.DeleteFile;
3334
import org.apache.iceberg.Files;
3435
import org.apache.iceberg.IncrementalChangelogScan;
3536
import org.apache.iceberg.PartitionSpec;
@@ -46,6 +47,8 @@
4647
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
4748
import org.apache.iceberg.spark.TestBase;
4849
import org.apache.iceberg.types.Types;
50+
import org.apache.iceberg.util.CharSequenceSet;
51+
import org.apache.iceberg.util.Pair;
4952
import org.apache.spark.sql.catalyst.InternalRow;
5053
import org.junit.jupiter.api.AfterEach;
5154
import org.junit.jupiter.api.BeforeEach;
@@ -60,6 +63,7 @@ public class TestChangelogReader extends TestBase {
6063
PartitionSpec.builderFor(SCHEMA).bucket("data", 16).build();
6164
private final List<Record> records1 = Lists.newArrayList();
6265
private final List<Record> records2 = Lists.newArrayList();
66+
private final List<Record> records3 = Lists.newArrayList();
6367

6468
private Table table;
6569
private DataFile dataFile1;
@@ -84,6 +88,11 @@ public void before() throws IOException {
8488
// write data to files
8589
dataFile1 = writeDataFile(records1);
8690
dataFile2 = writeDataFile(records2);
91+
92+
// records to be deleted
93+
records3.add(record.copy("id", 29, "data", "a"));
94+
records3.add(record.copy("id", 89, "data", "d"));
95+
records3.add(record.copy("id", 122, "data", "g"));
8796
}
8897

8998
@AfterEach
@@ -191,6 +200,153 @@ public void testMixDeleteAndInsert() throws IOException {
191200
table.newAppend().appendFile(dataFile2).commit();
192201
long snapshotId3 = table.currentSnapshot().snapshotId();
193202

203+
List<InternalRow> rows = getChangelogRows();
204+
205+
List<Object[]> expectedRows = Lists.newArrayList();
206+
addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId1, 0, records1);
207+
addExpectedRows(expectedRows, ChangelogOperation.DELETE, snapshotId2, 1, records1);
208+
addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId3, 2, records2);
209+
210+
assertEquals("Should have expected rows", expectedRows, internalRowsToJava(rows));
211+
}
212+
213+
@Test
214+
public void testPositionDeletes() throws IOException {
215+
table.newAppend().appendFile(dataFile1).commit();
216+
long snapshotId1 = table.currentSnapshot().snapshotId();
217+
218+
table.newAppend().appendFile(dataFile2).commit();
219+
long snapshotId2 = table.currentSnapshot().snapshotId();
220+
221+
List<Pair<CharSequence, Long>> deletes =
222+
Lists.newArrayList(
223+
Pair.of(dataFile1.path(), 0L), // id = 29
224+
Pair.of(dataFile1.path(), 3L), // id = 89
225+
Pair.of(dataFile2.path(), 2L) // id = 122
226+
);
227+
228+
Pair<DeleteFile, CharSequenceSet> posDeletes =
229+
FileHelpers.writeDeleteFile(
230+
table,
231+
Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
232+
TestHelpers.Row.of(0),
233+
deletes);
234+
235+
table
236+
.newRowDelta()
237+
.addDeletes(posDeletes.first())
238+
.validateDataFilesExist(posDeletes.second())
239+
.commit();
240+
long snapshotId3 = table.currentSnapshot().snapshotId();
241+
242+
List<InternalRow> rows = getChangelogRows();
243+
244+
List<Object[]> expectedRows = Lists.newArrayList();
245+
addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId1, 0, records1);
246+
addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId2, 1, records2);
247+
addExpectedRows(expectedRows, ChangelogOperation.DELETE, snapshotId3, 2, records3);
248+
249+
assertEquals("Should have expected rows", expectedRows, internalRowsToJava(rows));
250+
}
251+
252+
@Test
253+
public void testEqualityDeletes() throws IOException {
254+
table.newAppend().appendFile(dataFile1).commit();
255+
long snapshotId1 = table.currentSnapshot().snapshotId();
256+
257+
table.newAppend().appendFile(dataFile2).commit();
258+
long snapshotId2 = table.currentSnapshot().snapshotId();
259+
260+
Schema deleteRowSchema = table.schema().select("data");
261+
Record dataDelete = GenericRecord.create(deleteRowSchema);
262+
List<Record> dataDeletes =
263+
Lists.newArrayList(
264+
dataDelete.copy("data", "a"), // id = 29
265+
dataDelete.copy("data", "d"), // id = 89
266+
dataDelete.copy("data", "g") // id = 122
267+
);
268+
269+
DeleteFile eqDeletes =
270+
FileHelpers.writeDeleteFile(
271+
table,
272+
Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
273+
TestHelpers.Row.of(0),
274+
dataDeletes,
275+
deleteRowSchema);
276+
277+
table.newRowDelta().addDeletes(eqDeletes).commit();
278+
long snapshotId3 = table.currentSnapshot().snapshotId();
279+
280+
List<InternalRow> rows = getChangelogRows();
281+
282+
List<Object[]> expectedRows = Lists.newArrayList();
283+
addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId1, 0, records1);
284+
addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId2, 1, records2);
285+
addExpectedRows(expectedRows, ChangelogOperation.DELETE, snapshotId3, 2, records3);
286+
287+
assertEquals("Should have expected rows", expectedRows, internalRowsToJava(rows));
288+
}
289+
290+
@Test
291+
public void testMixOfPositionAndEqualityDeletes() throws IOException {
292+
table.newAppend().appendFile(dataFile1).commit();
293+
long snapshotId1 = table.currentSnapshot().snapshotId();
294+
295+
table.newAppend().appendFile(dataFile2).commit();
296+
long snapshotId2 = table.currentSnapshot().snapshotId();
297+
298+
List<Pair<CharSequence, Long>> deletes =
299+
Lists.newArrayList(
300+
Pair.of(dataFile1.path(), 0L), // id = 29
301+
Pair.of(dataFile1.path(), 3L) // id = 89
302+
);
303+
304+
Pair<DeleteFile, CharSequenceSet> posDeletes =
305+
FileHelpers.writeDeleteFile(
306+
table,
307+
Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
308+
TestHelpers.Row.of(0),
309+
deletes);
310+
311+
Schema deleteRowSchema = table.schema().select("data");
312+
Record dataDelete = GenericRecord.create(deleteRowSchema);
313+
List<Record> dataDeletes =
314+
Lists.newArrayList(
315+
dataDelete.copy("data", "a"), // id = 29
316+
dataDelete.copy("data", "g") // id = 122
317+
);
318+
319+
DeleteFile eqDeletes =
320+
FileHelpers.writeDeleteFile(
321+
table,
322+
Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
323+
TestHelpers.Row.of(0),
324+
dataDeletes,
325+
deleteRowSchema);
326+
327+
table
328+
.newRowDelta()
329+
.addDeletes(eqDeletes)
330+
.addDeletes(posDeletes.first())
331+
.validateDataFilesExist(posDeletes.second())
332+
.commit();
333+
long snapshotId3 = table.currentSnapshot().snapshotId();
334+
335+
List<InternalRow> rows = getChangelogRows();
336+
337+
List<Object[]> expectedRows = Lists.newArrayList();
338+
addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId1, 0, records1);
339+
addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId2, 1, records2);
340+
addExpectedRows(expectedRows, ChangelogOperation.DELETE, snapshotId3, 2, records3);
341+
342+
assertEquals("Should have expected rows", expectedRows, internalRowsToJava(rows));
343+
}
344+
345+
private IncrementalChangelogScan newScan() {
346+
return table.newIncrementalChangelogScan();
347+
}
348+
349+
private List<InternalRow> getChangelogRows() throws IOException {
194350
CloseableIterable<ScanTaskGroup<ChangelogScanTask>> taskGroups = newScan().planTasks();
195351

196352
List<InternalRow> rows = Lists.newArrayList();
@@ -214,16 +370,7 @@ public void testMixDeleteAndInsert() throws IOException {
214370
}
215371
});
216372

217-
List<Object[]> expectedRows = Lists.newArrayList();
218-
addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId1, 0, records1);
219-
addExpectedRows(expectedRows, ChangelogOperation.DELETE, snapshotId2, 1, records1);
220-
addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId3, 2, records2);
221-
222-
assertEquals("Should have expected rows", expectedRows, internalRowsToJava(rows));
223-
}
224-
225-
private IncrementalChangelogScan newScan() {
226-
return table.newIncrementalChangelogScan();
373+
return rows;
227374
}
228375

229376
private List<Object[]> addExpectedRows(

0 commit comments

Comments
 (0)