
Commit 6f200fa

[flink] Fix creating a table from Flink putting all table properties into custom properties (#1748)
1 parent 097d221 commit 6f200fa

File tree

3 files changed (+92 -12 lines)

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java

Lines changed: 20 additions & 10 deletions
@@ -210,15 +210,24 @@ public static TableDescriptor toFlussTable(ResolvedCatalogBaseTable<?> catalogBaseTable
                         .collect(Collectors.toList()));
 
         // convert some flink options to fluss table configs.
-        Map<String, String> properties = convertFlinkOptionsToFlussTableProperties(flinkTableConf);
+        Map<String, String> storageProperties =
+                convertFlinkOptionsToFlussTableProperties(flinkTableConf);
 
-        if (properties.containsKey(TABLE_AUTO_INCREMENT_FIELDS.key())) {
+        if (storageProperties.containsKey(TABLE_AUTO_INCREMENT_FIELDS.key())) {
             for (String autoIncrementColumn :
-                    properties.get(TABLE_AUTO_INCREMENT_FIELDS.key()).split(",")) {
+                    storageProperties.get(TABLE_AUTO_INCREMENT_FIELDS.key()).split(",")) {
                 schemBuilder.enableAutoIncrement(autoIncrementColumn);
             }
         }
 
+        // serialize computed column and watermark spec to custom properties
+        Map<String, String> customProperties =
+                extractCustomProperties(flinkTableConf, storageProperties);
+        CatalogPropertiesUtils.serializeComputedColumns(
+                customProperties, resolvedSchema.getColumns());
+        CatalogPropertiesUtils.serializeWatermarkSpecs(
+                customProperties, catalogBaseTable.getResolvedSchema().getWatermarkSpecs());
+
         Schema schema = schemBuilder.build();
 
         resolvedSchema.getColumns().stream()
@@ -236,12 +245,6 @@ public static TableDescriptor toFlussTable(ResolvedCatalogBaseTable<?> catalogBaseTable
                         ? ((ResolvedCatalogTable) catalogBaseTable).getPartitionKeys()
                         : ((ResolvedCatalogMaterializedTable) catalogBaseTable).getPartitionKeys();
 
-        Map<String, String> customProperties = flinkTableConf.toMap();
-        CatalogPropertiesUtils.serializeComputedColumns(
-                customProperties, resolvedSchema.getColumns());
-        CatalogPropertiesUtils.serializeWatermarkSpecs(
-                customProperties, catalogBaseTable.getResolvedSchema().getWatermarkSpecs());
-
         // Set materialized table flags to fluss table custom properties
         if (CatalogTableAdapter.isMaterializedTable(tableKind)) {
             CatalogMaterializedTable.RefreshMode refreshMode =
@@ -283,7 +286,7 @@ public static TableDescriptor toFlussTable(ResolvedCatalogBaseTable<?> catalogBaseTable
                 .partitionedBy(partitionKeys)
                 .distributedBy(bucketNum, bucketKey)
                 .comment(comment)
-                .properties(properties)
+                .properties(storageProperties)
                 .customProperties(customProperties)
                 .build();
     }
@@ -644,4 +647,11 @@ private static CatalogMaterializedTable toFlinkMaterializedTable(
                 .serializedRefreshHandler(refreshHandlerBytes);
         return builder.build();
     }
+
+    private static Map<String, String> extractCustomProperties(
+            Configuration allProperties, Map<String, String> flussTableProperties) {
+        Map<String, String> customProperties = new HashMap<>(allProperties.toMap());
+        customProperties.keySet().removeAll(flussTableProperties.keySet());
+        return customProperties;
+    }
 }
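
The core of the fix is the new extractCustomProperties helper above: instead of copying every Flink connector option into custom properties (the removed "customProperties = flinkTableConf.toMap()"), custom properties are now computed as the set difference between all table options and the options already recognized as Fluss storage properties. Below is a minimal standalone sketch of that behavior, using only java.util; the class name is made up for illustration, and the example keys are borrowed from this commit's tests.

import java.util.HashMap;
import java.util.Map;

// Standalone sketch of the set-difference behavior; not the Fluss API itself.
public class CustomPropertySplitSketch {

    static Map<String, String> extractCustomProperties(
            Map<String, String> allProperties, Map<String, String> storageProperties) {
        Map<String, String> custom = new HashMap<>(allProperties);
        // drop every key already recognized as a Fluss storage property
        custom.keySet().removeAll(storageProperties.keySet());
        return custom;
    }

    public static void main(String[] args) {
        Map<String, String> all = new HashMap<>();
        all.put("table.datalake.format", "paimon"); // recognized Fluss table config
        all.put("client.connect-timeout", "120s");  // client option, not a table config
        all.put("k1", "v1");                        // free-form user property

        Map<String, String> storage = new HashMap<>();
        storage.put("table.datalake.format", "paimon");

        // prints k1=v1 and client.connect-timeout=120s (HashMap order may vary)
        System.out.println(extractCustomProperties(all, storage));
    }
}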

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java

Lines changed: 35 additions & 2 deletions
@@ -17,6 +17,9 @@
 
 package org.apache.fluss.flink.catalog;
 
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.client.admin.Admin;
 import org.apache.fluss.cluster.ServerNode;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
@@ -25,6 +28,7 @@
 import org.apache.fluss.exception.InvalidConfigException;
 import org.apache.fluss.exception.InvalidTableException;
 import org.apache.fluss.metadata.DataLakeFormat;
+import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.server.testutils.FlussClusterExtension;
 
@@ -586,7 +590,9 @@ void testTableWithExpression() throws Exception {
                         + " cost AS price * quantity,\n"
                         + " order_time TIMESTAMP(3),\n"
                         + " WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND\n"
-                        + ") with ('k1' = 'v1')");
+                        + ") with ('k1' = 'v1', 'bucket.num' = '2', "
+                        + "'table.datalake.format' = 'paimon', "
+                        + "'client.connect-timeout' = '120s')");
         CatalogTable table =
                 (CatalogTable) catalog.getTable(new ObjectPath(DEFAULT_DB, "expression_test"));
         Schema.Builder schemaBuilder = Schema.newBuilder();
@@ -606,9 +612,36 @@ void testTableWithExpression() throws Exception {
         Map<String, String> expectedOptions = new HashMap<>();
         expectedOptions.put("k1", "v1");
         expectedOptions.put(BUCKET_KEY.key(), "user");
-        expectedOptions.put(BUCKET_NUMBER.key(), "1");
+        expectedOptions.put(BUCKET_NUMBER.key(), "2");
         expectedOptions.put("table.datalake.format", "paimon");
+        expectedOptions.put("client.connect-timeout", "120s");
         assertOptionsEqual(table.getOptions(), expectedOptions);
+
+        // assert the stored table/custom configs
+        Configuration clientConfig = FLUSS_CLUSTER_EXTENSION.getClientConfig();
+        try (Connection conn = ConnectionFactory.createConnection(clientConfig)) {
+            Admin admin = conn.getAdmin();
+            TableInfo tableInfo =
+                    admin.getTableInfo(TablePath.of(DEFAULT_DB, "expression_test")).get();
+
+            Map<String, String> expectedTableProperties = new HashMap<>();
+            expectedTableProperties.put("table.datalake.format", "paimon");
+            expectedTableProperties.put("table.replication.factor", "1");
+            assertThat(tableInfo.getProperties().toMap()).isEqualTo(expectedTableProperties);
+
+            Map<String, String> expectedCustomProperties = new HashMap<>();
+            expectedCustomProperties.put("k1", "v1");
+            expectedCustomProperties.put("client.connect-timeout", "120s");
+            expectedCustomProperties.put(
+                    "schema.watermark.0.strategy.expr", "`order_time` - INTERVAL '5' SECOND");
+            expectedCustomProperties.put("schema.watermark.0.rowtime", "order_time");
+            expectedCustomProperties.put("schema.watermark.0.strategy.data-type", "TIMESTAMP(3)");
+            expectedCustomProperties.put("schema.4.name", "cost");
+            expectedCustomProperties.put("schema.4.expr", "`price` * `quantity`");
+            expectedCustomProperties.put("schema.4.data-type", "DOUBLE");
+            expectedCustomProperties.put("bucket.num", "2");
+            assertThat(tableInfo.getCustomProperties().toMap()).isEqualTo(expectedCustomProperties);
+        }
     }
 
     @Test
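
One detail worth noting in the assertions above: 'bucket.num' ends up under custom properties rather than stored table properties. That is consistent with the conversion path in FlinkConversions, where bucketing is applied to the descriptor via distributedBy(bucketNum, bucketKey) instead of being kept as a recognized storage property, so the raw option survives only in the custom map.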

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConversionsTest.java

Lines changed: 37 additions & 0 deletions
@@ -258,6 +258,43 @@ void testTableConversionWithOptions() {
                 .containsEntry(ConfigOptions.CLIENT_WRITER_BATCH_TIMEOUT.key(), "32s");
     }
 
+    @Test
+    void testTableConversionForCustomProperties() {
+        Map<String, String> options = new HashMap<>();
+        // forward table option & enum type
+        options.put(ConfigOptions.TABLE_LOG_FORMAT.key(), "indexed");
+        // forward client memory option
+        options.put(ConfigOptions.CLIENT_WRITER_BUFFER_MEMORY_SIZE.key(), "64mb");
+        // forward client duration option
+        options.put(ConfigOptions.CLIENT_WRITER_BATCH_TIMEOUT.key(), "32s");
+
+        ResolvedSchema schema =
+                new ResolvedSchema(
+                        Collections.singletonList(
+                                Column.physical(
+                                        "order_id",
+                                        org.apache.flink.table.api.DataTypes.STRING().notNull())),
+                        Collections.emptyList(),
+                        null);
+        CatalogTable flinkTable =
+                CatalogTable.of(
+                        Schema.newBuilder().fromResolvedSchema(schema).build(),
+                        "test comment",
+                        Collections.emptyList(),
+                        options);
+
+        TableDescriptor flussTable =
+                FlinkConversions.toFlussTable(new ResolvedCatalogTable(flinkTable, schema));
+
+        assertThat(flussTable.getProperties())
+                .containsEntry(ConfigOptions.TABLE_LOG_FORMAT.key(), "indexed");
+
+        HashMap<String, String> customProperties = new HashMap<>();
+        customProperties.put(ConfigOptions.CLIENT_WRITER_BUFFER_MEMORY_SIZE.key(), "64mb");
+        customProperties.put(ConfigOptions.CLIENT_WRITER_BATCH_TIMEOUT.key(), "32s");
+        assertThat(flussTable.getCustomProperties()).containsExactlyEntriesOf(customProperties);
+    }
+
     @Test
     void testOptionConversions() {
         ConfigOption<?> flinkOption = FlinkConversions.toFlinkOption(ConfigOptions.TABLE_KV_FORMAT);
