Skip to content

Commit d631c6b

Browse files
committed
[FLINK-39478][mysql] Persist checkpointIdToFinish and expose enumerator metrics
1 parent 201aef6 commit d631c6b

8 files changed

Lines changed: 301 additions & 19 deletions

File tree

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssigner.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,31 @@ public AssignerStatus getAssignerStatus() {
182182
return snapshotSplitAssigner.getAssignerStatus();
183183
}
184184

185+
@Override
186+
public int getRemainingSplitsCount() {
187+
return snapshotSplitAssigner.getRemainingSplitsCount();
188+
}
189+
190+
@Override
191+
public int getRemainingTablesCount() {
192+
return snapshotSplitAssigner.getRemainingTablesCount();
193+
}
194+
195+
@Override
196+
public int getAssignedSplitsCount() {
197+
return snapshotSplitAssigner.getAssignedSplitsCount();
198+
}
199+
200+
@Override
201+
public int getFinishedSplitsCount() {
202+
return snapshotSplitAssigner.getFinishedSplitsCount();
203+
}
204+
205+
@Override
206+
public int getAlreadyProcessedTablesCount() {
207+
return snapshotSplitAssigner.getAlreadyProcessedTablesCount();
208+
}
209+
185210
@Override
186211
public boolean noMoreSplits() {
187212
return snapshotSplitAssigner.noMoreSplits() && isBinlogSplitAssigned;

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java

Lines changed: 46 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ public MySqlSnapshotSplitAssigner(
127127
isTableIdCaseSensitive,
128128
true,
129129
ChunkSplitterState.NO_SPLITTING_TABLE_STATE,
130+
null,
130131
enumeratorContext);
131132
}
132133

@@ -148,6 +149,7 @@ public MySqlSnapshotSplitAssigner(
148149
checkpoint.isTableIdCaseSensitive(),
149150
checkpoint.isRemainingTablesCheckpointed(),
150151
checkpoint.getChunkSplitterState(),
152+
checkpoint.getCheckpointIdToFinish(),
151153
enumeratorContext);
152154
}
153155

