Skip to content

Commit cb22c69

Browse files
authored
[lake/paimon] Allow setting Paimon native options via the "paimon." prefix (#958)
1 parent 1b54f2b commit cb22c69

File tree

5 files changed

+65
-14
lines changed

5 files changed

+65
-14
lines changed

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/catalog/FlinkTableFactory.java

Lines changed: 19 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -23,6 +23,7 @@
2323
import com.alibaba.fluss.flink.sink.FlinkTableSink;
2424
import com.alibaba.fluss.flink.source.FlinkTableSource;
2525
import com.alibaba.fluss.flink.utils.FlinkConnectorOptionsUtils;
26+
import com.alibaba.fluss.metadata.DataLakeFormat;
2627
import com.alibaba.fluss.metadata.TablePath;
2728

2829
import org.apache.flink.api.common.RuntimeExecutionMode;
@@ -51,10 +52,13 @@
5152
import java.util.HashSet;
5253
import java.util.List;
5354
import java.util.Map;
55+
import java.util.Optional;
5456
import java.util.Set;
5557

58+
import static com.alibaba.fluss.config.ConfigOptions.TABLE_DATALAKE_FORMAT;
5659
import static com.alibaba.fluss.config.FlussConfigUtils.CLIENT_PREFIX;
5760
import static com.alibaba.fluss.flink.catalog.FlinkCatalog.LAKE_TABLE_SPLITTER;
61+
import static com.alibaba.fluss.flink.utils.DataLakeUtils.getDatalakeFormat;
5862
import static com.alibaba.fluss.flink.utils.FlinkConnectorOptionsUtils.getBucketKeyIndexes;
5963
import static com.alibaba.fluss.flink.utils.FlinkConnectorOptionsUtils.getBucketKeys;
6064
import static com.alibaba.fluss.flink.utils.FlinkConversions.toFlinkOption;
@@ -76,13 +80,18 @@ public DynamicTableSource createDynamicTableSource(Context context) {
7680
}
7781

7882
FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
79-
helper.validateExcept("table.", "client.");
83+
final ReadableConfig tableOptions = helper.getOptions();
84+
Optional<DataLakeFormat> datalakeFormat = getDatalakeFormat(tableOptions);
85+
if (datalakeFormat.isPresent()) {
86+
helper.validateExcept("table.", "client.", datalakeFormat.get() + ".");
87+
} else {
88+
helper.validateExcept("table.", "client.");
89+
}
8090

8191
boolean isStreamingMode =
8292
context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
8393
== RuntimeExecutionMode.STREAMING;
8494

85-
final ReadableConfig tableOptions = helper.getOptions();
8695
RowType tableOutputType = (RowType) context.getPhysicalRowDataType().getLogicalType();
8796
FlinkConnectorOptionsUtils.validateTableSourceOptions(tableOptions);
8897

@@ -140,7 +149,13 @@ public DynamicTableSource createDynamicTableSource(Context context) {
140149
@Override
141150
public DynamicTableSink createDynamicTableSink(Context context) {
142151
FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
143-
helper.validateExcept("table.", "client.");
152+
final ReadableConfig tableOptions = helper.getOptions();
153+
Optional<DataLakeFormat> datalakeFormat = getDatalakeFormat(tableOptions);
154+
if (datalakeFormat.isPresent()) {
155+
helper.validateExcept("table.", "client.", datalakeFormat.get() + ".");
156+
} else {
157+
helper.validateExcept("table.", "client.");
158+
}
144159

145160
boolean isStreamingMode =
146161
context.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
@@ -150,7 +165,6 @@ public DynamicTableSink createDynamicTableSink(Context context) {
150165
List<String> partitionKeys = resolvedCatalogTable.getPartitionKeys();
151166

152167
RowType rowType = (RowType) context.getPhysicalRowDataType().getLogicalType();
153-
final ReadableConfig tableOptions = helper.getOptions();
154168

155169
return new FlinkTableSink(
156170
toFlussTablePath(context.getObjectIdentifier()),
@@ -161,7 +175,7 @@ public DynamicTableSink createDynamicTableSink(Context context) {
161175
partitionKeys,
162176
isStreamingMode,
163177
tableOptions.get(toFlinkOption(ConfigOptions.TABLE_MERGE_ENGINE)),
164-
tableOptions.get(toFlinkOption(ConfigOptions.TABLE_DATALAKE_FORMAT)),
178+
tableOptions.get(toFlinkOption(TABLE_DATALAKE_FORMAT)),
165179
tableOptions.get(FlinkConnectorOptions.SINK_IGNORE_DELETE),
166180
tableOptions.get(FlinkConnectorOptions.BUCKET_NUMBER),
167181
getBucketKeys(tableOptions),

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/utils/DataLakeUtils.java

Lines changed: 14 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -20,8 +20,13 @@
2020
import com.alibaba.fluss.config.Configuration;
2121
import com.alibaba.fluss.metadata.DataLakeFormat;
2222

23+
import org.apache.flink.configuration.ReadableConfig;
24+
2325
import java.util.Map;
26+
import java.util.Optional;
2427

28+
import static com.alibaba.fluss.config.ConfigOptions.TABLE_DATALAKE_FORMAT;
29+
import static com.alibaba.fluss.flink.utils.FlinkConversions.toFlinkOption;
2530
import static com.alibaba.fluss.utils.PropertiesUtils.extractAndRemovePrefix;
2631

2732
/** Utility class for accessing data lake related configurations. */
@@ -47,4 +52,13 @@ public static Map<String, String> extractLakeCatalogProperties(Configuration tab
4752
String dataLakePrefix = "table.datalake." + datalakeFormat + ".";
4853
return extractAndRemovePrefix(tableOptions.toMap(), dataLakePrefix);
4954
}
55+
56+
public static Optional<DataLakeFormat> getDatalakeFormat(ReadableConfig tableOptions) {
57+
Optional<DataLakeFormat> tableOptional =
58+
tableOptions.getOptional(toFlinkOption(TABLE_DATALAKE_FORMAT));
59+
if (tableOptional.isPresent()) {
60+
return tableOptional;
61+
}
62+
return Optional.empty();
63+
}
5064
}

fluss-lake/fluss-lake-paimon/src/main/java/com/alibaba/fluss/lake/paimon/PaimonLakeCatalog.java

Lines changed: 15 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -47,6 +47,11 @@ public class PaimonLakeCatalog implements LakeCatalog {
4747

4848
private final Catalog paimonCatalog;
4949

50+
// for fluss config
51+
private static final String FLUSS_CONF_PREFIX = "fluss.";
52+
// for paimon config
53+
private static final String PAIMON_CONF_PREFIX = "paimon.";
54+
5055
public PaimonLakeCatalog(Configuration configuration) {
5156
this.paimonCatalog =
5257
CatalogFactory.createCatalog(
@@ -164,14 +169,20 @@ private Schema toPaimonSchema(TableDescriptor tableDescriptor) {
164169
schemaBuilder.partitionKeys(tableDescriptor.getPartitionKeys());
165170

166171
// set properties to paimon schema
167-
tableDescriptor.getProperties().forEach((k, v) -> setFlussProperty(k, v, options));
168-
tableDescriptor.getCustomProperties().forEach((k, v) -> setFlussProperty(k, v, options));
172+
tableDescriptor.getProperties().forEach((k, v) -> setFlussPropertyToPaimon(k, v, options));
173+
tableDescriptor
174+
.getCustomProperties()
175+
.forEach((k, v) -> setFlussPropertyToPaimon(k, v, options));
169176
schemaBuilder.options(options.toMap());
170177
return schemaBuilder.build();
171178
}
172179

173-
private void setFlussProperty(String key, String value, Options options) {
174-
options.set("fluss." + key, value);
180+
private void setFlussPropertyToPaimon(String key, String value, Options options) {
181+
if (key.startsWith(PAIMON_CONF_PREFIX)) {
182+
options.set(key.substring(PAIMON_CONF_PREFIX.length()), value);
183+
} else {
184+
options.set(FLUSS_CONF_PREFIX + key, value);
185+
}
175186
}
176187

177188
@Override

fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/LakeEnabledTableCreateITCase.java

Lines changed: 16 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -49,6 +49,7 @@
4949
import java.util.Arrays;
5050
import java.util.HashMap;
5151
import java.util.Map;
52+
import java.util.stream.Stream;
5253

5354
import static com.alibaba.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
5455
import static com.alibaba.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
@@ -119,7 +120,7 @@ private static Configuration initConfig() {
119120
void testCreateLakeEnabledTable() throws Exception {
120121
Map<String, String> customProperties = new HashMap<>();
121122
customProperties.put("k1", "v1");
122-
customProperties.put("k2", "v2");
123+
customProperties.put("paimon.file.format", "parquet");
123124

124125
// test bucket key log table
125126
TableDescriptor logTable =
@@ -160,7 +161,6 @@ void testCreateLakeEnabledTable() throws Exception {
160161
"log_c1,log_c2",
161162
BUCKET_NUM);
162163

163-
// test log no bucket key table
164164
TableDescriptor logNoBucketKeyTable =
165165
TableDescriptor.builder()
166166
.schema(
@@ -393,8 +393,20 @@ private void verifyPaimonTable(
393393

394394
// check table properties
395395
Map<String, String> expectedProperties = new HashMap<>();
396-
flussTable.getProperties().forEach((k, v) -> expectedProperties.put("fluss." + k, v));
397-
flussTable.getCustomProperties().forEach((k, v) -> expectedProperties.put("fluss." + k, v));
396+
397+
Stream.concat(
398+
flussTable.getProperties().entrySet().stream(),
399+
flussTable.getCustomProperties().entrySet().stream())
400+
.forEach(
401+
e -> {
402+
String k = e.getKey();
403+
String v = e.getValue();
404+
if (k.startsWith("paimon.")) {
405+
expectedProperties.put(k.substring("paimon.".length()), v);
406+
} else {
407+
expectedProperties.put("fluss." + k, v);
408+
}
409+
});
398410
assertThat(paimonTable.options()).containsAllEntriesOf(expectedProperties);
399411

400412
// now, check schema

fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/LakeTableManagerITCase.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -60,7 +60,7 @@ void testCreateAndGetTable() throws Exception {
6060
TableDescriptor tableDescriptor =
6161
TableDescriptor.builder()
6262
.schema(Schema.newBuilder().column("f1", DataTypes.INT()).build())
63-
.property("table.datalake.enabled", "true")
63+
.property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
6464
.build();
6565

6666
TablePath tablePath = TablePath.of("fluss", "test_lake_table");

0 commit comments

Comments (0)