Skip to content

Commit 1bb296f

Browse files
committed
[Feature][connector-elasticsearch] elasticsearch source support PIT
1 parent 25b647f commit 1bb296f

File tree

9 files changed

+111
-55
lines changed

9 files changed

+111
-55
lines changed

Diff for: docs/en/connector-v2/source/Elasticsearch.md

+19-12
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,9 @@ support version >= 2.x and <= 8.x.
3030
| index_list | array | no | used to define a multiple table task |
3131
| source | array | no | - |
3232
| query | json | no | {"match_all": {}} |
33-
| search_type | json | no | Search method,SQL、PIT、SCROLL,default SCROLL |
34-
| sql_query | json | no | sql query |
33+
| search_type | enum | no | Query type, SQL or DSL, default DSL |
34+
| search_api_type | enum | no | Pagination API type, SCROLL or PIT, default SCROLL |
35+
| sql_query | json | no | SQL query, required when search_type is SQL |
3536
| scroll_time | string | no | 1m |
3637
| scroll_size | int | no | 100 |
3738
| tls_verify_certificate | boolean | no | true |
@@ -116,9 +117,14 @@ The path to PEM or JKS trust store. This file must be readable by the operating
116117
The key password for the trust store specified
117118

118119
### search_type
119-
PIT: Whether to use the Point in Time (PIT) API instead of the scroll API
120-
SQL: Use SQL query
121-
SCROLL: Use the scroll API by default
120+
Query type, available values:
121+
- DSL: Use Domain Specific Language query (default)
122+
- SQL: Use SQL query
123+
124+
### search_api_type
125+
Pagination API type, available values:
126+
- SCROLL: Use Scroll API for pagination (default)
127+
- PIT: Use Point in Time (PIT) API for pagination
122128

