Skip to content

Commit 4eba0f0

Browse files
committed
[iceberg] Fix batchIndex sync and parallel subtask grouping in Iceberg sink
Address parallelism issues identified during review:
- Writer: Advance tableBatchIndexMap before the writer == null guard so all subtasks stay in sync when a subtask has no data for the table at schema-change time
- Writer: Skip flushTableWriter on initial CreateTableEvent since no data has been written yet and there is nothing to split
- Committer: Group WriteResultWrappers by batchIndex using a TreeMap, so wrappers from different subtasks with the same batchIndex are merged into a single Iceberg snapshot instead of being committed separately

Tests added:
- testBatchIndexInSyncWhenSubtaskHasNoWriterAtSchemaChange
- testNoDuplicateWithParallelSubtasksMissingPreSchemaChangeData
- testSameBatchIndexFromTwoSubtasksMergedIntoOneSnapshot
1 parent 6f666f2 commit 4eba0f0

File tree

3 files changed

+455
-40
lines changed

3 files changed

+455
-40
lines changed

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergCommitter.java

Lines changed: 41 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,11 @@
4242
import java.util.ArrayList;
4343
import java.util.Arrays;
4444
import java.util.Collection;
45-
import java.util.Comparator;
4645
import java.util.HashMap;
4746
import java.util.List;
4847
import java.util.Map;
4948
import java.util.Optional;
49+
import java.util.TreeMap;
5050

