Skip to content
Closed
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package main

import (
"context"
"flag"
"log/slog"
"os"
Expand All @@ -13,7 +14,9 @@ import (
"github.com/temporalio/temporal-worker-controller/internal/controller"
"github.com/temporalio/temporal-worker-controller/internal/controller/clientpool"
"go.temporal.io/sdk/log"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
Expand Down Expand Up @@ -112,6 +115,25 @@ func main() {
os.Exit(1)
}

if os.Getenv(controller.IdentityEnvKey) == "" {
setupLog.Error(nil, "CONTROLLER_IDENTITY environment variable must be set")
os.Exit(1)
}
podNamespace := os.Getenv("POD_NAMESPACE")
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

already needed for WRT webhook (SA checks)

if podNamespace == "" {
setupLog.Error(nil, "POD_NAMESPACE environment variable must be set")
os.Exit(1)
}
var ns corev1.Namespace
if err := mgr.GetAPIReader().Get(context.Background(), types.NamespacedName{Name: podNamespace}, &ns); err != nil {
setupLog.Error(err, "unable to fetch namespace UID for controller identity")
os.Exit(1)
}
if err := os.Setenv(controller.IdentityEnvKey, os.Getenv(controller.IdentityEnvKey)+"/"+string(ns.UID)); err != nil {
setupLog.Error(err, "unable to set CONTROLLER_IDENTITY")
os.Exit(1)
}
Comment on lines +132 to +135
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so i have a feeling that this won't work only because we don't have the worker-controller's RBAC policies updated

i checked here and noticed that the clusterRole does not have namespaces in it

my feeling is that this call would then fail but happy to be proven wrong!

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note - i just found out that envTest also won't test this since it's spins up a cluster with a permissive auth mode

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should just add describe namespace permission to the auth then, right?


setupLog.Info("starting manager")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
setupLog.Error(err, "problem running manager")
Expand Down
2 changes: 1 addition & 1 deletion docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ Technical constraints and limitations of the Temporal Worker Controller system,
Comprehensive guide for migrating from existing unversioned worker deployment systems to the Temporal Worker Controller. Includes step-by-step instructions, configuration mapping, and common patterns.
See [Migration to Unversioned](migration-to-unversioned.md) for how to migrate back to an unversioned deployment system.

### [Ownership](ownership.md)
### [Ownership](manager-identity.md)
How the controller gets permission to manage a Worker Deployment, how a human client can take or give back control.

### [WorkerResourceTemplate](worker-resource-templates.md)
Expand Down
2 changes: 1 addition & 1 deletion docs/ownership.md → docs/manager-identity.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Ownership Transfer in the Worker Controller
# Manager Identity and Ownership Transfer in the Worker Controller

## Problem

Expand Down
31 changes: 26 additions & 5 deletions internal/controller/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/go-logr/logr"
temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1"
"github.com/temporalio/temporal-worker-controller/internal/defaults"
"github.com/temporalio/temporal-worker-controller/internal/k8s"
"github.com/temporalio/temporal-worker-controller/internal/planner"
enumspb "go.temporal.io/api/enums/v1"
Expand Down Expand Up @@ -184,7 +185,19 @@ func (r *TemporalWorkerDeploymentReconciler) startTestWorkflows(ctx context.Cont
}

func (r *TemporalWorkerDeploymentReconciler) shouldClaimManagerIdentity(vcfg *planner.VersionConfig) bool {
return vcfg.ManagerIdentity == ""
existing := vcfg.ManagerIdentity
if existing == "" {
return true // unclaimed
}
// Handle Worker Deployments that were controller-managed before we
// started recording the cluster-UID in the manager identity.
if existing == defaults.DeprecatedDefaultControllerIdentity {
return true // pre-Helm hardcoded default
}
// Pre-cluster-UID format was "release/namespace"; new format is
// "release/namespace/{namespace-uid}" (UID appended by main() at startup).
// Reclaim if ours is a longer version of theirs.
return strings.HasPrefix(getControllerIdentity(), existing+"/")
}

func (r *TemporalWorkerDeploymentReconciler) claimManagerIdentity(
Expand All @@ -194,18 +207,26 @@ func (r *TemporalWorkerDeploymentReconciler) claimManagerIdentity(
deploymentHandler sdkclient.WorkerDeploymentHandle,
vcfg *planner.VersionConfig,
) error {
identity := getControllerIdentity()
if identity == "" {
// Passing an empty identity to SetManagerIdentity clears the field on the
// Worker Deployment, leaving it ownerless. Refuse rather than cause that.
// This should never happen, but this is the extra fallback in case somehow
// the check in main() and Reconcile() are not sufficient.
return errors.New("CONTROLLER_IDENTITY is not set; refusing to call SetManagerIdentity to avoid clearing the manager identity field")
}
resp, err := deploymentHandler.SetManagerIdentity(ctx, sdkclient.WorkerDeploymentSetManagerIdentityOptions{
Self: true,
ConflictToken: vcfg.ConflictToken,
Identity: getControllerIdentity(),
Identity: identity,
})
if err != nil {
l.Error(err, "unable to claim manager identity")
r.Recorder.Eventf(workerDeploy, corev1.EventTypeWarning, ReasonManagerIdentityClaimFailed,
"Failed to claim manager identity: %v", err)
return err
}
l.Info("claimed manager identity", "identity", getControllerIdentity())
l.Info("claimed manager identity", "identity", identity)
// Use the updated conflict token for the subsequent routing config change.
vcfg.ConflictToken = resp.ConflictToken
return nil
Expand Down Expand Up @@ -275,8 +296,8 @@ func (r *TemporalWorkerDeploymentReconciler) updateVersionConfig(ctx context.Con
},
MetadataUpdate: sdkclient.WorkerDeploymentMetadataUpdate{
UpsertEntries: map[string]interface{}{
controllerIdentityMetadataKey: getControllerIdentity(),
controllerVersionMetadataKey: getControllerVersion(),
IdentityMetadataKey: getControllerIdentity(),
VersionMetadataKey: getControllerVersion(),
},
},
}); err != nil { // would be cool to do this atomically with the update
Expand Down
6 changes: 6 additions & 0 deletions internal/controller/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package controller

import (
"os"
"path/filepath"
"testing"

Expand All @@ -26,6 +27,11 @@ var cfg *rest.Config
var k8sClient client.Client
var testEnv *envtest.Environment

func TestMain(m *testing.M) {
_ = os.Setenv(IdentityEnvKey, "test-controller-identity")
os.Exit(m.Run())
}

func TestControllers(t *testing.T) {
RegisterFailHandler(Fail)

Expand Down
24 changes: 12 additions & 12 deletions internal/controller/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ const (
)

const (
controllerIdentityMetadataKey = "temporal.io/controller"
controllerVersionMetadataKey = "temporal.io/controller-version"
IdentityMetadataKey = "temporal.io/controller"
VersionMetadataKey = "temporal.io/controller-version"

controllerVersionEnvKey = "CONTROLLER_VERSION"
controllerIdentityEnvKey = "CONTROLLER_IDENTITY"
ControllerMaxDeploymentVersionsIneligibleForDeletionEnvKey = "CONTROLLER_MAX_DEPLOYMENT_VERSIONS_INELIGIBLE_FOR_DELETION"
VersionEnvKey = "CONTROLLER_VERSION"
IdentityEnvKey = "CONTROLLER_IDENTITY"
MaxDeploymentVersionsIneligibleForDeletionEnvKey = "CONTROLLER_MAX_DEPLOYMENT_VERSIONS_INELIGIBLE_FOR_DELETION"

serverDeleteVersionIdentity = "try-delete-for-add-version"
)
Expand All @@ -51,22 +51,22 @@ func getControllerVersion() string {
return Version
}
// Fall back to environment variable (set by Helm from image.tag)
if version := os.Getenv(controllerVersionEnvKey); version != "" {
if version := os.Getenv(VersionEnvKey); version != "" {
return version
}
return "unknown"
}

// getControllerIdentity returns the identity from environment variable (set by Helm)
// getControllerIdentity returns the identity from environment variable (set by Helm).
// Returns empty string if unset. main() enforces this at startup, but that check is
// bypassed if the reconciler is used as a library (e.g. embedded in another controller
// manager or in tests). An empty return means the env var was not set before starting.
func getControllerIdentity() string {
if identity := os.Getenv(controllerIdentityEnvKey); identity != "" {
return identity
}
return defaults.ControllerIdentity
return os.Getenv(IdentityEnvKey)
}

func GetControllerMaxDeploymentVersionsIneligibleForDeletion() int32 {
if maxStr := os.Getenv(ControllerMaxDeploymentVersionsIneligibleForDeletionEnvKey); maxStr != "" {
if maxStr := os.Getenv(MaxDeploymentVersionsIneligibleForDeletionEnvKey); maxStr != "" {
i, err := strconv.Atoi(maxStr)
if err == nil {
return int32(i)
Expand Down
8 changes: 8 additions & 0 deletions internal/controller/worker_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,14 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req
defer cancel()

l := log.FromContext(ctx)

// Fallback identity check for when the reconciler is used as a library and
// main() is not in the call path. main() is kept as the primary check for
// faster feedback in normal Helm-based deployments.
if getControllerIdentity() == "" {
return ctrl.Result{}, errors.New("CONTROLLER_IDENTITY environment variable is not set")
}

l.V(1).Info("Running Reconcile loop")

// Fetch the worker deployment
Expand Down
5 changes: 4 additions & 1 deletion internal/defaults/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,8 @@ const (
DeleteDelay = 24 * time.Hour
ServerMaxVersions = 100
MaxVersionsIneligibleForDeletion = int32(ServerMaxVersions * 0.75)
ControllerIdentity = "temporal-worker-controller"

// DeprecatedDefaultControllerIdentity is no longer used but kept to detect if a Worker Deployment
// was using it before, so that we can claim ownership with the new identity format.
DeprecatedDefaultControllerIdentity = "temporal-worker-controller"
)
4 changes: 3 additions & 1 deletion internal/tests/internal/env_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ const (
testDrainageRefreshInterval = time.Second
testMaxVersionsIneligibleForDeletion = 5
testMaxVersionsInDeployment = 6
testControllerIdentity = "test-controller-identity"
)

// setupKubebuilderAssets sets up the KUBEBUILDER_ASSETS environment variable if not already set
Expand Down Expand Up @@ -95,12 +96,13 @@ func getRepoRoot(t *testing.T) string {
func setupTestEnvironment(t *testing.T) (*rest.Config, client.Client, manager.Manager, *clientpool.ClientPool, func()) {
// Set faster reconcile interval for testing
t.Setenv("RECONCILE_INTERVAL", "1s")
t.Setenv(controller.IdentityEnvKey, testControllerIdentity)
if kubeAssets := os.Getenv("KUBEBUILDER_ASSETS"); kubeAssets == "" {
t.Skip("Skipping because KUBEBUILDER_ASSETS not set")
}

// set max versions value for testing
t.Setenv(controller.ControllerMaxDeploymentVersionsIneligibleForDeletionEnvKey, fmt.Sprintf("%d", testMaxVersionsIneligibleForDeletion))
t.Setenv(controller.MaxDeploymentVersionsIneligibleForDeletionEnvKey, fmt.Sprintf("%d", testMaxVersionsIneligibleForDeletion))

// Setup kubebuilder assets for IDE testing
if err := setupKubebuilderAssets(); err != nil {
Expand Down
5 changes: 2 additions & 3 deletions internal/tests/internal/validation_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"time"

temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1"
"github.com/temporalio/temporal-worker-controller/internal/defaults"
"github.com/temporalio/temporal-worker-controller/internal/k8s"
"github.com/temporalio/temporal-worker-controller/internal/testhelpers"
sdkclient "go.temporal.io/sdk/client"
Expand Down Expand Up @@ -129,7 +128,7 @@ func setCurrentVersion(
eventually(t, 60*time.Second, time.Second, func() error {
_, err := deploymentHandler.SetCurrentVersion(ctx, sdkclient.WorkerDeploymentSetCurrentVersionOptions{
BuildID: buildID,
Identity: defaults.ControllerIdentity,
Identity: testControllerIdentity,
})
if err != nil {
return fmt.Errorf("unable to set build '%s' as current of worker deployment %s: %w", buildID, workerDeploymentName, err)
Expand Down Expand Up @@ -157,7 +156,7 @@ func setRampingVersion(
_, err := deploymentHandler.SetRampingVersion(ctx, sdkclient.WorkerDeploymentSetRampingVersionOptions{
BuildID: buildID,
Percentage: rampPercentage,
Identity: defaults.ControllerIdentity,
Identity: testControllerIdentity,
})
if err != nil {
return fmt.Errorf("unable to set build '%s' as ramping of worker deployment %s: %w", buildID, workerDeploymentName, err)
Expand Down
Loading