Skip to content

Commit d307ab4

Browse files
authored
[improve] add Elasticsearch options (#8623)
1 parent a69efca commit d307ab4

File tree

15 files changed

+177
-148
lines changed

15 files changed

+177
-148
lines changed

Diff for: seatunnel-ci-tools/src/test/java/org/apache/seatunnel/api/ConnectorOptionCheckTest.java

-2
Original file line numberDiff line numberDiff line change
@@ -169,7 +169,6 @@ public void checkConnectorOptionExist() {
169169

170170
private Set<String> buildWhiteList() {
171171
Set<String> whiteList = new HashSet<>();
172-
whiteList.add("ElasticsearchSourceOptions");
173172
whiteList.add("JdbcSinkOptions");
174173
whiteList.add("TypesenseSourceOptions");
175174
whiteList.add("RabbitmqSourceOptions");
@@ -202,7 +201,6 @@ private Set<String> buildWhiteList() {
202201
whiteList.add("SentrySinkOptions");
203202
whiteList.add("EasysearchSinkOptions");
204203
whiteList.add("QdrantSinkOptions");
205-
whiteList.add("ElasticsearchSinkOptions");
206204
whiteList.add("MilvusSourceOptions");
207205
whiteList.add("RocketMqSinkOptions");
208206
whiteList.add("ClickhouseFileSinkOptions");

Diff for: seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/client/EsRestClient.java

+10-10
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2727
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
2828
import org.apache.seatunnel.common.utils.JsonUtils;
29-
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig;
29+
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions;
3030
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse;
3131
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.ElasticsearchClusterInfo;
3232
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.IndexDocsCount;
@@ -92,23 +92,23 @@ private EsRestClient(RestClient restClient) {
9292
}
9393

9494
public static EsRestClient createInstance(ReadonlyConfig config) {
95-
List<String> hosts = config.get(EsClusterConnectionConfig.HOSTS);
96-
Optional<String> username = config.getOptional(EsClusterConnectionConfig.USERNAME);
97-
Optional<String> password = config.getOptional(EsClusterConnectionConfig.PASSWORD);
95+
List<String> hosts = config.get(ElasticsearchBaseOptions.HOSTS);
96+
Optional<String> username = config.getOptional(ElasticsearchBaseOptions.USERNAME);
97+
Optional<String> password = config.getOptional(ElasticsearchBaseOptions.PASSWORD);
9898
Optional<String> keystorePath = Optional.empty();
9999
Optional<String> keystorePassword = Optional.empty();
100100
Optional<String> truststorePath = Optional.empty();
101101
Optional<String> truststorePassword = Optional.empty();
102-
boolean tlsVerifyCertificate = config.get(EsClusterConnectionConfig.TLS_VERIFY_CERTIFICATE);
102+
boolean tlsVerifyCertificate = config.get(ElasticsearchBaseOptions.TLS_VERIFY_CERTIFICATE);
103103
if (tlsVerifyCertificate) {
104-
keystorePath = config.getOptional(EsClusterConnectionConfig.TLS_KEY_STORE_PATH);
105-
keystorePassword = config.getOptional(EsClusterConnectionConfig.TLS_KEY_STORE_PASSWORD);
106-
truststorePath = config.getOptional(EsClusterConnectionConfig.TLS_TRUST_STORE_PATH);
104+
keystorePath = config.getOptional(ElasticsearchBaseOptions.TLS_KEY_STORE_PATH);
105+
keystorePassword = config.getOptional(ElasticsearchBaseOptions.TLS_KEY_STORE_PASSWORD);
106+
truststorePath = config.getOptional(ElasticsearchBaseOptions.TLS_TRUST_STORE_PATH);
107107
truststorePassword =
108-
config.getOptional(EsClusterConnectionConfig.TLS_TRUST_STORE_PASSWORD);
108+
config.getOptional(ElasticsearchBaseOptions.TLS_TRUST_STORE_PASSWORD);
109109
}
110110

111-
boolean tlsVerifyHostnames = config.get(EsClusterConnectionConfig.TLS_VERIFY_HOSTNAME);
111+
boolean tlsVerifyHostnames = config.get(ElasticsearchBaseOptions.TLS_VERIFY_HOSTNAME);
112112
return createInstance(
113113
hosts,
114114
username,
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,10 @@
2020
import org.apache.seatunnel.api.configuration.Option;
2121
import org.apache.seatunnel.api.configuration.Options;
2222

23+
import java.io.Serializable;
2324
import java.util.List;
2425

25-
public class EsClusterConnectionConfig {
26+
public class ElasticsearchBaseOptions implements Serializable {
2627

2728
public static final Option<List<String>> HOSTS =
2829
Options.key("hosts")
@@ -37,6 +38,12 @@ public class EsClusterConnectionConfig {
3738
.noDefaultValue()
3839
.withDescription("x-pack username");
3940

41+
public static final Option<String> INDEX =
42+
Options.key("index")
43+
.stringType()
44+
.noDefaultValue()
45+
.withDescription(
46+
"Elasticsearch index name.Index support contains variables of field name,such as seatunnel_${age},and the field must appear at seatunnel row. If not, we will treat it as a normal index");
4047
public static final Option<String> PASSWORD =
4148
Options.key("password")
4249
.stringType()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.elasticsearch.config;
19+
20+
import org.apache.seatunnel.api.table.catalog.CatalogTable;
21+
22+
import lombok.Getter;
23+
import lombok.Setter;
24+
25+
import java.io.Serializable;
26+
import java.util.ArrayList;
27+
import java.util.HashMap;
28+
import java.util.List;
29+
import java.util.Map;
30+
31+
@Getter
32+
@Setter
33+
public class ElasticsearchConfig implements Serializable {
34+
35+
private String index;
36+
private List<String> source;
37+
private Map<String, Object> query;
38+
private String scrollTime;
39+
private int scrollSize;
40+
41+
private CatalogTable catalogTable;
42+
43+
public ElasticsearchConfig clone() {
44+
ElasticsearchConfig elasticsearchConfig = new ElasticsearchConfig();
45+
elasticsearchConfig.setIndex(index);
46+
elasticsearchConfig.setSource(new ArrayList<>(source));
47+
elasticsearchConfig.setQuery(new HashMap<>(query));
48+
elasticsearchConfig.setScrollTime(scrollTime);
49+
elasticsearchConfig.setScrollSize(scrollSize);
50+
elasticsearchConfig.setCatalogTable(catalogTable);
51+
return elasticsearchConfig;
52+
}
53+
}

Diff for: seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SinkConfig.java renamed to seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/ElasticsearchSinkOptions.java

+1-8
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,7 @@
2929
import static org.apache.seatunnel.api.sink.DataSaveMode.DROP_DATA;
3030
import static org.apache.seatunnel.api.sink.DataSaveMode.ERROR_WHEN_DATA_EXISTS;
3131

32-
public class SinkConfig {
33-
34-
public static final Option<String> INDEX =
35-
Options.key("index")
36-
.stringType()
37-
.noDefaultValue()
38-
.withDescription(
39-
"Elasticsearch index name.Index support contains variables of field name,such as seatunnel_${age},and the field must appear at seatunnel row. If not, we will treat it as a normal index");
32+
public class ElasticsearchSinkOptions extends ElasticsearchBaseOptions {
4033

4134
public static final Option<String> INDEX_TYPE =
4235
Options.key("index_type")

Diff for: seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SourceConfig.java renamed to seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/ElasticsearchSourceOptions.java

+1-29
Original file line numberDiff line numberDiff line change
@@ -21,34 +21,25 @@
2121

2222
import org.apache.seatunnel.api.configuration.Option;
2323
import org.apache.seatunnel.api.configuration.Options;
24-
import org.apache.seatunnel.api.table.catalog.CatalogTable;
2524

2625
import lombok.Getter;
2726
import lombok.Setter;
2827

29-
import java.io.Serializable;
30-
import java.util.ArrayList;
3128
import java.util.Collections;
3229
import java.util.HashMap;
3330
import java.util.List;
3431
import java.util.Map;
3532

3633
@Getter
3734
@Setter
38-
public class SourceConfig implements Serializable {
35+
public class ElasticsearchSourceOptions extends ElasticsearchBaseOptions {
3936

4037
public static final Option<List<Map<String, Object>>> INDEX_LIST =
4138
Options.key("index_list")
4239
.type(new TypeReference<List<Map<String, Object>>>() {})
4340
.noDefaultValue()
4441
.withDescription("index_list for multiTable sync");
4542

46-
public static final Option<String> INDEX =
47-
Options.key("index")
48-
.stringType()
49-
.noDefaultValue()
50-
.withDescription("Elasticsearch index name, support * fuzzy matching");
51-
5243
public static final Option<List<String>> SOURCE =
5344
Options.key("source")
5445
.listType()
@@ -84,23 +75,4 @@ public class SourceConfig implements Serializable {
8475
Collections.singletonMap("match_all", new HashMap<String, String>()))
8576
.withDescription(
8677
"Elasticsearch query language. You can control the range of data read");
87-
88-
private String index;
89-
private List<String> source;
90-
private Map<String, Object> query;
91-
private String scrollTime;
92-
private int scrollSize;
93-
94-
private CatalogTable catalogTable;
95-
96-
public SourceConfig clone() {
97-
SourceConfig sourceConfig = new SourceConfig();
98-
sourceConfig.setIndex(index);
99-
sourceConfig.setSource(new ArrayList<>(source));
100-
sourceConfig.setQuery(new HashMap<>(query));
101-
sourceConfig.setScrollTime(scrollTime);
102-
sourceConfig.setScrollSize(scrollSize);
103-
sourceConfig.setCatalogTable(catalogTable);
104-
return sourceConfig;
105-
}
10678
}

Diff for: seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/dto/IndexInfo.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
package org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto;
1919

2020
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
21-
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig;
21+
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSinkOptions;
2222

2323
import lombok.Data;
2424

@@ -33,10 +33,10 @@ public class IndexInfo {
3333

3434
public IndexInfo(String index, ReadonlyConfig config) {
3535
this.index = index;
36-
type = config.get(SinkConfig.INDEX_TYPE);
37-
if (config.getOptional(SinkConfig.PRIMARY_KEYS).isPresent()) {
38-
primaryKeys = config.get(SinkConfig.PRIMARY_KEYS).toArray(new String[0]);
36+
type = config.get(ElasticsearchSinkOptions.INDEX_TYPE);
37+
if (config.getOptional(ElasticsearchSinkOptions.PRIMARY_KEYS).isPresent()) {
38+
primaryKeys = config.get(ElasticsearchSinkOptions.PRIMARY_KEYS).toArray(new String[0]);
3939
}
40-
keyDelimiter = config.get(SinkConfig.KEY_DELIMITER);
40+
keyDelimiter = config.get(ElasticsearchSinkOptions.KEY_DELIMITER);
4141
}
4242
}

Diff for: seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSink.java

+5-5
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
import org.apache.seatunnel.api.table.factory.CatalogFactory;
3434
import org.apache.seatunnel.api.table.schema.SchemaChangeType;
3535
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
36-
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig;
36+
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSinkOptions;
3737
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchAggregatedCommitInfo;
3838
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchCommitInfo;
3939
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.state.ElasticsearchSinkState;
@@ -43,8 +43,8 @@
4343
import java.util.Optional;
4444

4545
import static org.apache.seatunnel.api.table.factory.FactoryUtil.discoverFactory;
46-
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig.MAX_BATCH_SIZE;
47-
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig.MAX_RETRY_COUNT;
46+
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSinkOptions.MAX_BATCH_SIZE;
47+
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSinkOptions.MAX_RETRY_COUNT;
4848

4949
public class ElasticsearchSink
5050
implements SeaTunnelSink<
@@ -92,8 +92,8 @@ public Optional<SaveModeHandler> getSaveModeHandler() {
9292
return Optional.empty();
9393
}
9494
Catalog catalog = catalogFactory.createCatalog(catalogFactory.factoryIdentifier(), config);
95-
SchemaSaveMode schemaSaveMode = config.get(SinkConfig.SCHEMA_SAVE_MODE);
96-
DataSaveMode dataSaveMode = config.get(SinkConfig.DATA_SAVE_MODE);
95+
SchemaSaveMode schemaSaveMode = config.get(ElasticsearchSinkOptions.SCHEMA_SAVE_MODE);
96+
DataSaveMode dataSaveMode = config.get(ElasticsearchSinkOptions.DATA_SAVE_MODE);
9797

9898
TablePath tablePath = TablePath.of("", catalogTable.getTableId().getTableName());
9999
return Optional.of(

Diff for: seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/sink/ElasticsearchSinkFactory.java

+21-17
Original file line numberDiff line numberDiff line change
@@ -26,25 +26,25 @@
2626
import org.apache.seatunnel.api.table.factory.Factory;
2727
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
2828
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
29-
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig;
29+
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSinkOptions;
3030

3131
import com.google.auto.service.AutoService;
3232

33-
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.HOSTS;
34-
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.PASSWORD;
35-
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.TLS_KEY_STORE_PASSWORD;
36-
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.TLS_KEY_STORE_PATH;
37-
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.TLS_TRUST_STORE_PASSWORD;
38-
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.TLS_TRUST_STORE_PATH;
39-
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.TLS_VERIFY_CERTIFICATE;
40-
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.TLS_VERIFY_HOSTNAME;
41-
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.EsClusterConnectionConfig.USERNAME;
42-
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig.INDEX;
43-
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig.INDEX_TYPE;
44-
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig.KEY_DELIMITER;
45-
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig.MAX_BATCH_SIZE;
46-
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig.MAX_RETRY_COUNT;
47-
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SinkConfig.PRIMARY_KEYS;
33+
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.HOSTS;
34+
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.PASSWORD;
35+
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.TLS_KEY_STORE_PASSWORD;
36+
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.TLS_KEY_STORE_PATH;
37+
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.TLS_TRUST_STORE_PASSWORD;
38+
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.TLS_TRUST_STORE_PATH;
39+
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.TLS_VERIFY_CERTIFICATE;
40+
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.TLS_VERIFY_HOSTNAME;
41+
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions.USERNAME;
42+
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSinkOptions.INDEX;
43+
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSinkOptions.INDEX_TYPE;
44+
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSinkOptions.KEY_DELIMITER;
45+
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSinkOptions.MAX_BATCH_SIZE;
46+
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSinkOptions.MAX_RETRY_COUNT;
47+
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSinkOptions.PRIMARY_KEYS;
4848

4949
@AutoService(Factory.class)
5050
public class ElasticsearchSinkFactory implements TableSinkFactory {
@@ -56,7 +56,11 @@ public String factoryIdentifier() {
5656
@Override
5757
public OptionRule optionRule() {
5858
return OptionRule.builder()
59-
.required(HOSTS, INDEX, SinkConfig.SCHEMA_SAVE_MODE, SinkConfig.DATA_SAVE_MODE)
59+
.required(
60+
HOSTS,
61+
INDEX,
62+
ElasticsearchSinkOptions.SCHEMA_SAVE_MODE,
63+
ElasticsearchSinkOptions.DATA_SAVE_MODE)
6064
.optional(
6165
INDEX_TYPE,
6266
PRIMARY_KEYS,

0 commit comments

Comments
 (0)