Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions docs/content/docs/connectors/flink-sources/mysql-cdc.md
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,13 @@ Only valid for cdc 1.x version. During a snapshot operation, the connector will
For example updating an already updated value in snapshot, or deleting an already deleted entry in snapshot. These replayed change log events should be handled specially.
</td>
</tr>
<tr>
<td>scan.snapshot.filter</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>When reading a table snapshot, the rows of captured tables will be filtered using the specified filter expression (AKA a SQL WHERE clause). By default, no filter is applied, meaning the entire table will be synchronized. e.g. `id > 100`.</td>
</tr>
</tbody>
</table>
</div>
Expand Down
11 changes: 11 additions & 0 deletions docs/content/docs/connectors/pipeline-connectors/mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,17 @@ pipeline:
scan.binlog.newly-added-table.enabled: only do binlog-reading for newly added table during binlog reading phase.
</td>
</tr>
<tr>
<td>scan.snapshot.filters</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>When reading a table snapshot, the rows of captured tables will be filtered using the specified filter expression (AKA a SQL WHERE clause). <br>
By default, no filter is applied, meaning the entire table will be synchronized. <br>
A colon (:) separates table name and filter expression, while a semicolon (;) separate multiple filters,
e.g. `db1.user_table_[0-9]+:id > 100;db[1-2].[app|web]_order_\\.*:id < 0;`.
</td>
</tr>
Comment on lines +296 to +306
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about explicitly stating that the filter conditions are combined using AND and that it has nothing to do with the binlog step?

<tr>
<td>scan.parse.online.schema.changes.enabled</td>
<td>optional</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_SNAPSHOT_FILTERS;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_MODE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_FILE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_SPECIFIC_OFFSET_GTID_SET;
Expand Down Expand Up @@ -285,6 +286,29 @@ public DataSource createDataSource(Context context) {
LOG.info("Add chunkKeyColumn {}.", chunkKeyColumnMap);
configFactory.chunkKeyColumn(chunkKeyColumnMap);
}
String snapshotFilters = config.get(SCAN_SNAPSHOT_FILTERS);
if (snapshotFilters != null) {
Map<String, String> snapshotFilterMap = new HashMap<>();
for (String snapshotFilter : snapshotFilters.split("(?<!\\\\);")) {
String[] splits = snapshotFilter.trim().split("(?<!\\\\):", 2);
if (splits.length == 2) {
// Handle escape characters
String key = splits[0].replace("\\:", ":").replace("\\;", ";").trim();
String value = splits[1].replace("\\:", ":").replace("\\;", ";").trim();
snapshotFilterMap.put(key, value);
} else {
throw new IllegalArgumentException(
SCAN_SNAPSHOT_FILTERS.key()
+ " = "
+ snapshotFilters
+ " failed to be parsed in this part '"
+ snapshotFilter
+ "'.");
}
}
LOG.info("Add snapshotFilters {}.", snapshotFilterMap);
configFactory.snapshotFilters(snapshotFilterMap);
}
String metadataList = config.get(METADATA_LIST);
List<MySqlReadableMetadata> readableMetadataList = listReadableMetadata(metadataList);
return new MySqlDataSource(configFactory, readableMetadataList);
Expand Down Expand Up @@ -352,6 +376,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND);
options.add(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED);
options.add(SCAN_SNAPSHOT_FILTERS);
options.add(METADATA_LIST);
options.add(INCLUDE_COMMENTS_ENABLED);
options.add(USE_LEGACY_JSON_FORMAT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,17 @@ public class MySqlDataSourceOptions {
+ "scan.newly-added-table.enabled: do re-snapshot & binlog-reading for newly added table when restored; \n"
+ "scan.binlog.newly-added-table.enabled: only do binlog-reading for newly added table during binlog reading phase.");

@Experimental
public static final ConfigOption<String> SCAN_SNAPSHOT_FILTERS =
ConfigOptions.key("scan.snapshot.filters")
.stringType()
.noDefaultValue()
.withDescription(
"When reading a table snapshot, the rows of captured tables will be filtered using the specified filter expression (AKA a SQL WHERE clause). "
+ "By default, no filter is applied, meaning the entire table will be synchronized. "
+ "A colon (:) separates table name and filter expression, while a semicolon (;) separate multiple filters, "
+ "e.g. `db1.user_table_[0-9]+:id > 100;db[1-2].[app|web]_order_\\.*:id < 0;`.");

@Experimental
public static final ConfigOption<String> METADATA_LIST =
ConfigOptions.key("metadata.list")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_SNAPSHOT_FILTERS;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES_EXCLUDE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TREAT_TINYINT1_AS_BOOLEAN_ENABLED;
Expand Down Expand Up @@ -330,6 +331,39 @@ void testAddChunkKeyColumns() {
});
}

