Skip to content

Commit 3e86102

Browse files
[Feature][Connector-V2]Support Doris Fe Node HA (#8311)
1 parent d58fce1 commit 3e86102

File tree

3 files changed

+55
-91
lines changed

3 files changed

+55
-91
lines changed

Diff for: seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java

+27-21
Original file line numberDiff line numberDiff line change
@@ -240,30 +240,15 @@ static String[] parseIdentifier(String tableIdentifier, Logger logger)
240240
}
241241

242242
@VisibleForTesting
243-
public static String randomEndpoint(String feNodes, Logger logger)
244-
throws DorisConnectorException {
245-
logger.trace("Parse fenodes '{}'.", feNodes);
246-
if (StringUtils.isEmpty(feNodes)) {
247-
String errMsg =
248-
String.format(ErrorMessages.ILLEGAL_ARGUMENT_MESSAGE, "fenodes", feNodes);
249-
throw new DorisConnectorException(DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg);
250-
}
251-
List<String> nodes = Arrays.asList(feNodes.split(","));
252-
Collections.shuffle(nodes);
253-
return nodes.get(0).trim();
254-
}
255-
256-
@VisibleForTesting
257-
static String getUriStr(
258-
DorisSourceConfig dorisSourceConfig, DorisSourceTable dorisSourceTable, Logger logger)
243+
static String getUriStr(String node, DorisSourceTable dorisSourceTable, Logger logger)
259244
throws DorisConnectorException {
260245
String tableIdentifier =
261246
dorisSourceTable.getTablePath().getDatabaseName()
262247
+ "."
263248
+ dorisSourceTable.getTablePath().getTableName();
264249
String[] identifier = parseIdentifier(tableIdentifier, logger);
265250
return "http://"
266-
+ randomEndpoint(dorisSourceConfig.getFrontends(), logger)
251+
+ node.trim()
267252
+ API_PREFIX
268253
+ "/"
269254
+ identifier[0]
@@ -298,16 +283,37 @@ public static List<PartitionDefinition> findPartitions(
298283
}
299284
logger.debug("Query SQL Sending to Doris FE is: '{}'.", sql);
300285

301-
HttpPost httpPost =
302-
new HttpPost(getUriStr(dorisSourceConfig, dorisSourceTable, logger) + QUERY_PLAN);
303286
String entity = "{\"sql\": \"" + sql + "\"}";
304287
logger.debug("Post body Sending to Doris FE is: '{}'.", entity);
305288
StringEntity stringEntity = new StringEntity(entity, StandardCharsets.UTF_8);
306289
stringEntity.setContentEncoding("UTF-8");
307290
stringEntity.setContentType("application/json");
308-
httpPost.setEntity(stringEntity);
309291

310-
String resStr = send(dorisSourceConfig, httpPost, logger);
292+
List<String> feNodes = Arrays.asList(dorisSourceConfig.getFrontends().split(","));
293+
Collections.shuffle(feNodes);
294+
int feNodesNum = feNodes.size();
295+
String resStr = null;
296+
297+
for (int i = 0; i < feNodesNum; i++) {
298+
try {
299+
HttpPost httpPost =
300+
new HttpPost(
301+
getUriStr(feNodes.get(i), dorisSourceTable, logger) + QUERY_PLAN);
302+
httpPost.setEntity(stringEntity);
303+
resStr = send(dorisSourceConfig, httpPost, logger);
304+
break;
305+
} catch (Exception e) {
306+
if (i == feNodesNum - 1) {
307+
throw new DorisConnectorException(
308+
DorisConnectorErrorCode.REST_SERVICE_FAILED, e);
309+
}
310+
log.error(
311+
"Find partition error for feNode: {} with exception: {}",
312+
feNodes.get(i),
313+
e.getMessage());
314+
}
315+
}
316+
311317
logger.debug("Find partition response is '{}'.", resStr);
312318
QueryPlan queryPlan = getQueryPlan(resStr, logger);
313319
Map<String, List<Long>> be2Tablets = selectBeForTablet(queryPlan, logger);

Diff for: seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java

+28-14
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.apache.seatunnel.connectors.doris.config.DorisSinkConfig;
2828
import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode;
2929
import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException;
30-
import org.apache.seatunnel.connectors.doris.rest.RestService;
3130
import org.apache.seatunnel.connectors.doris.rest.models.RespContent;
3231
import org.apache.seatunnel.connectors.doris.serialize.DorisSerializer;
3332
import org.apache.seatunnel.connectors.doris.serialize.SeaTunnelRowSerializer;
@@ -98,21 +97,36 @@ public DorisSinkWriter(
9897
}
9998

10099
private void initializeLoad() {
101-
String backend = RestService.randomEndpoint(dorisSinkConfig.getFrontends(), log);
102-
try {
103-
this.dorisStreamLoad =
104-
new DorisStreamLoad(
105-
backend,
106-
catalogTable.getTablePath(),
107-
dorisSinkConfig,
108-
labelGenerator,
109-
new HttpUtil().getHttpClient());
110-
if (dorisSinkConfig.getEnable2PC()) {
111-
dorisStreamLoad.abortPreCommit(labelPrefix, lastCheckpointId + 1);
100+
101+
List<String> feNodes = Arrays.asList(dorisSinkConfig.getFrontends().split(","));
102+
Collections.shuffle(feNodes);
103+
int feNodesNum = feNodes.size();
104+
105+
for (int i = 0; i < feNodesNum; i++) {
106+
try {
107+
this.dorisStreamLoad =
108+
new DorisStreamLoad(
109+
feNodes.get(i),
110+
catalogTable.getTablePath(),
111+
dorisSinkConfig,
112+
labelGenerator,
113+
new HttpUtil().getHttpClient());
114+
if (dorisSinkConfig.getEnable2PC()) {
115+
dorisStreamLoad.abortPreCommit(labelPrefix, lastCheckpointId + 1);
116+
}
117+
break;
118+
} catch (Exception e) {
119+
if (i == feNodesNum - 1) {
120+
throw new DorisConnectorException(
121+
DorisConnectorErrorCode.STREAM_LOAD_FAILED, e);
122+
}
123+
log.error(
124+
"stream load error for feNode: {} with exception: {}",
125+
feNodes.get(i),
126+
e.getMessage());
112127
}
113-
} catch (Exception e) {
114-
throw new DorisConnectorException(DorisConnectorErrorCode.STREAM_LOAD_FAILED, e);
115128
}
129+
116130
startLoad(labelGenerator.generateLabel(lastCheckpointId + 1));
117131
// when uploading data in streaming mode, we need to regularly detect whether there are
118132
// exceptions.

Diff for: seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/rest/models/RestServiceTest.java

-56
This file was deleted.

0 commit comments

Comments
 (0)