Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ Pipeline Connector Options
<tr>
<td>record.size.max.bytes</td>
<td>optional</td>
<td style="word-wrap: break-word;">10485760</td>
<td style="word-wrap: break-word;">5242880</td>
<td>Long</td>
<td>单个记录的最大大小(以byte为单位)。</td>
</tr>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ Pipeline Connector Options
<tr>
<td>record.size.max.bytes</td>
<td>optional</td>
<td style="word-wrap: break-word;">10485760</td>
<td style="word-wrap: break-word;">5242880</td>
<td>Long</td>
<td>The maximum size of a single record in bytes.</td>
</tr>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ limitations under the License.

<properties>
<elasticsearch.version>8.12.1</elasticsearch.version>
<flink.connector.elasticsearch.version>3.0.1-1.17</flink.connector.elasticsearch.version>
<flink.connector.elasticsearch.version>3.1.0-1.20</flink.connector.elasticsearch.version>
<httpclient.version>4.5.13</httpclient.version>
<jakarta.json.version>2.0.2</jakarta.json.version>
</properties>
Expand Down Expand Up @@ -195,4 +195,13 @@ limitations under the License.
</plugin>
</plugins>
</build>

<profiles>
<profile>
<id>flink2</id>
<properties>
<flink.connector.elasticsearch.version>4.0.0-2.0</flink.connector.elasticsearch.version>
</properties>
</profile>
</profiles>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.flink.cdc.connectors.elasticsearch.serializer;

import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.event.AddColumnEvent;
Expand Down Expand Up @@ -251,9 +250,4 @@ private void checkIndex(int index, int size) {
throw new IndexOutOfBoundsException("Index: " + index + ", Size: " + size);
}
}

