Skip to content

Commit 268d76c

Browse files
authored
[Improve] doris options (#8745)
1 parent 6d80ff3 commit 268d76c

File tree

15 files changed

+191
-192
lines changed

15 files changed

+191
-192
lines changed

Diff for: seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalog.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
import org.apache.seatunnel.common.exception.CommonError;
4040
import org.apache.seatunnel.common.exception.CommonErrorCode;
4141
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
42-
import org.apache.seatunnel.connectors.doris.config.DorisOptions;
42+
import org.apache.seatunnel.connectors.doris.config.DorisBaseOptions;
4343
import org.apache.seatunnel.connectors.doris.datatype.DorisTypeConverterFactory;
4444
import org.apache.seatunnel.connectors.doris.datatype.DorisTypeConverterV2;
4545
import org.apache.seatunnel.connectors.doris.util.DorisCatalogUtil;
@@ -463,9 +463,9 @@ public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists)
463463
private Map<String, String> connectorOptions() {
464464
Map<String, String> options = new HashMap<>();
465465
options.put("connector", "doris");
466-
options.put(DorisOptions.FENODES.key(), String.join(",", frontEndNodes));
467-
options.put(DorisOptions.USERNAME.key(), username);
468-
options.put(DorisOptions.PASSWORD.key(), password);
466+
options.put(DorisBaseOptions.FENODES.key(), String.join(",", frontEndNodes));
467+
options.put(DorisBaseOptions.USERNAME.key(), username);
468+
options.put(DorisBaseOptions.PASSWORD.key(), password);
469469
return options;
470470
}
471471

Diff for: seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalogFactory.java

+13-7
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,12 @@
2222
import org.apache.seatunnel.api.table.catalog.Catalog;
2323
import org.apache.seatunnel.api.table.factory.CatalogFactory;
2424
import org.apache.seatunnel.api.table.factory.Factory;
25-
import org.apache.seatunnel.connectors.doris.config.DorisOptions;
25+
import org.apache.seatunnel.connectors.doris.config.DorisBaseOptions;
2626
import org.apache.seatunnel.connectors.doris.config.DorisSinkOptions;
2727

2828
import com.google.auto.service.AutoService;
2929

30-
import static org.apache.seatunnel.connectors.doris.config.DorisOptions.IDENTIFIER;
30+
import static org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.IDENTIFIER;
3131
import static org.apache.seatunnel.connectors.doris.config.DorisSinkOptions.SAVE_MODE_CREATE_TEMPLATE;
3232

