Skip to content

Commit ac405b3

Browse files
make skip-unsubscribed-table binlog deserialization configurable
1 parent b49b948 commit ac405b3

18 files changed

Lines changed: 333 additions & 82 deletions

File tree

docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -433,6 +433,17 @@ Flink SQL> SELECT * FROM orders;
433433
<li>false(默认):所有类型的消息都保持原样下发。</li>
434434
</td>
435435
</tr>
436+
<tr>
437+
<td>scan.binlog.skip-unsubscribed-tables.enabled</td>
438+
<td>optional</td>
439+
<td style="word-wrap: break-word;">false</td>
440+
<td>Boolean</td>
441+
<td>
442+
在增量解析阶段,是否跳过未订阅表的增量数据(binlog)反序列化。<br>
443+
建议在订阅部分表的场景下选择开启,能过滤非订阅表的增量数据解析,提升解析性能。<br>
444+
这是一项实验性功能,默认关闭。
445+
</td>
446+
</tr>
436447
<tr>
437448
<td>scan.incremental.snapshot.backfill.skip</td>
438449
<td>optional</td>

docs/content.zh/docs/connectors/pipeline-connectors/mysql.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,17 @@ pipeline:
286286
scan.binlog.newly-added-table.enabled: 只在 binlog 读取阶段读取新增表的增量数据。
287287
</td>
288288
</tr>
289+
<tr>
290+
<td>scan.binlog.skip-unsubscribed-tables.enabled</td>
291+
<td>optional</td>
292+
<td style="word-wrap: break-word;">false</td>
293+
<td>Boolean</td>
294+
<td>
295+
在增量解析阶段,是否跳过未订阅表的增量数据(binlog)反序列化。<br>
296+
建议在订阅部分表的场景下选择开启,能过滤非订阅表的增量数据解析,提升解析性能。<br>
297+
这是一项实验性功能,默认关闭。
298+
</td>
299+
</tr>
289300
<tr>
290301
<td>scan.parse.online.schema.changes.enabled</td>
291302
<td>optional</td>

docs/content/docs/connectors/flink-sources/mysql-cdc.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -458,6 +458,17 @@ Only valid for cdc 1.x version. During a snapshot operation, the connector will
458458
<li>false (default): All types of messages are sent as is.</li>
459459
</td>
460460
</tr>
461+
<tr>
462+
<td>scan.binlog.skip-unsubscribed-tables.enabled</td>
463+
<td>optional</td>
464+
<td style="word-wrap: break-word;">false</td>
465+
<td>Boolean</td>
466+
<td>
467+
During incremental reading, whether to skip deserialization of incremental data (binlog) for unsubscribed tables.<br>
468+
It is recommended to enable this option when only a subset of tables are subscribed. It can avoid parsing incremental events of unsubscribed tables and improve performance.<br>
469+
This is an experimental feature and is disabled by default.
470+
</td>
471+
</tr>
461472
<tr>
462473
<td>scan.incremental.snapshot.backfill.skip</td>
463474
<td>optional</td>

