Skip to content

Commit f388320

Browse files
committed
Address comments.
1 parent 8e7245d commit f388320

14 files changed

Lines changed: 244 additions & 61 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSink.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,25 +47,30 @@ public class IcebergDataSink implements DataSink, Serializable {
4747

4848
public final CompactionOptions compactionOptions;
4949

50+
public final String jobIdPrefix;
51+
5052
public IcebergDataSink(
5153
Map<String, String> catalogOptions,
5254
Map<String, String> tableOptions,
5355
Map<TableId, List<String>> partitionMaps,
5456
ZoneId zoneId,
5557
String schemaOperatorUid,
56-
CompactionOptions compactionOptions) {
58+
CompactionOptions compactionOptions,
59+
String jobIdPrefix) {
5760
this.catalogOptions = catalogOptions;
5861
this.tableOptions = tableOptions;
5962
this.partitionMaps = partitionMaps;
6063
this.zoneId = zoneId;
6164
this.schemaOperatorUid = schemaOperatorUid;
6265
this.compactionOptions = compactionOptions;
66+
this.jobIdPrefix = jobIdPrefix;
6367
}
6468

6569
@Override
6670
public EventSinkProvider getEventSinkProvider() {
6771
IcebergSink icebergEventSink =
68-
new IcebergSink(catalogOptions, tableOptions, zoneId, compactionOptions);
72+
new IcebergSink(
73+
catalogOptions, tableOptions, zoneId, compactionOptions, jobIdPrefix);
6974
return FlinkSinkProvider.of(icebergEventSink);
7075
}
7176

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,14 +106,17 @@ public DataSink createDataSink(Context context) {
106106
}
107107
}
108108
}
109+
String jobIdPrefix =
110+
context.getFactoryConfiguration().get(IcebergDataSinkOptions.SINK_JOB_ID_PREFIX);
109111

110112
return new IcebergDataSink(
111113
catalogOptions,
112114
tableOptions,
113115
partitionMaps,
114116
zoneId,
115117
schemaOperatorUid,
116-
compactionOptions);
118+
compactionOptions,
119+
jobIdPrefix);
117120
}
118121

