diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergCommitter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergCommitter.java index 75977744065..8d37bdb7024 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergCommitter.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergCommitter.java @@ -47,6 +47,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.TreeMap; import static java.util.stream.Collectors.toList; import static org.apache.flink.runtime.checkpoint.CheckpointIDCounter.INITIAL_CHECKPOINT_ID; @@ -62,6 +63,12 @@ public class IcebergCommitter implements Committer { public static final String TABLE_GROUP_KEY = "table"; + /** Snapshot summary key for the batch index; used to resume partial commits on retry. */ + static final String FLINK_BATCH_INDEX = "flink.batch-index"; + + /** Snapshot summary key for the checkpoint ID on intermediate batch commits. */ + static final String FLINK_CHECKPOINT_ID_PROP = "flink.checkpoint-id"; + private final Catalog catalog; private final SinkCommitterMetricGroup metricGroup; @@ -96,74 +103,140 @@ private void commit(List writeResultWrappers) { if (writeResultWrappers.isEmpty()) { return; } - // all commits a same checkpoint-id long checkpointId = writeResultWrappers.get(0).getCheckpointId(); String newFlinkJobId = writeResultWrappers.get(0).getJobId(); String operatorId = writeResultWrappers.get(0).getOperatorId(); - Map> tableMap = new HashMap<>(); - for (WriteResultWrapper writeResultWrapper : writeResultWrappers) { - List writeResult = - tableMap.getOrDefault(writeResultWrapper.getTableId(), new ArrayList<>()); - writeResult.add(writeResultWrapper.getWriteResult()); - tableMap.put(writeResultWrapper.getTableId(), writeResult); - LOGGER.info(writeResultWrapper.buildDescription()); + Map> tableMap = new HashMap<>(); + for (WriteResultWrapper w : writeResultWrappers) { + tableMap.computeIfAbsent(w.getTableId(), k -> new ArrayList<>()).add(w); } - for (Map.Entry> entry : tableMap.entrySet()) { + + for (Map.Entry> entry : tableMap.entrySet()) { TableId tableId = entry.getKey(); + // Group by batchIndex so wrappers from different subtasks for the same batch + // are merged into one snapshot, not committed separately. + TreeMap> batchGroups = new TreeMap<>(); + for (WriteResultWrapper w : entry.getValue()) { + batchGroups.computeIfAbsent(w.getBatchIndex(), k -> new ArrayList<>()).add(w); + LOGGER.info(w.buildDescription()); + } + Table table = catalog.loadTable( TableIdentifier.of(tableId.getSchemaName(), tableId.getTableName())); + int startBatchIndex = 0; Snapshot snapshot = table.currentSnapshot(); if (snapshot != null) { Iterable ancestors = SnapshotUtil.ancestorsOf(snapshot.snapshotId(), table::snapshot); - long lastCheckpointId = + long lastCommittedCheckpointId = getMaxCommittedCheckpointId(ancestors, newFlinkJobId, operatorId); - if (lastCheckpointId == checkpointId) { + if (lastCommittedCheckpointId >= checkpointId) { LOGGER.warn( "Checkpoint id {} has been committed to table {}, skipping", checkpointId, tableId.identifier()); continue; } + ancestors = SnapshotUtil.ancestorsOf(snapshot.snapshotId(), table::snapshot); + startBatchIndex = + getLastCommittedBatchIndex( + ancestors, newFlinkJobId, operatorId, checkpointId) + + 1; } Optional tableMetric = getTableMetric(tableId); tableMetric.ifPresent(TableMetric::increaseCommitTimes); - List results = entry.getValue(); - List dataFiles = - results.stream() - .filter(payload -> payload.dataFiles() != null) - .flatMap(payload -> Arrays.stream(payload.dataFiles())) - .filter(dataFile -> dataFile.recordCount() > 0) - .collect(toList()); - List deleteFiles = - results.stream() - .filter(payload -> payload.deleteFiles() != null) - .flatMap(payload -> Arrays.stream(payload.deleteFiles())) - .filter(deleteFile -> deleteFile.recordCount() > 0) - .collect(toList()); - if (dataFiles.isEmpty() && deleteFiles.isEmpty()) { - LOGGER.info(String.format("Nothing to commit to table %s, skipping", table.name())); - } else { + int lastNonEmptyBatchIndex = -1; + for (Map.Entry> g : batchGroups.entrySet()) { + List df = collectDataFilesFromGroup(g.getValue()); + List del = collectDeleteFilesFromGroup(g.getValue()); + if (!df.isEmpty() || !del.isEmpty()) { + lastNonEmptyBatchIndex = g.getKey(); + } + } + + // Commit each batch as a separate snapshot so sequence numbers increase per batch. + for (Map.Entry> g : batchGroups.entrySet()) { + int batchIdx = g.getKey(); + if (batchIdx < startBatchIndex) { + LOGGER.info( + "Batch {} for checkpoint {} of table {} already committed, skipping", + batchIdx, + checkpointId, + tableId.identifier()); + continue; + } + + List dataFiles = collectDataFilesFromGroup(g.getValue()); + List deleteFiles = collectDeleteFilesFromGroup(g.getValue()); + + if (dataFiles.isEmpty() && deleteFiles.isEmpty()) { + LOGGER.info( + "Batch {} for checkpoint {} of table {} has nothing to commit, skipping", + batchIdx, + checkpointId, + tableId.identifier()); + continue; + } + + SnapshotUpdate operation; if (deleteFiles.isEmpty()) { AppendFiles append = table.newAppend(); dataFiles.forEach(append::appendFile); - commitOperation(append, newFlinkJobId, operatorId, checkpointId); + operation = append; } else { RowDelta delta = table.newRowDelta(); dataFiles.forEach(delta::addRows); deleteFiles.forEach(delta::addDeletes); - commitOperation(delta, newFlinkJobId, operatorId, checkpointId); + operation = delta; } + + operation.set(SinkUtil.FLINK_JOB_ID, newFlinkJobId); + operation.set(SinkUtil.OPERATOR_ID, operatorId); + operation.set(FLINK_BATCH_INDEX, String.valueOf(batchIdx)); + operation.set(FLINK_CHECKPOINT_ID_PROP, String.valueOf(checkpointId)); + if (batchIdx == lastNonEmptyBatchIndex) { + operation.set( + SinkUtil.MAX_COMMITTED_CHECKPOINT_ID, String.valueOf(checkpointId)); + } + operation.commit(); } } } + private static List collectDataFilesFromGroup(List group) { + return group.stream() + .flatMap(w -> collectDataFiles(w.getWriteResult()).stream()) + .collect(toList()); + } + + private static List collectDeleteFilesFromGroup(List group) { + return group.stream() + .flatMap(w -> collectDeleteFiles(w.getWriteResult()).stream()) + .collect(toList()); + } + + private static List collectDataFiles(WriteResult result) { + if (result.dataFiles() == null) { + return new ArrayList<>(); + } + return Arrays.stream(result.dataFiles()).filter(f -> f.recordCount() > 0).collect(toList()); + } + + private static List collectDeleteFiles(WriteResult result) { + if (result.deleteFiles() == null) { + return new ArrayList<>(); + } + return Arrays.stream(result.deleteFiles()) + .filter(f -> f.recordCount() > 0) + .collect(toList()); + } + private static long getMaxCommittedCheckpointId( Iterable ancestors, String flinkJobId, String operatorId) { long lastCommittedCheckpointId = INITIAL_CHECKPOINT_ID - 1; @@ -185,15 +258,35 @@ private static long getMaxCommittedCheckpointId( return lastCommittedCheckpointId; } - private static void commitOperation( - SnapshotUpdate operation, - String newFlinkJobId, - String operatorId, - long checkpointId) { - operation.set(SinkUtil.MAX_COMMITTED_CHECKPOINT_ID, Long.toString(checkpointId)); - operation.set(SinkUtil.FLINK_JOB_ID, newFlinkJobId); - operation.set(SinkUtil.OPERATOR_ID, operatorId); - operation.commit(); + /** + * Returns the highest batch index already committed for the given checkpoint, or -1 if none. + * Used to skip already-persisted batches on retry. + */ + private static int getLastCommittedBatchIndex( + Iterable ancestors, String flinkJobId, String operatorId, long checkpointId) { + for (Snapshot ancestor : ancestors) { + Map summary = ancestor.summary(); + if (!flinkJobId.equals(summary.get(SinkUtil.FLINK_JOB_ID))) { + continue; + } + String snapshotOperatorId = summary.get(SinkUtil.OPERATOR_ID); + if (snapshotOperatorId != null && !snapshotOperatorId.equals(operatorId)) { + continue; + } + // Stop once we pass a fully-committed earlier checkpoint; intermediate batch + // snapshots for the current checkpoint lie between it and the current tip. + String maxCommittedStr = summary.get(SinkUtil.MAX_COMMITTED_CHECKPOINT_ID); + if (maxCommittedStr != null && Long.parseLong(maxCommittedStr) < checkpointId) { + break; + } + String snapshotCheckpointId = summary.get(FLINK_CHECKPOINT_ID_PROP); + if (snapshotCheckpointId != null + && Long.parseLong(snapshotCheckpointId) == checkpointId) { + String batchIndexStr = summary.get(FLINK_BATCH_INDEX); + return batchIndexStr != null ? Integer.parseInt(batchIndexStr) : 0; + } + } + return -1; } private Optional getTableMetric(TableId tableId) { diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriter.java index cbcd3b98eaa..9af29d88cda 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriter.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriter.java @@ -72,6 +72,11 @@ public class IcebergWriter private final List temporaryWriteResult; + /** + * Per-table batch index; incremented on each schema-change flush, even when no writer exists. + */ + private Map tableBatchIndexMap; + private Catalog catalog; private final int taskId; @@ -102,6 +107,7 @@ public IcebergWriter( writerFactoryMap = new HashMap<>(); writerMap = new HashMap<>(); schemaMap = new HashMap<>(); + tableBatchIndexMap = new HashMap<>(); temporaryWriteResult = new ArrayList<>(); this.taskId = taskId; this.attemptId = attemptId; @@ -129,6 +135,7 @@ public Collection prepareCommit() throws IOException { list.addAll(temporaryWriteResult); list.addAll(getWriteResult()); temporaryWriteResult.clear(); + tableBatchIndexMap.clear(); lastCheckpointId++; return list; } @@ -166,6 +173,11 @@ public void write(Event event, Context context) throws IOException { } else { SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) event; TableId tableId = schemaChangeEvent.tableId(); + // Flush only when the table is already known; skip on initial CreateTableEvent since + // no data has been written yet and there is nothing to split. + if (schemaMap.containsKey(tableId)) { + flushTableWriter(tableId); + } TableSchemaWrapper tableSchemaWrapper = schemaMap.get(tableId); Schema newSchema = @@ -179,21 +191,46 @@ public void write(Event event, Context context) throws IOException { @Override public void flush(boolean flush) throws IOException { - // Notice: flush method may be called many times during one checkpoint. - temporaryWriteResult.addAll(getWriteResult()); + // Clear the factory cache so the next write picks up the latest catalog schema. + // Writers keep running; schema-change splits are handled in flushTableWriter. + writerFactoryMap.clear(); + } + + private void flushTableWriter(TableId tableId) throws IOException { + TaskWriter writer = writerMap.remove(tableId); + // Advance even when no writer exists, to keep batchIndex in sync across subtasks. + int batchIndex = tableBatchIndexMap.getOrDefault(tableId, 0); + tableBatchIndexMap.put(tableId, batchIndex + 1); + if (writer == null) { + return; + } + WriteResultWrapper writeResultWrapper = + new WriteResultWrapper( + writer.complete(), + tableId, + lastCheckpointId + 1, + jobId, + operatorId, + batchIndex); + temporaryWriteResult.add(writeResultWrapper); + LOGGER.info(writeResultWrapper.buildDescription()); + writerFactoryMap.remove(tableId); } private List getWriteResult() throws IOException { long currentCheckpointId = lastCheckpointId + 1; List writeResults = new ArrayList<>(); for (Map.Entry> entry : writerMap.entrySet()) { + TableId tableId = entry.getKey(); + int batchIndex = tableBatchIndexMap.getOrDefault(tableId, 0); WriteResultWrapper writeResultWrapper = new WriteResultWrapper( entry.getValue().complete(), - entry.getKey(), + tableId, currentCheckpointId, jobId, - operatorId); + operatorId, + batchIndex); writeResults.add(writeResultWrapper); LOGGER.info(writeResultWrapper.buildDescription()); } @@ -225,6 +262,11 @@ public void close() throws Exception { writerFactoryMap = null; } + if (tableBatchIndexMap != null) { + tableBatchIndexMap.clear(); + tableBatchIndexMap = null; + } + catalog = null; } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/WriteResultWrapper.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/WriteResultWrapper.java index e64cc5535b2..3e8d733c5ba 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/WriteResultWrapper.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/WriteResultWrapper.java @@ -40,17 +40,31 @@ public class WriteResultWrapper implements Serializable { private final String operatorId; + /** Batch index within the checkpoint for this table; increments on each schema-change flush. */ + private final int batchIndex; + public WriteResultWrapper( WriteResult writeResult, TableId tableId, long checkpointId, String jobId, - String operatorId) { + String operatorId, + int batchIndex) { this.writeResult = writeResult; this.tableId = tableId; this.checkpointId = checkpointId; this.jobId = jobId; this.operatorId = operatorId; + this.batchIndex = batchIndex; + } + + public WriteResultWrapper( + WriteResult writeResult, + TableId tableId, + long checkpointId, + String jobId, + String operatorId) { + this(writeResult, tableId, checkpointId, jobId, operatorId, 0); } public WriteResult getWriteResult() { @@ -73,6 +87,10 @@ public String getOperatorId() { return operatorId; } + public int getBatchIndex() { + return batchIndex; + } + /** Build a simple description for the write result. */ public String buildDescription() { long addCount = 0; @@ -95,6 +113,8 @@ public String buildDescription() { + jobId + ", OperatorId: " + operatorId + + ", BatchIndex: " + + batchIndex + ", AddCount: " + addCount + ", DeleteCount: " diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java index 745aea81f96..c1b9dbb0d58 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java @@ -42,7 +42,11 @@ import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.RowDelta; +import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; @@ -66,6 +70,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -596,6 +601,981 @@ public void testWithRepeatCommit() throws Exception { Assertions.assertThat(result).containsExactlyInAnyOrder("1, char1", "2, char2"); } + @Test + public void testNoDuplicateWhenFlushSplitsSamePkUpdatesWithinCheckpoint() throws Exception { + Map catalogOptions = new HashMap<>(); + String warehouse = + new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString(); + catalogOptions.put("type", "hadoop"); + catalogOptions.put("warehouse", warehouse); + catalogOptions.put("cache-enabled", "false"); + Catalog catalog = + CatalogUtil.buildIcebergCatalog( + "cdc-iceberg-catalog", catalogOptions, new Configuration()); + + String jobId = UUID.randomUUID().toString(); + String operatorId = UUID.randomUUID().toString(); + IcebergWriter icebergWriter = + new IcebergWriter( + catalogOptions, + 1, + 1, + ZoneId.systemDefault(), + 0, + jobId, + operatorId, + new HashMap<>()); + IcebergMetadataApplier icebergMetadataApplier = new IcebergMetadataApplier(catalogOptions); + + TableId tableId = TableId.parse("test.iceberg_table"); + CreateTableEvent createTableEvent = + new CreateTableEvent( + tableId, + Schema.newBuilder() + .physicalColumn("id", DataTypes.BIGINT().notNull()) + .physicalColumn("name", DataTypes.VARCHAR(100)) + .primaryKey("id") + .build()); + icebergMetadataApplier.applySchemaChange(createTableEvent); + icebergWriter.write(createTableEvent, null); + + BinaryRecordDataGenerator generator = + new BinaryRecordDataGenerator( + createTableEvent.getSchema().getColumnDataTypes().toArray(new DataType[0])); + + RecordData recordA = + generator.generate(new Object[] {1L, BinaryStringData.fromString("a")}); + RecordData recordB = + generator.generate(new Object[] {1L, BinaryStringData.fromString("b")}); + + icebergWriter.write(DataChangeEvent.insertEvent(tableId, recordA), null); + // flush resets the factory cache but keeps the writer running, so the update + // lands in the same writer as the insert and uses a position delete. + icebergWriter.flush(false); + icebergWriter.write(DataChangeEvent.updateEvent(tableId, recordA, recordB), null); + + Collection writeResults = icebergWriter.prepareCommit(); + + // Both writes went through the same writer (batchIndex 0) since flush only + // cleared the factory. Position delete handles dedup within the snapshot. + List batchIndexes = + writeResults.stream() + .map(WriteResultWrapper::getBatchIndex) + .sorted() + .collect(Collectors.toList()); + Assertions.assertThat(batchIndexes).containsExactly(0); + + IcebergCommitter icebergCommitter = new IcebergCommitter(catalogOptions, new HashMap<>()); + Collection> collection = + writeResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList()); + icebergCommitter.commit(collection); + + List result = fetchTableContent(catalog, tableId, null); + Assertions.assertThat(result).containsExactly("1, b"); + } + + /** + * A schema change mid-checkpoint splits writes into two batches for the same table. Both + * batches must not land in the same Iceberg snapshot: files in one snapshot share the same + * seq_num, so the equality-delete from batch 1 would silently miss the insert from batch 0. + * Each batch gets its own snapshot so the delete's seq_num is strictly higher than the data it + * targets. + */ + @Test + public void testNoDuplicateWhenSchemaChangeFlushSplitsSamePkUpdates() throws Exception { + Map catalogOptions = new HashMap<>(); + String warehouse = + new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString(); + catalogOptions.put("type", "hadoop"); + catalogOptions.put("warehouse", warehouse); + catalogOptions.put("cache-enabled", "false"); + Catalog catalog = + CatalogUtil.buildIcebergCatalog( + "cdc-iceberg-catalog", catalogOptions, new Configuration()); + + String jobId = UUID.randomUUID().toString(); + String operatorId = UUID.randomUUID().toString(); + IcebergWriter icebergWriter = + new IcebergWriter( + catalogOptions, + 1, + 1, + ZoneId.systemDefault(), + 0, + jobId, + operatorId, + new HashMap<>()); + IcebergMetadataApplier icebergMetadataApplier = new IcebergMetadataApplier(catalogOptions); + + TableId tableId = TableId.parse("test.iceberg_table"); + Schema initialSchema = + Schema.newBuilder() + .physicalColumn("id", DataTypes.BIGINT().notNull()) + .physicalColumn("name", DataTypes.VARCHAR(100)) + .primaryKey("id") + .build(); + CreateTableEvent createTableEvent = new CreateTableEvent(tableId, initialSchema); + icebergMetadataApplier.applySchemaChange(createTableEvent); + icebergWriter.write(createTableEvent, null); + + BinaryRecordDataGenerator oldGenerator = + new BinaryRecordDataGenerator( + initialSchema.getColumnDataTypes().toArray(new DataType[0])); + + // Insert (id=1, name="a") — goes into writer batch 0. + RecordData recordA = + oldGenerator.generate(new Object[] {1L, BinaryStringData.fromString("a")}); + icebergWriter.write(DataChangeEvent.insertEvent(tableId, recordA), null); + + // Schema change: AddColumn triggers flushTableWriter, completing batch 0. + AddColumnEvent addColumnEvent = + new AddColumnEvent( + tableId, + Arrays.asList( + AddColumnEvent.last( + new PhysicalColumn( + "extra", DataTypes.STRING(), null, null)))); + icebergMetadataApplier.applySchemaChange(addColumnEvent); + icebergWriter.write(addColumnEvent, null); + + // Update (id=1) with the new schema — goes into writer batch 1. + Schema newSchema = SchemaUtils.applySchemaChangeEvent(initialSchema, addColumnEvent); + BinaryRecordDataGenerator newGenerator = + new BinaryRecordDataGenerator( + newSchema.getColumnDataTypes().toArray(new DataType[0])); + RecordData recordANew = + newGenerator.generate(new Object[] {1L, BinaryStringData.fromString("a"), null}); + RecordData recordB = + newGenerator.generate(new Object[] {1L, BinaryStringData.fromString("b"), null}); + icebergWriter.write(DataChangeEvent.updateEvent(tableId, recordANew, recordB), null); + + Collection writeResults = icebergWriter.prepareCommit(); + IcebergCommitter icebergCommitter = new IcebergCommitter(catalogOptions, new HashMap<>()); + Collection> collection = + writeResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList()); + icebergCommitter.commit(collection); + + // Expect only (1, b, null) — batch 0's stale (1, a, null) must be deleted. + List result = fetchTableContent(catalog, tableId, null); + Assertions.assertThat(result).containsExactly("1, b, null"); + } + + /** + * Verifies idempotency when the committer crashes after committing batch 0 but before + * committing batch 1 of the same checkpoint. + * + *

On retry, Flink re-delivers the full collection of committables for that checkpoint. The + * committer must detect that batch 0's snapshot is already present in the table history (via + * {@code flink.batch-index} and {@code flink.checkpoint-id} snapshot properties) and skip it, + * then commit only batch 1. Without this skip, batch 0's files would be added a second time, + * causing duplicate records. + */ + @Test + public void testRetryAfterPartialBatchCommit() throws Exception { + Map catalogOptions = new HashMap<>(); + String warehouse = + new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString(); + catalogOptions.put("type", "hadoop"); + catalogOptions.put("warehouse", warehouse); + catalogOptions.put("cache-enabled", "false"); + Catalog catalog = + CatalogUtil.buildIcebergCatalog( + "cdc-iceberg-catalog", catalogOptions, new Configuration()); + + String jobId = UUID.randomUUID().toString(); + String operatorId = UUID.randomUUID().toString(); + IcebergWriter icebergWriter = + new IcebergWriter( + catalogOptions, + 1, + 1, + ZoneId.systemDefault(), + 0, + jobId, + operatorId, + new HashMap<>()); + IcebergMetadataApplier icebergMetadataApplier = new IcebergMetadataApplier(catalogOptions); + + TableId tableId = TableId.parse("test.iceberg_table"); + Schema initialSchema = + Schema.newBuilder() + .physicalColumn("id", DataTypes.BIGINT().notNull()) + .physicalColumn("name", DataTypes.VARCHAR(100)) + .primaryKey("id") + .build(); + CreateTableEvent createTableEvent = new CreateTableEvent(tableId, initialSchema); + icebergMetadataApplier.applySchemaChange(createTableEvent); + icebergWriter.write(createTableEvent, null); + + BinaryRecordDataGenerator oldGenerator = + new BinaryRecordDataGenerator( + initialSchema.getColumnDataTypes().toArray(new DataType[0])); + RecordData recordA = + oldGenerator.generate(new Object[] {1L, BinaryStringData.fromString("a")}); + icebergWriter.write(DataChangeEvent.insertEvent(tableId, recordA), null); + + AddColumnEvent addColumnEvent = + new AddColumnEvent( + tableId, + Arrays.asList( + AddColumnEvent.last( + new PhysicalColumn( + "extra", DataTypes.STRING(), null, null)))); + icebergMetadataApplier.applySchemaChange(addColumnEvent); + icebergWriter.write(addColumnEvent, null); + + Schema newSchema = SchemaUtils.applySchemaChangeEvent(initialSchema, addColumnEvent); + BinaryRecordDataGenerator newGenerator = + new BinaryRecordDataGenerator( + newSchema.getColumnDataTypes().toArray(new DataType[0])); + RecordData recordANew = + newGenerator.generate(new Object[] {1L, BinaryStringData.fromString("a"), null}); + RecordData recordB = + newGenerator.generate(new Object[] {1L, BinaryStringData.fromString("b"), null}); + icebergWriter.write(DataChangeEvent.updateEvent(tableId, recordANew, recordB), null); + + Collection writeResults = icebergWriter.prepareCommit(); + List sortedBatches = + writeResults.stream() + .sorted(Comparator.comparingInt(WriteResultWrapper::getBatchIndex)) + .collect(Collectors.toList()); + Assertions.assertThat(sortedBatches).hasSize(2); + Assertions.assertThat(sortedBatches.get(0).getBatchIndex()).isEqualTo(0); + Assertions.assertThat(sortedBatches.get(1).getBatchIndex()).isEqualTo(1); + + // Simulate a partial commit: manually commit only batch 0 using the Iceberg API, + // setting the intermediate batch properties but NOT MAX_COMMITTED_CHECKPOINT_ID. + // This replicates the on-disk state left behind when the committer crashes mid-checkpoint. + long checkpointId = sortedBatches.get(0).getCheckpointId(); + Table table = + catalog.loadTable( + TableIdentifier.of(tableId.getSchemaName(), tableId.getTableName())); + RowDelta partialDelta = table.newRowDelta(); + WriteResultWrapper batch0 = sortedBatches.get(0); + if (batch0.getWriteResult().dataFiles() != null) { + for (DataFile f : batch0.getWriteResult().dataFiles()) { + partialDelta.addRows(f); + } + } + if (batch0.getWriteResult().deleteFiles() != null) { + for (DeleteFile f : batch0.getWriteResult().deleteFiles()) { + partialDelta.addDeletes(f); + } + } + partialDelta.set(SinkUtil.FLINK_JOB_ID, jobId); + partialDelta.set(SinkUtil.OPERATOR_ID, operatorId); + partialDelta.set(IcebergCommitter.FLINK_BATCH_INDEX, "0"); + partialDelta.set(IcebergCommitter.FLINK_CHECKPOINT_ID_PROP, String.valueOf(checkpointId)); + // Intentionally omitting MAX_COMMITTED_CHECKPOINT_ID — this is an incomplete checkpoint. + partialDelta.commit(); + + // Retry: Flink re-delivers all committables for the checkpoint. + IcebergCommitter icebergCommitter = new IcebergCommitter(catalogOptions, new HashMap<>()); + Collection> collection = + writeResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList()); + icebergCommitter.commit(collection); + + // Batch 0 must be skipped (its snapshot is already present); only batch 1 is committed. + // Batch 1's eqDelete (higher sequence number) supersedes batch 0's data file. + List result = fetchTableContent(catalog, tableId, null); + Assertions.assertThat(result).containsExactly("1, b, null"); + + // Verify the final snapshot carries MAX_COMMITTED_CHECKPOINT_ID. + Map finalSummary = + catalog.loadTable( + TableIdentifier.of(tableId.getSchemaName(), tableId.getTableName())) + .currentSnapshot() + .summary(); + Assertions.assertThat(finalSummary.get(SinkUtil.MAX_COMMITTED_CHECKPOINT_ID)) + .isEqualTo(String.valueOf(checkpointId)); + } + + /** + * Verifies that two schema-change events within a single checkpoint (producing three batches) + * do not cause duplicate records for the same primary key. + * + *

Timeline: INSERT(id=1,"a") → AddColumn1 flush → UPDATE(id=1,"b") → AddColumn2 flush → + * UPDATE(id=1,"c") → commit. The three batches are committed as three sequential Iceberg + * snapshots (seq=N, N+1, N+2), so each batch's equality-delete supersedes all earlier data. + */ + @Test + public void testNoDuplicateWithMultipleSchemaChangesInOneCheckpoint() throws Exception { + Map catalogOptions = new HashMap<>(); + String warehouse = + new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString(); + catalogOptions.put("type", "hadoop"); + catalogOptions.put("warehouse", warehouse); + catalogOptions.put("cache-enabled", "false"); + Catalog catalog = + CatalogUtil.buildIcebergCatalog( + "cdc-iceberg-catalog", catalogOptions, new Configuration()); + + String jobId = UUID.randomUUID().toString(); + String operatorId = UUID.randomUUID().toString(); + IcebergWriter icebergWriter = + new IcebergWriter( + catalogOptions, + 1, + 1, + ZoneId.systemDefault(), + 0, + jobId, + operatorId, + new HashMap<>()); + IcebergMetadataApplier icebergMetadataApplier = new IcebergMetadataApplier(catalogOptions); + + TableId tableId = TableId.parse("test.iceberg_table"); + Schema schema0 = + Schema.newBuilder() + .physicalColumn("id", DataTypes.BIGINT().notNull()) + .physicalColumn("name", DataTypes.VARCHAR(100)) + .primaryKey("id") + .build(); + CreateTableEvent createTableEvent = new CreateTableEvent(tableId, schema0); + icebergMetadataApplier.applySchemaChange(createTableEvent); + icebergWriter.write(createTableEvent, null); + + // Batch 0: INSERT(id=1,"a") + BinaryRecordDataGenerator gen0 = + new BinaryRecordDataGenerator( + schema0.getColumnDataTypes().toArray(new DataType[0])); + RecordData r0a = gen0.generate(new Object[] {1L, BinaryStringData.fromString("a")}); + icebergWriter.write(DataChangeEvent.insertEvent(tableId, r0a), null); + + // First schema change → flushTableWriter → batch 0 complete, batch index starts at 1. + AddColumnEvent addCol1 = + new AddColumnEvent( + tableId, + Arrays.asList( + AddColumnEvent.last( + new PhysicalColumn( + "extra1", DataTypes.STRING(), null, null)))); + icebergMetadataApplier.applySchemaChange(addCol1); + icebergWriter.write(addCol1, null); + + // Batch 1: UPDATE(id=1,"a"→"b") with schema {id, name, extra1} + Schema schema1 = SchemaUtils.applySchemaChangeEvent(schema0, addCol1); + BinaryRecordDataGenerator gen1 = + new BinaryRecordDataGenerator( + schema1.getColumnDataTypes().toArray(new DataType[0])); + RecordData r1a = gen1.generate(new Object[] {1L, BinaryStringData.fromString("a"), null}); + RecordData r1b = gen1.generate(new Object[] {1L, BinaryStringData.fromString("b"), null}); + icebergWriter.write(DataChangeEvent.updateEvent(tableId, r1a, r1b), null); + + // Second schema change → flushTableWriter → batch 1 complete, batch index now at 2. + AddColumnEvent addCol2 = + new AddColumnEvent( + tableId, + Arrays.asList( + AddColumnEvent.last( + new PhysicalColumn( + "extra2", DataTypes.STRING(), null, null)))); + icebergMetadataApplier.applySchemaChange(addCol2); + icebergWriter.write(addCol2, null); + + // Batch 2: UPDATE(id=1,"b"→"c") with schema {id, name, extra1, extra2} + Schema schema2 = SchemaUtils.applySchemaChangeEvent(schema1, addCol2); + BinaryRecordDataGenerator gen2 = + new BinaryRecordDataGenerator( + schema2.getColumnDataTypes().toArray(new DataType[0])); + RecordData r2b = + gen2.generate(new Object[] {1L, BinaryStringData.fromString("b"), null, null}); + RecordData r2c = + gen2.generate(new Object[] {1L, BinaryStringData.fromString("c"), null, null}); + icebergWriter.write(DataChangeEvent.updateEvent(tableId, r2b, r2c), null); + + // Verify three batches with indices 0, 1, 2 were produced. + Collection writeResults = icebergWriter.prepareCommit(); + List sortedBatches = + writeResults.stream() + .sorted(Comparator.comparingInt(WriteResultWrapper::getBatchIndex)) + .collect(Collectors.toList()); + Assertions.assertThat(sortedBatches).hasSize(3); + Assertions.assertThat(sortedBatches.get(0).getBatchIndex()).isEqualTo(0); + Assertions.assertThat(sortedBatches.get(1).getBatchIndex()).isEqualTo(1); + Assertions.assertThat(sortedBatches.get(2).getBatchIndex()).isEqualTo(2); + + IcebergCommitter icebergCommitter = new IcebergCommitter(catalogOptions, new HashMap<>()); + Collection> collection = + writeResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList()); + icebergCommitter.commit(collection); + + // Only the final value (id=1,"c") should survive; stale "a" and "b" must be deleted. + List result = fetchTableContent(catalog, tableId, null); + Assertions.assertThat(result).containsExactly("1, c, null, null"); + } + + /** + * Verifies that a schema-change flush on tableA does not affect tableB. TableB has no schema + * change and commits as a single batch, while tableA's two batches are committed sequentially. + * Both tables must contain exactly the correct final records. + */ + @Test + public void testSchemaChangeFlushDoesNotAffectOtherTable() throws Exception { + Map catalogOptions = new HashMap<>(); + String warehouse = + new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString(); + catalogOptions.put("type", "hadoop"); + catalogOptions.put("warehouse", warehouse); + catalogOptions.put("cache-enabled", "false"); + Catalog catalog = + CatalogUtil.buildIcebergCatalog( + "cdc-iceberg-catalog", catalogOptions, new Configuration()); + + String jobId = UUID.randomUUID().toString(); + String operatorId = UUID.randomUUID().toString(); + IcebergWriter icebergWriter = + new IcebergWriter( + catalogOptions, + 1, + 1, + ZoneId.systemDefault(), + 0, + jobId, + operatorId, + new HashMap<>()); + IcebergMetadataApplier icebergMetadataApplier = new IcebergMetadataApplier(catalogOptions); + + TableId tableA = TableId.parse("test.table_a"); + TableId tableB = TableId.parse("test.table_b"); + + Schema schemaA = + Schema.newBuilder() + .physicalColumn("id", DataTypes.BIGINT().notNull()) + .physicalColumn("name", DataTypes.VARCHAR(100)) + .primaryKey("id") + .build(); + Schema schemaB = + Schema.newBuilder() + .physicalColumn("id", DataTypes.BIGINT().notNull()) + .physicalColumn("value", DataTypes.VARCHAR(100)) + .primaryKey("id") + .build(); + + icebergMetadataApplier.applySchemaChange(new CreateTableEvent(tableA, schemaA)); + icebergMetadataApplier.applySchemaChange(new CreateTableEvent(tableB, schemaB)); + icebergWriter.write(new CreateTableEvent(tableA, schemaA), null); + icebergWriter.write(new CreateTableEvent(tableB, schemaB), null); + + BinaryRecordDataGenerator genA = + new BinaryRecordDataGenerator( + schemaA.getColumnDataTypes().toArray(new DataType[0])); + BinaryRecordDataGenerator genB = + new BinaryRecordDataGenerator( + schemaB.getColumnDataTypes().toArray(new DataType[0])); + + // TableA: INSERT(id=1,"a") → schema change flush → UPDATE(id=1,"b") [2 batches] + RecordData rAa = genA.generate(new Object[] {1L, BinaryStringData.fromString("a")}); + icebergWriter.write(DataChangeEvent.insertEvent(tableA, rAa), null); + + AddColumnEvent addColA = + new AddColumnEvent( + tableA, + Arrays.asList( + AddColumnEvent.last( + new PhysicalColumn( + "extra", DataTypes.STRING(), null, null)))); + icebergMetadataApplier.applySchemaChange(addColA); + icebergWriter.write(addColA, null); + + Schema schemaANew = SchemaUtils.applySchemaChangeEvent(schemaA, addColA); + BinaryRecordDataGenerator genANew = + new BinaryRecordDataGenerator( + schemaANew.getColumnDataTypes().toArray(new DataType[0])); + RecordData rAaNew = + genANew.generate(new Object[] {1L, BinaryStringData.fromString("a"), null}); + RecordData rAb = + genANew.generate(new Object[] {1L, BinaryStringData.fromString("b"), null}); + icebergWriter.write(DataChangeEvent.updateEvent(tableA, rAaNew, rAb), null); + + // TableB: INSERT(id=2,"x") → UPDATE(id=2,"y") in the same checkpoint, no schema change. + // Both events land in tableB's single writer (no flush), so dedup is handled internally. + RecordData rBx = genB.generate(new Object[] {2L, BinaryStringData.fromString("x")}); + RecordData rBy = genB.generate(new Object[] {2L, BinaryStringData.fromString("y")}); + icebergWriter.write(DataChangeEvent.insertEvent(tableB, rBx), null); + icebergWriter.write(DataChangeEvent.updateEvent(tableB, rBx, rBy), null); + + Collection writeResults = icebergWriter.prepareCommit(); + + // TableA should produce 2 batches; tableB should produce 1 batch. + Map batchCountByTable = + writeResults.stream() + .collect( + Collectors.groupingBy( + WriteResultWrapper::getTableId, Collectors.counting())); + Assertions.assertThat(batchCountByTable.get(tableA)).isEqualTo(2); + Assertions.assertThat(batchCountByTable.get(tableB)).isEqualTo(1); + + IcebergCommitter icebergCommitter = new IcebergCommitter(catalogOptions, new HashMap<>()); + Collection> collection = + writeResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList()); + icebergCommitter.commit(collection); + + // TableA: only final value survives (stale "a" deleted by batch 1's eqDelete). + List resultA = fetchTableContent(catalog, tableA, null); + Assertions.assertThat(resultA).containsExactly("1, b, null"); + + // TableB: only final value survives (internal position-delete handles dedup within writer). + List resultB = fetchTableContent(catalog, tableB, null); + Assertions.assertThat(resultB).containsExactly("2, y"); + } + + /** + * Verifies that batchIndex stays in sync across subtasks even when a subtask has no writer for + * the table at schema-change flush time (parallelism > 1 scenario). + */ + @Test + public void testBatchIndexInSyncWhenSubtaskHasNoWriterAtSchemaChange() throws Exception { + Map catalogOptions = new HashMap<>(); + catalogOptions.put("type", "hadoop"); + catalogOptions.put( + "warehouse", + new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString()); + catalogOptions.put("cache-enabled", "false"); + IcebergMetadataApplier icebergMetadataApplier = new IcebergMetadataApplier(catalogOptions); + + String jobId = UUID.randomUUID().toString(); + String operatorId = UUID.randomUUID().toString(); + + // Two subtask writers sharing the same catalog and table. + IcebergWriter writer0 = + new IcebergWriter( + catalogOptions, + 0, + 1, + ZoneId.systemDefault(), + 0, + jobId, + operatorId, + new HashMap<>()); + IcebergWriter writer1 = + new IcebergWriter( + catalogOptions, + 1, + 1, + ZoneId.systemDefault(), + 0, + jobId, + operatorId, + new HashMap<>()); + + TableId tableId = TableId.parse("test.iceberg_table"); + Schema schema = + Schema.newBuilder() + .physicalColumn("id", DataTypes.BIGINT().notNull()) + .physicalColumn("name", DataTypes.VARCHAR(100)) + .primaryKey("id") + .build(); + CreateTableEvent createEvent = new CreateTableEvent(tableId, schema); + icebergMetadataApplier.applySchemaChange(createEvent); + writer0.write(createEvent, null); + writer1.write(createEvent, null); + + BinaryRecordDataGenerator gen = + new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0])); + + // Only subtask 0 has data before the schema change. + writer0.write( + DataChangeEvent.insertEvent( + tableId, gen.generate(new Object[] {1L, BinaryStringData.fromString("a")})), + null); + // Subtask 1 has no writer for the table yet. + + // Both subtasks receive the same SchemaChangeEvent (broadcast). + AddColumnEvent addColumn = + new AddColumnEvent( + tableId, + Arrays.asList( + AddColumnEvent.last( + new PhysicalColumn( + "extra", DataTypes.STRING(), null, null)))); + icebergMetadataApplier.applySchemaChange(addColumn); + writer0.write(addColumn, null); // has writer → flushes at batchIndex=0; counter → 1 + writer1.write(addColumn, null); // no writer → counter must still advance to 1 + + Schema newSchema = SchemaUtils.applySchemaChangeEvent(schema, addColumn); + BinaryRecordDataGenerator newGen = + new BinaryRecordDataGenerator( + newSchema.getColumnDataTypes().toArray(new DataType[0])); + + // Subtask 1 writes data after the schema change. + writer1.write( + DataChangeEvent.insertEvent( + tableId, + newGen.generate(new Object[] {2L, BinaryStringData.fromString("b"), null})), + null); + + Collection results0 = writer0.prepareCommit(); + Collection results1 = writer1.prepareCommit(); + + // subtask 0: one batch at batchIndex=0 (pre-schema-change flush) + Assertions.assertThat(results0).hasSize(1); + Assertions.assertThat(results0.iterator().next().getBatchIndex()).isEqualTo(0); + + // subtask 1: must be at batchIndex=1, not 0 — counter advanced at E1 even without a writer + Assertions.assertThat(results1).hasSize(1); + Assertions.assertThat(results1.iterator().next().getBatchIndex()).isEqualTo(1); + } + + /** + * Verifies no duplicates when parallel subtasks share a table and one subtask has no data + * before the schema-change flush while the other has an UPDATE that produces an + * equality-delete. + */ + @Test + public void testNoDuplicateWithParallelSubtasksMissingPreSchemaChangeData() throws Exception { + Map catalogOptions = new HashMap<>(); + String warehouse = + new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString(); + catalogOptions.put("type", "hadoop"); + catalogOptions.put("warehouse", warehouse); + catalogOptions.put("cache-enabled", "false"); + Catalog catalog = + CatalogUtil.buildIcebergCatalog( + "cdc-iceberg-catalog", catalogOptions, new Configuration()); + IcebergMetadataApplier icebergMetadataApplier = new IcebergMetadataApplier(catalogOptions); + + String jobId = UUID.randomUUID().toString(); + String operatorId = UUID.randomUUID().toString(); + + IcebergWriter writer0 = + new IcebergWriter( + catalogOptions, + 0, + 1, + ZoneId.systemDefault(), + 0, + jobId, + operatorId, + new HashMap<>()); + IcebergWriter writer1 = + new IcebergWriter( + catalogOptions, + 1, + 1, + ZoneId.systemDefault(), + 0, + jobId, + operatorId, + new HashMap<>()); + + TableId tableId = TableId.parse("test.iceberg_table"); + Schema initialSchema = + Schema.newBuilder() + .physicalColumn("id", DataTypes.BIGINT().notNull()) + .physicalColumn("name", DataTypes.VARCHAR(100)) + .primaryKey("id") + .build(); + CreateTableEvent createEvent = new CreateTableEvent(tableId, initialSchema); + icebergMetadataApplier.applySchemaChange(createEvent); + writer0.write(createEvent, null); + writer1.write(createEvent, null); + + BinaryRecordDataGenerator oldGen = + new BinaryRecordDataGenerator( + initialSchema.getColumnDataTypes().toArray(new DataType[0])); + + // Subtask 1 writes the "old" row before the schema change. + writer1.write( + DataChangeEvent.insertEvent( + tableId, + oldGen.generate(new Object[] {1L, BinaryStringData.fromString("old")})), + null); + // Subtask 0 has no data for the table yet. + + // Schema-change E1 is broadcast to both subtasks. + AddColumnEvent addColumn = + new AddColumnEvent( + tableId, + Arrays.asList( + AddColumnEvent.last( + new PhysicalColumn( + "extra", DataTypes.STRING(), null, null)))); + icebergMetadataApplier.applySchemaChange(addColumn); + writer0.write(addColumn, null); // no writer → batchIndex must still advance to 1 (fix) + writer1.write(addColumn, null); // has writer → flushes "old" at batchIndex=0; counter → 1 + + // Subtask 0 processes the UPDATE after E1, using the new schema. + Schema newSchema = SchemaUtils.applySchemaChangeEvent(initialSchema, addColumn); + BinaryRecordDataGenerator newGen = + new BinaryRecordDataGenerator( + newSchema.getColumnDataTypes().toArray(new DataType[0])); + RecordData before = + newGen.generate(new Object[] {1L, BinaryStringData.fromString("old"), null}); + RecordData after = + newGen.generate(new Object[] {1L, BinaryStringData.fromString("new"), null}); + writer0.write(DataChangeEvent.updateEvent(tableId, before, after), null); + + // Collect and commit all results from both subtasks. + List allResults = new ArrayList<>(); + allResults.addAll(writer0.prepareCommit()); + allResults.addAll(writer1.prepareCommit()); + + IcebergCommitter committer = new IcebergCommitter(catalogOptions, new HashMap<>()); + committer.commit( + allResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList())); + + // Only the updated value must survive; "old" must be deleted by the equality-delete in + // batch 1 (higher sequence number). Without the fix both rows appear. + List result = fetchTableContent(catalog, tableId, null); + Assertions.assertThat(result).containsExactly("1, new, null"); + } + + /** + * Verifies that wrappers from two subtasks sharing the same batchIndex are merged into exactly + * one Iceberg snapshot, not two. This directly tests the committer-side grouping fix. + */ + @Test + public void testSameBatchIndexFromTwoSubtasksMergedIntoOneSnapshot() throws Exception { + Map catalogOptions = new HashMap<>(); + catalogOptions.put("type", "hadoop"); + catalogOptions.put( + "warehouse", + new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString()); + catalogOptions.put("cache-enabled", "false"); + Catalog catalog = + CatalogUtil.buildIcebergCatalog( + "cdc-iceberg-catalog", catalogOptions, new Configuration()); + IcebergMetadataApplier icebergMetadataApplier = new IcebergMetadataApplier(catalogOptions); + + String jobId = UUID.randomUUID().toString(); + String operatorId = UUID.randomUUID().toString(); + + IcebergWriter writer0 = + new IcebergWriter( + catalogOptions, + 0, + 1, + ZoneId.systemDefault(), + 0, + jobId, + operatorId, + new HashMap<>()); + IcebergWriter writer1 = + new IcebergWriter( + catalogOptions, + 1, + 1, + ZoneId.systemDefault(), + 0, + jobId, + operatorId, + new HashMap<>()); + + TableId tableId = TableId.parse("test.iceberg_table"); + Schema schema = + Schema.newBuilder() + .physicalColumn("id", DataTypes.BIGINT().notNull()) + .physicalColumn("name", DataTypes.VARCHAR(100)) + .primaryKey("id") + .build(); + CreateTableEvent createEvent = new CreateTableEvent(tableId, schema); + icebergMetadataApplier.applySchemaChange(createEvent); + writer0.write(createEvent, null); + writer1.write(createEvent, null); + + BinaryRecordDataGenerator gen = + new BinaryRecordDataGenerator(schema.getColumnDataTypes().toArray(new DataType[0])); + + // Both subtasks write data with no schema change, so both produce batchIndex=0. + writer0.write( + DataChangeEvent.insertEvent( + tableId, gen.generate(new Object[] {1L, BinaryStringData.fromString("a")})), + null); + writer1.write( + DataChangeEvent.insertEvent( + tableId, gen.generate(new Object[] {2L, BinaryStringData.fromString("b")})), + null); + + List allResults = new ArrayList<>(); + allResults.addAll(writer0.prepareCommit()); + allResults.addAll(writer1.prepareCommit()); + + // Both wrappers carry batchIndex=0. + Assertions.assertThat(allResults).hasSize(2); + Assertions.assertThat( + allResults.stream() + .mapToInt(WriteResultWrapper::getBatchIndex) + .distinct() + .count()) + .isEqualTo(1); + + Table table = + catalog.loadTable( + TableIdentifier.of(tableId.getSchemaName(), tableId.getTableName())); + long snapshotsBefore = countSnapshots(table); + + IcebergCommitter committer = new IcebergCommitter(catalogOptions, new HashMap<>()); + committer.commit( + allResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList())); + + table.refresh(); + long snapshotsAfter = countSnapshots(table); + + // Two wrappers with the same batchIndex must produce exactly ONE new snapshot, not two. + Assertions.assertThat(snapshotsAfter - snapshotsBefore).isEqualTo(1); + + List result = fetchTableContent(catalog, tableId, null); + Assertions.assertThat(result).containsExactlyInAnyOrder("1, a", "2, b"); + } + + /** + * Verifies no duplicates in the most complex parallel scenario: subtask 0 has data only before + * SC1, subtask 1 has data only between SC1 and SC2, and both have updates after SC2. This + * exercises all three batchIndex slots across two subtasks simultaneously and confirms that + * equality-deletes in batch 2 correctly suppress stale data from batches 0 and 1. + */ + @Test + public void testNoDuplicateWithMixedDataAcrossSubtasksAndMultipleSchemaChanges() + throws Exception { + Map catalogOptions = new HashMap<>(); + catalogOptions.put("type", "hadoop"); + catalogOptions.put( + "warehouse", + new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString()); + catalogOptions.put("cache-enabled", "false"); + Catalog catalog = + CatalogUtil.buildIcebergCatalog( + "cdc-iceberg-catalog", catalogOptions, new Configuration()); + IcebergMetadataApplier icebergMetadataApplier = new IcebergMetadataApplier(catalogOptions); + + String jobId = UUID.randomUUID().toString(); + String operatorId = UUID.randomUUID().toString(); + + IcebergWriter writer0 = + new IcebergWriter( + catalogOptions, + 0, + 1, + ZoneId.systemDefault(), + 0, + jobId, + operatorId, + new HashMap<>()); + IcebergWriter writer1 = + new IcebergWriter( + catalogOptions, + 1, + 1, + ZoneId.systemDefault(), + 0, + jobId, + operatorId, + new HashMap<>()); + + TableId tableId = TableId.parse("test.iceberg_table"); + Schema schema0 = + Schema.newBuilder() + .physicalColumn("id", DataTypes.BIGINT().notNull()) + .physicalColumn("name", DataTypes.VARCHAR(100)) + .primaryKey("id") + .build(); + CreateTableEvent createEvent = new CreateTableEvent(tableId, schema0); + icebergMetadataApplier.applySchemaChange(createEvent); + writer0.write(createEvent, null); + writer1.write(createEvent, null); + + BinaryRecordDataGenerator gen0 = + new BinaryRecordDataGenerator( + schema0.getColumnDataTypes().toArray(new DataType[0])); + + // Batch 0: only subtask 0 has data before SC1. + writer0.write( + DataChangeEvent.insertEvent( + tableId, + gen0.generate(new Object[] {1L, BinaryStringData.fromString("a")})), + null); + // Subtask 1 has no data before SC1. + + // SC1 broadcast to both subtasks. + AddColumnEvent sc1 = + new AddColumnEvent( + tableId, + Arrays.asList( + AddColumnEvent.last( + new PhysicalColumn( + "extra1", DataTypes.STRING(), null, null)))); + icebergMetadataApplier.applySchemaChange(sc1); + writer0.write(sc1, null); // has writer → flush batchIndex=0; counter → 1 + writer1.write(sc1, null); // no writer → counter must still advance to 1 + + Schema schema1 = SchemaUtils.applySchemaChangeEvent(schema0, sc1); + BinaryRecordDataGenerator gen1 = + new BinaryRecordDataGenerator( + schema1.getColumnDataTypes().toArray(new DataType[0])); + + // Batch 1: only subtask 1 has data between SC1 and SC2. + writer1.write( + DataChangeEvent.insertEvent( + tableId, + gen1.generate(new Object[] {2L, BinaryStringData.fromString("b"), null})), + null); + // Subtask 0 has no data between SC1 and SC2. + + // SC2 broadcast to both subtasks. + AddColumnEvent sc2 = + new AddColumnEvent( + tableId, + Arrays.asList( + AddColumnEvent.last( + new PhysicalColumn( + "extra2", DataTypes.STRING(), null, null)))); + icebergMetadataApplier.applySchemaChange(sc2); + writer0.write(sc2, null); // no writer → counter must still advance to 2 + writer1.write(sc2, null); // has writer → flush batchIndex=1; counter → 2 + + Schema schema2 = SchemaUtils.applySchemaChangeEvent(schema1, sc2); + BinaryRecordDataGenerator gen2 = + new BinaryRecordDataGenerator( + schema2.getColumnDataTypes().toArray(new DataType[0])); + + // Batch 2: both subtasks update their respective rows after SC2. + // Subtask 0 updates id=1 "a" → "c"; subtask 1 updates id=2 "b" → "d". + writer0.write( + DataChangeEvent.updateEvent( + tableId, + gen2.generate( + new Object[] {1L, BinaryStringData.fromString("a"), null, null}), + gen2.generate( + new Object[] {1L, BinaryStringData.fromString("c"), null, null})), + null); + writer1.write( + DataChangeEvent.updateEvent( + tableId, + gen2.generate( + new Object[] {2L, BinaryStringData.fromString("b"), null, null}), + gen2.generate( + new Object[] {2L, BinaryStringData.fromString("d"), null, null})), + null); + + List allResults = new ArrayList<>(); + allResults.addAll(writer0.prepareCommit()); + allResults.addAll(writer1.prepareCommit()); + + // Expect 3 batches: {0: sub0}, {1: sub1}, {2: sub0+sub1} + long distinctBatchIndices = + allResults.stream().mapToInt(WriteResultWrapper::getBatchIndex).distinct().count(); + Assertions.assertThat(distinctBatchIndices).isEqualTo(3); + + IcebergCommitter committer = new IcebergCommitter(catalogOptions, new HashMap<>()); + committer.commit( + allResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList())); + + // Only the final values must survive. Equality-deletes in batch 2 (seq N+2) must suppress + // the stale inserts in batch 0 (seq N) and batch 1 (seq N+1). + List result = fetchTableContent(catalog, tableId, null); + Assertions.assertThat(result) + .containsExactlyInAnyOrder("1, c, null, null", "2, d, null, null"); + } + + private static long countSnapshots(Table table) { + long count = 0; + for (Snapshot ignored : table.snapshots()) { + count++; + } + return count; + } + /** Mock CommitRequestImpl. */ public static class MockCommitRequestImpl extends CommitRequestImpl {