123129
### pit_keep_alive [long]
124130
The amount of time (in milliseconds) for which the PIT should be kept alive
@@ -190,7 +196,7 @@ source {
190196
c_date2,
191197
c_null
192198
]
193-
199+
194200
}
195201
196202
]
@@ -227,7 +233,7 @@ source {
227233
hosts = ["https://localhost:9200"]
228234
username = "elastic"
229235
password = "elasticsearch"
230-
236+
231237
tls_verify_certificate = false
232238
}
233239
}
@@ -241,7 +247,7 @@ source {
241247
hosts = ["https://localhost:9200"]
242248
username = "elastic"
243249
password = "elasticsearch"
244-
250+
245251
tls_verify_hostname = false
246252
}
247253
}
@@ -255,7 +261,7 @@ source {
255261
hosts = ["https://localhost:9200"]
256262
username = "elastic"
257263
password = "elasticsearch"
258-
264+
259265
tls_keystore_path = "${your elasticsearch home}/config/certs/http.p12"
260266
tls_keystore_password = "${your password}"
261267
}
@@ -291,9 +297,10 @@ source {
291297
292298
index = "st_index"
293299
query = {"range": {"c_int": {"gte": 10, "lte": 20}}}
294-
295-
# Enable PIT API
296-
use_pit = true
300+
301+
# Use DSL query with PIT API
302+
search_type = DSL
303+
search_api_type = PIT
297304
pit_keep_alive = 60000 # 1 minute in milliseconds
298305
pit_batch_size = 100
299306
```

Diff for: docs/zh/connector-v2/source/Elasticsearch.md

+19-12
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,9 @@ import ChangeLog from '../changelog/connector-elasticsearch.md';
2828
| index_list | array | no | 用来定义多索引同步任务 |
2929
| source | array | no | - |
3030
| query | json | no | {"match_all": {}} |
31-
| search_type | json | no | 查询方式,SQL、PIT、SCROLL,默认 SCROLL |
32-
| sql_query | json | no | sql 查询语句 |
31+
| search_type | enum | no | 查询类型,SQL 或 DSL,默认 DSL |
32+
| search_api_type | enum | no | 分页 API 类型,SCROLL 或 PIT,默认 SCROLL |
33+
| sql_query | json | no | SQL 查询语句,当 search_type 为 SQL 时必填 |
3334
| scroll_time | string | no | 1m |
3435
| scroll_size | int | no | 100 |
3536
| tls_verify_certificate | boolean | no | true |
@@ -118,9 +119,14 @@ PEM 或 JKS 信任库的路径。该文件必须对运行 SeaTunnel 的操作系
118119
指定信任库的密钥密码。
119120

120121
### search_type
121-
PIT:是否使用时间点 (PIT) API 代替滚动 API
122-
SQL:使用sql 方式查询
123-
SCROLL:默认使用scroll API
122+
查询类型,可选值:
123+
- DSL: 使用 Domain Specific Language 查询(默认)
124+
- SQL: 使用 SQL 查询
125+
126+
### search_api_type
127+
分页 API 类型,可选值:
128+
- SCROLL: 使用 Scroll API 进行分页(默认)
129+
- PIT: 使用 Point in Time (PIT) API 进行分页
124130

125131
### pit_keep_alive [long]
126132
PIT 应保持活动的时间量(以毫秒为单位)
@@ -193,7 +199,7 @@ source {
193199
c_date2,
194200
c_null
195201
]
196-
202+
197203
}
198204
199205
]
@@ -228,7 +234,7 @@ source {
228234
hosts = ["https://localhost:9200"]
229235
username = "elastic"
230236
password = "elasticsearch"
231-
237+
232238
tls_verify_certificate = false
233239
}
234240
}
@@ -242,7 +248,7 @@ source {
242248
hosts = ["https://localhost:9200"]
243249
username = "elastic"
244250
password = "elasticsearch"
245-
251+
246252
tls_verify_hostname = false
247253
}
248254
}
@@ -256,7 +262,7 @@ source {
256262
hosts = ["https://localhost:9200"]
257263
username = "elastic"
258264
password = "elasticsearch"
259-
265+
260266
tls_keystore_path = "${your elasticsearch home}/config/certs/http.p12"
261267
tls_keystore_password = "${your password}"
262268
}
@@ -292,9 +298,10 @@ source {
292298
293299
index = "st_index"
294300
query = {"range": {"c_int": {"gte": 10, "lte": 20}}}
295-
296-
# Enable PIT API
297-
search_type = PIT
301+
302+
# 使用 DSL 查询和 PIT API
303+
search_type = DSL
304+
search_api_type = PIT
298305
pit_keep_alive = 60000 # 1 minute in milliseconds
299306
pit_batch_size = 100
300307
```

Diff for: seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/ElasticsearchConfig.java

+2
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ public class ElasticsearchConfig implements Serializable {
3838
private String scrollTime;
3939
private int scrollSize;
4040
private SearchTypeEnum searchType;
41+
private SearchApiTypeEnum searchApiType;
4142
private String sqlQuery;
4243

4344
private long pitKeepAlive;
@@ -56,6 +57,7 @@ public ElasticsearchConfig clone() {
5657
elasticsearchConfig.setScrollSize(scrollSize);
5758
elasticsearchConfig.setCatalogTable(catalogTable);
5859
elasticsearchConfig.setSearchType(searchType);
60+
elasticsearchConfig.setSearchApiType(searchApiType);
5961
elasticsearchConfig.setSqlQuery(sqlQuery);
6062
elasticsearchConfig.setPitKeepAlive(pitKeepAlive);
6163
elasticsearchConfig.setPitBatchSize(pitBatchSize);

Diff for: seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/ElasticsearchSourceOptions.java

+8-2
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,14 @@ public class ElasticsearchSourceOptions extends ElasticsearchBaseOptions {
6565
public static final Option<SearchTypeEnum> SEARCH_TYPE =
6666
Options.key("search_type")
6767
.enumType(SearchTypeEnum.class)
68-
.defaultValue(SearchTypeEnum.SCROLL)
69-
.withDescription("Choose dsl syntax or x-pack sql.");
68+
.defaultValue(SearchTypeEnum.DSL)
69+
.withDescription("Choose query type: DSL (Domain Specific Language) or SQL.");
70+
71+
public static final Option<SearchApiTypeEnum> SEARCH_API_TYPE =
72+
Options.key("search_api_type")
73+
.enumType(SearchApiTypeEnum.class)
74+
.defaultValue(SearchApiTypeEnum.SCROLL)
75+
.withDescription("Choose API type for pagination: SCROLL or PIT (Point in Time).");
7076

7177
public static final Option<String> SQL_QUERY =
7278
Options.key("sql_query")
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.elasticsearch.config;
19+
20+
public enum SearchApiTypeEnum {
21+
/** Use Scroll API for pagination */
22+
SCROLL,
23+
24+
/** Use Point-in-Time (PIT) API for pagination */
25+
PIT
26+
}

Diff for: seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/config/SearchTypeEnum.java

+3-6
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,9 @@
1818
package org.apache.seatunnel.connectors.seatunnel.elasticsearch.config;
1919

2020
public enum SearchTypeEnum {
21-
/** Use Domain Specific Language (DSL) query with Scroll API */
22-
SCROLL,
21+
/** Use Domain Specific Language (DSL) query */
22+
DSL,
2323

2424
/** Use SQL query */
25-
SQL,
26-
27-
/** Use Point-in-Time (PIT) API with DSL query */
28-
PIT,
25+
SQL
2926
}

Diff for: seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSource.java

+4
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.elasticsearch.source;
1919

20+
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SearchApiTypeEnum;
2021
import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
2122

2223
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
@@ -56,6 +57,7 @@
5657
import java.util.Map;
5758
import java.util.stream.Collectors;
5859

60+
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSourceOptions.SEARCH_API_TYPE;
5961
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSourceOptions.SEARCH_TYPE;
6062
import static org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchSourceOptions.SQL_QUERY;
6163

@@ -175,6 +177,7 @@ private ElasticsearchConfig parseOneIndexQueryConfig(ReadonlyConfig readonlyConf
175177
"");
176178
}
177179
SearchTypeEnum searchType = readonlyConfig.get(SEARCH_TYPE);
180+
SearchApiTypeEnum searchApiType = readonlyConfig.get(SEARCH_API_TYPE);
178181
String sqlQuery = readonlyConfig.get(ElasticsearchSourceOptions.SQL_QUERY);
179182
String scrollTime = readonlyConfig.get(ElasticsearchSourceOptions.SCROLL_TIME);
180183
int scrollSize = readonlyConfig.get(ElasticsearchSourceOptions.SCROLL_SIZE);
@@ -192,6 +195,7 @@ private ElasticsearchConfig parseOneIndexQueryConfig(ReadonlyConfig readonlyConf
192195
elasticsearchConfig.setCatalogTable(catalogTable);
193196
elasticsearchConfig.setSqlQuery(sqlQuery);
194197
elasticsearchConfig.setSearchType(searchType);
198+
elasticsearchConfig.setSearchApiType(searchApiType);
195199

196200
elasticsearchConfig.setPitKeepAlive(pitKeepAlive);
197201
elasticsearchConfig.setPitBatchSize(pitBatchSize);

Diff for: seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceReader.java

+25-19
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
2525
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
2626
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchConfig;
27+
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SearchApiTypeEnum;
2728
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SearchTypeEnum;
2829
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.PointInTimeResult;
2930
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.ScrollResult;
@@ -98,13 +99,9 @@ private void scrollSearchResult(
9899
SeaTunnelRowDeserializer deserializer =
99100
new DefaultSeaTunnelRowDeserializer(seaTunnelRowType);
100101

101-
// Check if we should use PIT API
102-
if (SearchTypeEnum.PIT.equals(sourceIndexInfo.getSearchType())) {
103-
log.info("Using Point-in-Time (PIT) API for index: {}", sourceIndexInfo.getIndex());
104-
searchWithPointInTime(sourceIndexInfo, output, deserializer);
105-
}
106102
// SQL client
107-
else if (SearchTypeEnum.SQL.equals(sourceIndexInfo.getSearchType())) {
103+
if (SearchTypeEnum.SQL.equals(sourceIndexInfo.getSearchType())) {
104+
log.info("Using SQL query for index: {}", sourceIndexInfo.getIndex());
108105
ScrollResult scrollResult =
109106
esRestClient.searchBySql(
110107
sourceIndexInfo.getSqlQuery(), sourceIndexInfo.getScrollSize());
@@ -117,21 +114,30 @@ else if (SearchTypeEnum.SQL.equals(sourceIndexInfo.getSearchType())) {
117114
outputFromScrollResult(scrollResult, sourceIndexInfo, output, deserializer);
118115
}
119116
}
120-
// Default scroll API
117+
// DSL query
121118
else {
122-
ScrollResult scrollResult =
123-
esRestClient.searchByScroll(
124-
sourceIndexInfo.getIndex(),
125-
sourceIndexInfo.getSource(),
126-
sourceIndexInfo.getQuery(),
127-
sourceIndexInfo.getScrollTime(),
128-
sourceIndexInfo.getScrollSize());
129-
outputFromScrollResult(scrollResult, sourceIndexInfo, output, deserializer);
130-
while (scrollResult.getDocs() != null && !scrollResult.getDocs().isEmpty()) {
131-
scrollResult =
132-
esRestClient.searchWithScrollId(
133-
scrollResult.getScrollId(), sourceIndexInfo.getScrollTime());
119+
// Check if we should use PIT API
120+
if (SearchApiTypeEnum.PIT.equals(sourceIndexInfo.getSearchApiType())) {
121+
log.info("Using Point-in-Time (PIT) API for index: {}", sourceIndexInfo.getIndex());
122+
searchWithPointInTime(sourceIndexInfo, output, deserializer);
123+
}
124+
// Default scroll API
125+
else {
126+
log.info("Using Scroll API for index: {}", sourceIndexInfo.getIndex());
127+
ScrollResult scrollResult =
128+
esRestClient.searchByScroll(
129+
sourceIndexInfo.getIndex(),
130+
sourceIndexInfo.getSource(),
131+
sourceIndexInfo.getQuery(),
132+
sourceIndexInfo.getScrollTime(),
133+
sourceIndexInfo.getScrollSize());
134134
outputFromScrollResult(scrollResult, sourceIndexInfo, output, deserializer);
135+
while (scrollResult.getDocs() != null && !scrollResult.getDocs().isEmpty()) {
136+
scrollResult =
137+
esRestClient.searchWithScrollId(
138+
scrollResult.getScrollId(), sourceIndexInfo.getScrollTime());
139+
outputFromScrollResult(scrollResult, sourceIndexInfo, output, deserializer);
140+
}
135141
}
136142
}
137143
}

Diff for: seatunnel-e2e/seatunnel-connector-v2-e2e/connector-elasticsearch-e2e/src/test/resources/elasticsearch/elasticsearch_source_with_pit.conf

+5-4
Original file line numberDiff line numberDiff line change
@@ -34,12 +34,13 @@ source {
3434

3535
index = "st_index"
3636
query = {"range": {"c_int": {"gte": 10, "lte": 20}}}
37-
38-
# Enable PIT API
39-
search_type = "PIT"
37+
38+
# Use DSL query with PIT API
39+
search_type = "DSL"
40+
search_api_type = "PIT"
4041
pit_keep_alive = 60000 # 1 minute in milliseconds
4142
pit_batch_size = 100
42-
43+
4344
schema = {
4445
fields {
4546
c_map = "map<string, tinyint>"

0 commit comments

Comments
 (0)