Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,11 @@ public class RestService implements Serializable {
private static final String UNIQUE_KEYS_TYPE = "UNIQUE_KEYS";
@Deprecated private static final String BACKENDS = "/rest/v1/system?path=//backends";
private static final String BACKENDS_V2 = "/api/backends?is_alive=true";
private static final String MANAGER_BACKENDS = "/rest/v2/manager/node/backends";
private static final String FE_LOGIN = "/rest/v1/login";
private static final ObjectMapper objectMapper = new ObjectMapper();
private static final String COMPUTE_GROUP_NAME = "compute_group_name";
private static final String CLOUD_CLUSTER_NAME = "cloud_cluster_name";
private static final String TABLE_SCHEMA_API = "http://%s/api/%s/%s/_schema";
private static final String CATALOG_TABLE_SCHEMA_API = "http://%s/api/%s/%s/%s/_schema";
private static final String QUERY_PLAN_API = "http://%s/api/%s/%s/_query_plan";
Expand All @@ -114,6 +117,16 @@ private static String send(
HttpRequestBase request,
Logger logger)
throws ConnectedFailedException {
return send(options, readOptions, request, logger, true);
}

private static String send(
DorisOptions options,
DorisReadOptions readOptions,
HttpRequestBase request,
Logger logger,
boolean unwrapData)
throws ConnectedFailedException {
int connectTimeout =
readOptions.getRequestConnectTimeoutMs() == null
? ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT
Expand Down Expand Up @@ -169,7 +182,7 @@ private static String send(
// Handle the problem of inconsistent data format returned by http v1 and v2
ObjectMapper mapper = new ObjectMapper();
Map map = mapper.readValue(response, Map.class);
if (map.containsKey("code") && map.containsKey("msg")) {
if (unwrapData && map.containsKey("code") && map.containsKey("msg")) {
Object data = map.get("data");
return mapper.writeValueAsString(data);
} else {
Expand Down Expand Up @@ -324,13 +337,27 @@ static List<String> allEndpoints(String feNodes, Logger logger) {
@VisibleForTesting
public static List<BackendRowV2> getBackendsV2(
DorisOptions options, DorisReadOptions readOptions, Logger logger) {
return getBackendsV2(options, readOptions, null, logger);
}

@VisibleForTesting
public static List<BackendRowV2> getBackendsV2(
DorisOptions options,
DorisReadOptions readOptions,
String computeGroupName,
Logger logger) {
String feNodes = options.getFenodes();
List<String> feNodeList = allEndpoints(feNodes, logger);

if (options.isAutoRedirect() && !feNodeList.isEmpty()) {
return convert(feNodeList);
}

if (StringUtils.isNotBlank(computeGroupName)) {
return getManagerBackendsByComputeGroup(
options, readOptions, feNodeList, computeGroupName, logger);
}

for (String feNode : feNodeList) {
try {
String beUrl = "http://" + feNode + BACKENDS_V2;
Expand All @@ -351,6 +378,34 @@ public static List<BackendRowV2> getBackendsV2(
throw new DorisRuntimeException(errMsg);
}

private static List<BackendRowV2> getManagerBackendsByComputeGroup(
DorisOptions options,
DorisReadOptions readOptions,
List<String> feNodeList,
String computeGroupName,
Logger logger) {
for (String feNode : feNodeList) {
try {
String beUrl = "http://" + feNode + MANAGER_BACKENDS;
HttpGet httpGet = new HttpGet(beUrl);
String response = send(options, readOptions, httpGet, logger, false);
logger.info("Manager backend info:{}", response);
return parseManagerBackends(response, logger, computeGroupName);
} catch (ConnectedFailedException e) {
logger.info(
"Doris FE node {} is unavailable: {}, Request the next Doris FE node",
feNode,
e.getMessage());
}
}
String errMsg =
String.format(
"No Doris FE is available to request %s for compute group '%s'.",
MANAGER_BACKENDS, computeGroupName);
logger.error(errMsg);
throw new DorisRuntimeException(errMsg);
}

/**
* When the user turns on redirection, there is no need to explicitly obtain the be list, just
* treat the fe list as the be list.
Expand Down Expand Up @@ -389,6 +444,184 @@ static List<BackendRowV2> parseBackendV2(String response, Logger logger) {
return backendRows;
}

@VisibleForTesting
static List<BackendRowV2> parseManagerBackends(
String response, Logger logger, String computeGroupName) {
if (StringUtils.isBlank(computeGroupName)) {
throw managerBackendsException(computeGroupName, "compute group is empty");
}

JsonNode rootNode = parseJsonResponse(response, computeGroupName, logger);
JsonNode dataNode = unwrapManagerBackendData(rootNode, computeGroupName);
JsonNode columnNode = dataNode.path("columnNames");
if (!columnNode.isArray()) {
columnNode = dataNode.path("column_names");
}
JsonNode rowNode = dataNode.path("rows");
if (!columnNode.isArray() || !rowNode.isArray()) {
throw managerBackendsException(
computeGroupName,
"response does not contain columnNames/column_names and rows");
}

Map<String, Integer> columnIndexes = getColumnIndexes(columnNode, computeGroupName);
int hostIndex = requireColumn(columnIndexes, "Host", computeGroupName);
int httpPortIndex = requireColumn(columnIndexes, "HttpPort", computeGroupName);
int aliveIndex = requireColumn(columnIndexes, "Alive", computeGroupName);
int tagIndex = requireColumn(columnIndexes, "Tag", computeGroupName);

List<BackendRowV2> backends = new ArrayList<>();
for (JsonNode row : rowNode) {
if (!row.isArray()) {
throw managerBackendsException(computeGroupName, "backend row is not an array");
}
if (!Boolean.parseBoolean(getManagerBackendCell(row, aliveIndex))) {
continue;
}
String tag = getManagerBackendCell(row, tagIndex);
String rowComputeGroupName = getComputeGroupNameFromTag(tag);
if (!computeGroupName.equals(rowComputeGroupName)) {
continue;
}

String host = getManagerBackendCell(row, hostIndex);
String httpPort = getManagerBackendCell(row, httpPortIndex);
try {
BackendRowV2 backend = BackendRowV2.of(host, Integer.parseInt(httpPort), true);
backends.add(backend);
} catch (NumberFormatException e) {
throw managerBackendsException(
computeGroupName, "backend HttpPort is invalid: " + httpPort);
}
}

if (backends.isEmpty()) {
throw managerBackendsException(
computeGroupName,
"no alive backend found. If the target is a virtual compute group, configure its physical active compute group");
}
logger.debug("Parsing manager backend result is '{}'.", backends);
return backends;
}

private static JsonNode parseJsonResponse(
String response, String computeGroupName, Logger logger) {
try {
return objectMapper.readTree(response);
} catch (IOException e) {
String errMsg = "Parse Doris manager backend response to json failed. res: " + response;
logger.error(errMsg, e);
throw managerBackendsException(computeGroupName, errMsg);
}
}

private static JsonNode unwrapManagerBackendData(JsonNode rootNode, String computeGroupName) {
if (rootNode.has("code") && rootNode.has("msg")) {
if (rootNode.path("code").asInt() != REST_RESPONSE_CODE_OK) {
throw managerBackendsException(
computeGroupName,
rootNode.path("msg").asText() + ": " + rootNode.path("data").asText());
}
return rootNode.path("data");
Comment on lines +520 to +525
}
return rootNode;
}

private static Map<String, Integer> getColumnIndexes(
JsonNode columnNode, String computeGroupName) {
Map<String, Integer> columnIndexes = new HashMap<>();
for (int i = 0; i < columnNode.size(); i++) {
String columnName = columnNode.get(i).asText();
if (StringUtils.isNotBlank(columnName)) {
columnIndexes.put(columnName.toLowerCase(), i);
}
}
if (columnIndexes.isEmpty()) {
throw managerBackendsException(computeGroupName, "backend columns are empty");
}
return columnIndexes;
}

private static int requireColumn(
Map<String, Integer> columnIndexes, String columnName, String computeGroupName) {
Integer index = columnIndexes.get(columnName.toLowerCase());
if (index == null) {
throw managerBackendsException(
computeGroupName, "backend response missing required column " + columnName);
}
return index;
}

private static String getManagerBackendCell(JsonNode row, int index) {
JsonNode cell = row.get(index);
if (cell == null || cell.isNull()) {
return "";
}
return cell.asText();
}

@VisibleForTesting
static String getComputeGroupNameFromTag(String tag) {
Map<String, String> tagMap = parseBackendTag(tag);
String computeGroupName = tagMap.get(COMPUTE_GROUP_NAME);
if (StringUtils.isNotBlank(computeGroupName)) {
return computeGroupName;
}
return tagMap.get(CLOUD_CLUSTER_NAME);
}

private static Map<String, String> parseBackendTag(String tag) {
Map<String, String> tagMap = new HashMap<>();
if (StringUtils.isBlank(tag)) {
return tagMap;
}

try {
JsonNode tagNode = objectMapper.readTree(tag);
if (tagNode.isObject()) {
tagNode.fields()
.forEachRemaining(
entry -> tagMap.put(entry.getKey(), entry.getValue().asText()));
return tagMap;
}
} catch (IOException e) {
// Fall through to parse Doris PrintableMap style tag strings.
}

String tagContent = tag.trim();
if (tagContent.startsWith("{") && tagContent.endsWith("}")) {
tagContent = tagContent.substring(1, tagContent.length() - 1);
}
for (String entry : tagContent.split(",")) {
String[] keyValue = entry.split(":", 2);
if (keyValue.length != 2) {
continue;
}
tagMap.put(stripQuote(keyValue[0]), stripQuote(keyValue[1]));
}
return tagMap;
}

private static String stripQuote(String value) {
String result = value.trim();
if (result.length() >= 2) {
char first = result.charAt(0);
char last = result.charAt(result.length() - 1);
if ((first == '"' && last == '"') || (first == '\'' && last == '\'')) {
return result.substring(1, result.length() - 1);
}
}
return result;
}

private static DorisRuntimeException managerBackendsException(
String computeGroupName, String reason) {
return new DorisRuntimeException(
String.format(
"Failed to get backends for compute group '%s' from %s: %s. Required privileges: information_schema SELECT on Doris 3.x/4.x, or ADMIN on Doris 2.1.",
computeGroupName, MANAGER_BACKENDS, reason));
}

/**
* discover Doris table schema from Doris FE.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.flink.annotation.VisibleForTesting;

import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.exception.DorisRuntimeException;
Expand All @@ -34,9 +35,12 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

public class BackendUtil {
private static final Logger LOG = LoggerFactory.getLogger(BackendUtil.class);
private static final String CLOUD_CLUSTER = "cloud_cluster";
private static final String COMPUTE_GROUP = "compute_group";
private final List<BackendV2.BackendRowV2> backends;
private long pos;

Expand Down Expand Up @@ -71,11 +75,38 @@ private List<BackendV2.BackendRowV2> initBackends(String beNodes) {

public static BackendUtil getInstance(
DorisOptions dorisOptions, DorisReadOptions readOptions, Logger logger) {
return getInstance(dorisOptions, readOptions, null, logger);
}

public static BackendUtil getInstance(
DorisOptions dorisOptions,
DorisReadOptions readOptions,
DorisExecutionOptions executionOptions,
Logger logger) {
if (StringUtils.isNotEmpty(dorisOptions.getBenodes())) {
return new BackendUtil(dorisOptions.getBenodes());
} else {
return new BackendUtil(RestService.getBackendsV2(dorisOptions, readOptions, logger));
Properties loadProps =
executionOptions == null ? null : executionOptions.getStreamLoadProp();
return new BackendUtil(
RestService.getBackendsV2(
dorisOptions,
readOptions,
getLoadTargetComputeGroup(loadProps),
logger));
}
}

@VisibleForTesting
static String getLoadTargetComputeGroup(Properties loadProps) {
if (loadProps == null) {
return null;
}
String computeGroup = loadProps.getProperty(COMPUTE_GROUP);
if (StringUtils.isNotBlank(computeGroup)) {
return computeGroup;
}
return loadProps.getProperty(CLOUD_CLUSTER);
}

public String getAvailableBackend() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.exception.DorisBatchLoadException;
import org.apache.doris.flink.exception.DorisRuntimeException;
import org.apache.doris.flink.rest.RestService;
import org.apache.doris.flink.rest.models.RespContent;
import org.apache.doris.flink.sink.BackendUtil;
import org.apache.doris.flink.sink.EscapeHandler;
Expand Down Expand Up @@ -122,10 +121,7 @@ public DorisBatchStreamLoad(
LabelGenerator labelGenerator,
int subTaskId) {
this.backendUtil =
StringUtils.isNotEmpty(dorisOptions.getBenodes())
? new BackendUtil(dorisOptions.getBenodes())
: new BackendUtil(
RestService.getBackendsV2(dorisOptions, dorisReadOptions, LOG));
BackendUtil.getInstance(dorisOptions, dorisReadOptions, executionOptions, LOG);
this.hostPort = backendUtil.getAvailableBackend();
this.username = dorisOptions.getUsername();
this.password = dorisOptions.getPassword();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,10 @@

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils;
import org.apache.doris.flink.cfg.DorisExecutionOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.cfg.DorisReadOptions;
import org.apache.doris.flink.exception.DorisRuntimeException;
import org.apache.doris.flink.rest.RestService;
import org.apache.doris.flink.sink.BackendUtil;
import org.apache.doris.flink.sink.DorisCommittable;
import org.apache.doris.flink.sink.HttpPutBuilder;
Expand Down Expand Up @@ -86,10 +84,7 @@ public DorisCommitter(
this.ignoreCommitError = executionOptions.ignoreCommitError();
this.httpClient = client;
this.backendUtil =
StringUtils.isNotEmpty(dorisOptions.getBenodes())
? new BackendUtil(dorisOptions.getBenodes())
: new BackendUtil(
RestService.getBackendsV2(dorisOptions, dorisReadOptions, LOG));
BackendUtil.getInstance(dorisOptions, dorisReadOptions, executionOptions, LOG);
}

@Override
Expand Down
Loading
Loading