@@ -1294,6 +1294,143 @@ public void testSameBatchIndexFromTwoSubtasksMergedIntoOneSnapshot() throws Exce
12941294 Assertions .assertThat (result ).containsExactlyInAnyOrder ("1, a" , "2, b" );
12951295 }
12961296
1297+ /**
1298+ * Verifies no duplicates in the most complex parallel scenario: subtask 0 has data only before
1299+ * SC1, subtask 1 has data only between SC1 and SC2, and both have updates after SC2. This
1300+ * exercises all three batchIndex slots across two subtasks simultaneously and confirms that
1301+ * equality-deletes in batch 2 correctly suppress stale data from batches 0 and 1.
1302+ */
1303+ @ Test
1304+ public void testNoDuplicateWithMixedDataAcrossSubtasksAndMultipleSchemaChanges ()
1305+ throws Exception {
1306+ Map <String , String > catalogOptions = new HashMap <>();
1307+ catalogOptions .put ("type" , "hadoop" );
1308+ catalogOptions .put (
1309+ "warehouse" ,
1310+ new File (temporaryFolder .toFile (), UUID .randomUUID ().toString ()).toString ());
1311+ catalogOptions .put ("cache-enabled" , "false" );
1312+ Catalog catalog =
1313+ CatalogUtil .buildIcebergCatalog (
1314+ "cdc-iceberg-catalog" , catalogOptions , new Configuration ());
1315+ IcebergMetadataApplier icebergMetadataApplier = new IcebergMetadataApplier (catalogOptions );
1316+
1317+ String jobId = UUID .randomUUID ().toString ();
1318+ String operatorId = UUID .randomUUID ().toString ();
1319+
1320+ IcebergWriter writer0 =
1321+ new IcebergWriter (
1322+ catalogOptions , 0 , 1 , ZoneId .systemDefault (), 0 , jobId , operatorId );
1323+ IcebergWriter writer1 =
1324+ new IcebergWriter (
1325+ catalogOptions , 1 , 1 , ZoneId .systemDefault (), 0 , jobId , operatorId );
1326+
1327+ TableId tableId = TableId .parse ("test.iceberg_table" );
1328+ Schema schema0 =
1329+ Schema .newBuilder ()
1330+ .physicalColumn ("id" , DataTypes .BIGINT ().notNull ())
1331+ .physicalColumn ("name" , DataTypes .VARCHAR (100 ))
1332+ .primaryKey ("id" )
1333+ .build ();
1334+ CreateTableEvent createEvent = new CreateTableEvent (tableId , schema0 );
1335+ icebergMetadataApplier .applySchemaChange (createEvent );
1336+ writer0 .write (createEvent , null );
1337+ writer1 .write (createEvent , null );
1338+
1339+ BinaryRecordDataGenerator gen0 =
1340+ new BinaryRecordDataGenerator (
1341+ schema0 .getColumnDataTypes ().toArray (new DataType [0 ]));
1342+
1343+ // Batch 0: only subtask 0 has data before SC1.
1344+ writer0 .write (
1345+ DataChangeEvent .insertEvent (
1346+ tableId , gen0 .generate (new Object [] {1L , BinaryStringData .fromString ("a" )})),
1347+ null );
1348+ // Subtask 1 has no data before SC1.
1349+
1350+ // SC1 broadcast to both subtasks.
1351+ AddColumnEvent sc1 =
1352+ new AddColumnEvent (
1353+ tableId ,
1354+ Arrays .asList (
1355+ AddColumnEvent .last (
1356+ new PhysicalColumn (
1357+ "extra1" , DataTypes .STRING (), null , null ))));
1358+ icebergMetadataApplier .applySchemaChange (sc1 );
1359+ writer0 .write (sc1 , null ); // has writer → flush batchIndex=0; counter → 1
1360+ writer1 .write (sc1 , null ); // no writer → counter must still advance to 1
1361+
1362+ Schema schema1 = SchemaUtils .applySchemaChangeEvent (schema0 , sc1 );
1363+ BinaryRecordDataGenerator gen1 =
1364+ new BinaryRecordDataGenerator (
1365+ schema1 .getColumnDataTypes ().toArray (new DataType [0 ]));
1366+
1367+ // Batch 1: only subtask 1 has data between SC1 and SC2.
1368+ writer1 .write (
1369+ DataChangeEvent .insertEvent (
1370+ tableId ,
1371+ gen1 .generate (new Object [] {2L , BinaryStringData .fromString ("b" ), null })),
1372+ null );
1373+ // Subtask 0 has no data between SC1 and SC2.
1374+
1375+ // SC2 broadcast to both subtasks.
1376+ AddColumnEvent sc2 =
1377+ new AddColumnEvent (
1378+ tableId ,
1379+ Arrays .asList (
1380+ AddColumnEvent .last (
1381+ new PhysicalColumn (
1382+ "extra2" , DataTypes .STRING (), null , null ))));
1383+ icebergMetadataApplier .applySchemaChange (sc2 );
1384+ writer0 .write (sc2 , null ); // no writer → counter must still advance to 2
1385+ writer1 .write (sc2 , null ); // has writer → flush batchIndex=1; counter → 2
1386+
1387+ Schema schema2 = SchemaUtils .applySchemaChangeEvent (schema1 , sc2 );
1388+ BinaryRecordDataGenerator gen2 =
1389+ new BinaryRecordDataGenerator (
1390+ schema2 .getColumnDataTypes ().toArray (new DataType [0 ]));
1391+
1392+ // Batch 2: both subtasks update their respective rows after SC2.
1393+ // Subtask 0 updates id=1 "a" → "c"; subtask 1 updates id=2 "b" → "d".
1394+ writer0 .write (
1395+ DataChangeEvent .updateEvent (
1396+ tableId ,
1397+ gen2 .generate (new Object [] {1L , BinaryStringData .fromString ("a" ), null , null }),
1398+ gen2 .generate (
1399+ new Object [] {1L , BinaryStringData .fromString ("c" ), null , null })),
1400+ null );
1401+ writer1 .write (
1402+ DataChangeEvent .updateEvent (
1403+ tableId ,
1404+ gen2 .generate (new Object [] {2L , BinaryStringData .fromString ("b" ), null , null }),
1405+ gen2 .generate (
1406+ new Object [] {2L , BinaryStringData .fromString ("d" ), null , null })),
1407+ null );
1408+
1409+ List <WriteResultWrapper > allResults = new ArrayList <>();
1410+ allResults .addAll (writer0 .prepareCommit ());
1411+ allResults .addAll (writer1 .prepareCommit ());
1412+
1413+ // Expect 3 batches: {0: sub0}, {1: sub1}, {2: sub0+sub1}
1414+ long distinctBatchIndices =
1415+ allResults .stream ()
1416+ .mapToInt (WriteResultWrapper ::getBatchIndex )
1417+ .distinct ()
1418+ .count ();
1419+ Assertions .assertThat (distinctBatchIndices ).isEqualTo (3 );
1420+
1421+ IcebergCommitter committer = new IcebergCommitter (catalogOptions );
1422+ committer .commit (
1423+ allResults .stream ()
1424+ .map (MockCommitRequestImpl ::new )
1425+ .collect (Collectors .toList ()));
1426+
1427+ // Only the final values must survive. Equality-deletes in batch 2 (seq N+2) must suppress
1428+ // the stale inserts in batch 0 (seq N) and batch 1 (seq N+1).
1429+ List <String > result = fetchTableContent (catalog , tableId , null );
1430+ Assertions .assertThat (result )
1431+ .containsExactlyInAnyOrder ("1, c, null, null" , "2, d, null, null" );
1432+ }
1433+
12971434 private static long countSnapshots (Table table ) {
12981435 long count = 0 ;
12991436 for (Snapshot ignored : table .snapshots ()) {
0 commit comments