Skip to content

Commit f2f73cf

Browse files
sd4324530ThorneANN
authored andcommitted
[FLINK-38244][hotfix] Fix the full snapshot phase, the field case is adjusted based on isTableIdCaseInsensitive (apache#4284)
1 parent 829904a commit f2f73cf

6 files changed

Lines changed: 114 additions & 37 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,23 +66,26 @@ public EventSourceProvider getEventSourceProvider() {
6666
.getBoolean(
6767
RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_COMMENTS.name(),
6868
false);
69-
69+
boolean isTableIdCaseInsensitive = MySqlSchemaUtils.isTableIdCaseInsensitive(sourceConfig);
7070
MySqlEventDeserializer deserializer =
7171
new MySqlEventDeserializer(
7272
DebeziumChangelogMode.ALL,
7373
sourceConfig.isIncludeSchemaChanges(),
7474
readableMetadataList,
7575
includeComments,
7676
sourceConfig.isTreatTinyInt1AsBoolean(),
77-
MySqlSchemaUtils.isTableIdCaseInsensitive(sourceConfig));
77+
isTableIdCaseInsensitive);
7878

7979
MySqlSource<Event> source =
8080
new MySqlSource<>(
8181
configFactory,
8282
deserializer,
8383
(sourceReaderMetrics, sourceConfig) ->
8484
new MySqlPipelineRecordEmitter(
85-
deserializer, sourceReaderMetrics, sourceConfig));
85+
deserializer,
86+
sourceReaderMetrics,
87+
sourceConfig,
88+
isTableIdCaseInsensitive));
8689

8790
return FlinkSourceProvider.of(source);
8891
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,11 @@
5555
import java.util.HashMap;
5656
import java.util.HashSet;
5757
import java.util.List;
58+
import java.util.Locale;
5859
import java.util.Map;
5960
import java.util.Objects;
6061
import java.util.Set;
62+
import java.util.stream.Collectors;
6163

