Skip to content

Commit 210e753

Browse files
author
lwn
committed
Add table sink support for Elasticsearch 8 connector
1 parent f19a471 commit 210e753

24 files changed

+2708
-10
lines changed

flink-connector-elasticsearch8/pom.xml

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,27 @@ under the License.
138138
<type>test-jar</type>
139139
<scope>test</scope>
140140
</dependency>
141+
142+
<dependency>
143+
<groupId>org.apache.flink</groupId>
144+
<artifactId>flink-table-runtime</artifactId>
145+
<version>${flink.version}</version>
146+
<scope>provided</scope>
147+
</dependency>
148+
149+
<dependency>
150+
<groupId>org.apache.flink</groupId>
151+
<artifactId>flink-table-planner-loader</artifactId>
152+
<version>${flink.version}</version>
153+
<scope>test</scope>
154+
</dependency>
155+
156+
<dependency>
157+
<groupId>com.fasterxml.jackson.datatype</groupId>
158+
<artifactId>jackson-datatype-jsr310</artifactId>
159+
<version>${jackson.version}</version>
160+
</dependency>
161+
141162
</dependencies>
142163

143164
<dependencyManagement>

flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/Elasticsearch8AsyncSinkBuilder.java

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
package org.apache.flink.connector.elasticsearch.sink;
2323

2424
import org.apache.flink.api.connector.sink2.SinkWriter;
25+
import org.apache.flink.api.connector.sink2.WriterInitContext;
2526
import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder;
2627
import org.apache.flink.connector.base.sink.writer.ElementConverter;
2728
import org.apache.flink.util.function.SerializableSupplier;
@@ -80,6 +81,15 @@ public class Elasticsearch8AsyncSinkBuilder<InputT>
8081
*/
8182
private ElementConverter<InputT, BulkOperationVariant> elementConverter;
8283

84+
/** the path's prefix for every request. */
85+
private String connectionPathPrefix;
86+
87+
private Integer connectionTimeout;
88+
89+
private Integer connectionRequestTimeout;
90+
91+
private Integer socketTimeout;
92+
8393
private SerializableSupplier<SSLContext> sslContextSupplier;
8494

8595
private SerializableSupplier<HostnameVerifier> sslHostnameVerifier;
@@ -97,6 +107,28 @@ public Elasticsearch8AsyncSinkBuilder<InputT> setHosts(HttpHost... hosts) {
97107
return this;
98108
}
99109

110+
public Elasticsearch8AsyncSinkBuilder<InputT> setConnectionPathPrefix(
111+
String connectionPathPrefix) {
112+
this.connectionPathPrefix = connectionPathPrefix;
113+
return this;
114+
}
115+
116+
public Elasticsearch8AsyncSinkBuilder<InputT> setConnectionTimeout(Integer connectionTimeout) {
117+
this.connectionTimeout = connectionTimeout;
118+
return this;
119+
}
120+
121+
public Elasticsearch8AsyncSinkBuilder<InputT> setConnectionRequestTimeout(
122+
Integer connectionRequestTimeout) {
123+
this.connectionRequestTimeout = connectionRequestTimeout;
124+
return this;
125+
}
126+
127+
public Elasticsearch8AsyncSinkBuilder<InputT> setSocketTimeout(Integer socketTimeout) {
128+
this.socketTimeout = socketTimeout;
129+
return this;
130+
}
131+
100132
/**
101133
* setHeaders set the headers to be sent with the requests made to Elasticsearch cluster..
102134
*
@@ -239,7 +271,16 @@ private OperationConverter<InputT> buildOperationConverter(
239271
private NetworkConfig buildNetworkConfig() {
240272
checkArgument(!hosts.isEmpty(), "Hosts cannot be empty.");
241273
return new NetworkConfig(
242-
hosts, username, password, headers, sslContextSupplier, sslHostnameVerifier);
274+
hosts,
275+
username,
276+
password,
277+
headers,
278+
connectionPathPrefix,
279+
connectionRequestTimeout,
280+
connectionTimeout,
281+
socketTimeout,
282+
sslContextSupplier,
283+
sslHostnameVerifier);
243284
}
244285

245286
/** A wrapper that evolves the Operation, since a BulkOperationVariant is not Serializable. */
@@ -250,6 +291,12 @@ public OperationConverter(ElementConverter<T, BulkOperationVariant> converter) {
250291
this.converter = converter;
251292
}
252293

