Skip to content

Commit c61342f

Browse files
authored
[Improve](case) add customer doris container cluster (#491)
1 parent c4ae051 commit c61342f

File tree

6 files changed

+196
-7
lines changed

6 files changed

+196
-7
lines changed

flink-doris-connector/src/test/java/org/apache/doris/flink/container/AbstractContainerTestBase.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.doris.flink.container.instance.ContainerService;
2121
import org.apache.doris.flink.container.instance.DorisContainer;
22+
import org.apache.doris.flink.container.instance.DorisCustomerContainer;
2223
import org.junit.BeforeClass;
2324
import org.slf4j.Logger;
2425
import org.slf4j.LoggerFactory;
@@ -48,7 +49,8 @@ private static void initDorisContainer() {
4849
LOG.info("The doris container has been started and is running status.");
4950
return;
5051
}
51-
dorisContainerService = new DorisContainer();
52+
Boolean customerEnv = Boolean.valueOf(System.getProperty("customer_env", "false"));
53+
dorisContainerService = customerEnv ? new DorisCustomerContainer() : new DorisContainer();
5254
dorisContainerService.startContainer();
5355
LOG.info("Doris container was started.");
5456
}
@@ -74,9 +76,7 @@ protected String getDorisPassword() {
7476
}
7577

7678
protected String getDorisQueryUrl() {
77-
return String.format(
78-
"jdbc:mysql://%s:%s",
79-
getDorisInstanceHost(), dorisContainerService.getMappedPort(9030));
79+
return dorisContainerService.getJdbcUrl();
8080
}
8181

