Skip to content

Commit 725a8e4

Browse files
committed
[iceberg] Restore flush to clear factory cache for schema reload
flush(boolean) was a no-op to avoid splitting same-PK updates into multiple batches within a checkpoint. But without clearing the writer factory cache, new writes after a checkpoint could use a stale factory that doesn't reflect schema changes applied by IcebergMetadataApplier. Change flush to clear writerFactoryMap only. Writers themselves keep running so same-PK updates within a checkpoint go through the same writer and use position deletes (exact file+offset), which work correctly within a single snapshot without needing a separate seq_num. Schema-change flushes still go through flushTableWriter(tableId), which drains the writer and bumps batchIndex so equality-deletes land in a snapshot with a strictly higher sequence number than the data they target.
1 parent 2f0ed03 commit 725a8e4

2 files changed

Lines changed: 21 additions & 16 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriter.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -190,10 +190,10 @@ public void write(Event event, Context context) throws IOException {
190190
}
191191

192192
@Override
193-
public void flush(boolean flush) {
194-
// Flush may be called many times during one checkpoint by non-data events.
195-
// Avoid rotating all task writers here, which can split same-PK updates into multiple
196-
// batches within one checkpoint and break dedup semantics in downstream reads.
193+
public void flush(boolean flush) throws IOException {
194+
// Clear the factory cache so the next write picks up the latest catalog schema.
195+
// Writers keep running; schema-change splits are handled in flushTableWriter.
196+
writerFactoryMap.clear();
197197
}
198198

199199
private void flushTableWriter(TableId tableId) throws IOException {

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -649,12 +649,22 @@ public void testNoDuplicateWhenFlushSplitsSamePkUpdatesWithinCheckpoint() throws
649649
generator.generate(new Object[] {1L, BinaryStringData.fromString("b")});
650650

651651
icebergWriter.write(DataChangeEvent.insertEvent(tableId, recordA), null);
652-
// flush(false) is a no-op; both events reach the same writer so the position-delete
653-
// mechanism within the writer handles dedup correctly.
652+
// flush resets the factory cache but keeps the writer running, so the update
653+
// lands in the same writer as the insert and uses a position delete.
654654
icebergWriter.flush(false);
655655
icebergWriter.write(DataChangeEvent.updateEvent(tableId, recordA, recordB), null);
656656

657657
Collection<WriteResultWrapper> writeResults = icebergWriter.prepareCommit();
658+
659+
// Both writes went through the same writer (batchIndex 0) since flush only
660+
// cleared the factory. Position delete handles dedup within the snapshot.
661+
List<Integer> batchIndexes =
662+
writeResults.stream()
663+
.map(WriteResultWrapper::getBatchIndex)
664+
.sorted()
665+
.collect(Collectors.toList());
666+
Assertions.assertThat(batchIndexes).containsExactly(0);
667+
658668
IcebergCommitter icebergCommitter = new IcebergCommitter(catalogOptions, new HashMap<>());
659669
Collection<Committer.CommitRequest<WriteResultWrapper>> collection =
660670
writeResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList());
@@ -665,16 +675,11 @@ public void testNoDuplicateWhenFlushSplitsSamePkUpdatesWithinCheckpoint() throws
665675
}
666676

667677
/**
668-
* Verifies that same-PK updates split across a schema-change-triggered writer flush within one
669-
* checkpoint do not produce duplicate records.
670-
*
671-
* <p>Root cause: when a schema-change event forces {@code flushTableWriter}, the pre-change
672-
* writes land in one {@code WriteResultWrapper} (batch 0) and the post-change writes land in
673-
* another (batch 1). Committing both in a single Iceberg {@code RowDelta} would give all files
674-
* the same sequence number N, so the equality-delete file from batch 1 (seq=N) cannot delete
675-
* the data file from batch 0 (seq=N) — Iceberg equality-delete semantics require strictly lower
676-
* sequence numbers. The fix commits each batch as a separate sequential snapshot, giving batch
677-
* 1 a higher sequence number so its equality-delete correctly supersedes batch 0's data.
678+
* A schema change mid-checkpoint splits writes into two batches for the same table. Both
679+
* batches must not land in the same Iceberg snapshot: files in one snapshot share the same
680+
* seq_num, so the equality-delete from batch 1 would silently miss the insert from batch 0.
681+
* Each batch gets its own snapshot so the delete's seq_num is strictly higher than the data it
682+
* targets.
678683
*/
679684
@Test
680685
public void testNoDuplicateWhenSchemaChangeFlushSplitsSamePkUpdates() throws Exception {

0 commit comments

Comments
 (0)