119122
private CompactionOptions getCompactionStrategy(Configuration configuration) {
@@ -144,6 +147,7 @@ public Set<ConfigOption<?>> optionalOptions() {
144147
options.add(IcebergDataSinkOptions.SINK_COMPACTION_ENABLED);
145148
options.add(IcebergDataSinkOptions.SINK_COMPACTION_COMMIT_INTERVAL);
146149
options.add(IcebergDataSinkOptions.SINK_COMPACTION_PARALLELISM);
150+
options.add(IcebergDataSinkOptions.SINK_JOB_ID_PREFIX);
147151
return options;
148152
}
149153
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkOptions.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,4 +78,12 @@ public class IcebergDataSinkOptions {
7878
.defaultValue(-1)
7979
.withDescription(
8080
"The parallelism for file compaction, default value is -1, which means that compaction parallelism is equal to sink writer parallelism.");
81+
82+
/**
 * Optional string option {@code sink.job.id.prefix}; defaults to {@code "cdc"}.
 *
 * <p>The configured value is passed through {@code IcebergDataSink} to {@code IcebergSink},
 * which concatenates it with a random UUID to form the job id handed to its writers.
 */
@Experimental
83+
public static final ConfigOption<String> SINK_JOB_ID_PREFIX =
84+
key("sink.job.id.prefix")
85+
.stringType()
86+
.defaultValue("cdc")
87+
.withDescription(
88+
"The prefix of job id, which is used to distinguish different jobs.");
8189
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergCommitter.java

Lines changed: 45 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,13 @@
2929
import org.apache.iceberg.DeleteFile;
3030
import org.apache.iceberg.RowDelta;
3131
import org.apache.iceberg.Snapshot;
32+
import org.apache.iceberg.SnapshotUpdate;
3233
import org.apache.iceberg.Table;
3334
import org.apache.iceberg.catalog.Catalog;
3435
import org.apache.iceberg.catalog.TableIdentifier;
36+
import org.apache.iceberg.flink.sink.SinkUtil;
3537
import org.apache.iceberg.io.WriteResult;
38+
import org.apache.iceberg.util.SnapshotUtil;
3639
import org.slf4j.Logger;
3740
import org.slf4j.LoggerFactory;
3841

@@ -45,6 +48,7 @@
4548
import java.util.Optional;
4649

4750
import static java.util.stream.Collectors.toList;
51+
import static org.apache.flink.runtime.checkpoint.CheckpointIDCounter.INITIAL_CHECKPOINT_ID;
4852

4953
/** A {@link Committer} for Apache Iceberg. */
5054
public class IcebergCommitter implements Committer<WriteResultWrapper> {
@@ -57,8 +61,6 @@ public class IcebergCommitter implements Committer<WriteResultWrapper> {
5761

5862
public static final String TABLE_GROUP_KEY = "table";
5963

60-
public static final String CHECKPOINT_SUMMARY_NAME = "flink-cdc-checkpoint-id";
61-
6264
private final Catalog catalog;
6365

6466
private final SinkCommitterMetricGroup metricGroup;
@@ -91,6 +93,8 @@ private void commit(List<WriteResultWrapper> writeResultWrappers) {
9193
}
9294
// all write results in one commit call share the same checkpoint-id
9395
long checkpointId = writeResultWrappers.get(0).getCheckpointId();
96+
String newFlinkJobId = writeResultWrappers.get(0).getJobId();
97+
String operatorId = writeResultWrappers.get(0).getOperatorId();
9498

9599
Map<TableId, List<WriteResult>> tableMap = new HashMap<>();
96100
for (WriteResultWrapper writeResultWrapper : writeResultWrappers) {
@@ -109,9 +113,11 @@ private void commit(List<WriteResultWrapper> writeResultWrappers) {
109113

110114
Snapshot snapshot = table.currentSnapshot();
111115
if (snapshot != null) {
112-
String lastCheckpointId = snapshot.summary().get(CHECKPOINT_SUMMARY_NAME);
113-
if (lastCheckpointId != null
114-
&& lastCheckpointId.equals(Long.toString(checkpointId))) {
116+
Iterable<Snapshot> ancestors =
117+
SnapshotUtil.ancestorsOf(snapshot.snapshotId(), table::snapshot);
118+
long lastCheckpointId =
119+
getMaxCommittedCheckpointId(ancestors, newFlinkJobId, operatorId);
120+
if (lastCheckpointId == checkpointId) {
115121
LOGGER.warn(
116122
"Checkpoint id {} has been committed to table {}, skipping",
117123
checkpointId,
@@ -141,20 +147,50 @@ private void commit(List<WriteResultWrapper> writeResultWrappers) {
141147
} else {
142148
if (deleteFiles.isEmpty()) {
143149
AppendFiles append = table.newAppend();
144-
append.set(CHECKPOINT_SUMMARY_NAME, Long.toString(checkpointId));
145150
dataFiles.forEach(append::appendFile);
146-
append.commit();
151+
commitOperation(append, newFlinkJobId, operatorId, checkpointId);
147152
} else {
148153
RowDelta delta = table.newRowDelta();
149-
delta.set(CHECKPOINT_SUMMARY_NAME, Long.toString(checkpointId));
150154
dataFiles.forEach(delta::addRows);
151155
deleteFiles.forEach(delta::addDeletes);
152-
delta.commit();
156+
commitOperation(delta, newFlinkJobId, operatorId, checkpointId);
153157
}
154158
}
155159
}
156160
}
157161

162+
private static long getMaxCommittedCheckpointId(
163+
Iterable<Snapshot> ancestors, String flinkJobId, String operatorId) {
164+
long lastCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
165+
166+
for (Snapshot ancestor : ancestors) {
167+
Map<String, String> summary = ancestor.summary();
168+
String snapshotFlinkJobId = summary.get(SinkUtil.FLINK_JOB_ID);
169+
String snapshotOperatorId = summary.get(SinkUtil.OPERATOR_ID);
170+
if (flinkJobId.equals(snapshotFlinkJobId)
171+
&& (snapshotOperatorId == null || snapshotOperatorId.equals(operatorId))) {
172+
String value = summary.get(SinkUtil.MAX_COMMITTED_CHECKPOINT_ID);
173+
if (value != null) {
174+
lastCommittedCheckpointId = Long.parseLong(value);
175+
break;
176+
}
177+
}
178+
}
179+
180+
return lastCommittedCheckpointId;
181+
}
182+
183+
/**
 * Stamps the pending snapshot update with the committing job's identity and commits it.
 *
 * <p>Three summary properties are set before the commit: the max committed checkpoint id, the
 * Flink job id, and the operator id. These are the same keys that
 * {@code getMaxCommittedCheckpointId} reads back from snapshot summaries.
 *
 * @param operation the pending append or row-delta update to commit
 * @param newFlinkJobId job id written under {@code SinkUtil.FLINK_JOB_ID}
 * @param operatorId operator id written under {@code SinkUtil.OPERATOR_ID}
 * @param checkpointId checkpoint id written under {@code SinkUtil.MAX_COMMITTED_CHECKPOINT_ID}
 */
private static void commitOperation(
184+
SnapshotUpdate<?> operation,
185+
String newFlinkJobId,
186+
String operatorId,
187+
long checkpointId) {
188+
operation.set(SinkUtil.MAX_COMMITTED_CHECKPOINT_ID, Long.toString(checkpointId));
189+
operation.set(SinkUtil.FLINK_JOB_ID, newFlinkJobId);
190+
operation.set(SinkUtil.OPERATOR_ID, operatorId);
191+
operation.commit();
192+
}
193+
158194
private Optional<TableMetric> getTableMetric(TableId tableId) {
159195
if (tableIdMetricMap.containsKey(tableId)) {
160196
return Optional.of(tableIdMetricMap.get(tableId));

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSink.java

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
import java.util.Collection;
4646
import java.util.Map;
4747
import java.util.Objects;
48+
import java.util.UUID;
4849

4950
/** A {@link Sink} implementation for Apache Iceberg. */
5051
public class IcebergSink
@@ -62,15 +63,22 @@ public class IcebergSink
6263

6364
private final CompactionOptions compactionOptions;
6465

66+
private String jobId;
67+
68+
private String operatorId;
69+
6570
public IcebergSink(
6671
Map<String, String> catalogOptions,
6772
Map<String, String> tableOptions,
6873
ZoneId zoneId,
69-
CompactionOptions compactionOptions) {
74+
CompactionOptions compactionOptions,
75+
String jobIdPrefix) {
7076
this.catalogOptions = catalogOptions;
7177
this.tableOptions = tableOptions;
7278
this.zoneId = zoneId;
7379
this.compactionOptions = compactionOptions;
80+
this.jobId = jobIdPrefix + UUID.randomUUID();
81+
this.operatorId = UUID.randomUUID().toString();
7482
}
7583

7684
@Override
@@ -105,7 +113,9 @@ public SinkWriter<Event> createWriter(InitContext context) {
105113
context.getTaskInfo().getIndexOfThisSubtask(),
106114
context.getTaskInfo().getAttemptNumber(),
107115
zoneId,
108-
lastCheckpointId);
116+
lastCheckpointId,
117+
jobId,
118+
operatorId);
109119
}
110120

111121
@Override
@@ -118,22 +128,32 @@ public SinkWriter<Event> createWriter(WriterInitContext context) {
118128
context.getTaskInfo().getIndexOfThisSubtask(),
119129
context.getTaskInfo().getAttemptNumber(),
120130
zoneId,
121-
lastCheckpointId);
131+
lastCheckpointId,
132+
jobId,
133+
operatorId);
122134
}
123135

124136
@Override
125137
public StatefulSinkWriter<Event, IcebergWriterState> restoreWriter(
126138
WriterInitContext context, Collection<IcebergWriterState> writerStates) {
127-
// No need to read checkpointId from state
139+
// No need to read checkpointId from state
128140
long lastCheckpointId =
129141
context.getRestoredCheckpointId()
130142
.orElse(CheckpointIDCounter.INITIAL_CHECKPOINT_ID - 1);
143+
if (writerStates != null && !writerStates.isEmpty()) {
144+
IcebergWriterState icebergWriterState = writerStates.iterator().next();
145+
jobId = icebergWriterState.getJobId();
146+
operatorId = icebergWriterState.getOperatorId();
147+
}
148+
131149
return new IcebergWriter(
132150
catalogOptions,
133151
context.getTaskInfo().getIndexOfThisSubtask(),
134152
context.getTaskInfo().getAttemptNumber(),
135153
zoneId,
136-
lastCheckpointId);
154+
lastCheckpointId,
155+
jobId,
156+
operatorId);
137157
}
138158

139159
@Override

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriter.java

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -81,12 +81,18 @@ public class IcebergWriter
8181

8282
private long lastCheckpointId;
8383

84+
private final String jobId;
85+
86+
private final String operatorId;
87+
8488
public IcebergWriter(
8589
Map<String, String> catalogOptions,
8690
int taskId,
8791
int attemptId,
8892
ZoneId zoneId,
89-
long lastCheckpointId) {
93+
long lastCheckpointId,
94+
String jobId,
95+
String operatorId) {
9096
catalog =
9197
CatalogUtil.buildIcebergCatalog(
9298
this.getClass().getSimpleName(), catalogOptions, new Configuration());
@@ -98,15 +104,24 @@ public IcebergWriter(
98104
this.attemptId = attemptId;
99105
this.zoneId = zoneId;
100106
this.lastCheckpointId = lastCheckpointId;
107+
this.jobId = jobId;
108+
this.operatorId = operatorId;
109+
LOGGER.info(
110+
"IcebergWriter created, taskId: {}, attemptId: {}, lastCheckpointId: {}, jobId: {}, operatorId: {}",
111+
taskId,
112+
attemptId,
113+
lastCheckpointId,
114+
jobId,
115+
operatorId);
101116
}
102117

103118
@Override
104-
public List<IcebergWriterState> snapshotState(long checkpointId) throws IOException {
105-
return Collections.singletonList(new IcebergWriterState(checkpointId));
119+
public List<IcebergWriterState> snapshotState(long checkpointId) {
120+
return Collections.singletonList(new IcebergWriterState(jobId, operatorId));
106121
}
107122

108123
@Override
109-
public Collection<WriteResultWrapper> prepareCommit() throws IOException, InterruptedException {
124+
public Collection<WriteResultWrapper> prepareCommit() throws IOException {
110125
List<WriteResultWrapper> list = new ArrayList<>();
111126
list.addAll(temporaryWriteResult);
112127
list.addAll(getWriteResult());
@@ -171,7 +186,11 @@ private List<WriteResultWrapper> getWriteResult() throws IOException {
171186
for (Map.Entry<TableId, TaskWriter<RowData>> entry : writerMap.entrySet()) {
172187
WriteResultWrapper writeResultWrapper =
173188
new WriteResultWrapper(
174-
entry.getValue().complete(), entry.getKey(), currentCheckpointId);
189+
entry.getValue().complete(),
190+
entry.getKey(),
191+
currentCheckpointId,
192+
jobId,
193+
operatorId);
175194
writeResults.add(writeResultWrapper);
176195
LOGGER.info(writeResultWrapper.buildDescription());
177196
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergWriterState.java

Lines changed: 36 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -17,39 +17,56 @@
1717

1818
package org.apache.flink.cdc.connectors.iceberg.sink.v2;
1919

20-
import com.fasterxml.jackson.annotation.JsonCreator;
21-
import com.fasterxml.jackson.annotation.JsonProperty;
22-
import com.fasterxml.jackson.databind.ObjectMapper;
20+
import java.util.Objects;
2321

24-
import java.io.IOException;
25-
26-
/** A {@link IcebergWriterState} for Apache Iceberg. */
22+
/** The state of the {@link IcebergWriter}. */
2723
public class IcebergWriterState {
28-
public static final int VERSION = 0;
2924

30-
public static final ObjectMapper MAPPER = new ObjectMapper();
25+
// The job ID associated with this writer state
26+
private final String jobId;
3127

32-
private final Long checkpointId;
28+
// The operator ID associated with this writer state
29+
private final String operatorId;
3330

34-
@JsonCreator
35-
public IcebergWriterState(@JsonProperty("checkpointId") Long checkpointId) {
36-
this.checkpointId = checkpointId;
31+
public IcebergWriterState(String jobId, String operatorId) {
32+
this.jobId = jobId;
33+
this.operatorId = operatorId;
3734
}
3835

39-
public Long getCheckpointId() {
40-
return checkpointId;
36+
public String getJobId() {
37+
return jobId;
4138
}
4239

43-
public byte[] toBytes() throws IOException {
44-
return MAPPER.writeValueAsBytes(this);
40+
public String getOperatorId() {
41+
return operatorId;
4542
}
4643

47-
public static IcebergWriterState fromBytes(byte[] bytes) throws IOException {
48-
return MAPPER.readValue(bytes, IcebergWriterState.class);
44+
@Override
45+
public int hashCode() {
46+
return Objects.hash(jobId, operatorId);
47+
}
48+
49+
@Override
50+
public boolean equals(Object obj) {
51+
if (this == obj) {
52+
return true;
53+
}
54+
if (obj == null || getClass() != obj.getClass()) {
55+
return false;
56+
}
57+
IcebergWriterState that = (IcebergWriterState) obj;
58+
return Objects.equals(jobId, that.jobId) && Objects.equals(operatorId, that.operatorId);
4959
}
5060

5161
@Override
5262
public String toString() {
53-
return "IcebergWriterState{" + "checkpointId=" + checkpointId + '}';
63+
return "IcebergWriterState{"
64+
+ "jobId='"
65+
+ jobId
66+
+ '\''
67+
+ ", operatorId='"
68+
+ operatorId
69+
+ '\''
70+
+ '}';
5471
}
5572
}

0 commit comments

Comments
 (0)