docs/content/docs/connectors/pipeline-connectors/mysql.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,17 @@ pipeline:
293293
scan.binlog.newly-added-table.enabled: only do binlog-reading for newly added table during binlog reading phase.
294294
</td>
295295
</tr>
296+
<tr>
297+
<td>scan.binlog.skip-unsubscribed-tables.enabled</td>
298+
<td>optional</td>
299+
<td style="word-wrap: break-word;">false</td>
300+
<td>Boolean</td>
301+
<td>
302+
During incremental reading, whether to skip deserialization of incremental data (binlog) for unsubscribed tables.<br>
303+
It is recommended to enable this option when only a subset of tables are subscribed. It can avoid parsing incremental events of unsubscribed tables and improve performance.<br>
304+
This is an experimental feature and is disabled by default.
305+
</td>
306+
</tr>
296307
<tr>
297308
<td>scan.parse.online.schema.changes.enabled</td>
298309
<td>optional</td>

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@
7676
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD;
7777
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
7878
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED;
79+
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_SKIP_UNSUBSCRIBED_TABLES_ENABLED;
7980
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
8081
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP;
8182
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
@@ -167,6 +168,8 @@ public DataSource createDataSource(Context context) {
167168
boolean useLegacyJsonFormat = config.get(USE_LEGACY_JSON_FORMAT);
168169
boolean isAssignUnboundedChunkFirst =
169170
config.get(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
171+
boolean skipBinlogDeserializationOfUnsubscribedTables =
172+
config.get(SCAN_BINLOG_SKIP_UNSUBSCRIBED_TABLES_ENABLED);
170173

171174
validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
172175
validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
@@ -220,6 +223,8 @@ public DataSource createDataSource(Context context) {
220223
.treatTinyInt1AsBoolean(treatTinyInt1AsBoolean)
221224
.useLegacyJsonFormat(useLegacyJsonFormat)
222225
.assignUnboundedChunkFirst(isAssignUnboundedChunkFirst)
226+
.skipBinlogDeserializationOfUnsubscribedTables(
227+
skipBinlogDeserializationOfUnsubscribedTables)
223228
.skipSnapshotBackfill(skipSnapshotBackfill);
224229

225230
List<TableId> tableIds = MySqlSchemaUtils.listTables(configFactory.createConfig(0), null);
@@ -351,6 +356,7 @@ public Set<ConfigOption<?>> optionalOptions() {
351356
options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
352357
options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
353358
options.add(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED);
359+
options.add(SCAN_BINLOG_SKIP_UNSUBSCRIBED_TABLES_ENABLED);
354360
options.add(METADATA_LIST);
355361
options.add(INCLUDE_COMMENTS_ENABLED);
356362
options.add(USE_LEGACY_JSON_FORMAT);

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -330,4 +330,12 @@ public class MySqlDataSourceOptions {
330330
.defaultValue(false)
331331
.withDescription(
332332
"Whether to skip backfill in snapshot reading phase. If backfill is skipped, changes on captured tables during snapshot phase will be consumed later in change log reading phase instead of being merged into the snapshot.WARNING: Skipping backfill might lead to data inconsistency because some change log events happened within the snapshot phase might be replayed (only at-least-once semantic is promised). For example updating an already updated value in snapshot, or deleting an already deleted entry in snapshot. These replayed change log events should be handled specially.");
333+
334+
@Experimental
335+
public static final ConfigOption<Boolean> SCAN_BINLOG_SKIP_UNSUBSCRIBED_TABLES_ENABLED =
336+
ConfigOptions.key("scan.binlog.skip-unsubscribed-tables.enabled")
337+
.booleanType()
338+
.defaultValue(false)
339+
.withDescription(
340+
"Whether to skip deserialization of binlog row events for unsubscribed tables.");
333341
}

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -88,21 +88,21 @@
8888
* Copied from Debezium project(1.9.8.Final) to fix
8989
* https://github.com/ververica/flink-cdc-connectors/issues/1944.
9090
*
91-
* <p>Line 1432-1443 : Adjust GTID merging logic to support recovering from job which previously
91+
* <p>Line 1454-1466 : Adjust GTID merging logic to support recovering from job which previously
9292
* specifying starting offset on start. Uses {@link GtidUtils#fixOldChannelsGtidSet} for shared
9393
* EARLIEST/LATEST logic.
9494
*
95-
* <p>Line 1444-1452 : Fix LATEST mode GTID merging to avoid replaying pre-checkpoint transactions
95+
* <p>Line 1467-1475 : Fix LATEST mode GTID merging to avoid replaying pre-checkpoint transactions
9696
* when checkpoint GTID has non-contiguous ranges. Delegates to {@link
9797
* GtidUtils#computeLatestModeGtidSet}. See FLINK-39149.
9898
*
99-
* <p>Line 1490 : Add more error details for some exceptions.
99+
* <p>Line 1526 : Add more error details for some exceptions.
100100
*
101-
* <p>Line 951-963 : Use iterator instead of index-based loop to avoid O(n²) complexity when
101+
* <p>Line 965-977 : Use iterator instead of index-based loop to avoid O(n²) complexity when
102102
* processing LinkedList rows in handleChange method. See FLINK-38846.
103103
*
104-
* <p>Line 317, 1358-1366 : Use a {@link TableIdFilter} to skip binlog deserialization of unmatched
105-
* tables.
104+
* <p>Line 324-330, 1366-1373 : Use a {@link TableIdFilter} to skip binlog deserialization of
105+
* unmatched tables.
106106
*/
107107
public class MySqlStreamingChangeEventSource
108108
implements StreamingChangeEventSource<MySqlPartition, MySqlOffsetContext> {
@@ -207,7 +207,8 @@ public MySqlStreamingChangeEventSource(
207207
ErrorHandler errorHandler,
208208
Clock clock,
209209
MySqlTaskContext taskContext,
210-
MySqlStreamingChangeEventSourceMetrics metrics) {
210+
MySqlStreamingChangeEventSourceMetrics metrics,
211+
boolean skipBinlogDeserializationOfUnsubscribedTables) {
211212

212213
this.taskContext = taskContext;
213214
this.connectorConfig = connectorConfig;
@@ -320,7 +321,13 @@ public Event nextEvent(ByteArrayInputStream inputStream) throws IOException {
320321
}
321322
};
322323

323-
final TableIdFilter tableIdFilter = getTableIdDeserializationFilter();
324+
LOGGER.info(
325+
"Skip binlog deserialization for unsubscribed tables: {}",
326+
skipBinlogDeserializationOfUnsubscribedTables);
327+
final TableIdFilter tableIdFilter =
328+
skipBinlogDeserializationOfUnsubscribedTables
329+
? getTableIdDeserializationFilter()
330+
: TableIdFilter.all();
324331

325332
// Add our custom deserializers ...
326333
eventDeserializer.setEventDataDeserializer(EventType.STOP, new StopEventDataDeserializer());

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,10 @@ public void submitSplit(MySqlSplit mySqlSplit) {
137137
(MySqlStreamingChangeEventSourceMetrics)
138138
statefulTaskContext.getStreamingChangeEventSourceMetrics(),
139139
currentBinlogSplit,
140-
createEventFilter());
140+
createEventFilter(),
141+
statefulTaskContext
142+
.getSourceConfig()
143+
.isSkipBinlogDeserializationOfUnsubscribedTables());
141144

142145
executorService.submit(
143146
() -> {

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,10 @@ private MySqlBinlogSplitReadTask createBackfillBinlogReadTask(
254254
(MySqlStreamingChangeEventSourceMetrics)
255255
statefulTaskContext.getStreamingChangeEventSourceMetrics(),
256256
backfillBinlogSplit,
257-
event -> true);
257+
event -> true,
258+
statefulTaskContext
259+
.getSourceConfig()
260+
.isSkipBinlogDeserializationOfUnsubscribedTables());
258261
}
259262

260263
private void dispatchBinlogEndEvent(MySqlBinlogSplit backFillBinlogSplit)

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlBinlogSplitReadTask.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,17 @@ public MySqlBinlogSplitReadTask(
6868
MySqlTaskContext taskContext,
6969
MySqlStreamingChangeEventSourceMetrics metrics,
7070
MySqlBinlogSplit binlogSplit,
71-
Predicate<Event> eventFilter) {
72-
super(connectorConfig, connection, dispatcher, errorHandler, clock, taskContext, metrics);
71+
Predicate<Event> eventFilter,
72+
boolean skipBinlogDeserializationOfUnsubscribedTables) {
73+
super(
74+
connectorConfig,
75+
connection,
76+
dispatcher,
77+
errorHandler,
78+
clock,
79+
taskContext,
80+
metrics,
81+
skipBinlogDeserializationOfUnsubscribedTables);
7382
this.binlogSplit = binlogSplit;
7483
this.eventDispatcher = dispatcher;
7584
this.errorHandler = errorHandler;

0 commit comments

Comments
 (0)