diff --git a/docs/README.md b/docs/README.md index 373d661c..8d34d855 100644 --- a/docs/README.md +++ b/docs/README.md @@ -14,6 +14,9 @@ This documentation structure is designed to support various types of technical d ### [Limits](limits.md) Technical constraints and limitations of the Temporal Worker Controller system, including maximum field lengths and other operational boundaries. +### [Ownership](ownership.md) +How the controller gets permission to manage a Worker Deployment, how a human client can take or give back control. + --- *Note: This documentation structure is designed to grow with the project.* \ No newline at end of file diff --git a/docs/ownership.md b/docs/ownership.md new file mode 100644 index 00000000..58c17a5d --- /dev/null +++ b/docs/ownership.md @@ -0,0 +1,42 @@ +# Ownership Transfer in the Worker Controller + +## Problem + +If a worker controller is managing a Worker Deployment (ie. the controller is updating the RoutingConfig of the Worker +Deployment), but the user changes something via the CLI (ie. rolls back to the previous current version, or stops the +new target version from ramping because an issue was detected), the controller should not clobber what the human did. + +At some point, after this human has handled their urgent rollback, they will want to let the controller know that it is +authorized to resume making changes to the Routing Config of the Worker Deployment. + +## Solution + +_Once it is available in OSS v1.29, the controller will be able to coordinate with other users via the `OwnerIdentity` +field of a Worker Deployment. This runbook will be updated when that is available and implemented by the controller._ + +In the meantime, the controller will watch the `LastModifierIdentity` field of a Worker Deployment to detect whether +another user has made a change. If another user made a change to the Worker Deployment, the controller will not make +any more changes to ensure a human's change is not clobbered. + +Once you are done making your own changes to the Worker Deployment's current and ramping versions, and you are ready +for the Worker Controller to take over, you can update the metadata to indicate that. + +There is no Temporal server support for Worker Deployment Version-level metadata, so you'll have to set this value on +the Current Version of your Worker Deployment. + +```bash +temporal worker deployment update-metadata-version \ + --deployment-name $MY_DEPLOYMENT \ + --build-id $CURRENT_VERSION_BUILD_ID + --metadata 'temporal.io/ignore-last-modifier=true'` +``` +In the rare case that you have a nil Current Version when you are passing back ownership, you should set it on your Ramping Version +```bash +temporal worker deployment update-metadata-version \ + --deployment-name $MY_DEPLOYMENT \ + --build-id $RAMPING_VERSION_BUILD_ID + --metadata 'temporal.io/ignore-last-modifier=true'` +``` + +In the even rarer case that you have nil Current Version and nil Ramping Version, you'll need to use the CLI or SDK to +set a Current or Ramping Version and then do as instructed above. \ No newline at end of file diff --git a/internal/controller/execplan.go b/internal/controller/execplan.go index dfbced1b..eaf15a5c 100644 --- a/internal/controller/execplan.go +++ b/internal/controller/execplan.go @@ -10,6 +10,7 @@ import ( "time" "github.com/go-logr/logr" + "github.com/temporalio/temporal-worker-controller/internal/temporal" enumspb "go.temporal.io/api/enums/v1" sdkclient "go.temporal.io/sdk/client" "go.temporal.io/sdk/worker" @@ -91,7 +92,7 @@ func (r *TemporalWorkerDeploymentReconciler) executePlan(ctx context.Context, l if _, err := deploymentHandler.SetCurrentVersion(ctx, sdkclient.WorkerDeploymentSetCurrentVersionOptions{ BuildID: vcfg.BuildID, ConflictToken: vcfg.ConflictToken, - Identity: ControllerIdentity, + Identity: getControllerIdentity(), }); err != nil { return fmt.Errorf("unable to set current deployment version: %w", err) } @@ -106,7 +107,7 @@ func (r *TemporalWorkerDeploymentReconciler) executePlan(ctx context.Context, l BuildID: vcfg.BuildID, Percentage: vcfg.RampPercentage, ConflictToken: vcfg.ConflictToken, - Identity: ControllerIdentity, + Identity: getControllerIdentity(), }); err != nil { return fmt.Errorf("unable to set ramping deployment version: %w", err) } @@ -127,5 +128,19 @@ func (r *TemporalWorkerDeploymentReconciler) executePlan(ctx context.Context, l } } + for _, buildId := range p.RemoveIgnoreLastModifierBuilds { + if _, err := deploymentHandler.UpdateVersionMetadata(ctx, sdkclient.WorkerDeploymentUpdateVersionMetadataOptions{ + Version: worker.WorkerDeploymentVersion{ + DeploymentName: p.WorkerDeploymentName, + BuildId: buildId, + }, + MetadataUpdate: sdkclient.WorkerDeploymentMetadataUpdate{ + RemoveEntries: []string{temporal.IgnoreLastModifierKey}, + }, + }); err != nil { + return fmt.Errorf("unable to update metadata to remove %s deployment: %w", temporal.IgnoreLastModifierKey, err) + } + } + return nil } diff --git a/internal/controller/genplan.go b/internal/controller/genplan.go index f5cc1387..ec128cca 100644 --- a/internal/controller/genplan.go +++ b/internal/controller/genplan.go @@ -33,6 +33,10 @@ type plan struct { // Start a workflow startTestWorkflows []startWorkflowConfig + + // Build IDs of versions from which the controller should + // remove IgnoreLastModifierKey from the version metadata + RemoveIgnoreLastModifierBuilds []string } // startWorkflowConfig defines a workflow to be started @@ -75,8 +79,10 @@ func (r *TemporalWorkerDeploymentReconciler) generatePlan( // Check if we need to force manual strategy due to external modification rolloutStrategy := w.Spec.RolloutStrategy - if w.Status.LastModifierIdentity != ControllerIdentity && w.Status.LastModifierIdentity != "" { - l.Info("Forcing manual rollout strategy since deployment was modified externally") + if w.Status.LastModifierIdentity != getControllerIdentity() && + w.Status.LastModifierIdentity != "" && + !temporalState.IgnoreLastModifier { + l.Info(fmt.Sprintf("Forcing Manual rollout strategy since Worker Deployment was modified by a user with a different identity '%s'; to allow controller to make changes again, set 'temporal.io/ignore-last-modifier=true' in the metadata of your Current or Ramping Version; see ownership runbook at docs/ownership.md for more details.", w.Status.LastModifierIdentity)) rolloutStrategy.Strategy = temporaliov1alpha1.UpdateManual } @@ -107,6 +113,8 @@ func (r *TemporalWorkerDeploymentReconciler) generatePlan( // Convert version config plan.UpdateVersionConfig = planResult.VersionConfig + plan.RemoveIgnoreLastModifierBuilds = planResult.RemoveIgnoreLastModifierBuilds + // Convert test workflows for _, wf := range planResult.TestWorkflows { plan.startTestWorkflows = append(plan.startTestWorkflows, startWorkflowConfig{ diff --git a/internal/controller/genstatus.go b/internal/controller/genstatus.go index eb61e9a9..680fa329 100644 --- a/internal/controller/genstatus.go +++ b/internal/controller/genstatus.go @@ -16,9 +16,6 @@ import ( ctrl "sigs.k8s.io/controller-runtime" ) -// ControllerIdentity is the identity the controller passes to all write calls. -const ControllerIdentity = "temporal-worker-controller" - func (r *TemporalWorkerDeploymentReconciler) generateStatus( ctx context.Context, l logr.Logger, diff --git a/internal/controller/state_mapper.go b/internal/controller/state_mapper.go index 9911e27e..1800aada 100644 --- a/internal/controller/state_mapper.go +++ b/internal/controller/state_mapper.go @@ -33,6 +33,8 @@ func (m *stateMapper) mapToStatus(targetBuildID string) *v1alpha1.TemporalWorker VersionConflictToken: m.temporalState.VersionConflictToken, } + status.LastModifierIdentity = m.temporalState.LastModifierIdentity + // Get build IDs directly from temporal state currentBuildID := m.temporalState.CurrentBuildID rampingBuildID := m.temporalState.RampingBuildID diff --git a/internal/controller/util.go b/internal/controller/util.go index 1c1285b1..1a2e2630 100644 --- a/internal/controller/util.go +++ b/internal/controller/util.go @@ -11,7 +11,7 @@ import ( const ( controllerIdentityKey = "temporal.io/controller" controllerVersionKey = "temporal.io/controller-version" - defaultControllerIdentity = "temporal-worker-controller" + DefaultControllerIdentity = "temporal-worker-controller" ) // Version is set by goreleaser via ldflags at build time @@ -35,5 +35,5 @@ func getControllerIdentity() string { if identity := os.Getenv("CONTROLLER_IDENTITY"); identity != "" { return identity } - return defaultControllerIdentity + return DefaultControllerIdentity } diff --git a/internal/controller/worker_controller.go b/internal/controller/worker_controller.go index cddae68a..51e38cc2 100644 --- a/internal/controller/worker_controller.go +++ b/internal/controller/worker_controller.go @@ -137,6 +137,7 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req temporalClient, workerDeploymentName, workerDeploy.Spec.WorkerOptions.TemporalNamespace, + getControllerIdentity(), ) if err != nil { return ctrl.Result{}, fmt.Errorf("unable to get Temporal worker deployment state: %w", err) diff --git a/internal/planner/planner.go b/internal/planner/planner.go index f3611ae2..8224c37b 100644 --- a/internal/planner/planner.go +++ b/internal/planner/planner.go @@ -25,6 +25,9 @@ type Plan struct { ShouldCreateDeployment bool VersionConfig *VersionConfig TestWorkflows []WorkflowConfig + // Build IDs of versions from which the controller should + // remove IgnoreLastModifierKey from the version metadata + RemoveIgnoreLastModifierBuilds []string } // VersionConfig defines version configuration for Temporal @@ -83,6 +86,17 @@ func GeneratePlan( // Determine version config changes plan.VersionConfig = getVersionConfigDiff(l, status, temporalState, config, workerDeploymentName) + // Only remove the IgnoreLastModifier metadata after it's been used to make a version config change, which will + // make the controller the LastModifier again + if temporalState != nil && temporalState.IgnoreLastModifier && plan.VersionConfig != nil { + if temporalState.RampingBuildID != "" { + plan.RemoveIgnoreLastModifierBuilds = append(plan.RemoveIgnoreLastModifierBuilds, temporalState.RampingBuildID) + } + if temporalState.CurrentBuildID != "" { + plan.RemoveIgnoreLastModifierBuilds = append(plan.RemoveIgnoreLastModifierBuilds, temporalState.CurrentBuildID) + } + } + // TODO(jlegrone): generate warnings/events on the TemporalWorkerDeployment resource when buildIDs are reachable // but have no corresponding Deployment. diff --git a/internal/temporal/worker_deployment.go b/internal/temporal/worker_deployment.go index b7d9c11f..30429c4c 100644 --- a/internal/temporal/worker_deployment.go +++ b/internal/temporal/worker_deployment.go @@ -15,9 +15,14 @@ import ( enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" temporalClient "go.temporal.io/sdk/client" + "go.temporal.io/sdk/converter" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +const ( + IgnoreLastModifierKey = "temporal.io/ignore-last-modifier" +) + // VersionInfo contains information about a specific version type VersionInfo struct { DeploymentName string @@ -40,6 +45,7 @@ type TemporalWorkerState struct { // Versions indexed by build ID Versions map[string]*VersionInfo LastModifierIdentity string + IgnoreLastModifier bool } // GetWorkerDeploymentState queries Temporal to get the state of a worker deployment @@ -48,6 +54,7 @@ func GetWorkerDeploymentState( client temporalClient.Client, workerDeploymentName string, namespace string, + controllerIdentity string, ) (*TemporalWorkerState, error) { state := &TemporalWorkerState{ Versions: make(map[string]*VersionInfo), @@ -81,6 +88,14 @@ func GetWorkerDeploymentState( state.LastModifierIdentity = workerDeploymentInfo.LastModifierIdentity state.VersionConflictToken = resp.ConflictToken + // Decide whether to ignore LastModifierIdentity + if state.LastModifierIdentity != controllerIdentity && state.LastModifierIdentity != "" { + state.IgnoreLastModifier, err = DeploymentShouldIgnoreLastModifier(ctx, deploymentHandler, routingConfig) + if err != nil { + return nil, err + } + } + // TODO(jlegrone): Re-enable stats once available in versioning v3. // Set ramping since time if applicable @@ -232,3 +247,45 @@ func mapWorkflowStatus(status enumspb.WorkflowExecutionStatus) temporaliov1alpha func GetTestWorkflowID(deploymentName, buildID, taskQueue string) string { return fmt.Sprintf("test-%s:%s-%s", deploymentName, buildID, taskQueue) } + +func DeploymentShouldIgnoreLastModifier( + ctx context.Context, + deploymentHandler temporalClient.WorkerDeploymentHandle, + routingConfig temporalClient.WorkerDeploymentRoutingConfig, +) (shouldIgnore bool, err error) { + if routingConfig.CurrentVersion != nil { + shouldIgnore, err = getShouldIgnoreLastModifier(ctx, deploymentHandler, routingConfig.CurrentVersion.BuildId) + if err != nil { + return false, err + } + } + if !shouldIgnore && // if someone has a non-nil Current Version, but only set the metadata in their Ramping Version, also count that + routingConfig.RampingVersion != nil { + return getShouldIgnoreLastModifier(ctx, deploymentHandler, routingConfig.CurrentVersion.BuildId) + } + return shouldIgnore, nil +} + +func getShouldIgnoreLastModifier( + ctx context.Context, + deploymentHandler temporalClient.WorkerDeploymentHandle, + buildId string, +) (bool, error) { + desc, err := deploymentHandler.DescribeVersion(ctx, temporalClient.WorkerDeploymentDescribeVersionOptions{ + BuildID: buildId, + }) + if err != nil { + return false, fmt.Errorf("unable to describe version: %w", err) + } + for k, v := range desc.Info.Metadata { + if k == IgnoreLastModifierKey { + var s string + err = converter.GetDefaultDataConverter().FromPayload(v, &s) + if err != nil { + return false, fmt.Errorf("unable to decode metadata payload for key %s: %w", IgnoreLastModifierKey, err) + } + return s == "true", nil + } + } + return false, nil +} diff --git a/internal/temporal/worker_deployment_test.go b/internal/temporal/worker_deployment_test.go index 9271ac2e..7e302d24 100644 --- a/internal/temporal/worker_deployment_test.go +++ b/internal/temporal/worker_deployment_test.go @@ -5,11 +5,17 @@ package temporal import ( + "context" "testing" + "time" "github.com/stretchr/testify/assert" temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1" + "github.com/temporalio/temporal-worker-controller/internal/testhelpers" enumspb "go.temporal.io/api/enums/v1" + sdkclient "go.temporal.io/sdk/client" + "go.temporal.io/sdk/worker" + "go.temporal.io/server/temporaltest" ) func TestMapWorkflowStatus(t *testing.T) { @@ -99,3 +105,64 @@ func TestGetTestWorkflowID(t *testing.T) { }) } } + +func TestGetIgnoreLastModifier(t *testing.T) { + ctx := context.Background() + deploymentName := "test-dep" + buildId := "v1" + tq := "test-tq" + + ts := temporaltest.NewServer(temporaltest.WithT(t)) + + // create version + w, stopFunc, err := testhelpers.NewWorker(ctx, deploymentName, buildId, tq, ts.GetFrontendHostPort(), ts.GetDefaultNamespace(), true) + if err != nil { + t.Error(err) + } + if err = w.Start(); err != nil { + t.Errorf("error starting unversioned worker %v", err) + } + defer stopFunc() + + deploymentHandler := ts.GetDefaultClient().WorkerDeploymentClient().GetHandle(deploymentName) + + eventually(t, 5*time.Second, 100*time.Millisecond, func() error { + _, err := deploymentHandler.UpdateVersionMetadata(ctx, sdkclient.WorkerDeploymentUpdateVersionMetadataOptions{ + Version: worker.WorkerDeploymentVersion{ + DeploymentName: deploymentName, + BuildId: buildId, + }, + MetadataUpdate: sdkclient.WorkerDeploymentMetadataUpdate{ + UpsertEntries: map[string]interface{}{ + IgnoreLastModifierKey: "true", + }, + }, + }) + return err + }) + + shouldIgnore, err := getShouldIgnoreLastModifier(ctx, deploymentHandler, buildId) + if err != nil { + t.Error(err) + } + if !shouldIgnore { + t.Error("expected true, got false") + } + +} + +func eventually(t *testing.T, timeout, interval time.Duration, check func() error) { + deadline := time.Now().Add(timeout) + var lastErr error + for time.Now().Before(deadline) { + err := check() + if err == nil { + return // Success! + } + lastErr = err + time.Sleep(interval) + } + if lastErr != nil { + t.Fatalf("eventually failed after %s: %v", timeout, lastErr) + } +} diff --git a/internal/testhelpers/make.go b/internal/testhelpers/make.go index 784a06e9..9abae59d 100644 --- a/internal/testhelpers/make.go +++ b/internal/testhelpers/make.go @@ -135,6 +135,9 @@ func MakeTWDWithName(name, namespace string) *temporaliov1alpha1.TemporalWorkerD } func MakeCurrentVersion(namespace, twdName, imageName string, healthy, createDeployment bool) *temporaliov1alpha1.CurrentWorkerDeploymentVersion { + if imageName == "" { // empty build id == nil current version == unversioned + return nil + } ret := &temporaliov1alpha1.CurrentWorkerDeploymentVersion{ BaseWorkerDeploymentVersion: MakeBaseVersion(namespace, twdName, imageName, temporaliov1alpha1.VersionStatusCurrent, createDeployment, true), } diff --git a/internal/testhelpers/test_builder.go b/internal/testhelpers/test_builder.go index 3ab4194b..02e161ec 100644 --- a/internal/testhelpers/test_builder.go +++ b/internal/testhelpers/test_builder.go @@ -238,9 +238,13 @@ type TestCase struct { // waitTime is the duration to delay before checking expected status. waitTime *time.Duration - // setupFunc is an arbitrary function called at the end of setting up the environment specified by input.Status. + // setupFunc is an arbitrary function called at the end of setting up the environment, after making the state match input.Status. // It can be used for additional state creation or destruction. setupFunc func(t *testing.T, ctx context.Context, tc TestCase, env TestEnv) + + // validatorFunc is an arbitrary function called after the test validates the expected TWD Status has been achieved. + // It can be used for additional state validation. + validatorFunc func(t *testing.T, ctx context.Context, tc TestCase, env TestEnv) } func (tc *TestCase) GetTWD() *temporaliov1alpha1.TemporalWorkerDeployment { @@ -271,6 +275,10 @@ func (tc *TestCase) GetSetupFunc() func(t *testing.T, ctx context.Context, tc Te return tc.setupFunc } +func (tc *TestCase) GetValidatorFunc() func(t *testing.T, ctx context.Context, tc TestCase, env TestEnv) { + return tc.validatorFunc +} + // TestCaseBuilder provides a fluent interface for building test cases type TestCaseBuilder struct { name string @@ -283,7 +291,8 @@ type TestCaseBuilder struct { expectedDeploymentInfos []DeploymentInfo waitTime *time.Duration - setupFunc func(t *testing.T, ctx context.Context, tc TestCase, env TestEnv) + setupFunc func(t *testing.T, ctx context.Context, tc TestCase, env TestEnv) + validatorFunc func(t *testing.T, ctx context.Context, tc TestCase, env TestEnv) } // NewTestCase creates a new test case builder @@ -310,12 +319,19 @@ func NewTestCaseWithValues(name, k8sNamespace, temporalNamespace string) *TestCa } } -// WithSetupFunction defines a function that the test case will call while setting up the state. +// WithSetupFunction defines a function that the test case will call while setting up the state, after creating the initial Status. func (tcb *TestCaseBuilder) WithSetupFunction(f func(t *testing.T, ctx context.Context, tc TestCase, env TestEnv)) *TestCaseBuilder { tcb.setupFunc = f return tcb } +// WithValidatorFunction defines a function that the test case will call after the TWD expected state has been validated. +// Can be used to validate any other arbitrary state. +func (tcb *TestCaseBuilder) WithValidatorFunction(f func(t *testing.T, ctx context.Context, tc TestCase, env TestEnv)) *TestCaseBuilder { + tcb.validatorFunc = f + return tcb +} + // WithInput sets the input TWD func (tcb *TestCaseBuilder) WithInput(twdBuilder *TemporalWorkerDeploymentBuilder) *TestCaseBuilder { tcb.twdBuilder = twdBuilder @@ -384,8 +400,9 @@ func (tcb *TestCaseBuilder) WithExpectedStatus(statusBuilder *StatusBuilder) *Te // Build returns the constructed test case func (tcb *TestCaseBuilder) Build() TestCase { ret := TestCase{ - setupFunc: tcb.setupFunc, - waitTime: tcb.waitTime, + setupFunc: tcb.setupFunc, + validatorFunc: tcb.validatorFunc, + waitTime: tcb.waitTime, twd: tcb.twdBuilder. WithName(tcb.name). WithNamespace(tcb.k8sNamespace). diff --git a/internal/tests/internal/env_helpers.go b/internal/tests/internal/env_helpers.go index 8316745e..719f55eb 100644 --- a/internal/tests/internal/env_helpers.go +++ b/internal/tests/internal/env_helpers.go @@ -15,11 +15,14 @@ import ( temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1" "github.com/temporalio/temporal-worker-controller/internal/controller" "github.com/temporalio/temporal-worker-controller/internal/controller/clientpool" + "github.com/temporalio/temporal-worker-controller/internal/k8s" + "github.com/temporalio/temporal-worker-controller/internal/temporal" "github.com/temporalio/temporal-worker-controller/internal/testhelpers" "go.temporal.io/api/taskqueue/v1" "go.temporal.io/api/workflowservice/v1" temporalClient "go.temporal.io/sdk/client" "go.temporal.io/sdk/log" + "go.temporal.io/sdk/worker" "go.temporal.io/sdk/workflow" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -261,3 +264,79 @@ func getPollers(ctx context.Context, } return resp.GetPollers(), nil } + +func setUnversionedCurrent(t *testing.T, ctx context.Context, tc testhelpers.TestCase, env testhelpers.TestEnv) { + workerDeploymentName := k8s.ComputeWorkerDeploymentName(tc.GetTWD()) + deploymentHandle := env.Ts.GetDefaultClient().WorkerDeploymentClient().GetHandle(workerDeploymentName) + + _, err := deploymentHandle.SetCurrentVersion(ctx, temporalClient.WorkerDeploymentSetCurrentVersionOptions{ + BuildID: "", + IgnoreMissingTaskQueues: true, + }) + if err != nil { + t.Errorf("error setting unversioned current version to spook controller into manual mode: %v", err) + } + t.Logf("set current version to unversioned with non-controller identity") + +} + +func setCurrentAndSetIgnoreModifierMetadata(t *testing.T, ctx context.Context, tc testhelpers.TestCase, env testhelpers.TestEnv) { + workerDeploymentName := k8s.ComputeWorkerDeploymentName(tc.GetTWD()) + deploymentHandle := env.Ts.GetDefaultClient().WorkerDeploymentClient().GetHandle(workerDeploymentName) + + // change current version arbitrarily so we can be the last modifier + resp, err := deploymentHandle.SetCurrentVersion(ctx, temporalClient.WorkerDeploymentSetCurrentVersionOptions{ + BuildID: "", + IgnoreMissingTaskQueues: true, + }) + if err != nil { + t.Errorf("error setting unversioned current version to spook controller into manual mode: %v", err) + } + t.Logf("set current version to unversioned with non-controller identity") + + // set it back to what it was so that it's non-nil + _, err = deploymentHandle.SetCurrentVersion(ctx, temporalClient.WorkerDeploymentSetCurrentVersionOptions{ + BuildID: resp.PreviousVersion.BuildId, + }) + if err != nil { + t.Errorf("error restoring current version: %v", err) + } + t.Logf("set current version to build %v with non-controller identity", resp.PreviousVersion.BuildId) + + // set the IgnoreLastModifier metadata + _, err = deploymentHandle.UpdateVersionMetadata(ctx, temporalClient.WorkerDeploymentUpdateVersionMetadataOptions{ + Version: worker.WorkerDeploymentVersion{ + DeploymentName: workerDeploymentName, + BuildId: resp.PreviousVersion.BuildId, + }, + MetadataUpdate: temporalClient.WorkerDeploymentMetadataUpdate{ + UpsertEntries: map[string]interface{}{ + temporal.IgnoreLastModifierKey: "true", + }, + }, + }) + if err != nil { + t.Errorf("error updating version metadata: %v", err) + } + t.Log("set current version's metadata to have \"temporal.io/ignore-last-modifier\"=\"true\"") +} + +func validateIgnoreLastModifierMetadata(expectShouldIgnore bool) func(t *testing.T, ctx context.Context, tc testhelpers.TestCase, env testhelpers.TestEnv) { + return func(t *testing.T, ctx context.Context, tc testhelpers.TestCase, env testhelpers.TestEnv) { + workerDeploymentName := k8s.ComputeWorkerDeploymentName(tc.GetTWD()) + deploymentHandle := env.Ts.GetDefaultClient().WorkerDeploymentClient().GetHandle(workerDeploymentName) + + desc, err := deploymentHandle.Describe(ctx, temporalClient.WorkerDeploymentDescribeOptions{}) + if err != nil { + t.Errorf("error describing worker deployment: %v", err) + } + + shouldIgnore, err := temporal.DeploymentShouldIgnoreLastModifier(ctx, deploymentHandle, desc.Info.RoutingConfig) + if err != nil { + t.Errorf("error checking ignore last modifier for worker deployment: %v", err) + } + if shouldIgnore != expectShouldIgnore { + t.Errorf("expected ignore last modifier to be %v, got %v", expectShouldIgnore, shouldIgnore) + } + } +} diff --git a/internal/tests/internal/integration_test.go b/internal/tests/internal/integration_test.go index 925d20cc..a5596407 100644 --- a/internal/tests/internal/integration_test.go +++ b/internal/tests/internal/integration_test.go @@ -191,6 +191,56 @@ func TestIntegration(t *testing.T) { testhelpers.NewDeploymentInfo("v1", 0), testhelpers.NewDeploymentInfo("v2", 1), ), + "nth-rollout-blocked-by-modifier": testhelpers.NewTestCase(). + WithInput( + testhelpers.NewTemporalWorkerDeploymentBuilder(). + WithAllAtOnceStrategy(). + WithTargetTemplate("v1"). + WithStatus( + testhelpers.NewStatusBuilder(). + WithTargetVersion("v0", temporaliov1alpha1.VersionStatusCurrent, -1, true, true). + WithCurrentVersion("v0", true, true), + ), + ). + WithExistingDeployments( + testhelpers.NewDeploymentInfo("v0", 1), + ). + WithWaitTime(5 * time.Second). + WithSetupFunction(setUnversionedCurrent). + WithExpectedStatus( + testhelpers.NewStatusBuilder(). + WithTargetVersion("v1", temporaliov1alpha1.VersionStatusInactive, -1, true, false). + WithCurrentVersion("", false, false). + WithDeprecatedVersions(testhelpers.NewDeprecatedVersionInfo("v0", temporaliov1alpha1.VersionStatusDrained, true, false, true)), + ). + WithExpectedDeployments( + testhelpers.NewDeploymentInfo("v0", 1), + ). + WithValidatorFunction(validateIgnoreLastModifierMetadata(false)), + "nth-rollout-unblocked-by-modifier-with-ignore": testhelpers.NewTestCase(). + WithInput( + testhelpers.NewTemporalWorkerDeploymentBuilder(). + WithAllAtOnceStrategy(). + WithTargetTemplate("v1"). + WithStatus( + testhelpers.NewStatusBuilder(). + WithTargetVersion("v0", temporaliov1alpha1.VersionStatusCurrent, -1, true, true). + WithCurrentVersion("v0", true, true), + ), + ). + WithExistingDeployments( + testhelpers.NewDeploymentInfo("v0", 1), + ). + WithSetupFunction(setCurrentAndSetIgnoreModifierMetadata). + WithExpectedStatus( + testhelpers.NewStatusBuilder(). + WithTargetVersion("v1", temporaliov1alpha1.VersionStatusCurrent, -1, true, false). + WithDeprecatedVersions(testhelpers.NewDeprecatedVersionInfo("v0", temporaliov1alpha1.VersionStatusDrained, true, false, true)), + ). + WithExpectedDeployments( + testhelpers.NewDeploymentInfo("v0", 1), + ). + WithValidatorFunction(validateIgnoreLastModifierMetadata(false)), } // TODO(carlydf): Add additional test case where multiple ramping steps are done @@ -266,6 +316,11 @@ func testTemporalWorkerDeploymentCreation( // verify that temporal state matches the preliminary status, to confirm that makePreliminaryStatusTrue worked verifyTemporalStateMatchesStatusEventually(t, ctx, ts, twd, twd.Status, 30*time.Second, 5*time.Second) + // apply post-status setup function + if f := tc.GetSetupFunc(); f != nil { + tc.GetSetupFunc()(t, ctx, tc, env) + } + t.Log("Creating a TemporalWorkerDeployment") if err := k8sClient.Create(ctx, twd); err != nil { t.Fatalf("failed to create TemporalWorkerDeployment: %v", err) @@ -282,4 +337,9 @@ func testTemporalWorkerDeploymentCreation( } verifyTemporalWorkerDeploymentStatusEventually(t, ctx, env, twd.Name, twd.Namespace, expectedStatus, 30*time.Second, 5*time.Second) verifyTemporalStateMatchesStatusEventually(t, ctx, ts, twd, *expectedStatus, 30*time.Second, 5*time.Second) + + // apply post-expected-status validation function + if f := tc.GetValidatorFunc(); f != nil { + tc.GetValidatorFunc()(t, ctx, tc, env) + } } diff --git a/internal/tests/internal/validation_helpers.go b/internal/tests/internal/validation_helpers.go index 6f488bf8..5db24354 100644 --- a/internal/tests/internal/validation_helpers.go +++ b/internal/tests/internal/validation_helpers.go @@ -79,7 +79,7 @@ func setCurrentVersion( eventually(t, 30*time.Second, time.Second, func() error { _, err := deploymentHandler.SetCurrentVersion(ctx, sdkclient.WorkerDeploymentSetCurrentVersionOptions{ BuildID: buildID, - Identity: controller.ControllerIdentity, + Identity: controller.DefaultControllerIdentity, }) if err != nil { return fmt.Errorf("unable to set build '%s' as current of worker deployment %s: %w", buildID, workerDeploymentName, err) @@ -107,7 +107,7 @@ func setRampingVersion( _, err := deploymentHandler.SetRampingVersion(ctx, sdkclient.WorkerDeploymentSetRampingVersionOptions{ BuildID: buildID, Percentage: rampPercentage, - Identity: controller.ControllerIdentity, + Identity: controller.DefaultControllerIdentity, }) if err != nil { return fmt.Errorf("unable to set build '%s' as ramping of worker deployment %s: %w", buildID, workerDeploymentName, err) @@ -332,7 +332,7 @@ func validateDeprecatedVersion(ctx context.Context, env testhelpers.TestEnv, exp // status if expectedDV.Status != actualDV.Status { return fmt.Errorf("expected status of deprecated build '%s' to be '%v', got '%v'", - expectedDV.BuildID, expectedDV.Status, expectedDV.Status) + expectedDV.BuildID, expectedDV.Status, actualDV.Status) } // deployment if expectedDV.Deployment == nil {