Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,19 @@
package main

import (
"context"
"flag"
"fmt"
"log/slog"
"os"

temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1"
"github.com/temporalio/temporal-worker-controller/internal/controller"
"github.com/temporalio/temporal-worker-controller/internal/controller/clientpool"
"go.temporal.io/sdk/log"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
Expand Down Expand Up @@ -112,6 +116,21 @@ func main() {
os.Exit(1)
}

podNamespace := os.Getenv("POD_NAMESPACE")
if podNamespace == "" {
setupLog.Error(nil, "POD_NAMESPACE environment variable must be set")
os.Exit(1)
}
var ns corev1.Namespace
if err := mgr.GetAPIReader().Get(context.Background(), types.NamespacedName{Name: podNamespace}, &ns); err != nil {
setupLog.Error(err, "unable to fetch namespace UID for controller identity suffix")
os.Exit(1)
}
if err := os.Setenv(controller.IdentitySuffixEnvKey, string(ns.UID)); err != nil {
setupLog.Error(err, fmt.Sprintf("unable to set %s", controller.IdentitySuffixEnvKey))
os.Exit(1)
}

setupLog.Info("starting manager")
if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil {
setupLog.Error(err, "problem running manager")
Expand Down
2 changes: 1 addition & 1 deletion docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ Technical constraints and limitations of the Temporal Worker Controller system,
Comprehensive guide for migrating from existing unversioned worker deployment systems to the Temporal Worker Controller. Includes step-by-step instructions, configuration mapping, and common patterns.
See [Migration to Unversioned](migration-to-unversioned.md) for how to migrate back to an unversioned deployment system.

### [Ownership](ownership.md)
### [Ownership](manager-identity.md)
How the controller gets permission to manage a Worker Deployment, how a human client can take or give back control.

### [WorkerResourceTemplate](worker-resource-templates.md)
Expand Down
2 changes: 1 addition & 1 deletion docs/ownership.md → docs/manager-identity.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Ownership Transfer in the Worker Controller
# Manager Identity and Ownership Transfer in the Worker Controller

## Problem

Expand Down
6 changes: 6 additions & 0 deletions helm/temporal-worker-controller/templates/rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,12 @@ rules:
verbs:
- create
- patch
- apiGroups:
- ""
resources:
- namespaces
verbs:
- get
- apiGroups:
- ""
resources:
Expand Down
21 changes: 16 additions & 5 deletions internal/controller/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,17 @@ func (r *TemporalWorkerDeploymentReconciler) startTestWorkflows(ctx context.Cont
}

func (r *TemporalWorkerDeploymentReconciler) shouldClaimManagerIdentity(vcfg *planner.VersionConfig) bool {
return vcfg.ManagerIdentity == ""
existing := vcfg.ManagerIdentity
if existing == "" {
return true // unclaimed
}
// In the next release, the namespace UID will be included in the controller identity.
// To support smooth rollback, in this release, we will detect the future format and
// treat that as a reclaimable claim.
if existing == getControllerIdentityWithNamespaceUID() {
return true
}
return false
}

func (r *TemporalWorkerDeploymentReconciler) claimManagerIdentity(
Expand All @@ -194,18 +204,19 @@ func (r *TemporalWorkerDeploymentReconciler) claimManagerIdentity(
deploymentHandler sdkclient.WorkerDeploymentHandle,
vcfg *planner.VersionConfig,
) error {
identity := getControllerIdentity()
resp, err := deploymentHandler.SetManagerIdentity(ctx, sdkclient.WorkerDeploymentSetManagerIdentityOptions{
Self: true,
ConflictToken: vcfg.ConflictToken,
Identity: getControllerIdentity(),
Identity: identity,
})
if err != nil {
l.Error(err, "unable to claim manager identity")
r.Recorder.Eventf(workerDeploy, corev1.EventTypeWarning, ReasonManagerIdentityClaimFailed,
"Failed to claim manager identity: %v", err)
return err
}
l.Info("claimed manager identity", "identity", getControllerIdentity())
l.Info("claimed manager identity", "identity", identity)
// Use the updated conflict token for the subsequent routing config change.
vcfg.ConflictToken = resp.ConflictToken
return nil
Expand Down Expand Up @@ -275,8 +286,8 @@ func (r *TemporalWorkerDeploymentReconciler) updateVersionConfig(ctx context.Con
},
MetadataUpdate: sdkclient.WorkerDeploymentMetadataUpdate{
UpsertEntries: map[string]interface{}{
controllerIdentityMetadataKey: getControllerIdentity(),
controllerVersionMetadataKey: getControllerVersion(),
IdentityMetadataKey: getControllerIdentity(),
VersionMetadataKey: getControllerVersion(),
},
},
}); err != nil { // would be cool to do this atomically with the update
Expand Down
6 changes: 6 additions & 0 deletions internal/controller/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package controller

import (
"os"
"path/filepath"
"testing"

Expand All @@ -26,6 +27,11 @@ var cfg *rest.Config
var k8sClient client.Client
var testEnv *envtest.Environment

func TestMain(m *testing.M) {
_ = os.Setenv(IdentityEnvKey, "test-controller-identity")
os.Exit(m.Run())
}

func TestControllers(t *testing.T) {
RegisterFailHandler(Fail)

Expand Down
30 changes: 20 additions & 10 deletions internal/controller/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,13 @@ const (
)

const (
controllerIdentityMetadataKey = "temporal.io/controller"
controllerVersionMetadataKey = "temporal.io/controller-version"
IdentityMetadataKey = "temporal.io/controller"
VersionMetadataKey = "temporal.io/controller-version"

controllerVersionEnvKey = "CONTROLLER_VERSION"
controllerIdentityEnvKey = "CONTROLLER_IDENTITY"
ControllerMaxDeploymentVersionsIneligibleForDeletionEnvKey = "CONTROLLER_MAX_DEPLOYMENT_VERSIONS_INELIGIBLE_FOR_DELETION"
VersionEnvKey = "CONTROLLER_VERSION"
IdentityEnvKey = "CONTROLLER_IDENTITY"
IdentitySuffixEnvKey = "CONTROLLER_IDENTITY_SUFFIX"
MaxDeploymentVersionsIneligibleForDeletionEnvKey = "CONTROLLER_MAX_DEPLOYMENT_VERSIONS_INELIGIBLE_FOR_DELETION"

serverDeleteVersionIdentity = "try-delete-for-add-version"
)
Expand All @@ -51,22 +52,31 @@ func getControllerVersion() string {
return Version
}
// Fall back to environment variable (set by Helm from image.tag)
if version := os.Getenv(controllerVersionEnvKey); version != "" {
if version := os.Getenv(VersionEnvKey); version != "" {
return version
}
return "unknown"
}

// getControllerIdentity returns the identity from environment variable (set by Helm)
// getControllerIdentity returns the identity from environment variable (set by Helm).
// Returns empty string if unset. main() enforces this at startup, but that check is
// bypassed if the reconciler is used as a library (e.g. embedded in another controller
// manager or in tests). An empty return means the env var was not set before starting.
func getControllerIdentity() string {
if identity := os.Getenv(controllerIdentityEnvKey); identity != "" {
if identity := os.Getenv(IdentityEnvKey); identity != "" {
return identity
}
return defaults.ControllerIdentity
return defaults.ToBeDeprecatedDefaultControllerIdentity
}

// getControllerIdentityWithNamespaceUID returns the identity which will be used in the
// next release. Used in this release for smooth rollback identity reclamation.
func getControllerIdentityWithNamespaceUID() string {
return getControllerIdentity() + "/" + os.Getenv(IdentitySuffixEnvKey)
}

func GetControllerMaxDeploymentVersionsIneligibleForDeletion() int32 {
if maxStr := os.Getenv(ControllerMaxDeploymentVersionsIneligibleForDeletionEnvKey); maxStr != "" {
if maxStr := os.Getenv(MaxDeploymentVersionsIneligibleForDeletionEnvKey); maxStr != "" {
i, err := strconv.Atoi(maxStr)
if err == nil {
return int32(i)
Expand Down
2 changes: 2 additions & 0 deletions internal/controller/worker_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ type TemporalWorkerDeploymentReconciler struct {
// +kubebuilder:rbac:groups=temporal.io,resources=temporalworkerdeployments/finalizers,verbs=update
// +kubebuilder:rbac:groups=temporal.io,resources=temporalconnections,verbs=get;list;watch;update;patch
// +kubebuilder:rbac:groups=temporal.io,resources=temporalconnections/finalizers,verbs=update
// +kubebuilder:rbac:groups=core,resources=namespaces,verbs=get
// +kubebuilder:rbac:groups=core,resources=secrets,verbs=get;list;watch
// +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=apps,resources=deployments/scale,verbs=update
Expand All @@ -133,6 +134,7 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req
defer cancel()

l := log.FromContext(ctx)

l.V(1).Info("Running Reconcile loop")

// Fetch the worker deployment
Expand Down
4 changes: 3 additions & 1 deletion internal/defaults/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,7 @@ const (
DeleteDelay = 24 * time.Hour
ServerMaxVersions = 100
MaxVersionsIneligibleForDeletion = int32(ServerMaxVersions * 0.75)
ControllerIdentity = "temporal-worker-controller"

// ToBeDeprecatedDefaultControllerIdentity will stop being used in the next release.
ToBeDeprecatedDefaultControllerIdentity = "temporal-worker-controller"
)
4 changes: 3 additions & 1 deletion internal/tests/internal/env_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ const (
testDrainageRefreshInterval = time.Second
testMaxVersionsIneligibleForDeletion = 5
testMaxVersionsInDeployment = 6
testControllerIdentity = "test-controller-identity"
)

// setupKubebuilderAssets sets up the KUBEBUILDER_ASSETS environment variable if not already set
Expand Down Expand Up @@ -95,12 +96,13 @@ func getRepoRoot(t *testing.T) string {
func setupTestEnvironment(t *testing.T) (*rest.Config, client.Client, manager.Manager, *clientpool.ClientPool, func()) {
// Set faster reconcile interval for testing
t.Setenv("RECONCILE_INTERVAL", "1s")
t.Setenv(controller.IdentityEnvKey, testControllerIdentity)
if kubeAssets := os.Getenv("KUBEBUILDER_ASSETS"); kubeAssets == "" {
t.Skip("Skipping because KUBEBUILDER_ASSETS not set")
}

// set max versions value for testing
t.Setenv(controller.ControllerMaxDeploymentVersionsIneligibleForDeletionEnvKey, fmt.Sprintf("%d", testMaxVersionsIneligibleForDeletion))
t.Setenv(controller.MaxDeploymentVersionsIneligibleForDeletionEnvKey, fmt.Sprintf("%d", testMaxVersionsIneligibleForDeletion))

// Setup kubebuilder assets for IDE testing
if err := setupKubebuilderAssets(); err != nil {
Expand Down
5 changes: 2 additions & 3 deletions internal/tests/internal/validation_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"time"

temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1"
"github.com/temporalio/temporal-worker-controller/internal/defaults"
"github.com/temporalio/temporal-worker-controller/internal/k8s"
"github.com/temporalio/temporal-worker-controller/internal/testhelpers"
sdkclient "go.temporal.io/sdk/client"
Expand Down Expand Up @@ -129,7 +128,7 @@ func setCurrentVersion(
eventually(t, 60*time.Second, time.Second, func() error {
_, err := deploymentHandler.SetCurrentVersion(ctx, sdkclient.WorkerDeploymentSetCurrentVersionOptions{
BuildID: buildID,
Identity: defaults.ControllerIdentity,
Identity: testControllerIdentity,
})
if err != nil {
return fmt.Errorf("unable to set build '%s' as current of worker deployment %s: %w", buildID, workerDeploymentName, err)
Expand Down Expand Up @@ -157,7 +156,7 @@ func setRampingVersion(
_, err := deploymentHandler.SetRampingVersion(ctx, sdkclient.WorkerDeploymentSetRampingVersionOptions{
BuildID: buildID,
Percentage: rampPercentage,
Identity: defaults.ControllerIdentity,
Identity: testControllerIdentity,
})
if err != nil {
return fmt.Errorf("unable to set build '%s' as ramping of worker deployment %s: %w", buildID, workerDeploymentName, err)
Expand Down
Loading