Skip to content

Commit 3b73921

Browse files
committed
Fix cloud compute group backend discovery
1 parent 0044826 commit 3b73921

10 files changed

Lines changed: 413 additions & 14 deletions

File tree

flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/rest/RestService.java

Lines changed: 234 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,8 +91,11 @@ public class RestService implements Serializable {
9191
private static final String UNIQUE_KEYS_TYPE = "UNIQUE_KEYS";
9292
@Deprecated private static final String BACKENDS = "/rest/v1/system?path=//backends";
9393
private static final String BACKENDS_V2 = "/api/backends?is_alive=true";
94+
private static final String MANAGER_BACKENDS = "/rest/v2/manager/node/backends";
9495
private static final String FE_LOGIN = "/rest/v1/login";
9596
private static final ObjectMapper objectMapper = new ObjectMapper();
97+
private static final String COMPUTE_GROUP_NAME = "compute_group_name";
98+
private static final String CLOUD_CLUSTER_NAME = "cloud_cluster_name";
9699
private static final String TABLE_SCHEMA_API = "http://%s/api/%s/%s/_schema";
97100
private static final String CATALOG_TABLE_SCHEMA_API = "http://%s/api/%s/%s/%s/_schema";
98101
private static final String QUERY_PLAN_API = "http://%s/api/%s/%s/_query_plan";
@@ -114,6 +117,16 @@ private static String send(
114117
HttpRequestBase request,
115118
Logger logger)
116119
throws ConnectedFailedException {
120+
return send(options, readOptions, request, logger, true);
121+
}
122+
123+
private static String send(
124+
DorisOptions options,
125+
DorisReadOptions readOptions,
126+
HttpRequestBase request,
127+
Logger logger,
128+
boolean unwrapData)
129+
throws ConnectedFailedException {
117130
int connectTimeout =
118131
readOptions.getRequestConnectTimeoutMs() == null
119132
? ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT
@@ -169,7 +182,7 @@ private static String send(
169182
// Handle the problem of inconsistent data format returned by http v1 and v2
170183
ObjectMapper mapper = new ObjectMapper();
171184
Map map = mapper.readValue(response, Map.class);
172-
if (map.containsKey("code") && map.containsKey("msg")) {
185+
if (unwrapData && map.containsKey("code") && map.containsKey("msg")) {
173186
Object data = map.get("data");
174187
return mapper.writeValueAsString(data);
175188
} else {
@@ -324,13 +337,27 @@ static List<String> allEndpoints(String feNodes, Logger logger) {
324337
@VisibleForTesting
325338
public static List<BackendRowV2> getBackendsV2(
326339
DorisOptions options, DorisReadOptions readOptions, Logger logger) {
340+
return getBackendsV2(options, readOptions, null, logger);
341+
}
342+
343+
@VisibleForTesting
344+
public static List<BackendRowV2> getBackendsV2(
345+
DorisOptions options,
346+
DorisReadOptions readOptions,
347+
String computeGroupName,
348+
Logger logger) {
327349
String feNodes = options.getFenodes();
328350
List<String> feNodeList = allEndpoints(feNodes, logger);
329351

330352
if (options.isAutoRedirect() && !feNodeList.isEmpty()) {
331353
return convert(feNodeList);
332354
}
333355

356+
if (StringUtils.isNotBlank(computeGroupName)) {
357+
return getManagerBackendsByComputeGroup(
358+
options, readOptions, feNodeList, computeGroupName, logger);
359+
}
360+
334361
for (String feNode : feNodeList) {
335362
try {
336363
String beUrl = "http://" + feNode + BACKENDS_V2;
@@ -351,6 +378,34 @@ public static List<BackendRowV2> getBackendsV2(
351378
throw new DorisRuntimeException(errMsg);
352379
}
353380

381+
private static List<BackendRowV2> getManagerBackendsByComputeGroup(
382+
DorisOptions options,
383+
DorisReadOptions readOptions,
384+
List<String> feNodeList,
385+
String computeGroupName,
386+
Logger logger) {
387+
for (String feNode : feNodeList) {
388+
try {
389+
String beUrl = "http://" + feNode + MANAGER_BACKENDS;
390+
HttpGet httpGet = new HttpGet(beUrl);
391+
String response = send(options, readOptions, httpGet, logger, false);
392+
logger.info("Manager backend info:{}", response);
393+
return parseManagerBackends(response, logger, computeGroupName);
394+
} catch (ConnectedFailedException e) {
395+
logger.info(
396+
"Doris FE node {} is unavailable: {}, Request the next Doris FE node",
397+
feNode,
398+
e.getMessage());
399+
}
400+
}
401+
String errMsg =
402+
String.format(
403+
"No Doris FE is available to request %s for compute group '%s'.",
404+
MANAGER_BACKENDS, computeGroupName);
405+
logger.error(errMsg);
406+
throw new DorisRuntimeException(errMsg);
407+
}
408+
354409
/**
355410
* When the user turns on redirection, there is no need to explicitly obtain the be list, just
356411
* treat the fe list as the be list.
@@ -389,6 +444,184 @@ static List<BackendRowV2> parseBackendV2(String response, Logger logger) {
389444
return backendRows;
390445
}
391446

447+
@VisibleForTesting
448+
static List<BackendRowV2> parseManagerBackends(
449+
String response, Logger logger, String computeGroupName) {
450+
if (StringUtils.isBlank(computeGroupName)) {
451+
throw managerBackendsException(computeGroupName, "compute group is empty");
452+
}
453+
454+
JsonNode rootNode = parseJsonResponse(response, computeGroupName, logger);
455+
JsonNode dataNode = unwrapManagerBackendData(rootNode, computeGroupName);
456+
JsonNode columnNode = dataNode.path("columnNames");
457+
if (!columnNode.isArray()) {
458+
columnNode = dataNode.path("column_names");
459+
}
460+
JsonNode rowNode = dataNode.path("rows");
461+
if (!columnNode.isArray() || !rowNode.isArray()) {
462+
throw managerBackendsException(
463+
computeGroupName,
464+
"response does not contain columnNames/column_names and rows");
465+
}
466+
467+
Map<String, Integer> columnIndexes = getColumnIndexes(columnNode, computeGroupName);
468+
int hostIndex = requireColumn(columnIndexes, "Host", computeGroupName);
469+
int httpPortIndex = requireColumn(columnIndexes, "HttpPort", computeGroupName);
470+
int aliveIndex = requireColumn(columnIndexes, "Alive", computeGroupName);
471+
int tagIndex = requireColumn(columnIndexes, "Tag", computeGroupName);
472+
473+
List<BackendRowV2> backends = new ArrayList<>();
474+
for (JsonNode row : rowNode) {
475+
if (!row.isArray()) {
476+
throw managerBackendsException(computeGroupName, "backend row is not an array");
477+
}
478+
if (!Boolean.parseBoolean(getManagerBackendCell(row, aliveIndex))) {
479+
continue;
480+
}
481+
String tag = getManagerBackendCell(row, tagIndex);
482+
String rowComputeGroupName = getComputeGroupNameFromTag(tag);
483+
if (!computeGroupName.equals(rowComputeGroupName)) {
484+
continue;
485+
}
486+
487+
String host = getManagerBackendCell(row, hostIndex);
488+
String httpPort = getManagerBackendCell(row, httpPortIndex);
489+
try {
490+
BackendRowV2 backend = BackendRowV2.of(host, Integer.parseInt(httpPort), true);
491+
backends.add(backend);
492+
} catch (NumberFormatException e) {
493+
throw managerBackendsException(
494+
computeGroupName, "backend HttpPort is invalid: " + httpPort);
495+
}
496+
}
497+
498+
if (backends.isEmpty()) {
499+
throw managerBackendsException(
500+
computeGroupName,
501+
"no alive backend found. If the target is a virtual compute group, configure its physical active compute group");
502+
}
503+
logger.debug("Parsing manager backend result is '{}'.", backends);
504+
return backends;
505+
}
506+
507+
private static JsonNode parseJsonResponse(
508+
String response, String computeGroupName, Logger logger) {
509+
try {
510+
return objectMapper.readTree(response);
511+
} catch (IOException e) {
512+
String errMsg = "Parse Doris manager backend response to json failed. res: " + response;
513+
logger.error(errMsg, e);
514+
throw managerBackendsException(computeGroupName, errMsg);
515+
}
516+
}
517+
518+
private static JsonNode unwrapManagerBackendData(JsonNode rootNode, String computeGroupName) {
519+
if (rootNode.has("code") && rootNode.has("msg")) {
520+
if (rootNode.path("code").asInt() != REST_RESPONSE_CODE_OK) {
521+
throw managerBackendsException(
522+
computeGroupName,
523+
rootNode.path("msg").asText() + ": " + rootNode.path("data").asText());
524+
}
525+
return rootNode.path("data");
526+
}
527+
return rootNode;
528+
}
529+
530+
private static Map<String, Integer> getColumnIndexes(
531+
JsonNode columnNode, String computeGroupName) {
532+
Map<String, Integer> columnIndexes = new HashMap<>();
533+
for (int i = 0; i < columnNode.size(); i++) {
534+
String columnName = columnNode.get(i).asText();
535+
if (StringUtils.isNotBlank(columnName)) {
536+
columnIndexes.put(columnName.toLowerCase(), i);
537+
}
538+
}
539+
if (columnIndexes.isEmpty()) {
540+
throw managerBackendsException(computeGroupName, "backend columns are empty");
541+
}
542+
return columnIndexes;
543+
}
544+
545+
private static int requireColumn(
546+
Map<String, Integer> columnIndexes, String columnName, String computeGroupName) {
547+
Integer index = columnIndexes.get(columnName.toLowerCase());
548+
if (index == null) {
549+
throw managerBackendsException(
550+
computeGroupName, "backend response missing required column " + columnName);
551+
}
552+
return index;
553+
}
554+
555+
private static String getManagerBackendCell(JsonNode row, int index) {
556+
JsonNode cell = row.get(index);
557+
if (cell == null || cell.isNull()) {
558+
return "";
559+
}
560+
return cell.asText();
561+
}
562+
563+
@VisibleForTesting
564+
static String getComputeGroupNameFromTag(String tag) {
565+
Map<String, String> tagMap = parseBackendTag(tag);
566+
String computeGroupName = tagMap.get(COMPUTE_GROUP_NAME);
567+
if (StringUtils.isNotBlank(computeGroupName)) {
568+
return computeGroupName;
569+
}
570+
return tagMap.get(CLOUD_CLUSTER_NAME);
571+
}
572+
573+
private static Map<String, String> parseBackendTag(String tag) {
574+
Map<String, String> tagMap = new HashMap<>();
575+
if (StringUtils.isBlank(tag)) {
576+
return tagMap;
577+
}
578+
579+
try {
580+
JsonNode tagNode = objectMapper.readTree(tag);
581+
if (tagNode.isObject()) {
582+
tagNode.fields()
583+
.forEachRemaining(
584+
entry -> tagMap.put(entry.getKey(), entry.getValue().asText()));
585+
return tagMap;
586+
}
587+
} catch (IOException e) {
588+
// Fall through to parse Doris PrintableMap style tag strings.
589+
}
590+
591+
String tagContent = tag.trim();
592+
if (tagContent.startsWith("{") && tagContent.endsWith("}")) {
593+
tagContent = tagContent.substring(1, tagContent.length() - 1);
594+
}
595+
for (String entry : tagContent.split(",")) {
596+
String[] keyValue = entry.split(":", 2);
597+
if (keyValue.length != 2) {
598+
continue;
599+
}
600+
tagMap.put(stripQuote(keyValue[0]), stripQuote(keyValue[1]));
601+
}
602+
return tagMap;
603+
}
604+
605+
private static String stripQuote(String value) {
606+
String result = value.trim();
607+
if (result.length() >= 2) {
608+
char first = result.charAt(0);
609+
char last = result.charAt(result.length() - 1);
610+
if ((first == '"' && last == '"') || (first == '\'' && last == '\'')) {
611+
return result.substring(1, result.length() - 1);
612+
}
613+
}
614+
return result;
615+
}
616+
617+
private static DorisRuntimeException managerBackendsException(
618+
String computeGroupName, String reason) {
619+
return new DorisRuntimeException(
620+
String.format(
621+
"Failed to get backends for compute group '%s' from %s: %s. Required privileges: information_schema SELECT on Doris 3.x/4.x, or ADMIN on Doris 2.1.",
622+
computeGroupName, MANAGER_BACKENDS, reason));
623+
}
624+
392625
/**
393626
* discover Doris table schema from Doris FE.
394627
*

flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/BackendUtil.java

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.apache.flink.annotation.VisibleForTesting;
2121

2222
import org.apache.commons.lang3.StringUtils;
23+
import org.apache.doris.flink.cfg.DorisExecutionOptions;
2324
import org.apache.doris.flink.cfg.DorisOptions;
2425
import org.apache.doris.flink.cfg.DorisReadOptions;
2526
import org.apache.doris.flink.exception.DorisRuntimeException;
@@ -34,9 +35,12 @@
3435
import java.util.ArrayList;
3536
import java.util.Arrays;
3637
import java.util.List;
38+
import java.util.Properties;
3739

3840
public class BackendUtil {
3941
private static final Logger LOG = LoggerFactory.getLogger(BackendUtil.class);
42+
private static final String CLOUD_CLUSTER = "cloud_cluster";
43+
private static final String COMPUTE_GROUP = "compute_group";
4044
private final List<BackendV2.BackendRowV2> backends;
4145
private long pos;
4246

@@ -71,11 +75,38 @@ private List<BackendV2.BackendRowV2> initBackends(String beNodes) {
7175

7276
public static BackendUtil getInstance(
7377
DorisOptions dorisOptions, DorisReadOptions readOptions, Logger logger) {
78+
return getInstance(dorisOptions, readOptions, null, logger);
79+
}
80+
81+
public static BackendUtil getInstance(
82+
DorisOptions dorisOptions,
83+
DorisReadOptions readOptions,
84+
DorisExecutionOptions executionOptions,
85+
Logger logger) {
7486
if (StringUtils.isNotEmpty(dorisOptions.getBenodes())) {
7587
return new BackendUtil(dorisOptions.getBenodes());
7688
} else {
77-
return new BackendUtil(RestService.getBackendsV2(dorisOptions, readOptions, logger));
89+
Properties loadProps =
90+
executionOptions == null ? null : executionOptions.getStreamLoadProp();
91+
return new BackendUtil(
92+
RestService.getBackendsV2(
93+
dorisOptions,
94+
readOptions,
95+
getLoadTargetComputeGroup(loadProps),
96+
logger));
97+
}
98+
}
99+
100+
@VisibleForTesting
101+
static String getLoadTargetComputeGroup(Properties loadProps) {
102+
if (loadProps == null) {
103+
return null;
104+
}
105+
String computeGroup = loadProps.getProperty(COMPUTE_GROUP);
106+
if (StringUtils.isNotBlank(computeGroup)) {
107+
return computeGroup;
78108
}
109+
return loadProps.getProperty(CLOUD_CLUSTER);
79110
}
80111

81112
public String getAvailableBackend() {

flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.apache.doris.flink.cfg.DorisReadOptions;
2828
import org.apache.doris.flink.exception.DorisBatchLoadException;
2929
import org.apache.doris.flink.exception.DorisRuntimeException;
30-
import org.apache.doris.flink.rest.RestService;
3130
import org.apache.doris.flink.rest.models.RespContent;
3231
import org.apache.doris.flink.sink.BackendUtil;
3332
import org.apache.doris.flink.sink.EscapeHandler;
@@ -122,10 +121,7 @@ public DorisBatchStreamLoad(
122121
LabelGenerator labelGenerator,
123122
int subTaskId) {
124123
this.backendUtil =
125-
StringUtils.isNotEmpty(dorisOptions.getBenodes())
126-
? new BackendUtil(dorisOptions.getBenodes())
127-
: new BackendUtil(
128-
RestService.getBackendsV2(dorisOptions, dorisReadOptions, LOG));
124+
BackendUtil.getInstance(dorisOptions, dorisReadOptions, executionOptions, LOG);
129125
this.hostPort = backendUtil.getAvailableBackend();
130126
this.username = dorisOptions.getUsername();
131127
this.password = dorisOptions.getPassword();

flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,10 @@
2222

2323
import com.fasterxml.jackson.core.type.TypeReference;
2424
import com.fasterxml.jackson.databind.ObjectMapper;
25-
import org.apache.commons.lang3.StringUtils;
2625
import org.apache.doris.flink.cfg.DorisExecutionOptions;
2726
import org.apache.doris.flink.cfg.DorisOptions;
2827
import org.apache.doris.flink.cfg.DorisReadOptions;
2928
import org.apache.doris.flink.exception.DorisRuntimeException;
30-
import org.apache.doris.flink.rest.RestService;
3129
import org.apache.doris.flink.sink.BackendUtil;
3230
import org.apache.doris.flink.sink.DorisCommittable;
3331
import org.apache.doris.flink.sink.HttpPutBuilder;
@@ -86,10 +84,7 @@ public DorisCommitter(
8684
this.ignoreCommitError = executionOptions.ignoreCommitError();
8785
this.httpClient = client;
8886
this.backendUtil =
89-
StringUtils.isNotEmpty(dorisOptions.getBenodes())
90-
? new BackendUtil(dorisOptions.getBenodes())
91-
: new BackendUtil(
92-
RestService.getBackendsV2(dorisOptions, dorisReadOptions, LOG));
87+
BackendUtil.getInstance(dorisOptions, dorisReadOptions, executionOptions, LOG);
9388
}
9489

9590
@Override

0 commit comments

Comments
 (0)