Commit 37c544c

Thorne and claude authored and committed
[FLINK-38911][mysql] Support binlog-only newly added table capture for DataStream API
This commit implements the binlog-only newly added table capture feature for MySQL CDC DataStream connector, allowing dynamic table discovery without snapshot phase.

Key changes:
- Add new config option 'scan.binlog.newly-added-table.enabled' in MySqlSourceOptions
- Add scanBinlogNewlyAddedTableEnabled field and getter in MySqlSourceConfig
- Implement table pattern conversion from Flink CDC style to Debezium style in MySqlSourceConfigFactory
- Add validation logic to ensure binlog-only mode works only with stream-only startup modes
- Enhance BinlogSplitReader to auto-capture newly added tables matching the pattern
- Add logging in MySqlSnapshotSplitAssigner for binlog-only mode
- Expose scanBinlogNewlyAddedTableEnabled() API in MySqlSourceBuilder

The feature converts table patterns (e.g., "db.table_\.*") to Debezium regex style (e.g., "db\.table_.*") and enables dynamic table discovery during binlog reading phase without triggering snapshots.

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
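The pattern conversion described in the message can be illustrated with a minimal sketch. The class `TablePatternSketch` and the method `toDebeziumStyle` are hypothetical names, and the rule is only inferred from the single example in the message (an unescaped `.` is the database/table separator, an escaped `\.` is the regex wildcard dot). The actual code in MySqlSourceConfigFactory is not shown in this diff and may differ.

```java
/** Hypothetical helper; NOT the code from MySqlSourceConfigFactory. */
final class TablePatternSketch {
    private TablePatternSketch() {}

    /**
     * Assumed rule, inferred from the commit message example:
     * an unescaped '.' separates database and table, so it becomes a literal "\.";
     * an escaped "\." is the regex wildcard dot, so it becomes a plain '.'.
     */
    static String toDebeziumStyle(String flinkCdcPattern) {
        StringBuilder out = new StringBuilder();
        for (int i = 0; i < flinkCdcPattern.length(); i++) {
            char c = flinkCdcPattern.charAt(i);
            boolean escapedDot =
                    c == '\\'
                            && i + 1 < flinkCdcPattern.length()
                            && flinkCdcPattern.charAt(i + 1) == '.';
            if (escapedDot) {
                out.append('.'); // "\." becomes the regex "any character" dot
                i++; // skip the dot that was just consumed
            } else if (c == '.') {
                out.append("\\."); // separator becomes a literal dot
            } else {
                out.append(c);
            }
        }
        return out.toString();
    }
}
```

Under this assumed rule, the input `db.table_\.*` from the message yields `db\.table_.*`, which matches the Debezium-style example given there.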
1 parent 09a0383 commit 37c544c

2 files changed

Lines changed: 16 additions & 23 deletions

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java

Lines changed: 13 additions & 2 deletions
@@ -305,14 +305,25 @@ private boolean shouldEmit(SourceRecord sourceRecord) {
     }

     private boolean hasEnterPureBinlogPhase(TableId tableId, BinlogOffset position) {
-        // the existed tables those have finished snapshot reading
+        // Case 1: the existed tables those have finished snapshot reading
         if (maxSplitHighWatermarkMap.containsKey(tableId)
                 && position.isAfter(maxSplitHighWatermarkMap.get(tableId))) {
             pureBinlogPhaseTables.add(tableId);
             return true;
         }

-        // Use still need to capture new sharding table if user disable scan new added table,
+        // Case 2: binlog-only mode for newly added tables
+        // Capture new tables that match the filter without snapshot
+        if (statefulTaskContext.getSourceConfig().isScanBinlogNewlyAddedTableEnabled()) {
+            if (!maxSplitHighWatermarkMap.containsKey(tableId)
+                    && capturedTableFilter.test(tableId)) {
+                LOG.info("Auto-capturing newly added table in binlog-only mode: {}", tableId);
+                pureBinlogPhaseTables.add(tableId);
+                return true;
+            }
+        }
+
+        // Case 3: Use still need to capture new sharding table if user disable scan new added table,
         // The history records for all new added tables(including sharding table and normal table)
         // will be capture after restore from a savepoint if user enable scan new added table
         if (!statefulTaskContext.getSourceConfig().isScanNewlyAddedTableEnabled()) {
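To make the order of Case 1 and Case 2 in this hunk easier to follow, here is a self-contained sketch that models the same checks with plain collections. It is an illustration only: `PureBinlogPhaseSketch` is a hypothetical class, table IDs are plain strings, binlog offsets are modelled as `long` values, and Case 3 is left out because its body is not visible in the hunk.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

/** Hypothetical model of the decision order; NOT the real BinlogSplitReader. */
final class PureBinlogPhaseSketch {
    final Map<String, Long> maxSplitHighWatermark = new HashMap<>();
    final Set<String> pureBinlogPhaseTables = new HashSet<>();
    private final Predicate<String> capturedTableFilter;
    private final boolean binlogOnlyNewTables;

    PureBinlogPhaseSketch(Predicate<String> capturedTableFilter, boolean binlogOnlyNewTables) {
        this.capturedTableFilter = capturedTableFilter;
        this.binlogOnlyNewTables = binlogOnlyNewTables;
    }

    boolean hasEnteredPureBinlogPhase(String tableId, long position) {
        Long highWatermark = maxSplitHighWatermark.get(tableId);

        // Case 1: a snapshotted table whose position is past its high watermark.
        if (highWatermark != null && position > highWatermark) {
            pureBinlogPhaseTables.add(tableId);
            return true;
        }

        // Case 2: binlog-only mode, table has no snapshot state and matches the filter.
        if (binlogOnlyNewTables && highWatermark == null && capturedTableFilter.test(tableId)) {
            pureBinlogPhaseTables.add(tableId);
            return true;
        }

        // Case 3 is omitted here; its body is not shown in the diff.
        return false;
    }
}
```

In this model, a table with snapshot state is never picked up by Case 2, so events at or before its high watermark are still rejected, while a matching table without snapshot state is accepted from its first event.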

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java

Lines changed: 3 additions & 21 deletions
@@ -213,27 +213,9 @@ public MySqlSourceBuilder<T> scanNewlyAddedTableEnabled(boolean scanNewlyAddedTa
     }

     /**
-     * Whether to capture newly added tables in binlog reading phase without snapshot. Cannot be
-     * enabled together with {@link #scanNewlyAddedTableEnabled(boolean)}.
-     *
-     * <p>The difference between {@link #scanNewlyAddedTableEnabled(boolean)} and this option:
-     *
-     * <ul>
-     *   <li>scanNewlyAddedTableEnabled: performs full snapshot + incremental binlog reading for
-     *       newly added tables when restored from checkpoint/savepoint
-     *   <li>scanBinlogNewlyAddedTableEnabled: only captures binlog events for newly added tables
-     *       during binlog reading phase, without snapshot
-     * </ul>
-     *
-     * <p>table-name pattern examples for the {@link #tableList(String...)} method:
-     *
-     * <ul>
-     *   <li>"db\\.*" - captures all tables in database 'db'
-     *   <li>"db\\.user_\\.*" - captures tables like 'user_orders', 'user_profiles' in database 'db'
-     *   <li>"db\\.order_[0-9]+" - captures tables like 'order_1', 'order_2' in database 'db'
-     *   <li>"db1\\.*", "db2\\.user_\\.*" - captures all tables in 'db1' and 'user_*' tables in
-     *       'db2'
-     * </ul>
+     * Whether to capture newly added tables in binlog reading phase without snapshot. This option
+     * can only be used with stream-only startup modes. Cannot be enabled together with {@link
+     * #scanNewlyAddedTableEnabled(boolean)}.
      */
     @Experimental
     public MySqlSourceBuilder<T> scanBinlogNewlyAddedTableEnabled(
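The two constraints stated in the new Javadoc (stream-only startup modes only, and not together with `scanNewlyAddedTableEnabled`) could be checked roughly as follows. This is a sketch under assumptions: `BinlogOnlyValidationSketch`, its local `StartupMode` enum, and the choice of which modes count as stream-only are all illustrative stand-ins, since the validation code itself is not part of this diff.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical validation; NOT the code added to the connector's config factory. */
final class BinlogOnlyValidationSketch {
    /** Local stand-in for the connector's startup modes; the names are assumptions. */
    enum StartupMode {
        INITIAL,
        SNAPSHOT,
        EARLIEST_OFFSET,
        LATEST_OFFSET,
        SPECIFIC_OFFSETS,
        TIMESTAMP
    }

    // Assumption: modes that skip the snapshot phase are treated as stream-only.
    private static final Set<StartupMode> STREAM_ONLY =
            EnumSet.of(
                    StartupMode.EARLIEST_OFFSET,
                    StartupMode.LATEST_OFFSET,
                    StartupMode.SPECIFIC_OFFSETS,
                    StartupMode.TIMESTAMP);

    private BinlogOnlyValidationSketch() {}

    static void validate(
            StartupMode mode, boolean scanNewlyAdded, boolean scanBinlogNewlyAdded) {
        if (!scanBinlogNewlyAdded) {
            return; // nothing to check when binlog-only capture is off
        }
        if (scanNewlyAdded) {
            throw new IllegalArgumentException(
                    "binlog-only capture cannot be combined with scanNewlyAddedTableEnabled");
        }
        if (!STREAM_ONLY.contains(mode)) {
            throw new IllegalArgumentException(
                    "binlog-only capture requires a stream-only startup mode, got " + mode);
        }
    }
}
```

Failing at configuration time keeps a job from starting in a combination where a snapshot phase and binlog-only capture would both claim the same new table.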
