Skip to content

Commit 0bcf771

Browse files
committed
add testcase
Signed-off-by: Pei Yu <125331682@qq.com>
1 parent ae3983b commit 0bcf771

2 files changed

Lines changed: 49 additions & 26 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlTableIdCaseInsensitveITCase.java

Lines changed: 29 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@
4444
import org.junit.jupiter.api.AfterAll;
4545
import org.junit.jupiter.api.BeforeAll;
4646
import org.junit.jupiter.api.BeforeEach;
47-
import org.junit.jupiter.api.Test;
47+
import org.junit.jupiter.params.ParameterizedTest;
48+
import org.junit.jupiter.params.provider.ValueSource;
4849
import org.testcontainers.lifecycle.Startables;
4950

5051
import java.sql.Connection;
@@ -96,8 +97,10 @@ public void before() {
9697
env.setRestartStrategy(RestartStrategies.noRestart());
9798
}
9899

99-
@Test
100-
public void testParseAlterStatementWhenTableNameAndColumnIsUpper() throws Exception {
100+
@ParameterizedTest
101+
@ValueSource(strings = {"products", "uppercase_products"})
102+
public void testParseAlterStatementWhenTableNameAndColumnIsUpper(String tableName)
103+
throws Exception {
101104
env.setParallelism(1);
102105
inventoryDatabase.createAndInitialize();
103106
MySqlSourceConfigFactory configFactory =
@@ -107,7 +110,7 @@ public void testParseAlterStatementWhenTableNameAndColumnIsUpper() throws Except
107110
.username(TEST_USER)
108111
.password(TEST_PASSWORD)
109112
.databaseList(inventoryDatabase.getDatabaseName())
110-
.tableList(inventoryDatabase.getDatabaseName() + "\\.products")
113+
.tableList(inventoryDatabase.getDatabaseName() + "\\." + tableName)
111114
.startupOptions(StartupOptions.latest())
112115
.serverId(getServerId(env.getParallelism()))
113116
.serverTimeZone("UTC")
@@ -124,17 +127,17 @@ public void testParseAlterStatementWhenTableNameAndColumnIsUpper() throws Except
124127
.executeAndCollect();
125128
Thread.sleep(5_000);
126129

127-
TableId tableId = TableId.tableId(inventoryDatabase.getDatabaseName(), "products");
130+
TableId tableId = TableId.tableId(inventoryDatabase.getDatabaseName(), tableName);
128131
List<Event> expected = new ArrayList<>();
129132
expected.add(getProductsCreateTableEvent(tableId));
130133
try (Connection connection = inventoryDatabase.getJdbcConnection();
131134
Statement statement = connection.createStatement()) {
132-
expected.addAll(executeAlterAndProvideExpected(tableId, statement));
135+
expected.addAll(executeAlterAndProvideExpected(tableId, statement, tableName));
133136

134137
statement.execute(
135138
String.format(
136-
"ALTER TABLE `%s`.`PRODUCTS` ADD `cols1` VARCHAR(45);",
137-
inventoryDatabase.getDatabaseName()));
139+
"ALTER TABLE `%s`.`%s` ADD `COLS1` VARCHAR(45);",
140+
inventoryDatabase.getDatabaseName(), tableName));
138141
expected.add(
139142
new AddColumnEvent(
140143
tableId,
@@ -172,13 +175,13 @@ private CreateTableEvent getProductsCreateTableEvent(TableId tableId) {
172175
* );
173176
* </pre>
174177
*/
175-
private List<Event> executeAlterAndProvideExpected(TableId tableId, Statement statement)
176-
throws SQLException {
178+
private List<Event> executeAlterAndProvideExpected(
179+
TableId tableId, Statement statement, String tableName) throws SQLException {
177180
List<Event> expected = new ArrayList<>();
178181
statement.execute(
179182
String.format(
180-
"ALTER TABLE `%s`.`products` CHANGE COLUMN `DESCRIPTION` `DESC` VARCHAR(255) NULL DEFAULT NULL;",
181-
inventoryDatabase.getDatabaseName()));
183+
"ALTER TABLE `%s`.`%s` CHANGE COLUMN `DESCRIPTION` `DESC` VARCHAR(255) NULL DEFAULT NULL;",
184+
inventoryDatabase.getDatabaseName(), tableName));
182185
expected.add(
183186
new AlterColumnTypeEvent(
184187
tableId, Collections.singletonMap("description", DataTypes.VARCHAR(255))));
@@ -187,17 +190,17 @@ private List<Event> executeAlterAndProvideExpected(TableId tableId, Statement st
187190

188191
statement.execute(
189192
String.format(
190-
"ALTER TABLE `%s`.`products` CHANGE COLUMN `desc` `desc2` VARCHAR(400) NULL DEFAULT NULL;",
191-
inventoryDatabase.getDatabaseName()));
193+
"ALTER TABLE `%s`.`%s` CHANGE COLUMN `desc` `desc2` VARCHAR(400) NULL DEFAULT NULL;",
194+
inventoryDatabase.getDatabaseName(), tableName));
192195
expected.add(
193196
new AlterColumnTypeEvent(
194197
tableId, Collections.singletonMap("desc", DataTypes.VARCHAR(400))));
195198
expected.add(new RenameColumnEvent(tableId, Collections.singletonMap("desc", "desc2")));
196199

197200
statement.execute(
198201
String.format(
199-
"ALTER TABLE `%s`.`products` ADD COLUMN `DESC1` VARCHAR(45) NULL AFTER `weight`;",
200-
inventoryDatabase.getDatabaseName()));
202+
"ALTER TABLE `%s`.`%s` ADD COLUMN `DESC1` VARCHAR(45) NULL AFTER `weight`;",
203+
inventoryDatabase.getDatabaseName(), tableName));
201204
expected.add(
202205
new AddColumnEvent(
203206
tableId,
@@ -209,8 +212,8 @@ private List<Event> executeAlterAndProvideExpected(TableId tableId, Statement st
209212

210213
statement.execute(
211214
String.format(
212-
"ALTER TABLE `%s`.`products` ADD COLUMN `col1` VARCHAR(45) NULL AFTER `weight`, ADD COLUMN `COL2` VARCHAR(55) NULL AFTER `desc1`;",
213-
inventoryDatabase.getDatabaseName()));
215+
"ALTER TABLE `%s`.`%s` ADD COLUMN `col1` VARCHAR(45) NULL AFTER `weight`, ADD COLUMN `COL2` VARCHAR(55) NULL AFTER `desc1`;",
216+
inventoryDatabase.getDatabaseName(), tableName));
214217
expected.add(
215218
new AddColumnEvent(
216219
tableId,
@@ -230,8 +233,8 @@ private List<Event> executeAlterAndProvideExpected(TableId tableId, Statement st
230233

231234
statement.execute(
232235
String.format(
233-
"ALTER TABLE `%s`.`products` DROP COLUMN `desc2`, CHANGE COLUMN `desc1` `desc1` VARCHAR(65) NULL DEFAULT NULL;",
234-
inventoryDatabase.getDatabaseName()));
236+
"ALTER TABLE `%s`.`%s` DROP COLUMN `desc2`, CHANGE COLUMN `desc1` `desc1` VARCHAR(65) NULL DEFAULT NULL;",
237+
inventoryDatabase.getDatabaseName(), tableName));
235238
expected.add(new DropColumnEvent(tableId, Collections.singletonList("desc2")));
236239
expected.add(
237240
new AlterColumnTypeEvent(
@@ -240,22 +243,22 @@ private List<Event> executeAlterAndProvideExpected(TableId tableId, Statement st
240243
// Only available in mysql 8.0
241244
statement.execute(
242245
String.format(
243-
"ALTER TABLE `%s`.`products` RENAME COLUMN `desc1` TO `desc3`;",
244-
inventoryDatabase.getDatabaseName()));
246+
"ALTER TABLE `%s`.`%s` RENAME COLUMN `desc1` TO `desc3`;",
247+
inventoryDatabase.getDatabaseName(), tableName));
245248
expected.add(new RenameColumnEvent(tableId, Collections.singletonMap("desc1", "desc3")));
246249

247250
statement.execute(
248251
String.format(
249-
"ALTER TABLE `%s`.`products` MODIFY COLUMN `DESC3` VARCHAR(255) NULL DEFAULT NULL;",
250-
inventoryDatabase.getDatabaseName()));
252+
"ALTER TABLE `%s`.`%s` MODIFY COLUMN `DESC3` VARCHAR(255) NULL DEFAULT NULL;",
253+
inventoryDatabase.getDatabaseName(), tableName));
251254
expected.add(
252255
new AlterColumnTypeEvent(
253256
tableId, Collections.singletonMap("desc3", DataTypes.VARCHAR(255))));
254257

255258
statement.execute(
256259
String.format(
257-
"ALTER TABLE `%s`.`products` DROP COLUMN `desc3`;",
258-
inventoryDatabase.getDatabaseName()));
260+
"ALTER TABLE `%s`.`%s` DROP COLUMN `desc3`;",
261+
inventoryDatabase.getDatabaseName(), tableName));
259262
expected.add(new DropColumnEvent(tableId, Collections.singletonList("desc3")));
260263

