Skip to content

Commit 9a43277

Browse files
liziyanclaude
andcommitted
[FLINK-39749][mysql-cdc] Add scan.incremental.snapshot.string-key.compare-mode option
This commit introduces a new configuration option `scan.incremental.snapshot.string-key.compare-mode` to fix chunk splitting and binlog event routing issues when MySQL collation differs from Java's natural String ordering. Problem: - Java String.compareTo() is case-sensitive (Unicode code point order). - MySQL collations like utf8mb4_general_ci are case-insensitive. - This mismatch causes chunk boundaries computed by Java to diverge from actual MySQL row ordering, leading to premature unbounded chunks, overlapping splits, or lost binlog events. Solution: - Introduce ChunkKeyCompareMode enum: DEFAULT, CASE_INSENSITIVE, BINARY. - DEFAULT: preserves existing behavior (String.compareTo()). - CASE_INSENSITIVE: uses String.compareToIgnoreCase() for Java-side comparisons. - BINARY: injects BINARY keyword in SQL predicates and uses byte-level comparison in Java. Changes cover all three API layers: - DataStream API (MySqlSourceBuilder) - Flink SQL (MySqlTableSourceFactory) - Pipeline YAML (MySqlDataSourceFactory) Also updates documentation (EN + ZH) and adds test coverage. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1 parent 24ab548 commit 9a43277

22 files changed

Lines changed: 427 additions & 73 deletions

File tree

docs/content.zh/docs/connectors/flink-sources/mysql-cdc.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -441,6 +441,19 @@ Flink SQL> SELECT * FROM orders;
441441
如果跳过 backfill ,快照阶段捕获表的更改将在稍后的 binlog 读取阶段被回放,而不是合并到快照中。<br>
442442
警告:跳过 backfill 可能会导致数据不一致,因为快照阶段发生的某些 binlog 事件可能会被重放(仅保证 at-least-once )。
443443
例如,更新快照阶段已更新的值,或删除快照阶段已删除的数据。这些重放的 binlog 事件应进行特殊处理。
444+
</td>
445+
</tr>
446+
<tr>
447+
<td>scan.incremental.snapshot.string-key.compare-mode</td>
448+
<td>optional</td>
449+
<td style="word-wrap: break-word;">default</td>
450+
<td>String</td>
451+
<td>
452+
增量快照阶段字符串类型 chunk key 的比较模式。可选值包括:
453+
<li><code>default</code>:使用 Java 默认字符串比较(区分大小写,Unicode 码点顺序)。可能与 MySQL 的 <code>utf8mb4_general_ci</code> 等大小写不敏感排序规则不一致。</li>
454+
<li><code>case-insensitive</code>:使用大小写不敏感比较(Java 中的 <code>compareToIgnoreCase</code>),适用于 MySQL 使用 <code>utf8mb4_general_ci</code> 等大小写不敏感排序规则的场景。</li>
455+
<li><code>binary</code>:在 SQL 查询和 Java 比较中均使用二进制比较,强制按字节精确匹配。适用于 MySQL 使用 <code>utf8mb4_bin</code> 或 <code>BINARY</code> 关键字等二进制比较的场景。</li>
456+
</td>
444457
</tr>
445458
<tr>
446459
<td>use.legacy.json.format</td>

