Skip to content

Commit 05dedaa

Browse files
committed
fix FLINK-38244. During the full snapshot phase, the field case is adjusted based on isTableIdCaseInsensitive.
Signed-off-by: peiyu <125331682@qq.com>
1 parent 2f59f35 commit 05dedaa

2 files changed

Lines changed: 22 additions & 5 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,23 +66,26 @@ public EventSourceProvider getEventSourceProvider() {
6666
.getBoolean(
6767
RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_COMMENTS.name(),
6868
false);
69-
69+
boolean isTableIdCaseInsensitive = MySqlSchemaUtils.isTableIdCaseInsensitive(sourceConfig);
7070
MySqlEventDeserializer deserializer =
7171
new MySqlEventDeserializer(
7272
DebeziumChangelogMode.ALL,
7373
sourceConfig.isIncludeSchemaChanges(),
7474
readableMetadataList,
7575
includeComments,
7676
sourceConfig.isTreatTinyInt1AsBoolean(),
77-
MySqlSchemaUtils.isTableIdCaseInsensitive(sourceConfig));
77+
isTableIdCaseInsensitive);
7878

7979
MySqlSource<Event> source =
8080
new MySqlSource<>(
8181
configFactory,
8282
deserializer,
8383
(sourceReaderMetrics, sourceConfig) ->
8484
new MySqlPipelineRecordEmitter(
85-
deserializer, sourceReaderMetrics, sourceConfig));
85+
deserializer,
86+
sourceReaderMetrics,
87+
sourceConfig,
88+
isTableIdCaseInsensitive));
8689

8790
return FlinkSourceProvider.of(source);
8891
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,11 @@
5555
import java.util.HashMap;
5656
import java.util.HashSet;
5757
import java.util.List;
58+
import java.util.Locale;
5859
import java.util.Map;
5960
import java.util.Objects;
6061
import java.util.Set;
62+
import java.util.stream.Collectors;
6163

6264
import static org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils.openJdbcConnection;
6365
import static org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.getTableId;
@@ -80,6 +82,7 @@ public class MySqlPipelineRecordEmitter extends MySqlRecordEmitter<Event> {
8082
// Used when startup mode is snapshot
8183
private boolean shouldEmitAllCreateTableEventsInSnapshotMode = true;
8284
private boolean isBounded = false;
85+
private final boolean isTableIdCaseInsensitive;
8386

8487
private final DebeziumDeserializationSchema<Event> debeziumDeserializationSchema;
8588

@@ -88,7 +91,8 @@ public class MySqlPipelineRecordEmitter extends MySqlRecordEmitter<Event> {
8891
public MySqlPipelineRecordEmitter(
8992
DebeziumDeserializationSchema<Event> debeziumDeserializationSchema,
9093
MySqlSourceReaderMetrics sourceReaderMetrics,
91-
MySqlSourceConfig sourceConfig) {
94+
MySqlSourceConfig sourceConfig,
95+
boolean isTableIdCaseInsensitive) {
9296
super(
9397
debeziumDeserializationSchema,
9498
sourceReaderMetrics,
@@ -102,6 +106,7 @@ public MySqlPipelineRecordEmitter(
102106
((DebeziumEventDeserializationSchema) debeziumDeserializationSchema)
103107
.getCreateTableEventCache();
104108
this.isBounded = StartupOptions.snapshot().equals(sourceConfig.getStartupOptions());
109+
this.isTableIdCaseInsensitive = isTableIdCaseInsensitive;
105110
}
106111

107112
@Override
@@ -261,7 +266,10 @@ private Schema buildSchemaFromTable(Table table) {
261266
for (int i = 0; i < columns.size(); i++) {
262267
Column column = columns.get(i);
263268

264-
String colName = column.name();
269+
String colName =
270+
this.isTableIdCaseInsensitive
271+
? column.name().toLowerCase(Locale.ROOT)
272+
: column.name();
265273
DataType dataType =
266274
MySqlTypeUtils.fromDbzColumn(column, sourceConfig.isTreatTinyInt1AsBoolean());
267275
if (!column.isOptional()) {
@@ -277,6 +285,12 @@ private Schema buildSchemaFromTable(Table table) {
277285

278286
List<String> primaryKey = table.primaryKeyColumnNames();
279287
if (Objects.nonNull(primaryKey) && !primaryKey.isEmpty()) {
288+
if (this.isTableIdCaseInsensitive) {
289+
primaryKey =
290+
primaryKey.stream()
291+
.map(key -> key.toLowerCase(Locale.ROOT))
292+
.collect(Collectors.toList());
293+
}
280294
tableBuilder.primaryKey(primaryKey);
281295
}
282296
return tableBuilder.build();

0 commit comments

Comments
 (0)