diff --git a/docs/content.zh/docs/connectors/pipeline-connectors/elasticsearch.md b/docs/content.zh/docs/connectors/pipeline-connectors/elasticsearch.md
index 0b260c7613e..203e6d29e1a 100644
--- a/docs/content.zh/docs/connectors/pipeline-connectors/elasticsearch.md
+++ b/docs/content.zh/docs/connectors/pipeline-connectors/elasticsearch.md
@@ -160,7 +160,7 @@ Pipeline Connector Options
| record.size.max.bytes |
optional |
- 10485760 |
+ 5242880 |
Long |
单个记录的最大大小(以byte为单位)。 |
diff --git a/docs/content/docs/connectors/pipeline-connectors/elasticsearch.md b/docs/content/docs/connectors/pipeline-connectors/elasticsearch.md
index 3f3d693fee8..60b3168720a 100644
--- a/docs/content/docs/connectors/pipeline-connectors/elasticsearch.md
+++ b/docs/content/docs/connectors/pipeline-connectors/elasticsearch.md
@@ -160,7 +160,7 @@ Pipeline Connector Options
| record.size.max.bytes |
optional |
- 10485760 |
+ 5242880 |
Long |
The maximum size of a single record in bytes. |
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/pom.xml b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/pom.xml
index de00614494d..1cb38ef05c1 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/pom.xml
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/pom.xml
@@ -35,7 +35,7 @@ limitations under the License.
8.12.1
- 3.0.1-1.17
+ 3.1.0-1.20
4.5.13
2.0.2
@@ -195,4 +195,13 @@ limitations under the License.
+
+
+
+ flink2
+
+ 4.0.0-2.0
+
+
+
\ No newline at end of file
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ElasticsearchEventSerializer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ElasticsearchEventSerializer.java
index 800c0f359f7..afae409b51b 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ElasticsearchEventSerializer.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/serializer/ElasticsearchEventSerializer.java
@@ -17,7 +17,6 @@
package org.apache.flink.cdc.connectors.elasticsearch.serializer;
-import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.cdc.common.data.RecordData;
import org.apache.flink.cdc.common.event.AddColumnEvent;
@@ -251,9 +250,4 @@ private void checkIndex(int index, int size) {
throw new IndexOutOfBoundsException("Index: " + index + ", Size: " + size);
}
}
-
- @Override
- public void open(Sink.InitContext context) {
- ElementConverter.super.open(context);
- }
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkOptions.java
index 213b446c8d7..45183a187b1 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkOptions.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/sink/ElasticsearchDataSinkOptions.java
@@ -74,7 +74,7 @@ public class ElasticsearchDataSinkOptions {
public static final ConfigOption MAX_RECORD_SIZE_IN_BYTES =
ConfigOptions.key("record.size.max.bytes")
.longType()
- .defaultValue(10L * 1024L * 1024L)
+ .defaultValue(5L * 1024L * 1024L)
.withDescription("The maximum size of a single record in bytes.");
/** The version of Elasticsearch to connect to. */
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncSink.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncSink.java
index 4a237e3a36c..1250c94b445 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncSink.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncSink.java
@@ -20,14 +20,16 @@
package org.apache.flink.cdc.connectors.elasticsearch.v2;
import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.connector.base.sink.AsyncSinkBase;
+import org.apache.flink.api.connector.sink2.InitContextAdapter;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.StatefulSinkWriterAdapter;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
+import org.apache.flink.connector.base.sink.AsyncSinkBaseAdapter;
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.core.io.SimpleVersionedSerializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
+import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
@@ -38,8 +40,7 @@
* @param type of records that will be converted into {@link Operation}. See {@link
* Elasticsearch8AsyncSinkBuilder} on how to construct valid instances.
*/
-public class Elasticsearch8AsyncSink extends AsyncSinkBase {
- private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch8AsyncSink.class);
+public class Elasticsearch8AsyncSink extends AsyncSinkBaseAdapter {
@VisibleForTesting protected final NetworkConfig networkConfig;
@@ -78,14 +79,13 @@ protected Elasticsearch8AsyncSink(
}
/**
- * Creates a new {@link StatefulSinkWriter} for writing elements to Elasticsearch.
+ * Creates a new {@link SinkWriter} for writing elements to Elasticsearch.
*
* @param context the initialization context.
* @return a new instance of {@link Elasticsearch8AsyncWriter}.
*/
@Override
- public StatefulSinkWriter> createWriter(
- InitContext context) {
+ public SinkWriter createWriter(InitContext context) {
return new Elasticsearch8AsyncWriter<>(
getElementConverter(),
context,
@@ -99,19 +99,35 @@ public StatefulSinkWriter> createWriter(
Collections.emptyList());
}
+ @Override
+ public SinkWriter createWriter(WriterInitContext context) throws IOException {
+ return new Elasticsearch8AsyncWriter<>(
+ getElementConverter(),
+ new InitContextAdapter(context),
+ getMaxBatchSize(),
+ getMaxInFlightRequests(),
+ getMaxBufferedRequests(),
+ getMaxBatchSizeInBytes(),
+ getMaxTimeInBufferMS(),
+ getMaxRecordSizeInBytes(),
+ networkConfig,
+ Collections.emptyList());
+ }
+
/**
- * Restores a {@link StatefulSinkWriter} from a previously saved state.
+ * Restores a {@link StatefulSinkWriterAdapter} from a previously saved state.
*
* @param context the initialization context.
* @param recoveredState the recovered state.
* @return a restored instance of {@link Elasticsearch8AsyncWriter}.
*/
@Override
- public StatefulSinkWriter> restoreWriter(
- InitContext context, Collection> recoveredState) {
+ public StatefulSinkWriterAdapter> restoreWriterAdapter(
+ WriterInitContext context, Collection> recoveredState)
+ throws IOException {
return new Elasticsearch8AsyncWriter<>(
getElementConverter(),
- context,
+ new InitContextAdapter(context),
getMaxBatchSize(),
getMaxInFlightRequests(),
getMaxBufferedRequests(),
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncWriter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncWriter.java
index 86d08169796..b3246b1aa51 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncWriter.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncWriter.java
@@ -20,10 +20,12 @@
package org.apache.flink.cdc.connectors.elasticsearch.v2;
import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.StatefulSinkWriterAdapter;
import org.apache.flink.connector.base.sink.throwable.FatalExceptionClassifier;
-import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriterAdapter;
import org.apache.flink.connector.base.sink.writer.BufferedRequestState;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.connector.base.sink.writer.ResultHandler;
import org.apache.flink.connector.base.sink.writer.config.AsyncSinkWriterConfiguration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
@@ -41,9 +43,7 @@
import java.net.NoRouteToHostException;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.List;
-import java.util.function.Consumer;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -53,7 +53,8 @@
*
* @param type of Operations
*/
-public class Elasticsearch8AsyncWriter extends AsyncSinkWriter {
+public class Elasticsearch8AsyncWriter extends AsyncSinkWriterAdapter
+ implements StatefulSinkWriterAdapter> {
private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch8AsyncWriter.class);
private final ElasticsearchAsyncClient esClient;
@@ -118,7 +119,7 @@ public Elasticsearch8AsyncWriter(
@Override
protected void submitRequestEntries(
- List requestEntries, Consumer> requestResult) {
+ List requestEntries, ResultHandler resultHandler) {
numRequestSubmittedCounter.inc();
LOG.debug("submitRequestEntries with {} items", requestEntries.size());
@@ -136,7 +137,7 @@ protected void submitRequestEntries(
LOG.debug(
"Skipping empty BulkRequest, all {} operation(s) have null BulkOperationVariant",
requestEntries.size());
- requestResult.accept(Collections.emptyList());
+ resultHandler.complete();
return;
}
@@ -144,19 +145,19 @@ protected void submitRequestEntries(
.whenComplete(
(response, error) -> {
if (error != null) {
- handleFailedRequest(requestEntries, requestResult, error);
+ handleFailedRequest(requestEntries, resultHandler, error);
} else if (response.errors()) {
handlePartiallyFailedRequest(
- requestEntries, requestResult, response);
+ requestEntries, resultHandler, response);
} else {
- handleSuccessfulRequest(requestResult, response);
+ handleSuccessfulRequest(resultHandler, response);
}
});
}
private void handleFailedRequest(
List requestEntries,
- Consumer> requestResult,
+ ResultHandler resultHandler,
Throwable error) {
LOG.warn(
"The BulkRequest of {} operation(s) has failed due to: {}",
@@ -165,14 +166,20 @@ private void handleFailedRequest(
LOG.debug("The BulkRequest has failed", error);
numRecordsOutErrorsCounter.inc(requestEntries.size());
- if (isRetryable(error.getCause())) {
- requestResult.accept(requestEntries);
+ Throwable retryableError = error.getCause() != null ? error.getCause() : error;
+ if (isRetryable(retryableError)) {
+ resultHandler.retryForEntries(requestEntries);
+ } else {
+ resultHandler.completeExceptionally(
+ retryableError instanceof Exception
+ ? (Exception) retryableError
+ : new FlinkRuntimeException(retryableError));
}
}
private void handlePartiallyFailedRequest(
List requestEntries,
- Consumer> requestResult,
+ ResultHandler resultHandler,
BulkResponse response) {
LOG.debug("The BulkRequest has failed partially. Response: {}", response);
ArrayList failedItems = new ArrayList<>();
@@ -192,16 +199,16 @@ private void handlePartiallyFailedRequest(
requestEntries.size(),
failedItems.size(),
response.took());
- requestResult.accept(failedItems);
+ resultHandler.retryForEntries(failedItems);
}
private void handleSuccessfulRequest(
- Consumer> requestResult, BulkResponse response) {
+ ResultHandler resultHandler, BulkResponse response) {
LOG.debug(
"The BulkRequest of {} operation(s) completed successfully. It took {}ms",
response.items().size(),
response.took());
- requestResult.accept(Collections.emptyList());
+ resultHandler.complete();
}
private boolean isRetryable(Throwable error) {
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
index 24e6213ed9e..7345ea0cd0d 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml
@@ -198,6 +198,12 @@ limitations under the License.
+
+ org.apache.flink
+ flink-cdc-pipeline-connector-elasticsearch
+ ${project.version}
+ test
+
org.apache.flink
flink-connector-postgres-cdc
@@ -249,6 +255,12 @@ limitations under the License.
${testcontainers.version}
test
+
+ org.testcontainers
+ elasticsearch
+ ${testcontainers.version}
+ test
+
@@ -714,6 +726,15 @@ limitations under the License.
${project.build.directory}/dependencies
+
+ org.apache.flink
+ flink-cdc-pipeline-connector-elasticsearch
+ ${project.version}
+ elasticsearch-cdc-pipeline-connector.jar
+ jar
+ ${project.build.directory}/dependencies
+
+
org.apache.flink
flink-parquet
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlToElasticsearch6E2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlToElasticsearch6E2eITCase.java
new file mode 100644
index 00000000000..014f704bf0d
--- /dev/null
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlToElasticsearch6E2eITCase.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.pipeline.tests;
+
+import org.apache.flink.cdc.common.test.utils.TestUtils;
+import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
+import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment;
+import org.apache.flink.elasticsearch6.shaded.org.apache.http.HttpHost;
+import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
+import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.get.GetRequest;
+import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.get.GetResponse;
+import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RequestOptions;
+import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RestClient;
+import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RestHighLevelClient;
+import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.xcontent.XContentType;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.stream.Stream;
+
+class MysqlToElasticsearch6E2eITCase extends PipelineTestEnvironment {
+ private static final Logger LOG = LoggerFactory.getLogger(MysqlToElasticsearch6E2eITCase.class);
+
+ protected final UniqueDatabase inventoryDatabase =
+ new UniqueDatabase(MYSQL, "mysql_inventory", MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
+
+ private static final String ELASTICSEARCH_VERSION = "6.8.20";
+ private static final String INTER_CONTAINER_ES_ALIAS = "elasticsearch";
+
+ private RestHighLevelClient client;
+
+ @Container
+ private static final ElasticsearchContainer ELASTICSEARCH_CONTAINER =
+ createElasticsearchContainer();
+
+ @BeforeAll
+ static void initializeContainers() {
+ LOG.info("Starting containers...");
+ Startables.deepStart(Stream.of(MYSQL, ELASTICSEARCH_CONTAINER)).join();
+ LOG.info("Containers are started.");
+ }
+
+ @BeforeEach
+ public void before() throws Exception {
+ super.before();
+ inventoryDatabase.createAndInitialize();
+ client = createElasticsearchClient();
+ initEsData();
+ }
+
+ @AfterEach
+ public void after() {
+ super.after();
+ inventoryDatabase.dropDatabase();
+ if (client != null) {
+ try {
+ client.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ @Test
+ void testSyncWholeDatabase() throws Exception {
+ String databaseName = inventoryDatabase.getDatabaseName();
+ String pipelineJob =
+ String.format(
+ "source:\n"
+ + " type: mysql\n"
+ + " hostname: %s\n"
+ + " port: 3306\n"
+ + " username: %s\n"
+ + " password: %s\n"
+ + " tables: %s.\\.*\n"
+ + " server-id: 5400-5404\n"
+ + " server-time-zone: UTC\n"
+ + "\n"
+ + "sink:\n"
+ + " type: elasticsearch\n"
+ + " hosts: %s:9200\n"
+ + " version: 6\n"
+ + "\n"
+ + "pipeline:\n"
+ + " parallelism: %d",
+ INTER_CONTAINER_MYSQL_ALIAS,
+ MYSQL_TEST_USER,
+ MYSQL_TEST_PASSWORD,
+ databaseName,
+ INTER_CONTAINER_ES_ALIAS,
+ parallelism);
+ Path esConnectorJar = TestUtils.getResource("elasticsearch-cdc-pipeline-connector.jar");
+ submitPipelineJob(pipelineJob, esConnectorJar);
+ waitUntilJobRunning(Duration.ofSeconds(30));
+ LOG.info("Pipeline job is running");
+
+ verifySnapshotData(databaseName);
+ verifyIncrementalData(databaseName);
+ }
+
+ private void verifySnapshotData(String databaseName) throws Exception {
+ String productsIndex = databaseName + ".products";
+ String customersIndex = databaseName + ".customers";
+
+ // products id=101 (all fields populated)
+ waitForEsDocument(productsIndex, "101");
+ GetResponse resp =
+ client.get(new GetRequest(productsIndex).id("101"), RequestOptions.DEFAULT);
+ Assertions.assertThat(resp.getSource())
+ .containsEntry("name", "scooter")
+ .containsEntry("description", "Small 2-wheel scooter")
+ .containsEntry("weight", 3.14)
+ .containsEntry("enum_c", "red")
+ .containsEntry("json_c", "{\"key1\": \"value1\"}")
+ .containsEntry("point_c", "{\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}");
+
+ // products id=106 (enum_c, json_c, point_c are null)
+ waitForEsDocument(productsIndex, "106");
+ resp = client.get(new GetRequest(productsIndex).id("106"), RequestOptions.DEFAULT);
+ Assertions.assertThat(resp.getSource())
+ .containsEntry("name", "hammer")
+ .containsEntry("description", "16oz carpenter's hammer")
+ .containsEntry("weight", 1.0);
+
+ // products id=109 (last snapshot row)
+ waitForEsDocument(productsIndex, "109");
+ resp = client.get(new GetRequest(productsIndex).id("109"), RequestOptions.DEFAULT);
+ Assertions.assertThat(resp.getSource())
+ .containsEntry("name", "spare tire")
+ .containsEntry("description", "24 inch spare tire")
+ .containsEntry("weight", 22.2);
+
+ // customers id=101
+ waitForEsDocument(customersIndex, "101");
+ resp = client.get(new GetRequest(customersIndex).id("101"), RequestOptions.DEFAULT);
+ Assertions.assertThat(resp.getSource())
+ .containsEntry("name", "user_1")
+ .containsEntry("address", "Shanghai")
+ .containsEntry("phone_number", "123567891234");
+ }
+
+ private void verifyIncrementalData(String databaseName) throws Exception {
+ LOG.info("Begin incremental reading stage.");
+ String productsIndex = databaseName + ".products";
+ String mysqlJdbcUrl =
+ String.format(
+ "jdbc:mysql://%s:%s/%s",
+ MYSQL.getHost(), MYSQL.getDatabasePort(), databaseName);
+ try (Connection conn =
+ DriverManager.getConnection(
+ mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
+ Statement stat = conn.createStatement()) {
+
+ // INSERT
+ stat.execute(
+ "INSERT INTO products VALUES "
+ + "(default,'jacket','water resistent white wind breaker',0.2, null, null, null);");
+ waitForEsDocument(productsIndex, "110");
+ GetResponse resp =
+ client.get(new GetRequest(productsIndex).id("110"), RequestOptions.DEFAULT);
+ Assertions.assertThat(resp.getSource())
+ .containsEntry("name", "jacket")
+ .containsEntry("description", "water resistent white wind breaker")
+ .containsEntry("weight", 0.2);
+
+ // UPDATE
+ stat.execute("UPDATE products SET description='18oz carpenter hammer' WHERE id=106;");
+ stat.execute("UPDATE products SET weight='5.1' WHERE id=107;");
+ waitForEsDocumentField(productsIndex, "106", "description", "18oz carpenter hammer");
+ waitForEsDocumentField(productsIndex, "107", "weight", 5.1);
+
+ // DELETE
+ stat.execute("DELETE FROM products WHERE id=101;");
+ waitForEsDocumentDeleted(productsIndex, "101");
+ }
+ }
+
+ private static ElasticsearchContainer createElasticsearchContainer() {
+ DockerImageName imageName =
+ DockerImageName.parse(
+ "docker.elastic.co/elasticsearch/elasticsearch:"
+ + ELASTICSEARCH_VERSION)
+ .asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch");
+ ElasticsearchContainer esContainer = new ElasticsearchContainer(imageName);
+ esContainer
+ .withNetwork(NETWORK)
+ .withNetworkAliases(INTER_CONTAINER_ES_ALIAS)
+ .withEnv("xpack.security.enabled", "false")
+ .withEnv("ES_JAVA_OPTS", "-Xms2g -Xmx2g")
+ .withLogConsumer(new Slf4jLogConsumer(LOG));
+ return esContainer;
+ }
+
+ private RestHighLevelClient createElasticsearchClient() {
+ return new RestHighLevelClient(
+ RestClient.builder(
+ new HttpHost(
+ ELASTICSEARCH_CONTAINER.getHost(),
+ ELASTICSEARCH_CONTAINER.getFirstMappedPort(),
+ "http")));
+ }
+
+ private void initEsData() throws IOException {
+ String dbName = inventoryDatabase.getDatabaseName();
+ createProductsIndex(dbName);
+ createCustomersIndex(dbName);
+ }
+
+ private void createProductsIndex(String dbName) throws IOException {
+ String indexName = dbName + ".products";
+ String source =
+ "{\"mappings\":{\"_doc\":{\"properties\":{"
+ + "\"id\":{\"type\":\"integer\"},"
+ + "\"name\":{\"type\":\"keyword\"},"
+ + "\"description\":{\"type\":\"text\"},"
+ + "\"weight\":{\"type\":\"float\"},"
+ + "\"enum_c\":{\"type\":\"keyword\"},"
+ + "\"json_c\":{\"type\":\"keyword\"},"
+ + "\"point_c\":{\"type\":\"keyword\"}"
+ + "}}}}";
+ client.indices()
+ .create(
+ new CreateIndexRequest(indexName).source(source, XContentType.JSON),
+ RequestOptions.DEFAULT);
+ }
+
+ private void createCustomersIndex(String dbName) throws IOException {
+ String indexName = dbName + ".customers";
+ String source =
+ "{\"mappings\":{\"_doc\":{\"properties\":{"
+ + "\"id\":{\"type\":\"integer\"},"
+ + "\"name\":{\"type\":\"keyword\"},"
+ + "\"address\":{\"type\":\"text\"},"
+ + "\"phone_number\":{\"type\":\"keyword\"}"
+ + "}}}}";
+ client.indices()
+ .create(
+ new CreateIndexRequest(indexName).source(source, XContentType.JSON),
+ RequestOptions.DEFAULT);
+ }
+
+ private void waitForEsDocument(String indexName, String docId) throws Exception {
+ long deadline = System.currentTimeMillis() + EVENT_WAITING_TIMEOUT.toMillis();
+ while (System.currentTimeMillis() < deadline) {
+ GetRequest getRequest = new GetRequest(indexName).id(docId);
+ GetResponse response = client.get(getRequest, RequestOptions.DEFAULT);
+ if (response.isExists()) {
+ return;
+ }
+ Thread.sleep(1000L);
+ }
+ Assertions.fail("Timed out waiting for ES document: " + indexName + "/" + docId);
+ }
+
+ private void waitForEsDocumentField(
+ String indexName, String docId, String field, Object expectedValue) throws Exception {
+ long deadline = System.currentTimeMillis() + EVENT_WAITING_TIMEOUT.toMillis();
+ while (System.currentTimeMillis() < deadline) {
+ GetRequest getRequest = new GetRequest(indexName).id(docId);
+ GetResponse response = client.get(getRequest, RequestOptions.DEFAULT);
+ if (response.isExists() && expectedValue.equals(response.getSource().get(field))) {
+ return;
+ }
+ Thread.sleep(1000L);
+ }
+ Assertions.fail(
+ "Timed out waiting for ES document field: "
+ + indexName
+ + "/"
+ + docId
+ + " "
+ + field
+ + "="
+ + expectedValue);
+ }
+
+ private void waitForEsDocumentDeleted(String indexName, String docId) throws Exception {
+ long deadline = System.currentTimeMillis() + EVENT_WAITING_TIMEOUT.toMillis();
+ while (System.currentTimeMillis() < deadline) {
+ GetRequest getRequest = new GetRequest(indexName).id(docId);
+ GetResponse response = client.get(getRequest, RequestOptions.DEFAULT);
+ if (!response.isExists()) {
+ return;
+ }
+ Thread.sleep(1000L);
+ }
+ Assertions.fail("Document was not deleted within timeout: " + indexName + "/" + docId);
+ }
+}
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlToElasticsearch7E2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlToElasticsearch7E2eITCase.java
new file mode 100644
index 00000000000..155629675ff
--- /dev/null
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlToElasticsearch7E2eITCase.java
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.pipeline.tests;
+
+import org.apache.flink.cdc.common.test.utils.TestUtils;
+import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
+import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment;
+import org.apache.flink.elasticsearch7.shaded.org.apache.http.HttpHost;
+import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
+import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.get.GetRequest;
+import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.action.get.GetResponse;
+import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RequestOptions;
+import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestClient;
+import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.client.RestHighLevelClient;
+import org.apache.flink.elasticsearch7.shaded.org.elasticsearch.common.xcontent.XContentType;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.stream.Stream;
+
+class MysqlToElasticsearch7E2eITCase extends PipelineTestEnvironment {
+ private static final Logger LOG = LoggerFactory.getLogger(MysqlToElasticsearch7E2eITCase.class);
+
+ protected final UniqueDatabase inventoryDatabase =
+ new UniqueDatabase(MYSQL, "mysql_inventory", MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
+
+ private static final String ELASTICSEARCH_VERSION = "7.10.2";
+ private static final String INTER_CONTAINER_ES_ALIAS = "elasticsearch7";
+
+ private RestHighLevelClient client;
+
+ @Container
+ private static final ElasticsearchContainer ELASTICSEARCH_CONTAINER =
+ createElasticsearchContainer();
+
+ @BeforeAll
+ static void initializeContainers() {
+ LOG.info("Starting containers...");
+ Startables.deepStart(Stream.of(MYSQL, ELASTICSEARCH_CONTAINER)).join();
+ LOG.info("Containers are started.");
+ }
+
+ @BeforeEach
+ public void before() throws Exception {
+ super.before();
+ inventoryDatabase.createAndInitialize();
+ client = createElasticsearchClient();
+ initEsData();
+ }
+
+ @AfterEach
+ public void after() {
+ super.after();
+ inventoryDatabase.dropDatabase();
+ if (client != null) {
+ try {
+ client.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ @Test
+ void testSyncWholeDatabase() throws Exception {
+ String databaseName = inventoryDatabase.getDatabaseName();
+ String pipelineJob =
+ String.format(
+ "source:\n"
+ + " type: mysql\n"
+ + " hostname: %s\n"
+ + " port: 3306\n"
+ + " username: %s\n"
+ + " password: %s\n"
+ + " tables: %s.\\.*\n"
+ + " server-id: 5400-5404\n"
+ + " server-time-zone: UTC\n"
+ + "\n"
+ + "sink:\n"
+ + " type: elasticsearch\n"
+ + " hosts: %s:9200\n"
+ + " version: 7\n"
+ + "\n"
+ + "pipeline:\n"
+ + " parallelism: %d",
+ INTER_CONTAINER_MYSQL_ALIAS,
+ MYSQL_TEST_USER,
+ MYSQL_TEST_PASSWORD,
+ databaseName,
+ INTER_CONTAINER_ES_ALIAS,
+ parallelism);
+ Path esConnectorJar = TestUtils.getResource("elasticsearch-cdc-pipeline-connector.jar");
+ submitPipelineJob(pipelineJob, esConnectorJar);
+ waitUntilJobRunning(Duration.ofSeconds(30));
+ LOG.info("Pipeline job is running");
+
+ verifySnapshotData(databaseName);
+ verifyIncrementalData(databaseName);
+ }
+
+ private void verifySnapshotData(String databaseName) throws Exception {
+ String productsIndex = databaseName + ".products";
+ String customersIndex = databaseName + ".customers";
+
+ // products id=101 (all fields populated)
+ waitForEsDocument(productsIndex, "101");
+ GetResponse resp =
+ client.get(new GetRequest(productsIndex).id("101"), RequestOptions.DEFAULT);
+ Assertions.assertThat(resp.getSource())
+ .containsEntry("name", "scooter")
+ .containsEntry("description", "Small 2-wheel scooter")
+ .containsEntry("weight", 3.14)
+ .containsEntry("enum_c", "red")
+ .containsEntry("json_c", "{\"key1\": \"value1\"}")
+ .containsEntry("point_c", "{\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}");
+
+ // products id=106 (enum_c, json_c, point_c are null)
+ waitForEsDocument(productsIndex, "106");
+ resp = client.get(new GetRequest(productsIndex).id("106"), RequestOptions.DEFAULT);
+ Assertions.assertThat(resp.getSource())
+ .containsEntry("name", "hammer")
+ .containsEntry("description", "16oz carpenter's hammer")
+ .containsEntry("weight", 1.0);
+
+ // products id=109 (last snapshot row)
+ waitForEsDocument(productsIndex, "109");
+ resp = client.get(new GetRequest(productsIndex).id("109"), RequestOptions.DEFAULT);
+ Assertions.assertThat(resp.getSource())
+ .containsEntry("name", "spare tire")
+ .containsEntry("description", "24 inch spare tire")
+ .containsEntry("weight", 22.2);
+
+ // customers id=101
+ waitForEsDocument(customersIndex, "101");
+ resp = client.get(new GetRequest(customersIndex).id("101"), RequestOptions.DEFAULT);
+ Assertions.assertThat(resp.getSource())
+ .containsEntry("name", "user_1")
+ .containsEntry("address", "Shanghai")
+ .containsEntry("phone_number", "123567891234");
+ }
+
+ private void verifyIncrementalData(String databaseName) throws Exception {
+ LOG.info("Begin incremental reading stage.");
+ String productsIndex = databaseName + ".products";
+ String mysqlJdbcUrl =
+ String.format(
+ "jdbc:mysql://%s:%s/%s",
+ MYSQL.getHost(), MYSQL.getDatabasePort(), databaseName);
+ try (Connection conn =
+ DriverManager.getConnection(
+ mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
+ Statement stat = conn.createStatement()) {
+
+ // INSERT
+ stat.execute(
+ "INSERT INTO products VALUES "
+ + "(default,'jacket','water resistent white wind breaker',0.2, null, null, null);");
+ waitForEsDocument(productsIndex, "110");
+ GetResponse resp =
+ client.get(new GetRequest(productsIndex).id("110"), RequestOptions.DEFAULT);
+ Assertions.assertThat(resp.getSource())
+ .containsEntry("name", "jacket")
+ .containsEntry("description", "water resistent white wind breaker")
+ .containsEntry("weight", 0.2);
+
+ // UPDATE
+ stat.execute("UPDATE products SET description='18oz carpenter hammer' WHERE id=106;");
+ stat.execute("UPDATE products SET weight='5.1' WHERE id=107;");
+ waitForEsDocumentField(productsIndex, "106", "description", "18oz carpenter hammer");
+ waitForEsDocumentField(productsIndex, "107", "weight", 5.1);
+
+ // DELETE
+ stat.execute("DELETE FROM products WHERE id=101;");
+ waitForEsDocumentDeleted(productsIndex, "101");
+ }
+ }
+
+ private static ElasticsearchContainer createElasticsearchContainer() {
+ DockerImageName imageName =
+ DockerImageName.parse(
+ "docker.elastic.co/elasticsearch/elasticsearch:"
+ + ELASTICSEARCH_VERSION)
+ .asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch");
+ ElasticsearchContainer esContainer = new ElasticsearchContainer(imageName);
+ esContainer
+ .withNetwork(NETWORK)
+ .withNetworkAliases(INTER_CONTAINER_ES_ALIAS)
+ .withEnv("xpack.security.enabled", "false")
+ .withEnv("ES_JAVA_OPTS", "-Xms2g -Xmx2g")
+ .withLogConsumer(new Slf4jLogConsumer(LOG));
+ return esContainer;
+ }
+
+ private RestHighLevelClient createElasticsearchClient() {
+ return new RestHighLevelClient(
+ RestClient.builder(
+ new HttpHost(
+ ELASTICSEARCH_CONTAINER.getHost(),
+ ELASTICSEARCH_CONTAINER.getFirstMappedPort(),
+ "http")));
+ }
+
+ private void initEsData() throws IOException {
+ String dbName = inventoryDatabase.getDatabaseName();
+ createProductsIndex(dbName);
+ createCustomersIndex(dbName);
+ }
+
+ private void createProductsIndex(String dbName) throws IOException {
+ String indexName = dbName + ".products";
+ String source =
+ "{\"mappings\":{\"_doc\":{\"properties\":{"
+ + "\"id\":{\"type\":\"integer\"},"
+ + "\"name\":{\"type\":\"keyword\"},"
+ + "\"description\":{\"type\":\"text\"},"
+ + "\"weight\":{\"type\":\"float\"},"
+ + "\"enum_c\":{\"type\":\"keyword\"},"
+ + "\"json_c\":{\"type\":\"keyword\"},"
+ + "\"point_c\":{\"type\":\"keyword\"}"
+ + "}}}}";
+ client.indices()
+ .create(
+ new CreateIndexRequest(indexName).source(source, XContentType.JSON),
+ RequestOptions.DEFAULT);
+ }
+
+ private void createCustomersIndex(String dbName) throws IOException {
+ String indexName = dbName + ".customers";
+ String source =
+ "{\"mappings\":{\"_doc\":{\"properties\":{"
+ + "\"id\":{\"type\":\"integer\"},"
+ + "\"name\":{\"type\":\"keyword\"},"
+ + "\"address\":{\"type\":\"text\"},"
+ + "\"phone_number\":{\"type\":\"keyword\"}"
+ + "}}}}";
+ client.indices()
+ .create(
+ new CreateIndexRequest(indexName).source(source, XContentType.JSON),
+ RequestOptions.DEFAULT);
+ }
+
+ private void waitForEsDocument(String indexName, String docId) throws Exception {
+ long deadline = System.currentTimeMillis() + EVENT_WAITING_TIMEOUT.toMillis();
+ while (System.currentTimeMillis() < deadline) {
+ GetRequest getRequest = new GetRequest(indexName).id(docId);
+ GetResponse response = client.get(getRequest, RequestOptions.DEFAULT);
+ if (response.isExists()) {
+ return;
+ }
+ Thread.sleep(1000L);
+ }
+ Assertions.fail("Timed out waiting for ES document: " + indexName + "/" + docId);
+ }
+
+ private void waitForEsDocumentField(
+ String indexName, String docId, String field, Object expectedValue) throws Exception {
+ long deadline = System.currentTimeMillis() + EVENT_WAITING_TIMEOUT.toMillis();
+ while (System.currentTimeMillis() < deadline) {
+ GetRequest getRequest = new GetRequest(indexName).id(docId);
+ GetResponse response = client.get(getRequest, RequestOptions.DEFAULT);
+ if (response.isExists() && expectedValue.equals(response.getSource().get(field))) {
+ return;
+ }
+ Thread.sleep(1000L);
+ }
+ Assertions.fail(
+ "Timed out waiting for ES document field: "
+ + indexName
+ + "/"
+ + docId
+ + " "
+ + field
+ + "="
+ + expectedValue);
+ }
+
+ private void waitForEsDocumentDeleted(String indexName, String docId) throws Exception {
+ long deadline = System.currentTimeMillis() + EVENT_WAITING_TIMEOUT.toMillis();
+ while (System.currentTimeMillis() < deadline) {
+ GetRequest getRequest = new GetRequest(indexName).id(docId);
+ GetResponse response = client.get(getRequest, RequestOptions.DEFAULT);
+ if (!response.isExists()) {
+ return;
+ }
+ Thread.sleep(1000L);
+ }
+ Assertions.fail("Document was not deleted within timeout: " + indexName + "/" + docId);
+ }
+}
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlToElasticsearch8E2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlToElasticsearch8E2eITCase.java
new file mode 100644
index 00000000000..c737036af75
--- /dev/null
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MysqlToElasticsearch8E2eITCase.java
@@ -0,0 +1,372 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.pipeline.tests;
+
+import org.apache.flink.cdc.common.test.utils.TestUtils;
+import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
+import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment;
+
+import co.elastic.clients.elasticsearch.ElasticsearchClient;
+import co.elastic.clients.elasticsearch.core.GetRequest;
+import co.elastic.clients.elasticsearch.core.GetResponse;
+import co.elastic.clients.json.jackson.JacksonJsonpMapper;
+import co.elastic.clients.transport.rest_client.RestClientTransport;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.assertj.core.api.Assertions;
+import org.elasticsearch.client.RestClient;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.Map;
+import java.util.stream.Stream;
+
+class MysqlToElasticsearch8E2eITCase extends PipelineTestEnvironment {
+ private static final Logger LOG = LoggerFactory.getLogger(MysqlToElasticsearch8E2eITCase.class);
+
+ protected final UniqueDatabase inventoryDatabase =
+ new UniqueDatabase(MYSQL, "mysql_inventory", MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
+
+ private static final String ELASTICSEARCH_VERSION = "8.12.1";
+ private static final String INTER_CONTAINER_ES_ALIAS = "elasticsearch8";
+ private static final String DEFAULT_USERNAME = "elastic";
+ private static final String DEFAULT_PASSWORD = "123456";
+
+ private ElasticsearchClient client;
+
+ @Container
+ private static final ElasticsearchContainer ELASTICSEARCH_CONTAINER =
+ createElasticsearchContainer();
+
+ @BeforeAll
+ static void initializeContainers() {
+ LOG.info("Starting containers...");
+ Startables.deepStart(Stream.of(MYSQL, ELASTICSEARCH_CONTAINER)).join();
+ LOG.info("Containers are started.");
+ }
+
+ @BeforeEach
+ public void before() throws Exception {
+ super.before();
+ inventoryDatabase.createAndInitialize();
+ client = createElasticsearchClient();
+ initEsData();
+ }
+
+ @AfterEach
+ public void after() {
+ super.after();
+ inventoryDatabase.dropDatabase();
+ if (client != null) {
+ try {
+ client.shutdown();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ @Test
+ void testSyncWholeDatabase() throws Exception {
+ String databaseName = inventoryDatabase.getDatabaseName();
+ String pipelineJob =
+ String.format(
+ "source:\n"
+ + " type: mysql\n"
+ + " hostname: %s\n"
+ + " port: 3306\n"
+ + " username: %s\n"
+ + " password: %s\n"
+ + " tables: %s.\\.*\n"
+ + " server-id: 5400-5404\n"
+ + " server-time-zone: UTC\n"
+ + "\n"
+ + "sink:\n"
+ + " type: elasticsearch\n"
+ + " hosts: %s:9200\n"
+ + " version: 8\n"
+ + " username: %s\n"
+ + " password: %s\n"
+ + "\n"
+ + "pipeline:\n"
+ + " parallelism: %d",
+ INTER_CONTAINER_MYSQL_ALIAS,
+ MYSQL_TEST_USER,
+ MYSQL_TEST_PASSWORD,
+ databaseName,
+ INTER_CONTAINER_ES_ALIAS,
+ DEFAULT_USERNAME,
+ DEFAULT_PASSWORD,
+ parallelism);
+ Path esConnectorJar = TestUtils.getResource("elasticsearch-cdc-pipeline-connector.jar");
+ submitPipelineJob(pipelineJob, esConnectorJar);
+ waitUntilJobRunning(Duration.ofSeconds(30));
+ LOG.info("Pipeline job is running");
+
+ verifySnapshotData(databaseName);
+ verifyIncrementalData(databaseName);
+ }
+
+ private void verifySnapshotData(String databaseName) throws Exception {
+ String productsIndex = databaseName + ".products";
+ String customersIndex = databaseName + ".customers";
+
+ // products id=101 (all fields populated)
+ waitForEsDocument(productsIndex, "101");
+ GetResponse