Skip to content

Commit cefcbe0

Browse files
committed
[FLINK-39071][oracle-cdc] Fix Oracle pipeline connector wrong field case
1 parent c155638 commit cefcbe0

3 files changed

Lines changed: 14 additions & 14 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/utils/OracleSchemaUtils.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ public static Schema getSchema(
158158
DataType dataType = OracleTypeUtils.fromDbzColumn(column);
159159
org.apache.flink.cdc.common.schema.Column cdcColumn =
160160
org.apache.flink.cdc.common.schema.Column.physicalColumn(
161-
column.name().toLowerCase(Locale.ROOT), dataType);
161+
column.name(), dataType);
162162
list.add(cdcColumn);
163163
}
164164
return Schema.newBuilder().setColumns(list).primaryKey(pks).build();
@@ -181,7 +181,7 @@ public static List<String> getTablePks(
181181
while (rs.next()) {
182182
String columnName;
183183
columnName = rs.getString(1);
184-
list.add(columnName.toLowerCase(Locale.ROOT));
184+
list.add(columnName);
185185
}
186186
return list;
187187
});

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OraclePipelineITCase.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -981,10 +981,10 @@ public void testGeometryType() throws Exception {
981981
new CreateTableEvent(
982982
TableId.tableId("DEBEZIUM", "MYLAKE"),
983983
Schema.newBuilder()
984-
.physicalColumn("feature_id", DataTypes.BIGINT().notNull())
985-
.physicalColumn("name", DataTypes.VARCHAR(32))
986-
.physicalColumn("shape", DataTypes.STRING())
987-
.primaryKey(Arrays.asList("feature_id"))
984+
.physicalColumn("FEATURE_ID", DataTypes.BIGINT().notNull())
985+
.physicalColumn("NAME", DataTypes.VARCHAR(32))
986+
.physicalColumn("SHAPE", DataTypes.STRING())
987+
.primaryKey(Arrays.asList("FEATURE_ID"))
988988
.build());
989989

990990
RowType rowType =
@@ -1568,11 +1568,11 @@ private CreateTableEvent getProductsCreateTableEvent(TableId tableId) {
15681568
return new CreateTableEvent(
15691569
tableId,
15701570
Schema.newBuilder()
1571-
.physicalColumn("id", DataTypes.BIGINT().notNull())
1572-
.physicalColumn("name", DataTypes.VARCHAR(255).notNull())
1573-
.physicalColumn("description", DataTypes.VARCHAR(512))
1574-
.physicalColumn("weight", DataTypes.FLOAT())
1575-
.primaryKey(Collections.singletonList("id"))
1571+
.physicalColumn("ID", DataTypes.BIGINT().notNull())
1572+
.physicalColumn("NAME", DataTypes.VARCHAR(255).notNull())
1573+
.physicalColumn("DESCRIPTION", DataTypes.VARCHAR(512))
1574+
.physicalColumn("WEIGHT", DataTypes.FLOAT())
1575+
.primaryKey(Collections.singletonList("ID"))
15761576
.build());
15771577
}
15781578

flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/OracleE2eITCase.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ void testSyncWholeDatabase() throws Exception {
150150
Statement stat = conn.createStatement()) {
151151

152152
waitUntilSpecificEvent(
153-
"CreateTableEvent{tableId=DEBEZIUM.PRODUCTS, schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255) NOT NULL,`description` VARCHAR(512),`weight` FLOAT}, primaryKeys=id, options=()}");
153+
"CreateTableEvent{tableId=DEBEZIUM.PRODUCTS, schema=columns={`ID` BIGINT NOT NULL,`NAME` VARCHAR(255) NOT NULL,`DESCRIPTION` VARCHAR(512),`WEIGHT` FLOAT}, primaryKeys=ID, options=()}");
154154
waitUntilSpecificEvent(
155155
"DataChangeEvent{tableId=DEBEZIUM.PRODUCTS, before=[], after=[109, spare tire, 24 inch spare tire, 22.2], op=INSERT, meta=()}");
156156
waitUntilSpecificEvent(
@@ -171,7 +171,7 @@ void testSyncWholeDatabase() throws Exception {
171171
"DataChangeEvent{tableId=DEBEZIUM.PRODUCTS, before=[], after=[105, hammer, 14oz carpenters hammer, 0.875], op=INSERT, meta=()}");
172172

173173
waitUntilSpecificEvent(
174-
"CreateTableEvent{tableId=DEBEZIUM.CUSTOMERS, schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255) NOT NULL,`address` VARCHAR(1024),`phone_number` VARCHAR(512)}, primaryKeys=id, options=()}");
174+
"CreateTableEvent{tableId=DEBEZIUM.CUSTOMERS, schema=columns={`ID` BIGINT NOT NULL,`NAME` VARCHAR(255) NOT NULL,`ADDRESS` VARCHAR(1024),`PHONE_NUMBER` VARCHAR(512)}, primaryKeys=ID, options=()}");
175175
waitUntilSpecificEvent(
176176
"DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], after=[171798691841, user_1, Shanghai, 123567891234], op=INSERT, meta=()}");
177177
waitUntilSpecificEvent(
@@ -225,7 +225,7 @@ void testSyncWholeDatabase() throws Exception {
225225
waitUntilSpecificEvent(
226226
"DataChangeEvent{tableId=DEBEZIUM.PRODUCTS, before=[107, rocks, box of assorted rocks, 5.3], after=[107, rocks, box of assorted rocks, 5.1], op=UPDATE, meta=()}");
227227
waitUntilSpecificEvent(
228-
"CreateTableEvent{tableId=DEBEZIUM.CUSTOMERS_1, schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255) NOT NULL,`address` VARCHAR(1024),`phone_number` VARCHAR(512)}, primaryKeys=id, options=()}");
228+
"CreateTableEvent{tableId=DEBEZIUM.CUSTOMERS_1, schema=columns={`ID` BIGINT NOT NULL,`NAME` VARCHAR(255) NOT NULL,`ADDRESS` VARCHAR(1024),`PHONE_NUMBER` VARCHAR(512)}, primaryKeys=ID, options=()}");
229229
waitUntilSpecificEvent(
230230
"DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS_1, before=[], after=[171798691842, user_10, Shanghai, 123567891234], op=INSERT, meta=()}");
231231
waitUntilSpecificEvent(

0 commit comments

Comments
 (0)