@@ -210,15 +210,24 @@ public static TableDescriptor toFlussTable(ResolvedCatalogBaseTable<?> catalogBa
                 .collect(Collectors.toList()));

         // convert some flink options to fluss table configs.
-        Map<String, String> properties = convertFlinkOptionsToFlussTableProperties(flinkTableConf);
+        Map<String, String> storageProperties =
+                convertFlinkOptionsToFlussTableProperties(flinkTableConf);

-        if (properties.containsKey(TABLE_AUTO_INCREMENT_FIELDS.key())) {
+        if (storageProperties.containsKey(TABLE_AUTO_INCREMENT_FIELDS.key())) {
             for (String autoIncrementColumn :
-                    properties.get(TABLE_AUTO_INCREMENT_FIELDS.key()).split(",")) {
+                    storageProperties.get(TABLE_AUTO_INCREMENT_FIELDS.key()).split(",")) {
                 schemBuilder.enableAutoIncrement(autoIncrementColumn);
             }
         }

+        // serialize computed column and watermark spec to custom properties
+        Map<String, String> customProperties =
+                extractCustomProperties(flinkTableConf, storageProperties);
+        CatalogPropertiesUtils.serializeComputedColumns(
+                customProperties, resolvedSchema.getColumns());
+        CatalogPropertiesUtils.serializeWatermarkSpecs(
+                customProperties, catalogBaseTable.getResolvedSchema().getWatermarkSpecs());
+
         Schema schema = schemBuilder.build();

         resolvedSchema.getColumns().stream()
@@ -236,12 +245,6 @@ public static TableDescriptor toFlussTable(ResolvedCatalogBaseTable<?> catalogBa
                         ? ((ResolvedCatalogTable) catalogBaseTable).getPartitionKeys()
                         : ((ResolvedCatalogMaterializedTable) catalogBaseTable).getPartitionKeys();

-        Map<String, String> customProperties = flinkTableConf.toMap();
-        CatalogPropertiesUtils.serializeComputedColumns(
-                customProperties, resolvedSchema.getColumns());
-        CatalogPropertiesUtils.serializeWatermarkSpecs(
-                customProperties, catalogBaseTable.getResolvedSchema().getWatermarkSpecs());
-
         // Set materialized table flags to fluss table custom properties
         if (CatalogTableAdapter.isMaterializedTable(tableKind)) {
             CatalogMaterializedTable.RefreshMode refreshMode =
@@ -283,7 +286,7 @@ public static TableDescriptor toFlussTable(ResolvedCatalogBaseTable<?> catalogBa
                 .partitionedBy(partitionKeys)
                 .distributedBy(bucketNum, bucketKey)
                 .comment(comment)
-                .properties(properties)
+                .properties(storageProperties)
                 .customProperties(customProperties)
                 .build();
     }
@@ -644,4 +647,11 @@ private static CatalogMaterializedTable toFlinkMaterializedTable(
                 .serializedRefreshHandler(refreshHandlerBytes);
         return builder.build();
     }
+
+    private static Map<String, String> extractCustomProperties(
+            Configuration allProperties, Map<String, String> flussTableProperties) {
+        Map<String, String> customProperties = new HashMap<>(allProperties.toMap());
+        customProperties.keySet().removeAll(flussTableProperties.keySet());
+        return customProperties;
+    }
 }
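The net effect of the change above is that the Flink WITH options are now split into two maps: recognized Fluss table options become storage properties, and everything left over (client options, free-form user keys, plus the serialized computed-column and watermark metadata) is kept as custom properties. Below is a minimal, self-contained sketch of that split. The class name PropertySplitSketch is invented for illustration only, and the "table." prefix check merely stands in for the real convertFlinkOptionsToFlussTableProperties lookup against known Fluss options; it is not how the connector actually matches them.

import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration class, not part of the PR.
public class PropertySplitSketch {

    public static void main(String[] args) {
        // Options as they might appear in a Flink CREATE TABLE ... WITH (...) clause.
        Map<String, String> flinkOptions = new HashMap<>();
        flinkOptions.put("table.log.format", "indexed"); // a Fluss table (storage) option
        flinkOptions.put("client.connect-timeout", "120s"); // a client option
        flinkOptions.put("k1", "v1"); // an arbitrary user key

        // Step 1: collect the options that become Fluss storage properties.
        // (Assumption: the real code matches known Fluss options; a plain
        // "table." prefix check is used here only to keep the sketch self-contained.)
        Map<String, String> storageProperties = new HashMap<>();
        flinkOptions.forEach(
                (key, value) -> {
                    if (key.startsWith("table.")) {
                        storageProperties.put(key, value);
                    }
                });

        // Step 2: everything not consumed as a storage property stays as a custom
        // property, using the same removeAll approach as extractCustomProperties above.
        Map<String, String> customProperties = new HashMap<>(flinkOptions);
        customProperties.keySet().removeAll(storageProperties.keySet());

        // Prints {table.log.format=indexed}
        System.out.println("storage properties: " + storageProperties);
        // Prints client.connect-timeout=120s and k1=v1 (iteration order may vary)
        System.out.println("custom properties: " + customProperties);
    }
}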
@@ -17,6 +17,9 @@

 package org.apache.fluss.flink.catalog;

+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.client.admin.Admin;
 import org.apache.fluss.cluster.ServerNode;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
@@ -25,6 +28,7 @@
 import org.apache.fluss.exception.InvalidConfigException;
 import org.apache.fluss.exception.InvalidTableException;
 import org.apache.fluss.metadata.DataLakeFormat;
+import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.server.testutils.FlussClusterExtension;

@@ -586,7 +590,9 @@ void testTableWithExpression() throws Exception {
                         + " cost AS price * quantity,\n"
                         + " order_time TIMESTAMP(3),\n"
                         + " WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND\n"
-                        + ") with ('k1' = 'v1')");
+                        + ") with ('k1' = 'v1', 'bucket.num' = '2', "
+                        + "'table.datalake.format' = 'paimon', "
+                        + "'client.connect-timeout' = '120s')");
         CatalogTable table =
                 (CatalogTable) catalog.getTable(new ObjectPath(DEFAULT_DB, "expression_test"));
         Schema.Builder schemaBuilder = Schema.newBuilder();
@@ -606,9 +612,36 @@ void testTableWithExpression() throws Exception {
         Map<String, String> expectedOptions = new HashMap<>();
         expectedOptions.put("k1", "v1");
         expectedOptions.put(BUCKET_KEY.key(), "user");
-        expectedOptions.put(BUCKET_NUMBER.key(), "1");
+        expectedOptions.put(BUCKET_NUMBER.key(), "2");
+        expectedOptions.put("table.datalake.format", "paimon");
+        expectedOptions.put("client.connect-timeout", "120s");
         assertOptionsEqual(table.getOptions(), expectedOptions);

+        // assert the stored table/custom configs
+        Configuration clientConfig = FLUSS_CLUSTER_EXTENSION.getClientConfig();
+        try (Connection conn = ConnectionFactory.createConnection(clientConfig)) {
+            Admin admin = conn.getAdmin();
+            TableInfo tableInfo =
+                    admin.getTableInfo(TablePath.of(DEFAULT_DB, "expression_test")).get();
+
+            Map<String, String> expectedTableProperties = new HashMap<>();
+            expectedTableProperties.put("table.datalake.format", "paimon");
+            expectedTableProperties.put("table.replication.factor", "1");
+            assertThat(tableInfo.getProperties().toMap()).isEqualTo(expectedTableProperties);
+
+            Map<String, String> expectedCustomProperties = new HashMap<>();
+            expectedCustomProperties.put("k1", "v1");
+            expectedCustomProperties.put("client.connect-timeout", "120s");
+            expectedCustomProperties.put(
+                    "schema.watermark.0.strategy.expr", "`order_time` - INTERVAL '5' SECOND");
+            expectedCustomProperties.put("schema.watermark.0.rowtime", "order_time");
+            expectedCustomProperties.put("schema.watermark.0.strategy.data-type", "TIMESTAMP(3)");
+            expectedCustomProperties.put("schema.4.name", "cost");
+            expectedCustomProperties.put("schema.4.expr", "`price` * `quantity`");
+            expectedCustomProperties.put("schema.4.data-type", "DOUBLE");
+            expectedCustomProperties.put("bucket.num", "2");
+            assertThat(tableInfo.getCustomProperties().toMap()).isEqualTo(expectedCustomProperties);
+        }
     }

     @Test
@@ -258,6 +258,43 @@ void testTableConversionWithOptions() {
                 .containsEntry(ConfigOptions.CLIENT_WRITER_BATCH_TIMEOUT.key(), "32s");
     }

+    @Test
+    void testTableConversionForCustomProperties() {
+        Map<String, String> options = new HashMap<>();
+        // forward table option & enum type
+        options.put(ConfigOptions.TABLE_LOG_FORMAT.key(), "indexed");
+        // forward client memory option
+        options.put(ConfigOptions.CLIENT_WRITER_BUFFER_MEMORY_SIZE.key(), "64mb");
+        // forward client duration option
+        options.put(ConfigOptions.CLIENT_WRITER_BATCH_TIMEOUT.key(), "32s");
+
+        ResolvedSchema schema =
+                new ResolvedSchema(
+                        Collections.singletonList(
+                                Column.physical(
+                                        "order_id",
+                                        org.apache.flink.table.api.DataTypes.STRING().notNull())),
+                        Collections.emptyList(),
+                        null);
+        CatalogTable flinkTable =
+                CatalogTable.of(
+                        Schema.newBuilder().fromResolvedSchema(schema).build(),
+                        "test comment",
+                        Collections.emptyList(),
+                        options);
+
+        TableDescriptor flussTable =
+                FlinkConversions.toFlussTable(new ResolvedCatalogTable(flinkTable, schema));
+
+        assertThat(flussTable.getProperties())
+                .containsEntry(ConfigOptions.TABLE_LOG_FORMAT.key(), "indexed");
+
+        HashMap<String, String> customProperties = new HashMap<>();
+        customProperties.put(ConfigOptions.CLIENT_WRITER_BUFFER_MEMORY_SIZE.key(), "64mb");
+        customProperties.put(ConfigOptions.CLIENT_WRITER_BATCH_TIMEOUT.key(), "32s");
+        assertThat(flussTable.getCustomProperties()).containsExactlyEntriesOf(customProperties);
+    }
+
     @Test
     void testOptionConversions() {
         ConfigOption<?> flinkOption = FlinkConversions.toFlinkOption(ConfigOptions.TABLE_KV_FORMAT);