Skip to content

Commit ecc3a4b

Browse files
fcfangccCopilotlvyanquan
authored andcommitted
[FLINK-39056][pipeline-connector][iceberg] Fix Duplicate Data Issue in Iceberg Sink During Two-Phase Commit (apache#4269)
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: lvyanquan <lvyanquan.lyq@alibaba-inc.com> Co-authored-by: Kunni <1365976815@qq.com>
1 parent ff07302 commit ecc3a4b

14 files changed

Lines changed: 506 additions & 22 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSink.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,25 +47,30 @@ public class IcebergDataSink implements DataSink, Serializable {
4747

4848
public final CompactionOptions compactionOptions;
4949

50+
public final String jobIdPrefix;
51+
5052
public IcebergDataSink(
5153
Map<String, String> catalogOptions,
5254
Map<String, String> tableOptions,
5355
Map<TableId, List<String>> partitionMaps,
5456
ZoneId zoneId,
5557
String schemaOperatorUid,
56-
CompactionOptions compactionOptions) {
58+
CompactionOptions compactionOptions,
59+
String jobIdPrefix) {
5760
this.catalogOptions = catalogOptions;
5861
this.tableOptions = tableOptions;
5962
this.partitionMaps = partitionMaps;
6063
this.zoneId = zoneId;
6164
this.schemaOperatorUid = schemaOperatorUid;
6265
this.compactionOptions = compactionOptions;
66+
this.jobIdPrefix = jobIdPrefix;
6367
}
6468

6569
@Override
6670
public EventSinkProvider getEventSinkProvider() {
6771
IcebergSink icebergEventSink =
68-
new IcebergSink(catalogOptions, tableOptions, zoneId, compactionOptions);
72+
new IcebergSink(
73+
catalogOptions, tableOptions, zoneId, compactionOptions, jobIdPrefix);
6974
return FlinkSinkProvider.of(icebergEventSink);
7075
}
7176

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,14 +106,17 @@ public DataSink createDataSink(Context context) {
106106
}
107107
}
108108
}
109+
String jobIdPrefix =
110+
context.getFactoryConfiguration().get(IcebergDataSinkOptions.SINK_JOB_ID_PREFIX);
109111

110112
return new IcebergDataSink(
111113
catalogOptions,
112114
tableOptions,
113115
partitionMaps,
114116
zoneId,
115117
schemaOperatorUid,
116-
compactionOptions);
118+
compactionOptions,
119+
jobIdPrefix);
117120
}
118121

