Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,18 @@
import org.apache.flink.cdc.connectors.oracle.source.parser.OracleAntlrDdlParser;
import org.apache.flink.cdc.connectors.oracle.table.OracleReadableMetaData;
import org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema;
import org.apache.flink.cdc.debezium.history.FlinkJsonTableChangeSerializer;
import org.apache.flink.cdc.debezium.table.DebeziumChangelogMode;
import org.apache.flink.table.data.TimestampData;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;

import io.debezium.data.Envelope;
import io.debezium.data.geometry.Geometry;
import io.debezium.document.Array;
import io.debezium.relational.Tables;
import io.debezium.relational.history.HistoryRecord;
import io.debezium.relational.history.TableChanges;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
Expand All @@ -58,6 +61,8 @@ public class OracleEventDeserializer extends DebeziumEventDeserializationSchema
"io.debezium.connector.oracle.SchemaChangeKey";

private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final FlinkJsonTableChangeSerializer TABLE_CHANGE_SERIALIZER =
new FlinkJsonTableChangeSerializer();

private final boolean includeSchemaChanges;

Expand Down Expand Up @@ -90,6 +95,25 @@ protected List<SchemaChangeEvent> deserializeSchemaChangeRecord(SourceRecord rec
customParser = new OracleAntlrDdlParser(databaseName, schemaName);
tables = new Tables();
}
Array tableChanges =
historyRecord.document().getArray(HistoryRecord.Fields.TABLE_CHANGES);
if (tableChanges != null) {
TableChanges changes = TABLE_CHANGE_SERIALIZER.deserialize(tableChanges, true);
for (TableChanges.TableChange tableChange : changes) {
switch (tableChange.getType()) {
case CREATE:
case ALTER:
tables.overwriteTable(tableChange.getTable());
break;
case DROP:
// Keep current cache behavior for DROP: parser state relies on
// overwrite and downstream events handle table drop separately.
break;
default:
break;
}
}
}
String ddl =
historyRecord.document().getString(HistoryRecord.Fields.DDL_STATEMENTS);
customParser.setCurrentDatabase(databaseName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import io.debezium.ddl.parser.oracle.generated.PlSqlParser;
import io.debezium.relational.Column;
import io.debezium.relational.ColumnEditor;
import io.debezium.relational.Table;
import io.debezium.relational.TableEditor;
import io.debezium.relational.TableId;
import io.debezium.text.ParsingException;
Expand Down Expand Up @@ -88,15 +87,7 @@ public OracleAlterTableParserListener(
@Override
public void enterAlter_table(PlSqlParser.Alter_tableContext ctx) {
TableId tableId = new TableId(catalogName, schemaName, getTableName(ctx.tableview_name()));
tableEditor = Table.editor().tableId(tableId);
if (tableEditor == null) {
throw new ParsingException(
null,
"Trying to alter table "
+ tableId
+ ", which does not exist. Query: "
+ getText(ctx));
}
tableEditor = parser.databaseTables().editOrCreateTable(tableId);
super.enterAlter_table(ctx);
}

Expand Down Expand Up @@ -174,24 +165,28 @@ public void enterModify_column_clauses(PlSqlParser.Modify_column_clausesContext
columnEditors = new ArrayList<>(columns.size());
for (PlSqlParser.Modify_col_propertiesContext column : columns) {
String columnName = getColumnName(column.column_name());
Column existingColumn = Column.editor().name(columnName).create();
if (existingColumn != null) {
ColumnEditor columnEditor = existingColumn.edit();
columnDefinitionParserListener =
new ColumnDefinitionParserListener(tableEditor, columnEditor);
listeners.add(columnDefinitionParserListener);

columnEditors.add(columnEditor);
Column existingColumn = tableEditor.create().columnWithName(columnName);
ColumnEditor columnEditor;
if (existingColumn == null) {
if (column.datatype() == null) {
throw new ParsingException(
null,
"Trying to change column "
+ columnName
+ " in "
+ tableEditor.tableId()
+ " table, but column schema is missing and "
+ "MODIFY statement doesn't provide datatype. Query: "
+ getText(ctx));
}
columnEditor = Column.editor().name(columnName);
} else {
throw new ParsingException(
null,
"trying to change column "
+ columnName
+ " in "
+ tableEditor.tableId().toString()
+ " table, which does not exist. Query: "
+ getText(ctx));
columnEditor = existingColumn.edit();
}
columnDefinitionParserListener =
new ColumnDefinitionParserListener(tableEditor, columnEditor);
listeners.add(columnDefinitionParserListener);
columnEditors.add(columnEditor);
}
},
tableEditor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1727,13 +1727,20 @@ private List<Event> executeAlterAndProvideExpected(TableId tableId, Statement st

statement.execute(
String.format(
"ALTER TABLE %s.products ADD DESC1 VARCHAR(45) DEFAULT NULL", "debezium"));
"ALTER TABLE %s.products ADD DESC1 VARCHAR(45) DEFAULT 'N/A' NOT NULL",
"debezium"));
expected.add(
new AddColumnEvent(
tableId,
Collections.singletonList(
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn("DESC1", DataTypes.VARCHAR(45))))));
Column.physicalColumn(
"DESC1", DataTypes.VARCHAR(45).notNull())))));

statement.execute(String.format("ALTER TABLE %s.products MODIFY DESC1 NULL", "DEBEZIUM"));
expected.add(
new AlterColumnTypeEvent(
tableId, Collections.singletonMap("DESC1", DataTypes.VARCHAR(45))));

statement.execute(
String.format(
Expand Down
Loading