Skip to content

Commit bffd15e

Browse files
guoxuanlinlinguoxuan
authored and committed
[pipeline][mysql] skip binlog deserialization for non-captured table
1 parent 69dbcb3 commit bffd15e

10 files changed

Lines changed: 496 additions & 26 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@
7878
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED;
7979
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
8080
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP;
81+
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_ONLY_DESERIALIZE_CAPTURED_TABLES_CHANGELOG_ENABLED;
8182
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
8283
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
8384
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED;
@@ -167,6 +168,8 @@ public DataSource createDataSource(Context context) {
167168
boolean useLegacyJsonFormat = config.get(USE_LEGACY_JSON_FORMAT);
168169
boolean isAssignUnboundedChunkFirst =
169170
config.get(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
171+
boolean onlyDeserializeCapturedTablesChangelog =
172+
config.get(SCAN_ONLY_DESERIALIZE_CAPTURED_TABLES_CHANGELOG_ENABLED);
170173

171174
validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
172175
validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
@@ -220,7 +223,9 @@ public DataSource createDataSource(Context context) {
220223
.treatTinyInt1AsBoolean(treatTinyInt1AsBoolean)
221224
.useLegacyJsonFormat(useLegacyJsonFormat)
222225
.assignUnboundedChunkFirst(isAssignUnboundedChunkFirst)
223-
.skipSnapshotBackfill(skipSnapshotBackfill);
226+
.skipSnapshotBackfill(skipSnapshotBackfill)
227+
.onlyDeserializeCapturedTablesChangelog(
228+
onlyDeserializeCapturedTablesChangelog);
224229

225230
List<TableId> tableIds = MySqlSchemaUtils.listTables(configFactory.createConfig(0), null);
226231

@@ -358,6 +363,7 @@ public Set<ConfigOption<?>> optionalOptions() {
358363
options.add(PARSE_ONLINE_SCHEMA_CHANGES);
359364
options.add(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
360365
options.add(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
366+
options.add(SCAN_ONLY_DESERIALIZE_CAPTURED_TABLES_CHANGELOG_ENABLED);
361367
return options;
362368
}
363369

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -330,4 +330,16 @@ public class MySqlDataSourceOptions {
330330
.defaultValue(false)
331331
.withDescription(
332332
"Whether to skip backfill in snapshot reading phase. If backfill is skipped, changes on captured tables during snapshot phase will be consumed later in change log reading phase instead of being merged into the snapshot.WARNING: Skipping backfill might lead to data inconsistency because some change log events happened within the snapshot phase might be replayed (only at-least-once semantic is promised). For example updating an already updated value in snapshot, or deleting an already deleted entry in snapshot. These replayed change log events should be handled specially.");
333+
334+
/**
 * Experimental boolean option, keyed
 * {@code scan.only.deserialize.captured.tables.changelog.enabled}, that is disabled by default.
 *
 * <p>Per its description, enabling it restricts changelog deserialization during the incremental
 * phase to events that belong to the captured tables.
 */
@Experimental
335+
public static final ConfigOption<Boolean>
336+
SCAN_ONLY_DESERIALIZE_CAPTURED_TABLES_CHANGELOG_ENABLED =
337+
ConfigOptions.key(
338+
"scan.only.deserialize.captured.tables.changelog.enabled")
339+
.booleanType()
340+
.defaultValue(false)
341+
.withDescription(
342+
"Whether to only deserialize changelog events for captured tables during incremental phase. "
343+
+ "When set to true, only changelog events for the target tables will be deserialized, "
344+
+ "which can speed up binlog reading. Defaults to false.");
333345
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED;
4444
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
4545
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED;
46+
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_ONLY_DESERIALIZE_CAPTURED_TABLES_CHANGELOG_ENABLED;
4647
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES;
4748
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES_EXCLUDE;
4849
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TREAT_TINYINT1_AS_BOOLEAN_ENABLED;
@@ -278,6 +279,27 @@ public void testOptionalOption() {
278279
assertThat(dataSource.getSourceConfig().isAssignUnboundedChunkFirst()).isTrue();
279280
}
280281

282+
@Test
283+
public void testOnlyDeserializeCapturedTablesChangelogOption() {
284+
inventoryDatabase.createAndInitialize();
285+
Map<String, String> options = new HashMap<>();
286+
options.put(HOSTNAME.key(), MYSQL_CONTAINER.getHost());
287+
options.put(PORT.key(), String.valueOf(MYSQL_CONTAINER.getDatabasePort()));
288+
options.put(USERNAME.key(), TEST_USER);
289+
options.put(PASSWORD.key(), TEST_PASSWORD);
290+
options.put(TABLES.key(), inventoryDatabase.getDatabaseName() + ".prod\\.*");
291+
options.put(SCAN_ONLY_DESERIALIZE_CAPTURED_TABLES_CHANGELOG_ENABLED.key(), "true");
292+
293+
Factory.Context context = new MockContext(Configuration.fromMap(options));
294+
MySqlDataSourceFactory factory = new MySqlDataSourceFactory();
295+
assertThat(factory.optionalOptions())
296+
.contains(SCAN_ONLY_DESERIALIZE_CAPTURED_TABLES_CHANGELOG_ENABLED);
297+
298+
MySqlDataSource dataSource = (MySqlDataSource) factory.createDataSource(context);
299+
assertThat(dataSource.getSourceConfig().isOnlyDeserializeCapturedTablesChangelog())
300+
.isTrue();
301+
}
302+
281303
@Test
282304
void testPrefixRequireOption() {
283305
inventoryDatabase.createAndInitialize();

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1929,6 +1929,164 @@ private List<Event> getSnapshotExpected(TableId tableId) {
19291929
return snapshotExpected;
19301930
}
19311931

1932+
@Test
1933+
void testOnlyDeserializeCapturedTablesChangelog() throws Exception {
1934+
inventoryDatabase.createAndInitialize();
1935+
Configuration sourceConfiguration = new Configuration();
1936+
sourceConfiguration.set(MySqlDataSourceOptions.HOSTNAME, MYSQL8_CONTAINER.getHost());
1937+
sourceConfiguration.set(MySqlDataSourceOptions.PORT, MYSQL8_CONTAINER.getDatabasePort());
1938+
sourceConfiguration.set(MySqlDataSourceOptions.USERNAME, TEST_USER);
1939+
sourceConfiguration.set(MySqlDataSourceOptions.PASSWORD, TEST_PASSWORD);
1940+
1941+
sourceConfiguration.set(
1942+
MySqlDataSourceOptions.TABLES, inventoryDatabase.getDatabaseName() + ".products");
1943+
sourceConfiguration.set(
1944+
MySqlDataSourceOptions.SERVER_ID, getServerId(env.getParallelism()));
1945+
sourceConfiguration.set(MySqlDataSourceOptions.SERVER_TIME_ZONE, "UTC");
1946+
1947+
sourceConfiguration.set(
1948+
MySqlDataSourceOptions.SCAN_ONLY_DESERIALIZE_CAPTURED_TABLES_CHANGELOG_ENABLED,
1949+
true);
1950+
1951+
Factory.Context context =
1952+
new FactoryHelper.DefaultContext(
1953+
sourceConfiguration, new Configuration(), this.getClass().getClassLoader());
1954+
FlinkSourceProvider sourceProvider =
1955+
(FlinkSourceProvider)
1956+
new MySqlDataSourceFactory()
1957+
.createDataSource(context)
1958+
.getEventSourceProvider();
1959+
CloseableIterator<Event> events =
1960+
env.fromSource(
1961+
sourceProvider.getSource(),
1962+
WatermarkStrategy.noWatermarks(),
1963+
MySqlDataSourceFactory.IDENTIFIER,
1964+
new EventTypeInfo())
1965+
.executeAndCollect();
1966+
Thread.sleep(10_000);
1967+
1968+
TableId tableId = TableId.tableId(inventoryDatabase.getDatabaseName(), "products");
1969+
CreateTableEvent createTableEvent = getProductsCreateTableEvent(tableId);
1970+
1971+
List<Event> expectedSnapshot = getSnapshotExpected(tableId);
1972+
1973+
List<Event> expectedBinlog = new ArrayList<>();
1974+
try (Connection connection = inventoryDatabase.getJdbcConnection();
1975+
Statement statement = connection.createStatement()) {
1976+
expectedBinlog.addAll(executeAlterAndProvideExpected(tableId, statement));
1977+
1978+
RowType rowType =
1979+
RowType.of(
1980+
new DataType[] {
1981+
DataTypes.INT().notNull(),
1982+
DataTypes.VARCHAR(255).notNull(),
1983+
DataTypes.FLOAT(),
1984+
DataTypes.VARCHAR(45),
1985+
DataTypes.VARCHAR(55)
1986+
},
1987+
new String[] {"id", "name", "weight", "col1", "col2"});
1988+
BinaryRecordDataGenerator generator = new BinaryRecordDataGenerator(rowType);
1989+
1990+
statement.execute(
1991+
String.format(
1992+
"INSERT INTO `%s`.`products` VALUES (default,'scooter',5.5,'c-10','c-20');",
1993+
inventoryDatabase.getDatabaseName())); // 110
1994+
expectedBinlog.add(
1995+
DataChangeEvent.insertEvent(
1996+
tableId,
1997+
generator.generate(
1998+
new Object[] {
1999+
110,
2000+
BinaryStringData.fromString("scooter"),
2001+
5.5f,
2002+
BinaryStringData.fromString("c-10"),
2003+
BinaryStringData.fromString("c-20")
2004+
})));
2005+
2006+
statement.execute(
2007+
String.format(
2008+
"INSERT INTO `%s`.`orders` VALUES (default, '2023-01-01', 1001, 5, 102, 'test1');",
2009+
inventoryDatabase.getDatabaseName()));
2010+
statement.execute(
2011+
String.format(
2012+
"INSERT INTO `%s`.`orders` VALUES (default, '2023-01-02', 1002, 3, 103, 'test2');",
2013+
inventoryDatabase.getDatabaseName()));
2014+
statement.execute(
2015+
String.format(
2016+
"UPDATE `%s`.`orders` SET `quantity`=10 WHERE `order_number`=10001;",
2017+
inventoryDatabase.getDatabaseName()));
2018+
2019+
statement.execute(
2020+
String.format(
2021+
"INSERT INTO `%s`.`products` VALUES (default,'football',6.6,'c-11','c-21');",
2022+
inventoryDatabase.getDatabaseName())); // 111
2023+
expectedBinlog.add(
2024+
DataChangeEvent.insertEvent(
2025+
tableId,
2026+
generator.generate(
2027+
new Object[] {
2028+
111,
2029+
BinaryStringData.fromString("football"),
2030+
6.6f,
2031+
BinaryStringData.fromString("c-11"),
2032+
BinaryStringData.fromString("c-21")
2033+
})));
2034+
2035+
statement.execute(
2036+
String.format(
2037+
"DELETE FROM `%s`.`orders` WHERE `order_number`=10005;",
2038+
inventoryDatabase.getDatabaseName()));
2039+
2040+
statement.execute(
2041+
String.format(
2042+
"UPDATE `%s`.`products` SET `col1`='c-12', `col2`='c-22' WHERE id=110;",
2043+
inventoryDatabase.getDatabaseName()));
2044+
expectedBinlog.add(
2045+
DataChangeEvent.updateEvent(
2046+
tableId,
2047+
generator.generate(
2048+
new Object[] {
2049+
110,
2050+
BinaryStringData.fromString("scooter"),
2051+
5.5f,
2052+
BinaryStringData.fromString("c-10"),
2053+
BinaryStringData.fromString("c-20")
2054+
}),
2055+
generator.generate(
2056+
new Object[] {
2057+
110,
2058+
BinaryStringData.fromString("scooter"),
2059+
5.5f,
2060+
BinaryStringData.fromString("c-12"),
2061+
BinaryStringData.fromString("c-22")
2062+
})));
2063+
2064+
statement.execute(
2065+
String.format(
2066+
"DELETE FROM `%s`.`products` WHERE `id` = 111;",
2067+
inventoryDatabase.getDatabaseName()));
2068+
expectedBinlog.add(
2069+
DataChangeEvent.deleteEvent(
2070+
tableId,
2071+
generator.generate(
2072+
new Object[] {
2073+
111,
2074+
BinaryStringData.fromString("football"),
2075+
6.6f,
2076+
BinaryStringData.fromString("c-11"),
2077+
BinaryStringData.fromString("c-21")
2078+
})));
2079+
}
2080+
2081+
List<Event> actual =
2082+
fetchResultsExcept(
2083+
events, expectedSnapshot.size() + expectedBinlog.size(), createTableEvent);
2084+
assertThat(actual.subList(0, expectedSnapshot.size()))
2085+
.containsExactlyInAnyOrder(expectedSnapshot.toArray(new Event[0]));
2086+
assertThat(actual.subList(expectedSnapshot.size(), actual.size()))
2087+
.isEqualTo(expectedBinlog);
2088+
}
2089+
19322090
/**
19332091
* * The final schema of table products is as follows.
19342092
*
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/*
2+
* Copyright Debezium Authors.
3+
*
4+
* Licensed under the Apache Software License version 2.0, available at
5+
* http://www.apache.org/licenses/LICENSE-2.0
6+
*/
7+
8+
package io.debezium.connector.mysql;
9+
10+
import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
11+
import com.github.shyiko.mysql.binlog.event.EventData;
12+
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
13+
import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
14+
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
15+
import com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializer;
16+
import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
17+
import io.debezium.relational.TableId;
18+
import org.slf4j.Logger;
19+
import org.slf4j.LoggerFactory;
20+
21+
import java.io.IOException;
22+
import java.util.Collections;
23+
import java.util.Map;
24+
import java.util.function.Predicate;
25+
26+
/**
27+
* A filtering wrapper around row event data deserializers (WRITE_ROWS, UPDATE_ROWS, DELETE_ROWS).
28+
*
29+
* <p>When enabled, this deserializer reads the table ID from the first 6 bytes of the event data,
30+
* looks up the table name via the TABLE_MAP event cache, and checks whether the table is in the
31+
* captured table list. If the table is NOT captured, it returns an empty EventData (with tableId set
32+
* but rows empty) to skip the expensive row-by-row deserialization. The remaining bytes are
33+
* automatically skipped by {@code EventDeserializer.deserializeEventData()} via {@code
34+
* skipToTheEndOfTheBlock()}.
35+
*
36+
* <p>If the table IS captured, it resets the stream position and delegates to the original
37+
* deserializer for full deserialization.
38+
*/
39+
public class FilteringRowsEventDataDeserializer<T extends EventData>
40+
implements EventDataDeserializer<T> {
41+
42+
private static final Logger LOGGER =
43+
LoggerFactory.getLogger(FilteringRowsEventDataDeserializer.class);
44+
45+
/** The length in bytes of the table ID field in MySQL binlog row events. */
46+
private static final int TABLE_ID_LENGTH = 6;
47+
48+
/** The type of row event this deserializer handles. */
49+
public enum RowEventType {
50+
WRITE,
51+
UPDATE,
52+
DELETE
53+
}
54+
55+
private final EventDataDeserializer<T> delegate;
56+
private final Map<Long, TableMapEventData> tableMapEventByTableId;
57+
private final Predicate<TableId> tableFilter;
58+
private final RowEventType rowEventType;
59+
60+
public FilteringRowsEventDataDeserializer(
61+
EventDataDeserializer<T> delegate,
62+
Map<Long, TableMapEventData> tableMapEventByTableId,
63+
Predicate<TableId> tableFilter,
64+
RowEventType rowEventType) {
65+
this.delegate = delegate;
66+
this.tableMapEventByTableId = tableMapEventByTableId;
67+
this.tableFilter = tableFilter;
68+
this.rowEventType = rowEventType;
69+
}
70+
71+
@Override
72+
@SuppressWarnings("unchecked")
73+
public T deserialize(ByteArrayInputStream inputStream) throws IOException {
74+
// Read all remaining bytes from the stream so we can inspect the table ID
75+
// without relying on mark/reset (which may not be supported by the underlying stream).
76+
int available = inputStream.available();
77+
byte[] allBytes = inputStream.read(available);
78+
79+
// The first 6 bytes encode the table number (same format as all row events)
80+
long tableNumber = 0;
81+
for (int i = 0; i < TABLE_ID_LENGTH; i++) {
82+
tableNumber |= ((long) (allBytes[i] & 0xFF)) << (i * 8);
83+
}
84+
85+
// Look up the table name from the TABLE_MAP event cache
86+
TableMapEventData tableMapEvent = tableMapEventByTableId.get(tableNumber);
87+
if (tableMapEvent != null) {
88+
String database = tableMapEvent.getDatabase();
89+
String table = tableMapEvent.getTable();
90+
TableId tableId = new TableId(database, null, table);
91+
92+
if (!tableFilter.test(tableId)) {
93+
// Table is NOT captured, skip deserialization by returning empty EventData
94+
LOGGER.debug(
95+
"Skipping deserialization for non-captured table: {}.{}", database, table);
96+
return createEmptyEventData(tableNumber);
97+
}
98+
}
99+
100+
ByteArrayInputStream newStream = new ByteArrayInputStream(allBytes);
101+
return delegate.deserialize(newStream);
102+
}
103+
104+
@SuppressWarnings("unchecked")
105+
private T createEmptyEventData(long tableNumber) {
106+
switch (rowEventType) {
107+
case WRITE:
108+
WriteRowsEventData writeData = new WriteRowsEventData();
109+
writeData.setTableId(tableNumber);
110+
writeData.setRows(Collections.emptyList());
111+
return (T) writeData;
112+
case UPDATE:
113+
UpdateRowsEventData updateData = new UpdateRowsEventData();
114+
updateData.setTableId(tableNumber);
115+
updateData.setRows(Collections.emptyList());
116+
return (T) updateData;
117+
case DELETE:
118+
DeleteRowsEventData deleteData = new DeleteRowsEventData();
119+
deleteData.setTableId(tableNumber);
120+
deleteData.setRows(Collections.emptyList());
121+
return (T) deleteData;
122+
default:
123+
throw new IllegalStateException("Unknown row event type: " + rowEventType);
124+
}
125+
}
126+
}

0 commit comments

Comments
 (0)