Skip to content

Commit c7e66b2

Browse files
author
guoxuanlin
committed
log and test
1 parent 087831c commit c7e66b2

4 files changed

Lines changed: 191 additions & 2 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED;
4444
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
4545
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED;
46+
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_ONLY_DESERIALIZE_CAPTURED_TABLES_CHANGELOG_ENABLED;
4647
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES;
4748
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES_EXCLUDE;
4849
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TREAT_TINYINT1_AS_BOOLEAN_ENABLED;
@@ -278,6 +279,27 @@ public void testOptionalOption() {
278279
assertThat(dataSource.getSourceConfig().isAssignUnboundedChunkFirst()).isTrue();
279280
}
280281

282+
@Test
283+
public void testOnlyDeserializeCapturedTablesChangelogOption() {
284+
inventoryDatabase.createAndInitialize();
285+
Map<String, String> options = new HashMap<>();
286+
options.put(HOSTNAME.key(), MYSQL_CONTAINER.getHost());
287+
options.put(PORT.key(), String.valueOf(MYSQL_CONTAINER.getDatabasePort()));
288+
options.put(USERNAME.key(), TEST_USER);
289+
options.put(PASSWORD.key(), TEST_PASSWORD);
290+
options.put(TABLES.key(), inventoryDatabase.getDatabaseName() + ".prod\\.*");
291+
options.put(SCAN_ONLY_DESERIALIZE_CAPTURED_TABLES_CHANGELOG_ENABLED.key(), "true");
292+
293+
Factory.Context context = new MockContext(Configuration.fromMap(options));
294+
MySqlDataSourceFactory factory = new MySqlDataSourceFactory();
295+
assertThat(factory.optionalOptions())
296+
.contains(SCAN_ONLY_DESERIALIZE_CAPTURED_TABLES_CHANGELOG_ENABLED);
297+
298+
MySqlDataSource dataSource = (MySqlDataSource) factory.createDataSource(context);
299+
assertThat(dataSource.getSourceConfig().isOnlyDeserializeCapturedTablesChangelog())
300+
.isTrue();
301+
}
302+
281303
@Test
282304
void testPrefixRequireOption() {
283305
inventoryDatabase.createAndInitialize();

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1929,6 +1929,173 @@ private List<Event> getSnapshotExpected(TableId tableId) {
19291929
return snapshotExpected;
19301930
}
19311931

1932+
@Test
1933+
void testOnlyDeserializeCapturedTablesChangelog() throws Exception {
1934+
inventoryDatabase.createAndInitialize();
1935+
Configuration sourceConfiguration = new Configuration();
1936+
sourceConfiguration.set(MySqlDataSourceOptions.HOSTNAME, MYSQL8_CONTAINER.getHost());
1937+
sourceConfiguration.set(MySqlDataSourceOptions.PORT, MYSQL8_CONTAINER.getDatabasePort());
1938+
sourceConfiguration.set(MySqlDataSourceOptions.USERNAME, TEST_USER);
1939+
sourceConfiguration.set(MySqlDataSourceOptions.PASSWORD, TEST_PASSWORD);
1940+
// Only capture the products table
1941+
sourceConfiguration.set(
1942+
MySqlDataSourceOptions.TABLES, inventoryDatabase.getDatabaseName() + ".products");
1943+
sourceConfiguration.set(
1944+
MySqlDataSourceOptions.SERVER_ID, getServerId(env.getParallelism()));
1945+
sourceConfiguration.set(MySqlDataSourceOptions.SERVER_TIME_ZONE, "UTC");
1946+
// Enable the feature under test
1947+
sourceConfiguration.set(
1948+
MySqlDataSourceOptions.SCAN_ONLY_DESERIALIZE_CAPTURED_TABLES_CHANGELOG_ENABLED,
1949+
true);
1950+
1951+
Factory.Context context =
1952+
new FactoryHelper.DefaultContext(
1953+
sourceConfiguration, new Configuration(), this.getClass().getClassLoader());
1954+
FlinkSourceProvider sourceProvider =
1955+
(FlinkSourceProvider)
1956+
new MySqlDataSourceFactory()
1957+
.createDataSource(context)
1958+
.getEventSourceProvider();
1959+
CloseableIterator<Event> events =
1960+
env.fromSource(
1961+
sourceProvider.getSource(),
1962+
WatermarkStrategy.noWatermarks(),
1963+
MySqlDataSourceFactory.IDENTIFIER,
1964+
new EventTypeInfo())
1965+
.executeAndCollect();
1966+
Thread.sleep(10_000);
1967+
1968+
TableId tableId = TableId.tableId(inventoryDatabase.getDatabaseName(), "products");
1969+
CreateTableEvent createTableEvent = getProductsCreateTableEvent(tableId);
1970+
1971+
// generate snapshot data
1972+
List<Event> expectedSnapshot = getSnapshotExpected(tableId);
1973+
1974+
List<Event> expectedBinlog = new ArrayList<>();
1975+
try (Connection connection = inventoryDatabase.getJdbcConnection();
1976+
Statement statement = connection.createStatement()) {
1977+
expectedBinlog.addAll(executeAlterAndProvideExpected(tableId, statement));
1978+
1979+
RowType rowType =
1980+
RowType.of(
1981+
new DataType[] {
1982+
DataTypes.INT().notNull(),
1983+
DataTypes.VARCHAR(255).notNull(),
1984+
DataTypes.FLOAT(),
1985+
DataTypes.VARCHAR(45),
1986+
DataTypes.VARCHAR(55)
1987+
},
1988+
new String[] {"id", "name", "weight", "col1", "col2"});
1989+
BinaryRecordDataGenerator generator = new BinaryRecordDataGenerator(rowType);
1990+
1991+
// Insert data into the captured table (products)
1992+
statement.execute(
1993+
String.format(
1994+
"INSERT INTO `%s`.`products` VALUES (default,'scooter',5.5,'c-10','c-20');",
1995+
inventoryDatabase.getDatabaseName())); // 110
1996+
expectedBinlog.add(
1997+
DataChangeEvent.insertEvent(
1998+
tableId,
1999+
generator.generate(
2000+
new Object[] {
2001+
110,
2002+
BinaryStringData.fromString("scooter"),
2003+
5.5f,
2004+
BinaryStringData.fromString("c-10"),
2005+
BinaryStringData.fromString("c-20")
2006+
})));
2007+
2008+
// Insert data into a NON-captured table (orders) - these should be skipped
2009+
// at the deserialization level and should NOT affect the captured table events
2010+
statement.execute(
2011+
String.format(
2012+
"INSERT INTO `%s`.`orders` VALUES (default, '2023-01-01', 1001, 5, 102);",
2013+
inventoryDatabase.getDatabaseName()));
2014+
statement.execute(
2015+
String.format(
2016+
"INSERT INTO `%s`.`orders` VALUES (default, '2023-01-02', 1002, 3, 103);",
2017+
inventoryDatabase.getDatabaseName()));
2018+
statement.execute(
2019+
String.format(
2020+
"UPDATE `%s`.`orders` SET `quantity`=10 WHERE `order_number`=10001;",
2021+
inventoryDatabase.getDatabaseName()));
2022+
2023+
// Insert more data into the captured table (products)
2024+
statement.execute(
2025+
String.format(
2026+
"INSERT INTO `%s`.`products` VALUES (default,'football',6.6,'c-11','c-21');",
2027+
inventoryDatabase.getDatabaseName())); // 111
2028+
expectedBinlog.add(
2029+
DataChangeEvent.insertEvent(
2030+
tableId,
2031+
generator.generate(
2032+
new Object[] {
2033+
111,
2034+
BinaryStringData.fromString("football"),
2035+
6.6f,
2036+
BinaryStringData.fromString("c-11"),
2037+
BinaryStringData.fromString("c-21")
2038+
})));
2039+
2040+
// More non-captured table changes
2041+
statement.execute(
2042+
String.format(
2043+
"DELETE FROM `%s`.`orders` WHERE `order_number`=10005;",
2044+
inventoryDatabase.getDatabaseName()));
2045+
2046+
// Update captured table
2047+
statement.execute(
2048+
String.format(
2049+
"UPDATE `%s`.`products` SET `col1`='c-12', `col2`='c-22' WHERE id=110;",
2050+
inventoryDatabase.getDatabaseName()));
2051+
expectedBinlog.add(
2052+
DataChangeEvent.updateEvent(
2053+
tableId,
2054+
generator.generate(
2055+
new Object[] {
2056+
110,
2057+
BinaryStringData.fromString("scooter"),
2058+
5.5f,
2059+
BinaryStringData.fromString("c-10"),
2060+
BinaryStringData.fromString("c-20")
2061+
}),
2062+
generator.generate(
2063+
new Object[] {
2064+
110,
2065+
BinaryStringData.fromString("scooter"),
2066+
5.5f,
2067+
BinaryStringData.fromString("c-12"),
2068+
BinaryStringData.fromString("c-22")
2069+
})));
2070+
2071+
// Delete from captured table
2072+
statement.execute(
2073+
String.format(
2074+
"DELETE FROM `%s`.`products` WHERE `id` = 111;",
2075+
inventoryDatabase.getDatabaseName()));
2076+
expectedBinlog.add(
2077+
DataChangeEvent.deleteEvent(
2078+
tableId,
2079+
generator.generate(
2080+
new Object[] {
2081+
111,
2082+
BinaryStringData.fromString("football"),
2083+
6.6f,
2084+
BinaryStringData.fromString("c-11"),
2085+
BinaryStringData.fromString("c-21")
2086+
})));
2087+
}
2088+
2089+
// Verify: only captured table events should be received
2090+
List<Event> actual =
2091+
fetchResultsExcept(
2092+
events, expectedSnapshot.size() + expectedBinlog.size(), createTableEvent);
2093+
assertThat(actual.subList(0, expectedSnapshot.size()))
2094+
.containsExactlyInAnyOrder(expectedSnapshot.toArray(new Event[0]));
2095+
assertThat(actual.subList(expectedSnapshot.size(), actual.size()))
2096+
.isEqualTo(expectedBinlog);
2097+
}
2098+
19322099
/**
19332100
* * The final schema of table products is as follows.
19342101
*

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/FilteringRowsEventDataDeserializer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ public T deserialize(ByteArrayInputStream inputStream) throws IOException {
8383

8484
if (!tableFilter.test(tableId)) {
8585
// Table is NOT captured, skip deserialization by returning empty EventData
86-
LOGGER.trace(
86+
LOGGER.warn(
8787
"Skipping deserialization for non-captured table: {}.{}", database, table);
8888
return createEmptyEventData(tableNumber);
8989
}

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -367,7 +367,7 @@ public Event nextEvent(ByteArrayInputStream inputStream) throws IOException {
367367
.setMayContainExtraInformation(true);
368368

369369
if (onlyDeserializeCapturedTablesChangelog && capturedTableFilter != null) {
370-
LOGGER.debug(
370+
LOGGER.warn(
371371
"Only deserializing changelog events for captured tables is enabled. "
372372
+ "Non-captured table row events will skip deserialization.");
373373
eventDeserializer.setEventDataDeserializer(

0 commit comments

Comments
 (0)