Skip to content

Commit e92c029

Browse files
committed
Enhancement: Add e2e integration and integration tests for oceanbase-cdc connector
1 parent 0842fee commit e92c029

12 files changed

Lines changed: 1759 additions & 22 deletions

File tree

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,13 @@ limitations under the License.
169169

170170
<!-- test dependencies on TestContainers -->
171171

172+
<dependency>
173+
<groupId>org.testcontainers</groupId>
174+
<artifactId>junit-jupiter</artifactId>
175+
<version>${testcontainers.version}</version>
176+
<scope>test</scope>
177+
</dependency>
178+
172179
<dependency>
173180
<groupId>org.testcontainers</groupId>
174181
<artifactId>mysql</artifactId>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,224 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.connectors.oceanbase;
19+
20+
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
21+
import org.apache.flink.table.api.EnvironmentSettings;
22+
import org.apache.flink.table.api.TableResult;
23+
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
24+
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
25+
import org.apache.flink.types.Row;
26+
import org.apache.flink.util.CloseableIterator;
27+
import org.apache.flink.util.StringUtils;
28+
29+
import org.junit.jupiter.api.AfterAll;
30+
import org.junit.jupiter.api.BeforeAll;
31+
import org.junit.jupiter.api.BeforeEach;
32+
import org.junit.jupiter.params.ParameterizedTest;
33+
import org.junit.jupiter.params.provider.Arguments;
34+
import org.junit.jupiter.params.provider.MethodSource;
35+
36+
import java.sql.Connection;
37+
import java.sql.Statement;
38+
import java.util.Arrays;
39+
import java.util.stream.Stream;
40+
41+
/** Test supporting different column charsets for OceanBase. */
42+
public class OceanBaseCharsetITCase extends OceanBaseSourceTestBase {
43+
44+
private static final String DDL_FILE = "charset_test";
45+
private static final String DATABASE_NAME = "cdc_c_" + getRandomSuffix();
46+
47+
private final StreamExecutionEnvironment env =
48+
StreamExecutionEnvironment.getExecutionEnvironment();
49+
50+
private final StreamTableEnvironment tEnv =
51+
StreamTableEnvironment.create(
52+
env, EnvironmentSettings.newInstance().inStreamingMode().build());
53+
54+
@BeforeAll
55+
public static void beforeClass() throws InterruptedException {
56+
initializeOceanBaseTables(
57+
DDL_FILE,
58+
DATABASE_NAME,
59+
s -> // see:
60+
// https://www.oceanbase.com/docs/common-oceanbase-database-cn-1000000002017544
61+
!StringUtils.isNullOrWhitespaceOnly(s)
62+
&& (s.contains("utf8_test")
63+
|| s.contains("latin1_test")
64+
|| s.contains("gbk_test")
65+
|| s.contains("big5_test")
66+
|| s.contains("ascii_test")
67+
|| s.contains("sjis_test")));
68+
}
69+
70+
@AfterAll
71+
public static void after() {
72+
dropDatabase(DATABASE_NAME);
73+
}
74+
75+
@BeforeEach
76+
public void before() {
77+
TestValuesTableFactory.clearAllData();
78+
env.setParallelism(4);
79+
env.enableCheckpointing(200);
80+
}
81+
82+
public static Stream<Arguments> parameters() {
83+
return Stream.of(
84+
Arguments.of(
85+
"utf8_test",
86+
new String[] {"+I[1, 测试数据]", "+I[2, Craig Marshall]", "+I[3, 另一个测试数据]"},
87+
new String[] {
88+
"-D[1, 测试数据]",
89+
"-D[2, Craig Marshall]",
90+
"-D[3, 另一个测试数据]",
91+
"+I[11, 测试数据]",
92+
"+I[12, Craig Marshall]",
93+
"+I[13, 另一个测试数据]"
94+
}),
95+
Arguments.of(
96+
"ascii_test",
97+
new String[] {
98+
"+I[1, ascii test!?]", "+I[2, Craig Marshall]", "+I[3, {test}]"
99+
},
100+
new String[] {
101+
"-D[1, ascii test!?]",
102+
"-D[2, Craig Marshall]",
103+
"-D[3, {test}]",
104+
"+I[11, ascii test!?]",
105+
"+I[12, Craig Marshall]",
106+
"+I[13, {test}]"
107+
}),
108+
Arguments.of(
109+
"gbk_test",
110+
new String[] {"+I[1, 测试数据]", "+I[2, Craig Marshall]", "+I[3, 另一个测试数据]"},
111+
new String[] {
112+
"-D[1, 测试数据]",
113+
"-D[2, Craig Marshall]",
114+
"-D[3, 另一个测试数据]",
115+
"+I[11, 测试数据]",
116+
"+I[12, Craig Marshall]",
117+
"+I[13, 另一个测试数据]"
118+
}),
119+
Arguments.of(
120+
"latin1_test",
121+
new String[] {"+I[1, ÀÆÉ]", "+I[2, Craig Marshall]", "+I[3, Üæû]"},
122+
new String[] {
123+
"-D[1, ÀÆÉ]",
124+
"-D[2, Craig Marshall]",
125+
"-D[3, Üæû]",
126+
"+I[11, ÀÆÉ]",
127+
"+I[12, Craig Marshall]",
128+
"+I[13, Üæû]"
129+
}),
130+
Arguments.of(
131+
"big5_test",
132+
new String[] {"+I[1, 大五]", "+I[2, Craig Marshall]", "+I[3, 丹店]"},
133+
new String[] {
134+
"-D[1, 大五]",
135+
"-D[2, Craig Marshall]",
136+
"-D[3, 丹店]",
137+
"+I[11, 大五]",
138+
"+I[12, Craig Marshall]",
139+
"+I[13, 丹店]"
140+
}),
141+
Arguments.of(
142+
"sjis_test",
143+
new String[] {"+I[1, ひびぴ]", "+I[2, Craig Marshall]", "+I[3, フブプ]"},
144+
new String[] {
145+
"-D[1, ひびぴ]",
146+
"-D[2, Craig Marshall]",
147+
"-D[3, フブプ]",
148+
"+I[11, ひびぴ]",
149+
"+I[12, Craig Marshall]",
150+
"+I[13, フブプ]"
151+
}));
152+
}
153+
154+
@ParameterizedTest
155+
@MethodSource("parameters")
156+
public void testCharset(String testName, String[] snapshotExpected, String[] binlogExpected)
157+
throws Exception {
158+
String sourceDDL =
159+
String.format(
160+
"CREATE TABLE %s (\n"
161+
+ " table_id BIGINT,\n"
162+
+ " table_name STRING,\n"
163+
+ " primary key(table_id) not enforced"
164+
+ ") WITH ("
165+
+ " 'connector' = 'oceanbase-cdc',"
166+
+ " 'hostname' = '%s',"
167+
+ " 'port' = '%s',"
168+
+ " 'username' = '%s',"
169+
+ " 'password' = '%s',"
170+
+ " 'database-name' = '%s',"
171+
+ " 'table-name' = '%s',"
172+
+ " 'scan.incremental.snapshot.enabled' = '%s',"
173+
+ " 'server-id' = '%s',"
174+
+ " 'server-time-zone' = 'Asia/Shanghai',"
175+
+ " 'jdbc.properties.connectTimeout' = '6000000000',"
176+
+ " 'jdbc.properties.socketTimeout' = '6000000000',"
177+
+ " 'jdbc.properties.autoReconnect' = 'true',"
178+
+ " 'jdbc.properties.failOverReadOnly' = 'false',"
179+
+ " 'scan.incremental.snapshot.chunk.size' = '%s'"
180+
+ ")",
181+
testName,
182+
getHost(),
183+
getPort(),
184+
USER_NAME,
185+
PASSWORD,
186+
DATABASE_NAME,
187+
testName,
188+
true,
189+
getServerId(),
190+
4);
191+
tEnv.executeSql(sourceDDL);
192+
// async submit job
193+
TableResult result =
194+
tEnv.executeSql(String.format("SELECT table_id,table_name FROM %s", testName));
195+
196+
// test snapshot phase
197+
CloseableIterator<Row> iterator = result.collect();
198+
waitForSnapshotStarted(iterator);
199+
assertEqualsInAnyOrder(
200+
Arrays.asList(snapshotExpected), fetchRows(iterator, snapshotExpected.length));
201+
202+
// test binlog phase
203+
try (Connection connection = getJdbcConnection();
204+
Statement statement = connection.createStatement()) {
205+
statement.execute(
206+
String.format(
207+
"/*TDDL:FORBID_EXECUTE_DML_ALL=FALSE*/UPDATE %s.%s SET table_id = table_id + 10;",
208+
DATABASE_NAME, testName));
209+
}
210+
assertEqualsInAnyOrder(
211+
Arrays.asList(binlogExpected), fetchRows(iterator, binlogExpected.length));
212+
result.getJobClient().get().cancel().get();
213+
214+
// Sleep to avoid the issue: The last packet successfully received from the server was 35
215+
// milliseconds ago.
216+
Thread.sleep(1_000);
217+
}
218+
219+
private static void waitForSnapshotStarted(CloseableIterator<Row> iterator) throws Exception {
220+
while (!iterator.hasNext()) {
221+
Thread.sleep(100);
222+
}
223+
}
224+
}

0 commit comments

Comments
 (0)