Changes from 10 commits
26 commits
8a8d65c
[FLINK-36165][source-connector/mysql] Support capturing snapshot data…
Dec 3, 2024
f1b8776
[FLINK-36165][source-connector/mysql] add docs
Dec 4, 2024
ec83fbe
[FLINK-36165][source-connector/mysql] Implement snapshot filter for M…
Dec 5, 2024
96b885a
[FLINK-36165][source-connector/mysql] Escape dot
Dec 5, 2024
36ae871
[FLINK-36165][source-connector/mysql] support colon and semicolon in …
Aug 29, 2025
e59f5e8
Merge branch 'master' into FLINK-36165
Sep 5, 2025
8ee4c92
[FLINK-36165][source-connector/mysql] fix test
Sep 5, 2025
5f913f4
[FLINK-36165][source-connector/mysql] fix spotless
Sep 5, 2025
e50667d
[FLINK-36165][source-connector/mysql] fix spotless
Sep 5, 2025
e6dd29e
[FLINK-36165][source-connector/mysql] use yaml list/map to define sna…
Sep 5, 2025
ed9ddcc
Update docs/content/docs/connectors/pipeline-connectors/mysql.md
uicosp Jan 29, 2026
794513b
Update flink-cdc-connect/flink-cdc-source-connectors/flink-connector-…
uicosp Jan 29, 2026
3af4044
Update flink-cdc-connect/flink-cdc-source-connectors/flink-connector-…
uicosp Jan 29, 2026
ca12a3f
Update flink-cdc-connect/flink-cdc-source-connectors/flink-connector-…
uicosp Jan 29, 2026
c025e29
Update flink-cdc-connect/flink-cdc-source-connectors/flink-connector-…
uicosp Jan 29, 2026
3d360c9
validating and failing fast when scan.snapshot.filter option is set b…
Jan 29, 2026
67fae6d
validating each entry in scan.snapshot.filters and building a Lin…
Jan 29, 2026
b7f13fd
fix non-deterministic snapshot filter selection when multiple pattern…
Jan 29, 2026
72724f1
wrap user filter in parentheses when combining with split conditions …
Jan 29, 2026
8ae2003
fix MySqlTableSourceFactoryTest
Jan 29, 2026
2a0bdf2
fix SnaphostFilterUtilsTest
Jan 29, 2026
85647b2
add more tests for this feature (IT, E2E test)
Apr 9, 2026
cc477d5
add a new use example for this feature
Apr 9, 2026
9e8b5f5
store the filters as a class member variable instead of constructing …
Apr 9, 2026
0d14a1b
Merge branch 'master' into FLINK-36165
uicosp Apr 9, 2026
2d8a8ca
resolve merge conflicts
Apr 9, 2026
7 changes: 7 additions & 0 deletions docs/content/docs/connectors/flink-sources/mysql-cdc.md
Expand Up @@ -460,6 +460,13 @@ Only valid for cdc 1.x version. During a snapshot operation, the connector will
For example updating an already updated value in snapshot, or deleting an already deleted entry in snapshot. These replayed change log events should be handled specially.
</td>
</tr>
<tr>
<td>scan.snapshot.filter</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>When reading a table snapshot, the rows of captured tables will be filtered using the specified filter expression (AKA a SQL WHERE clause). By default, no filter is applied, meaning the entire table will be synchronized. e.g. `id > 100`.</td>
</tr>
Contributor

It would be better to add a usage example for this feature.

Author

Added usage examples in commit cc477d5:

  • docs/connectors/pipeline-connectors/mysql.md — Added "Snapshot
    Filters" section with a YAML pipeline example demonstrating the
    scan.snapshot.filters list syntax.
  • docs/connectors/flink-sources/mysql-cdc.md — Added "Snapshot
    Filter" section with both SQL DDL and DataStream API examples
    showing the scan.snapshot.filter option usage.

</tbody>
</table>
</div>
Expand Down
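The option above is described as a SQL WHERE clause applied while the snapshot is read, and commit 72724f1 in this PR wraps the user filter in parentheses when combining it with split conditions. The sketch below illustrates why the parentheses matter. The class name, method name, and the sample split condition are hypothetical; the connector's real query-building code is not shown in this diff.

```java
final class SnapshotFilterSketch {

    /**
     * Combines a chunk (split) range condition with an optional user filter.
     * Hypothetical helper, not the connector's actual method.
     */
    static String buildWhere(String splitCondition, String userFilter) {
        if (userFilter == null || userFilter.trim().isEmpty()) {
            // No filter configured: the whole chunk is read.
            return splitCondition;
        }
        // Parentheses keep an OR inside the user filter from escaping the AND.
        return splitCondition + " AND (" + userFilter.trim() + ")";
    }

    public static void main(String[] args) {
        String split = "id >= 1 AND id < 1000"; // assumed shape of a chunk range
        System.out.println(buildWhere(split, "id > 100 OR vip = 1"));
    }
}
```

Without the parentheses, `id >= 1 AND id < 1000 AND id > 100 OR vip = 1` would match every row with `vip = 1` regardless of the chunk range, so the same row could be read by several chunks.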
11 changes: 11 additions & 0 deletions docs/content/docs/connectors/pipeline-connectors/mysql.md
Expand Up @@ -364,6 +364,17 @@ pipeline:
List of readable metadata from SourceRecord to be passed to downstream and could be used in transform module, split by `,`. Available readable metadata are: op_ts.
</td>
</tr>
<tr>
<td>scan.snapshot.filters</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>When reading a table snapshot, the rows of captured tables will be filtered using the specified filter expression (AKA a SQL WHERE clause). <br>
By default, no filter is applied, meaning the entire table will be synchronized. <br>
A colon (:) separates table name and filter expression, while a semicolon (;) separates multiple filters,
e.g. `db1.user_table_[0-9]+:id > 100;db[1-2].[app|web]_order_\\.*:id < 0;`.
</td>
</tr>
</tbody>
</table>
</div>
Expand Down
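Each filter entry pairs a table pattern with a filter expression, so overlapping patterns need a deterministic rule for which filter wins (commit b7f13fd addresses non-deterministic selection). A minimal sketch of one such rule, first match in declaration order, follows. It assumes the pattern is treated as a plain Java regex matched against `database.table`; the connector's actual matching rules and the class and method names here are assumptions, not taken from this diff.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;

final class SnapshotFilterLookupSketch {

    /** Returns the filter of the first pattern (in insertion order) that matches the table id. */
    static Optional<String> findFilter(Map<String, String> filtersByPattern, String tableId) {
        for (Map.Entry<String, String> entry : filtersByPattern.entrySet()) {
            if (Pattern.matches(entry.getKey(), tableId)) {
                return Optional.of(entry.getValue());
            }
        }
        // No pattern matched: the table is read without a filter.
        return Optional.empty();
    }

    public static void main(String[] args) {
        // LinkedHashMap preserves the order in which filters were declared.
        Map<String, String> filters = new LinkedHashMap<>();
        filters.put("db1.user_table_[0-9]+", "id > 100");
        filters.put("db1.user_table_.*", "id > 0");

        System.out.println(findFilter(filters, "db1.user_table_7"));
        System.out.println(findFilter(filters, "db1.user_table_tmp"));
        System.out.println(findFilter(filters, "db2.orders"));
    }
}
```

With a plain `HashMap`, iteration order is unspecified, so `db1.user_table_7` could pick up either filter from one run to the next.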
Original file line number Diff line number Diff line change
Expand Up @@ -179,18 +179,18 @@ private PipelineDef parse(JsonNode pipelineDefJsonNode, Configuration globalPipe
}

private SourceDef toSourceDef(JsonNode sourceNode) {
- Map<String, String> sourceMap =
- mapper.convertValue(sourceNode, new TypeReference<Map<String, String>>() {});
+ Map<String, Object> sourceMap =
+ mapper.convertValue(sourceNode, new TypeReference<Map<String, Object>>() {});

// "type" field is required
String type =
checkNotNull(
- sourceMap.remove(TYPE_KEY),
+ (String) sourceMap.remove(TYPE_KEY),
"Missing required field \"%s\" in source configuration",
TYPE_KEY);

// "name" field is optional
- String name = sourceMap.remove(NAME_KEY);
+ String name = (String) sourceMap.remove(NAME_KEY);

return new SourceDef(type, name, Configuration.fromMap(sourceMap));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.flink.cdc.composer.definition.UdfDef;
import org.apache.flink.core.fs.Path;

import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableList;
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableSet;
import org.apache.flink.shaded.guava31.com.google.common.io.Resources;
Expand Down Expand Up @@ -296,9 +297,9 @@ private void testSchemaEvolutionTypesParsing(
"mysql",
"source-database",
Configuration.fromMap(
- ImmutableMap.<String, String>builder()
+ ImmutableMap.<String, Object>builder()
.put("host", "localhost")
- .put("port", "3306")
+ .put("port", 3306)
.put("username", "admin")
.put("password", "pass")
.put(
Expand All @@ -307,7 +308,24 @@ private void testSchemaEvolutionTypesParsing(
.put(
"chunk-column",
"app_order_.*:id,web_order:product_id")
- .put("capture-new-tables", "true")
+ .put("capture-new-tables", true)
+ .put(
+ "scan.snapshot.filters",
+ ImmutableList.of(
+ ImmutableMap.<String, String>builder()
+ .put(
+ "table",
+ "db1.user_table_[0-9]+")
+ .put("filter", "id > 100")
+ .build(),
+ ImmutableMap.<String, String>builder()
+ .put(
+ "table",
+ "db[1-2].[app|web]_order_\\.*")
+ .put(
+ "filter",
+ "city != 'China:beijing'")
+ .build()))
.build())),
new SinkDef(
"kafka",
Expand Down Expand Up @@ -434,9 +452,9 @@ void testParsingFullDefinitionFromString() throws Exception {
"mysql",
"source-database",
Configuration.fromMap(
- ImmutableMap.<String, String>builder()
+ ImmutableMap.<String, Object>builder()
.put("host", "localhost")
- .put("port", "3306")
+ .put("port", 3306)
.put("username", "admin")
.put("password", "pass")
.put(
Expand All @@ -445,7 +463,24 @@ void testParsingFullDefinitionFromString() throws Exception {
.put(
"chunk-column",
"app_order_.*:id,web_order:product_id")
- .put("capture-new-tables", "true")
+ .put("capture-new-tables", true)
+ .put(
+ "scan.snapshot.filters",
+ ImmutableList.of(
+ ImmutableMap.<String, String>builder()
+ .put(
+ "table",
+ "db1.user_table_[0-9]+")
+ .put("filter", "id > 100")
+ .build(),
+ ImmutableMap.<String, String>builder()
+ .put(
+ "table",
+ "db[1-2].[app|web]_order_\\.*")
+ .put(
+ "filter",
+ "city != 'China:beijing'")
+ .build()))
.build())),
new SinkDef(
"kafka",
Expand Down Expand Up @@ -513,9 +548,9 @@ void testParsingFullDefinitionFromString() throws Exception {
"mysql",
null,
Configuration.fromMap(
- ImmutableMap.<String, String>builder()
+ ImmutableMap.<String, Object>builder()
.put("host", "localhost")
- .put("port", "3306")
+ .put("port", 3306)
.put("username", "admin")
.put("password", "pass")
.put(
Expand Down Expand Up @@ -574,9 +609,9 @@ void testParsingFullDefinitionFromString() throws Exception {
"mysql",
"source-database",
Configuration.fromMap(
- ImmutableMap.<String, String>builder()
+ ImmutableMap.<String, Object>builder()
.put("host", "localhost")
- .put("port", "3306")
+ .put("port", 3306)
.put("username", "admin")
.put("password", "pass")
.put(
Expand All @@ -585,7 +620,7 @@ void testParsingFullDefinitionFromString() throws Exception {
.put(
"chunk-column",
"app_order_.*:id,web_order:product_id")
- .put("capture-new-tables", "true")
+ .put("capture-new-tables", true)
.build())),
new SinkDef(
"kafka",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ source:
tables: adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*
chunk-column: app_order_.*:id,web_order:product_id
capture-new-tables: true
scan.snapshot.filters:
- table: db1.user_table_[0-9]+
filter: id > 100
- table: db[1-2].[app|web]_order_\.*
filter: city != 'China:beijing'

sink:
type: kafka
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public Configuration(Configuration other) {
}

/** Creates a new configuration that is initialized with the options of the given map. */
- public static Configuration fromMap(Map<String, String> map) {
+ public static Configuration fromMap(Map<String, ?> map) {
final Configuration configuration = new Configuration();
configuration.confData.putAll(map);
return configuration;
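
The signature change to `Map<String, ?>` lets existing callers keep passing `Map<String, String>` while new callers pass maps with structured values such as the list of filter maps. The sketch below shows the idea with a stand-in class; `WildcardMapSketch` is hypothetical, and the assumption that the backing store is a `Map<String, Object>` is inferred from the `putAll` call rather than shown in this diff.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class WildcardMapSketch {

    // Assumed backing store: values of any type, keyed by option name.
    private final Map<String, Object> confData = new HashMap<>();

    /** Accepts Map<String, String>, Map<String, Object>, or any other value type. */
    static WildcardMapSketch fromMap(Map<String, ?> map) {
        WildcardMapSketch configuration = new WildcardMapSketch();
        configuration.confData.putAll(map);
        return configuration;
    }

    Object get(String key) {
        return confData.get(key);
    }

    public static void main(String[] args) {
        // Existing callers with string-only options still compile.
        Map<String, String> legacy = new HashMap<>();
        legacy.put("port", "3306");

        // New callers can carry typed and nested values.
        Map<String, Object> typed = new HashMap<>();
        typed.put("port", 3306);
        typed.put(
                "scan.snapshot.filters",
                Collections.singletonList(
                        Collections.singletonMap("table", "db1.user_table_[0-9]+")));

        System.out.println(fromMap(legacy).get("port"));
        System.out.println(fromMap(typed).get("scan.snapshot.filters"));
    }
}
```

A parameter typed `Map<String, Object>` would reject a `Map<String, String>` argument because Java generics are invariant, which is why the wildcard is the less disruptive choice.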
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_SNAPSHOT_FILTERS;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_MODE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_FILE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET;
Expand Down Expand Up @@ -113,6 +114,8 @@ public class MySqlDataSourceFactory implements DataSourceFactory {
private static final Logger LOG = LoggerFactory.getLogger(MySqlDataSourceFactory.class);

public static final String IDENTIFIER = "mysql";
public static final String SNAPSHOT_FILTER_TABLE_KEY = "table";
public static final String SNAPSHOT_FILTER_FILTER_KEY = "filter";

@Override
public DataSource createDataSource(Context context) {
Expand Down Expand Up @@ -285,6 +288,19 @@ public DataSource createDataSource(Context context) {
LOG.info("Add chunkKeyColumn {}.", chunkKeyColumnMap);
configFactory.chunkKeyColumn(chunkKeyColumnMap);
}

List<Map<String, String>> snapshotFilters = config.get(SCAN_SNAPSHOT_FILTERS);
if (snapshotFilters != null && !snapshotFilters.isEmpty()) {
Map<String, String> snapshotFiltersMap =
snapshotFilters.stream()
.collect(
Collectors.toMap(
it -> it.get(SNAPSHOT_FILTER_TABLE_KEY),
it -> it.get(SNAPSHOT_FILTER_FILTER_KEY)));
LOG.info("Add snapshotFilters {}.", snapshotFiltersMap);
configFactory.snapshotFilters(snapshotFiltersMap);
}
Comment on lines +292 to +298
Copilot AI Jan 27, 2026

scan.snapshot.filters is parsed via Collectors.toMap(...) without validating required keys (table/filter) and without handling duplicates. This can throw a generic NullPointerException/IllegalStateException on malformed or duplicate entries and also drops the original list order (which matters if patterns overlap). Consider validating each entry and building a LinkedHashMap with explicit duplicate handling (e.g., reject with a clear message).

Author

Fixed in 67fae6d.
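
The review comment above asks for per-entry validation and an order-preserving map with explicit duplicate handling. A minimal sketch of that approach follows; it is not the code from commit 67fae6d (which is not shown here), and the class name, method name, and error messages are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class SnapshotFilterValidationSketch {

    /** Converts the configured list into an ordered map, failing fast on bad entries. */
    static Map<String, String> toOrderedMap(List<Map<String, String>> entries) {
        Map<String, String> result = new LinkedHashMap<>(); // keeps declaration order
        for (int i = 0; i < entries.size(); i++) {
            Map<String, String> entry = entries.get(i);
            String table = entry == null ? null : entry.get("table");
            String filter = entry == null ? null : entry.get("filter");
            if (table == null || table.trim().isEmpty()
                    || filter == null || filter.trim().isEmpty()) {
                throw new IllegalArgumentException(
                        "scan.snapshot.filters[" + i
                                + "] must define non-empty 'table' and 'filter'");
            }
            // putIfAbsent returns the previous value when the key already exists.
            if (result.putIfAbsent(table, filter) != null) {
                throw new IllegalArgumentException(
                        "Duplicate table pattern in scan.snapshot.filters: " + table);
            }
        }
        return result;
    }

    /** Small helper for building one list entry. */
    static Map<String, String> entry(String table, String filter) {
        Map<String, String> map = new HashMap<>();
        map.put("table", table);
        map.put("filter", filter);
        return map;
    }

    public static void main(String[] args) {
        List<Map<String, String>> entries = new ArrayList<>();
        entries.add(entry("db1.user_table_[0-9]+", "id > 100"));
        entries.add(entry("db1.orders", "city != 'China:beijing'"));
        System.out.println(toOrderedMap(entries));
    }
}
```

Compared with `Collectors.toMap(...)`, this reports which entry is malformed instead of surfacing a bare `NullPointerException` or `IllegalStateException`.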


String metadataList = config.get(METADATA_LIST);
List<MySqlReadableMetadata> readableMetadataList = listReadableMetadata(metadataList);
return new MySqlDataSource(configFactory, readableMetadataList);
Expand Down Expand Up @@ -359,6 +375,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(PARSE_ONLINE_SCHEMA_CHANGES);
options.add(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
options.add(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
options.add(SCAN_SNAPSHOT_FILTERS);
return options;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import org.apache.flink.cdc.common.configuration.ConfigOptions;

import java.time.Duration;
import java.util.List;
import java.util.Map;

/** Configurations for {@link MySqlDataSource}. */
@PublicEvolving
Expand Down Expand Up @@ -330,4 +332,21 @@ public class MySqlDataSourceOptions {
.defaultValue(false)
.withDescription(
"Whether to skip backfill in snapshot reading phase. If backfill is skipped, changes on captured tables during snapshot phase will be consumed later in change log reading phase instead of being merged into the snapshot.WARNING: Skipping backfill might lead to data inconsistency because some change log events happened within the snapshot phase might be replayed (only at-least-once semantic is promised). For example updating an already updated value in snapshot, or deleting an already deleted entry in snapshot. These replayed change log events should be handled specially.");

@Experimental
public static final ConfigOption<List<Map<String, String>>> SCAN_SNAPSHOT_FILTERS =
ConfigOptions.key("scan.snapshot.filters")
.mapType()
.asList()
.noDefaultValue()
.withDescription(
"When reading a table snapshot, the rows of captured tables will be filtered using the specified filter expressions (AKA SQL WHERE clauses). "
+ "By default, no filter is applied, meaning the entire table will be synchronized. "
+ "This option expects a list of maps, where each map specifies a table pattern and its filter expression. "
+ "The configuration format in YAML looks like:\n"
+ "scan.snapshot.filters:\n"
+ " - table: db1.user_table_[0-9]+\n"
+ " filter: id > 100\n"
+ " - table: db[1-2].[app|web]_order_\\.*\n"
+ " filter: city != 'China:beijing'\n");
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
Expand All @@ -43,6 +44,7 @@
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_SNAPSHOT_FILTERS;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES_EXCLUDE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TREAT_TINYINT1_AS_BOOLEAN_ENABLED;
Expand Down Expand Up @@ -330,6 +332,58 @@ void testAddChunkKeyColumns() {
});
}

@Test
void testAddSnapshotFilters() {
Contributor

We need more tests for this feature (IT and E2E tests).

Author
uicosp Apr 9, 2026

Added two tests in commit 85647b2:

  • MySqlSourceITCase#testSnapshotFiltersMultipleTables — IT test
    verifying per-table filters work independently across two tables
    via the DataStream API.
  • MysqlE2eITCase#testSnapshotFilters — E2E test verifying the
    full pipeline path using the YAML-based scan.snapshot.filters
    configuration.

inventoryDatabase.createAndInitialize();
Map<String, Object> options = new HashMap<>();
options.put(HOSTNAME.key(), MYSQL_CONTAINER.getHost());
options.put(PORT.key(), String.valueOf(MYSQL_CONTAINER.getDatabasePort()));
options.put(USERNAME.key(), TEST_USER);
options.put(PASSWORD.key(), TEST_PASSWORD);
options.put(TABLES.key(), inventoryDatabase.getDatabaseName() + ".\\.*");
options.put(SCAN_SNAPSHOT_FILTERS.key(), mockSnapshotFilters());
Factory.Context context = new MockContext(Configuration.fromMap(options));

MySqlDataSourceFactory factory = new MySqlDataSourceFactory();
MySqlDataSource dataSource = (MySqlDataSource) factory.createDataSource(context);

assertThat(dataSource.getSourceConfig().getSnapshotFilters())
.isNotEmpty()
.isEqualTo(
new HashMap<String, String>() {
{
put(
inventoryDatabase.getDatabaseName() + ".multi_max_\\.*",
"id > 200");
put(inventoryDatabase.getDatabaseName() + ".products", "1 = 0");
put(
inventoryDatabase.getDatabaseName() + ".customers",
"city != 'China:beijing'");
}
});
}

private List<Map<String, String>> mockSnapshotFilters() {
List<Map<String, String>> snapshotFilters = new ArrayList<>();

Map<String, String> filter1 = new HashMap<>();
filter1.put("table", inventoryDatabase.getDatabaseName() + ".multi_max_\\.*");
filter1.put("filter", "id > 200");
snapshotFilters.add(filter1);

Map<String, String> filter2 = new HashMap<>();
filter2.put("table", inventoryDatabase.getDatabaseName() + ".products");
filter2.put("filter", "1 = 0");
snapshotFilters.add(filter2);

Map<String, String> filter3 = new HashMap<>();
filter3.put("table", inventoryDatabase.getDatabaseName() + ".customers");
filter3.put("filter", "city != 'China:beijing'");
snapshotFilters.add(filter3);

return snapshotFilters;
}

class MockContext implements Factory.Context {

Configuration factoryConfiguration;
Expand Down