Skip to content

Commit 1b840ed

Browse files
committed
Separate default value parsing from format version check
1 parent 23b679e commit 1b840ed

3 files changed

Lines changed: 69 additions & 32 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java

Lines changed: 17 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838

3939
import org.apache.hadoop.conf.Configuration;
4040
import org.apache.iceberg.CatalogUtil;
41+
import org.apache.iceberg.HasTableOperations;
4142
import org.apache.iceberg.PartitionSpec;
4243
import org.apache.iceberg.Schema;
4344
import org.apache.iceberg.Table;
@@ -179,11 +180,14 @@ private void applyCreateTable(CreateTableEvent event) {
179180
}
180181
PartitionSpec partitionSpec = generatePartitionSpec(icebergSchema, partitionColumns);
181182
if (!catalog.tableExists(tableIdentifier)) {
182-
Table table =
183-
catalog.createTable(
184-
tableIdentifier, icebergSchema, partitionSpec, tableOptions);
185-
186-
applyDefaultValues(table, cdcSchema);
183+
boolean hasDefaults =
184+
columns.stream().anyMatch(field -> field.initialDefault() != null);
185+
Map<String, String> properties = tableOptions;
186+
if (hasDefaults) {
187+
properties = new HashMap<>(tableOptions);
188+
properties.put("format-version", "3");
189+
}
190+
catalog.createTable(tableIdentifier, icebergSchema, partitionSpec, properties);
187191

188192
LOG.info(
189193
"Spend {} ms to create iceberg table {}",
@@ -195,25 +199,6 @@ private void applyCreateTable(CreateTableEvent event) {
195199
}
196200
}
197201

198-
private void applyDefaultValues(
199-
Table table, org.apache.flink.cdc.common.schema.Schema cdcSchema) {
200-
UpdateSchema updateSchema = null;
201-
for (Column column : cdcSchema.getColumns()) {
202-
Literal<?> defaultValue =
203-
IcebergTypeUtils.parseDefaultValue(
204-
column.getDefaultValueExpression(), column.getType());
205-
if (defaultValue != null) {
206-
if (updateSchema == null) {
207-
updateSchema = table.updateSchema();
208-
}
209-
updateSchema.updateColumnDefault(column.getName(), defaultValue);
210-
}
211-
}
212-
if (updateSchema != null) {
213-
updateSchema.commit();
214-
}
215-
}
216-
217202
private void applyAddColumn(AddColumnEvent event) {
218203
TableIdentifier tableIdentifier = TableIdentifier.parse(event.tableId().identifier());
219204
try {
@@ -229,6 +214,10 @@ private void applyAddColumnEventWithPosition(Table table, AddColumnEvent event)
229214

230215
try {
231216
UpdateSchema updateSchema = table.updateSchema();
217+
int formatVersion =
218+
table instanceof HasTableOperations
219+
? ((HasTableOperations) table).operations().current().formatVersion()
220+
: 2;
232221
for (AddColumnEvent.ColumnWithPosition columnWithPosition : event.getAddedColumns()) {
233222
Column addColumn = columnWithPosition.getAddColumn();
234223
String columnName = addColumn.getName();
@@ -237,12 +226,14 @@ private void applyAddColumnEventWithPosition(Table table, AddColumnEvent event)
237226
FlinkSchemaUtil.convert(
238227
DataTypeUtils.toFlinkDataType(addColumn.getType())
239228
.getLogicalType());
240-
updateSchema.addColumn(columnName, icebergType, columnComment);
241229
Literal<?> defaultValue =
242230
IcebergTypeUtils.parseDefaultValue(
243231
addColumn.getDefaultValueExpression(), addColumn.getType());
244-
if (defaultValue != null) {
232+
if (defaultValue != null && formatVersion >= 3) {
233+
updateSchema.addColumn(columnName, icebergType, columnComment, defaultValue);
245234
updateSchema.updateColumnDefault(columnName, defaultValue);
235+
} else {
236+
updateSchema.addColumn(columnName, icebergType, columnComment);
246237
}
247238
switch (columnWithPosition.getPosition()) {
248239
case FIRST:

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,12 +52,22 @@ public class IcebergTypeUtils {
5252
public static Types.NestedField convertCdcColumnToIcebergField(
5353
int index, PhysicalColumn column) {
5454
DataType dataType = column.getType();
55-
return Types.NestedField.of(
56-
index,
57-
dataType.isNullable(),
58-
column.getName(),
59-
convertCDCTypeToIcebergType(dataType),
60-
column.getComment());
55+
Literal<?> defaultValue = parseDefaultValue(column.getDefaultValueExpression(), dataType);
56+
Types.NestedField.Builder builder =
57+
Types.NestedField.builder()
58+
.withId(index)
59+
.withName(column.getName())
60+
.ofType(convertCDCTypeToIcebergType(dataType))
61+
.withDoc(column.getComment());
62+
if (dataType.isNullable()) {
63+
builder.asOptional();
64+
} else {
65+
builder.asRequired();
66+
}
67+
if (defaultValue != null) {
68+
builder.withInitialDefault(defaultValue).withWriteDefault(defaultValue);
69+
}
70+
return builder.build();
6171
}
6272

6373
/**

0 commit comments

Comments
 (0)