Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.HasTableOperations;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
Expand All @@ -46,6 +47,7 @@
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
Expand Down Expand Up @@ -178,7 +180,12 @@ private void applyCreateTable(CreateTableEvent event) {
}
PartitionSpec partitionSpec = generatePartitionSpec(icebergSchema, partitionColumns);
if (!catalog.tableExists(tableIdentifier)) {
catalog.createTable(tableIdentifier, icebergSchema, partitionSpec, tableOptions);
Table table =
catalog.createTable(
tableIdentifier, icebergSchema, partitionSpec, tableOptions);

applyDefaultValues(table, cdcSchema);

LOG.info(
"Spend {} ms to create iceberg table {}",
System.currentTimeMillis() - startTimestamp,
Expand All @@ -189,6 +196,28 @@ private void applyCreateTable(CreateTableEvent event) {
}
}

/**
 * Copies the default values declared on the CDC schema's columns onto the given Iceberg table.
 *
 * <p>Nothing is done when the table's format version is below 3. Otherwise every column whose
 * default value expression can be parsed by {@link IcebergTypeUtils#parseDefaultValue} gets its
 * column default updated, and all updates are committed together in a single schema update. The
 * schema update is only opened once the first usable default is found, so a schema without any
 * parsable default never touches the table.
 *
 * @param table the Iceberg table whose column defaults should be set
 * @param cdcSchema the CDC schema providing the column names, types and default expressions
 */
private void applyDefaultValues(
        Table table, org.apache.flink.cdc.common.schema.Schema cdcSchema) {
    boolean defaultsApplicable = getFormatVersion(table) >= 3;
    if (!defaultsApplicable) {
        return;
    }

    // Created lazily so that no pending update exists when there is nothing to change.
    UpdateSchema pendingUpdate = null;
    for (Column cdcColumn : cdcSchema.getColumns()) {
        Literal<?> parsedDefault =
                IcebergTypeUtils.parseDefaultValue(
                        cdcColumn.getDefaultValueExpression(), cdcColumn.getType());
        if (parsedDefault == null) {
            // No default declared, or it could not be parsed for this column type.
            continue;
        }
        if (pendingUpdate == null) {
            pendingUpdate = table.updateSchema();
        }
        pendingUpdate.updateColumnDefault(cdcColumn.getName(), parsedDefault);
    }

    if (pendingUpdate == null) {
        return;
    }
    pendingUpdate.commit();
}

private void applyAddColumn(AddColumnEvent event) {
TableIdentifier tableIdentifier = TableIdentifier.parse(event.tableId().identifier());
try {
Expand All @@ -212,24 +241,32 @@ private void applyAddColumnEventWithPosition(Table table, AddColumnEvent event)
FlinkSchemaUtil.convert(
DataTypeUtils.toFlinkDataType(addColumn.getType())
.getLogicalType());
Literal<?> defaultValue =
IcebergTypeUtils.parseDefaultValue(
addColumn.getDefaultValueExpression(), addColumn.getType());
if (defaultValue != null && getFormatVersion(table) >= 3) {
updateSchema.addColumn(columnName, icebergType, columnComment, defaultValue);
updateSchema.updateColumnDefault(columnName, defaultValue);
} else {
updateSchema.addColumn(columnName, icebergType, columnComment);
}
switch (columnWithPosition.getPosition()) {
case FIRST:
updateSchema.addColumn(columnName, icebergType, columnComment);
table.updateSchema().moveFirst(columnName);
updateSchema.moveFirst(columnName);
Comment thread
lvyanquan marked this conversation as resolved.
break;
case LAST:
updateSchema.addColumn(columnName, icebergType, columnComment);
break;
case BEFORE:
Comment thread
suhwan-cheon marked this conversation as resolved.
updateSchema.addColumn(columnName, icebergType, columnComment);
checkNotNull(
columnWithPosition.getExistedColumnName(),
"Existing column name must be provided for BEFORE position");
updateSchema.moveBefore(
columnName, columnWithPosition.getExistedColumnName());
break;
case AFTER:
checkNotNull(
columnWithPosition.getExistedColumnName(),
"Existing column name must be provided for AFTER position");
updateSchema.addColumn(columnName, icebergType, columnComment);
updateSchema.moveAfter(
columnName, columnWithPosition.getExistedColumnName());
break;
Expand Down Expand Up @@ -364,6 +401,13 @@ public Set<SchemaChangeEventType> getSupportedSchemaEvolutionTypes() {
SchemaChangeEventType.ALTER_COLUMN_TYPE);
}

/**
 * Returns the format version recorded in the table's current metadata.
 *
 * <p>The version can only be read from tables that implement {@link HasTableOperations}; for any
 * other table implementation this method falls back to 2.
 *
 * @param table the Iceberg table to inspect
 * @return the table's format version, or 2 when it cannot be determined
 */
private int getFormatVersion(Table table) {
    if (!(table instanceof HasTableOperations)) {
        return 2;
    }
    HasTableOperations tableWithOperations = (HasTableOperations) table;
    return tableWithOperations.operations().current().formatVersion();
}

@Override
public void close() {
catalog = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,13 @@
import org.apache.flink.cdc.connectors.iceberg.sink.IcebergDataSink;
import org.apache.flink.table.data.TimestampData;

import org.apache.iceberg.expressions.Literal;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.time.ZoneId;
import java.util.ArrayList;
Expand All @@ -41,6 +46,8 @@
/** Util class for types in {@link IcebergDataSink}. */
public class IcebergTypeUtils {

private static final Logger LOG = LoggerFactory.getLogger(IcebergTypeUtils.class);

/** Convert column from Flink CDC to Iceberg format. */
public static Types.NestedField convertCdcColumnToIcebergField(
int index, PhysicalColumn column) {
Expand All @@ -53,6 +60,66 @@ public static Types.NestedField convertCdcColumnToIcebergField(
column.getComment());
}

/**
* Parse a CDC default value expression string into an Iceberg {@link Literal}.
*
* @return the parsed Literal, or null if the expression is null or cannot be parsed for the
* given type.
*/
@Nullable
public static Literal<?> parseDefaultValue(
@Nullable String defaultValueExpression, DataType cdcType) {
if (defaultValueExpression == null) {
return null;
}
try {
switch (cdcType.getTypeRoot()) {
Comment thread
lvyanquan marked this conversation as resolved.
case CHAR:
case VARCHAR:
return Literal.of(defaultValueExpression);
case BOOLEAN:
Comment thread
lvyanquan marked this conversation as resolved.
if ("true".equalsIgnoreCase(defaultValueExpression)) {
return Literal.of(true);
} else if ("false".equalsIgnoreCase(defaultValueExpression)) {
return Literal.of(false);
} else {
LOG.warn(
"Invalid boolean default value '{}', skipping default value.",
defaultValueExpression);
return null;
}
case TINYINT:
case SMALLINT:
case INTEGER:
return Literal.of(Integer.parseInt(defaultValueExpression));
case BIGINT:
return Literal.of(Long.parseLong(defaultValueExpression));
case FLOAT:
return Literal.of(Float.parseFloat(defaultValueExpression));
case DOUBLE:
return Literal.of(Double.parseDouble(defaultValueExpression));
case DECIMAL:
int scale = DataTypes.getScale(cdcType).orElse(0);
return Literal.of(
new java.math.BigDecimal(defaultValueExpression)
.setScale(scale, java.math.RoundingMode.HALF_UP));
default:
Comment thread
lvyanquan marked this conversation as resolved.
LOG.debug(
"Unsupported default value type {} for expression '{}', skipping default value.",
cdcType.getTypeRoot(),
defaultValueExpression);
return null;
Comment thread
suhwan-cheon marked this conversation as resolved.
}
} catch (NumberFormatException e) {
LOG.warn(
"Failed to parse default value '{}' for type {}, skipping default value.",
defaultValueExpression,
cdcType.getTypeRoot(),
e);
return null;
}
}

/**
* Convert data type from Flink CDC to Iceberg format, refer to <a
* href="https://iceberg.apache.org/docs/nightly/flink/#flink-to-iceberg">...</a>.
Expand Down
Loading
Loading