Skip to content

Commit b6a702b

Browse files
authored
[Improve] hbase options (#8923)
1 parent 641cc72 commit b6a702b

File tree

10 files changed

+142
-141
lines changed

10 files changed

+142
-141
lines changed

Diff for: seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java

-2
Original file line numberDiff line numberDiff line change
@@ -183,12 +183,10 @@ private Set<String> buildWhiteList() {
183183
whiteList.add("TDengineSourceOptions");
184184
whiteList.add("PulsarSourceOptions");
185185
whiteList.add("FakeSourceOptions");
186-
whiteList.add("HbaseSinkOptions");
187186
whiteList.add("MongodbSinkOptions");
188187
whiteList.add("IoTDBSinkOptions");
189188
whiteList.add("EasysearchSourceOptions");
190189
whiteList.add("IcebergSourceOptions");
191-
whiteList.add("HbaseSourceOptions");
192190
whiteList.add("PaimonSourceOptions");
193191
whiteList.add("IoTDBSourceOptions");
194192
whiteList.add("SlsSourceOptions");
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.hbase.config;
19+
20+
import org.apache.seatunnel.api.configuration.Option;
21+
import org.apache.seatunnel.api.configuration.Options;
22+
23+
import java.util.List;
24+
25+
public class HbaseBaseOptions {
26+
27+
public static final Option<String> ZOOKEEPER_QUORUM =
28+
Options.key("zookeeper_quorum")
29+
.stringType()
30+
.noDefaultValue()
31+
.withDescription("Hbase zookeeper quorum");
32+
33+
public static final Option<String> TABLE =
34+
Options.key("table").stringType().noDefaultValue().withDescription("Hbase table name");
35+
36+
public static final Option<List<String>> ROWKEY_COLUMNS =
37+
Options.key("rowkey_column")
38+
.listType()
39+
.noDefaultValue()
40+
.withDescription("Hbase rowkey column");
41+
}

Diff for: seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseParameters.java

+40-55
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,7 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.hbase.config;
1919

20-
import org.apache.seatunnel.shade.com.typesafe.config.Config;
21-
2220
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
23-
import org.apache.seatunnel.common.config.TypesafeConfigUtils;
2421

2522
import lombok.Builder;
2623
import lombok.Getter;
@@ -29,22 +26,6 @@
2926
import java.util.List;
3027
import java.util.Map;
3128

32-
import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ENCODING;
33-
import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.FAMILY_NAME;
34-
import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.HBASE_BATCH_CONFIG;
35-
import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.HBASE_CACHE_BLOCKS_CONFIG;
36-
import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.HBASE_CACHING_CONFIG;
37-
import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.HBASE_EXTRA_CONFIG;
38-
import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.HBASE_TTL_CONFIG;
39-
import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.NULL_MODE;
40-
import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ROWKEY_COLUMNS;
41-
import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ROWKEY_DELIMITER;
42-
import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.TABLE;
43-
import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.VERSION_COLUMN;
44-
import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.WAL_WRITE;
45-
import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.WRITE_BUFFER_SIZE;
46-
import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ZOOKEEPER_QUORUM;
47-
4829
@Builder
4930
@Getter
5031
public class HbaseParameters implements Serializable {
@@ -65,27 +46,32 @@ public class HbaseParameters implements Serializable {
6546

6647
private Map<String, String> hbaseExtraConfig;
6748

68-
@Builder.Default private int caching = HBASE_CACHING_CONFIG.defaultValue();
49+
@Builder.Default private int caching = HbaseSinkOptions.HBASE_CACHING_CONFIG.defaultValue();
6950

70-
@Builder.Default private int batch = HBASE_BATCH_CONFIG.defaultValue();
51+
@Builder.Default private int batch = HbaseSinkOptions.HBASE_BATCH_CONFIG.defaultValue();
7152

72-
@Builder.Default private Long ttl = HBASE_TTL_CONFIG.defaultValue();
53+
@Builder.Default private Long ttl = HbaseSinkOptions.HBASE_TTL_CONFIG.defaultValue();
7354

74-
@Builder.Default private boolean cacheBlocks = HBASE_CACHE_BLOCKS_CONFIG.defaultValue();
55+
@Builder.Default
56+
private boolean cacheBlocks = HbaseSinkOptions.HBASE_CACHE_BLOCKS_CONFIG.defaultValue();
7557

76-
@Builder.Default private String rowkeyDelimiter = ROWKEY_DELIMITER.defaultValue();
58+
@Builder.Default
59+
private String rowkeyDelimiter = HbaseSinkOptions.ROWKEY_DELIMITER.defaultValue();
7760

78-
@Builder.Default private HbaseConfig.NullMode nullMode = NULL_MODE.defaultValue();
61+
@Builder.Default
62+
private HbaseSinkOptions.NullMode nullMode = HbaseSinkOptions.NULL_MODE.defaultValue();
7963

80-
@Builder.Default private boolean walWrite = WAL_WRITE.defaultValue();
64+
@Builder.Default private boolean walWrite = HbaseSinkOptions.WAL_WRITE.defaultValue();
8165

82-
@Builder.Default private int writeBufferSize = WRITE_BUFFER_SIZE.defaultValue();
66+
@Builder.Default
67+
private int writeBufferSize = HbaseSinkOptions.WRITE_BUFFER_SIZE.defaultValue();
8368

84-
@Builder.Default private HbaseConfig.EnCoding enCoding = ENCODING.defaultValue();
69+
@Builder.Default
70+
private HbaseSinkOptions.EnCoding enCoding = HbaseSinkOptions.ENCODING.defaultValue();
8571

8672
public static HbaseParameters buildWithConfig(ReadonlyConfig config) {
8773
HbaseParametersBuilder builder = HbaseParameters.builder();
88-
String table = config.get(TABLE);
74+
String table = config.get(HbaseBaseOptions.TABLE);
8975
int colonIndex = table.indexOf(':');
9076
if (colonIndex != -1) {
9177
String namespace = table.substring(0, colonIndex);
@@ -97,29 +83,29 @@ public static HbaseParameters buildWithConfig(ReadonlyConfig config) {
9783
}
9884

9985
// required parameters
100-
builder.zookeeperQuorum(config.get(ZOOKEEPER_QUORUM));
101-
builder.rowkeyColumns(config.get(ROWKEY_COLUMNS));
102-
builder.familyNames(config.get(FAMILY_NAME));
103-
104-
builder.rowkeyDelimiter(config.get(ROWKEY_DELIMITER));
105-
builder.versionColumn(config.get(VERSION_COLUMN));
106-
String nullMode = String.valueOf(config.get(NULL_MODE));
107-
builder.nullMode(HbaseConfig.NullMode.valueOf(nullMode.toUpperCase()));
108-
builder.walWrite(config.get(WAL_WRITE));
109-
builder.writeBufferSize(config.get(WRITE_BUFFER_SIZE));
110-
String encoding = String.valueOf(config.get(ENCODING));
111-
builder.enCoding(HbaseConfig.EnCoding.valueOf(encoding.toUpperCase()));
112-
builder.hbaseExtraConfig(config.get(HBASE_EXTRA_CONFIG));
113-
builder.ttl(config.get(HBASE_TTL_CONFIG));
86+
builder.zookeeperQuorum(config.get(HbaseBaseOptions.ZOOKEEPER_QUORUM));
87+
builder.rowkeyColumns(config.get(HbaseBaseOptions.ROWKEY_COLUMNS));
88+
builder.familyNames(config.get(HbaseSinkOptions.FAMILY_NAME));
89+
90+
builder.rowkeyDelimiter(config.get(HbaseSinkOptions.ROWKEY_DELIMITER));
91+
builder.versionColumn(config.get(HbaseSinkOptions.VERSION_COLUMN));
92+
String nullMode = String.valueOf(config.get(HbaseSinkOptions.NULL_MODE));
93+
builder.nullMode(HbaseSinkOptions.NullMode.valueOf(nullMode.toUpperCase()));
94+
builder.walWrite(config.get(HbaseSinkOptions.WAL_WRITE));
95+
builder.writeBufferSize(config.get(HbaseSinkOptions.WRITE_BUFFER_SIZE));
96+
String encoding = String.valueOf(config.get(HbaseSinkOptions.ENCODING));
97+
builder.enCoding(HbaseSinkOptions.EnCoding.valueOf(encoding.toUpperCase()));
98+
builder.hbaseExtraConfig(config.get(HbaseSinkOptions.HBASE_EXTRA_CONFIG));
99+
builder.ttl(config.get(HbaseSinkOptions.HBASE_TTL_CONFIG));
114100
return builder.build();
115101
}
116102

117-
public static HbaseParameters buildWithSourceConfig(Config pluginConfig) {
103+
public static HbaseParameters buildWithSourceConfig(ReadonlyConfig pluginConfig) {
118104
HbaseParametersBuilder builder = HbaseParameters.builder();
119105

120106
// required parameters
121-
builder.zookeeperQuorum(pluginConfig.getString(ZOOKEEPER_QUORUM.key()));
122-
String table = pluginConfig.getString(TABLE.key());
107+
builder.zookeeperQuorum(pluginConfig.get(HbaseBaseOptions.ZOOKEEPER_QUORUM));
108+
String table = pluginConfig.get(HbaseBaseOptions.TABLE);
123109
int colonIndex = table.indexOf(':');
124110
if (colonIndex != -1) {
125111
String namespace = table.substring(0, colonIndex);
@@ -129,18 +115,17 @@ public static HbaseParameters buildWithSourceConfig(Config pluginConfig) {
129115
builder.table(table);
130116
}
131117

132-
if (pluginConfig.hasPath(HBASE_EXTRA_CONFIG.key())) {
133-
Config extraConfig = pluginConfig.getConfig(HBASE_EXTRA_CONFIG.key());
134-
builder.hbaseExtraConfig(TypesafeConfigUtils.configToMap(extraConfig));
118+
if (pluginConfig.getOptional(HbaseSinkOptions.HBASE_EXTRA_CONFIG).isPresent()) {
119+
builder.hbaseExtraConfig(pluginConfig.get(HbaseSinkOptions.HBASE_EXTRA_CONFIG));
135120
}
136-
if (pluginConfig.hasPath(HBASE_CACHING_CONFIG.key())) {
137-
builder.caching(pluginConfig.getInt(HBASE_CACHING_CONFIG.key()));
121+
if (pluginConfig.getOptional(HbaseSinkOptions.HBASE_CACHING_CONFIG).isPresent()) {
122+
builder.caching(pluginConfig.get(HbaseSinkOptions.HBASE_CACHING_CONFIG));
138123
}
139-
if (pluginConfig.hasPath(HBASE_BATCH_CONFIG.key())) {
140-
builder.batch(pluginConfig.getInt(HBASE_BATCH_CONFIG.key()));
124+
if (pluginConfig.getOptional(HbaseSinkOptions.HBASE_BATCH_CONFIG).isPresent()) {
125+
builder.batch(pluginConfig.get(HbaseSinkOptions.HBASE_BATCH_CONFIG));
141126
}
142-
if (pluginConfig.hasPath(HBASE_CACHE_BLOCKS_CONFIG.key())) {
143-
builder.cacheBlocks(pluginConfig.getBoolean(HBASE_CACHE_BLOCKS_CONFIG.key()));
127+
if (pluginConfig.getOptional(HbaseSinkOptions.HBASE_CACHE_BLOCKS_CONFIG).isPresent()) {
128+
builder.cacheBlocks(pluginConfig.get(HbaseSinkOptions.HBASE_CACHE_BLOCKS_CONFIG));
144129
}
145130
return builder.build();
146131
}

Diff for: seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseConfig.java renamed to seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/config/HbaseSinkOptions.java

+3-14
Original file line numberDiff line numberDiff line change
@@ -30,25 +30,16 @@
3030
import static org.apache.seatunnel.api.sink.DataSaveMode.DROP_DATA;
3131
import static org.apache.seatunnel.api.sink.DataSaveMode.ERROR_WHEN_DATA_EXISTS;
3232

33-
public class HbaseConfig {
34-
35-
private static final Integer DEFAULT_BUFFER_SIZE = 8 * 1024 * 1024;
36-
37-
public static final Option<String> ZOOKEEPER_QUORUM =
38-
Options.key("zookeeper_quorum")
39-
.stringType()
40-
.noDefaultValue()
41-
.withDescription("Hbase zookeeper quorum");
42-
43-
public static final Option<String> TABLE =
44-
Options.key("table").stringType().noDefaultValue().withDescription("Hbase table name");
33+
public class HbaseSinkOptions extends HbaseBaseOptions {
4534

4635
public static final Option<List<String>> ROWKEY_COLUMNS =
4736
Options.key("rowkey_column")
4837
.listType()
4938
.noDefaultValue()
5039
.withDescription("Hbase rowkey column");
5140

41+
private static final Integer DEFAULT_BUFFER_SIZE = 8 * 1024 * 1024;
42+
5243
public static final Option<String> ROWKEY_DELIMITER =
5344
Options.key("rowkey_delimiter")
5445
.stringType()
@@ -149,6 +140,4 @@ public enum EnCoding {
149140
UTF8,
150141
GBK;
151142
}
152-
153-
private HbaseConfig() {}
154143
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.hbase.config;
19+
20+
public class HbaseSourceOptions extends HbaseBaseOptions {}

Diff for: seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSink.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@
3232
import org.apache.seatunnel.api.table.factory.CatalogFactory;
3333
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
3434
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
35-
import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig;
3635
import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters;
36+
import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseSinkOptions;
3737
import org.apache.seatunnel.connectors.seatunnel.hbase.constant.HbaseIdentifier;
3838
import org.apache.seatunnel.connectors.seatunnel.hbase.state.HbaseAggregatedCommitInfo;
3939
import org.apache.seatunnel.connectors.seatunnel.hbase.state.HbaseCommitInfo;
@@ -102,8 +102,8 @@ public Optional<SaveModeHandler> getSaveModeHandler() {
102102
return Optional.empty();
103103
}
104104
Catalog catalog = catalogFactory.createCatalog(catalogFactory.factoryIdentifier(), config);
105-
SchemaSaveMode schemaSaveMode = config.get(HbaseConfig.SCHEMA_SAVE_MODE);
106-
DataSaveMode dataSaveMode = config.get(HbaseConfig.DATA_SAVE_MODE);
105+
SchemaSaveMode schemaSaveMode = config.get(HbaseSinkOptions.SCHEMA_SAVE_MODE);
106+
DataSaveMode dataSaveMode = config.get(HbaseSinkOptions.DATA_SAVE_MODE);
107107
TablePath tablePath =
108108
TablePath.of(hbaseParameters.getNamespace(), hbaseParameters.getTable());
109109
return Optional.of(

Diff for: seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkFactory.java

+14-29
Original file line numberDiff line numberDiff line change
@@ -24,29 +24,14 @@
2424
import org.apache.seatunnel.api.table.factory.Factory;
2525
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
2626
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
27+
import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseSinkOptions;
2728
import org.apache.seatunnel.connectors.seatunnel.hbase.constant.HbaseIdentifier;
2829

2930
import com.google.auto.service.AutoService;
3031

31-
import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.DATA_SAVE_MODE;
32-
import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ENCODING;
33-
import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.FAMILY_NAME;
34-
import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.HBASE_EXTRA_CONFIG;
35-
import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.NULL_MODE;
36-
import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ROWKEY_COLUMNS;
37-
import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ROWKEY_DELIMITER;
38-
import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.SCHEMA_SAVE_MODE;
39-
import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.TABLE;
40-
import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.VERSION_COLUMN;
41-
import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.WAL_WRITE;
42-
import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.WRITE_BUFFER_SIZE;
43-
import static org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseConfig.ZOOKEEPER_QUORUM;
44-
4532
@AutoService(Factory.class)
4633
public class HbaseSinkFactory implements TableSinkFactory {
4734

48-
public static final String IDENTIFIER = "Hbase";
49-
5035
@Override
5136
public String factoryIdentifier() {
5237
return HbaseIdentifier.IDENTIFIER_NAME;
@@ -56,20 +41,20 @@ public String factoryIdentifier() {
5641
public OptionRule optionRule() {
5742
return OptionRule.builder()
5843
.required(
59-
ZOOKEEPER_QUORUM,
60-
TABLE,
61-
ROWKEY_COLUMNS,
62-
FAMILY_NAME,
63-
SCHEMA_SAVE_MODE,
64-
DATA_SAVE_MODE)
44+
HbaseSinkOptions.ZOOKEEPER_QUORUM,
45+
HbaseSinkOptions.TABLE,
46+
HbaseSinkOptions.ROWKEY_COLUMNS,
47+
HbaseSinkOptions.FAMILY_NAME,
48+
HbaseSinkOptions.SCHEMA_SAVE_MODE,
49+
HbaseSinkOptions.DATA_SAVE_MODE)
6550
.optional(
66-
ROWKEY_DELIMITER,
67-
VERSION_COLUMN,
68-
NULL_MODE,
69-
WAL_WRITE,
70-
WRITE_BUFFER_SIZE,
71-
ENCODING,
72-
HBASE_EXTRA_CONFIG,
51+
HbaseSinkOptions.ROWKEY_DELIMITER,
52+
HbaseSinkOptions.VERSION_COLUMN,
53+
HbaseSinkOptions.NULL_MODE,
54+
HbaseSinkOptions.WAL_WRITE,
55+
HbaseSinkOptions.WRITE_BUFFER_SIZE,
56+
HbaseSinkOptions.ENCODING,
57+
HbaseSinkOptions.HBASE_EXTRA_CONFIG,
7358
SinkConnectorCommonOptions.MULTI_TABLE_SINK_REPLICA)
7459
.build();
7560
}

0 commit comments

Comments
 (0)