Skip to content

Commit b24d0e7

Browse files
authored
[improve] hudi options (#8952)
1 parent d58ae36 commit b24d0e7

File tree

8 files changed

+115
-148
lines changed

8 files changed

+115
-148
lines changed

Diff for: seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java

-1
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,6 @@ private Set<String> buildWhiteList() {
173173
whiteList.add("TypesenseSourceOptions");
174174
whiteList.add("TypesenseSinkOptions");
175175
whiteList.add("EmailSinkOptions");
176-
whiteList.add("HudiSinkOptions");
177176
whiteList.add("PulsarSinkOptions");
178177
whiteList.add("HttpSinkOptions");
179178
whiteList.add("SlsSinkOptions");

Diff for: seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/catalog/HudiCatalog.java

+3 -3
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,9 @@
5454
import java.util.stream.Collectors;
5555

5656
import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull;
57-
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.CDC_ENABLED;
58-
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.RECORD_KEY_FIELDS;
59-
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.TABLE_TYPE;
57+
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSinkOptions.CDC_ENABLED;
58+
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSinkOptions.RECORD_KEY_FIELDS;
59+
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSinkOptions.TABLE_TYPE;
6060
import static org.apache.seatunnel.connectors.seatunnel.hudi.sink.convert.AvroSchemaConverter.convertToSchema;
6161
import static org.apache.seatunnel.connectors.seatunnel.hudi.util.HudiCatalogUtil.inferTablePath;
6262
import static org.apache.seatunnel.connectors.seatunnel.hudi.util.SchemaUtil.convertSeaTunnelType;

Diff for: seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/catalog/HudiCatalogFactory.java

+7 -5
Original file line numberDiff line numberDiff line change
@@ -22,22 +22,21 @@
2222
import org.apache.seatunnel.api.table.catalog.Catalog;
2323
import org.apache.seatunnel.api.table.factory.CatalogFactory;
2424
import org.apache.seatunnel.api.table.factory.Factory;
25+
import org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSinkOptions;
2526

2627
import org.apache.hadoop.conf.Configuration;
2728

2829
import com.google.auto.service.AutoService;
2930

30-
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiOptions.CONF_FILES_PATH;
31-
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiOptions.TABLE_DFS_PATH;
3231
import static org.apache.seatunnel.connectors.seatunnel.hudi.util.HudiUtil.getConfiguration;
3332

3433
@AutoService(Factory.class)
3534
public class HudiCatalogFactory implements CatalogFactory {
3635

3736
@Override
3837
public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
39-
Configuration hadoopConf = getConfiguration(options.get(CONF_FILES_PATH));
40-
String tableDfsPath = options.get(TABLE_DFS_PATH);
38+
Configuration hadoopConf = getConfiguration(options.get(HudiSinkOptions.CONF_FILES_PATH));
39+
String tableDfsPath = options.get(HudiSinkOptions.TABLE_DFS_PATH);
4140
return new HudiCatalog(catalogName, hadoopConf, tableDfsPath);
4241
}
4342

@@ -48,6 +47,9 @@ public String factoryIdentifier() {
4847

4948
@Override
5049
public OptionRule optionRule() {
51-
return OptionRule.builder().required(TABLE_DFS_PATH).optional(CONF_FILES_PATH).build();
50+
return OptionRule.builder()
51+
.required(HudiSinkOptions.TABLE_DFS_PATH)
52+
.optional(HudiSinkOptions.CONF_FILES_PATH)
53+
.build();
5254
}
5355
}

Diff for: seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiOptions.java

-58
This file was deleted.

Diff for: seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiSinkConfig.java

+6 -6
Original file line numberDiff line numberDiff line change
@@ -47,18 +47,18 @@ public class HudiSinkConfig implements Serializable {
4747
public static HudiSinkConfig of(ReadonlyConfig config) {
4848
Builder builder = HudiSinkConfig.builder();
4949
Optional<SchemaSaveMode> optionalSchemaSaveMode =
50-
config.getOptional(HudiOptions.SCHEMA_SAVE_MODE);
50+
config.getOptional(HudiSinkOptions.SCHEMA_SAVE_MODE);
5151
Optional<DataSaveMode> optionalDataSaveMode =
52-
config.getOptional(HudiOptions.DATA_SAVE_MODE);
52+
config.getOptional(HudiSinkOptions.DATA_SAVE_MODE);
5353

54-
builder.tableDfsPath(config.get(HudiOptions.TABLE_DFS_PATH));
55-
builder.confFilesPath(config.get(HudiOptions.CONF_FILES_PATH));
54+
builder.tableDfsPath(config.get(HudiSinkOptions.TABLE_DFS_PATH));
55+
builder.confFilesPath(config.get(HudiSinkOptions.CONF_FILES_PATH));
5656
builder.tableList(HudiTableConfig.of(config));
5757

5858
builder.schemaSaveMode(
59-
optionalSchemaSaveMode.orElseGet(HudiOptions.SCHEMA_SAVE_MODE::defaultValue));
59+
optionalSchemaSaveMode.orElseGet(HudiSinkOptions.SCHEMA_SAVE_MODE::defaultValue));
6060
builder.dataSaveMode(
61-
optionalDataSaveMode.orElseGet(HudiOptions.DATA_SAVE_MODE::defaultValue));
61+
optionalDataSaveMode.orElseGet(HudiSinkOptions.DATA_SAVE_MODE::defaultValue));
6262
return builder.build();
6363
}
6464
}

