From 8b6689f2cfbe667cff7086edc5cc9e219f051f5c Mon Sep 17 00:00:00 2001 From: suhwan-cheon Date: Mon, 9 Mar 2026 13:23:06 +0900 Subject: [PATCH 1/8] Support default column values in iceberg sink connector --- .../iceberg/sink/IcebergMetadataApplier.java | 41 ++- .../iceberg/sink/utils/IcebergTypeUtils.java | 67 ++++ .../sink/IcebergMetadataApplierTest.java | 321 ++++-------------- .../sink/utils/IcebergTypeUtilsTest.java | 139 ++++++++ 4 files changed, 302 insertions(+), 266 deletions(-) create mode 100644 flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtilsTest.java diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java index 24d01ffd0cb..2c5507056b7 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java @@ -46,6 +46,7 @@ import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.SupportsNamespaces; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.expressions.Literal; import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; @@ -178,7 +179,13 @@ private void applyCreateTable(CreateTableEvent event) { } PartitionSpec partitionSpec = generatePartitionSpec(icebergSchema, partitionColumns); if (!catalog.tableExists(tableIdentifier)) { - catalog.createTable(tableIdentifier, icebergSchema, partitionSpec, tableOptions); + Table table = + catalog.createTable( + tableIdentifier, icebergSchema, partitionSpec, tableOptions); + + // Step3: Apply default values if any columns have them. + applyDefaultValues(table, cdcSchema); + LOG.info( "Spend {} ms to create iceberg table {}", System.currentTimeMillis() - startTimestamp, @@ -189,6 +196,25 @@ private void applyCreateTable(CreateTableEvent event) { } } + private void applyDefaultValues( + Table table, org.apache.flink.cdc.common.schema.Schema cdcSchema) { + UpdateSchema updateSchema = null; + for (Column column : cdcSchema.getColumns()) { + Literal defaultValue = + IcebergTypeUtils.parseDefaultValue( + column.getDefaultValueExpression(), column.getType()); + if (defaultValue != null) { + if (updateSchema == null) { + updateSchema = table.updateSchema(); + } + updateSchema.updateColumnDefault(column.getName(), defaultValue); + } + } + if (updateSchema != null) { + updateSchema.commit(); + } + } + private void applyAddColumn(AddColumnEvent event) { TableIdentifier tableIdentifier = TableIdentifier.parse(event.tableId().identifier()); try { @@ -212,16 +238,20 @@ private void applyAddColumnEventWithPosition(Table table, AddColumnEvent event) FlinkSchemaUtil.convert( DataTypeUtils.toFlinkDataType(addColumn.getType()) .getLogicalType()); + updateSchema.addColumn(columnName, icebergType, columnComment); + Literal defaultValue = + IcebergTypeUtils.parseDefaultValue( + addColumn.getDefaultValueExpression(), addColumn.getType()); + if (defaultValue != null) { + updateSchema.updateColumnDefault(columnName, defaultValue); + } switch (columnWithPosition.getPosition()) { case FIRST: - updateSchema.addColumn(columnName, icebergType, columnComment); - table.updateSchema().moveFirst(columnName); + updateSchema.moveFirst(columnName); break; case LAST: - updateSchema.addColumn(columnName, icebergType, columnComment); break; case BEFORE: - updateSchema.addColumn(columnName, icebergType, columnComment); updateSchema.moveBefore( columnName, columnWithPosition.getExistedColumnName()); break; @@ -229,7 +259,6 @@ private void applyAddColumnEventWithPosition(Table table, AddColumnEvent event) checkNotNull( columnWithPosition.getExistedColumnName(), "Existing column name must be provided for AFTER position"); - updateSchema.addColumn(columnName, icebergType, columnComment); updateSchema.moveAfter( columnName, columnWithPosition.getExistedColumnName()); break; diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java index 102b4dbad26..49a174a2a34 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java @@ -28,8 +28,13 @@ import org.apache.flink.cdc.connectors.iceberg.sink.IcebergDataSink; import org.apache.flink.table.data.TimestampData; +import org.apache.iceberg.expressions.Literal; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; import java.time.ZoneId; import java.util.ArrayList; @@ -41,6 +46,8 @@ /** Util class for types in {@link IcebergDataSink}. */ public class IcebergTypeUtils { + private static final Logger LOG = LoggerFactory.getLogger(IcebergTypeUtils.class); + /** Convert column from Flink CDC to Iceberg format. */ public static Types.NestedField convertCdcColumnToIcebergField( int index, PhysicalColumn column) { @@ -53,6 +60,66 @@ public static Types.NestedField convertCdcColumnToIcebergField( column.getComment()); } + /** + * Parse a CDC default value expression string into an Iceberg {@link Literal}. + * + * @return the parsed Literal, or null if the expression is null or cannot be parsed for the + * given type. + */ + @Nullable + public static Literal parseDefaultValue( + @Nullable String defaultValueExpression, DataType cdcType) { + if (defaultValueExpression == null) { + return null; + } + try { + switch (cdcType.getTypeRoot()) { + case CHAR: + case VARCHAR: + return Literal.of(defaultValueExpression); + case BOOLEAN: + if ("true".equalsIgnoreCase(defaultValueExpression)) { + return Literal.of(true); + } else if ("false".equalsIgnoreCase(defaultValueExpression)) { + return Literal.of(false); + } else { + LOG.warn( + "Invalid boolean default value '{}', skipping default value.", + defaultValueExpression); + return null; + } + case TINYINT: + case SMALLINT: + case INTEGER: + return Literal.of(Integer.parseInt(defaultValueExpression)); + case BIGINT: + return Literal.of(Long.parseLong(defaultValueExpression)); + case FLOAT: + return Literal.of(Float.parseFloat(defaultValueExpression)); + case DOUBLE: + return Literal.of(Double.parseDouble(defaultValueExpression)); + case DECIMAL: + int scale = DataTypes.getScale(cdcType).orElse(0); + return Literal.of( + new java.math.BigDecimal(defaultValueExpression) + .setScale(scale, java.math.RoundingMode.HALF_UP)); + default: + LOG.warn( + "Unsupported default value type {} for expression '{}', skipping default value.", + cdcType.getTypeRoot(), + defaultValueExpression); + return null; + } + } catch (NumberFormatException e) { + LOG.warn( + "Failed to parse default value '{}' for type {}, skipping default value.", + defaultValueExpression, + cdcType.getTypeRoot(), + e); + return null; + } + } + /** * Convert data type from Flink CDC to Iceberg format, refer to .... diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplierTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplierTest.java index 6a491e21979..4a9742e7f18 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplierTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplierTest.java @@ -41,7 +41,6 @@ import org.testcontainers.lifecycle.Startables; import java.io.File; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -144,58 +143,49 @@ public void runSchemaChangeTest(Map catalogOptions) { .build()); icebergMetadataApplier.applySchemaChange(createTableEvent); Table table = catalog.loadTable(TableIdentifier.parse(defaultTableId)); - org.apache.iceberg.Schema schema = - new org.apache.iceberg.Schema( - 0, - Arrays.asList( - Types.NestedField.of( - 1, false, "id", Types.LongType.get(), "column for id"), - Types.NestedField.of( - 2, - false, - "name", - Types.StringType.get(), - "column for name"), - Types.NestedField.of( - 3, - true, - "tinyIntCol", - Types.IntegerType.get(), - "column for tinyIntCol"), - Types.NestedField.of( - 4, - true, - "description", - Types.StringType.get(), - "column for descriptions"), - Types.NestedField.of( - 5, - true, - "bool_column", - Types.BooleanType.get(), - "column for bool"), - Types.NestedField.of( - 6, - true, - "float_column", - Types.FloatType.get(), - "column for float"), - Types.NestedField.of( - 7, - true, - "double_column", - Types.DoubleType.get(), - "column for double"), - Types.NestedField.of( - 8, - true, - "decimal_column", - Types.DecimalType.of(10, 2), - "column for decimal")), - new HashSet<>(Collections.singletonList(1))); - assertThat(table.schema().sameSchema(schema)).isTrue(); - - // Add column. + // Verify schema structure (column names, types, nullability). + assertThat(table.schema().columns()).hasSize(8); + assertThat(table.schema().findField("id").type()).isInstanceOf(Types.LongType.class); + assertThat(table.schema().findField("id").isOptional()).isFalse(); + assertThat(table.schema().findField("name").type()).isInstanceOf(Types.StringType.class); + assertThat(table.schema().findField("name").isOptional()).isFalse(); + assertThat(table.schema().findField("tinyIntCol").type()) + .isInstanceOf(Types.IntegerType.class); + assertThat(table.schema().findField("description").type()) + .isInstanceOf(Types.StringType.class); + assertThat(table.schema().findField("bool_column").type()) + .isInstanceOf(Types.BooleanType.class); + assertThat(table.schema().findField("float_column").type()) + .isInstanceOf(Types.FloatType.class); + assertThat(table.schema().findField("double_column").type()) + .isInstanceOf(Types.DoubleType.class); + assertThat(table.schema().findField("decimal_column").type()) + .isEqualTo(Types.DecimalType.of(10, 2)); + assertThat(table.schema().identifierFieldIds()) + .isEqualTo(new HashSet<>(Collections.singletonList(1))); + + // Verify default values after create table. + // "id" has "AUTO_DECREMENT()" which is unparseable for BIGINT, so no default + assertThat(table.schema().findField("id").initialDefault()).isNull(); + assertThat(table.schema().findField("id").writeDefault()).isNull(); + // "name" has "John Smith" which is a valid string default + assertThat(table.schema().findField("name").writeDefault()).isEqualTo("John Smith"); + // "tinyIntCol" has "1" which maps to Integer + assertThat(table.schema().findField("tinyIntCol").writeDefault()).isEqualTo(1); + // "description" has "not important" which is a valid string default + assertThat(table.schema().findField("description").writeDefault()) + .isEqualTo("not important"); + // "bool_column" has "false" + assertThat(table.schema().findField("bool_column").writeDefault()).isEqualTo(false); + // "float_column" has "1.0" + assertThat(table.schema().findField("float_column").writeDefault()).isEqualTo(1.0f); + // "double_column" has "1.0" + assertThat(table.schema().findField("double_column").writeDefault()).isEqualTo(1.0d); + // "decimal_column" has "1.0" + assertThat(table.schema().findField("decimal_column").writeDefault()) + .isEqualTo(new java.math.BigDecimal("1.00")); + + // Add column with default value. AddColumnEvent addColumnEvent = new AddColumnEvent( tableId, @@ -205,177 +195,35 @@ public void runSchemaChangeTest(Map catalogOptions) { "newIntColumn", DataTypes.INT(), "comment for newIntColumn", - "not important")))); + "42")))); icebergMetadataApplier.applySchemaChange(addColumnEvent); table = catalog.loadTable(TableIdentifier.parse(defaultTableId)); - schema = - new org.apache.iceberg.Schema( - 0, - Arrays.asList( - Types.NestedField.of( - 1, false, "id", Types.LongType.get(), "column for id"), - Types.NestedField.of( - 2, - false, - "name", - Types.StringType.get(), - "column for name"), - Types.NestedField.of( - 3, - true, - "tinyIntCol", - Types.IntegerType.get(), - "column for tinyIntCol"), - Types.NestedField.of( - 4, - true, - "description", - Types.StringType.get(), - "column for descriptions"), - Types.NestedField.of( - 5, - true, - "bool_column", - Types.BooleanType.get(), - "column for bool"), - Types.NestedField.of( - 6, - true, - "float_column", - Types.FloatType.get(), - "column for float"), - Types.NestedField.of( - 7, - true, - "double_column", - Types.DoubleType.get(), - "column for double"), - Types.NestedField.of( - 8, - true, - "decimal_column", - Types.DecimalType.of(10, 2), - "column for decimal"), - Types.NestedField.of( - 9, - true, - "newIntColumn", - Types.IntegerType.get(), - "comment for newIntColumn")), - new HashSet<>(Collections.singletonList(1))); - assertThat(table.schema().sameSchema(schema)).isTrue(); + assertThat(table.schema().columns()).hasSize(9); + assertThat(table.schema().findField("newIntColumn").type()) + .isInstanceOf(Types.IntegerType.class); + assertThat(table.schema().findField("newIntColumn").doc()) + .isEqualTo("comment for newIntColumn"); + + // Verify default value for added column. + assertThat(table.schema().findField("newIntColumn").writeDefault()).isEqualTo(42); // Drop Column. DropColumnEvent dropColumnEvent = new DropColumnEvent(tableId, Collections.singletonList("description")); icebergMetadataApplier.applySchemaChange(dropColumnEvent); table = catalog.loadTable(TableIdentifier.parse(defaultTableId)); - schema = - new org.apache.iceberg.Schema( - 0, - Arrays.asList( - Types.NestedField.of( - 1, false, "id", Types.LongType.get(), "column for id"), - Types.NestedField.of( - 2, - false, - "name", - Types.StringType.get(), - "column for name"), - Types.NestedField.of( - 3, - true, - "tinyIntCol", - Types.IntegerType.get(), - "column for tinyIntCol"), - Types.NestedField.of( - 5, - true, - "bool_column", - Types.BooleanType.get(), - "column for bool"), - Types.NestedField.of( - 6, - true, - "float_column", - Types.FloatType.get(), - "column for float"), - Types.NestedField.of( - 7, - true, - "double_column", - Types.DoubleType.get(), - "column for double"), - Types.NestedField.of( - 8, - true, - "decimal_column", - Types.DecimalType.of(10, 2), - "column for decimal"), - Types.NestedField.of( - 9, - true, - "newIntColumn", - Types.IntegerType.get(), - "comment for newIntColumn")), - new HashSet<>(Collections.singletonList(1))); - assertThat(table.schema().sameSchema(schema)).isTrue(); + assertThat(table.schema().columns()).hasSize(8); + assertThat(table.schema().findField("description")).isNull(); // Rename Column. RenameColumnEvent renameColumnEvent = new RenameColumnEvent(tableId, ImmutableMap.of("newIntColumn", "renamedIntColumn")); icebergMetadataApplier.applySchemaChange(renameColumnEvent); table = catalog.loadTable(TableIdentifier.parse(defaultTableId)); - schema = - new org.apache.iceberg.Schema( - 0, - Arrays.asList( - Types.NestedField.of( - 1, false, "id", Types.LongType.get(), "column for id"), - Types.NestedField.of( - 2, - false, - "name", - Types.StringType.get(), - "column for name"), - Types.NestedField.of( - 3, - true, - "tinyIntCol", - Types.IntegerType.get(), - "column for tinyIntCol"), - Types.NestedField.of( - 5, - true, - "bool_column", - Types.BooleanType.get(), - "column for bool"), - Types.NestedField.of( - 6, - true, - "float_column", - Types.FloatType.get(), - "column for float"), - Types.NestedField.of( - 7, - true, - "double_column", - Types.DoubleType.get(), - "column for double"), - Types.NestedField.of( - 8, - true, - "decimal_column", - Types.DecimalType.of(10, 2), - "column for decimal"), - Types.NestedField.of( - 9, - true, - "renamedIntColumn", - Types.IntegerType.get(), - "comment for newIntColumn")), - new HashSet<>(Collections.singletonList(1))); - assertThat(table.schema().sameSchema(schema)).isTrue(); + assertThat(table.schema().columns()).hasSize(8); + assertThat(table.schema().findField("newIntColumn")).isNull(); + assertThat(table.schema().findField("renamedIntColumn").type()) + .isInstanceOf(Types.IntegerType.class); // Alter Column Type. AlterColumnTypeEvent alterColumnTypeEvent = @@ -383,55 +231,8 @@ public void runSchemaChangeTest(Map catalogOptions) { tableId, ImmutableMap.of("renamedIntColumn", DataTypes.BIGINT())); icebergMetadataApplier.applySchemaChange(alterColumnTypeEvent); table = catalog.loadTable(TableIdentifier.parse(defaultTableId)); - schema = - new org.apache.iceberg.Schema( - 0, - Arrays.asList( - Types.NestedField.of( - 1, false, "id", Types.LongType.get(), "column for id"), - Types.NestedField.of( - 2, - false, - "name", - Types.StringType.get(), - "column for name"), - Types.NestedField.of( - 3, - true, - "tinyIntCol", - Types.IntegerType.get(), - "column for tinyIntCol"), - Types.NestedField.of( - 5, - true, - "bool_column", - Types.BooleanType.get(), - "column for bool"), - Types.NestedField.of( - 6, - true, - "float_column", - Types.FloatType.get(), - "column for float"), - Types.NestedField.of( - 7, - true, - "double_column", - Types.DoubleType.get(), - "column for double"), - Types.NestedField.of( - 8, - true, - "decimal_column", - Types.DecimalType.of(10, 2), - "column for decimal"), - Types.NestedField.of( - 9, - true, - "renamedIntColumn", - Types.LongType.get(), - "comment for newIntColumn")), - new HashSet<>(Collections.singletonList(1))); - assertThat(table.schema().sameSchema(schema)).isTrue(); + assertThat(table.schema().columns()).hasSize(8); + assertThat(table.schema().findField("renamedIntColumn").type()) + .isInstanceOf(Types.LongType.class); } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtilsTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtilsTest.java new file mode 100644 index 00000000000..faafcc5a51c --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtilsTest.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.iceberg.sink.utils; + +import org.apache.flink.cdc.common.types.DataTypes; + +import org.apache.iceberg.expressions.Literal; +import org.junit.jupiter.api.Test; + +import java.math.BigDecimal; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link IcebergTypeUtils}. */ +public class IcebergTypeUtilsTest { + + @Test + public void testParseDefaultValueNull() { + assertThat(IcebergTypeUtils.parseDefaultValue(null, DataTypes.STRING())).isNull(); + } + + @Test + public void testParseDefaultValueString() { + Literal result = IcebergTypeUtils.parseDefaultValue("hello", DataTypes.STRING()); + assertThat(result).isNotNull(); + assertThat(result.value().toString()).isEqualTo("hello"); + } + + @Test + public void testParseDefaultValueVarchar() { + Literal result = + IcebergTypeUtils.parseDefaultValue("John Smith", DataTypes.VARCHAR(255)); + assertThat(result).isNotNull(); + assertThat(result.value().toString()).isEqualTo("John Smith"); + } + + @Test + public void testParseDefaultValueBoolean() { + Literal result = IcebergTypeUtils.parseDefaultValue("true", DataTypes.BOOLEAN()); + assertThat(result).isNotNull(); + assertThat(result.value()).isEqualTo(true); + + result = IcebergTypeUtils.parseDefaultValue("false", DataTypes.BOOLEAN()); + assertThat(result).isNotNull(); + assertThat(result.value()).isEqualTo(false); + } + + @Test + public void testParseDefaultValueBooleanInvalid() { + // Non-canonical boolean values should return null + assertThat(IcebergTypeUtils.parseDefaultValue("1", DataTypes.BOOLEAN())).isNull(); + assertThat(IcebergTypeUtils.parseDefaultValue("yes", DataTypes.BOOLEAN())).isNull(); + assertThat(IcebergTypeUtils.parseDefaultValue("garbage", DataTypes.BOOLEAN())).isNull(); + } + + @Test + public void testParseDefaultValueInteger() { + Literal result = IcebergTypeUtils.parseDefaultValue("42", DataTypes.INT()); + assertThat(result).isNotNull(); + assertThat(result.value()).isEqualTo(42); + } + + @Test + public void testParseDefaultValueTinyInt() { + Literal result = IcebergTypeUtils.parseDefaultValue("1", DataTypes.TINYINT()); + assertThat(result).isNotNull(); + assertThat(result.value()).isEqualTo(1); + } + + @Test + public void testParseDefaultValueSmallInt() { + Literal result = IcebergTypeUtils.parseDefaultValue("100", DataTypes.SMALLINT()); + assertThat(result).isNotNull(); + assertThat(result.value()).isEqualTo(100); + } + + @Test + public void testParseDefaultValueBigInt() { + Literal result = IcebergTypeUtils.parseDefaultValue("9999999999", DataTypes.BIGINT()); + assertThat(result).isNotNull(); + assertThat(result.value()).isEqualTo(9999999999L); + } + + @Test + public void testParseDefaultValueFloat() { + Literal result = IcebergTypeUtils.parseDefaultValue("1.5", DataTypes.FLOAT()); + assertThat(result).isNotNull(); + assertThat(result.value()).isEqualTo(1.5f); + } + + @Test + public void testParseDefaultValueDouble() { + Literal result = IcebergTypeUtils.parseDefaultValue("3.14", DataTypes.DOUBLE()); + assertThat(result).isNotNull(); + assertThat(result.value()).isEqualTo(3.14); + } + + @Test + public void testParseDefaultValueDecimal() { + Literal result = IcebergTypeUtils.parseDefaultValue("1.23", DataTypes.DECIMAL(10, 2)); + assertThat(result).isNotNull(); + assertThat(result.value()).isEqualTo(new BigDecimal("1.23")); + } + + @Test + public void testParseDefaultValueUnsupportedType() { + Literal result = IcebergTypeUtils.parseDefaultValue("data", DataTypes.BYTES()); + assertThat(result).isNull(); + } + + @Test + public void testParseDefaultValueInvalidNumber() { + Literal result = IcebergTypeUtils.parseDefaultValue("not_a_number", DataTypes.INT()); + assertThat(result).isNull(); + } + + @Test + public void testParseDefaultValueFunctionExpression() { + // Function expressions like AUTO_DECREMENT() cannot be parsed as numbers + Literal result = + IcebergTypeUtils.parseDefaultValue("AUTO_DECREMENT()", DataTypes.BIGINT()); + assertThat(result).isNull(); + } +} From 85e7fea82bcf0bb3e6abd56119830bd89a40758d Mon Sep 17 00:00:00 2001 From: suhwan-cheon Date: Mon, 9 Mar 2026 13:23:06 +0900 Subject: [PATCH 2/8] refactor test code --- .../iceberg/sink/IcebergMetadataApplier.java | 1 - .../sink/IcebergMetadataApplierTest.java | 390 +++++++++++++++--- 2 files changed, 332 insertions(+), 59 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java index 2c5507056b7..0f9aca6be06 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java @@ -183,7 +183,6 @@ private void applyCreateTable(CreateTableEvent event) { catalog.createTable( tableIdentifier, icebergSchema, partitionSpec, tableOptions); - // Step3: Apply default values if any columns have them. applyDefaultValues(table, cdcSchema); LOG.info( diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplierTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplierTest.java index 4a9742e7f18..e742ffb68c7 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplierTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplierTest.java @@ -41,6 +41,8 @@ import org.testcontainers.lifecycle.Startables; import java.io.File; +import java.math.BigDecimal; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -143,47 +145,72 @@ public void runSchemaChangeTest(Map catalogOptions) { .build()); icebergMetadataApplier.applySchemaChange(createTableEvent); Table table = catalog.loadTable(TableIdentifier.parse(defaultTableId)); - // Verify schema structure (column names, types, nullability). - assertThat(table.schema().columns()).hasSize(8); - assertThat(table.schema().findField("id").type()).isInstanceOf(Types.LongType.class); - assertThat(table.schema().findField("id").isOptional()).isFalse(); - assertThat(table.schema().findField("name").type()).isInstanceOf(Types.StringType.class); - assertThat(table.schema().findField("name").isOptional()).isFalse(); - assertThat(table.schema().findField("tinyIntCol").type()) - .isInstanceOf(Types.IntegerType.class); - assertThat(table.schema().findField("description").type()) - .isInstanceOf(Types.StringType.class); - assertThat(table.schema().findField("bool_column").type()) - .isInstanceOf(Types.BooleanType.class); - assertThat(table.schema().findField("float_column").type()) - .isInstanceOf(Types.FloatType.class); - assertThat(table.schema().findField("double_column").type()) - .isInstanceOf(Types.DoubleType.class); - assertThat(table.schema().findField("decimal_column").type()) - .isEqualTo(Types.DecimalType.of(10, 2)); - assertThat(table.schema().identifierFieldIds()) - .isEqualTo(new HashSet<>(Collections.singletonList(1))); - - // Verify default values after create table. - // "id" has "AUTO_DECREMENT()" which is unparseable for BIGINT, so no default - assertThat(table.schema().findField("id").initialDefault()).isNull(); - assertThat(table.schema().findField("id").writeDefault()).isNull(); - // "name" has "John Smith" which is a valid string default - assertThat(table.schema().findField("name").writeDefault()).isEqualTo("John Smith"); - // "tinyIntCol" has "1" which maps to Integer - assertThat(table.schema().findField("tinyIntCol").writeDefault()).isEqualTo(1); - // "description" has "not important" which is a valid string default - assertThat(table.schema().findField("description").writeDefault()) - .isEqualTo("not important"); - // "bool_column" has "false" - assertThat(table.schema().findField("bool_column").writeDefault()).isEqualTo(false); - // "float_column" has "1.0" - assertThat(table.schema().findField("float_column").writeDefault()).isEqualTo(1.0f); - // "double_column" has "1.0" - assertThat(table.schema().findField("double_column").writeDefault()).isEqualTo(1.0d); - // "decimal_column" has "1.0" - assertThat(table.schema().findField("decimal_column").writeDefault()) - .isEqualTo(new java.math.BigDecimal("1.00")); + org.apache.iceberg.Schema schema = + new org.apache.iceberg.Schema( + 0, + Arrays.asList( + // id: "AUTO_DECREMENT()" is unparseable, no default + Types.NestedField.of( + 1, false, "id", Types.LongType.get(), "column for id"), + // name: "John Smith" is a valid string default + Types.NestedField.builder() + .withId(2) + .asRequired() + .withName("name") + .ofType(Types.StringType.get()) + .withDoc("column for name") + .withWriteDefault("John Smith") + .build(), + Types.NestedField.builder() + .withId(3) + .asOptional() + .withName("tinyIntCol") + .ofType(Types.IntegerType.get()) + .withDoc("column for tinyIntCol") + .withWriteDefault(1) + .build(), + Types.NestedField.builder() + .withId(4) + .asOptional() + .withName("description") + .ofType(Types.StringType.get()) + .withDoc("column for descriptions") + .withWriteDefault("not important") + .build(), + Types.NestedField.builder() + .withId(5) + .asOptional() + .withName("bool_column") + .ofType(Types.BooleanType.get()) + .withDoc("column for bool") + .withWriteDefault(false) + .build(), + Types.NestedField.builder() + .withId(6) + .asOptional() + .withName("float_column") + .ofType(Types.FloatType.get()) + .withDoc("column for float") + .withWriteDefault(1.0f) + .build(), + Types.NestedField.builder() + .withId(7) + .asOptional() + .withName("double_column") + .ofType(Types.DoubleType.get()) + .withDoc("column for double") + .withWriteDefault(1.0d) + .build(), + Types.NestedField.builder() + .withId(8) + .asOptional() + .withName("decimal_column") + .ofType(Types.DecimalType.of(10, 2)) + .withDoc("column for decimal") + .withWriteDefault(new BigDecimal("1.00")) + .build()), + new HashSet<>(Collections.singletonList(1))); + assertThat(table.schema().sameSchema(schema)).isTrue(); // Add column with default value. AddColumnEvent addColumnEvent = @@ -198,32 +225,218 @@ public void runSchemaChangeTest(Map catalogOptions) { "42")))); icebergMetadataApplier.applySchemaChange(addColumnEvent); table = catalog.loadTable(TableIdentifier.parse(defaultTableId)); - assertThat(table.schema().columns()).hasSize(9); - assertThat(table.schema().findField("newIntColumn").type()) - .isInstanceOf(Types.IntegerType.class); - assertThat(table.schema().findField("newIntColumn").doc()) - .isEqualTo("comment for newIntColumn"); - - // Verify default value for added column. - assertThat(table.schema().findField("newIntColumn").writeDefault()).isEqualTo(42); + schema = + new org.apache.iceberg.Schema( + 0, + Arrays.asList( + Types.NestedField.of( + 1, false, "id", Types.LongType.get(), "column for id"), + Types.NestedField.builder() + .withId(2) + .asRequired() + .withName("name") + .ofType(Types.StringType.get()) + .withDoc("column for name") + .withWriteDefault("John Smith") + .build(), + Types.NestedField.builder() + .withId(3) + .asOptional() + .withName("tinyIntCol") + .ofType(Types.IntegerType.get()) + .withDoc("column for tinyIntCol") + .withWriteDefault(1) + .build(), + Types.NestedField.builder() + .withId(4) + .asOptional() + .withName("description") + .ofType(Types.StringType.get()) + .withDoc("column for descriptions") + .withWriteDefault("not important") + .build(), + Types.NestedField.builder() + .withId(5) + .asOptional() + .withName("bool_column") + .ofType(Types.BooleanType.get()) + .withDoc("column for bool") + .withWriteDefault(false) + .build(), + Types.NestedField.builder() + .withId(6) + .asOptional() + .withName("float_column") + .ofType(Types.FloatType.get()) + .withDoc("column for float") + .withWriteDefault(1.0f) + .build(), + Types.NestedField.builder() + .withId(7) + .asOptional() + .withName("double_column") + .ofType(Types.DoubleType.get()) + .withDoc("column for double") + .withWriteDefault(1.0d) + .build(), + Types.NestedField.builder() + .withId(8) + .asOptional() + .withName("decimal_column") + .ofType(Types.DecimalType.of(10, 2)) + .withDoc("column for decimal") + .withWriteDefault(new BigDecimal("1.00")) + .build(), + Types.NestedField.builder() + .withId(9) + .asOptional() + .withName("newIntColumn") + .ofType(Types.IntegerType.get()) + .withDoc("comment for newIntColumn") + .withWriteDefault(42) + .build()), + new HashSet<>(Collections.singletonList(1))); + assertThat(table.schema().sameSchema(schema)).isTrue(); // Drop Column. DropColumnEvent dropColumnEvent = new DropColumnEvent(tableId, Collections.singletonList("description")); icebergMetadataApplier.applySchemaChange(dropColumnEvent); table = catalog.loadTable(TableIdentifier.parse(defaultTableId)); - assertThat(table.schema().columns()).hasSize(8); - assertThat(table.schema().findField("description")).isNull(); + schema = + new org.apache.iceberg.Schema( + 0, + Arrays.asList( + Types.NestedField.of( + 1, false, "id", Types.LongType.get(), "column for id"), + Types.NestedField.builder() + .withId(2) + .asRequired() + .withName("name") + .ofType(Types.StringType.get()) + .withDoc("column for name") + .withWriteDefault("John Smith") + .build(), + Types.NestedField.builder() + .withId(3) + .asOptional() + .withName("tinyIntCol") + .ofType(Types.IntegerType.get()) + .withDoc("column for tinyIntCol") + .withWriteDefault(1) + .build(), + Types.NestedField.builder() + .withId(5) + .asOptional() + .withName("bool_column") + .ofType(Types.BooleanType.get()) + .withDoc("column for bool") + .withWriteDefault(false) + .build(), + Types.NestedField.builder() + .withId(6) + .asOptional() + .withName("float_column") + .ofType(Types.FloatType.get()) + .withDoc("column for float") + .withWriteDefault(1.0f) + .build(), + Types.NestedField.builder() + .withId(7) + .asOptional() + .withName("double_column") + .ofType(Types.DoubleType.get()) + .withDoc("column for double") + .withWriteDefault(1.0d) + .build(), + Types.NestedField.builder() + .withId(8) + .asOptional() + .withName("decimal_column") + .ofType(Types.DecimalType.of(10, 2)) + .withDoc("column for decimal") + .withWriteDefault(new BigDecimal("1.00")) + .build(), + Types.NestedField.builder() + .withId(9) + .asOptional() + .withName("newIntColumn") + .ofType(Types.IntegerType.get()) + .withDoc("comment for newIntColumn") + .withWriteDefault(42) + .build()), + new HashSet<>(Collections.singletonList(1))); + assertThat(table.schema().sameSchema(schema)).isTrue(); // Rename Column. RenameColumnEvent renameColumnEvent = new RenameColumnEvent(tableId, ImmutableMap.of("newIntColumn", "renamedIntColumn")); icebergMetadataApplier.applySchemaChange(renameColumnEvent); table = catalog.loadTable(TableIdentifier.parse(defaultTableId)); - assertThat(table.schema().columns()).hasSize(8); - assertThat(table.schema().findField("newIntColumn")).isNull(); - assertThat(table.schema().findField("renamedIntColumn").type()) - .isInstanceOf(Types.IntegerType.class); + schema = + new org.apache.iceberg.Schema( + 0, + Arrays.asList( + Types.NestedField.of( + 1, false, "id", Types.LongType.get(), "column for id"), + Types.NestedField.builder() + .withId(2) + .asRequired() + .withName("name") + .ofType(Types.StringType.get()) + .withDoc("column for name") + .withWriteDefault("John Smith") + .build(), + Types.NestedField.builder() + .withId(3) + .asOptional() + .withName("tinyIntCol") + .ofType(Types.IntegerType.get()) + .withDoc("column for tinyIntCol") + .withWriteDefault(1) + .build(), + Types.NestedField.builder() + .withId(5) + .asOptional() + .withName("bool_column") + .ofType(Types.BooleanType.get()) + .withDoc("column for bool") + .withWriteDefault(false) + .build(), + Types.NestedField.builder() + .withId(6) + .asOptional() + .withName("float_column") + .ofType(Types.FloatType.get()) + .withDoc("column for float") + .withWriteDefault(1.0f) + .build(), + Types.NestedField.builder() + .withId(7) + .asOptional() + .withName("double_column") + .ofType(Types.DoubleType.get()) + .withDoc("column for double") + .withWriteDefault(1.0d) + .build(), + Types.NestedField.builder() + .withId(8) + .asOptional() + .withName("decimal_column") + .ofType(Types.DecimalType.of(10, 2)) + .withDoc("column for decimal") + .withWriteDefault(new BigDecimal("1.00")) + .build(), + Types.NestedField.builder() + .withId(9) + .asOptional() + .withName("renamedIntColumn") + .ofType(Types.IntegerType.get()) + .withDoc("comment for newIntColumn") + .withWriteDefault(42) + .build()), + new HashSet<>(Collections.singletonList(1))); + assertThat(table.schema().sameSchema(schema)).isTrue(); // Alter Column Type. AlterColumnTypeEvent alterColumnTypeEvent = @@ -231,8 +444,69 @@ public void runSchemaChangeTest(Map catalogOptions) { tableId, ImmutableMap.of("renamedIntColumn", DataTypes.BIGINT())); icebergMetadataApplier.applySchemaChange(alterColumnTypeEvent); table = catalog.loadTable(TableIdentifier.parse(defaultTableId)); - assertThat(table.schema().columns()).hasSize(8); - assertThat(table.schema().findField("renamedIntColumn").type()) - .isInstanceOf(Types.LongType.class); + schema = + new org.apache.iceberg.Schema( + 0, + Arrays.asList( + Types.NestedField.of( + 1, false, "id", Types.LongType.get(), "column for id"), + Types.NestedField.builder() + .withId(2) + .asRequired() + .withName("name") + .ofType(Types.StringType.get()) + .withDoc("column for name") + .withWriteDefault("John Smith") + .build(), + Types.NestedField.builder() + .withId(3) + .asOptional() + .withName("tinyIntCol") + .ofType(Types.IntegerType.get()) + .withDoc("column for tinyIntCol") + .withWriteDefault(1) + .build(), + Types.NestedField.builder() + .withId(5) + .asOptional() + .withName("bool_column") + .ofType(Types.BooleanType.get()) + .withDoc("column for bool") + .withWriteDefault(false) + .build(), + Types.NestedField.builder() + .withId(6) + .asOptional() + .withName("float_column") + .ofType(Types.FloatType.get()) + .withDoc("column for float") + .withWriteDefault(1.0f) + .build(), + Types.NestedField.builder() + .withId(7) + .asOptional() + .withName("double_column") + .ofType(Types.DoubleType.get()) + .withDoc("column for double") + .withWriteDefault(1.0d) + .build(), + Types.NestedField.builder() + .withId(8) + .asOptional() + .withName("decimal_column") + .ofType(Types.DecimalType.of(10, 2)) + .withDoc("column for decimal") + .withWriteDefault(new BigDecimal("1.00")) + .build(), + Types.NestedField.builder() + .withId(9) + .asOptional() + .withName("renamedIntColumn") + .ofType(Types.LongType.get()) + .withDoc("comment for newIntColumn") + .withWriteDefault(42L) + .build()), + new HashSet<>(Collections.singletonList(1))); + assertThat(table.schema().sameSchema(schema)).isTrue(); } } From bb2ffa251c055fc701e13a3d5bfa1f98acfca909 Mon Sep 17 00:00:00 2001 From: suhwan-cheon Date: Mon, 9 Mar 2026 13:23:06 +0900 Subject: [PATCH 3/8] change deprecated functions --- .../sink/IcebergMetadataApplierTest.java | 120 +++++++++++------- 1 file changed, 72 insertions(+), 48 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplierTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplierTest.java index e742ffb68c7..c6b8301a660 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplierTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplierTest.java @@ -35,6 +35,7 @@ import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.expressions.Literal; import org.apache.iceberg.types.Types; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -149,17 +150,20 @@ public void runSchemaChangeTest(Map catalogOptions) { new org.apache.iceberg.Schema( 0, Arrays.asList( - // id: "AUTO_DECREMENT()" is unparseable, no default - Types.NestedField.of( - 1, false, "id", Types.LongType.get(), "column for id"), - // name: "John Smith" is a valid string default + Types.NestedField.builder() + .withId(1) + .asRequired() + .withName("id") + .ofType(Types.LongType.get()) + .withDoc("column for id") + .build(), Types.NestedField.builder() .withId(2) .asRequired() .withName("name") .ofType(Types.StringType.get()) .withDoc("column for name") - .withWriteDefault("John Smith") + .withWriteDefault(Literal.of("John Smith")) .build(), Types.NestedField.builder() .withId(3) @@ -167,7 +171,7 @@ public void runSchemaChangeTest(Map catalogOptions) { .withName("tinyIntCol") .ofType(Types.IntegerType.get()) .withDoc("column for tinyIntCol") - .withWriteDefault(1) + .withWriteDefault(Literal.of(1)) .build(), Types.NestedField.builder() .withId(4) @@ -175,7 +179,7 @@ public void runSchemaChangeTest(Map catalogOptions) { .withName("description") .ofType(Types.StringType.get()) .withDoc("column for descriptions") - .withWriteDefault("not important") + .withWriteDefault(Literal.of("not important")) .build(), Types.NestedField.builder() .withId(5) @@ -183,7 +187,7 @@ public void runSchemaChangeTest(Map catalogOptions) { .withName("bool_column") .ofType(Types.BooleanType.get()) .withDoc("column for bool") - .withWriteDefault(false) + .withWriteDefault(Literal.of(false)) .build(), Types.NestedField.builder() .withId(6) @@ -191,7 +195,7 @@ public void runSchemaChangeTest(Map catalogOptions) { .withName("float_column") .ofType(Types.FloatType.get()) .withDoc("column for float") - .withWriteDefault(1.0f) + .withWriteDefault(Literal.of(1.0f)) .build(), Types.NestedField.builder() .withId(7) @@ -199,7 +203,7 @@ public void runSchemaChangeTest(Map catalogOptions) { .withName("double_column") .ofType(Types.DoubleType.get()) .withDoc("column for double") - .withWriteDefault(1.0d) + .withWriteDefault(Literal.of(1.0d)) .build(), Types.NestedField.builder() .withId(8) @@ -207,7 +211,7 @@ public void runSchemaChangeTest(Map catalogOptions) { .withName("decimal_column") .ofType(Types.DecimalType.of(10, 2)) .withDoc("column for decimal") - .withWriteDefault(new BigDecimal("1.00")) + .withWriteDefault(Literal.of(new BigDecimal("1.00"))) .build()), new HashSet<>(Collections.singletonList(1))); assertThat(table.schema().sameSchema(schema)).isTrue(); @@ -229,15 +233,20 @@ public void runSchemaChangeTest(Map catalogOptions) { new org.apache.iceberg.Schema( 0, Arrays.asList( - Types.NestedField.of( - 1, false, "id", Types.LongType.get(), "column for id"), + Types.NestedField.builder() + .withId(1) + .asRequired() + .withName("id") + .ofType(Types.LongType.get()) + .withDoc("column for id") + .build(), Types.NestedField.builder() .withId(2) .asRequired() .withName("name") .ofType(Types.StringType.get()) .withDoc("column for name") - .withWriteDefault("John Smith") + .withWriteDefault(Literal.of("John Smith")) .build(), Types.NestedField.builder() .withId(3) @@ -245,7 +254,7 @@ public void runSchemaChangeTest(Map catalogOptions) { .withName("tinyIntCol") .ofType(Types.IntegerType.get()) .withDoc("column for tinyIntCol") - .withWriteDefault(1) + .withWriteDefault(Literal.of(1)) .build(), Types.NestedField.builder() .withId(4) @@ -253,7 +262,7 @@ public void runSchemaChangeTest(Map catalogOptions) { .withName("description") .ofType(Types.StringType.get()) .withDoc("column for descriptions") - .withWriteDefault("not important") + .withWriteDefault(Literal.of("not important")) .build(), Types.NestedField.builder() .withId(5) @@ -261,7 +270,7 @@ public void runSchemaChangeTest(Map catalogOptions) { .withName("bool_column") .ofType(Types.BooleanType.get()) .withDoc("column for bool") - .withWriteDefault(false) + .withWriteDefault(Literal.of(false)) .build(), Types.NestedField.builder() .withId(6) @@ -269,7 +278,7 @@ public void runSchemaChangeTest(Map catalogOptions) { .withName("float_column") .ofType(Types.FloatType.get()) .withDoc("column for float") - .withWriteDefault(1.0f) + .withWriteDefault(Literal.of(1.0f)) .build(), Types.NestedField.builder() .withId(7) @@ -277,7 +286,7 @@ public void runSchemaChangeTest(Map catalogOptions) { .withName("double_column") .ofType(Types.DoubleType.get()) .withDoc("column for double") - .withWriteDefault(1.0d) + .withWriteDefault(Literal.of(1.0d)) .build(), Types.NestedField.builder() .withId(8) @@ -285,7 +294,7 @@ public void runSchemaChangeTest(Map catalogOptions) { .withName("decimal_column") .ofType(Types.DecimalType.of(10, 2)) .withDoc("column for decimal") - .withWriteDefault(new BigDecimal("1.00")) + .withWriteDefault(Literal.of(new BigDecimal("1.00"))) .build(), Types.NestedField.builder() .withId(9) @@ -293,7 +302,7 @@ public void runSchemaChangeTest(Map catalogOptions) { .withName("newIntColumn") .ofType(Types.IntegerType.get()) .withDoc("comment for newIntColumn") - .withWriteDefault(42) + .withWriteDefault(Literal.of(42)) .build()), new HashSet<>(Collections.singletonList(1))); assertThat(table.schema().sameSchema(schema)).isTrue(); @@ -307,15 +316,20 @@ public void runSchemaChangeTest(Map catalogOptions) { new org.apache.iceberg.Schema( 0, Arrays.asList( - Types.NestedField.of( - 1, false, "id", Types.LongType.get(), "column for id"), + Types.NestedField.builder() + .withId(1) + .asRequired() + .withName("id") + .ofType(Types.LongType.get()) + .withDoc("column for id") + .build(), Types.NestedField.builder() .withId(2) .asRequired() .withName("name") .ofType(Types.StringType.get()) .withDoc("column for name") - .withWriteDefault("John Smith") + .withWriteDefault(Literal.of("John Smith")) .build(), Types.NestedField.builder() .withId(3) @@ -323,7 +337,7 @@ public void runSchemaChangeTest(Map catalogOptions) { .withName("tinyIntCol") .ofType(Types.IntegerType.get()) .withDoc("column for tinyIntCol") - .withWriteDefault(1) + .withWriteDefault(Literal.of(1)) .build(), Types.NestedField.builder() .withId(5) @@ -331,7 +345,7 @@ public void runSchemaChangeTest(Map catalogOptions) { .withName("bool_column") .ofType(Types.BooleanType.get()) .withDoc("column for bool") - .withWriteDefault(false) + .withWriteDefault(Literal.of(false)) .build(), Types.NestedField.builder() .withId(6) @@ -339,7 +353,7 @@ public void runSchemaChangeTest(Map catalogOptions) { .withName("float_column") .ofType(Types.FloatType.get()) .withDoc("column for float") - .withWriteDefault(1.0f) + .withWriteDefault(Literal.of(1.0f)) .build(), Types.NestedField.builder() .withId(7) @@ -347,7 +361,7 @@ public void runSchemaChangeTest(Map catalogOptions) { .withName("double_column") .ofType(Types.DoubleType.get()) .withDoc("column for double") - .withWriteDefault(1.0d) + .withWriteDefault(Literal.of(1.0d)) .build(), Types.NestedField.builder() .withId(8) @@ -355,7 +369,7 @@ public void runSchemaChangeTest(Map catalogOptions) { .withName("decimal_column") .ofType(Types.DecimalType.of(10, 2)) .withDoc("column for decimal") - .withWriteDefault(new BigDecimal("1.00")) + .withWriteDefault(Literal.of(new BigDecimal("1.00"))) .build(), Types.NestedField.builder() .withId(9) @@ -363,7 +377,7 @@ public void runSchemaChangeTest(Map catalogOptions) { .withName("newIntColumn") .ofType(Types.IntegerType.get()) .withDoc("comment for newIntColumn") - .withWriteDefault(42) + .withWriteDefault(Literal.of(42)) .build()), new HashSet<>(Collections.singletonList(1))); assertThat(table.schema().sameSchema(schema)).isTrue(); @@ -377,15 +391,20 @@ public void runSchemaChangeTest(Map catalogOptions) { new org.apache.iceberg.Schema( 0, Arrays.asList( - Types.NestedField.of( - 1, false, "id", Types.LongType.get(), "column for id"), + Types.NestedField.builder() + .withId(1) + .asRequired() + .withName("id") + .ofType(Types.LongType.get()) + .withDoc("column for id") + .build(), Types.NestedField.builder() .withId(2) .asRequired() .withName("name") .ofType(Types.StringType.get()) .withDoc("column for name") - .withWriteDefault("John Smith") + .withWriteDefault(Literal.of("John Smith")) .build(), Types.NestedField.builder() .withId(3) @@ -393,7 +412,7 @@ public void runSchemaChangeTest(Map catalogOptions) { .withName("tinyIntCol") .ofType(Types.IntegerType.get()) .withDoc("column for tinyIntCol") - .withWriteDefault(1) + .withWriteDefault(Literal.of(1)) .build(), Types.NestedField.builder() .withId(5) @@ -401,7 +420,7 @@ public void runSchemaChangeTest(Map catalogOptions) { .withName("bool_column") .ofType(Types.BooleanType.get()) .withDoc("column for bool") - .withWriteDefault(false) + .withWriteDefault(Literal.of(false)) .build(), Types.NestedField.builder() .withId(6) @@ -409,7 +428,7 @@ public void runSchemaChangeTest(Map catalogOptions) { .withName("float_column") .ofType(Types.FloatType.get()) .withDoc("column for float") - .withWriteDefault(1.0f) + .withWriteDefault(Literal.of(1.0f)) .build(), Types.NestedField.builder() .withId(7) @@ -417,7 +436,7 @@ public void runSchemaChangeTest(Map catalogOptions) { .withName("double_column") .ofType(Types.DoubleType.get()) .withDoc("column for double") - .withWriteDefault(1.0d) + .withWriteDefault(Literal.of(1.0d)) .build(), Types.NestedField.builder() .withId(8) @@ -425,7 +444,7 @@ public void runSchemaChangeTest(Map catalogOptions) { .withName("decimal_column") .ofType(Types.DecimalType.of(10, 2)) .withDoc("column for decimal") - .withWriteDefault(new BigDecimal("1.00")) + .withWriteDefault(Literal.of(new BigDecimal("1.00"))) .build(), Types.NestedField.builder() .withId(9) @@ -433,7 +452,7 @@ public void runSchemaChangeTest(Map catalogOptions) { .withName("renamedIntColumn") .ofType(Types.IntegerType.get()) .withDoc("comment for newIntColumn") - .withWriteDefault(42) + .withWriteDefault(Literal.of(42)) .build()), new HashSet<>(Collections.singletonList(1))); assertThat(table.schema().sameSchema(schema)).isTrue(); @@ -448,15 +467,20 @@ public void runSchemaChangeTest(Map catalogOptions) { new org.apache.iceberg.Schema( 0, Arrays.asList( - Types.NestedField.of( - 1, false, "id", Types.LongType.get(), "column for id"), + Types.NestedField.builder() + .withId(1) + .asRequired() + .withName("id") + .ofType(Types.LongType.get()) + .withDoc("column for id") + .build(), Types.NestedField.builder() .withId(2) .asRequired() .withName("name") .ofType(Types.StringType.get()) .withDoc("column for name") - .withWriteDefault("John Smith") + .withWriteDefault(Literal.of("John Smith")) .build(), Types.NestedField.builder() .withId(3) @@ -464,7 +488,7 @@ public void runSchemaChangeTest(Map catalogOptions) { .withName("tinyIntCol") .ofType(Types.IntegerType.get()) .withDoc("column for tinyIntCol") - .withWriteDefault(1) + .withWriteDefault(Literal.of(1)) .build(), Types.NestedField.builder() .withId(5) @@ -472,7 +496,7 @@ public void runSchemaChangeTest(Map catalogOptions) { .withName("bool_column") .ofType(Types.BooleanType.get()) .withDoc("column for bool") - .withWriteDefault(false) + .withWriteDefault(Literal.of(false)) .build(), Types.NestedField.builder() .withId(6) @@ -480,7 +504,7 @@ public void runSchemaChangeTest(Map catalogOptions) { .withName("float_column") .ofType(Types.FloatType.get()) .withDoc("column for float") - .withWriteDefault(1.0f) + .withWriteDefault(Literal.of(1.0f)) .build(), Types.NestedField.builder() .withId(7) @@ -488,7 +512,7 @@ public void runSchemaChangeTest(Map catalogOptions) { .withName("double_column") .ofType(Types.DoubleType.get()) .withDoc("column for double") - .withWriteDefault(1.0d) + .withWriteDefault(Literal.of(1.0d)) .build(), Types.NestedField.builder() .withId(8) @@ -496,7 +520,7 @@ public void runSchemaChangeTest(Map catalogOptions) { .withName("decimal_column") .ofType(Types.DecimalType.of(10, 2)) .withDoc("column for decimal") - .withWriteDefault(new BigDecimal("1.00")) + .withWriteDefault(Literal.of(new BigDecimal("1.00"))) .build(), Types.NestedField.builder() .withId(9) @@ -504,7 +528,7 @@ public void runSchemaChangeTest(Map catalogOptions) { .withName("renamedIntColumn") .ofType(Types.LongType.get()) .withDoc("comment for newIntColumn") - .withWriteDefault(42L) + .withWriteDefault(Literal.of(42L)) .build()), new HashSet<>(Collections.singletonList(1))); assertThat(table.schema().sameSchema(schema)).isTrue(); From 8ea6a310bd8b513b0bbabc05cd1ea9fb675896f3 Mon Sep 17 00:00:00 2001 From: suhwan-cheon Date: Mon, 9 Mar 2026 13:23:06 +0900 Subject: [PATCH 4/8] Add does not contain default value schema --- .../sink/IcebergMetadataApplierTest.java | 64 +++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplierTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplierTest.java index c6b8301a660..fcda1e86b5e 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplierTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplierTest.java @@ -216,6 +216,70 @@ public void runSchemaChangeTest(Map catalogOptions) { new HashSet<>(Collections.singletonList(1))); assertThat(table.schema().sameSchema(schema)).isTrue(); + // Verify that schema without default values is NOT the same. + org.apache.iceberg.Schema schemaWithoutDefaults = + new org.apache.iceberg.Schema( + 0, + Arrays.asList( + Types.NestedField.builder() + .withId(1) + .asRequired() + .withName("id") + .ofType(Types.LongType.get()) + .withDoc("column for id") + .build(), + Types.NestedField.builder() + .withId(2) + .asRequired() + .withName("name") + .ofType(Types.StringType.get()) + .withDoc("column for name") + .build(), + Types.NestedField.builder() + .withId(3) + .asOptional() + .withName("tinyIntCol") + .ofType(Types.IntegerType.get()) + .withDoc("column for tinyIntCol") + .build(), + Types.NestedField.builder() + .withId(4) + .asOptional() + .withName("description") + .ofType(Types.StringType.get()) + .withDoc("column for descriptions") + .build(), + Types.NestedField.builder() + .withId(5) + .asOptional() + .withName("bool_column") + .ofType(Types.BooleanType.get()) + .withDoc("column for bool") + .build(), + Types.NestedField.builder() + .withId(6) + .asOptional() + .withName("float_column") + .ofType(Types.FloatType.get()) + .withDoc("column for float") + .build(), + Types.NestedField.builder() + .withId(7) + .asOptional() + .withName("double_column") + .ofType(Types.DoubleType.get()) + .withDoc("column for double") + .build(), + Types.NestedField.builder() + .withId(8) + .asOptional() + .withName("decimal_column") + .ofType(Types.DecimalType.of(10, 2)) + .withDoc("column for decimal") + .build()), + new HashSet<>(Collections.singletonList(1))); + assertThat(table.schema().sameSchema(schemaWithoutDefaults)).isFalse(); + // Add column with default value. AddColumnEvent addColumnEvent = new AddColumnEvent( From 88b49ed76cb483d497796b9fff329671dba42c36 Mon Sep 17 00:00:00 2001 From: suhwan-cheon Date: Mon, 9 Mar 2026 13:23:06 +0900 Subject: [PATCH 5/8] feat: add initial-default key --- .../cdc/connectors/iceberg/sink/IcebergMetadataApplier.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java index 0f9aca6be06..dac596dd8b8 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java @@ -237,12 +237,13 @@ private void applyAddColumnEventWithPosition(Table table, AddColumnEvent event) FlinkSchemaUtil.convert( DataTypeUtils.toFlinkDataType(addColumn.getType()) .getLogicalType()); - updateSchema.addColumn(columnName, icebergType, columnComment); Literal defaultValue = IcebergTypeUtils.parseDefaultValue( addColumn.getDefaultValueExpression(), addColumn.getType()); if (defaultValue != null) { - updateSchema.updateColumnDefault(columnName, defaultValue); + updateSchema.addColumn(columnName, icebergType, columnComment, defaultValue); + } else { + updateSchema.addColumn(columnName, icebergType, columnComment); } switch (columnWithPosition.getPosition()) { case FIRST: From 571f9676c5dbd54ca63b1c081a3038f10645a594 Mon Sep 17 00:00:00 2001 From: suhwan-cheon Date: Mon, 9 Mar 2026 13:23:06 +0900 Subject: [PATCH 6/8] fix: applying copilot feedback --- .../cdc/connectors/iceberg/sink/IcebergMetadataApplier.java | 3 +++ .../cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java | 3 ++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java index dac596dd8b8..99187377c3e 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java @@ -252,6 +252,9 @@ private void applyAddColumnEventWithPosition(Table table, AddColumnEvent event) case LAST: break; case BEFORE: + checkNotNull( + columnWithPosition.getExistedColumnName(), + "Existing column name must be provided for BEFORE position"); updateSchema.moveBefore( columnName, columnWithPosition.getExistedColumnName()); break; diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java index 49a174a2a34..138f248796a 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java @@ -72,6 +72,7 @@ public static Literal parseDefaultValue( if (defaultValueExpression == null) { return null; } + defaultValueExpression = defaultValueExpression.trim(); try { switch (cdcType.getTypeRoot()) { case CHAR: @@ -104,7 +105,7 @@ public static Literal parseDefaultValue( new java.math.BigDecimal(defaultValueExpression) .setScale(scale, java.math.RoundingMode.HALF_UP)); default: - LOG.warn( + LOG.debug( "Unsupported default value type {} for expression '{}', skipping default value.", cdcType.getTypeRoot(), defaultValueExpression); From dfd837f961492562b612c97838cd3c10d6055568 Mon Sep 17 00:00:00 2001 From: suhwan-cheon Date: Mon, 9 Mar 2026 13:23:06 +0900 Subject: [PATCH 7/8] delete: default normaliztion --- .../cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java | 1 - 1 file changed, 1 deletion(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java index 138f248796a..83678daa68d 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java @@ -72,7 +72,6 @@ public static Literal parseDefaultValue( if (defaultValueExpression == null) { return null; } - defaultValueExpression = defaultValueExpression.trim(); try { switch (cdcType.getTypeRoot()) { case CHAR: From 71443e7113f9d4f08cb626a073fe26ec76682e68 Mon Sep 17 00:00:00 2001 From: suhwan-cheon Date: Mon, 9 Mar 2026 15:59:50 +0900 Subject: [PATCH 8/8] fix: Skip default values for Iceberg format v2 tables --- .../iceberg/sink/IcebergMetadataApplier.java | 14 +- .../sink/IcebergMetadataApplierTest.java | 691 +++++++++++------- 2 files changed, 422 insertions(+), 283 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java index 99187377c3e..1707e8c6af7 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java @@ -38,6 +38,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.HasTableOperations; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; @@ -197,6 +198,9 @@ private void applyCreateTable(CreateTableEvent event) { private void applyDefaultValues( Table table, org.apache.flink.cdc.common.schema.Schema cdcSchema) { + if (getFormatVersion(table) < 3) { + return; + } UpdateSchema updateSchema = null; for (Column column : cdcSchema.getColumns()) { Literal defaultValue = @@ -240,8 +244,9 @@ private void applyAddColumnEventWithPosition(Table table, AddColumnEvent event) Literal defaultValue = IcebergTypeUtils.parseDefaultValue( addColumn.getDefaultValueExpression(), addColumn.getType()); - if (defaultValue != null) { + if (defaultValue != null && getFormatVersion(table) >= 3) { updateSchema.addColumn(columnName, icebergType, columnComment, defaultValue); + updateSchema.updateColumnDefault(columnName, defaultValue); } else { updateSchema.addColumn(columnName, icebergType, columnComment); } @@ -396,6 +401,13 @@ public Set getSupportedSchemaEvolutionTypes() { SchemaChangeEventType.ALTER_COLUMN_TYPE); } + private int getFormatVersion(Table table) { + if (table instanceof HasTableOperations) { + return ((HasTableOperations) table).operations().current().formatVersion(); + } + return 2; + } + @Override public void close() { catalog = null; diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplierTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplierTest.java index fcda1e86b5e..726a303b49d 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplierTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplierTest.java @@ -150,137 +150,54 @@ public void runSchemaChangeTest(Map catalogOptions) { new org.apache.iceberg.Schema( 0, Arrays.asList( - Types.NestedField.builder() - .withId(1) - .asRequired() - .withName("id") - .ofType(Types.LongType.get()) - .withDoc("column for id") - .build(), - Types.NestedField.builder() - .withId(2) - .asRequired() - .withName("name") - .ofType(Types.StringType.get()) - .withDoc("column for name") - .withWriteDefault(Literal.of("John Smith")) - .build(), - Types.NestedField.builder() - .withId(3) - .asOptional() - .withName("tinyIntCol") - .ofType(Types.IntegerType.get()) - .withDoc("column for tinyIntCol") - .withWriteDefault(Literal.of(1)) - .build(), - Types.NestedField.builder() - .withId(4) - .asOptional() - .withName("description") - .ofType(Types.StringType.get()) - .withDoc("column for descriptions") - .withWriteDefault(Literal.of("not important")) - .build(), - Types.NestedField.builder() - .withId(5) - .asOptional() - .withName("bool_column") - .ofType(Types.BooleanType.get()) - .withDoc("column for bool") - .withWriteDefault(Literal.of(false)) - .build(), - Types.NestedField.builder() - .withId(6) - .asOptional() - .withName("float_column") - .ofType(Types.FloatType.get()) - .withDoc("column for float") - .withWriteDefault(Literal.of(1.0f)) - .build(), - Types.NestedField.builder() - .withId(7) - .asOptional() - .withName("double_column") - .ofType(Types.DoubleType.get()) - .withDoc("column for double") - .withWriteDefault(Literal.of(1.0d)) - .build(), - Types.NestedField.builder() - .withId(8) - .asOptional() - .withName("decimal_column") - .ofType(Types.DecimalType.of(10, 2)) - .withDoc("column for decimal") - .withWriteDefault(Literal.of(new BigDecimal("1.00"))) - .build()), + Types.NestedField.of( + 1, false, "id", Types.LongType.get(), "column for id"), + Types.NestedField.of( + 2, + false, + "name", + Types.StringType.get(), + "column for name"), + Types.NestedField.of( + 3, + true, + "tinyIntCol", + Types.IntegerType.get(), + "column for tinyIntCol"), + Types.NestedField.of( + 4, + true, + "description", + Types.StringType.get(), + "column for descriptions"), + Types.NestedField.of( + 5, + true, + "bool_column", + Types.BooleanType.get(), + "column for bool"), + Types.NestedField.of( + 6, + true, + "float_column", + Types.FloatType.get(), + "column for float"), + Types.NestedField.of( + 7, + true, + "double_column", + Types.DoubleType.get(), + "column for double"), + Types.NestedField.of( + 8, + true, + "decimal_column", + Types.DecimalType.of(10, 2), + "column for decimal")), new HashSet<>(Collections.singletonList(1))); assertThat(table.schema().sameSchema(schema)).isTrue(); - // Verify that schema without default values is NOT the same. - org.apache.iceberg.Schema schemaWithoutDefaults = - new org.apache.iceberg.Schema( - 0, - Arrays.asList( - Types.NestedField.builder() - .withId(1) - .asRequired() - .withName("id") - .ofType(Types.LongType.get()) - .withDoc("column for id") - .build(), - Types.NestedField.builder() - .withId(2) - .asRequired() - .withName("name") - .ofType(Types.StringType.get()) - .withDoc("column for name") - .build(), - Types.NestedField.builder() - .withId(3) - .asOptional() - .withName("tinyIntCol") - .ofType(Types.IntegerType.get()) - .withDoc("column for tinyIntCol") - .build(), - Types.NestedField.builder() - .withId(4) - .asOptional() - .withName("description") - .ofType(Types.StringType.get()) - .withDoc("column for descriptions") - .build(), - Types.NestedField.builder() - .withId(5) - .asOptional() - .withName("bool_column") - .ofType(Types.BooleanType.get()) - .withDoc("column for bool") - .build(), - Types.NestedField.builder() - .withId(6) - .asOptional() - .withName("float_column") - .ofType(Types.FloatType.get()) - .withDoc("column for float") - .build(), - Types.NestedField.builder() - .withId(7) - .asOptional() - .withName("double_column") - .ofType(Types.DoubleType.get()) - .withDoc("column for double") - .build(), - Types.NestedField.builder() - .withId(8) - .asOptional() - .withName("decimal_column") - .ofType(Types.DecimalType.of(10, 2)) - .withDoc("column for decimal") - .build()), - new HashSet<>(Collections.singletonList(1))); - assertThat(table.schema().sameSchema(schemaWithoutDefaults)).isFalse(); - - // Add column with default value. + // Add column. AddColumnEvent addColumnEvent = new AddColumnEvent( tableId, @@ -290,84 +207,63 @@ public void runSchemaChangeTest(Map catalogOptions) { "newIntColumn", DataTypes.INT(), "comment for newIntColumn", - "42")))); + "not important")))); icebergMetadataApplier.applySchemaChange(addColumnEvent); table = catalog.loadTable(TableIdentifier.parse(defaultTableId)); schema = new org.apache.iceberg.Schema( 0, Arrays.asList( - Types.NestedField.builder() - .withId(1) - .asRequired() - .withName("id") - .ofType(Types.LongType.get()) - .withDoc("column for id") - .build(), - Types.NestedField.builder() - .withId(2) - .asRequired() - .withName("name") - .ofType(Types.StringType.get()) - .withDoc("column for name") - .withWriteDefault(Literal.of("John Smith")) - .build(), - Types.NestedField.builder() - .withId(3) - .asOptional() - .withName("tinyIntCol") - .ofType(Types.IntegerType.get()) - .withDoc("column for tinyIntCol") - .withWriteDefault(Literal.of(1)) - .build(), - Types.NestedField.builder() - .withId(4) - .asOptional() - .withName("description") - .ofType(Types.StringType.get()) - .withDoc("column for descriptions") - .withWriteDefault(Literal.of("not important")) - .build(), - Types.NestedField.builder() - .withId(5) - .asOptional() - .withName("bool_column") - .ofType(Types.BooleanType.get()) - .withDoc("column for bool") - .withWriteDefault(Literal.of(false)) - .build(), - Types.NestedField.builder() - .withId(6) - .asOptional() - .withName("float_column") - .ofType(Types.FloatType.get()) - .withDoc("column for float") - .withWriteDefault(Literal.of(1.0f)) - .build(), - Types.NestedField.builder() - .withId(7) - .asOptional() - .withName("double_column") - .ofType(Types.DoubleType.get()) - .withDoc("column for double") - .withWriteDefault(Literal.of(1.0d)) - .build(), - Types.NestedField.builder() - .withId(8) - .asOptional() - .withName("decimal_column") - .ofType(Types.DecimalType.of(10, 2)) - .withDoc("column for decimal") - .withWriteDefault(Literal.of(new BigDecimal("1.00"))) - .build(), - Types.NestedField.builder() - .withId(9) - .asOptional() - .withName("newIntColumn") - .ofType(Types.IntegerType.get()) - .withDoc("comment for newIntColumn") - .withWriteDefault(Literal.of(42)) - .build()), + Types.NestedField.of( + 1, false, "id", Types.LongType.get(), "column for id"), + Types.NestedField.of( + 2, + false, + "name", + Types.StringType.get(), + "column for name"), + Types.NestedField.of( + 3, + true, + "tinyIntCol", + Types.IntegerType.get(), + "column for tinyIntCol"), + Types.NestedField.of( + 4, + true, + "description", + Types.StringType.get(), + "column for descriptions"), + Types.NestedField.of( + 5, + true, + "bool_column", + Types.BooleanType.get(), + "column for bool"), + Types.NestedField.of( + 6, + true, + "float_column", + Types.FloatType.get(), + "column for float"), + Types.NestedField.of( + 7, + true, + "double_column", + Types.DoubleType.get(), + "column for double"), + Types.NestedField.of( + 8, + true, + "decimal_column", + Types.DecimalType.of(10, 2), + "column for decimal"), + Types.NestedField.of( + 9, + true, + "newIntColumn", + Types.IntegerType.get(), + "comment for newIntColumn")), new HashSet<>(Collections.singletonList(1))); assertThat(table.schema().sameSchema(schema)).isTrue(); @@ -380,69 +276,50 @@ public void runSchemaChangeTest(Map catalogOptions) { new org.apache.iceberg.Schema( 0, Arrays.asList( - Types.NestedField.builder() - .withId(1) - .asRequired() - .withName("id") - .ofType(Types.LongType.get()) - .withDoc("column for id") - .build(), - Types.NestedField.builder() - .withId(2) - .asRequired() - .withName("name") - .ofType(Types.StringType.get()) - .withDoc("column for name") - .withWriteDefault(Literal.of("John Smith")) - .build(), - Types.NestedField.builder() - .withId(3) - .asOptional() - .withName("tinyIntCol") - .ofType(Types.IntegerType.get()) - .withDoc("column for tinyIntCol") - .withWriteDefault(Literal.of(1)) - .build(), - Types.NestedField.builder() - .withId(5) - .asOptional() - .withName("bool_column") - .ofType(Types.BooleanType.get()) - .withDoc("column for bool") - .withWriteDefault(Literal.of(false)) - .build(), - Types.NestedField.builder() - .withId(6) - .asOptional() - .withName("float_column") - .ofType(Types.FloatType.get()) - .withDoc("column for float") - .withWriteDefault(Literal.of(1.0f)) - .build(), - Types.NestedField.builder() - .withId(7) - .asOptional() - .withName("double_column") - .ofType(Types.DoubleType.get()) - .withDoc("column for double") - .withWriteDefault(Literal.of(1.0d)) - .build(), - Types.NestedField.builder() - .withId(8) - .asOptional() - .withName("decimal_column") - .ofType(Types.DecimalType.of(10, 2)) - .withDoc("column for decimal") - .withWriteDefault(Literal.of(new BigDecimal("1.00"))) - .build(), - Types.NestedField.builder() - .withId(9) - .asOptional() - .withName("newIntColumn") - .ofType(Types.IntegerType.get()) - .withDoc("comment for newIntColumn") - .withWriteDefault(Literal.of(42)) - .build()), + Types.NestedField.of( + 1, false, "id", Types.LongType.get(), "column for id"), + Types.NestedField.of( + 2, + false, + "name", + Types.StringType.get(), + "column for name"), + Types.NestedField.of( + 3, + true, + "tinyIntCol", + Types.IntegerType.get(), + "column for tinyIntCol"), + Types.NestedField.of( + 5, + true, + "bool_column", + Types.BooleanType.get(), + "column for bool"), + Types.NestedField.of( + 6, + true, + "float_column", + Types.FloatType.get(), + "column for float"), + Types.NestedField.of( + 7, + true, + "double_column", + Types.DoubleType.get(), + "column for double"), + Types.NestedField.of( + 8, + true, + "decimal_column", + Types.DecimalType.of(10, 2), + "column for decimal"), + Types.NestedField.of( + 9, + true, + "newIntColumn", + Types.IntegerType.get(), + "comment for newIntColumn")), new HashSet<>(Collections.singletonList(1))); assertThat(table.schema().sameSchema(schema)).isTrue(); @@ -452,6 +329,188 @@ public void runSchemaChangeTest(Map catalogOptions) { icebergMetadataApplier.applySchemaChange(renameColumnEvent); table = catalog.loadTable(TableIdentifier.parse(defaultTableId)); schema = + new org.apache.iceberg.Schema( + 0, + Arrays.asList( + Types.NestedField.of( + 1, false, "id", Types.LongType.get(), "column for id"), + Types.NestedField.of( + 2, + false, + "name", + Types.StringType.get(), + "column for name"), + Types.NestedField.of( + 3, + true, + "tinyIntCol", + Types.IntegerType.get(), + "column for tinyIntCol"), + Types.NestedField.of( + 5, + true, + "bool_column", + Types.BooleanType.get(), + "column for bool"), + Types.NestedField.of( + 6, + true, + "float_column", + Types.FloatType.get(), + "column for float"), + Types.NestedField.of( + 7, + true, + "double_column", + Types.DoubleType.get(), + "column for double"), + Types.NestedField.of( + 8, + true, + "decimal_column", + Types.DecimalType.of(10, 2), + "column for decimal"), + Types.NestedField.of( + 9, + true, + "renamedIntColumn", + Types.IntegerType.get(), + "comment for newIntColumn")), + new HashSet<>(Collections.singletonList(1))); + assertThat(table.schema().sameSchema(schema)).isTrue(); + + // Alter Column Type. + AlterColumnTypeEvent alterColumnTypeEvent = + new AlterColumnTypeEvent( + tableId, ImmutableMap.of("renamedIntColumn", DataTypes.BIGINT())); + icebergMetadataApplier.applySchemaChange(alterColumnTypeEvent); + table = catalog.loadTable(TableIdentifier.parse(defaultTableId)); + schema = + new org.apache.iceberg.Schema( + 0, + Arrays.asList( + Types.NestedField.of( + 1, false, "id", Types.LongType.get(), "column for id"), + Types.NestedField.of( + 2, + false, + "name", + Types.StringType.get(), + "column for name"), + Types.NestedField.of( + 3, + true, + "tinyIntCol", + Types.IntegerType.get(), + "column for tinyIntCol"), + Types.NestedField.of( + 5, + true, + "bool_column", + Types.BooleanType.get(), + "column for bool"), + Types.NestedField.of( + 6, + true, + "float_column", + Types.FloatType.get(), + "column for float"), + Types.NestedField.of( + 7, + true, + "double_column", + Types.DoubleType.get(), + "column for double"), + Types.NestedField.of( + 8, + true, + "decimal_column", + Types.DecimalType.of(10, 2), + "column for decimal"), + Types.NestedField.of( + 9, + true, + "renamedIntColumn", + Types.LongType.get(), + "comment for newIntColumn")), + new HashSet<>(Collections.singletonList(1))); + assertThat(table.schema().sameSchema(schema)).isTrue(); + } + + @Test + public void testApplySchemaChangeWithDefaultValuesFormatV3() { + Map catalogOptions = new HashMap<>(); + String warehouse = + new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString(); + catalogOptions.put("type", "hadoop"); + catalogOptions.put("warehouse", warehouse); + catalogOptions.put("cache-enabled", "false"); + + Map tableOptions = new HashMap<>(); + tableOptions.put("format-version", "3"); + + Catalog catalog = + CatalogUtil.buildIcebergCatalog( + "cdc-iceberg-catalog", catalogOptions, new Configuration()); + + IcebergMetadataApplier icebergMetadataApplier = + new IcebergMetadataApplier(catalogOptions, tableOptions, new HashMap<>()); + String defaultTableId = "test.iceberg_table"; + TableId tableId = TableId.parse(defaultTableId); + + // Create Table with default values. + CreateTableEvent createTableEvent = + new CreateTableEvent( + tableId, + Schema.newBuilder() + .physicalColumn( + "id", + DataTypes.BIGINT().notNull(), + "column for id", + "AUTO_DECREMENT()") + .physicalColumn( + "name", + DataTypes.VARCHAR(255).notNull(), + "column for name", + "John Smith") + .physicalColumn( + "tinyIntCol", + DataTypes.TINYINT(), + "column for tinyIntCol", + "1") + .physicalColumn( + "description", + DataTypes.STRING(), + "column for descriptions", + "not important") + .physicalColumn( + "bool_column", + DataTypes.BOOLEAN(), + "column for bool", + "false") + .physicalColumn( + "float_column", + DataTypes.FLOAT(), + "column for float", + "1.0") + .physicalColumn( + "double_column", + DataTypes.DOUBLE(), + "column for double", + "1.0") + .physicalColumn( + "decimal_column", + DataTypes.DECIMAL(10, 2), + "column for decimal", + "1.0") + .primaryKey("id") + .partitionKey("id", "name") + .build()); + icebergMetadataApplier.applySchemaChange(createTableEvent); + Table table = catalog.loadTable(TableIdentifier.parse(defaultTableId)); + + // Verify default values are applied on format v3. + org.apache.iceberg.Schema schemaWithDefaults = new org.apache.iceberg.Schema( 0, Arrays.asList( @@ -478,6 +537,14 @@ public void runSchemaChangeTest(Map catalogOptions) { .withDoc("column for tinyIntCol") .withWriteDefault(Literal.of(1)) .build(), + Types.NestedField.builder() + .withId(4) + .asOptional() + .withName("description") + .ofType(Types.StringType.get()) + .withDoc("column for descriptions") + .withWriteDefault(Literal.of("not important")) + .build(), Types.NestedField.builder() .withId(5) .asOptional() @@ -509,25 +576,76 @@ public void runSchemaChangeTest(Map catalogOptions) { .ofType(Types.DecimalType.of(10, 2)) .withDoc("column for decimal") .withWriteDefault(Literal.of(new BigDecimal("1.00"))) - .build(), - Types.NestedField.builder() - .withId(9) - .asOptional() - .withName("renamedIntColumn") - .ofType(Types.IntegerType.get()) - .withDoc("comment for newIntColumn") - .withWriteDefault(Literal.of(42)) .build()), new HashSet<>(Collections.singletonList(1))); - assertThat(table.schema().sameSchema(schema)).isTrue(); + assertThat(table.schema().sameSchema(schemaWithDefaults)).isTrue(); - // Alter Column Type. - AlterColumnTypeEvent alterColumnTypeEvent = - new AlterColumnTypeEvent( - tableId, ImmutableMap.of("renamedIntColumn", DataTypes.BIGINT())); - icebergMetadataApplier.applySchemaChange(alterColumnTypeEvent); + // Verify that schema without default values is NOT the same. + org.apache.iceberg.Schema schemaWithoutDefaults = + new org.apache.iceberg.Schema( + 0, + Arrays.asList( + Types.NestedField.of( + 1, false, "id", Types.LongType.get(), "column for id"), + Types.NestedField.of( + 2, + false, + "name", + Types.StringType.get(), + "column for name"), + Types.NestedField.of( + 3, + true, + "tinyIntCol", + Types.IntegerType.get(), + "column for tinyIntCol"), + Types.NestedField.of( + 4, + true, + "description", + Types.StringType.get(), + "column for descriptions"), + Types.NestedField.of( + 5, + true, + "bool_column", + Types.BooleanType.get(), + "column for bool"), + Types.NestedField.of( + 6, + true, + "float_column", + Types.FloatType.get(), + "column for float"), + Types.NestedField.of( + 7, + true, + "double_column", + Types.DoubleType.get(), + "column for double"), + Types.NestedField.of( + 8, + true, + "decimal_column", + Types.DecimalType.of(10, 2), + "column for decimal")), + new HashSet<>(Collections.singletonList(1))); + assertThat(table.schema().sameSchema(schemaWithoutDefaults)).isFalse(); + + // Add column with default value on format v3. + AddColumnEvent addColumnEvent = + new AddColumnEvent( + tableId, + Collections.singletonList( + AddColumnEvent.last( + new PhysicalColumn( + "newIntColumn", + DataTypes.INT(), + "comment for newIntColumn", + "42")))); + icebergMetadataApplier.applySchemaChange(addColumnEvent); table = catalog.loadTable(TableIdentifier.parse(defaultTableId)); - schema = + org.apache.iceberg.Schema schemaAfterAddColumn = new org.apache.iceberg.Schema( 0, Arrays.asList( @@ -554,6 +672,14 @@ public void runSchemaChangeTest(Map catalogOptions) { .withDoc("column for tinyIntCol") .withWriteDefault(Literal.of(1)) .build(), + Types.NestedField.builder() + .withId(4) + .asOptional() + .withName("description") + .ofType(Types.StringType.get()) + .withDoc("column for descriptions") + .withWriteDefault(Literal.of("not important")) + .build(), Types.NestedField.builder() .withId(5) .asOptional() @@ -589,12 +715,13 @@ public void runSchemaChangeTest(Map catalogOptions) { Types.NestedField.builder() .withId(9) .asOptional() - .withName("renamedIntColumn") - .ofType(Types.LongType.get()) + .withName("newIntColumn") + .ofType(Types.IntegerType.get()) .withDoc("comment for newIntColumn") - .withWriteDefault(Literal.of(42L)) + .withInitialDefault(Literal.of(42)) + .withWriteDefault(Literal.of(42)) .build()), new HashSet<>(Collections.singletonList(1))); - assertThat(table.schema().sameSchema(schema)).isTrue(); + assertThat(table.schema().sameSchema(schemaAfterAddColumn)).isTrue(); } }