294+
@Override
295+
public void open(WriterInitContext context) {
296+
// call converter.open() before calling converter.apply()
297+
converter.open(context);
298+
}
299+
253300
@Override
254301
public Operation apply(T element, SinkWriter.Context context) {
255302
return new Operation(converter.apply(element, context));

flink-connector-elasticsearch8/src/main/java/org/apache/flink/connector/elasticsearch/sink/NetworkConfig.java

Lines changed: 60 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,12 @@
2626
import co.elastic.clients.elasticsearch.ElasticsearchAsyncClient;
2727
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
2828
import co.elastic.clients.transport.rest_client.RestClientTransport;
29+
import com.fasterxml.jackson.databind.ObjectMapper;
30+
import com.fasterxml.jackson.databind.json.JsonMapper;
31+
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
32+
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateSerializer;
33+
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
34+
import com.fasterxml.jackson.datatype.jsr310.ser.LocalTimeSerializer;
2935
import org.apache.http.Header;
3036
import org.apache.http.HttpHost;
3137
import org.apache.http.auth.AuthScope;
@@ -40,43 +46,66 @@
4046
import javax.net.ssl.SSLContext;
4147

4248
import java.io.Serializable;
49+
import java.time.LocalDate;
50+
import java.time.LocalDateTime;
51+
import java.time.LocalTime;
52+
import java.time.format.DateTimeFormatter;
4353
import java.util.List;
4454

4555
import static org.apache.flink.util.Preconditions.checkState;
4656

4757
/** A factory that creates valid ElasticsearchClient instances. */
4858
public class NetworkConfig implements Serializable {
4959
private final List<HttpHost> hosts;
50-
5160
private final List<Header> headers;
52-
5361
private final String username;
54-
5562
private final String password;
56-
63+
@Nullable private final String connectionPathPrefix;
64+
@Nullable Integer connectionRequestTimeout;
65+
@Nullable Integer connectionTimeout;
66+
@Nullable Integer socketTimeout;
5767
@Nullable private final SerializableSupplier<SSLContext> sslContextSupplier;
58-
5968
@Nullable private final SerializableSupplier<HostnameVerifier> sslHostnameVerifier;
69+
private static final DateTimeFormatter DATE_TIME_FORMATTER =
70+
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
71+
private static final DateTimeFormatter DATE_FORMATTER =
72+
DateTimeFormatter.ofPattern("yyyy-MM-dd");
73+
private static final DateTimeFormatter TIME_FORMATTER = DateTimeFormatter.ofPattern("HH:mm:ss");
6074

6175
public NetworkConfig(
6276
List<HttpHost> hosts,
6377
String username,
6478
String password,
6579
List<Header> headers,
66-
SerializableSupplier<SSLContext> sslContextSupplier,
67-
SerializableSupplier<HostnameVerifier> sslHostnameVerifier) {
80+
@Nullable String connectionPathPrefix,
81+
@Nullable Integer connectionRequestTimeout,
82+
@Nullable Integer connectionTimeout,
83+
@Nullable Integer socketTimeout,
84+
@Nullable SerializableSupplier<SSLContext> sslContextSupplier,
85+
@Nullable SerializableSupplier<HostnameVerifier> sslHostnameVerifier) {
6886
checkState(!hosts.isEmpty(), "Hosts must not be empty");
6987
this.hosts = hosts;
7088
this.username = username;
7189
this.password = password;
7290
this.headers = headers;
91+
this.connectionRequestTimeout = connectionRequestTimeout;
92+
this.connectionTimeout = connectionTimeout;
93+
this.socketTimeout = socketTimeout;
94+
this.connectionPathPrefix = connectionPathPrefix;
7395
this.sslContextSupplier = sslContextSupplier;
7496
this.sslHostnameVerifier = sslHostnameVerifier;
7597
}
7698

7799
public ElasticsearchAsyncClient createEsClient() {
100+
// the JavaTimeModule is added to provide support for java 8 Time classes.
101+
JavaTimeModule javaTimeModule = new JavaTimeModule();
102+
javaTimeModule.addSerializer(
103+
LocalDateTime.class, new LocalDateTimeSerializer(DATE_TIME_FORMATTER));
104+
javaTimeModule.addSerializer(LocalDate.class, new LocalDateSerializer(DATE_FORMATTER));
105+
javaTimeModule.addSerializer(LocalTime.class, new LocalTimeSerializer(TIME_FORMATTER));
106+
ObjectMapper mapper = JsonMapper.builder().addModule(javaTimeModule).build();
78107
return new ElasticsearchAsyncClient(
79-
new RestClientTransport(this.getRestClient(), new JacksonJsonpMapper()));
108+
new RestClientTransport(this.getRestClient(), new JacksonJsonpMapper(mapper)));
80109
}
81110

82111
private RestClient getRestClient() {
@@ -105,6 +134,29 @@ private RestClient getRestClient() {
105134
restClientBuilder.setDefaultHeaders(headers.toArray(new Header[0]));
106135
}
107136

137+
if (connectionPathPrefix != null) {
138+
restClientBuilder.setPathPrefix(connectionPathPrefix);
139+
}
140+
141+
if (connectionRequestTimeout != null
142+
|| connectionTimeout != null
143+
|| socketTimeout != null) {
144+
restClientBuilder.setRequestConfigCallback(
145+
requestConfigBuilder -> {
146+
if (connectionRequestTimeout != null) {
147+
requestConfigBuilder.setConnectionRequestTimeout(
148+
connectionRequestTimeout);
149+
}
150+
if (connectionTimeout != null) {
151+
requestConfigBuilder.setConnectTimeout(connectionTimeout);
152+
}
153+
if (socketTimeout != null) {
154+
requestConfigBuilder.setSocketTimeout(socketTimeout);
155+
}
156+
return requestConfigBuilder;
157+
});
158+
}
159+
108160
return restClientBuilder.build();
109161
}
110162

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package org.apache.flink.connector.elasticsearch.table;
2+
3+
import org.apache.flink.annotation.Internal;
4+
5+
import java.time.format.DateTimeFormatter;
6+
7+
/** Abstract class for time related {@link IndexGenerator}. */
8+
@Internal
9+
abstract class AbstractTimeIndexGenerator extends IndexGeneratorBase {
10+
11+
private final String dateTimeFormat;
12+
protected transient DateTimeFormatter dateTimeFormatter;
13+
14+
public AbstractTimeIndexGenerator(String index, String dateTimeFormat) {
15+
super(index);
16+
this.dateTimeFormat = dateTimeFormat;
17+
}
18+
19+
@Override
20+
public void open() {
21+
this.dateTimeFormatter = DateTimeFormatter.ofPattern(dateTimeFormat);
22+
}
23+
}

0 commit comments

Comments
 (0)