From 62af352a4c91fc8cad667e9f442295ffc8f30865 Mon Sep 17 00:00:00 2001 From: wending <1062698930@qq.com> Date: Sat, 12 Apr 2025 22:02:59 +0800 Subject: [PATCH 1/9] [Bugfix][Elasticsearch] leak jobmanager memory leak (#9160) --- .../ElasticsearchSourceSplitEnumerator.java | 46 ++++++-- .../elasticsearch/util/HttpClientUtil.java | 106 ++++++++++++++++++ 2 files changed, 144 insertions(+), 8 deletions(-) create mode 100644 seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/util/HttpClientUtil.java diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java index d501c649a81..f32b7a7cfd2 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java @@ -20,10 +20,19 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.source.SourceSplitEnumerator; import org.apache.seatunnel.common.exception.CommonErrorCode; -import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient; +import org.apache.seatunnel.common.utils.JsonUtils; +import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions; import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchConfig; import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.IndexDocsCount; import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorException; +import org.apache.seatunnel.connectors.seatunnel.elasticsearch.util.HttpClientUtil; + +import org.apache.http.HttpEntity; +import org.apache.http.HttpStatus; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.util.EntityUtils; import lombok.extern.slf4j.Slf4j; @@ -46,8 +55,6 @@ public class ElasticsearchSourceSplitEnumerator private final ReadonlyConfig connConfig; - private EsRestClient esRestClient; - private final Object stateLock = new Object(); private Map> pendingSplit; @@ -80,9 +87,7 @@ public ElasticsearchSourceSplitEnumerator( } @Override - public void open() { - esRestClient = EsRestClient.createInstance(connConfig); - } + public void open() {} @Override public void run() { @@ -142,7 +147,7 @@ private List getElasticsearchSplit() { for (ElasticsearchConfig elasticsearchConfig : elasticsearchConfigs) { String index = elasticsearchConfig.getIndex(); - List indexDocsCounts = esRestClient.getIndexDocsCount(index); + List indexDocsCounts = getIndexDocsCount(index); indexDocsCounts = indexDocsCounts.stream() .filter(x -> x.getDocsCount() != null && x.getDocsCount() > 0) @@ -161,7 +166,6 @@ private List getElasticsearchSplit() { @Override public void close() throws IOException { - esRestClient.close(); } @Override @@ -201,4 +205,30 @@ public ElasticsearchSourceState snapshotState(long checkpointId) throws Exceptio @Override public void notifyCheckpointComplete(long checkpointId) throws Exception {} + + public List getIndexDocsCount(String index) { + List hosts = connConfig.get(ElasticsearchBaseOptions.HOSTS); + String endpoint = + String.format( + "%s/_cat/indices/%s?h=index,docsCount&format=json", hosts.get(0), index); + HttpClientUtil httpClientUtil = new HttpClientUtil(); + CloseableHttpClient httpClient = httpClientUtil.getHttpClient(connConfig); + HttpGet httpGet = new HttpGet(endpoint); + try (CloseableHttpResponse response = httpClient.execute(httpGet)) { + if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) { + HttpEntity entity = response.getEntity(); + String entityStr = EntityUtils.toString(entity); + return JsonUtils.toList(entityStr, IndexDocsCount.class); + } + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + try { + httpClient.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + return Collections.emptyList(); + } } diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/util/HttpClientUtil.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/util/HttpClientUtil.java new file mode 100644 index 00000000000..201eaeecbeb --- /dev/null +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/util/HttpClientUtil.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.elasticsearch.util; + +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions; + +import org.apache.http.auth.AuthScope; +import org.apache.http.auth.UsernamePasswordCredentials; +import org.apache.http.client.CredentialsProvider; +import org.apache.http.conn.ssl.NoopHostnameVerifier; +import org.apache.http.conn.ssl.TrustAllStrategy; +import org.apache.http.impl.client.BasicCredentialsProvider; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.ssl.SSLContexts; + +import lombok.extern.slf4j.Slf4j; + +import javax.net.ssl.SSLContext; + +import java.io.IOException; +import java.security.GeneralSecurityException; +import java.util.Optional; + +@Slf4j +public class HttpClientUtil { + + public CloseableHttpClient getHttpClient(ReadonlyConfig config) { + // 1. Initialize credentials provider + CredentialsProvider credsProvider = new BasicCredentialsProvider(); + Optional username = config.getOptional(ElasticsearchBaseOptions.USERNAME); + Optional password = config.getOptional(ElasticsearchBaseOptions.PASSWORD); + + // 2. Configure SSL + boolean tlsVerifyCertificate = config.get(ElasticsearchBaseOptions.TLS_VERIFY_CERTIFICATE); + boolean tlsVerifyHostnames = config.get(ElasticsearchBaseOptions.TLS_VERIFY_HOSTNAME); + + SSLContext sslContext = buildSSLContext(config, tlsVerifyCertificate); + + // 3. Set credentials if provided + username.ifPresent( + u -> + password.ifPresent( + p -> + credsProvider.setCredentials( + new AuthScope( + AuthScope.ANY_HOST, AuthScope.ANY_PORT), + new UsernamePasswordCredentials(u, p)))); + + // 4. Build HttpClient + HttpClientBuilder httpClientBuilder = + HttpClients.custom().setDefaultCredentialsProvider(credsProvider); + + if (sslContext != null) { + httpClientBuilder.setSSLContext(sslContext); + } + + if (!tlsVerifyHostnames) { + httpClientBuilder.setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE); + } + + return httpClientBuilder.build(); + } + + private SSLContext buildSSLContext(ReadonlyConfig config, boolean tlsVerifyCertificate) { + try { + if (tlsVerifyCertificate) { + Optional keystorePath = + config.getOptional(ElasticsearchBaseOptions.TLS_KEY_STORE_PATH); + Optional keystorePassword = + config.getOptional(ElasticsearchBaseOptions.TLS_KEY_STORE_PASSWORD); + Optional truststorePath = + config.getOptional(ElasticsearchBaseOptions.TLS_TRUST_STORE_PATH); + Optional truststorePassword = + config.getOptional(ElasticsearchBaseOptions.TLS_TRUST_STORE_PASSWORD); + + return SSLUtils.buildSSLContext( + keystorePath, keystorePassword, truststorePath, truststorePassword) + .orElseThrow(() -> new RuntimeException("Failed to build SSLContext")); + } else { + log.warn( + "TLS certificate verification is disabled. This is not secure for production environments."); + return SSLContexts.custom().loadTrustMaterial(new TrustAllStrategy()).build(); + } + } catch (GeneralSecurityException | IOException e) { + throw new RuntimeException("Failed to initialize SSLContext", e); + } + } +} From 4904ac97f6cbabcf597c26a3ca3ffa515cf739d0 Mon Sep 17 00:00:00 2001 From: wending <1062698930@qq.com> Date: Sat, 12 Apr 2025 22:42:45 +0800 Subject: [PATCH 2/9] Trigger CI From bdcd0bb2bff3cd9c78f571ec19292e135d64c743 Mon Sep 17 00:00:00 2001 From: wending <1062698930@qq.com> Date: Sat, 12 Apr 2025 23:16:39 +0800 Subject: [PATCH 3/9] fix these violation --- .../source/ElasticsearchSourceSplitEnumerator.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java index f32b7a7cfd2..d9848085800 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java @@ -165,8 +165,7 @@ private List getElasticsearchSplit() { } @Override - public void close() throws IOException { - } + public void close() throws IOException {} @Override public void addSplitsBack(List splits, int subtaskId) { From 4d3d7dfee3af46d94ebd6b1f0c10fae6b6aa9ba6 Mon Sep 17 00:00:00 2001 From: wending <1062698930@qq.com> Date: Sun, 13 Apr 2025 23:13:02 +0800 Subject: [PATCH 4/9] fix check es host --- .../source/ElasticsearchSourceSplitEnumerator.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java index d9848085800..e62244d6723 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java @@ -207,6 +207,9 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {} public List getIndexDocsCount(String index) { List hosts = connConfig.get(ElasticsearchBaseOptions.HOSTS); + if (hosts.isEmpty()) { + throw new IllegalArgumentException("hosts must not be null nor empty"); + } String endpoint = String.format( "%s/_cat/indices/%s?h=index,docsCount&format=json", hosts.get(0), index); From 290019da6f339544105c90b7bd091a94617c6371 Mon Sep 17 00:00:00 2001 From: WenDing-Y <1062698930@qq.com> Date: Wed, 16 Apr 2025 15:09:46 +0800 Subject: [PATCH 5/9] Update seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java fix statusCode!=SC_OK Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- .../source/ElasticsearchSourceSplitEnumerator.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java index e62244d6723..317fe61baba 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java @@ -217,10 +217,16 @@ public List getIndexDocsCount(String index) { CloseableHttpClient httpClient = httpClientUtil.getHttpClient(connConfig); HttpGet httpGet = new HttpGet(endpoint); try (CloseableHttpResponse response = httpClient.execute(httpGet)) { - if (response.getStatusLine().getStatusCode() == HttpStatus.SC_OK) { + int statusCode = response.getStatusLine().getStatusCode(); + if (statusCode == HttpStatus.SC_OK) { HttpEntity entity = response.getEntity(); String entityStr = EntityUtils.toString(entity); return JsonUtils.toList(entityStr, IndexDocsCount.class); + } else { + log.warn("Failed to fetch index docs count. HTTP status code: {}, endpoint: {}", statusCode, endpoint); + throw new ElasticsearchConnectorException( + CommonErrorCode.REMOTE_SERVICE_ERROR, + String.format("Unexpected HTTP status code %d when accessing endpoint: %s", statusCode, endpoint)); } } catch (IOException e) { throw new RuntimeException(e); From 05842a584a224fe39a368cdef1915131309f7f59 Mon Sep 17 00:00:00 2001 From: WenDing-Y <1062698930@qq.com> Date: Wed, 16 Apr 2025 15:29:07 +0800 Subject: [PATCH 6/9] fix style error --- .../source/ElasticsearchSourceSplitEnumerator.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java index 317fe61baba..61c96f4dfd7 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java @@ -223,10 +223,15 @@ public List getIndexDocsCount(String index) { String entityStr = EntityUtils.toString(entity); return JsonUtils.toList(entityStr, IndexDocsCount.class); } else { - log.warn("Failed to fetch index docs count. HTTP status code: {}, endpoint: {}", statusCode, endpoint); + log.warn( + "Failed to fetch index docs count. HTTP status code: {}, endpoint: {}", + statusCode, + endpoint); throw new ElasticsearchConnectorException( CommonErrorCode.REMOTE_SERVICE_ERROR, - String.format("Unexpected HTTP status code %d when accessing endpoint: %s", statusCode, endpoint)); + String.format( + "Unexpected HTTP status code %d when accessing endpoint: %s", + statusCode, endpoint)); } } catch (IOException e) { throw new RuntimeException(e); From 39a70b6145ad8277c77b98eb829a7e357c879dd8 Mon Sep 17 00:00:00 2001 From: WenDing-Y <1062698930@qq.com> Date: Wed, 16 Apr 2025 15:44:42 +0800 Subject: [PATCH 7/9] fix compile error --- .../source/ElasticsearchSourceSplitEnumerator.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java index 61c96f4dfd7..76f3a57d68f 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java @@ -24,6 +24,7 @@ import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions; import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchConfig; import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.IndexDocsCount; +import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorErrorCode; import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorException; import org.apache.seatunnel.connectors.seatunnel.elasticsearch.util.HttpClientUtil; @@ -228,7 +229,7 @@ public List getIndexDocsCount(String index) { statusCode, endpoint); throw new ElasticsearchConnectorException( - CommonErrorCode.REMOTE_SERVICE_ERROR, + ElasticsearchConnectorErrorCode.GET_INDEX_DOCS_COUNT_FAILED, String.format( "Unexpected HTTP status code %d when accessing endpoint: %s", statusCode, endpoint)); @@ -242,6 +243,5 @@ public List getIndexDocsCount(String index) { throw new RuntimeException(e); } } - return Collections.emptyList(); } } From 05a75686db83d775a922baab07e9fc5b5b6ff450 Mon Sep 17 00:00:00 2001 From: wending <1062698930@qq.com> Date: Wed, 16 Apr 2025 20:37:28 +0800 Subject: [PATCH 8/9] =?UTF-8?q?=E8=BF=99=E6=98=AF=E4=B8=80=E4=B8=AA?= =?UTF-8?q?=E7=A9=BA=E6=8F=90=E4=BA=A4=E7=9A=84=E8=AF=B4=E6=98=8E=E4=BF=A1?= =?UTF-8?q?=E6=81=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit From 44f985688cdbd6bbee46a4558b225addeee39508 Mon Sep 17 00:00:00 2001 From: WenDing-Y <1062698930@qq.com> Date: Thu, 17 Apr 2025 09:55:41 +0800 Subject: [PATCH 9/9] fix add close error info --- .../source/ElasticsearchSourceSplitEnumerator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java index 76f3a57d68f..2396e5092b0 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java @@ -240,7 +240,7 @@ public List getIndexDocsCount(String index) { try { httpClient.close(); } catch (IOException e) { - throw new RuntimeException(e); + log.error("Failed to close HttpClient", e); } } }