Skip to content

Commit 6f666f2

Browse files
committed
[iceberg] Fix duplicate records when schema change splits writes within a checkpoint
When a schema-change event arrives mid-checkpoint, the writer flushes the affected table before applying the new schema, producing two batches for the same table. Previously these were merged into one RowDelta and committed as a single Iceberg snapshot. Because Iceberg equality-delete files only suppress data with a strictly lower sequence number, same-snapshot deletes were ineffective and both versions of a row appeared on read.

- flush(boolean) is now a no-op to prevent unrelated tables from being split into multiple batches on non-schema-change flushes
- Schema-change events call flushTableWriter(tableId) to flush only the affected table; a per-table batchIndex increments on each flush
- Each batch is committed as a separate Iceberg snapshot so equality-deletes in batch N have a strictly higher sequence number than data in batch M (M < N)
- flink.batch-index and flink.checkpoint-id snapshot properties enable retry-safe idempotency: on failure, the committer resumes from the last uncommitted batch without re-committing already-persisted files

Tests added for: same-PK dedup across batches, schema-change split correctness, retry after partial batch commit, multiple schema changes in one checkpoint, and multi-table isolation.
1 parent 209c0c6 commit 6f666f2

File tree

4 files changed

+668
-43
lines changed

4 files changed

+668
-43
lines changed

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergCommitter.java

