Skip to content

Commit 31ba0e7

Browse files
liunaijiejia zhang
authored and
jia zhang
committed
[improve] paimon options (apache#9167)
1 parent 81848d8 commit 31ba0e7

File tree

15 files changed

+278
-175
lines changed

15 files changed

+278
-175
lines changed

seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java

-2
Original file line numberDiff line numberDiff line change
@@ -196,11 +196,9 @@ private Set<String> buildWhiteList() {
196196
whiteList.add("SlsSinkOptions");
197197
whiteList.add("Neo4jSinkOptions");
198198
whiteList.add("MaxcomputeSinkOptions");
199-
whiteList.add("PaimonSinkOptions");
200199
whiteList.add("TDengineSourceOptions");
201200
whiteList.add("PulsarSourceOptions");
202201
whiteList.add("MongodbSinkOptions");
203-
whiteList.add("PaimonSourceOptions");
204202
whiteList.add("SlsSourceOptions");
205203
whiteList.add("SentrySinkOptions");
206204
whiteList.add("QdrantSinkOptions");

seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/catalog/PaimonCatalogFactory.java

+18-13
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@
2222
import org.apache.seatunnel.api.table.catalog.Catalog;
2323
import org.apache.seatunnel.api.table.factory.CatalogFactory;
2424
import org.apache.seatunnel.api.table.factory.Factory;
25-
import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonConfig;
26-
import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkConfig;
25+
import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonBaseOptions;
26+
import org.apache.seatunnel.connectors.seatunnel.paimon.config.PaimonSinkOptions;
2727

2828
import com.google.auto.service.AutoService;
2929

@@ -42,19 +42,24 @@ public String factoryIdentifier() {
4242
@Override
4343
public OptionRule optionRule() {
4444
return OptionRule.builder()
45-
.required(PaimonConfig.WAREHOUSE, PaimonConfig.DATABASE, PaimonConfig.TABLE)
45+
.required(
46+
PaimonBaseOptions.WAREHOUSE,
47+
PaimonBaseOptions.DATABASE,
48+
PaimonBaseOptions.TABLE)
4649
.optional(
47-
PaimonConfig.HDFS_SITE_PATH,
48-
PaimonConfig.HADOOP_CONF,
49-
PaimonConfig.HADOOP_CONF_PATH,
50-
PaimonConfig.CATALOG_TYPE,
51-
PaimonSinkConfig.SCHEMA_SAVE_MODE,
52-
PaimonSinkConfig.DATA_SAVE_MODE,
53-
PaimonSinkConfig.PRIMARY_KEYS,
54-
PaimonSinkConfig.PARTITION_KEYS,
55-
PaimonSinkConfig.WRITE_PROPS)
50+
PaimonBaseOptions.HDFS_SITE_PATH,
51+
PaimonBaseOptions.HADOOP_CONF,
52+
PaimonBaseOptions.HADOOP_CONF_PATH,
53+
PaimonBaseOptions.CATALOG_TYPE,
54+
PaimonSinkOptions.SCHEMA_SAVE_MODE,
55+
PaimonSinkOptions.DATA_SAVE_MODE,
56+
PaimonSinkOptions.PRIMARY_KEYS,
57+
PaimonSinkOptions.PARTITION_KEYS,
58+
PaimonSinkOptions.WRITE_PROPS)
5659
.conditional(
57-
PaimonConfig.CATALOG_TYPE, PaimonCatalogEnum.HIVE, PaimonConfig.CATALOG_URI)
60+
PaimonBaseOptions.CATALOG_TYPE,
61+
PaimonCatalogEnum.HIVE,
62+
PaimonBaseOptions.CATALOG_URI)
5863
.build();
5964
}
6065
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.paimon.config;
19+
20+
import org.apache.seatunnel.api.configuration.Option;
21+
import org.apache.seatunnel.api.configuration.Options;
22+
import org.apache.seatunnel.connectors.seatunnel.paimon.catalog.PaimonCatalogEnum;
23+
24+
import java.util.HashMap;
25+
import java.util.Map;
26+
27+
public class PaimonBaseOptions {
28+
29+
public static final String CONNECTOR_IDENTITY = "Paimon";
30+
31+
public static final Option<String> WAREHOUSE =
32+
Options.key("warehouse")
33+
.stringType()
34+
.noDefaultValue()
35+
.withDescription("The warehouse path of paimon");
36+
37+
public static final Option<PaimonCatalogEnum> CATALOG_TYPE =
38+
Options.key("catalog_type")
39+
.enumType(PaimonCatalogEnum.class)
40+
.defaultValue(PaimonCatalogEnum.FILESYSTEM)
41+
.withDescription("The type of paimon catalog");
42+
43+
public static final Option<String> CATALOG_URI =
44+
Options.key("catalog_uri")
45+
.stringType()
46+
.noDefaultValue()
47+
.withDescription("The uri of paimon with hive catalog");
48+
49+
public static final Option<String> CATALOG_NAME =
50+
Options.key("catalog_name")
51+
.stringType()
52+
.defaultValue("paimon")
53+
.withDescription(" the paimon catalog name");
54+
55+
public static final Option<String> DATABASE =
56+
Options.key("database")
57+
.stringType()
58+
.noDefaultValue()
59+
.withDescription("The database you intend to access");
60+
61+
public static final Option<String> TABLE =
62+
Options.key("table")
63+
.stringType()
64+
.noDefaultValue()
65+
.withDescription("The table you intend to access");
66+
67+
@Deprecated
68+
public static final Option<String> HDFS_SITE_PATH =
69+
Options.key("hdfs_site_path")
70+
.stringType()
71+
.noDefaultValue()
72+
.withDescription("The file path of hdfs-site.xml");
73+
74+
public static final Option<Map<String, String>> HADOOP_CONF =
75+
Options.key("paimon.hadoop.conf")
76+
.mapType()
77+
.defaultValue(new HashMap<>())
78+
.withDescription("Properties in hadoop conf");
79+
80+
public static final Option<String> HADOOP_CONF_PATH =
81+
Options.key("paimon.hadoop.conf-path")
82+
.stringType()
83+
.noDefaultValue()
84+
.withDescription(
85+
"The specified loading path for the 'core-site.xml', 'hdfs-site.xml', 'hive-site.xml' files");
86+
}

seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonConfig.java

+21-70
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@
2020
import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
2121
import org.apache.seatunnel.shade.com.google.common.collect.ImmutableList;
2222

23-
import org.apache.seatunnel.api.configuration.Option;
24-
import org.apache.seatunnel.api.configuration.Options;
2523
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2624
import org.apache.seatunnel.api.sink.SeaTunnelSink;
2725
import org.apache.seatunnel.api.source.SeaTunnelSource;
@@ -35,7 +33,6 @@
3533

3634
import java.io.Serializable;
3735
import java.util.Arrays;
38-
import java.util.HashMap;
3936
import java.util.List;
4037
import java.util.Map;
4138

@@ -48,64 +45,6 @@
4845
@Getter
4946
public class PaimonConfig implements Serializable {
5047

51-
public static final String CONNECTOR_IDENTITY = "Paimon";
52-
53-
public static final Option<String> WAREHOUSE =
54-
Options.key("warehouse")
55-
.stringType()
56-
.noDefaultValue()
57-
.withDescription("The warehouse path of paimon");
58-
59-
public static final Option<PaimonCatalogEnum> CATALOG_TYPE =
60-
Options.key("catalog_type")
61-
.enumType(PaimonCatalogEnum.class)
62-
.defaultValue(PaimonCatalogEnum.FILESYSTEM)
63-
.withDescription("The type of paimon catalog");
64-
65-
public static final Option<String> CATALOG_URI =
66-
Options.key("catalog_uri")
67-
.stringType()
68-
.noDefaultValue()
69-
.withDescription("The uri of paimon with hive catalog");
70-
71-
public static final Option<String> CATALOG_NAME =
72-
Options.key("catalog_name")
73-
.stringType()
74-
.defaultValue("paimon")
75-
.withDescription(" the paimon catalog name");
76-
77-
public static final Option<String> DATABASE =
78-
Options.key("database")
79-
.stringType()
80-
.noDefaultValue()
81-
.withDescription("The database you intend to access");
82-
83-
public static final Option<String> TABLE =
84-
Options.key("table")
85-
.stringType()
86-
.noDefaultValue()
87-
.withDescription("The table you intend to access");
88-
89-
@Deprecated
90-
public static final Option<String> HDFS_SITE_PATH =
91-
Options.key("hdfs_site_path")
92-
.stringType()
93-
.noDefaultValue()
94-
.withDescription("The file path of hdfs-site.xml");
95-
96-
public static final Option<Map<String, String>> HADOOP_CONF =
97-
Options.key("paimon.hadoop.conf")
98-
.mapType()
99-
.defaultValue(new HashMap<>())
100-
.withDescription("Properties in hadoop conf");
101-
102-
public static final Option<String> HADOOP_CONF_PATH =
103-
Options.key("paimon.hadoop.conf-path")
104-
.stringType()
105-
.noDefaultValue()
106-
.withDescription(
107-
"The specified loading path for the 'core-site.xml', 'hdfs-site.xml', 'hive-site.xml' files");
108-
10948
protected String catalogName;
11049
protected PaimonCatalogEnum catalogType;
11150
protected String catalogUri;
@@ -118,17 +57,29 @@ public class PaimonConfig implements Serializable {
11857

11958
public PaimonConfig(ReadonlyConfig readonlyConfig) {
12059
this.catalogName =
121-
checkArgumentNotBlank(readonlyConfig.get(CATALOG_NAME), CATALOG_NAME.key());
122-
this.warehouse = checkArgumentNotBlank(readonlyConfig.get(WAREHOUSE), WAREHOUSE.key());
123-
this.namespace = checkArgumentNotBlank(readonlyConfig.get(DATABASE), DATABASE.key());
124-
this.table = checkArgumentNotBlank(readonlyConfig.get(TABLE), TABLE.key());
125-
this.hdfsSitePath = readonlyConfig.get(HDFS_SITE_PATH);
126-
this.hadoopConfProps = readonlyConfig.get(HADOOP_CONF);
127-
this.hadoopConfPath = readonlyConfig.get(HADOOP_CONF_PATH);
128-
this.catalogType = readonlyConfig.get(CATALOG_TYPE);
60+
checkArgumentNotBlank(
61+
readonlyConfig.get(PaimonBaseOptions.CATALOG_NAME),
62+
PaimonBaseOptions.CATALOG_NAME.key());
63+
this.warehouse =
64+
checkArgumentNotBlank(
65+
readonlyConfig.get(PaimonBaseOptions.WAREHOUSE),
66+
PaimonBaseOptions.WAREHOUSE.key());
67+
this.namespace =
68+
checkArgumentNotBlank(
69+
readonlyConfig.get(PaimonBaseOptions.DATABASE),
70+
PaimonBaseOptions.DATABASE.key());
71+
this.table =
72+
checkArgumentNotBlank(
73+
readonlyConfig.get(PaimonBaseOptions.TABLE), PaimonBaseOptions.TABLE.key());
74+
this.hdfsSitePath = readonlyConfig.get(PaimonBaseOptions.HDFS_SITE_PATH);
75+
this.hadoopConfProps = readonlyConfig.get(PaimonBaseOptions.HADOOP_CONF);
76+
this.hadoopConfPath = readonlyConfig.get(PaimonBaseOptions.HADOOP_CONF_PATH);
77+
this.catalogType = readonlyConfig.get(PaimonBaseOptions.CATALOG_TYPE);
12978
if (PaimonCatalogEnum.HIVE.getType().equals(catalogType.getType())) {
13079
this.catalogUri =
131-
checkArgumentNotBlank(readonlyConfig.get(CATALOG_URI), CATALOG_URI.key());
80+
checkArgumentNotBlank(
81+
readonlyConfig.get(PaimonBaseOptions.CATALOG_URI),
82+
PaimonBaseOptions.CATALOG_URI.key());
13283
}
13384
}
13485

seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/config/PaimonSinkConfig.java

+8-43
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,6 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.paimon.config;
1919

20-
import org.apache.seatunnel.api.configuration.Option;
21-
import org.apache.seatunnel.api.configuration.Options;
2220
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2321
import org.apache.seatunnel.api.sink.DataSaveMode;
2422
import org.apache.seatunnel.api.sink.SchemaSaveMode;
@@ -28,7 +26,6 @@
2826
import lombok.Getter;
2927
import lombok.extern.slf4j.Slf4j;
3028

31-
import java.util.HashMap;
3229
import java.util.List;
3330
import java.util.Map;
3431
import java.util.stream.Stream;
@@ -37,40 +34,6 @@
3734
@Slf4j
3835
public class PaimonSinkConfig extends PaimonConfig {
3936

40-
public static final String CHANGELOG_TMP_PATH = "changelog-tmp-path";
41-
42-
public static final Option<SchemaSaveMode> SCHEMA_SAVE_MODE =
43-
Options.key("schema_save_mode")
44-
.enumType(SchemaSaveMode.class)
45-
.defaultValue(SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST)
46-
.withDescription("schema_save_mode");
47-
48-
public static final Option<DataSaveMode> DATA_SAVE_MODE =
49-
Options.key("data_save_mode")
50-
.enumType(DataSaveMode.class)
51-
.defaultValue(DataSaveMode.APPEND_DATA)
52-
.withDescription("data_save_mode");
53-
public static final Option<String> PRIMARY_KEYS =
54-
Options.key("paimon.table.primary-keys")
55-
.stringType()
56-
.noDefaultValue()
57-
.withDescription(
58-
"Default comma-separated list of columns that identify a row in tables (primary key)");
59-
60-
public static final Option<String> PARTITION_KEYS =
61-
Options.key("paimon.table.partition-keys")
62-
.stringType()
63-
.noDefaultValue()
64-
.withDescription(
65-
"Default comma-separated list of partition fields to use when creating tables.");
66-
67-
public static final Option<Map<String, String>> WRITE_PROPS =
68-
Options.key("paimon.table.write-props")
69-
.mapType()
70-
.defaultValue(new HashMap<>())
71-
.withDescription(
72-
"Properties passed through to paimon table initialization, such as 'file.format', 'bucket'(org.apache.paimon.CoreOptions)");
73-
7437
private final SchemaSaveMode schemaSaveMode;
7538
private final DataSaveMode dataSaveMode;
7639
private final CoreOptions.ChangelogProducer changelogProducer;
@@ -81,11 +44,12 @@ public class PaimonSinkConfig extends PaimonConfig {
8144

8245
public PaimonSinkConfig(ReadonlyConfig readonlyConfig) {
8346
super(readonlyConfig);
84-
this.schemaSaveMode = readonlyConfig.get(SCHEMA_SAVE_MODE);
85-
this.dataSaveMode = readonlyConfig.get(DATA_SAVE_MODE);
86-
this.primaryKeys = stringToList(readonlyConfig.get(PRIMARY_KEYS), ",");
87-
this.partitionKeys = stringToList(readonlyConfig.get(PARTITION_KEYS), ",");
88-
this.writeProps = readonlyConfig.get(WRITE_PROPS);
47+
this.schemaSaveMode = readonlyConfig.get(PaimonSinkOptions.SCHEMA_SAVE_MODE);
48+
this.dataSaveMode = readonlyConfig.get(PaimonSinkOptions.DATA_SAVE_MODE);
49+
this.primaryKeys = stringToList(readonlyConfig.get(PaimonSinkOptions.PRIMARY_KEYS), ",");
50+
this.partitionKeys =
51+
stringToList(readonlyConfig.get(PaimonSinkOptions.PARTITION_KEYS), ",");
52+
this.writeProps = readonlyConfig.get(PaimonSinkOptions.WRITE_PROPS);
8953
this.changelogProducer =
9054
Stream.of(CoreOptions.ChangelogProducer.values())
9155
.filter(
@@ -99,7 +63,8 @@ public PaimonSinkConfig(ReadonlyConfig readonlyConfig) {
9963
.findFirst()
10064
.orElse(null);
10165
this.changelogTmpPath =
102-
writeProps.getOrDefault(CHANGELOG_TMP_PATH, System.getProperty("java.io.tmpdir"));
66+
writeProps.getOrDefault(
67+
PaimonSinkOptions.CHANGELOG_TMP_PATH, System.getProperty("java.io.tmpdir"));
10368
checkConfig();
10469
}
10570

0 commit comments

Comments
 (0)