Skip to content

Commit e11ddbd

Browse files
committed
[FLINK-37120][pipeline-connector/mysql] add ending split chunk first to avoid TaskManager oom
1 parent 39608ed commit e11ddbd

File tree

59 files changed

+595
-86
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

59 files changed

+595
-86
lines changed

docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,17 @@ Flink SQL> SELECT * FROM orders;
389389
这是一项实验性功能。
390390
</td>
391391
</tr>
392+
<tr>
393+
<td>scan.incremental.snapshot.assign-ending-chunk-first.enabled</td>
394+
<td>optional</td>
395+
<td style="word-wrap: break-word;">false</td>
396+
<td>Boolean</td>
397+
<td>
398+
快照读取阶段是否先分配 EndingChunk。<br>
399+
这有助于降低 TaskManager 在快照阶段同步最后一个 chunk 时遇到内存溢出 (OOM) 的风险。<br>
400+
这是一项实验特性,默认为 false。
401+
</td>
402+
</tr>
392403
</tbody>
393404
</table>
394405
</div>

docs/content.zh/docs/connectors/pipeline-connectors/mysql.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,17 @@ pipeline:
312312
<td>Boolean</td>
313313
<td>是否将TINYINT(1)类型当做Boolean类型处理,默认true。</td>
314314
</tr>
315+
<tr>
316+
<td>scan.incremental.snapshot.assign-ending-chunk-first.enabled</td>
317+
<td>optional</td>
318+
<td style="word-wrap: break-word;">false</td>
319+
<td>Boolean</td>
320+
<td>
321+
快照读取阶段是否先分配 EndingChunk。<br>
322+
这有助于降低 TaskManager 在快照阶段同步最后一个 chunk 时遇到内存溢出 (OOM) 的风险。<br>
323+
这是一项实验特性,默认为 false。
324+
</td>
325+
</tr>
315326
</tbody>
316327
</table>
317328
</div>

docs/content/docs/connectors/flink-sources/mysql-cdc.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -415,6 +415,17 @@ During a snapshot operation, the connector will query each included table to pro
415415
When 'use.legacy.json.format' = 'false', the data would be converted to {"key1": "value1", "key2": "value2"}, with whitespace before values and after commas preserved.
416416
</td>
417417
</tr>
418+
<tr>
419+
<td>scan.incremental.snapshot.assign-ending-chunk-first.enabled</td>
420+
<td>optional</td>
421+
<td style="word-wrap: break-word;">false</td>
422+
<td>Boolean</td>
423+
<td>
424+
Whether to assign the ending chunk first during snapshot reading phase.<br>
425+
This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk.<br>
426+
Experimental option, defaults to false.
427+
</td>
428+
</tr>
418429
</tbody>
419430
</table>
420431
</div>

