Skip to content

Commit 0552130

Browse files
author
Vladimír Dudr
committed
support specifying fe protocol
1 parent c61342f commit 0552130

File tree

14 files changed

+46
-30
lines changed

14 files changed

+46
-30
lines changed

flink-doris-connector/src/main/java/org/apache/doris/flink/rest/RestService.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,8 @@ public class RestService implements Serializable {
9191
private static final String BACKENDS_V2 = "/api/backends?is_alive=true";
9292
private static final String FE_LOGIN = "/rest/v1/login";
9393
private static final ObjectMapper objectMapper = new ObjectMapper();
94-
private static final String TABLE_SCHEMA_API = "http://%s/api/%s/%s/_schema";
95-
private static final String QUERY_PLAN_API = "http://%s/api/%s/%s/_query_plan";
94+
private static final String TABLE_SCHEMA_API = "%s/api/%s/%s/_schema";
95+
private static final String QUERY_PLAN_API = "%s/api/%s/%s/_query_plan";
9696

9797
/**
9898
* send request to Doris FE and get response json string.
@@ -131,6 +131,7 @@ private static String send(
131131
RequestConfig.custom()
132132
.setConnectTimeout(connectTimeout)
133133
.setSocketTimeout(socketTimeout)
134+
.setRedirectsEnabled(true)
134135
.build();
135136

136137
request.setConfig(requestConfig);
@@ -310,6 +311,9 @@ public static String randomEndpoint(String feNodes, Logger logger)
310311
Collections.shuffle(nodes);
311312
for (String feNode : nodes) {
312313
String host = feNode.trim();
314+
if (!host.startsWith("http://") && !host.startsWith("https://")) {
315+
host = "http://" + host;
316+
}
313317
if (BackendUtil.tryHttpConnection(host)) {
314318
return host;
315319
}
@@ -359,7 +363,10 @@ public static List<BackendRowV2> getBackendsV2(
359363

360364
for (String feNode : feNodeList) {
361365
try {
362-
String beUrl = "http://" + feNode + BACKENDS_V2;
366+
if (!feNode.startsWith("http://") && !feNode.startsWith("https://")) {
367+
feNode = "http://" + feNode;
368+
}
369+
String beUrl = feNode + BACKENDS_V2;
363370
HttpGet httpGet = new HttpGet(beUrl);
364371
String response = send(options, readOptions, httpGet, logger);
365372
logger.info("Backend Info:{}", response);
@@ -387,8 +394,7 @@ public static List<BackendRowV2> getBackendsV2(
387394
private static List<BackendRowV2> convert(List<String> feNodeList) {
388395
List<BackendRowV2> nodeList = new ArrayList<>();
389396
for (String node : feNodeList) {
390-
String[] split = node.split(":");
391-
nodeList.add(BackendRowV2.of(split[0], Integer.valueOf(split[1]), true));
397+
nodeList.add(BackendRowV2.ofUrl(node, true));
392398
}
393399
return nodeList;
394400
}

flink-doris-connector/src/main/java/org/apache/doris/flink/rest/models/BackendV2.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,9 @@ public String getIp() {
5252
}
5353

5454
public void setIp(String ip) {
55+
if (!ip.startsWith("http://") && !ip.startsWith("https://")) {
56+
ip = "http://" + ip;
57+
}
5558
this.ip = ip;
5659
}
5760

@@ -82,5 +85,13 @@ public static BackendRowV2 of(String ip, int httpPort, boolean alive) {
8285
rowV2.setAlive(alive);
8386
return rowV2;
8487
}
88+
89+
public static BackendRowV2 ofUrl(String url, boolean alive) {
90+
int lastColon = url.lastIndexOf(":");
91+
return BackendRowV2.of(
92+
url.substring(0, lastColon),
93+
Integer.valueOf(url.substring(lastColon + 1)),
94+
alive);
95+
}
8596
}
8697
}

flink-doris-connector/src/main/java/org/apache/doris/flink/sink/BackendUtil.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -58,12 +58,7 @@ private List<BackendV2.BackendRowV2> initBackends(String beNodes) {
5858
if (tryHttpConnection(node)) {
5959
LOG.info("{} backend http connection success.", node);
6060
node = node.trim();
61-
String[] ipAndPort = node.split(":");
62-
BackendRowV2 backendRowV2 = new BackendRowV2();
63-
backendRowV2.setIp(ipAndPort[0]);
64-
backendRowV2.setHttpPort(Integer.parseInt(ipAndPort[1]));
65-
backendRowV2.setAlive(true);
66-
backends.add(backendRowV2);
61+
backends.add(BackendRowV2.ofUrl(node, true));
6762
}
6863
});
6964
return backends;
@@ -98,8 +93,10 @@ public String getAvailableBackend(int subtaskId) {
9893

9994
public static boolean tryHttpConnection(String host) {
10095
try {
96+
if (!host.startsWith("http://") && !host.startsWith("https://")) {
97+
host = "http://" + host;
98+
}
10199
LOG.debug("try to connect host {}", host);
102-
host = "http://" + host;
103100
URL url = new URL(host);
104101
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
105102
connection.setRequestMethod("GET");

flink-doris-connector/src/main/java/org/apache/doris/flink/sink/DorisCommittable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ public class DorisCommittable implements DorisAbstractCommittable {
2626
private final long txnID;
2727

2828
public DorisCommittable(String hostPort, String db, long txnID) {
29-
this.hostPort = hostPort;
29+
this.hostPort = (hostPort.startsWith("http") ? "" : "http://") + hostPort;
3030
this.db = db;
3131
this.txnID = txnID;
3232
}

flink-doris-connector/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ public class DorisBatchStreamLoad implements Serializable {
8989
private static final long STREAM_LOAD_MAX_ROWS = Integer.MAX_VALUE;
9090
private final LabelGenerator labelGenerator;
9191
private final byte[] lineDelimiter;
92-
private static final String LOAD_URL_PATTERN = "http://%s/api/%s/%s/_stream_load";
92+
private static final String LOAD_URL_PATTERN = "%s/api/%s/%s/_stream_load";
9393
private String loadUrl;
9494
private String hostPort;
9595
private final String username;

flink-doris-connector/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@
5252
/** The committer to commit transaction. */
5353
public class DorisCommitter implements Committer<DorisCommittable>, Closeable {
5454
private static final Logger LOG = LoggerFactory.getLogger(DorisCommitter.class);
55-
private static final String commitPattern = "http://%s/api/%s/_stream_load_2pc";
55+
private static final String commitPattern = "%s/api/%s/_stream_load_2pc";
5656
private final CloseableHttpClient httpClient;
5757
private final DorisOptions dorisOptions;
5858
private final DorisReadOptions dorisReadOptions;

flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/BatchStageLoad.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public class BatchStageLoad implements Serializable {
6767
private static final Logger LOG = LoggerFactory.getLogger(BatchStageLoad.class);
6868
private final LabelGenerator labelGenerator;
6969
private final byte[] lineDelimiter;
70-
private static final String UPLOAD_URL_PATTERN = "http://%s/copy/upload";
70+
private static final String UPLOAD_URL_PATTERN = "%s/copy/upload";
7171
private static final String LINE_DELIMITER_KEY_WITH_PRETIX = "file.line_delimiter";
7272
private String uploadUrl;
7373
private String hostPort;
@@ -96,7 +96,9 @@ public BatchStageLoad(
9696
this.password = dorisOptions.getPassword();
9797
this.loadProps = executionOptions.getStreamLoadProp();
9898
this.labelGenerator = labelGenerator;
99-
this.hostPort = dorisOptions.getFenodes();
99+
this.hostPort =
100+
(dorisOptions.getFenodes().startsWith("http") ? "" : "http://")
101+
+ dorisOptions.getFenodes();
100102
this.uploadUrl = String.format(UPLOAD_URL_PATTERN, hostPort);
101103
this.fileNum = new AtomicInteger();
102104
this.lineDelimiter =

flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/DorisCopyCommittable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ public class DorisCopyCommittable implements DorisAbstractCommittable {
2626
private final String copySQL;
2727

2828
public DorisCopyCommittable(String hostPort, String copySQL) {
29-
this.hostPort = hostPort;
29+
this.hostPort = (hostPort.startsWith("http") ? "" : "http://") + hostPort;
3030
this.copySQL = copySQL;
3131
}
3232

flink-doris-connector/src/main/java/org/apache/doris/flink/sink/copy/DorisCopyCommitter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444

4545
public class DorisCopyCommitter implements Committer<DorisCopyCommittable>, Closeable {
4646
private static final Logger LOG = LoggerFactory.getLogger(DorisCopyCommitter.class);
47-
private static final String commitPattern = "http://%s/copy/query";
47+
private static final String commitPattern = "%s/copy/query";
4848
private static final int SUCCESS = 0;
4949
private static final String FAIL = "1";
5050
private ObjectMapper objectMapper = new ObjectMapper();

flink-doris-connector/src/main/java/org/apache/doris/flink/sink/schema/SchemaChangeManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public class SchemaChangeManager implements Serializable {
5858
private static final Logger LOG = LoggerFactory.getLogger(SchemaChangeManager.class);
5959
private static final String CHECK_SCHEMA_CHANGE_API =
6060
"http://%s/api/enable_light_schema_change/%s/%s";
61-
private static final String SCHEMA_CHANGE_API = "http://%s/api/query/default_cluster/%s";
61+
private static final String SCHEMA_CHANGE_API = "%s/api/query/default_cluster/%s";
6262
private ObjectMapper objectMapper = new ObjectMapper();
6363
private DorisOptions dorisOptions;
6464
private String charsetEncoding = "UTF-8";

flink-doris-connector/src/main/java/org/apache/doris/flink/sink/writer/DorisStreamLoad.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,8 @@ public class DorisStreamLoad implements Serializable {
7373
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
7474
private final LabelGenerator labelGenerator;
7575
private final byte[] lineDelimiter;
76-
private static final String LOAD_URL_PATTERN = "http://%s/api/%s/%s/_stream_load";
77-
private static final String ABORT_URL_PATTERN = "http://%s/api/%s/_stream_load_2pc";
76+
private static final String LOAD_URL_PATTERN = "%s/api/%s/%s/_stream_load";
77+
private static final String ABORT_URL_PATTERN = "%s/api/%s/_stream_load_2pc";
7878
public static final String JOB_EXIST_FINISHED = "FINISHED";
7979

8080
private String loadUrlStr;
@@ -102,7 +102,7 @@ public DorisStreamLoad(
102102
DorisExecutionOptions executionOptions,
103103
LabelGenerator labelGenerator,
104104
CloseableHttpClient httpClient) {
105-
this.hostPort = hostPort;
105+
this.hostPort = (hostPort.startsWith("http") ? "" : "http://") + hostPort;
106106
String[] tableInfo = dorisOptions.getTableIdentifier().split("\\.");
107107
this.db = tableInfo[0];
108108
this.table = tableInfo[1];

flink-doris-connector/src/test/java/org/apache/doris/flink/rest/TestRestService.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -122,12 +122,12 @@ public void testParseIdentifierIllegal() throws Exception {
122122

123123
@Test
124124
public void testChoiceFe() throws Exception {
125-
String validFes = "1,2,3";
125+
String validFes = "1,http://2,https://3";
126126
String fe = RestService.randomEndpoint(validFes, logger);
127127
List<String> feNodes = new ArrayList<>(3);
128-
feNodes.add("1");
129-
feNodes.add("2");
130-
feNodes.add("3");
128+
feNodes.add("http://1");
129+
feNodes.add("http://2");
130+
feNodes.add("https://3");
131131
Assert.assertTrue(feNodes.contains(fe));
132132

133133
String emptyFes = "";
@@ -416,7 +416,7 @@ public void testParseBackendV2Error() throws Exception {
416416
public void testGetBackendsV2() {
417417
DorisOptions options =
418418
DorisOptions.builder()
419-
.setFenodes("127.0.0.1:1,127.0.0.1:2")
419+
.setFenodes("https://127.0.0.1:1,http://127.0.0.1:2,127.0.0.1:3")
420420
.setAutoRedirect(false)
421421
.build();
422422
DorisReadOptions readOptions = DorisReadOptions.defaults();

flink-doris-connector/src/test/java/org/apache/doris/flink/sink/copy/TestDorisCopyWriter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ public void testPrepareCommit() throws Exception {
9797

9898
Assert.assertEquals(1, committableList.size());
9999
DorisCopyCommittable committable = committableList.toArray(new DorisCopyCommittable[0])[0];
100-
Assert.assertEquals("127.0.0.1:8030", committable.getHostPort());
100+
Assert.assertEquals("http://127.0.0.1:8030", committable.getHostPort());
101101

102102
Pattern copySql =
103103
Pattern.compile(

flink-doris-connector/src/test/java/org/apache/doris/flink/sink/writer/TestDorisWriter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ public void testPrepareCommit() throws Exception {
8585
Collection<DorisCommittable> committableList = dorisWriter.prepareCommit();
8686
Assert.assertEquals(1, committableList.size());
8787
DorisCommittable dorisCommittable = committableList.stream().findFirst().get();
88-
Assert.assertEquals("local:8040", dorisCommittable.getHostPort());
88+
Assert.assertEquals("http://local:8040", dorisCommittable.getHostPort());
8989
Assert.assertEquals("db", dorisCommittable.getDb());
9090
Assert.assertEquals(2, dorisCommittable.getTxnID());
9191
Assert.assertFalse(dorisWriter.isLoading());

0 commit comments

Comments
 (0)