3333
@AutoService(Factory.class)
@@ -37,10 +37,10 @@ public class DorisCatalogFactory implements CatalogFactory {
3737
public Catalog createCatalog(String catalogName, ReadonlyConfig options) {
3838
return new DorisCatalog(
3939
catalogName,
40-
options.get(DorisOptions.FENODES),
41-
options.get(DorisOptions.QUERY_PORT),
42-
options.get(DorisOptions.USERNAME),
43-
options.get(DorisOptions.PASSWORD),
40+
options.get(DorisBaseOptions.FENODES),
41+
options.get(DorisBaseOptions.QUERY_PORT),
42+
options.get(DorisBaseOptions.USERNAME),
43+
options.get(DorisBaseOptions.PASSWORD),
4444
options.get(SAVE_MODE_CREATE_TEMPLATE),
4545
options.get(DorisSinkOptions.DEFAULT_DATABASE));
4646
}
@@ -52,6 +52,12 @@ public String factoryIdentifier() {
5252

5353
@Override
5454
public OptionRule optionRule() {
55-
return DorisOptions.CATALOG_RULE.build();
55+
return OptionRule.builder()
56+
.required(
57+
DorisBaseOptions.FENODES,
58+
DorisBaseOptions.QUERY_PORT,
59+
DorisBaseOptions.USERNAME,
60+
DorisBaseOptions.PASSWORD)
61+
.build();
5662
}
5763
}

Diff for: seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisOptions.java renamed to seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisBaseOptions.java

+10-23
Original file line numberDiff line numberDiff line change
@@ -19,58 +19,45 @@
1919

2020
import org.apache.seatunnel.api.configuration.Option;
2121
import org.apache.seatunnel.api.configuration.Options;
22-
import org.apache.seatunnel.api.configuration.util.OptionRule;
2322

24-
public interface DorisOptions {
23+
public class DorisBaseOptions {
2524

26-
String IDENTIFIER = "Doris";
27-
String DORIS_DEFAULT_CLUSTER = "default_cluster";
28-
int DORIS_BATCH_SIZE_DEFAULT = 1024;
25+
public static final String IDENTIFIER = "Doris";
2926

3027
// common option
31-
Option<String> FENODES =
28+
public static final Option<String> FENODES =
3229
Options.key("fenodes")
3330
.stringType()
3431
.noDefaultValue()
3532
.withDescription("doris fe http address.");
3633

37-
Option<Integer> QUERY_PORT =
34+
public static final Option<Integer> QUERY_PORT =
3835
Options.key("query-port")
3936
.intType()
4037
.defaultValue(9030)
4138
.withDescription("doris query port");
4239

43-
@Deprecated
44-
Option<String> TABLE_IDENTIFIER =
45-
Options.key("table.identifier")
46-
.stringType()
47-
.noDefaultValue()
48-
.withDescription("the doris table name.");
49-
50-
Option<String> USERNAME =
40+
public static final Option<String> USERNAME =
5141
Options.key("username")
5242
.stringType()
5343
.noDefaultValue()
5444
.withDescription("the doris user name.");
5545

56-
Option<String> PASSWORD =
46+
public static final Option<String> PASSWORD =
5747
Options.key("password")
5848
.stringType()
5949
.noDefaultValue()
6050
.withDescription("the doris password.");
6151

62-
Option<String> TABLE =
52+
public static final Option<String> TABLE =
6353
Options.key("table").stringType().noDefaultValue().withDescription("table");
6454

65-
Option<String> DATABASE =
55+
public static final Option<String> DATABASE =
6656
Options.key("database").stringType().noDefaultValue().withDescription("database");
6757

68-
Option<Integer> DORIS_BATCH_SIZE =
58+
public static final Option<Integer> DORIS_BATCH_SIZE =
6959
Options.key("doris.batch.size")
7060
.intType()
71-
.defaultValue(DORIS_BATCH_SIZE_DEFAULT)
61+
.defaultValue(1024)
7262
.withDescription("the batch size of the doris read/write.");
73-
74-
OptionRule.Builder CATALOG_RULE =
75-
OptionRule.builder().required(FENODES, QUERY_PORT, USERNAME, PASSWORD);
7663
}

Diff for: seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkConfig.java

+7-7
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,13 @@
2929
import java.util.Map;
3030
import java.util.Properties;
3131

32-
import static org.apache.seatunnel.connectors.doris.config.DorisOptions.DATABASE;
33-
import static org.apache.seatunnel.connectors.doris.config.DorisOptions.DORIS_BATCH_SIZE;
34-
import static org.apache.seatunnel.connectors.doris.config.DorisOptions.FENODES;
35-
import static org.apache.seatunnel.connectors.doris.config.DorisOptions.PASSWORD;
36-
import static org.apache.seatunnel.connectors.doris.config.DorisOptions.QUERY_PORT;
37-
import static org.apache.seatunnel.connectors.doris.config.DorisOptions.TABLE;
38-
import static org.apache.seatunnel.connectors.doris.config.DorisOptions.USERNAME;
32+
import static org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.DATABASE;
33+
import static org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.DORIS_BATCH_SIZE;
34+
import static org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.FENODES;
35+
import static org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.PASSWORD;
36+
import static org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.QUERY_PORT;
37+
import static org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.TABLE;
38+
import static org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.USERNAME;
3939
import static org.apache.seatunnel.connectors.doris.config.DorisSinkOptions.DORIS_SINK_CONFIG_PREFIX;
4040
import static org.apache.seatunnel.connectors.doris.config.DorisSinkOptions.NEEDS_UNSUPPORTED_TYPE_CASTING;
4141
import static org.apache.seatunnel.connectors.doris.config.DorisSinkOptions.SAVE_MODE_CREATE_TEMPLATE;

Diff for: seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkOptions.java

+25-57
Original file line numberDiff line numberDiff line change
@@ -19,105 +19,96 @@
1919

2020
import org.apache.seatunnel.api.configuration.Option;
2121
import org.apache.seatunnel.api.configuration.Options;
22-
import org.apache.seatunnel.api.configuration.util.OptionRule;
2322
import org.apache.seatunnel.api.sink.DataSaveMode;
2423
import org.apache.seatunnel.api.sink.SaveModePlaceHolder;
2524
import org.apache.seatunnel.api.sink.SchemaSaveMode;
2625

2726
import java.util.Map;
2827

29-
import static org.apache.seatunnel.api.options.SinkConnectorCommonOptions.MULTI_TABLE_SINK_REPLICA;
30-
import static org.apache.seatunnel.connectors.doris.config.DorisOptions.DATABASE;
31-
import static org.apache.seatunnel.connectors.doris.config.DorisOptions.DORIS_BATCH_SIZE;
32-
import static org.apache.seatunnel.connectors.doris.config.DorisOptions.FENODES;
33-
import static org.apache.seatunnel.connectors.doris.config.DorisOptions.PASSWORD;
34-
import static org.apache.seatunnel.connectors.doris.config.DorisOptions.QUERY_PORT;
35-
import static org.apache.seatunnel.connectors.doris.config.DorisOptions.TABLE;
36-
import static org.apache.seatunnel.connectors.doris.config.DorisOptions.TABLE_IDENTIFIER;
37-
import static org.apache.seatunnel.connectors.doris.config.DorisOptions.USERNAME;
28+
public class DorisSinkOptions extends DorisBaseOptions {
3829

39-
public interface DorisSinkOptions {
40-
41-
int DEFAULT_SINK_CHECK_INTERVAL = 10000;
42-
int DEFAULT_SINK_MAX_RETRIES = 3;
43-
int DEFAULT_SINK_BUFFER_SIZE = 256 * 1024;
44-
int DEFAULT_SINK_BUFFER_COUNT = 3;
30+
@Deprecated
31+
public static final Option<String> TABLE_IDENTIFIER =
32+
Options.key("table.identifier")
33+
.stringType()
34+
.noDefaultValue()
35+
.withDescription("the doris table name.");
4536

46-
Option<Boolean> SINK_ENABLE_2PC =
37+
public static final Option<Boolean> SINK_ENABLE_2PC =
4738
Options.key("sink.enable-2pc")
4839
.booleanType()
4940
.defaultValue(false)
5041
.withDescription("enable 2PC while loading");
5142

52-
Option<Integer> SINK_CHECK_INTERVAL =
43+
public static final Option<Integer> SINK_CHECK_INTERVAL =
5344
Options.key("sink.check-interval")
5445
.intType()
55-
.defaultValue(DEFAULT_SINK_CHECK_INTERVAL)
46+
.defaultValue(10000)
5647
.withDescription("check exception with the interval while loading");
57-
Option<Integer> SINK_MAX_RETRIES =
48+
public static final Option<Integer> SINK_MAX_RETRIES =
5849
Options.key("sink.max-retries")
5950
.intType()
60-
.defaultValue(DEFAULT_SINK_MAX_RETRIES)
51+
.defaultValue(3)
6152
.withDescription("the max retry times if writing records to database failed.");
62-
Option<Integer> SINK_BUFFER_SIZE =
53+
public static final Option<Integer> SINK_BUFFER_SIZE =
6354
Options.key("sink.buffer-size")
6455
.intType()
65-
.defaultValue(DEFAULT_SINK_BUFFER_SIZE)
56+
.defaultValue(256 * 1024)
6657
.withDescription("the buffer size to cache data for stream load.");
67-
Option<Integer> SINK_BUFFER_COUNT =
58+
public static final Option<Integer> SINK_BUFFER_COUNT =
6859
Options.key("sink.buffer-count")
6960
.intType()
70-
.defaultValue(DEFAULT_SINK_BUFFER_COUNT)
61+
.defaultValue(3)
7162
.withDescription("the buffer count to cache data for stream load.");
72-
Option<String> SINK_LABEL_PREFIX =
63+
public static final Option<String> SINK_LABEL_PREFIX =
7364
Options.key("sink.label-prefix")
7465
.stringType()
7566
.defaultValue("")
7667
.withDescription("the unique label prefix.");
77-
Option<Boolean> SINK_ENABLE_DELETE =
68+
public static final Option<Boolean> SINK_ENABLE_DELETE =
7869
Options.key("sink.enable-delete")
7970
.booleanType()
8071
.defaultValue(false)
8172
.withDescription("whether to enable the delete function");
8273

83-
Option<Map<String, String>> DORIS_SINK_CONFIG_PREFIX =
74+
public static final Option<Map<String, String>> DORIS_SINK_CONFIG_PREFIX =
8475
Options.key("doris.config")
8576
.mapType()
8677
.noDefaultValue()
8778
.withDescription(
8879
"The parameter of the Stream Load data_desc. "
8980
+ "The way to specify the parameter is to add the prefix `doris.config` to the original load parameter name ");
9081

91-
Option<String> DEFAULT_DATABASE =
82+
public static final Option<String> DEFAULT_DATABASE =
9283
Options.key("default-database")
9384
.stringType()
9485
.defaultValue("information_schema")
9586
.withDescription("");
9687

97-
Option<SchemaSaveMode> SCHEMA_SAVE_MODE =
88+
public static final Option<SchemaSaveMode> SCHEMA_SAVE_MODE =
9889
Options.key("schema_save_mode")
9990
.enumType(SchemaSaveMode.class)
10091
.defaultValue(SchemaSaveMode.CREATE_SCHEMA_WHEN_NOT_EXIST)
10192
.withDescription("schema_save_mode");
10293

103-
Option<DataSaveMode> DATA_SAVE_MODE =
94+
public static final Option<DataSaveMode> DATA_SAVE_MODE =
10495
Options.key("data_save_mode")
10596
.enumType(DataSaveMode.class)
10697
.defaultValue(DataSaveMode.APPEND_DATA)
10798
.withDescription("data_save_mode");
10899

109-
Option<String> CUSTOM_SQL =
100+
public static final Option<String> CUSTOM_SQL =
110101
Options.key("custom_sql").stringType().noDefaultValue().withDescription("custom_sql");
111102

112-
Option<Boolean> NEEDS_UNSUPPORTED_TYPE_CASTING =
103+
public static final Option<Boolean> NEEDS_UNSUPPORTED_TYPE_CASTING =
113104
Options.key("needs_unsupported_type_casting")
114105
.booleanType()
115106
.defaultValue(false)
116107
.withDescription(
117108
"Whether to enable the unsupported type casting, such as Decimal64 to Double");
118109

119110
// create table
120-
Option<String> SAVE_MODE_CREATE_TEMPLATE =
111+
public static final Option<String> SAVE_MODE_CREATE_TEMPLATE =
121112
Options.key("save_mode_create_template")
122113
.stringType()
123114
.defaultValue(
@@ -147,27 +138,4 @@ public interface DorisSinkOptions {
147138
+ "\"disable_auto_compaction\" = \"false\"\n"
148139
+ ")")
149140
.withDescription("Create table statement template, used to create Doris table");
150-
151-
OptionRule.Builder SINK_RULE =
152-
OptionRule.builder()
153-
.required(
154-
FENODES,
155-
USERNAME,
156-
PASSWORD,
157-
SINK_LABEL_PREFIX,
158-
DORIS_SINK_CONFIG_PREFIX,
159-
DATA_SAVE_MODE,
160-
SCHEMA_SAVE_MODE)
161-
.optional(
162-
DATABASE,
163-
TABLE,
164-
TABLE_IDENTIFIER,
165-
QUERY_PORT,
166-
DORIS_BATCH_SIZE,
167-
SINK_ENABLE_2PC,
168-
SINK_ENABLE_DELETE,
169-
MULTI_TABLE_SINK_REPLICA,
170-
SAVE_MODE_CREATE_TEMPLATE,
171-
NEEDS_UNSUPPORTED_TYPE_CASTING)
172-
.conditional(DATA_SAVE_MODE, DataSaveMode.CUSTOM_PROCESSING, CUSTOM_SQL);
173141
}

Diff for: seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSourceConfig.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,10 @@
2525
import java.io.Serializable;
2626
import java.util.List;
2727

28-
import static org.apache.seatunnel.connectors.doris.config.DorisOptions.FENODES;
29-
import static org.apache.seatunnel.connectors.doris.config.DorisOptions.PASSWORD;
30-
import static org.apache.seatunnel.connectors.doris.config.DorisOptions.QUERY_PORT;
31-
import static org.apache.seatunnel.connectors.doris.config.DorisOptions.USERNAME;
28+
import static org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.FENODES;
29+
import static org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.PASSWORD;
30+
import static org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.QUERY_PORT;
31+
import static org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.USERNAME;
3232
import static org.apache.seatunnel.connectors.doris.config.DorisSourceOptions.DORIS_DESERIALIZE_ARROW_ASYNC;
3333
import static org.apache.seatunnel.connectors.doris.config.DorisSourceOptions.DORIS_DESERIALIZE_QUEUE_SIZE;
3434
import static org.apache.seatunnel.connectors.doris.config.DorisSourceOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS;

0 commit comments

Comments
 (0)