Skip to content

Commit 71443e7

Browse files
committed
fix: Skip default values for Iceberg format v2 tables
1 parent dfd837f commit 71443e7

2 files changed

Lines changed: 422 additions & 283 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838

3939
import org.apache.hadoop.conf.Configuration;
4040
import org.apache.iceberg.CatalogUtil;
41+
import org.apache.iceberg.HasTableOperations;
4142
import org.apache.iceberg.PartitionSpec;
4243
import org.apache.iceberg.Schema;
4344
import org.apache.iceberg.Table;
@@ -197,6 +198,9 @@ private void applyCreateTable(CreateTableEvent event) {
197198

198199
private void applyDefaultValues(
199200
Table table, org.apache.flink.cdc.common.schema.Schema cdcSchema) {
201+
if (getFormatVersion(table) < 3) {
202+
return;
203+
}
200204
UpdateSchema updateSchema = null;
201205
for (Column column : cdcSchema.getColumns()) {
202206
Literal<?> defaultValue =
@@ -240,8 +244,9 @@ private void applyAddColumnEventWithPosition(Table table, AddColumnEvent event)
240244
Literal<?> defaultValue =
241245
IcebergTypeUtils.parseDefaultValue(
242246
addColumn.getDefaultValueExpression(), addColumn.getType());
243-
if (defaultValue != null) {
247+
if (defaultValue != null && getFormatVersion(table) >= 3) {
244248
updateSchema.addColumn(columnName, icebergType, columnComment, defaultValue);
249+
updateSchema.updateColumnDefault(columnName, defaultValue);
245250
} else {
246251
updateSchema.addColumn(columnName, icebergType, columnComment);
247252
}
@@ -396,6 +401,13 @@ public Set<SchemaChangeEventType> getSupportedSchemaEvolutionTypes() {
396401
SchemaChangeEventType.ALTER_COLUMN_TYPE);
397402
}
398403

404+
private int getFormatVersion(Table table) {
405+
if (table instanceof HasTableOperations) {
406+
return ((HasTableOperations) table).operations().current().formatVersion();
407+
}
408+
return 2;
409+
}
410+
399411
@Override
400412
public void close() {
401413
catalog = null;

0 commit comments

Comments
 (0)