Skip to content

Commit 844da5c

Browse files
lvyanquanclaude
andcommitted
[FLINK-39171][oracle-cdc] Extract removeQuotes as utility method for Oracle column name handling
Extract the removeQuotes method to OracleSchemaUtils to centralize Oracle table/column name case handling logic. This method follows Oracle naming rules: names enclosed in double quotes retain their original case, otherwise they are converted to uppercase. This change improves code reuse between OracleSchemaUtils and BaseParserListener, making it easier to add configuration options for case handling in the future. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent cefcbe0 commit 844da5c

2 files changed

Lines changed: 33 additions & 20 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/parser/BaseParserListener.java

Lines changed: 3 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.flink.cdc.connectors.oracle.source.parser;
1919

20+
import org.apache.flink.cdc.connectors.oracle.utils.OracleSchemaUtils;
21+
2022
import io.debezium.ddl.parser.oracle.generated.PlSqlParser;
2123
import io.debezium.ddl.parser.oracle.generated.PlSqlParserBaseListener;
2224

@@ -76,21 +78,6 @@ public String getColumnName(final PlSqlParser.New_column_nameContext ctx) {
7678
* @return parsed table or column name from the supplied name argument
7779
*/
7880
private static String getTableOrColumnName(String name) {
79-
return removeQuotes(name, true);
80-
}
81-
82-
/**
83-
* Removes leading and trailing double quote characters from the provided string.
84-
*
85-
* @param text value to have double quotes removed
86-
* @param upperCaseIfNotQuoted control if returned string is upper-cased if not quoted
87-
* @return string that has had quotes removed
88-
*/
89-
@SuppressWarnings("SameParameterValue")
90-
private static String removeQuotes(String text, boolean upperCaseIfNotQuoted) {
91-
if (text != null && text.length() > 2 && text.startsWith("\"") && text.endsWith("\"")) {
92-
return text.substring(1, text.length() - 1);
93-
}
94-
return upperCaseIfNotQuoted ? text.toUpperCase() : text;
81+
return OracleSchemaUtils.removeQuotes(name);
9582
}
9683
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/utils/OracleSchemaUtils.java

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,25 @@ public class OracleSchemaUtils {
4646

4747
private static final Logger LOG = LoggerFactory.getLogger(OracleSchemaUtils.class);
4848

49+
/**
50+
* Removes leading and trailing double quote characters from the provided string.
51+
*
52+
* <p>Oracle table and column names are inherently stored in upper-case; however, if the objects
53+
* are created using double-quotes, the case of the object name is retained. This method will
54+
* adhere to those rules and will always return the name in upper-case unless the provided name
55+
* is double-quoted in which the returned value will have the double-quotes removed and case
56+
* retained.
57+
*
58+
* @param text value to have double quotes removed
59+
* @return string that has had quotes removed
60+
*/
61+
public static String removeQuotes(String text) {
62+
if (text != null && text.length() > 2 && text.startsWith("\"") && text.endsWith("\"")) {
63+
return text.substring(1, text.length() - 1);
64+
}
65+
return text.toUpperCase();
66+
}
67+
4968
public static List<TableId> listTables(
5069
OracleSourceConfig sourceConfig, @Nullable String dbName) {
5170
try (JdbcConnection jdbc = createOracleConnection(sourceConfig)) {
@@ -127,16 +146,23 @@ public static Schema toSchema(Table table) {
127146
.map(OracleSchemaUtils::toColumn)
128147
.collect(Collectors.toList());
129148

149+
List<String> primaryKeys =
150+
table.primaryKeyColumnNames().stream()
151+
.map(OracleSchemaUtils::removeQuotes)
152+
.collect(Collectors.toList());
153+
130154
return Schema.newBuilder()
131155
.setColumns(columns)
132-
.primaryKey(table.primaryKeyColumnNames())
156+
.primaryKey(primaryKeys)
133157
.comment(table.comment())
134158
.build();
135159
}
136160

137161
public static Column toColumn(io.debezium.relational.Column column) {
138162
return Column.physicalColumn(
139-
column.name(), OracleTypeUtils.fromDbzColumn(column), column.comment());
163+
removeQuotes(column.name()),
164+
OracleTypeUtils.fromDbzColumn(column),
165+
column.comment());
140166
}
141167

142168
public static io.debezium.relational.TableId toDbzTableId(TableId tableId) {
@@ -158,7 +184,7 @@ public static Schema getSchema(
158184
DataType dataType = OracleTypeUtils.fromDbzColumn(column);
159185
org.apache.flink.cdc.common.schema.Column cdcColumn =
160186
org.apache.flink.cdc.common.schema.Column.physicalColumn(
161-
column.name(), dataType);
187+
removeQuotes(column.name()), dataType);
162188
list.add(cdcColumn);
163189
}
164190
return Schema.newBuilder().setColumns(list).primaryKey(pks).build();
@@ -181,7 +207,7 @@ public static List<String> getTablePks(
181207
while (rs.next()) {
182208
String columnName;
183209
columnName = rs.getString(1);
184-
list.add(columnName);
210+
list.add(removeQuotes(columnName));
185211
}
186212
return list;
187213
});

0 commit comments

Comments
 (0)