docs/content.zh/docs/connectors/pipeline-connectors/mysql.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -340,6 +340,19 @@ pipeline:
340340
如果跳过 backfill ,快照阶段捕获表的更改将在稍后的 binlog 读取阶段被回放,而不是合并到快照中。<br>
341341
警告:跳过 backfill 可能会导致数据不一致,因为快照阶段发生的某些 binlog 事件可能会被重放(仅保证 at-least-once )。
342342
例如,更新快照阶段已更新的值,或删除快照阶段已删除的数据。这些重放的 binlog 事件应进行特殊处理。
343+
</td>
344+
</tr>
345+
<tr>
346+
<td>scan.incremental.snapshot.string-key.compare-mode</td>
347+
<td>optional</td>
348+
<td style="word-wrap: break-word;">default</td>
349+
<td>String</td>
350+
<td>
351+
增量快照阶段字符串类型 chunk key 的比较模式。可选值包括:
352+
<li><code>default</code>:使用 Java 默认字符串比较(区分大小写,Unicode 码点顺序)。可能与 MySQL 的 <code>utf8mb4_general_ci</code> 等大小写不敏感排序规则不一致。</li>
353+
<li><code>case-insensitive</code>:使用大小写不敏感比较(Java 中的 <code>compareToIgnoreCase</code>),适用于 MySQL 使用 <code>utf8mb4_general_ci</code> 等大小写不敏感排序规则的场景。</li>
354+
<li><code>binary</code>:在 SQL 查询和 Java 比较中均使用二进制比较,强制按字节精确匹配。适用于 MySQL 使用 <code>utf8mb4_bin</code> 或 <code>BINARY</code> 关键字等二进制比较的场景。</li>
355+
</td>
343356
</tr>
344357
<tr>
345358
<td>metadata.list</td>

docs/content/docs/connectors/flink-sources/mysql-cdc.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -468,6 +468,18 @@ Only valid for cdc 1.x version. During a snapshot operation, the connector will
468468
For example updating an already updated value in snapshot, or deleting an already deleted entry in snapshot. These replayed change log events should be handled specially.
469469
</td>
470470
</tr>
471+
<tr>
472+
<td>scan.incremental.snapshot.string-key.compare-mode</td>
473+
<td>optional</td>
474+
<td style="word-wrap: break-word;">default</td>
475+
<td>String</td>
476+
<td>
477+
The compare mode for string type chunk key during incremental snapshot phase. Available values are:
478+
<li><code>default</code>: Use Java's default string comparison (case-sensitive, Unicode code point order). This may be inconsistent with MySQL's case-insensitive collations like <code>utf8mb4_general_ci</code>.</li>
479+
<li><code>case-insensitive</code>: Use case-insensitive comparison (<code>compareToIgnoreCase</code> in Java), suitable when MySQL uses case-insensitive collation such as <code>utf8mb4_general_ci</code>.</li>
480+
<li><code>binary</code>: Use binary comparison in both SQL queries and Java comparison, forcing byte-level exact match. Suitable when MySQL uses binary collation like <code>utf8mb4_bin</code> or <code>BINARY</code> keyword.</li>
481+
</td>
482+
</tr>
471483
</tbody>
472484
</table>
473485
</div>