Diff for: seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiTableOptions.java renamed to seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiSinkOptions.java

+51 -17
Original file line numberDiff line numberDiff line change
@@ -21,107 +21,141 @@
2121

2222
import org.apache.seatunnel.api.configuration.Option;
2323
import org.apache.seatunnel.api.configuration.Options;
24+
import org.apache.seatunnel.api.sink.DataSaveMode;
25+
import org.apache.seatunnel.api.sink.SchemaSaveMode;
2426

2527
import org.apache.hudi.common.model.HoodieTableType;
2628
import org.apache.hudi.common.model.WriteOperationType;
2729
import org.apache.hudi.index.HoodieIndex;
2830

29-
public interface HudiTableOptions {
31+
import java.util.List;
3032

31-
Option<String> TABLE_NAME =
33+
public class HudiSinkOptions {
34+
35+
public static Option<String> TABLE_DFS_PATH =
36+
Options.key("table_dfs_path")
37+
.stringType()
38+
.noDefaultValue()
39+
.withDescription("the dfs path of hudi table");
40+
41+
public static Option<String> CONF_FILES_PATH =
42+
Options.key("conf_files_path")
43+
.stringType()
44+
.noDefaultValue()
45+
.withDescription("hudi conf files");
46+
47+
public static Option<List<HudiTableConfig>> TABLE_LIST =
48+
Options.key("table_list")
49+
.listType(HudiTableConfig.class)
50+
.noDefaultValue()
51+
.withDescription("table_list");
52+
53+
public static Option<SchemaSaveMode> SCHEMA_SAVE_MODE =
54+
Options.key("schema_save_mode")
55+
.enumType(SchemaSaveMode.class)
56+
.defaultValue(SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST)
57+
.withDescription("schema save mode");
58+
59+
public static Option<DataSaveMode> DATA_SAVE_MODE =
60+
Options.key("data_save_mode")
61+
.enumType(DataSaveMode.class)
62+
.defaultValue(DataSaveMode.APPEND_DATA)
63+
.withDescription("data save mode");
64+
65+
public static Option<String> TABLE_NAME =
3266
Options.key("table_name")
3367
.stringType()
3468
.noDefaultValue()
3569
.withDescription("hudi table name");
3670

37-
Option<String> DATABASE =
71+
public static Option<String> DATABASE =
3872
Options.key("database")
3973
.stringType()
4074
.defaultValue("default")
4175
.withDescription("hudi database name");
4276

43-
Option<HoodieTableType> TABLE_TYPE =
77+
public static Option<HoodieTableType> TABLE_TYPE =
4478
Options.key("table_type")
4579
.type(new TypeReference<HoodieTableType>() {})
4680
.defaultValue(HoodieTableType.COPY_ON_WRITE)
4781
.withDescription("hudi table type");
4882

49-
Option<Boolean> CDC_ENABLED =
83+
public static Option<Boolean> CDC_ENABLED =
5084
Options.key("cdc_enabled")
5185
.booleanType()
5286
.defaultValue(false)
5387
.withDescription(
5488
"When enable, persist the change data if necessary, and can be queried as a CDC query mode.");
5589

56-
Option<String> RECORD_KEY_FIELDS =
90+
public static Option<String> RECORD_KEY_FIELDS =
5791
Options.key("record_key_fields")
5892
.stringType()
5993
.noDefaultValue()
6094
.withDescription("the record key fields of hudi table");
6195

62-
Option<String> PARTITION_FIELDS =
96+
public static Option<String> PARTITION_FIELDS =
6397
Options.key("partition_fields")
6498
.stringType()
6599
.noDefaultValue()
66100
.withDescription("the partition fields of hudi table");
67101

68-
Option<HoodieIndex.IndexType> INDEX_TYPE =
102+
public static Option<HoodieIndex.IndexType> INDEX_TYPE =
69103
Options.key("index_type")
70104
.type(new TypeReference<HoodieIndex.IndexType>() {})
71105
.defaultValue(HoodieIndex.IndexType.BLOOM)
72106
.withDescription(
73107
"the index type of hudi table, currently supported: [BLOOM, SIMPLE, GLOBAL_BLOOM]");
74108

75-
Option<String> INDEX_CLASS_NAME =
109+
public static Option<String> INDEX_CLASS_NAME =
76110
Options.key("index_class_name")
77111
.stringType()
78112
.noDefaultValue()
79113
.withDescription(
80114
"customized hudi index type, the index classpath is configured here");
81115

82-
Option<Integer> RECORD_BYTE_SIZE =
116+
public static Option<Integer> RECORD_BYTE_SIZE =
83117
Options.key("record_byte_size")
84118
.intType()
85119
.defaultValue(1024)
86120
.withDescription("The byte size of each record");
87121

88-
Option<WriteOperationType> OP_TYPE =
122+
public static Option<WriteOperationType> OP_TYPE =
89123
Options.key("op_type")
90124
.type(new TypeReference<WriteOperationType>() {})
91125
.defaultValue(WriteOperationType.INSERT)
92126
.withDescription("op_type");
93127

94-
Option<Integer> BATCH_SIZE =
128+
public static Option<Integer> BATCH_SIZE =
95129
Options.key("batch_size")
96130
.intType()
97131
.defaultValue(1000)
98132
.withDescription("the size of each insert batch");
99133

100-
Option<Integer> BATCH_INTERVAL_MS =
134+
public static Option<Integer> BATCH_INTERVAL_MS =
101135
Options.key("batch_interval_ms")
102136
.intType()
103137
.defaultValue(1000)
104138
.withDescription("batch interval milliSecond");
105139

106-
Option<Integer> INSERT_SHUFFLE_PARALLELISM =
140+
public static Option<Integer> INSERT_SHUFFLE_PARALLELISM =
107141
Options.key("insert_shuffle_parallelism")
108142
.intType()
109143
.defaultValue(2)
110144
.withDescription("insert_shuffle_parallelism");
111145

112-
Option<Integer> UPSERT_SHUFFLE_PARALLELISM =
146+
public static Option<Integer> UPSERT_SHUFFLE_PARALLELISM =
113147
Options.key("upsert_shuffle_parallelism")
114148
.intType()
115149
.defaultValue(2)
116150
.withDescription("upsert_shuffle_parallelism");
117151

118-
Option<Integer> MIN_COMMITS_TO_KEEP =
152+
public static Option<Integer> MIN_COMMITS_TO_KEEP =
119153
Options.key("min_commits_to_keep")
120154
.intType()
121155
.defaultValue(20)
122156
.withDescription("hoodie.keep.min.commits");
123157

124-
Option<Integer> MAX_COMMITS_TO_KEEP =
158+
public static Option<Integer> MAX_COMMITS_TO_KEEP =
125159
Options.key("max_commits_to_keep")
126160
.intType()
127161
.defaultValue(30)

Diff for: seatunnel-connectors-v2/connector-hudi/src/main/java/org/apache/seatunnel/connectors/seatunnel/hudi/config/HudiTableConfig.java

+18 -18
Original file line numberDiff line numberDiff line change
@@ -38,22 +38,22 @@
3838
import java.util.Set;
3939
import java.util.stream.Collectors;
4040

41-
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.BATCH_INTERVAL_MS;
42-
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.BATCH_SIZE;
43-
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.CDC_ENABLED;
44-
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.DATABASE;
45-
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.INDEX_CLASS_NAME;
46-
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.INDEX_TYPE;
47-
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.INSERT_SHUFFLE_PARALLELISM;
48-
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.MAX_COMMITS_TO_KEEP;
49-
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.MIN_COMMITS_TO_KEEP;
50-
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.OP_TYPE;
51-
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.PARTITION_FIELDS;
52-
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.RECORD_BYTE_SIZE;
53-
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.RECORD_KEY_FIELDS;
54-
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.TABLE_NAME;
55-
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.TABLE_TYPE;
56-
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiTableOptions.UPSERT_SHUFFLE_PARALLELISM;
41+
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSinkOptions.BATCH_INTERVAL_MS;
42+
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSinkOptions.BATCH_SIZE;
43+
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSinkOptions.CDC_ENABLED;
44+
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSinkOptions.DATABASE;
45+
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSinkOptions.INDEX_CLASS_NAME;
46+
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSinkOptions.INDEX_TYPE;
47+
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSinkOptions.INSERT_SHUFFLE_PARALLELISM;
48+
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSinkOptions.MAX_COMMITS_TO_KEEP;
49+
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSinkOptions.MIN_COMMITS_TO_KEEP;
50+
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSinkOptions.OP_TYPE;
51+
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSinkOptions.PARTITION_FIELDS;
52+
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSinkOptions.RECORD_BYTE_SIZE;
53+
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSinkOptions.RECORD_KEY_FIELDS;
54+
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSinkOptions.TABLE_NAME;
55+
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSinkOptions.TABLE_TYPE;
56+
import static org.apache.seatunnel.connectors.seatunnel.hudi.config.HudiSinkOptions.UPSERT_SHUFFLE_PARALLELISM;
5757

5858
@Data
5959
@Builder
@@ -114,8 +114,8 @@ public HudiTableConfig() {}
114114

115115
public static List<HudiTableConfig> of(ReadonlyConfig connectorConfig) {
116116
List<HudiTableConfig> tableList;
117-
if (connectorConfig.getOptional(HudiOptions.TABLE_LIST).isPresent()) {
118-
tableList = connectorConfig.get(HudiOptions.TABLE_LIST);
117+
if (connectorConfig.getOptional(HudiSinkOptions.TABLE_LIST).isPresent()) {
118+
tableList = connectorConfig.get(HudiSinkOptions.TABLE_LIST);
119119
} else {
120120
HudiTableConfig hudiTableConfig =
121121
HudiTableConfig.builder()

0 commit comments

Comments
 (0)