Skip to content

Commit f3963fb

Browse files
committed
More tests.
1 parent f663c6d commit f3963fb

File tree

2 files changed

+83
-4
lines changed

2 files changed

+83
-4
lines changed

spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestChangelogTable.java

+37-4
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import static org.apache.iceberg.TableProperties.UPDATE_MODE;
2727
import static org.assertj.core.api.Assertions.assertThat;
2828
import static org.assertj.core.api.Assertions.assertThatThrownBy;
29+
import static org.assertj.core.api.Assumptions.assumeThat;
2930

3031
import java.util.List;
3132
import org.apache.iceberg.DataOperations;
@@ -79,13 +80,11 @@ public void testDataFilters() {
7980
sql("INSERT INTO %s VALUES (1, 'c'), (2, 'c'), (3, 'c')", tableName);
8081

8182
Table table = validationCatalog.loadTable(tableIdent);
82-
8383
Snapshot snap1 = table.currentSnapshot();
8484

8585
sql("DELETE FROM %s WHERE id = 3", tableName);
8686

8787
table.refresh();
88-
8988
Snapshot snap2 = table.currentSnapshot();
9089

9190
assertEquals(
@@ -123,13 +122,11 @@ public void testUpdates() {
123122
createTableWithDefaultRows();
124123

125124
Table table = validationCatalog.loadTable(tableIdent);
126-
127125
Snapshot snap2 = table.currentSnapshot();
128126

129127
sql("UPDATE %s SET id = -2 WHERE data = 'b'", tableName);
130128

131129
table.refresh();
132-
133130
Snapshot snap3 = table.currentSnapshot();
134131

135132
assertEquals(
@@ -295,6 +292,42 @@ public void testManifestRewritesAreIgnored() {
295292
sql("SELECT id, _change_type FROM %s.changes ORDER BY id", tableName));
296293
}
297294

295+
@TestTemplate
296+
public void testDataRewritesAreIgnored() {
297+
assumeThat(formatVersion).isEqualTo(2);
298+
299+
createTable();
300+
sql("INSERT INTO %s VALUES (1, 'c'), (2, 'c'), (3, 'c')", tableName);
301+
302+
Table table = validationCatalog.loadTable(tableIdent);
303+
Snapshot snap1 = table.currentSnapshot();
304+
305+
sql("DELETE FROM %s WHERE id = 3", tableName);
306+
307+
table.refresh();
308+
Snapshot snap2 = table.currentSnapshot();
309+
310+
sql(
311+
"CALL %s.system.rewrite_data_files(table => '%s', "
312+
+ "options => map('delete-file-threshold','1'))",
313+
catalogName, tableIdent);
314+
315+
sql("DELETE FROM %s WHERE id = 1", tableName);
316+
317+
table.refresh();
318+
Snapshot snap3 = table.currentSnapshot();
319+
320+
assertEquals(
321+
"Should have expected rows",
322+
ImmutableList.of(
323+
row(1, "c", "INSERT", 0, snap1.snapshotId()),
324+
row(2, "c", "INSERT", 0, snap1.snapshotId()),
325+
row(3, "c", "INSERT", 0, snap1.snapshotId()),
326+
row(3, "c", "DELETE", 1, snap2.snapshotId()),
327+
row(1, "c", "DELETE", 2, snap3.snapshotId())),
328+
sql("SELECT * FROM %s.changes ORDER BY _change_ordinal, id, _change_type", tableName));
329+
}
330+
298331
@TestTemplate
299332
public void testMetadataColumns() {
300333
createTableWithDefaultRows();

spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestChangelogReader.java

+46
Original file line numberDiff line numberDiff line change
@@ -342,6 +342,52 @@ public void testMixOfPositionAndEqualityDeletes() throws IOException {
342342
assertEquals("Should have expected rows", expectedRows, internalRowsToJava(rows));
343343
}
344344

345+
@Test
346+
public void testAddingAndDeletingInSameCommit() throws IOException {
347+
GenericRecord record = GenericRecord.create(table.schema());
348+
List<Record> records1b = Lists.newArrayList();
349+
records1b.add(record.copy("id", 28, "data", "a"));
350+
records1b.add(record.copy("id", 29, "data", "a"));
351+
records1b.add(record.copy("id", 43, "data", "b"));
352+
records1b.add(record.copy("id", 44, "data", "b"));
353+
records1b.add(record.copy("id", 61, "data", "c"));
354+
records1b.add(record.copy("id", 89, "data", "d"));
355+
DataFile dataFile1b = writeDataFile(records1b);
356+
357+
List<Pair<CharSequence, Long>> deletes =
358+
Lists.newArrayList(
359+
Pair.of(dataFile1b.path(), 0L), // id = 28
360+
Pair.of(dataFile1b.path(), 3L) // id = 44
361+
);
362+
363+
Pair<DeleteFile, CharSequenceSet> posDeletes =
364+
FileHelpers.writeDeleteFile(
365+
table,
366+
Files.localOutput(File.createTempFile("junit", null, temp.toFile())),
367+
TestHelpers.Row.of(0),
368+
deletes);
369+
370+
table
371+
.newRowDelta()
372+
.addRows(dataFile1b)
373+
.addDeletes(posDeletes.first())
374+
.validateDataFilesExist(posDeletes.second())
375+
.commit();
376+
// the resulting records in the table are the same as records1
377+
long snapshotId1 = table.currentSnapshot().snapshotId();
378+
379+
table.newAppend().appendFile(dataFile2).commit();
380+
long snapshotId2 = table.currentSnapshot().snapshotId();
381+
382+
List<InternalRow> rows = getChangelogRows();
383+
384+
List<Object[]> expectedRows = Lists.newArrayList();
385+
addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId1, 0, records1);
386+
addExpectedRows(expectedRows, ChangelogOperation.INSERT, snapshotId2, 1, records2);
387+
388+
assertEquals("Should have expected rows", expectedRows, internalRowsToJava(rows));
389+
}
390+
345391
private IncrementalChangelogScan newScan() {
346392
return table.newIncrementalChangelogScan();
347393
}

0 commit comments

Comments
 (0)