diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java index 24d01ffd0cb..1707e8c6af7 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java @@ -38,6 +38,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.HasTableOperations; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; @@ -46,6 +47,7 @@ import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.SupportsNamespaces; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.expressions.Literal; import org.apache.iceberg.flink.FlinkSchemaUtil; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; @@ -178,7 +180,12 @@ private void applyCreateTable(CreateTableEvent event) { } PartitionSpec partitionSpec = generatePartitionSpec(icebergSchema, partitionColumns); if (!catalog.tableExists(tableIdentifier)) { - catalog.createTable(tableIdentifier, icebergSchema, partitionSpec, tableOptions); + Table table = + catalog.createTable( + tableIdentifier, icebergSchema, partitionSpec, tableOptions); + + applyDefaultValues(table, cdcSchema); + LOG.info( "Spend {} ms to create iceberg table {}", System.currentTimeMillis() - startTimestamp, @@ -189,6 +196,28 @@ private void applyCreateTable(CreateTableEvent event) { } } + private void applyDefaultValues( + Table table, org.apache.flink.cdc.common.schema.Schema cdcSchema) { + if (getFormatVersion(table) < 3) { + return; + } + UpdateSchema updateSchema = null; + for (Column column : cdcSchema.getColumns()) { + Literal defaultValue = + IcebergTypeUtils.parseDefaultValue( + column.getDefaultValueExpression(), column.getType()); + if (defaultValue != null) { + if (updateSchema == null) { + updateSchema = table.updateSchema(); + } + updateSchema.updateColumnDefault(column.getName(), defaultValue); + } + } + if (updateSchema != null) { + updateSchema.commit(); + } + } + private void applyAddColumn(AddColumnEvent event) { TableIdentifier tableIdentifier = TableIdentifier.parse(event.tableId().identifier()); try { @@ -212,16 +241,25 @@ private void applyAddColumnEventWithPosition(Table table, AddColumnEvent event) FlinkSchemaUtil.convert( DataTypeUtils.toFlinkDataType(addColumn.getType()) .getLogicalType()); + Literal defaultValue = + IcebergTypeUtils.parseDefaultValue( + addColumn.getDefaultValueExpression(), addColumn.getType()); + if (defaultValue != null && getFormatVersion(table) >= 3) { + updateSchema.addColumn(columnName, icebergType, columnComment, defaultValue); + updateSchema.updateColumnDefault(columnName, defaultValue); + } else { + updateSchema.addColumn(columnName, icebergType, columnComment); + } switch (columnWithPosition.getPosition()) { case FIRST: - updateSchema.addColumn(columnName, icebergType, columnComment); - table.updateSchema().moveFirst(columnName); + updateSchema.moveFirst(columnName); break; case LAST: - updateSchema.addColumn(columnName, icebergType, columnComment); break; case BEFORE: - updateSchema.addColumn(columnName, icebergType, columnComment); + checkNotNull( + columnWithPosition.getExistedColumnName(), + "Existing column name must be provided for BEFORE position"); updateSchema.moveBefore( columnName, columnWithPosition.getExistedColumnName()); break; @@ -229,7 +267,6 @@ private void applyAddColumnEventWithPosition(Table table, AddColumnEvent event) checkNotNull( columnWithPosition.getExistedColumnName(), "Existing column name must be provided for AFTER position"); - updateSchema.addColumn(columnName, icebergType, columnComment); updateSchema.moveAfter( columnName, columnWithPosition.getExistedColumnName()); break; @@ -364,6 +401,13 @@ public Set getSupportedSchemaEvolutionTypes() { SchemaChangeEventType.ALTER_COLUMN_TYPE); } + private int getFormatVersion(Table table) { + if (table instanceof HasTableOperations) { + return ((HasTableOperations) table).operations().current().formatVersion(); + } + return 2; + } + @Override public void close() { catalog = null; diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java index 102b4dbad26..83678daa68d 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java @@ -28,8 +28,13 @@ import org.apache.flink.cdc.connectors.iceberg.sink.IcebergDataSink; import org.apache.flink.table.data.TimestampData; +import org.apache.iceberg.expressions.Literal; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; import java.time.ZoneId; import java.util.ArrayList; @@ -41,6 +46,8 @@ /** Util class for types in {@link IcebergDataSink}. */ public class IcebergTypeUtils { + private static final Logger LOG = LoggerFactory.getLogger(IcebergTypeUtils.class); + /** Convert column from Flink CDC to Iceberg format. */ public static Types.NestedField convertCdcColumnToIcebergField( int index, PhysicalColumn column) { @@ -53,6 +60,66 @@ public static Types.NestedField convertCdcColumnToIcebergField( column.getComment()); } + /** + * Parse a CDC default value expression string into an Iceberg {@link Literal}. + * + * @return the parsed Literal, or null if the expression is null or cannot be parsed for the + * given type. + */ + @Nullable + public static Literal parseDefaultValue( + @Nullable String defaultValueExpression, DataType cdcType) { + if (defaultValueExpression == null) { + return null; + } + try { + switch (cdcType.getTypeRoot()) { + case CHAR: + case VARCHAR: + return Literal.of(defaultValueExpression); + case BOOLEAN: + if ("true".equalsIgnoreCase(defaultValueExpression)) { + return Literal.of(true); + } else if ("false".equalsIgnoreCase(defaultValueExpression)) { + return Literal.of(false); + } else { + LOG.warn( + "Invalid boolean default value '{}', skipping default value.", + defaultValueExpression); + return null; + } + case TINYINT: + case SMALLINT: + case INTEGER: + return Literal.of(Integer.parseInt(defaultValueExpression)); + case BIGINT: + return Literal.of(Long.parseLong(defaultValueExpression)); + case FLOAT: + return Literal.of(Float.parseFloat(defaultValueExpression)); + case DOUBLE: + return Literal.of(Double.parseDouble(defaultValueExpression)); + case DECIMAL: + int scale = DataTypes.getScale(cdcType).orElse(0); + return Literal.of( + new java.math.BigDecimal(defaultValueExpression) + .setScale(scale, java.math.RoundingMode.HALF_UP)); + default: + LOG.debug( + "Unsupported default value type {} for expression '{}', skipping default value.", + cdcType.getTypeRoot(), + defaultValueExpression); + return null; + } + } catch (NumberFormatException e) { + LOG.warn( + "Failed to parse default value '{}' for type {}, skipping default value.", + defaultValueExpression, + cdcType.getTypeRoot(), + e); + return null; + } + } + /** * Convert data type from Flink CDC to Iceberg format, refer to .... diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplierTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplierTest.java index 6a491e21979..726a303b49d 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplierTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplierTest.java @@ -35,12 +35,14 @@ import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.expressions.Literal; import org.apache.iceberg.types.Types; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import org.testcontainers.lifecycle.Startables; import java.io.File; +import java.math.BigDecimal; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -434,4 +436,292 @@ public void runSchemaChangeTest(Map catalogOptions) { new HashSet<>(Collections.singletonList(1))); assertThat(table.schema().sameSchema(schema)).isTrue(); } + + @Test + public void testApplySchemaChangeWithDefaultValuesFormatV3() { + Map catalogOptions = new HashMap<>(); + String warehouse = + new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString(); + catalogOptions.put("type", "hadoop"); + catalogOptions.put("warehouse", warehouse); + catalogOptions.put("cache-enabled", "false"); + + Map tableOptions = new HashMap<>(); + tableOptions.put("format-version", "3"); + + Catalog catalog = + CatalogUtil.buildIcebergCatalog( + "cdc-iceberg-catalog", catalogOptions, new Configuration()); + + IcebergMetadataApplier icebergMetadataApplier = + new IcebergMetadataApplier(catalogOptions, tableOptions, new HashMap<>()); + String defaultTableId = "test.iceberg_table"; + TableId tableId = TableId.parse(defaultTableId); + + // Create Table with default values. + CreateTableEvent createTableEvent = + new CreateTableEvent( + tableId, + Schema.newBuilder() + .physicalColumn( + "id", + DataTypes.BIGINT().notNull(), + "column for id", + "AUTO_DECREMENT()") + .physicalColumn( + "name", + DataTypes.VARCHAR(255).notNull(), + "column for name", + "John Smith") + .physicalColumn( + "tinyIntCol", + DataTypes.TINYINT(), + "column for tinyIntCol", + "1") + .physicalColumn( + "description", + DataTypes.STRING(), + "column for descriptions", + "not important") + .physicalColumn( + "bool_column", + DataTypes.BOOLEAN(), + "column for bool", + "false") + .physicalColumn( + "float_column", + DataTypes.FLOAT(), + "column for float", + "1.0") + .physicalColumn( + "double_column", + DataTypes.DOUBLE(), + "column for double", + "1.0") + .physicalColumn( + "decimal_column", + DataTypes.DECIMAL(10, 2), + "column for decimal", + "1.0") + .primaryKey("id") + .partitionKey("id", "name") + .build()); + icebergMetadataApplier.applySchemaChange(createTableEvent); + Table table = catalog.loadTable(TableIdentifier.parse(defaultTableId)); + + // Verify default values are applied on format v3. + org.apache.iceberg.Schema schemaWithDefaults = + new org.apache.iceberg.Schema( + 0, + Arrays.asList( + Types.NestedField.builder() + .withId(1) + .asRequired() + .withName("id") + .ofType(Types.LongType.get()) + .withDoc("column for id") + .build(), + Types.NestedField.builder() + .withId(2) + .asRequired() + .withName("name") + .ofType(Types.StringType.get()) + .withDoc("column for name") + .withWriteDefault(Literal.of("John Smith")) + .build(), + Types.NestedField.builder() + .withId(3) + .asOptional() + .withName("tinyIntCol") + .ofType(Types.IntegerType.get()) + .withDoc("column for tinyIntCol") + .withWriteDefault(Literal.of(1)) + .build(), + Types.NestedField.builder() + .withId(4) + .asOptional() + .withName("description") + .ofType(Types.StringType.get()) + .withDoc("column for descriptions") + .withWriteDefault(Literal.of("not important")) + .build(), + Types.NestedField.builder() + .withId(5) + .asOptional() + .withName("bool_column") + .ofType(Types.BooleanType.get()) + .withDoc("column for bool") + .withWriteDefault(Literal.of(false)) + .build(), + Types.NestedField.builder() + .withId(6) + .asOptional() + .withName("float_column") + .ofType(Types.FloatType.get()) + .withDoc("column for float") + .withWriteDefault(Literal.of(1.0f)) + .build(), + Types.NestedField.builder() + .withId(7) + .asOptional() + .withName("double_column") + .ofType(Types.DoubleType.get()) + .withDoc("column for double") + .withWriteDefault(Literal.of(1.0d)) + .build(), + Types.NestedField.builder() + .withId(8) + .asOptional() + .withName("decimal_column") + .ofType(Types.DecimalType.of(10, 2)) + .withDoc("column for decimal") + .withWriteDefault(Literal.of(new BigDecimal("1.00"))) + .build()), + new HashSet<>(Collections.singletonList(1))); + assertThat(table.schema().sameSchema(schemaWithDefaults)).isTrue(); + + // Verify that schema without default values is NOT the same. + org.apache.iceberg.Schema schemaWithoutDefaults = + new org.apache.iceberg.Schema( + 0, + Arrays.asList( + Types.NestedField.of( + 1, false, "id", Types.LongType.get(), "column for id"), + Types.NestedField.of( + 2, + false, + "name", + Types.StringType.get(), + "column for name"), + Types.NestedField.of( + 3, + true, + "tinyIntCol", + Types.IntegerType.get(), + "column for tinyIntCol"), + Types.NestedField.of( + 4, + true, + "description", + Types.StringType.get(), + "column for descriptions"), + Types.NestedField.of( + 5, + true, + "bool_column", + Types.BooleanType.get(), + "column for bool"), + Types.NestedField.of( + 6, + true, + "float_column", + Types.FloatType.get(), + "column for float"), + Types.NestedField.of( + 7, + true, + "double_column", + Types.DoubleType.get(), + "column for double"), + Types.NestedField.of( + 8, + true, + "decimal_column", + Types.DecimalType.of(10, 2), + "column for decimal")), + new HashSet<>(Collections.singletonList(1))); + assertThat(table.schema().sameSchema(schemaWithoutDefaults)).isFalse(); + + // Add column with default value on format v3. + AddColumnEvent addColumnEvent = + new AddColumnEvent( + tableId, + Collections.singletonList( + AddColumnEvent.last( + new PhysicalColumn( + "newIntColumn", + DataTypes.INT(), + "comment for newIntColumn", + "42")))); + icebergMetadataApplier.applySchemaChange(addColumnEvent); + table = catalog.loadTable(TableIdentifier.parse(defaultTableId)); + org.apache.iceberg.Schema schemaAfterAddColumn = + new org.apache.iceberg.Schema( + 0, + Arrays.asList( + Types.NestedField.builder() + .withId(1) + .asRequired() + .withName("id") + .ofType(Types.LongType.get()) + .withDoc("column for id") + .build(), + Types.NestedField.builder() + .withId(2) + .asRequired() + .withName("name") + .ofType(Types.StringType.get()) + .withDoc("column for name") + .withWriteDefault(Literal.of("John Smith")) + .build(), + Types.NestedField.builder() + .withId(3) + .asOptional() + .withName("tinyIntCol") + .ofType(Types.IntegerType.get()) + .withDoc("column for tinyIntCol") + .withWriteDefault(Literal.of(1)) + .build(), + Types.NestedField.builder() + .withId(4) + .asOptional() + .withName("description") + .ofType(Types.StringType.get()) + .withDoc("column for descriptions") + .withWriteDefault(Literal.of("not important")) + .build(), + Types.NestedField.builder() + .withId(5) + .asOptional() + .withName("bool_column") + .ofType(Types.BooleanType.get()) + .withDoc("column for bool") + .withWriteDefault(Literal.of(false)) + .build(), + Types.NestedField.builder() + .withId(6) + .asOptional() + .withName("float_column") + .ofType(Types.FloatType.get()) + .withDoc("column for float") + .withWriteDefault(Literal.of(1.0f)) + .build(), + Types.NestedField.builder() + .withId(7) + .asOptional() + .withName("double_column") + .ofType(Types.DoubleType.get()) + .withDoc("column for double") + .withWriteDefault(Literal.of(1.0d)) + .build(), + Types.NestedField.builder() + .withId(8) + .asOptional() + .withName("decimal_column") + .ofType(Types.DecimalType.of(10, 2)) + .withDoc("column for decimal") + .withWriteDefault(Literal.of(new BigDecimal("1.00"))) + .build(), + Types.NestedField.builder() + .withId(9) + .asOptional() + .withName("newIntColumn") + .ofType(Types.IntegerType.get()) + .withDoc("comment for newIntColumn") + .withInitialDefault(Literal.of(42)) + .withWriteDefault(Literal.of(42)) + .build()), + new HashSet<>(Collections.singletonList(1))); + assertThat(table.schema().sameSchema(schemaAfterAddColumn)).isTrue(); + } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtilsTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtilsTest.java new file mode 100644 index 00000000000..faafcc5a51c --- /dev/null +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtilsTest.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.cdc.connectors.iceberg.sink.utils; + +import org.apache.flink.cdc.common.types.DataTypes; + +import org.apache.iceberg.expressions.Literal; +import org.junit.jupiter.api.Test; + +import java.math.BigDecimal; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link IcebergTypeUtils}. */ +public class IcebergTypeUtilsTest { + + @Test + public void testParseDefaultValueNull() { + assertThat(IcebergTypeUtils.parseDefaultValue(null, DataTypes.STRING())).isNull(); + } + + @Test + public void testParseDefaultValueString() { + Literal result = IcebergTypeUtils.parseDefaultValue("hello", DataTypes.STRING()); + assertThat(result).isNotNull(); + assertThat(result.value().toString()).isEqualTo("hello"); + } + + @Test + public void testParseDefaultValueVarchar() { + Literal result = + IcebergTypeUtils.parseDefaultValue("John Smith", DataTypes.VARCHAR(255)); + assertThat(result).isNotNull(); + assertThat(result.value().toString()).isEqualTo("John Smith"); + } + + @Test + public void testParseDefaultValueBoolean() { + Literal result = IcebergTypeUtils.parseDefaultValue("true", DataTypes.BOOLEAN()); + assertThat(result).isNotNull(); + assertThat(result.value()).isEqualTo(true); + + result = IcebergTypeUtils.parseDefaultValue("false", DataTypes.BOOLEAN()); + assertThat(result).isNotNull(); + assertThat(result.value()).isEqualTo(false); + } + + @Test + public void testParseDefaultValueBooleanInvalid() { + // Non-canonical boolean values should return null + assertThat(IcebergTypeUtils.parseDefaultValue("1", DataTypes.BOOLEAN())).isNull(); + assertThat(IcebergTypeUtils.parseDefaultValue("yes", DataTypes.BOOLEAN())).isNull(); + assertThat(IcebergTypeUtils.parseDefaultValue("garbage", DataTypes.BOOLEAN())).isNull(); + } + + @Test + public void testParseDefaultValueInteger() { + Literal result = IcebergTypeUtils.parseDefaultValue("42", DataTypes.INT()); + assertThat(result).isNotNull(); + assertThat(result.value()).isEqualTo(42); + } + + @Test + public void testParseDefaultValueTinyInt() { + Literal result = IcebergTypeUtils.parseDefaultValue("1", DataTypes.TINYINT()); + assertThat(result).isNotNull(); + assertThat(result.value()).isEqualTo(1); + } + + @Test + public void testParseDefaultValueSmallInt() { + Literal result = IcebergTypeUtils.parseDefaultValue("100", DataTypes.SMALLINT()); + assertThat(result).isNotNull(); + assertThat(result.value()).isEqualTo(100); + } + + @Test + public void testParseDefaultValueBigInt() { + Literal result = IcebergTypeUtils.parseDefaultValue("9999999999", DataTypes.BIGINT()); + assertThat(result).isNotNull(); + assertThat(result.value()).isEqualTo(9999999999L); + } + + @Test + public void testParseDefaultValueFloat() { + Literal result = IcebergTypeUtils.parseDefaultValue("1.5", DataTypes.FLOAT()); + assertThat(result).isNotNull(); + assertThat(result.value()).isEqualTo(1.5f); + } + + @Test + public void testParseDefaultValueDouble() { + Literal result = IcebergTypeUtils.parseDefaultValue("3.14", DataTypes.DOUBLE()); + assertThat(result).isNotNull(); + assertThat(result.value()).isEqualTo(3.14); + } + + @Test + public void testParseDefaultValueDecimal() { + Literal result = IcebergTypeUtils.parseDefaultValue("1.23", DataTypes.DECIMAL(10, 2)); + assertThat(result).isNotNull(); + assertThat(result.value()).isEqualTo(new BigDecimal("1.23")); + } + + @Test + public void testParseDefaultValueUnsupportedType() { + Literal result = IcebergTypeUtils.parseDefaultValue("data", DataTypes.BYTES()); + assertThat(result).isNull(); + } + + @Test + public void testParseDefaultValueInvalidNumber() { + Literal result = IcebergTypeUtils.parseDefaultValue("not_a_number", DataTypes.INT()); + assertThat(result).isNull(); + } + + @Test + public void testParseDefaultValueFunctionExpression() { + // Function expressions like AUTO_DECREMENT() cannot be parsed as numbers + Literal result = + IcebergTypeUtils.parseDefaultValue("AUTO_DECREMENT()", DataTypes.BIGINT()); + assertThat(result).isNull(); + } +}