diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java index d501c649a81..2396e5092b0 100644 --- a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/source/ElasticsearchSourceSplitEnumerator.java @@ -20,10 +20,20 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig; import org.apache.seatunnel.api.source.SourceSplitEnumerator; import org.apache.seatunnel.common.exception.CommonErrorCode; -import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient; +import org.apache.seatunnel.common.utils.JsonUtils; +import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions; import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchConfig; import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.IndexDocsCount; +import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorErrorCode; import org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorException; +import org.apache.seatunnel.connectors.seatunnel.elasticsearch.util.HttpClientUtil; + +import org.apache.http.HttpEntity; +import org.apache.http.HttpStatus; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.util.EntityUtils; import lombok.extern.slf4j.Slf4j; @@ -46,8 +56,6 @@ public class 
ElasticsearchSourceSplitEnumerator
 
     private final ReadonlyConfig connConfig;
 
-    private EsRestClient esRestClient;
-
     private final Object stateLock = new Object();
 
     private Map> pendingSplit;
@@ -80,9 +88,7 @@ public ElasticsearchSourceSplitEnumerator(
     }
 
     @Override
-    public void open() {
-        esRestClient = EsRestClient.createInstance(connConfig);
-    }
+    public void open() {}
 
     @Override
     public void run() {
@@ -142,7 +148,7 @@ private List getElasticsearchSplit() {
 
         for (ElasticsearchConfig elasticsearchConfig : elasticsearchConfigs) {
             String index = elasticsearchConfig.getIndex();
-            List<IndexDocsCount> indexDocsCounts = esRestClient.getIndexDocsCount(index);
+            List<IndexDocsCount> indexDocsCounts = getIndexDocsCount(index);
             indexDocsCounts =
                     indexDocsCounts.stream()
                             .filter(x -> x.getDocsCount() != null && x.getDocsCount() > 0)
@@ -160,9 +166,7 @@ private List getElasticsearchSplit() {
     }
 
     @Override
-    public void close() throws IOException {
-        esRestClient.close();
-    }
+    public void close() throws IOException {}
 
     @Override
     public void addSplitsBack(List splits, int subtaskId) {
@@ -201,4 +205,64 @@ public ElasticsearchSourceState snapshotState(long checkpointId) throws Exceptio
 
     @Override
     public void notifyCheckpointComplete(long checkpointId) throws Exception {}
+
+    /**
+     * Fetches the document count of the indices matching {@code index} from the Elasticsearch
+     * {@code _cat/indices} REST endpoint.
+     *
+     * <p>Only the first configured host is queried. An HTTP client is created for the call and
+     * closed before the method returns.
+     *
+     * @param index index name or pattern inserted into the {@code _cat/indices} path
+     * @return the entries parsed from the JSON response body
+     * @throws IllegalArgumentException if the hosts option is null or empty
+     * @throws ElasticsearchConnectorException if the response status is not {@code 200 OK}
+     * @throws RuntimeException if the request fails with an {@link IOException}
+     */
+    public List<IndexDocsCount> getIndexDocsCount(String index) {
+        List<String> hosts = connConfig.get(ElasticsearchBaseOptions.HOSTS);
+        // Reject null as well as empty, as the message states, instead of letting a missing
+        // option surface as a bare NullPointerException.
+        if (hosts == null || hosts.isEmpty()) {
+            throw new IllegalArgumentException("hosts must not be null nor empty");
+        }
+        String endpoint =
+                String.format(
+                        "%s/_cat/indices/%s?h=index,docsCount&format=json", hosts.get(0), index);
+        HttpClientUtil httpClientUtil = new HttpClientUtil();
+        CloseableHttpClient httpClient = httpClientUtil.getHttpClient(connConfig);
+        HttpGet httpGet = new HttpGet(endpoint);
+        try (CloseableHttpResponse response = httpClient.execute(httpGet)) {
+            int statusCode = response.getStatusLine().getStatusCode();
+            if (statusCode == HttpStatus.SC_OK) {
+                HttpEntity entity = response.getEntity();
+                // Decode as UTF-8 when the response declares no charset; the single-argument
+                // overload would fall back to ISO-8859-1.
+                String entityStr = EntityUtils.toString(entity, "UTF-8");
+                return JsonUtils.toList(entityStr, IndexDocsCount.class);
+            } else {
+                log.warn(
+                        "Failed to fetch index docs count. HTTP status code: {}, endpoint: {}",
+                        statusCode,
+                        endpoint);
+                throw new ElasticsearchConnectorException(
+                        ElasticsearchConnectorErrorCode.GET_INDEX_DOCS_COUNT_FAILED,
+                        String.format(
+                                "Unexpected HTTP status code %d when accessing endpoint: %s",
+                                statusCode, endpoint));
+            }
+        } catch (IOException e) {
+            // Keep the cause and name the endpoint so the failure can be diagnosed.
+            throw new RuntimeException(
+                    "Failed to fetch index docs count from endpoint: " + endpoint, e);
+        } finally {
+            // Best effort: a close failure is logged so it cannot mask the result or the
+            // exception already in flight.
+            try {
+                httpClient.close();
+            } catch (IOException e) {
+                log.error("Failed to close HttpClient", e);
+            }
+        }
+    }
 }
diff --git a/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/util/HttpClientUtil.java b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/util/HttpClientUtil.java
new file mode 100644
index 00000000000..201eaeecbeb
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-elasticsearch/src/main/java/org/apache/seatunnel/connectors/seatunnel/elasticsearch/util/HttpClientUtil.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.elasticsearch.util;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.ElasticsearchBaseOptions;
+
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.conn.ssl.NoopHostnameVerifier;
+import org.apache.http.conn.ssl.TrustAllStrategy;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.impl.client.HttpClients;
+import org.apache.http.ssl.SSLContexts;
+
+import lombok.extern.slf4j.Slf4j;
+
+import javax.net.ssl.SSLContext;
+
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.util.Optional;
+
+/**
+ * Builds Apache {@link CloseableHttpClient} instances from the Elasticsearch connector options
+ * (basic-auth credentials and TLS settings).
+ */
+@Slf4j
+public class HttpClientUtil {
+
+    /**
+     * Creates an HTTP client configured from {@code config}.
+     *
+     * <p>Basic-auth credentials are registered only when both username and password are set.
+     * Hostname verification is disabled when the {@code TLS_VERIFY_HOSTNAME} option is false. A
+     * new client is built on every call; callers are expected to close it.
+     *
+     * @param config connector options supplying the credentials and TLS settings
+     * @return a newly built client
+     * @throws RuntimeException if the SSL context cannot be initialized
+     */
+    public CloseableHttpClient getHttpClient(ReadonlyConfig config) {
+        // 1. Initialize credentials provider
+        CredentialsProvider credsProvider = new BasicCredentialsProvider();
+        Optional<String> username = config.getOptional(ElasticsearchBaseOptions.USERNAME);
+        Optional<String> password = config.getOptional(ElasticsearchBaseOptions.PASSWORD);
+
+        // 2. Configure SSL
+        boolean tlsVerifyCertificate = config.get(ElasticsearchBaseOptions.TLS_VERIFY_CERTIFICATE);
+        boolean tlsVerifyHostnames = config.get(ElasticsearchBaseOptions.TLS_VERIFY_HOSTNAME);
+
+        SSLContext sslContext = buildSSLContext(config, tlsVerifyCertificate);
+
+        // 3. Set credentials if provided
+        username.ifPresent(
+                u ->
+                        password.ifPresent(
+                                p ->
+                                        credsProvider.setCredentials(
+                                                new AuthScope(
+                                                        AuthScope.ANY_HOST, AuthScope.ANY_PORT),
+                                                new UsernamePasswordCredentials(u, p))));
+
+        // 4. Build HttpClient
+        HttpClientBuilder httpClientBuilder =
+                HttpClients.custom().setDefaultCredentialsProvider(credsProvider);
+
+        // A null context means "nothing custom to install": keep the builder's default SSL setup.
+        if (sslContext != null) {
+            httpClientBuilder.setSSLContext(sslContext);
+        }
+
+        if (!tlsVerifyHostnames) {
+            httpClientBuilder.setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE);
+        }
+
+        return httpClientBuilder.build();
+    }
+
+    /**
+     * Resolves the SSL context to install on the client.
+     *
+     * @param config connector options supplying the key store and trust store settings
+     * @param tlsVerifyCertificate whether server certificates must be verified
+     * @return the context built by {@code SSLUtils} from the configured stores when verification
+     *     is enabled, a trust-all context when it is disabled, or {@code null} when verification
+     *     is enabled but {@code SSLUtils} returned no context
+     * @throws RuntimeException if building the context fails
+     */
+    private SSLContext buildSSLContext(ReadonlyConfig config, boolean tlsVerifyCertificate) {
+        try {
+            if (tlsVerifyCertificate) {
+                Optional<String> keystorePath =
+                        config.getOptional(ElasticsearchBaseOptions.TLS_KEY_STORE_PATH);
+                Optional<String> keystorePassword =
+                        config.getOptional(ElasticsearchBaseOptions.TLS_KEY_STORE_PASSWORD);
+                Optional<String> truststorePath =
+                        config.getOptional(ElasticsearchBaseOptions.TLS_TRUST_STORE_PATH);
+                Optional<String> truststorePassword =
+                        config.getOptional(ElasticsearchBaseOptions.TLS_TRUST_STORE_PASSWORD);
+
+                // An empty Optional is not an error: returning null lets getHttpClient skip
+                // setSSLContext, which is what its null check is for. Throwing here made every
+                // client creation fail whenever SSLUtils had no context to offer.
+                // NOTE(review): presumably that is the case when neither a key store nor a
+                // trust store is configured - confirm against SSLUtils.
+                return SSLUtils.buildSSLContext(
+                                keystorePath, keystorePassword, truststorePath, truststorePassword)
+                        .orElse(null);
+            } else {
+                log.warn(
+                        "TLS certificate verification is disabled. This is not secure for production environments.");
+                return SSLContexts.custom().loadTrustMaterial(new TrustAllStrategy()).build();
+            }
+        } catch (GeneralSecurityException | IOException e) {
+            throw new RuntimeException("Failed to initialize SSLContext", e);
+        }
+    }
+}