diff --git a/docs/content/docs/connectors/flink-sources/mysql-cdc.md b/docs/content/docs/connectors/flink-sources/mysql-cdc.md index 2752298c258..cfe38943737 100644 --- a/docs/content/docs/connectors/flink-sources/mysql-cdc.md +++ b/docs/content/docs/connectors/flink-sources/mysql-cdc.md @@ -460,6 +460,13 @@ Only valid for cdc 1.x version. During a snapshot operation, the connector will For example updating an already updated value in snapshot, or deleting an already deleted entry in snapshot. These replayed change log events should be handled specially. + + scan.snapshot.filter + optional + (none) + String + When reading a table snapshot, the rows of captured tables will be filtered using the specified filter expression (AKA a SQL WHERE clause). By default, no filter is applied, meaning the entire table will be synchronized. e.g. `id > 100`. + diff --git a/docs/content/docs/connectors/pipeline-connectors/mysql.md b/docs/content/docs/connectors/pipeline-connectors/mysql.md index a8964cdaffb..c7eab6cb786 100644 --- a/docs/content/docs/connectors/pipeline-connectors/mysql.md +++ b/docs/content/docs/connectors/pipeline-connectors/mysql.md @@ -293,6 +293,17 @@ pipeline: scan.binlog.newly-added-table.enabled: only do binlog-reading for newly added table during binlog reading phase. + + scan.snapshot.filters + optional + (none) + String + When reading a table snapshot, the rows of captured tables will be filtered using the specified filter expression (AKA a SQL WHERE clause).
+ By default, no filter is applied, meaning the entire table will be synchronized.
+ A colon (:) separates table name and filter expression, while a semicolon (;) separate multiple filters, + e.g. `db1.user_table_[0-9]+:id > 100;db[1-2].[app|web]_order_\\.*:id < 0;`. + + scan.parse.online.schema.changes.enabled optional diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java index f522adfdb1a..13418bc52d5 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java @@ -84,6 +84,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE; +import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_SNAPSHOT_FILTERS; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_MODE; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_FILE; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET; @@ -285,6 +286,29 @@ public DataSource createDataSource(Context context) { LOG.info("Add chunkKeyColumn {}.", chunkKeyColumnMap); configFactory.chunkKeyColumn(chunkKeyColumnMap); } + String snapshotFilters = config.get(SCAN_SNAPSHOT_FILTERS); + if (snapshotFilters != null) { + Map snapshotFilterMap = new HashMap<>(); + for (String snapshotFilter : snapshotFilters.split("(? readableMetadataList = listReadableMetadata(metadataList); return new MySqlDataSource(configFactory, readableMetadataList); @@ -352,6 +376,7 @@ public Set> optionalOptions() { options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND); options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND); options.add(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED); + options.add(SCAN_SNAPSHOT_FILTERS); options.add(METADATA_LIST); options.add(INCLUDE_COMMENTS_ENABLED); options.add(USE_LEGACY_JSON_FORMAT); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java index 6aff556e7fa..d2845996d0a 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java @@ -273,6 +273,17 @@ public class MySqlDataSourceOptions { + "scan.newly-added-table.enabled: do re-snapshot & binlog-reading for newly added table when restored; \n" + "scan.binlog.newly-added-table.enabled: only do binlog-reading for newly added table during binlog reading phase."); + @Experimental + public static final ConfigOption SCAN_SNAPSHOT_FILTERS = + ConfigOptions.key("scan.snapshot.filters") + .stringType() + .noDefaultValue() + .withDescription( + "When reading a table snapshot, the rows of captured tables will be filtered using the specified filter expression (AKA a SQL WHERE clause). " + + "By default, no filter is applied, meaning the entire table will be synchronized. " + + "A colon (:) separates table name and filter expression, while a semicolon (;) separate multiple filters, " + + "e.g. `db1.user_table_[0-9]+:id > 100;db[1-2].[app|web]_order_\\.*:id < 0;`."); + @Experimental public static final ConfigOption METADATA_LIST = ConfigOptions.key("metadata.list") diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java index 74f3ef52a3c..85d06a2d712 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java @@ -43,6 +43,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED; +import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_SNAPSHOT_FILTERS; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES_EXCLUDE; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TREAT_TINYINT1_AS_BOOLEAN_ENABLED; @@ -330,6 +331,39 @@ void testAddChunkKeyColumns() { }); } + @Test + public void testAddSnapshotFilters() { + inventoryDatabase.createAndInitialize(); + Map options = new HashMap<>(); + options.put(HOSTNAME.key(), MYSQL_CONTAINER.getHost()); + options.put(PORT.key(), String.valueOf(MYSQL_CONTAINER.getDatabasePort())); + options.put(USERNAME.key(), TEST_USER); + options.put(PASSWORD.key(), TEST_PASSWORD); + options.put(TABLES.key(), inventoryDatabase.getDatabaseName() + ".\\.*"); + options.put( + SCAN_SNAPSHOT_FILTERS.key(), + inventoryDatabase.getDatabaseName() + + ".multi_max_\\.*:id > 200;" + + inventoryDatabase.getDatabaseName() + + ".products:1 = 0;"); + Factory.Context context = new MockContext(Configuration.fromMap(options)); + + MySqlDataSourceFactory factory = new MySqlDataSourceFactory(); + MySqlDataSource dataSource = (MySqlDataSource) factory.createDataSource(context); + + assertThat(dataSource.getSourceConfig().getSnapshotFilters()) + .isNotEmpty() + .isEqualTo( + new HashMap() { + { + put( + inventoryDatabase.getDatabaseName() + ".multi_max_\\.*", + "id > 200"); + put(inventoryDatabase.getDatabaseName() + ".products", "1 = 0"); + } + }); + } + class MockContext implements Factory.Context { Configuration factoryConfiguration; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTask.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTask.java index f729f59d05f..c907d477780 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTask.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTask.java @@ -24,6 +24,7 @@ import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig; import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset; import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSnapshotSplit; +import org.apache.flink.cdc.connectors.mysql.source.utils.SnapshotFilterUtils; import org.apache.flink.cdc.connectors.mysql.source.utils.StatementUtils; import org.apache.flink.cdc.connectors.mysql.source.utils.hooks.SnapshotPhaseHooks; @@ -242,12 +243,24 @@ private void createDataEventsForTable( long exportStart = clock.currentTimeInMillis(); LOG.info("Exporting data from split '{}' of table {}", snapshotSplit.splitId(), table.id()); + String filter = + SnapshotFilterUtils.getSnapshotFilter( + sourceConfig.getSnapshotFilters(), table.id()); + if (filter != null) { + LOG.info( + "Filter for split '{}' of table {} is: {}", + snapshotSplit.splitId(), + table.id(), + filter); + } + final String selectSql = StatementUtils.buildSplitScanQuery( snapshotSplit.getTableId(), snapshotSplit.getSplitKeyType(), snapshotSplit.getSplitStart() == null, - snapshotSplit.getSplitEnd() == null); + snapshotSplit.getSplitEnd() == null, + filter); LOG.info( "For split '{}' of table {} using select statement: '{}'", snapshotSplit.splitId(), diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java index caf316d1b4a..349a320ebd3 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java @@ -292,6 +292,15 @@ public MySqlSourceBuilder assignUnboundedChunkFirst(boolean assignUnboundedCh return this; } + /** + * When reading a table snapshot, the rows of captured tables will be filtered using the + * specified filter expression (AKA a SQL WHERE clause). + */ + public MySqlSourceBuilder snapshotFilters(String table, String filter) { + this.configFactory.snapshotFilters(table, filter); + return this; + } + /** * Build the {@link MySqlSource}. * diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java index 4821eaba2ea..965831c28bf 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java @@ -26,6 +26,7 @@ import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSnapshotSplit; import org.apache.flink.cdc.connectors.mysql.source.utils.ChunkUtils; import org.apache.flink.cdc.connectors.mysql.source.utils.ObjectUtils; +import org.apache.flink.cdc.connectors.mysql.source.utils.SnapshotFilterUtils; import org.apache.flink.cdc.connectors.mysql.source.utils.StatementUtils; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.LogicalTypeRoot; @@ -148,12 +149,16 @@ private void analyzeTable(MySqlPartition partition, TableId tableId) { splitColumn = ChunkUtils.getChunkKeyColumn( currentSplittingTable, sourceConfig.getChunkKeyColumns()); + String filter = + SnapshotFilterUtils.getSnapshotFilter( + sourceConfig.getSnapshotFilters(), tableId); splitType = ChunkUtils.getChunkKeyColumnType( splitColumn, sourceConfig.isTreatTinyInt1AsBoolean()); minMaxOfSplitColumn = - StatementUtils.queryMinMax(jdbcConnection, tableId, splitColumn.name()); - approximateRowCnt = StatementUtils.queryApproximateRowCnt(jdbcConnection, tableId); + StatementUtils.queryMinMax(jdbcConnection, tableId, splitColumn.name(), filter); + approximateRowCnt = + StatementUtils.queryRowCnt(jdbcConnection, tableId, splitColumn.name(), filter); } catch (Exception e) { throw new RuntimeException("Fail to analyze table in chunk splitter.", e); } @@ -171,6 +176,8 @@ private MySqlSnapshotSplit splitOneUnevenlySizedChunk(MySqlPartition partition, nextChunkStart == ChunkSplitterState.ChunkBound.START_BOUND ? "null" : chunkStartVal.toString()); + String filter = + SnapshotFilterUtils.getSnapshotFilter(sourceConfig.getSnapshotFilters(), tableId); // we start from [null, min + chunk_size) and avoid [null, min) Object chunkEnd = nextChunkEnd( @@ -181,7 +188,8 @@ private MySqlSnapshotSplit splitOneUnevenlySizedChunk(MySqlPartition partition, tableId, splitColumn.name(), minMaxOfSplitColumn[1], - chunkSize); + chunkSize, + filter); // may sleep a while to avoid DDOS on MySQL server maySleep(nextChunkId, tableId); if (chunkEnd != null && ObjectUtils.compare(chunkEnd, minMaxOfSplitColumn[1]) <= 0) { @@ -316,7 +324,6 @@ public List splitEvenlySizedChunks( break; } } - // add the ending split splits.add(ChunkRange.of(chunkStart, null)); return splits; @@ -328,16 +335,17 @@ private Object nextChunkEnd( TableId tableId, String splitColumnName, Object max, - int chunkSize) + int chunkSize, + @Nullable String filter) throws SQLException { // chunk end might be null when max values are removed Object chunkEnd = StatementUtils.queryNextChunkMax( - jdbc, tableId, splitColumnName, chunkSize, previousChunkEnd); + jdbc, tableId, splitColumnName, chunkSize, previousChunkEnd, filter); if (Objects.equals(previousChunkEnd, chunkEnd)) { // we don't allow equal chunk start and end, // should query the next one larger than chunkEnd - chunkEnd = StatementUtils.queryMin(jdbc, tableId, splitColumnName, chunkEnd); + chunkEnd = StatementUtils.queryMin(jdbc, tableId, splitColumnName, chunkEnd, filter); // queryMin will return null when the chunkEnd is the max value, // this will happen when the mysql table ignores the capitalization. diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java index 260a7cd2b5d..ac2f6eb3538 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java @@ -67,6 +67,7 @@ public class MySqlSourceConfig implements Serializable { private final Properties jdbcProperties; private final Map chunkKeyColumns; private final boolean skipSnapshotBackfill; + private final Map snapshotFilters; private final boolean parseOnLineSchemaChanges; public static boolean useLegacyJsonFormat = true; private final boolean assignUnboundedChunkFirst; @@ -108,7 +109,8 @@ public class MySqlSourceConfig implements Serializable { boolean parseOnLineSchemaChanges, boolean treatTinyInt1AsBoolean, boolean useLegacyJsonFormat, - boolean assignUnboundedChunkFirst) { + boolean assignUnboundedChunkFirst, + Map snapshotFilters) { this.hostname = checkNotNull(hostname); this.port = port; this.username = checkNotNull(username); @@ -148,6 +150,7 @@ public class MySqlSourceConfig implements Serializable { this.jdbcProperties = jdbcProperties; this.chunkKeyColumns = chunkKeyColumns; this.skipSnapshotBackfill = skipSnapshotBackfill; + this.snapshotFilters = snapshotFilters; this.parseOnLineSchemaChanges = parseOnLineSchemaChanges; this.treatTinyInt1AsBoolean = treatTinyInt1AsBoolean; this.useLegacyJsonFormat = useLegacyJsonFormat; @@ -285,4 +288,8 @@ public boolean isSkipSnapshotBackfill() { public boolean isTreatTinyInt1AsBoolean() { return treatTinyInt1AsBoolean; } + + public Map getSnapshotFilters() { + return snapshotFilters; + } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java index 427115edea7..f100c872c57 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java @@ -70,6 +70,7 @@ public class MySqlSourceConfigFactory implements Serializable { private Properties dbzProperties; private Map chunkKeyColumns = new HashMap<>(); private boolean skipSnapshotBackfill = false; + private Map snapshotFilters = new HashMap<>(); private boolean parseOnLineSchemaChanges = false; private boolean treatTinyInt1AsBoolean = true; private boolean useLegacyJsonFormat = true; @@ -304,6 +305,24 @@ public MySqlSourceConfigFactory closeIdleReaders(boolean closeIdleReaders) { return this; } + /** + * When reading a table snapshot, the rows of captured tables will be filtered using the + * specified filter expression (AKA a SQL WHERE clause). + */ + public MySqlSourceConfigFactory snapshotFilters(String table, String filter) { + this.snapshotFilters.put(table, filter); + return this; + } + + /** + * When reading a table snapshot, the rows of captured tables will be filtered using the + * specified filter expression (AKA a SQL WHERE clause). + */ + public MySqlSourceConfigFactory snapshotFilters(Map snapshotFilters) { + this.snapshotFilters.putAll(snapshotFilters); + return this; + } + /** Whether to parse gh-ost/pt-osc utility generated schema change events. Defaults to false. */ public MySqlSourceConfigFactory parseOnLineSchemaChanges(boolean parseOnLineSchemaChanges) { this.parseOnLineSchemaChanges = parseOnLineSchemaChanges; @@ -421,6 +440,7 @@ public MySqlSourceConfig createConfig(int subtaskId, String serverName) { parseOnLineSchemaChanges, treatTinyInt1AsBoolean, useLegacyJsonFormat, - assignUnboundedChunkFirst); + assignUnboundedChunkFirst, + snapshotFilters); } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java index a8e143f5fc5..673d39396d6 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java @@ -270,6 +270,15 @@ public class MySqlSourceOptions { .withDescription( "Whether to skip backfill in snapshot reading phase. If backfill is skipped, changes on captured tables during snapshot phase will be consumed later in binlog reading phase instead of being merged into the snapshot. WARNING: Skipping backfill might lead to data inconsistency because some binlog events happened within the snapshot phase might be replayed (only at-least-once semantic is promised). For example updating an already updated value in snapshot, or deleting an already deleted entry in snapshot. These replayed binlog events should be handled specially."); + @Experimental + public static final ConfigOption SCAN_SNAPSHOT_FILTER = + ConfigOptions.key("scan.snapshot.filter") + .stringType() + .noDefaultValue() + .withDescription( + "When reading a table snapshot, the rows of captured tables will be filtered using the specified filter expression (AKA a SQL WHERE clause). " + + "By default, no filter is applied, meaning the entire table will be synchronized. e.g. `id > 100`"); + @Experimental public static final ConfigOption PARSE_ONLINE_SCHEMA_CHANGES = ConfigOptions.key("scan.parse.online.schema.changes.enabled") diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/SnapshotFilterUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/SnapshotFilterUtils.java new file mode 100644 index 00000000000..7ad16d5beab --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/SnapshotFilterUtils.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.mysql.source.utils; + +import org.apache.flink.cdc.common.schema.Selectors; + +import io.debezium.relational.TableId; + +import java.util.HashMap; +import java.util.Map; + +/** Utilities to filter snapshot of table. */ +public class SnapshotFilterUtils { + + private SnapshotFilterUtils() {} + + private static final Map, Map> cache = new HashMap<>(); + + /** + * Don't worry about atomicity. We don't need to use the synchronized keyword to ensure thread + * safety here. Since filters come from the source configuration, they shouldn’t be changed + * during runtime. So the result will always be idempotent. + */ + private static Map toSelector(Map filters) { + Map cached = cache.get(filters); + if (cached != null) { + return cached; + } + + Map snapshotFilters = new HashMap<>(); + filters.forEach( + (table, filter) -> { + Selectors selector = + new Selectors.SelectorsBuilder().includeTables(table).build(); + snapshotFilters.put(selector, filter); + }); + cache.put(filters, snapshotFilters); + + return snapshotFilters; + } + + public static String getSnapshotFilter(Map filters, TableId tableId) { + Map snapshotFilters = toSelector(filters); + + String filter = null; + for (Selectors selector : snapshotFilters.keySet()) { + if (selector.isMatch( + org.apache.flink.cdc.common.event.TableId.tableId( + tableId.catalog(), tableId.table()))) { + filter = snapshotFilters.get(selector); + break; + } + } + return filter; + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/StatementUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/StatementUtils.java index e538c325d78..feb26a724d9 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/StatementUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/StatementUtils.java @@ -22,6 +22,8 @@ import io.debezium.jdbc.JdbcConnection; import io.debezium.relational.TableId; +import javax.annotation.Nullable; + import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; @@ -36,12 +38,15 @@ public class StatementUtils { private StatementUtils() {} - public static Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String columnName) + public static Object[] queryMinMax( + JdbcConnection jdbc, TableId tableId, String columnName, @Nullable String filter) throws SQLException { final String minMaxQuery = String.format( "SELECT MIN(%s), MAX(%s) FROM %s", - quote(columnName), quote(columnName), quote(tableId)); + quote(columnName), + quote(columnName), + filter != null ? quote(tableId) + " WHERE " + filter : quote(tableId)); return jdbc.queryAndMap( minMaxQuery, rs -> { @@ -56,6 +61,31 @@ public static Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String }); } + public static Long queryRowCnt( + JdbcConnection jdbc, TableId tableId, String columnName, @Nullable String filter) + throws SQLException { + + if (filter == null) { + return queryApproximateRowCnt(jdbc, tableId); + } + + final String cntQuery = + String.format( + "SELECT COUNT(%s) FROM %s WHERE %s", + quote(columnName), quote(tableId), filter); + return jdbc.queryAndMap( + cntQuery, + rs -> { + if (!rs.next()) { + // this should never happen + throw new SQLException( + String.format( + "No result returned after running query [%s]", cntQuery)); + } + return rs.getLong(1); + }); + } + public static long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) throws SQLException { // The statement used to get approximate row count which is less @@ -77,12 +107,18 @@ public static long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId) } public static Object queryMin( - JdbcConnection jdbc, TableId tableId, String columnName, Object excludedLowerBound) + JdbcConnection jdbc, + TableId tableId, + String columnName, + Object excludedLowerBound, + @Nullable String filter) throws SQLException { final String minQuery = String.format( "SELECT MIN(%s) FROM %s WHERE %s > ?", - quote(columnName), quote(tableId), quote(columnName)); + quote(columnName), + quote(tableId), + filter != null ? filter + " AND " + quote(columnName) : quote(columnName)); return jdbc.prepareQueryAndMap( minQuery, ps -> ps.setObject(1, excludedLowerBound), @@ -102,7 +138,8 @@ public static Object queryNextChunkMax( TableId tableId, String splitColumnName, int chunkSize, - Object includedLowerBound) + Object includedLowerBound, + @Nullable String filter) throws SQLException { String quotedColumn = quote(splitColumnName); String query = @@ -113,7 +150,7 @@ public static Object queryNextChunkMax( quotedColumn, quotedColumn, quote(tableId), - quotedColumn, + filter != null ? filter + " AND " + quotedColumn : quotedColumn, quotedColumn, chunkSize); return jdbc.prepareQueryAndMap( @@ -131,8 +168,12 @@ public static Object queryNextChunkMax( } public static String buildSplitScanQuery( - TableId tableId, RowType pkRowType, boolean isFirstSplit, boolean isLastSplit) { - return buildSplitQuery(tableId, pkRowType, isFirstSplit, isLastSplit, -1, true); + TableId tableId, + RowType pkRowType, + boolean isFirstSplit, + boolean isLastSplit, + @Nullable String filter) { + return buildSplitQuery(tableId, pkRowType, isFirstSplit, isLastSplit, -1, true, filter); } private static String buildSplitQuery( @@ -141,8 +182,9 @@ private static String buildSplitQuery( boolean isFirstSplit, boolean isLastSplit, int limitSize, - boolean isScanningData) { - final String condition; + boolean isScanningData, + @Nullable String filter) { + String condition; if (isFirstSplit && isLastSplit) { condition = null; @@ -172,6 +214,10 @@ private static String buildSplitQuery( condition = sql.toString(); } + if (filter != null) { + condition = condition == null ? filter : filter + " AND " + condition; + } + if (isScanningData) { return buildSelectWithRowLimits( tableId, limitSize, "*", Optional.ofNullable(condition), Optional.empty()); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java index 2a1f0519435..f010dd1a534 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java @@ -98,6 +98,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat private final Duration heartbeatInterval; private final String chunkKeyColumn; final boolean skipSnapshotBackFill; + private final String snapshotFilter; final boolean parseOnlineSchemaChanges; private final boolean useLegacyJsonFormat; private final boolean assignUnboundedChunkFirst; @@ -144,7 +145,8 @@ public MySqlTableSource( boolean parseOnlineSchemaChanges, boolean useLegacyJsonFormat, boolean assignUnboundedChunkFirst, - boolean appendOnly) { + boolean appendOnly, + @Nullable String snapshotFilter) { this.physicalSchema = physicalSchema; this.port = port; this.hostname = checkNotNull(hostname); @@ -178,6 +180,7 @@ public MySqlTableSource( this.useLegacyJsonFormat = useLegacyJsonFormat; this.assignUnboundedChunkFirst = assignUnboundedChunkFirst; this.appendOnly = appendOnly; + this.snapshotFilter = snapshotFilter; } @Override @@ -241,6 +244,9 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { .parseOnLineSchemaChanges(parseOnlineSchemaChanges) .useLegacyJsonFormat(useLegacyJsonFormat) .assignUnboundedChunkFirst(assignUnboundedChunkFirst) + .snapshotFilters( + escapeDot(database) + "." + escapeDot(tableName), + snapshotFilter) .build(); return SourceProvider.of(parallelSource); } else { @@ -330,7 +336,8 @@ public DynamicTableSource copy() { parseOnlineSchemaChanges, useLegacyJsonFormat, assignUnboundedChunkFirst, - appendOnly); + appendOnly, + snapshotFilter); source.metadataKeys = metadataKeys; source.producedDataType = producedDataType; return source; @@ -373,6 +380,8 @@ public boolean equals(Object o) { && Objects.equals(heartbeatInterval, that.heartbeatInterval) && Objects.equals(chunkKeyColumn, that.chunkKeyColumn) && Objects.equals(skipSnapshotBackFill, that.skipSnapshotBackFill) + && Objects.equals(snapshotFilter, that.snapshotFilter) + && Objects.equals(skipSnapshotBackFill, that.skipSnapshotBackFill) && parseOnlineSchemaChanges == that.parseOnlineSchemaChanges && useLegacyJsonFormat == that.useLegacyJsonFormat && assignUnboundedChunkFirst == that.assignUnboundedChunkFirst @@ -413,7 +422,8 @@ public int hashCode() { parseOnlineSchemaChanges, useLegacyJsonFormat, assignUnboundedChunkFirst, - appendOnly); + appendOnly, + snapshotFilter); } @Override @@ -438,4 +448,8 @@ Properties getParallelDbzProperties(Properties dbzProperties) { } return newDbzProperties; } + + private String escapeDot(String str) { + return str.replace(".", "\\."); + } } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java index 5ea430d94e7..87ddfb364f8 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java @@ -101,6 +101,7 @@ public DynamicTableSource createDynamicTableSource(Context context) { config.get(MySqlSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED); boolean skipSnapshotBackFill = config.get(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP); + String snapshotFilter = config.get(MySqlSourceOptions.SCAN_SNAPSHOT_FILTER); boolean parseOnLineSchemaChanges = config.get(MySqlSourceOptions.PARSE_ONLINE_SCHEMA_CHANGES); boolean useLegacyJsonFormat = config.get(MySqlSourceOptions.USE_LEGACY_JSON_FORMAT); @@ -156,7 +157,8 @@ public DynamicTableSource createDynamicTableSource(Context context) { parseOnLineSchemaChanges, useLegacyJsonFormat, assignUnboundedChunkFirst, - appendOnly); + appendOnly, + snapshotFilter); } @Override @@ -202,6 +204,7 @@ public Set> optionalOptions() { options.add(MySqlSourceOptions.HEARTBEAT_INTERVAL); options.add(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN); options.add(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP); + options.add(MySqlSourceOptions.SCAN_SNAPSHOT_FILTER); options.add(MySqlSourceOptions.PARSE_ONLINE_SCHEMA_CHANGES); options.add(MySqlSourceOptions.USE_LEGACY_JSON_FORMAT); options.add(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java index e9205d2c5d5..2b6536a7093 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java @@ -570,7 +570,7 @@ void testReadBinlogFromEarliestOffset() throws Exception { "+I[1017, user_18, Shanghai, 123567891234]", "+I[1018, user_19, Shanghai, 123567891234]", "+I[1019, user_20, Shanghai, 123567891234]", - "+I[2000, user_21, Shanghai, 123567891234]", + "+I[2000, user_21, China:Shanghai, 123567891234]", "-U[103, user_3, Shanghai, 123567891234]", "+U[103, user_3, Hangzhou, 123567891234]", "-D[102, user_2, Shanghai, 123567891234]", diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.java index 3c7f9d141ea..daa060e993a 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.java @@ -140,7 +140,7 @@ class MySqlSourceITCase extends MySqlSourceTestBase { "+I[1017, user_18, Shanghai, 123567891234]", "+I[1018, user_19, Shanghai, 123567891234]", "+I[1019, user_20, Shanghai, 123567891234]", - "+I[2000, user_21, Shanghai, 123567891234]"); + "+I[2000, user_21, China:Shanghai, 123567891234]"); /** First part binlog events in string, which is made by {@link #makeFirstPartBinlogEvents}. */ private final List firstPartBinlogEvents = @@ -417,7 +417,7 @@ void testSnapshotSplitReadingFailCrossCheckpoints(String tableName, String chunk env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); // The sleeping source will sleep awhile after send per record - MySqlSource sleepingSource = buildSleepingSource(tableName, chunkColumnName); + MySqlSource sleepingSource = buildSleepingSource(tableName, chunkColumnName, null); DataStreamSource source = env.fromSource(sleepingSource, WatermarkStrategy.noWatermarks(), "selfSource"); @@ -443,7 +443,7 @@ void testSnapshotSplitReadingFailCrossCheckpoints(String tableName, String chunk "+I[1017, user_18, Shanghai, 123567891234]", "+I[1018, user_19, Shanghai, 123567891234]", "+I[1019, user_20, Shanghai, 123567891234]", - "+I[2000, user_21, Shanghai, 123567891234]" + "+I[2000, user_21, China:Shanghai, 123567891234]" }; TypeSerializer serializer = source.getTransformation().getOutputType().createSerializer(env.getConfig()); @@ -525,7 +525,7 @@ void testSnapshotOnlyModeWithDMLPostHighWaterMark() throws Exception { "+I[1017, user_18, Shanghai, 123567891234]", "+I[1018, user_19, Shanghai, 123567891234]", "+I[1019, user_20, Shanghai, 123567891234]", - "+I[2000, user_21, Shanghai, 123567891234]"); + "+I[2000, user_21, China:Shanghai, 123567891234]"); assertEqualsInAnyOrder(expectedRecords, records); } @@ -661,9 +661,9 @@ void testSkipBackfillWithDMLPreHighWaterMark() throws Exception { "+I[1017, user_18, Shanghai, 123567891234]", "+I[1018, user_19, Shanghai, 123567891234]", "+I[1019, user_20, Shanghai, 123567891234]", - "+I[2000, user_21, Shanghai, 123567891234]", + "+I[2000, user_21, China:Shanghai, 123567891234]", "+I[15213, user_15213, Shanghai, 123567891234]", - "-U[2000, user_21, Shanghai, 123567891234]", + "-U[2000, user_21, China:Shanghai, 123567891234]", "+U[2000, user_21, Pittsburgh, 123567891234]", "-D[1019, user_20, Shanghai, 123567891234]"); // when skip backfill, the wal log between (snapshot, high_watermark) will be seen as @@ -702,7 +702,7 @@ void testSkipBackfillWithDMLPostLowWaterMark() throws Exception { "+I[2000, user_21, Pittsburgh, 123567891234]", "+I[15213, user_15213, Shanghai, 123567891234]", "+I[15213, user_15213, Shanghai, 123567891234]", - "-U[2000, user_21, Shanghai, 123567891234]", + "-U[2000, user_21, China:Shanghai, 123567891234]", "+U[2000, user_21, Pittsburgh, 123567891234]", "-D[1019, user_20, Shanghai, 123567891234]"); // when skip backfill, the wal log between (snapshot, high_watermark) will still be @@ -711,6 +711,154 @@ void testSkipBackfillWithDMLPostLowWaterMark() throws Exception { assertEqualsInAnyOrder(expectedRecords, records); } + @ParameterizedTest + @MethodSource("parameters") + @SuppressWarnings({"rawtypes", "unchecked"}) + public void testSnapshotFilters(String tableName, String chunkColumnName) throws Exception { + customDatabase.createAndInitialize(); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(DEFAULT_PARALLELISM); + env.enableCheckpointing(5000L); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); + + // Filter user with `id > 200` + // The sleeping source will sleep awhile after send per record + MySqlSource sleepingSource = + buildSleepingSource(tableName, chunkColumnName, "id > 200"); + DataStreamSource source = + env.fromSource(sleepingSource, WatermarkStrategy.noWatermarks(), "selfSource"); + + String[] expectedSnapshotData = + new String[] { + "+I[1009, user_10, Shanghai, 123567891234]", + "+I[1010, user_11, Shanghai, 123567891234]", + "+I[1011, user_12, Shanghai, 123567891234]", + "+I[1012, user_13, Shanghai, 123567891234]", + "+I[1013, user_14, Shanghai, 123567891234]", + "+I[1014, user_15, Shanghai, 123567891234]", + "+I[1015, user_16, Shanghai, 123567891234]", + "+I[1016, user_17, Shanghai, 123567891234]", + "+I[1017, user_18, Shanghai, 123567891234]", + "+I[1018, user_19, Shanghai, 123567891234]", + "+I[1019, user_20, Shanghai, 123567891234]", + "+I[2000, user_21, China:Shanghai, 123567891234]" + }; + TypeSerializer serializer = + source.getTransformation().getOutputType().createSerializer(env.getConfig()); + String accumulatorName = "dataStreamCollect_" + UUID.randomUUID(); + CollectSinkOperatorFactory factory = + new CollectSinkOperatorFactory(serializer, accumulatorName); + CollectSinkOperator operator = (CollectSinkOperator) factory.getOperator(); + CollectResultIterator iterator = + new CollectResultIterator( + operator.getOperatorIdFuture(), + serializer, + accumulatorName, + env.getCheckpointConfig(), + 10000L); + CollectStreamSink sink = new CollectStreamSink(source, factory); + sink.name("Data stream collect sink"); + env.addOperator(sink.getTransformation()); + JobClient jobClient = env.executeAsync("snapshotSplitTest"); + iterator.setJobClient(jobClient); + JobID jobId = jobClient.getJobID(); + + // Trigger failover once some snapshot records has been sent by sleeping source + if (iterator.hasNext()) { + triggerFailover( + FailoverType.JM, + jobId, + miniClusterResource.get().getMiniCluster(), + () -> sleepMs(100)); + } + + // Check all snapshot records are sent with exactly-once semantics + assertEqualsInAnyOrder( + Arrays.asList(expectedSnapshotData), + fetchRowData(iterator, expectedSnapshotData.length)); + assertThat(hasNextData(iterator)).isFalse(); + + jobClient.cancel().get(); + } + + @ParameterizedTest + @MethodSource("parameters") + @SuppressWarnings({"rawtypes", "unchecked"}) + public void testSnapshotFiltersEscape(String tableName, String chunkColumnName) + throws Exception { + customDatabase.createAndInitialize(); + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.setParallelism(DEFAULT_PARALLELISM); + env.enableCheckpointing(5000L); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0)); + + // Filter user with `id > 200` + // The sleeping source will sleep awhile after send per record + MySqlSource sleepingSource = + buildSleepingSource(tableName, chunkColumnName, "address != 'China:Shanghai'"); + DataStreamSource source = + env.fromSource(sleepingSource, WatermarkStrategy.noWatermarks(), "selfSource"); + + String[] expectedSnapshotData = + new String[] { + "+I[101, user_1, Shanghai, 123567891234]", + "+I[102, user_2, Shanghai, 123567891234]", + "+I[103, user_3, Shanghai, 123567891234]", + "+I[109, user_4, Shanghai, 123567891234]", + "+I[110, user_5, Shanghai, 123567891234]", + "+I[111, user_6, Shanghai, 123567891234]", + "+I[118, user_7, Shanghai, 123567891234]", + "+I[121, user_8, Shanghai, 123567891234]", + "+I[123, user_9, Shanghai, 123567891234]", + "+I[1009, user_10, Shanghai, 123567891234]", + "+I[1010, user_11, Shanghai, 123567891234]", + "+I[1011, user_12, Shanghai, 123567891234]", + "+I[1012, user_13, Shanghai, 123567891234]", + "+I[1013, user_14, Shanghai, 123567891234]", + "+I[1014, user_15, Shanghai, 123567891234]", + "+I[1015, user_16, Shanghai, 123567891234]", + "+I[1016, user_17, Shanghai, 123567891234]", + "+I[1017, user_18, Shanghai, 123567891234]", + "+I[1018, user_19, Shanghai, 123567891234]", + "+I[1019, user_20, Shanghai, 123567891234]" + }; + TypeSerializer serializer = + source.getTransformation().getOutputType().createSerializer(env.getConfig()); + String accumulatorName = "dataStreamCollect_" + UUID.randomUUID(); + CollectSinkOperatorFactory factory = + new CollectSinkOperatorFactory(serializer, accumulatorName); + CollectSinkOperator operator = (CollectSinkOperator) factory.getOperator(); + CollectResultIterator iterator = + new CollectResultIterator( + operator.getOperatorIdFuture(), + serializer, + accumulatorName, + env.getCheckpointConfig(), + 10000L); + CollectStreamSink sink = new CollectStreamSink(source, factory); + sink.name("Data stream collect sink"); + env.addOperator(sink.getTransformation()); + JobClient jobClient = env.executeAsync("snapshotSplitTest"); + iterator.setJobClient(jobClient); + JobID jobId = jobClient.getJobID(); + + // Trigger failover once some snapshot records has been sent by sleeping source + if (iterator.hasNext()) { + triggerFailover( + FailoverType.JM, + jobId, + miniClusterResource.get().getMiniCluster(), + () -> sleepMs(100)); + } + + // Check all snapshot records are sent with exactly-once semantics + assertEqualsInAnyOrder( + Arrays.asList(expectedSnapshotData), + fetchRowData(iterator, expectedSnapshotData.length)); + assertThat(hasNextData(iterator)).isFalse(); + jobClient.cancel().get(); + } + private List testBackfillWhenWritingEvents( boolean skipSnapshotBackfill, int fetchSize, @@ -1020,7 +1168,8 @@ private CollectResultIterator addCollector( return iterator; } - private MySqlSource buildSleepingSource(String tableName, String chunkColumnName) { + private MySqlSource buildSleepingSource( + String tableName, String chunkColumnName, String snapshotFilter) { ResolvedSchema physicalSchema = new ResolvedSchema( Arrays.asList( @@ -1072,6 +1221,7 @@ private MySqlSource buildSleepingSource(String tableName, String chunkC .chunkKeyColumn( new ObjectPath(customDatabase.getDatabaseName(), tableName), chunkColumnName) + .snapshotFilters(customDatabase.getDatabaseName() + "." + tableName, snapshotFilter) .build(); } @@ -1222,7 +1372,7 @@ private void checkSnapshotData( "+I[1017, user_18, Shanghai, 123567891234]", "+I[1018, user_19, Shanghai, 123567891234]", "+I[1019, user_20, Shanghai, 123567891234]", - "+I[2000, user_21, Shanghai, 123567891234]" + "+I[2000, user_21, China:Shanghai, 123567891234]" }; List expectedSnapshotData = new ArrayList<>(); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java index 7b8dfdcdbb2..bbe6bf1e4b4 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java @@ -184,7 +184,7 @@ void testRemoveTableUsingStateFromSnapshotPhase() throws Exception { "+I[1016, user_17, Shanghai, 123567891234]", "+I[1019, user_20, Shanghai, 123567891234]", "+I[1018, user_19, Shanghai, 123567891234]", - "+I[2000, user_21, Shanghai, 123567891234]" + "+I[2000, user_21, China:Shanghai, 123567891234]" }; // Step 2: wait the snapshot splits finished reading List actualRecords = consumeSnapshotRecords(reader, dataType); @@ -295,7 +295,7 @@ void testFinishedUnackedSplitsUsingStateFromSnapshotPhase(boolean skipBackFill) "+I[1016, user_17, Shanghai, 123567891234]", "+I[1019, user_20, Shanghai, 123567891234]", "+I[1018, user_19, Shanghai, 123567891234]", - "+I[2000, user_21, Shanghai, 123567891234]" + "+I[2000, user_21, China:Shanghai, 123567891234]" }; // Step 2: wait the snapshot splits finished reading List actualRecords = consumeSnapshotRecords(reader, dataType); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/SnapshotFilterUtilsTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/SnapshotFilterUtilsTest.java new file mode 100644 index 00000000000..36f4d7f0a58 --- /dev/null +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/SnapshotFilterUtilsTest.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.mysql.source.utils; + +import io.debezium.relational.TableId; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Unit test for {@link org.apache.flink.cdc.connectors.mysql.source.utils.SnapshotFilterUtils}. */ +public class SnapshotFilterUtilsTest { + + @Test + public void test() { + Map map = new HashMap<>(); + map.put("db.user", "id > 100"); + map.put("db.order_[0-9]+", "id > 200"); + + assertThat(SnapshotFilterUtils.getSnapshotFilter(map, TableId.parse("db.user"))) + .isEqualTo("id > 100"); + + assertThat(SnapshotFilterUtils.getSnapshotFilter(map, TableId.parse("db.order_1"))) + .isEqualTo("id > 200"); + + assertThat(SnapshotFilterUtils.getSnapshotFilter(map, TableId.parse("db.order_2"))) + .isEqualTo("id > 200"); + + assertThat(SnapshotFilterUtils.getSnapshotFilter(map, TableId.parse("db.shop"))).isNull(); + } +} diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java index 4f249335f55..9a3f3d4c50a 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java @@ -58,6 +58,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST; import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE; import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.USE_LEGACY_JSON_FORMAT; +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; /** Test for {@link MySqlTableSource} created by {@link MySqlTableSourceFactory}. */ class MySqlTableSourceFactoryTest { @@ -129,7 +130,8 @@ void testCommonProperties() { PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), USE_LEGACY_JSON_FORMAT.defaultValue(), SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(), - false); + false, + null); Assertions.assertThat(actualSource).isEqualTo(expectedSource); } @@ -179,7 +181,8 @@ void testEnableParallelReadSource() { PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), USE_LEGACY_JSON_FORMAT.defaultValue(), SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(), - false); + false, + null); Assertions.assertThat(actualSource).isEqualTo(expectedSource); } @@ -225,7 +228,8 @@ void testEnableParallelReadSourceWithSingleServerId() { PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), USE_LEGACY_JSON_FORMAT.defaultValue(), SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(), - false); + false, + null); Assertions.assertThat(actualSource).isEqualTo(expectedSource); } @@ -269,12 +273,13 @@ void testEnableParallelReadSourceLatestOffset() { PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), USE_LEGACY_JSON_FORMAT.defaultValue(), SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(), - false); + false, + null); Assertions.assertThat(actualSource).isEqualTo(expectedSource); } @Test - void testOptionalProperties() { + public void testOptionalProperties() { Map options = getAllOptions(); options.put("port", "3307"); options.put("server-id", "4321"); @@ -290,6 +295,7 @@ void testOptionalProperties() { options.put("scan.incremental.close-idle-reader.enabled", "true"); options.put("scan.incremental.snapshot.backfill.skip", "true"); options.put("use.legacy.json.format", "true"); + options.put("scan.snapshot.filter", "id > 200"); DynamicTableSource actualSource = createTableSource(options); Properties dbzProperties = new Properties(); @@ -330,7 +336,8 @@ void testOptionalProperties() { PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), true, SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(), - false); + false, + "id > 200"); Assertions.assertThat(actualSource) .isEqualTo(expectedSource) .isInstanceOf(MySqlTableSource.class); @@ -389,7 +396,8 @@ void testStartupFromSpecificOffset() { PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), USE_LEGACY_JSON_FORMAT.defaultValue(), SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(), - false); + false, + null); Assertions.assertThat(actualSource).isEqualTo(expectedSource); } @@ -431,7 +439,8 @@ void testStartupFromInitial() { PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), USE_LEGACY_JSON_FORMAT.defaultValue(), SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(), - false); + false, + null); Assertions.assertThat(actualSource).isEqualTo(expectedSource); } @@ -474,8 +483,9 @@ void testStartupFromEarliestOffset() { PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), USE_LEGACY_JSON_FORMAT.defaultValue(), SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(), - false); - Assertions.assertThat(actualSource).isEqualTo(expectedSource); + false, + null); + assertThat(actualSource).isEqualTo(expectedSource); } @Test @@ -518,7 +528,8 @@ void testStartupFromSpecificTimestamp() { PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), USE_LEGACY_JSON_FORMAT.defaultValue(), SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(), - false); + false, + null); Assertions.assertThat(actualSource).isEqualTo(expectedSource); } @@ -560,7 +571,8 @@ void testStartupFromLatestOffset() { PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), USE_LEGACY_JSON_FORMAT.defaultValue(), SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(), - false); + false, + null); Assertions.assertThat(actualSource).isEqualTo(expectedSource); } @@ -607,7 +619,8 @@ void testMetadataColumns() { PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(), USE_LEGACY_JSON_FORMAT.defaultValue(), SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(), - false); + false, + null); expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType(); expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name"); @@ -810,7 +823,8 @@ void testEnablingExperimentalOptions() { true, true, true, - false); + false, + null); Assertions.assertThat(actualSource).isEqualTo(expectedSource); } diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/customer.sql b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/customer.sql index e4df63f1a33..34daf3abaf5 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/customer.sql +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/resources/ddl/customer.sql @@ -46,7 +46,7 @@ VALUES (101,"user_1","Shanghai","123567891234"), (1017,"user_18","Shanghai","123567891234"), (1018,"user_19","Shanghai","123567891234"), (1019,"user_20","Shanghai","123567891234"), - (2000,"user_21","Shanghai","123567891234"); + (2000,"user_21","China:Shanghai","123567891234"); -- Create a table will not be read CREATE TABLE prefix_customers ( @@ -91,7 +91,7 @@ VALUES (101,"user_1","Shanghai","123567891234"), (1017,"user_18","Shanghai","123567891234"), (1018,"user_19","Shanghai","123567891234"), (1019,"user_20","Shanghai","123567891234"), - (2000,"user_21","Shanghai","123567891234"); + (2000,"user_21","China:Shanghai","123567891234"); -- create table whose split key is evenly distributed CREATE TABLE customers_even_dist ( @@ -179,7 +179,7 @@ VALUES (101,"user_1","Shanghai","123567891234"), (1017,"user_18","Shanghai","123567891234"), (1018,"user_19","Shanghai","123567891234"), (1019,"user_20","Shanghai","123567891234"), - (2000,"user_21","Shanghai","123567891234"); + (2000,"user_21","China:Shanghai","123567891234"); -- table has combined primary key CREATE TABLE customer_card (