From f5ea0243d5a2596cb5de0a897c18feed69da2957 Mon Sep 17 00:00:00 2001 From: xiayuxiao Date: Tue, 3 Dec 2024 16:49:36 +0800 Subject: [PATCH 1/2] [FLINK-36165][source-connector/mysql] Support capturing snapshot data with conditions [FLINK-36165][source-connector/mysql] add docs [FLINK-36165][source-connector/mysql] Implement snapshot filter for MySQL table source [FLINK-36165][source-connector/mysql] Escape dot [FLINK-36165 ] fixed supported escape like 'city != 'China:beijing'' --- .../connectors/flink-sources/mysql-cdc.md | 7 + .../connectors/pipeline-connectors/mysql.md | 11 ++ .../mysql/factory/MySqlDataSourceFactory.java | 26 +++ .../mysql/source/MySqlDataSourceOptions.java | 11 ++ .../source/MySqlDataSourceFactoryTest.java | 34 ++++ .../task/MySqlSnapshotSplitReadTask.java | 15 +- .../mysql/source/MySqlSourceBuilder.java | 9 + .../source/assigners/MySqlChunkSplitter.java | 21 ++- .../source/config/MySqlSourceConfig.java | 9 +- .../config/MySqlSourceConfigFactory.java | 22 ++- .../source/config/MySqlSourceOptions.java | 9 + .../source/utils/SnapshotFilterUtils.java | 71 ++++++++ .../mysql/source/utils/StatementUtils.java | 66 +++++-- .../mysql/table/MySqlTableSource.java | 21 ++- .../mysql/table/MySqlTableSourceFactory.java | 5 +- .../mysql/source/MySqlSourceITCase.java | 163 +++++++++++++++++- .../source/utils/SnapshotFilterUtilsTest.java | 47 +++++ .../table/MySqlTableSourceFactoryTest.java | 34 ++-- .../src/test/resources/ddl/customer.sql | 6 +- 19 files changed, 540 insertions(+), 47 deletions(-) create mode 100644 flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/SnapshotFilterUtils.java create mode 100644 flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/SnapshotFilterUtilsTest.java diff --git a/docs/content/docs/connectors/flink-sources/mysql-cdc.md 
b/docs/content/docs/connectors/flink-sources/mysql-cdc.md index 80247008f50..5d73a3f348e 100644 --- a/docs/content/docs/connectors/flink-sources/mysql-cdc.md +++ b/docs/content/docs/connectors/flink-sources/mysql-cdc.md @@ -391,6 +391,13 @@ During a snapshot operation, the connector will query each included table to pro hex: The binary data type is converted to a hexadecimal string and transmitted. The default value is none. Depending on your requirements and data types, you can choose the appropriate processing mode. If your database contains a large number of binary data types, it is recommended to use base64 or hex mode to make it easier to handle during transmission. + + scan.snapshot.filter + optional + (none) + String + When reading a table snapshot, the rows of captured tables will be filtered using the specified filter expression (AKA a SQL WHERE clause). By default, no filter is applied, meaning the entire table will be synchronized. e.g. `id > 100`. + diff --git a/docs/content/docs/connectors/pipeline-connectors/mysql.md b/docs/content/docs/connectors/pipeline-connectors/mysql.md index 3cc5c17a100..52a63a52503 100644 --- a/docs/content/docs/connectors/pipeline-connectors/mysql.md +++ b/docs/content/docs/connectors/pipeline-connectors/mysql.md @@ -293,6 +293,17 @@ pipeline: scan.binlog.newly-added-table.enabled: only do binlog-reading for newly added table during binlog reading phase. + + scan.snapshot.filters + optional + (none) + String + When reading a table snapshot, the rows of captured tables will be filtered using the specified filter expression (AKA a SQL WHERE clause).
+ By default, no filter is applied, meaning the entire table will be synchronized.
+ A colon (:) separates table name and filter expression, while a semicolon (;) separates multiple filters, + e.g. `db1.user_table_[0-9]+:id > 100;db[1-2].[app|web]_order_\\.*:id < 0;`. + + diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java index 118e8cdb19f..a68437f01d2 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java @@ -70,6 +70,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE; +import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_SNAPSHOT_FILTERS; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_MODE; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_FILE; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET; @@ -235,6 +236,30 @@ public DataSource createDataSource(Context context) { configFactory.chunkKeyColumn(chunkKeyColumnMap); } + String snapshotFilters = config.get(SCAN_SNAPSHOT_FILTERS); + if (snapshotFilters != null) { + Map snapshotFilterMap = new 
HashMap<>(); + for (String snapshotFilter : snapshotFilters.split("(?> optionalOptions() { options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND); options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND); options.add(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED); + options.add(SCAN_SNAPSHOT_FILTERS); return options; } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java index 580d370b5aa..9c7afde6b23 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java @@ -272,4 +272,15 @@ public class MySqlDataSourceOptions { + "The difference between scan.newly-added-table.enabled and scan.binlog.newly-added-table.enabled options is: \n" + "scan.newly-added-table.enabled: do re-snapshot & binlog-reading for newly added table when restored; \n" + "scan.binlog.newly-added-table.enabled: only do binlog-reading for newly added table during binlog reading phase."); + + @Experimental + public static final ConfigOption SCAN_SNAPSHOT_FILTERS = + ConfigOptions.key("scan.snapshot.filters") + .stringType() + .noDefaultValue() + .withDescription( + "When reading a table snapshot, the rows of captured tables will be filtered using the specified filter expression (AKA a SQL WHERE clause). " + + "By default, no filter is applied, meaning the entire table will be synchronized. " + + "A colon (:) separates table name and filter expression, while a semicolon (;) separate multiple filters, " + + "e.g. 
`db1.user_table_[0-9]+:id > 100;db[1-2].[app|web]_order_\\.*:id < 0;`."); } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java index 277bb74ac72..76876a76f29 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java @@ -41,6 +41,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN; +import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_SNAPSHOT_FILTERS; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES_EXCLUDE; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.USERNAME; @@ -298,6 +299,39 @@ public void testAddChunkKeyColumns() { }); } + @Test + public void testAddSnapshotFilters() { + inventoryDatabase.createAndInitialize(); + Map options = new HashMap<>(); + options.put(HOSTNAME.key(), MYSQL_CONTAINER.getHost()); + options.put(PORT.key(), String.valueOf(MYSQL_CONTAINER.getDatabasePort())); + options.put(USERNAME.key(), TEST_USER); + options.put(PASSWORD.key(), TEST_PASSWORD); + 
options.put(TABLES.key(), inventoryDatabase.getDatabaseName() + ".\\.*"); + options.put( + SCAN_SNAPSHOT_FILTERS.key(), + inventoryDatabase.getDatabaseName() + + ".multi_max_\\.*:id > 200;" + + inventoryDatabase.getDatabaseName() + + ".products:1 = 0;"); + Factory.Context context = new MockContext(Configuration.fromMap(options)); + + MySqlDataSourceFactory factory = new MySqlDataSourceFactory(); + MySqlDataSource dataSource = (MySqlDataSource) factory.createDataSource(context); + + assertThat(dataSource.getSourceConfig().getSnapshotFilters()) + .isNotEmpty() + .isEqualTo( + new HashMap() { + { + put( + inventoryDatabase.getDatabaseName() + ".multi_max_\\.*", + "id > 200"); + put(inventoryDatabase.getDatabaseName() + ".products", "1 = 0"); + } + }); + } + class MockContext implements Factory.Context { Configuration factoryConfiguration; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTask.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTask.java index f729f59d05f..c907d477780 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTask.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTask.java @@ -24,6 +24,7 @@ import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig; import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset; import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSnapshotSplit; +import org.apache.flink.cdc.connectors.mysql.source.utils.SnapshotFilterUtils; import org.apache.flink.cdc.connectors.mysql.source.utils.StatementUtils; 
import org.apache.flink.cdc.connectors.mysql.source.utils.hooks.SnapshotPhaseHooks; @@ -242,12 +243,24 @@ private void createDataEventsForTable( long exportStart = clock.currentTimeInMillis(); LOG.info("Exporting data from split '{}' of table {}", snapshotSplit.splitId(), table.id()); + String filter = + SnapshotFilterUtils.getSnapshotFilter( + sourceConfig.getSnapshotFilters(), table.id()); + if (filter != null) { + LOG.info( + "Filter for split '{}' of table {} is: {}", + snapshotSplit.splitId(), + table.id(), + filter); + } + final String selectSql = StatementUtils.buildSplitScanQuery( snapshotSplit.getTableId(), snapshotSplit.getSplitKeyType(), snapshotSplit.getSplitStart() == null, - snapshotSplit.getSplitEnd() == null); + snapshotSplit.getSplitEnd() == null, + filter); LOG.info( "For split '{}' of table {} using select statement: '{}'", snapshotSplit.splitId(), diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java index c03aa36b056..b53646a1c3e 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java @@ -268,6 +268,15 @@ public MySqlSourceBuilder closeIdleReaders(boolean closeIdleReaders) { return this; } + /** + * When reading a table snapshot, the rows of captured tables will be filtered using the + * specified filter expression (AKA a SQL WHERE clause). 
+ */ + public MySqlSourceBuilder snapshotFilters(String table, String filter) { + this.configFactory.snapshotFilters(table, filter); + return this; + } + /** * Build the {@link MySqlSource}. * diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java index d22cc555f9f..c0243b20783 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java @@ -26,6 +26,7 @@ import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSnapshotSplit; import org.apache.flink.cdc.connectors.mysql.source.utils.ChunkUtils; import org.apache.flink.cdc.connectors.mysql.source.utils.ObjectUtils; +import org.apache.flink.cdc.connectors.mysql.source.utils.SnapshotFilterUtils; import org.apache.flink.cdc.connectors.mysql.source.utils.StatementUtils; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.LogicalTypeRoot; @@ -149,9 +150,13 @@ private void analyzeTable(MySqlPartition partition, TableId tableId) { ChunkUtils.getChunkKeyColumn( currentSplittingTable, sourceConfig.getChunkKeyColumns()); splitType = ChunkUtils.getChunkKeyColumnType(splitColumn); + String filter = + SnapshotFilterUtils.getSnapshotFilter( + sourceConfig.getSnapshotFilters(), tableId); minMaxOfSplitColumn = - StatementUtils.queryMinMax(jdbcConnection, tableId, splitColumn.name()); - approximateRowCnt = StatementUtils.queryApproximateRowCnt(jdbcConnection, tableId); + StatementUtils.queryMinMax(jdbcConnection, tableId, 
splitColumn.name(), filter); + approximateRowCnt = + StatementUtils.queryRowCnt(jdbcConnection, tableId, splitColumn.name(), filter); } catch (Exception e) { throw new RuntimeException("Fail to analyze table in chunk splitter.", e); } @@ -169,6 +174,8 @@ private MySqlSnapshotSplit splitOneUnevenlySizedChunk(MySqlPartition partition, nextChunkStart == ChunkSplitterState.ChunkBound.START_BOUND ? "null" : chunkStartVal.toString()); + String filter = + SnapshotFilterUtils.getSnapshotFilter(sourceConfig.getSnapshotFilters(), tableId); // we start from [null, min + chunk_size) and avoid [null, min) Object chunkEnd = nextChunkEnd( @@ -179,7 +186,8 @@ private MySqlSnapshotSplit splitOneUnevenlySizedChunk(MySqlPartition partition, tableId, splitColumn.name(), minMaxOfSplitColumn[1], - chunkSize); + chunkSize, + filter); // may sleep a while to avoid DDOS on MySQL server maySleep(nextChunkId, tableId); if (chunkEnd != null && ObjectUtils.compare(chunkEnd, minMaxOfSplitColumn[1]) <= 0) { @@ -325,16 +333,17 @@ private Object nextChunkEnd( TableId tableId, String splitColumnName, Object max, - int chunkSize) + int chunkSize, + @Nullable String filter) throws SQLException { // chunk end might be null when max values are removed Object chunkEnd = StatementUtils.queryNextChunkMax( - jdbc, tableId, splitColumnName, chunkSize, previousChunkEnd); + jdbc, tableId, splitColumnName, chunkSize, previousChunkEnd, filter); if (Objects.equals(previousChunkEnd, chunkEnd)) { // we don't allow equal chunk start and end, // should query the next one larger than chunkEnd - chunkEnd = StatementUtils.queryMin(jdbc, tableId, splitColumnName, chunkEnd); + chunkEnd = StatementUtils.queryMin(jdbc, tableId, splitColumnName, chunkEnd, filter); // queryMin will return null when the chunkEnd is the max value, // this will happen when the mysql table ignores the capitalization. 
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java index dd0ac789666..114093d91d3 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java @@ -66,6 +66,7 @@ public class MySqlSourceConfig implements Serializable { private final Properties jdbcProperties; private final Map chunkKeyColumns; private final boolean skipSnapshotBackfill; + private final Map snapshotFilters; // -------------------------------------------------------------------------------------------- // Debezium Configurations @@ -99,7 +100,8 @@ public class MySqlSourceConfig implements Serializable { Properties dbzProperties, Properties jdbcProperties, Map chunkKeyColumns, - boolean skipSnapshotBackfill) { + boolean skipSnapshotBackfill, + Map snapshotFilters) { this.hostname = checkNotNull(hostname); this.port = port; this.username = checkNotNull(username); @@ -127,6 +129,7 @@ public class MySqlSourceConfig implements Serializable { this.jdbcProperties = jdbcProperties; this.chunkKeyColumns = chunkKeyColumns; this.skipSnapshotBackfill = skipSnapshotBackfill; + this.snapshotFilters = snapshotFilters; } public String getHostname() { @@ -254,4 +257,8 @@ public Map getChunkKeyColumns() { public boolean isSkipSnapshotBackfill() { return skipSnapshotBackfill; } + + public Map getSnapshotFilters() { + return snapshotFilters; + } } diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java index 8b65055ca13..3524253d696 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java @@ -70,6 +70,7 @@ public class MySqlSourceConfigFactory implements Serializable { private Properties dbzProperties; private Map chunkKeyColumns = new HashMap<>(); private boolean skipSnapshotBackfill = false; + private Map snapshotFilters = new HashMap<>(); public MySqlSourceConfigFactory hostname(String hostname) { this.hostname = hostname; @@ -291,6 +292,24 @@ public MySqlSourceConfigFactory closeIdleReaders(boolean closeIdleReaders) { return this; } + /** + * When reading a table snapshot, the rows of captured tables will be filtered using the + * specified filter expression (AKA a SQL WHERE clause). + */ + public MySqlSourceConfigFactory snapshotFilters(String table, String filter) { + this.snapshotFilters.put(table, filter); + return this; + } + + /** + * When reading a table snapshot, the rows of captured tables will be filtered using the + * specified filter expression (AKA a SQL WHERE clause). + */ + public MySqlSourceConfigFactory snapshotFilters(Map snapshotFilters) { + this.snapshotFilters.putAll(snapshotFilters); + return this; + } + /** Creates a new {@link MySqlSourceConfig} for the given subtask {@code subtaskId}. 
*/ public MySqlSourceConfig createConfig(int subtaskId) { // hard code server name, because we don't need to distinguish it, docs: @@ -384,6 +403,7 @@ public MySqlSourceConfig createConfig(int subtaskId, String serverName) { props, jdbcProperties, chunkKeyColumns, - skipSnapshotBackfill); + skipSnapshotBackfill, + snapshotFilters); } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java index f3424c8dfb9..a6722822c16 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java @@ -262,4 +262,13 @@ public class MySqlSourceOptions { .defaultValue(false) .withDescription( "Whether to skip backfill in snapshot reading phase. If backfill is skipped, changes on captured tables during snapshot phase will be consumed later in binlog reading phase instead of being merged into the snapshot. WARNING: Skipping backfill might lead to data inconsistency because some binlog events happened within the snapshot phase might be replayed (only at-least-once semantic is promised). For example updating an already updated value in snapshot, or deleting an already deleted entry in snapshot. 
These replayed binlog events should be handled specially."); + + @Experimental + public static final ConfigOption SCAN_SNAPSHOT_FILTER = + ConfigOptions.key("scan.snapshot.filter") + .stringType() + .noDefaultValue() + .withDescription( + "When reading a table snapshot, the rows of captured tables will be filtered using the specified filter expression (AKA a SQL WHERE clause). " + + "By default, no filter is applied, meaning the entire table will be synchronized. e.g. `id > 100`"); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/SnapshotFilterUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/SnapshotFilterUtils.java new file mode 100644 index 00000000000..7ad16d5beab --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/SnapshotFilterUtils.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.connectors.mysql.source.utils; + +import org.apache.flink.cdc.common.schema.Selectors; + +import io.debezium.relational.TableId; + +import java.util.HashMap; +import java.util.Map; + +/** Utilities to filter snapshot of table. */ +public class SnapshotFilterUtils { + + private SnapshotFilterUtils() {} + + private static final Map, Map> cache = new HashMap<>(); + + /** + * Don't worry about atomicity. We don't need to use the synchronized keyword to ensure thread + * safety here. Since filters come from the source configuration, they shouldn’t be changed + * during runtime. So the result will always be idempotent. + */ + private static Map toSelector(Map filters) { + Map cached = cache.get(filters); + if (cached != null) { + return cached; + } + + Map snapshotFilters = new HashMap<>(); + filters.forEach( + (table, filter) -> { + Selectors selector = + new Selectors.SelectorsBuilder().includeTables(table).build(); + snapshotFilters.put(selector, filter); + }); + cache.put(filters, snapshotFilters); + + return snapshotFilters; + } + + public static String getSnapshotFilter(Map filters, TableId tableId) { + Map snapshotFilters = toSelector(filters); + + String filter = null; + for (Selectors selector : snapshotFilters.keySet()) { + if (selector.isMatch( + org.apache.flink.cdc.common.event.TableId.tableId( + tableId.catalog(), tableId.table()))) { + filter = snapshotFilters.get(selector); + break; + } + } + return filter; + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/StatementUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/StatementUtils.java index 6e621e0a5fb..b0d9a42c602 100644 --- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/StatementUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/StatementUtils.java @@ -22,6 +22,8 @@ import io.debezium.jdbc.JdbcConnection; import io.debezium.relational.TableId; +import javax.annotation.Nullable; + import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; @@ -36,12 +38,15 @@ public class StatementUtils { private StatementUtils() {} - public static Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String columnName) + public static Object[] queryMinMax( + JdbcConnection jdbc, TableId tableId, String columnName, @Nullable String filter) throws SQLException { final String minMaxQuery = String.format( "SELECT MIN(%s), MAX(%s) FROM %s", - quote(columnName), quote(columnName), quote(tableId)); + quote(columnName), + quote(columnName), + filter != null ? 
quote(tableId) + " WHERE " + filter : quote(tableId)); return jdbc.queryAndMap( minMaxQuery, rs -> { @@ -56,6 +61,31 @@ public static Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String }); } + public static Long queryRowCnt( + JdbcConnection jdbc, TableId tableId, String columnName, @Nullable String filter) + throws SQLException { + + if (filter == null) { + return queryApproximateRowCnt(jdbc, tableId); + } + + final String cntQuery = + String.format( + "SELECT COUNT(%s) FROM %s WHERE %s", + quote(columnName), quote(tableId), filter); + return jdbc.queryAndMap( + cntQuery, + rs -> { + if (!rs.next()) { + // this should never happen + throw new SQLException( + String.format( + "No result returned after running query [%s]", cntQuery)); + } + return rs.getLong(1); + }); + } + public static long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) throws SQLException { // The statement used to get approximate row count which is less @@ -77,12 +107,18 @@ public static long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) } public static Object queryMin( - JdbcConnection jdbc, TableId tableId, String columnName, Object excludedLowerBound) + JdbcConnection jdbc, + TableId tableId, + String columnName, + Object excludedLowerBound, + @Nullable String filter) throws SQLException { final String minQuery = String.format( "SELECT MIN(%s) FROM %s WHERE %s > ?", - quote(columnName), quote(tableId), quote(columnName)); + quote(columnName), + quote(tableId), + filter != null ? 
filter + " AND " + quote(columnName) : quote(columnName)); return jdbc.prepareQueryAndMap( minQuery, ps -> ps.setObject(1, excludedLowerBound), @@ -102,7 +138,8 @@ public static Object queryNextChunkMax( TableId tableId, String splitColumnName, int chunkSize, - Object includedLowerBound) + Object includedLowerBound, + @Nullable String filter) throws SQLException { String quotedColumn = quote(splitColumnName); String query = @@ -113,7 +150,7 @@ public static Object queryNextChunkMax( quotedColumn, quotedColumn, quote(tableId), - quotedColumn, + filter != null ? filter + " AND " + quotedColumn : quotedColumn, quotedColumn, chunkSize); return jdbc.prepareQueryAndMap( @@ -131,8 +168,12 @@ public static Object queryNextChunkMax( } public static String buildSplitScanQuery( - TableId tableId, RowType pkRowType, boolean isFirstSplit, boolean isLastSplit) { - return buildSplitQuery(tableId, pkRowType, isFirstSplit, isLastSplit, -1, true); + TableId tableId, + RowType pkRowType, + boolean isFirstSplit, + boolean isLastSplit, + @Nullable String filter) { + return buildSplitQuery(tableId, pkRowType, isFirstSplit, isLastSplit, -1, true, filter); } private static String buildSplitQuery( @@ -141,8 +182,9 @@ private static String buildSplitQuery( boolean isFirstSplit, boolean isLastSplit, int limitSize, - boolean isScanningData) { - final String condition; + boolean isScanningData, + @Nullable String filter) { + String condition; if (isFirstSplit && isLastSplit) { condition = null; @@ -172,6 +214,10 @@ private static String buildSplitQuery( condition = sql.toString(); } + if (filter != null) { + condition = condition == null ? 
filter : filter + " AND " + condition; + } + if (isScanningData) { return buildSelectWithRowLimits( tableId, limitSize, "*", Optional.ofNullable(condition), Optional.empty()); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java index 242e02da634..39306f491db 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java @@ -98,6 +98,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat private final Duration heartbeatInterval; private final String chunkKeyColumn; final boolean skipSnapshotBackFill; + private final String snapshotFilter; // -------------------------------------------------------------------------------------------- // Mutable attributes @@ -135,7 +136,8 @@ public MySqlTableSource( Properties jdbcProperties, Duration heartbeatInterval, @Nullable String chunkKeyColumn, - boolean skipSnapshotBackFill) { + boolean skipSnapshotBackFill, + @Nullable String snapshotFilter) { this.physicalSchema = physicalSchema; this.port = port; this.hostname = checkNotNull(hostname); @@ -165,6 +167,7 @@ public MySqlTableSource( this.heartbeatInterval = heartbeatInterval; this.chunkKeyColumn = chunkKeyColumn; this.skipSnapshotBackFill = skipSnapshotBackFill; + this.snapshotFilter = snapshotFilter; } @Override @@ -220,6 +223,9 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { .heartbeatInterval(heartbeatInterval) .chunkKeyColumn(new ObjectPath(database, tableName), chunkKeyColumn) 
.skipSnapshotBackfill(skipSnapshotBackFill) + .snapshotFilters( + escapeDot(database) + "." + escapeDot(tableName), + snapshotFilter) .build(); return SourceProvider.of(parallelSource); } else { @@ -305,7 +311,8 @@ public DynamicTableSource copy() { jdbcProperties, heartbeatInterval, chunkKeyColumn, - skipSnapshotBackFill); + skipSnapshotBackFill, + snapshotFilter); source.metadataKeys = metadataKeys; source.producedDataType = producedDataType; return source; @@ -347,7 +354,8 @@ public boolean equals(Object o) { && Objects.equals(jdbcProperties, that.jdbcProperties) && Objects.equals(heartbeatInterval, that.heartbeatInterval) && Objects.equals(chunkKeyColumn, that.chunkKeyColumn) - && Objects.equals(skipSnapshotBackFill, that.skipSnapshotBackFill); + && Objects.equals(skipSnapshotBackFill, that.skipSnapshotBackFill) + && Objects.equals(snapshotFilter, that.snapshotFilter); } @Override @@ -380,7 +388,8 @@ public int hashCode() { jdbcProperties, heartbeatInterval, chunkKeyColumn, - skipSnapshotBackFill); + skipSnapshotBackFill, + snapshotFilter); } @Override @@ -405,4 +414,8 @@ Properties getParallelDbzProperties(Properties dbzProperties) { } return newDbzProperties; } + + private String escapeDot(String str) { + return str.replace(".", "\\."); + } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java index e435f946a7e..69cda36a52c 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java @@ -102,6 +102,7 @@ public 
DynamicTableSource createDynamicTableSource(Context context) { config.get(MySqlSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED); boolean skipSnapshotBackFill = config.get(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP); + String snapshotFilter = config.get(MySqlSourceOptions.SCAN_SNAPSHOT_FILTER); if (enableParallelRead) { validatePrimaryKeyIfEnableParallel(physicalSchema, chunkKeyColumn); @@ -145,7 +146,8 @@ public DynamicTableSource createDynamicTableSource(Context context) { JdbcUrlUtils.getJdbcProperties(context.getCatalogTable().getOptions()), heartbeatInterval, chunkKeyColumn, - skipSnapshotBackFill); + skipSnapshotBackFill, + snapshotFilter); } @Override @@ -191,6 +193,7 @@ public Set> optionalOptions() { options.add(MySqlSourceOptions.HEARTBEAT_INTERVAL); options.add(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN); options.add(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP); + options.add(MySqlSourceOptions.SCAN_SNAPSHOT_FILTER); return options; } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.java index 7be090a8fef..29cfaa53ca7 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.java @@ -141,7 +141,7 @@ public class MySqlSourceITCase extends MySqlSourceTestBase { "+I[1017, user_18, Shanghai, 123567891234]", "+I[1018, user_19, Shanghai, 123567891234]", "+I[1019, user_20, Shanghai, 123567891234]", - "+I[2000, user_21, Shanghai, 123567891234]"); + "+I[2000, user_21, China:Shanghai, 
123567891234]"); /** First part binlog events in string, which is made by {@link #makeFirstPartBinlogEvents}. */ private final List firstPartBinlogEvents = @@ -287,7 +287,7 @@ public void testSnapshotSplitReadingFailCrossCheckpoints() throws Exception { env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); // The sleeping source will sleep awhile after send per record - MySqlSource sleepingSource = buildSleepingSource(); + MySqlSource sleepingSource = buildSleepingSource(null); DataStreamSource source = env.fromSource(sleepingSource, WatermarkStrategy.noWatermarks(), "selfSource"); @@ -313,7 +313,7 @@ public void testSnapshotSplitReadingFailCrossCheckpoints() throws Exception { "+I[1017, user_18, Shanghai, 123567891234]", "+I[1018, user_19, Shanghai, 123567891234]", "+I[1019, user_20, Shanghai, 123567891234]", - "+I[2000, user_21, Shanghai, 123567891234]" + "+I[2000, user_21, China:Shanghai, 123567891234]" }; TypeSerializer serializer = source.getTransformation().getOutputType().createSerializer(env.getConfig()); @@ -352,6 +352,150 @@ public void testSnapshotSplitReadingFailCrossCheckpoints() throws Exception { jobClient.cancel().get(); } + @Test + @SuppressWarnings({"rawtypes", "unchecked"}) + public void testSnapshotFilters() throws Exception { + customDatabase.createAndInitialize(); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(DEFAULT_PARALLELISM); + env.enableCheckpointing(5000L); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); + + // Filter user with `id > 1000` + // The sleeping source will sleep awhile after send per record + MySqlSource sleepingSource = buildSleepingSource("id > 1000"); + DataStreamSource source = + env.fromSource(sleepingSource, WatermarkStrategy.noWatermarks(), "selfSource"); + + String[] expectedSnapshotData = + new String[] { + "+I[1009, user_10, Shanghai, 123567891234]", + "+I[1010, user_11, Shanghai, 123567891234]", + "+I[1011, user_12,
Shanghai, 123567891234]", + "+I[1012, user_13, Shanghai, 123567891234]", + "+I[1013, user_14, Shanghai, 123567891234]", + "+I[1014, user_15, Shanghai, 123567891234]", + "+I[1015, user_16, Shanghai, 123567891234]", + "+I[1016, user_17, Shanghai, 123567891234]", + "+I[1017, user_18, Shanghai, 123567891234]", + "+I[1018, user_19, Shanghai, 123567891234]", + "+I[1019, user_20, Shanghai, 123567891234]", + "+I[2000, user_21, China:Shanghai, 123567891234]" + }; + TypeSerializer serializer = + source.getTransformation().getOutputType().createSerializer(env.getConfig()); + String accumulatorName = "dataStreamCollect_" + UUID.randomUUID(); + CollectSinkOperatorFactory factory = + new CollectSinkOperatorFactory(serializer, accumulatorName); + CollectSinkOperator operator = (CollectSinkOperator) factory.getOperator(); + CollectResultIterator iterator = + new CollectResultIterator( + operator.getOperatorIdFuture(), + serializer, + accumulatorName, + env.getCheckpointConfig(), + 10000L); + CollectStreamSink sink = new CollectStreamSink(source, factory); + sink.name("Data stream collect sink"); + env.addOperator(sink.getTransformation()); + JobClient jobClient = env.executeAsync("snapshotSplitTest"); + iterator.setJobClient(jobClient); + JobID jobId = jobClient.getJobID(); + + // Trigger failover once some snapshot records has been sent by sleeping source + if (iterator.hasNext()) { + triggerFailover( + FailoverType.JM, + jobId, + miniClusterResource.getMiniCluster(), + () -> sleepMs(100)); + } + + // Check all snapshot records are sent with exactly-once semantics + // List xx = fetchRowData(iterator, expectedSnapshotData.length); + assertEqualsInAnyOrder( + Arrays.asList(expectedSnapshotData), + fetchRowData(iterator, expectedSnapshotData.length)); + assertTrue(!hasNextData(iterator)); + jobClient.cancel().get(); + } + + @Test + @SuppressWarnings({"rawtypes", "unchecked"}) + public void testSnapshotFiltersEscape() throws Exception { + customDatabase.createAndInitialize(); + 
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(DEFAULT_PARALLELISM); + env.enableCheckpointing(5000L); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); + + // Filter user with `address != 'China:Shanghai'` (the filter value contains a colon) + // The sleeping source will sleep awhile after send per record + MySqlSource sleepingSource = buildSleepingSource("address != 'China:Shanghai'"); + DataStreamSource source = + env.fromSource(sleepingSource, WatermarkStrategy.noWatermarks(), "selfSource"); + + String[] expectedSnapshotData = + new String[] { + "+I[101, user_1, Shanghai, 123567891234]", + "+I[102, user_2, Shanghai, 123567891234]", + "+I[103, user_3, Shanghai, 123567891234]", + "+I[109, user_4, Shanghai, 123567891234]", + "+I[110, user_5, Shanghai, 123567891234]", + "+I[111, user_6, Shanghai, 123567891234]", + "+I[118, user_7, Shanghai, 123567891234]", + "+I[121, user_8, Shanghai, 123567891234]", + "+I[123, user_9, Shanghai, 123567891234]", + "+I[1009, user_10, Shanghai, 123567891234]", + "+I[1010, user_11, Shanghai, 123567891234]", + "+I[1011, user_12, Shanghai, 123567891234]", + "+I[1012, user_13, Shanghai, 123567891234]", + "+I[1013, user_14, Shanghai, 123567891234]", + "+I[1014, user_15, Shanghai, 123567891234]", + "+I[1015, user_16, Shanghai, 123567891234]", + "+I[1016, user_17, Shanghai, 123567891234]", + "+I[1017, user_18, Shanghai, 123567891234]", + "+I[1018, user_19, Shanghai, 123567891234]", + "+I[1019, user_20, Shanghai, 123567891234]" + }; + TypeSerializer serializer = + source.getTransformation().getOutputType().createSerializer(env.getConfig()); + String accumulatorName = "dataStreamCollect_" + UUID.randomUUID(); + CollectSinkOperatorFactory factory = + new CollectSinkOperatorFactory(serializer, accumulatorName); + CollectSinkOperator operator = (CollectSinkOperator) factory.getOperator(); + CollectResultIterator iterator = + new CollectResultIterator( + operator.getOperatorIdFuture(), + serializer, + accumulatorName, +
env.getCheckpointConfig(), + 10000L); + CollectStreamSink sink = new CollectStreamSink(source, factory); + sink.name("Data stream collect sink"); + env.addOperator(sink.getTransformation()); + JobClient jobClient = env.executeAsync("snapshotSplitTest"); + iterator.setJobClient(jobClient); + JobID jobId = jobClient.getJobID(); + + // Trigger failover once some snapshot records has been sent by sleeping source + if (iterator.hasNext()) { + triggerFailover( + FailoverType.JM, + jobId, + miniClusterResource.getMiniCluster(), + () -> sleepMs(100)); + } + + // Check all snapshot records are sent with exactly-once semantics + // List xx = fetchRowData(iterator, expectedSnapshotData.length); + assertEqualsInAnyOrder( + Arrays.asList(expectedSnapshotData), + fetchRowData(iterator, expectedSnapshotData.length)); + assertTrue(!hasNextData(iterator)); + jobClient.cancel().get(); + } + @Test public void testStartFromEarliestOffset() throws Exception { List expected = new ArrayList<>(); @@ -392,7 +536,7 @@ public void testSnapshotOnlyModeWithDMLPostHighWaterMark() throws Exception { "+I[1017, user_18, Shanghai, 123567891234]", "+I[1018, user_19, Shanghai, 123567891234]", "+I[1019, user_20, Shanghai, 123567891234]", - "+I[2000, user_21, Shanghai, 123567891234]"); + "+I[2000, user_21, China:Shanghai, 123567891234]"); assertEqualsInAnyOrder(expectedRecords, records); } @@ -528,9 +672,9 @@ public void testSkipBackfillWithDMLPreHighWaterMark() throws Exception { "+I[1017, user_18, Shanghai, 123567891234]", "+I[1018, user_19, Shanghai, 123567891234]", "+I[1019, user_20, Shanghai, 123567891234]", - "+I[2000, user_21, Shanghai, 123567891234]", + "+I[2000, user_21, China:Shanghai, 123567891234]", "+I[15213, user_15213, Shanghai, 123567891234]", - "-U[2000, user_21, Shanghai, 123567891234]", + "-U[2000, user_21, China:Shanghai, 123567891234]", "+U[2000, user_21, Pittsburgh, 123567891234]", "-D[1019, user_20, Shanghai, 123567891234]"); // when skip backfill, the wal log between (snapshot, 
high_watermark) will be seen as @@ -569,7 +713,7 @@ public void testSkipBackfillWithDMLPostLowWaterMark() throws Exception { "+I[2000, user_21, Pittsburgh, 123567891234]", "+I[15213, user_15213, Shanghai, 123567891234]", "+I[15213, user_15213, Shanghai, 123567891234]", - "-U[2000, user_21, Shanghai, 123567891234]", + "-U[2000, user_21, China:Shanghai, 123567891234]", "+U[2000, user_21, Pittsburgh, 123567891234]", "-D[1019, user_20, Shanghai, 123567891234]"); // when skip backfill, the wal log between (snapshot, high_watermark) will still be @@ -816,7 +960,7 @@ private CollectResultIterator addCollector( return iterator; } - private MySqlSource buildSleepingSource() { + private MySqlSource buildSleepingSource(String snapshotFilter) { ResolvedSchema physicalSchema = new ResolvedSchema( Arrays.asList( @@ -868,6 +1012,7 @@ private MySqlSource buildSleepingSource() { .chunkKeyColumn( new ObjectPath(customDatabase.getDatabaseName(), tableName), chunkColumnName) + .snapshotFilters(customDatabase.getDatabaseName() + "." 
+ tableName, snapshotFilter) .build(); } @@ -1009,7 +1154,7 @@ private void checkSnapshotData( "+I[1017, user_18, Shanghai, 123567891234]", "+I[1018, user_19, Shanghai, 123567891234]", "+I[1019, user_20, Shanghai, 123567891234]", - "+I[2000, user_21, Shanghai, 123567891234]" + "+I[2000, user_21, China:Shanghai, 123567891234]" }; List expectedSnapshotData = new ArrayList<>(); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/SnapshotFilterUtilsTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/SnapshotFilterUtilsTest.java new file mode 100644 index 00000000000..1df3c2f4ed3 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/SnapshotFilterUtilsTest.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.cdc.connectors.mysql.source.utils; + +import io.debezium.relational.TableId; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +/** Unit test for {@link org.apache.flink.cdc.connectors.mysql.source.utils.SnapshotFilterUtils}. */ +public class SnapshotFilterUtilsTest { + + @Test + public void test() { + Map map = new HashMap<>(); + map.put("db.user", "id > 100"); + map.put("db.order_[0-9]+", "id > 200"); + assertEquals( + "id > 100", SnapshotFilterUtils.getSnapshotFilter(map, TableId.parse("db.user"))); + assertEquals( + "id > 200", + SnapshotFilterUtils.getSnapshotFilter(map, TableId.parse("db.order_1"))); + assertEquals( + "id > 200", + SnapshotFilterUtils.getSnapshotFilter(map, TableId.parse("db.order_2"))); + assertNull(SnapshotFilterUtils.getSnapshotFilter(map, TableId.parse("db.shop"))); + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java index 8339a1db544..7c642a5d1fa 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java @@ -127,7 +127,8 @@ public void testCommonProperties() { new Properties(), HEARTBEAT_INTERVAL.defaultValue(), null, - SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue()); + SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(), + null); assertEquals(expectedSource, actualSource); } @@ -173,7 +174,8 @@ public 
void testEnableParallelReadSource() { new Properties(), HEARTBEAT_INTERVAL.defaultValue(), "testCol", - SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue()); + SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(), + null); assertEquals(expectedSource, actualSource); } @@ -215,7 +217,8 @@ public void testEnableParallelReadSourceWithSingleServerId() { new Properties(), HEARTBEAT_INTERVAL.defaultValue(), null, - SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue()); + SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(), + null); assertEquals(expectedSource, actualSource); } @@ -255,7 +258,8 @@ public void testEnableParallelReadSourceLatestOffset() { new Properties(), HEARTBEAT_INTERVAL.defaultValue(), null, - SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue()); + SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(), + null); assertEquals(expectedSource, actualSource); } @@ -275,6 +279,7 @@ public void testOptionalProperties() { options.put("scan.incremental.snapshot.chunk.key-column", "testCol"); options.put("scan.incremental.close-idle-reader.enabled", "true"); options.put("scan.incremental.snapshot.backfill.skip", "true"); + options.put("scan.snapshot.filter", "id > 200"); DynamicTableSource actualSource = createTableSource(options); Properties dbzProperties = new Properties(); @@ -311,7 +316,8 @@ public void testOptionalProperties() { jdbcProperties, Duration.ofMillis(15213), "testCol", - true); + true, + "id > 200"); assertEquals(expectedSource, actualSource); assertTrue(actualSource instanceof MySqlTableSource); MySqlTableSource actualMySqlTableSource = (MySqlTableSource) actualSource; @@ -365,7 +371,8 @@ public void testStartupFromSpecificOffset() { new Properties(), HEARTBEAT_INTERVAL.defaultValue(), null, - SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue()); + SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(), + null); assertEquals(expectedSource, actualSource); } @@ -403,7 +410,8 @@ public void testStartupFromInitial() { new 
Properties(), HEARTBEAT_INTERVAL.defaultValue(), null, - SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue()); + SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(), + null); assertEquals(expectedSource, actualSource); } @@ -442,7 +450,8 @@ public void testStartupFromEarliestOffset() { new Properties(), HEARTBEAT_INTERVAL.defaultValue(), null, - SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue()); + SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(), + null); assertEquals(expectedSource, actualSource); } @@ -482,7 +491,8 @@ public void testStartupFromSpecificTimestamp() { new Properties(), HEARTBEAT_INTERVAL.defaultValue(), null, - SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue()); + SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(), + null); assertEquals(expectedSource, actualSource); } @@ -520,7 +530,8 @@ public void testStartupFromLatestOffset() { new Properties(), HEARTBEAT_INTERVAL.defaultValue(), null, - SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue()); + SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(), + null); assertEquals(expectedSource, actualSource); } @@ -563,7 +574,8 @@ public void testMetadataColumns() { new Properties(), HEARTBEAT_INTERVAL.defaultValue(), null, - SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue()); + SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP.defaultValue(), + null); expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType(); expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name"); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/customer.sql b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/customer.sql index e4df63f1a33..34daf3abaf5 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/customer.sql +++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/customer.sql @@ -46,7 +46,7 @@ VALUES (101,"user_1","Shanghai","123567891234"), (1017,"user_18","Shanghai","123567891234"), (1018,"user_19","Shanghai","123567891234"), (1019,"user_20","Shanghai","123567891234"), - (2000,"user_21","Shanghai","123567891234"); + (2000,"user_21","China:Shanghai","123567891234"); -- Create a table will not be read CREATE TABLE prefix_customers ( @@ -91,7 +91,7 @@ VALUES (101,"user_1","Shanghai","123567891234"), (1017,"user_18","Shanghai","123567891234"), (1018,"user_19","Shanghai","123567891234"), (1019,"user_20","Shanghai","123567891234"), - (2000,"user_21","Shanghai","123567891234"); + (2000,"user_21","China:Shanghai","123567891234"); -- create table whose split key is evenly distributed CREATE TABLE customers_even_dist ( @@ -179,7 +179,7 @@ VALUES (101,"user_1","Shanghai","123567891234"), (1017,"user_18","Shanghai","123567891234"), (1018,"user_19","Shanghai","123567891234"), (1019,"user_20","Shanghai","123567891234"), - (2000,"user_21","Shanghai","123567891234"); + (2000,"user_21","China:Shanghai","123567891234"); -- table has combined primary key CREATE TABLE customer_card ( From fd1722309efaeab4773abb6eea675b5b881cb183 Mon Sep 17 00:00:00 2001 From: ouyangwulin Date: Fri, 22 Aug 2025 21:04:07 +0800 Subject: [PATCH 2/2] fixed test fixed checkstyle fixed test fixed MySqlTableSourceFactoryTest test error. 
--- .../connectors/flink-sources/mysql-cdc.md | 24 +++ .../connectors/pipeline-connectors/mysql.md | 2 + .../source/config/MySqlSourceConfig.java | 1 - .../config/MySqlSourceConfigFactory.java | 5 +- .../mysql/table/MySqlTableSourceFactory.java | 5 +- .../reader/BinlogSplitReaderTest.java | 2 +- .../mysql/source/MySqlSourceITCase.java | 153 +++++++++++++++++- .../source/reader/MySqlSourceReaderTest.java | 4 +- .../source/utils/SnapshotFilterUtilsTest.java | 25 +-- .../table/MySqlTableSourceFactoryTest.java | 6 +- 10 files changed, 200 insertions(+), 27 deletions(-) diff --git a/docs/content/docs/connectors/flink-sources/mysql-cdc.md b/docs/content/docs/connectors/flink-sources/mysql-cdc.md index dc2ca8f7cdc..cfe38943737 100644 --- a/docs/content/docs/connectors/flink-sources/mysql-cdc.md +++ b/docs/content/docs/connectors/flink-sources/mysql-cdc.md @@ -874,6 +874,30 @@ There are two places that need to be taken care of. * If no update operation is performed on the specified column, the exactly-once semantics is ensured. * If the update operation is performed on the specified column, only the at-least-once semantics is ensured. However, you can specify primary keys at downstream and perform the idempotence operation to ensure data correctness. +#### Warning + +Using a **non-primary key column** as the `scan.incremental.snapshot.chunk.key-column` for a MySQL table with primary keys may lead to data inconsistencies. Below is a scenario illustrating this issue and recommendations to mitigate potential problems. + +#### Problem Scenario + +- **Table Structure:** + - **Primary Key:** `id` + - **Chunk Key Column:** `pid` (Not a primary key) + +- **Snapshot Splits:** + - **Split 0:** `1 < pid <= 3` + - **Split 1:** `3 < pid <= 5` + +- **Operation:** + - Two different subtasks are reading Split 0 and Split 1 concurrently. + - An update operation changes `pid` from `2` to `4` for `id=0` while both splits are being read. 
This update occurs between the low and high watermark of both splits. + +- **Result:** + - **Split 0:** Contains the record `[id=0, pid=2]` + - **Split 1:** Contains the record `[id=0, pid=4]` + +Since the order of processing these records cannot be guaranteed, the final value of `pid` for `id=0` may end up being either `2` or `4`, leading to potential data inconsistencies. + ### About converting binary type data to base64 encoded data ```sql diff --git a/docs/content/docs/connectors/pipeline-connectors/mysql.md b/docs/content/docs/connectors/pipeline-connectors/mysql.md index 35c64262272..c7eab6cb786 100644 --- a/docs/content/docs/connectors/pipeline-connectors/mysql.md +++ b/docs/content/docs/connectors/pipeline-connectors/mysql.md @@ -620,6 +620,8 @@ Notice: TEXT
MEDIUMTEXT
LONGTEXT
+ LONG
+ LONG VARCHAR
STRING diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java index 80a4ae25df8..ac2f6eb3538 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java @@ -110,7 +110,6 @@ public class MySqlSourceConfig implements Serializable { boolean treatTinyInt1AsBoolean, boolean useLegacyJsonFormat, boolean assignUnboundedChunkFirst, - boolean skipSnapshotBackfill, Map snapshotFilters) { this.hostname = checkNotNull(hostname); this.port = port; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java index a65b7b7a4f0..f100c872c57 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java @@ -437,11 +437,10 @@ public MySqlSourceConfig createConfig(int subtaskId, String serverName) { jdbcProperties, chunkKeyColumns, skipSnapshotBackfill, - snapshotFilters, - skipSnapshotBackfill, parseOnLineSchemaChanges, treatTinyInt1AsBoolean, 
useLegacyJsonFormat, - assignUnboundedChunkFirst); + assignUnboundedChunkFirst, + snapshotFilters); } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java index 863664294ec..87ddfb364f8 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java @@ -154,12 +154,11 @@ public DynamicTableSource createDynamicTableSource(Context context) { heartbeatInterval, chunkKeyColumn, skipSnapshotBackFill, - snapshotFilter, - skipSnapshotBackFill, parseOnLineSchemaChanges, useLegacyJsonFormat, assignUnboundedChunkFirst, - appendOnly); + appendOnly, + snapshotFilter); } @Override diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java index e9205d2c5d5..2b6536a7093 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java @@ -570,7 +570,7 @@ void testReadBinlogFromEarliestOffset() throws Exception { "+I[1017, user_18, Shanghai, 123567891234]", "+I[1018, 
user_19, Shanghai, 123567891234]", "+I[1019, user_20, Shanghai, 123567891234]", - "+I[2000, user_21, Shanghai, 123567891234]", + "+I[2000, user_21, China:Shanghai, 123567891234]", "-U[103, user_3, Shanghai, 123567891234]", "+U[103, user_3, Hangzhou, 123567891234]", "-D[102, user_2, Shanghai, 123567891234]", diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.java index 46b32708a1c..daa060e993a 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.java @@ -417,7 +417,7 @@ void testSnapshotSplitReadingFailCrossCheckpoints(String tableName, String chunk env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); // The sleeping source will sleep awhile after send per record - MySqlSource sleepingSource = buildSleepingSource(tableName, chunkColumnName); + MySqlSource sleepingSource = buildSleepingSource(tableName, chunkColumnName, null); DataStreamSource source = env.fromSource(sleepingSource, WatermarkStrategy.noWatermarks(), "selfSource"); @@ -711,6 +711,154 @@ void testSkipBackfillWithDMLPostLowWaterMark() throws Exception { assertEqualsInAnyOrder(expectedRecords, records); } + @ParameterizedTest + @MethodSource("parameters") + @SuppressWarnings({"rawtypes", "unchecked"}) + public void testSnapshotFilters(String tableName, String chunkColumnName) throws Exception { + customDatabase.createAndInitialize(); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(DEFAULT_PARALLELISM); + 
env.enableCheckpointing(5000L); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); + + // Filter user with `id > 200` + // The sleeping source will sleep awhile after send per record + MySqlSource sleepingSource = + buildSleepingSource(tableName, chunkColumnName, "id > 200"); + DataStreamSource source = + env.fromSource(sleepingSource, WatermarkStrategy.noWatermarks(), "selfSource"); + + String[] expectedSnapshotData = + new String[] { + "+I[1009, user_10, Shanghai, 123567891234]", + "+I[1010, user_11, Shanghai, 123567891234]", + "+I[1011, user_12, Shanghai, 123567891234]", + "+I[1012, user_13, Shanghai, 123567891234]", + "+I[1013, user_14, Shanghai, 123567891234]", + "+I[1014, user_15, Shanghai, 123567891234]", + "+I[1015, user_16, Shanghai, 123567891234]", + "+I[1016, user_17, Shanghai, 123567891234]", + "+I[1017, user_18, Shanghai, 123567891234]", + "+I[1018, user_19, Shanghai, 123567891234]", + "+I[1019, user_20, Shanghai, 123567891234]", + "+I[2000, user_21, China:Shanghai, 123567891234]" + }; + TypeSerializer serializer = + source.getTransformation().getOutputType().createSerializer(env.getConfig()); + String accumulatorName = "dataStreamCollect_" + UUID.randomUUID(); + CollectSinkOperatorFactory factory = + new CollectSinkOperatorFactory(serializer, accumulatorName); + CollectSinkOperator operator = (CollectSinkOperator) factory.getOperator(); + CollectResultIterator iterator = + new CollectResultIterator( + operator.getOperatorIdFuture(), + serializer, + accumulatorName, + env.getCheckpointConfig(), + 10000L); + CollectStreamSink sink = new CollectStreamSink(source, factory); + sink.name("Data stream collect sink"); + env.addOperator(sink.getTransformation()); + JobClient jobClient = env.executeAsync("snapshotSplitTest"); + iterator.setJobClient(jobClient); + JobID jobId = jobClient.getJobID(); + + // Trigger failover once some snapshot records has been sent by sleeping source + if (iterator.hasNext()) { + triggerFailover( + 
FailoverType.JM, + jobId, + miniClusterResource.get().getMiniCluster(), + () -> sleepMs(100)); + } + + // Check all snapshot records are sent with exactly-once semantics + assertEqualsInAnyOrder( + Arrays.asList(expectedSnapshotData), + fetchRowData(iterator, expectedSnapshotData.length)); + assertThat(hasNextData(iterator)).isFalse(); + + jobClient.cancel().get(); + } + + @ParameterizedTest + @MethodSource("parameters") + @SuppressWarnings({"rawtypes", "unchecked"}) + public void testSnapshotFiltersEscape(String tableName, String chunkColumnName) + throws Exception { + customDatabase.createAndInitialize(); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(DEFAULT_PARALLELISM); + env.enableCheckpointing(5000L); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); + + // Filter user with `address != 'China:Shanghai'` + // The sleeping source will sleep awhile after send per record + MySqlSource sleepingSource = + buildSleepingSource(tableName, chunkColumnName, "address != 'China:Shanghai'"); + DataStreamSource source = + env.fromSource(sleepingSource, WatermarkStrategy.noWatermarks(), "selfSource"); + + String[] expectedSnapshotData = + new String[] { + "+I[101, user_1, Shanghai, 123567891234]", + "+I[102, user_2, Shanghai, 123567891234]", + "+I[103, user_3, Shanghai, 123567891234]", + "+I[109, user_4, Shanghai, 123567891234]", + "+I[110, user_5, Shanghai, 123567891234]", + "+I[111, user_6, Shanghai, 123567891234]", + "+I[118, user_7, Shanghai, 123567891234]", + "+I[121, user_8, Shanghai, 123567891234]", + "+I[123, user_9, Shanghai, 123567891234]", + "+I[1009, user_10, Shanghai, 123567891234]", + "+I[1010, user_11, Shanghai, 123567891234]", + "+I[1011, user_12, Shanghai, 123567891234]", + "+I[1012, user_13, Shanghai, 123567891234]", + "+I[1013, user_14, Shanghai, 123567891234]", + "+I[1014, user_15, Shanghai, 123567891234]", + "+I[1015, user_16, Shanghai, 123567891234]", + "+I[1016, user_17, Shanghai,
123567891234]", + "+I[1017, user_18, Shanghai, 123567891234]", + "+I[1018, user_19, Shanghai, 123567891234]", + "+I[1019, user_20, Shanghai, 123567891234]" + }; + TypeSerializer serializer = + source.getTransformation().getOutputType().createSerializer(env.getConfig()); + String accumulatorName = "dataStreamCollect_" + UUID.randomUUID(); + CollectSinkOperatorFactory factory = + new CollectSinkOperatorFactory(serializer, accumulatorName); + CollectSinkOperator operator = (CollectSinkOperator) factory.getOperator(); + CollectResultIterator iterator = + new CollectResultIterator( + operator.getOperatorIdFuture(), + serializer, + accumulatorName, + env.getCheckpointConfig(), + 10000L); + CollectStreamSink sink = new CollectStreamSink(source, factory); + sink.name("Data stream collect sink"); + env.addOperator(sink.getTransformation()); + JobClient jobClient = env.executeAsync("snapshotSplitTest"); + iterator.setJobClient(jobClient); + JobID jobId = jobClient.getJobID(); + + // Trigger failover once some snapshot records has been sent by sleeping source + if (iterator.hasNext()) { + triggerFailover( + FailoverType.JM, + jobId, + miniClusterResource.get().getMiniCluster(), + () -> sleepMs(100)); + } + + // Check all snapshot records are sent with exactly-once semantics + assertEqualsInAnyOrder( + Arrays.asList(expectedSnapshotData), + fetchRowData(iterator, expectedSnapshotData.length)); + assertThat(hasNextData(iterator)).isFalse(); + jobClient.cancel().get(); + } + private List testBackfillWhenWritingEvents( boolean skipSnapshotBackfill, int fetchSize, @@ -1020,7 +1168,8 @@ private CollectResultIterator addCollector( return iterator; } - private MySqlSource buildSleepingSource(String tableName, String chunkColumnName) { + private MySqlSource buildSleepingSource( + String tableName, String chunkColumnName, String snapshotFilter) { ResolvedSchema physicalSchema = new ResolvedSchema( Arrays.asList( diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java index 7b8dfdcdbb2..bbe6bf1e4b4 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java @@ -184,7 +184,7 @@ void testRemoveTableUsingStateFromSnapshotPhase() throws Exception { "+I[1016, user_17, Shanghai, 123567891234]", "+I[1019, user_20, Shanghai, 123567891234]", "+I[1018, user_19, Shanghai, 123567891234]", - "+I[2000, user_21, Shanghai, 123567891234]" + "+I[2000, user_21, China:Shanghai, 123567891234]" }; // Step 2: wait the snapshot splits finished reading List actualRecords = consumeSnapshotRecords(reader, dataType); @@ -295,7 +295,7 @@ void testFinishedUnackedSplitsUsingStateFromSnapshotPhase(boolean skipBackFill) "+I[1016, user_17, Shanghai, 123567891234]", "+I[1019, user_20, Shanghai, 123567891234]", "+I[1018, user_19, Shanghai, 123567891234]", - "+I[2000, user_21, Shanghai, 123567891234]" + "+I[2000, user_21, China:Shanghai, 123567891234]" }; // Step 2: wait the snapshot splits finished reading List actualRecords = consumeSnapshotRecords(reader, dataType); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/SnapshotFilterUtilsTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/SnapshotFilterUtilsTest.java index 1df3c2f4ed3..36f4d7f0a58 100644 --- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/SnapshotFilterUtilsTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/SnapshotFilterUtilsTest.java @@ -18,13 +18,12 @@ package org.apache.flink.cdc.connectors.mysql.source.utils; import io.debezium.relational.TableId; -import org.junit.Test; +import org.junit.jupiter.api.Test; import java.util.HashMap; import java.util.Map; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNull; +import static org.assertj.core.api.Assertions.assertThat; /** Unit test for {@link org.apache.flink.cdc.connectors.mysql.source.utils.SnapshotFilterUtils}. */ public class SnapshotFilterUtilsTest { @@ -34,14 +33,16 @@ public void test() { Map map = new HashMap<>(); map.put("db.user", "id > 100"); map.put("db.order_[0-9]+", "id > 200"); - assertEquals( - "id > 100", SnapshotFilterUtils.getSnapshotFilter(map, TableId.parse("db.user"))); - assertEquals( - "id > 200", - SnapshotFilterUtils.getSnapshotFilter(map, TableId.parse("db.order_1"))); - assertEquals( - "id > 200", - SnapshotFilterUtils.getSnapshotFilter(map, TableId.parse("db.order_2"))); - assertNull(SnapshotFilterUtils.getSnapshotFilter(map, TableId.parse("db.shop"))); + + assertThat(SnapshotFilterUtils.getSnapshotFilter(map, TableId.parse("db.user"))) + .isEqualTo("id > 100"); + + assertThat(SnapshotFilterUtils.getSnapshotFilter(map, TableId.parse("db.order_1"))) + .isEqualTo("id > 200"); + + assertThat(SnapshotFilterUtils.getSnapshotFilter(map, TableId.parse("db.order_2"))) + .isEqualTo("id > 200"); + + assertThat(SnapshotFilterUtils.getSnapshotFilter(map, TableId.parse("db.shop"))).isNull(); } } diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java index d0b9cc6ac10..9a3f3d4c50a 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java @@ -58,7 +58,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST; import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE; import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.USE_LEGACY_JSON_FORMAT; -import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; /** Test for {@link MySqlTableSource} created by {@link MySqlTableSourceFactory}. */ class MySqlTableSourceFactoryTest { @@ -333,10 +333,10 @@ public void testOptionalProperties() { Duration.ofMillis(15213), "testCol", true, - true, PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), true, SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(), + false, "id > 200"); Assertions.assertThat(actualSource) .isEqualTo(expectedSource) @@ -485,7 +485,7 @@ void testStartupFromEarliestOffset() { SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(), false, null); - assertEquals(expectedSource, actualSource); + assertThat(actualSource).isEqualTo(expectedSource); } @Test