Skip to content

Commit 5658fd8

Browse files
author
guoxuanlin
committed
[mysql] support binlog deserialize with parallel threads
1 parent ad7b368 commit 5658fd8

15 files changed

Lines changed: 1081 additions & 18 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@
7777
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
7878
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED;
7979
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
80+
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_DESERIALIZE_PARALLELISM;
8081
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP;
8182
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
8283
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
@@ -170,6 +171,7 @@ public DataSource createDataSource(Context context) {
170171
config.get(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
171172
boolean onlyDeserializeCapturedTablesChangelog =
172173
config.get(SCAN_ONLY_DESERIALIZE_CAPTURED_TABLES_CHANGELOG_ENABLED);
174+
int deserializeParallelism = config.get(SCAN_INCREMENTAL_DESERIALIZE_PARALLELISM);
173175

174176
validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
175177
validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
@@ -225,7 +227,8 @@ public DataSource createDataSource(Context context) {
225227
.assignUnboundedChunkFirst(isAssignUnboundedChunkFirst)
226228
.skipSnapshotBackfill(skipSnapshotBackfill)
227229
.onlyDeserializeCapturedTablesChangelog(
228-
onlyDeserializeCapturedTablesChangelog);
230+
onlyDeserializeCapturedTablesChangelog)
231+
.deserializeParallelism(deserializeParallelism);
229232

230233
List<TableId> tableIds = MySqlSchemaUtils.listTables(configFactory.createConfig(0), null);
231234

@@ -364,6 +367,7 @@ public Set<ConfigOption<?>> optionalOptions() {
364367
options.add(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
365368
options.add(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
366369
options.add(SCAN_ONLY_DESERIALIZE_CAPTURED_TABLES_CHANGELOG_ENABLED);
370+
options.add(SCAN_INCREMENTAL_DESERIALIZE_PARALLELISM);
367371
return options;
368372
}
369373

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -341,4 +341,15 @@ public class MySqlDataSourceOptions {
341341
"Whether to only deserialize changelog events for captured tables during incremental phase. "
342342
+ "When set to true, only changelog events for the target tables will be deserialized, "
343343
+ "which can speed up binlog reading. Defaults to false.");
344+
345+
@Experimental
346+
public static final ConfigOption<Integer> SCAN_INCREMENTAL_DESERIALIZE_PARALLELISM =
347+
ConfigOptions.key("scan.incremental.deserialize.parallelism")
348+
.intType()
349+
.defaultValue(1)
350+
.withDescription(
351+
"The number of parallel threads used to deserialize binlog events "
352+
+ "during the incremental phase. Default is 1 (single-threaded, "
353+
+ "original behavior). Setting this to a value greater than 1 "
354+
+ "enables concurrent deserialization to improve throughput.");
344355
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,8 @@ public MySqlPipelineRecordEmitter(
9898
sourceReaderMetrics,
9999
sourceConfig.isIncludeSchemaChanges(),
100100
false, // Explicitly disable heartbeat events
101-
false); // Explicitly disable transaction metadata events
101+
false, // Explicitly disable transaction metadata events
102+
sourceConfig.getDeserializeParallelism());
102103
this.debeziumDeserializationSchema = debeziumDeserializationSchema;
103104
this.sourceConfig = sourceConfig;
104105
this.alreadySendCreateTableTables = new HashSet<>();

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumEventDeserializationSchema.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,6 @@
6868
import java.time.Instant;
6969
import java.util.Collections;
7070
import java.util.Date;
71-
import java.util.HashMap;
7271
import java.util.List;
7372
import java.util.Map;
7473
import java.util.concurrent.ConcurrentHashMap;
@@ -98,7 +97,7 @@ public DebeziumEventDeserializationSchema(
9897
SchemaDataTypeInference schemaDataTypeInference, DebeziumChangelogMode changelogMode) {
9998
this.schemaDataTypeInference = schemaDataTypeInference;
10099
this.changelogMode = changelogMode;
101-
this.createTableEventCache = new HashMap<>();
100+
this.createTableEventCache = new ConcurrentHashMap<>();
102101
}
103102

104103
@Override

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,8 @@ public static <T> MySqlSourceBuilder<T> builder() {
131131
sourceReaderMetrics,
132132
sourceConfig.isIncludeSchemaChanges(),
133133
sourceConfig.isIncludeHeartbeatEvents(),
134-
sourceConfig.isIncludeTransactionMetadataEvents()));
134+
sourceConfig.isIncludeTransactionMetadataEvents(),
135+
sourceConfig.getDeserializeParallelism()));
135136
}
136137

