Skip to content
3 changes: 3 additions & 0 deletions docs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ This documentation structure is designed to support various types of technical d
### [Limits](limits.md)
Technical constraints and limitations of the Temporal Worker Controller system, including maximum field lengths and other operational boundaries.

### [Ownership](ownership.md)
How the controller gets permission to manage a Worker Deployment, and how a human client can take or give back control.

---

*Note: This documentation structure is designed to grow with the project.*
42 changes: 42 additions & 0 deletions docs/ownership.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Ownership Transfer in the Worker Controller

## Problem

If a worker controller is managing a Worker Deployment (i.e., the controller is updating the RoutingConfig of the Worker
Deployment), but the user changes something via the CLI (e.g., rolls back to the previous current version, or stops the
new target version from ramping because an issue was detected), the controller should not clobber what the human did.

At some point, after this human has handled their urgent rollback, they will want to let the controller know that it is
authorized to resume making changes to the Routing Config of the Worker Deployment.

## Solution

_Once it is available in OSS v1.29, the controller will be able to coordinate with other users via the `OwnerIdentity`
field of a Worker Deployment. This runbook will be updated when that is available and implemented by the controller._

In the meantime, the controller will watch the `LastModifierIdentity` field of a Worker Deployment to detect whether
another user has made a change. If another user made a change to the Worker Deployment, the controller will not make
any more changes to ensure a human's change is not clobbered.

Once you are done making your own changes to the Worker Deployment's current and ramping versions, and you are ready
for the Worker Controller to take over, you can update the metadata to indicate that.

There is no Temporal server support for Worker Deployment-level metadata, so you'll have to set this value on
the Current Version of your Worker Deployment.

```bash
temporal worker deployment update-metadata-version \
--deployment-name $MY_DEPLOYMENT \
--build-id $CURRENT_VERSION_BUILD_ID \
--metadata 'temporal.io/ignore-last-modifier=true'
```
In the rare case that you have a nil Current Version when you are passing back ownership, you should set it on your Ramping Version instead:
```bash
temporal worker deployment update-metadata-version \
--deployment-name $MY_DEPLOYMENT \
--build-id $RAMPING_VERSION_BUILD_ID \
--metadata 'temporal.io/ignore-last-modifier=true'
```

In the even rarer case that you have nil Current Version and nil Ramping Version, you'll need to use the CLI or SDK to
set a Current or Ramping Version and then do as instructed above.
19 changes: 17 additions & 2 deletions internal/controller/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/go-logr/logr"
"github.com/temporalio/temporal-worker-controller/internal/temporal"
enumspb "go.temporal.io/api/enums/v1"
sdkclient "go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
Expand Down Expand Up @@ -91,7 +92,7 @@ func (r *TemporalWorkerDeploymentReconciler) executePlan(ctx context.Context, l
if _, err := deploymentHandler.SetCurrentVersion(ctx, sdkclient.WorkerDeploymentSetCurrentVersionOptions{
BuildID: vcfg.BuildID,
ConflictToken: vcfg.ConflictToken,
Identity: ControllerIdentity,
Identity: getControllerIdentity(),
}); err != nil {
return fmt.Errorf("unable to set current deployment version: %w", err)
}
Expand All @@ -106,7 +107,7 @@ func (r *TemporalWorkerDeploymentReconciler) executePlan(ctx context.Context, l
BuildID: vcfg.BuildID,
Percentage: vcfg.RampPercentage,
ConflictToken: vcfg.ConflictToken,
Identity: ControllerIdentity,
Identity: getControllerIdentity(),
}); err != nil {
return fmt.Errorf("unable to set ramping deployment version: %w", err)
}
Expand All @@ -127,5 +128,19 @@ func (r *TemporalWorkerDeploymentReconciler) executePlan(ctx context.Context, l
}
}

for _, buildId := range p.RemoveIgnoreLastModifierBuilds {
if _, err := deploymentHandler.UpdateVersionMetadata(ctx, sdkclient.WorkerDeploymentUpdateVersionMetadataOptions{
Version: worker.WorkerDeploymentVersion{
DeploymentName: p.WorkerDeploymentName,
BuildId: buildId,
},
MetadataUpdate: sdkclient.WorkerDeploymentMetadataUpdate{
RemoveEntries: []string{temporal.IgnoreLastModifierKey},
},
}); err != nil {
return fmt.Errorf("unable to update metadata to remove %s deployment: %w", temporal.IgnoreLastModifierKey, err)
}
}

