diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java index c571df4966..3088c9d308 100644 --- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java +++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java @@ -210,15 +210,24 @@ public static TableDescriptor toFlussTable(ResolvedCatalogBaseTable catalogBa .collect(Collectors.toList())); // convert some flink options to fluss table configs. - Map properties = convertFlinkOptionsToFlussTableProperties(flinkTableConf); + Map storageProperties = + convertFlinkOptionsToFlussTableProperties(flinkTableConf); - if (properties.containsKey(TABLE_AUTO_INCREMENT_FIELDS.key())) { + if (storageProperties.containsKey(TABLE_AUTO_INCREMENT_FIELDS.key())) { for (String autoIncrementColumn : - properties.get(TABLE_AUTO_INCREMENT_FIELDS.key()).split(",")) { + storageProperties.get(TABLE_AUTO_INCREMENT_FIELDS.key()).split(",")) { schemBuilder.enableAutoIncrement(autoIncrementColumn); } } + // serialize computed column and watermark spec to custom properties + Map customProperties = + extractCustomProperties(flinkTableConf, storageProperties); + CatalogPropertiesUtils.serializeComputedColumns( + customProperties, resolvedSchema.getColumns()); + CatalogPropertiesUtils.serializeWatermarkSpecs( + customProperties, catalogBaseTable.getResolvedSchema().getWatermarkSpecs()); + Schema schema = schemBuilder.build(); resolvedSchema.getColumns().stream() @@ -236,12 +245,6 @@ public static TableDescriptor toFlussTable(ResolvedCatalogBaseTable catalogBa ? ((ResolvedCatalogTable) catalogBaseTable).getPartitionKeys() : ((ResolvedCatalogMaterializedTable) catalogBaseTable).getPartitionKeys(); - Map customProperties = flinkTableConf.toMap(); - CatalogPropertiesUtils.serializeComputedColumns( - customProperties, resolvedSchema.getColumns()); - CatalogPropertiesUtils.serializeWatermarkSpecs( - customProperties, catalogBaseTable.getResolvedSchema().getWatermarkSpecs()); - // Set materialized table flags to fluss table custom properties if (CatalogTableAdapter.isMaterializedTable(tableKind)) { CatalogMaterializedTable.RefreshMode refreshMode = @@ -283,7 +286,7 @@ public static TableDescriptor toFlussTable(ResolvedCatalogBaseTable catalogBa .partitionedBy(partitionKeys) .distributedBy(bucketNum, bucketKey) .comment(comment) - .properties(properties) + .properties(storageProperties) .customProperties(customProperties) .build(); } @@ -644,4 +647,11 @@ private static CatalogMaterializedTable toFlinkMaterializedTable( .serializedRefreshHandler(refreshHandlerBytes); return builder.build(); } + + private static Map extractCustomProperties( + Configuration allProperties, Map flussTableProperties) { + Map customProperties = new HashMap<>(allProperties.toMap()); + customProperties.keySet().removeAll(flussTableProperties.keySet()); + return customProperties; + } } diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java index bc04543a47..e544bfd1e9 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java @@ -17,6 +17,9 @@ package org.apache.fluss.flink.catalog; +import org.apache.fluss.client.Connection; +import org.apache.fluss.client.ConnectionFactory; +import org.apache.fluss.client.admin.Admin; import org.apache.fluss.cluster.ServerNode; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; @@ -25,6 +28,7 @@ import org.apache.fluss.exception.InvalidConfigException; import org.apache.fluss.exception.InvalidTableException; import org.apache.fluss.metadata.DataLakeFormat; +import org.apache.fluss.metadata.TableInfo; import org.apache.fluss.metadata.TablePath; import org.apache.fluss.server.testutils.FlussClusterExtension; @@ -586,7 +590,9 @@ void testTableWithExpression() throws Exception { + " cost AS price * quantity,\n" + " order_time TIMESTAMP(3),\n" + " WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND\n" - + ") with ('k1' = 'v1')"); + + ") with ('k1' = 'v1', 'bucket.num' = '2', " + + "'table.datalake.format' = 'paimon', " + + "'client.connect-timeout' = '120s')"); CatalogTable table = (CatalogTable) catalog.getTable(new ObjectPath(DEFAULT_DB, "expression_test")); Schema.Builder schemaBuilder = Schema.newBuilder(); @@ -606,9 +612,36 @@ void testTableWithExpression() throws Exception { Map expectedOptions = new HashMap<>(); expectedOptions.put("k1", "v1"); expectedOptions.put(BUCKET_KEY.key(), "user"); - expectedOptions.put(BUCKET_NUMBER.key(), "1"); + expectedOptions.put(BUCKET_NUMBER.key(), "2"); expectedOptions.put("table.datalake.format", "paimon"); + expectedOptions.put("client.connect-timeout", "120s"); assertOptionsEqual(table.getOptions(), expectedOptions); + + // assert the stored table/custom configs + Configuration clientConfig = FLUSS_CLUSTER_EXTENSION.getClientConfig(); + try (Connection conn = ConnectionFactory.createConnection(clientConfig)) { + Admin admin = conn.getAdmin(); + TableInfo tableInfo = + admin.getTableInfo(TablePath.of(DEFAULT_DB, "expression_test")).get(); + + Map expectedTableProperties = new HashMap<>(); + expectedTableProperties.put("table.datalake.format", "paimon"); + expectedTableProperties.put("table.replication.factor", "1"); + assertThat(tableInfo.getProperties().toMap()).isEqualTo(expectedTableProperties); + + Map expectedCustomProperties = new HashMap<>(); + expectedCustomProperties.put("k1", "v1"); + expectedCustomProperties.put("client.connect-timeout", "120s"); + expectedCustomProperties.put( + "schema.watermark.0.strategy.expr", "`order_time` - INTERVAL '5' SECOND"); + expectedCustomProperties.put("schema.watermark.0.rowtime", "order_time"); + expectedCustomProperties.put("schema.watermark.0.strategy.data-type", "TIMESTAMP(3)"); + expectedCustomProperties.put("schema.4.name", "cost"); + expectedCustomProperties.put("schema.4.expr", "`price` * `quantity`"); + expectedCustomProperties.put("schema.4.data-type", "DOUBLE"); + expectedCustomProperties.put("bucket.num", "2"); + assertThat(tableInfo.getCustomProperties().toMap()).isEqualTo(expectedCustomProperties); + } } @Test diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConversionsTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConversionsTest.java index cafc9d8fa4..b511c9050f 100644 --- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConversionsTest.java +++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConversionsTest.java @@ -258,6 +258,43 @@ void testTableConversionWithOptions() { .containsEntry(ConfigOptions.CLIENT_WRITER_BATCH_TIMEOUT.key(), "32s"); } + @Test + void testTableConversionForCustomProperties() { + Map options = new HashMap<>(); + // forward table option & enum type + options.put(ConfigOptions.TABLE_LOG_FORMAT.key(), "indexed"); + // forward client memory option + options.put(ConfigOptions.CLIENT_WRITER_BUFFER_MEMORY_SIZE.key(), "64mb"); + // forward client duration option + options.put(ConfigOptions.CLIENT_WRITER_BATCH_TIMEOUT.key(), "32s"); + + ResolvedSchema schema = + new ResolvedSchema( + Collections.singletonList( + Column.physical( + "order_id", + org.apache.flink.table.api.DataTypes.STRING().notNull())), + Collections.emptyList(), + null); + CatalogTable flinkTable = + CatalogTable.of( + Schema.newBuilder().fromResolvedSchema(schema).build(), + "test comment", + Collections.emptyList(), + options); + + TableDescriptor flussTable = + FlinkConversions.toFlussTable(new ResolvedCatalogTable(flinkTable, schema)); + + assertThat(flussTable.getProperties()) + .containsEntry(ConfigOptions.TABLE_LOG_FORMAT.key(), "indexed"); + + HashMap customProperties = new HashMap<>(); + customProperties.put(ConfigOptions.CLIENT_WRITER_BUFFER_MEMORY_SIZE.key(), "64mb"); + customProperties.put(ConfigOptions.CLIENT_WRITER_BATCH_TIMEOUT.key(), "32s"); + assertThat(flussTable.getCustomProperties()).containsExactlyEntriesOf(customProperties); + } + @Test void testOptionConversions() { ConfigOption flinkOption = FlinkConversions.toFlinkOption(ConfigOptions.TABLE_KV_FORMAT);