
Commit a4cc11f

beryllw王俊博 authored and committed
fix startAsynchronouslySplit method enumeratorMetrics NPE
1 parent 9997a0a commit a4cc11f

File tree

6 files changed (+46, -33 lines)
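For context, the NPE this commit fixes came from an initialization race: the hybrid assigner used to create SourceEnumeratorMetrics itself and inject it into the snapshot assigner only after calling snapshotSplitAssigner.open(), while open() already launches startAsynchronouslySplit() on another thread. A minimal, self-contained sketch of that race, using hypothetical names and plain JDK types rather than the actual CDC classes:

import java.util.concurrent.CompletableFuture;

/** Hypothetical sketch of the ordering bug: async work starts before the metrics field is set. */
public class EnumeratorMetricsNpeSketch {

    static class BuggyAssigner {
        volatile Object enumeratorMetrics; // stand-in for SourceEnumeratorMetrics

        CompletableFuture<Void> open() {
            // The asynchronous splitter starts immediately inside open()...
            return CompletableFuture.runAsync(this::startAsynchronouslySplit);
        }

        void startAsynchronouslySplit() {
            // ...and may run before the caller has injected the metrics field,
            // throwing a NullPointerException right here.
            System.out.println(enumeratorMetrics.toString());
        }
    }

    public static void main(String[] args) {
        BuggyAssigner assigner = new BuggyAssigner();
        CompletableFuture<Void> split = assigner.open();
        assigner.enumeratorMetrics = new Object(); // injected too late, as the pre-fix code did
        split.join(); // may complete exceptionally with the NPE, depending on scheduling
    }
}

The commit removes the race by creating the metrics object eagerly in the snapshot assigner's constructor, as the diffs below show.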

flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/HybridSplitAssigner.java

Lines changed: 5 additions & 5 deletions

@@ -83,7 +83,8 @@ public HybridSplitAssigner(
                         remainingTables,
                         isTableIdCaseSensitive,
                         dialect,
-                        offsetFactory),
+                        offsetFactory,
+                        enumeratorContext),
                 false,
                 sourceConfig.getSplitMetaGroupSize(),
                 offsetFactory,
@@ -104,7 +105,8 @@ public HybridSplitAssigner(
                         currentParallelism,
                         checkpoint.getSnapshotPendingSplits(),
                         dialect,
-                        offsetFactory),
+                        offsetFactory,
+                        enumeratorContext),
                 checkpoint.isStreamSplitAssigned(),
                 sourceConfig.getSplitMetaGroupSize(),
                 offsetFactory,
@@ -128,7 +130,7 @@ private HybridSplitAssigner(

     @Override
     public void open() {
-        this.enumeratorMetrics = new SourceEnumeratorMetrics(enumeratorContext.metricGroup());
+        this.enumeratorMetrics = snapshotSplitAssigner.getEnumeratorMetrics();

         if (isStreamSplitAssigned) {
             enumeratorMetrics.enterStreamReading();
@@ -137,8 +139,6 @@ public void open() {
         }

         snapshotSplitAssigner.open();
-        // init enumerator metrics
-        snapshotSplitAssigner.initEnumeratorMetrics(enumeratorMetrics);
     }

     @Override

flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/assigner/SnapshotSplitAssigner.java

Lines changed: 23 additions & 9 deletions

@@ -17,6 +17,8 @@

 package org.apache.flink.cdc.connectors.base.source.assigner;

+import org.apache.flink.api.connector.source.SourceSplit;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.cdc.connectors.base.config.SourceConfig;
 import org.apache.flink.cdc.connectors.base.dialect.DataSourceDialect;
 import org.apache.flink.cdc.connectors.base.source.assigner.splitter.ChunkSplitter;
@@ -104,7 +106,8 @@ public SnapshotSplitAssigner(
             List<TableId> remainingTables,
             boolean isTableIdCaseSensitive,
             DataSourceDialect<C> dialect,
-            OffsetFactory offsetFactory) {
+            OffsetFactory offsetFactory,
+            SplitEnumeratorContext<? extends SourceSplit> enumeratorContext) {
         this(
                 sourceConfig,
                 currentParallelism,
@@ -120,15 +123,17 @@ public SnapshotSplitAssigner(
                 dialect,
                 offsetFactory,
                 new ConcurrentHashMap<>(),
-                ChunkSplitterState.NO_SPLITTING_TABLE_STATE);
+                ChunkSplitterState.NO_SPLITTING_TABLE_STATE,
+                enumeratorContext);
     }

     public SnapshotSplitAssigner(
             C sourceConfig,
             int currentParallelism,
             SnapshotPendingSplitsState checkpoint,
             DataSourceDialect<C> dialect,
-            OffsetFactory offsetFactory) {
+            OffsetFactory offsetFactory,
+            SplitEnumeratorContext<? extends SourceSplit> enumeratorContext) {
         this(
                 sourceConfig,
                 currentParallelism,
@@ -144,7 +149,8 @@ public SnapshotSplitAssigner(
                 dialect,
                 offsetFactory,
                 new ConcurrentHashMap<>(),
-                checkpoint.getChunkSplitterState());
+                checkpoint.getChunkSplitterState(),
+                enumeratorContext);
     }

     private SnapshotSplitAssigner(
@@ -162,7 +168,8 @@ private SnapshotSplitAssigner(
             DataSourceDialect<C> dialect,
             OffsetFactory offsetFactory,
             Map<String, Long> splitFinishedCheckpointIds,
-            ChunkSplitterState chunkSplitterState) {
+            ChunkSplitterState chunkSplitterState,
+            SplitEnumeratorContext<? extends SourceSplit> enumeratorContext) {
         this.sourceConfig = sourceConfig;
         this.currentParallelism = currentParallelism;
         this.alreadyProcessedTables = alreadyProcessedTables;
@@ -188,13 +195,19 @@ private SnapshotSplitAssigner(
         this.offsetFactory = offsetFactory;
         this.splitFinishedCheckpointIds = splitFinishedCheckpointIds;
         chunkSplitter = createChunkSplitter(sourceConfig, dialect, chunkSplitterState);
+        this.enumeratorMetrics = new SourceEnumeratorMetrics(enumeratorContext.metricGroup());
+    }
+
+    public SourceEnumeratorMetrics getEnumeratorMetrics() {
+        return enumeratorMetrics;
     }

     @Override
     public void open() {
         chunkSplitter.open();
         discoveryCaptureTables();
         captureNewlyAddedTables();
+        initEnumeratorMetrics();
         startAsynchronouslySplit();
     }

@@ -295,10 +308,11 @@ private void captureNewlyAddedTables() {
         }
     }

-    /** This should be invoked after this class's open method. */
-    public void initEnumeratorMetrics(SourceEnumeratorMetrics enumeratorMetrics) {
-        this.enumeratorMetrics = enumeratorMetrics;
-
+    /**
+     * This should be invoked before this class's startAsynchronouslySplit method to avoid
+     * enumeratorMetrics NPE.
+     */
+    private void initEnumeratorMetrics() {
         this.enumeratorMetrics.enterSnapshotPhase();
         this.enumeratorMetrics.registerMetrics(
                 alreadyProcessedTables::size, assignedSplits::size, remainingSplits::size);
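Taken together with the HybridSplitAssigner diff above, the new wiring looks roughly like this condensed sketch of the two classes (hypothetical class names; method bodies elided to the calls relevant here, assuming the flink-cdc-base and flink-core types):

import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.cdc.connectors.base.source.metrics.SourceEnumeratorMetrics;

// Hypothetical condensed sketch, not the full classes from the diff.
class SnapshotAssignerSketch {
    private final SourceEnumeratorMetrics enumeratorMetrics;

    SnapshotAssignerSketch(SplitEnumeratorContext<? extends SourceSplit> context) {
        // Created eagerly in the constructor: never null once the object exists.
        this.enumeratorMetrics = new SourceEnumeratorMetrics(context.metricGroup());
    }

    SourceEnumeratorMetrics getEnumeratorMetrics() {
        return enumeratorMetrics;
    }

    void open() {
        initEnumeratorMetrics();    // registers gauges on the already-built metrics
        startAsynchronouslySplit(); // the async thread now sees a non-null field
    }

    private void initEnumeratorMetrics() { /* enterSnapshotPhase + registerMetrics */ }

    private void startAsynchronouslySplit() { /* splits chunks on a worker thread */ }
}

class HybridAssignerSketch {
    private SourceEnumeratorMetrics enumeratorMetrics;

    void open(SnapshotAssignerSketch snapshotSplitAssigner) {
        // Reuses the snapshot assigner's instance instead of building its own,
        // so both assigners share the same non-null metrics object.
        this.enumeratorMetrics = snapshotSplitAssigner.getEnumeratorMetrics();
        snapshotSplitAssigner.open();
    }
}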

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlHybridSplitAssigner.java

Lines changed: 1 addition & 3 deletions

@@ -111,7 +111,7 @@ private MySqlHybridSplitAssigner(

     @Override
     public void open() {
-        this.enumeratorMetrics = new SourceEnumeratorMetrics(enumeratorContext.metricGroup());
+        this.enumeratorMetrics = snapshotSplitAssigner.getEnumeratorMetrics();

         if (isBinlogSplitAssigned) {
             enumeratorMetrics.enterStreamReading();
@@ -120,8 +120,6 @@ public void open() {
         }

         snapshotSplitAssigner.open();
-        // init enumerator metrics
-        snapshotSplitAssigner.initEnumeratorMetrics(enumeratorMetrics);
     }

     @Override

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner.java

Lines changed: 11 additions & 4 deletions

@@ -187,6 +187,11 @@ private MySqlSnapshotSplitAssigner(
                 new MySqlPartition(sourceConfig.getMySqlConnectorConfig().getLogicalName());
         this.enumeratorContext = enumeratorContext;
         this.splitFinishedCheckpointIds = splitFinishedCheckpointIds;
+        this.enumeratorMetrics = new SourceEnumeratorMetrics(enumeratorContext.metricGroup());
+    }
+
+    public SourceEnumeratorMetrics getEnumeratorMetrics() {
+        return enumeratorMetrics;
     }

     @Override
@@ -195,6 +200,7 @@ public void open() {
         chunkSplitter.open();
         discoveryCaptureTables();
         captureNewlyAddedTables();
+        initEnumeratorMetrics();
         startAsynchronouslySplit();
     }

@@ -302,10 +308,11 @@ private void captureNewlyAddedTables() {
         }
     }

-    /** This should be invoked after this class's open method. */
-    public void initEnumeratorMetrics(SourceEnumeratorMetrics enumeratorMetrics) {
-        this.enumeratorMetrics = enumeratorMetrics;
-
+    /**
+     * This should be invoked before this class's startAsynchronouslySplit method to avoid
+     * enumeratorMetrics NPE.
+     */
+    private void initEnumeratorMetrics() {
         this.enumeratorMetrics.enterSnapshotPhase();
         this.enumeratorMetrics.registerMetrics(
                 alreadyProcessedTables::size, assignedSplits::size, remainingSplits::size);

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/fetch/PostgresScanFetchTaskTest.java

Lines changed: 3 additions & 6 deletions

@@ -17,14 +17,14 @@

 package org.apache.flink.cdc.connectors.postgres.source.fetch;

+import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
 import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
 import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
 import org.apache.flink.cdc.connectors.base.source.assigner.SnapshotSplitAssigner;
 import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory;
 import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
 import org.apache.flink.cdc.connectors.base.source.meta.split.SourceRecords;
 import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
-import org.apache.flink.cdc.connectors.base.source.metrics.SourceEnumeratorMetrics;
 import org.apache.flink.cdc.connectors.base.source.reader.external.AbstractScanFetchTask;
 import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask;
 import org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher;
@@ -37,7 +37,6 @@
 import org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffsetFactory;
 import org.apache.flink.cdc.connectors.postgres.testutils.RecordsFormatter;
 import org.apache.flink.cdc.connectors.postgres.testutils.UniqueDatabase;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.types.DataType;

@@ -329,10 +328,8 @@ private List<SnapshotSplit> getSnapshotSplits(
                         discoverTables,
                         sourceDialect.isDataCollectionIdCaseSensitive(sourceConfig),
                         sourceDialect,
-                        offsetFactory);
-        snapshotSplitAssigner.initEnumeratorMetrics(
-                new SourceEnumeratorMetrics(
-                        UnregisteredMetricsGroup.createSplitEnumeratorMetricGroup()));
+                        offsetFactory,
+                        new MockSplitEnumeratorContext<>(1));
         snapshotSplitAssigner.open();
         List<SnapshotSplit> snapshotSplitList = new ArrayList<>();
         Optional<SourceSplitBase> split = snapshotSplitAssigner.getNext();
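The same substitution appears in the SQL Server test below. MockSplitEnumeratorContext comes from Flink's test utilities; its constructor argument is the mocked parallelism, and its metricGroup() supplies a SplitEnumeratorMetricGroup that the assigner's constructor now turns into SourceEnumeratorMetrics, which is why the manual initEnumeratorMetrics(...) call could be dropped. A minimal sketch of the wiring, with a hypothetical helper name:

import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;

class MockContextSketch {
    // Hypothetical helper: builds the single-subtask mock context used in these tests.
    static MockSplitEnumeratorContext<SourceSplitBase> newTestContext() {
        return new MockSplitEnumeratorContext<>(1);
    }
}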

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-sqlserver-cdc/src/test/java/org/apache/flink/cdc/connectors/sqlserver/source/read/fetch/SqlServerScanFetchTaskTest.java

Lines changed: 3 additions & 6 deletions

@@ -17,14 +17,14 @@

 package org.apache.flink.cdc.connectors.sqlserver.source.read.fetch;

+import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
 import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
 import org.apache.flink.cdc.connectors.base.dialect.JdbcDataSourceDialect;
 import org.apache.flink.cdc.connectors.base.source.assigner.SnapshotSplitAssigner;
 import org.apache.flink.cdc.connectors.base.source.meta.offset.OffsetFactory;
 import org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit;
 import org.apache.flink.cdc.connectors.base.source.meta.split.SourceRecords;
 import org.apache.flink.cdc.connectors.base.source.meta.split.SourceSplitBase;
-import org.apache.flink.cdc.connectors.base.source.metrics.SourceEnumeratorMetrics;
 import org.apache.flink.cdc.connectors.base.source.reader.external.AbstractScanFetchTask;
 import org.apache.flink.cdc.connectors.base.source.reader.external.FetchTask;
 import org.apache.flink.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher;
@@ -37,7 +37,6 @@
 import org.apache.flink.cdc.connectors.sqlserver.source.reader.fetch.SqlServerScanFetchTask;
 import org.apache.flink.cdc.connectors.sqlserver.source.reader.fetch.SqlServerSourceFetchTaskContext;
 import org.apache.flink.cdc.connectors.sqlserver.testutils.RecordsFormatter;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
@@ -333,10 +332,8 @@ private List<SnapshotSplit> getSnapshotSplits(
                         discoverTables,
                         sourceDialect.isDataCollectionIdCaseSensitive(sourceConfig),
                         sourceDialect,
-                        offsetFactory);
-        snapshotSplitAssigner.initEnumeratorMetrics(
-                new SourceEnumeratorMetrics(
-                        UnregisteredMetricsGroup.createSplitEnumeratorMetricGroup()));
+                        offsetFactory,
+                        new MockSplitEnumeratorContext<>(1));
         snapshotSplitAssigner.open();
         List<SnapshotSplit> snapshotSplitList = new ArrayList<>();
         Optional<SourceSplitBase> split = snapshotSplitAssigner.getNext();