@Test
public void testAddSnapshotFilters() {
inventoryDatabase.createAndInitialize();
Map<String, String> options = new HashMap<>();
options.put(HOSTNAME.key(), MYSQL_CONTAINER.getHost());
options.put(PORT.key(), String.valueOf(MYSQL_CONTAINER.getDatabasePort()));
options.put(USERNAME.key(), TEST_USER);
options.put(PASSWORD.key(), TEST_PASSWORD);
options.put(TABLES.key(), inventoryDatabase.getDatabaseName() + ".\\.*");
options.put(
SCAN_SNAPSHOT_FILTERS.key(),
inventoryDatabase.getDatabaseName()
+ ".multi_max_\\.*:id > 200;"
+ inventoryDatabase.getDatabaseName()
+ ".products:1 = 0;");
Factory.Context context = new MockContext(Configuration.fromMap(options));

MySqlDataSourceFactory factory = new MySqlDataSourceFactory();
MySqlDataSource dataSource = (MySqlDataSource) factory.createDataSource(context);

assertThat(dataSource.getSourceConfig().getSnapshotFilters())
.isNotEmpty()
.isEqualTo(
new HashMap<String, String>() {
{
put(
inventoryDatabase.getDatabaseName() + ".multi_max_\\.*",
"id > 200");
put(inventoryDatabase.getDatabaseName() + ".products", "1 = 0");
}
});
}

