diff --git a/api/v1alpha1/worker_types.go b/api/v1alpha1/worker_types.go index 002011f0..67611969 100644 --- a/api/v1alpha1/worker_types.go +++ b/api/v1alpha1/worker_types.go @@ -196,6 +196,12 @@ type TemporalWorkerDeploymentStatus struct { // +optional LastModifierIdentity string `json:"lastModifierIdentity,omitempty"` + // ManagerIdentity is the identity that has exclusive rights to modify this Worker Deployment's routing config. + // When set, clients whose identity does not match will be blocked from making routing changes. + // Empty by default. Use `temporal worker deployment manager-identity set/unset` to change. + // +optional + ManagerIdentity string `json:"managerIdentity,omitempty"` + // VersionCount is the total number of versions currently known by the worker deployment. // This includes current, target, ramping, and deprecated versions. // +optional diff --git a/docs/ownership.md b/docs/ownership.md index 3a51096c..6fb3cc57 100644 --- a/docs/ownership.md +++ b/docs/ownership.md @@ -2,57 +2,52 @@ ## Problem -If a worker controller is managing a Worker Deployment (ie. the controller is updating the RoutingConfig of the Worker -Deployment), but the user changes something via the CLI (ie. rolls back to the previous current version, or stops the -new target version from ramping because an issue was detected), the controller should not clobber what the human did. +If the worker controller is managing a Worker Deployment (i.e. updating its routing config), but a user makes a manual +change via the CLI, SDK, or gRPC API instead of via the `TemporalWorkerDeployment` CRD interface, the controller should +not clobber the user's change. -At some point, after this human has handled their urgent rollback, they will want to let the controller know that it is -authorized to resume making changes to the Routing Config of the Worker Deployment. +Once the user has finished their manual intervention, they need a way to hand ownership back to the controller. ## Solution -_Once it is available in OSS v1.29, the controller will be able to coordinate with other users via the `ManagerIdentity` -field of a Worker Deployment. This runbook will be updated when that is available and implemented by the controller._ +The controller uses the Temporal server's `ManagerIdentity` field on Worker Deployments to coordinate exclusive +ownership of routing changes. -In the meantime, the controller will watch the `LastModifierIdentity` field of a Worker Deployment to detect whether -another user has made a change. If another user made a change to the Worker Deployment, the controller will not make -any more changes to ensure a human's change is not clobbered. +When `ManagerIdentity` is set on a Worker Deployment, only clients whose identity matches `ManagerIdentity` can make +routing changes (set current version, set ramping version). The controller's identity is visible in the +`managerIdentity` field of the `TemporalWorkerDeployment` status. -Once you are done making your own changes to the Worker Deployment's current and ramping versions, and you are ready -for the Worker Controller to take over, you can update the metadata to indicate that. +### How the controller claims ownership -There is no Temporal server support for Worker Deployment Version-level metadata, so you'll have to set this value on -the Current Version of your Worker Deployment. +The first time the controller plans a routing change for a Worker Deployment (i.e. when `ManagerIdentity` is empty), +it calls `SetManagerIdentity` to claim ownership before applying the change. Subsequent routing changes succeed because +the controller's identity already matches `ManagerIdentity`. -Note: The controller decodes this metadata value as a string. Be sure to set the value to the string "true" (not the boolean true). +### Taking manual control + +To take manual control away from the controller, set `ManagerIdentity` to your own identity: ```bash -temporal worker deployment update-metadata-version \ - --deployment-name $MY_DEPLOYMENT \ - --build-id $CURRENT_VERSION_BUILD_ID \ - --metadata 'temporal.io/ignore-last-modifier="true"' -``` -Alternatively, if your CLI supports JSON input: -```bash -temporal worker deployment update-metadata-version \ - --deployment-name $MY_DEPLOYMENT \ - --build-id $CURRENT_VERSION_BUILD_ID \ - --metadata-json '{"temporal.io/ignore-last-modifier":"true"}' -``` -In the rare case that you have a nil Current Version when you are passing back ownership, you should set it on your Ramping Version -```bash -temporal worker deployment update-metadata-version \ +temporal worker deployment manager-identity set \ --deployment-name $MY_DEPLOYMENT \ - --build-id $RAMPING_VERSION_BUILD_ID \ - --metadata 'temporal.io/ignore-last-modifier="true"' + --self ``` -Or with JSON: + +The `--self` flag sets `ManagerIdentity` to the identity of the caller (auto-generated by the CLI if not explicitly +provided via `--identity`; similarly, the SDK uses its own auto-generated or configured identity). After this, the +controller's routing change attempts will fail and it will retry on a backoff until ownership is returned. + +You can then make routing changes freely (the server enforces `ManagerIdentity` for all clients, not just the +controller). + +### Returning ownership to the controller + +When you are done with your manual changes and want the controller to resume, clear `ManagerIdentity`: + ```bash -temporal worker deployment update-metadata-version \ - --deployment-name $MY_DEPLOYMENT \ - --build-id $RAMPING_VERSION_BUILD_ID \ - --metadata-json '{"temporal.io/ignore-last-modifier":"true"}' +temporal worker deployment manager-identity unset \ + --deployment-name $MY_DEPLOYMENT ``` -In the even rarer case that you have nil Current Version and nil Ramping Version, you'll need to use the CLI or SDK to -set a Current or Ramping Version and then do as instructed above. \ No newline at end of file +On the next reconcile, the controller will detect that `ManagerIdentity` is empty, claim it for itself, and resume +managing routing changes. \ No newline at end of file diff --git a/helm/temporal-worker-controller-crds/templates/temporal.io_temporalworkerdeployments.yaml b/helm/temporal-worker-controller-crds/templates/temporal.io_temporalworkerdeployments.yaml index 614e5187..b8985736 100644 --- a/helm/temporal-worker-controller-crds/templates/temporal.io_temporalworkerdeployments.yaml +++ b/helm/temporal-worker-controller-crds/templates/temporal.io_temporalworkerdeployments.yaml @@ -4098,6 +4098,8 @@ spec: type: array lastModifierIdentity: type: string + managerIdentity: + type: string targetVersion: properties: buildID: diff --git a/internal/controller/clientpool/clientpool.go b/internal/controller/clientpool/clientpool.go index 2d87c401..e6d879d1 100644 --- a/internal/controller/clientpool/clientpool.go +++ b/internal/controller/clientpool/clientpool.go @@ -97,6 +97,7 @@ type NewClientOptions struct { TemporalNamespace string K8sNamespace string Spec v1alpha1.TemporalConnectionSpec + Identity string } func (cp *ClientPool) fetchClientUsingMTLSSecret(secret corev1.Secret, opts NewClientOptions) (*sdkclient.Options, *ClientPoolKey, *ClientAuth, error) { @@ -104,6 +105,7 @@ func (cp *ClientPool) fetchClientUsingMTLSSecret(secret corev1.Secret, opts NewC Logger: cp.logger, HostPort: opts.Spec.HostPort, Namespace: opts.TemporalNamespace, + Identity: opts.Identity, } var pemCert []byte @@ -170,6 +172,7 @@ func (cp *ClientPool) fetchClientUsingAPIKeySecret(secret corev1.Secret, opts Ne Logger: cp.logger, HostPort: opts.Spec.HostPort, Namespace: opts.TemporalNamespace, + Identity: opts.Identity, ConnectionOptions: sdkclient.ConnectionOptions{ TLS: &tls.Config{}, }, @@ -198,6 +201,7 @@ func (cp *ClientPool) fetchClientUsingNoCredentials(opts NewClientOptions) (*sdk Logger: cp.logger, HostPort: opts.Spec.HostPort, Namespace: opts.TemporalNamespace, + Identity: opts.Identity, } key := ClientPoolKey{ diff --git a/internal/controller/execplan.go b/internal/controller/execplan.go index e889682f..19d8cdf4 100644 --- a/internal/controller/execplan.go +++ b/internal/controller/execplan.go @@ -12,7 +12,7 @@ import ( "github.com/go-logr/logr" temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1" - "github.com/temporalio/temporal-worker-controller/internal/temporal" + "github.com/temporalio/temporal-worker-controller/internal/planner" enumspb "go.temporal.io/api/enums/v1" sdkclient "go.temporal.io/sdk/client" "go.temporal.io/sdk/worker" @@ -150,12 +150,46 @@ func (r *TemporalWorkerDeploymentReconciler) startTestWorkflows(ctx context.Cont return nil } +func (r *TemporalWorkerDeploymentReconciler) shouldClaimManagerIdentity(vcfg *planner.VersionConfig) bool { + return vcfg.ManagerIdentity == "" +} + +func (r *TemporalWorkerDeploymentReconciler) claimManagerIdentity( + ctx context.Context, + l logr.Logger, + workerDeploy *temporaliov1alpha1.TemporalWorkerDeployment, + deploymentHandler sdkclient.WorkerDeploymentHandle, + vcfg *planner.VersionConfig, +) error { + resp, err := deploymentHandler.SetManagerIdentity(ctx, sdkclient.WorkerDeploymentSetManagerIdentityOptions{ + Self: true, + ConflictToken: vcfg.ConflictToken, + Identity: getControllerIdentity(), + }) + if err != nil { + l.Error(err, "unable to claim manager identity") + r.Recorder.Eventf(workerDeploy, corev1.EventTypeWarning, ReasonManagerIdentityClaimFailed, + "Failed to claim manager identity: %v", err) + return err + } + l.Info("claimed manager identity", "identity", getControllerIdentity()) + // Use the updated conflict token for the subsequent routing config change. + vcfg.ConflictToken = resp.ConflictToken + return nil +} + func (r *TemporalWorkerDeploymentReconciler) updateVersionConfig(ctx context.Context, l logr.Logger, workerDeploy *temporaliov1alpha1.TemporalWorkerDeployment, deploymentHandler sdkclient.WorkerDeploymentHandle, p *plan) error { vcfg := p.UpdateVersionConfig if vcfg == nil { return nil } + if r.shouldClaimManagerIdentity(vcfg) { + if err := r.claimManagerIdentity(ctx, l, workerDeploy, deploymentHandler, vcfg); err != nil { + return fmt.Errorf("unable to claim manager identity: %w", err) + } + } + if vcfg.SetCurrent { l.Info("registering new current version", "buildID", vcfg.BuildID) if _, err := deploymentHandler.SetCurrentVersion(ctx, sdkclient.WorkerDeploymentSetCurrentVersionOptions{ @@ -226,20 +260,5 @@ func (r *TemporalWorkerDeploymentReconciler) executePlan(ctx context.Context, l return err } - for _, buildId := range p.RemoveIgnoreLastModifierBuilds { - if _, err := deploymentHandler.UpdateVersionMetadata(ctx, sdkclient.WorkerDeploymentUpdateVersionMetadataOptions{ - Version: worker.WorkerDeploymentVersion{ - DeploymentName: p.WorkerDeploymentName, - BuildID: buildId, - }, - MetadataUpdate: sdkclient.WorkerDeploymentMetadataUpdate{ - RemoveEntries: []string{temporal.IgnoreLastModifierKey}, - }, - }); err != nil { - l.Error(err, "unable to remove ignore-last-modifier metadata", "buildID", buildId) - return fmt.Errorf("unable to update metadata to remove %s deployment: %w", temporal.IgnoreLastModifierKey, err) - } - } - return nil } diff --git a/internal/controller/genplan.go b/internal/controller/genplan.go index 050158cc..e0c07a7c 100644 --- a/internal/controller/genplan.go +++ b/internal/controller/genplan.go @@ -34,10 +34,6 @@ type plan struct { // Start a workflow startTestWorkflows []startWorkflowConfig - - // Build IDs of versions from which the controller should - // remove IgnoreLastModifierKey from the version metadata - RemoveIgnoreLastModifierBuilds []string } // startWorkflowConfig defines a workflow to be started @@ -80,15 +76,7 @@ func (r *TemporalWorkerDeploymentReconciler) generatePlan( ScaleDeployments: make(map[*corev1.ObjectReference]uint32), } - // Check if we need to force manual strategy due to external modification rolloutStrategy := w.Spec.RolloutStrategy - if w.Status.LastModifierIdentity != getControllerIdentity() && - w.Status.LastModifierIdentity != serverDeleteVersionIdentity && - w.Status.LastModifierIdentity != "" && - !temporalState.IgnoreLastModifier { - l.Info(fmt.Sprintf("Forcing Manual rollout strategy since Worker Deployment was modified by a user with a different identity '%s'; to allow controller to make changes again, set 'temporal.io/ignore-last-modifier=true' in the metadata of your Current or Ramping Version; see ownership runbook at docs/ownership.md for more details.", w.Status.LastModifierIdentity)) - rolloutStrategy.Strategy = temporaliov1alpha1.UpdateManual - } // Resolve gate input if gate is configured var gateInput []byte @@ -153,8 +141,6 @@ func (r *TemporalWorkerDeploymentReconciler) generatePlan( // Convert version config plan.UpdateVersionConfig = planResult.VersionConfig - plan.RemoveIgnoreLastModifierBuilds = planResult.RemoveIgnoreLastModifierBuilds - // Convert test workflows for _, wf := range planResult.TestWorkflows { plan.startTestWorkflows = append(plan.startTestWorkflows, startWorkflowConfig{ diff --git a/internal/controller/reconciler_events_test.go b/internal/controller/reconciler_events_test.go index fd316137..849f42a3 100644 --- a/internal/controller/reconciler_events_test.go +++ b/internal/controller/reconciler_events_test.go @@ -684,20 +684,20 @@ func TestUpdateVersionConfig_EmitsEventOnFailure(t *testing.T) { { name: "SetCurrentFailed", handle: &stubWDHandle{setCurrentErr: errors.New("simulated SetCurrentVersion failure")}, - config: &planner.VersionConfig{BuildID: "build-abc", SetCurrent: true}, + config: &planner.VersionConfig{BuildID: "build-abc", SetCurrent: true, ManagerIdentity: "some-manager"}, expectedReason: ReasonVersionPromotionFailed, }, { name: "SetRampingFailed", handle: &stubWDHandle{setRampingErr: errors.New("simulated SetRampingVersion failure")}, - config: &planner.VersionConfig{BuildID: "build-abc", RampPercentage: 25}, + config: &planner.VersionConfig{BuildID: "build-abc", RampPercentage: 25, ManagerIdentity: "some-manager"}, expectedReason: ReasonVersionPromotionFailed, }, { // SetCurrentVersion succeeds; UpdateVersionMetadata fails. name: "MetadataUpdateFailed", handle: &stubWDHandle{updateMetaErr: errors.New("simulated UpdateVersionMetadata failure")}, - config: &planner.VersionConfig{BuildID: "build-abc", SetCurrent: true}, + config: &planner.VersionConfig{BuildID: "build-abc", SetCurrent: true, ManagerIdentity: "some-manager"}, expectedReason: ReasonMetadataUpdateFailed, }, } diff --git a/internal/controller/state_mapper.go b/internal/controller/state_mapper.go index 1b720ab9..746e4415 100644 --- a/internal/controller/state_mapper.go +++ b/internal/controller/state_mapper.go @@ -34,6 +34,7 @@ func (m *stateMapper) mapToStatus(targetBuildID string) *v1alpha1.TemporalWorker } status.LastModifierIdentity = m.temporalState.LastModifierIdentity + status.ManagerIdentity = m.temporalState.ManagerIdentity // Get build IDs directly from temporal state currentBuildID := m.temporalState.CurrentBuildID diff --git a/internal/controller/util.go b/internal/controller/util.go index 8c12fece..5439be5f 100644 --- a/internal/controller/util.go +++ b/internal/controller/util.go @@ -18,15 +18,16 @@ import ( // status API and may change between releases. Do not write alerting or automation // that depends on these strings. const ( - ReasonPlanGenerationFailed = "PlanGenerationFailed" - ReasonPlanExecutionFailed = "PlanExecutionFailed" - ReasonDeploymentCreateFailed = "DeploymentCreateFailed" - ReasonDeploymentDeleteFailed = "DeploymentDeleteFailed" - ReasonDeploymentScaleFailed = "DeploymentScaleFailed" - ReasonDeploymentUpdateFailed = "DeploymentUpdateFailed" - ReasonTestWorkflowStartFailed = "TestWorkflowStartFailed" - ReasonVersionPromotionFailed = "VersionPromotionFailed" - ReasonMetadataUpdateFailed = "MetadataUpdateFailed" + ReasonPlanGenerationFailed = "PlanGenerationFailed" + ReasonPlanExecutionFailed = "PlanExecutionFailed" + ReasonDeploymentCreateFailed = "DeploymentCreateFailed" + ReasonDeploymentDeleteFailed = "DeploymentDeleteFailed" + ReasonDeploymentScaleFailed = "DeploymentScaleFailed" + ReasonDeploymentUpdateFailed = "DeploymentUpdateFailed" + ReasonTestWorkflowStartFailed = "TestWorkflowStartFailed" + ReasonVersionPromotionFailed = "VersionPromotionFailed" + ReasonMetadataUpdateFailed = "MetadataUpdateFailed" + ReasonManagerIdentityClaimFailed = "ManagerIdentityClaimFailed" ) const ( diff --git a/internal/controller/worker_controller.go b/internal/controller/worker_controller.go index 1935c3c1..e4fa6d5d 100644 --- a/internal/controller/worker_controller.go +++ b/internal/controller/worker_controller.go @@ -184,6 +184,7 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req K8sNamespace: workerDeploy.Namespace, TemporalNamespace: workerDeploy.Spec.WorkerOptions.TemporalNamespace, Spec: temporalConnection.Spec, + Identity: getControllerIdentity(), }) if err != nil { l.Error(err, "invalid Temporal auth secret") diff --git a/internal/planner/planner.go b/internal/planner/planner.go index 2f9f0158..4772a31e 100644 --- a/internal/planner/planner.go +++ b/internal/planner/planner.go @@ -27,9 +27,6 @@ type Plan struct { ShouldCreateDeployment bool VersionConfig *VersionConfig TestWorkflows []WorkflowConfig - // Build IDs of versions from which the controller should - // remove IgnoreLastModifierKey from the version metadata - RemoveIgnoreLastModifierBuilds []string } // VersionConfig defines version configuration for Temporal @@ -45,6 +42,11 @@ type VersionConfig struct { SetCurrent bool // Acceptable values [0,100] RampPercentage int32 + + // ManagerIdentity is the current manager identity of the worker deployment in Temporal. + // An empty string indicates the controller should claim the identity before applying + // any routing config changes. + ManagerIdentity string } // WorkflowConfig defines a workflow to be started @@ -100,17 +102,6 @@ func GeneratePlan( // Determine version config changes plan.VersionConfig = getVersionConfigDiff(l, status, temporalState, config, workerDeploymentName) - // Only remove the IgnoreLastModifier metadata after it's been used to make a version config change, which will - // make the controller the LastModifier again - if temporalState != nil && temporalState.IgnoreLastModifier && plan.VersionConfig != nil { - if temporalState.RampingBuildID != "" { - plan.RemoveIgnoreLastModifierBuilds = append(plan.RemoveIgnoreLastModifierBuilds, temporalState.RampingBuildID) - } - if temporalState.CurrentBuildID != "" { - plan.RemoveIgnoreLastModifierBuilds = append(plan.RemoveIgnoreLastModifierBuilds, temporalState.CurrentBuildID) - } - } - // TODO(jlegrone): generate warnings/events on the TemporalWorkerDeployment resource when buildIDs are reachable // but have no corresponding Deployment. @@ -546,9 +537,14 @@ func getVersionConfigDiff( } } + managerIdentity := "" + if temporalState != nil { + managerIdentity = temporalState.ManagerIdentity + } vcfg := &VersionConfig{ - ConflictToken: conflictToken, - BuildID: status.TargetVersion.BuildID, + ConflictToken: conflictToken, + BuildID: status.TargetVersion.BuildID, + ManagerIdentity: managerIdentity, } // If there is no current version and presence of unversioned pollers is not confirmed for all diff --git a/internal/planner/planner_test.go b/internal/planner/planner_test.go index 9873c2b5..37526818 100644 --- a/internal/planner/planner_test.go +++ b/internal/planner/planner_test.go @@ -38,9 +38,10 @@ func TestGeneratePlan(t *testing.T) { expectUpdate int expectWorkflow int expectConfig bool - expectConfigSetCurrent *bool // pointer so we can test nil - expectConfigRampPercent *int32 // pointer so we can test nil, in percentage (0-100) - maxVersionsIneligibleForDeletion *int32 // set by env if non-nil, else default 75 + expectConfigSetCurrent *bool // pointer so we can test nil + expectConfigRampPercent *int32 // pointer so we can test nil, in percentage (0-100) + expectManagerIdentity *string // pointer so nil means "don't assert" + maxVersionsIneligibleForDeletion *int32 // set by env if non-nil, else default 75 }{ { name: "empty state creates new deployment", @@ -390,6 +391,101 @@ func TestGeneratePlan(t *testing.T) { }, expectUpdate: 1, }, + { + // VersionConfig is non-nil because a new healthy target needs to be promoted. + // With empty ManagerIdentity the controller should claim before promoting. + name: "claims manager identity when ManagerIdentity is empty and a version config change is pending", + k8sState: &k8s.DeploymentState{ + Deployments: map[string]*appsv1.Deployment{ + "oldbuild": createDeploymentWithDefaultConnectionSpecHash(1), + "newbuild": createDeploymentWithDefaultConnectionSpecHash(1), + }, + DeploymentRefs: map[string]*corev1.ObjectReference{ + "oldbuild": {Name: "test-oldbuild"}, + "newbuild": {Name: "test-newbuild"}, + }, + }, + status: &temporaliov1alpha1.TemporalWorkerDeploymentStatus{ + TargetVersion: temporaliov1alpha1.TargetWorkerDeploymentVersion{ + BaseWorkerDeploymentVersion: temporaliov1alpha1.BaseWorkerDeploymentVersion{ + BuildID: "newbuild", + Status: temporaliov1alpha1.VersionStatusInactive, + Deployment: &corev1.ObjectReference{Name: "test-newbuild"}, + HealthySince: &metav1.Time{ + Time: time.Now().Add(-1 * time.Hour), + }, + }, + }, + CurrentVersion: &temporaliov1alpha1.CurrentWorkerDeploymentVersion{ + BaseWorkerDeploymentVersion: temporaliov1alpha1.BaseWorkerDeploymentVersion{ + BuildID: "oldbuild", + Status: temporaliov1alpha1.VersionStatusCurrent, + Deployment: &corev1.ObjectReference{Name: "test-oldbuild"}, + }, + }, + }, + spec: &temporaliov1alpha1.TemporalWorkerDeploymentSpec{ + Replicas: func() *int32 { r := int32(1); return &r }(), + }, + state: &temporal.TemporalWorkerState{ + ManagerIdentity: "", // empty → controller should claim + }, + config: &Config{ + RolloutStrategy: temporaliov1alpha1.RolloutStrategy{ + Strategy: temporaliov1alpha1.UpdateAllAtOnce, + }, + }, + expectConfig: true, + expectConfigSetCurrent: func() *bool { b := true; return &b }(), + expectManagerIdentity: func() *string { s := ""; return &s }(), + }, + { + // Same routing change scenario but ManagerIdentity is already set — no claim. + name: "does not claim manager identity when ManagerIdentity is already set", + k8sState: &k8s.DeploymentState{ + Deployments: map[string]*appsv1.Deployment{ + "oldbuild": createDeploymentWithDefaultConnectionSpecHash(1), + "newbuild": createDeploymentWithDefaultConnectionSpecHash(1), + }, + DeploymentRefs: map[string]*corev1.ObjectReference{ + "oldbuild": {Name: "test-oldbuild"}, + "newbuild": {Name: "test-newbuild"}, + }, + }, + status: &temporaliov1alpha1.TemporalWorkerDeploymentStatus{ + TargetVersion: temporaliov1alpha1.TargetWorkerDeploymentVersion{ + BaseWorkerDeploymentVersion: temporaliov1alpha1.BaseWorkerDeploymentVersion{ + BuildID: "newbuild", + Status: temporaliov1alpha1.VersionStatusInactive, + Deployment: &corev1.ObjectReference{Name: "test-newbuild"}, + HealthySince: &metav1.Time{ + Time: time.Now().Add(-1 * time.Hour), + }, + }, + }, + CurrentVersion: &temporaliov1alpha1.CurrentWorkerDeploymentVersion{ + BaseWorkerDeploymentVersion: temporaliov1alpha1.BaseWorkerDeploymentVersion{ + BuildID: "oldbuild", + Status: temporaliov1alpha1.VersionStatusCurrent, + Deployment: &corev1.ObjectReference{Name: "test-oldbuild"}, + }, + }, + }, + spec: &temporaliov1alpha1.TemporalWorkerDeploymentSpec{ + Replicas: func() *int32 { r := int32(1); return &r }(), + }, + state: &temporal.TemporalWorkerState{ + ManagerIdentity: "some-other-client", + }, + config: &Config{ + RolloutStrategy: temporaliov1alpha1.RolloutStrategy{ + Strategy: temporaliov1alpha1.UpdateAllAtOnce, + }, + }, + expectConfig: true, + expectConfigSetCurrent: func() *bool { b := true; return &b }(), + expectManagerIdentity: func() *string { s := "some-other-client"; return &s }(), + }, } for _, tc := range testCases { @@ -411,6 +507,10 @@ func TestGeneratePlan(t *testing.T) { assert.Equal(t, tc.expectUpdate, len(plan.UpdateDeployments), "unexpected number of updates") assert.Equal(t, tc.expectWorkflow, len(plan.TestWorkflows), "unexpected number of test workflows") assert.Equal(t, tc.expectConfig, plan.VersionConfig != nil, "unexpected version config presence") + if tc.expectManagerIdentity != nil { + require.NotNil(t, plan.VersionConfig, "expected VersionConfig to be non-nil when asserting ManagerIdentity") + assert.Equal(t, *tc.expectManagerIdentity, plan.VersionConfig.ManagerIdentity, "unexpected ManagerIdentity") + } if tc.expectConfig { assert.NotNil(t, plan.VersionConfig, "expected version config") diff --git a/internal/temporal/worker_deployment.go b/internal/temporal/worker_deployment.go index 8da9b6db..5834a026 100644 --- a/internal/temporal/worker_deployment.go +++ b/internal/temporal/worker_deployment.go @@ -12,22 +12,15 @@ import ( "time" temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1" - deploymentpb "go.temporal.io/api/deployment/v1" enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" taskqueuepb "go.temporal.io/api/taskqueue/v1" "go.temporal.io/api/workflowservice/v1" temporalClient "go.temporal.io/sdk/client" - "go.temporal.io/sdk/converter" - sdkworker "go.temporal.io/sdk/worker" appsv1 "k8s.io/api/apps/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -const ( - IgnoreLastModifierKey = "temporal.io/ignore-last-modifier" -) - // VersionInfo contains information about a specific version type VersionInfo struct { DeploymentName string @@ -59,7 +52,7 @@ type TemporalWorkerState struct { // Versions indexed by build ID Versions map[string]*VersionInfo LastModifierIdentity string - IgnoreLastModifier bool + ManagerIdentity string } // GetWorkerDeploymentState queries Temporal to get the state of a worker deployment @@ -113,17 +106,9 @@ func GetWorkerDeploymentState( } state.RampPercentage = routingConfig.RampingVersionPercentage state.LastModifierIdentity = workerDeploymentInfo.LastModifierIdentity + state.ManagerIdentity = workerDeploymentInfo.ManagerIdentity state.VersionConflictToken = resp.ConflictToken - // Decide whether to ignore LastModifierIdentity - if state.LastModifierIdentity != controllerIdentity && state.LastModifierIdentity != "" { - sdkRoutingConfig := toSDKRoutingConfig(routingConfig) - state.IgnoreLastModifier, err = DeploymentShouldIgnoreLastModifier(ctx, deploymentHandler, sdkRoutingConfig) - if err != nil { - return nil, err - } - } - // TODO(jlegrone): Re-enable stats once available in versioning v3. // Set ramping since time if applicable @@ -198,33 +183,6 @@ func GetWorkerDeploymentState( return state, nil } -// toSDKRoutingConfig converts a gRPC RoutingConfig to the SDK wrapper type -// needed by DeploymentShouldIgnoreLastModifier. -func toSDKRoutingConfig(rc *deploymentpb.RoutingConfig) temporalClient.WorkerDeploymentRoutingConfig { - sdkRC := temporalClient.WorkerDeploymentRoutingConfig{ - RampingVersionPercentage: rc.RampingVersionPercentage, - } - if rc.CurrentDeploymentVersion != nil { - sdkRC.CurrentVersion = &sdkworker.WorkerDeploymentVersion{ - BuildID: rc.CurrentDeploymentVersion.BuildId, - DeploymentName: rc.CurrentDeploymentVersion.DeploymentName, - } - } - if rc.RampingDeploymentVersion != nil { - sdkRC.RampingVersion = &sdkworker.WorkerDeploymentVersion{ - BuildID: rc.RampingDeploymentVersion.BuildId, - DeploymentName: rc.RampingDeploymentVersion.DeploymentName, - } - } - if rc.RampingVersionChangedTime != nil { - sdkRC.RampingVersionChangedTime = rc.RampingVersionChangedTime.AsTime() - } - if rc.RampingVersionPercentageChangedTime != nil { - sdkRC.RampingVersionPercentageChangedTime = rc.RampingVersionPercentageChangedTime.AsTime() - } - return sdkRC -} - func withBackoff(timeout time.Duration, tick time.Duration, fn func() error) error { deadline := time.Now().Add(timeout) var lastErr error @@ -357,48 +315,6 @@ func HasUnversionedPoller(ctx context.Context, return false, nil } -func DeploymentShouldIgnoreLastModifier( - ctx context.Context, - deploymentHandler temporalClient.WorkerDeploymentHandle, - routingConfig temporalClient.WorkerDeploymentRoutingConfig, -) (shouldIgnore bool, err error) { - if routingConfig.CurrentVersion != nil { - shouldIgnore, err = getShouldIgnoreLastModifier(ctx, deploymentHandler, routingConfig.CurrentVersion.BuildID) - if err != nil { - return false, err - } - } - if !shouldIgnore && // if someone has a non-nil Current Version, but only set the metadata in their Ramping Version, also count that - routingConfig.RampingVersion != nil { - return getShouldIgnoreLastModifier(ctx, deploymentHandler, routingConfig.CurrentVersion.BuildID) - } - return shouldIgnore, nil -} - -func getShouldIgnoreLastModifier( - ctx context.Context, - deploymentHandler temporalClient.WorkerDeploymentHandle, - buildId string, -) (bool, error) { - desc, err := deploymentHandler.DescribeVersion(ctx, temporalClient.WorkerDeploymentDescribeVersionOptions{ - BuildID: buildId, - }) - if err != nil { - return false, fmt.Errorf("unable to describe version: %w", err) - } - for k, v := range desc.Info.Metadata { - if k == IgnoreLastModifierKey { - var s string - err = converter.GetDefaultDataConverter().FromPayload(v, &s) - if err != nil { - return false, fmt.Errorf("unable to decode metadata payload for key %s: %w", IgnoreLastModifierKey, err) - } - return s == "true", nil - } - } - return false, nil -} - func getPollers(ctx context.Context, client temporalClient.Client, taskQueueInfo temporalClient.WorkerDeploymentTaskQueueInfo, diff --git a/internal/temporal/worker_deployment_test.go b/internal/temporal/worker_deployment_test.go index 6bbb1edb..e627cea7 100644 --- a/internal/temporal/worker_deployment_test.go +++ b/internal/temporal/worker_deployment_test.go @@ -5,17 +5,12 @@ package temporal import ( - "context" "testing" "time" "github.com/stretchr/testify/assert" temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1" - "github.com/temporalio/temporal-worker-controller/internal/testhelpers" enumspb "go.temporal.io/api/enums/v1" - sdkclient "go.temporal.io/sdk/client" - "go.temporal.io/sdk/worker" - "go.temporal.io/server/temporaltest" ) func TestMapWorkflowStatus(t *testing.T) { @@ -106,51 +101,6 @@ func TestGetTestWorkflowID(t *testing.T) { } } -func TestGetIgnoreLastModifier(t *testing.T) { - ctx := context.Background() - deploymentName := "test-dep" - buildId := "v1" - tq := "test-tq" - - ts := temporaltest.NewServer(temporaltest.WithT(t)) - - // create version - w, stopFunc, err := testhelpers.NewWorker(ctx, deploymentName, buildId, tq, ts.GetFrontendHostPort(), ts.GetDefaultNamespace(), true) - if err != nil { - t.Error(err) - } - if err = w.Start(); err != nil { - t.Errorf("error starting unversioned worker %v", err) - } - defer stopFunc() - - deploymentHandler := ts.GetDefaultClient().WorkerDeploymentClient().GetHandle(deploymentName) - - eventually(t, 5*time.Second, 100*time.Millisecond, func() error { - _, err := deploymentHandler.UpdateVersionMetadata(ctx, sdkclient.WorkerDeploymentUpdateVersionMetadataOptions{ - Version: worker.WorkerDeploymentVersion{ - DeploymentName: deploymentName, - BuildID: buildId, - }, - MetadataUpdate: sdkclient.WorkerDeploymentMetadataUpdate{ - UpsertEntries: map[string]interface{}{ - IgnoreLastModifierKey: "true", - }, - }, - }) - return err - }) - - shouldIgnore, err := getShouldIgnoreLastModifier(ctx, deploymentHandler, buildId) - if err != nil { - t.Error(err) - } - if !shouldIgnore { - t.Error("expected true, got false") - } - -} - func eventually(t *testing.T, timeout, interval time.Duration, check func() error) { deadline := time.Now().Add(timeout) var lastErr error diff --git a/internal/tests/internal/deployment_controller.go b/internal/tests/internal/deployment_controller.go index fa596b3b..05e3c3d8 100644 --- a/internal/tests/internal/deployment_controller.go +++ b/internal/tests/internal/deployment_controller.go @@ -106,12 +106,19 @@ func applyDeployment(t *testing.T, ctx context.Context, k8sClient client.Client, // Set deployment status to `DeploymentAvailable` to simulate a healthy deployment // This is necessary because envtest doesn't actually start pods func setHealthyDeploymentStatus(t *testing.T, ctx context.Context, k8sClient client.Client, deployment appsv1.Deployment) { + // Refetch to get the latest resourceVersion before updating status, since the + // controller may have modified the Deployment (e.g. rolling update) while workers + // were starting up. + var fresh appsv1.Deployment + if err := k8sClient.Get(ctx, types.NamespacedName{Name: deployment.Name, Namespace: deployment.Namespace}, &fresh); err != nil { + t.Fatalf("failed to refetch deployment before status update: %v", err) + } now := metav1.Now() - deployment.Status = appsv1.DeploymentStatus{ - Replicas: *deployment.Spec.Replicas, - UpdatedReplicas: *deployment.Spec.Replicas, - ReadyReplicas: *deployment.Spec.Replicas, - AvailableReplicas: *deployment.Spec.Replicas, + fresh.Status = appsv1.DeploymentStatus{ + Replicas: *fresh.Spec.Replicas, + UpdatedReplicas: *fresh.Spec.Replicas, + ReadyReplicas: *fresh.Spec.Replicas, + AvailableReplicas: *fresh.Spec.Replicas, UnavailableReplicas: 0, Conditions: []appsv1.DeploymentCondition{ { @@ -132,8 +139,8 @@ func setHealthyDeploymentStatus(t *testing.T, ctx context.Context, k8sClient cli }, }, } - t.Logf("started %d healthy workers, updating deployment status", *deployment.Spec.Replicas) - if err := k8sClient.Status().Update(ctx, &deployment); err != nil { + t.Logf("started %d healthy workers, updating deployment status", *fresh.Spec.Replicas) + if err := k8sClient.Status().Update(ctx, &fresh); err != nil { t.Fatalf("failed to update deployment status: %v", err) } } diff --git a/internal/tests/internal/env_helpers.go b/internal/tests/internal/env_helpers.go index eb41a210..02185aee 100644 --- a/internal/tests/internal/env_helpers.go +++ b/internal/tests/internal/env_helpers.go @@ -17,13 +17,11 @@ import ( "github.com/temporalio/temporal-worker-controller/internal/controller" "github.com/temporalio/temporal-worker-controller/internal/controller/clientpool" "github.com/temporalio/temporal-worker-controller/internal/k8s" - "github.com/temporalio/temporal-worker-controller/internal/temporal" "github.com/temporalio/temporal-worker-controller/internal/testhelpers" "go.temporal.io/api/taskqueue/v1" "go.temporal.io/api/workflowservice/v1" temporalClient "go.temporal.io/sdk/client" "go.temporal.io/sdk/log" - "go.temporal.io/sdk/worker" "go.temporal.io/sdk/workflow" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -287,63 +285,68 @@ func getPollers(ctx context.Context, return resp.GetPollers(), nil } -func setUnversionedCurrent(t *testing.T, ctx context.Context, tc testhelpers.TestCase, env testhelpers.TestEnv) { +// setManagerIdentityToOther sets the Worker Deployment's ManagerIdentity to "some-other-cli-user" +// using a client with that identity. After this call, the controller (which has a different identity) +// will be blocked from making routing changes to this deployment. +func setManagerIdentityToOther(t *testing.T, ctx context.Context, tc testhelpers.TestCase, env testhelpers.TestEnv) { workerDeploymentName := k8s.ComputeWorkerDeploymentName(tc.GetTWD()) - deploymentHandle := env.Ts.GetDefaultClient().WorkerDeploymentClient().GetHandle(workerDeploymentName) - - _, err := deploymentHandle.SetCurrentVersion(ctx, temporalClient.WorkerDeploymentSetCurrentVersionOptions{ - BuildID: "", - IgnoreMissingTaskQueues: true, + c, err := temporalClient.Dial(temporalClient.Options{ + HostPort: env.Ts.GetFrontendHostPort(), + Namespace: env.Ts.GetDefaultNamespace(), + Identity: "some-other-cli-user", }) if err != nil { - t.Errorf("error setting unversioned current version to spook controller into manual mode: %v", err) + t.Fatalf("failed to create temporal client with other identity: %v", err) } - t.Logf("set current version to unversioned with non-controller identity") + defer c.Close() + deploymentHandle := c.WorkerDeploymentClient().GetHandle(workerDeploymentName) + _, err = deploymentHandle.SetManagerIdentity(ctx, temporalClient.WorkerDeploymentSetManagerIdentityOptions{ + Self: true, + }) + if err != nil { + t.Errorf("error setting manager identity to other: %v", err) + } + t.Logf("set manager identity to 'some-other-cli-user'") } -func setCurrentAndSetIgnoreModifierMetadata(t *testing.T, ctx context.Context, tc testhelpers.TestCase, env testhelpers.TestEnv) { +// setManagerIdentityBlockThenUnblock sets the Worker Deployment's ManagerIdentity to "some-other-cli-user" +// and then immediately clears it. This simulates a transient block: by the time the controller reconciles, +// the ManagerIdentity is empty and the controller can claim it normally. +func setManagerIdentityBlockThenUnblock(t *testing.T, ctx context.Context, tc testhelpers.TestCase, env testhelpers.TestEnv) { workerDeploymentName := k8s.ComputeWorkerDeploymentName(tc.GetTWD()) - deploymentHandle := env.Ts.GetDefaultClient().WorkerDeploymentClient().GetHandle(workerDeploymentName) - - // change current version arbitrarily so we can be the last modifier - resp, err := deploymentHandle.SetCurrentVersion(ctx, temporalClient.WorkerDeploymentSetCurrentVersionOptions{ - BuildID: "", - IgnoreMissingTaskQueues: true, + c, err := temporalClient.Dial(temporalClient.Options{ + HostPort: env.Ts.GetFrontendHostPort(), + Namespace: env.Ts.GetDefaultNamespace(), + Identity: "some-other-cli-user", }) if err != nil { - t.Errorf("error setting unversioned current version to spook controller into manual mode: %v", err) + t.Fatalf("failed to create temporal client with other identity: %v", err) } - t.Logf("set current version to unversioned with non-controller identity") + defer c.Close() - // set it back to what it was so that it's non-nil - _, err = deploymentHandle.SetCurrentVersion(ctx, temporalClient.WorkerDeploymentSetCurrentVersionOptions{ - BuildID: resp.PreviousVersion.BuildID, + deploymentHandle := c.WorkerDeploymentClient().GetHandle(workerDeploymentName) + resp, err := deploymentHandle.SetManagerIdentity(ctx, temporalClient.WorkerDeploymentSetManagerIdentityOptions{ + Self: true, }) if err != nil { - t.Errorf("error restoring current version: %v", err) + t.Errorf("error setting manager identity to other: %v", err) + return } - t.Logf("set current version to build %v with non-controller identity", resp.PreviousVersion.BuildID) + t.Logf("set manager identity to 'some-other-cli-user'") - // set the IgnoreLastModifier metadata - _, err = deploymentHandle.UpdateVersionMetadata(ctx, temporalClient.WorkerDeploymentUpdateVersionMetadataOptions{ - Version: worker.WorkerDeploymentVersion{ - DeploymentName: workerDeploymentName, - BuildID: resp.PreviousVersion.BuildID, - }, - MetadataUpdate: temporalClient.WorkerDeploymentMetadataUpdate{ - UpsertEntries: map[string]interface{}{ - temporal.IgnoreLastModifierKey: "true", - }, - }, + _, err = deploymentHandle.SetManagerIdentity(ctx, temporalClient.WorkerDeploymentSetManagerIdentityOptions{ + ManagerIdentity: "", // clear + ConflictToken: resp.ConflictToken, }) if err != nil { - t.Errorf("error updating version metadata: %v", err) + t.Errorf("error clearing manager identity: %v", err) } - t.Log("set current version's metadata to have \"temporal.io/ignore-last-modifier\"=\"true\"") + t.Logf("cleared manager identity") } -func validateIgnoreLastModifierMetadata(expectShouldIgnore bool) func(t *testing.T, ctx context.Context, tc testhelpers.TestCase, env testhelpers.TestEnv) { +// validateManagerIdentity checks that the Worker Deployment's ManagerIdentity matches expected. +func validateManagerIdentity(expected string) func(t *testing.T, ctx context.Context, tc testhelpers.TestCase, env testhelpers.TestEnv) { return func(t *testing.T, ctx context.Context, tc testhelpers.TestCase, env testhelpers.TestEnv) { workerDeploymentName := k8s.ComputeWorkerDeploymentName(tc.GetTWD()) deploymentHandle := env.Ts.GetDefaultClient().WorkerDeploymentClient().GetHandle(workerDeploymentName) @@ -351,14 +354,10 @@ func validateIgnoreLastModifierMetadata(expectShouldIgnore bool) func(t *testing desc, err := deploymentHandle.Describe(ctx, temporalClient.WorkerDeploymentDescribeOptions{}) if err != nil { t.Errorf("error describing worker deployment: %v", err) + return } - - shouldIgnore, err := temporal.DeploymentShouldIgnoreLastModifier(ctx, deploymentHandle, desc.Info.RoutingConfig) - if err != nil { - t.Errorf("error checking ignore last modifier for worker deployment: %v", err) - } - if shouldIgnore != expectShouldIgnore { - t.Errorf("expected ignore last modifier to be %v, got %v", expectShouldIgnore, shouldIgnore) + if desc.Info.ManagerIdentity != expected { + t.Errorf("expected manager identity to be %q, got %q", expected, desc.Info.ManagerIdentity) } } } diff --git a/internal/tests/internal/integration_test.go b/internal/tests/internal/integration_test.go index 4761cf2c..4ed5ea54 100644 --- a/internal/tests/internal/integration_test.go +++ b/internal/tests/internal/integration_test.go @@ -9,7 +9,6 @@ import ( "github.com/temporalio/temporal-worker-controller/internal/k8s" "github.com/temporalio/temporal-worker-controller/internal/testhelpers" "go.temporal.io/server/common/dynamicconfig" - "go.temporal.io/server/common/worker_versioning" "go.temporal.io/server/temporal" "go.temporal.io/server/temporaltest" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -226,7 +225,7 @@ func TestIntegration(t *testing.T) { ), }, { - name: "manual-rollout-blocked-by-modifier", + name: "manual-rollout-blocked-by-manager-identity", builder: testhelpers.NewTestCase(). WithInput( testhelpers.NewTemporalWorkerDeploymentBuilder(). @@ -242,20 +241,19 @@ func TestIntegration(t *testing.T) { testhelpers.NewDeploymentInfo("v0", 1), ). WithWaitTime(5 * time.Second). - WithSetupFunction(setUnversionedCurrent). + WithSetupFunction(setManagerIdentityToOther). WithExpectedStatus( testhelpers.NewStatusBuilder(). WithTargetVersion("v1", temporaliov1alpha1.VersionStatusInactive, -1, true, false). - WithCurrentVersion(worker_versioning.UnversionedVersionId, false, false). - WithDeprecatedVersions(testhelpers.NewDeprecatedVersionInfo("v0", temporaliov1alpha1.VersionStatusDrained, true, false, true)), + WithCurrentVersion("v0", true, false), ). WithExpectedDeployments( testhelpers.NewDeploymentInfo("v0", 1), ). - WithValidatorFunction(validateIgnoreLastModifierMetadata(false)), + WithValidatorFunction(validateManagerIdentity("some-other-cli-user")), }, { - name: "manual-rollout-unblocked-by-modifier-with-ignore", + name: "manual-rollout-unblocked-after-manager-identity-cleared", builder: testhelpers.NewTestCase(). WithInput( testhelpers.NewTemporalWorkerDeploymentBuilder(). @@ -270,16 +268,15 @@ func TestIntegration(t *testing.T) { WithExistingDeployments( testhelpers.NewDeploymentInfo("v0", 1), ). - WithSetupFunction(setCurrentAndSetIgnoreModifierMetadata). + WithSetupFunction(setManagerIdentityBlockThenUnblock). WithExpectedStatus( testhelpers.NewStatusBuilder(). - WithTargetVersion("v1", temporaliov1alpha1.VersionStatusInactive, -1, true, false). // manual strategy, so controller should not promote v1 to current despite being unblocked + WithTargetVersion("v1", temporaliov1alpha1.VersionStatusInactive, -1, true, false). // manual strategy, so controller should not promote v1 to current WithCurrentVersion("v0", true, false), ). WithExpectedDeployments( testhelpers.NewDeploymentInfo("v0", 1), - ). - WithValidatorFunction(validateIgnoreLastModifierMetadata(true)), // Since this is a manual strategy, the current version at the end of the test is v0 which has the ignore last modifier set to true + ), }, } @@ -426,7 +423,7 @@ func TestIntegration(t *testing.T) { ), }, { - name: "all-at-once-rollout-blocked-by-modifier", + name: "all-at-once-rollout-blocked-by-manager-identity", builder: testhelpers.NewTestCase(). WithInput( testhelpers.NewTemporalWorkerDeploymentBuilder(). @@ -442,20 +439,19 @@ func TestIntegration(t *testing.T) { testhelpers.NewDeploymentInfo("v0", 1), ). WithWaitTime(5 * time.Second). - WithSetupFunction(setUnversionedCurrent). + WithSetupFunction(setManagerIdentityToOther). WithExpectedStatus( testhelpers.NewStatusBuilder(). WithTargetVersion("v1", temporaliov1alpha1.VersionStatusInactive, -1, true, false). - WithCurrentVersion(worker_versioning.UnversionedVersionId, false, false). - WithDeprecatedVersions(testhelpers.NewDeprecatedVersionInfo("v0", temporaliov1alpha1.VersionStatusDrained, true, false, true)), + WithCurrentVersion("v0", true, false), ). WithExpectedDeployments( testhelpers.NewDeploymentInfo("v0", 1), ). - WithValidatorFunction(validateIgnoreLastModifierMetadata(false)), + WithValidatorFunction(validateManagerIdentity("some-other-cli-user")), }, { - name: "all-at-once-unblocked-by-modifier-with-ignore", + name: "all-at-once-unblocked-after-manager-identity-cleared", builder: testhelpers.NewTestCase(). WithInput( testhelpers.NewTemporalWorkerDeploymentBuilder(). @@ -470,7 +466,7 @@ func TestIntegration(t *testing.T) { WithExistingDeployments( testhelpers.NewDeploymentInfo("v0", 1), ). - WithSetupFunction(setCurrentAndSetIgnoreModifierMetadata). + WithSetupFunction(setManagerIdentityBlockThenUnblock). WithExpectedStatus( testhelpers.NewStatusBuilder(). WithTargetVersion("v1", temporaliov1alpha1.VersionStatusCurrent, -1, true, false). @@ -479,8 +475,7 @@ func TestIntegration(t *testing.T) { ). WithExpectedDeployments( testhelpers.NewDeploymentInfo("v0", 1), - ). - WithValidatorFunction(validateIgnoreLastModifierMetadata(false)), + ), }, } @@ -653,7 +648,7 @@ func TestIntegration(t *testing.T) { ), }, { - name: "progressive-rollout-blocked-by-modifier", + name: "progressive-rollout-blocked-by-manager-identity", builder: testhelpers.NewTestCase(). WithInput( testhelpers.NewTemporalWorkerDeploymentBuilder(). @@ -669,20 +664,19 @@ func TestIntegration(t *testing.T) { testhelpers.NewDeploymentInfo("v0", 1), ). WithWaitTime(5 * time.Second). - WithSetupFunction(setUnversionedCurrent). + WithSetupFunction(setManagerIdentityToOther). WithExpectedStatus( testhelpers.NewStatusBuilder(). WithTargetVersion("v1", temporaliov1alpha1.VersionStatusInactive, -1, true, false). - WithCurrentVersion(worker_versioning.UnversionedVersionId, false, false). - WithDeprecatedVersions(testhelpers.NewDeprecatedVersionInfo("v0", temporaliov1alpha1.VersionStatusDrained, true, false, true)), + WithCurrentVersion("v0", true, false), ). WithExpectedDeployments( testhelpers.NewDeploymentInfo("v0", 1), ). - WithValidatorFunction(validateIgnoreLastModifierMetadata(false)), + WithValidatorFunction(validateManagerIdentity("some-other-cli-user")), }, { - name: "progressive-rollout-unblocked-by-modifier-with-ignore", + name: "progressive-rollout-unblocked-after-manager-identity-cleared", builder: testhelpers.NewTestCase(). WithInput( testhelpers.NewTemporalWorkerDeploymentBuilder(). @@ -697,7 +691,7 @@ func TestIntegration(t *testing.T) { WithExistingDeployments( testhelpers.NewDeploymentInfo("v0", 1), ). - WithSetupFunction(setCurrentAndSetIgnoreModifierMetadata). + WithSetupFunction(setManagerIdentityBlockThenUnblock). WithExpectedStatus( testhelpers.NewStatusBuilder(). WithTargetVersion("v1", temporaliov1alpha1.VersionStatusRamping, 5, true, false). @@ -705,8 +699,7 @@ func TestIntegration(t *testing.T) { ). WithExpectedDeployments( testhelpers.NewDeploymentInfo("v0", 1), - ). - WithValidatorFunction(validateIgnoreLastModifierMetadata(false)), + ), }, }