@@ -164,6 +166,7 @@ private MySqlSnapshotSplitAssigner(
164166
boolean isTableIdCaseSensitive,
165167
boolean isRemainingTablesCheckpointed,
166168
ChunkSplitterState chunkSplitterState,
169+
@Nullable Long checkpointIdToFinish,
167170
SplitEnumeratorContext<MySqlSplit> enumeratorContext) {
168171
this.sourceConfig = sourceConfig;
169172
this.currentParallelism = currentParallelism;
@@ -181,6 +184,7 @@ private MySqlSnapshotSplitAssigner(
181184
this.partition =
182185
new MySqlPartition(sourceConfig.getMySqlConnectorConfig().getLogicalName());
183186
this.enumeratorContext = enumeratorContext;
187+
this.checkpointIdToFinish = checkpointIdToFinish;
184188
}
185189

186190
@Override
@@ -451,26 +455,28 @@ public void addSplits(Collection<MySqlSplit> splits) {
451455

452456
@Override
453457
public SnapshotPendingSplitsState snapshotState(long checkpointId) {
454-
SnapshotPendingSplitsState state =
455-
new SnapshotPendingSplitsState(
456-
alreadyProcessedTables,
457-
remainingSplits,
458-
assignedSplits,
459-
tableSchemas,
460-
splitFinishedOffsets,
461-
assignerStatus,
462-
remainingTables,
463-
isTableIdCaseSensitive,
464-
true,
465-
chunkSplitter.snapshotState(checkpointId));
466-
// we need a complete checkpoint before mark this assigner to be finished, to wait for
467-
// all records of snapshot splits are completely processed
458+
// We need a complete checkpoint before marking this assigner as finished, to wait for
459+
// all records of snapshot splits to be completely processed. Set checkpointIdToFinish
460+
// *before* building the state so it is included in this checkpoint — otherwise it would
461+
// be lost on restore and would need two more checkpoint cycles to be re-derived
462+
// (FLINK-39478).
468463
if (checkpointIdToFinish == null
469464
&& AssignerStatus.isAssigningSnapshotSplits(assignerStatus)
470465
&& allSnapshotSplitsFinished()) {
471466
checkpointIdToFinish = checkpointId;
472467
}
473-
return state;
468+
return new SnapshotPendingSplitsState(
469+
alreadyProcessedTables,
470+
remainingSplits,
471+
assignedSplits,
472+
tableSchemas,
473+
splitFinishedOffsets,
474+
assignerStatus,
475+
remainingTables,
476+
isTableIdCaseSensitive,
477+
true,
478+
chunkSplitter.snapshotState(checkpointId),
479+
checkpointIdToFinish);
474480
}
475481

476482
@Override
@@ -571,6 +577,31 @@ public Map<String, BinlogOffset> getSplitFinishedOffsets() {
571577
return splitFinishedOffsets;
572578
}
573579

580+
@Override
581+
public int getRemainingSplitsCount() {
582+
return remainingSplits.size();
583+
}
584+
585+
@Override
586+
public int getRemainingTablesCount() {
587+
return remainingTables.size();
588+
}
589+
590+
@Override
591+
public int getAssignedSplitsCount() {
592+
return assignedSplits.size();
593+
}
594+
595+
@Override
596+
public int getFinishedSplitsCount() {
597+
return splitFinishedOffsets.size();
598+
}
599+
600+
@Override
601+
public int getAlreadyProcessedTablesCount() {
602+
return alreadyProcessedTables.size();
603+
}
604+
574605
// -------------------------------------------------------------------------------------------
575606

576607
/**

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSplitAssigner.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,46 @@ public interface MySqlSplitAssigner extends Closeable {
106106
/** Gets the split assigner status, see {@code AssignerStatus}. */
107107
AssignerStatus getAssignerStatus();
108108

109+
/**
110+
* Returns the number of snapshot splits split but not yet handed out to readers. Used for
111+
* metrics; defaults to {@code 0} for assigners that don't track snapshot splits.
112+
*/
113+
default int getRemainingSplitsCount() {
114+
return 0;
115+
}
116+
117+
/**
118+
* Returns the number of tables yet to be split into snapshot chunks. Used for metrics; defaults
119+
* to {@code 0} for assigners that don't track snapshot splits.
120+
*/
121+
default int getRemainingTablesCount() {
122+
return 0;
123+
}
124+
125+
/**
126+
* Returns the number of snapshot splits that have been handed out to readers. Used for metrics;
127+
* defaults to {@code 0} for assigners that don't track snapshot splits.
128+
*/
129+
default int getAssignedSplitsCount() {
130+
return 0;
131+
}
132+
133+
/**
134+
* Returns the number of snapshot splits that readers have reported as finished. Used for
135+
* metrics; defaults to {@code 0} for assigners that don't track snapshot splits.
136+
*/
137+
default int getFinishedSplitsCount() {
138+
return 0;
139+
}
140+
141+
/**
142+
* Returns the number of tables that have been fully snapshotted at least once. Used for
143+
* metrics; defaults to {@code 0} for assigners that don't track snapshot splits.
144+
*/
145+
default int getAlreadyProcessedTablesCount() {
146+
return 0;
147+
}
148+
109149
/** Starts assign newly added tables. */
110150
void startAssignNewlyAddedTables();
111151

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/PendingSplitsStateSerializer.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,10 @@
4646
public class PendingSplitsStateSerializer implements SimpleVersionedSerializer<PendingSplitsState> {
4747

4848
// TODO: need proper implementation of the new version
49-
private static final int VERSION = 5;
49+
// Version 6 adds the optional checkpointIdToFinish field to SnapshotPendingSplitsState so the
50+
// assigner can transition out of *_ASSIGNING after restore without waiting two additional
51+
// checkpoint cycles (see FLINK-39478).
52+
private static final int VERSION = 6;
5053
private static final ThreadLocal<DataOutputSerializer> SERIALIZER_CACHE =
5154
ThreadLocal.withInitial(() -> new DataOutputSerializer(64));
5255

@@ -110,6 +113,7 @@ public PendingSplitsState deserialize(int version, byte[] serialized) throws IOE
110113
case 3:
111114
case 4:
112115
case 5:
116+
case 6:
113117
return deserializePendingSplitsState(version, serialized);
114118
default:
115119
throw new IOException("Unknown version: " + version);
@@ -176,6 +180,12 @@ private void serializeSnapshotPendingSplitsState(
176180
new Object[] {chunkSplitterState.getNextChunkStart().getValue()}));
177181
out.writeInt(chunkSplitterState.getNextChunkId());
178182
}
183+
184+
Long checkpointIdToFinish = state.getCheckpointIdToFinish();
185+
out.writeBoolean(checkpointIdToFinish != null);
186+
if (checkpointIdToFinish != null) {
187+
out.writeLong(checkpointIdToFinish);
188+
}
179189
}
180190

