Skip to content

Commit 5c76231

Browse files
Shivs11 and claude authored
omit DescribeVersion API calls for drained versions (#229)
## Summary

- Switch `GetWorkerDeploymentState` from the SDK's `Describe()` to the gRPC `DescribeWorkerDeployment` API, which returns full `DrainageInfo` (including `DrainedSince` timestamps) in the version summaries.
- Eliminate per-version `DescribeVersion` calls for obtaining drainage timestamps; read them directly from the gRPC summary instead.
- Additionally, change the condition under which the worker-controller considers a drained version eligible for deletion by the server: a drained version is now treated as eligible unless a controller-managed k8s Deployment for it still has active replicas.

## Test plan

- [ ] Verify existing unit tests pass (`go test ./internal/temporal/...`)
- [ ] Verify integration tests pass (`go test ./internal/tests/...`)
- [ ] Verify `DrainedSince` is correctly populated from the gRPC summary
- [ ] Verify `NoTaskQueuesHaveVersionedPoller` still works for drained versions with k8s Deployments at 0 replicas
- [ ] Verify drained versions without k8s Deployments no longer trigger `DescribeVersion` calls

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 6e8e1d2 commit 5c76231

4 files changed

Lines changed: 157 additions & 108 deletions

File tree

api/v1alpha1/worker_types.go

Lines changed: 8 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -308,9 +308,14 @@ type DeprecatedWorkerDeploymentVersion struct {
308308
// +optional
309309
DrainedSince *metav1.Time `json:"drainedSince,omitempty"`
310310

311-
// A Version is eligible for deletion if it is drained and has no pollers on any task queue.
312-
// After pollers stop polling, the server will still consider them present until `matching.PollerHistoryTTL`
313-
// has passed.
311+
// A Version is considered eligible for deletion if it is drained and has no
312+
// controller-managed workers with active replicas polling on its task queues.
313+
// The server automatically deletes eligible versions when it needs to make room for new ones.
314+
// If this version has pollers that are not managed by the controller, the server
315+
// will not be able to delete the version. `EligibleForDeletion=true` does not consider
316+
// that scenario, because it is rare and it is too expensive to cover (requires describing
317+
// every drained version without controller-managed pollers and all task queues in
318+
// that version).
314319
// +optional
315320
EligibleForDeletion bool `json:"eligibleForDeletion,omitempty"`
316321
}

internal/controller/reconciler_events_test.go

Lines changed: 61 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -17,8 +17,11 @@ import (
1717
temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1"
1818
"github.com/temporalio/temporal-worker-controller/internal/controller/clientpool"
1919
"github.com/temporalio/temporal-worker-controller/internal/planner"
20+
deploymentpb "go.temporal.io/api/deployment/v1"
2021
"go.temporal.io/api/serviceerror"
22+
"go.temporal.io/api/workflowservice/v1"
2123
sdkclient "go.temporal.io/sdk/client"
24+
"google.golang.org/grpc"
2225
appsv1 "k8s.io/api/apps/v1"
2326
corev1 "k8s.io/api/core/v1"
2427
"k8s.io/apimachinery/pkg/api/meta"
@@ -215,24 +218,48 @@ type stubWDClient struct {
215218

216219
func (s *stubWDClient) GetHandle(_ string) sdkclient.WorkerDeploymentHandle { return s.handle }
217220

221+
// stubWorkflowServiceClient implements workflowservice.WorkflowServiceClient, returning
222+
// a valid empty response for DescribeWorkerDeployment (no versions, empty routing config),
223+
// or a configurable error if describeDeploymentErr is set.
224+
type stubWorkflowServiceClient struct {
225+
workflowservice.WorkflowServiceClient
226+
describeDeploymentErr error
227+
}
228+
229+
func (s *stubWorkflowServiceClient) DescribeWorkerDeployment(_ context.Context, _ *workflowservice.DescribeWorkerDeploymentRequest, _ ...grpc.CallOption) (*workflowservice.DescribeWorkerDeploymentResponse, error) {
230+
if s.describeDeploymentErr != nil {
231+
return nil, s.describeDeploymentErr
232+
}
233+
return &workflowservice.DescribeWorkerDeploymentResponse{
234+
WorkerDeploymentInfo: &deploymentpb.WorkerDeploymentInfo{
235+
RoutingConfig: &deploymentpb.RoutingConfig{},
236+
},
237+
}, nil
238+
}
239+
218240
// stubTemporalClient implements sdkclient.Client, routing WorkerDeploymentClient and
219241
// ExecuteWorkflow to configurable stubs.
220242
type stubTemporalClient struct {
221243
sdkclient.Client
222-
wdClient sdkclient.WorkerDeploymentClient
223-
execErr error
244+
wdClient sdkclient.WorkerDeploymentClient
245+
execErr error
246+
describeDeploymentErr error
224247
}
225248

226249
func (s *stubTemporalClient) WorkerDeploymentClient() sdkclient.WorkerDeploymentClient {
227250
return s.wdClient
228251
}
229252

253+
func (s *stubTemporalClient) WorkflowService() workflowservice.WorkflowServiceClient {
254+
return &stubWorkflowServiceClient{describeDeploymentErr: s.describeDeploymentErr}
255+
}
256+
230257
func (s *stubTemporalClient) ExecuteWorkflow(_ context.Context, _ sdkclient.StartWorkflowOptions, _ interface{}, _ ...interface{}) (sdkclient.WorkflowRun, error) {
231258
return nil, s.execErr
232259
}
233260

234-
// newStubTemporalClient returns a stub client whose Describe returns NotFound (no existing
235-
// Worker Deployment), and whose ExecuteWorkflow returns execErr.
261+
// newStubTemporalClient returns a stub client whose WorkflowService().DescribeWorkerDeployment
262+
// returns a valid empty response, and whose ExecuteWorkflow returns execErr.
236263
func newStubTemporalClient(execErr error) *stubTemporalClient {
237264
handle := &stubWDHandle{describeErr: &serviceerror.NotFound{}}
238265
return &stubTemporalClient{
@@ -511,6 +538,36 @@ func TestReconcile_PlanExecutionFailed_EmitsEvent(t *testing.T) {
511538
assertEventEmitted(t, drainEvents(recorder), ReasonPlanExecutionFailed)
512539
}
513540

541+
// TestReconcile_DescribeWorkerDeploymentNotFound verifies that when the gRPC
542+
// DescribeWorkerDeployment call returns NotFound (no deployment exists in Temporal yet),
543+
// reconciliation succeeds and proceeds to plan generation (creating a new k8s Deployment).
544+
func TestReconcile_DescribeWorkerDeploymentNotFound(t *testing.T) {
545+
k8sNamespace := "default"
546+
hostPort := "localhost:7233"
547+
548+
tc := makeNoCredsTemporalConnection("my-conn", k8sNamespace, hostPort)
549+
twd := makeTWD("test-worker", k8sNamespace, tc.Name)
550+
551+
r, recorder := newTestReconcilerWithInterceptors([]client.Object{twd, tc}, interceptor.Funcs{})
552+
553+
stub := newStubTemporalClient(nil)
554+
stub.describeDeploymentErr = &serviceerror.NotFound{}
555+
r.TemporalClientPool.SetClientForTesting(
556+
noCredsPoolKey(tc.Spec.HostPort, twd.Spec.WorkerOptions.TemporalNamespace),
557+
stub,
558+
)
559+
560+
_, err := r.Reconcile(context.Background(), ctrl.Request{
561+
NamespacedName: types.NamespacedName{Name: twd.Name, Namespace: twd.Namespace},
562+
})
563+
564+
// NotFound on the first reconcile means the Worker Deployment has not come up on the
565+
// server side yet; however, the k8s Deployment would be created by the controller
566+
// with no reconciliation errors.
567+
require.NoError(t, err)
568+
assertNoEventEmitted(t, drainEvents(recorder), ReasonPlanGenerationFailed)
569+
}
570+
514571
// ─── executeK8sOperations tests ──────────────────────────────────────────────
515572

516573
func TestExecuteK8sOperations_EmitsEventOnFailure(t *testing.T) {

internal/controller/state_mapper.go

Lines changed: 11 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -157,9 +157,19 @@ func (m *stateMapper) mapDeprecatedWorkerDeploymentVersionByBuildID(buildID stri
157157
return nil
158158
}
159159

160+
// A drained version is eligible for server-side auto-deletion only if its pollers
161+
// have stopped. We approximate this by checking whether a controller-managed k8s
162+
// Deployment with active replicas exists; if so, pollers are likely still running
163+
// and the server cannot delete the version yet, making it ineligible for deletion.
164+
// Note: This only considers pollers managed by the worker-controller. External
165+
// pollers (e.g., manually deployed workers) are not accounted for.
160166
eligibleForDeletion := false
161167
if vInfo, exists := m.temporalState.Versions[buildID]; exists {
162-
eligibleForDeletion = vInfo.Status == v1alpha1.VersionStatusDrained && vInfo.NoTaskQueuesHaveVersionedPoller
168+
hasActiveDeployment := false
169+
if d, ok := m.k8sState.Deployments[buildID]; ok {
170+
hasActiveDeployment = d.Status.Replicas > 0
171+
}
172+
eligibleForDeletion = vInfo.Status == v1alpha1.VersionStatusDrained && !hasActiveDeployment
163173
}
164174

165175
version := &v1alpha1.DeprecatedWorkerDeploymentVersion{

0 commit comments

Comments
 (0)