5151
import static java.util.stream.Collectors.toList;
5252
import static org.apache.flink.runtime.checkpoint.CheckpointIDCounter.INITIAL_CHECKPOINT_ID;
@@ -105,17 +105,18 @@ private void commit(List<WriteResultWrapper> writeResultWrappers) {
105105
Map<TableId, List<WriteResultWrapper>> tableMap = new HashMap<>();
106106
for (WriteResultWrapper w : writeResultWrappers) {
107107
tableMap.computeIfAbsent(w.getTableId(), k -> new ArrayList<>()).add(w);
108-
LOGGER.info(w.buildDescription());
109108
}
110109

111110
for (Map.Entry<TableId, List<WriteResultWrapper>> entry : tableMap.entrySet()) {
112111
TableId tableId = entry.getKey();
113112

114-
// Sort ascending by batch index to guarantee correct Iceberg sequence number ordering.
115-
// Equality-delete files in batch N will have sequence number > batch M (M < N), so
116-
// they correctly supersede stale data written by earlier same-checkpoint batches.
117-
List<WriteResultWrapper> batches = entry.getValue();
118-
batches.sort(Comparator.comparingInt(WriteResultWrapper::getBatchIndex));
113+
// Group by batchIndex so wrappers from different subtasks for the same batch
114+
// are merged into one snapshot, not committed separately.
115+
TreeMap<Integer, List<WriteResultWrapper>> batchGroups = new TreeMap<>();
116+
for (WriteResultWrapper w : entry.getValue()) {
117+
batchGroups.computeIfAbsent(w.getBatchIndex(), k -> new ArrayList<>()).add(w);
118+
LOGGER.info(w.buildDescription());
119+
}
119120

120121
Table table =
121122
catalog.loadTable(
@@ -145,32 +146,39 @@ private void commit(List<WriteResultWrapper> writeResultWrappers) {
145146
Optional<TableMetric> tableMetric = getTableMetric(tableId);
146147
tableMetric.ifPresent(TableMetric::increaseCommitTimes);
147148

148-
// Find the last non-empty batch so we know where to write MAX_COMMITTED_CHECKPOINT_ID.
149-
int lastNonEmptyBatchPos = -1;
150-
for (int i = batches.size() - 1; i >= startBatchIndex; i--) {
151-
if (!isBatchEmpty(batches.get(i))) {
152-
lastNonEmptyBatchPos = i;
153-
break;
149+
int lastNonEmptyBatchIndex = -1;
150+
for (Map.Entry<Integer, List<WriteResultWrapper>> g : batchGroups.entrySet()) {
151+
List<DataFile> df = collectDataFilesFromGroup(g.getValue());
152+
List<DeleteFile> del = collectDeleteFilesFromGroup(g.getValue());
153+
if (!df.isEmpty() || !del.isEmpty()) {
154+
lastNonEmptyBatchIndex = g.getKey();
154155
}
155156
}
156157

157-
// Commit each batch as a separate Iceberg snapshot to get distinct sequence numbers.
158-
for (int i = startBatchIndex; i < batches.size(); i++) {
159-
WriteResultWrapper batch = batches.get(i);
160-
List<DataFile> dataFiles = collectDataFiles(batch.getWriteResult());
161-
List<DeleteFile> deleteFiles = collectDeleteFiles(batch.getWriteResult());
158+
// Commit each batch as a separate snapshot so sequence numbers increase per batch.
159+
for (Map.Entry<Integer, List<WriteResultWrapper>> g : batchGroups.entrySet()) {
160+
int batchIdx = g.getKey();
161+
if (batchIdx < startBatchIndex) {
162+
LOGGER.info(
163+
"Batch {} for checkpoint {} of table {} already committed, skipping",
164+
batchIdx,
165+
checkpointId,
166+
tableId.identifier());
167+
continue;
168+
}
169+
170+
List<DataFile> dataFiles = collectDataFilesFromGroup(g.getValue());
171+
List<DeleteFile> deleteFiles = collectDeleteFilesFromGroup(g.getValue());
162172

163173
if (dataFiles.isEmpty() && deleteFiles.isEmpty()) {
164174
LOGGER.info(
165175
"Batch {} for checkpoint {} of table {} has nothing to commit, skipping",
166-
batch.getBatchIndex(),
176+
batchIdx,
167177
checkpointId,
168178
tableId.identifier());
169179
continue;
170180
}
171181

172-
boolean isLastNonEmptyBatch = (i == lastNonEmptyBatchPos);
173-
174182
SnapshotUpdate<?> operation;
175183
if (deleteFiles.isEmpty()) {
176184
AppendFiles append = table.newAppend();
@@ -185,9 +193,9 @@ private void commit(List<WriteResultWrapper> writeResultWrappers) {
185193

186194
operation.set(SinkUtil.FLINK_JOB_ID, newFlinkJobId);
187195
operation.set(SinkUtil.OPERATOR_ID, operatorId);
188-
operation.set(FLINK_BATCH_INDEX, String.valueOf(batch.getBatchIndex()));
196+
operation.set(FLINK_BATCH_INDEX, String.valueOf(batchIdx));
189197
operation.set(FLINK_CHECKPOINT_ID_PROP, String.valueOf(checkpointId));
190-
if (isLastNonEmptyBatch) {
198+
if (batchIdx == lastNonEmptyBatchIndex) {
191199
operation.set(
192200
SinkUtil.MAX_COMMITTED_CHECKPOINT_ID, String.valueOf(checkpointId));
193201
}
@@ -196,17 +204,16 @@ private void commit(List<WriteResultWrapper> writeResultWrappers) {
196204
}
197205
}
198206

199-
private static boolean isBatchEmpty(WriteResultWrapper batch) {
200-
WriteResult r = batch.getWriteResult();
201-
long dataCount =
202-
r.dataFiles() == null
203-
? 0
204-
: Arrays.stream(r.dataFiles()).filter(f -> f.recordCount() > 0).count();
205-
long deleteCount =
206-
r.deleteFiles() == null
207-
? 0
208-
: Arrays.stream(r.deleteFiles()).filter(f -> f.recordCount() > 0).count();
209-
return dataCount == 0 && deleteCount == 0;
207+
private static List<DataFile> collectDataFilesFromGroup(List<WriteResultWrapper> group) {
208+
return group.stream()
209+
.flatMap(w -> collectDataFiles(w.getWriteResult()).stream())
210+
.collect(toList());
211+
}
212+
213+
private static List<DeleteFile> collectDeleteFilesFromGroup(List<WriteResultWrapper> group) {
214+
return group.stream()
215+
.flatMap(w -> collectDeleteFiles(w.getWriteResult()).stream())
216+
.collect(toList());
210217
}
211218

212219
private static List<DataFile> collectDataFiles(WriteResult result) {

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriter.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public class IcebergWriter
7171

7272
private final List<WriteResultWrapper> temporaryWriteResult;
7373

74-
/** Per-table batch index within the current checkpoint; incremented on each schema-change flush. */
74+
/** Per-table batch index; incremented on each schema-change flush, even when no writer exists. */
7575
private Map<TableId, Integer> tableBatchIndexMap;
7676

7777
private Catalog catalog;
@@ -168,8 +168,11 @@ public void write(Event event, Context context) throws IOException {
168168
} else {
169169
SchemaChangeEvent schemaChangeEvent = (SchemaChangeEvent) event;
170170
TableId tableId = schemaChangeEvent.tableId();
171-
// Flush only this table before applying schema change to avoid global writer rotation.
172-
flushTableWriter(tableId);
171+
// Flush only when the table is already known; skip on initial CreateTableEvent since
172+
// no data has been written yet and there is nothing to split.
173+
if (schemaMap.containsKey(tableId)) {
174+
flushTableWriter(tableId);
175+
}
173176
TableSchemaWrapper tableSchemaWrapper = schemaMap.get(tableId);
174177

175178
Schema newSchema =
@@ -182,19 +185,20 @@ public void write(Event event, Context context) throws IOException {
182185
}
183186

184187
@Override
185-
public void flush(boolean flush) throws IOException {
188+
public void flush(boolean flush) {
186189
// Flush may be called many times during one checkpoint by non-data events.
187190
// Avoid rotating all task writers here, which can split same-PK updates into multiple
188191
// batches within one checkpoint and break dedup semantics in downstream reads.
189192
}
190193

191194
private void flushTableWriter(TableId tableId) throws IOException {
192195
TaskWriter<RowData> writer = writerMap.remove(tableId);
196+
// Advance even when no writer exists, to keep batchIndex in sync across subtasks.
197+
int batchIndex = tableBatchIndexMap.getOrDefault(tableId, 0);
198+
tableBatchIndexMap.put(tableId, batchIndex + 1);
193199
if (writer == null) {
194200
return;
195201
}
196-
int batchIndex = tableBatchIndexMap.getOrDefault(tableId, 0);
197-
tableBatchIndexMap.put(tableId, batchIndex + 1);
198202
WriteResultWrapper writeResultWrapper =
199203
new WriteResultWrapper(
200204
writer.complete(),

0 commit comments

Comments (0)