181191
private void serializeHybridPendingSplitsState(
@@ -299,6 +309,15 @@ private SnapshotPendingSplitsState deserializeSnapshotPendingSplitsState(
299309
nextChunkId = in.readInt();
300310
}
301311
}
312+
313+
Long checkpointIdToFinish = null;
314+
if (version >= 6) {
315+
boolean hasCheckpointIdToFinish = in.readBoolean();
316+
if (hasCheckpointIdToFinish) {
317+
checkpointIdToFinish = in.readLong();
318+
}
319+
}
320+
302321
return new SnapshotPendingSplitsState(
303322
alreadyProcessedTables,
304323
remainingSchemalessSplits,
@@ -314,7 +333,8 @@ private SnapshotPendingSplitsState deserializeSnapshotPendingSplitsState(
314333
: new ChunkSplitterState(
315334
splittingTableId,
316335
ChunkSplitterState.ChunkBound.middleOf(nextChunkStart),
317-
nextChunkId));
336+
nextChunkId),
337+
checkpointIdToFinish);
318338
}
319339

320340
private HybridPendingSplitsState deserializeHybridPendingSplitsState(

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/state/SnapshotPendingSplitsState.java

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
import io.debezium.relational.TableId;
2828
import io.debezium.relational.history.TableChanges.TableChange;
2929

30+
import javax.annotation.Nullable;
31+
3032
import java.util.ArrayList;
3133
import java.util.HashMap;
3234
import java.util.LinkedHashMap;
@@ -75,6 +77,14 @@ public class SnapshotPendingSplitsState extends PendingSplitsState {
7577
/** The data structure to record the state of a {@link ChunkSplitter}. */
7678
private final ChunkSplitterState chunkSplitterState;
7779

80+
/**
81+
* The checkpoint id that, once completed, allows the assigner to transition out of an {@code
82+
* *_ASSIGNING} status after all snapshot splits have finished. Persisted across restores so the
83+
* state machine can advance without waiting for two additional checkpoint cycles to re-derive
84+
* it. {@code null} when the assigner has not yet observed "all snapshot splits finished".
85+
*/
86+
@Nullable private final Long checkpointIdToFinish;
87+
7888
public SnapshotPendingSplitsState(
7989
List<TableId> alreadyProcessedTables,
8090
List<MySqlSchemalessSnapshotSplit> remainingSplits,
@@ -86,6 +96,32 @@ public SnapshotPendingSplitsState(
8696
boolean isTableIdCaseSensitive,
8797
boolean isRemainingTablesCheckpointed,
8898
ChunkSplitterState chunkSplitterState) {
99+
this(
100+
alreadyProcessedTables,
101+
remainingSplits,
102+
assignedSplits,
103+
tableSchemas,
104+
splitFinishedOffsets,
105+
assignerStatus,
106+
remainingTables,
107+
isTableIdCaseSensitive,
108+
isRemainingTablesCheckpointed,
109+
chunkSplitterState,
110+
null);
111+
}
112+
113+
public SnapshotPendingSplitsState(
114+
List<TableId> alreadyProcessedTables,
115+
List<MySqlSchemalessSnapshotSplit> remainingSplits,
116+
LinkedHashMap<String, MySqlSchemalessSnapshotSplit> assignedSplits,
117+
Map<TableId, TableChange> tableSchemas,
118+
Map<String, BinlogOffset> splitFinishedOffsets,
119+
AssignerStatus assignerStatus,
120+
List<TableId> remainingTables,
121+
boolean isTableIdCaseSensitive,
122+
boolean isRemainingTablesCheckpointed,
123+
ChunkSplitterState chunkSplitterState,
124+
@Nullable Long checkpointIdToFinish) {
89125
// FLINK-38061: make defensive copy to avoid potential concurrent modification of the
90126
// collections.
91127
this.alreadyProcessedTables = new ArrayList<>(alreadyProcessedTables);
@@ -98,6 +134,7 @@ public SnapshotPendingSplitsState(
98134
this.isRemainingTablesCheckpointed = isRemainingTablesCheckpointed;
99135
this.tableSchemas = new HashMap<>(tableSchemas);
100136
this.chunkSplitterState = chunkSplitterState;
137+
this.checkpointIdToFinish = checkpointIdToFinish;
101138
}
102139

103140
public List<TableId> getAlreadyProcessedTables() {
@@ -140,6 +177,11 @@ public ChunkSplitterState getChunkSplitterState() {
140177
return chunkSplitterState;
141178
}
142179

180+
@Nullable
181+
public Long getCheckpointIdToFinish() {
182+
return checkpointIdToFinish;
183+
}
184+
143185
@Override
144186
public boolean equals(Object o) {
145187
if (this == o) {
@@ -157,7 +199,8 @@ public boolean equals(Object o) {
157199
&& Objects.equals(remainingSplits, that.remainingSplits)
158200
&& Objects.equals(assignedSplits, that.assignedSplits)
159201
&& Objects.equals(splitFinishedOffsets, that.splitFinishedOffsets)
160-
&& Objects.equals(chunkSplitterState, that.chunkSplitterState);
202+
&& Objects.equals(chunkSplitterState, that.chunkSplitterState)
203+
&& Objects.equals(checkpointIdToFinish, that.checkpointIdToFinish);
161204
}
162205

163206
@Override
@@ -171,7 +214,8 @@ public int hashCode() {
171214
assignerStatus,
172215
isTableIdCaseSensitive,
173216
isRemainingTablesCheckpointed,
174-
chunkSplitterState);
217+
chunkSplitterState,
218+
checkpointIdToFinish);
175219
}
176220

177221
@Override
@@ -195,6 +239,8 @@ public String toString() {
195239
+ isRemainingTablesCheckpointed
196240
+ ", chunkSplitterState="
197241
+ chunkSplitterState
242+
+ ", checkpointIdToFinish="
243+
+ checkpointIdToFinish
198244
+ '}';
199245
}
200246
}

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/enumerator/MySqlSourceEnumerator.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.flink.cdc.connectors.mysql.source.events.FinishedSnapshotSplitsRequestEvent;
3737
import org.apache.flink.cdc.connectors.mysql.source.events.LatestFinishedSplitsNumberEvent;
3838
import org.apache.flink.cdc.connectors.mysql.source.events.LatestFinishedSplitsNumberRequestEvent;
39+
import org.apache.flink.cdc.connectors.mysql.source.metrics.MySqlSourceEnumeratorMetrics;
3940
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
4041
import org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
4142
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
@@ -99,6 +100,7 @@ public MySqlSourceEnumerator(
99100
@Override
100101
public void start() {
101102
splitAssigner.open();
103+
registerMetrics();
102104
requestBinlogSplitUpdateIfNeed();
103105
this.context.callAsync(
104106
this::getRegisteredReader,
@@ -107,6 +109,15 @@ public void start() {
107109
CHECK_EVENT_INTERVAL);
108110
}
109111

112+
private void registerMetrics() {
113+
try {
114+
new MySqlSourceEnumeratorMetrics(context.metricGroup(), splitAssigner);
115+
} catch (Throwable t) {
116+
// Defensive: never fail enumerator start because of metric registration.
117+
LOG.warn("Failed to register MySQL CDC enumerator metrics.", t);
118+
}
119+
}
120+
110121
@Override
111122
public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
112123
if (!context.registeredReaders().containsKey(subtaskId)) {

0 commit comments

Comments
 (0)