Commit a6aa18d

The flink-cdc-pipeline-connector-elasticsearch module offers better compatibility with Flink 1.20 and Flink 2.x.
Signed-off-by: Pei Yu <125331682@qq.com>
1 parent 13cd198

12 files changed (+523, -33 lines)

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/pom.xml

Lines changed: 10 additions & 1 deletion
@@ -35,7 +35,7 @@ limitations under the License.
 
     <properties>
         <elasticsearch.version>8.12.1</elasticsearch.version>
-        <flink.connector.elasticsearch.version>3.0.1-1.17</flink.connector.elasticsearch.version>
+        <flink.connector.elasticsearch.version>3.1.0-1.20</flink.connector.elasticsearch.version>
         <httpclient.version>4.5.13</httpclient.version>
         <jakarta.json.version>2.0.2</jakarta.json.version>
     </properties>
@@ -195,4 +195,13 @@ limitations under the License.
             </plugin>
         </plugins>
     </build>
+
+    <profiles>
+        <profile>
+            <id>flink2</id>
+            <properties>
+                <flink.connector.elasticsearch.version>4.0.0-2.0</flink.connector.elasticsearch.version>
+            </properties>
+        </profile>
+    </profiles>
 </project>
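
The default build now tracks the Flink 1.20 line of the external Elasticsearch connector (3.1.0-1.20, up from 3.0.1-1.17), while the new flink2 profile swaps in the 4.0.0-2.0 artifact for Flink 2.x builds. Assuming the standard Maven profile mechanism, the profile would be activated with a command such as: mvn clean package -Pflink2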

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ElasticsearchEventSerializer.java

Lines changed: 0 additions & 6 deletions
@@ -17,7 +17,6 @@
 
 package org.apache.flink.cdc.connectors.elasticsearch.serializer;
 
-import org.apache.flink.api.connector.sink2.Sink;
 import org.apache.flink.api.connector.sink2.SinkWriter;
 import org.apache.flink.cdc.common.data.RecordData;
 import org.apache.flink.cdc.common.event.AddColumnEvent;
@@ -251,9 +250,4 @@ private void checkIndex(int index, int size) {
             throw new IndexOutOfBoundsException("Index: " + index + ", Size: " + size);
         }
     }
-
-    @Override
-    public void open(Sink.InitContext context) {
-        ElementConverter.super.open(context);
-    }
 }
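
Dropping this no-op override removes the serializer's only compile-time reference to the legacy Sink.InitContext signature, presumably so the class compiles unchanged against newer Flink versions where the ElementConverter#open hook takes a different context type.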

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncSink.java

Lines changed: 12 additions & 11 deletions
@@ -20,14 +20,16 @@
 package org.apache.flink.cdc.connectors.elasticsearch.v2;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.connector.base.sink.AsyncSinkBase;
+import org.apache.flink.api.connector.sink2.InitContextAdapter;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.StatefulSinkWriterAdapter;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
+import org.apache.flink.connector.base.sink.AsyncSinkBaseAdapter;
 import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
 import org.apache.flink.connector.base.sink.writer.ElementConverter;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 
@@ -38,8 +40,7 @@
  * @param <InputT> type of records that will be converted into {@link Operation}. See {@link
  *     Elasticsearch8AsyncSinkBuilder} on how to construct valid instances.
  */
-public class Elasticsearch8AsyncSink<InputT> extends AsyncSinkBase<InputT, Operation> {
-    private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch8AsyncSink.class);
+public class Elasticsearch8AsyncSink<InputT> extends AsyncSinkBaseAdapter<InputT, Operation> {
 
     @VisibleForTesting protected final NetworkConfig networkConfig;
 
@@ -84,8 +85,7 @@ protected Elasticsearch8AsyncSink(
      * @return a new instance of {@link Elasticsearch8AsyncWriter}.
      */
     @Override
-    public StatefulSinkWriter<InputT, BufferedRequestState<Operation>> createWriter(
-            InitContext context) {
+    public SinkWriter<InputT> createWriter(InitContext context) {
         return new Elasticsearch8AsyncWriter<>(
                 getElementConverter(),
                 context,
@@ -107,11 +107,12 @@ public StatefulSinkWriter<InputT, BufferedRequestState<Operation>> createWriter(
      * @return a restored instance of {@link Elasticsearch8AsyncWriter}.
      */
     @Override
-    public StatefulSinkWriter<InputT, BufferedRequestState<Operation>> restoreWriter(
-            InitContext context, Collection<BufferedRequestState<Operation>> recoveredState) {
+    public StatefulSinkWriterAdapter<InputT, BufferedRequestState<Operation>> restoreWriterAdapter(
+            WriterInitContext context, Collection<BufferedRequestState<Operation>> recoveredState)
+            throws IOException {
         return new Elasticsearch8AsyncWriter<>(
                 getElementConverter(),
-                context,
+                new InitContextAdapter(context),
                 getMaxBatchSize(),
                 getMaxInFlightRequests(),
                 getMaxBufferedRequests(),
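
Note the direction of the bridging here: the Flink 2.x-style restoreWriterAdapter receives the new WriterInitContext, and InitContextAdapter (apparently another compatibility class among this commit's 12 files, not shown above) converts it back into the legacy InitContext that the existing Elasticsearch8AsyncWriter constructor expects.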

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncWriter.java

Lines changed: 16 additions & 15 deletions
@@ -20,10 +20,12 @@
 package org.apache.flink.cdc.connectors.elasticsearch.v2;
 
 import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.StatefulSinkWriterAdapter;
 import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
-import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriterAdapter;
 import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
 import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.base.sink.writer.ResultHandler;
 import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
@@ -41,9 +43,7 @@
 import java.net.NoRouteToHostException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
-import java.util.function.Consumer;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -53,7 +53,8 @@
  *
  * @param <InputT> type of Operations
  */
-public class Elasticsearch8AsyncWriter<InputT> extends AsyncSinkWriter<InputT, Operation> {
+public class Elasticsearch8AsyncWriter<InputT> extends AsyncSinkWriterAdapter<InputT, Operation>
+        implements StatefulSinkWriterAdapter<InputT, BufferedRequestState<Operation>> {
     private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch8AsyncWriter.class);
 
     private final ElasticsearchAsyncClient esClient;
@@ -118,7 +119,7 @@ public Elasticsearch8AsyncWriter(
 
     @Override
     protected void submitRequestEntries(
-            List<Operation> requestEntries, Consumer<List<Operation>> requestResult) {
+            List<Operation> requestEntries, ResultHandler<Operation> resultHandler) {
         numRequestSubmittedCounter.inc();
         LOG.debug("submitRequestEntries with {} items", requestEntries.size());
 
@@ -136,27 +137,27 @@ protected void submitRequestEntries(
             LOG.debug(
                     "Skipping empty BulkRequest, all {} operation(s) have null BulkOperationVariant",
                     requestEntries.size());
-            requestResult.accept(Collections.emptyList());
+            resultHandler.complete();
             return;
         }
 
         esClient.bulk(br.build())
                 .whenComplete(
                         (response, error) -> {
                             if (error != null) {
-                                handleFailedRequest(requestEntries, requestResult, error);
+                                handleFailedRequest(requestEntries, resultHandler, error);
                             } else if (response.errors()) {
                                 handlePartiallyFailedRequest(
-                                        requestEntries, requestResult, response);
+                                        requestEntries, resultHandler, response);
                             } else {
-                                handleSuccessfulRequest(requestResult, response);
+                                handleSuccessfulRequest(resultHandler, response);
                             }
                         });
     }
 
     private void handleFailedRequest(
             List<Operation> requestEntries,
-            Consumer<List<Operation>> requestResult,
+            ResultHandler<Operation> resultHandler,
             Throwable error) {
         LOG.warn(
                 "The BulkRequest of {} operation(s) has failed due to: {}",
@@ -166,13 +167,13 @@ private void handleFailedRequest(
         numRecordsOutErrorsCounter.inc(requestEntries.size());
 
         if (isRetryable(error.getCause())) {
-            requestResult.accept(requestEntries);
+            resultHandler.retryForEntries(requestEntries);
         }
     }
 
     private void handlePartiallyFailedRequest(
             List<Operation> requestEntries,
-            Consumer<List<Operation>> requestResult,
+            ResultHandler<Operation> resultHandler,
             BulkResponse response) {
         LOG.debug("The BulkRequest has failed partially. Response: {}", response);
         ArrayList<Operation> failedItems = new ArrayList<>();
@@ -192,16 +193,16 @@ private void handlePartiallyFailedRequest(
                 requestEntries.size(),
                 failedItems.size(),
                 response.took());
-        requestResult.accept(failedItems);
+        resultHandler.retryForEntries(failedItems);
     }
 
     private void handleSuccessfulRequest(
-            Consumer<List<Operation>> requestResult, BulkResponse response) {
+            ResultHandler<Operation> resultHandler, BulkResponse response) {
         LOG.debug(
                 "The BulkRequest of {} operation(s) completed successfully. It took {}ms",
                 response.items().size(),
                 response.took());
-        requestResult.accept(Collections.emptyList());
+        resultHandler.complete();
     }
 
     private boolean isRetryable(Throwable error) {
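
The writer-side change above is mostly a mechanical migration from the old Consumer<List<Operation>> callback to the ResultHandler contract. A minimal sketch of the mapping, based on the complete()/retryForEntries(...) calls visible in the diff (the class and method names below are illustrative only, not part of the commit):

import org.apache.flink.connector.base.sink.writer.ResultHandler;

import java.util.List;

/** Illustrative only: shows how the old callback idioms map onto ResultHandler. */
class CallbackMigrationSketch {
    static <T> void onBulkFinished(List<T> failedEntries, ResultHandler<T> handler) {
        if (failedEntries.isEmpty()) {
            // Old style: requestResult.accept(Collections.emptyList());
            handler.complete();
        } else {
            // Old style: requestResult.accept(failedEntries);
            handler.retryForEntries(failedEntries);
        }
    }
}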
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/api/connector/sink2/StatefulSinkWriterAdapter.java

Lines changed: 27 additions & 0 deletions
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.api.connector.sink2;
+
+/**
+ * Compatibility adapter for Flink 1.20. This class is part of the multi-version compatibility layer
+ * that allows Flink CDC to work across different Flink versions.
+ */
+public interface StatefulSinkWriterAdapter<InputT, WriterStateT>
+        extends StatefulSink.StatefulSinkWriter<InputT, WriterStateT> {}
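
Because the adapter simply re-exports StatefulSink.StatefulSinkWriter under a stable name, connector code can target the adapter and stay source-compatible whichever variant of this file is on the classpath. A minimal sketch of a writer built against it (hypothetical class, not part of the commit):

import org.apache.flink.api.connector.sink2.StatefulSinkWriterAdapter;

import java.util.Collections;
import java.util.List;

/** Hypothetical, for illustration: a stateless writer that discards its input. */
class NoOpStatefulWriter<InputT> implements StatefulSinkWriterAdapter<InputT, Void> {

    @Override
    public void write(InputT element, Context context) {
        // A real writer would buffer the element into a pending bulk request here.
    }

    @Override
    public void flush(boolean endOfInput) {
        // A real writer would send any buffered requests here.
    }

    @Override
    public List<Void> snapshotState(long checkpointId) {
        return Collections.emptyList(); // nothing buffered, so no state to snapshot
    }

    @Override
    public void close() {}
}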
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/api/connector/sink2/WriterInitContextAdapter.java

Lines changed: 94 additions & 0 deletions
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.api.connector.sink2;
+
+import org.apache.flink.api.common.JobInfo;
+import org.apache.flink.api.common.TaskInfo;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.operators.ProcessingTimeService;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+import org.apache.flink.util.UserCodeClassLoader;
+
+import java.util.OptionalLong;
+
+/**
+ * Compatibility adapter for Flink 1.20. This class is part of the multi-version compatibility layer
+ * that allows Flink CDC to work across different Flink versions.
+ */
+public class WriterInitContextAdapter implements WriterInitContext {
+
+    private final Sink.InitContext context;
+
+    public WriterInitContextAdapter(Sink.InitContext context) {
+        this.context = context;
+    }
+
+    @Override
+    public UserCodeClassLoader getUserCodeClassLoader() {
+        return this.context.getUserCodeClassLoader();
+    }
+
+    @Override
+    public MailboxExecutor getMailboxExecutor() {
+        return this.context.getMailboxExecutor();
+    }
+
+    @Override
+    public ProcessingTimeService getProcessingTimeService() {
+        return this.context.getProcessingTimeService();
+    }
+
+    @Override
+    public SinkWriterMetricGroup metricGroup() {
+        return this.context.metricGroup();
+    }
+
+    @Override
+    public SerializationSchema.InitializationContext asSerializationSchemaInitializationContext() {
+        return this.context.asSerializationSchemaInitializationContext();
+    }
+
+    @Override
+    public boolean isObjectReuseEnabled() {
+        return this.context.isObjectReuseEnabled();
+    }
+
+    @Override
+    public <IN> TypeSerializer<IN> createInputSerializer() {
+        return this.context.createInputSerializer();
+    }
+
+    @Override
+    public OptionalLong getRestoredCheckpointId() {
+        return this.context.getRestoredCheckpointId();
+    }
+
+    @Override
+    public JobInfo getJobInfo() {
+        return this.context.getJobInfo();
+    }
+
+    @Override
+    public TaskInfo getTaskInfo() {
+        return this.context.getTaskInfo();
+    }
+}
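
This adapter bridges in the old-to-new direction, wrapping a legacy Sink.InitContext so it can be passed where a WriterInitContext is required, complementing the InitContextAdapter used in Elasticsearch8AsyncSink above. A plausible, simplified sketch of how such wiring could look; this is not the commit's actual AsyncSinkBaseAdapter source:

import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.StatefulSinkWriterAdapter;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.api.connector.sink2.WriterInitContextAdapter;

import java.io.IOException;
import java.util.Collection;

/** Hypothetical bridge, for illustration only. */
abstract class RestorePathBridge<InputT, StateT> {

    // Flink 2.x-style restore path: concrete sinks implement this once.
    abstract StatefulSinkWriterAdapter<InputT, StateT> restoreWriterAdapter(
            WriterInitContext context, Collection<StateT> recoveredState) throws IOException;

    // Flink 1.20-style restore path: wraps the legacy context and delegates,
    // so both runtime versions end up in the same implementation.
    StatefulSinkWriterAdapter<InputT, StateT> restoreWriter(
            Sink.InitContext context, Collection<StateT> recoveredState) throws IOException {
        return restoreWriterAdapter(new WriterInitContextAdapter(context), recoveredState);
    }
}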
