3838
3939import org .apache .hadoop .conf .Configuration ;
4040import org .apache .iceberg .CatalogUtil ;
41+ import org .apache .iceberg .HasTableOperations ;
4142import org .apache .iceberg .PartitionSpec ;
4243import org .apache .iceberg .Schema ;
4344import org .apache .iceberg .Table ;
4647import org .apache .iceberg .catalog .Namespace ;
4748import org .apache .iceberg .catalog .SupportsNamespaces ;
4849import org .apache .iceberg .catalog .TableIdentifier ;
50+ import org .apache .iceberg .expressions .Literal ;
4951import org .apache .iceberg .flink .FlinkSchemaUtil ;
5052import org .apache .iceberg .types .Type ;
5153import org .apache .iceberg .types .Types ;
@@ -178,7 +180,12 @@ private void applyCreateTable(CreateTableEvent event) {
178180 }
179181 PartitionSpec partitionSpec = generatePartitionSpec (icebergSchema , partitionColumns );
180182 if (!catalog .tableExists (tableIdentifier )) {
181- catalog .createTable (tableIdentifier , icebergSchema , partitionSpec , tableOptions );
183+ Table table =
184+ catalog .createTable (
185+ tableIdentifier , icebergSchema , partitionSpec , tableOptions );
186+
187+ applyDefaultValues (table , cdcSchema );
188+
182189 LOG .info (
183190 "Spend {} ms to create iceberg table {}" ,
184191 System .currentTimeMillis () - startTimestamp ,
@@ -189,6 +196,28 @@ private void applyCreateTable(CreateTableEvent event) {
189196 }
190197 }
191198
199+ private void applyDefaultValues (
200+ Table table , org .apache .flink .cdc .common .schema .Schema cdcSchema ) {
201+ if (getFormatVersion (table ) < 3 ) {
202+ return ;
203+ }
204+ UpdateSchema updateSchema = null ;
205+ for (Column column : cdcSchema .getColumns ()) {
206+ Literal <?> defaultValue =
207+ IcebergTypeUtils .parseDefaultValue (
208+ column .getDefaultValueExpression (), column .getType ());
209+ if (defaultValue != null ) {
210+ if (updateSchema == null ) {
211+ updateSchema = table .updateSchema ();
212+ }
213+ updateSchema .updateColumnDefault (column .getName (), defaultValue );
214+ }
215+ }
216+ if (updateSchema != null ) {
217+ updateSchema .commit ();
218+ }
219+ }
220+
192221 private void applyAddColumn (AddColumnEvent event ) {
193222 TableIdentifier tableIdentifier = TableIdentifier .parse (event .tableId ().identifier ());
194223 try {
@@ -212,24 +241,32 @@ private void applyAddColumnEventWithPosition(Table table, AddColumnEvent event)
212241 FlinkSchemaUtil .convert (
213242 DataTypeUtils .toFlinkDataType (addColumn .getType ())
214243 .getLogicalType ());
244+ Literal <?> defaultValue =
245+ IcebergTypeUtils .parseDefaultValue (
246+ addColumn .getDefaultValueExpression (), addColumn .getType ());
247+ if (defaultValue != null && getFormatVersion (table ) >= 3 ) {
248+ updateSchema .addColumn (columnName , icebergType , columnComment , defaultValue );
249+ updateSchema .updateColumnDefault (columnName , defaultValue );
250+ } else {
251+ updateSchema .addColumn (columnName , icebergType , columnComment );
252+ }
215253 switch (columnWithPosition .getPosition ()) {
216254 case FIRST :
217- updateSchema .addColumn (columnName , icebergType , columnComment );
218- table .updateSchema ().moveFirst (columnName );
255+ updateSchema .moveFirst (columnName );
219256 break ;
220257 case LAST :
221- updateSchema .addColumn (columnName , icebergType , columnComment );
222258 break ;
223259 case BEFORE :
224- updateSchema .addColumn (columnName , icebergType , columnComment );
260+ checkNotNull (
261+ columnWithPosition .getExistedColumnName (),
262+ "Existing column name must be provided for BEFORE position" );
225263 updateSchema .moveBefore (
226264 columnName , columnWithPosition .getExistedColumnName ());
227265 break ;
228266 case AFTER :
229267 checkNotNull (
230268 columnWithPosition .getExistedColumnName (),
231269 "Existing column name must be provided for AFTER position" );
232- updateSchema .addColumn (columnName , icebergType , columnComment );
233270 updateSchema .moveAfter (
234271 columnName , columnWithPosition .getExistedColumnName ());
235272 break ;
@@ -364,6 +401,13 @@ public Set<SchemaChangeEventType> getSupportedSchemaEvolutionTypes() {
364401 SchemaChangeEventType .ALTER_COLUMN_TYPE );
365402 }
366403
404+ private int getFormatVersion (Table table ) {
405+ if (table instanceof HasTableOperations ) {
406+ return ((HasTableOperations ) table ).operations ().current ().formatVersion ();
407+ }
408+ return 2 ;
409+ }
410+
367411 @ Override
368412 public void close () {
369413 catalog = null ;
0 commit comments