137138
MySqlSource(

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,15 @@ public MySqlSourceBuilder<T> assignUnboundedChunkFirst(boolean assignUnboundedCh
312312
return this;
313313
}
314314

315+
/**
316+
* The number of parallel threads used to deserialize binlog events during the incremental
317+
* phase. Default is 1 (single-threaded, original behavior). Valid values are 1 to 256.
318+
*/
319+
public MySqlSourceBuilder<T> deserializeParallelism(int deserializeParallelism) {
320+
this.configFactory.deserializeParallelism(deserializeParallelism);
321+
return this;
322+
}
323+
315324
/**
316325
* Build the {@link MySqlSource}.
317326
*

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ public class MySqlSourceConfig implements Serializable {
7373
public static boolean useLegacyJsonFormat = true;
7474
private final boolean assignUnboundedChunkFirst;
7575
private final boolean onlyDeserializeCapturedTablesChangelog;
76+
private final int deserializeParallelism;
7677

7778
// --------------------------------------------------------------------------------------------
7879
// Debezium Configurations
@@ -114,7 +115,8 @@ public class MySqlSourceConfig implements Serializable {
114115
boolean treatTinyInt1AsBoolean,
115116
boolean useLegacyJsonFormat,
116117
boolean assignUnboundedChunkFirst,
117-
boolean onlyDeserializeCapturedTablesChangelog) {
118+
boolean onlyDeserializeCapturedTablesChangelog,
119+
int deserializeParallelism) {
118120
this.hostname = checkNotNull(hostname);
119121
this.port = port;
120122
this.username = checkNotNull(username);
@@ -161,6 +163,7 @@ public class MySqlSourceConfig implements Serializable {
161163
this.useLegacyJsonFormat = useLegacyJsonFormat;
162164
this.assignUnboundedChunkFirst = assignUnboundedChunkFirst;
163165
this.onlyDeserializeCapturedTablesChangelog = onlyDeserializeCapturedTablesChangelog;
166+
this.deserializeParallelism = deserializeParallelism;
164167
}
165168

166169
public String getHostname() {
@@ -306,4 +309,8 @@ public boolean isTreatTinyInt1AsBoolean() {
306309
public boolean isOnlyDeserializeCapturedTablesChangelog() {
307310
return onlyDeserializeCapturedTablesChangelog;
308311
}
312+
313+
public int getDeserializeParallelism() {
314+
return deserializeParallelism;
315+
}
309316
}

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ public class MySqlSourceConfigFactory implements Serializable {
7979
private boolean useLegacyJsonFormat = true;
8080
private boolean assignUnboundedChunkFirst = false;
8181
private boolean onlyDeserializeCapturedTablesChangelog = false;
82+
private int deserializeParallelism = 1;
8283

8384
public MySqlSourceConfigFactory hostname(String hostname) {
8485
this.hostname = hostname;
@@ -352,6 +353,28 @@ public MySqlSourceConfigFactory onlyDeserializeCapturedTablesChangelog(
352353
return this;
353354
}
354355

356+
/**
357+
* The number of parallel threads used to deserialize binlog events during the incremental
358+
* phase. Default is 1 (single-threaded, original behavior). Setting this to a value greater
359+
* than 1 enables concurrent deserialization to improve throughput. Must be between 1 and 256 (inclusive), otherwise an {@link IllegalArgumentException} is thrown.
360+
*/
361+
public MySqlSourceConfigFactory deserializeParallelism(int deserializeParallelism) {
362+
if (deserializeParallelism <= 0) {
363+
throw new IllegalArgumentException(
364+
String.format(
365+
"The value of 'scan.incremental.deserialize.parallelism' must be positive, but is %d",
366+
deserializeParallelism));
367+
}
368+
if (deserializeParallelism > 256) {
369+
throw new IllegalArgumentException(
370+
String.format(
371+
"The value of 'scan.incremental.deserialize.parallelism' must not exceed 256, but is %d",
372+
deserializeParallelism));
373+
}
374+
this.deserializeParallelism = deserializeParallelism;
375+
return this;
376+
}
377+
355378
/** Creates a new {@link MySqlSourceConfig} for the given subtask {@code subtaskId}. */
356379
public MySqlSourceConfig createConfig(int subtaskId) {
357380
// hard code server name, because we don't need to distinguish it, docs:
@@ -456,6 +479,7 @@ public MySqlSourceConfig createConfig(int subtaskId, String serverName) {
456479
treatTinyInt1AsBoolean,
457480
useLegacyJsonFormat,
458481
assignUnboundedChunkFirst,
459-
onlyDeserializeCapturedTablesChangelog);
482+
onlyDeserializeCapturedTablesChangelog,
483+
deserializeParallelism);
460484
}
461485
}

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,4 +292,15 @@ public class MySqlSourceOptions {
292292
.defaultValue(true)
293293
.withDescription(
294294
"Whether to assign the unbounded chunks first during snapshot reading phase. This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk.");
295+
296+
@Experimental
297+
public static final ConfigOption<Integer> SCAN_INCREMENTAL_DESERIALIZE_PARALLELISM =
298+
ConfigOptions.key("scan.incremental.deserialize.parallelism")
299+
.intType()
300+
.defaultValue(1)
301+
.withDescription(
302+
"The number of parallel threads used to deserialize binlog events "
303+
+ "during the incremental phase. Default is 1 (single-threaded, "
304+
+ "original behavior). Setting this to a value greater than 1 "
305+
+ "enables concurrent deserialization to improve throughput.");
295306
}

0 commit comments

Comments
 (0)