From 27ed8d32a9d9895274f89f4ee44625415b1ca5a9 Mon Sep 17 00:00:00 2001
From: Rion Williams
Date: Tue, 23 Sep 2025 21:13:57 -0500
Subject: [PATCH 1/4] [flink] Improved Handling of Custom Properties During Fluss Table Creation

---
 .../fluss/flink/utils/FlinkConversions.java   | 21 ++++++++---
 .../flink/utils/FlinkConversionsTest.java     | 37 +++++++++++++++++++
 2 files changed, 53 insertions(+), 5 deletions(-)

diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
index c571df4966..942920adac 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
@@ -236,11 +236,10 @@ public static TableDescriptor toFlussTable(ResolvedCatalogBaseTable<?> catalogBa
                         ? ((ResolvedCatalogTable) catalogBaseTable).getPartitionKeys()
                         : ((ResolvedCatalogMaterializedTable) catalogBaseTable).getPartitionKeys();
 
-        Map<String, String> customProperties = flinkTableConf.toMap();
-        CatalogPropertiesUtils.serializeComputedColumns(
-                customProperties, resolvedSchema.getColumns());
+        Map<String, String> properties = flinkTableConf.toMap();
+        CatalogPropertiesUtils.serializeComputedColumns(properties, resolvedSchema.getColumns());
         CatalogPropertiesUtils.serializeWatermarkSpecs(
-                customProperties, catalogBaseTable.getResolvedSchema().getWatermarkSpecs());
+                properties, catalogBaseTable.getResolvedSchema().getWatermarkSpecs());
 
         // Set materialized table flags to fluss table custom properties
         if (CatalogTableAdapter.isMaterializedTable(tableKind)) {
@@ -255,6 +254,11 @@ public static TableDescriptor toFlussTable(ResolvedCatalogBaseTable<?> catalogBa
         }
 
         String comment = catalogBaseTable.getComment();
+        // convert some flink options to fluss table configs.
+        Map<String, String> flussTableProperties =
+                convertFlinkOptionsToFlussTableProperties(flinkTableConf);
+        Map<String, String> customProperties =
+                extractCustomProperties(properties, flussTableProperties);
 
         // then set distributed by information
         List<String> bucketKey;
@@ -283,7 +287,7 @@ public static TableDescriptor toFlussTable(ResolvedCatalogBaseTable<?> catalogBa
                 .partitionedBy(partitionKeys)
                 .distributedBy(bucketNum, bucketKey)
                 .comment(comment)
-                .properties(properties)
+                .properties(flussTableProperties)
                 .customProperties(customProperties)
                 .build();
     }
@@ -644,4 +648,11 @@ private static CatalogMaterializedTable toFlinkMaterializedTable(
                 .serializedRefreshHandler(refreshHandlerBytes);
         return builder.build();
     }
+
+    private static Map<String, String> extractCustomProperties(
+            Map<String, String> allProperties, Map<String, String> flussTableProperties) {
+        Map<String, String> customProperties = new HashMap<>(allProperties);
+        customProperties.keySet().removeAll(flussTableProperties.keySet());
+        return customProperties;
+    }
 }
diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConversionsTest.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConversionsTest.java
index cafc9d8fa4..b511c9050f 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConversionsTest.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConversionsTest.java
@@ -258,6 +258,43 @@ void testTableConversionWithOptions() {
                 .containsEntry(ConfigOptions.CLIENT_WRITER_BATCH_TIMEOUT.key(), "32s");
     }
 
+    @Test
+    void testTableConversionForCustomProperties() {
+        Map<String, String> options = new HashMap<>();
+        // forward table option & enum type
+        options.put(ConfigOptions.TABLE_LOG_FORMAT.key(), "indexed");
+        // forward client memory option
+        options.put(ConfigOptions.CLIENT_WRITER_BUFFER_MEMORY_SIZE.key(), "64mb");
+        // forward client duration option
+        options.put(ConfigOptions.CLIENT_WRITER_BATCH_TIMEOUT.key(), "32s");
+
+        ResolvedSchema schema =
+                new ResolvedSchema(
+                        Collections.singletonList(
+                                Column.physical(
+                                        "order_id",
+                                        org.apache.flink.table.api.DataTypes.STRING().notNull())),
+                        Collections.emptyList(),
+                        null);
+        CatalogTable flinkTable =
+                CatalogTable.of(
+                        Schema.newBuilder().fromResolvedSchema(schema).build(),
+                        "test comment",
+                        Collections.emptyList(),
+                        options);
+
+        TableDescriptor flussTable =
+                FlinkConversions.toFlussTable(new ResolvedCatalogTable(flinkTable, schema));
+
+        assertThat(flussTable.getProperties())
+                .containsEntry(ConfigOptions.TABLE_LOG_FORMAT.key(), "indexed");
+
+        HashMap<String, String> customProperties = new HashMap<>();
+        customProperties.put(ConfigOptions.CLIENT_WRITER_BUFFER_MEMORY_SIZE.key(), "64mb");
+        customProperties.put(ConfigOptions.CLIENT_WRITER_BATCH_TIMEOUT.key(), "32s");
+        assertThat(flussTable.getCustomProperties()).containsExactlyEntriesOf(customProperties);
+    }
+
     @Test
     void testOptionConversions() {
         ConfigOption<?> flinkOption = FlinkConversions.toFlinkOption(ConfigOptions.TABLE_KV_FORMAT);

From f96950d99060ceb980f2799a7a7db3bb5be7cd94 Mon Sep 17 00:00:00 2001
From: Jark Wu
Date: Wed, 24 Dec 2025 19:16:06 +0800
Subject: [PATCH 2/4] rebase and resolve conflicts

---
 .../fluss/flink/utils/FlinkConversions.java | 31 +++++++++----------
 1 file changed, 15 insertions(+), 16 deletions(-)

diff --git a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
index 942920adac..3088c9d308 100644
--- a/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
+++ b/fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java
@@ -210,15 +210,24 @@ public static TableDescriptor toFlussTable(ResolvedCatalogBaseTable<?> catalogBa
                         .collect(Collectors.toList()));
 
         // convert some flink options to fluss table configs.
-        Map<String, String> properties = convertFlinkOptionsToFlussTableProperties(flinkTableConf);
+        Map<String, String> storageProperties =
+                convertFlinkOptionsToFlussTableProperties(flinkTableConf);
 
-        if (properties.containsKey(TABLE_AUTO_INCREMENT_FIELDS.key())) {
+        if (storageProperties.containsKey(TABLE_AUTO_INCREMENT_FIELDS.key())) {
             for (String autoIncrementColumn :
-                    properties.get(TABLE_AUTO_INCREMENT_FIELDS.key()).split(",")) {
+                    storageProperties.get(TABLE_AUTO_INCREMENT_FIELDS.key()).split(",")) {
                 schemBuilder.enableAutoIncrement(autoIncrementColumn);
             }
         }
 
+        // serialize computed column and watermark spec to custom properties
+        Map<String, String> customProperties =
+                extractCustomProperties(flinkTableConf, storageProperties);
+        CatalogPropertiesUtils.serializeComputedColumns(
+                customProperties, resolvedSchema.getColumns());
+        CatalogPropertiesUtils.serializeWatermarkSpecs(
+                customProperties, catalogBaseTable.getResolvedSchema().getWatermarkSpecs());
+
         Schema schema = schemBuilder.build();
 
         resolvedSchema.getColumns().stream()
@@ -236,11 +245,6 @@ public static TableDescriptor toFlussTable(ResolvedCatalogBaseTable<?> catalogBa
                         ? ((ResolvedCatalogTable) catalogBaseTable).getPartitionKeys()
                         : ((ResolvedCatalogMaterializedTable) catalogBaseTable).getPartitionKeys();
 
-        Map<String, String> properties = flinkTableConf.toMap();
-        CatalogPropertiesUtils.serializeComputedColumns(properties, resolvedSchema.getColumns());
-        CatalogPropertiesUtils.serializeWatermarkSpecs(
-                properties, catalogBaseTable.getResolvedSchema().getWatermarkSpecs());
-
         // Set materialized table flags to fluss table custom properties
         if (CatalogTableAdapter.isMaterializedTable(tableKind)) {
             CatalogMaterializedTable.RefreshMode refreshMode =
@@ -254,11 +258,6 @@ public static TableDescriptor toFlussTable(ResolvedCatalogBaseTable<?> catalogBa
         }
 
         String comment = catalogBaseTable.getComment();
-        // convert some flink options to fluss table configs.
-        Map<String, String> flussTableProperties =
-                convertFlinkOptionsToFlussTableProperties(flinkTableConf);
-        Map<String, String> customProperties =
-                extractCustomProperties(properties, flussTableProperties);
 
         // then set distributed by information
         List<String> bucketKey;
@@ -287,7 +286,7 @@ public static TableDescriptor toFlussTable(ResolvedCatalogBaseTable<?> catalogBa
                 .partitionedBy(partitionKeys)
                 .distributedBy(bucketNum, bucketKey)
                 .comment(comment)
-                .properties(flussTableProperties)
+                .properties(storageProperties)
                 .customProperties(customProperties)
                 .build();
     }
@@ -650,8 +649,8 @@ private static CatalogMaterializedTable toFlinkMaterializedTable(
     }
 
     private static Map<String, String> extractCustomProperties(
-            Map<String, String> allProperties, Map<String, String> flussTableProperties) {
-        Map<String, String> customProperties = new HashMap<>(allProperties);
+            Configuration allProperties, Map<String, String> flussTableProperties) {
+        Map<String, String> customProperties = new HashMap<>(allProperties.toMap());
         customProperties.keySet().removeAll(flussTableProperties.keySet());
         return customProperties;
     }

From 5ce69af94032fe0a5c4dd568008244d19c0fd1e4 Mon Sep 17 00:00:00 2001
From: Jark Wu
Date: Wed, 24 Dec 2025 19:41:57 +0800
Subject: [PATCH 3/4] add test

---
 .../flink/catalog/FlinkCatalogITCase.java | 37 ++++++++++++++++++-
 1 file changed, 35 insertions(+), 2 deletions(-)

diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
index bc04543a47..5394ee5256 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
@@ -17,6 +17,9 @@
 
 package org.apache.fluss.flink.catalog;
 
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.client.admin.Admin;
 import org.apache.fluss.cluster.ServerNode;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
@@ -25,6 +28,7 @@
 import org.apache.fluss.exception.InvalidConfigException;
 import org.apache.fluss.exception.InvalidTableException;
 import org.apache.fluss.metadata.DataLakeFormat;
+import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.server.testutils.FlussClusterExtension;
 
@@ -586,7 +590,9 @@ void testTableWithExpression() throws Exception {
                         + " cost AS price * quantity,\n"
                         + " order_time TIMESTAMP(3),\n"
                         + " WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND\n"
-                        + ") with ('k1' = 'v1')");
+                        + ") with ('k1' = 'v1', 'bucket.num' = '2', "
+                        + "'table.datalake.format' = 'paimon', "
+                        + "'client.connect-timeout' = '120s')");
         CatalogTable table =
                 (CatalogTable) catalog.getTable(new ObjectPath(DEFAULT_DB, "expression_test"));
         Schema.Builder schemaBuilder = Schema.newBuilder();
@@ -606,9 +612,36 @@ void testTableWithExpression() throws Exception {
         Map<String, String> expectedOptions = new HashMap<>();
         expectedOptions.put("k1", "v1");
         expectedOptions.put(BUCKET_KEY.key(), "user");
-        expectedOptions.put(BUCKET_NUMBER.key(), "1");
+        expectedOptions.put(BUCKET_NUMBER.key(), "2");
         expectedOptions.put("table.datalake.format", "paimon");
+        expectedOptions.put("client.connect-timeout", "120s");
         assertOptionsEqual(table.getOptions(), expectedOptions);
+
+        // assert the stored table/custom configs
+        Configuration clientConfig = FLUSS_CLUSTER_EXTENSION.getClientConfig();
+        try (Connection conn = ConnectionFactory.createConnection(clientConfig)) {
+            Admin admin = conn.getAdmin();
+            TableInfo tableInfo =
+                    admin.getTableInfo(TablePath.of(DEFAULT_DB, "expression_test")).get();
+
+            Map<String, String> expectedTableProperties = new HashMap<>();
+            expectedTableProperties.put("table.datalake.format", "paimon");
+            expectedTableProperties.put("table.replication.factor", "1");
+            assertThat(tableInfo.getTableConfig()).isEqualTo(expectedTableProperties);
+
+            Map<String, String> expectedCustomProperties = new HashMap<>();
+            expectedCustomProperties.put("k1", "v1");
+            expectedCustomProperties.put("client.connect-timeout", "120s");
+            expectedCustomProperties.put(
+                    "schema.watermark.0.strategy.expr", "`order_time` - INTERVAL '5' SECOND");
+            expectedCustomProperties.put("schema.watermark.0.rowtime", "order_time");
+            expectedCustomProperties.put("schema.watermark.0.data-type", "TIMESTAMP(3)");
+            expectedCustomProperties.put("schema.4.name", "cost");
+            expectedCustomProperties.put("schema.4.expr", "`price` * `quantity`");
+            expectedCustomProperties.put("schema.4.data-type", "DOUBLE");
+            expectedCustomProperties.put("bucket.num", "2");
+            assertThat(tableInfo.getCustomProperties()).isEqualTo(expectedCustomProperties);
+        }
     }
 
     @Test

From 51ef65c1c14fda9e97ddb20238ef991a937b9c33 Mon Sep 17 00:00:00 2001
From: Jark Wu
Date: Wed, 24 Dec 2025 22:52:24 +0800
Subject: [PATCH 4/4] fix

---
 .../org/apache/fluss/flink/catalog/FlinkCatalogITCase.java | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
index 5394ee5256..e544bfd1e9 100644
--- a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
+++ b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
@@ -627,7 +627,7 @@ void testTableWithExpression() throws Exception {
             Map<String, String> expectedTableProperties = new HashMap<>();
             expectedTableProperties.put("table.datalake.format", "paimon");
             expectedTableProperties.put("table.replication.factor", "1");
-            assertThat(tableInfo.getTableConfig()).isEqualTo(expectedTableProperties);
+            assertThat(tableInfo.getProperties().toMap()).isEqualTo(expectedTableProperties);
 
             Map<String, String> expectedCustomProperties = new HashMap<>();
             expectedCustomProperties.put("k1", "v1");
@@ -635,12 +635,12 @@ void testTableWithExpression() throws Exception {
             expectedCustomProperties.put(
                     "schema.watermark.0.strategy.expr", "`order_time` - INTERVAL '5' SECOND");
             expectedCustomProperties.put("schema.watermark.0.rowtime", "order_time");
-            expectedCustomProperties.put("schema.watermark.0.data-type", "TIMESTAMP(3)");
+            expectedCustomProperties.put("schema.watermark.0.strategy.data-type", "TIMESTAMP(3)");
             expectedCustomProperties.put("schema.4.name", "cost");
             expectedCustomProperties.put("schema.4.expr", "`price` * `quantity`");
             expectedCustomProperties.put("schema.4.data-type", "DOUBLE");
             expectedCustomProperties.put("bucket.num", "2");
-            assertThat(tableInfo.getCustomProperties()).isEqualTo(expectedCustomProperties);
+            assertThat(tableInfo.getCustomProperties().toMap()).isEqualTo(expectedCustomProperties);
         }
     }
 
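Note (not part of the patches): a minimal, self-contained sketch of the property split this series introduces, mirroring the extractCustomProperties(...) logic above. The class name and the hard-coded option keys are illustrative (keys taken from the FlinkCatalogITCase expectations); the real code derives the recognized keys via convertFlinkOptionsToFlussTableProperties(flinkTableConf).

import java.util.HashMap;
import java.util.Map;

public class PropertySplitSketch {

    // Same idea as extractCustomProperties(...): whatever is not claimed as a
    // Fluss table property is preserved verbatim as a custom property.
    static Map<String, String> extractCustomProperties(
            Map<String, String> allProperties, Map<String, String> flussTableProperties) {
        Map<String, String> customProperties = new HashMap<>(allProperties);
        customProperties.keySet().removeAll(flussTableProperties.keySet());
        return customProperties;
    }

    public static void main(String[] args) {
        // Options as a user might declare them in the Flink WITH (...) clause.
        Map<String, String> allOptions = new HashMap<>();
        allOptions.put("table.datalake.format", "paimon"); // recognized Fluss table config
        allOptions.put("bucket.num", "2");                 // also kept as a custom property per the ITCase
        allOptions.put("client.connect-timeout", "120s");  // client option -> custom property
        allOptions.put("k1", "v1");                        // arbitrary user option -> custom property

        // Stand-in for convertFlinkOptionsToFlussTableProperties(flinkTableConf).
        Map<String, String> storageProperties = new HashMap<>();
        storageProperties.put("table.datalake.format", "paimon");

        Map<String, String> customProperties = extractCustomProperties(allOptions, storageProperties);
        System.out.println(storageProperties); // {table.datalake.format=paimon}
        System.out.println(customProperties);  // bucket.num=2, client.connect-timeout=120s, k1=v1 (order not guaranteed)
    }
}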