8282
protected String getDorisInstanceHost() {

flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/ContainerService.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ public interface ContainerService {
2929

3030
Connection getQueryConnection();
3131

32+
String getJdbcUrl();
33+
3234
String getInstanceHost();
3335

3436
Integer getMappedPort(int originalPort);

flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/DorisContainer.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,11 @@ public Connection getQueryConnection() {
115115
}
116116
}
117117

118+
@Override
119+
public String getJdbcUrl() {
120+
return String.format(JDBC_URL, dorisContainer.getHost());
121+
}
122+
118123
@Override
119124
public String getInstanceHost() {
120125
return dorisContainer.getHost();
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.apache.doris.flink.container.instance;
19+
20+
import org.apache.flink.util.Preconditions;
21+
22+
import org.apache.doris.flink.exception.DorisRuntimeException;
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
25+
26+
import java.sql.Connection;
27+
import java.sql.DriverManager;
28+
import java.sql.ResultSet;
29+
import java.sql.SQLException;
30+
import java.sql.Statement;
31+
32+
/** Using a custom Doris environment */
33+
public class DorisCustomerContainer implements ContainerService {
34+
private static final Logger LOG = LoggerFactory.getLogger(DorisCustomerContainer.class);
35+
private static final String JDBC_URL = "jdbc:mysql://%s:%s";
36+
37+
@Override
38+
public void startContainer() {
39+
LOG.info("Using doris customer containers env.");
40+
checkParams();
41+
if (!isRunning()) {
42+
throw new DorisRuntimeException(
43+
"Backend is not alive. Please check the doris cluster.");
44+
}
45+
}
46+
47+
private void checkParams() {
48+
Preconditions.checkArgument(
49+
System.getProperty("doris_host") != null, "doris_host is required.");
50+
Preconditions.checkArgument(
51+
System.getProperty("doris_query_port") != null, "doris_query_port is required.");
52+
Preconditions.checkArgument(
53+
System.getProperty("doris_http_port") != null, "doris_http_port is required.");
54+
Preconditions.checkArgument(
55+
System.getProperty("doris_user") != null, "doris_user is required.");
56+
Preconditions.checkArgument(
57+
System.getProperty("doris_passwd") != null, "doris_passwd is required.");
58+
}
59+
60+
@Override
61+
public boolean isRunning() {
62+
try (Connection conn = getQueryConnection();
63+
Statement stmt = conn.createStatement()) {
64+
ResultSet showBackends = stmt.executeQuery("show backends");
65+
while (showBackends.next()) {
66+
String isAlive = showBackends.getString("Alive").trim();
67+
if (Boolean.toString(true).equalsIgnoreCase(isAlive)) {
68+
return true;
69+
}
70+
}
71+
} catch (SQLException e) {
72+
LOG.error("Failed to connect doris cluster.", e);
73+
return false;
74+
}
75+
return false;
76+
}
77+
78+
@Override
79+
public Connection getQueryConnection() {
80+
LOG.info("Try to get query connection from doris.");
81+
String jdbcUrl =
82+
String.format(
83+
JDBC_URL,
84+
System.getProperty("doris_host"),
85+
System.getProperty("doris_query_port"));
86+
try {
87+
return DriverManager.getConnection(jdbcUrl, getUsername(), getPassword());
88+
} catch (SQLException e) {
89+
LOG.info("Failed to get doris query connection. jdbcUrl={}", jdbcUrl, e);
90+
throw new DorisRuntimeException(e);
91+
}
92+
}
93+
94+
@Override
95+
public String getJdbcUrl() {
96+
return String.format(
97+
JDBC_URL, System.getProperty("doris_host"), System.getProperty("doris_query_port"));
98+
}
99+
100+
@Override
101+
public String getInstanceHost() {
102+
return System.getProperty("doris_host");
103+
}
104+
105+
@Override
106+
public Integer getMappedPort(int originalPort) {
107+
return originalPort;
108+
}
109+
110+
@Override
111+
public String getUsername() {
112+
return System.getProperty("doris_user");
113+
}
114+
115+
@Override
116+
public String getPassword() {
117+
return System.getProperty("doris_passwd");
118+
}
119+
120+
@Override
121+
public String getFenodes() {
122+
return System.getProperty("doris_host") + ":" + System.getProperty("doris_http_port");
123+
}
124+
125+
@Override
126+
public String getBenodes() {
127+
return null;
128+
}
129+
130+
@Override
131+
public void close() {}
132+
}

flink-doris-connector/src/test/java/org/apache/doris/flink/container/instance/MySQLContainer.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,11 @@ public Connection getQueryConnection() {
9292
}
9393
}
9494

95+
@Override
96+
public String getJdbcUrl() {
97+
return mysqlcontainer.getJdbcUrl();
98+
}
99+
95100
@Override
96101
public void close() {
97102
LOG.info("Stopping MySQL container.");

flink-doris-connector/src/test/java/org/apache/doris/flink/sink/DorisSinkITCase.java

Lines changed: 48 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
2828
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
2929
import org.apache.flink.test.util.MiniClusterWithClientResource;
30+
import org.apache.flink.util.StringUtils;
3031

3132
import com.fasterxml.jackson.databind.ObjectMapper;
3233
import org.apache.doris.flink.cfg.DorisExecutionOptions;
@@ -63,6 +64,7 @@ public class DorisSinkITCase extends AbstractITCaseService {
6364
static final String TABLE_CSV = "tbl_csv";
6465
static final String TABLE_JSON = "tbl_json";
6566
static final String TABLE_JSON_TBL = "tbl_json_tbl";
67+
static final String TABLE_TBL_AUTO_REDIRECT = "tbl_tbl_auto_redirect";
6668
static final String TABLE_CSV_BATCH_TBL = "tbl_csv_batch_tbl";
6769
static final String TABLE_CSV_BATCH_DS = "tbl_csv_batch_DS";
6870
static final String TABLE_GROUP_COMMIT = "tbl_group_commit";
@@ -177,8 +179,6 @@ public void testTableSinkJsonFormat() throws Exception {
177179
+ DorisConfigOptions.IDENTIFIER
178180
+ "',"
179181
+ " 'fenodes' = '%s',"
180-
+ " 'benodes' = '%s',"
181-
+ " 'auto-redirect' = 'false',"
182182
+ " 'table.identifier' = '%s',"
183183
+ " 'username' = '%s',"
184184
+ " 'password' = '%s',"
@@ -196,7 +196,6 @@ public void testTableSinkJsonFormat() throws Exception {
196196
+ "'"
197197
+ ")",
198198
getFenodes(),
199-
getBenodes(),
200199
DATABASE + "." + TABLE_JSON_TBL,
201200
getDorisUsername(),
202201
getDorisPassword());
@@ -210,6 +209,52 @@ public void testTableSinkJsonFormat() throws Exception {
210209
ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, query, 2);
211210
}
212211

212+
@Test
213+
public void testTableSinkAutoRedirectFalse() throws Exception {
214+
if (StringUtils.isNullOrWhitespaceOnly(getBenodes())) {
215+
LOG.info("benodes is empty, skip the test.");
216+
return;
217+
}
218+
initializeTable(TABLE_TBL_AUTO_REDIRECT);
219+
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
220+
env.setParallelism(DEFAULT_PARALLELISM);
221+
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
222+
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
223+
224+
String sinkDDL =
225+
String.format(
226+
"CREATE TABLE doris_sink ("
227+
+ " name STRING,"
228+
+ " age INT"
229+
+ ") WITH ("
230+
+ " 'connector' = '"
231+
+ DorisConfigOptions.IDENTIFIER
232+
+ "',"
233+
+ " 'fenodes' = '%s',"
234+
+ " 'benodes' = '%s',"
235+
+ " 'auto-redirect' = 'false',"
236+
+ " 'table.identifier' = '%s',"
237+
+ " 'username' = '%s',"
238+
+ " 'password' = '%s',"
239+
+ " 'sink.label-prefix' = 'doris_sink"
240+
+ UUID.randomUUID()
241+
+ "'"
242+
+ ")",
243+
getFenodes(),
244+
getBenodes(),
245+
DATABASE + "." + TABLE_TBL_AUTO_REDIRECT,
246+
getDorisUsername(),
247+
getDorisPassword());
248+
tEnv.executeSql(sinkDDL);
249+
tEnv.executeSql("INSERT INTO doris_sink SELECT 'doris',1 union all SELECT 'flink',2");
250+
251+
Thread.sleep(10000);
252+
List<String> expected = Arrays.asList("doris,1", "flink,2");
253+
String query =
254+
String.format("select name,age from %s.%s order by 1", DATABASE, TABLE_JSON_TBL);
255+
ContainerUtils.checkResult(getDorisQueryConnection(), LOG, expected, query, 2);
256+
}
257+
213258
@Test
214259
public void testTableBatch() throws Exception {
215260
initializeTable(TABLE_CSV_BATCH_TBL);

0 commit comments

Comments
 (0)