Skip to content

Commit 62af352

Browse files
committed
[Bugfix][Elasticsearch] leak jobmanager memory leak (#9160)
1 parent c1adbf6 commit 62af352

File tree

2 files changed

+144
-8
lines changed

2 files changed

+144
-8
lines changed

Diff for: seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java

+38-8
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,19 @@
2020
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
2121
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
2222
import org.apache.seatunnel.common.exception.CommonErrorCode;
23-
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
23+
import org.apache.seatunnel.common.utils.JsonUtils;
24+
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions;
2425
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchConfig;
2526
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.IndexDocsCount;
2627
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorException;
28+
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.util.HttpClientUtil;
29+
30+
import org.apache.http.HttpEntity;
31+
import org.apache.http.HttpStatus;
32+
import org.apache.http.client.methods.CloseableHttpResponse;
33+
import org.apache.http.client.methods.HttpGet;
34+
import org.apache.http.impl.client.CloseableHttpClient;
35+
import org.apache.http.util.EntityUtils;
2736

2837
import lombok.extern.slf4j.Slf4j;
2938

@@ -46,8 +55,6 @@ public class ElasticsearchSourceSplitEnumerator
4655

4756
private final ReadonlyConfig connConfig;
4857

49-
private EsRestClient esRestClient;
50-
5158
private final Object stateLock = new Object();
5259

5360
private Map<Integer, List<ElasticsearchSourceSplit>> pendingSplit;
@@ -80,9 +87,7 @@ public ElasticsearchSourceSplitEnumerator(
8087
}
8188

8289
@Override
83-
public void open() {
84-
esRestClient = EsRestClient.createInstance(connConfig);
85-
}
90+
public void open() {}
8691

8792
@Override
8893
public void run() {
@@ -142,7 +147,7 @@ private List<ElasticsearchSourceSplit> getElasticsearchSplit() {
142147
for (ElasticsearchConfig elasticsearchConfig : elasticsearchConfigs) {
143148

144149
String index = elasticsearchConfig.getIndex();
145-
List<IndexDocsCount> indexDocsCounts = esRestClient.getIndexDocsCount(index);
150+
List<IndexDocsCount> indexDocsCounts = getIndexDocsCount(index);
146151
indexDocsCounts =
147152
indexDocsCounts.stream()
148153
.filter(x -> x.getDocsCount() != null && x.getDocsCount() > 0)
@@ -161,7 +166,6 @@ private List<ElasticsearchSourceSplit> getElasticsearchSplit() {
161166

162167
@Override
163168
public void close() throws IOException {
164-
esRestClient.close();
165169
}
166170

167171
@Override
@@ -201,4 +205,30 @@ public ElasticsearchSourceState snapshotState(long checkpointId) throws Exceptio
201205

202206
@Override
203207
public void notifyCheckpointComplete(long checkpointId) throws Exception {}
208+
209+
public List<IndexDocsCount> getIndexDocsCount(String index) {
210+
List<String> hosts = connConfig.get(ElasticsearchBaseOptions.HOSTS);
211+
String endpoint =
212+
String.format(
213+
"%s/_cat/indices/%s?h=index,docsCount&format=json", hosts.get(0), index);
214+
HttpClientUtil httpClientUtil = new HttpClientUtil();
215+
CloseableHttpClient httpClient = httpClientUtil.getHttpClient(connConfig);
216+
HttpGet httpGet = new HttpGet(endpoint);
217+
try (CloseableHttpResponse response = httpClient.execute(httpGet)) {
218+
if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) {
219+
HttpEntity entity = response.getEntity();
220+
String entityStr = EntityUtils.toString(entity);
221+
return JsonUtils.toList(entityStr, IndexDocsCount.class);
222+
}
223+
} catch (IOException e) {
224+
throw new RuntimeException(e);
225+
} finally {
226+
try {
227+
httpClient.close();
228+
} catch (IOException e) {
229+
throw new RuntimeException(e);
230+
}
231+
}
232+
return Collections.emptyList();
233+
}
204234
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.elasticsearch.util;
19+
20+
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
21+
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions;
22+
23+
import org.apache.http.auth.AuthScope;
24+
import org.apache.http.auth.UsernamePasswordCredentials;
25+
import org.apache.http.client.CredentialsProvider;
26+
import org.apache.http.conn.ssl.NoopHostnameVerifier;
27+
import org.apache.http.conn.ssl.TrustAllStrategy;
28+
import org.apache.http.impl.client.BasicCredentialsProvider;
29+
import org.apache.http.impl.client.CloseableHttpClient;
30+
import org.apache.http.impl.client.HttpClientBuilder;
31+
import org.apache.http.impl.client.HttpClients;
32+
import org.apache.http.ssl.SSLContexts;
33+
34+
import lombok.extern.slf4j.Slf4j;
35+
36+
import javax.net.ssl.SSLContext;
37+
38+
import java.io.IOException;
39+
import java.security.GeneralSecurityException;
40+
import java.util.Optional;
41+
42+
@Slf4j
43+
public class HttpClientUtil {
44+
45+
public CloseableHttpClient getHttpClient(ReadonlyConfig config) {
46+
// 1. Initialize credentials provider
47+
CredentialsProvider credsProvider = new BasicCredentialsProvider();
48+
Optional<String> username = config.getOptional(ElasticsearchBaseOptions.USERNAME);
49+
Optional<String> password = config.getOptional(ElasticsearchBaseOptions.PASSWORD);
50+
51+
// 2. Configure SSL
52+
boolean tlsVerifyCertificate = config.get(ElasticsearchBaseOptions.TLS_VERIFY_CERTIFICATE);
53+
boolean tlsVerifyHostnames = config.get(ElasticsearchBaseOptions.TLS_VERIFY_HOSTNAME);
54+
55+
SSLContext sslContext = buildSSLContext(config, tlsVerifyCertificate);
56+
57+
// 3. Set credentials if provided
58+
username.ifPresent(
59+
u ->
60+
password.ifPresent(
61+
p ->
62+
credsProvider.setCredentials(
63+
new AuthScope(
64+
AuthScope.ANY_HOST, AuthScope.ANY_PORT),
65+
new UsernamePasswordCredentials(u, p))));
66+
67+
// 4. Build HttpClient
68+
HttpClientBuilder httpClientBuilder =
69+
HttpClients.custom().setDefaultCredentialsProvider(credsProvider);
70+
71+
if (sslContext != null) {
72+
httpClientBuilder.setSSLContext(sslContext);
73+
}
74+
75+
if (!tlsVerifyHostnames) {
76+
httpClientBuilder.setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE);
77+
}
78+
79+
return httpClientBuilder.build();
80+
}
81+
82+
private SSLContext buildSSLContext(ReadonlyConfig config, boolean tlsVerifyCertificate) {
83+
try {
84+
if (tlsVerifyCertificate) {
85+
Optional<String> keystorePath =
86+
config.getOptional(ElasticsearchBaseOptions.TLS_KEY_STORE_PATH);
87+
Optional<String> keystorePassword =
88+
config.getOptional(ElasticsearchBaseOptions.TLS_KEY_STORE_PASSWORD);
89+
Optional<String> truststorePath =
90+
config.getOptional(ElasticsearchBaseOptions.TLS_TRUST_STORE_PATH);
91+
Optional<String> truststorePassword =
92+
config.getOptional(ElasticsearchBaseOptions.TLS_TRUST_STORE_PASSWORD);
93+
94+
return SSLUtils.buildSSLContext(
95+
keystorePath, keystorePassword, truststorePath, truststorePassword)
96+
.orElseThrow(() -> new RuntimeException("Failed to build SSLContext"));
97+
} else {
98+
log.warn(
99+
"TLS certificate verification is disabled. This is not secure for production environments.");
100+
return SSLContexts.custom().loadTrustMaterial(new TrustAllStrategy()).build();
101+
}
102+
} catch (GeneralSecurityException | IOException e) {
103+
throw new RuntimeException("Failed to initialize SSLContext", e);
104+
}
105+
}
106+
}

0 commit comments

Comments
 (0)