docs/content/docs/connectors/pipeline-connectors/mysql.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,17 @@ pipeline:
332332
When 'use.legacy.json.format' = 'false', the data would be converted to {"key1": "value1", "key2": "value2"}, with whitespace before values and after commas preserved.
333333
</td>
334334
</tr>
335+
<tr>
336+
<td>scan.incremental.snapshot.assign-ending-chunk-first.enabled</td>
337+
<td>optional</td>
338+
<td style="word-wrap: break-word;">false</td>
339+
<td>Boolean</td>
340+
<td>
341+
Whether to assign the ending chunk first during snapshot reading phase.<br>
342+
This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk.<br>
343+
Experimental option, defaults to false.
344+
</td>
345+
</tr>
335346
</tbody>
336347
</table>
337348
</div>

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@
7676
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
7777
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED;
7878
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
79+
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST;
7980
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
8081
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
8182
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
@@ -150,6 +151,8 @@ public DataSource createDataSource(Context context) {
150151
config.get(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED);
151152
boolean isParsingOnLineSchemaChanges = config.get(PARSE_ONLINE_SCHEMA_CHANGES);
152153
boolean useLegacyJsonFormat = config.get(USE_LEGACY_JSON_FORMAT);
154+
boolean isAssignEndingChunkFirst =
155+
config.get(SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST);
153156

154157
validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
155158
validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
@@ -201,7 +204,8 @@ public DataSource createDataSource(Context context) {
201204
.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled)
202205
.parseOnLineSchemaChanges(isParsingOnLineSchemaChanges)
203206
.treatTinyInt1AsBoolean(treatTinyInt1AsBoolean)
204-
.useLegacyJsonFormat(useLegacyJsonFormat);
207+
.useLegacyJsonFormat(useLegacyJsonFormat)
208+
.assignEndingChunkFirst(isAssignEndingChunkFirst);
205209

206210
List<TableId> tableIds = MySqlSchemaUtils.listTables(configFactory.createConfig(0), null);
207211

@@ -337,6 +341,7 @@ public Set<ConfigOption<?>> optionalOptions() {
337341
options.add(USE_LEGACY_JSON_FORMAT);
338342
options.add(TREAT_TINYINT1_AS_BOOLEAN_ENABLED);
339343
options.add(PARSE_ONLINE_SCHEMA_CHANGES);
344+
options.add(SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST);
340345
return options;
341346
}
342347

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -313,4 +313,12 @@ public class MySqlDataSourceOptions {
313313
.defaultValue(true)
314314
.withDescription(
315315
"Whether to use legacy json format. The default value is true, which means there is no whitespace before value and after comma in json format.");
316+
317+
@Experimental
318+
public static final ConfigOption<Boolean> SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST =
319+
ConfigOptions.key("scan.incremental.snapshot.assign-ending-chunk-first.enabled")
320+
.booleanType()
321+
.defaultValue(false)
322+
.withDescription(
323+
"Whether to assign the ending chunk first during snapshot reading phase. This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk. Defaults to false.");
316324
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD;
4242
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
4343
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED;
44+
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST;
4445
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
4546
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES;
4647
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES_EXCLUDE;
@@ -260,15 +261,17 @@ public void testOptionalOption() {
260261
// optional option
261262
options.put(TREAT_TINYINT1_AS_BOOLEAN_ENABLED.key(), "false");
262263
options.put(PARSE_ONLINE_SCHEMA_CHANGES.key(), "true");
264+
options.put(SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST.key(), "true");
263265

264266
Factory.Context context = new MockContext(Configuration.fromMap(options));
265267
MySqlDataSourceFactory factory = new MySqlDataSourceFactory();
266268
assertThat(factory.optionalOptions())
267-
.contains(TREAT_TINYINT1_AS_BOOLEAN_ENABLED, PARSE_ONLINE_SCHEMA_CHANGES);
269+
.contains(TREAT_TINYINT1_AS_BOOLEAN_ENABLED, PARSE_ONLINE_SCHEMA_CHANGES, SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST);
268270

269271
MySqlDataSource dataSource = (MySqlDataSource) factory.createDataSource(context);
270272
assertThat(dataSource.getSourceConfig().isTreatTinyInt1AsBoolean()).isFalse();
271273
assertThat(dataSource.getSourceConfig().isParseOnLineSchemaChanges()).isTrue();
274+
assertThat(dataSource.getSourceConfig().isAssignEndingChunkFirst()).isTrue();
272275
}
273276

274277
@Test

flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/config/BaseSourceConfig.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ public abstract class BaseSourceConfig implements SourceConfig {
3838
protected final boolean closeIdleReaders;
3939
protected final boolean skipSnapshotBackfill;
4040
protected final boolean isScanNewlyAddedTableEnabled;
41+
protected final boolean assignEndingChunkFirst;
4142

4243
// --------------------------------------------------------------------------------------------
4344
// Debezium Configurations
@@ -56,7 +57,8 @@ public BaseSourceConfig(
5657
boolean skipSnapshotBackfill,
5758
boolean isScanNewlyAddedTableEnabled,
5859
Properties dbzProperties,
59-
Configuration dbzConfiguration) {
60+
Configuration dbzConfiguration,
61+
boolean assignEndingChunkFirst) {
6062
this.startupOptions = startupOptions;
6163
this.splitSize = splitSize;
6264
this.splitMetaGroupSize = splitMetaGroupSize;
@@ -68,6 +70,7 @@ public BaseSourceConfig(
6870
this.isScanNewlyAddedTableEnabled = isScanNewlyAddedTableEnabled;
6971
this.dbzProperties = dbzProperties;
7072
this.dbzConfiguration = dbzConfiguration;
73+
this.assignEndingChunkFirst = assignEndingChunkFirst;
7174
}
7275

7376
@Override
@@ -115,4 +118,9 @@ public Configuration getDbzConfiguration() {
115118
public boolean isSkipSnapshotBackfill() {
116119
return skipSnapshotBackfill;
117120
}
121+
122+
@Override
123+
public boolean isAssignEndingChunkFirst() {
124+
return assignEndingChunkFirst;
125+
}
118126
}

flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/config/JdbcSourceConfig.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,8 @@ public JdbcSourceConfig(
7373
int connectionPoolSize,
7474
String chunkKeyColumn,
7575
boolean skipSnapshotBackfill,
76-
boolean isScanNewlyAddedTableEnabled) {
76+
boolean isScanNewlyAddedTableEnabled,
77+
boolean assignEndingChunkFirst) {
7778
super(
7879
startupOptions,
7980
splitSize,
@@ -85,7 +86,8 @@ public JdbcSourceConfig(
8586
skipSnapshotBackfill,
8687
isScanNewlyAddedTableEnabled,
8788
dbzProperties,
88-
dbzConfiguration);
89+
dbzConfiguration,
90+
assignEndingChunkFirst);
8991
this.driverClassName = driverClassName;
9092
this.hostname = hostname;
9193
this.port = port;

flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/config/JdbcSourceConfigFactory.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,8 @@ public abstract class JdbcSourceConfigFactory implements Factory<JdbcSourceConfi
6060
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue();
6161
protected boolean scanNewlyAddedTableEnabled =
6262
JdbcSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue();
63+
protected boolean assignEndingChunkFirst =
64+
JdbcSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ASSIGN_ENDING_CHUNK_FIRST.defaultValue();
6365

6466
/** Integer port number of the database server. */
6567
public JdbcSourceConfigFactory hostname(String hostname) {
@@ -252,6 +254,14 @@ public JdbcSourceConfigFactory scanNewlyAddedTableEnabled(boolean scanNewlyAdded
252254
return this;
253255
}
254256

257+
/**
258+
* Whether to assign the ending chunk first during snapshot reading phase. Defaults to false.
259+
*/
260+
public JdbcSourceConfigFactory assignEndingChunkFirst(boolean assignEndingChunkFirst) {
261+
this.assignEndingChunkFirst = assignEndingChunkFirst;
262+
return this;
263+
}
264+
255265
@Override
256266
public abstract JdbcSourceConfig create(int subtask);
257267
}

0 commit comments

Comments (0)