Skip to content

Commit 09134c3

Browse files
committed
[iceberg] Fix batchIndex sync and parallel subtask grouping in Iceberg sink
Address parallelism issues identified during review:

- Writer: Advance tableBatchIndexMap before the `writer == null` guard so that all subtasks stay in sync when a subtask has no data for the table at schema-change time.
- Writer: Skip flushTableWriter on the initial CreateTableEvent, since no data has been written yet and there is nothing to split.
- Committer: Group WriteResultWrappers by batchIndex using a TreeMap, so that wrappers from different subtasks with the same batchIndex are merged into a single Iceberg snapshot instead of being committed separately.

Tests added:

- testBatchIndexInSyncWhenSubtaskHasNoWriterAtSchemaChange
- testNoDuplicateWithParallelSubtasksMissingPreSchemaChangeData
- testSameBatchIndexFromTwoSubtasksMergedIntoOneSnapshot
- testNoDuplicateWithMixedDataAcrossSubtasksAndMultipleSchemaChanges
1 parent 445564d commit 09134c3

3 files changed

Lines changed: 455 additions & 40 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergCommitter.java

Lines changed: 41 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,11 @@
4343
import java.util.ArrayList;
4444
import java.util.Arrays;
4545
import java.util.Collection;
46-
import java.util.Comparator;
4746
import java.util.HashMap;
4847
import java.util.List;
4948
import java.util.Map;
5049
import java.util.Optional;
50+
import java.util.TreeMap;
5151

