Skip to content

Commit 087831c

Browse files
author
guoxuanlin
committed
[pipeline][mysql] skip binlog deserialization for non-captured table
1 parent 69dbcb3 commit 087831c

8 files changed

Lines changed: 309 additions & 26 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@
7878
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED;
7979
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
8080
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP;
81+
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_ONLY_DESERIALIZE_CAPTURED_TABLES_CHANGELOG_ENABLED;
8182
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
8283
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
8384
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED;
@@ -167,6 +168,8 @@ public DataSource createDataSource(Context context) {
167168
boolean useLegacyJsonFormat = config.get(USE_LEGACY_JSON_FORMAT);
168169
boolean isAssignUnboundedChunkFirst =
169170
config.get(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
171+
boolean onlyDeserializeCapturedTablesChangelog =
172+
config.get(SCAN_ONLY_DESERIALIZE_CAPTURED_TABLES_CHANGELOG_ENABLED);
170173

171174
validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
172175
validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
@@ -220,7 +223,9 @@ public DataSource createDataSource(Context context) {
220223
.treatTinyInt1AsBoolean(treatTinyInt1AsBoolean)
221224
.useLegacyJsonFormat(useLegacyJsonFormat)
222225
.assignUnboundedChunkFirst(isAssignUnboundedChunkFirst)
223-
.skipSnapshotBackfill(skipSnapshotBackfill);
226+
.skipSnapshotBackfill(skipSnapshotBackfill)
227+
.onlyDeserializeCapturedTablesChangelog(
228+
onlyDeserializeCapturedTablesChangelog);
224229

225230
List<TableId> tableIds = MySqlSchemaUtils.listTables(configFactory.createConfig(0), null);
226231

@@ -358,6 +363,7 @@ public Set<ConfigOption<?>> optionalOptions() {
358363
options.add(PARSE_ONLINE_SCHEMA_CHANGES);
359364
options.add(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
360365
options.add(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
366+
options.add(SCAN_ONLY_DESERIALIZE_CAPTURED_TABLES_CHANGELOG_ENABLED);
361367
return options;
362368
}
363369

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -330,4 +330,16 @@ public class MySqlDataSourceOptions {
330330
.defaultValue(false)
331331
.withDescription(
332332
"Whether to skip backfill in snapshot reading phase. If backfill is skipped, changes on captured tables during snapshot phase will be consumed later in change log reading phase instead of being merged into the snapshot.WARNING: Skipping backfill might lead to data inconsistency because some change log events happened within the snapshot phase might be replayed (only at-least-once semantic is promised). For example updating an already updated value in snapshot, or deleting an already deleted entry in snapshot. These replayed change log events should be handled specially.");
333+
334+
/**
 * Experimental boolean option, keyed {@code
 * scan.only.deserialize.captured.tables.changelog.enabled}, with a default of {@code false}.
 *
 * <p>Per its description, enabling it restricts changelog deserialization during the incremental
 * phase to the captured (target) tables, which can speed up binlog reading.
 */
@Experimental
335+
public static final ConfigOption<Boolean>
336+
SCAN_ONLY_DESERIALIZE_CAPTURED_TABLES_CHANGELOG_ENABLED =
337+
ConfigOptions.key(
338+
"scan.only.deserialize.captured.tables.changelog.enabled")
339+
.booleanType()
340+
.defaultValue(false)
341+
.withDescription(
342+
"Whether to only deserialize changelog events for captured tables during incremental phase. "
343+
+ "When set to true, only changelog events for the target tables will be deserialized, "
344+
+ "which can speed up binlog reading. Defaults to false.");
333345
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
/*
2+
* Copyright Debezium Authors.
3+
*
4+
* Licensed under the Apache Software License version 2.0, available at
5+
* http://www.apache.org/licenses/LICENSE-2.0
6+
*/
7+
8+
package io.debezium.connector.mysql;
9+
10+
import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
11+
import com.github.shyiko.mysql.binlog.event.EventData;
12+
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
13+
import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
14+
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
15+
import com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializer;
16+
import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
17+
import io.debezium.relational.TableId;
18+
import org.slf4j.Logger;
19+
import org.slf4j.LoggerFactory;
20+
21+
import java.io.IOException;
22+
import java.util.Collections;
23+
import java.util.Map;
24+
import java.util.function.Predicate;
25+
26+
/**
27+
* A filtering wrapper around row event data deserializers (WRITE_ROWS, UPDATE_ROWS, DELETE_ROWS).
28+
*
29+
* <p>When enabled, this deserializer reads the table ID from the first 6 bytes of the event data,
30+
* looks up the table name via the TABLE_MAP event cache, and checks whether the table is in the
31+
* captured table list. If the table is NOT captured, it returns an empty EventData (with tableId set
32+
* but rows empty) to skip the expensive row-by-row deserialization. The remaining bytes are
33+
* automatically skipped by {@code EventDeserializer.deserializeEventData()} via {@code
34+
* skipToTheEndOfTheBlock()}.
35+
*
36+
* <p>If the table IS captured, it resets the stream position and delegates to the original
37+
* deserializer for full deserialization.
38+
*/
39+
public class FilteringRowsEventDataDeserializer<T extends EventData>
40+
implements EventDataDeserializer<T> {
41+
42+
private static final Logger LOGGER =
43+
LoggerFactory.getLogger(FilteringRowsEventDataDeserializer.class);
44+
45+
/** The type of row event this deserializer handles. */
46+
public enum RowEventType {
47+
WRITE,
48+
UPDATE,
49+
DELETE
50+
}
51+
52+
private final EventDataDeserializer<T> delegate;
53+
private final Map<Long, TableMapEventData> tableMapEventByTableId;
54+
private final Predicate<TableId> tableFilter;
55+
private final RowEventType rowEventType;
56+
57+
public FilteringRowsEventDataDeserializer(
58+
EventDataDeserializer<T> delegate,
59+
Map<Long, TableMapEventData> tableMapEventByTableId,
60+
Predicate<TableId> tableFilter,
61+
RowEventType rowEventType) {
62+
this.delegate = delegate;
63+
this.tableMapEventByTableId = tableMapEventByTableId;
64+
this.tableFilter = tableFilter;
65+
this.rowEventType = rowEventType;
66+
}
67+
68+
@Override
69+
@SuppressWarnings("unchecked")
70+
public T deserialize(ByteArrayInputStream inputStream) throws IOException {
71+
// Mark the current position so we can reset if the table is captured
72+
inputStream.mark(Integer.MAX_VALUE);
73+
74+
// Read the 6-byte table number (same format as all row events)
75+
long tableNumber = inputStream.readLong(6);
76+
77+
// Look up the table name from the TABLE_MAP event cache
78+
TableMapEventData tableMapEvent = tableMapEventByTableId.get(tableNumber);
79+
if (tableMapEvent != null) {
80+
String database = tableMapEvent.getDatabase();
81+
String table = tableMapEvent.getTable();
82+
TableId tableId = new TableId(database, null, table);
83+
84+
if (!tableFilter.test(tableId)) {
85+
// Table is NOT captured, skip deserialization by returning empty EventData
86+
LOGGER.trace(
87+
"Skipping deserialization for non-captured table: {}.{}", database, table);
88+
return createEmptyEventData(tableNumber);
89+
}
90+
}
91+
92+
// Table IS captured (or unknown), reset and delegate to original deserializer
93+
inputStream.reset();
94+
return delegate.deserialize(inputStream);
95+
}
96+
97+
@SuppressWarnings("unchecked")
98+
private T createEmptyEventData(long tableNumber) {
99+
switch (rowEventType) {
100+
case WRITE:
101+
WriteRowsEventData writeData = new WriteRowsEventData();
102+
writeData.setTableId(tableNumber);
103+
writeData.setRows(Collections.emptyList());
104+
return (T) writeData;
105+
case UPDATE:
106+
UpdateRowsEventData updateData = new UpdateRowsEventData();
107+
updateData.setTableId(tableNumber);
108+
updateData.setRows(Collections.emptyList());
109+
return (T) updateData;
110+
case DELETE:
111+
DeleteRowsEventData deleteData = new DeleteRowsEventData();
112+
deleteData.setTableId(tableNumber);
113+
deleteData.setRows(Collections.emptyList());
114+
return (T) deleteData;
115+
default:
116+
throw new IllegalStateException("Unknown row event type: " + rowEventType);
117+
}
118+
}
119+
}

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java

Lines changed: 107 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,8 @@ public class MySqlStreamingChangeEventSource
131131
private final MySqlConnection connection;
132132
private final EventDispatcher<MySqlPartition, TableId> eventDispatcher;
133133
private final ErrorHandler errorHandler;
134+
private final boolean onlyDeserializeCapturedTablesChangelog;
135+
private final Predicate<TableId> capturedTableFilter;
134136

135137
@SingleThreadAccess("binlog client thread")
136138
private Instant eventTimestamp;
@@ -204,6 +206,28 @@ public MySqlStreamingChangeEventSource(
204206
Clock clock,
205207
MySqlTaskContext taskContext,
206208
MySqlStreamingChangeEventSourceMetrics metrics) {
209+
this(
210+
connectorConfig,
211+
connection,
212+
dispatcher,
213+
errorHandler,
214+
clock,
215+
taskContext,
216+
metrics,
217+
false,
218+
null);
219+
}
220+
221+
public MySqlStreamingChangeEventSource(
222+
MySqlConnectorConfig connectorConfig,
223+
MySqlConnection connection,
224+
EventDispatcher<MySqlPartition, TableId> dispatcher,
225+
ErrorHandler errorHandler,
226+
Clock clock,
227+
MySqlTaskContext taskContext,
228+
MySqlStreamingChangeEventSourceMetrics metrics,
229+
boolean onlyDeserializeCapturedTablesChangelog,
230+
Predicate<TableId> capturedTableFilter) {
207231

208232
this.taskContext = taskContext;
209233
this.connectorConfig = connectorConfig;
@@ -212,6 +236,8 @@ public MySqlStreamingChangeEventSource(
212236
this.eventDispatcher = dispatcher;
213237
this.errorHandler = errorHandler;
214238
this.metrics = metrics;
239+
this.onlyDeserializeCapturedTablesChangelog = onlyDeserializeCapturedTablesChangelog;
240+
this.capturedTableFilter = capturedTableFilter;
215241

216242
eventDeserializationFailureHandlingMode =
217243
connectorConfig.getEventProcessingFailureHandlingMode();
@@ -319,27 +345,87 @@ public Event nextEvent(ByteArrayInputStream inputStream) throws IOException {
319345
// Add our custom deserializers ...
320346
eventDeserializer.setEventDataDeserializer(EventType.STOP, new StopEventDataDeserializer());
321347
eventDeserializer.setEventDataDeserializer(EventType.GTID, new GtidEventDataDeserializer());
322-
eventDeserializer.setEventDataDeserializer(
323-
EventType.WRITE_ROWS,
324-
new RowDeserializers.WriteRowsDeserializer(tableMapEventByTableId));
325-
eventDeserializer.setEventDataDeserializer(
326-
EventType.UPDATE_ROWS,
327-
new RowDeserializers.UpdateRowsDeserializer(tableMapEventByTableId));
328-
eventDeserializer.setEventDataDeserializer(
329-
EventType.DELETE_ROWS,
330-
new RowDeserializers.DeleteRowsDeserializer(tableMapEventByTableId));
331-
eventDeserializer.setEventDataDeserializer(
332-
EventType.EXT_WRITE_ROWS,
333-
new RowDeserializers.WriteRowsDeserializer(tableMapEventByTableId)
334-
.setMayContainExtraInformation(true));
335-
eventDeserializer.setEventDataDeserializer(
336-
EventType.EXT_UPDATE_ROWS,
337-
new RowDeserializers.UpdateRowsDeserializer(tableMapEventByTableId)
338-
.setMayContainExtraInformation(true));
339-
eventDeserializer.setEventDataDeserializer(
340-
EventType.EXT_DELETE_ROWS,
341-
new RowDeserializers.DeleteRowsDeserializer(tableMapEventByTableId)
342-
.setMayContainExtraInformation(true));
348+
349+
// Create row deserializers, optionally wrapped with filtering for non-captured tables
350+
RowDeserializers.WriteRowsDeserializer writeRowsDeserializer =
351+
new RowDeserializers.WriteRowsDeserializer(tableMapEventByTableId);
352+
RowDeserializers.UpdateRowsDeserializer updateRowsDeserializer =
353+
new RowDeserializers.UpdateRowsDeserializer(tableMapEventByTableId);
354+
RowDeserializers.DeleteRowsDeserializer deleteRowsDeserializer =
355+
new RowDeserializers.DeleteRowsDeserializer(tableMapEventByTableId);
356+
RowDeserializers.WriteRowsDeserializer extWriteRowsDeserializer =
357+
(RowDeserializers.WriteRowsDeserializer)
358+
new RowDeserializers.WriteRowsDeserializer(tableMapEventByTableId)
359+
.setMayContainExtraInformation(true);
360+
RowDeserializers.UpdateRowsDeserializer extUpdateRowsDeserializer =
361+
(RowDeserializers.UpdateRowsDeserializer)
362+
new RowDeserializers.UpdateRowsDeserializer(tableMapEventByTableId)
363+
.setMayContainExtraInformation(true);
364+
RowDeserializers.DeleteRowsDeserializer extDeleteRowsDeserializer =
365+
(RowDeserializers.DeleteRowsDeserializer)
366+
new RowDeserializers.DeleteRowsDeserializer(tableMapEventByTableId)
367+
.setMayContainExtraInformation(true);
368+
369+
if (onlyDeserializeCapturedTablesChangelog && capturedTableFilter != null) {
370+
LOGGER.debug(
371+
"Only deserializing changelog events for captured tables is enabled. "
372+
+ "Non-captured table row events will skip deserialization.");
373+
eventDeserializer.setEventDataDeserializer(
374+
EventType.WRITE_ROWS,
375+
new FilteringRowsEventDataDeserializer<>(
376+
writeRowsDeserializer,
377+
tableMapEventByTableId,
378+
capturedTableFilter,
379+
FilteringRowsEventDataDeserializer.RowEventType.WRITE));
380+
eventDeserializer.setEventDataDeserializer(
381+
EventType.UPDATE_ROWS,
382+
new FilteringRowsEventDataDeserializer<>(
383+
updateRowsDeserializer,
384+
tableMapEventByTableId,
385+
capturedTableFilter,
386+
FilteringRowsEventDataDeserializer.RowEventType.UPDATE));
387+
eventDeserializer.setEventDataDeserializer(
388+
EventType.DELETE_ROWS,
389+
new FilteringRowsEventDataDeserializer<>(
390+
deleteRowsDeserializer,
391+
tableMapEventByTableId,
392+
capturedTableFilter,
393+
FilteringRowsEventDataDeserializer.RowEventType.DELETE));
394+
eventDeserializer.setEventDataDeserializer(
395+
EventType.EXT_WRITE_ROWS,
396+
new FilteringRowsEventDataDeserializer<>(
397+
extWriteRowsDeserializer,
398+
tableMapEventByTableId,
399+
capturedTableFilter,
400+
FilteringRowsEventDataDeserializer.RowEventType.WRITE));
401+
eventDeserializer.setEventDataDeserializer(
402+
EventType.EXT_UPDATE_ROWS,
403+
new FilteringRowsEventDataDeserializer<>(
404+
extUpdateRowsDeserializer,
405+
tableMapEventByTableId,
406+
capturedTableFilter,
407+
FilteringRowsEventDataDeserializer.RowEventType.UPDATE));
408+
eventDeserializer.setEventDataDeserializer(
409+
EventType.EXT_DELETE_ROWS,
410+
new FilteringRowsEventDataDeserializer<>(
411+
extDeleteRowsDeserializer,
412+
tableMapEventByTableId,
413+
capturedTableFilter,
414+
FilteringRowsEventDataDeserializer.RowEventType.DELETE));
415+
} else {
416+
eventDeserializer.setEventDataDeserializer(
417+
EventType.WRITE_ROWS, writeRowsDeserializer);
418+
eventDeserializer.setEventDataDeserializer(
419+
EventType.UPDATE_ROWS, updateRowsDeserializer);
420+
eventDeserializer.setEventDataDeserializer(
421+
EventType.DELETE_ROWS, deleteRowsDeserializer);
422+
eventDeserializer.setEventDataDeserializer(
423+
EventType.EXT_WRITE_ROWS, extWriteRowsDeserializer);
424+
eventDeserializer.setEventDataDeserializer(
425+
EventType.EXT_UPDATE_ROWS, extUpdateRowsDeserializer);
426+
eventDeserializer.setEventDataDeserializer(
427+
EventType.EXT_DELETE_ROWS, extDeleteRowsDeserializer);
428+
}
343429
client.setEventDeserializer(eventDeserializer);
344430
}
345431

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,11 @@ public void submitSplit(MySqlSplit mySqlSplit) {
137137
(MySqlStreamingChangeEventSourceMetrics)
138138
statefulTaskContext.getStreamingChangeEventSourceMetrics(),
139139
currentBinlogSplit,
140-
createEventFilter());
140+
createEventFilter(),
141+
statefulTaskContext
142+
.getSourceConfig()
143+
.isOnlyDeserializeCapturedTablesChangelog(),
144+
statefulTaskContext.getSourceConfig().getTableFilter());
141145

142146
executorService.submit(
143147
() -> {

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlBinlogSplitReadTask.java

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,44 @@ public MySqlBinlogSplitReadTask(
6969
MySqlStreamingChangeEventSourceMetrics metrics,
7070
MySqlBinlogSplit binlogSplit,
7171
Predicate<Event> eventFilter) {
72-
super(connectorConfig, connection, dispatcher, errorHandler, clock, taskContext, metrics);
72+
this(
73+
connectorConfig,
74+
connection,
75+
dispatcher,
76+
signalEventDispatcher,
77+
errorHandler,
78+
clock,
79+
taskContext,
80+
metrics,
81+
binlogSplit,
82+
eventFilter,
83+
false,
84+
null);
85+
}
86+
87+
public MySqlBinlogSplitReadTask(
88+
MySqlConnectorConfig connectorConfig,
89+
MySqlConnection connection,
90+
EventDispatcherImpl<TableId> dispatcher,
91+
SignalEventDispatcher signalEventDispatcher,
92+
ErrorHandler errorHandler,
93+
Clock clock,
94+
MySqlTaskContext taskContext,
95+
MySqlStreamingChangeEventSourceMetrics metrics,
96+
MySqlBinlogSplit binlogSplit,
97+
Predicate<Event> eventFilter,
98+
boolean onlyDeserializeCapturedTablesChangelog,
99+
Predicate<TableId> capturedTableFilter) {
100+
super(
101+
connectorConfig,
102+
connection,
103+
dispatcher,
104+
errorHandler,
105+
clock,
106+
taskContext,
107+
metrics,
108+
onlyDeserializeCapturedTablesChangelog,
109+
capturedTableFilter);
73110
this.binlogSplit = binlogSplit;
74111
this.eventDispatcher = dispatcher;
75112
this.errorHandler = errorHandler;

0 commit comments

Comments
 (0)