@Override
public void open(Sink.InitContext context) {
ElementConverter.super.open(context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public class ElasticsearchDataSinkOptions {
public static final ConfigOption<Long> MAX_RECORD_SIZE_IN_BYTES =
ConfigOptions.key("record.size.max.bytes")
.longType()
.defaultValue(10L * 1024L * 1024L)
.defaultValue(5L * 1024L * 1024L)
.withDescription("The maximum size of a single record in bytes.");

/** The version of Elasticsearch to connect to. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,16 @@
package org.apache.flink.cdc.connectors.elasticsearch.v2;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.connector.base.sink.AsyncSinkBase;
import org.apache.flink.api.connector.sink2.InitContextAdapter;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.StatefulSinkWriterAdapter;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.connector.base.sink.AsyncSinkBaseAdapter;
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.core.io.SimpleVersionedSerializer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Collection;
import java.util.Collections;

Expand All @@ -38,8 +40,7 @@
* @param <InputT> type of records that will be converted into {@link Operation}. See {@link
* Elasticsearch8AsyncSinkBuilder} on how to construct valid instances.
*/
public class Elasticsearch8AsyncSink<InputT> extends AsyncSinkBase<InputT, Operation> {
private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch8AsyncSink.class);
public class Elasticsearch8AsyncSink<InputT> extends AsyncSinkBaseAdapter<InputT, Operation> {

@VisibleForTesting protected final NetworkConfig networkConfig;

Expand Down Expand Up @@ -78,14 +79,13 @@ protected Elasticsearch8AsyncSink(
}

/**
* Creates a new {@link StatefulSinkWriter} for writing elements to Elasticsearch.
* Creates a new {@link SinkWriter} for writing elements to Elasticsearch.
*
* @param context the initialization context.
* @return a new instance of {@link Elasticsearch8AsyncWriter}.
*/
@Override
public StatefulSinkWriter<InputT, BufferedRequestState<Operation>> createWriter(
InitContext context) {
public SinkWriter<InputT> createWriter(InitContext context) {
return new Elasticsearch8AsyncWriter<>(
getElementConverter(),
context,
Expand All @@ -99,19 +99,35 @@ public StatefulSinkWriter<InputT, BufferedRequestState<Operation>> createWriter(
Collections.emptyList());
}

@Override
public SinkWriter<InputT> createWriter(WriterInitContext context) throws IOException {
return new Elasticsearch8AsyncWriter<>(
getElementConverter(),
new InitContextAdapter(context),
getMaxBatchSize(),
getMaxInFlightRequests(),
getMaxBufferedRequests(),
getMaxBatchSizeInBytes(),
getMaxTimeInBufferMS(),
getMaxRecordSizeInBytes(),
networkConfig,
Collections.emptyList());
}

/**
* Restores a {@link StatefulSinkWriter} from a previously saved state.
* Restores a {@link StatefulSinkWriterAdapter} from a previously saved state.
*
* @param context the initialization context.
* @param recoveredState the recovered state.
* @return a restored instance of {@link Elasticsearch8AsyncWriter}.
*/
@Override
public StatefulSinkWriter<InputT, BufferedRequestState<Operation>> restoreWriter(
InitContext context, Collection<BufferedRequestState<Operation>> recoveredState) {
public StatefulSinkWriterAdapter<InputT, BufferedRequestState<Operation>> restoreWriterAdapter(
WriterInitContext context, Collection<BufferedRequestState<Operation>> recoveredState)
throws IOException {
return new Elasticsearch8AsyncWriter<>(
getElementConverter(),
context,
new InitContextAdapter(context),
getMaxBatchSize(),
getMaxInFlightRequests(),
getMaxBufferedRequests(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
package org.apache.flink.cdc.connectors.elasticsearch.v2;

import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.StatefulSinkWriterAdapter;
import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
import org.apache.flink.connector.base.sink.writer.AsyncSinkWriterAdapter;
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.connector.base.sink.writer.ResultHandler;
import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
Expand All @@ -41,9 +43,7 @@
import java.net.NoRouteToHostException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

import static org.apache.flink.util.Preconditions.checkNotNull;

Expand All @@ -53,7 +53,8 @@
*
* @param <InputT> type of Operations
*/
public class Elasticsearch8AsyncWriter<InputT> extends AsyncSinkWriter<InputT, Operation> {
public class Elasticsearch8AsyncWriter<InputT> extends AsyncSinkWriterAdapter<InputT, Operation>
implements StatefulSinkWriterAdapter<InputT, BufferedRequestState<Operation>> {
private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch8AsyncWriter.class);

private final ElasticsearchAsyncClient esClient;
Expand Down Expand Up @@ -118,7 +119,7 @@ public Elasticsearch8AsyncWriter(

@Override
protected void submitRequestEntries(
List<Operation> requestEntries, Consumer<List<Operation>> requestResult) {
List<Operation> requestEntries, ResultHandler<Operation> resultHandler) {
numRequestSubmittedCounter.inc();
LOG.debug("submitRequestEntries with {} items", requestEntries.size());

Expand All @@ -136,27 +137,27 @@ protected void submitRequestEntries(
LOG.debug(
"Skipping empty BulkRequest, all {} operation(s) have null BulkOperationVariant",
requestEntries.size());
requestResult.accept(Collections.emptyList());
resultHandler.complete();
return;
}

esClient.bulk(br.build())
.whenComplete(
(response, error) -> {
if (error != null) {
handleFailedRequest(requestEntries, requestResult, error);
handleFailedRequest(requestEntries, resultHandler, error);
} else if (response.errors()) {
handlePartiallyFailedRequest(
requestEntries, requestResult, response);
requestEntries, resultHandler, response);
} else {
handleSuccessfulRequest(requestResult, response);
handleSuccessfulRequest(resultHandler, response);
}
});
}

private void handleFailedRequest(
List<Operation> requestEntries,
Consumer<List<Operation>> requestResult,
ResultHandler<Operation> resultHandler,
Throwable error) {
LOG.warn(
"The BulkRequest of {} operation(s) has failed due to: {}",
Expand All @@ -165,14 +166,20 @@ private void handleFailedRequest(
LOG.debug("The BulkRequest has failed", error);
numRecordsOutErrorsCounter.inc(requestEntries.size());

if (isRetryable(error.getCause())) {
requestResult.accept(requestEntries);
Throwable retryableError = error.getCause() != null ? error.getCause() : error;
if (isRetryable(retryableError)) {
resultHandler.retryForEntries(requestEntries);
} else {
resultHandler.completeExceptionally(
retryableError instanceof Exception
? (Exception) retryableError
: new FlinkRuntimeException(retryableError));
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ResultHandler API has a completeExceptionally(Exception) method to get it fail-fast. I think we should better handle the case where the error is not retriable:

        if (isRetryable(retryableError)) {
            resultHandler.retryForEntries(requestEntries);
        } else {
            resultHandler.completeExceptionally(
                    retryableError instanceof Exception
                            ? (Exception) retryableError
                            : new FlinkRuntimeException(retryableError));
        }

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks, done

}

private void handlePartiallyFailedRequest(
List<Operation> requestEntries,
Consumer<List<Operation>> requestResult,
ResultHandler<Operation> resultHandler,
BulkResponse response) {
LOG.debug("The BulkRequest has failed partially. Response: {}", response);
ArrayList<Operation> failedItems = new ArrayList<>();
Expand All @@ -192,16 +199,16 @@ private void handlePartiallyFailedRequest(
requestEntries.size(),
failedItems.size(),
response.took());
requestResult.accept(failedItems);
resultHandler.retryForEntries(failedItems);
}

private void handleSuccessfulRequest(
Consumer<List<Operation>> requestResult, BulkResponse response) {
ResultHandler<Operation> resultHandler, BulkResponse response) {
LOG.debug(
"The BulkRequest of {} operation(s) completed successfully. It took {}ms",
response.items().size(),
response.took());
requestResult.accept(Collections.emptyList());
resultHandler.complete();
}

private boolean isRetryable(Throwable error) {
Expand Down
21 changes: 21 additions & 0 deletions flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,12 @@ limitations under the License.
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-pipeline-connector-elasticsearch</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-postgres-cdc</artifactId>
Expand Down Expand Up @@ -249,6 +255,12 @@ limitations under the License.
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>elasticsearch</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>

<!-- benchmark -->
<dependency>
Expand Down Expand Up @@ -714,6 +726,15 @@ limitations under the License.
<outputDirectory>${project.build.directory}/dependencies
</outputDirectory>
</artifactItem>
<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cdc-pipeline-connector-elasticsearch</artifactId>
<version>${project.version}</version>
<destFileName>elasticsearch-cdc-pipeline-connector.jar</destFileName>
<type>jar</type>
<outputDirectory>${project.build.directory}/dependencies
</outputDirectory>
</artifactItem>
<artifactItem>
<groupId>org.apache.flink</groupId>
<artifactId>flink-parquet</artifactId>
Expand Down
Loading
Loading