[fix] cloud mode backend discovery with specified compute group#655
Open
gnehil wants to merge 1 commit into
Open
[fix] cloud mode backend discovery with specified compute group#655gnehil wants to merge 1 commit into
gnehil wants to merge 1 commit into
Conversation
There was a problem hiding this comment.
Pull request overview
This PR fixes cloud-mode backend discovery so that, when a target compute group is configured via stream load properties, the connector only discovers and uses alive BE nodes belonging to that compute group (instead of using the global BE list across clusters).
Changes:
- Add compute-group aware backend discovery by passing a target compute group name into backend discovery and querying the manager backends endpoint when present.
- Centralize backend selection logic in
BackendUtil.getInstance(...)and update writer/committer/batch stream load code paths to use it. - Add/adjust unit tests for compute group selection and for parsing/filtering manager backends responses.
Reviewed changes
Copilot reviewed 10 out of 10 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/BackendUtil.java | Determines target compute group from stream load props and routes backend discovery through RestService.getBackendsV2(..., computeGroup, ...). |
| flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/rest/RestService.java | Adds manager-backends discovery path and parsing/tag filtering logic for compute-group-scoped backend selection. |
| flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/writer/DorisWriter.java | Uses the centralized compute-group aware BackendUtil.getInstance(...) during writer initialization. |
| flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/committer/DorisCommitter.java | Switches committer backend initialization to BackendUtil.getInstance(...) so it benefits from compute-group filtering. |
| flink-doris-connector/flink-doris-connector-base/src/main/java/org/apache/doris/flink/sink/batch/DorisBatchStreamLoad.java | Switches batch stream load backend initialization to BackendUtil.getInstance(...). |
| flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/TestBackendUtil.java | Adds tests for compute-group property selection and verifies the compute group is passed into backend discovery. |
| flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/rest/TestRestService.java | Adds tests for parsing/filtering manager backend responses and tag parsing behavior. |
| flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/committer/TestDorisCommitter.java | Updates mocking to align with centralized BackendUtil.getInstance(...) usage. |
| flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchWriter.java | Updates mocking to align with centralized BackendUtil.getInstance(...) usage. |
| flink-doris-connector/flink-doris-connector-base/src/test/java/org/apache/doris/flink/sink/batch/TestDorisBatchStreamLoad.java | Updates mocking to align with centralized BackendUtil.getInstance(...) usage. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Comment on lines
83
to
+87
| this.maxRetry = executionOptions.getMaxRetries(); | ||
| this.ignoreCommitError = executionOptions.ignoreCommitError(); | ||
| this.httpClient = client; | ||
| this.backendUtil = | ||
| StringUtils.isNotEmpty(dorisOptions.getBenodes()) | ||
| ? new BackendUtil(dorisOptions.getBenodes()) | ||
| : new BackendUtil( | ||
| RestService.getBackendsV2(dorisOptions, dorisReadOptions, LOG)); | ||
| BackendUtil.getInstance(dorisOptions, dorisReadOptions, executionOptions, LOG); |
Comment on lines
+520
to
+525
| if (rootNode.path("code").asInt() != REST_RESPONSE_CODE_OK) { | ||
| throw managerBackendsException( | ||
| computeGroupName, | ||
| rootNode.path("msg").asText() + ": " + rootNode.path("data").asText()); | ||
| } | ||
| return rootNode.path("data"); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Proposed changes
Issue Number: close #xxx
Problem Summary:
In cloud mode, when FE redirect is not used and
benodesis not configured, the connector discovers backends through the normal backend API. That API returns alive BE nodesacross all clusters / compute groups.
As a result, stream load may be sent to any BE in the global backend list instead of only the BE nodes that belong to the target compute group. This can cause stream load
requests to be routed to the wrong compute group.
This PR makes backend discovery compute-group aware:
compute_groupcloud_cluster/rest/v2/manager/node/backends.compute_group_nameorcloud_cluster_namematches the target group.benodesis explicitly configuredBackendUtiland reuse it in writer, batch stream load, and committer paths.Checklist(Required)
Further comments
If this is a relatively large or complex change, kick off the discussion at dev@doris.apache.org by explaining why you chose the solution you did and what alternatives you considered, etc...