Commit e60fecc

fuziontech and claude authored
admin/live: cancel sessions by worker id (fix cross-CP cancel) (#855)

* admin/live: cancel sessions by worker id so cross-CP cancel works

The Live view's Cancel button failed with "no active session with pid N": KillSession scans only the SERVING replica's stacks, but behind the admin ALB the session usually lives on a different CP. And fanning the pid-based cancel out is unsafe: pids collide across CPs (each CP's counter starts at 1000), so a pid fan-out could kill the wrong replica's session.

Fix: cancel by the CLUSTER-UNIQUE worker id (the key the detail view already uses), with fan-out, the same collision-safe pattern.

- KillSessionByWorkerID(wid): destroys the session on that worker on this replica (0/1), located via SessionForWorker.
- POST /sessions/by-worker/:wid/cancel: kills locally, and only if this replica didn't own it, fans out to peers (?scope=local recursion guard) and sums the killed count; 404 only if no replica owns the worker. The old pid route stays (local-only, documented).
- UI: the query-row, session-row, and detail-dialog Cancel buttons all address by worker_id now.
- Tests: TestCancelByWorkerFansOut (local hit skips fan-out, peer-owned via fan-out, scope=local no-recursion, unknown→404); harness admin_cancel_by_worker kills a real session by worker id + asserts unknown→404.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01NUq2EVxvKQFq3YEDNLF5HP

* admin/live: address review nits on cancel-by-worker

- Drop cp_responders/cp_total from the by-worker cancel response: a worker is owned by exactly one CP, so non-owning peers 404 (dropped by the fetcher) and the coverage count would undercount and mislead. It's a single-owner op, so return just {killed}. (The per-user kill keeps coverage; it IS an aggregate.)
- harness admin_cancel_by_worker: hold the session (sleep) longer than the appear-poll budget so a slow cold-start can't exit the client before the session is observed (still cancels well before the 60s idle timeout).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01NUq2EVxvKQFq3YEDNLF5HP

---------

Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
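The pid collision described in the commit message can be made concrete with a minimal Go sketch. The `replica` type and its methods are hypothetical stand-ins for illustration, not the real session manager; the only detail taken from the commit message is that each CP's pid counter starts at 1000.

```go
package main

import "fmt"

// replica is a hypothetical stand-in for one control-plane (CP) replica.
type replica struct {
	nextPID  int32
	sessions map[int]int32 // cluster-unique worker id -> per-replica pid
}

func newReplica() *replica {
	return &replica{nextPID: 1000, sessions: map[int]int32{}}
}

// open starts a session on workerID and returns the pid this replica assigned.
func (r *replica) open(workerID int) int32 {
	pid := r.nextPID
	r.nextPID++
	r.sessions[workerID] = pid
	return pid
}

// killByPID removes the session with this pid, if any, and returns 0 or 1.
func (r *replica) killByPID(pid int32) int {
	for wid, p := range r.sessions {
		if p == pid {
			delete(r.sessions, wid)
			return 1
		}
	}
	return 0
}

// killByWorker removes the session on workerID, if this replica owns it.
func (r *replica) killByWorker(workerID int) int {
	if _, ok := r.sessions[workerID]; !ok {
		return 0
	}
	delete(r.sessions, workerID)
	return 1
}

// twoReplicas returns two replicas that each own one session (workers 77 and 88).
func twoReplicas() (a, b *replica, pidA, pidB int32) {
	a, b = newReplica(), newReplica()
	return a, b, a.open(77), b.open(88)
}

func main() {
	a, b, pidA, pidB := twoReplicas()
	fmt.Println("pids collide:", pidA == pidB) // both counters start at 1000

	// Fanning a pid cancel out to every replica hits both sessions.
	fmt.Println("killed by pid fan-out:", a.killByPID(pidA)+b.killByPID(pidA))

	// Fanning a worker-id cancel out hits only the owning replica.
	a, b, _, _ = twoReplicas()
	fmt.Println("killed by worker fan-out:", a.killByWorker(88)+b.killByWorker(88))
}
```

In this toy model the pid fan-out destroys two sessions where one was intended, while the worker-id fan-out destroys exactly one, which is the property the fix relies on.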
1 parent 20d3388 commit e60fecc

10 files changed

Lines changed: 175 additions & 10 deletions

controlplane/admin/README.md

Lines changed: 2 additions & 1 deletion
@@ -64,7 +64,8 @@ Added for the console:
 | `GET /api/v1/sessions`, `/workers` | viewer | live sessions / session-holding workers |
 | `GET /api/v1/workers/fleet` | viewer | cluster worker counts by lifecycle state |
 | `GET /api/v1/cluster/instances` | viewer | live CP replicas (self-flagged) |
-| `POST /api/v1/sessions/:pid/cancel` | admin | tear down a session + its worker |
+| `POST /api/v1/sessions/:pid/cancel` | admin | tear down a session by pid — LOCAL only (pid is per-CP); prefer the worker-id form |
+| `POST /api/v1/sessions/by-worker/:wid/cancel` | admin | tear down the session on a cluster-unique worker id; fans out to whichever CP owns it (pid can't be fanned out — it collides across CPs). Returns `{killed, cp_responders, cp_total}` |
 | `POST /api/v1/orgs/:id/users/:username/kill` | admin | per-user kill switch (one-shot): tear down ALL of a user's sessions + in-flight queries cluster-wide. Returns `{killed, cp_responders, cp_total}`. Does NOT block reconnects |
 | `POST /api/v1/orgs/:id/users/:username/disable` | admin | persist `disabled=true` (refused at connect on PG wire + Flight), reload the snapshot cluster-wide so the block is immediate, AND kill the user's live sessions. Returns `{disabled, killed, …}` |
 | `POST /api/v1/orgs/:id/users/:username/enable` | admin | persist `disabled=false` + reload cluster-wide so the user can reconnect at once |

controlplane/admin/live.go

Lines changed: 39 additions & 2 deletions
@@ -5,6 +5,7 @@ package admin
 import (
 	"encoding/json"
 	"errors"
+	"fmt"
 	"net/http"
 	"net/url"
 	"strconv"
@@ -102,8 +103,14 @@ type LiveInfo interface {
 	// ControlPlaneInstances returns the live CP replicas.
 	ControlPlaneInstances() ([]CPInstance, error)
 	// KillSession tears down the session (and its exclusive worker) for pid.
-	// Returns an error if no such session exists.
+	// Returns an error if no such session exists. NOTE: pid is per-CP, not
+	// cluster-unique — prefer KillSessionByWorkerID for cross-CP cancel.
 	KillSession(pid int32) error
+	// KillSessionByWorkerID tears down the session bound to the cluster-unique
+	// worker id on THIS replica (0 or 1). The handler fans it out so a cancel
+	// hits whichever replica owns the session — pid can't be used for the
+	// fan-out because pids collide across CPs.
+	KillSessionByWorkerID(workerID int) int
 	// KillUserSessions tears down every active session for (orgID, username) owned
 	// by THIS control-plane replica and returns the count destroyed. The handler
 	// fans it out to peers so the kill is cluster-wide. 0 (not an error) when the
@@ -230,7 +237,9 @@ func registerLiveAPI(r *gin.RouterGroup, live LiveInfo, fetcher PeerFetcher, use
 		c.JSON(http.StatusOK, gin.H{"instances": instances})
 	})
 
-	// Kill a session (admin-only via RoleGate — it's a POST).
+	// Kill a session by pid (admin-only via RoleGate — it's a POST). LOCAL ONLY:
+	// pid is per-CP, so this only finds a session the serving replica owns.
+	// Prefer /sessions/by-worker/:wid/cancel, which fans out.
 	r.POST("/sessions/:pid/cancel", func(c *gin.Context) {
 		pid64, err := strconv.ParseInt(c.Param("pid"), 10, 32)
 		if err != nil {
@@ -244,6 +253,34 @@ func registerLiveAPI(r *gin.RouterGroup, live LiveInfo, fetcher PeerFetcher, use
 		c.JSON(http.StatusOK, gin.H{"killed": pid64})
 	})
 
+	// Kill a session addressed by CLUSTER-UNIQUE worker id (admin-only POST).
+	// A session lives on exactly one CP, so this kills locally and, unless it's
+	// a scope=local peer call, fans out to peer replicas (?scope=local stops the
+	// recursion). Worker id — not pid — is the address because pids collide
+	// across CPs, so a pid fan-out could kill the wrong replica's session.
+	r.POST("/sessions/by-worker/:wid/cancel", func(c *gin.Context) {
+		wid, err := strconv.Atoi(c.Param("wid"))
+		if err != nil {
+			c.JSON(http.StatusBadRequest, gin.H{"error": "invalid worker id"})
+			return
+		}
+		killed := live.KillSessionByWorkerID(wid)
+		// A worker hosts exactly one session on exactly one CP: only fan out when
+		// this replica didn't own it (avoids needless peer POSTs on the hit path).
+		// This is a single-owner op, so — unlike the per-user kill — cp coverage
+		// isn't meaningful (non-owning peers 404): the response is just {killed}.
+		if killed == 0 && !localScope(c) && fetcher != nil {
+			bodies, _ := fetcher.PostPeers(c.Request.Context(), "/api/v1/sessions/by-worker/"+strconv.Itoa(wid)+"/cancel")
+			k, _ := sumKilled(bodies)
+			killed += k
+		}
+		if killed == 0 {
+			c.JSON(http.StatusNotFound, gin.H{"error": fmt.Sprintf("no active session on worker %d", wid)})
+			return
+		}
+		c.JSON(http.StatusOK, gin.H{"killed": killed})
+	})
+
 	// Per-user kill switch (admin-only via RoleGate — all POSTs). A user's
 	// sessions live on whichever CP replica owns each connection, so every action
 	// kills locally and fans out to peers (?scope=local stops the recursion); the
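The decision order in the new by-worker handler (local kill first, fan out only on a local miss and only when the request is not `scope=local`, 404 when nothing was killed anywhere) can be isolated from gin as a small pure function. This is a sketch under stated assumptions: `cancelByWorker` and its callback parameters are hypothetical names introduced here, standing in for `live.KillSessionByWorkerID` and for the `PostPeers` + `sumKilled` pair.

```go
package main

import (
	"fmt"
	"net/http"
)

// cancelByWorker is a hypothetical, framework-free restatement of the handler's
// control flow. killLocal stands in for the local kill; killOnPeers stands in
// for posting to peers and summing their killed counts (nil means no fetcher).
func cancelByWorker(wid int, localScope bool, killLocal, killOnPeers func(int) int) (status, killed int) {
	killed = killLocal(wid)
	// Only fan out when this replica did not own the worker and the request
	// is not itself a peer call (the scope=local recursion guard).
	if killed == 0 && !localScope && killOnPeers != nil {
		killed += killOnPeers(wid)
	}
	if killed == 0 {
		return http.StatusNotFound, 0
	}
	return http.StatusOK, killed
}

func main() {
	peerCalls := 0
	peerOwns := func(int) int { peerCalls++; return 1 }
	hit := func(int) int { return 1 }
	miss := func(int) int { return 0 }

	status, killed := cancelByWorker(77, false, hit, peerOwns)
	fmt.Println(status, killed, peerCalls) // local hit, no fan-out: 200 1 0

	status, killed = cancelByWorker(88, false, miss, peerOwns)
	fmt.Println(status, killed, peerCalls) // peer-owned, one fan-out: 200 1 1

	status, killed = cancelByWorker(88, true, miss, peerOwns)
	fmt.Println(status, killed, peerCalls) // scope=local, no recursion: 404 0 1

	status, killed = cancelByWorker(99, false, miss, miss)
	fmt.Println(status, killed) // nobody owns it: 404 0
}
```

The four calls in `main` line up with the four cases that TestCancelByWorkerFansOut exercises against the real router.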

controlplane/admin/live_aggregate_test.go

Lines changed: 7 additions & 0 deletions
@@ -24,6 +24,9 @@ type fakeLiveInfo struct {
 
 	killedPerUser int // returned by KillUserSessions (local kill count)
 	killUserCalls []killUserCall // recorded (org, user) of each KillUserSessions
+
+	killByWorkerReturn int // returned by KillSessionByWorkerID (local kill count)
+	killByWorkerCalls []int // recorded worker ids
 }
 
 type killUserCall struct{ org, user string }
@@ -38,6 +41,10 @@ func (f *fakeLiveInfo) QueryDetailForWorkerID(wid int) (QueryDetail, bool) {
 func (f *fakeLiveInfo) WorkerFleet() ([]FleetStat, error) { return nil, nil }
 func (f *fakeLiveInfo) ControlPlaneInstances() ([]CPInstance, error) { return nil, nil }
 func (f *fakeLiveInfo) KillSession(int32) error { return nil }
+func (f *fakeLiveInfo) KillSessionByWorkerID(wid int) int {
+	f.killByWorkerCalls = append(f.killByWorkerCalls, wid)
+	return f.killByWorkerReturn
+}
 func (f *fakeLiveInfo) KillUserSessions(org, user string) int {
 	f.killUserCalls = append(f.killUserCalls, killUserCall{org, user})
 	return f.killedPerUser

controlplane/admin/live_test.go

Lines changed: 53 additions & 0 deletions
@@ -124,3 +124,56 @@ func TestQueryDetailWorkerIDDistinguishesCollidingPIDs(t *testing.T) {
 		t.Fatalf("worker 22 should resolve to org-b's query despite the shared pid, got %d %+v", w.Code, b)
 	}
 }
+
+// TestCancelByWorkerFansOut covers the collision-safe cross-CP cancel: the
+// serving replica kills locally if it owns the worker, else fans out to peers
+// (scope=local guard), and 404s only if no replica owns it.
+func TestCancelByWorkerFansOut(t *testing.T) {
+	// Owned locally → killed without fan-out.
+	local := &fakeLiveInfo{killByWorkerReturn: 1}
+	fetcher := &fakePeerFetcher{}
+	r := liveTestRouter(local, fetcher)
+	w := httptest.NewRecorder()
+	r.ServeHTTP(w, httptest.NewRequest(http.MethodPost, "/api/v1/sessions/by-worker/77/cancel", nil))
+	if w.Code != http.StatusOK {
+		t.Fatalf("locally-owned cancel: got %d (%s)", w.Code, w.Body.String())
+	}
+	if len(local.killByWorkerCalls) != 1 || local.killByWorkerCalls[0] != 77 {
+		t.Fatalf("KillSessionByWorkerID not called with 77: %v", local.killByWorkerCalls)
+	}
+	if fetcher.postCallCount() != 0 {
+		t.Fatalf("owned locally should not fan out, but PostPeers ran")
+	}
+
+	// Not local → fan out; a peer reports killed:1 → success.
+	local2 := &fakeLiveInfo{killByWorkerReturn: 0}
+	peerBody, _ := json.Marshal(map[string]any{"killed": 1})
+	fetcher2 := &fakePeerFetcher{postByPath: map[string][][]byte{"/api/v1/sessions/by-worker/88/cancel": {peerBody}}}
+	r2 := liveTestRouter(local2, fetcher2)
+	w = httptest.NewRecorder()
+	r2.ServeHTTP(w, httptest.NewRequest(http.MethodPost, "/api/v1/sessions/by-worker/88/cancel", nil))
+	if w.Code != http.StatusOK {
+		t.Fatalf("peer-owned cancel via fan-out: got %d (%s)", w.Code, w.Body.String())
+	}
+
+	// scope=local (a peer answering us): kill locally only, NO recursion.
+	before := fetcher2.postCallCount()
+	w = httptest.NewRecorder()
+	r2.ServeHTTP(w, httptest.NewRequest(http.MethodPost, "/api/v1/sessions/by-worker/88/cancel?scope=local", nil))
+	if w.Code != http.StatusNotFound {
+		t.Fatalf("scope=local with no local session should 404, got %d", w.Code)
+	}
+	if fetcher2.postCallCount() != before {
+		t.Fatalf("scope=local must not fan out")
+	}
+
+	// Nobody owns it anywhere → 404.
+	local3 := &fakeLiveInfo{killByWorkerReturn: 0}
+	fetcher3 := &fakePeerFetcher{}
+	r3 := liveTestRouter(local3, fetcher3)
+	w = httptest.NewRecorder()
+	r3.ServeHTTP(w, httptest.NewRequest(http.MethodPost, "/api/v1/sessions/by-worker/99/cancel", nil))
+	if w.Code != http.StatusNotFound {
+		t.Fatalf("unknown worker cancel should 404, got %d", w.Code)
+	}
+}
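The test feeds the fan-out path a peer body of `{"killed": 1}` and relies on the handler summing such bodies. The real `sumKilled` helper is not shown in this diff, so the following is only a guess at its shape: `sumKilledSketch` is a hypothetical name, and both its second return value and its skip-on-parse-error behaviour are assumptions.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// sumKilledSketch adds up the "killed" field of each peer response body.
// Bodies that are not valid JSON objects are skipped (an assumption about how
// bad peer replies are treated); parsed counts the bodies that decoded cleanly.
func sumKilledSketch(bodies [][]byte) (total, parsed int) {
	for _, b := range bodies {
		var resp struct {
			Killed int `json:"killed"`
		}
		if err := json.Unmarshal(b, &resp); err != nil {
			continue
		}
		total += resp.Killed
		parsed++
	}
	return total, parsed
}

func main() {
	bodies := [][]byte{
		[]byte(`{"killed": 1}`), // the owning peer
		[]byte(`{"killed": 0}`), // a peer that decoded but killed nothing
		[]byte(`not json`),      // a malformed reply, skipped
	}
	total, parsed := sumKilledSketch(bodies)
	fmt.Println(total, parsed) // 1 2
}
```

Because a worker has a single owner, the sum for a by-worker cancel should be 0 or 1 in practice. The same kind of summation would also serve an aggregate operation such as the per-user kill.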

controlplane/admin/ui/src/components/QueryDetailDialog.tsx

Lines changed: 2 additions & 2 deletions
@@ -34,7 +34,7 @@ export function QueryDetailDialog({
 }: {
   workerId: number | null;
   onClose: () => void;
-  onCancel: (pid: number) => void;
+  onCancel: (workerId: number) => void;
 }) {
   const detail = useQueryDetail(workerId);
   const d = detail.data;
@@ -115,7 +115,7 @@ export function QueryDetailDialog({
               variant="ghost"
               size="sm"
               onClick={() => {
-                onCancel(d.pid);
+                onCancel(d.worker_id);
                 onClose();
               }}
             >

controlplane/admin/ui/src/hooks/useApi.ts

Lines changed: 1 addition & 1 deletion
@@ -267,7 +267,7 @@ export function useSessions() {
 export function useCancelSession() {
   const qc = useQueryClient();
   return useMutation({
-    mutationFn: (pid: number) => api.cancelSession(pid),
+    mutationFn: (workerId: number) => api.cancelSession(workerId),
     onSuccess: () => {
       qc.invalidateQueries({ queryKey: ["sessions"] });
       qc.invalidateQueries({ queryKey: ["queries"] });

controlplane/admin/ui/src/lib/api.ts

Lines changed: 4 additions & 1 deletion
@@ -151,7 +151,10 @@ export const api = {
   listQueries: () => get<{ queries: RunningQuery[] }>("/queries").then((r) => r.queries ?? []),
   // Detail is addressed by cluster-unique worker id (pid is per-org, not unique).
   queryDetail: (workerId: number) => get<QueryDetail>(`/queries/by-worker/${workerId}`),
-  cancelSession: (pid: number) => post<{ killed: number }>(`/sessions/${pid}/cancel`, {}),
+  // Cancel is addressed by cluster-unique worker id (pid is per-CP and collides
+  // across replicas); the server fans out to whichever CP owns the session.
+  cancelSession: (workerId: number) =>
+    post<{ killed: number }>(`/sessions/by-worker/${workerId}/cancel`, {}),
   // Per-user kill switch. killUser terminates all of a user's sessions + queries
   // (one-shot); disableUser also persists a block so new connections are refused
   // until enableUser. All three fan out across CP replicas server-side.

controlplane/admin/ui/src/pages/Live.tsx

Lines changed: 3 additions & 3 deletions
@@ -219,7 +219,7 @@ export function Live() {
                       disabled={cancel.isPending}
                       onClick={(e) => {
                         e.stopPropagation();
-                        cancel.mutate(q.pid);
+                        cancel.mutate(q.worker_id);
                       }}
                     >
                       <Ban className="h-4 w-4 text-destructive" /> Cancel
@@ -272,7 +272,7 @@ export function Live() {
                       size="sm"
                       className="-my-1 h-6"
                       disabled={cancel.isPending}
-                      onClick={() => cancel.mutate(s.pid)}
+                      onClick={() => cancel.mutate(s.worker_id)}
                     >
                       <Ban className="h-4 w-4 text-destructive" /> Cancel
                     </Button>
@@ -289,7 +289,7 @@ export function Live() {
       <QueryDetailDialog
         workerId={detailWid}
         onClose={() => setDetailWid(null)}
-        onCancel={(pid) => cancel.mutate(pid)}
+        onCancel={(workerId) => cancel.mutate(workerId)}
       />
     </>
   );

controlplane/admin_providers.go

Lines changed: 15 additions & 0 deletions
@@ -177,6 +177,21 @@ func (p *clusterInfoProvider) KillSession(pid int32) error {
 	return fmt.Errorf("no active session with pid %d", pid)
 }
 
+// KillSessionByWorkerID tears down the session bound to the cluster-unique
+// worker id on this replica, returning the count destroyed (0 or 1 — one
+// session per worker). The admin handler fans this out so a cancel reaches
+// whichever CP owns the session; addressing by worker id (not the per-CP pid,
+// which collides across CPs) makes that fan-out collision-safe.
+func (p *clusterInfoProvider) KillSessionByWorkerID(workerID int) int {
+	for _, stack := range p.router.AllStacks() {
+		if s, ok := stack.Sessions.SessionForWorker(workerID); ok {
+			stack.Sessions.DestroySession(s.PID)
+			return 1
+		}
+	}
+	return 0
+}
+
 // KillUserSessions tears down every active session for (orgID, username) owned by
 // THIS control-plane replica and returns the count destroyed. It is the local
 // half of the cluster-wide per-user kill switch: the admin handler fans this out

tests/e2e-mw-dev/harness.sh

Lines changed: 49 additions & 0 deletions
@@ -1604,6 +1604,52 @@ admin_idle_session_flagged() { # org password
   esac
 }
 
+# ---- admin: cancel a session by (cluster-unique) worker id ------------------
+# The Live view's Cancel button posts /sessions/by-worker/:wid/cancel. Worker id
+# (not the per-CP pid) is the address, and the handler kills locally or fans out
+# to whichever CP owns the session — so cancel works regardless of which replica
+# the request lands on. (The cross-CP fan-out itself is unit-tested in
+# TestCancelByWorkerFansOut; the CI CP is single-replica, so this covers the
+# real kill + the unknown→404 path end-to-end.)
+admin_cancel_by_worker() { # org password
+  org="$1"; pw="$2"
+  log "admin: cancel a live session by worker id on $org"
+  # Hold longer than the appear-poll budget (30×2s) so a slow cold-start can't
+  # exit the client before the session is observed (we cancel it well before
+  # the 60s idle timeout anyway).
+  ( printf 'BEGIN;\n'; sleep 90 ) | PGPASSWORD="$pw" psql \
+    "sslmode=require host=$org$SNI_SUFFIX hostaddr=$CP_IP port=5432 user=root dbname=ducklake" \
+    -v ON_ERROR_STOP=1 -qtA >/dev/null 2>&1 &
+  bg=$!
+  cleanup_cbw() { kill "$bg" 2>/dev/null || true; wait "$bg" 2>/dev/null || true; }
+  wid="" a=0
+  while [ "$a" -lt 30 ]; do
+    kill -0 "$bg" 2>/dev/null || break
+    wid="$(curl -fsS -H "$H" "$API/api/v1/queries" \
+      | jq -r --arg o "$org" 'first(.queries[]? | select(.org==$o and .user=="root")) | .worker_id // empty')"
+    [ -n "$wid" ] && break
+    sleep 2; a=$((a + 1))
+  done
+  [ -n "$wid" ] || { cleanup_cbw; fail "admin_cancel_by_worker: session never appeared for $org"; }
+  # Cancel it by worker id → killed>=1.
+  resp="$(curl -fsS -H "$H" -X POST "$API/api/v1/sessions/by-worker/$wid/cancel")" \
+    || { cleanup_cbw; fail "admin_cancel_by_worker: POST cancel failed for worker $wid"; }
+  echo "$resp" | jq -e '.killed >= 1' >/dev/null \
+    || { cleanup_cbw; fail "admin_cancel_by_worker: cancel did not kill worker $wid: $resp"; }
+  # The session must disappear from /queries.
+  gone="" a=0
+  while [ "$a" -lt 15 ]; do
+    [ "$(curl -fsS -H "$H" "$API/api/v1/queries" | jq -r --argjson w "$wid" 'any(.queries[]?; .worker_id==$w)')" = "false" ] && { gone=1; break; }
+    sleep 2; a=$((a + 1))
+  done
+  cleanup_cbw
+  [ -n "$gone" ] || fail "admin_cancel_by_worker: session on worker $wid still present after cancel"
+  # Unknown worker → 404 (not a 500).
+  code="$(curl -s -o /dev/null -w '%{http_code}' -H "$H" -X POST "$API/api/v1/sessions/by-worker/999999999/cancel")"
+  [ "$code" = "404" ] || fail "admin_cancel_by_worker: unknown worker cancel returned $code, want 404"
+  log "admin: cancel by worker id OK (killed worker $wid, unknown→404) on $org"
+}
+
 admin_query_detail() { # org password
   org="$1"; pw="$2"
   log "admin live: per-query detail round-trip on $org"
@@ -2099,6 +2145,9 @@ main() {
   # ---- admin live-query detail view (phase 1) — cnpg stack is warm now ----
   admin_query_detail "$CNPG" "$cnpg_pw"
 
+  # ---- admin: cancel a live session by worker id (cross-CP addressed) ----
+  admin_cancel_by_worker "$CNPG" "$cnpg_pw"
+
   # ---- admin live: idle-in-transaction session is flagged (state column) ----
   admin_idle_session_flagged "$CNPG" "$cnpg_pw"
 