Skip to content

Commit 39f2d04

Browse files
committed
fix: upgrade v2 table to format version 3 when adding a column with a default value
1 parent 1b840ed commit 39f2d04

2 files changed

Lines changed: 110 additions & 1 deletion

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,11 +213,29 @@ private void applyAddColumnEventWithPosition(Table table, AddColumnEvent event)
213213
throws SchemaEvolveException {
214214

215215
try {
216-
UpdateSchema updateSchema = table.updateSchema();
217216
int formatVersion =
218217
table instanceof HasTableOperations
219218
? ((HasTableOperations) table).operations().current().formatVersion()
220219
: 2;
220+
221+
// Column defaults require Iceberg format version 3, so upgrade the table first if any added column has a default value.
222+
if (formatVersion < 3) {
223+
boolean hasDefaults =
224+
event.getAddedColumns().stream()
225+
.anyMatch(
226+
cwp ->
227+
IcebergTypeUtils.parseDefaultValue(
228+
cwp.getAddColumn()
229+
.getDefaultValueExpression(),
230+
cwp.getAddColumn().getType())
231+
!= null);
232+
if (hasDefaults) {
233+
table.updateProperties().set("format-version", "3").commit();
234+
formatVersion = 3;
235+
}
236+
}
237+
238+
UpdateSchema updateSchema = table.updateSchema();
221239
for (AddColumnEvent.ColumnWithPosition columnWithPosition : event.getAddedColumns()) {
222240
Column addColumn = columnWithPosition.getAddColumn();
223241
String columnName = addColumn.getName();

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplierTest.java

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,97 @@ public void testApplySchemaChangeHadoopCatalog() throws InterruptedException {
8787
runSchemaChangeTest(catalogOptions);
8888
}
8989

90+
@Test
91+
public void testAddColumnWithDefaultOnV2Table() {
92+
Map<String, String> catalogOptions = new HashMap<>();
93+
String warehouse =
94+
new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString();
95+
catalogOptions.put("type", "hadoop");
96+
catalogOptions.put("warehouse", warehouse);
97+
catalogOptions.put("cache-enabled", "false");
98+
99+
Catalog catalog =
100+
CatalogUtil.buildIcebergCatalog(
101+
"cdc-iceberg-catalog", catalogOptions, new Configuration());
102+
103+
IcebergMetadataApplier icebergMetadataApplier = new IcebergMetadataApplier(catalogOptions);
104+
String defaultTableId = "test.v2_table";
105+
TableId tableId = TableId.parse(defaultTableId);
106+
107+
// Create table WITHOUT default values (should be format-version 2).
108+
CreateTableEvent createTableEvent =
109+
new CreateTableEvent(
110+
tableId,
111+
Schema.newBuilder()
112+
.physicalColumn("id", DataTypes.BIGINT().notNull(), "column for id")
113+
.physicalColumn("name", DataTypes.VARCHAR(255), "column for name")
114+
.primaryKey("id")
115+
.build());
116+
icebergMetadataApplier.applySchemaChange(createTableEvent);
117+
Table table = catalog.loadTable(TableIdentifier.parse(defaultTableId));
118+
119+
// Verify table is format-version 2.
120+
int formatVersion =
121+
((org.apache.iceberg.HasTableOperations) table)
122+
.operations()
123+
.current()
124+
.formatVersion();
125+
assertThat(formatVersion).isEqualTo(2);
126+
127+
// Add column WITH default value on the v2 table.
128+
AddColumnEvent addColumnEvent =
129+
new AddColumnEvent(
130+
tableId,
131+
Collections.singletonList(
132+
AddColumnEvent.last(
133+
new PhysicalColumn(
134+
"status",
135+
DataTypes.VARCHAR(50),
136+
"column for status",
137+
"active"))));
138+
icebergMetadataApplier.applySchemaChange(addColumnEvent);
139+
140+
// Reload table and verify it was upgraded to format-version 3.
141+
table = catalog.loadTable(TableIdentifier.parse(defaultTableId));
142+
formatVersion =
143+
((org.apache.iceberg.HasTableOperations) table)
144+
.operations()
145+
.current()
146+
.formatVersion();
147+
assertThat(formatVersion).isEqualTo(3);
148+
149+
// Verify the new column has both initial-default and write-default.
150+
org.apache.iceberg.Schema expectedSchema =
151+
new org.apache.iceberg.Schema(
152+
0,
153+
Arrays.asList(
154+
Types.NestedField.builder()
155+
.withId(1)
156+
.asRequired()
157+
.withName("id")
158+
.ofType(Types.LongType.get())
159+
.withDoc("column for id")
160+
.build(),
161+
Types.NestedField.builder()
162+
.withId(2)
163+
.asOptional()
164+
.withName("name")
165+
.ofType(Types.StringType.get())
166+
.withDoc("column for name")
167+
.build(),
168+
Types.NestedField.builder()
169+
.withId(3)
170+
.asOptional()
171+
.withName("status")
172+
.ofType(Types.StringType.get())
173+
.withDoc("column for status")
174+
.withInitialDefault(Literal.of("active"))
175+
.withWriteDefault(Literal.of("active"))
176+
.build()),
177+
new HashSet<>(Collections.singletonList(1)));
178+
assertThat(table.schema().sameSchema(expectedSchema)).isTrue();
179+
}
180+
90181
public void runSchemaChangeTest(Map<String, String> catalogOptions) {
91182
Catalog catalog =
92183
CatalogUtil.buildIcebergCatalog(

0 commit comments

Comments
 (0)