Skip to content

Commit b4913c9

Browse files
carlydfclaude
andauthored
Flush SDK client cache on PermissionDenied/Unauthenticated errors (#300)
## Summary - Adds `ClientPool.EvictClient(key)` to close and remove a cached SDK client by key - Detects `PermissionDenied` and `Unauthenticated` errors from Temporal SDK calls in the reconcile loop - Evicts the stale client from the pool at both error sites (`GetWorkerDeploymentState` and `executePlan`), so the next reconcile re-reads credentials from the K8s secret and re-dials Without this, a rotated API key or revoked mTLS cert causes a permanent stuck-retry loop that only recovers with a controller restart. Closes #295 Closes #113 ## Test plan - [ ] `go test ./internal/controller/clientpool/...` — new `TestEvictClient_RemovesAndClosesClient` and `TestEvictClient_NoopWhenKeyAbsent` pass - [ ] `go test ./internal/controller/...` — existing controller tests still pass - [ ] `go build ./...` — compiles clean 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 13203fb commit b4913c9

3 files changed

Lines changed: 62 additions & 3 deletions

File tree

internal/controller/clientpool/clientpool.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,17 @@ func New(l log.Logger, c runtimeclient.Client) *ClientPool {
7979
}
8080
}
8181

82+
// EvictClient removes the client for the given key from the pool and closes it.
83+
// Safe to call when the key is not present.
84+
func (cp *ClientPool) EvictClient(key ClientPoolKey) {
85+
cp.mux.Lock()
86+
defer cp.mux.Unlock()
87+
if info, ok := cp.clients[key]; ok {
88+
info.client.Close()
89+
delete(cp.clients, key)
90+
}
91+
}
92+
8293
func (cp *ClientPool) GetSDKClient(key ClientPoolKey) (sdkclient.Client, bool) {
8394
cp.mux.RLock()
8495
defer cp.mux.RUnlock()

internal/controller/clientpool/clientpool_test.go

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,14 +126,15 @@ type mockSDKClient struct {
126126

127127
// Used to verify that CheckHealth is not called when API Key auth is used.
128128
checkHealthCalled bool
129+
closed bool
129130
}
130131

131132
func (m *mockSDKClient) CheckHealth(_ context.Context, _ *sdkclient.CheckHealthRequest) (*sdkclient.CheckHealthResponse, error) {
132133
m.checkHealthCalled = true
133134
return &sdkclient.CheckHealthResponse{}, nil
134135
}
135136

136-
func (m *mockSDKClient) Close() {}
137+
func (m *mockSDKClient) Close() { m.closed = true }
137138

138139
// ─── Tests: fetchClientUsingMTLSSecret ────────────────────────────────────────
139140

@@ -355,6 +356,36 @@ func TestDialAndUpsert_NoCredsCallsCheckHealth(t *testing.T) {
355356
assert.True(t, mock.checkHealthCalled, "CheckHealth must be called for no-credentials auth")
356357
}
357358

359+
// ─── Tests: EvictClient ───────────────────────────────────────────────────────
360+
361+
func TestEvictClient_RemovesAndClosesClient(t *testing.T) {
362+
cp := newTestPool()
363+
key := ClientPoolKey{
364+
HostPort: "localhost:7233",
365+
Namespace: "default",
366+
SecretName: "my-secret",
367+
AuthMode: AuthModeAPIKey,
368+
}
369+
mock := &mockSDKClient{}
370+
cp.SetClientForTesting(key, mock)
371+
372+
_, ok := cp.GetSDKClient(key)
373+
require.True(t, ok, "client should be present before eviction")
374+
375+
cp.EvictClient(key)
376+
377+
assert.True(t, mock.closed, "Close should be called on eviction")
378+
_, ok = cp.GetSDKClient(key)
379+
assert.False(t, ok, "client should be absent after eviction")
380+
}
381+
382+
func TestEvictClient_NoopWhenKeyAbsent(t *testing.T) {
383+
cp := newTestPool()
384+
key := ClientPoolKey{HostPort: "localhost:7233", Namespace: "default", AuthMode: AuthModeNoCredentials}
385+
// Should not panic when key is not in the pool
386+
cp.EvictClient(key)
387+
}
388+
358389
// ─── Helpers ──────────────────────────────────────────────────────────────────
359390

360391
func decodePEMCert(certPEM []byte) (*x509.Certificate, error) {

internal/controller/worker_controller.go

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ import (
1515
"github.com/temporalio/temporal-worker-controller/internal/k8s"
1616
"github.com/temporalio/temporal-worker-controller/internal/temporal"
1717
"go.temporal.io/api/serviceerror"
18+
"google.golang.org/grpc/codes"
19+
grpcstatus "google.golang.org/grpc/status"
1820
appsv1 "k8s.io/api/apps/v1"
1921
corev1 "k8s.io/api/core/v1"
2022
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -182,12 +184,13 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req
182184
}
183185

184186
// Get or update temporal client for connection
185-
temporalClient, ok := r.TemporalClientPool.GetSDKClient(clientpool.ClientPoolKey{
187+
clientPoolKey := clientpool.ClientPoolKey{
186188
HostPort: temporalConnection.Spec.HostPort,
187189
Namespace: workerDeploy.Spec.WorkerOptions.TemporalNamespace,
188190
SecretName: secretName,
189191
AuthMode: authMode,
190-
})
192+
}
193+
temporalClient, ok := r.TemporalClientPool.GetSDKClient(clientPoolKey)
191194
if !ok {
192195
clientOpts, key, clientAuth, err := r.TemporalClientPool.ParseClientSecret(ctx, secretName, authMode, clientpool.NewClientOptions{
193196
K8sNamespace: workerDeploy.Namespace,
@@ -243,6 +246,9 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req
243246
getControllerIdentity(),
244247
)
245248
if err != nil {
249+
if isAccessDeniedErr(err) {
250+
r.TemporalClientPool.EvictClient(clientPoolKey)
251+
}
246252
var rateLimitErr *serviceerror.ResourceExhausted
247253
if errors.As(err, &rateLimitErr) {
248254
r.recordWarningAndSetBlocked(ctx, &workerDeploy,
@@ -286,6 +292,9 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req
286292

287293
// Execute the plan, handling any errors
288294
if err := r.executePlan(ctx, l, &workerDeploy, temporalClient, plan); err != nil {
295+
if isAccessDeniedErr(err) {
296+
r.TemporalClientPool.EvictClient(clientPoolKey)
297+
}
289298
r.recordWarningAndSetBlocked(ctx, &workerDeploy,
290299
ReasonPlanExecutionFailed,
291300
fmt.Sprintf("Unable to execute reconciliation plan: %v", err),
@@ -525,3 +534,11 @@ func (r *TemporalWorkerDeploymentReconciler) findTWDsUsingConnection(ctx context
525534

526535
return requests
527536
}
537+
538+
func isAccessDeniedErr(err error) bool {
539+
var permDenied *serviceerror.PermissionDenied
540+
if errors.As(err, &permDenied) {
541+
return true
542+
}
543+
return grpcstatus.Code(err) == codes.Unauthenticated
544+
}

0 commit comments

Comments
 (0)