From 3b73921d635b077c9eb634cfe2268738a6bb2439 Mon Sep 17 00:00:00 2001 From: gnehil Date: Thu, 30 Apr 2026 22:12:02 +0800 Subject: [PATCH] Fix cloud compute group backend discovery --- .../apache/doris/flink/rest/RestService.java | 235 +++++++++++++++++- .../apache/doris/flink/sink/BackendUtil.java | 33 ++- .../sink/batch/DorisBatchStreamLoad.java | 6 +- .../flink/sink/committer/DorisCommitter.java | 7 +- .../doris/flink/sink/writer/DorisWriter.java | 3 +- .../doris/flink/rest/TestRestService.java | 73 ++++++ .../doris/flink/sink/TestBackendUtil.java | 47 ++++ .../sink/batch/TestDorisBatchStreamLoad.java | 8 + .../sink/batch/TestDorisBatchWriter.java | 9 + .../sink/committer/TestDorisCommitter.java | 6 + 10 files changed, 413 insertions(+), 14 deletions(-) diff --git a/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/rest/RestService.java b/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/rest/RestService.java index f9aad4ffd..9513daec8 100644 --- a/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/rest/RestService.java +++ b/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/rest/RestService.java @@ -91,8 +91,11 @@ public class RestService implements Serializable { private static final String UNIQUE_KEYS_TYPE = "UNIQUE_KEYS"; @Deprecated private static final String BACKENDS = "/rest/v1/system?path=//backends"; private static final String BACKENDS_V2 = "/api/backends?is_alive=true"; + private static final String MANAGER_BACKENDS = "/rest/v2/manager/node/backends"; private static final String FE_LOGIN = "/rest/v1/login"; private static final ObjectMapper objectMapper = new ObjectMapper(); + private static final String COMPUTE_GROUP_NAME = "compute_group_name"; + private static final String CLOUD_CLUSTER_NAME = "cloud_cluster_name"; private static final String TABLE_SCHEMA_API = "http://%s/api/%s/%s/_schema"; private static final String CATALOG_TABLE_SCHEMA_API = "http://%s/api/%s/%s/%s/_schema"; private static final String QUERY_PLAN_API = "http://%s/api/%s/%s/_query_plan"; @@ -114,6 +117,16 @@ private static String send( HttpRequestBase request, Logger logger) throws ConnectedFailedException { + return send(options, readOptions, request, logger, true); + } + + private static String send( + DorisOptions options, + DorisReadOptions readOptions, + HttpRequestBase request, + Logger logger, + boolean unwrapData) + throws ConnectedFailedException { int connectTimeout = readOptions.getRequestConnectTimeoutMs() == null ? ConfigurationOptions.DORIS_REQUEST_CONNECT_TIMEOUT_MS_DEFAULT @@ -169,7 +182,7 @@ private static String send( // Handle the problem of inconsistent data format returned by http v1 and v2 ObjectMapper mapper = new ObjectMapper(); Map map = mapper.readValue(response, Map.class); - if (map.containsKey("code") && map.containsKey("msg")) { + if (unwrapData && map.containsKey("code") && map.containsKey("msg")) { Object data = map.get("data"); return mapper.writeValueAsString(data); } else { @@ -324,6 +337,15 @@ static List allEndpoints(String feNodes, Logger logger) { @VisibleForTesting public static List getBackendsV2( DorisOptions options, DorisReadOptions readOptions, Logger logger) { + return getBackendsV2(options, readOptions, null, logger); + } + + @VisibleForTesting + public static List getBackendsV2( + DorisOptions options, + DorisReadOptions readOptions, + String computeGroupName, + Logger logger) { String feNodes = options.getFenodes(); List feNodeList = allEndpoints(feNodes, logger); @@ -331,6 +353,11 @@ public static List getBackendsV2( return convert(feNodeList); } + if (StringUtils.isNotBlank(computeGroupName)) { + return getManagerBackendsByComputeGroup( + options, readOptions, feNodeList, computeGroupName, logger); + } + for (String feNode : feNodeList) { try { String beUrl = "http://" + feNode + BACKENDS_V2; @@ -351,6 +378,34 @@ public static List getBackendsV2( throw new DorisRuntimeException(errMsg); } + private static List getManagerBackendsByComputeGroup( + DorisOptions options, + DorisReadOptions readOptions, + List feNodeList, + String computeGroupName, + Logger logger) { + for (String feNode : feNodeList) { + try { + String beUrl = "http://" + feNode + MANAGER_BACKENDS; + HttpGet httpGet = new HttpGet(beUrl); + String response = send(options, readOptions, httpGet, logger, false); + logger.info("Manager backend info:{}", response); + return parseManagerBackends(response, logger, computeGroupName); + } catch (ConnectedFailedException e) { + logger.info( + "Doris FE node {} is unavailable: {}, Request the next Doris FE node", + feNode, + e.getMessage()); + } + } + String errMsg = + String.format( + "No Doris FE is available to request %s for compute group '%s'.", + MANAGER_BACKENDS, computeGroupName); + logger.error(errMsg); + throw new DorisRuntimeException(errMsg); + } + /** * When the user turns on redirection, there is no need to explicitly obtain the be list, just * treat the fe list as the be list. @@ -389,6 +444,184 @@ static List parseBackendV2(String response, Logger logger) { return backendRows; } + @VisibleForTesting + static List parseManagerBackends( + String response, Logger logger, String computeGroupName) { + if (StringUtils.isBlank(computeGroupName)) { + throw managerBackendsException(computeGroupName, "compute group is empty"); + } + + JsonNode rootNode = parseJsonResponse(response, computeGroupName, logger); + JsonNode dataNode = unwrapManagerBackendData(rootNode, computeGroupName); + JsonNode columnNode = dataNode.path("columnNames"); + if (!columnNode.isArray()) { + columnNode = dataNode.path("column_names"); + } + JsonNode rowNode = dataNode.path("rows"); + if (!columnNode.isArray() || !rowNode.isArray()) { + throw managerBackendsException( + computeGroupName, + "response does not contain columnNames/column_names and rows"); + } + + Map columnIndexes = getColumnIndexes(columnNode, computeGroupName); + int hostIndex = requireColumn(columnIndexes, "Host", computeGroupName); + int httpPortIndex = requireColumn(columnIndexes, "HttpPort", computeGroupName); + int aliveIndex = requireColumn(columnIndexes, "Alive", computeGroupName); + int tagIndex = requireColumn(columnIndexes, "Tag", computeGroupName); + + List backends = new ArrayList<>(); + for (JsonNode row : rowNode) { + if (!row.isArray()) { + throw managerBackendsException(computeGroupName, "backend row is not an array"); + } + if (!Boolean.parseBoolean(getManagerBackendCell(row, aliveIndex))) { + continue; + } + String tag = getManagerBackendCell(row, tagIndex); + String rowComputeGroupName = getComputeGroupNameFromTag(tag); + if (!computeGroupName.equals(rowComputeGroupName)) { + continue; + } + + String host = getManagerBackendCell(row, hostIndex); + String httpPort = getManagerBackendCell(row, httpPortIndex); + try { + BackendRowV2 backend = BackendRowV2.of(host, Integer.parseInt(httpPort), true); + backends.add(backend); + } catch (NumberFormatException e) { + throw managerBackendsException( + computeGroupName, "backend HttpPort is invalid: " + httpPort); + } + } + + if (backends.isEmpty()) { + throw managerBackendsException( + computeGroupName, + "no alive backend found. If the target is a virtual compute group, configure its physical active compute group"); + } + logger.debug("Parsing manager backend result is '{}'.", backends); + return backends; + } + + private static JsonNode parseJsonResponse( + String response, String computeGroupName, Logger logger) { + try { + return objectMapper.readTree(response); + } catch (IOException e) { + String errMsg = "Parse Doris manager backend response to json failed. res: " + response; + logger.error(errMsg, e); + throw managerBackendsException(computeGroupName, errMsg); + } + } + + private static JsonNode unwrapManagerBackendData(JsonNode rootNode, String computeGroupName) { + if (rootNode.has("code") && rootNode.has("msg")) { + if (rootNode.path("code").asInt() != REST_RESPONSE_CODE_OK) { + throw managerBackendsException( + computeGroupName, + rootNode.path("msg").asText() + ": " + rootNode.path("data").asText()); + } + return rootNode.path("data"); + } + return rootNode; + } + + private static Map getColumnIndexes( + JsonNode columnNode, String computeGroupName) { + Map columnIndexes = new HashMap<>(); + for (int i = 0; i < columnNode.size(); i++) { + String columnName = columnNode.get(i).asText(); + if (StringUtils.isNotBlank(columnName)) { + columnIndexes.put(columnName.toLowerCase(), i); + } + } + if (columnIndexes.isEmpty()) { + throw managerBackendsException(computeGroupName, "backend columns are empty"); + } + return columnIndexes; + } + + private static int requireColumn( + Map columnIndexes, String columnName, String computeGroupName) { + Integer index = columnIndexes.get(columnName.toLowerCase()); + if (index == null) { + throw managerBackendsException( + computeGroupName, "backend response missing required column " + columnName); + } + return index; + } + + private static String getManagerBackendCell(JsonNode row, int index) { + JsonNode cell = row.get(index); + if (cell == null || cell.isNull()) { + return ""; + } + return cell.asText(); + } + + @VisibleForTesting + static String getComputeGroupNameFromTag(String tag) { + Map tagMap = parseBackendTag(tag); + String computeGroupName = tagMap.get(COMPUTE_GROUP_NAME); + if (StringUtils.isNotBlank(computeGroupName)) { + return computeGroupName; + } + return tagMap.get(CLOUD_CLUSTER_NAME); + } + + private static Map parseBackendTag(String tag) { + Map tagMap = new HashMap<>(); + if (StringUtils.isBlank(tag)) { + return tagMap; + } + + try { + JsonNode tagNode = objectMapper.readTree(tag); + if (tagNode.isObject()) { + tagNode.fields() + .forEachRemaining( + entry -> tagMap.put(entry.getKey(), entry.getValue().asText())); + return tagMap; + } + } catch (IOException e) { + // Fall through to parse Doris PrintableMap style tag strings. + } + + String tagContent = tag.trim(); + if (tagContent.startsWith("{") && tagContent.endsWith("}")) { + tagContent = tagContent.substring(1, tagContent.length() - 1); + } + for (String entry : tagContent.split(",")) { + String[] keyValue = entry.split(":", 2); + if (keyValue.length != 2) { + continue; + } + tagMap.put(stripQuote(keyValue[0]), stripQuote(keyValue[1])); + } + return tagMap; + } + + private static String stripQuote(String value) { + String result = value.trim(); + if (result.length() >= 2) { + char first = result.charAt(0); + char last = result.charAt(result.length() - 1); + if ((first == '"' && last == '"') || (first == '\'' && last == '\'')) { + return result.substring(1, result.length() - 1); + } + } + return result; + } + + private static DorisRuntimeException managerBackendsException( + String computeGroupName, String reason) { + return new DorisRuntimeException( + String.format( + "Failed to get backends for compute group '%s' from %s: %s. Required privileges: information_schema SELECT on Doris 3.x/4.x, or ADMIN on Doris 2.1.", + computeGroupName, MANAGER_BACKENDS, reason)); + } + /** * discover Doris table schema from Doris FE. * diff --git a/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/BackendUtil.java b/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/BackendUtil.java index 26771c9d9..d838a6730 100644 --- a/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/BackendUtil.java +++ b/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/BackendUtil.java @@ -20,6 +20,7 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.commons.lang3.StringUtils; +import org.apache.doris.flink.cfg.DorisExecutionOptions; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; import org.apache.doris.flink.exception.DorisRuntimeException; @@ -34,9 +35,12 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Properties; public class BackendUtil { private static final Logger LOG = LoggerFactory.getLogger(BackendUtil.class); + private static final String CLOUD_CLUSTER = "cloud_cluster"; + private static final String COMPUTE_GROUP = "compute_group"; private final List backends; private long pos; @@ -71,11 +75,38 @@ private List initBackends(String beNodes) { public static BackendUtil getInstance( DorisOptions dorisOptions, DorisReadOptions readOptions, Logger logger) { + return getInstance(dorisOptions, readOptions, null, logger); + } + + public static BackendUtil getInstance( + DorisOptions dorisOptions, + DorisReadOptions readOptions, + DorisExecutionOptions executionOptions, + Logger logger) { if (StringUtils.isNotEmpty(dorisOptions.getBenodes())) { return new BackendUtil(dorisOptions.getBenodes()); } else { - return new BackendUtil(RestService.getBackendsV2(dorisOptions, readOptions, logger)); + Properties loadProps = + executionOptions == null ? null : executionOptions.getStreamLoadProp(); + return new BackendUtil( + RestService.getBackendsV2( + dorisOptions, + readOptions, + getLoadTargetComputeGroup(loadProps), + logger)); + } + } + + @VisibleForTesting + static String getLoadTargetComputeGroup(Properties loadProps) { + if (loadProps == null) { + return null; + } + String computeGroup = loadProps.getProperty(COMPUTE_GROUP); + if (StringUtils.isNotBlank(computeGroup)) { + return computeGroup; } + return loadProps.getProperty(CLOUD_CLUSTER); } public String getAvailableBackend() { diff --git a/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java b/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java index fc21bb979..cc82eae3d 100644 --- a/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java +++ b/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java @@ -27,7 +27,6 @@ import org.apache.doris.flink.cfg.DorisReadOptions; import org.apache.doris.flink.exception.DorisBatchLoadException; import org.apache.doris.flink.exception.DorisRuntimeException; -import org.apache.doris.flink.rest.RestService; import org.apache.doris.flink.rest.models.RespContent; import org.apache.doris.flink.sink.BackendUtil; import org.apache.doris.flink.sink.EscapeHandler; @@ -122,10 +121,7 @@ public DorisBatchStreamLoad( LabelGenerator labelGenerator, int subTaskId) { this.backendUtil = - StringUtils.isNotEmpty(dorisOptions.getBenodes()) - ? new BackendUtil(dorisOptions.getBenodes()) - : new BackendUtil( - RestService.getBackendsV2(dorisOptions, dorisReadOptions, LOG)); + BackendUtil.getInstance(dorisOptions, dorisReadOptions, executionOptions, LOG); this.hostPort = backendUtil.getAvailableBackend(); this.username = dorisOptions.getUsername(); this.password = dorisOptions.getPassword(); diff --git a/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java b/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java index 2593e3032..75097580e 100644 --- a/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java +++ b/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java @@ -22,12 +22,10 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.commons.lang3.StringUtils; import org.apache.doris.flink.cfg.DorisExecutionOptions; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; import org.apache.doris.flink.exception.DorisRuntimeException; -import org.apache.doris.flink.rest.RestService; import org.apache.doris.flink.sink.BackendUtil; import org.apache.doris.flink.sink.DorisCommittable; import org.apache.doris.flink.sink.HttpPutBuilder; @@ -86,10 +84,7 @@ public DorisCommitter( this.ignoreCommitError = executionOptions.ignoreCommitError(); this.httpClient = client; this.backendUtil = - StringUtils.isNotEmpty(dorisOptions.getBenodes()) - ? new BackendUtil(dorisOptions.getBenodes()) - : new BackendUtil( - RestService.getBackendsV2(dorisOptions, dorisReadOptions, LOG)); + BackendUtil.getInstance(dorisOptions, dorisReadOptions, executionOptions, LOG); } @Override diff --git a/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java b/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java index 4435b360a..bef385923 100644 --- a/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java +++ b/flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java @@ -118,7 +118,8 @@ public DorisWriter( } public void initializeLoad(Collection state) { - this.backendUtil = BackendUtil.getInstance(dorisOptions, dorisReadOptions, LOG); + this.backendUtil = + BackendUtil.getInstance(dorisOptions, dorisReadOptions, executionOptions, LOG); try { if (executionOptions.enabled2PC()) { abortLingeringTransactions(state); diff --git a/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/rest/TestRestService.java b/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/rest/TestRestService.java index 6fe176cc2..ceaf9715d 100644 --- a/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/rest/TestRestService.java +++ b/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/rest/TestRestService.java @@ -410,6 +410,79 @@ public void testParseBackendV2() throws Exception { RestService.parseBackendV2(response, logger); } + @Test + public void testParseManagerBackendsFilterComputeGroup() { + String response = + "{\"columnNames\":[\"BackendId\",\"Host\",\"HttpPort\",\"Alive\",\"Tag\"]," + + "\"rows\":[" + + "[\"1\",\"192.168.1.1\",\"8042\",\"true\",\"{\\\"compute_group_name\\\":\\\"cluster_a\\\"}\"]," + + "[\"2\",\"192.168.1.2\",\"8042\",\"true\",\"{\\\"compute_group_name\\\":\\\"cluster_b\\\"}\"]," + + "[\"3\",\"192.168.1.3\",\"8042\",\"false\",\"{\\\"compute_group_name\\\":\\\"cluster_a\\\"}\"]" + + "]}"; + + List backendRows = + RestService.parseManagerBackends(response, logger, "cluster_a"); + + Assert.assertEquals(1, backendRows.size()); + Assert.assertEquals("192.168.1.1:8042", backendRows.get(0).toBackendString()); + } + + @Test + public void testParseManagerBackendsColumnNamesAndCloudCluster() { + String response = + "{\"code\":0,\"msg\":\"success\",\"data\":{" + + "\"column_names\":[\"BackendId\",\"Host\",\"HttpPort\",\"Alive\",\"Tag\"]," + + "\"rows\":[" + + "[\"1\",\"192.168.1.1\",\"8042\",\"true\",\"{\\\"cloud_cluster_name\\\":\\\"cluster_a\\\"}\"]" + + "]}}"; + + List backendRows = + RestService.parseManagerBackends(response, logger, "cluster_a"); + + Assert.assertEquals(1, backendRows.size()); + Assert.assertEquals("192.168.1.1:8042", backendRows.get(0).toBackendString()); + } + + @Test + public void testParseManagerBackendsPrintableTag() { + String tag = "{compute_group_name:cluster_a, location:default}"; + Assert.assertEquals("cluster_a", RestService.getComputeGroupNameFromTag(tag)); + } + + @Test + public void testParseManagerBackendsNoTargetBackend() { + String response = + "{\"columnNames\":[\"BackendId\",\"Host\",\"HttpPort\",\"Alive\",\"Tag\"]," + + "\"rows\":[" + + "[\"1\",\"192.168.1.1\",\"8042\",\"true\",\"{\\\"compute_group_name\\\":\\\"cluster_a\\\"}\"]," + + "[\"2\",\"192.168.1.2\",\"8042\",\"false\",\"{\\\"compute_group_name\\\":\\\"cluster_b\\\"}\"]" + + "]}"; + + thrown.expect(DorisRuntimeException.class); + thrown.expectMessage("no alive backend found"); + RestService.parseManagerBackends(response, logger, "cluster_b"); + } + + @Test + public void testParseManagerBackendsMissingColumn() { + String response = + "{\"columnNames\":[\"BackendId\",\"Host\",\"HttpPort\",\"Alive\"]," + + "\"rows\":[[\"1\",\"192.168.1.1\",\"8042\",\"true\"]]}"; + + thrown.expect(DorisRuntimeException.class); + thrown.expectMessage("missing required column Tag"); + RestService.parseManagerBackends(response, logger, "cluster_a"); + } + + @Test + public void testParseManagerBackendsErrorResponse() { + String response = "{\"code\":1,\"msg\":\"Error\",\"data\":\"Access denied\"}"; + + thrown.expect(DorisRuntimeException.class); + thrown.expectMessage("Error: Access denied"); + RestService.parseManagerBackends(response, logger, "cluster_a"); + } + @Test public void testParseBackendV2Error() { String response = diff --git a/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/TestBackendUtil.java b/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/TestBackendUtil.java index 43527d6ad..dfb30f56c 100644 --- a/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/TestBackendUtil.java +++ b/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/TestBackendUtil.java @@ -17,7 +17,11 @@ package org.apache.doris.flink.sink; +import org.apache.doris.flink.cfg.DorisExecutionOptions; +import org.apache.doris.flink.cfg.DorisOptions; +import org.apache.doris.flink.cfg.DorisReadOptions; import org.apache.doris.flink.exception.DorisRuntimeException; +import org.apache.doris.flink.rest.RestService; import org.apache.doris.flink.rest.models.BackendV2; import org.junit.After; import org.junit.Assert; @@ -27,8 +31,10 @@ import java.util.Arrays; import java.util.List; +import java.util.Properties; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mockStatic; public class TestBackendUtil { @@ -38,6 +44,9 @@ public class TestBackendUtil { @Before public void setUp() throws Exception { backendUtilMockedStatic = mockStatic(BackendUtil.class); + backendUtilMockedStatic + .when(() -> BackendUtil.getLoadTargetComputeGroup(any())) + .thenCallRealMethod(); } @Test @@ -86,6 +95,44 @@ public void testInitBackends() { Assert.assertTrue(backendUtil.getBackends().isEmpty()); } + @Test + public void testGetLoadTargetComputeGroup() { + Properties loadProps = new Properties(); + loadProps.setProperty("cloud_cluster", "cluster_a"); + Assert.assertEquals("cluster_a", BackendUtil.getLoadTargetComputeGroup(loadProps)); + + loadProps.setProperty("compute_group", "cluster_b"); + Assert.assertEquals("cluster_b", BackendUtil.getLoadTargetComputeGroup(loadProps)); + } + + @Test + public void testGetInstanceUsesLoadTargetComputeGroup() { + backendUtilMockedStatic + .when(() -> BackendUtil.getInstance(any(), any(), any(), any())) + .thenCallRealMethod(); + + DorisOptions dorisOptions = DorisOptions.builder().setFenodes("127.0.0.1:8030").build(); + DorisReadOptions readOptions = DorisReadOptions.defaults(); + Properties loadProps = new Properties(); + loadProps.setProperty("cloud_cluster", "cluster_a"); + loadProps.setProperty("compute_group", "cluster_b"); + DorisExecutionOptions executionOptions = + DorisExecutionOptions.builder().setStreamLoadProp(loadProps).build(); + + try (MockedStatic restServiceMockedStatic = mockStatic(RestService.class)) { + restServiceMockedStatic + .when(() -> RestService.getBackendsV2(any(), any(), any(), any())) + .thenReturn(Arrays.asList(newBackend("127.0.0.1", 8040))); + + BackendUtil backendUtil = + BackendUtil.getInstance(dorisOptions, readOptions, executionOptions, null); + + Assert.assertEquals(1, backendUtil.getBackends().size()); + restServiceMockedStatic.verify( + () -> RestService.getBackendsV2(any(), any(), eq("cluster_b"), any())); + } + } + private BackendV2.BackendRowV2 newBackend(String host, int port) { BackendV2.BackendRowV2 backend = new BackendV2.BackendRowV2(); backend.setIp(host); diff --git a/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java b/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java index f45e69aca..c3d58c88f 100644 --- a/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java +++ b/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java @@ -22,6 +22,7 @@ import org.apache.doris.flink.cfg.DorisExecutionOptions; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; +import org.apache.doris.flink.rest.models.BackendV2; import org.apache.doris.flink.sink.BackendUtil; import org.apache.doris.flink.sink.HttpTestUtil; import org.apache.doris.flink.sink.TestUtil; @@ -47,6 +48,7 @@ import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Properties; @@ -69,6 +71,12 @@ public class TestDorisBatchStreamLoad { @Before public void setUp() throws Exception { backendUtilMockedStatic = mockStatic(BackendUtil.class); + backendUtilMockedStatic + .when(() -> BackendUtil.getInstance(any(), any(), any(), any())) + .thenReturn( + new BackendUtil( + Collections.singletonList( + BackendV2.BackendRowV2.of("127.0.0.1", 8040, true)))); backendUtilMockedStatic.when(() -> BackendUtil.tryHttpConnection(any())).thenReturn(true); } diff --git a/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchWriter.java b/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchWriter.java index 411c2c7f5..ae135fbdc 100644 --- a/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchWriter.java +++ b/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchWriter.java @@ -20,6 +20,7 @@ import org.apache.doris.flink.cfg.DorisExecutionOptions; import org.apache.doris.flink.cfg.DorisOptions; import org.apache.doris.flink.cfg.DorisReadOptions; +import org.apache.doris.flink.rest.models.BackendV2; import org.apache.doris.flink.sink.BackendUtil; import org.apache.doris.flink.sink.writer.serializer.DorisRecord; import org.apache.doris.flink.sink.writer.serializer.SimpleStringSerializer; @@ -30,6 +31,8 @@ import org.junit.rules.ExpectedException; import org.mockito.MockedStatic; +import java.util.Collections; + import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mockStatic; @@ -42,6 +45,12 @@ public class TestDorisBatchWriter { @Before public void setUp() throws Exception { backendUtilMockedStatic = mockStatic(BackendUtil.class); + backendUtilMockedStatic + .when(() -> BackendUtil.getInstance(any(), any(), any(), any())) + .thenReturn( + new BackendUtil( + Collections.singletonList( + BackendV2.BackendRowV2.of("127.0.0.1", 8040, true)))); backendUtilMockedStatic.when(() -> BackendUtil.tryHttpConnection(any())).thenReturn(true); } diff --git a/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java b/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java index e14a04606..6fd1188c0 100644 --- a/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java +++ b/flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java @@ -80,6 +80,12 @@ public void setUp() throws Exception { .thenReturn( Collections.singletonList( BackendV2.BackendRowV2.of("127.0.0.1", 8040, true))); + backendUtilMockedStatic + .when(() -> BackendUtil.getInstance(any(), any(), any(), any())) + .thenReturn( + new BackendUtil( + Collections.singletonList( + BackendV2.BackendRowV2.of("127.0.0.1", 8040, true)))); backendUtilMockedStatic.when(() -> BackendUtil.tryHttpConnection(any())).thenReturn(true); dorisCommitter =