Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
2f52229
fix: deleting a TWD leaves stale versioning data on Temporal server, …
anujagrawal380 Mar 24, 2026
c0ebbaf
fix: persist connection until twd deletes for cleanup
anujagrawal380 Mar 25, 2026
ba857b9
fix: add else log for no ramping version
anujagrawal380 Mar 25, 2026
cd83d2c
fix: use single temporal.io/delete-protection finalizer
anujagrawal380 Mar 25, 2026
05fb227
fix: integration tests
anujagrawal380 Mar 26, 2026
04b61a2
fix: add deletion timeout, strict retry, and RBAC markers for connect…
anujagrawal380 Apr 22, 2026
0ab31a3
fix: remove unused mgr parameter and dead env construction from delet…
anujagrawal380 Apr 26, 2026
a1abea1
fix: remove redundant l.Error log and add ramping version test coverage
anujagrawal380 Apr 27, 2026
202c72d
fix: clear ramping version before setting current to unversioned on d…
anujagrawal380 Apr 27, 2026
9e879a7
fix: re-describe after clearing ramping version to get fresh Conflict…
anujagrawal380 Apr 27, 2026
2e4808b
fix: update handleDeletion docstring to reflect actual cleanup sequence
anujagrawal380 Apr 27, 2026
5d1c04c
fix: regenerate rbac.yaml via make manifests
anujagrawal380 Apr 27, 2026
73d5233
fix: handle AllAtOnce race in deletion test and regenerate CRD manifests
anujagrawal380 Apr 27, 2026
f8e49d9
fix: apply gofmt alignment to deletion integration test
anujagrawal380 Apr 27, 2026
53499d0
fix: use controller identity client to revert current version in dele…
anujagrawal380 Apr 27, 2026
0d587d5
fix: switch deletion test to manual rollout strategy to avoid races
anujagrawal380 Apr 27, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions helm/temporal-worker-controller/templates/rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,15 @@ rules:
verbs:
- get
- list
- patch
- update
- watch
- apiGroups:
- temporal.io
resources:
- temporalconnections/finalizers
verbs:
- update
Comment thread
anujagrawal380 marked this conversation as resolved.
- apiGroups:
- temporal.io
resources:
Expand Down
249 changes: 248 additions & 1 deletion internal/controller/worker_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ import (
"fmt"
"time"

"github.com/go-logr/logr"
temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1"
"github.com/temporalio/temporal-worker-controller/internal/controller/clientpool"
"github.com/temporalio/temporal-worker-controller/internal/k8s"
"github.com/temporalio/temporal-worker-controller/internal/temporal"
"go.temporal.io/api/serviceerror"
sdkclient "go.temporal.io/sdk/client"
"google.golang.org/grpc/codes"
grpcstatus "google.golang.org/grpc/status"
appsv1 "k8s.io/api/apps/v1"
Expand All @@ -28,6 +30,7 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
Expand All @@ -44,6 +47,12 @@ const (

// wrtWorkerRefKey is the field index key for WorkerResourceTemplate by temporalWorkerDeploymentRef.name.
wrtWorkerRefKey = ".spec.temporalWorkerDeploymentRef.name"

// finalizerName is the finalizer added to TemporalWorkerDeployment and TemporalConnection
// resources to prevent deletion before cleanup actions are taken. On TWD resources, it
// ensures Temporal server-side versioning data is cleaned up. On TemporalConnection
// resources, it prevents deletion while any TWD still references the connection.
finalizerName = "temporal.io/delete-protection"
)

// getAPIKeySecretName extracts the secret name from a SecretKeySelector
Expand Down Expand Up @@ -101,7 +110,8 @@ type TemporalWorkerDeploymentReconciler struct {
// +kubebuilder:rbac:groups=temporal.io,resources=temporalworkerdeployments,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=temporal.io,resources=temporalworkerdeployments/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=temporal.io,resources=temporalworkerdeployments/finalizers,verbs=update
// +kubebuilder:rbac:groups=temporal.io,resources=temporalconnections,verbs=get;list;watch
// +kubebuilder:rbac:groups=temporal.io,resources=temporalconnections,verbs=get;list;watch;update;patch
// +kubebuilder:rbac:groups=temporal.io,resources=temporalconnections/finalizers,verbs=update
// +kubebuilder:rbac:groups=core,resources=secrets,verbs=get;list;watch
// +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=apps,resources=deployments/scale,verbs=update
Expand Down Expand Up @@ -139,6 +149,40 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req
return ctrl.Result{}, r.markWRTsTWDNotFound(ctx, req.NamespacedName)
}

// Handle deletion: clean up Temporal server-side versioning data before allowing
// the CRD to be deleted. Without this, stale build ID routing persists in Temporal
// and prevents unversioned workers from picking up tasks on the same task queue.
if !workerDeploy.DeletionTimestamp.IsZero() {
if controllerutil.ContainsFinalizer(&workerDeploy, finalizerName) {
l.Info("TemporalWorkerDeployment is being deleted, running cleanup")
if err := r.handleDeletion(ctx, l, &workerDeploy); err != nil {
l.Error(err, "failed to clean up Temporal server-side deployment data, will retry")
return ctrl.Result{RequeueAfter: 10 * time.Second}, nil
}

// Remove our finalizer from the TemporalConnection if no other TWDs reference it.
if err := r.removeConnectionFinalizerIfUnused(ctx, l, &workerDeploy); err != nil {
return ctrl.Result{}, err
}

// Cleanup succeeded, remove the finalizer so K8s can delete the resource
controllerutil.RemoveFinalizer(&workerDeploy, finalizerName)
if err := r.Update(ctx, &workerDeploy); err != nil {
return ctrl.Result{}, err
}
l.Info("Temporal server-side cleanup complete, finalizer removed")
}
return ctrl.Result{}, nil
}

// Ensure finalizer is present on non-deleted resources
if !controllerutil.ContainsFinalizer(&workerDeploy, finalizerName) {
controllerutil.AddFinalizer(&workerDeploy, finalizerName)
if err := r.Update(ctx, &workerDeploy); err != nil {
return ctrl.Result{}, err
}
}

// TODO(jlegrone): Set defaults via webhook rather than manually
if err := workerDeploy.Default(ctx, &workerDeploy); err != nil {
l.Error(err, "TemporalWorkerDeployment defaulter failed")
Expand Down Expand Up @@ -172,6 +216,13 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req
return ctrl.Result{}, err
}

// Ensure our finalizer is on the TemporalConnection so it cannot be deleted
// while this TWD still references it. This guarantees the connection is available
// during TWD deletion cleanup.
if err := r.ensureConnectionFinalizer(ctx, l, &temporalConnection); err != nil {
return ctrl.Result{}, err
}

// Get the Auth Mode and Secret Name
authMode, secretName, err := resolveAuthSecretName(&temporalConnection)
if err != nil {
Expand Down Expand Up @@ -359,6 +410,134 @@ func (r *TemporalWorkerDeploymentReconciler) markWRTsTWDNotFound(ctx context.Con
return errors.Join(errs...)
}

// handleDeletion cleans up Temporal server-side deployment versioning data when
// a TemporalWorkerDeployment CRD is deleted. This prevents stale build ID routing
// from blocking unversioned workers on the same task queue.
//
// The cleanup sequence:
// 1. Set the current version to "unversioned" (empty BuildID) so new tasks route to unversioned workers
// 2. Delete all non-current/non-ramping versions (drained/inactive ones)
Comment thread
anujagrawal380 marked this conversation as resolved.
Outdated
// 3. The deployment itself will be garbage collected by Temporal once all versions are removed
func (r *TemporalWorkerDeploymentReconciler) handleDeletion(
ctx context.Context,
l logr.Logger,
workerDeploy *temporaliov1alpha1.TemporalWorkerDeployment,
) error {
// Resolve Temporal connection.
// The TemporalConnection is guaranteed to exist because we hold a finalizer on it
// that prevents deletion while any TWD references it.
var temporalConnection temporaliov1alpha1.TemporalConnection
if err := r.Get(ctx, types.NamespacedName{
Name: workerDeploy.Spec.WorkerOptions.TemporalConnectionRef.Name,
Namespace: workerDeploy.Namespace,
}, &temporalConnection); err != nil {
return fmt.Errorf("unable to fetch TemporalConnection: %w", err)
}

authMode, secretName, err := resolveAuthSecretName(&temporalConnection)
if err != nil {
return fmt.Errorf("unable to resolve auth secret name: %w", err)
}

temporalClient, ok := r.TemporalClientPool.GetSDKClient(clientpool.ClientPoolKey{
HostPort: temporalConnection.Spec.HostPort,
Namespace: workerDeploy.Spec.WorkerOptions.TemporalNamespace,
SecretName: secretName,
AuthMode: authMode,
})
if !ok {
clientOpts, key, clientAuth, err := r.TemporalClientPool.ParseClientSecret(ctx, secretName, authMode, clientpool.NewClientOptions{
K8sNamespace: workerDeploy.Namespace,
TemporalNamespace: workerDeploy.Spec.WorkerOptions.TemporalNamespace,
Spec: temporalConnection.Spec,
Identity: getControllerIdentity(),
})
if err != nil {
return fmt.Errorf("unable to parse Temporal auth secret: %w", err)
}
c, err := r.TemporalClientPool.DialAndUpsertClient(*clientOpts, *key, *clientAuth)
if err != nil {
return fmt.Errorf("unable to create TemporalClient: %w", err)
}
temporalClient = c
}

workerDeploymentName := k8s.ComputeWorkerDeploymentName(workerDeploy)
deploymentHandler := temporalClient.WorkerDeploymentClient().GetHandle(workerDeploymentName)

// Describe the deployment to get current state
resp, err := deploymentHandler.Describe(ctx, sdkclient.WorkerDeploymentDescribeOptions{})
if err != nil {
var notFound *serviceerror.NotFound
if errors.As(err, &notFound) {
l.Info("Worker Deployment not found on Temporal server, nothing to clean up")
return nil
}
return fmt.Errorf("unable to describe worker deployment: %w", err)
}

routingConfig := resp.Info.RoutingConfig

// Step 1: Set current version to unversioned (empty BuildID) so tasks route to unversioned workers.
// This is the critical step that unblocks task dispatch.
if routingConfig.CurrentVersion != nil {
l.Info("Setting current version to unversioned", "previousBuildID", routingConfig.CurrentVersion.BuildID)
if _, err := deploymentHandler.SetCurrentVersion(ctx, sdkclient.WorkerDeploymentSetCurrentVersionOptions{
BuildID: "", // empty = unversioned
ConflictToken: resp.ConflictToken,
Identity: getControllerIdentity(),
IgnoreMissingTaskQueues: true,
}); err != nil {
return fmt.Errorf("unable to set current version to unversioned: %w", err)
}
l.Info("Successfully set current version to unversioned")
} else {
l.Info("No current version set, skipping unversioned redirect")
}

// Step 2: If there's a ramping version, clear it.
if routingConfig.RampingVersion != nil {
l.Info("Clearing ramping version", "buildID", routingConfig.RampingVersion.BuildID)
if _, err := deploymentHandler.SetRampingVersion(ctx, sdkclient.WorkerDeploymentSetRampingVersionOptions{
BuildID: "",
Percentage: 0,
Identity: getControllerIdentity(),
}); err != nil {
l.Info("Failed to clear ramping version (may have been cleared by SetCurrentVersion)", "error", err)
}
Comment thread
anujagrawal380 marked this conversation as resolved.
Outdated
} else {
l.Info("No ramping version set, skipping clear ramping version")
}
Comment thread
anujagrawal380 marked this conversation as resolved.
Outdated

// Step 3: Delete versions that are eligible. Versions that are still draining
// are force-deleted with SkipDrainage since the TWD is being removed entirely.
// If any version fails to delete (e.g. active pollers), return an error so the
// reconciler requeues. Pollers disappear once pods terminate and the next
// reconciliation will succeed.
for _, version := range resp.Info.VersionSummaries {
buildID := version.Version.BuildID
l.Info("Deleting worker deployment version", "buildID", buildID)
if _, err := deploymentHandler.DeleteVersion(ctx, sdkclient.WorkerDeploymentDeleteVersionOptions{
BuildID: buildID,
SkipDrainage: true,
Identity: getControllerIdentity(),
}); err != nil {
return fmt.Errorf("unable to delete version %s (will retry): %w", buildID, err)
}
}

// Step 4: Delete the deployment itself. This only succeeds if all versions are gone.
l.Info("Attempting to delete worker deployment from Temporal server", "name", workerDeploymentName)
if _, err := temporalClient.WorkerDeploymentClient().Delete(ctx, sdkclient.WorkerDeploymentDeleteOptions{
Name: workerDeploymentName,
Identity: getControllerIdentity(),
}); err != nil {
return fmt.Errorf("unable to delete worker deployment %s (will retry): %w", workerDeploymentName, err)
}

return nil
}

// setCondition sets a condition on the TemporalWorkerDeployment status.
func (r *TemporalWorkerDeploymentReconciler) setCondition(
workerDeploy *temporaliov1alpha1.TemporalWorkerDeployment,
Expand Down Expand Up @@ -446,6 +625,74 @@ func (r *TemporalWorkerDeploymentReconciler) recordWarningAndSetBlocked(
_ = r.Status().Update(ctx, workerDeploy)
}

// ensureConnectionFinalizer adds our finalizer to the TemporalConnection so it
// cannot be deleted while this TWD still needs it for cleanup.
func (r *TemporalWorkerDeploymentReconciler) ensureConnectionFinalizer(
ctx context.Context,
l logr.Logger,
tc *temporaliov1alpha1.TemporalConnection,
) error {
if !controllerutil.ContainsFinalizer(tc, finalizerName) {
l.Info("Adding finalizer to TemporalConnection", "connection", tc.Name)
controllerutil.AddFinalizer(tc, finalizerName)
if err := r.Update(ctx, tc); err != nil {
return fmt.Errorf("unable to add finalizer to TemporalConnection %q: %w", tc.Name, err)
}
}
return nil
}

// removeConnectionFinalizerIfUnused removes our finalizer from the TemporalConnection
// if no other TWDs (besides the one being deleted) still reference it.
func (r *TemporalWorkerDeploymentReconciler) removeConnectionFinalizerIfUnused(
ctx context.Context,
l logr.Logger,
deletingTWD *temporaliov1alpha1.TemporalWorkerDeployment,
) error {
connectionName := deletingTWD.Spec.WorkerOptions.TemporalConnectionRef.Name

// List all TWDs in the same namespace
var twds temporaliov1alpha1.TemporalWorkerDeploymentList
if err := r.List(ctx, &twds, client.InNamespace(deletingTWD.Namespace)); err != nil {
return fmt.Errorf("unable to list TWDs: %w", err)
}

// Check if any other TWD (not the one being deleted) references this connection
for i := range twds.Items {
twd := &twds.Items[i]
if twd.Name == deletingTWD.Name {
continue
}
if twd.Spec.WorkerOptions.TemporalConnectionRef.Name == connectionName {
l.Info("TemporalConnection still referenced by another TWD, keeping finalizer",
"connection", connectionName, "referencedBy", twd.Name)
return nil
}
}

// No other TWDs reference this connection, remove the finalizer
var tc temporaliov1alpha1.TemporalConnection
if err := r.Get(ctx, types.NamespacedName{
Name: connectionName,
Namespace: deletingTWD.Namespace,
}, &tc); err != nil {
if apierrors.IsNotFound(err) {
return nil // already gone
}
return fmt.Errorf("unable to fetch TemporalConnection %q: %w", connectionName, err)
}

if controllerutil.ContainsFinalizer(&tc, finalizerName) {
l.Info("Removing finalizer from TemporalConnection", "connection", connectionName)
controllerutil.RemoveFinalizer(&tc, finalizerName)
if err := r.Update(ctx, &tc); err != nil {
return fmt.Errorf("unable to remove finalizer from TemporalConnection %q: %w", connectionName, err)
}
}

return nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *TemporalWorkerDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error {
if err := mgr.GetFieldIndexer().IndexField(context.Background(), &appsv1.Deployment{}, deployOwnerKey, func(rawObj client.Object) []string {
Expand Down
Loading