6 changes: 6 additions & 0 deletions api/v1alpha1/worker_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,12 @@ type TemporalWorkerDeploymentStatus struct {
// +optional
LastModifierIdentity string `json:"lastModifierIdentity,omitempty"`

// ManagerIdentity is the identity that has exclusive rights to modify this Worker Deployment's routing config.
// When set, clients whose identity does not match will be blocked from making routing changes.
// Empty by default. Use `temporal worker deployment manager-identity set/unset` to change.
// +optional
ManagerIdentity string `json:"managerIdentity,omitempty"`

// VersionCount is the total number of versions currently known by the worker deployment.
// This includes current, target, ramping, and deprecated versions.
// +optional
Expand Down
73 changes: 34 additions & 39 deletions docs/ownership.md
Expand Up @@ -2,57 +2,52 @@

## Problem

If a worker controller is managing a Worker Deployment (ie. the controller is updating the RoutingConfig of the Worker
Deployment), but the user changes something via the CLI (ie. rolls back to the previous current version, or stops the
new target version from ramping because an issue was detected), the controller should not clobber what the human did.
If the worker controller is managing a Worker Deployment (i.e. updating its routing config), but a user makes a manual
change via the CLI, SDK, or gRPC API instead of via the `TemporalWorkerDeployment` CRD interface, the controller should
not clobber the user's change.

At some point, after this human has handled their urgent rollback, they will want to let the controller know that it is
authorized to resume making changes to the Routing Config of the Worker Deployment.
Once the user has finished their manual intervention, they need a way to hand ownership back to the controller.

## Solution

_Once it is available in OSS v1.29, the controller will be able to coordinate with other users via the `ManagerIdentity`
field of a Worker Deployment. This runbook will be updated when that is available and implemented by the controller._
The controller uses the Temporal server's `ManagerIdentity` field on Worker Deployments to coordinate exclusive
ownership of routing changes.

In the meantime, the controller will watch the `LastModifierIdentity` field of a Worker Deployment to detect whether
another user has made a change. If another user made a change to the Worker Deployment, the controller will not make
any more changes to ensure a human's change is not clobbered.
When `ManagerIdentity` is set on a Worker Deployment, only clients whose identity matches `ManagerIdentity` can make
routing changes (set current version, set ramping version). The current value is mirrored in the `managerIdentity`
field of the `TemporalWorkerDeployment` status; it shows the controller's identity while the controller holds ownership.
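
The matching rule above can be sketched as a single predicate. This is only an illustration: `canChangeRouting` is a hypothetical name, not a server or SDK function, and it assumes that an empty `ManagerIdentity` blocks nobody, which is how the status field comment ("Empty by default") reads.

```go
package main

import "fmt"

// canChangeRouting is a hypothetical helper, not part of the Temporal server or SDK.
// It models the documented rule: once ManagerIdentity is set, only a caller with a
// matching identity may change routing. Treating an empty value as "anyone may
// change routing" is an assumption made for this sketch.
func canChangeRouting(managerIdentity, callerIdentity string) bool {
	return managerIdentity == "" || managerIdentity == callerIdentity
}

func main() {
	fmt.Println(canChangeRouting("", "alice"))                  // unclaimed deployment
	fmt.Println(canChangeRouting("worker-controller", "alice")) // alice is not the manager
	fmt.Println(canChangeRouting("alice", "alice"))             // alice is the manager
}
```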

Once you are done making your own changes to the Worker Deployment's current and ramping versions, and you are ready
for the Worker Controller to take over, you can update the metadata to indicate that.
### How the controller claims ownership

There is no Temporal server support for Worker Deployment Version-level metadata, so you'll have to set this value on
the Current Version of your Worker Deployment.
Whenever the controller plans a routing change and finds `ManagerIdentity` empty (for example, the first time it
manages a Worker Deployment), it calls `SetManagerIdentity` to claim ownership before applying the change. Subsequent
routing changes succeed because the controller's identity already matches `ManagerIdentity`.

Note: The controller decodes this metadata value as a string. Be sure to set the value to the string "true" (not the boolean true).
### Taking manual control

To take manual control away from the controller, set `ManagerIdentity` to your own identity:

```bash
temporal worker deployment update-metadata-version \
--deployment-name $MY_DEPLOYMENT \
--build-id $CURRENT_VERSION_BUILD_ID \
--metadata 'temporal.io/ignore-last-modifier="true"'
```
Alternatively, if your CLI supports JSON input:
```bash
temporal worker deployment update-metadata-version \
--deployment-name $MY_DEPLOYMENT \
--build-id $CURRENT_VERSION_BUILD_ID \
--metadata-json '{"temporal.io/ignore-last-modifier":"true"}'
```
In the rare case that you have a nil Current Version when you are passing back ownership, you should set it on your Ramping Version
```bash
temporal worker deployment update-metadata-version \
temporal worker deployment manager-identity set \
--deployment-name $MY_DEPLOYMENT \
--build-id $RAMPING_VERSION_BUILD_ID \
--metadata 'temporal.io/ignore-last-modifier="true"'
--self
```
Or with JSON:

The `--self` flag sets `ManagerIdentity` to the identity of the caller (auto-generated by the CLI if not explicitly
provided via `--identity`; similarly, the SDK uses its own auto-generated or configured identity). After this, the
controller's routing change attempts will fail and it will retry on a backoff until ownership is returned.

Because your identity now matches `ManagerIdentity`, you can make routing changes freely. The server enforces
`ManagerIdentity` for all clients, not just the controller, so any other client is blocked in the same way.

### Returning ownership to the controller

When you are done with your manual changes and want the controller to resume, clear `ManagerIdentity`:

```bash
temporal worker deployment update-metadata-version \
--deployment-name $MY_DEPLOYMENT \
--build-id $RAMPING_VERSION_BUILD_ID \
--metadata-json '{"temporal.io/ignore-last-modifier":"true"}'
temporal worker deployment manager-identity unset \
--deployment-name $MY_DEPLOYMENT
```

In the even rarer case that you have nil Current Version and nil Ramping Version, you'll need to use the CLI or SDK to
set a Current or Ramping Version and then do as instructed above.
On the next reconcile, the controller will detect that `ManagerIdentity` is empty, claim it for itself, and resume
managing routing changes.
Original file line number Diff line number Diff line change
Expand Up @@ -4098,6 +4098,8 @@ spec:
type: array
lastModifierIdentity:
type: string
managerIdentity:
type: string
targetVersion:
properties:
buildID:
Expand Down
4 changes: 4 additions & 0 deletions internal/controller/clientpool/clientpool.go
Expand Up @@ -97,13 +97,15 @@ type NewClientOptions struct {
TemporalNamespace string
K8sNamespace string
Spec v1alpha1.TemporalConnectionSpec
Identity string
}

func (cp *ClientPool) fetchClientUsingMTLSSecret(secret corev1.Secret, opts NewClientOptions) (*sdkclient.Options, *ClientPoolKey, *ClientAuth, error) {
clientOpts := sdkclient.Options{
Logger: cp.logger,
HostPort: opts.Spec.HostPort,
Namespace: opts.TemporalNamespace,
Identity: opts.Identity,
}

var pemCert []byte
Expand Down Expand Up @@ -170,6 +172,7 @@ func (cp *ClientPool) fetchClientUsingAPIKeySecret(secret corev1.Secret, opts Ne
Logger: cp.logger,
HostPort: opts.Spec.HostPort,
Namespace: opts.TemporalNamespace,
Identity: opts.Identity,
ConnectionOptions: sdkclient.ConnectionOptions{
TLS: &tls.Config{},
},
Expand Down Expand Up @@ -198,6 +201,7 @@ func (cp *ClientPool) fetchClientUsingNoCredentials(opts NewClientOptions) (*sdk
Logger: cp.logger,
HostPort: opts.Spec.HostPort,
Namespace: opts.TemporalNamespace,
Identity: opts.Identity,
}

key := ClientPoolKey{
Expand Down
51 changes: 35 additions & 16 deletions internal/controller/execplan.go
Expand Up @@ -12,7 +12,7 @@ import (

"github.com/go-logr/logr"
temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1"
"github.com/temporalio/temporal-worker-controller/internal/temporal"
"github.com/temporalio/temporal-worker-controller/internal/planner"
enumspb "go.temporal.io/api/enums/v1"
sdkclient "go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
Expand Down Expand Up @@ -150,12 +150,46 @@ func (r *TemporalWorkerDeploymentReconciler) startTestWorkflows(ctx context.Cont
return nil
}

func (r *TemporalWorkerDeploymentReconciler) shouldClaimManagerIdentity(vcfg *planner.VersionConfig) bool {
return vcfg.ManagerIdentity == ""
}

func (r *TemporalWorkerDeploymentReconciler) claimManagerIdentity(
ctx context.Context,
l logr.Logger,
workerDeploy *temporaliov1alpha1.TemporalWorkerDeployment,
deploymentHandler sdkclient.WorkerDeploymentHandle,
vcfg *planner.VersionConfig,
) error {
resp, err := deploymentHandler.SetManagerIdentity(ctx, sdkclient.WorkerDeploymentSetManagerIdentityOptions{
Self: true,
ConflictToken: vcfg.ConflictToken,
Identity: getControllerIdentity(),
})
if err != nil {
l.Error(err, "unable to claim manager identity")
r.Recorder.Eventf(workerDeploy, corev1.EventTypeWarning, ReasonManagerIdentityClaimFailed,
"Failed to claim manager identity: %v", err)
return err
}
l.Info("claimed manager identity", "identity", getControllerIdentity())
// Use the updated conflict token for the subsequent routing config change.
vcfg.ConflictToken = resp.ConflictToken
return nil
}

func (r *TemporalWorkerDeploymentReconciler) updateVersionConfig(ctx context.Context, l logr.Logger, workerDeploy *temporaliov1alpha1.TemporalWorkerDeployment, deploymentHandler sdkclient.WorkerDeploymentHandle, p *plan) error {
vcfg := p.UpdateVersionConfig
if vcfg == nil {
return nil
}

if r.shouldClaimManagerIdentity(vcfg) {
if err := r.claimManagerIdentity(ctx, l, workerDeploy, deploymentHandler, vcfg); err != nil {
return fmt.Errorf("unable to claim manager identity: %w", err)
}
}

if vcfg.SetCurrent {
l.Info("registering new current version", "buildID", vcfg.BuildID)
if _, err := deploymentHandler.SetCurrentVersion(ctx, sdkclient.WorkerDeploymentSetCurrentVersionOptions{
Expand Down Expand Up @@ -226,20 +260,5 @@ func (r *TemporalWorkerDeploymentReconciler) executePlan(ctx context.Context, l
return err
}

for _, buildId := range p.RemoveIgnoreLastModifierBuilds {
if _, err := deploymentHandler.UpdateVersionMetadata(ctx, sdkclient.WorkerDeploymentUpdateVersionMetadataOptions{
Version: worker.WorkerDeploymentVersion{
DeploymentName: p.WorkerDeploymentName,
BuildID: buildId,
},
MetadataUpdate: sdkclient.WorkerDeploymentMetadataUpdate{
RemoveEntries: []string{temporal.IgnoreLastModifierKey},
},
}); err != nil {
l.Error(err, "unable to remove ignore-last-modifier metadata", "buildID", buildId)
return fmt.Errorf("unable to update metadata to remove %s deployment: %w", temporal.IgnoreLastModifierKey, err)
}
}

return nil
}
14 changes: 0 additions & 14 deletions internal/controller/genplan.go
Expand Up @@ -34,10 +34,6 @@ type plan struct {

// Start a workflow
startTestWorkflows []startWorkflowConfig

// Build IDs of versions from which the controller should
// remove IgnoreLastModifierKey from the version metadata
RemoveIgnoreLastModifierBuilds []string
}

// startWorkflowConfig defines a workflow to be started
Expand Down Expand Up @@ -80,15 +76,7 @@ func (r *TemporalWorkerDeploymentReconciler) generatePlan(
ScaleDeployments: make(map[*corev1.ObjectReference]uint32),
}

// Check if we need to force manual strategy due to external modification
rolloutStrategy := w.Spec.RolloutStrategy
if w.Status.LastModifierIdentity != getControllerIdentity() &&
w.Status.LastModifierIdentity != serverDeleteVersionIdentity &&
w.Status.LastModifierIdentity != "" &&
!temporalState.IgnoreLastModifier {
l.Info(fmt.Sprintf("Forcing Manual rollout strategy since Worker Deployment was modified by a user with a different identity '%s'; to allow controller to make changes again, set 'temporal.io/ignore-last-modifier=true' in the metadata of your Current or Ramping Version; see ownership runbook at docs/ownership.md for more details.", w.Status.LastModifierIdentity))
rolloutStrategy.Strategy = temporaliov1alpha1.UpdateManual
}

// Resolve gate input if gate is configured
var gateInput []byte
Expand Down Expand Up @@ -153,8 +141,6 @@ func (r *TemporalWorkerDeploymentReconciler) generatePlan(
// Convert version config
plan.UpdateVersionConfig = planResult.VersionConfig

plan.RemoveIgnoreLastModifierBuilds = planResult.RemoveIgnoreLastModifierBuilds

// Convert test workflows
for _, wf := range planResult.TestWorkflows {
plan.startTestWorkflows = append(plan.startTestWorkflows, startWorkflowConfig{
Expand Down
6 changes: 3 additions & 3 deletions internal/controller/reconciler_events_test.go
Expand Up @@ -684,20 +684,20 @@ func TestUpdateVersionConfig_EmitsEventOnFailure(t *testing.T) {
{
name: "SetCurrentFailed",
handle: &stubWDHandle{setCurrentErr: errors.New("simulated SetCurrentVersion failure")},
config: &planner.VersionConfig{BuildID: "build-abc", SetCurrent: true},
config: &planner.VersionConfig{BuildID: "build-abc", SetCurrent: true, ManagerIdentity: "some-manager"},
expectedReason: ReasonVersionPromotionFailed,
},
{
name: "SetRampingFailed",
handle: &stubWDHandle{setRampingErr: errors.New("simulated SetRampingVersion failure")},
config: &planner.VersionConfig{BuildID: "build-abc", RampPercentage: 25},
config: &planner.VersionConfig{BuildID: "build-abc", RampPercentage: 25, ManagerIdentity: "some-manager"},
expectedReason: ReasonVersionPromotionFailed,
},
{
// SetCurrentVersion succeeds; UpdateVersionMetadata fails.
name: "MetadataUpdateFailed",
handle: &stubWDHandle{updateMetaErr: errors.New("simulated UpdateVersionMetadata failure")},
config: &planner.VersionConfig{BuildID: "build-abc", SetCurrent: true},
config: &planner.VersionConfig{BuildID: "build-abc", SetCurrent: true, ManagerIdentity: "some-manager"},
expectedReason: ReasonMetadataUpdateFailed,
},
}
Expand Down
1 change: 1 addition & 0 deletions internal/controller/state_mapper.go
Expand Up @@ -34,6 +34,7 @@ func (m *stateMapper) mapToStatus(targetBuildID string) *v1alpha1.TemporalWorker
}

status.LastModifierIdentity = m.temporalState.LastModifierIdentity
status.ManagerIdentity = m.temporalState.ManagerIdentity

// Get build IDs directly from temporal state
currentBuildID := m.temporalState.CurrentBuildID
Expand Down
19 changes: 10 additions & 9 deletions internal/controller/util.go
Expand Up @@ -18,15 +18,16 @@ import (
// status API and may change between releases. Do not write alerting or automation
// that depends on these strings.
const (
ReasonPlanGenerationFailed = "PlanGenerationFailed"
ReasonPlanExecutionFailed = "PlanExecutionFailed"
ReasonDeploymentCreateFailed = "DeploymentCreateFailed"
ReasonDeploymentDeleteFailed = "DeploymentDeleteFailed"
ReasonDeploymentScaleFailed = "DeploymentScaleFailed"
ReasonDeploymentUpdateFailed = "DeploymentUpdateFailed"
ReasonTestWorkflowStartFailed = "TestWorkflowStartFailed"
ReasonVersionPromotionFailed = "VersionPromotionFailed"
ReasonMetadataUpdateFailed = "MetadataUpdateFailed"
ReasonPlanGenerationFailed = "PlanGenerationFailed"
ReasonPlanExecutionFailed = "PlanExecutionFailed"
ReasonDeploymentCreateFailed = "DeploymentCreateFailed"
ReasonDeploymentDeleteFailed = "DeploymentDeleteFailed"
ReasonDeploymentScaleFailed = "DeploymentScaleFailed"
ReasonDeploymentUpdateFailed = "DeploymentUpdateFailed"
ReasonTestWorkflowStartFailed = "TestWorkflowStartFailed"
ReasonVersionPromotionFailed = "VersionPromotionFailed"
ReasonMetadataUpdateFailed = "MetadataUpdateFailed"
ReasonManagerIdentityClaimFailed = "ManagerIdentityClaimFailed"
)

const (
Expand Down
1 change: 1 addition & 0 deletions internal/controller/worker_controller.go
Expand Up @@ -184,6 +184,7 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req
K8sNamespace: workerDeploy.Namespace,
TemporalNamespace: workerDeploy.Spec.WorkerOptions.TemporalNamespace,
Spec: temporalConnection.Spec,
Identity: getControllerIdentity(),
})
if err != nil {
l.Error(err, "invalid Temporal auth secret")
Expand Down
28 changes: 12 additions & 16 deletions internal/planner/planner.go
Expand Up @@ -27,9 +27,6 @@ type Plan struct {
ShouldCreateDeployment bool
VersionConfig *VersionConfig
TestWorkflows []WorkflowConfig
// Build IDs of versions from which the controller should
// remove IgnoreLastModifierKey from the version metadata
RemoveIgnoreLastModifierBuilds []string
}

// VersionConfig defines version configuration for Temporal
Expand All @@ -45,6 +42,11 @@ type VersionConfig struct {
SetCurrent bool
// Acceptable values [0,100]
RampPercentage int32

// ManagerIdentity is the current manager identity of the worker deployment in Temporal.
// An empty string indicates the controller should claim the identity before applying
// any routing config changes.
ManagerIdentity string
}

// WorkflowConfig defines a workflow to be started
Expand Down Expand Up @@ -100,17 +102,6 @@ func GeneratePlan(
// Determine version config changes
plan.VersionConfig = getVersionConfigDiff(l, status, temporalState, config, workerDeploymentName)

// Only remove the IgnoreLastModifier metadata after it's been used to make a version config change, which will
// make the controller the LastModifier again
if temporalState != nil && temporalState.IgnoreLastModifier && plan.VersionConfig != nil {
if temporalState.RampingBuildID != "" {
plan.RemoveIgnoreLastModifierBuilds = append(plan.RemoveIgnoreLastModifierBuilds, temporalState.RampingBuildID)
}
if temporalState.CurrentBuildID != "" {
plan.RemoveIgnoreLastModifierBuilds = append(plan.RemoveIgnoreLastModifierBuilds, temporalState.CurrentBuildID)
}
}

// TODO(jlegrone): generate warnings/events on the TemporalWorkerDeployment resource when buildIDs are reachable
// but have no corresponding Deployment.

Expand Down Expand Up @@ -546,9 +537,14 @@ func getVersionConfigDiff(
}
}

managerIdentity := ""
if temporalState != nil {
managerIdentity = temporalState.ManagerIdentity
}
vcfg := &VersionConfig{
ConflictToken: conflictToken,
BuildID: status.TargetVersion.BuildID,
ConflictToken: conflictToken,
BuildID: status.TargetVersion.BuildID,
ManagerIdentity: managerIdentity,
}

// If there is no current version and presence of unversioned pollers is not confirmed for all
Expand Down