Skip to content

Commit d98372e

Browse files
Hisoka-Xlvyanquanclaude
committed
[FLINK-39171][oracle-cdc] Fix Oracle pipeline connector wrong field case (apache#4292)
Co-authored-by: lvyanquan <lvyanquan.lyq@alibaba-inc.com> Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com> (cherry picked from commit 1f64e57)
1 parent c8e444b commit d98372e

4 files changed

Lines changed: 45 additions & 32 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/parser/BaseParserListener.java

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.flink.cdc.connectors.oracle.source.parser;
1919

20+
import org.apache.flink.cdc.connectors.oracle.utils.OracleSchemaUtils;
21+
2022
import io.debezium.ddl.parser.oracle.generated.PlSqlParser;
2123
import io.debezium.ddl.parser.oracle.generated.PlSqlParserBaseListener;
2224

@@ -76,21 +78,6 @@ public String getColumnName(final PlSqlParser.New_column_nameContext ctx) {
7678
* @return parsed table or column name from the supplied name argument
7779
*/
7880
private static String getTableOrColumnName(String name) {
79-
return removeQuotes(name, true);
80-
}
81-
82-
/**
83-
* Removes leading and trailing double quote characters from the provided string.
84-
*
85-
* @param text value to have double quotes removed
86-
* @param upperCaseIfNotQuoted control if returned string is upper-cased if not quoted
87-
* @return string that has had quotes removed
88-
*/
89-
@SuppressWarnings("SameParameterValue")
90-
private static String removeQuotes(String text, boolean upperCaseIfNotQuoted) {
91-
if (text != null && text.length() > 2 && text.startsWith("\"") && text.endsWith("\"")) {
92-
return text.substring(1, text.length() - 1);
93-
}
94-
return upperCaseIfNotQuoted ? text.toUpperCase() : text;
81+
return OracleSchemaUtils.removeQuotes(name);
9582
}
9683
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/utils/OracleSchemaUtils.java

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,25 @@ public class OracleSchemaUtils {
4646

4747
private static final Logger LOG = LoggerFactory.getLogger(OracleSchemaUtils.class);
4848

49+
/**
50+
* Removes leading and trailing double quote characters from the provided string.
51+
*
52+
* <p>Oracle table and column names are inherently stored in upper-case; however, if the objects
53+
* are created using double-quotes, the case of the object name is retained. This method will
54+
* adhere to those rules and will always return the name in upper-case unless the provided name
55+
* is double-quoted in which the returned value will have the double-quotes removed and case
56+
* retained.
57+
*
58+
* @param text value to have double quotes removed
59+
* @return string that has had quotes removed
60+
*/
61+
public static String removeQuotes(String text) {
62+
if (text != null && text.length() > 2 && text.startsWith("\"") && text.endsWith("\"")) {
63+
return text.substring(1, text.length() - 1);
64+
}
65+
return text.toUpperCase();
66+
}
67+
4968
public static List<TableId> listTables(
5069
OracleSourceConfig sourceConfig, @Nullable String dbName) {
5170
try (JdbcConnection jdbc = createOracleConnection(sourceConfig)) {
@@ -127,16 +146,23 @@ public static Schema toSchema(Table table) {
127146
.map(OracleSchemaUtils::toColumn)
128147
.collect(Collectors.toList());
129148

149+
List<String> primaryKeys =
150+
table.primaryKeyColumnNames().stream()
151+
.map(OracleSchemaUtils::removeQuotes)
152+
.collect(Collectors.toList());
153+
130154
return Schema.newBuilder()
131155
.setColumns(columns)
132-
.primaryKey(table.primaryKeyColumnNames())
156+
.primaryKey(primaryKeys)
133157
.comment(table.comment())
134158
.build();
135159
}
136160

137161
public static Column toColumn(io.debezium.relational.Column column) {
138162
return Column.physicalColumn(
139-
column.name(), OracleTypeUtils.fromDbzColumn(column), column.comment());
163+
removeQuotes(column.name()),
164+
OracleTypeUtils.fromDbzColumn(column),
165+
column.comment());
140166
}
141167

142168
public static io.debezium.relational.TableId toDbzTableId(TableId tableId) {
@@ -158,7 +184,7 @@ public static Schema getSchema(
158184
DataType dataType = OracleTypeUtils.fromDbzColumn(column);
159185
org.apache.flink.cdc.common.schema.Column cdcColumn =
160186
org.apache.flink.cdc.common.schema.Column.physicalColumn(
161-
column.name().toLowerCase(Locale.ROOT), dataType);
187+
removeQuotes(column.name()), dataType);
162188
list.add(cdcColumn);
163189
}
164190
return Schema.newBuilder().setColumns(list).primaryKey(pks).build();
@@ -181,7 +207,7 @@ public static List<String> getTablePks(
181207
while (rs.next()) {
182208
String columnName;
183209
columnName = rs.getString(1);
184-
list.add(columnName.toLowerCase(Locale.ROOT));
210+
list.add(removeQuotes(columnName));
185211
}
186212
return list;
187213
});

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OraclePipelineITCase.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -981,10 +981,10 @@ public void testGeometryType() throws Exception {
981981
new CreateTableEvent(
982982
TableId.tableId("DEBEZIUM", "MYLAKE"),
983983
Schema.newBuilder()
984-
.physicalColumn("feature_id", DataTypes.BIGINT().notNull())
985-
.physicalColumn("name", DataTypes.VARCHAR(32))
986-
.physicalColumn("shape", DataTypes.STRING())
987-
.primaryKey(Arrays.asList("feature_id"))
984+
.physicalColumn("FEATURE_ID", DataTypes.BIGINT().notNull())
985+
.physicalColumn("NAME", DataTypes.VARCHAR(32))
986+
.physicalColumn("SHAPE", DataTypes.STRING())
987+
.primaryKey(Arrays.asList("FEATURE_ID"))
988988
.build());
989989

990990
RowType rowType =
@@ -1568,11 +1568,11 @@ private CreateTableEvent getProductsCreateTableEvent(TableId tableId) {
15681568
return new CreateTableEvent(
15691569
tableId,
15701570
Schema.newBuilder()
1571-
.physicalColumn("id", DataTypes.BIGINT().notNull())
1572-
.physicalColumn("name", DataTypes.VARCHAR(255).notNull())
1573-
.physicalColumn("description", DataTypes.VARCHAR(512))
1574-
.physicalColumn("weight", DataTypes.FLOAT())
1575-
.primaryKey(Collections.singletonList("id"))
1571+
.physicalColumn("ID", DataTypes.BIGINT().notNull())
1572+
.physicalColumn("NAME", DataTypes.VARCHAR(255).notNull())
1573+
.physicalColumn("DESCRIPTION", DataTypes.VARCHAR(512))
1574+
.physicalColumn("WEIGHT", DataTypes.FLOAT())
1575+
.primaryKey(Collections.singletonList("ID"))
15761576
.build());
15771577
}
15781578

flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/OracleE2eITCase.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ void testSyncWholeDatabase() throws Exception {
150150
Statement stat = conn.createStatement()) {
151151

152152
waitUntilSpecificEvent(
153-
"CreateTableEvent{tableId=DEBEZIUM.PRODUCTS, schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255) NOT NULL,`description` VARCHAR(512),`weight` FLOAT}, primaryKeys=id, options=()}");
153+
"CreateTableEvent{tableId=DEBEZIUM.PRODUCTS, schema=columns={`ID` BIGINT NOT NULL,`NAME` VARCHAR(255) NOT NULL,`DESCRIPTION` VARCHAR(512),`WEIGHT` FLOAT}, primaryKeys=ID, options=()}");
154154
waitUntilSpecificEvent(
155155
"DataChangeEvent{tableId=DEBEZIUM.PRODUCTS, before=[], after=[109, spare tire, 24 inch spare tire, 22.2], op=INSERT, meta=()}");
156156
waitUntilSpecificEvent(
@@ -171,7 +171,7 @@ void testSyncWholeDatabase() throws Exception {
171171
"DataChangeEvent{tableId=DEBEZIUM.PRODUCTS, before=[], after=[105, hammer, 14oz carpenters hammer, 0.875], op=INSERT, meta=()}");
172172

173173
waitUntilSpecificEvent(
174-
"CreateTableEvent{tableId=DEBEZIUM.CUSTOMERS, schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255) NOT NULL,`address` VARCHAR(1024),`phone_number` VARCHAR(512)}, primaryKeys=id, options=()}");
174+
"CreateTableEvent{tableId=DEBEZIUM.CUSTOMERS, schema=columns={`ID` BIGINT NOT NULL,`NAME` VARCHAR(255) NOT NULL,`ADDRESS` VARCHAR(1024),`PHONE_NUMBER` VARCHAR(512)}, primaryKeys=ID, options=()}");
175175
waitUntilSpecificEvent(
176176
"DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS, before=[], after=[171798691841, user_1, Shanghai, 123567891234], op=INSERT, meta=()}");
177177
waitUntilSpecificEvent(
@@ -225,7 +225,7 @@ void testSyncWholeDatabase() throws Exception {
225225
waitUntilSpecificEvent(
226226
"DataChangeEvent{tableId=DEBEZIUM.PRODUCTS, before=[107, rocks, box of assorted rocks, 5.3], after=[107, rocks, box of assorted rocks, 5.1], op=UPDATE, meta=()}");
227227
waitUntilSpecificEvent(
228-
"CreateTableEvent{tableId=DEBEZIUM.CUSTOMERS_1, schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255) NOT NULL,`address` VARCHAR(1024),`phone_number` VARCHAR(512)}, primaryKeys=id, options=()}");
228+
"CreateTableEvent{tableId=DEBEZIUM.CUSTOMERS_1, schema=columns={`ID` BIGINT NOT NULL,`NAME` VARCHAR(255) NOT NULL,`ADDRESS` VARCHAR(1024),`PHONE_NUMBER` VARCHAR(512)}, primaryKeys=ID, options=()}");
229229
waitUntilSpecificEvent(
230230
"DataChangeEvent{tableId=DEBEZIUM.CUSTOMERS_1, before=[], after=[171798691842, user_10, Shanghai, 123567891234], op=INSERT, meta=()}");
231231
waitUntilSpecificEvent(

0 commit comments

Comments
 (0)