Skip to content

Commit 91ae677

Browse files
authored
[FLINK-39055][Iceberg] Support default column values in Iceberg sink connector (#4277)
1 parent 570faa7 commit 91ae677

4 files changed

Lines changed: 546 additions & 6 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java

Lines changed: 50 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838

3939
import org.apache.hadoop.conf.Configuration;
4040
import org.apache.iceberg.CatalogUtil;
41+
import org.apache.iceberg.HasTableOperations;
4142
import org.apache.iceberg.PartitionSpec;
4243
import org.apache.iceberg.Schema;
4344
import org.apache.iceberg.Table;
@@ -46,6 +47,7 @@
4647
import org.apache.iceberg.catalog.Namespace;
4748
import org.apache.iceberg.catalog.SupportsNamespaces;
4849
import org.apache.iceberg.catalog.TableIdentifier;
50+
import org.apache.iceberg.expressions.Literal;
4951
import org.apache.iceberg.flink.FlinkSchemaUtil;
5052
import org.apache.iceberg.types.Type;
5153
import org.apache.iceberg.types.Types;
@@ -178,7 +180,12 @@ private void applyCreateTable(CreateTableEvent event) {
178180
}
179181
PartitionSpec partitionSpec = generatePartitionSpec(icebergSchema, partitionColumns);
180182
if (!catalog.tableExists(tableIdentifier)) {
181-
catalog.createTable(tableIdentifier, icebergSchema, partitionSpec, tableOptions);
183+
Table table =
184+
catalog.createTable(
185+
tableIdentifier, icebergSchema, partitionSpec, tableOptions);
186+
187+
applyDefaultValues(table, cdcSchema);
188+
182189
LOG.info(
183190
"Spend {} ms to create iceberg table {}",
184191
System.currentTimeMillis() - startTimestamp,
@@ -189,6 +196,28 @@ private void applyCreateTable(CreateTableEvent event) {
189196
}
190197
}
191198

199+
private void applyDefaultValues(
200+
Table table, org.apache.flink.cdc.common.schema.Schema cdcSchema) {
201+
if (getFormatVersion(table) < 3) {
202+
return;
203+
}
204+
UpdateSchema updateSchema = null;
205+
for (Column column : cdcSchema.getColumns()) {
206+
Literal<?> defaultValue =
207+
IcebergTypeUtils.parseDefaultValue(
208+
column.getDefaultValueExpression(), column.getType());
209+
if (defaultValue != null) {
210+
if (updateSchema == null) {
211+
updateSchema = table.updateSchema();
212+
}
213+
updateSchema.updateColumnDefault(column.getName(), defaultValue);
214+
}
215+
}
216+
if (updateSchema != null) {
217+
updateSchema.commit();
218+
}
219+
}
220+
192221
private void applyAddColumn(AddColumnEvent event) {
193222
TableIdentifier tableIdentifier = TableIdentifier.parse(event.tableId().identifier());
194223
try {
@@ -212,24 +241,32 @@ private void applyAddColumnEventWithPosition(Table table, AddColumnEvent event)
212241
FlinkSchemaUtil.convert(
213242
DataTypeUtils.toFlinkDataType(addColumn.getType())
214243
.getLogicalType());
244+
Literal<?> defaultValue =
245+
IcebergTypeUtils.parseDefaultValue(
246+
addColumn.getDefaultValueExpression(), addColumn.getType());
247+
if (defaultValue != null && getFormatVersion(table) >= 3) {
248+
updateSchema.addColumn(columnName, icebergType, columnComment, defaultValue);
249+
updateSchema.updateColumnDefault(columnName, defaultValue);
250+
} else {
251+
updateSchema.addColumn(columnName, icebergType, columnComment);
252+
}
215253
switch (columnWithPosition.getPosition()) {
216254
case FIRST:
217-
updateSchema.addColumn(columnName, icebergType, columnComment);
218-
table.updateSchema().moveFirst(columnName);
255+
updateSchema.moveFirst(columnName);
219256
break;
220257
case LAST:
221-
updateSchema.addColumn(columnName, icebergType, columnComment);
222258
break;
223259
case BEFORE:
224-
updateSchema.addColumn(columnName, icebergType, columnComment);
260+
checkNotNull(
261+
columnWithPosition.getExistedColumnName(),
262+
"Existing column name must be provided for BEFORE position");
225263
updateSchema.moveBefore(
226264
columnName, columnWithPosition.getExistedColumnName());
227265
break;
228266
case AFTER:
229267
checkNotNull(
230268
columnWithPosition.getExistedColumnName(),
231269
"Existing column name must be provided for AFTER position");
232-
updateSchema.addColumn(columnName, icebergType, columnComment);
233270
updateSchema.moveAfter(
234271
columnName, columnWithPosition.getExistedColumnName());
235272
break;
@@ -364,6 +401,13 @@ public Set<SchemaChangeEventType> getSupportedSchemaEvolutionTypes() {
364401
SchemaChangeEventType.ALTER_COLUMN_TYPE);
365402
}
366403

404+
private int getFormatVersion(Table table) {
405+
if (table instanceof HasTableOperations) {
406+
return ((HasTableOperations) table).operations().current().formatVersion();
407+
}
408+
return 2;
409+
}
410+
367411
@Override
368412
public void close() {
369413
catalog = null;

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,13 @@
2828
import org.apache.flink.cdc.connectors.iceberg.sink.IcebergDataSink;
2929
import org.apache.flink.table.data.TimestampData;
3030

31+
import org.apache.iceberg.expressions.Literal;
3132
import org.apache.iceberg.types.Type;
3233
import org.apache.iceberg.types.Types;
34+
import org.slf4j.Logger;
35+
import org.slf4j.LoggerFactory;
36+
37+
import javax.annotation.Nullable;
3338

3439
import java.time.ZoneId;
3540
import java.util.ArrayList;
@@ -41,6 +46,8 @@
4146
/** Util class for types in {@link IcebergDataSink}. */
4247
public class IcebergTypeUtils {
4348

49+
private static final Logger LOG = LoggerFactory.getLogger(IcebergTypeUtils.class);
50+
4451
/** Convert column from Flink CDC to Iceberg format. */
4552
public static Types.NestedField convertCdcColumnToIcebergField(
4653
int index, PhysicalColumn column) {
@@ -53,6 +60,66 @@ public static Types.NestedField convertCdcColumnToIcebergField(
5360
column.getComment());
5461
}
5562

63+
/**
64+
* Parse a CDC default value expression string into an Iceberg {@link Literal}.
65+
*
66+
* @return the parsed Literal, or null if the expression is null or cannot be parsed for the
67+
* given type.
68+
*/
69+
@Nullable
70+
public static Literal<?> parseDefaultValue(
71+
@Nullable String defaultValueExpression, DataType cdcType) {
72+
if (defaultValueExpression == null) {
73+
return null;
74+
}
75+
try {
76+
switch (cdcType.getTypeRoot()) {
77+
case CHAR:
78+
case VARCHAR:
79+
return Literal.of(defaultValueExpression);
80+
case BOOLEAN:
81+
if ("true".equalsIgnoreCase(defaultValueExpression)) {
82+
return Literal.of(true);
83+
} else if ("false".equalsIgnoreCase(defaultValueExpression)) {
84+
return Literal.of(false);
85+
} else {
86+
LOG.warn(
87+
"Invalid boolean default value '{}', skipping default value.",
88+
defaultValueExpression);
89+
return null;
90+
}
91+
case TINYINT:
92+
case SMALLINT:
93+
case INTEGER:
94+
return Literal.of(Integer.parseInt(defaultValueExpression));
95+
case BIGINT:
96+
return Literal.of(Long.parseLong(defaultValueExpression));
97+
case FLOAT:
98+
return Literal.of(Float.parseFloat(defaultValueExpression));
99+
case DOUBLE:
100+
return Literal.of(Double.parseDouble(defaultValueExpression));
101+
case DECIMAL:
102+
int scale = DataTypes.getScale(cdcType).orElse(0);
103+
return Literal.of(
104+
new java.math.BigDecimal(defaultValueExpression)
105+
.setScale(scale, java.math.RoundingMode.HALF_UP));
106+
default:
107+
LOG.debug(
108+
"Unsupported default value type {} for expression '{}', skipping default value.",
109+
cdcType.getTypeRoot(),
110+
defaultValueExpression);
111+
return null;
112+
}
113+
} catch (NumberFormatException e) {
114+
LOG.warn(
115+
"Failed to parse default value '{}' for type {}, skipping default value.",
116+
defaultValueExpression,
117+
cdcType.getTypeRoot(),
118+
e);
119+
return null;
120+
}
121+
}
122+
56123
/**
57124
* Convert data type from Flink CDC to Iceberg format, refer to <a
58125
* href="https://iceberg.apache.org/docs/nightly/flink/#flink-to-iceberg">...</a>.

0 commit comments

Comments
 (0)