Skip to content

Commit 1e22d51

Browse files
committed
More tests.
1 parent f663c6d commit 1e22d51

File tree

2 files changed

+92
-4
lines changed

2 files changed

+92
-4
lines changed

spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java

+46-4
Original file line numberDiff line numberDiff line change
@@ -79,13 +79,11 @@ public void testDataFilters() {
7979
sql("INSERT INTO %s VALUES (1, 'c'), (2, 'c'), (3, 'c')", tableName);
8080

8181
Table table = validationCatalog.loadTable(tableIdent);
82-
8382
Snapshot snap1 = table.currentSnapshot();
8483

8584
sql("DELETE FROM %s WHERE id = 3", tableName);
8685

8786
table.refresh();
88-
8987
Snapshot snap2 = table.currentSnapshot();
9088

9189
assertEquals(
@@ -123,13 +121,11 @@ public void testUpdates() {
123121
createTableWithDefaultRows();
124122

125123
Table table = validationCatalog.loadTable(tableIdent);
126-
127124
Snapshot snap2 = table.currentSnapshot();
128125

129126
sql("UPDATE %s SET id = -2 WHERE data = 'b'", tableName);
130127

131128
table.refresh();
132-
133129
Snapshot snap3 = table.currentSnapshot();
134130

135131
assertEquals(
@@ -295,6 +291,52 @@ public void testManifestRewritesAreIgnored() {
295291
sql("SELECT id, _change_type FROM %s.changes ORDER BY id", tableName));
296292
}
297293

294+
@TestTemplate
295+
public void testDataRewritesAreIgnored() {
296+
createTable();
297+
sql("INSERT INTO %s VALUES (1, 'c'), (2, 'c'), (3, 'c')", tableName);
298+
299+
Table table = validationCatalog.loadTable(tableIdent);
300+
Snapshot snap1 = table.currentSnapshot();
301+
302+
sql("DELETE FROM %s WHERE id = 3", tableName);
303+
304+
table.refresh();
305+
Snapshot snap2 = table.currentSnapshot();
306+
307+
sql("CALL %s.system.rewrite_data_files('%s')", catalogName, tableIdent);
308+
309+
sql("DELETE FROM %s WHERE id = 1", tableName);
310+
311+
table.refresh();
312+
Snapshot snap3 = table.currentSnapshot();
313+
314+
List<Object[]> expectedRows =
315+
(formatVersion == 1)
316+
? ImmutableList.of(
317+
row(1, "c", "INSERT", 0, snap1.snapshotId()),
318+
row(2, "c", "INSERT", 0, snap1.snapshotId()),
319+
row(3, "c", "INSERT", 0, snap1.snapshotId()),
320+
row(1, "c", "DELETE", 1, snap1.snapshotId()),
321+
row(1, "c", "INSERT", 1, snap1.snapshotId()),
322+
row(2, "c", "DELETE", 1, snap1.snapshotId()),
323+
row(2, "c", "INSERT", 1, snap1.snapshotId()),
324+
row(3, "c", "DELETE", 1, snap2.snapshotId()),
325+
row(1, "c", "DELETE", 2, snap3.snapshotId()),
326+
row(2, "c", "DELETE", 2, snap1.snapshotId()),
327+
row(2, "c", "INSERT", 2, snap1.snapshotId()))
328+
: ImmutableList.of(
329+
row(1, "c", "INSERT", 0, snap1.snapshotId()),
330+
row(2, "c", "INSERT", 0, snap1.snapshotId()),
331+
row(3, "c", "INSERT", 0, snap1.snapshotId()),
332+
row(3, "c", "DELETE", 1, snap2.snapshotId()),
333+
row(1, "c", "DELETE", 2, snap3.snapshotId()));
334+
assertEquals(
335+
"Should have expected rows",
336+
expectedRows,
337+
sql("SELECT * FROM %s.changes ORDER BY _change_ordinal, id, _change_type", tableName));
338+
}
339+
298340
@TestTemplate
299341
public void testMetadataColumns() {
300342
createTableWithDefaultRows();

spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestChangelogReader.java

+46
Original file line numberDiff line numberDiff line change
@@ -342,6 +342,52 @@ public void testMixOfPositionAndEqualityDeletes() throws IOException {
342342
assertEquals("Should have expected rows", expectedRows, internalRowsToJava(rows));
343343
}
344344

345+
@Test
346+
public void testAddingAndDeletingInSameCommit() throws IOException {
347+
GenericRecord record = GenericRecord.create(table.schema());
348+
List<Record> records1b = Lists.newArrayList();
349+
records1b.add(record.copy("id", 28, "data", "a"));
350+
records1b.add(record.copy("id", 29, "data", "a"));
351+
records1b.add(record.copy("id", 43, "data", "b"));
352+
records1b.add(record.copy("id", 44, "data", "b"));
353+
records1b.add(record.copy("id", 61, "data", "c"));
354+
records1b.add(record.copy("id", 89, "data", "d"));
355+
DataFile dataFile1b = writeDataFile(records1b);
356+
357+
List<Pair<CharSequence, Long>> deletes =
358+
Lists.newArrayList(
359+
Pair.of(dataFile1b.path(), 0L), // id = 28
360+
Pair.of(dataFile1b.path(), 3L) // id = 44
361+
);
362+
363+
Pair<DeleteFile, CharSequenceSet> posDeletes =
364+
FileHelpers.writeDeleteFile(
365+
table,
366+
Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
367+
TestHelpers.Row.of(0),
368+
deletes);
369+
370+
table
371+
.newRowDelta()
372+
.addRows(dataFile1b)
373+
.addDeletes(posDeletes.first())
374+
.validateDataFilesExist(posDeletes.second())
375+
.commit();
376+
// the resulting records in the table are the same as records1
377+
long snapshotId1 = table.currentSnapshot().snapshotId();
378+
379+
table.newAppend().appendFile(dataFile2).commit();
380+
long snapshotId2 = table.currentSnapshot().snapshotId();
381+
382+
List<InternalRow> rows = getChangelogRows();
383+
384+
List<Object[]> expectedRows = Lists.newArrayList();
385+
addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId1, 0, records1);
386+
addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId2, 1, records2);
387+
388+
assertEquals("Should have expected rows", expectedRows, internalRowsToJava(rows));
389+
}
390+
345391
private IncrementalChangelogScan newScan() {
346392
return table.newIncrementalChangelogScan();
347393
}

0 commit comments

Comments
 (0)