Skip to content

Commit 46558ef

Browse files
liunaijie and jia zhang
authored and
jia zhang
committed
[Improve] kudu options (apache#9162)
1 parent 2a88115 commit 46558ef

File tree

12 files changed

+280
-220
lines changed

12 files changed

+280
-220
lines changed

Diff for: seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java

-2
Original file line number | Diff line number | Diff line change
@@ -204,13 +204,11 @@ private Set<String> buildWhiteList() {
204204
whiteList.add("QdrantSinkOptions");
205205
whiteList.add("RocketMqSinkOptions");
206206
whiteList.add("MaxcomputeSourceOptions");
207-
whiteList.add("KuduSourceOptions");
208207
whiteList.add("SocketSinkOptions");
209208
whiteList.add("SelectDBSinkOptions");
210209
whiteList.add("RocketMqSourceOptions");
211210
whiteList.add("TablestoreSinkOptions");
212211
whiteList.add("TableStoreDBSourceOptions");
213-
whiteList.add("KuduSinkOptions");
214212
whiteList.add("TDengineSinkOptions");
215213
whiteList.add("Neo4jSourceOptions");
216214
whiteList.add("QdrantSourceOptions");

Diff for: seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/catalog/KuduCatalog.java

+13-18
Original file line number | Diff line number | Diff line change
@@ -31,6 +31,7 @@
3131
import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
3232
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
3333
import org.apache.seatunnel.connectors.seatunnel.kudu.config.CommonConfig;
34+
import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduBaseOptions;
3435
import org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient.KuduTypeMapper;
3536
import org.apache.seatunnel.connectors.seatunnel.kudu.util.KuduUtil;
3637

@@ -50,15 +51,6 @@
5051
import java.util.stream.Collectors;
5152
import java.util.stream.IntStream;
5253

53-
import static org.apache.seatunnel.connectors.seatunnel.kudu.config.CommonConfig.ADMIN_OPERATION_TIMEOUT;
54-
import static org.apache.seatunnel.connectors.seatunnel.kudu.config.CommonConfig.ENABLE_KERBEROS;
55-
import static org.apache.seatunnel.connectors.seatunnel.kudu.config.CommonConfig.KERBEROS_KEYTAB;
56-
import static org.apache.seatunnel.connectors.seatunnel.kudu.config.CommonConfig.KERBEROS_KRB5_CONF;
57-
import static org.apache.seatunnel.connectors.seatunnel.kudu.config.CommonConfig.KERBEROS_PRINCIPAL;
58-
import static org.apache.seatunnel.connectors.seatunnel.kudu.config.CommonConfig.MASTER;
59-
import static org.apache.seatunnel.connectors.seatunnel.kudu.config.CommonConfig.OPERATION_TIMEOUT;
60-
import static org.apache.seatunnel.connectors.seatunnel.kudu.config.CommonConfig.TABLE_NAME;
61-
import static org.apache.seatunnel.connectors.seatunnel.kudu.config.CommonConfig.WORKER_COUNT;
6254
import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
6355

6456
public class KuduCatalog implements Catalog {
@@ -184,19 +176,22 @@ public CatalogTable getTable(TablePath tablePath)
184176
private Map<String, String> buildConnectorOptions(TablePath tablePath) {
185177
Map<String, String> options = new HashMap<>(8);
186178
options.put("connector", "kudu");
187-
options.put(TABLE_NAME.key(), tablePath.getFullName());
188-
options.put(MASTER.key(), config.getMasters());
189-
options.put(WORKER_COUNT.key(), config.getWorkerCount().toString());
190-
options.put(OPERATION_TIMEOUT.key(), config.getOperationTimeout().toString());
191-
options.put(ADMIN_OPERATION_TIMEOUT.key(), config.getAdminOperationTimeout().toString());
179+
options.put(KuduBaseOptions.TABLE_NAME.key(), tablePath.getFullName());
180+
options.put(KuduBaseOptions.MASTER.key(), config.getMasters());
181+
options.put(KuduBaseOptions.WORKER_COUNT.key(), config.getWorkerCount().toString());
182+
options.put(
183+
KuduBaseOptions.OPERATION_TIMEOUT.key(), config.getOperationTimeout().toString());
184+
options.put(
185+
KuduBaseOptions.ADMIN_OPERATION_TIMEOUT.key(),
186+
config.getAdminOperationTimeout().toString());
192187
if (config.getEnableKerberos()) {
193-
options.put(KERBEROS_PRINCIPAL.key(), config.getPrincipal());
194-
options.put(KERBEROS_KEYTAB.key(), config.getKeytab());
188+
options.put(KuduBaseOptions.KERBEROS_PRINCIPAL.key(), config.getPrincipal());
189+
options.put(KuduBaseOptions.KERBEROS_KEYTAB.key(), config.getKeytab());
195190
if (StringUtils.isNotBlank(config.getKrb5conf())) {
196-
options.put(KERBEROS_KRB5_CONF.key(), config.getKrb5conf());
191+
options.put(KuduBaseOptions.KERBEROS_KRB5_CONF.key(), config.getKrb5conf());
197192
}
198193
}
199-
options.put(ENABLE_KERBEROS.key(), config.getEnableKerberos().toString());
194+
options.put(KuduBaseOptions.ENABLE_KERBEROS.key(), config.getEnableKerberos().toString());
200195
return options;
201196
}
202197

Diff for: seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/catalog/KuduCatalogFactory.java

+12-16
Original file line number | Diff line number | Diff line change
@@ -23,18 +23,10 @@
2323
import org.apache.seatunnel.api.table.factory.CatalogFactory;
2424
import org.apache.seatunnel.api.table.factory.Factory;
2525
import org.apache.seatunnel.connectors.seatunnel.kudu.config.CommonConfig;
26+
import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduBaseOptions;
2627

2728
import com.google.auto.service.AutoService;
2829

29-
import static org.apache.seatunnel.connectors.seatunnel.kudu.config.CommonConfig.ADMIN_OPERATION_TIMEOUT;
30-
import static org.apache.seatunnel.connectors.seatunnel.kudu.config.CommonConfig.ENABLE_KERBEROS;
31-
import static org.apache.seatunnel.connectors.seatunnel.kudu.config.CommonConfig.KERBEROS_KEYTAB;
32-
import static org.apache.seatunnel.connectors.seatunnel.kudu.config.CommonConfig.KERBEROS_KRB5_CONF;
33-
import static org.apache.seatunnel.connectors.seatunnel.kudu.config.CommonConfig.KERBEROS_PRINCIPAL;
34-
import static org.apache.seatunnel.connectors.seatunnel.kudu.config.CommonConfig.MASTER;
35-
import static org.apache.seatunnel.connectors.seatunnel.kudu.config.CommonConfig.OPERATION_TIMEOUT;
36-
import static org.apache.seatunnel.connectors.seatunnel.kudu.config.CommonConfig.WORKER_COUNT;
37-
3830
@AutoService(Factory.class)
3931
public class KuduCatalogFactory implements CatalogFactory {
4032

@@ -55,13 +47,17 @@ public String factoryIdentifier() {
5547
@Override
5648
public OptionRule optionRule() {
5749
return OptionRule.builder()
58-
.required(MASTER)
59-
.optional(WORKER_COUNT)
60-
.optional(OPERATION_TIMEOUT)
61-
.optional(ADMIN_OPERATION_TIMEOUT)
62-
.optional(KERBEROS_KRB5_CONF)
63-
.optional(ENABLE_KERBEROS)
64-
.conditional(ENABLE_KERBEROS, true, KERBEROS_PRINCIPAL, KERBEROS_KEYTAB)
50+
.required(KuduBaseOptions.MASTER)
51+
.optional(KuduBaseOptions.WORKER_COUNT)
52+
.optional(KuduBaseOptions.OPERATION_TIMEOUT)
53+
.optional(KuduBaseOptions.ADMIN_OPERATION_TIMEOUT)
54+
.optional(KuduBaseOptions.KERBEROS_KRB5_CONF)
55+
.optional(KuduBaseOptions.ENABLE_KERBEROS)
56+
.conditional(
57+
KuduBaseOptions.ENABLE_KERBEROS,
58+
true,
59+
KuduBaseOptions.KERBEROS_PRINCIPAL,
60+
KuduBaseOptions.KERBEROS_KEYTAB)
6561
.build();
6662
}
6763
}

Diff for: seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/CommonConfig.java

+8-69
Original file line number | Diff line number | Diff line change
@@ -17,12 +17,8 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.kudu.config;
1919

20-
import org.apache.seatunnel.api.configuration.Option;
21-
import org.apache.seatunnel.api.configuration.Options;
2220
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2321

24-
import org.apache.kudu.client.AsyncKuduClient;
25-
2622
import lombok.Getter;
2723
import lombok.ToString;
2824

@@ -32,63 +28,6 @@
3228
@ToString
3329
public class CommonConfig implements Serializable {
3430

35-
public static final Option<String> MASTER =
36-
Options.key("kudu_masters")
37-
.stringType()
38-
.noDefaultValue()
39-
.withDescription("Kudu master address. Separated by ','");
40-
41-
public static final Option<String> TABLE_NAME =
42-
Options.key("table_name")
43-
.stringType()
44-
.noDefaultValue()
45-
.withDescription("Kudu table name");
46-
47-
public static final Option<Integer> WORKER_COUNT =
48-
Options.key("client_worker_count")
49-
.intType()
50-
.defaultValue(2 * Runtime.getRuntime().availableProcessors())
51-
.withDescription(
52-
"Kudu worker count. Default value is twice the current number of cpu cores");
53-
54-
public static final Option<Long> OPERATION_TIMEOUT =
55-
Options.key("client_default_operation_timeout_ms")
56-
.longType()
57-
.defaultValue(AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS)
58-
.withDescription("Kudu normal operation time out");
59-
60-
public static final Option<Long> ADMIN_OPERATION_TIMEOUT =
61-
Options.key("client_default_admin_operation_timeout_ms")
62-
.longType()
63-
.defaultValue(AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS)
64-
.withDescription("Kudu admin operation time out");
65-
66-
public static final Option<Boolean> ENABLE_KERBEROS =
67-
Options.key("enable_kerberos")
68-
.booleanType()
69-
.defaultValue(false)
70-
.withDescription("Kerberos principal enable.");
71-
public static final Option<String> KERBEROS_PRINCIPAL =
72-
Options.key("kerberos_principal")
73-
.stringType()
74-
.noDefaultValue()
75-
.withDescription(
76-
"Kerberos principal. Note that all zeta nodes require have this file.");
77-
78-
public static final Option<String> KERBEROS_KEYTAB =
79-
Options.key("kerberos_keytab")
80-
.stringType()
81-
.noDefaultValue()
82-
.withDescription(
83-
"Kerberos keytab. Note that all zeta nodes require have this file.");
84-
85-
public static final Option<String> KERBEROS_KRB5_CONF =
86-
Options.key("kerberos_krb5conf")
87-
.stringType()
88-
.noDefaultValue()
89-
.withDescription(
90-
"Kerberos krb5 conf. Note that all zeta nodes require have this file.");
91-
9231
protected String masters;
9332
protected Integer workerCount;
9433

@@ -102,13 +41,13 @@ public class CommonConfig implements Serializable {
10241
protected String krb5conf;
10342

10443
public CommonConfig(ReadonlyConfig config) {
105-
this.masters = config.get(MASTER);
106-
this.workerCount = config.get(WORKER_COUNT);
107-
this.operationTimeout = config.get(OPERATION_TIMEOUT);
108-
this.adminOperationTimeout = config.get(ADMIN_OPERATION_TIMEOUT);
109-
this.enableKerberos = config.get(ENABLE_KERBEROS);
110-
this.principal = config.get(KERBEROS_PRINCIPAL);
111-
this.keytab = config.get(KERBEROS_KEYTAB);
112-
this.krb5conf = config.get(KERBEROS_KRB5_CONF);
44+
this.masters = config.get(KuduBaseOptions.MASTER);
45+
this.workerCount = config.get(KuduBaseOptions.WORKER_COUNT);
46+
this.operationTimeout = config.get(KuduBaseOptions.OPERATION_TIMEOUT);
47+
this.adminOperationTimeout = config.get(KuduBaseOptions.ADMIN_OPERATION_TIMEOUT);
48+
this.enableKerberos = config.get(KuduBaseOptions.ENABLE_KERBEROS);
49+
this.principal = config.get(KuduBaseOptions.KERBEROS_PRINCIPAL);
50+
this.keytab = config.get(KuduBaseOptions.KERBEROS_KEYTAB);
51+
this.krb5conf = config.get(KuduBaseOptions.KERBEROS_KRB5_CONF);
11352
}
11453
}
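Diff for: seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduBaseOptions.java

+86
Original file line number | Diff line number | Diff line change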
@@ -0,0 +1,86 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.kudu.config;
19+
20+
import org.apache.seatunnel.api.configuration.Option;
21+
import org.apache.seatunnel.api.configuration.Options;
22+
import org.apache.seatunnel.api.options.ConnectorCommonOptions;
23+
24+
import org.apache.kudu.client.AsyncKuduClient;
25+
26+
import java.io.Serializable;
27+
28+
public class KuduBaseOptions extends ConnectorCommonOptions implements Serializable {
29+
30+
public static final Option<String> MASTER =
31+
Options.key("kudu_masters")
32+
.stringType()
33+
.noDefaultValue()
34+
.withDescription("Kudu master address. Separated by ','");
35+
36+
public static final Option<String> TABLE_NAME =
37+
Options.key("table_name")
38+
.stringType()
39+
.noDefaultValue()
40+
.withDescription("Kudu table name");
41+
42+
public static final Option<Integer> WORKER_COUNT =
43+
Options.key("client_worker_count")
44+
.intType()
45+
.defaultValue(2 * Runtime.getRuntime().availableProcessors())
46+
.withDescription(
47+
"Kudu worker count. Default value is twice the current number of cpu cores");
48+
49+
public static final Option<Long> OPERATION_TIMEOUT =
50+
Options.key("client_default_operation_timeout_ms")
51+
.longType()
52+
.defaultValue(AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS)
53+
.withDescription("Kudu normal operation time out");
54+
55+
public static final Option<Long> ADMIN_OPERATION_TIMEOUT =
56+
Options.key("client_default_admin_operation_timeout_ms")
57+
.longType()
58+
.defaultValue(AsyncKuduClient.DEFAULT_OPERATION_TIMEOUT_MS)
59+
.withDescription("Kudu admin operation time out");
60+
61+
public static final Option<Boolean> ENABLE_KERBEROS =
62+
Options.key("enable_kerberos")
63+
.booleanType()
64+
.defaultValue(false)
65+
.withDescription("Kerberos principal enable.");
66+
public static final Option<String> KERBEROS_PRINCIPAL =
67+
Options.key("kerberos_principal")
68+
.stringType()
69+
.noDefaultValue()
70+
.withDescription(
71+
"Kerberos principal. Note that all zeta nodes require have this file.");
72+
73+
public static final Option<String> KERBEROS_KEYTAB =
74+
Options.key("kerberos_keytab")
75+
.stringType()
76+
.noDefaultValue()
77+
.withDescription(
78+
"Kerberos keytab. Note that all zeta nodes require have this file.");
79+
80+
public static final Option<String> KERBEROS_KRB5_CONF =
81+
Options.key("kerberos_krb5conf")
82+
.stringType()
83+
.noDefaultValue()
84+
.withDescription(
85+
"Kerberos krb5 conf. Note that all zeta nodes require have this file.");
86+
}

Diff for: seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSinkConfig.java

+7-49
Original file line number | Diff line number | Diff line change
@@ -17,8 +17,6 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.kudu.config;
1919

20-
import org.apache.seatunnel.api.configuration.Option;
21-
import org.apache.seatunnel.api.configuration.Options;
2220
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2321

2422
import org.apache.kudu.client.SessionConfiguration;
@@ -32,46 +30,6 @@
3230
@ToString
3331
public class KuduSinkConfig extends CommonConfig {
3432

35-
public static final Option<SaveMode> SAVE_MODE =
36-
Options.key("save_mode")
37-
.enumType(SaveMode.class)
38-
.defaultValue(SaveMode.APPEND)
39-
.withDescription("Storage mode,append is now supported");
40-
41-
public static final Option<String> FLUSH_MODE =
42-
Options.key("session_flush_mode")
43-
.stringType()
44-
.defaultValue(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC.name())
45-
.withDescription("Kudu flush mode. Default AUTO_FLUSH_SYNC");
46-
47-
public static final Option<Integer> BATCH_SIZE =
48-
Options.key("batch_size")
49-
.intType()
50-
.defaultValue(1024)
51-
.withDescription(
52-
"the flush max size (includes all append, upsert and delete records), over this number"
53-
+ " of records, will flush data. The default value is 100.");
54-
55-
public static final Option<Integer> BUFFER_FLUSH_INTERVAL =
56-
Options.key("buffer_flush_interval")
57-
.intType()
58-
.defaultValue(10000)
59-
.withDescription(
60-
"the flush interval mills, over this time, asynchronous threads will flush data. The "
61-
+ "default value is 1s.");
62-
63-
public static final Option<Boolean> IGNORE_NOT_FOUND =
64-
Options.key("ignore_not_found")
65-
.booleanType()
66-
.defaultValue(false)
67-
.withDescription("if true, ignore all not found rows");
68-
69-
public static final Option<Boolean> IGNORE_DUPLICATE =
70-
Options.key("ignore_not_duplicate")
71-
.booleanType()
72-
.defaultValue(false)
73-
.withDescription("if true, ignore all dulicate rows");
74-
7533
private SaveMode saveMode;
7634

7735
private String table;
@@ -101,13 +59,13 @@ public static SaveMode fromStr(String str) {
10159

10260
public KuduSinkConfig(ReadonlyConfig config) {
10361
super(config);
104-
this.table = config.get(TABLE_NAME);
105-
this.saveMode = config.get(SAVE_MODE);
106-
this.flushMode = fromStrFlushMode(config.get(FLUSH_MODE));
107-
this.maxBufferSize = config.get(BATCH_SIZE);
108-
this.flushInterval = config.get(BUFFER_FLUSH_INTERVAL);
109-
this.ignoreNotFound = config.get(IGNORE_NOT_FOUND);
110-
this.ignoreDuplicate = config.get(IGNORE_DUPLICATE);
62+
this.table = config.get(KuduSinkOptions.TABLE_NAME);
63+
this.saveMode = config.get(KuduSinkOptions.SAVE_MODE);
64+
this.flushMode = fromStrFlushMode(config.get(KuduSinkOptions.FLUSH_MODE));
65+
this.maxBufferSize = config.get(KuduSinkOptions.BATCH_SIZE);
66+
this.flushInterval = config.get(KuduSinkOptions.BUFFER_FLUSH_INTERVAL);
67+
this.ignoreNotFound = config.get(KuduSinkOptions.IGNORE_NOT_FOUND);
68+
this.ignoreDuplicate = config.get(KuduSinkOptions.IGNORE_DUPLICATE);
11169
}
11270

11371
private SessionConfiguration.FlushMode fromStrFlushMode(String flushMode) {

0 commit comments

Comments
 (0)