return nil
}
12 changes: 10 additions & 2 deletions internal/controller/genplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ type plan struct {

// Start a workflow
startTestWorkflows []startWorkflowConfig

// Build IDs of versions from which the controller should
// remove IgnoreLastModifierKey from the version metadata
RemoveIgnoreLastModifierBuilds []string
}

// startWorkflowConfig defines a workflow to be started
Expand Down Expand Up @@ -75,8 +79,10 @@ func (r *TemporalWorkerDeploymentReconciler) generatePlan(

// Check if we need to force manual strategy due to external modification
rolloutStrategy := w.Spec.RolloutStrategy
if w.Status.LastModifierIdentity != ControllerIdentity && w.Status.LastModifierIdentity != "" {
l.Info("Forcing manual rollout strategy since deployment was modified externally")
if w.Status.LastModifierIdentity != getControllerIdentity() &&
w.Status.LastModifierIdentity != "" &&
!temporalState.IgnoreLastModifier {
l.Info(fmt.Sprintf("Forcing Manual rollout strategy since Worker Deployment was modified by a user with a different identity '%s'; to allow controller to make changes again, set 'temporal.io/ignore-last-modifier=true' in the metadata of your Current or Ramping Version; see ownership runbook at docs/ownership.md for more details.", w.Status.LastModifierIdentity))
Comment thread
carlydf marked this conversation as resolved.
rolloutStrategy.Strategy = temporaliov1alpha1.UpdateManual
}

Expand Down Expand Up @@ -107,6 +113,8 @@ func (r *TemporalWorkerDeploymentReconciler) generatePlan(
// Convert version config
plan.UpdateVersionConfig = planResult.VersionConfig

plan.RemoveIgnoreLastModifierBuilds = planResult.RemoveIgnoreLastModifierBuilds

// Convert test workflows
for _, wf := range planResult.TestWorkflows {
plan.startTestWorkflows = append(plan.startTestWorkflows, startWorkflowConfig{
Expand Down
3 changes: 0 additions & 3 deletions internal/controller/genstatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@ import (
ctrl "sigs.k8s.io/controller-runtime"
)

// ControllerIdentity is the identity the controller passes to all write calls.
const ControllerIdentity = "temporal-worker-controller"

func (r *TemporalWorkerDeploymentReconciler) generateStatus(
ctx context.Context,
l logr.Logger,
Expand Down
2 changes: 2 additions & 0 deletions internal/controller/state_mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ func (m *stateMapper) mapToStatus(targetBuildID string) *v1alpha1.TemporalWorker
VersionConflictToken: m.temporalState.VersionConflictToken,
}

status.LastModifierIdentity = m.temporalState.LastModifierIdentity
Comment thread
carlydf marked this conversation as resolved.

// Get build IDs directly from temporal state
currentBuildID := m.temporalState.CurrentBuildID
rampingBuildID := m.temporalState.RampingBuildID
Expand Down
4 changes: 2 additions & 2 deletions internal/controller/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
const (
controllerIdentityKey = "temporal.io/controller"
controllerVersionKey = "temporal.io/controller-version"
defaultControllerIdentity = "temporal-worker-controller"
DefaultControllerIdentity = "temporal-worker-controller"
)

// Version is set by goreleaser via ldflags at build time
Expand All @@ -35,5 +35,5 @@ func getControllerIdentity() string {
if identity := os.Getenv("CONTROLLER_IDENTITY"); identity != "" {
return identity
}
return defaultControllerIdentity
return DefaultControllerIdentity
}
1 change: 1 addition & 0 deletions internal/controller/worker_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req
temporalClient,
workerDeploymentName,
workerDeploy.Spec.WorkerOptions.TemporalNamespace,
getControllerIdentity(),
)
if err != nil {
return ctrl.Result{}, fmt.Errorf("unable to get Temporal worker deployment state: %w", err)
Expand Down
14 changes: 14 additions & 0 deletions internal/planner/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ type Plan struct {
ShouldCreateDeployment bool
VersionConfig *VersionConfig
TestWorkflows []WorkflowConfig
// Build IDs of versions from which the controller should
// remove IgnoreLastModifierKey from the version metadata
RemoveIgnoreLastModifierBuilds []string
}

// VersionConfig defines version configuration for Temporal
Expand Down Expand Up @@ -83,6 +86,17 @@ func GeneratePlan(
// Determine version config changes
plan.VersionConfig = getVersionConfigDiff(l, status, temporalState, config, workerDeploymentName)

// Only remove the IgnoreLastModifier metadata after it's been used to make a version config change, which will
// make the controller the LastModifier again
if temporalState != nil && temporalState.IgnoreLastModifier && plan.VersionConfig != nil {
if temporalState.RampingBuildID != "" {
plan.RemoveIgnoreLastModifierBuilds = append(plan.RemoveIgnoreLastModifierBuilds, temporalState.RampingBuildID)
}
if temporalState.CurrentBuildID != "" {
plan.RemoveIgnoreLastModifierBuilds = append(plan.RemoveIgnoreLastModifierBuilds, temporalState.CurrentBuildID)
}
}

// TODO(jlegrone): generate warnings/events on the TemporalWorkerDeployment resource when buildIDs are reachable
// but have no corresponding Deployment.

Expand Down
57 changes: 57 additions & 0 deletions internal/temporal/worker_deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,14 @@ import (
enumspb "go.temporal.io/api/enums/v1"
"go.temporal.io/api/serviceerror"
temporalClient "go.temporal.io/sdk/client"
"go.temporal.io/sdk/converter"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
// IgnoreLastModifierKey is the Worker Deployment Version metadata key that
// getShouldIgnoreLastModifier looks up; it reports true only when the
// stored payload decodes to the string "true".
IgnoreLastModifierKey = "temporal.io/ignore-last-modifier"
)

// VersionInfo contains information about a specific version
type VersionInfo struct {
DeploymentName string
Expand All @@ -40,6 +45,7 @@ type TemporalWorkerState struct {
// Versions indexed by build ID
Versions map[string]*VersionInfo
LastModifierIdentity string
IgnoreLastModifier bool
}

// GetWorkerDeploymentState queries Temporal to get the state of a worker deployment
Expand All @@ -48,6 +54,7 @@ func GetWorkerDeploymentState(
client temporalClient.Client,
workerDeploymentName string,
namespace string,
controllerIdentity string,
) (*TemporalWorkerState, error) {
state := &TemporalWorkerState{
Versions: make(map[string]*VersionInfo),
Expand Down Expand Up @@ -81,6 +88,14 @@ func GetWorkerDeploymentState(
state.LastModifierIdentity = workerDeploymentInfo.LastModifierIdentity
state.VersionConflictToken = resp.ConflictToken

// Decide whether to ignore LastModifierIdentity
if state.LastModifierIdentity != controllerIdentity && state.LastModifierIdentity != "" {
state.IgnoreLastModifier, err = DeploymentShouldIgnoreLastModifier(ctx, deploymentHandler, routingConfig)
if err != nil {
return nil, err
}
}

// TODO(jlegrone): Re-enable stats once available in versioning v3.

// Set ramping since time if applicable
Expand Down Expand Up @@ -232,3 +247,45 @@ func mapWorkflowStatus(status enumspb.WorkflowExecutionStatus) temporaliov1alpha
func GetTestWorkflowID(deploymentName, buildID, taskQueue string) string {
	// Layout of a test workflow ID: "test-<deployment>:<build>-<task queue>".
	const idLayout = "test-%s:%s-%s"

	id := fmt.Sprintf(idLayout, deploymentName, buildID, taskQueue)
	return id
}

// DeploymentShouldIgnoreLastModifier reports whether the ignore-last-modifier
// metadata flag is set on the Current Version or, failing that, on the Ramping
// Version referenced by routingConfig.
//
// The Current Version is consulted first. The Ramping Version is consulted only
// when the Current Version is nil or does not carry the flag, so that a user who
// set the metadata on just one of the two versions is still honored. If neither
// version is set, the result is (false, nil).
//
// Any error from describing a version is returned with shouldIgnore == false.
func DeploymentShouldIgnoreLastModifier(
	ctx context.Context,
	deploymentHandler temporalClient.WorkerDeploymentHandle,
	routingConfig temporalClient.WorkerDeploymentRoutingConfig,
) (shouldIgnore bool, err error) {
	if routingConfig.CurrentVersion != nil {
		shouldIgnore, err = getShouldIgnoreLastModifier(ctx, deploymentHandler, routingConfig.CurrentVersion.BuildId)
		if err != nil {
			return false, err
		}
		if shouldIgnore {
			return true, nil
		}
	}

	// Fall back to the Ramping Version. This must read the Ramping Version's
	// build ID: CurrentVersion may be nil here, and re-checking the Current
	// Version would never observe metadata set only on the Ramping Version.
	if routingConfig.RampingVersion != nil {
		return getShouldIgnoreLastModifier(ctx, deploymentHandler, routingConfig.RampingVersion.BuildId)
	}

	return false, nil
}

// getShouldIgnoreLastModifier describes the version identified by buildId and
// reports whether its metadata contains IgnoreLastModifierKey with a payload
// that decodes to the string "true". A missing key yields (false, nil); a
// describe or decode failure yields (false, err).
func getShouldIgnoreLastModifier(
	ctx context.Context,
	deploymentHandler temporalClient.WorkerDeploymentHandle,
	buildId string,
) (bool, error) {
	describeOpts := temporalClient.WorkerDeploymentDescribeVersionOptions{
		BuildID: buildId,
	}
	versionDesc, err := deploymentHandler.DescribeVersion(ctx, describeOpts)
	if err != nil {
		return false, fmt.Errorf("unable to describe version: %w", err)
	}

	payload, found := versionDesc.Info.Metadata[IgnoreLastModifierKey]
	if !found {
		return false, nil
	}

	var decoded string
	if decodeErr := converter.GetDefaultDataConverter().FromPayload(payload, &decoded); decodeErr != nil {
		return false, fmt.Errorf("unable to decode metadata payload for key %s: %w", IgnoreLastModifierKey, decodeErr)
	}
	return decoded == "true", nil
}
67 changes: 67 additions & 0 deletions internal/temporal/worker_deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,17 @@
package temporal

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1"
"github.com/temporalio/temporal-worker-controller/internal/testhelpers"
enumspb "go.temporal.io/api/enums/v1"
sdkclient "go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
"go.temporal.io/server/temporaltest"
)

func TestMapWorkflowStatus(t *testing.T) {
Expand Down Expand Up @@ -99,3 +105,64 @@ func TestGetTestWorkflowID(t *testing.T) {
})
}
}

func TestGetIgnoreLastModifier(t *testing.T) {
ctx := context.Background()
deploymentName := "test-dep"
buildId := "v1"
tq := "test-tq"

ts := temporaltest.NewServer(temporaltest.WithT(t))

// create version
w, stopFunc, err := testhelpers.NewWorker(ctx, deploymentName, buildId, tq, ts.GetFrontendHostPort(), ts.GetDefaultNamespace(), true)
if err != nil {
t.Error(err)
}
if err = w.Start(); err != nil {
t.Errorf("error starting unversioned worker %v", err)
}
defer stopFunc()

deploymentHandler := ts.GetDefaultClient().WorkerDeploymentClient().GetHandle(deploymentName)

eventually(t, 5*time.Second, 100*time.Millisecond, func() error {
Comment thread
carlydf marked this conversation as resolved.
_, err := deploymentHandler.UpdateVersionMetadata(ctx, sdkclient.WorkerDeploymentUpdateVersionMetadataOptions{
Version: worker.WorkerDeploymentVersion{
DeploymentName: deploymentName,
BuildId: buildId,
},
MetadataUpdate: sdkclient.WorkerDeploymentMetadataUpdate{
UpsertEntries: map[string]interface{}{
IgnoreLastModifierKey: "true",
},
},
})
return err
})

shouldIgnore, err := getShouldIgnoreLastModifier(ctx, deploymentHandler, buildId)
if err != nil {
t.Error(err)
}
if !shouldIgnore {
t.Error("expected true, got false")
}

}

// eventually calls check repeatedly, sleeping interval between attempts, until
// check returns nil or timeout has elapsed. On timeout it fails the test
// immediately via t.Fatalf, reporting the last error returned by check.
//
// check always runs at least once, even when timeout is zero or negative, so
// the helper can never report success without having evaluated the condition.
func eventually(t *testing.T, timeout, interval time.Duration, check func() error) {
	t.Helper() // attribute failures to the caller's line, not this helper

	deadline := time.Now().Add(timeout)
	for {
		err := check()
		if err == nil {
			return // Success!
		}
		if !time.Now().Before(deadline) {
			t.Fatalf("eventually failed after %s: %v", timeout, err)
		}
		time.Sleep(interval)
	}
}
3 changes: 3 additions & 0 deletions internal/testhelpers/make.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ func MakeTWDWithName(name, namespace string) *temporaliov1alpha1.TemporalWorkerD
}

func MakeCurrentVersion(namespace, twdName, imageName string, healthy, createDeployment bool) *temporaliov1alpha1.CurrentWorkerDeploymentVersion {
if imageName == "" { // empty build id == nil current version == unversioned
return nil
}
ret := &temporaliov1alpha1.CurrentWorkerDeploymentVersion{
BaseWorkerDeploymentVersion: MakeBaseVersion(namespace, twdName, imageName, temporaliov1alpha1.VersionStatusCurrent, createDeployment, true),
}
Expand Down
Loading
Loading