119122
private CompactionOptions getCompactionStrategy(Configuration configuration) {
@@ -144,6 +147,7 @@ public Set<ConfigOption<?>> optionalOptions() {
144147
options.add(IcebergDataSinkOptions.SINK_COMPACTION_ENABLED);
145148
options.add(IcebergDataSinkOptions.SINK_COMPACTION_COMMIT_INTERVAL);
146149
options.add(IcebergDataSinkOptions.SINK_COMPACTION_PARALLELISM);
150+
options.add(IcebergDataSinkOptions.SINK_JOB_ID_PREFIX);
147151
return options;
148152
}
149153
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkOptions.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,4 +78,12 @@ public class IcebergDataSinkOptions {
7878
.defaultValue(-1)
7979
.withDescription(
8080
"The parallelism for file compaction, default value is -1, which means that compaction parallelism is equal to sink writer parallelism.");
81+
82+
@Experimental
83+
public static final ConfigOption<String> SINK_JOB_ID_PREFIX =
84+
key("sink.job.id.prefix")
85+
.stringType()
86+
.defaultValue("cdc")
87+
.withDescription(
88+
"The prefix of job id, which is used to distinguish different jobs.");
8189
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergCommitter.java

Lines changed: 67 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,14 @@
2828
import org.apache.iceberg.DataFile;
2929
import org.apache.iceberg.DeleteFile;
3030
import org.apache.iceberg.RowDelta;
31+
import org.apache.iceberg.Snapshot;
32+
import org.apache.iceberg.SnapshotUpdate;
3133
import org.apache.iceberg.Table;
3234
import org.apache.iceberg.catalog.Catalog;
3335
import org.apache.iceberg.catalog.TableIdentifier;
36+
import org.apache.iceberg.flink.sink.SinkUtil;
3437
import org.apache.iceberg.io.WriteResult;
38+
import org.apache.iceberg.util.SnapshotUtil;
3539
import org.slf4j.Logger;
3640
import org.slf4j.LoggerFactory;
3741

@@ -44,6 +48,7 @@
4448
import java.util.Optional;
4549

4650
import static java.util.stream.Collectors.toList;
51+
import static org.apache.flink.runtime.checkpoint.CheckpointIDCounter.INITIAL_CHECKPOINT_ID;
4752

4853
/** A {@link Committer} for Apache Iceberg. */
4954
public class IcebergCommitter implements Committer<WriteResultWrapper> {
@@ -83,6 +88,14 @@ public void commit(Collection<CommitRequest<WriteResultWrapper>> collection) {
8388
}
8489

8590
private void commit(List<WriteResultWrapper> writeResultWrappers) {
91+
if (writeResultWrappers.isEmpty()) {
92+
return;
93+
}
94+
// all commits a same checkpoint-id
95+
long checkpointId = writeResultWrappers.get(0).getCheckpointId();
96+
String newFlinkJobId = writeResultWrappers.get(0).getJobId();
97+
String operatorId = writeResultWrappers.get(0).getOperatorId();
98+
8699
Map<TableId, List<WriteResult>> tableMap = new HashMap<>();
87100
for (WriteResultWrapper writeResultWrapper : writeResultWrappers) {
88101
List<WriteResult> writeResult =
@@ -93,11 +106,29 @@ private void commit(List<WriteResultWrapper> writeResultWrappers) {
93106
}
94107
for (Map.Entry<TableId, List<WriteResult>> entry : tableMap.entrySet()) {
95108
TableId tableId = entry.getKey();
96-
Optional<TableMetric> tableMetric = getTableMetric(tableId);
97-
tableMetric.ifPresent(TableMetric::increaseCommitTimes);
109+
98110
Table table =
99111
catalog.loadTable(
100112
TableIdentifier.of(tableId.getSchemaName(), tableId.getTableName()));
113+
114+
Snapshot snapshot = table.currentSnapshot();
115+
if (snapshot != null) {
116+
Iterable<Snapshot> ancestors =
117+
SnapshotUtil.ancestorsOf(snapshot.snapshotId(), table::snapshot);
118+
long lastCheckpointId =
119+
getMaxCommittedCheckpointId(ancestors, newFlinkJobId, operatorId);
120+
if (lastCheckpointId == checkpointId) {
121+
LOGGER.warn(
122+
"Checkpoint id {} has been committed to table {}, skipping",
123+
checkpointId,
124+
tableId.identifier());
125+
continue;
126+
}
127+
}
128+
129+
Optional<TableMetric> tableMetric = getTableMetric(tableId);
130+
tableMetric.ifPresent(TableMetric::increaseCommitTimes);
131+
101132
List<WriteResult> results = entry.getValue();
102133
List<DataFile> dataFiles =
103134
results.stream()
@@ -117,15 +148,47 @@ private void commit(List<WriteResultWrapper> writeResultWrappers) {
117148
if (deleteFiles.isEmpty()) {
118149
AppendFiles append = table.newAppend();
119150
dataFiles.forEach(append::appendFile);
120-
append.commit();
151+
commitOperation(append, newFlinkJobId, operatorId, checkpointId);
121152
} else {
122153
RowDelta delta = table.newRowDelta();
123154
dataFiles.forEach(delta::addRows);
124155
deleteFiles.forEach(delta::addDeletes);
125-
delta.commit();
156+
commitOperation(delta, newFlinkJobId, operatorId, checkpointId);
157+
}
158+
}
159+
}
160+
}
161+
162+
private static long getMaxCommittedCheckpointId(
163+
Iterable<Snapshot> ancestors, String flinkJobId, String operatorId) {
164+
long lastCommittedCheckpointId = INITIAL_CHECKPOINT_ID - 1;
165+
166+
for (Snapshot ancestor : ancestors) {
167+
Map<String, String> summary = ancestor.summary();
168+
String snapshotFlinkJobId = summary.get(SinkUtil.FLINK_JOB_ID);
169+
String snapshotOperatorId = summary.get(SinkUtil.OPERATOR_ID);
170+
if (flinkJobId.equals(snapshotFlinkJobId)
171+
&& (snapshotOperatorId == null || snapshotOperatorId.equals(operatorId))) {
172+
String value = summary.get(SinkUtil.MAX_COMMITTED_CHECKPOINT_ID);
173+
if (value != null) {
174+
lastCommittedCheckpointId = Long.parseLong(value);
175+
break;
126176
}
127177
}
128178
}
179+
180+
return lastCommittedCheckpointId;
181+
}
182+
183+
private static void commitOperation(
184+
SnapshotUpdate<?> operation,
185+
String newFlinkJobId,
186+
String operatorId,
187+
long checkpointId) {
188+
operation.set(SinkUtil.MAX_COMMITTED_CHECKPOINT_ID, Long.toString(checkpointId));
189+
operation.set(SinkUtil.FLINK_JOB_ID, newFlinkJobId);
190+
operation.set(SinkUtil.OPERATOR_ID, operatorId);
191+
operation.commit();
129192
}
130193

131194
private Optional<TableMetric> getTableMetric(TableId tableId) {

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSink.java

Lines changed: 57 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import org.apache.flink.api.connector.sink2.CommitterInitContext;
2323
import org.apache.flink.api.connector.sink2.Sink;
2424
import org.apache.flink.api.connector.sink2.SinkWriter;
25+
import org.apache.flink.api.connector.sink2.StatefulSinkWriter;
26+
import org.apache.flink.api.connector.sink2.SupportsWriterState;
2527
import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink;
2628
import org.apache.flink.api.connector.sink2.WriterInitContext;
2729
import org.apache.flink.cdc.common.event.Event;
@@ -30,6 +32,7 @@
3032
import org.apache.flink.cdc.connectors.iceberg.sink.v2.compaction.CompactionOptions;
3133
import org.apache.flink.core.io.SimpleVersionedSerializer;
3234
import org.apache.flink.metrics.groups.SinkCommitterMetricGroup;
35+
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
3336
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
3437
import org.apache.flink.streaming.api.connector.sink2.CommittableMessageTypeInfo;
3538
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
@@ -39,16 +42,19 @@
3942
import org.apache.flink.streaming.api.datastream.DataStream;
4043

4144
import java.time.ZoneId;
45+
import java.util.Collection;
4246
import java.util.Map;
4347
import java.util.Objects;
48+
import java.util.UUID;
4449

4550
/** A {@link Sink} implementation for Apache Iceberg. */
4651
public class IcebergSink
4752
implements Sink<Event>,
4853
WithPreWriteTopology<Event>,
4954
WithPreCommitTopology<Event, WriteResultWrapper>,
5055
TwoPhaseCommittingSink<Event, WriteResultWrapper>,
51-
WithPostCommitTopology<Event, WriteResultWrapper> {
56+
WithPostCommitTopology<Event, WriteResultWrapper>,
57+
SupportsWriterState<Event, IcebergWriterState> {
5258

5359
protected final Map<String, String> catalogOptions;
5460
protected final Map<String, String> tableOptions;
@@ -57,15 +63,22 @@ public class IcebergSink
5763

5864
private final CompactionOptions compactionOptions;
5965

66+
private String jobId;
67+
68+
private String operatorId;
69+
6070
public IcebergSink(
6171
Map<String, String> catalogOptions,
6272
Map<String, String> tableOptions,
6373
ZoneId zoneId,
64-
CompactionOptions compactionOptions) {
74+
CompactionOptions compactionOptions,
75+
String jobIdPrefix) {
6576
this.catalogOptions = catalogOptions;
6677
this.tableOptions = tableOptions;
6778
this.zoneId = zoneId;
6879
this.compactionOptions = compactionOptions;
80+
this.jobId = jobIdPrefix + UUID.randomUUID();
81+
this.operatorId = UUID.randomUUID().toString();
6982
}
7083

7184
@Override
@@ -92,20 +105,60 @@ public SimpleVersionedSerializer<WriteResultWrapper> getCommittableSerializer()
92105

93106
@Override
94107
public SinkWriter<Event> createWriter(InitContext context) {
108+
long lastCheckpointId =
109+
context.getRestoredCheckpointId()
110+
.orElse(CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1);
95111
return new IcebergWriter(
96112
catalogOptions,
97113
context.getTaskInfo().getIndexOfThisSubtask(),
98114
context.getTaskInfo().getAttemptNumber(),
99-
zoneId);
115+
zoneId,
116+
lastCheckpointId,
117+
jobId,
118+
operatorId);
100119
}
101120

102121
@Override
103122
public SinkWriter<Event> createWriter(WriterInitContext context) {
123+
long lastCheckpointId =
124+
context.getRestoredCheckpointId()
125+
.orElse(CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1);
126+
return new IcebergWriter(
127+
catalogOptions,
128+
context.getTaskInfo().getIndexOfThisSubtask(),
129+
context.getTaskInfo().getAttemptNumber(),
130+
zoneId,
131+
lastCheckpointId,
132+
jobId,
133+
operatorId);
134+
}
135+
136+
@Override
137+
public StatefulSinkWriter<Event, IcebergWriterState> restoreWriter(
138+
WriterInitContext context, Collection<IcebergWriterState> writerStates) {
139+
// No need to read checkpointId from state
140+
long lastCheckpointId =
141+
context.getRestoredCheckpointId()
142+
.orElse(CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1);
143+
if (writerStates != null && !writerStates.isEmpty()) {
144+
IcebergWriterState icebergWriterState = writerStates.iterator().next();
145+
jobId = icebergWriterState.getJobId();
146+
operatorId = icebergWriterState.getOperatorId();
147+
}
148+
104149
return new IcebergWriter(
105150
catalogOptions,
106151
context.getTaskInfo().getIndexOfThisSubtask(),
107152
context.getTaskInfo().getAttemptNumber(),
108-
zoneId);
153+
zoneId,
154+
lastCheckpointId,
155+
jobId,
156+
operatorId);
157+
}
158+
159+
@Override
160+
public SimpleVersionedSerializer<IcebergWriterState> getWriterStateSerializer() {
161+
return new IcebergWriterStateSerializer();
109162
}
110163

111164
@Override

0 commit comments

Comments
 (0)