Skip to content

Commit 3656a93

Browse files
zml1206lvyanquan
authored andcommitted
[FLINK-39196][pipeline-connector][oracle] Support change column nullable without data type (apache#4295)
(cherry picked from commit ec244f1)
1 parent d98372e commit 3656a93

3 files changed

Lines changed: 54 additions & 28 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/OracleEventDeserializer.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,18 @@
2424
import org.apache.flink.cdc.connectors.oracle.source.parser.OracleAntlrDdlParser;
2525
import org.apache.flink.cdc.connectors.oracle.table.OracleReadableMetaData;
2626
import org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema;
27+
import org.apache.flink.cdc.debezium.history.FlinkJsonTableChangeSerializer;
2728
import org.apache.flink.cdc.debezium.table.DebeziumChangelogMode;
2829
import org.apache.flink.table.data.TimestampData;
2930

3031
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
3132

3233
import io.debezium.data.Envelope;
3334
import io.debezium.data.geometry.Geometry;
35+
import io.debezium.document.Array;
3436
import io.debezium.relational.Tables;
3537
import io.debezium.relational.history.HistoryRecord;
38+
import io.debezium.relational.history.TableChanges;
3639
import org.apache.kafka.connect.data.Schema;
3740
import org.apache.kafka.connect.data.Struct;
3841
import org.apache.kafka.connect.source.SourceRecord;
@@ -58,6 +61,8 @@ public class OracleEventDeserializer extends DebeziumEventDeserializationSchema
5861
"io.debezium.connector.oracle.SchemaChangeKey";
5962

6063
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
64+
private static final FlinkJsonTableChangeSerializer TABLE_CHANGE_SERIALIZER =
65+
new FlinkJsonTableChangeSerializer();
6166

6267
private final boolean includeSchemaChanges;
6368

@@ -90,6 +95,25 @@ protected List<SchemaChangeEvent> deserializeSchemaChangeRecord(SourceRecord rec
9095
customParser = new OracleAntlrDdlParser(databaseName, schemaName);
9196
tables = new Tables();
9297
}
98+
Array tableChanges =
99+
historyRecord.document().getArray(HistoryRecord.Fields.TABLE_CHANGES);
100+
if (tableChanges != null) {
101+
TableChanges changes = TABLE_CHANGE_SERIALIZER.deserialize(tableChanges, true);
102+
for (TableChanges.TableChange tableChange : changes) {
103+
switch (tableChange.getType()) {
104+
case CREATE:
105+
case ALTER:
106+
tables.overwriteTable(tableChange.getTable());
107+
break;
108+
case DROP:
109+
// Keep current cache behavior for DROP: parser state relies on
110+
// overwrite and downstream events handle table drop separately.
111+
break;
112+
default:
113+
break;
114+
}
115+
}
116+
}
93117
String ddl =
94118
historyRecord.document().getString(HistoryRecord.Fields.DDL_STATEMENTS);
95119
customParser.setCurrentDatabase(databaseName);

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/main/java/org/apache/flink/cdc/connectors/oracle/source/parser/OracleAlterTableParserListener.java

Lines changed: 21 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import io.debezium.ddl.parser.oracle.generated.PlSqlParser;
2929
import io.debezium.relational.Column;
3030
import io.debezium.relational.ColumnEditor;
31-
import io.debezium.relational.Table;
3231
import io.debezium.relational.TableEditor;
3332
import io.debezium.relational.TableId;
3433
import io.debezium.text.ParsingException;
@@ -88,15 +87,7 @@ public OracleAlterTableParserListener(
8887
@Override
8988
public void enterAlter_table(PlSqlParser.Alter_tableContext ctx) {
9089
TableId tableId = new TableId(catalogName, schemaName, getTableName(ctx.tableview_name()));
91-
tableEditor = Table.editor().tableId(tableId);
92-
if (tableEditor == null) {
93-
throw new ParsingException(
94-
null,
95-
"Trying to alter table "
96-
+ tableId
97-
+ ", which does not exist. Query: "
98-
+ getText(ctx));
99-
}
90+
tableEditor = parser.databaseTables().editOrCreateTable(tableId);
10091
super.enterAlter_table(ctx);
10192
}
10293

@@ -174,24 +165,28 @@ public void enterModify_column_clauses(PlSqlParser.Modify_column_clausesContext
174165
columnEditors = new ArrayList<>(columns.size());
175166
for (PlSqlParser.Modify_col_propertiesContext column : columns) {
176167
String columnName = getColumnName(column.column_name());
177-
Column existingColumn = Column.editor().name(columnName).create();
178-
if (existingColumn != null) {
179-
ColumnEditor columnEditor = existingColumn.edit();
180-
columnDefinitionParserListener =
181-
new ColumnDefinitionParserListener(tableEditor, columnEditor);
182-
listeners.add(columnDefinitionParserListener);
183-
184-
columnEditors.add(columnEditor);
168+
Column existingColumn = tableEditor.create().columnWithName(columnName);
169+
ColumnEditor columnEditor;
170+
if (existingColumn == null) {
171+
if (column.datatype() == null) {
172+
throw new ParsingException(
173+
null,
174+
"Trying to change column "
175+
+ columnName
176+
+ " in "
177+
+ tableEditor.tableId()
178+
+ " table, but column schema is missing and "
179+
+ "MODIFY statement doesn't provide datatype. Query: "
180+
+ getText(ctx));
181+
}
182+
columnEditor = Column.editor().name(columnName);
185183
} else {
186-
throw new ParsingException(
187-
null,
188-
"trying to change column "
189-
+ columnName
190-
+ " in "
191-
+ tableEditor.tableId().toString()
192-
+ " table, which does not exist. Query: "
193-
+ getText(ctx));
184+
columnEditor = existingColumn.edit();
194185
}
186+
columnDefinitionParserListener =
187+
new ColumnDefinitionParserListener(tableEditor, columnEditor);
188+
listeners.add(columnDefinitionParserListener);
189+
columnEditors.add(columnEditor);
195190
}
196191
},
197192
tableEditor);

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-oracle/src/test/java/org/apache/flink/cdc/connectors/oracle/source/OraclePipelineITCase.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1727,13 +1727,20 @@ private List<Event> executeAlterAndProvideExpected(TableId tableId, Statement st
17271727

17281728
statement.execute(
17291729
String.format(
1730-
"ALTER TABLE %s.products ADD DESC1 VARCHAR(45) DEFAULT NULL", "debezium"));
1730+
"ALTER TABLE %s.products ADD DESC1 VARCHAR(45) DEFAULT 'N/A' NOT NULL",
1731+
"debezium"));
17311732
expected.add(
17321733
new AddColumnEvent(
17331734
tableId,
17341735
Collections.singletonList(
17351736
new AddColumnEvent.ColumnWithPosition(
1736-
Column.physicalColumn("DESC1", DataTypes.VARCHAR(45))))));
1737+
Column.physicalColumn(
1738+
"DESC1", DataTypes.VARCHAR(45).notNull())))));
1739+
1740+
statement.execute(String.format("ALTER TABLE %s.products MODIFY DESC1 NULL", "debezium"));
1741+
expected.add(
1742+
new AlterColumnTypeEvent(
1743+
tableId, Collections.singletonMap("DESC1", DataTypes.VARCHAR(45))));
17371744

17381745
statement.execute(
17391746
String.format(

0 commit comments

Comments
 (0)