Commit 996e2a1

[flink] Improved Handling of Custom Properties During Fluss Table Creation
Parent: 640f40c

2 files changed: +53 additions, −14 deletions

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/utils/FlinkConversions.java

Lines changed: 16 additions & 14 deletions
@@ -45,14 +45,7 @@
 import org.apache.flink.types.RowKind;
 
 import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
+import java.util.*;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
@@ -202,16 +195,18 @@ public static TableDescriptor toFlussTable(ResolvedCatalogTable catalogTable) {
                             "Metadata column " + col + " is not supported.");
                 });
 
-        Map<String, String> customProperties = flinkTableConf.toMap();
-        CatalogPropertiesUtils.serializeComputedColumns(
-                customProperties, resolvedSchema.getColumns());
+        Map<String, String> properties = flinkTableConf.toMap();
+        CatalogPropertiesUtils.serializeComputedColumns(properties, resolvedSchema.getColumns());
         CatalogPropertiesUtils.serializeWatermarkSpecs(
-                customProperties, catalogTable.getResolvedSchema().getWatermarkSpecs());
+                properties, catalogTable.getResolvedSchema().getWatermarkSpecs());
 
         String comment = catalogTable.getComment();
 
         // convert some flink options to fluss table configs.
-        Map<String, String> properties = convertFlinkOptionsToFlussTableProperties(flinkTableConf);
+        Map<String, String> flussTableProperties =
+                convertFlinkOptionsToFlussTableProperties(flinkTableConf);
+        Map<String, String> customProperties =
+                extractCustomProperties(properties, flussTableProperties);
 
         // then set distributed by information
         List<String> bucketKey;
@@ -240,7 +235,7 @@ public static TableDescriptor toFlussTable(ResolvedCatalogTable catalogTable) {
                 .partitionedBy(catalogTable.getPartitionKeys())
                 .distributedBy(bucketNum, bucketKey)
                 .comment(comment)
-                .properties(properties)
+                .properties(flussTableProperties)
                 .customProperties(customProperties)
                 .build();
     }
@@ -352,4 +347,11 @@ private static void convertFlussTablePropertiesToFlinkOptions(
             }
        }
    }
+
+    private static Map<String, String> extractCustomProperties(
+            Map<String, String> allProperties, Map<String, String> flussTableProperties) {
+        Map<String, String> customProperties = new HashMap<>(allProperties);
+        customProperties.keySet().removeAll(flussTableProperties.keySet());
+        return customProperties;
+    }
 }
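
The heart of the change is the new extractCustomProperties helper: options declared on the Flink table that convertFlinkOptionsToFlussTableProperties does not turn into Fluss table properties are now carried over as custom properties instead of being lost. Below is a minimal, self-contained sketch of that key-set-difference technique; the class name and the literal property keys are illustrative only, not part of the patch.

import java.util.HashMap;
import java.util.Map;

// Sketch: separate "recognized" table properties from user-supplied extras
// by copying the full property map and removing the recognized keys.
public class CustomPropertiesSketch {

    static Map<String, String> extractCustomProperties(
            Map<String, String> allProperties, Map<String, String> flussTableProperties) {
        Map<String, String> customProperties = new HashMap<>(allProperties);
        // Removing keys through the keySet() view removes the map entries themselves,
        // leaving only the options that were not converted to Fluss table properties.
        customProperties.keySet().removeAll(flussTableProperties.keySet());
        return customProperties;
    }

    public static void main(String[] args) {
        Map<String, String> all = new HashMap<>();
        all.put("table.log.format", "indexed");        // converted to a table property
        all.put("client.writer.batch-timeout", "32s"); // client-side, stays custom
        Map<String, String> converted = new HashMap<>();
        converted.put("table.log.format", "indexed");

        // Prints: {client.writer.batch-timeout=32s}
        System.out.println(extractCustomProperties(all, converted));
    }
}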

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/utils/FlinkConversionsTest.java

Lines changed: 37 additions & 0 deletions
@@ -254,6 +254,43 @@ void testTableConversionWithOptions() {
                 .containsEntry(ConfigOptions.CLIENT_WRITER_BATCH_TIMEOUT.key(), "32s");
     }
 
+    @Test
+    void testTableConversionForCustomProperties() {
+        Map<String, String> options = new HashMap<>();
+        // forward table option & enum type
+        options.put(ConfigOptions.TABLE_LOG_FORMAT.key(), "indexed");
+        // forward client memory option
+        options.put(ConfigOptions.CLIENT_WRITER_BUFFER_MEMORY_SIZE.key(), "64mb");
+        // forward client duration option
+        options.put(ConfigOptions.CLIENT_WRITER_BATCH_TIMEOUT.key(), "32s");
+
+        ResolvedSchema schema =
+                new ResolvedSchema(
+                        Collections.singletonList(
+                                Column.physical(
+                                        "order_id",
+                                        org.apache.flink.table.api.DataTypes.STRING().notNull())),
+                        Collections.emptyList(),
+                        null);
+        CatalogTable flinkTable =
+                CatalogTable.of(
+                        Schema.newBuilder().fromResolvedSchema(schema).build(),
+                        "test comment",
+                        Collections.emptyList(),
+                        options);
+
+        TableDescriptor flussTable =
+                FlinkConversions.toFlussTable(new ResolvedCatalogTable(flinkTable, schema));
+
+        assertThat(flussTable.getProperties())
+                .containsEntry(ConfigOptions.TABLE_LOG_FORMAT.key(), "indexed");
+
+        HashMap<String, String> customProperties = new HashMap<>();
+        customProperties.put(ConfigOptions.CLIENT_WRITER_BUFFER_MEMORY_SIZE.key(), "64mb");
+        customProperties.put(ConfigOptions.CLIENT_WRITER_BATCH_TIMEOUT.key(), "32s");
+        assertThat(flussTable.getCustomProperties()).containsExactlyEntriesOf(customProperties);
+    }
+
     @Test
     void testOptionConversions() {
         ConfigOption<?> flinkOption = FlinkConversions.toFlinkOption(ConfigOptions.TABLE_KV_FORMAT);
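
Note on the test's expectations: the split is asymmetric. The recognized table option table.log.format lands in getProperties(), while the client-side options client.writer.buffer.memory-size and client.writer.batch-timeout are preserved verbatim in getCustomProperties(), which is consistent with the commit's stated goal of improving how custom properties are handled during Fluss table creation.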
