Skip to content

Commit 001d216

Browse files
authored
feat(worker-pool): one query session per worker in k8s/remote mode (#691)
* feat(worker-pool): one query session per worker in k8s/remote mode Guarantee deterministic per-query resources in the control-plane remote (k8s) backend: a worker pod serves exactly one client query session, so each query owns the pod's full resources (workerDuckDBLimits already hands one session ~75% of pod RAM + all cores) and a heavy query can never be starved or OOM'd by a co-resident one. Previously this held only emergently (OrgReservedPool never co-assigned), with nothing enforcing it and the resource math already assuming it — a single co-assignment would have caused ~150% memory overcommit. Scope: remote/k8s (OrgReservedPool) only. Standalone and process backends unchanged. - Enforce the invariant: spawn remote worker pods with DUCKGRES_DUCKDB_MAX_SESSIONS=1 (k8s_pool.go). A 2nd concurrent CreateSession is rejected, not silently overcommitted. Internal control/maintenance work runs on the worker's controlDB/warmupDB side connections (not counted sessions), so cap=1 doesn't starve it. Remove the dead leastLoadedAssignedWorkerLocked so no one re-wires session sharing into the org path. - At org max workers + all busy: fail fast with the clear org-cap message ("your organization has reached its maximum number of concurrent Duckgres workers...") instead of busy-waiting until the client deadline. Works without a runtime store via an in-process cap check (atOrgWorkerCap). - Under cap + all busy: hold up to warmAcquireTimeout for a worker to spawn (bounded by ctx) — now for default/exclusive requests too, not only colocated. - Anti-snatch: serialize the slow acquire path per org with a cancel-safe FIFO turnstile (orgAcquireGate) so a worker the CP scaled up for an earlier waiter cannot be snatched by a later connection. The fast idle-reuse path stays ungated (only reuses already-org-owned Hot workers; neutral warm workers are claimed only through the gate). - Destroy-before-reuse ordering already holds (session_mgr awaits the worker DestroySession RPC before ReleaseWorker); documented as load-bearing for cap=1. Tests: - duckdbservice: CreateSession rejects the 2nd session at MaxSessions=1. - controlplane: org-cap fast-fail; FIFO gate ordering + cancelled-waiter skip (race-clean); pod spec carries DUCKGRES_DUCKDB_MAX_SESSIONS=1. - e2e harness: one_session_per_worker — two concurrent queries for one org land on two distinct worker pods with correct results (regression net for sharing). Docs: CLAUDE.md gains a "Worker Session Model" load-bearing contract section (and a drain-protocol summary) so future changes don't reintroduce session sharing or break the cap/hold/anti-snatch/destroy-before-reuse guarantees. * feat(worker-pool): recover from worker session-cap drift instead of failing the query If a worker rejects a control-plane-scheduled CreateSession because it already holds its max session (the one-session-per-worker invariant momentarily drifted between the CP's view and the worker's actual session count), don't punish the client for our broken logic. CreateSessionWithProtocol now: - detects the worker's "max sessions reached" rejection (isWorkerSessionCapError), - logs loudly at ERROR and bumps a new metric (duckgres_control_plane_worker_session_cap_drift_total) so the drift is visible and we can fix the root cause, - retires/recycles the inconsistent worker (graceful via the drain protocol), and - re-acquires a fresh worker and retries, bounded by maxWorkerSessionCapDriftRetries. So a transient drift self-heals (the query lands on a healthy worker) while a persistent drift surfaces a clear error after the bounded retries rather than spinning. Recovery + loud logging: fix our bug without dropping the user's query. Unit test covers the wrapped-error classifier; CLAUDE.md documents the behavior in the worker-session-model contract.
1 parent 95e3c24 commit 001d216

15 files changed

Lines changed: 620 additions & 156 deletions

CLAUDE.md

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,73 @@ DML with RETURNING is rejected at extended-query Describe time with SQLSTATE `0A
162162
- LIMIT 0 does NOT prevent CTE side effects — Postgres CTEs are optimization fences, so writable CTEs execute even with LIMIT 0.
163163
- DuckDB does not currently support MERGE. If it adds MERGE RETURNING, add `MERGE` to the prefix check in `isDMLReturning`.
164164

165+
## Worker Session Model (k8s / remote backend) — LOAD-BEARING CONTRACT
166+
167+
In the **control-plane remote/k8s backend** a worker pod serves **exactly one
168+
client query session at a time**. This is deliberate: `workerDuckDBLimits`
169+
(`controlplane/control.go`) gives the single session ~75% of the *whole pod's*
170+
RAM + all CPU cores — it does NOT divide by session count. Two sessions on one
171+
pod would each believe they own 75% → ~150% overcommit → nondeterministic OOM /
172+
a heavy query killed by a co-resident one. Do not break the following:
173+
174+
- **One session per worker is enforced, not emergent.** The CP spawns remote
175+
worker pods with `DUCKGRES_DUCKDB_MAX_SESSIONS=1` (`k8s_pool.go::spawnWorker`).
176+
A 2nd concurrent `CreateSession` on a worker is rejected, not silently
177+
overcommitted. Internal control/maintenance work uses the worker's side
178+
connections (`controlDB`/`warmupDB`), which are NOT counted sessions — so
179+
cap=1 does not starve them. Do not raise this to >1 for k8s workers, and do not
180+
route internal work through `CreateSession`.
181+
- **`OrgReservedPool` (remote/multitenant) must never co-assign.** It reuses only
182+
idle (`activeSessions==0`, Hot, org-owned) workers via
183+
`findIdleAssignedWorkerLocked`, or claims/spawns a fresh one. There is NO
184+
least-loaded "share onto a busy worker" path (that exists only in the
185+
single-tenant flat `K8sWorkerPool.AcquireWorker`, which is not used in remote
186+
mode). Do NOT add one, and do not resurrect a `leastLoaded*` helper here.
187+
- **At org max workers + all busy → fail fast with the clear org-cap message**
188+
(`WorkerClaimMissReasonOrgCap`, see `warm_capacity_policy.go`). Never
189+
busy-wait at cap.
190+
- **Under cap + all busy → hold for a spawn** up to `warmAcquireTimeout` (bounded
191+
by the client ctx). This applies to default/exclusive requests too, not just
192+
colocated.
193+
- **FIFO anti-snatch:** the slow acquisition path is serialized per org by
194+
`orgAcquireGate` (`org_acquire_gate.go`) so a worker the CP scaled up for an
195+
earlier waiter cannot be snatched by a later connection. Keep the gate
196+
cancel-safe (a queued waiter whose ctx is cancelled must be skipped, not
197+
deadlock the gate).
198+
- **Destroy-before-reuse ordering:** `SessionManager.DestroySession`
199+
(`session_mgr.go`) MUST await the worker-side `DestroySession` RPC *before*
200+
`ReleaseWorker`, so a reused (hot-idle) worker's prior session is gone before
201+
the next one is assigned (otherwise cap=1 spuriously rejects the reuse).
202+
- **Cap-drift is recovered, not fatal:** if a worker still rejects a CP-scheduled
203+
session at its cap (CP↔worker accounting drift — should never happen),
204+
`SessionManager.CreateSessionWithProtocol` does NOT fail the client: it logs
205+
loudly (ERROR), bumps `duckgres_control_plane_worker_session_cap_drift_total`,
206+
retires (recycles) the inconsistent worker, and re-acquires a fresh one
207+
(bounded by `maxWorkerSessionCapDriftRetries`). Detection is
208+
`isWorkerSessionCapError` (matches the worker's "max sessions reached"
209+
message). A nonzero drift metric means the scheduling invariant is broken —
210+
fix the root cause, don't just lean on the retry.
211+
212+
Touching any of: `controlplane/org_reserved_pool.go`, `org_acquire_gate.go`,
213+
`k8s_pool.go::spawnWorker`/`AcquireWorker`, `control.go::workerDuckDBLimits`, or
214+
`duckdbservice` session counting → update the unit tests
215+
(`org_reserved_pool_test.go`, `org_acquire_gate_test.go`,
216+
`duckdbservice/service_test.go`) AND the `one_session_per_worker` assertion in
217+
`tests/e2e-mw-dev/harness.sh`.
218+
219+
## Worker Drain Protocol (graceful shutdown, #690)
220+
221+
Remote worker pods drain on SIGTERM (pod deletion): they reject new work, keep
222+
in-flight work alive, then exit; the CP marks them `Draining` (not crashed) and
223+
retires them cleanly. Drain readiness is tracked by a refcount (`activeWork` in
224+
`duckdbservice/service.go`) of "drain tokens" — one taken per unit of in-flight
225+
work (query, txn, metadata stream, COPY, activation), released when it finishes.
226+
Invariants: take exactly one token when work starts and release exactly one when
227+
it ends on **every** path (a leak hangs drain to the shutdown timeout, an early
228+
release lets shutdown kill live work); `reapIdle` releases tokens stranded by a
229+
`GetFlightInfo` whose `DoGet` never arrived. `terminationGracePeriodSeconds=3600`
230+
(`k8s_pool.go`) must stay above `workerShutdownDrainTime` (55m).
231+
165232
## TODO Reference
166233

167234
See `TODO.md` for the full feature roadmap and known issues.

controlplane/control_cancel_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ func TestSessionCreationErrorResponse(t *testing.T) {
113113
{
114114
name: "org capacity exhausted",
115115
reason: configstore.WorkerClaimMissReasonOrgCap,
116-
message: "Duckgres worker capacity for this organization is currently exhausted; retry later",
116+
message: "your organization has reached its maximum number of concurrent Duckgres workers and they are all busy; retry once a query finishes",
117117
},
118118
{
119119
name: "global capacity exhausted",

controlplane/flight_ingress_metrics.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,20 @@ func observeControlPlaneWorkerAcquireFailure(reason string) {
8888
controlPlaneWorkerAcquireFailuresCounter.WithLabelValues(reason).Inc()
8989
}
9090

91+
// controlPlaneWorkerSessionCapDriftCounter counts times a worker rejected a
92+
// control-plane-scheduled CreateSession because it already held its max session
93+
// — a CP↔worker accounting drift that must never happen under the
94+
// one-session-per-worker contract. Should sit at 0; a sustained nonzero rate
95+
// means scheduling is double-assigning workers (alert on it).
96+
var controlPlaneWorkerSessionCapDriftCounter = promauto.NewCounter(prometheus.CounterOpts{
97+
Name: "duckgres_control_plane_worker_session_cap_drift_total",
98+
Help: "Times a worker rejected a CP-scheduled CreateSession at its session cap (CP↔worker accounting drift; recovered by recycling the worker and retrying).",
99+
})
100+
101+
func observeWorkerSessionCapDrift() {
102+
controlPlaneWorkerSessionCapDriftCounter.Inc()
103+
}
104+
91105
func observeFlightSessionsReaped(trigger string, count int) {
92106
if count <= 0 {
93107
return

controlplane/k8s_pool.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -775,6 +775,20 @@ func (p *K8sWorkerPool) spawnWorker(ctx context.Context, id int, image string, p
775775
Name: "DUCKGRES_KEY",
776776
Value: workerRPCMountDir + "/" + workerRPCKeyKey,
777777
},
778+
{
779+
// One client query session per worker pod: the pod's full
780+
// resources (workerDuckDBLimits gives the session ~75% of pod
781+
// RAM + all cores) belong to a single query, so queries never
782+
// contend and a heavy query can't be OOM'd by a co-resident
783+
// one. The CP scheduler (OrgReservedPool) already never
784+
// co-assigns; this is the hard worker-side guarantee — a 2nd
785+
// CreateSession is rejected rather than silently overcommitting.
786+
// Internal control/maintenance work runs on the worker's side
787+
// connections (controlDB/warmupDB), which are NOT counted
788+
// sessions, so this does not starve them.
789+
Name: "DUCKGRES_DUCKDB_MAX_SESSIONS",
790+
Value: "1",
791+
},
778792
},
779793
SecurityContext: &corev1.SecurityContext{
780794
AllowPrivilegeEscalation: boolPtr(false),

controlplane/k8s_pool_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3793,6 +3793,7 @@ func assertSpawnedWorkerPod(t *testing.T, pod *corev1.Pod) {
37933793
foundSharedWarmWorkerEnv := false
37943794
foundTLSCertEnv := false
37953795
foundTLSKeyEnv := false
3796+
foundMaxSessionsEnv := false
37963797
for _, env := range c.Env {
37973798
if env.Name == "DUCKGRES_DUCKDB_TOKEN" && env.ValueFrom != nil &&
37983799
env.ValueFrom.SecretKeyRef != nil &&
@@ -3808,6 +3809,9 @@ func assertSpawnedWorkerPod(t *testing.T, pod *corev1.Pod) {
38083809
if env.Name == "DUCKGRES_KEY" && env.Value == "/etc/duckgres/worker-rpc/tls.key" {
38093810
foundTLSKeyEnv = true
38103811
}
3812+
if env.Name == "DUCKGRES_DUCKDB_MAX_SESSIONS" && env.Value == "1" {
3813+
foundMaxSessionsEnv = true
3814+
}
38113815
}
38123816
if !foundEnv {
38133817
t.Fatal("bearer token env var not found or incorrect")
@@ -3818,6 +3822,9 @@ func assertSpawnedWorkerPod(t *testing.T, pod *corev1.Pod) {
38183822
if !foundTLSCertEnv || !foundTLSKeyEnv {
38193823
t.Fatal("expected worker RPC TLS env vars to be present")
38203824
}
3825+
if !foundMaxSessionsEnv {
3826+
t.Fatal("expected DUCKGRES_DUCKDB_MAX_SESSIONS=1 (one query session per worker)")
3827+
}
38213828

38223829
if len(pod.Spec.Volumes) == 0 {
38233830
t.Fatal("expected configmap volume")

controlplane/org_acquire_gate.go

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
//go:build kubernetes
2+
3+
package controlplane
4+
5+
import (
6+
"context"
7+
"sync"
8+
)
9+
10+
// orgAcquireGate is a cancellable FIFO turnstile. It serializes the slow
11+
// worker-acquisition path (no idle worker → claim/spawn) for one org so that a
12+
// newly-spawned or freed worker is handed to the EARLIEST waiting connection,
13+
// and a later-arriving connection cannot snatch it. Plain sync.Mutex is
14+
// unsuitable: it is not strictly FIFO and a goroutine blocked in Lock() cannot
15+
// abort when its request context is cancelled (client disconnect / deadline).
16+
//
17+
// Holders must call release() exactly once (defer) after acquire() returns nil.
18+
type orgAcquireGate struct {
19+
mu sync.Mutex
20+
held bool
21+
queue []*gateWaiter
22+
}
23+
24+
type gateWaiter struct {
25+
ready chan struct{} // closed when the gate is granted to this waiter
26+
canceled bool // set under mu when the waiter abandoned before grant
27+
}
28+
29+
func newOrgAcquireGate() *orgAcquireGate { return &orgAcquireGate{} }
30+
31+
// acquire blocks until this caller owns the gate (FIFO) or ctx is done. On a nil
32+
// return the caller owns the gate and MUST call release().
33+
func (g *orgAcquireGate) acquire(ctx context.Context) error {
34+
g.mu.Lock()
35+
if !g.held {
36+
g.held = true
37+
g.mu.Unlock()
38+
return nil
39+
}
40+
w := &gateWaiter{ready: make(chan struct{})}
41+
g.queue = append(g.queue, w)
42+
g.mu.Unlock()
43+
44+
select {
45+
case <-w.ready:
46+
return nil
47+
case <-ctx.Done():
48+
g.mu.Lock()
49+
select {
50+
case <-w.ready:
51+
// Granted concurrently with cancellation: we now own the gate, so we
52+
// must pass it on rather than leak it.
53+
g.mu.Unlock()
54+
g.release()
55+
default:
56+
w.canceled = true
57+
g.mu.Unlock()
58+
}
59+
return ctx.Err()
60+
}
61+
}
62+
63+
// release hands the gate to the next live waiter (FIFO), or marks it free.
64+
func (g *orgAcquireGate) release() {
65+
g.mu.Lock()
66+
for len(g.queue) > 0 {
67+
w := g.queue[0]
68+
g.queue = g.queue[1:]
69+
if w.canceled {
70+
continue // waiter gave up; skip it
71+
}
72+
close(w.ready) // grant; held stays true (ownership transfers)
73+
g.mu.Unlock()
74+
return
75+
}
76+
g.held = false
77+
g.mu.Unlock()
78+
}
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
//go:build kubernetes
2+
3+
package controlplane
4+
5+
import (
6+
"context"
7+
"sync"
8+
"testing"
9+
"time"
10+
)
11+
12+
// The gate must grant ownership to one holder at a time and release to waiters
13+
// in FIFO arrival order — this is what stops a later connection from snatching a
14+
// worker a longer-waiting one is owed.
15+
func TestOrgAcquireGateFIFOOrder(t *testing.T) {
16+
g := newOrgAcquireGate()
17+
18+
// First acquire wins immediately.
19+
if err := g.acquire(context.Background()); err != nil {
20+
t.Fatalf("first acquire: %v", err)
21+
}
22+
23+
const n = 5
24+
entered := make([]int, 0, n)
25+
var mu sync.Mutex
26+
var wg sync.WaitGroup
27+
starts := make([]chan struct{}, n)
28+
29+
for i := 0; i < n; i++ {
30+
starts[i] = make(chan struct{})
31+
wg.Add(1)
32+
go func(idx int) {
33+
defer wg.Done()
34+
<-starts[idx] // queue in deterministic order
35+
if err := g.acquire(context.Background()); err != nil {
36+
t.Errorf("waiter %d acquire: %v", idx, err)
37+
return
38+
}
39+
mu.Lock()
40+
entered = append(entered, idx)
41+
mu.Unlock()
42+
g.release()
43+
}(i)
44+
}
45+
46+
// Release each waiter onto the queue one at a time so arrival order is 0..n-1.
47+
for i := 0; i < n; i++ {
48+
close(starts[i])
49+
time.Sleep(10 * time.Millisecond)
50+
}
51+
52+
g.release() // hand the gate to the FIFO queue head
53+
wg.Wait()
54+
55+
for i := 0; i < n; i++ {
56+
if entered[i] != i {
57+
t.Fatalf("gate granted out of FIFO order: got %v want [0 1 2 3 4]", entered)
58+
}
59+
}
60+
}
61+
62+
// A waiter whose context is cancelled while queued must not deadlock the gate:
63+
// release() skips it and grants to the next live waiter.
64+
func TestOrgAcquireGateCancelledWaiterIsSkipped(t *testing.T) {
65+
g := newOrgAcquireGate()
66+
if err := g.acquire(context.Background()); err != nil {
67+
t.Fatalf("first acquire: %v", err)
68+
}
69+
70+
// Waiter A queues, then cancels.
71+
ctxA, cancelA := context.WithCancel(context.Background())
72+
aDone := make(chan error, 1)
73+
go func() { aDone <- g.acquire(ctxA) }()
74+
time.Sleep(20 * time.Millisecond)
75+
76+
// Waiter B queues behind A.
77+
bGot := make(chan struct{}, 1)
78+
go func() {
79+
if err := g.acquire(context.Background()); err == nil {
80+
bGot <- struct{}{}
81+
}
82+
}()
83+
time.Sleep(20 * time.Millisecond)
84+
85+
cancelA()
86+
if err := <-aDone; err == nil {
87+
t.Fatal("expected cancelled waiter A to return an error")
88+
}
89+
90+
// Hand off the gate: A is cancelled, so B must get it.
91+
g.release()
92+
select {
93+
case <-bGot:
94+
case <-time.After(2 * time.Second):
95+
t.Fatal("waiter B did not acquire the gate after A cancelled (gate leaked)")
96+
}
97+
g.release()
98+
}

0 commit comments

Comments
 (0)