@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connector.elasticsearch;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.java.tuple.Tuple2;

import org.apache.http.HttpHost;
import org.elasticsearch.action.search.SearchRequest;

import java.io.IOException;
import java.io.Serializable;
import java.util.List;

/**
* An {@link ElasticsearchApiCallBridge} is used to bridge incompatible Elasticsearch Java API calls
* across different versions. This includes calls to create Elasticsearch clients, execute search
* requests, and release client resources. Any incompatible Elasticsearch Java APIs should be
* bridged using this interface.
*
* <p>Implementations are allowed to be stateful. For example, for Elasticsearch 1.x, since
* connecting via an embedded node is allowed, the call bridge will hold a reference to the created
* embedded node. Each instance of the sink will hold exactly one instance of the call bridge, and
* state cleanup is performed when the sink is closed.
*
* @param <C> The Elasticsearch client type, which implements {@link AutoCloseable}.
*/
@Internal
public interface ElasticsearchApiCallBridge<C extends AutoCloseable> extends Serializable {

/**
* Creates an Elasticsearch client implementing {@link AutoCloseable}.
*
* @param networkClientConfig the network configuration used to set up the client.
* @param hosts the Elasticsearch hosts to connect to.
* @return The created client.
*/
C createClient(NetworkClientConfig networkClientConfig, List<HttpHost> hosts);

/**
* Executes a search using the Search API.
*
* @param client the Elasticsearch client.
* @param searchRequest a request to execute a search against one or more indices (or all of them).
* @throws IOException if the search request could not be executed.
*/
Tuple2<String, String[]> search(C client, SearchRequest searchRequest) throws IOException;

/**
* Closes the given client and releases any system resources associated with it.
*
* @param client the Elasticsearch client.
* @throws IOException if an I/O error occurs while closing the client.
*/
void close(C client) throws IOException;
}
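
To make the contract concrete, here is a minimal sketch of what a version-specific bridge backed by the Elasticsearch high-level REST client could look like. It is not part of this change: the class name RestHighLevelApiCallBridge is hypothetical, it assumes the RestClientUtils helper introduced elsewhere in this PR is available, and returning the scroll id plus the raw hit sources in the Tuple2 is an assumption, since the interface does not document the tuple's contents.

package org.apache.flink.connector.elasticsearch;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.elasticsearch.utils.RestClientUtils;

import org.apache.http.HttpHost;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.search.SearchHit;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

/** Hypothetical bridge backed by the Elasticsearch high-level REST client (illustration only). */
public class RestHighLevelApiCallBridge
        implements ElasticsearchApiCallBridge<RestHighLevelClient> {

    @Override
    public RestHighLevelClient createClient(
            NetworkClientConfig networkClientConfig, List<HttpHost> hosts) {
        // Mirrors how ElasticsearchWriter builds its client from the shared config.
        return new RestHighLevelClient(
                RestClientUtils.configureRestClientBuilder(
                        RestClient.builder(hosts.toArray(new HttpHost[0])),
                        networkClientConfig));
    }

    @Override
    public Tuple2<String, String[]> search(RestHighLevelClient client, SearchRequest searchRequest)
            throws IOException {
        SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
        // Assumption: callers want the scroll id plus the raw _source of each hit.
        String[] sources =
                Arrays.stream(response.getHits().getHits())
                        .map(SearchHit::getSourceAsString)
                        .toArray(String[]::new);
        return Tuple2.of(response.getScrollId(), sources);
    }

    @Override
    public void close(RestHighLevelClient client) throws IOException {
        client.close();
    }
}
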
@@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.flink.connector.elasticsearch.sink;
package org.apache.flink.connector.elasticsearch;

import org.apache.flink.util.function.SerializableSupplier;

@@ -26,7 +26,8 @@

import java.io.Serializable;

class NetworkClientConfig implements Serializable {
/** Network configuration for the Elasticsearch client. */
public class NetworkClientConfig implements Serializable {

@Nullable private final String username;
@Nullable private final String password;
@@ -37,7 +38,7 @@ class NetworkClientConfig implements Serializable {
@Nullable private final SerializableSupplier<SSLContext> sslContextSupplier;
@Nullable private final SerializableSupplier<HostnameVerifier> sslHostnameVerifier;

NetworkClientConfig(
public NetworkClientConfig(
@Nullable String username,
@Nullable String password,
@Nullable String connectionPathPrefix,
@@ -95,4 +96,69 @@ public SerializableSupplier<SSLContext> getSSLContextSupplier() {
public SerializableSupplier<HostnameVerifier> getSslHostnameVerifier() {
return sslHostnameVerifier;
}

/** Builder for {@link NetworkClientConfig}. */
public static class Builder {
private String username;
private String password;
private String connectionPathPrefix;
private Integer connectionRequestTimeout;
private Integer connectionTimeout;
private Integer socketTimeout;
private SerializableSupplier<SSLContext> sslContextSupplier;
private SerializableSupplier<HostnameVerifier> sslHostnameVerifier;

public Builder setUsername(String username) {
this.username = username;
return this;
}

public Builder setPassword(String password) {
this.password = password;
return this;
}

public Builder setConnectionPathPrefix(String connectionPathPrefix) {
this.connectionPathPrefix = connectionPathPrefix;
return this;
}

public Builder setConnectionRequestTimeout(Integer connectionRequestTimeout) {
this.connectionRequestTimeout = connectionRequestTimeout;
return this;
}

public Builder setConnectionTimeout(Integer connectionTimeout) {
this.connectionTimeout = connectionTimeout;
return this;
}

public Builder setSocketTimeout(Integer socketTimeout) {
this.socketTimeout = socketTimeout;
return this;
}

public Builder setSslContextSupplier(SerializableSupplier<SSLContext> sslContextSupplier) {
this.sslContextSupplier = sslContextSupplier;
return this;
}

public Builder setSslHostnameVerifier(
SerializableSupplier<HostnameVerifier> sslHostnameVerifier) {
this.sslHostnameVerifier = sslHostnameVerifier;
return this;
}

public NetworkClientConfig build() {
return new NetworkClientConfig(
username,
password,
connectionPathPrefix,
connectionRequestTimeout,
connectionTimeout,
socketTimeout,
sslContextSupplier,
sslHostnameVerifier);
}
}
}
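
Since the config and its constructor are now public and a builder is available, other modules (such as the lookup path below) can assemble the client settings fluently. A minimal usage sketch; the host, credentials, and timeout values are placeholders:

import org.apache.flink.connector.elasticsearch.NetworkClientConfig;

import org.apache.http.HttpHost;

import java.util.Collections;
import java.util.List;

public class NetworkClientConfigExample {
    public static void main(String[] args) {
        // Placeholder endpoint and credentials; timeouts are in milliseconds.
        List<HttpHost> hosts =
                Collections.singletonList(HttpHost.create("http://localhost:9200"));

        NetworkClientConfig networkClientConfig =
                new NetworkClientConfig.Builder()
                        .setUsername("elastic")
                        .setPassword("changeme")
                        .setConnectionTimeout(5000)
                        .setSocketTimeout(60000)
                        .build();

        // hosts and networkClientConfig are exactly what
        // ElasticsearchApiCallBridge#createClient expects.
        System.out.println(hosts + " " + networkClientConfig);
    }
}
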
@@ -16,12 +16,13 @@
* limitations under the License.
*/

package org.apache.flink.streaming.connectors.elasticsearch.table;
package org.apache.flink.connector.elasticsearch.lookup;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge;
import org.apache.flink.connector.elasticsearch.ElasticsearchApiCallBridge;
import org.apache.flink.connector.elasticsearch.NetworkClientConfig;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.DataFormatConverters;
@@ -31,6 +32,7 @@
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;

import org.apache.http.HttpHost;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.common.Strings;
import org.elasticsearch.index.query.BoolQueryBuilder;
@@ -44,6 +46,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -72,6 +75,8 @@ public class ElasticsearchRowDataLookupFunction<C extends AutoCloseable> extends
private SearchSourceBuilder searchSourceBuilder;

private final ElasticsearchApiCallBridge<C> callBridge;
private final NetworkClientConfig networkClientConfig;
private final List<HttpHost> hosts;

private transient C client;

@@ -83,13 +88,17 @@ public ElasticsearchRowDataLookupFunction(
String[] producedNames,
DataType[] producedTypes,
String[] lookupKeys,
List<HttpHost> hosts,
NetworkClientConfig networkClientConfig,
ElasticsearchApiCallBridge<C> callBridge) {

checkNotNull(deserializationSchema, "No DeserializationSchema supplied.");
checkNotNull(maxRetryTimes, "No maxRetryTimes supplied.");
checkNotNull(producedNames, "No fieldNames supplied.");
checkNotNull(producedTypes, "No fieldTypes supplied.");
checkNotNull(lookupKeys, "No keyNames supplied.");
checkNotNull(hosts, "No hosts supplied.");
checkNotNull(networkClientConfig, "No networkClientConfig supplied.");
checkNotNull(callBridge, "No ElasticsearchApiCallBridge supplied.");

this.deserializationSchema = deserializationSchema;
@@ -110,12 +119,14 @@ public ElasticsearchRowDataLookupFunction(
converters[i] = DataFormatConverters.getConverterForDataType(producedTypes[position]);
}

this.networkClientConfig = networkClientConfig;
this.hosts = hosts;
this.callBridge = callBridge;
}

@Override
public void open(FunctionContext context) throws Exception {
this.client = callBridge.createClient();
this.client = callBridge.createClient(networkClientConfig, hosts);

// Build the search request once in open() instead of re-creating it in eval() for every
// incoming record.
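
Because the client is now created through the bridge, the lookup function's lifecycle stays symmetric: open() asks the bridge for a client, and teardown hands the same client back to ElasticsearchApiCallBridge#close. The close() side is outside this diff, so the following pairing is only a sketch of the expected shape:

@Override
public void open(FunctionContext context) throws Exception {
    // Obtain the version-specific client via the bridge (as in the diff above).
    this.client = callBridge.createClient(networkClientConfig, hosts);
}

@Override
public void close() throws Exception {
    // Hypothetical counterpart, not shown in this diff: release the client via the bridge.
    if (client != null) {
        callBridge.close(client);
    }
}
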
@@ -22,7 +22,9 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.elasticsearch.NetworkClientConfig;
import org.apache.flink.connector.elasticsearch.sink.BulkResponseInspector.BulkResponseInspectorFactory;

import org.apache.http.HttpHost;
@@ -82,7 +84,7 @@ public class ElasticsearchSink<IN> implements Sink<IN> {
}

@Override
public SinkWriter<IN> createWriter(InitContext context) throws IOException {
public SinkWriter<IN> createWriter(WriterInitContext context) throws IOException {
return new ElasticsearchWriter<>(
hosts,
emitter,
@@ -22,6 +22,7 @@
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.elasticsearch.NetworkClientConfig;
import org.apache.flink.connector.elasticsearch.sink.BulkResponseInspector.BulkResponseInspectorFactory;
import org.apache.flink.connector.elasticsearch.sink.ElasticsearchWriter.DefaultBulkResponseInspector;
import org.apache.flink.connector.elasticsearch.sink.ElasticsearchWriter.DefaultFailureHandler;
@@ -21,16 +21,14 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.connector.elasticsearch.NetworkClientConfig;
import org.apache.flink.connector.elasticsearch.utils.RestClientUtils;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.function.ThrowingRunnable;

import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkProcessor;
@@ -40,7 +38,6 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.rest.RestStatus;
import org.slf4j.Logger;
@@ -100,7 +97,7 @@ class ElasticsearchWriter<IN> implements SinkWriter<IN> {
this.mailboxExecutor = checkNotNull(mailboxExecutor);
this.client =
new RestHighLevelClient(
configureRestClientBuilder(
RestClientUtils.configureRestClientBuilder(
RestClient.builder(hosts.toArray(new HttpHost[0])),
networkClientConfig));
this.bulkProcessor =
@@ -156,79 +153,6 @@ public void close() throws Exception {
client.close();
}

private static RestClientBuilder configureRestClientBuilder(
RestClientBuilder builder, NetworkClientConfig networkClientConfig) {
if (networkClientConfig.getConnectionPathPrefix() != null) {
builder.setPathPrefix(networkClientConfig.getConnectionPathPrefix());
}

final CredentialsProvider credentialsProvider = getCredentialsProvider(networkClientConfig);
if (credentialsProvider != null
|| networkClientConfig.getSSLContextSupplier() != null
|| networkClientConfig.getSslHostnameVerifier() != null) {
builder.setHttpClientConfigCallback(
httpClientBuilder -> {
if (credentialsProvider != null) {
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}

if (networkClientConfig.getSSLContextSupplier() != null) {
// client creates SSL context using the configured supplier
httpClientBuilder.setSSLContext(
networkClientConfig.getSSLContextSupplier().get());
}

if (networkClientConfig.getSslHostnameVerifier() != null) {
httpClientBuilder.setSSLHostnameVerifier(
networkClientConfig.getSslHostnameVerifier().get());
}

return httpClientBuilder;
});
}

if (networkClientConfig.getConnectionRequestTimeout() != null
|| networkClientConfig.getConnectionTimeout() != null
|| networkClientConfig.getSocketTimeout() != null) {
builder.setRequestConfigCallback(
requestConfigBuilder -> {
if (networkClientConfig.getConnectionRequestTimeout() != null) {
requestConfigBuilder.setConnectionRequestTimeout(
networkClientConfig.getConnectionRequestTimeout());
}
if (networkClientConfig.getConnectionTimeout() != null) {
requestConfigBuilder.setConnectTimeout(
networkClientConfig.getConnectionTimeout());
}
if (networkClientConfig.getSocketTimeout() != null) {
requestConfigBuilder.setSocketTimeout(
networkClientConfig.getSocketTimeout());
}
return requestConfigBuilder;
});
}
return builder;
}

/**
* Get an http client credentials provider given network client config.
*
* <p>If network client config is not configured with username or password, return null.
*/
private static CredentialsProvider getCredentialsProvider(
NetworkClientConfig networkClientConfig) {
CredentialsProvider credentialsProvider = null;
if (networkClientConfig.getPassword() != null
&& networkClientConfig.getUsername() != null) {
credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(
AuthScope.ANY,
new UsernamePasswordCredentials(
networkClientConfig.getUsername(), networkClientConfig.getPassword()));
}
return credentialsProvider;
}

private BulkProcessor createBulkProcessor(
BulkProcessorBuilderFactory bulkProcessorBuilderFactory,
BulkProcessorConfig bulkProcessorConfig,
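
The builder and credentials logic deleted above is not dropped; per the new import at the top of this file it moves to org.apache.flink.connector.elasticsearch.utils.RestClientUtils, where the writer and the call bridge can share it. That class is not shown in this excerpt, so the sketch below is a hedged reconstruction whose method bodies mirror the removed code:

package org.apache.flink.connector.elasticsearch.utils;

import org.apache.flink.connector.elasticsearch.NetworkClientConfig;

import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.RestClientBuilder;

/** Assumed shape of the shared REST client configuration helper; bodies mirror the removed code. */
public final class RestClientUtils {

    private RestClientUtils() {}

    /** Applies path prefix, credentials, SSL settings, and timeouts from the given config. */
    public static RestClientBuilder configureRestClientBuilder(
            RestClientBuilder builder, NetworkClientConfig networkClientConfig) {
        if (networkClientConfig.getConnectionPathPrefix() != null) {
            builder.setPathPrefix(networkClientConfig.getConnectionPathPrefix());
        }

        final CredentialsProvider credentialsProvider = getCredentialsProvider(networkClientConfig);
        if (credentialsProvider != null
                || networkClientConfig.getSSLContextSupplier() != null
                || networkClientConfig.getSslHostnameVerifier() != null) {
            builder.setHttpClientConfigCallback(
                    httpClientBuilder -> {
                        if (credentialsProvider != null) {
                            httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                        }
                        if (networkClientConfig.getSSLContextSupplier() != null) {
                            // Client creates its SSL context using the configured supplier.
                            httpClientBuilder.setSSLContext(
                                    networkClientConfig.getSSLContextSupplier().get());
                        }
                        if (networkClientConfig.getSslHostnameVerifier() != null) {
                            httpClientBuilder.setSSLHostnameVerifier(
                                    networkClientConfig.getSslHostnameVerifier().get());
                        }
                        return httpClientBuilder;
                    });
        }

        if (networkClientConfig.getConnectionRequestTimeout() != null
                || networkClientConfig.getConnectionTimeout() != null
                || networkClientConfig.getSocketTimeout() != null) {
            builder.setRequestConfigCallback(
                    requestConfigBuilder -> {
                        if (networkClientConfig.getConnectionRequestTimeout() != null) {
                            requestConfigBuilder.setConnectionRequestTimeout(
                                    networkClientConfig.getConnectionRequestTimeout());
                        }
                        if (networkClientConfig.getConnectionTimeout() != null) {
                            requestConfigBuilder.setConnectTimeout(
                                    networkClientConfig.getConnectionTimeout());
                        }
                        if (networkClientConfig.getSocketTimeout() != null) {
                            requestConfigBuilder.setSocketTimeout(
                                    networkClientConfig.getSocketTimeout());
                        }
                        return requestConfigBuilder;
                    });
        }
        return builder;
    }

    /** Returns a basic credentials provider, or null when no username/password is configured. */
    private static CredentialsProvider getCredentialsProvider(
            NetworkClientConfig networkClientConfig) {
        if (networkClientConfig.getUsername() == null
                || networkClientConfig.getPassword() == null) {
            return null;
        }
        CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(
                AuthScope.ANY,
                new UsernamePasswordCredentials(
                        networkClientConfig.getUsername(), networkClientConfig.getPassword()));
        return credentialsProvider;
    }
}
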