Skip to content

Commit 9519b02

Browse files
author
Vladimír Dudr
committed
support specifying fe protocol
1 parent c61342f commit 9519b02

File tree

4 files changed

+20
-10
lines changed

4 files changed

+20
-10
lines changed

flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,8 @@ public class RestService implements Serializable {
9191
private static final String BACKENDS_V2 = "/api/backends?is_alive=true";
9292
private static final String FE_LOGIN = "/rest/v1/login";
9393
private static final ObjectMapper objectMapper = new ObjectMapper();
94-
private static final String TABLE_SCHEMA_API = "http://%s/api/%s/%s/_schema";
95-
private static final String QUERY_PLAN_API = "http://%s/api/%s/%s/_query_plan";
94+
private static final String TABLE_SCHEMA_API = "%s/api/%s/%s/_schema";
95+
private static final String QUERY_PLAN_API = "%s/api/%s/%s/_query_plan";
9696

9797
/**
9898
* send request to Doris FE and get response json string.
@@ -131,6 +131,7 @@ private static String send(
131131
RequestConfig.custom()
132132
.setConnectTimeout(connectTimeout)
133133
.setSocketTimeout(socketTimeout)
134+
.setRedirectsEnabled(true)
134135
.build();
135136

136137
request.setConfig(requestConfig);
@@ -310,6 +311,9 @@ public static String randomEndpoint(String feNodes, Logger logger)
310311
Collections.shuffle(nodes);
311312
for (String feNode : nodes) {
312313
String host = feNode.trim();
314+
if (!host.startsWith("http://") && !host.startsWith("https://")) {
315+
host = "http://" + host;
316+
}
313317
if (BackendUtil.tryHttpConnection(host)) {
314318
return host;
315319
}
@@ -359,7 +363,11 @@ public static List<BackendRowV2> getBackendsV2(
359363

360364
for (String feNode : feNodeList) {
361365
try {
362-
String beUrl = "http://" + feNode + BACKENDS_V2;
366+
if (!feNode.startsWith("http://") && !feNode.startsWith("https://")) {
367+
feNode = "http://" + feNode;
368+
}
369+
String beUrl = feNode + BACKENDS_V2;
370+
logger.warn("---------------- URL set: {}", beUrl);
363371
HttpGet httpGet = new HttpGet(beUrl);
364372
String response = send(options, readOptions, httpGet, logger);
365373
logger.info("Backend Info:{}", response);

flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,10 @@ public String getAvailableBackend(int subtaskId) {
9898

9999
public static boolean tryHttpConnection(String host) {
100100
try {
101+
if (!host.startsWith("http://") && !host.startsWith("https://")) {
102+
host = "http://" + host;
103+
}
101104
LOG.debug("try to connect host {}", host);
102-
host = "http://" + host;
103105
URL url = new URL(host);
104106
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
105107
connection.setRequestMethod("GET");

flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public class SchemaChangeManager implements Serializable {
5858
private static final Logger LOG = LoggerFactory.getLogger(SchemaChangeManager.class);
5959
private static final String CHECK_SCHEMA_CHANGE_API =
6060
"http://%s/api/enable_light_schema_change/%s/%s";
61-
private static final String SCHEMA_CHANGE_API = "http://%s/api/query/default_cluster/%s";
61+
private static final String SCHEMA_CHANGE_API = "%s/api/query/default_cluster/%s";
6262
private ObjectMapper objectMapper = new ObjectMapper();
6363
private DorisOptions dorisOptions;
6464
private String charsetEncoding = "UTF-8";

flink-doris-connector/src/test/java/org/apache/doris/flink/rest/TestRestService.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -122,12 +122,12 @@ public void testParseIdentifierIllegal() throws Exception {
122122

123123
@Test
124124
public void testChoiceFe() throws Exception {
125-
String validFes = "1,2,3";
125+
String validFes = "1,http://2,https://3";
126126
String fe = RestService.randomEndpoint(validFes, logger);
127127
List<String> feNodes = new ArrayList<>(3);
128-
feNodes.add("1");
129-
feNodes.add("2");
130-
feNodes.add("3");
128+
feNodes.add("http://1");
129+
feNodes.add("http://2");
130+
feNodes.add("https://3");
131131
Assert.assertTrue(feNodes.contains(fe));
132132

133133
String emptyFes = "";
@@ -416,7 +416,7 @@ public void testParseBackendV2Error() throws Exception {
416416
public void testGetBackendsV2() {
417417
DorisOptions options =
418418
DorisOptions.builder()
419-
.setFenodes("127.0.0.1:1,127.0.0.1:2")
419+
.setFenodes("https://127.0.0.1:1,http://127.0.0.1:2,127.0.0.1:3")
420420
.setAutoRedirect(false)
421421
.build();
422422
DorisReadOptions readOptions = DorisReadOptions.defaults();

0 commit comments

Comments
 (0)