class MockContext implements Factory.Context {

Configuration factoryConfiguration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;
import org.apache.flink.cdc.connectors.mysql.source.utils.SnapshotFilterUtils;
import org.apache.flink.cdc.connectors.mysql.source.utils.StatementUtils;
import org.apache.flink.cdc.connectors.mysql.source.utils.hooks.SnapshotPhaseHooks;

Expand Down Expand Up @@ -242,12 +243,24 @@ private void createDataEventsForTable(
long exportStart = clock.currentTimeInMillis();
LOG.info("Exporting data from split '{}' of table {}", snapshotSplit.splitId(), table.id());

String filter =
SnapshotFilterUtils.getSnapshotFilter(
sourceConfig.getSnapshotFilters(), table.id());
if (filter != null) {
LOG.info(
"Filter for split '{}' of table {} is: {}",
snapshotSplit.splitId(),
table.id(),
filter);
}

final String selectSql =
StatementUtils.buildSplitScanQuery(
snapshotSplit.getTableId(),
snapshotSplit.getSplitKeyType(),
snapshotSplit.getSplitStart() == null,
snapshotSplit.getSplitEnd() == null);
snapshotSplit.getSplitEnd() == null,
filter);
LOG.info(
"For split '{}' of table {} using select statement: '{}'",
snapshotSplit.splitId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,15 @@ public MySqlSourceBuilder<T> assignUnboundedChunkFirst(boolean assignUnboundedCh
return this;
}

/**
* When reading a table snapshot, the rows of captured tables will be filtered using the
* specified filter expression (AKA a SQL WHERE clause).
*/
public MySqlSourceBuilder<T> snapshotFilters(String table, String filter) {
this.configFactory.snapshotFilters(table, filter);
return this;
}

/**
* Build the {@link MySqlSource}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;
import org.apache.flink.cdc.connectors.mysql.source.utils.ChunkUtils;
import org.apache.flink.cdc.connectors.mysql.source.utils.ObjectUtils;
import org.apache.flink.cdc.connectors.mysql.source.utils.SnapshotFilterUtils;
import org.apache.flink.cdc.connectors.mysql.source.utils.StatementUtils;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
Expand Down Expand Up @@ -148,12 +149,16 @@ private void analyzeTable(MySqlPartition partition, TableId tableId) {
splitColumn =
ChunkUtils.getChunkKeyColumn(
currentSplittingTable, sourceConfig.getChunkKeyColumns());
String filter =
SnapshotFilterUtils.getSnapshotFilter(
sourceConfig.getSnapshotFilters(), tableId);
splitType =
ChunkUtils.getChunkKeyColumnType(
splitColumn, sourceConfig.isTreatTinyInt1AsBoolean());
minMaxOfSplitColumn =
StatementUtils.queryMinMax(jdbcConnection, tableId, splitColumn.name());
approximateRowCnt = StatementUtils.queryApproximateRowCnt(jdbcConnection, tableId);
StatementUtils.queryMinMax(jdbcConnection, tableId, splitColumn.name(), filter);
approximateRowCnt =
StatementUtils.queryRowCnt(jdbcConnection, tableId, splitColumn.name(), filter);
} catch (Exception e) {
throw new RuntimeException("Fail to analyze table in chunk splitter.", e);
}
Expand All @@ -171,6 +176,8 @@ private MySqlSnapshotSplit splitOneUnevenlySizedChunk(MySqlPartition partition,
nextChunkStart == ChunkSplitterState.ChunkBound.START_BOUND
? "null"
: chunkStartVal.toString());
String filter =
SnapshotFilterUtils.getSnapshotFilter(sourceConfig.getSnapshotFilters(), tableId);
// we start from [null, min + chunk_size) and avoid [null, min)
Object chunkEnd =
nextChunkEnd(
Expand All @@ -181,7 +188,8 @@ private MySqlSnapshotSplit splitOneUnevenlySizedChunk(MySqlPartition partition,
tableId,
splitColumn.name(),
minMaxOfSplitColumn[1],
chunkSize);
chunkSize,
filter);
// may sleep a while to avoid DDOS on MySQL server
maySleep(nextChunkId, tableId);
if (chunkEnd != null && ObjectUtils.compare(chunkEnd, minMaxOfSplitColumn[1]) <= 0) {
Expand Down Expand Up @@ -316,7 +324,6 @@ public List<ChunkRange> splitEvenlySizedChunks(
break;
}
}

// add the ending split
splits.add(ChunkRange.of(chunkStart, null));
return splits;
Expand All @@ -328,16 +335,17 @@ private Object nextChunkEnd(
TableId tableId,
String splitColumnName,
Object max,
int chunkSize)
int chunkSize,
@Nullable String filter)
throws SQLException {
// chunk end might be null when max values are removed
Object chunkEnd =
StatementUtils.queryNextChunkMax(
jdbc, tableId, splitColumnName, chunkSize, previousChunkEnd);
jdbc, tableId, splitColumnName, chunkSize, previousChunkEnd, filter);
if (Objects.equals(previousChunkEnd, chunkEnd)) {
// we don't allow equal chunk start and end,
// should query the next one larger than chunkEnd
chunkEnd = StatementUtils.queryMin(jdbc, tableId, splitColumnName, chunkEnd);
chunkEnd = StatementUtils.queryMin(jdbc, tableId, splitColumnName, chunkEnd, filter);

// queryMin will return null when the chunkEnd is the max value,
// this will happen when the mysql table ignores the capitalization.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ public class MySqlSourceConfig implements Serializable {
private final Properties jdbcProperties;
private final Map<ObjectPath, String> chunkKeyColumns;
private final boolean skipSnapshotBackfill;
private final Map<String, String> snapshotFilters;
private final boolean parseOnLineSchemaChanges;
public static boolean useLegacyJsonFormat = true;
private final boolean assignUnboundedChunkFirst;
Expand Down Expand Up @@ -108,7 +109,8 @@ public class MySqlSourceConfig implements Serializable {
boolean parseOnLineSchemaChanges,
boolean treatTinyInt1AsBoolean,
boolean useLegacyJsonFormat,
boolean assignUnboundedChunkFirst) {
boolean assignUnboundedChunkFirst,
Map<String, String> snapshotFilters) {
this.hostname = checkNotNull(hostname);
this.port = port;
this.username = checkNotNull(username);
Expand Down Expand Up @@ -148,6 +150,7 @@ public class MySqlSourceConfig implements Serializable {
this.jdbcProperties = jdbcProperties;
this.chunkKeyColumns = chunkKeyColumns;
this.skipSnapshotBackfill = skipSnapshotBackfill;
this.snapshotFilters = snapshotFilters;
this.parseOnLineSchemaChanges = parseOnLineSchemaChanges;
this.treatTinyInt1AsBoolean = treatTinyInt1AsBoolean;
this.useLegacyJsonFormat = useLegacyJsonFormat;
Expand Down Expand Up @@ -285,4 +288,8 @@ public boolean isSkipSnapshotBackfill() {
public boolean isTreatTinyInt1AsBoolean() {
return treatTinyInt1AsBoolean;
}

public Map<String, String> getSnapshotFilters() {
return snapshotFilters;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public class MySqlSourceConfigFactory implements Serializable {
private Properties dbzProperties;
private Map<ObjectPath, String> chunkKeyColumns = new HashMap<>();
private boolean skipSnapshotBackfill = false;
private Map<String, String> snapshotFilters = new HashMap<>();
private boolean parseOnLineSchemaChanges = false;
private boolean treatTinyInt1AsBoolean = true;
private boolean useLegacyJsonFormat = true;
Expand Down Expand Up @@ -304,6 +305,24 @@ public MySqlSourceConfigFactory closeIdleReaders(boolean closeIdleReaders) {
return this;
}

/**
* When reading a table snapshot, the rows of captured tables will be filtered using the
* specified filter expression (AKA a SQL WHERE clause).
*/
public MySqlSourceConfigFactory snapshotFilters(String table, String filter) {
this.snapshotFilters.put(table, filter);
return this;
}

/**
* When reading a table snapshot, the rows of captured tables will be filtered using the
* specified filter expression (AKA a SQL WHERE clause).
*/
public MySqlSourceConfigFactory snapshotFilters(Map<String, String> snapshotFilters) {
this.snapshotFilters.putAll(snapshotFilters);
return this;
}

/** Whether to parse gh-ost/pt-osc utility generated schema change events. Defaults to false. */
public MySqlSourceConfigFactory parseOnLineSchemaChanges(boolean parseOnLineSchemaChanges) {
this.parseOnLineSchemaChanges = parseOnLineSchemaChanges;
Expand Down Expand Up @@ -421,6 +440,7 @@ public MySqlSourceConfig createConfig(int subtaskId, String serverName) {
parseOnLineSchemaChanges,
treatTinyInt1AsBoolean,
useLegacyJsonFormat,
assignUnboundedChunkFirst);
assignUnboundedChunkFirst,
snapshotFilters);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,15 @@ public class MySqlSourceOptions {
.withDescription(
"Whether to skip backfill in snapshot reading phase. If backfill is skipped, changes on captured tables during snapshot phase will be consumed later in binlog reading phase instead of being merged into the snapshot. WARNING: Skipping backfill might lead to data inconsistency because some binlog events happened within the snapshot phase might be replayed (only at-least-once semantic is promised). For example updating an already updated value in snapshot, or deleting an already deleted entry in snapshot. These replayed binlog events should be handled specially.");

@Experimental
public static final ConfigOption<String> SCAN_SNAPSHOT_FILTER =
ConfigOptions.key("scan.snapshot.filter")
.stringType()
.noDefaultValue()
.withDescription(
"When reading a table snapshot, the rows of captured tables will be filtered using the specified filter expression (AKA a SQL WHERE clause). "
+ "By default, no filter is applied, meaning the entire table will be synchronized. e.g. `id > 100`");

@Experimental
public static final ConfigOption<Boolean> PARSE_ONLINE_SCHEMA_CHANGES =
ConfigOptions.key("scan.parse.online.schema.changes.enabled")
Expand Down
Loading
Loading