[FLINK-36165][source-connector/mysql] Support capturing snapshot data with conditions#3776
[FLINK-36165][source-connector/mysql] Support capturing snapshot data with conditions#3776uicosp wants to merge 26 commits into
Conversation
|
Thanks for @uicosp's great work! It's indeed a long awaited feature. Seems Debezium has a similar option called As these options aren't available in incremental framework, it would be nice if we could support both row and column level pushdown with similar syntax, since they're both related to tweaking snapshot querying SQL. WDYT? |
it is also support table api and datastream api with incremental framework |
|
Should we consider this pr with only |
|
Column pruning is certainly a feature worth considering. In theory, it would only require minor adjustments to the current design, such as implementing a syntax like Taking this a step further, since the pipeline config already includes a transform option, we could leverage it to automatically extract filter conditions and projection fields, dynamically generating scan.snapshot.filters and thereby simplifying the config. That said, I’m currently focused on other tasks and don't have time to work on this. Once I have time, I’d be happy to revisit and enhance this feature. |
|
Does this function also apply to oracle.cdc ? |
no, this function only works for mysql connector. |
Thanks for the reply, does oracle connector have similar capabilities? I tried to configure |
I haven't used oracle, so I'm not sure. You can try to find the answer through the offical flink cdc docs or the oracle connector source code. |
|
Thank you. I will follow your suggestion to attempt implementing a filtering feature based on an incremental snapshot connector |
|
@yuxiqian @loserwang1024 PTAL |
| if (snapshotFilters != null) { | ||
| Map<String, String> snapshotFilterMap = new HashMap<>(); | ||
| for (String snapshotFilter : snapshotFilters.split(";")) { | ||
| String[] splits = snapshotFilter.trim().split(":"); |
There was a problem hiding this comment.
What if the filter in sql contains ; or :, like city != 'China:beijing'?
There was a problem hiding this comment.
Sorry I’m late, I was busy with something else. I have modified the snapshotFilters parsing logic in MySqlDataSourceFactory to correctly handle semicolons and colons within filter conditions.
Key improvements:
- parseSnapshotFilters method:
- Intelligently parses semicolon delimiters while recognizing quoted content
- Supports both single and double quotes
- Semicolons within quotes are not treated as delimiters
- parseTableAndCondition method:
- Splits only on the first colon to avoid misinterpreting colons in condition values
- Ensures proper separation of table names and conditions
There was a problem hiding this comment.
I think a better solution to this problem should allow passing nested YAML structures (LIST and MAP should be enough) as config option values instead of inventing more customized syntaxes, as they're hard to write and parse.
There was a problem hiding this comment.
I think a better solution to this problem should allow passing nested YAML structures (LIST and MAP should be enough) as config option values instead of inventing more customized syntaxes, as they're hard to write and parse.
I agree with u , but i think that we should consider the difference with snapshot filters or cdc filters
There was a problem hiding this comment.
You are right. Actually, at the beginning I tried using nested YAML structures (LIST and MAP). However, I found that the YamlPipelineDefinitionParser.toSourceDef() method doesn’t support that; it only supports STRING values (because it uses Map<String, String> in Java).
I’m not sure whether this was intentional or simply a limitation of the current implementation. If I could use Map<String, Object>, I could switch to YAML structures, which is clearly more reasonable than parsing everything from strings.
# Conflicts: # docs/content/docs/connectors/flink-sources/mysql-cdc.md # docs/content/docs/connectors/pipeline-connectors/mysql.md # flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java # flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java # flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java # flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java # flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java # flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java # flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java # flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java # flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java # flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.java # flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java
|
This version adds support for configuring scan.snapshot.filters in the pipeline using a YAML list format. scan.snapshot.filters:
- table: db1.user_table_[0-9]+
filter: id > 100
- table: db[1-2].[app|web]_order_\.*
filter: city != 'China:beijing'Note: |
yuxiqian
left a comment
There was a problem hiding this comment.
It appears that modifying config methods to support nested types should be safe, since all existing connectors will only declare ConfigOptions with primitive types.
Would @lvyanquan like to take a look?
|
@lvyanquan Hi,Can you push this PR |
|
This functionality would be really helpful. What is the status of this PR? Can it be merged? |
There was a problem hiding this comment.
Pull request overview
Implements snapshot-row filtering for MySQL CDC snapshot reads, allowing users to restrict which rows are captured during the snapshot phase via configurable SQL WHERE-like predicates.
Changes:
- Add snapshot filter configuration for both the Table API MySQL CDC source (
scan.snapshot.filter) and the pipeline MySQL connector (scan.snapshot.filters). - Apply snapshot filters throughout snapshot chunk analysis and snapshot split reading (min/max, row counting, split scan query generation).
- Add/extend unit + IT coverage and update YAML/examples/docs.
Reviewed changes
Copilot reviewed 22 out of 22 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java | Extends table source factory tests to include the new snapshot filter option in expected source construction. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/SnapshotFilterUtilsTest.java | Adds unit coverage for table-pattern-to-filter matching. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.java | Adds an integration test validating snapshot filtering behavior under failover. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java | Wires scan.snapshot.filter option into MySqlTableSource creation and optional options set. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java | Stores and forwards the snapshot filter into the parallel MySqlSource builder path. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/StatementUtils.java | Extends snapshot SQL building utilities to incorporate optional filters in min/max, row count, chunk boundary, and split scan queries. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/SnapshotFilterUtils.java | Introduces utility to resolve an applicable snapshot filter for a given table based on configured patterns. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceOptions.java | Adds scan.snapshot.filter config option for MySQL CDC source connector. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java | Adds snapshot filters storage + builder methods and passes them into MySqlSourceConfig. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java | Stores snapshot filters map in source config and exposes getter. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitter.java | Applies snapshot filter to min/max, row counting, and chunk boundary computation. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java | Adds builder method for setting snapshot filters. |
| flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTask.java | Applies per-table snapshot filter when generating the split scan SQL. |
| flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java | Adds pipeline connector factory test coverage for snapshot filters parsing/propagation. |
| flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java | Adds scan.snapshot.filters option as a list of {table, filter} maps. |
| flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java | Parses scan.snapshot.filters and forwards into MySqlSourceConfigFactory. |
| flink-cdc-common/src/main/java/org/apache/flink/cdc/common/configuration/Configuration.java | Broadens fromMap to accept non-string values for YAML parsing use-cases. |
| flink-cdc-cli/src/test/resources/definitions/pipeline-definition-full.yaml | Adds example scan.snapshot.filters YAML structure to the full pipeline definition fixture. |
| flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java | Updates parser tests to use non-string typed values and includes snapshot filters list in expected config. |
| flink-cdc-cli/src/main/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParser.java | Parses source config as Map<String,Object> to support structured/non-string YAML values. |
| docs/content/docs/connectors/pipeline-connectors/mysql.md | Documents the new pipeline connector snapshot filters option (but currently mismatched vs implementation). |
| docs/content/docs/connectors/flink-sources/mysql-cdc.md | Documents the new source connector scan.snapshot.filter option. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| .assignUnboundedChunkFirst(assignUnboundedChunkFirst) | ||
| .snapshotFilters( | ||
| escapeDot(database) + "." + escapeDot(tableName), | ||
| snapshotFilter) | ||
| .build(); |
There was a problem hiding this comment.
snapshotFilter is only forwarded into .snapshotFilters(...) when enableParallelRead is true. If a user sets scan.snapshot.filter while parallel snapshot is disabled, the option will currently be silently ignored. Consider either (1) validating and failing fast when the option is set but parallel read is disabled, or (2) implementing equivalent filtering for the non-parallel Debezium source path, and documenting the supported modes.
| List<Map<String, String>> snapshotFilters = config.get(SCAN_SNAPSHOT_FILTERS); | ||
| if (snapshotFilters != null && !snapshotFilters.isEmpty()) { | ||
| Map<String, String> snapshotFiltersMap = | ||
| snapshotFilters.stream() | ||
| .collect( | ||
| Collectors.toMap( | ||
| it -> it.get(SNAPSHOT_FILTER_TABLE_KEY), | ||
| it -> it.get(SNAPSHOT_FILTER_FILTER_KEY))); | ||
| LOG.info("Add snapshotFilters {}.", snapshotFiltersMap); | ||
| configFactory.snapshotFilters(snapshotFiltersMap); | ||
| } |
There was a problem hiding this comment.
scan.snapshot.filters is parsed via Collectors.toMap(...) without validating required keys (table/filter) and without handling duplicates. This can throw a generic NullPointerException/IllegalStateException on malformed or duplicate entries and also drops the original list order (which matters if patterns overlap). Consider validating each entry and building a LinkedHashMap with explicit duplicate handling (e.g., reject with a clear message).
| String filter = null; | ||
| for (Selectors selector : snapshotFilters.keySet()) { | ||
| if (selector.isMatch( | ||
| org.apache.flink.cdc.common.event.TableId.tableId( | ||
| tableId.catalog(), tableId.table()))) { | ||
| filter = snapshotFilters.get(selector); | ||
| break; | ||
| } |
There was a problem hiding this comment.
getSnapshotFilter picks the first matching selector from a HashMap keySet iteration, which is non-deterministic when multiple patterns match the same table. This can cause different filters to be applied across runs/JVMs. Please make matching deterministic (e.g., preserve user-defined order via LinkedHashMap/list, or define precedence like exact match > regex, or fail fast on multiple matches).
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
…mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/SnapshotFilterUtilsTest.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
…mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/StatementUtils.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
…mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/StatementUtils.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
…mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/SnapshotFilterUtils.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
…ut parallel read is disabled
…kedHashMap with explicit duplicate handling
…(queryMin / queryNextChunkMax)
|
@lvyanquan I've addressed the review comments and pushed the updates. |
|
@leonardBang PTAL,TKS. |
ruanhang1993
left a comment
There was a problem hiding this comment.
@uicosp Hi, the MySQL changes are significant, and it looks like there are still some issues that need further discussion. It may not be feasible to complete this in version 3.6. We can schedule this feature for the next release.
| } | ||
|
|
||
| @Test | ||
| void testAddSnapshotFilters() { |
There was a problem hiding this comment.
We need more tests for this feature(IT, E2E test).
There was a problem hiding this comment.
Added two tests in commit 85647b2:
- MySqlSourceITCase#testSnapshotFiltersMultipleTables — IT test
verifying per-table filters work independently across two tables
via the DataStream API. - MysqlE2eITCase#testSnapshotFilters — E2E test verifying the
full pipeline path using the YAML-based scan.snapshot.filters
configuration.
| options.add(MySqlSourceOptions.USE_LEGACY_JSON_FORMAT); | ||
| options.add(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST); | ||
| options.add(MySqlSourceOptions.SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED); | ||
| options.add(MySqlSourceOptions.SCAN_SNAPSHOT_FILTER); |
There was a problem hiding this comment.
I am considering whether we should implement SupportsFilterPushDown for the sql connector instead of a new config.
There was a problem hiding this comment.
Thanks for the suggestion!
We actually discussed this with @ThorneANN earlier — the conclusion was that SupportsFilterPushDown is not suitable here because of the Incremental Snapshot framework's design.
The current config-based approach (scan.snapshot.filter) is intentional.
| <td style="word-wrap: break-word;">(none)</td> | ||
| <td>String</td> | ||
| <td>When reading a table snapshot, the rows of captured tables will be filtered using the specified filter expression (AKA a SQL WHERE clause). By default, no filter is applied, meaning the entire table will be synchronized. e.g. `id > 100`.</td> | ||
| </tr> |
There was a problem hiding this comment.
It is better to add a new use example for this feature.
There was a problem hiding this comment.
Added usage examples in commit cc477d5:
- docs/connectors/pipeline-connectors/mysql.md — Added "Snapshot
Filters" section with a YAML pipeline example demonstrating the
scan.snapshot.filters list syntax. - docs/connectors/flink-sources/mysql-cdc.md — Added "Snapshot
Filter" section with both SQL DDL and DataStream API examples
showing the scan.snapshot.filter option usage.
|
|
||
| String filter = | ||
| SnapshotFilterUtils.getSnapshotFilter( | ||
| sourceConfig.getSnapshotFilters(), table.id()); |
There was a problem hiding this comment.
Could we store the filters as a class member variable instead of constructing it on every invocation?
There was a problem hiding this comment.
Optimized filter lookup in commit 9e8b5f5:
- Moved snapshot filter resolution from
createDataEventsForTable() to the constructor of
MySqlSnapshotSplitReadTask. - The filter is now computed once and stored as a class member
variable (snapshotFilter), avoiding repeated lookups on every
method invocation.
|
Really looking forward to this functionality. |
Hi, I think that we do not to supple the |
Hey, this is an implementation designed to capture snapshot data with filtering conditions.
For example, by specifying
scan.snapshot.filters: db.user_table:id > 200;, we can synchronize only the user data where the id is greater than 200.issue link: https://issues.apache.org/jira/browse/FLINK-36165