Skip to content

Commit 3cd451f

Browse files
committed
add the e2e test
Signed-off-by: Pei Yu <125331682@qq.com>
1 parent 2ae5332 commit 3cd451f

5 files changed

Lines changed: 1050 additions & 0 deletions

File tree

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-elasticsearch/src/main/java/org/apache/flink/cdc/connectors/elasticsearch/v2/Elasticsearch8AsyncSink.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,21 @@ public SinkWriter<InputT> createWriter(InitContext context) {
9999
Collections.emptyList());
100100
}
101101

102+
@Override
103+
public SinkWriter<InputT> createWriter(WriterInitContext context) throws IOException {
104+
return new Elasticsearch8AsyncWriter<>(
105+
getElementConverter(),
106+
new InitContextAdapter(context),
107+
getMaxBatchSize(),
108+
getMaxInFlightRequests(),
109+
getMaxBufferedRequests(),
110+
getMaxBatchSizeInBytes(),
111+
getMaxTimeInBufferMS(),
112+
getMaxRecordSizeInBytes(),
113+
networkConfig,
114+
Collections.emptyList());
115+
}
116+
102117
/**
103118
* Restores a {@link StatefulSinkWriterAdapter} from a previously saved state.
104119
*

flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/pom.xml

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,12 @@ limitations under the License.
198198
</exclusion>
199199
</exclusions>
200200
</dependency>
201+
<dependency>
202+
<groupId>org.apache.flink</groupId>
203+
<artifactId>flink-cdc-pipeline-connector-elasticsearch</artifactId>
204+
<version>${project.version}</version>
205+
<scope>test</scope>
206+
</dependency>
201207
<dependency>
202208
<groupId>org.apache.flink</groupId>
203209
<artifactId>flink-connector-postgres-cdc</artifactId>
@@ -249,6 +255,12 @@ limitations under the License.
249255
<version>${testcontainers.version}</version>
250256
<scope>test</scope>
251257
</dependency>
258+
<dependency>
259+
<groupId>org.testcontainers</groupId>
260+
<artifactId>elasticsearch</artifactId>
261+
<version>${testcontainers.version}</version>
262+
<scope>test</scope>
263+
</dependency>
252264

253265
<!-- benchmark -->
254266
<dependency>
@@ -714,6 +726,15 @@ limitations under the License.
714726
<outputDirectory>${project.build.directory}/dependencies
715727
</outputDirectory>
716728
</artifactItem>
729+
<artifactItem>
730+
<groupId>org.apache.flink</groupId>
731+
<artifactId>flink-cdc-pipeline-connector-elasticsearch</artifactId>
732+
<version>${project.version}</version>
733+
<destFileName>elasticsearch-cdc-pipeline-connector.jar</destFileName>
734+
<type>jar</type>
735+
<outputDirectory>${project.build.directory}/dependencies
736+
</outputDirectory>
737+
</artifactItem>
717738
<artifactItem>
718739
<groupId>org.apache.flink</groupId>
719740
<artifactId>flink-parquet</artifactId>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,321 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.pipeline.tests;
19+
20+
import org.apache.flink.cdc.common.test.utils.TestUtils;
21+
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
22+
import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment;
23+
import org.apache.flink.elasticsearch6.shaded.org.apache.http.HttpHost;
24+
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
25+
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.get.GetRequest;
26+
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.action.get.GetResponse;
27+
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RequestOptions;
28+
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RestClient;
29+
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.client.RestHighLevelClient;
30+
import org.apache.flink.elasticsearch6.shaded.org.elasticsearch.common.xcontent.XContentType;
31+
32+
import org.assertj.core.api.Assertions;
33+
import org.junit.jupiter.api.AfterEach;
34+
import org.junit.jupiter.api.BeforeAll;
35+
import org.junit.jupiter.api.BeforeEach;
36+
import org.junit.jupiter.api.Test;
37+
import org.slf4j.Logger;
38+
import org.slf4j.LoggerFactory;
39+
import org.testcontainers.containers.output.Slf4jLogConsumer;
40+
import org.testcontainers.elasticsearch.ElasticsearchContainer;
41+
import org.testcontainers.junit.jupiter.Container;
42+
import org.testcontainers.lifecycle.Startables;
43+
import org.testcontainers.utility.DockerImageName;
44+
45+
import java.io.IOException;
46+
import java.nio.file.Path;
47+
import java.sql.Connection;
48+
import java.sql.DriverManager;
49+
import java.sql.Statement;
50+
import java.time.Duration;
51+
import java.util.stream.Stream;
52+
53+
class MysqlToElasticsearch6E2eITCase extends PipelineTestEnvironment {
54+
private static final Logger LOG = LoggerFactory.getLogger(MysqlToElasticsearch6E2eITCase.class);
55+
56+
protected final UniqueDatabase inventoryDatabase =
57+
new UniqueDatabase(MYSQL, "mysql_inventory", MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
58+
59+
private static final String ELASTICSEARCH_VERSION = "6.8.20";
60+
private static final String INTER_CONTAINER_ES_ALIAS = "elasticsearch";
61+
62+
private RestHighLevelClient client;
63+
64+
@Container
65+
private static final ElasticsearchContainer ELASTICSEARCH_CONTAINER =
66+
createElasticsearchContainer();
67+
68+
@BeforeAll
69+
static void initializeContainers() {
70+
LOG.info("Starting containers...");
71+
Startables.deepStart(Stream.of(MYSQL, ELASTICSEARCH_CONTAINER)).join();
72+
LOG.info("Containers are started.");
73+
}
74+
75+
@BeforeEach
76+
public void before() throws Exception {
77+
super.before();
78+
inventoryDatabase.createAndInitialize();
79+
client = createElasticsearchClient();
80+
initEsData();
81+
}
82+
83+
@AfterEach
84+
public void after() {
85+
super.after();
86+
inventoryDatabase.dropDatabase();
87+
if (client != null) {
88+
try {
89+
client.close();
90+
} catch (IOException e) {
91+
throw new RuntimeException(e);
92+
}
93+
}
94+
}
95+
96+
@Test
97+
void testSyncWholeDatabase() throws Exception {
98+
String databaseName = inventoryDatabase.getDatabaseName();
99+
String pipelineJob =
100+
String.format(
101+
"source:\n"
102+
+ " type: mysql\n"
103+
+ " hostname: %s\n"
104+
+ " port: 3306\n"
105+
+ " username: %s\n"
106+
+ " password: %s\n"
107+
+ " tables: %s.\\.*\n"
108+
+ " server-id: 5400-5404\n"
109+
+ " server-time-zone: UTC\n"
110+
+ "\n"
111+
+ "sink:\n"
112+
+ " type: elasticsearch\n"
113+
+ " hosts: %s:9200\n"
114+
+ " version: 6\n"
115+
+ "\n"
116+
+ "pipeline:\n"
117+
+ " parallelism: %d",
118+
INTER_CONTAINER_MYSQL_ALIAS,
119+
MYSQL_TEST_USER,
120+
MYSQL_TEST_PASSWORD,
121+
databaseName,
122+
INTER_CONTAINER_ES_ALIAS,
123+
parallelism);
124+
Path esConnectorJar = TestUtils.getResource("elasticsearch-cdc-pipeline-connector.jar");
125+
submitPipelineJob(pipelineJob, esConnectorJar);
126+
waitUntilJobRunning(Duration.ofSeconds(30));
127+
LOG.info("Pipeline job is running");
128+
129+
verifySnapshotData(databaseName);
130+
verifyIncrementalData(databaseName);
131+
}
132+
133+
private void verifySnapshotData(String databaseName) throws Exception {
134+
String productsIndex = databaseName + ".products";
135+
String customersIndex = databaseName + ".customers";
136+
137+
// products id=101 (all fields populated)
138+
waitForEsDocument(productsIndex, "101");
139+
GetResponse resp =
140+
client.get(new GetRequest(productsIndex).id("101"), RequestOptions.DEFAULT);
141+
Assertions.assertThat(resp.getSource())
142+
.containsEntry("name", "scooter")
143+
.containsEntry("description", "Small 2-wheel scooter")
144+
.containsEntry("weight", 3.14)
145+
.containsEntry("enum_c", "red")
146+
.containsEntry("json_c", "{\"key1\": \"value1\"}")
147+
.containsEntry("point_c", "{\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}");
148+
149+
// products id=106 (enum_c, json_c, point_c are null)
150+
waitForEsDocument(productsIndex, "106");
151+
resp = client.get(new GetRequest(productsIndex).id("106"), RequestOptions.DEFAULT);
152+
Assertions.assertThat(resp.getSource())
153+
.containsEntry("name", "hammer")
154+
.containsEntry("description", "16oz carpenter's hammer")
155+
.containsEntry("weight", 1.0);
156+
157+
// products id=109 (last snapshot row)
158+
waitForEsDocument(productsIndex, "109");
159+
resp = client.get(new GetRequest(productsIndex).id("109"), RequestOptions.DEFAULT);
160+
Assertions.assertThat(resp.getSource())
161+
.containsEntry("name", "spare tire")
162+
.containsEntry("description", "24 inch spare tire")
163+
.containsEntry("weight", 22.2);
164+
165+
// customers id=101
166+
waitForEsDocument(customersIndex, "101");
167+
resp = client.get(new GetRequest(customersIndex).id("101"), RequestOptions.DEFAULT);
168+
Assertions.assertThat(resp.getSource())
169+
.containsEntry("name", "user_1")
170+
.containsEntry("address", "Shanghai")
171+
.containsEntry("phone_number", "123567891234");
172+
}
173+
174+
private void verifyIncrementalData(String databaseName) throws Exception {
175+
LOG.info("Begin incremental reading stage.");
176+
String productsIndex = databaseName + ".products";
177+
String mysqlJdbcUrl =
178+
String.format(
179+
"jdbc:mysql://%s:%s/%s",
180+
MYSQL.getHost(), MYSQL.getDatabasePort(), databaseName);
181+
try (Connection conn =
182+
DriverManager.getConnection(
183+
mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
184+
Statement stat = conn.createStatement()) {
185+
186+
// INSERT
187+
stat.execute(
188+
"INSERT INTO products VALUES "
189+
+ "(default,'jacket','water resistent white wind breaker',0.2, null, null, null);");
190+
waitForEsDocument(productsIndex, "110");
191+
GetResponse resp =
192+
client.get(new GetRequest(productsIndex).id("110"), RequestOptions.DEFAULT);
193+
Assertions.assertThat(resp.getSource())
194+
.containsEntry("name", "jacket")
195+
.containsEntry("description", "water resistent white wind breaker")
196+
.containsEntry("weight", 0.2);
197+
198+
// UPDATE
199+
stat.execute("UPDATE products SET description='18oz carpenter hammer' WHERE id=106;");
200+
stat.execute("UPDATE products SET weight='5.1' WHERE id=107;");
201+
waitForEsDocumentField(productsIndex, "106", "description", "18oz carpenter hammer");
202+
waitForEsDocumentField(productsIndex, "107", "weight", 5.1);
203+
204+
// DELETE
205+
stat.execute("DELETE FROM products WHERE id=101;");
206+
waitForEsDocumentDeleted(productsIndex, "101");
207+
}
208+
}
209+
210+
private static ElasticsearchContainer createElasticsearchContainer() {
211+
DockerImageName imageName =
212+
DockerImageName.parse(
213+
"docker.elastic.co/elasticsearch/elasticsearch:"
214+
+ ELASTICSEARCH_VERSION)
215+
.asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch");
216+
ElasticsearchContainer esContainer = new ElasticsearchContainer(imageName);
217+
esContainer
218+
.withNetwork(NETWORK)
219+
.withNetworkAliases(INTER_CONTAINER_ES_ALIAS)
220+
.withEnv("xpack.security.enabled", "false")
221+
.withEnv("ES_JAVA_OPTS", "-Xms2g -Xmx2g")
222+
.withLogConsumer(new Slf4jLogConsumer(LOG));
223+
return esContainer;
224+
}
225+
226+
private RestHighLevelClient createElasticsearchClient() {
227+
return new RestHighLevelClient(
228+
RestClient.builder(
229+
new HttpHost(
230+
ELASTICSEARCH_CONTAINER.getHost(),
231+
ELASTICSEARCH_CONTAINER.getFirstMappedPort(),
232+
"http")));
233+
}
234+
235+
private void initEsData() throws IOException {
236+
String dbName = inventoryDatabase.getDatabaseName();
237+
createProductsIndex(dbName);
238+
createCustomersIndex(dbName);
239+
}
240+
241+
private void createProductsIndex(String dbName) throws IOException {
242+
String indexName = dbName + ".products";
243+
String source =
244+
"{\"mappings\":{\"_doc\":{\"properties\":{"
245+
+ "\"id\":{\"type\":\"integer\"},"
246+
+ "\"name\":{\"type\":\"keyword\"},"
247+
+ "\"description\":{\"type\":\"text\"},"
248+
+ "\"weight\":{\"type\":\"float\"},"
249+
+ "\"enum_c\":{\"type\":\"keyword\"},"
250+
+ "\"json_c\":{\"type\":\"keyword\"},"
251+
+ "\"point_c\":{\"type\":\"keyword\"}"
252+
+ "}}}}";
253+
client.indices()
254+
.create(
255+
new CreateIndexRequest(indexName).source(source, XContentType.JSON),
256+
RequestOptions.DEFAULT);
257+
}
258+
259+
private void createCustomersIndex(String dbName) throws IOException {
260+
String indexName = dbName + ".customers";
261+
String source =
262+
"{\"mappings\":{\"_doc\":{\"properties\":{"
263+
+ "\"id\":{\"type\":\"integer\"},"
264+
+ "\"name\":{\"type\":\"keyword\"},"
265+
+ "\"address\":{\"type\":\"text\"},"
266+
+ "\"phone_number\":{\"type\":\"keyword\"}"
267+
+ "}}}}";
268+
client.indices()
269+
.create(
270+
new CreateIndexRequest(indexName).source(source, XContentType.JSON),
271+
RequestOptions.DEFAULT);
272+
}
273+
274+
private void waitForEsDocument(String indexName, String docId) throws Exception {
275+
long deadline = System.currentTimeMillis() + EVENT_WAITING_TIMEOUT.toMillis();
276+
while (System.currentTimeMillis() < deadline) {
277+
GetRequest getRequest = new GetRequest(indexName).id(docId);
278+
GetResponse response = client.get(getRequest, RequestOptions.DEFAULT);
279+
if (response.isExists()) {
280+
return;
281+
}
282+
Thread.sleep(1000L);
283+
}
284+
Assertions.fail("Timed out waiting for ES document: " + indexName + "/" + docId);
285+
}
286+
287+
private void waitForEsDocumentField(
288+
String indexName, String docId, String field, Object expectedValue) throws Exception {
289+
long deadline = System.currentTimeMillis() + EVENT_WAITING_TIMEOUT.toMillis();
290+
while (System.currentTimeMillis() < deadline) {
291+
GetRequest getRequest = new GetRequest(indexName).id(docId);
292+
GetResponse response = client.get(getRequest, RequestOptions.DEFAULT);
293+
if (response.isExists() && expectedValue.equals(response.getSource().get(field))) {
294+
return;
295+
}
296+
Thread.sleep(1000L);
297+
}
298+
Assertions.fail(
299+
"Timed out waiting for ES document field: "
300+
+ indexName
301+
+ "/"
302+
+ docId
303+
+ " "
304+
+ field
305+
+ "="
306+
+ expectedValue);
307+
}
308+
309+
private void waitForEsDocumentDeleted(String indexName, String docId) throws Exception {
310+
long deadline = System.currentTimeMillis() + EVENT_WAITING_TIMEOUT.toMillis();
311+
while (System.currentTimeMillis() < deadline) {
312+
GetRequest getRequest = new GetRequest(indexName).id(docId);
313+
GetResponse response = client.get(getRequest, RequestOptions.DEFAULT);
314+
if (!response.isExists()) {
315+
return;
316+
}
317+
Thread.sleep(1000L);
318+
}
319+
Assertions.fail("Document was not deleted within timeout: " + indexName + "/" + docId);
320+
}
321+
}

0 commit comments

Comments
 (0)