6264
import static org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils.openJdbcConnection;
6365
import static org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils.getTableId;
@@ -80,6 +82,7 @@ public class MySqlPipelineRecordEmitter extends MySqlRecordEmitter<Event> {
8082
// Used when startup mode is snapshot
8183
private boolean shouldEmitAllCreateTableEventsInSnapshotMode = true;
8284
private boolean isBounded = false;
85+
private final boolean isTableIdCaseInsensitive;
8386

8487
private final DebeziumDeserializationSchema<Event> debeziumDeserializationSchema;
8588

@@ -88,7 +91,8 @@ public class MySqlPipelineRecordEmitter extends MySqlRecordEmitter<Event> {
8891
public MySqlPipelineRecordEmitter(
8992
DebeziumDeserializationSchema<Event> debeziumDeserializationSchema,
9093
MySqlSourceReaderMetrics sourceReaderMetrics,
91-
MySqlSourceConfig sourceConfig) {
94+
MySqlSourceConfig sourceConfig,
95+
boolean isTableIdCaseInsensitive) {
9296
super(
9397
debeziumDeserializationSchema,
9498
sourceReaderMetrics,
@@ -102,6 +106,7 @@ public MySqlPipelineRecordEmitter(
102106
((DebeziumEventDeserializationSchema) debeziumDeserializationSchema)
103107
.getCreateTableEventCache();
104108
this.isBounded = StartupOptions.snapshot().equals(sourceConfig.getStartupOptions());
109+
this.isTableIdCaseInsensitive = isTableIdCaseInsensitive;
105110
}
106111

107112
@Override
@@ -261,7 +266,10 @@ private Schema buildSchemaFromTable(Table table) {
261266
for (int i = 0; i < columns.size(); i++) {
262267
Column column = columns.get(i);
263268

264-
String colName = column.name();
269+
String colName =
270+
this.isTableIdCaseInsensitive
271+
? column.name().toLowerCase(Locale.ROOT)
272+
: column.name();
265273
DataType dataType =
266274
MySqlTypeUtils.fromDbzColumn(column, sourceConfig.isTreatTinyInt1AsBoolean());
267275
if (!column.isOptional()) {
@@ -277,6 +285,12 @@ private Schema buildSchemaFromTable(Table table) {
277285

278286
List<String> primaryKey = table.primaryKeyColumnNames();
279287
if (Objects.nonNull(primaryKey) && !primaryKey.isEmpty()) {
288+
if (this.isTableIdCaseInsensitive) {
289+
primaryKey =
290+
primaryKey.stream()
291+
.map(key -> key.toLowerCase(Locale.ROOT))
292+
.collect(Collectors.toList());
293+
}
280294
tableBuilder.primaryKey(primaryKey);
281295
}
282296
return tableBuilder.build();

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,8 @@ void testExcludeTable() {
135135
Arrays.asList(
136136
inventoryDatabase.getDatabaseName() + ".customers",
137137
inventoryDatabase.getDatabaseName() + ".multi_max_table",
138-
inventoryDatabase.getDatabaseName() + ".products"));
138+
inventoryDatabase.getDatabaseName() + ".products",
139+
inventoryDatabase.getDatabaseName() + ".uppercase_products"));
139140
}
140141

141142
@Test

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,7 @@ void testSqlInjection() throws Exception {
292292
+ " (default,\"hammer\",\"14oz carpenter's hammer\",0.875),\n"
293293
+ " (default,\"hammer\",\"16oz carpenter's hammer\",1.0),\n"
294294
+ " (default,\"rocks\",\"box of assorted rocks\",5.3),\n"
295-
+ " (default,\"jacket\",\"water resistent black wind breaker\",0.1),\n"
295+
+ " (default,\"jacket\",\"water resistant black wind breaker\",0.1),\n"
296296
+ " (default,\"spare tire\",\"24 inch spare tire\",22.2);",
297297
StatementUtils.quote(inventoryDatabase.getDatabaseName()),
298298
StatementUtils.quote(sqlInjectionTable)));
@@ -540,8 +540,8 @@ void testExcludeTables(boolean inBatch) throws Exception {
540540
.tableList(databaseName + ".*")
541541
.excludeTableList(
542542
String.format(
543-
"%s.customers, %s.orders, %s.multi_max_table",
544-
databaseName, databaseName, databaseName))
543+
"%s.customers, %s.orders, %s.multi_max_table, %s.uppercase_products",
544+
databaseName, databaseName, databaseName, databaseName))
545545
.startupOptions(StartupOptions.initial())
546546
.serverId(getServerId(env.getParallelism()))
547547
.serverTimeZone("UTC")
@@ -1835,7 +1835,7 @@ private List<Event> getSnapshotExpected(TableId tableId) {
18351835
108,
18361836
BinaryStringData.fromString("jacket"),
18371837
BinaryStringData.fromString(
1838-
"water resistent black wind breaker"),
1838+
"water resistant black wind breaker"),
18391839
0.1f
18401840
})));
18411841
snapshotExpected.add(

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlTableIdCaseInsensitveITCase.java

Lines changed: 65 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@
4444
import org.junit.jupiter.api.AfterAll;
4545
import org.junit.jupiter.api.BeforeAll;
4646
import org.junit.jupiter.api.BeforeEach;
47-
import org.junit.jupiter.api.Test;
47+
import org.junit.jupiter.params.ParameterizedTest;
48+
import org.junit.jupiter.params.provider.ValueSource;
4849
import org.testcontainers.lifecycle.Startables;
4950

5051
import java.sql.Connection;
@@ -96,8 +97,10 @@ public void before() {
9697
env.setRestartStrategy(RestartStrategies.noRestart());
9798
}
9899

99-
@Test
100-
public void testParseAlterStatementWhenTableNameAndColumnIsUpper() throws Exception {
100+
@ParameterizedTest
101+
@ValueSource(strings = {"products", "uppercase_products"})
102+
public void testParseAlterStatementWhenTableNameAndColumnIsUpper(String tableName)
103+
throws Exception {
101104
env.setParallelism(1);
102105
inventoryDatabase.createAndInitialize();
103106
MySqlSourceConfigFactory configFactory =
@@ -107,7 +110,7 @@ public void testParseAlterStatementWhenTableNameAndColumnIsUpper() throws Except
107110
.username(TEST_USER)
108111
.password(TEST_PASSWORD)
109112
.databaseList(inventoryDatabase.getDatabaseName())
110-
.tableList(inventoryDatabase.getDatabaseName() + "\\.products")
113+
.tableList(inventoryDatabase.getDatabaseName() + "\\." + tableName)
111114
.startupOptions(StartupOptions.latest())
112115
.serverId(getServerId(env.getParallelism()))
113116
.serverTimeZone("UTC")
@@ -124,17 +127,17 @@ public void testParseAlterStatementWhenTableNameAndColumnIsUpper() throws Except
124127
.executeAndCollect();
125128
Thread.sleep(5_000);
126129

127-
TableId tableId = TableId.tableId(inventoryDatabase.getDatabaseName(), "products");
130+
TableId tableId = TableId.tableId(inventoryDatabase.getDatabaseName(), tableName);
128131
List<Event> expected = new ArrayList<>();
129132
expected.add(getProductsCreateTableEvent(tableId));
130133
try (Connection connection = inventoryDatabase.getJdbcConnection();
131134
Statement statement = connection.createStatement()) {
132-
expected.addAll(executeAlterAndProvideExpected(tableId, statement));
135+
expected.addAll(executeAlterAndProvideExpected(tableId, statement, tableName));
133136

134137
statement.execute(
135138
String.format(
136-
"ALTER TABLE `%s`.`PRODUCTS` ADD `cols1` VARCHAR(45);",
137-
inventoryDatabase.getDatabaseName()));
139+
"ALTER TABLE `%s`.`%s` ADD `COLS1` VARCHAR(45);",
140+
inventoryDatabase.getDatabaseName(), tableName));
138141
expected.add(
139142
new AddColumnEvent(
140143
tableId,
@@ -147,6 +150,42 @@ public void testParseAlterStatementWhenTableNameAndColumnIsUpper() throws Except
147150
assertThat(actual).isEqualTo(expected);
148151
}
149152

153+
@ParameterizedTest
154+
@ValueSource(strings = {"products", "uppercase_products"})
155+
public void testSnapshotModeWhenTableNameAndColumnIsUpper(String tableName) throws Exception {
156+
env.setParallelism(1);
157+
inventoryDatabase.createAndInitialize();
158+
MySqlSourceConfigFactory configFactory =
159+
new MySqlSourceConfigFactory()
160+
.hostname(MYSQL8_CONTAINER.getHost())
161+
.port(MYSQL8_CONTAINER.getDatabasePort())
162+
.username(TEST_USER)
163+
.password(TEST_PASSWORD)
164+
.databaseList(inventoryDatabase.getDatabaseName())
165+
.tableList(inventoryDatabase.getDatabaseName() + "\\." + tableName)
166+
.startupOptions(StartupOptions.snapshot())
167+
.serverId(getServerId(env.getParallelism()))
168+
.serverTimeZone("UTC")
169+
.includeSchemaChanges(SCHEMA_CHANGE_ENABLED.defaultValue());
170+
171+
FlinkSourceProvider sourceProvider =
172+
(FlinkSourceProvider) new MySqlDataSource(configFactory).getEventSourceProvider();
173+
CloseableIterator<Event> events =
174+
env.fromSource(
175+
sourceProvider.getSource(),
176+
WatermarkStrategy.noWatermarks(),
177+
MySqlDataSourceFactory.IDENTIFIER,
178+
new EventTypeInfo())
179+
.executeAndCollect();
180+
Thread.sleep(5_000);
181+
182+
TableId tableId = TableId.tableId(inventoryDatabase.getDatabaseName(), tableName);
183+
List<Event> expected = new ArrayList<>();
184+
expected.add(getProductsCreateTableEvent(tableId));
185+
List<Event> actual = fetchResults(events, expected.size());
186+
assertThat(actual).isEqualTo(expected);
187+
}
188+
150189
private CreateTableEvent getProductsCreateTableEvent(TableId tableId) {
151190
return new CreateTableEvent(
152191
tableId,
@@ -172,13 +211,13 @@ private CreateTableEvent getProductsCreateTableEvent(TableId tableId) {
172211
* );
173212
* </pre>
174213
*/
175-
private List<Event> executeAlterAndProvideExpected(TableId tableId, Statement statement)
176-
throws SQLException {
214+
private List<Event> executeAlterAndProvideExpected(
215+
TableId tableId, Statement statement, String tableName) throws SQLException {
177216
List<Event> expected = new ArrayList<>();
178217
statement.execute(
179218
String.format(
180-
"ALTER TABLE `%s`.`products` CHANGE COLUMN `DESCRIPTION` `DESC` VARCHAR(255) NULL DEFAULT NULL;",
181-
inventoryDatabase.getDatabaseName()));
219+
"ALTER TABLE `%s`.`%s` CHANGE COLUMN `DESCRIPTION` `DESC` VARCHAR(255) NULL DEFAULT NULL;",
220+
inventoryDatabase.getDatabaseName(), tableName));
182221
expected.add(
183222
new AlterColumnTypeEvent(
184223
tableId, Collections.singletonMap("description", DataTypes.VARCHAR(255))));
@@ -187,17 +226,17 @@ private List<Event> executeAlterAndProvideExpected(TableId tableId, Statement st
187226

188227
statement.execute(
189228
String.format(
190-
"ALTER TABLE `%s`.`products` CHANGE COLUMN `desc` `desc2` VARCHAR(400) NULL DEFAULT NULL;",
191-
inventoryDatabase.getDatabaseName()));
229+
"ALTER TABLE `%s`.`%s` CHANGE COLUMN `desc` `desc2` VARCHAR(400) NULL DEFAULT NULL;",
230+
inventoryDatabase.getDatabaseName(), tableName));
192231
expected.add(
193232
new AlterColumnTypeEvent(
194233
tableId, Collections.singletonMap("desc", DataTypes.VARCHAR(400))));
195234
expected.add(new RenameColumnEvent(tableId, Collections.singletonMap("desc", "desc2")));
196235

197236
statement.execute(
198237
String.format(
199-
"ALTER TABLE `%s`.`products` ADD COLUMN `DESC1` VARCHAR(45) NULL AFTER `weight`;",
200-
inventoryDatabase.getDatabaseName()));
238+
"ALTER TABLE `%s`.`%s` ADD COLUMN `DESC1` VARCHAR(45) NULL AFTER `weight`;",
239+
inventoryDatabase.getDatabaseName(), tableName));
201240
expected.add(
202241
new AddColumnEvent(
203242
tableId,
@@ -209,8 +248,8 @@ private List<Event> executeAlterAndProvideExpected(TableId tableId, Statement st
209248

210249
statement.execute(
211250
String.format(
212-
"ALTER TABLE `%s`.`products` ADD COLUMN `col1` VARCHAR(45) NULL AFTER `weight`, ADD COLUMN `COL2` VARCHAR(55) NULL AFTER `desc1`;",
213-
inventoryDatabase.getDatabaseName()));
251+
"ALTER TABLE `%s`.`%s` ADD COLUMN `col1` VARCHAR(45) NULL AFTER `weight`, ADD COLUMN `COL2` VARCHAR(55) NULL AFTER `desc1`;",
252+
inventoryDatabase.getDatabaseName(), tableName));
214253
expected.add(
215254
new AddColumnEvent(
216255
tableId,
@@ -230,8 +269,8 @@ private List<Event> executeAlterAndProvideExpected(TableId tableId, Statement st
230269

231270
statement.execute(
232271
String.format(
233-
"ALTER TABLE `%s`.`products` DROP COLUMN `desc2`, CHANGE COLUMN `desc1` `desc1` VARCHAR(65) NULL DEFAULT NULL;",
234-
inventoryDatabase.getDatabaseName()));
272+
"ALTER TABLE `%s`.`%s` DROP COLUMN `desc2`, CHANGE COLUMN `desc1` `desc1` VARCHAR(65) NULL DEFAULT NULL;",
273+
inventoryDatabase.getDatabaseName(), tableName));
235274
expected.add(new DropColumnEvent(tableId, Collections.singletonList("desc2")));
236275
expected.add(
237276
new AlterColumnTypeEvent(
@@ -240,22 +279,22 @@ private List<Event> executeAlterAndProvideExpected(TableId tableId, Statement st
240279
// Only available in mysql 8.0
241280
statement.execute(
242281
String.format(
243-
"ALTER TABLE `%s`.`products` RENAME COLUMN `desc1` TO `desc3`;",
244-
inventoryDatabase.getDatabaseName()));
282+
"ALTER TABLE `%s`.`%s` RENAME COLUMN `desc1` TO `desc3`;",
283+
inventoryDatabase.getDatabaseName(), tableName));
245284
expected.add(new RenameColumnEvent(tableId, Collections.singletonMap("desc1", "desc3")));
246285

247286
statement.execute(
248287
String.format(
249-
"ALTER TABLE `%s`.`products` MODIFY COLUMN `DESC3` VARCHAR(255) NULL DEFAULT NULL;",
250-
inventoryDatabase.getDatabaseName()));
288+
"ALTER TABLE `%s`.`%s` MODIFY COLUMN `DESC3` VARCHAR(255) NULL DEFAULT NULL;",
289+
inventoryDatabase.getDatabaseName(), tableName));
251290
expected.add(
252291
new AlterColumnTypeEvent(
253292
tableId, Collections.singletonMap("desc3", DataTypes.VARCHAR(255))));
254293

255294
statement.execute(
256295
String.format(
257-
"ALTER TABLE `%s`.`products` DROP COLUMN `desc3`;",
258-
inventoryDatabase.getDatabaseName()));
296+
"ALTER TABLE `%s`.`%s` DROP COLUMN `desc3`;",
297+
inventoryDatabase.getDatabaseName(), tableName));
259298
expected.add(new DropColumnEvent(tableId, Collections.singletonList("desc3")));
260299

261300
// Should not catch SchemaChangeEvent of tables other than `products`

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/inventory.sql

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,27 @@ VALUES (default,"scooter","Small 2-wheel scooter",3.14),
3434
(default,"hammer","14oz carpenter's hammer",0.875),
3535
(default,"hammer","16oz carpenter's hammer",1.0),
3636
(default,"rocks","box of assorted rocks",5.3),
37-
(default,"jacket","water resistent black wind breaker",0.1),
37+
(default,"jacket","water resistant black wind breaker",0.1),
38+
(default,"spare tire","24 inch spare tire",22.2);
39+
40+
-- Create a table where all fields are in uppercase.
41+
CREATE TABLE uppercase_products (
42+
ID INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
43+
NAME VARCHAR(255) NOT NULL DEFAULT 'flink',
44+
DESCRIPTION VARCHAR(512),
45+
WEIGHT FLOAT(6)
46+
);
47+
ALTER TABLE uppercase_products AUTO_INCREMENT = 101;
48+
49+
INSERT INTO uppercase_products
50+
VALUES (default,"scooter","Small 2-wheel scooter",3.14),
51+
(default,"car battery","12V car battery",8.1),
52+
(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8),
53+
(default,"hammer","12oz carpenter's hammer",0.75),
54+
(default,"hammer","14oz carpenter's hammer",0.875),
55+
(default,"hammer","16oz carpenter's hammer",1.0),
56+
(default,"rocks","box of assorted rocks",5.3),
57+
(default,"jacket","water resistant black wind breaker",0.1),
3858
(default,"spare tire","24 inch spare tire",22.2);
3959

4060
-- Create some customers ...

0 commit comments

Comments
 (0)