> cache =
+ new ConcurrentHashMap<>();
+
+ /**
+ * Converts the given filters to a map keyed by {@link Selectors}, caching the result.
+ *
+ * <p>The cache is backed by a {@link ConcurrentHashMap} to be safe under concurrent access. To
+ * avoid using a mutable {@link Map} as the cache key, an immutable copy of the input filters is
+ * created and used as the key.
+ *
+ * <p>Uses {@link LinkedHashMap} to preserve insertion order, ensuring deterministic matching
+ * when multiple patterns could match the same table.
+ *
+ * <p>NOTE(review): {@link Map#equals(Object)} ignores iteration order, so two filter maps that
+ * hold the same entries in a different insertion order hit the same cache entry and reuse the
+ * selector order of whichever map was cached first. Confirm that callers never depend on
+ * re-ordering an otherwise identical filter map.
+ *
+ * @param filters table pattern to filter expression, in the order the patterns should be tried
+ * @return map from one {@link Selectors} per pattern to its filter expression, in key order
+ */
+ private static Map toSelector(Map filters) {
+ // Create an immutable copy of the filters to avoid using a mutable map as the cache key.
+ // Use LinkedHashMap to preserve the user-defined order.
+ Map immutableFilters =
+ Collections.unmodifiableMap(new LinkedHashMap<>(filters));
+
+ return cache.computeIfAbsent(
+ immutableFilters,
+ key -> {
+ // Use LinkedHashMap to preserve insertion order for deterministic matching
+ Map snapshotFilters = new LinkedHashMap<>();
+ key.forEach(
+ (table, filter) -> {
+ Selectors selector =
+ new Selectors.SelectorsBuilder()
+ .includeTables(table)
+ .build();
+ snapshotFilters.put(selector, filter);
+ });
+ return snapshotFilters;
+ });
+ }
+
+ /**
+  * Returns the snapshot filter configured for the given table.
+  *
+  * <p>The filter keys are tried in their iteration order and the filter of the first pattern
+  * that matches {@code catalog.table} of {@code tableId} wins.
+  *
+  * @param filters table pattern to filter expression
+  * @param tableId Debezium id of the table being snapshotted; its catalog and table name are
+  *     used for matching
+  * @return the filter expression of the first matching pattern, or {@code null} when no pattern
+  *     matches (or the matching entry itself holds {@code null})
+  */
+ public static String getSnapshotFilter(Map filters, TableId tableId) {
+     Map<Selectors, String> snapshotFilters = toSelector(filters);
+     if (snapshotFilters.isEmpty()) {
+         // Nothing configured: return early without touching tableId, exactly as a loop over
+         // an empty map would.
+         return null;
+     }
+
+     // The CDC table id is the same for every selector, so build it once instead of once per
+     // loop iteration.
+     org.apache.flink.cdc.common.event.TableId cdcTableId =
+             org.apache.flink.cdc.common.event.TableId.tableId(
+                     tableId.catalog(), tableId.table());
+
+     // Walk the entries directly; this avoids a second hash lookup per matching selector.
+     for (Map.Entry<Selectors, String> entry : snapshotFilters.entrySet()) {
+         if (entry.getKey().isMatch(cdcTableId)) {
+             return entry.getValue();
+         }
+     }
+     return null;
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/StatementUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/StatementUtils.java
index c910cbfdc37..ad5fd1ce439 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/StatementUtils.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/StatementUtils.java
@@ -22,6 +22,8 @@
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.TableId;
+import javax.annotation.Nullable;
+
import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.Connection;
@@ -38,12 +40,15 @@ public class StatementUtils {
private StatementUtils() {}
- public static Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String columnName)
+ public static Object[] queryMinMax(
+ JdbcConnection jdbc, TableId tableId, String columnName, @Nullable String filter)
throws SQLException {
final String minMaxQuery =
String.format(
"SELECT MIN(%s), MAX(%s) FROM %s",
- quote(columnName), quote(columnName), quote(tableId));
+ quote(columnName),
+ quote(columnName),
+ filter != null ? quote(tableId) + " WHERE " + filter : quote(tableId));
return jdbc.queryAndMap(
minMaxQuery,
rs -> {
@@ -58,6 +63,29 @@ public static Object[] queryMinMax(JdbcConnection jdbc, TableId tableId, String
});
}
+ /**
+  * Returns the number of rows of {@code tableId} relevant for snapshot splitting.
+  *
+  * <p>When {@code filter} is {@code null} the result comes from {@link
+  * #queryApproximateRowCnt(JdbcConnection, TableId)}; otherwise a {@code COUNT(1)} query
+  * restricted by the filter is executed.
+  *
+  * @param jdbc connection used to run the query
+  * @param tableId table whose rows are counted
+  * @param columnName not referenced by this method
+  * @param filter optional SQL predicate; it is embedded verbatim inside {@code WHERE (...)}
+  * @return the row count
+  * @throws SQLException if the query fails or returns no row
+  */
+ public static Long queryRowCnt(
+         JdbcConnection jdbc, TableId tableId, String columnName, @Nullable String filter)
+         throws SQLException {
+     if (filter != null) {
+         final String cntQuery =
+                 String.format("SELECT COUNT(1) FROM %s WHERE (%s)", quote(tableId), filter);
+         return jdbc.queryAndMap(
+                 cntQuery,
+                 rs -> {
+                     if (rs.next()) {
+                         return rs.getLong(1);
+                     }
+                     // A COUNT query is expected to always produce one row.
+                     throw new SQLException(
+                             String.format(
+                                     "No result returned after running query [%s]", cntQuery));
+                 });
+     }
+
+     // No filter configured: fall back to the approximate row count.
+     return queryApproximateRowCnt(jdbc, tableId);
+ }
+
public static long queryApproximateRowCnt(JdbcConnection jdbc, TableId tableId)
throws SQLException {
// The statement used to get approximate row count which is less
@@ -97,12 +125,20 @@ public static void setSafeObject(PreparedStatement ps, int parameterIndex, Objec
}
public static Object queryMin(
- JdbcConnection jdbc, TableId tableId, String columnName, Object excludedLowerBound)
+ JdbcConnection jdbc,
+ TableId tableId,
+ String columnName,
+ Object excludedLowerBound,
+ @Nullable String filter)
throws SQLException {
final String minQuery =
String.format(
"SELECT MIN(%s) FROM %s WHERE %s > ?",
- quote(columnName), quote(tableId), quote(columnName));
+ quote(columnName),
+ quote(tableId),
+ filter != null
+ ? "(" + filter + ") AND " + quote(columnName)
+ : quote(columnName));
return jdbc.prepareQueryAndMap(
minQuery,
ps -> setSafeObject(ps, 1, excludedLowerBound),
@@ -122,7 +158,8 @@ public static Object queryNextChunkMax(
TableId tableId,
String splitColumnName,
int chunkSize,
- Object includedLowerBound)
+ Object includedLowerBound,
+ @Nullable String filter)
throws SQLException {
String quotedColumn = quote(splitColumnName);
String query =
@@ -133,7 +170,7 @@ public static Object queryNextChunkMax(
quotedColumn,
quotedColumn,
quote(tableId),
- quotedColumn,
+ filter != null ? "(" + filter + ") AND " + quotedColumn : quotedColumn,
quotedColumn,
chunkSize);
return jdbc.prepareQueryAndMap(
@@ -151,8 +188,12 @@ public static Object queryNextChunkMax(
}
public static String buildSplitScanQuery(
- TableId tableId, RowType pkRowType, boolean isFirstSplit, boolean isLastSplit) {
- return buildSplitQuery(tableId, pkRowType, isFirstSplit, isLastSplit, -1, true);
+ TableId tableId,
+ RowType pkRowType,
+ boolean isFirstSplit,
+ boolean isLastSplit,
+ @Nullable String filter) {
+ return buildSplitQuery(tableId, pkRowType, isFirstSplit, isLastSplit, -1, true, filter);
}
private static String buildSplitQuery(
@@ -161,8 +202,9 @@ private static String buildSplitQuery(
boolean isFirstSplit,
boolean isLastSplit,
int limitSize,
- boolean isScanningData) {
- final String condition;
+ boolean isScanningData,
+ @Nullable String filter) {
+ String condition;
if (isFirstSplit && isLastSplit) {
condition = null;
@@ -192,6 +234,10 @@ private static String buildSplitQuery(
condition = sql.toString();
}
+ if (filter != null) {
+ condition = condition == null ? filter : "(" + filter + ") AND " + condition;
+ }
+
if (isScanningData) {
return buildSelectWithRowLimits(
tableId, limitSize, "*", Optional.ofNullable(condition), Optional.empty());
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java
index 2a1f0519435..6278c4dcbb7 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSource.java
@@ -103,6 +103,7 @@ public class MySqlTableSource implements ScanTableSource, SupportsReadingMetadat
private final boolean assignUnboundedChunkFirst;
private final boolean appendOnly;
+ private final String snapshotFilter;
// --------------------------------------------------------------------------------------------
// Mutable attributes
@@ -144,7 +145,8 @@ public MySqlTableSource(
boolean parseOnlineSchemaChanges,
boolean useLegacyJsonFormat,
boolean assignUnboundedChunkFirst,
- boolean appendOnly) {
+ boolean appendOnly,
+ @Nullable String snapshotFilter) {
this.physicalSchema = physicalSchema;
this.port = port;
this.hostname = checkNotNull(hostname);
@@ -178,6 +180,7 @@ public MySqlTableSource(
this.useLegacyJsonFormat = useLegacyJsonFormat;
this.assignUnboundedChunkFirst = assignUnboundedChunkFirst;
this.appendOnly = appendOnly;
+ this.snapshotFilter = snapshotFilter;
}
@Override
@@ -241,6 +244,9 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
.parseOnLineSchemaChanges(parseOnlineSchemaChanges)
.useLegacyJsonFormat(useLegacyJsonFormat)
.assignUnboundedChunkFirst(assignUnboundedChunkFirst)
+ .snapshotFilters(
+ escapeDot(database) + "." + escapeDot(tableName),
+ snapshotFilter)
.build();
return SourceProvider.of(parallelSource);
} else {
@@ -330,7 +336,8 @@ public DynamicTableSource copy() {
parseOnlineSchemaChanges,
useLegacyJsonFormat,
assignUnboundedChunkFirst,
- appendOnly);
+ appendOnly,
+ snapshotFilter);
source.metadataKeys = metadataKeys;
source.producedDataType = producedDataType;
return source;
@@ -376,7 +383,8 @@ public boolean equals(Object o) {
&& parseOnlineSchemaChanges == that.parseOnlineSchemaChanges
&& useLegacyJsonFormat == that.useLegacyJsonFormat
&& assignUnboundedChunkFirst == that.assignUnboundedChunkFirst
- && Objects.equals(appendOnly, that.appendOnly);
+ && Objects.equals(appendOnly, that.appendOnly)
+ && Objects.equals(snapshotFilter, that.snapshotFilter);
}
@Override
@@ -413,7 +421,8 @@ public int hashCode() {
parseOnlineSchemaChanges,
useLegacyJsonFormat,
assignUnboundedChunkFirst,
- appendOnly);
+ appendOnly,
+ snapshotFilter);
}
@Override
@@ -438,4 +447,8 @@ Properties getParallelDbzProperties(Properties dbzProperties) {
}
return newDbzProperties;
}
+
+ /**
+ * Escapes every {@code .} in {@code str} as {@code \.}.
+ *
+ * <p>NOTE(review): presumably needed because the {@code database.table} key handed to {@code
+ * snapshotFilters(...)} is interpreted as a pattern in which an unescaped dot is not a literal
+ * character; confirm against the pattern parser.
+ */
+ private String escapeDot(String str) {
+ return str.replace(".", "\\.");
+ }
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java
index 5ea430d94e7..72a7a3b12ac 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactory.java
@@ -109,6 +109,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
boolean appendOnly =
config.get(MySqlSourceOptions.SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED);
+ String snapshotFilter = config.get(MySqlSourceOptions.SCAN_SNAPSHOT_FILTER);
if (enableParallelRead) {
validatePrimaryKeyIfEnableParallel(physicalSchema, chunkKeyColumn);
@@ -124,6 +125,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
MySqlSourceOptions.CONNECT_TIMEOUT, connectTimeout, Duration.ofMillis(250));
}
+ validateSnapshotFilterWithParallelRead(snapshotFilter, enableParallelRead);
+
OptionUtils.printOptions(IDENTIFIER, config.toMap());
return new MySqlTableSource(
@@ -156,7 +159,8 @@ public DynamicTableSource createDynamicTableSource(Context context) {
parseOnLineSchemaChanges,
useLegacyJsonFormat,
assignUnboundedChunkFirst,
- appendOnly);
+ appendOnly,
+ snapshotFilter);
}
@Override
@@ -206,6 +210,7 @@ public Set> optionalOptions() {
options.add(MySqlSourceOptions.USE_LEGACY_JSON_FORMAT);
options.add(MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST);
options.add(MySqlSourceOptions.SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED);
+ options.add(MySqlSourceOptions.SCAN_SNAPSHOT_FILTER);
return options;
}
@@ -385,6 +390,28 @@ private void validateDistributionFactorLower(double distributionFactorLower) {
distributionFactorLower));
}
+ /**
+  * Rejects a configuration that sets a snapshot filter while parallel (incremental snapshot)
+  * reading is switched off.
+  *
+  * <p>A {@code null} or empty filter counts as "not configured" and always passes.
+  *
+  * @param snapshotFilter the snapshot filter expression, may be {@code null}
+  * @param enableParallelRead whether parallel read is enabled
+  * @throws ValidationException if a non-empty snapshot filter is set but parallel read is
+  *     disabled
+  */
+ private void validateSnapshotFilterWithParallelRead(
+         @Nullable String snapshotFilter, boolean enableParallelRead) {
+     boolean filterConfigured = snapshotFilter != null && !snapshotFilter.isEmpty();
+     if (enableParallelRead || !filterConfigured) {
+         return;
+     }
+
+     String filterKey = MySqlSourceOptions.SCAN_SNAPSHOT_FILTER.key();
+     String incrementalKey = MySqlSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED.key();
+     throw new ValidationException(
+             String.format(
+                     "Option '%s' can only be used when '%s' is enabled. "
+                             + "Either enable parallel snapshot reading by setting '%s' to true, "
+                             + "or remove the '%s' option.",
+                     filterKey,
+                     incrementalKey,
+                     incrementalKey,
+                     filterKey));
+ }
+
/** Replaces the default timezone placeholder with session timezone, if applicable. */
private static ZoneId getServerTimeZone(ReadableConfig config) {
final String serverTimeZone = config.get(MySqlSourceOptions.SERVER_TIME_ZONE);
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.java
index 62f51e49512..7911150e0b7 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.java
@@ -423,7 +423,7 @@ void testSnapshotSplitReadingFailCrossCheckpoints(String tableName, String chunk
RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0);
// The sleeping source will sleep awhile after send per record
- MySqlSource sleepingSource = buildSleepingSource(tableName, chunkColumnName);
+ MySqlSource sleepingSource = buildSleepingSource(tableName, chunkColumnName, null);
DataStreamSource source =
env.fromSource(sleepingSource, WatermarkStrategy.noWatermarks(), "selfSource");
@@ -496,6 +496,191 @@ void testSnapshotSplitReadingFailCrossCheckpoints(String tableName, String chunk
jobClient.cancel().get();
}
+ /**
+ * Reads the table through the sleeping source with the snapshot filter {@code id > 200},
+ * triggers a JobManager failover once the first record is available, and checks that exactly
+ * the expected filtered rows are received.
+ */
+ @ParameterizedTest
+ @MethodSource("parameters")
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ void testSnapshotFilters(String tableName, String chunkColumnName) throws Exception {
+ customDatabase.createAndInitialize();
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(DEFAULT_PARALLELISM);
+ env.enableCheckpointing(5000L);
+ RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0);
+
+ // Only rows matching `id > 200` should be read during the snapshot.
+ // The sleeping source sleeps for a while after emitting each record.
+ MySqlSource sleepingSource =
+ buildSleepingSource(tableName, chunkColumnName, "id > 200");
+ DataStreamSource source =
+ env.fromSource(sleepingSource, WatermarkStrategy.noWatermarks(), "selfSource");
+
+ String[] expectedSnapshotData =
+ new String[] {
+ "+I[1009, user_10, Shanghai, 123567891234]",
+ "+I[1010, user_11, Shanghai, 123567891234]",
+ "+I[1011, user_12, Shanghai, 123567891234]",
+ "+I[1012, user_13, Shanghai, 123567891234]",
+ "+I[1013, user_14, Shanghai, 123567891234]",
+ "+I[1014, user_15, Shanghai, 123567891234]",
+ "+I[1015, user_16, Shanghai, 123567891234]",
+ "+I[1016, user_17, Shanghai, 123567891234]",
+ "+I[1017, user_18, Shanghai, 123567891234]",
+ "+I[1018, user_19, Shanghai, 123567891234]",
+ "+I[1019, user_20, Shanghai, 123567891234]",
+ "+I[2000, user_21, Shanghai, 123567891234]"
+ };
+ TypeSerializer serializer =
+ source.getTransformation().getOutputType().createSerializer(env.getConfig());
+ String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
+ CollectSinkOperatorFactory factory =
+ new CollectSinkOperatorFactory(serializer, accumulatorName);
+ CollectSinkOperator operator = (CollectSinkOperator) factory.getOperator();
+ CollectResultIterator iterator =
+ new CollectResultIterator(
+ operator.getOperatorIdFuture(),
+ serializer,
+ accumulatorName,
+ env.getCheckpointConfig(),
+ 10000L);
+ CollectStreamSink sink = new CollectStreamSink(source, factory);
+ sink.name("Data stream collect sink");
+ env.addOperator(sink.getTransformation());
+ JobClient jobClient = env.executeAsync("snapshotSplitTest");
+ iterator.setJobClient(jobClient);
+ JobID jobId = jobClient.getJobID();
+
+ // Trigger a failover once some snapshot records have been sent by the sleeping source
+ if (iterator.hasNext()) {
+ triggerFailover(
+ FailoverType.JM,
+ jobId,
+ miniClusterResource.get().getMiniCluster(),
+ () -> sleepMs(100));
+ }
+
+ // Check that all filtered snapshot records are sent with exactly-once semantics
+ assertEqualsInAnyOrder(
+ Arrays.asList(expectedSnapshotData),
+ fetchRowData(iterator, expectedSnapshotData.length));
+ Assertions.assertThat(hasNextData(iterator)).isFalse();
+ jobClient.cancel().get();
+ }
+
+ /**
+ * Reads {@code customers} and {@code customers_1} in one source with a different snapshot
+ * filter per table and checks that the rows expected from each filter are received and that
+ * no further data follows.
+ */
+ @Test
+ @SuppressWarnings({"rawtypes", "unchecked"})
+ void testSnapshotFiltersMultipleTables() throws Exception {
+ customDatabase.createAndInitialize();
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(DEFAULT_PARALLELISM);
+ env.enableCheckpointing(5000L);
+ RestartStrategyUtils.configureFixedDelayRestartStrategy(env, 1, 0);
+
+ ResolvedSchema physicalSchema =
+ new ResolvedSchema(
+ Arrays.asList(
+ Column.physical("id", DataTypes.BIGINT().notNull()),
+ Column.physical("name", DataTypes.STRING()),
+ Column.physical("address", DataTypes.STRING()),
+ Column.physical("phone_number", DataTypes.STRING())),
+ new ArrayList<>(),
+ UniqueConstraint.primaryKey("pk", Collections.singletonList("id")));
+ RowType physicalDataType =
+ (RowType) physicalSchema.toPhysicalRowDataType().getLogicalType();
+ final TypeInformation typeInfo = InternalTypeInfo.of(physicalDataType);
+ RowDataDebeziumDeserializeSchema deserializer =
+ RowDataDebeziumDeserializeSchema.newBuilder()
+ .setPhysicalRowType(physicalDataType)
+ .setMetadataConverters(new MetadataConverter[0])
+ .setResultTypeInfo(typeInfo)
+ .setServerTimeZone(ZoneId.of("UTC"))
+ .setUserDefinedConverterFactory(
+ MySqlDeserializationConverterFactory.instance())
+ .build();
+
+ // Register one filter per table: customers keeps `id > 200`, customers_1 keeps `id < 200`
+ MySqlSource source =
+ MySqlSource.builder()
+ .hostname(MYSQL_CONTAINER.getHost())
+ .port(MYSQL_CONTAINER.getDatabasePort())
+ .databaseList(customDatabase.getDatabaseName())
+ .tableList(
+ customDatabase.getDatabaseName() + ".customers",
+ customDatabase.getDatabaseName() + ".customers_1")
+ .username(customDatabase.getUsername())
+ .password(customDatabase.getPassword())
+ .serverTimeZone("UTC")
+ .serverId(getServerId())
+ .splitSize(8096)
+ .fetchSize(1024)
+ .connectTimeout(Duration.ofSeconds(30))
+ .debeziumProperties(new Properties())
+ .startupOptions(StartupOptions.initial())
+ .deserializer(deserializer)
+ .snapshotFilters(
+ customDatabase.getDatabaseName() + ".customers", "id > 200")
+ .snapshotFilters(
+ customDatabase.getDatabaseName() + ".customers_1", "id < 200")
+ .build();
+
+ DataStreamSource stream =
+ env.fromSource(source, WatermarkStrategy.noWatermarks(), "multiTableSource");
+
+ // customers with id > 200: 1009-1019, 2000 (12 rows)
+ // customers_1 with id < 200: 101, 102, 103, 109, 110, 111, 118, 121, 123 (9 rows)
+ int expectedCount = 12 + 9;
+
+ TypeSerializer serializer =
+ stream.getTransformation().getOutputType().createSerializer(env.getConfig());
+ String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
+ CollectSinkOperatorFactory factory =
+ new CollectSinkOperatorFactory(serializer, accumulatorName);
+ CollectSinkOperator operator = (CollectSinkOperator) factory.getOperator();
+ CollectResultIterator iterator =
+ new CollectResultIterator(
+ operator.getOperatorIdFuture(),
+ serializer,
+ accumulatorName,
+ env.getCheckpointConfig(),
+ 10000L);
+ CollectStreamSink sink = new CollectStreamSink(stream, factory);
+ sink.name("Data stream collect sink");
+ env.addOperator(sink.getTransformation());
+ JobClient jobClient = env.executeAsync("snapshotFiltersMultiTableTest");
+ iterator.setJobClient(jobClient);
+
+ List actual = fetchRowData(iterator, expectedCount);
+ // Rows expected from customers (filter: id > 200)
+ assertThat(actual)
+ .containsAll(
+ Arrays.asList(
+ "+I[1009, user_10, Shanghai, 123567891234]",
+ "+I[1010, user_11, Shanghai, 123567891234]",
+ "+I[1011, user_12, Shanghai, 123567891234]",
+ "+I[1012, user_13, Shanghai, 123567891234]",
+ "+I[1013, user_14, Shanghai, 123567891234]",
+ "+I[1014, user_15, Shanghai, 123567891234]",
+ "+I[1015, user_16, Shanghai, 123567891234]",
+ "+I[1016, user_17, Shanghai, 123567891234]",
+ "+I[1017, user_18, Shanghai, 123567891234]",
+ "+I[1018, user_19, Shanghai, 123567891234]",
+ "+I[1019, user_20, Shanghai, 123567891234]",
+ "+I[2000, user_21, Shanghai, 123567891234]"));
+ // Rows expected from customers_1 (filter: id < 200)
+ assertThat(actual)
+ .containsAll(
+ Arrays.asList(
+ "+I[101, user_1, Shanghai, 123567891234]",
+ "+I[102, user_2, Shanghai, 123567891234]",
+ "+I[103, user_3, Shanghai, 123567891234]",
+ "+I[109, user_4, Shanghai, 123567891234]",
+ "+I[110, user_5, Shanghai, 123567891234]",
+ "+I[111, user_6, Shanghai, 123567891234]",
+ "+I[118, user_7, Shanghai, 123567891234]",
+ "+I[121, user_8, Shanghai, 123567891234]",
+ "+I[123, user_9, Shanghai, 123567891234]"));
+ // Exactly expectedCount rows were fetched above; nothing else may follow
+ Assertions.assertThat(hasNextData(iterator)).isFalse();
+ jobClient.cancel().get();
+ }
+
@ParameterizedTest
@MethodSource("parameters")
void testStartFromEarliestOffset(String tableName, String chunkColumnName) throws Exception {
@@ -1041,7 +1226,8 @@ private CollectResultIterator addCollector(
return iterator;
}
- private MySqlSource buildSleepingSource(String tableName, String chunkColumnName) {
+ private MySqlSource buildSleepingSource(
+ String tableName, String chunkColumnName, String snapshotFilter) {
ResolvedSchema physicalSchema =
new ResolvedSchema(
Arrays.asList(
@@ -1093,6 +1279,7 @@ private MySqlSource buildSleepingSource(String tableName, String chunkC
.chunkKeyColumn(
new ObjectPath(customDatabase.getDatabaseName(), tableName),
chunkColumnName)
+ .snapshotFilters(customDatabase.getDatabaseName() + "." + tableName, snapshotFilter)
.build();
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitterTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitterTest.java
index 0e4f369bb81..98776ac9681 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitterTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlChunkSplitterTest.java
@@ -135,7 +135,7 @@ public T prepareQueryAndMap(
int chunkSize = 5;
Object result =
- splitter.nextChunkEnd(jdbc, previousChunkEnd, tableId, "id", max, chunkSize);
+ splitter.nextChunkEnd(jdbc, previousChunkEnd, tableId, "id", max, chunkSize, null);
// when queryNextChunkMax returns null, nextChunkEnd should also return null
// instead of propagating the null further and causing errors
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/SnapshotFilterUtilsTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/SnapshotFilterUtilsTest.java
new file mode 100644
index 00000000000..00a2f7aea79
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/SnapshotFilterUtilsTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.mysql.source.utils;
+
+import io.debezium.relational.TableId;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+/** Unit test for {@link org.apache.flink.cdc.connectors.mysql.source.utils.SnapshotFilterUtils}. */
+public class SnapshotFilterUtilsTest {
+
+ /**
+ * Checks literal and regex table patterns, and that a table matching no pattern yields
+ * {@code null}.
+ */
+ @Test
+ public void testGetSnapshotFilter() {
+ // The two patterns do not overlap for the tables queried below, so the HashMap iteration
+ // order does not influence the result.
+ Map map = new HashMap<>();
+ map.put("db.user", "id > 100");
+ map.put("db.order_[0-9]+", "id > 200");
+ Assertions.assertThat(SnapshotFilterUtils.getSnapshotFilter(map, TableId.parse("db.user")))
+ .isEqualTo("id > 100");
+ Assertions.assertThat(
+ SnapshotFilterUtils.getSnapshotFilter(map, TableId.parse("db.order_1")))
+ .isEqualTo("id > 200");
+ Assertions.assertThat(
+ SnapshotFilterUtils.getSnapshotFilter(map, TableId.parse("db.order_2")))
+ .isEqualTo("id > 200");
+ Assertions.assertThat(SnapshotFilterUtils.getSnapshotFilter(map, TableId.parse("db.shop")))
+ .isNull();
+ }
+
+ /**
+ * Checks that patterns are tried in the insertion order of the filter map, so that the first
+ * inserted pattern wins when several patterns match the same table.
+ */
+ @Test
+ public void testGetSnapshotFilterPreservesOrder() {
+ // Use LinkedHashMap to ensure deterministic order
+ Map map = new LinkedHashMap<>();
+ map.put("db.table_a", "id > 100");
+ map.put("db.table_b", "id > 200");
+ map.put("db.table_c", "id > 300");
+
+ // Verify each table matches its corresponding filter
+ Assertions.assertThat(
+ SnapshotFilterUtils.getSnapshotFilter(map, TableId.parse("db.table_a")))
+ .isEqualTo("id > 100");
+ Assertions.assertThat(
+ SnapshotFilterUtils.getSnapshotFilter(map, TableId.parse("db.table_b")))
+ .isEqualTo("id > 200");
+ Assertions.assertThat(
+ SnapshotFilterUtils.getSnapshotFilter(map, TableId.parse("db.table_c")))
+ .isEqualTo("id > 300");
+
+ // Test with regex patterns - non-overlapping patterns
+ Map regexMap = new LinkedHashMap<>();
+ regexMap.put("db.order_[0-9]+", "id > 100");
+ regexMap.put("db.user_[0-9]+", "id > 200");
+ regexMap.put("db.product_[0-9]+", "id > 300");
+
+ Assertions.assertThat(
+ SnapshotFilterUtils.getSnapshotFilter(
+ regexMap, TableId.parse("db.order_1")))
+ .isEqualTo("id > 100");
+ Assertions.assertThat(
+ SnapshotFilterUtils.getSnapshotFilter(regexMap, TableId.parse("db.user_2")))
+ .isEqualTo("id > 200");
+ Assertions.assertThat(
+ SnapshotFilterUtils.getSnapshotFilter(
+ regexMap, TableId.parse("db.product_3")))
+ .isEqualTo("id > 300");
+
+ // Test with overlapping patterns - this is the critical case that reproduces the
+ // original bug: multiple patterns match the same table, and the lookup must return the
+ // filter of the first pattern in insertion order.
+ Map overlappingMap = new LinkedHashMap<>();
+ overlappingMap.put("db.order_[0-9]+", "id > 100"); // Broader pattern - matches order_1
+ overlappingMap.put("db.order_1", "id > 200"); // More specific - also matches order_1
+ overlappingMap.put("db.order_[1-9]", "id > 300"); // Also matches order_1
+
+ // All three patterns match db.order_1, but should consistently return the first one
+ Assertions.assertThat(
+ SnapshotFilterUtils.getSnapshotFilter(
+ overlappingMap, TableId.parse("db.order_1")))
+ .isEqualTo("id > 100"); // First pattern wins
+
+ // Verify the result is stable across repeated lookups with the same filter map
+ for (int i = 0; i < 10; i++) {
+ Assertions.assertThat(
+ SnapshotFilterUtils.getSnapshotFilter(
+ overlappingMap, TableId.parse("db.order_1")))
+ .isEqualTo("id > 100");
+ }
+ }
+}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java
index 01c2dff84da..cd26e076157 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java
@@ -129,7 +129,8 @@ void testCommonProperties() {
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
USE_LEGACY_JSON_FORMAT.defaultValue(),
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(),
- false);
+ false,
+ null);
Assertions.assertThat(actualSource).isEqualTo(expectedSource);
}
@@ -179,7 +180,8 @@ void testEnableParallelReadSource() {
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
USE_LEGACY_JSON_FORMAT.defaultValue(),
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(),
- false);
+ false,
+ null);
Assertions.assertThat(actualSource).isEqualTo(expectedSource);
}
@@ -225,7 +227,8 @@ void testEnableParallelReadSourceWithSingleServerId() {
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
USE_LEGACY_JSON_FORMAT.defaultValue(),
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(),
- false);
+ false,
+ null);
Assertions.assertThat(actualSource).isEqualTo(expectedSource);
}
@@ -269,7 +272,8 @@ void testEnableParallelReadSourceLatestOffset() {
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
USE_LEGACY_JSON_FORMAT.defaultValue(),
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(),
- false);
+ false,
+ null);
Assertions.assertThat(actualSource).isEqualTo(expectedSource);
}
@@ -290,6 +294,8 @@ void testOptionalProperties() {
options.put("scan.incremental.close-idle-reader.enabled", "true");
options.put("scan.incremental.snapshot.backfill.skip", "true");
options.put("use.legacy.json.format", "true");
+ options.put("scan.incremental.snapshot.enabled", "true");
+ options.put("scan.snapshot.filter", "id > 200");
DynamicTableSource actualSource = createTableSource(options);
Properties dbzProperties = new Properties();
@@ -311,7 +317,7 @@ void testOptionalProperties() {
ZoneId.of("Asia/Shanghai"),
dbzProperties,
"4321",
- false,
+ true,
SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE.defaultValue(),
CHUNK_META_GROUP_SIZE.defaultValue(),
SCAN_SNAPSHOT_FETCH_SIZE.defaultValue(),
@@ -330,7 +336,8 @@ void testOptionalProperties() {
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
true,
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(),
- false);
+ false,
+ "id > 200");
Assertions.assertThat(actualSource)
.isEqualTo(expectedSource)
.isInstanceOf(MySqlTableSource.class);
@@ -389,7 +396,8 @@ void testStartupFromSpecificOffset() {
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
USE_LEGACY_JSON_FORMAT.defaultValue(),
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(),
- false);
+ false,
+ null);
Assertions.assertThat(actualSource).isEqualTo(expectedSource);
}
@@ -431,7 +439,8 @@ void testStartupFromInitial() {
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
USE_LEGACY_JSON_FORMAT.defaultValue(),
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(),
- false);
+ false,
+ null);
Assertions.assertThat(actualSource).isEqualTo(expectedSource);
}
@@ -474,7 +483,8 @@ void testStartupFromEarliestOffset() {
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
USE_LEGACY_JSON_FORMAT.defaultValue(),
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(),
- false);
+ false,
+ null);
Assertions.assertThat(actualSource).isEqualTo(expectedSource);
}
@@ -518,7 +528,8 @@ void testStartupFromSpecificTimestamp() {
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
USE_LEGACY_JSON_FORMAT.defaultValue(),
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(),
- false);
+ false,
+ null);
Assertions.assertThat(actualSource).isEqualTo(expectedSource);
}
@@ -560,7 +571,8 @@ void testStartupFromLatestOffset() {
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
USE_LEGACY_JSON_FORMAT.defaultValue(),
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(),
- false);
+ false,
+ null);
Assertions.assertThat(actualSource).isEqualTo(expectedSource);
}
@@ -607,7 +619,8 @@ void testMetadataColumns() {
PARSE_ONLINE_SCHEMA_CHANGES.defaultValue(),
USE_LEGACY_JSON_FORMAT.defaultValue(),
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST.defaultValue(),
- false);
+ false,
+ null);
expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
expectedSource.metadataKeys = Arrays.asList("op_ts", "database_name");
@@ -768,6 +781,17 @@ void testValidation() {
String.format(
"The table-name '%s' is not a valid regular expression",
"*_invalid_table"));
+
+ // validate snapshot filter requires parallel read enabled
+ Assertions.assertThatThrownBy(
+ () -> {
+ Map properties = getAllOptions();
+ properties.put("scan.incremental.snapshot.enabled", "false");
+ properties.put("scan.snapshot.filter", "id > 100");
+ createTableSource(properties);
+ })
+ .hasStackTraceContaining(
+ "Option 'scan.snapshot.filter' can only be used when 'scan.incremental.snapshot.enabled' is enabled");
}
@Test
@@ -810,7 +834,8 @@ void testEnablingExperimentalOptions() {
true,
true,
true,
- false);
+ false,
+ null);
Assertions.assertThat(actualSource).isEqualTo(expectedSource);
}
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
index 7b551db8333..651f72e0789 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlE2eITCase.java
@@ -535,4 +535,54 @@ void testDanglingDropTableEventInBinlog() throws Exception {
"CreateTableEvent{tableId=%s.products, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`description` VARCHAR(512),`weight` FLOAT,`enum_c` STRING 'red',`json_c` STRING,`point_c` STRING}, primaryKeys=id, options=()}",
"DataChangeEvent{tableId=%s.products, before=[106, hammer, 16oz carpenter's hammer, 1.0, null, null, null], after=[106, hammer, 18oz carpenter hammer, 1.0, null, null, null], op=UPDATE, meta=()}");
}
+
+ @Test
+ void testSnapshotFilters() throws Exception {
+ String pipelineJob =
+ String.format(
+ "source:\n"
+ + " type: mysql\n"
+ + " hostname: %s\n"
+ + " port: 3306\n"
+ + " username: %s\n"
+ + " password: %s\n"
+ + " tables: %s.\\.*\n"
+ + " server-id: 5400-5404\n"
+ + " server-time-zone: UTC\n"
+ + " scan.snapshot.filters:\n"
+ + " - table: %s.customers\n"
+ + " filter: id > 102\n"
+ + " - table: %s.products\n"
+ + " filter: id < 105\n"
+ + "\n"
+ + "sink:\n"
+ + " type: values\n"
+ + "\n"
+ + "pipeline:\n"
+ + " parallelism: %d",
+ INTER_CONTAINER_MYSQL_ALIAS,
+ MYSQL_TEST_USER,
+ MYSQL_TEST_PASSWORD,
+ mysqlInventoryDatabase.getDatabaseName(),
+ mysqlInventoryDatabase.getDatabaseName(),
+ mysqlInventoryDatabase.getDatabaseName(),
+ parallelism);
+
+ submitPipelineJob(pipelineJob);
+ waitUntilJobRunning(Duration.ofSeconds(30));
+ LOG.info("Pipeline job is running");
+
+ // Events expected from the filtered snapshot: customers rows matching "id > 102" (ids 103
+ // and 104) and products rows matching "id < 105" (ids 101 through 104), per the filters above.
+ validateResult(
+ dbNameFormatter,
+ "CreateTableEvent{tableId=%s.customers, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`address` VARCHAR(1024),`phone_number` VARCHAR(512)}, primaryKeys=id, options=()}",
+ "DataChangeEvent{tableId=%s.customers, before=[], after=[103, user_3, Shanghai, 123567891234], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=%s.customers, before=[], after=[104, user_4, Shanghai, 123567891234], op=INSERT, meta=()}",
+ "CreateTableEvent{tableId=%s.products, schema=columns={`id` INT NOT NULL,`name` VARCHAR(255) NOT NULL 'flink',`description` VARCHAR(512),`weight` FLOAT,`enum_c` STRING 'red',`json_c` STRING,`point_c` STRING}, primaryKeys=id, options=()}",
+ "DataChangeEvent{tableId=%s.products, before=[], after=[101, scooter, Small 2-wheel scooter, 3.14, red, {\"key1\": \"value1\"}, {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=%s.products, before=[], after=[102, car battery, 12V car battery, 8.1, white, {\"key2\": \"value2\"}, {\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=%s.products, before=[], after=[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, red, {\"key3\": \"value3\"}, {\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}",
+ "DataChangeEvent{tableId=%s.products, before=[], after=[104, hammer, 12oz carpenter's hammer, 0.75, white, {\"key4\": \"value4\"}, {\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}], op=INSERT, meta=()}");
+ }
}