-
Notifications
You must be signed in to change notification settings - Fork 130
Feature/svc port forward #436
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
新增Service端口转发功能,允许用户通过Service选择器自动关联Pod并进行端口转发。 - 在Service操作列添加端口转发按钮,支持启动、停止和查看转发状态 - 重构Pod端口转发逻辑,提取可复用的StartPortForwardByPod等方法 - 新增svc_forward.go控制器,处理Service端口转发相关路由和业务逻辑
修复在并发场景下停止端口转发时可能出现的通道关闭竞态问题。将通道操作移出互斥锁保护范围,避免在持有锁时进行可能阻塞的通道操作,并使用安全的通道关闭函数防止重复关闭导致的 panic。
📝 WalkthroughSummary by CodeRabbit
✏️ Tip: You can customize this high-level summary in your review settings. Walkthrough在集群路由中新增服务端口转发路由;重构 Pod 端口转发以抽离可复用的启动/停止/状态接口;新增服务级端口转发控制器及其路由、内存会话表和端点;UI 增加服务列表的端口转发抽屉与相关操作按钮和表单。 Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant API as API Server
participant SvcCtrl as Service Controller
participant PodCtrl as Pod Controller
participant K8s as Kubernetes API
participant Forwarder as port-forward goroutine
Client->>API: POST /k8s/service/port_forward/.../start (svc, svcPort, localPort)
API->>SvcCtrl: StartPortForward()
SvcCtrl->>K8s: Get Service
SvcCtrl->>K8s: List Pods by selector
SvcCtrl->>SvcCtrl: getServicePortInfo() / resolvePodPortAndContainer()
SvcCtrl->>PodCtrl: StartPortForwardByPod(ctx, pod, container, podPort, localPort)
PodCtrl->>PodCtrl: allocate localPort (if empty) & update portForwardTable
PodCtrl->>Forwarder: launch port-forward goroutine
Forwarder->>K8s: port-forward stream
PodCtrl-->>SvcCtrl: return localPort/status
SvcCtrl-->>API: respond {localPort, status}
API-->>Client: 200 OK
sequenceDiagram
participant Client
participant API as API Server
participant SvcCtrl as Service Controller
participant PodCtrl as Pod Controller
Client->>API: POST /k8s/service/port_forward/.../stop (svc, svcPort)
API->>SvcCtrl: StopPortForward()
SvcCtrl->>SvcCtrl: lookup svcPortForwardTable -> pod info
SvcCtrl->>PodCtrl: StopPortForwardByPod(pod, container, podPort)
PodCtrl->>PodCtrl: retrieve StopCh, clear entry, set status
PodCtrl->>PodCtrl: closeStopChSafely(StopCh)
PodCtrl-->>SvcCtrl: complete
SvcCtrl-->>API: 200 OK
API-->>Client: Success
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 分钟 Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 1 | ❌ 2❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (1 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@pkg/controller/pod/pod_forward.go`:
- Around line 208-215: StartPortForwardByPod currently only assigns a random
port when localPort == "" but does not validate non-empty localPort, allowing
non-numeric or out-of-range values to be persisted and return success; change
the logic so that after the empty-check (and after calling getRandomPort() when
empty) you validate localPort is numeric and within 1–65535 (use strconv.Atoi or
strconv.ParseInt and check bounds) and return an error immediately if invalid,
before any DB/write operations or returning OK; keep references to
StartPortForwardByPod and getRandomPort to locate where to add the check.
In `@pkg/controller/svc/svc_forward.go`:
- Around line 127-151: When StartPortForwardByPod returns an error you must
rollback the entry you inserted into svcPortForwardTable to avoid stale
mappings; after calling pod.StartPortForwardByPod (and it returns non-nil err)
acquire svcPortForwardTableMutex, delete
svcPortForwardTable[getSvcForwardMapKey(selectedCluster, ns, svcName,
svcPortStr)] (or restore oldEntry if non-nil) and then unlock before returning
and calling amis.WriteJsonError, ensuring any previous oldEntry is not leaked
and StopPortForwardByPod is not later called on a non-existent forward.
🧹 Nitpick comments (2)
ui/public/pages/ns/svc.json (1)
826-837: “指定端口”开启时建议强制填写端口。
当前local_port在specify_port为 true 时仍可为空,容易导致用户选择“指定端口”却实际走随机端口。可用requiredOn做约束。♻️ 建议修改
{ "type": "input-number", "name": "local_port", "label": "转发端口", "visibleOn": "this.specify_port == true", + "requiredOn": "this.specify_port == true", "min": 1, "max": 65535 }pkg/controller/svc/svc_forward.go (1)
97-112: 建议优先选择 Running/Ready 的 Pod 来转发。
当前直接取pods[0],在滚动更新或存在 Pending/Terminating Pod 时容易失败,即使已有健康 Pod。建议筛选Running且Ready的 Pod。
当启动端口转发失败时,需要回滚对 svcPortForwardTable 的修改,以避免表状态不一致。在错误处理分支中添加了恢复逻辑,确保在失败场景下正确恢复原有表项或删除新添加的条目。
检查本地端口是否为有效的整数且在1-65535范围内,避免无效端口导致的运行时错误。
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@pkg/controller/pod/pod_forward.go`:
- Around line 206-262: StartPortForwardByPod currently passes the request-scoped
ctx into kom.Cluster(...).WithContext(ctx), which can be cancelled when the HTTP
request ends and prematurely stop the goroutine; change the call to use a
long-lived context (e.g. context.Background() or a context derived from
background) instead of the request context and rely on stopCh as the sole
lifecycle signal for the port forward; update the invocation around
kom.Cluster(selectedCluster).WithContext(ctx).Resource(...).PortForward(...) to
use WithContext(context.Background()) (or a background-derived context) and
ensure the goroutine only ends when stopCh is closed so that PortForward is not
cancelled by the HTTP request context.
In `@pkg/controller/svc/svc_forward.go`:
- Around line 66-73: The current code silently ignores the error from
c.ShouldBindJSON; change it to first check if a request body exists (e.g.,
inspect c.Request.ContentLength or c.Request.Body) and only call
c.ShouldBindJSON(&req) when non-empty, and when ShouldBindJSON returns an error
respond with a 400 (or appropriate) error instead of continuing; keep the
existing localPort logic (req.LocalPort and the "undefined" -> "" adjustment)
but ensure parsing failures are handled and returned to the caller.
| // StartPortForwardByPod 通过 Pod 信息启动端口转发。 | ||
| // localPort 为空时将随机分配一个本地端口并返回。 | ||
| func StartPortForwardByPod(ctx context.Context, selectedCluster, ns, podName, containerName, podPort, localPort string) (string, error) { | ||
| if _, err := strconv.Atoi(podPort); err != nil { | ||
| return "", fmt.Errorf("无效的容器组端口号: %s", podPort) | ||
| } | ||
| if localPort == "" { | ||
| localPort = getRandomPort() | ||
| } | ||
| localPortInt, err := strconv.Atoi(localPort) | ||
| if err != nil || localPortInt < 1 || localPortInt > 65535 { | ||
| return "", fmt.Errorf("无效的本地端口号: %s", localPort) | ||
| } | ||
|
|
||
| stopCh := make(chan struct{}) | ||
| key := getMapKey(selectedCluster, ns, podName, containerName, podPort) | ||
|
|
||
| var oldStopCh chan struct{} | ||
| portForwardTableMutex.Lock() | ||
| if old, ok := portForwardTable[key]; ok { | ||
| oldStopCh = old.StopCh | ||
| old.StopCh = nil | ||
| old.Status = "stopped" | ||
| old.LocalPort = "" | ||
| } | ||
| portForwardTable[key] = &PortInfo{ | ||
| Cluster: selectedCluster, | ||
| Namespace: ns, | ||
| Name: podName, | ||
| ContainerName: containerName, | ||
| LocalPort: localPort, | ||
| PodPort: podPort, | ||
| Status: "running", | ||
| StopCh: stopCh, | ||
| } | ||
| portForwardTableMutex.Unlock() | ||
| closeStopChSafely(oldStopCh) | ||
|
|
||
| go func() { | ||
| err := kom.Cluster(selectedCluster).WithContext(ctx). | ||
| Resource(&v1.Pod{}). | ||
| Namespace(ns). | ||
| Name(podName). | ||
| Ctl().Pod(). | ||
| ContainerName(containerName). | ||
| PortForward(localPort, podPort, stopCh).Error | ||
| if err != nil { | ||
| portForwardTableMutex.Lock() | ||
| if pf, ok := portForwardTable[key]; ok { | ||
| pf.Status = "failed" | ||
| } | ||
| portForwardTableMutex.Unlock() | ||
| } | ||
| }() | ||
|
|
||
| return localPort, nil | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Search for GetContextWithUser implementation
echo "=== Searching for GetContextWithUser ==="
rg "GetContextWithUser" -C 5 --type=go
echo ""
echo "=== Searching for PortForward calls ==="
rg "PortForward\(" -C 3 --type=go
echo ""
echo "=== Searching for where StartPortForwardByPod is called ==="
rg "StartPortForwardByPod" -C 3 --type=goRepository: weibaohui/k8m
Length of output: 50370
🏁 Script executed:
#!/bin/bash
# Search for GetContextWithUser implementation in amis package
echo "=== Finding amis package ==="
fd -t f "amis" | head -20
echo ""
echo "=== Search for GetContextWithUser function definition ==="
rg "func.*GetContextWithUser" -A 10 --type=go
echo ""
echo "=== Search for PortForward method implementation ==="
rg "func.*PortForward" -A 5 --type=go | head -50
echo ""
echo "=== Search for response.Context type definition ==="
rg "type Context" --type=go -A 5
echo ""
echo "=== Check ctx usage in the goroutine in pod_forward.go ==="
rg -B5 -A15 "go func\(\)" pkg/controller/pod/pod_forward.goRepository: weibaohui/k8m
Length of output: 5407
🏁 Script executed:
#!/bin/bash
# Find PortForward implementation - likely in kom library or external
echo "=== Search for PortForward implementation in the codebase ==="
rg "func.*PortForward" --type=go -B 2 -A 20
echo ""
echo "=== Check how ctx is typically used in WithContext chain ==="
rg "WithContext" -A 3 --type=go | head -50
echo ""
echo "=== Look for any context-related handling in pod operations ==="
rg "ctx.*Done\|ctx.*Err\|select.*ctx" pkg/controller/pod/pod_forward.go --type=go
echo ""
echo "=== Understand the relationship between stopCh and ctx ==="
cat -n pkg/controller/pod/pod_forward.go | sed -n '206,262p'Repository: weibaohui/k8m
Length of output: 18436
请改用长生命周期的 context,避免请求结束导致端口转发中断
amis.GetContextWithUser() 基于 HTTP 请求的 context(c.Request.Context()),当 HTTP 请求结束时会被自动取消。StartPortForwardByPod 在 goroutine 中将此请求级 context 传递给 kom.Cluster().WithContext(ctx).PortForward(),若 kom 的 PortForward 实现监听 context 取消,则请求完成后端口转发会意外中断。
建议改为在函数执行时使用 context.Background() 作为基础 context,用 stopCh 作为唯一的生命周期控制信号;或在调用处使用更长生命周期的 context。
🤖 Prompt for AI Agents
In `@pkg/controller/pod/pod_forward.go` around lines 206 - 262,
StartPortForwardByPod currently passes the request-scoped ctx into
kom.Cluster(...).WithContext(ctx), which can be cancelled when the HTTP request
ends and prematurely stop the goroutine; change the call to use a long-lived
context (e.g. context.Background() or a context derived from background) instead
of the request context and rely on stopCh as the sole lifecycle signal for the
port forward; update the invocation around
kom.Cluster(selectedCluster).WithContext(ctx).Resource(...).PortForward(...) to
use WithContext(context.Background()) (or a background-derived context) and
ensure the goroutine only ends when stopCh is closed so that PortForward is not
cancelled by the HTTP request context.
| var req struct { | ||
| LocalPort string `json:"local_port"` | ||
| } | ||
| _ = c.ShouldBindJSON(&req) | ||
| localPort := req.LocalPort | ||
| if localPort == "undefined" { | ||
| localPort = "" | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
处理 JSON 解析错误,避免静默吞错
当前忽略 ShouldBindJSON 的返回错误;当请求体损坏时会继续执行并隐藏问题。建议仅在请求体存在时解析并对错误返回。
✅ 建议修复
var req struct {
LocalPort string `json:"local_port"`
}
-_ = c.ShouldBindJSON(&req)
+if c.Request != nil && c.Request.ContentLength > 0 {
+ if err := c.ShouldBindJSON(&req); err != nil {
+ amis.WriteJsonError(c, err)
+ return
+ }
+}
localPort := req.LocalPort
if localPort == "undefined" {
localPort = ""
}🤖 Prompt for AI Agents
In `@pkg/controller/svc/svc_forward.go` around lines 66 - 73, The current code
silently ignores the error from c.ShouldBindJSON; change it to first check if a
request body exists (e.g., inspect c.Request.ContentLength or c.Request.Body)
and only call c.ShouldBindJSON(&req) when non-empty, and when ShouldBindJSON
returns an error respond with a 400 (or appropriate) error instead of
continuing; keep the existing localPort logic (req.LocalPort and the "undefined"
-> "" adjustment) but ensure parsing failures are handled and returned to the
caller.
No description provided.