5252
import static java.util.stream.Collectors.toList;
5353
import static org.apache.flink.runtime.checkpoint.CheckpointIDCounter.INITIAL_CHECKPOINT_ID;
@@ -110,17 +110,18 @@ private void commit(List<WriteResultWrapper> writeResultWrappers) {
110110
Map<TableId, List<WriteResultWrapper>> tableMap = new HashMap<>();
111111
for (WriteResultWrapper w : writeResultWrappers) {
112112
tableMap.computeIfAbsent(w.getTableId(), k -> new ArrayList<>()).add(w);
113-
LOGGER.info(w.buildDescription());
114113
}
115114

116115
for (Map.Entry<TableId, List<WriteResultWrapper>> entry : tableMap.entrySet()) {
117116
TableId tableId = entry.getKey();
118117

119-
// Sort ascending by batch index to guarantee correct Iceberg sequence number ordering.
120-
// Equality-delete files in batch N will have sequence number > batch M (M < N), so
121-
// they correctly supersede stale data written by earlier same-checkpoint batches.
122-
List<WriteResultWrapper> batches = entry.getValue();
123-
batches.sort(Comparator.comparingInt(WriteResultWrapper::getBatchIndex));
118+
// Group by batchIndex so wrappers from different subtasks for the same batch
119+
// are merged into one snapshot, not committed separately.
120+
TreeMap<Integer, List<WriteResultWrapper>> batchGroups = new TreeMap<>();
121+
for (WriteResultWrapper w : entry.getValue()) {
122+
batchGroups.computeIfAbsent(w.getBatchIndex(), k -> new ArrayList<>()).add(w);
123+
LOGGER.info(w.buildDescription());
124+
}
124125

125126
Table table =
126127
catalog.loadTable(
@@ -150,32 +151,39 @@ private void commit(List<WriteResultWrapper> writeResultWrappers) {
150151
Optional<TableMetric> tableMetric = getTableMetric(tableId);
151152
tableMetric.ifPresent(TableMetric::increaseCommitTimes);
152153

153-
// Find the last non-empty batch so we know where to write MAX_COMMITTED_CHECKPOINT_ID.
154-
int lastNonEmptyBatchPos = -1;
155-
for (int i = batches.size() - 1; i >= startBatchIndex; i--) {
156-
if (!isBatchEmpty(batches.get(i))) {
157-
lastNonEmptyBatchPos = i;
158-
break;
154+
int lastNonEmptyBatchIndex = -1;
155+
for (Map.Entry<Integer, List<WriteResultWrapper>> g : batchGroups.entrySet()) {
156+
List<DataFile> df = collectDataFilesFromGroup(g.getValue());
157+
List<DeleteFile> del = collectDeleteFilesFromGroup(g.getValue());
158+
if (!df.isEmpty() || !del.isEmpty()) {
159+
lastNonEmptyBatchIndex = g.getKey();
159160
}
160161
}
161162

162-
// Commit each batch as a separate Iceberg snapshot to get distinct sequence numbers.
163-
for (int i = startBatchIndex; i < batches.size(); i++) {
164-
WriteResultWrapper batch = batches.get(i);
165-
List<DataFile> dataFiles = collectDataFiles(batch.getWriteResult());
166-
List<DeleteFile> deleteFiles = collectDeleteFiles(batch.getWriteResult());
163+
// Commit each batch as a separate snapshot so sequence numbers increase per batch.
164+
for (Map.Entry<Integer, List<WriteResultWrapper>> g : batchGroups.entrySet()) {
165+
int batchIdx = g.getKey();
166+
if (batchIdx < startBatchIndex) {
167+
LOGGER.info(
168+
"Batch {} for checkpoint {} of table {} already committed, skipping",
169+
batchIdx,
170+
checkpointId,
171+
tableId.identifier());
172+
continue;
173+
}
174+
175+
List<DataFile> dataFiles = collectDataFilesFromGroup(g.getValue());
176+
List<DeleteFile> deleteFiles = collectDeleteFilesFromGroup(g.getValue());
167177

168178
if (dataFiles.isEmpty() && deleteFiles.isEmpty()) {
169179
LOGGER.info(
170180
"Batch {} for checkpoint {} of table {} has nothing to commit, skipping",
171-
batch.getBatchIndex(),
181+
batchIdx,
172182
checkpointId,
173183
tableId.identifier());
174184
continue;
175185
}
176186

177-
boolean isLastNonEmptyBatch = (i == lastNonEmptyBatchPos);
178-
179187
SnapshotUpdate<?> operation;
180188
if (deleteFiles.isEmpty()) {
181189
AppendFiles append = table.newAppend();
@@ -190,9 +198,9 @@ private void commit(List<WriteResultWrapper> writeResultWrappers) {
190198

191199
operation.set(SinkUtil.FLINK_JOB_ID, newFlinkJobId);
192200
operation.set(SinkUtil.OPERATOR_ID, operatorId);
193-
operation.set(FLINK_BATCH_INDEX, String.valueOf(batch.getBatchIndex()));
201+
operation.set(FLINK_BATCH_INDEX, String.valueOf(batchIdx));
194202
operation.set(FLINK_CHECKPOINT_ID_PROP, String.valueOf(checkpointId));
195-
if (isLastNonEmptyBatch) {
203+
if (batchIdx == lastNonEmptyBatchIndex) {
196204
operation.set(
197205
SinkUtil.MAX_COMMITTED_CHECKPOINT_ID, String.valueOf(checkpointId));
198206
}
@@ -201,17 +209,16 @@ private void commit(List<WriteResultWrapper> writeResultWrappers) {
201209
}
202210
}
203211

204-
private static boolean isBatchEmpty(WriteResultWrapper batch) {
205-
WriteResult r = batch.getWriteResult();
206-
long dataCount =
207-
r.dataFiles() == null
208-
? 0
209-
: Arrays.stream(r.dataFiles()).filter(f -> f.recordCount() > 0).count();
210-
long deleteCount =
211-
r.deleteFiles() == null
212-
? 0
213-
: Arrays.stream(r.deleteFiles()).filter(f -> f.recordCount() > 0).count();
214-
return dataCount == 0 && deleteCount == 0;
212+
private static List<DataFile> collectDataFilesFromGroup(List<WriteResultWrapper> group) {
213+
return group.stream()
214+
.flatMap(w -> collectDataFiles(w.getWriteResult()).stream())
215+
.collect(toList());
216+
}
217+
218+
private static List<DeleteFile> collectDeleteFilesFromGroup(List<WriteResultWrapper> group) {
219+
return group.stream()
220+
.flatMap(w -> collectDeleteFiles(w.getWriteResult()).stream())
221+
.collect(toList());
215222
}
216223

217224
private static List<DataFile> collectDataFiles(WriteResult result) {

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriter.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public class IcebergWriter
7272

7373
private final List<WriteResultWrapper> temporaryWriteResult;
7474

75-
/** Per-table batch index within the current checkpoint; incremented on each schema-change flush. */
75+
/** Per-table batch index; incremented on each schema-change flush, even when no writer exists. */
7676
private Map<TableId, Integer> tableBatchIndexMap;
7777

7878
private Catalog catalog;
@@ -171,8 +171,11 @@ public void write(Event event, Context context) throws IOException {
171171
} else {
172172
SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) event;
173173
TableId tableId = schemaChangeEvent.tableId();
174-
// Flush only this table before applying schema change to avoid global writer rotation.
175-
flushTableWriter(tableId);
174+
// Flush only when the table is already known; skip on initial CreateTableEvent since
175+
// no data has been written yet and there is nothing to split.
176+
if (schemaMap.containsKey(tableId)) {
177+
flushTableWriter(tableId);
178+
}
176179
TableSchemaWrapper tableSchemaWrapper = schemaMap.get(tableId);
177180

178181
Schema newSchema =
@@ -185,19 +188,20 @@ public void write(Event event, Context context) throws IOException {
185188
}
186189

187190
@Override
188-
public void flush(boolean flush) throws IOException {
191+
public void flush(boolean flush) {
189192
// Flush may be called many times during one checkpoint by non-data events.
190193
// Avoid rotating all task writers here, which can split same-PK updates into multiple
191194
// batches within one checkpoint and break dedup semantics in downstream reads.
192195
}
193196

194197
private void flushTableWriter(TableId tableId) throws IOException {
195198
TaskWriter<RowData> writer = writerMap.remove(tableId);
199+
// Advance even when no writer exists, to keep batchIndex in sync across subtasks.
200+
int batchIndex = tableBatchIndexMap.getOrDefault(tableId, 0);
201+
tableBatchIndexMap.put(tableId, batchIndex + 1);
196202
if (writer == null) {
197203
return;
198204
}
199-
int batchIndex = tableBatchIndexMap.getOrDefault(tableId, 0);
200-
tableBatchIndexMap.put(tableId, batchIndex + 1);
201205
WriteResultWrapper writeResultWrapper =
202206
new WriteResultWrapper(
203207
writer.complete(),

0 commit comments

Comments
 (0)