Skip to content

Commit 7695122

Browse files
committed
[iceberg] Restore flush to clear factory cache for schema reload
flush(boolean) was a no-op to avoid splitting same-PK updates into multiple batches within a checkpoint. But without clearing the writer factory cache, new writes after a checkpoint could use a stale factory that doesn't reflect schema changes applied by IcebergMetadataApplier. Change flush to clear writerFactoryMap only. Writers themselves keep running so same-PK updates within a checkpoint go through the same writer and use position deletes (exact file+offset), which work correctly within a single snapshot without needing a separate seq_num. Schema-change flushes still go through flushTableWriter(tableId), which drains the writer and bumps batchIndex so equality-deletes land in a snapshot with a strictly higher sequence number than the data they target.
1 parent 2f0ed03 commit 7695122

2 files changed

Lines changed: 27 additions & 16 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriter.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -190,10 +190,13 @@ public void write(Event event, Context context) throws IOException {
190190
}
191191

192192
@Override
193-
public void flush(boolean flush) {
194-
// Flush may be called many times during one checkpoint by non-data events.
195-
// Avoid rotating all task writers here, which can split same-PK updates into multiple
196-
// batches within one checkpoint and break dedup semantics in downstream reads.
193+
public void flush(boolean flush) throws IOException {
194+
// Drop cached writer factories so the next write reloads them from the current catalog
195+
// schema. Writers themselves keep running — same-writer updates use position deletes
196+
// (exact file+offset), which work correctly within a single snapshot. Schema-change
197+
// splits are handled by flushTableWriter, which completes the writer and advances
198+
// batchIndex so equality-deletes land in a snapshot with a higher seq_num.
199+
writerFactoryMap.clear();
197200
}
198201

199202
private void flushTableWriter(TableId tableId) throws IOException {

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterTest.java

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -649,12 +649,25 @@ public void testNoDuplicateWhenFlushSplitsSamePkUpdatesWithinCheckpoint() throws
649649
generator.generate(new Object[] {1L, BinaryStringData.fromString("b")});
650650

651651
icebergWriter.write(DataChangeEvent.insertEvent(tableId, recordA), null);
652-
// flush(false) is a no-op; both events reach the same writer so the position-delete
653-
// mechanism within the writer handles dedup correctly.
652+
// flush only clears the factory cache; the active writer keeps running. The update
653+
// therefore goes to the same writer as the insert, which uses a position delete
654+
// (exact file+offset) rather than an equality delete — no separate snapshot needed.
654655
icebergWriter.flush(false);
655656
icebergWriter.write(DataChangeEvent.updateEvent(tableId, recordA, recordB), null);
656657

657658
Collection<WriteResultWrapper> writeResults = icebergWriter.prepareCommit();
659+
660+
// flush only clears the factory cache — the writer keeps running. Both the insert and
661+
// the update go through the same writer (batchIndex 0), which uses position deletes
662+
// for the update. Position deletes reference the exact file+row offset, so they work
663+
// correctly within a single snapshot without needing a separate seq_num.
664+
List<Integer> batchIndexes =
665+
writeResults.stream()
666+
.map(WriteResultWrapper::getBatchIndex)
667+
.sorted()
668+
.collect(Collectors.toList());
669+
Assertions.assertThat(batchIndexes).containsExactly(0);
670+
658671
IcebergCommitter icebergCommitter = new IcebergCommitter(catalogOptions, new HashMap<>());
659672
Collection<Committer.CommitRequest<WriteResultWrapper>> collection =
660673
writeResults.stream().map(MockCommitRequestImpl::new).collect(Collectors.toList());
@@ -665,16 +678,11 @@ public void testNoDuplicateWhenFlushSplitsSamePkUpdatesWithinCheckpoint() throws
665678
}
666679

667680
/**
668-
* Verifies that same-PK updates split across a schema-change-triggered writer flush within one
669-
* checkpoint do not produce duplicate records.
670-
*
671-
* <p>Root cause: when a schema-change event forces {@code flushTableWriter}, the pre-change
672-
* writes land in one {@code WriteResultWrapper} (batch 0) and the post-change writes land in
673-
* another (batch 1). Committing both in a single Iceberg {@code RowDelta} would give all files
674-
* the same sequence number N, so the equality-delete file from batch 1 (seq=N) cannot delete
675-
* the data file from batch 0 (seq=N) — Iceberg equality-delete semantics require strictly lower
676-
* sequence numbers. The fix commits each batch as a separate sequential snapshot, giving batch
677-
* 1 a higher sequence number so its equality-delete correctly supersedes batch 0's data.
681+
* A schema change mid-checkpoint splits writes into two batches for the same table. Both
682+
* batches must not land in the same Iceberg snapshot: files in one snapshot share the same
683+
* seq_num, so the equality-delete from batch 1 would silently miss the insert from batch 0.
684+
* Each batch gets its own snapshot so the delete's seq_num is strictly higher than the data it
685+
* targets.
678686
*/
679687
@Test
680688
public void testNoDuplicateWhenSchemaChangeFlushSplitsSamePkUpdates() throws Exception {

0 commit comments

Comments
 (0)