Skip to content

Commit 2a5153d

Browse files
committed
feat: add initial-default key
1 parent 23b679e commit 2a5153d

1 file changed

Lines changed: 3 additions & 2 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -237,12 +237,13 @@ private void applyAddColumnEventWithPosition(Table table, AddColumnEvent event)
237237
FlinkSchemaUtil.convert(
238238
DataTypeUtils.toFlinkDataType(addColumn.getType())
239239
.getLogicalType());
240-
updateSchema.addColumn(columnName, icebergType, columnComment);
241240
Literal<?> defaultValue =
242241
IcebergTypeUtils.parseDefaultValue(
243242
addColumn.getDefaultValueExpression(), addColumn.getType());
244243
if (defaultValue != null) {
245-
updateSchema.updateColumnDefault(columnName, defaultValue);
244+
updateSchema.addColumn(columnName, icebergType, columnComment, defaultValue);
245+
} else {
246+
updateSchema.addColumn(columnName, icebergType, columnComment);
246247
}
247248
switch (columnWithPosition.getPosition()) {
248249
case FIRST:

0 commit comments

Comments
 (0)