261264
// Should not catch SchemaChangeEvent of tables other than `products`

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/inventory.sql

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,26 @@ VALUES (default,"scooter","Small 2-wheel scooter",3.14),
3737
(default,"jacket","water resistent black wind breaker",0.1),
3838
(default,"spare tire","24 inch spare tire",22.2);
3939

40+
-- Create a table where all fields are in uppercase.
41+
CREATE TABLE uppercase_products (
42+
ID INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
43+
NAME VARCHAR(255) NOT NULL DEFAULT 'flink',
44+
DESCRIPTION VARCHAR(512),
45+
WEIGHT FLOAT(6)
46+
);
47+
ALTER TABLE uppercase_products AUTO_INCREMENT = 101;
48+
49+
INSERT INTO uppercase_products
50+
VALUES (default,"scooter","Small 2-wheel scooter",3.14),
51+
(default,"car battery","12V car battery",8.1),
52+
(default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8),
53+
(default,"hammer","12oz carpenter's hammer",0.75),
54+
(default,"hammer","14oz carpenter's hammer",0.875),
55+
(default,"hammer","16oz carpenter's hammer",1.0),
56+
(default,"rocks","box of assorted rocks",5.3),
57+
(default,"jacket","water resistent black wind breaker",0.1),
58+
(default,"spare tire","24 inch spare tire",22.2);
59+
4060
-- Create some customers ...
4161
CREATE TABLE customers (
4262
id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,

0 commit comments

Comments
 (0)