Open
26 commits
8a8d65c
[FLINK-36165][source-connector/mysql] Support capturing snapshot data…
Dec 3, 2024
f1b8776
[FLINK-36165][source-connector/mysql] add docs
Dec 4, 2024
ec83fbe
[FLINK-36165][source-connector/mysql] Implement snapshot filter for M…
Dec 5, 2024
96b885a
[FLINK-36165][source-connector/mysql] Escape dot
Dec 5, 2024
36ae871
[FLINK-36165][source-connector/mysql] support colon and semicolon in …
Aug 29, 2025
e59f5e8
Merge branch 'master' into FLINK-36165
Sep 5, 2025
8ee4c92
[FLINK-36165][source-connector/mysql] fix test
Sep 5, 2025
5f913f4
[FLINK-36165][source-connector/mysql] fix spotless
Sep 5, 2025
e50667d
[FLINK-36165][source-connector/mysql] fix spotless
Sep 5, 2025
e6dd29e
[FLINK-36165][source-connector/mysql] use yaml list/map to define sna…
Sep 5, 2025
ed9ddcc
Update docs/content/docs/connectors/pipeline-connectors/mysql.md
uicosp Jan 29, 2026
794513b
Update flink-cdc-connect/flink-cdc-source-connectors/flink-connector-…
uicosp Jan 29, 2026
3af4044
Update flink-cdc-connect/flink-cdc-source-connectors/flink-connector-…
uicosp Jan 29, 2026
ca12a3f
Update flink-cdc-connect/flink-cdc-source-connectors/flink-connector-…
uicosp Jan 29, 2026
c025e29
Update flink-cdc-connect/flink-cdc-source-connectors/flink-connector-…
uicosp Jan 29, 2026
3d360c9
validating and failing fast when scan.snapshot.filter option is set b…
Jan 29, 2026
67fae6d
validating each entry in scan.snapshot.filters and and building a Lin…
Jan 29, 2026
b7f13fd
fix non-deterministic snapshot filter selection when multiple pattern…
Jan 29, 2026
72724f1
wrap user filter in parentheses when combining with split conditions …
Jan 29, 2026
8ae2003
fix MySqlTableSourceFactoryTest
Jan 29, 2026
2a0bdf2
fix SnaphostFilterUtilsTest
Jan 29, 2026
85647b2
add more tests for this feature (IT, E2E test)
Apr 9, 2026
cc477d5
add a new use example for this feature
Apr 9, 2026
9e8b5f5
store the filters as a class member variable instead of constructing …
Apr 9, 2026
0d14a1b
Merge branch 'master' into FLINK-36165
uicosp Apr 9, 2026
2d8a8ca
resolve merge conflicts
Apr 9, 2026
46 changes: 46 additions & 0 deletions docs/content/docs/connectors/flink-sources/mysql-cdc.md
@@ -468,6 +468,13 @@ Only valid for cdc 1.x version. During a snapshot operation, the connector will
For example updating an already updated value in snapshot, or deleting an already deleted entry in snapshot. These replayed change log events should be handled specially.
</td>
</tr>
<tr>
<td>scan.snapshot.filter</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>When reading a table snapshot, rows of the captured tables are filtered with the specified expression (a SQL WHERE clause), e.g. `id > 100`. By default, no filter is applied and the entire table is synchronized.</td>
</tr>
Contributor

It is better to add a new use example for this feature.

Author

Added usage examples in commit cc477d5:

  • docs/connectors/pipeline-connectors/mysql.md — Added "Snapshot
    Filters" section with a YAML pipeline example demonstrating the
    scan.snapshot.filters list syntax.
  • docs/connectors/flink-sources/mysql-cdc.md — Added "Snapshot
    Filter" section with both SQL DDL and DataStream API examples
    showing the scan.snapshot.filter option usage.

</tbody>
</table>
</div>
@@ -929,6 +936,45 @@ CREATE TABLE products (
`binary_data` field in the database is of type VARBINARY(N). In some scenarios, we need to convert binary data to base64 encoded string data. This feature can be enabled by adding the parameter 'debezium.binary.handling.mode'='base64',
By adding this parameter, we can map the binary field type to 'STRING' in Flink SQL, thereby obtaining base64 encoded string data.

### Snapshot Filter

The `scan.snapshot.filter` option allows you to apply a SQL WHERE clause during snapshot reading, so that only matching rows are synchronized. This is useful for partial initial loads (e.g. migrating only recent data) without modifying the source tables. Binlog events are still captured for all rows after the snapshot phase, regardless of the filter.

**SQL DDL Example**

```sql
CREATE TABLE products (
    id INT NOT NULL,
    name STRING,
    description STRING,
    PRIMARY KEY(id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'mydb',
    'table-name' = 'products',
    'scan.snapshot.filter' = 'id > 100'
);
```

**DataStream API Example**

```java
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
        .hostname("yourHostname")
        .port(yourPort)
        .databaseList("mydb")
        .tableList("mydb.products")
        .username("yourUsername")
        .password("yourPassword")
        .snapshotFilters("mydb.products", "id > 100")
        .deserializer(new JsonDebeziumDeserializationSchema())
        .build();
```

### Available Source metrics

Metrics can help understand the progress of assignments, and the following are the supported [Flink metrics](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/):
42 changes: 42 additions & 0 deletions docs/content/docs/connectors/pipeline-connectors/mysql.md
@@ -371,6 +371,16 @@ pipeline:
List of readable metadata from SourceRecord to be passed to downstream and could be used in transform module, split by `,`. Available readable metadata are: op_ts.
</td>
</tr>
<tr>
<td>scan.snapshot.filters</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>List&lt;Map&lt;String, String&gt;&gt;</td>
<td>When reading a table snapshot, the rows of captured tables will be filtered using the specified filter expressions (SQL WHERE clauses). <br>
This option is configured as a list of filter specifications, where each specification is represented as a map of string key-value pairs (as shown in the example configuration below). <br>
By default, no filter is applied, meaning the entire table will be synchronized.
</td>
</tr>
</tbody>
</table>
</div>
@@ -405,6 +415,38 @@ source:
# ...
```

## Snapshot Filters

The `scan.snapshot.filters` option allows you to apply per-table SQL WHERE clauses during snapshot reading, so that only matching rows are synchronized. This is useful when you want to perform a partial initial load (e.g. migrating only recent data) without modifying the source tables.

Each entry in the list specifies a `table` pattern (using the same format as the `tables` option) and a `filter` expression. Rows that do not satisfy the expression are skipped during snapshot reading. Binlog events are still captured for all rows after the snapshot phase, regardless of the filter.

For example, to read only orders placed after 2024-01-01 from `mydb.orders` and only active users from `mydb.users`:

```yaml
source:
  type: mysql
  hostname: 127.0.0.1
  port: 3306
  username: admin
  password: pass
  tables: mydb\..*
  server-id: 5401-5404
  scan.snapshot.filters:
    - table: mydb.orders
      filter: order_date >= '2024-01-01'
    - table: mydb.users
      filter: status = 'active'

sink:
  type: doris
  # ...

pipeline:
  name: Filtered MySQL to Doris Pipeline
  parallelism: 4
```
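
A filter that contains `OR` deserves some care, because in SQL `AND` binds more tightly than `OR`. One of the commits in this change is titled "wrap user filter in parentheses when combining with split conditions", which suggests the filter is combined with the per-chunk range condition in this way. The Java sketch below is not the connector's code; the class, the method names, the `vip` column, and the chunk range are hypothetical and only illustrate why the parentheses matter.

```java
public class SnapshotFilterWhereSketch {

    // Hypothetical helper (not the connector's API): joins a chunk range
    // condition with a user filter, wrapping the filter in parentheses.
    public static String withParentheses(String splitCondition, String userFilter) {
        return splitCondition + " AND (" + userFilter + ")";
    }

    // Unsafe variant, shown only for contrast: the filter is appended as-is.
    public static String withoutParentheses(String splitCondition, String userFilter) {
        return splitCondition + " AND " + userFilter;
    }

    public static void main(String[] args) {
        // Assumed chunk range and filter, chosen for illustration.
        String split = "id >= 1 AND id < 100";
        String filter = "status = 'active' OR vip = 1";

        // Parsed by SQL as (id >= 1 AND id < 100 AND status = 'active') OR vip = 1,
        // so rows with vip = 1 outside the chunk range would also match.
        System.out.println(withoutParentheses(split, filter));

        // The filter stays a single condition inside the chunk range.
        System.out.println(withParentheses(split, filter));
    }
}
```

With the parenthesized form, a user can write any boolean expression as the filter without having to think about how it is combined with other conditions.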

## Available Source metrics

Metrics can help understand the progress of assignments, and the following are the supported [Flink metrics](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/metrics/):
@@ -197,18 +197,18 @@ private PipelineDef parse(JsonNode pipelineDefJsonNode, Configuration globalPipe
}

private SourceDef toSourceDef(JsonNode sourceNode) {
Map<String, String> sourceMap =
mapper.convertValue(sourceNode, new TypeReference<Map<String, String>>() {});
Map<String, Object> sourceMap =
mapper.convertValue(sourceNode, new TypeReference<Map<String, Object>>() {});

// "type" field is required
String type =
checkNotNull(
sourceMap.remove(TYPE_KEY),
(String) sourceMap.remove(TYPE_KEY),
"Missing required field \"%s\" in source configuration",
TYPE_KEY);

// "name" field is optional
String name = sourceMap.remove(NAME_KEY);
String name = (String) sourceMap.remove(NAME_KEY);

return new SourceDef(type, name, Configuration.fromMap(sourceMap));
}
@@ -29,6 +29,7 @@
import org.apache.flink.cdc.composer.definition.UdfDef;
import org.apache.flink.core.fs.Path;

import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableList;
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableSet;
import org.apache.flink.shaded.guava31.com.google.common.io.Resources;
@@ -370,9 +371,9 @@ private void testSchemaEvolutionTypesParsing(
"mysql",
"source-database",
Configuration.fromMap(
ImmutableMap.<String, String>builder()
ImmutableMap.<String, Object>builder()
.put("host", "localhost")
.put("port", "3306")
.put("port", 3306)
.put("username", "admin")
.put("password", "pass")
.put(
@@ -381,7 +382,24 @@ private void testSchemaEvolutionTypesParsing(
.put(
"chunk-column",
"app_order_.*:id,web_order:product_id")
.put("capture-new-tables", "true")
.put("capture-new-tables", true)
.put(
"scan.snapshot.filters",
ImmutableList.of(
ImmutableMap.<String, String>builder()
.put(
"table",
"db1.user_table_[0-9]+")
.put("filter", "id > 100")
.build(),
ImmutableMap.<String, String>builder()
.put(
"table",
"db[1-2].[app|web]_order_\\.*")
.put(
"filter",
"city != 'China:beijing'")
.build()))
.build())),
new SinkDef(
"kafka",
@@ -508,9 +526,9 @@ void testParsingFullDefinitionFromString() throws Exception {
"mysql",
"source-database",
Configuration.fromMap(
ImmutableMap.<String, String>builder()
ImmutableMap.<String, Object>builder()
.put("host", "localhost")
.put("port", "3306")
.put("port", 3306)
.put("username", "admin")
.put("password", "pass")
.put(
@@ -519,7 +537,24 @@ void testParsingFullDefinitionFromString() throws Exception {
.put(
"chunk-column",
"app_order_.*:id,web_order:product_id")
.put("capture-new-tables", "true")
.put("capture-new-tables", true)
.put(
"scan.snapshot.filters",
ImmutableList.of(
ImmutableMap.<String, String>builder()
.put(
"table",
"db1.user_table_[0-9]+")
.put("filter", "id > 100")
.build(),
ImmutableMap.<String, String>builder()
.put(
"table",
"db[1-2].[app|web]_order_\\.*")
.put(
"filter",
"city != 'China:beijing'")
.build()))
.build())),
new SinkDef(
"kafka",
@@ -587,9 +622,9 @@ void testParsingFullDefinitionFromString() throws Exception {
"mysql",
null,
Configuration.fromMap(
ImmutableMap.<String, String>builder()
ImmutableMap.<String, Object>builder()
.put("host", "localhost")
.put("port", "3306")
.put("port", 3306)
.put("username", "admin")
.put("password", "pass")
.put(
@@ -650,9 +685,9 @@ void testParsingFullDefinitionFromString() throws Exception {
"mysql",
"source-database",
Configuration.fromMap(
ImmutableMap.<String, String>builder()
ImmutableMap.<String, Object>builder()
.put("host", "localhost")
.put("port", "3306")
.put("port", 3306)
.put("username", "admin")
.put("password", "pass")
.put(
@@ -661,7 +696,7 @@ void testParsingFullDefinitionFromString() throws Exception {
.put(
"chunk-column",
"app_order_.*:id,web_order:product_id")
.put("capture-new-tables", "true")
.put("capture-new-tables", true)
.build())),
new SinkDef(
"kafka",
@@ -24,6 +24,11 @@ source:
  tables: adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*
  chunk-column: app_order_.*:id,web_order:product_id
  capture-new-tables: true
  scan.snapshot.filters:
    - table: db1.user_table_[0-9]+
      filter: id > 100
    - table: db[1-2].[app|web]_order_\.*
      filter: city != 'China:beijing'

sink:
  type: kafka
@@ -61,7 +61,7 @@ public Configuration(Configuration other) {
}

/** Creates a new configuration that is initialized with the options of the given map. */
public static Configuration fromMap(Map<String, String> map) {
public static Configuration fromMap(Map<String, ?> map) {
final Configuration configuration = new Configuration();
configuration.confData.putAll(map);
return configuration;
@@ -55,6 +55,7 @@
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -83,6 +84,7 @@
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_SNAPSHOT_FILTERS;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_MODE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_FILE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET;
@@ -112,6 +114,8 @@ public class MySqlDataSourceFactory implements DataSourceFactory {
private static final Logger LOG = LoggerFactory.getLogger(MySqlDataSourceFactory.class);

public static final String IDENTIFIER = "mysql";
public static final String SNAPSHOT_FILTER_TABLE_KEY = "table";
public static final String SNAPSHOT_FILTER_FILTER_KEY = "filter";

@Override
public DataSource createDataSource(Context context) {
@@ -284,11 +288,78 @@ public DataSource createDataSource(Context context) {
LOG.info("Add chunkKeyColumn {}.", chunkKeyColumnMap);
configFactory.chunkKeyColumn(chunkKeyColumnMap);
}

List<Map<String, String>> snapshotFilters = config.get(SCAN_SNAPSHOT_FILTERS);
if (snapshotFilters != null && !snapshotFilters.isEmpty()) {
Map<String, String> snapshotFiltersMap =
parseAndValidateSnapshotFilters(snapshotFilters);
LOG.info("Add snapshotFilters {}.", snapshotFiltersMap);
configFactory.snapshotFilters(snapshotFiltersMap);
}
Comment on lines +292 to +298
Copilot AI Jan 27, 2026

scan.snapshot.filters is parsed via Collectors.toMap(...) without validating required keys (table/filter) and without handling duplicates. This can throw a generic NullPointerException/IllegalStateException on malformed or duplicate entries and also drops the original list order (which matters if patterns overlap). Consider validating each entry and building a LinkedHashMap with explicit duplicate handling (e.g., reject with a clear message).

Author

fixed 67fae6d
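
For readers following this thread, the behaviour described in the comment can be reproduced with a short standalone Java sketch. It is not the PR's code: the class and method names are hypothetical, and `IllegalArgumentException` stands in for the connector's `ValidationException`. It contrasts `Collectors.toMap`, which rejects duplicate keys with a generic `IllegalStateException` and collects into a map without ordering guarantees, with a `LinkedHashMap` loop that keeps the list order and reports duplicates explicitly.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class SnapshotFilterParsingSketch {

    // Hypothetical helper: one "table"/"filter" entry as it might arrive from YAML.
    public static Map<String, String> entry(String table, String filter) {
        Map<String, String> m = new LinkedHashMap<>();
        m.put("table", table);
        m.put("filter", filter);
        return m;
    }

    // Naive approach: throws IllegalStateException on a duplicate table key
    // and gives no guarantee about iteration order.
    public static Map<String, String> naive(List<Map<String, String>> entries) {
        return entries.stream()
                .collect(Collectors.toMap(e -> e.get("table"), e -> e.get("filter")));
    }

    // Order-preserving approach with an explicit duplicate check.
    // IllegalArgumentException is a stand-in for ValidationException.
    public static Map<String, String> ordered(List<Map<String, String>> entries) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map<String, String> e : entries) {
            String table = e.get("table");
            if (result.containsKey(table)) {
                throw new IllegalArgumentException(
                        "Duplicate table pattern '" + table + "' in snapshot filters");
            }
            result.put(table, e.get("filter"));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Map<String, String>> entries = Arrays.asList(
                entry("mydb.orders", "order_date >= '2024-01-01'"),
                entry("mydb.users", "status = 'active'"));
        // Keys are iterated in the same order as the list.
        System.out.println(ordered(entries).keySet());

        List<Map<String, String>> duplicated = Arrays.asList(
                entry("mydb.orders", "id > 100"),
                entry("mydb.orders", "id > 200"));
        try {
            naive(duplicated);
        } catch (IllegalStateException ex) {
            System.out.println("toMap rejected the duplicate: " + ex.getMessage());
        }
        try {
            ordered(duplicated);
        } catch (IllegalArgumentException ex) {
            System.out.println("explicit check: " + ex.getMessage());
        }
    }
}
```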


String metadataList = config.get(METADATA_LIST);
List<MySqlReadableMetadata> readableMetadataList = listReadableMetadata(metadataList);
return new MySqlDataSource(configFactory, readableMetadataList);
}

    /**
     * Parses and validates snapshot filters configuration.
     *
     * @param snapshotFilters List of filter entries, each containing 'table' and 'filter' keys
     * @return LinkedHashMap preserving insertion order, mapping table patterns to filter
     *     expressions
     * @throws ValidationException If any entry is missing required keys or contains duplicate table
     *     patterns
     */
    private Map<String, String> parseAndValidateSnapshotFilters(
            List<Map<String, String>> snapshotFilters) {
        Map<String, String> result = new LinkedHashMap<>();

        for (int i = 0; i < snapshotFilters.size(); i++) {
            Map<String, String> entry = snapshotFilters.get(i);

            // Validate required keys
            String table = entry.get(SNAPSHOT_FILTER_TABLE_KEY);
            String filter = entry.get(SNAPSHOT_FILTER_FILTER_KEY);

            if (table == null || table.trim().isEmpty()) {
                throw new ValidationException(
                        String.format(
                                "Snapshot filter entry at index %d is missing required key '%s'. "
                                        + "Each entry must contain both '%s' and '%s' keys.",
                                i,
                                SNAPSHOT_FILTER_TABLE_KEY,
                                SNAPSHOT_FILTER_TABLE_KEY,
                                SNAPSHOT_FILTER_FILTER_KEY));
            }

            if (filter == null || filter.trim().isEmpty()) {
                throw new ValidationException(
                        String.format(
                                "Snapshot filter entry at index %d is missing required key '%s'. "
                                        + "Each entry must contain both '%s' and '%s' keys.",
                                i,
                                SNAPSHOT_FILTER_FILTER_KEY,
                                SNAPSHOT_FILTER_TABLE_KEY,
                                SNAPSHOT_FILTER_FILTER_KEY));
            }

            // Check for duplicates
            if (result.containsKey(table)) {
                throw new ValidationException(
                        String.format(
                                "Duplicate table pattern '%s' found in snapshot filters at index %d. "
                                        + "Each table pattern can only appear once. "
                                        + "Previous definition: '%s', Current definition: '%s'.",
                                table, i, result.get(table), filter));
            }

            result.put(table, filter);
        }

        return result;
    }

private List<MySqlReadableMetadata> listReadableMetadata(String metadataList) {
if (StringUtils.isNullOrWhitespaceOnly(metadataList)) {
return new ArrayList<>();
@@ -358,6 +429,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(PARSE_ONLINE_SCHEMA_CHANGES);
options.add(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
options.add(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
options.add(SCAN_SNAPSHOT_FILTERS);
return options;
}