Lines changed: 124 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import java.util.ArrayList;
4343
import java.util.Arrays;
4444
import java.util.Collection;
45+
import java.util.Comparator;
4546
import java.util.HashMap;
4647
import java.util.List;
4748
import java.util.Map;
@@ -61,6 +62,12 @@ public class IcebergCommitter implements Committer<WriteResultWrapper> {
6162

6263
public static final String TABLE_GROUP_KEY = "table";
6364

65+
/** Snapshot summary key for the batch index; used to resume partial commits on retry. */
66+
static final String FLINK_BATCH_INDEX = "flink.batch-index";
67+
68+
/** Snapshot summary key for the checkpoint ID on intermediate batch commits. */
69+
static final String FLINK_CHECKPOINT_ID_PROP = "flink.checkpoint-id";
70+
6471
private final Catalog catalog;
6572

6673
private final SinkCommitterMetricGroup metricGroup;
@@ -91,74 +98,133 @@ private void commit(List<WriteResultWrapper> writeResultWrappers) {
9198
if (writeResultWrappers.isEmpty()) {
9299
return;
93100
}
94-
// all commits a same checkpoint-id
95101
long checkpointId = writeResultWrappers.get(0).getCheckpointId();
96102
String newFlinkJobId = writeResultWrappers.get(0).getJobId();
97103
String operatorId = writeResultWrappers.get(0).getOperatorId();
98104

99-
Map<TableId, List<WriteResult>> tableMap = new HashMap<>();
100-
for (WriteResultWrapper writeResultWrapper : writeResultWrappers) {
101-
List<WriteResult> writeResult =
102-
tableMap.getOrDefault(writeResultWrapper.getTableId(), new ArrayList<>());
103-
writeResult.add(writeResultWrapper.getWriteResult());
104-
tableMap.put(writeResultWrapper.getTableId(), writeResult);
105-
LOGGER.info(writeResultWrapper.buildDescription());
105+
Map<TableId, List<WriteResultWrapper>> tableMap = new HashMap<>();
106+
for (WriteResultWrapper w : writeResultWrappers) {
107+
tableMap.computeIfAbsent(w.getTableId(), k -> new ArrayList<>()).add(w);
108+
LOGGER.info(w.buildDescription());
106109
}
107-
for (Map.Entry<TableId, List<WriteResult>> entry : tableMap.entrySet()) {
110+
111+
for (Map.Entry<TableId, List<WriteResultWrapper>> entry : tableMap.entrySet()) {
108112
TableId tableId = entry.getKey();
109113

114+
// Sort ascending by batch index to guarantee correct Iceberg sequence number ordering.
115+
// Equality-delete files in batch N will have sequence number > batch M (M < N), so
116+
// they correctly supersede stale data written by earlier same-checkpoint batches.
117+
List<WriteResultWrapper> batches = entry.getValue();
118+
batches.sort(Comparator.comparingInt(WriteResultWrapper::getBatchIndex));
119+
110120
Table table =
111121
catalog.loadTable(
112122
TableIdentifier.of(tableId.getSchemaName(), tableId.getTableName()));
113123

124+
int startBatchIndex = 0;
114125
Snapshot snapshot = table.currentSnapshot();
115126
if (snapshot != null) {
116127
Iterable<Snapshot> ancestors =
117128
SnapshotUtil.ancestorsOf(snapshot.snapshotId(), table::snapshot);
118-
long lastCheckpointId =
129+
long lastCommittedCheckpointId =
119130
getMaxCommittedCheckpointId(ancestors, newFlinkJobId, operatorId);
120-
if (lastCheckpointId == checkpointId) {
131+
if (lastCommittedCheckpointId >= checkpointId) {
121132
LOGGER.warn(
122133
"Checkpoint id {} has been committed to table {}, skipping",
123134
checkpointId,
124135
tableId.identifier());
125136
continue;
126137
}
138+
ancestors = SnapshotUtil.ancestorsOf(snapshot.snapshotId(), table::snapshot);
139+
startBatchIndex =
140+
getLastCommittedBatchIndex(
141+
ancestors, newFlinkJobId, operatorId, checkpointId)
142+
+ 1;
127143
}
128144

129145
Optional<TableMetric> tableMetric = getTableMetric(tableId);
130146
tableMetric.ifPresent(TableMetric::increaseCommitTimes);
131147

132-
List<WriteResult> results = entry.getValue();
133-
List<DataFile> dataFiles =
134-
results.stream()
135-
.filter(payload -> payload.dataFiles() != null)
136-
.flatMap(payload -> Arrays.stream(payload.dataFiles()))
137-
.filter(dataFile -> dataFile.recordCount() > 0)
138-
.collect(toList());
139-
List<DeleteFile> deleteFiles =
140-
results.stream()
141-
.filter(payload -> payload.deleteFiles() != null)
142-
.flatMap(payload -> Arrays.stream(payload.deleteFiles()))
143-
.filter(deleteFile -> deleteFile.recordCount() > 0)
144-
.collect(toList());
145-
if (dataFiles.isEmpty() && deleteFiles.isEmpty()) {
146-
LOGGER.info(String.format("Nothing to commit to table %s, skipping", table.name()));
147-
} else {
148+
// Find the last non-empty batch so we know where to write MAX_COMMITTED_CHECKPOINT_ID.
149+
int lastNonEmptyBatchPos = -1;
150+
for (int i = batches.size() - 1; i >= startBatchIndex; i--) {
151+
if (!isBatchEmpty(batches.get(i))) {
152+
lastNonEmptyBatchPos = i;
153+
break;
154+
}
155+
}
156+
157+
// Commit each batch as a separate Iceberg snapshot to get distinct sequence numbers.
158+
for (int i = startBatchIndex; i < batches.size(); i++) {
159+
WriteResultWrapper batch = batches.get(i);
160+
List<DataFile> dataFiles = collectDataFiles(batch.getWriteResult());
161+
List<DeleteFile> deleteFiles = collectDeleteFiles(batch.getWriteResult());
162+
163+
if (dataFiles.isEmpty() && deleteFiles.isEmpty()) {
164+
LOGGER.info(
165+
"Batch {} for checkpoint {} of table {} has nothing to commit, skipping",
166+
batch.getBatchIndex(),
167+
checkpointId,
168+
tableId.identifier());
169+
continue;
170+
}
171+
172+
boolean isLastNonEmptyBatch = (i == lastNonEmptyBatchPos);
173+
174+
SnapshotUpdate<?> operation;
148175
if (deleteFiles.isEmpty()) {
149176
AppendFiles append = table.newAppend();
150177
dataFiles.forEach(append::appendFile);
151-
commitOperation(append, newFlinkJobId, operatorId, checkpointId);
178+
operation = append;
152179
} else {
153180
RowDelta delta = table.newRowDelta();
154181
dataFiles.forEach(delta::addRows);
155182
deleteFiles.forEach(delta::addDeletes);
156-
commitOperation(delta, newFlinkJobId, operatorId, checkpointId);
183+
operation = delta;
184+
}
185+
186+
operation.set(SinkUtil.FLINK_JOB_ID, newFlinkJobId);
187+
operation.set(SinkUtil.OPERATOR_ID, operatorId);
188+
operation.set(FLINK_BATCH_INDEX, String.valueOf(batch.getBatchIndex()));
189+
operation.set(FLINK_CHECKPOINT_ID_PROP, String.valueOf(checkpointId));
190+
if (isLastNonEmptyBatch) {
191+
operation.set(
192+
SinkUtil.MAX_COMMITTED_CHECKPOINT_ID, String.valueOf(checkpointId));
157193
}
194+
operation.commit();
158195
}
159196
}
160197
}
161198

199+
private static boolean isBatchEmpty(WriteResultWrapper batch) {
200+
WriteResult r = batch.getWriteResult();
201+
long dataCount =
202+
r.dataFiles() == null
203+
? 0
204+
: Arrays.stream(r.dataFiles()).filter(f -> f.recordCount() > 0).count();
205+
long deleteCount =
206+
r.deleteFiles() == null
207+
? 0
208+
: Arrays.stream(r.deleteFiles()).filter(f -> f.recordCount() > 0).count();
209+
return dataCount == 0 && deleteCount == 0;
210+
}
211+
212+
private static List<DataFile> collectDataFiles(WriteResult result) {
213+
if (result.dataFiles() == null) {
214+
return new ArrayList<>();
215+
}
216+
return Arrays.stream(result.dataFiles()).filter(f -> f.recordCount() > 0).collect(toList());
217+
}
218+
219+
private static List<DeleteFile> collectDeleteFiles(WriteResult result) {
220+
if (result.deleteFiles() == null) {
221+
return new ArrayList<>();
222+
}
223+
return Arrays.stream(result.deleteFiles())
224+
.filter(f -> f.recordCount() > 0)
225+
.collect(toList());
226+
}
227+
162228
private static long getMaxCommittedCheckpointId(
163229
Iterable<Snapshot> ancestors, String flinkJobId, String operatorId) {
164230
long lastCommittedCheckpointId = INITIAL_CHECKPOINT_ID - 1;
@@ -180,15 +246,35 @@ private static long getMaxCommittedCheckpointId(
180246
return lastCommittedCheckpointId;
181247
}
182248

183-
private static void commitOperation(
184-
SnapshotUpdate<?> operation,
185-
String newFlinkJobId,
186-
String operatorId,
187-
long checkpointId) {
188-
operation.set(SinkUtil.MAX_COMMITTED_CHECKPOINT_ID, Long.toString(checkpointId));
189-
operation.set(SinkUtil.FLINK_JOB_ID, newFlinkJobId);
190-
operation.set(SinkUtil.OPERATOR_ID, operatorId);
191-
operation.commit();
249+
/**
250+
* Returns the highest batch index already committed for the given checkpoint, or -1 if none.
251+
* Used to skip already-persisted batches on retry.
252+
*/
253+
private static int getLastCommittedBatchIndex(
254+
Iterable<Snapshot> ancestors, String flinkJobId, String operatorId, long checkpointId) {
255+
for (Snapshot ancestor : ancestors) {
256+
Map<String, String> summary = ancestor.summary();
257+
if (!flinkJobId.equals(summary.get(SinkUtil.FLINK_JOB_ID))) {
258+
continue;
259+
}
260+
String snapshotOperatorId = summary.get(SinkUtil.OPERATOR_ID);
261+
if (snapshotOperatorId != null && !snapshotOperatorId.equals(operatorId)) {
262+
continue;
263+
}
264+
// Stop once we pass a fully-committed earlier checkpoint; intermediate batch
265+
// snapshots for the current checkpoint lie between it and the current tip.
266+
String maxCommittedStr = summary.get(SinkUtil.MAX_COMMITTED_CHECKPOINT_ID);
267+
if (maxCommittedStr != null && Long.parseLong(maxCommittedStr) < checkpointId) {
268+
break;
269+
}
270+
String snapshotCheckpointId = summary.get(FLINK_CHECKPOINT_ID_PROP);
271+
if (snapshotCheckpointId != null
272+
&& Long.parseLong(snapshotCheckpointId) == checkpointId) {
273+
String batchIndexStr = summary.get(FLINK_BATCH_INDEX);
274+
return batchIndexStr != null ? Integer.parseInt(batchIndexStr) : 0;
275+
}
276+
}
277+
return -1;
192278
}
193279

194280
private Optional<TableMetric> getTableMetric(TableId tableId) {

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriter.java

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,9 @@ public class IcebergWriter
7171

7272
private final List<WriteResultWrapper> temporaryWriteResult;
7373

74+
/** Per-table batch index within the current checkpoint; incremented on each schema-change flush. */
75+
private Map<TableId, Integer> tableBatchIndexMap;
76+
7477
private Catalog catalog;
7578

7679
private final int taskId;
@@ -99,6 +102,7 @@ public IcebergWriter(
99102
writerFactoryMap = new HashMap<>();
100103
writerMap = new HashMap<>();
101104
schemaMap = new HashMap<>();
105+
tableBatchIndexMap = new HashMap<>();
102106
temporaryWriteResult = new ArrayList<>();
103107
this.taskId = taskId;
104108
this.attemptId = attemptId;
@@ -126,6 +130,7 @@ public Collection<WriteResultWrapper> prepareCommit() throws IOException {
126130
list.addAll(temporaryWriteResult);
127131
list.addAll(getWriteResult());
128132
temporaryWriteResult.clear();
133+
tableBatchIndexMap.clear();
129134
lastCheckpointId++;
130135
return list;
131136
}
@@ -163,6 +168,8 @@ public void write(Event event, Context context) throws IOException {
163168
} else {
164169
SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) event;
165170
TableId tableId = schemaChangeEvent.tableId();
171+
// Flush only this table before applying schema change to avoid global writer rotation.
172+
flushTableWriter(tableId);
166173
TableSchemaWrapper tableSchemaWrapper = schemaMap.get(tableId);
167174

168175
Schema newSchema =
@@ -176,21 +183,45 @@ public void write(Event event, Context context) throws IOException {
176183

177184
@Override
178185
public void flush(boolean flush) throws IOException {
179-
// Notice: flush method may be called many times during one checkpoint.
180-
temporaryWriteResult.addAll(getWriteResult());
186+
// Flush may be called many times during one checkpoint by non-data events.
187+
// Avoid rotating all task writers here, which can split same-PK updates into multiple
188+
// batches within one checkpoint and break dedup semantics in downstream reads.
189+
}
190+
191+
private void flushTableWriter(TableId tableId) throws IOException {
192+
TaskWriter<RowData> writer = writerMap.remove(tableId);
193+
if (writer == null) {
194+
return;
195+
}
196+
int batchIndex = tableBatchIndexMap.getOrDefault(tableId, 0);
197+
tableBatchIndexMap.put(tableId, batchIndex + 1);
198+
WriteResultWrapper writeResultWrapper =
199+
new WriteResultWrapper(
200+
writer.complete(),
201+
tableId,
202+
lastCheckpointId + 1,
203+
jobId,
204+
operatorId,
205+
batchIndex);
206+
temporaryWriteResult.add(writeResultWrapper);
207+
LOGGER.info(writeResultWrapper.buildDescription());
208+
writerFactoryMap.remove(tableId);
181209
}
182210

183211
private List<WriteResultWrapper> getWriteResult() throws IOException {
184212
long currentCheckpointId = lastCheckpointId + 1;
185213
List<WriteResultWrapper> writeResults = new ArrayList<>();
186214
for (Map.Entry<TableId, TaskWriter<RowData>> entry : writerMap.entrySet()) {
215+
TableId tableId = entry.getKey();
216+
int batchIndex = tableBatchIndexMap.getOrDefault(tableId, 0);
187217
WriteResultWrapper writeResultWrapper =
188218
new WriteResultWrapper(
189219
entry.getValue().complete(),
190-
entry.getKey(),
220+
tableId,
191221
currentCheckpointId,
192222
jobId,
193-
operatorId);
223+
operatorId,
224+
batchIndex);
194225
writeResults.add(writeResultWrapper);
195226
LOGGER.info(writeResultWrapper.buildDescription());
196227
}
@@ -222,6 +253,11 @@ public void close() throws Exception {
222253
writerFactoryMap = null;
223254
}
224255

256+
if (tableBatchIndexMap != null) {
257+
tableBatchIndexMap.clear();
258+
tableBatchIndexMap = null;
259+
}
260+
225261
catalog = null;
226262
}
227263
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/WriteResultWrapper.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,17 +40,31 @@ public class WriteResultWrapper implements Serializable {
4040

4141
private final String operatorId;
4242

43+
/** Batch index within the checkpoint for this table; increments on each schema-change flush. */
44+
private final int batchIndex;
45+
4346
public WriteResultWrapper(
4447
WriteResult writeResult,
4548
TableId tableId,
4649
long checkpointId,
4750
String jobId,
48-
String operatorId) {
51+
String operatorId,
52+
int batchIndex) {
4953
this.writeResult = writeResult;
5054
this.tableId = tableId;
5155
this.checkpointId = checkpointId;
5256
this.jobId = jobId;
5357
this.operatorId = operatorId;
58+
this.batchIndex = batchIndex;
59+
}
60+
61+
public WriteResultWrapper(
62+
WriteResult writeResult,
63+
TableId tableId,
64+
long checkpointId,
65+
String jobId,
66+
String operatorId) {
67+
this(writeResult, tableId, checkpointId, jobId, operatorId, 0);
5468
}
5569

5670
public WriteResult getWriteResult() {
@@ -73,6 +87,10 @@ public String getOperatorId() {
7387
return operatorId;
7488
}
7589

90+
public int getBatchIndex() {
91+
return batchIndex;
92+
}
93+
7694
/** Build a simple description for the write result. */
7795
public String buildDescription() {
7896
long addCount = 0;
@@ -95,6 +113,8 @@ public String buildDescription() {
95113
+ jobId
96114
+ ", OperatorId: "
97115
+ operatorId
116+
+ ", BatchIndex: "
117+
+ batchIndex
98118
+ ", AddCount: "
99119
+ addCount
100120
+ ", DeleteCount: "

0 commit comments

Comments
 (0)