docs/content/docs/connectors/pipeline-connectors/mysql.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -362,6 +362,18 @@ pipeline:
362362
For example updating an already updated value in snapshot, or deleting an already deleted entry in snapshot. These replayed change log events should be handled specially.
363363
</td>
364364
</tr>
365+
<tr>
366+
<td>scan.incremental.snapshot.string-key.compare-mode</td>
367+
<td>optional</td>
368+
<td style="word-wrap: break-word;">default</td>
369+
<td>String</td>
370+
<td>
371+
The compare mode for string type chunk key during incremental snapshot phase. Available values are:
372+
<li><code>default</code>: Use Java's default string comparison (case-sensitive, Unicode code point order). This may be inconsistent with MySQL's case-insensitive collations like <code>utf8mb4_general_ci</code>.</li>
373+
<li><code>case-insensitive</code>: Use case-insensitive comparison (<code>compareToIgnoreCase</code> in Java), suitable when MySQL uses case-insensitive collation such as <code>utf8mb4_general_ci</code>.</li>
374+
<li><code>binary</code>: Use binary comparison in both SQL queries and Java comparison, forcing byte-level exact match. Suitable when MySQL uses binary collation like <code>utf8mb4_bin</code> or <code>BINARY</code> keyword.</li>
375+
</td>
376+
</tr>
365377
<tr>
366378
<td>metadata.list</td>
367379
<td>optional</td>

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.flink.cdc.common.source.DataSource;
3131
import org.apache.flink.cdc.common.utils.StringUtils;
3232
import org.apache.flink.cdc.connectors.mysql.source.MySqlDataSource;
33+
import org.apache.flink.cdc.connectors.mysql.source.config.ChunkKeyCompareMode;
3334
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
3435
import org.apache.flink.cdc.connectors.mysql.source.config.ServerIdRange;
3536
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
@@ -80,6 +81,7 @@
8081
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP;
8182
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
8283
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
84+
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_STRING_KEY_COMPARE_MODE;
8385
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED;
8486
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED;
8587
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
@@ -167,6 +169,9 @@ public DataSource createDataSource(Context context) {
167169
boolean useLegacyJsonFormat = config.get(USE_LEGACY_JSON_FORMAT);
168170
boolean isAssignUnboundedChunkFirst =
169171
config.get(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
172+
ChunkKeyCompareMode chunkKeyCompareMode =
173+
ChunkKeyCompareMode.fromValue(
174+
config.get(SCAN_INCREMENTAL_SNAPSHOT_STRING_KEY_COMPARE_MODE));
170175

171176
validateIntegerOption(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE, splitSize, 1);
172177
validateIntegerOption(CHUNK_META_GROUP_SIZE, splitMetaGroupSize, 1);
@@ -220,6 +225,7 @@ public DataSource createDataSource(Context context) {
220225
.treatTinyInt1AsBoolean(treatTinyInt1AsBoolean)
221226
.useLegacyJsonFormat(useLegacyJsonFormat)
222227
.assignUnboundedChunkFirst(isAssignUnboundedChunkFirst)
228+
.chunkKeyCompareMode(chunkKeyCompareMode)
223229
.skipSnapshotBackfill(skipSnapshotBackfill);
224230

225231
List<TableId> tableIds = MySqlSchemaUtils.listTables(configFactory.createConfig(0), null);
@@ -358,6 +364,7 @@ public Set<ConfigOption<?>> optionalOptions() {
358364
options.add(PARSE_ONLINE_SCHEMA_CHANGES);
359365
options.add(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
360366
options.add(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
367+
options.add(SCAN_INCREMENTAL_SNAPSHOT_STRING_KEY_COMPARE_MODE);
361368
return options;
362369
}
363370

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,19 @@ public class MySqlDataSourceOptions {
224224
+ "By default, the chunk key is the first column of the primary key."
225225
+ "eg. db1.user_table_[0-9]+:col1;db[1-2].[app|web]_order_\\.*:col2;");
226226

227+
@Experimental
228+
public static final ConfigOption<String> SCAN_INCREMENTAL_SNAPSHOT_STRING_KEY_COMPARE_MODE =
229+
ConfigOptions.key("scan.incremental.snapshot.string-key.compare-mode")
230+
.stringType()
231+
.defaultValue("default")
232+
.withDescription(
233+
"The compare mode for string chunk key during incremental snapshot. Supported values are: "
234+
+ "'default' (uses Java String.compareTo), "
235+
+ "'case-insensitive' (uses Java String.compareToIgnoreCase to align with MySQL case-insensitive collations), "
236+
+ "'binary' (forces binary comparison in both MySQL SQL and Java). "
237+
+ "The 'case-insensitive' mode is recommended for tables using utf8mb4_general_ci or similar case-insensitive collations. "
238+
+ "The 'binary' mode ensures consistent byte-level ordering but may impact query performance during chunk splitting.");
239+
227240
@Experimental
228241
public static final ConfigOption<Boolean> SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED =
229242
ConfigOptions.key("scan.incremental.close-idle-reader.enabled")

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReader.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
2121
import org.apache.flink.cdc.connectors.mysql.debezium.task.MySqlBinlogSplitReadTask;
2222
import org.apache.flink.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext;
23+
import org.apache.flink.cdc.connectors.mysql.source.config.ChunkKeyCompareMode;
2324
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
2425
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
2526
import org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
@@ -93,6 +94,7 @@ public class BinlogSplitReader implements DebeziumReader<SourceRecords, MySqlSpl
9394
new StoppableChangeEventSourceContext();
9495
private final boolean isParsingOnLineSchemaChanges;
9596
private final boolean isBackfillSkipped;
97+
private final ChunkKeyCompareMode chunkKeyCompareMode;
9698
private final Map<String, List<SourceRecord>> pendingSchemaChangeEvents;
9799

98100
private static final long READER_CLOSE_TIMEOUT = 30L;
@@ -116,6 +118,7 @@ public BinlogSplitReader(StatefulTaskContext statefulTaskContext, int subtaskId)
116118
this.isParsingOnLineSchemaChanges =
117119
statefulTaskContext.getSourceConfig().isParseOnLineSchemaChanges();
118120
this.isBackfillSkipped = statefulTaskContext.getSourceConfig().isSkipSnapshotBackfill();
121+
this.chunkKeyCompareMode = statefulTaskContext.getSourceConfig().getChunkKeyCompareMode();
119122
this.pendingSchemaChangeEvents = new HashMap<>();
120123
}
121124

@@ -316,7 +319,7 @@ private boolean shouldEmit(SourceRecord sourceRecord) {
316319

317320
FinishedSnapshotSplitInfo matchedSplit =
318321
SplitKeyUtils.findSplitByKeyBinary(
319-
finishedSplitsInfo.get(tableId), chunkKey);
322+
finishedSplitsInfo.get(tableId), chunkKey, chunkKeyCompareMode);
320323

321324
return matchedSplit != null && position.isAfter(matchedSplit.getHighWatermark());
322325
}
@@ -381,7 +384,12 @@ private void configureFilter() {
381384
}
382385
// Sort splits by splitStart for binary search optimization
383386
// Binary search requires sorted data to work correctly
384-
splitsInfoMap.values().forEach(SplitKeyUtils::sortFinishedSplitInfos);
387+
splitsInfoMap
388+
.values()
389+
.forEach(
390+
splits ->
391+
SplitKeyUtils.sortFinishedSplitInfos(
392+
splits, chunkKeyCompareMode));
385393
}
386394
this.finishedSplitsInfo = splitsInfoMap;
387395
this.maxSplitHighWatermarkMap = tableIdBinlogPositionMap;

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/SnapshotSplitReader.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -360,7 +360,8 @@ public Iterator<SourceRecords> pollWithBuffer() throws InterruptedException {
360360
currentSnapshotSplit.getSplitKeyType(),
361361
nameAdjuster,
362362
currentSnapshotSplit.getSplitStart(),
363-
currentSnapshotSplit.getSplitEnd());
363+
currentSnapshotSplit.getSplitEnd(),
364+
statefulTaskContext.getSourceConfig().getChunkKeyCompareMode());
364365
}
365366
}
366367
}

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/MySqlSnapshotSplitReadTask.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -247,7 +247,8 @@ private void createDataEventsForTable(
247247
snapshotSplit.getTableId(),
248248
snapshotSplit.getSplitKeyType(),
249249
snapshotSplit.getSplitStart() == null,
250-
snapshotSplit.getSplitEnd() == null);
250+
snapshotSplit.getSplitEnd() == null,
251+
sourceConfig.getChunkKeyCompareMode());
251252
LOG.info(
252253
"For split '{}' of table {} using select statement: '{}'",
253254
snapshotSplit.splitId(),

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.flink.cdc.connectors.mysql.source;
1919

2020
import org.apache.flink.cdc.common.annotation.PublicEvolving;
21+
import org.apache.flink.cdc.connectors.mysql.source.config.ChunkKeyCompareMode;
2122
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
2223
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
2324
import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
@@ -312,6 +313,12 @@ public MySqlSourceBuilder<T> assignUnboundedChunkFirst(boolean assignUnboundedCh
312313
return this;
313314
}
314315

316+
/** The compare mode for string chunk key during incremental snapshot. Defaults to 'default'. */
317+
public MySqlSourceBuilder<T> chunkKeyCompareMode(ChunkKeyCompareMode chunkKeyCompareMode) {
318+
this.configFactory.chunkKeyCompareMode(chunkKeyCompareMode);
319+
return this;
320+
}
321+
315322
/**
316323
* Build the {@link MySqlSource}.
317324
*

0 commit comments

Comments
 (0)