-
Notifications
You must be signed in to change notification settings - Fork 2.1k
[Improve][Connector-V2][Starrocks-Source]Add be_host_port_mapping configuration
#10238
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: dev
Are you sure you want to change the base?
[Improve][Connector-V2][Starrocks-Source]Add be_host_port_mapping configuration
#10238
Conversation
…rrock, an exception prompt 'Unknown HostException' appears
|
Can you add this param in StarRocks E2E? |
After careful consideration, it is not convenient to construct this scenario using e2e. Currently, I have only encountered it in the k8s scenario. If tested through the k8s scenario, I think it would be more responsible. My configuration is optional and will not affect the functionality. What do you think? @davidzollo @zhangshenghang |
davidzollo
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
LGTM
By the way, it's better to add UT if E2E is too difficult for this scenario.
| private static Map<String, Pair<String, String>> formatBeHostPortMapping( | ||
| SourceConfig sourceConfig) { | ||
| return sourceConfig.getBeHostPortMapping().stream() | ||
| .collect( | ||
| Collectors.toMap( | ||
| mapping -> { | ||
| // host:be_port | ||
| String[] hostInfo = mapping.getHost_port().split(":"); | ||
| return hostInfo[0]; | ||
| }, | ||
| mapping -> { | ||
| // accessible ip and be_port | ||
| String[] accessIpInfo = mapping.getIp_port().split(":"); | ||
| return Pair.of(accessIpInfo[0], accessIpInfo[1]); | ||
| })); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is necessary to handle various abnormal configuration situations, such as missing colons, empty strings, null, etc.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
| this.port = Integer.parseInt(hostPort[1].trim()); | ||
|
|
||
| // If the user has configured beHostPortMapping, we need to parse it | ||
| Map<String, Pair<String, String>> beHostPortMapping = formatBeHostPortMapping(sourceConfig); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| Map<String, Pair<String, String>> beHostPortMapping = formatBeHostPortMapping(sourceConfig); | |
| Map<String, Pair<String, Integer>> beHostPortMapping = formatBeHostPortMapping(sourceConfig); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
| scan.params.scanner_thread_pool_thread_num = "3" | ||
| be_host_port_mapping = [ | ||
| { | ||
| host_port = "pingt-7f5cf4cfdc-cn-0.headless.olap:9060:9060" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| host_port = "pingt-7f5cf4cfdc-cn-0.headless.olap:9060:9060" | |
| host_port = "be_host_1:9060" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
|
|
||
| `StarRocks`集群`BE`的host:be_port与能够访问的ip:be_port映射关系。 | ||
| 该配置可选的,主要是解决计算集群不能够直接访问`BE`的`host`以及`be_port`的场景,如`StarRocks`部署在k8s中,但是flink不能直接访问`BE`的`host`以及`be_port`,利用此配置,`flink`可以能够访问`BE`以及`be_port`。 | ||
| 例如 `[{"pingt-7f5cf4cfdc-cn-0.headless.olap:9060"="xx.xx.xx.xx:31088"}]`。 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Standardize the sample code a bit
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
| @OptionMark(description = "The be host and be_port") | ||
| private String host_port; | ||
|
|
||
| @OptionMark(description = "The accessible ip and be_port") | ||
| private String ip_port; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use camel case naming
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
| if (beHostPortMapping.containsKey(hostPort[0].trim())) { | ||
| Pair<String, String> accessIpPort = beHostPortMapping.get(hostPort[0].trim()); | ||
| this.ip = accessIpPort.getKey(); | ||
| this.port = Integer.parseInt(accessIpPort.getValue()); | ||
| } else { | ||
| this.ip = hostPort[0].trim(); | ||
| this.port = Integer.parseInt(hostPort[1].trim()); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add some logs to prompt the user
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
…rrock, an exception prompt 'Unknown HostException' appears
|
I found that the |

Purpose of this pull request
Issue is #10123 . When obtaining data from Starrock, an exception prompt 'Unknown HostException' appears
Does this PR introduce any user-facing change?
How was this patch tested?
Check list
New License Guide
incompatible-changes.mdto describe the incompatibility caused by this PR.