@@ -210,15 +210,24 @@ public static TableDescriptor toFlussTable(ResolvedCatalogBaseTable<?> catalogBa
210210 .collect (Collectors .toList ()));
211211
212212 // convert some flink options to fluss table configs.
213- Map <String , String > properties = convertFlinkOptionsToFlussTableProperties (flinkTableConf );
213+ Map <String , String > storageProperties =
214+ convertFlinkOptionsToFlussTableProperties (flinkTableConf );
214215
215- if (properties .containsKey (TABLE_AUTO_INCREMENT_FIELDS .key ())) {
216+ if (storageProperties .containsKey (TABLE_AUTO_INCREMENT_FIELDS .key ())) {
216217 for (String autoIncrementColumn :
217- properties .get (TABLE_AUTO_INCREMENT_FIELDS .key ()).split ("," )) {
218+ storageProperties .get (TABLE_AUTO_INCREMENT_FIELDS .key ()).split ("," )) {
218219 schemBuilder .enableAutoIncrement (autoIncrementColumn );
219220 }
220221 }
221222
223+ // serialize computed column and watermark spec to custom properties
224+ Map <String , String > customProperties =
225+ extractCustomProperties (flinkTableConf , storageProperties );
226+ CatalogPropertiesUtils .serializeComputedColumns (
227+ customProperties , resolvedSchema .getColumns ());
228+ CatalogPropertiesUtils .serializeWatermarkSpecs (
229+ customProperties , catalogBaseTable .getResolvedSchema ().getWatermarkSpecs ());
230+
222231 Schema schema = schemBuilder .build ();
223232
224233 resolvedSchema .getColumns ().stream ()
@@ -236,11 +245,6 @@ public static TableDescriptor toFlussTable(ResolvedCatalogBaseTable<?> catalogBa
236245 ? ((ResolvedCatalogTable ) catalogBaseTable ).getPartitionKeys ()
237246 : ((ResolvedCatalogMaterializedTable ) catalogBaseTable ).getPartitionKeys ();
238247
239- Map <String , String > properties = flinkTableConf .toMap ();
240- CatalogPropertiesUtils .serializeComputedColumns (properties , resolvedSchema .getColumns ());
241- CatalogPropertiesUtils .serializeWatermarkSpecs (
242- properties , catalogBaseTable .getResolvedSchema ().getWatermarkSpecs ());
243-
244248 // Set materialized table flags to fluss table custom properties
245249 if (CatalogTableAdapter .isMaterializedTable (tableKind )) {
246250 CatalogMaterializedTable .RefreshMode refreshMode =
@@ -254,11 +258,6 @@ public static TableDescriptor toFlussTable(ResolvedCatalogBaseTable<?> catalogBa
254258 }
255259
256260 String comment = catalogBaseTable .getComment ();
257- // convert some flink options to fluss table configs.
258- Map <String , String > flussTableProperties =
259- convertFlinkOptionsToFlussTableProperties (flinkTableConf );
260- Map <String , String > customProperties =
261- extractCustomProperties (properties , flussTableProperties );
262261
263262 // then set distributed by information
264263 List <String > bucketKey ;
@@ -287,7 +286,7 @@ public static TableDescriptor toFlussTable(ResolvedCatalogBaseTable<?> catalogBa
287286 .partitionedBy (partitionKeys )
288287 .distributedBy (bucketNum , bucketKey )
289288 .comment (comment )
290- .properties (flussTableProperties )
289+ .properties (storageProperties )
291290 .customProperties (customProperties )
292291 .build ();
293292 }
@@ -650,8 +649,8 @@ private static CatalogMaterializedTable toFlinkMaterializedTable(
650649 }
651650
652651 private static Map <String , String > extractCustomProperties (
653- Map < String , String > allProperties , Map <String , String > flussTableProperties ) {
654- Map <String , String > customProperties = new HashMap <>(allProperties );
652+ Configuration allProperties , Map <String , String > flussTableProperties ) {
653+ Map <String , String > customProperties = new HashMap <>(allProperties . toMap () );
655654 customProperties .keySet ().removeAll (flussTableProperties .keySet ());
656655 return customProperties ;
657656 }
0 commit comments