diff --git a/api/v1alpha1/temporalworker_webhook.go b/api/v1alpha1/temporalworker_webhook.go index 2be63781..92e9ea1b 100644 --- a/api/v1alpha1/temporalworker_webhook.go +++ b/api/v1alpha1/temporalworker_webhook.go @@ -57,11 +57,6 @@ func (s *TemporalWorkerDeploymentSpec) Default(ctx context.Context) error { s.SunsetStrategy.DeleteDelay = &v1.Duration{Duration: defaults.DeleteDelay} } - if s.MaxVersions == nil { - maxVersions := int32(defaults.MaxVersions) - s.MaxVersions = &maxVersions - } - return nil } diff --git a/api/v1alpha1/temporalworker_webhook_test.go b/api/v1alpha1/temporalworker_webhook_test.go index 0d932039..bcd3b975 100644 --- a/api/v1alpha1/temporalworker_webhook_test.go +++ b/api/v1alpha1/temporalworker_webhook_test.go @@ -159,24 +159,6 @@ func TestTemporalWorkerDeployment_Default(t *testing.T) { obj runtime.Object expected func(t *testing.T, obj *temporaliov1alpha1.TemporalWorkerDeployment) }{ - "sets default maxVersions": { - obj: testhelpers.MakeTWDWithName("default-max-versions", ""), - expected: func(t *testing.T, obj *temporaliov1alpha1.TemporalWorkerDeployment) { - require.NotNil(t, obj.Spec.MaxVersions) - assert.Equal(t, int32(75), *obj.Spec.MaxVersions) - }, - }, - "preserves existing maxVersions": { - obj: testhelpers.ModifyObj(testhelpers.MakeTWDWithName("preserve-max-versions", ""), func(obj *temporaliov1alpha1.TemporalWorkerDeployment) *temporaliov1alpha1.TemporalWorkerDeployment { - maxVersions := int32(100) - obj.Spec.MaxVersions = &maxVersions - return obj - }), - expected: func(t *testing.T, obj *temporaliov1alpha1.TemporalWorkerDeployment) { - require.NotNil(t, obj.Spec.MaxVersions) - assert.Equal(t, int32(100), *obj.Spec.MaxVersions) - }, - }, "sets default sunset strategy delays": { obj: testhelpers.MakeTWDWithName("default-sunset-delays", ""), expected: func(t *testing.T, obj *temporaliov1alpha1.TemporalWorkerDeployment) { diff --git a/api/v1alpha1/worker_types.go b/api/v1alpha1/worker_types.go index 1f0b6c34..0b155f6b 100644 
--- a/api/v1alpha1/worker_types.go +++ b/api/v1alpha1/worker_types.go @@ -60,18 +60,6 @@ type TemporalWorkerDeploymentSpec struct { // TODO(jlegrone): add godoc WorkerOptions WorkerOptions `json:"workerOptions"` - - // MaxVersions defines the maximum number of worker deployment versions allowed. - // This helps prevent hitting Temporal's default limit of 100 versions per deployment. - // Defaults to 75. Users can override this by explicitly setting a higher value in - // the CRD, but should exercise caution: once the server's version limit is reached, - // Temporal attempts to delete an eligible version. If no version is eligible for deletion, - // new deployments get blocked which prevents the controller from making progress. - // This limit can be adjusted server-side by setting `matching.maxVersionsInDeployment` - // in dynamicconfig. - // +optional - // +kubebuilder:validation:Minimum=1 - MaxVersions *int32 `json:"maxVersions,omitempty"` } // VersionStatus indicates the status of a version. @@ -240,6 +228,12 @@ type DeprecatedWorkerDeploymentVersion struct { // Only set when Status is VersionStatusDrained. // +optional DrainedSince *metav1.Time `json:"drainedSince"` + + // A Version is eligible for deletion if it is drained and has no pollers on any task queue. + // After pollers stop polling, the server will still consider them present until `matching.PollerHistoryTTL` + // has passed. + // +optional + EligibleForDeletion bool `json:"eligibleForDeletion,omitempty"` } // DefaultVersionUpdateStrategy describes how to cut over new workflow executions @@ -248,10 +242,20 @@ type DeprecatedWorkerDeploymentVersion struct { type DefaultVersionUpdateStrategy string const ( + // UpdateManual scales worker resources up or down, but does not update the current or ramping worker deployment version. UpdateManual DefaultVersionUpdateStrategy = "Manual" + // UpdateAllAtOnce starts 100% of new workflow executions on the new worker deployment version as soon as it's healthy. 
UpdateAllAtOnce DefaultVersionUpdateStrategy = "AllAtOnce" + // UpdateProgressive ramps up the percentage of new workflow executions targeting the new worker deployment version over time. + // + // Note: If the Current Version of a Worker Deployment is nil and the controller cannot confirm that all Task Queues + // in the Target Version have at least one unversioned poller, the controller will immediately set the new worker + // deployment version to be Current and ignore the Progressive rollout steps. + // Sending a percentage of traffic to a "nil" version means that traffic will be sent to unversioned workers. If + // there are no unversioned workers, those tasks will get stuck. This behavior ensures that all traffic on the task + // queues in this worker deployment can be handled by an active poller. UpdateProgressive DefaultVersionUpdateStrategy = "Progressive" ) @@ -263,15 +267,9 @@ type GateWorkflowConfig struct { type RolloutStrategy struct { // Specifies how to treat concurrent executions of a Job. // Valid values are: - // - "Manual": scale worker resources up or down, but do not update the current or ramping worker deployment version; - // - "AllAtOnce": start 100% of new workflow executions on the new worker deployment version as soon as it's healthy; - // - "Progressive": ramp up the percentage of new workflow executions targeting the new worker deployment version over time; - // - // Note: If the Current Version of a Worker Deployment is nil, the controller will ignore any Progressive Rollout - // Steps and immediately set the new worker deployment version to be Current. - // Sending a percentage of traffic to a "nil" version means that traffic will be sent to unversioned workers. If - // there are no unversioned workers, those tasks will get stuck. This behavior ensures that all traffic on the task - // queues in this worker deployment can be handled by an active poller. 
+ // - "Manual" + // - "AllAtOnce" + // - "Progressive" Strategy DefaultVersionUpdateStrategy `json:"strategy"` // Gate specifies a workflow type that must run once to completion on the new worker deployment version before diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 911e9524..8f4af3d9 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -421,11 +421,6 @@ func (in *TemporalWorkerDeploymentSpec) DeepCopyInto(out *TemporalWorkerDeployme in.RolloutStrategy.DeepCopyInto(&out.RolloutStrategy) in.SunsetStrategy.DeepCopyInto(&out.SunsetStrategy) out.WorkerOptions = in.WorkerOptions - if in.MaxVersions != nil { - in, out := &in.MaxVersions, &out.MaxVersions - *out = new(int32) - **out = **in - } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TemporalWorkerDeploymentSpec. diff --git a/cmd/main.go b/cmd/main.go index 092a035b..646b9f51 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -91,6 +91,7 @@ func main() { }))), mgr.GetClient(), ), + MaxDeploymentVersionsIneligibleForDeletion: controller.GetControllerMaxDeploymentVersionsIneligibleForDeletion(), }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "TemporalWorkerDeployment") os.Exit(1) diff --git a/helm/temporal-worker-controller/templates/crds/temporal.io_temporalworkerdeployments.yaml b/helm/temporal-worker-controller/templates/crds/temporal.io_temporalworkerdeployments.yaml index cbf6543f..b49ad55f 100644 --- a/helm/temporal-worker-controller/templates/crds/temporal.io_temporalworkerdeployments.yaml +++ b/helm/temporal-worker-controller/templates/crds/temporal.io_temporalworkerdeployments.yaml @@ -46,10 +46,6 @@ spec: type: object spec: properties: - maxVersions: - format: int32 - minimum: 1 - type: integer minReadySeconds: format: int32 type: integer @@ -3876,6 +3872,8 @@ spec: drainedSince: format: date-time type: string + 
eligibleForDeletion: + type: boolean healthySince: format: date-time type: string diff --git a/internal/controller/execplan.go b/internal/controller/execplan.go index eaf15a5c..f28f4637 100644 --- a/internal/controller/execplan.go +++ b/internal/controller/execplan.go @@ -119,8 +119,8 @@ func (r *TemporalWorkerDeploymentReconciler) executePlan(ctx context.Context, l }, MetadataUpdate: sdkclient.WorkerDeploymentMetadataUpdate{ UpsertEntries: map[string]interface{}{ - controllerIdentityKey: getControllerIdentity(), - controllerVersionKey: getControllerVersion(), + controllerIdentityMetadataKey: getControllerIdentity(), + controllerVersionMetadataKey: getControllerVersion(), }, }, }); err != nil { // would be cool to do this atomically with the update diff --git a/internal/controller/genplan.go b/internal/controller/genplan.go index ec128cca..443bd1ce 100644 --- a/internal/controller/genplan.go +++ b/internal/controller/genplan.go @@ -100,6 +100,7 @@ func (r *TemporalWorkerDeploymentReconciler) generatePlan( connection, plannerConfig, workerDeploymentName, + r.MaxDeploymentVersionsIneligibleForDeletion, ) if err != nil { return nil, fmt.Errorf("error generating plan: %w", err) diff --git a/internal/controller/genstatus.go b/internal/controller/genstatus.go index 680fa329..7414aab7 100644 --- a/internal/controller/genstatus.go +++ b/internal/controller/genstatus.go @@ -6,7 +6,6 @@ package controller import ( "context" - "fmt" "github.com/go-logr/logr" temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1" @@ -23,22 +22,11 @@ func (r *TemporalWorkerDeploymentReconciler) generateStatus( req ctrl.Request, workerDeploy *temporaliov1alpha1.TemporalWorkerDeployment, temporalState *temporal.TemporalWorkerState, + k8sState *k8s.DeploymentState, ) (*temporaliov1alpha1.TemporalWorkerDeploymentStatus, error) { workerDeploymentName := k8s.ComputeWorkerDeploymentName(workerDeploy) targetBuildID := k8s.ComputeBuildID(workerDeploy) - // Fetch Kubernetes 
deployment state - k8sState, err := k8s.GetDeploymentState( - ctx, - r.Client, - req.Namespace, - req.Name, - workerDeploymentName, - ) - if err != nil { - return nil, fmt.Errorf("unable to get Kubernetes deployment state: %w", err) - } - // Fetch test workflow status for the desired version if targetBuildID != temporalState.CurrentBuildID { testWorkflows, err := temporal.GetTestWorkflowStatus( diff --git a/internal/controller/state_mapper.go b/internal/controller/state_mapper.go index 1800aada..f2c72636 100644 --- a/internal/controller/state_mapper.go +++ b/internal/controller/state_mapper.go @@ -160,11 +160,17 @@ func (m *stateMapper) mapDeprecatedWorkerDeploymentVersionByBuildID(buildID stri return nil } + eligibleForDeletion := false + if vInfo, exists := m.temporalState.Versions[buildID]; exists { + eligibleForDeletion = vInfo.Status == v1alpha1.VersionStatusDrained && vInfo.NoTaskQueuesHaveVersionedPoller + } + version := &v1alpha1.DeprecatedWorkerDeploymentVersion{ BaseWorkerDeploymentVersion: v1alpha1.BaseWorkerDeploymentVersion{ BuildID: buildID, Status: v1alpha1.VersionStatusNotRegistered, }, + EligibleForDeletion: eligibleForDeletion, } // Set deployment reference if it exists diff --git a/internal/controller/util.go b/internal/controller/util.go index 1a2e2630..9caa3b92 100644 --- a/internal/controller/util.go +++ b/internal/controller/util.go @@ -6,12 +6,18 @@ package controller import ( "os" + "strconv" + + "github.com/temporalio/temporal-worker-controller/internal/defaults" ) const ( - controllerIdentityKey = "temporal.io/controller" - controllerVersionKey = "temporal.io/controller-version" - DefaultControllerIdentity = "temporal-worker-controller" + controllerIdentityMetadataKey = "temporal.io/controller" + controllerVersionMetadataKey = "temporal.io/controller-version" + + controllerVersionEnvKey = "CONTROLLER_VERSION" + controllerIdentityEnvKey = "CONTROLLER_IDENTITY" + ControllerMaxDeploymentVersionsIneligibleForDeletionEnvKey = 
"CONTROLLER_MAX_DEPLOYMENT_VERSIONS_INELIGIBLE_FOR_DELETION" ) // Version is set by goreleaser via ldflags at build time @@ -24,7 +30,7 @@ func getControllerVersion() string { return Version } // Fall back to environment variable (set by Helm from image.tag) - if version := os.Getenv("CONTROLLER_VERSION"); version != "" { + if version := os.Getenv(controllerVersionEnvKey); version != "" { return version } return "unknown" @@ -32,8 +38,18 @@ func getControllerVersion() string { // getControllerIdentity returns the identity from environment variable (set by Helm) func getControllerIdentity() string { - if identity := os.Getenv("CONTROLLER_IDENTITY"); identity != "" { + if identity := os.Getenv(controllerIdentityEnvKey); identity != "" { return identity } - return DefaultControllerIdentity + return defaults.ControllerIdentity +} + +func GetControllerMaxDeploymentVersionsIneligibleForDeletion() int32 { + if maxStr := os.Getenv(ControllerMaxDeploymentVersionsIneligibleForDeletionEnvKey); maxStr != "" { + i, err := strconv.Atoi(maxStr) + if err == nil { + return int32(i) + } + } + return defaults.MaxVersionsIneligibleForDeletion } diff --git a/internal/controller/worker_controller.go b/internal/controller/worker_controller.go index 51e38cc2..838910cb 100644 --- a/internal/controller/worker_controller.go +++ b/internal/controller/worker_controller.go @@ -44,6 +44,19 @@ type TemporalWorkerDeploymentReconciler struct { // Disables panic recovery if true DisableRecoverPanic bool + + // When a Worker Deployment has the maximum number of versions (100 per Worker Deployment by default), + // it will delete the oldest eligible version when a worker with the 101st version arrives. + // If no versions are eligible for deletion, that worker's poll will fail, which is dangerous. 
+ // To protect against this, when a Worker Deployment has too many versions ineligible for deletion, + // the controller will stop deploying new workers in order to give the user the opportunity to adjust + // their sunset policy to avoid this situation before it actually blocks deployment of a new worker + // version on the server side. + // + // MaxDeploymentVersionsIneligibleForDeletion is currently defaulted to 75, which is safe for the default + // server value of `matching.maxVersionsInDeployment=100`. + // Users who reduce `matching.maxVersionsInDeployment` in their dynamicconfig should also reduce this value. + MaxDeploymentVersionsIneligibleForDeletion int32 } //+kubebuilder:rbac:groups=temporal.io,resources=temporalworkerdeployments,verbs=get;list;watch;create;update;patch;delete @@ -130,13 +143,30 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req temporalClient = c } - // Fetch Temporal worker deployment state workerDeploymentName := k8s.ComputeWorkerDeploymentName(&workerDeploy) + targetBuildID := k8s.ComputeBuildID(&workerDeploy) + + // Fetch Kubernetes deployment state + k8sState, err := k8s.GetDeploymentState( + ctx, + r.Client, + req.Namespace, + req.Name, + workerDeploymentName, + ) + if err != nil { + return ctrl.Result{}, fmt.Errorf("unable to get Kubernetes deployment state: %w", err) + } + + // Fetch Temporal worker deployment state temporalState, err := temporal.GetWorkerDeploymentState( ctx, temporalClient, workerDeploymentName, workerDeploy.Spec.WorkerOptions.TemporalNamespace, + k8sState.Deployments, + targetBuildID, + workerDeploy.Spec.RolloutStrategy.Strategy, getControllerIdentity(), ) if err != nil { @@ -144,7 +174,7 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req } // Compute a new status from k8s and temporal state - status, err := r.generateStatus(ctx, l, temporalClient, req, &workerDeploy, temporalState) + status, err := r.generateStatus(ctx, l, temporalClient, 
req, &workerDeploy, temporalState, k8sState) if err != nil { return ctrl.Result{}, err } diff --git a/internal/defaults/defaults.go b/internal/defaults/defaults.go index 87b78e6a..0eec90dd 100644 --- a/internal/defaults/defaults.go +++ b/internal/defaults/defaults.go @@ -7,8 +7,9 @@ import "time" // Default values for TemporalWorkerDeploymentSpec fields const ( - ScaledownDelay = 1 * time.Hour - DeleteDelay = 24 * time.Hour - ServerMaxVersions = 100 - MaxVersions = int32(ServerMaxVersions * 0.75) + ScaledownDelay = 1 * time.Hour + DeleteDelay = 24 * time.Hour + ServerMaxVersions = 100 + MaxVersionsIneligibleForDeletion = int32(ServerMaxVersions * 0.75) + ControllerIdentity = "temporal-worker-controller" ) diff --git a/internal/planner/planner.go b/internal/planner/planner.go index 8224c37b..2808b3a0 100644 --- a/internal/planner/planner.go +++ b/internal/planner/planner.go @@ -69,6 +69,7 @@ func GeneratePlan( connection temporaliov1alpha1.TemporalConnectionSpec, config *Config, workerDeploymentName string, + maxVersionsIneligibleForDeletion int32, ) (*Plan, error) { plan := &Plan{ ScaleDeployments: make(map[*corev1.ObjectReference]uint32), @@ -77,7 +78,7 @@ func GeneratePlan( // Add delete/scale operations based on version status plan.DeleteDeployments = getDeleteDeployments(k8sState, status, spec) plan.ScaleDeployments = getScaleDeployments(k8sState, status, spec) - plan.ShouldCreateDeployment = shouldCreateDeployment(status, spec) + plan.ShouldCreateDeployment = shouldCreateDeployment(status, maxVersionsIneligibleForDeletion) plan.UpdateDeployments = getUpdateDeployments(k8sState, status, connection) // Determine if we need to start any test workflows @@ -297,20 +298,22 @@ func getScaleDeployments( // shouldCreateDeployment determines if a new deployment needs to be created func shouldCreateDeployment( status *temporaliov1alpha1.TemporalWorkerDeploymentStatus, - spec *temporaliov1alpha1.TemporalWorkerDeploymentSpec, + maxVersionsIneligibleForDeletion int32, ) 
bool { // Check if target version already has a deployment if status.TargetVersion.Deployment != nil { return false } - // Check if we're at the version limit - maxVersions := int32(75) // Default from defaults.MaxVersions - if spec.MaxVersions != nil { - maxVersions = *spec.MaxVersions + versionCountIneligibleForDeletion := int32(0) + + for _, v := range status.DeprecatedVersions { + if !v.EligibleForDeletion { + versionCountIneligibleForDeletion++ + } } - if status.VersionCount >= maxVersions { + if versionCountIneligibleForDeletion >= maxVersionsIneligibleForDeletion { return false } @@ -398,14 +401,17 @@ func getVersionConfigDiff( BuildID: status.TargetVersion.BuildID, } - // If there is no current version, set the target version as the current version - if status.CurrentVersion == nil { + // If there is no current version and presence of unversioned pollers is not confirmed for all + // target version task queues, set the target version as the current version right away. + if status.CurrentVersion == nil && + status.TargetVersion.Status == temporaliov1alpha1.VersionStatusInactive && + !temporalState.Versions[status.TargetVersion.BuildID].AllTaskQueuesHaveUnversionedPoller { vcfg.SetCurrent = true return vcfg } // If the current version is the target version - if status.CurrentVersion.BuildID == status.TargetVersion.BuildID { + if status.CurrentVersion != nil && status.CurrentVersion.BuildID == status.TargetVersion.BuildID { // Reset ramp if needed, this would happen if a ramp has been rolled back before completing if temporalState.RampingBuildID != "" { vcfg.BuildID = "" diff --git a/internal/planner/planner_test.go b/internal/planner/planner_test.go index 11d9509b..10b4cf95 100644 --- a/internal/planner/planner_test.go +++ b/internal/planner/planner_test.go @@ -14,6 +14,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1" + 
"github.com/temporalio/temporal-worker-controller/internal/defaults" "github.com/temporalio/temporal-worker-controller/internal/k8s" "github.com/temporalio/temporal-worker-controller/internal/temporal" "github.com/temporalio/temporal-worker-controller/internal/testhelpers/testlogr" @@ -24,20 +25,21 @@ import ( func TestGeneratePlan(t *testing.T) { testCases := []struct { - name string - k8sState *k8s.DeploymentState - status *temporaliov1alpha1.TemporalWorkerDeploymentStatus - spec *temporaliov1alpha1.TemporalWorkerDeploymentSpec - state *temporal.TemporalWorkerState - config *Config - expectDelete int - expectCreate bool - expectScale int - expectUpdate int - expectWorkflow int - expectConfig bool - expectConfigSetCurrent *bool // pointer so we can test nil - expectConfigRampPercent *float32 // pointer so we can test nil + name string + k8sState *k8s.DeploymentState + status *temporaliov1alpha1.TemporalWorkerDeploymentStatus + spec *temporaliov1alpha1.TemporalWorkerDeploymentSpec + state *temporal.TemporalWorkerState + config *Config + expectDelete int + expectCreate bool + expectScale int + expectUpdate int + expectWorkflow int + expectConfig bool + expectConfigSetCurrent *bool // pointer so we can test nil + expectConfigRampPercent *float32 // pointer so we can test nil + maxVersionsIneligibleForDeletion *int32 // set by env if non-nil, else default 75 }{ { name: "empty state creates new deployment", @@ -211,7 +213,41 @@ func TestGeneratePlan(t *testing.T) { expectConfigRampPercent: func() *float32 { f := float32(0); return &f }(), // Should reset ramp to 0 }, { - name: "should not create deployment when version limit is reached", + name: "should not create deployment when version limit (ineligible for deletion) is reached", + k8sState: &k8s.DeploymentState{ + Deployments: map[string]*appsv1.Deployment{}, + DeploymentsByTime: []*appsv1.Deployment{}, + DeploymentRefs: map[string]*corev1.ObjectReference{}, + }, + status: 
&temporaliov1alpha1.TemporalWorkerDeploymentStatus{ + TargetVersion: temporaliov1alpha1.TargetWorkerDeploymentVersion{ + BaseWorkerDeploymentVersion: temporaliov1alpha1.BaseWorkerDeploymentVersion{ + BuildID: "new", + Status: temporaliov1alpha1.VersionStatusNotRegistered, + Deployment: nil, + }, + }, + DeprecatedVersions: func() []*temporaliov1alpha1.DeprecatedWorkerDeploymentVersion { + // default is NOT eligible for deletion, so 5 empty Deprecated Versions should block rollout + r := make([]*temporaliov1alpha1.DeprecatedWorkerDeploymentVersion, 5) + for i := range r { + r[i] = &temporaliov1alpha1.DeprecatedWorkerDeploymentVersion{} + } + return r + }(), + }, + spec: &temporaliov1alpha1.TemporalWorkerDeploymentSpec{ + Replicas: func() *int32 { r := int32(1); return &r }(), + }, + state: &temporal.TemporalWorkerState{}, + config: &Config{ + RolloutStrategy: temporaliov1alpha1.RolloutStrategy{}, + }, + expectCreate: false, + maxVersionsIneligibleForDeletion: func() *int32 { i := int32(5); return &i }(), + }, + { + name: "should create deployment when version limit (ineligible for deletion) is not reached", k8sState: &k8s.DeploymentState{ Deployments: map[string]*appsv1.Deployment{}, DeploymentsByTime: []*appsv1.Deployment{}, @@ -226,16 +262,23 @@ func TestGeneratePlan(t *testing.T) { Deployment: nil, }, }, + DeprecatedVersions: func() []*temporaliov1alpha1.DeprecatedWorkerDeploymentVersion { + r := make([]*temporaliov1alpha1.DeprecatedWorkerDeploymentVersion, 4) + for i := range r { + r[i] = &temporaliov1alpha1.DeprecatedWorkerDeploymentVersion{} + } + return r + }(), }, spec: &temporaliov1alpha1.TemporalWorkerDeploymentSpec{ - MaxVersions: func() *int32 { i := int32(5); return &i }(), - Replicas: func() *int32 { r := int32(1); return &r }(), + Replicas: func() *int32 { r := int32(1); return &r }(), }, state: &temporal.TemporalWorkerState{}, config: &Config{ RolloutStrategy: temporaliov1alpha1.RolloutStrategy{}, }, - expectCreate: false, + expectCreate: true, + 
maxVersionsIneligibleForDeletion: func() *int32 { i := int32(5); return &i }(), }, { name: "update deployment when target version, with an existing deployment, has an expired connection spec hash", @@ -353,7 +396,12 @@ func TestGeneratePlan(t *testing.T) { if tc.status == nil { tc.status = &temporaliov1alpha1.TemporalWorkerDeploymentStatus{} } - plan, err := GeneratePlan(logr.Discard(), tc.k8sState, tc.status, tc.spec, tc.state, createDefaultConnectionSpec(), tc.config, "test/namespace") + maxV := defaults.MaxVersionsIneligibleForDeletion + if tc.maxVersionsIneligibleForDeletion != nil { + maxV = *tc.maxVersionsIneligibleForDeletion + } + + plan, err := GeneratePlan(logr.Discard(), tc.k8sState, tc.status, tc.spec, tc.state, createDefaultConnectionSpec(), tc.config, "test/namespace", maxV) require.NoError(t, err) assert.Equal(t, tc.expectDelete, len(plan.DeleteDeployments), "unexpected number of deletions") @@ -722,10 +770,11 @@ func TestGetScaleDeployments(t *testing.T) { func TestShouldCreateDeployment(t *testing.T) { testCases := []struct { - name string - status *temporaliov1alpha1.TemporalWorkerDeploymentStatus - spec *temporaliov1alpha1.TemporalWorkerDeploymentSpec - expectCreates bool + name string + status *temporaliov1alpha1.TemporalWorkerDeploymentStatus + spec *temporaliov1alpha1.TemporalWorkerDeploymentSpec + expectCreates bool + maxVersionsIneligibleForDeletion *int32 // limit passed to shouldCreateDeployment; nil means defaults.MaxVersionsIneligibleForDeletion (75) is used }{ { name: "existing deployment should not create", @@ -760,9 +809,8 @@ func TestShouldCreateDeployment(t *testing.T) { expectCreates: true, }, { - name: "should not create when version limit is reached (default limit)", + name: "should not create when version limit ineligible for deletion is reached (default limit)", status: &temporaliov1alpha1.TemporalWorkerDeploymentStatus{ - VersionCount: 75, // Default limit is 75 TargetVersion: temporaliov1alpha1.TargetWorkerDeploymentVersion{ BaseWorkerDeploymentVersion: 
temporaliov1alpha1.BaseWorkerDeploymentVersion{ BuildID: "new", @@ -770,17 +818,24 @@ func TestShouldCreateDeployment(t *testing.T) { Deployment: nil, }, }, + DeprecatedVersions: func() []*temporaliov1alpha1.DeprecatedWorkerDeploymentVersion { + // default is NOT eligible for deletion, so 75 empty Deprecated Versions should block rollout + r := make([]*temporaliov1alpha1.DeprecatedWorkerDeploymentVersion, 75) + for i := range r { + r[i] = &temporaliov1alpha1.DeprecatedWorkerDeploymentVersion{} + } + return r + }(), }, spec: &temporaliov1alpha1.TemporalWorkerDeploymentSpec{ - // MaxVersions is nil, so uses default of 75 Replicas: func() *int32 { r := int32(1); return &r }(), }, - expectCreates: false, + expectCreates: false, + maxVersionsIneligibleForDeletion: nil, // no override, so the default limit of 75 is used }, { - name: "should not create when version limit is reached (custom limit)", + name: "should not create when version limit ineligible for deletion is reached (custom limit)", status: &temporaliov1alpha1.TemporalWorkerDeploymentStatus{ - VersionCount: 5, TargetVersion: temporaliov1alpha1.TargetWorkerDeploymentVersion{ BaseWorkerDeploymentVersion: temporaliov1alpha1.BaseWorkerDeploymentVersion{ BuildID: "new", @@ -788,12 +843,20 @@ func TestShouldCreateDeployment(t *testing.T) { Deployment: nil, }, }, + DeprecatedVersions: func() []*temporaliov1alpha1.DeprecatedWorkerDeploymentVersion { + // default is NOT eligible for deletion, so 5 empty Deprecated Versions should block rollout + r := make([]*temporaliov1alpha1.DeprecatedWorkerDeploymentVersion, 5) + for i := range r { + r[i] = &temporaliov1alpha1.DeprecatedWorkerDeploymentVersion{} + } + return r + }(), }, spec: &temporaliov1alpha1.TemporalWorkerDeploymentSpec{ - MaxVersions: func() *int32 { i := int32(5); return &i }(), - Replicas: func() *int32 { r := int32(1); return &r }(), + Replicas: func() *int32 { r := int32(1); return &r }(), }, - expectCreates: false, + expectCreates: false, + 
maxVersionsIneligibleForDeletion: func() *int32 { i := int32(5); return &i }(), }, { name: "should create when below version limit", @@ -806,18 +869,29 @@ func TestShouldCreateDeployment(t *testing.T) { Deployment: nil, }, }, + DeprecatedVersions: func() []*temporaliov1alpha1.DeprecatedWorkerDeploymentVersion { + r := make([]*temporaliov1alpha1.DeprecatedWorkerDeploymentVersion, 4) + for i := range r { + r[i] = &temporaliov1alpha1.DeprecatedWorkerDeploymentVersion{} + } + return r + }(), }, spec: &temporaliov1alpha1.TemporalWorkerDeploymentSpec{ - MaxVersions: func() *int32 { i := int32(5); return &i }(), - Replicas: func() *int32 { r := int32(1); return &r }(), + Replicas: func() *int32 { r := int32(1); return &r }(), }, - expectCreates: true, + expectCreates: true, + maxVersionsIneligibleForDeletion: func() *int32 { i := int32(5); return &i }(), }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - creates := shouldCreateDeployment(tc.status, tc.spec) + maxV := defaults.MaxVersionsIneligibleForDeletion + if tc.maxVersionsIneligibleForDeletion != nil { + maxV = *tc.maxVersionsIneligibleForDeletion + } + creates := shouldCreateDeployment(tc.status, maxV) assert.Equal(t, tc.expectCreates, creates, "unexpected create decision") }) } @@ -1779,7 +1853,7 @@ func TestComplexVersionStateScenarios(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - plan, err := GeneratePlan(logr.Discard(), tc.k8sState, tc.status, tc.spec, tc.state, createDefaultConnectionSpec(), tc.config, "test/namespace") + plan, err := GeneratePlan(logr.Discard(), tc.k8sState, tc.status, tc.spec, tc.state, createDefaultConnectionSpec(), tc.config, "test/namespace", defaults.MaxVersionsIneligibleForDeletion) require.NoError(t, err) assert.Equal(t, tc.expectDeletes, len(plan.DeleteDeployments), "unexpected number of deletes") diff --git a/internal/temporal/worker_deployment.go b/internal/temporal/worker_deployment.go index 30429c4c..858b8a6c 100644 --- 
a/internal/temporal/worker_deployment.go +++ b/internal/temporal/worker_deployment.go @@ -14,8 +14,11 @@ import ( temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1" enumspb "go.temporal.io/api/enums/v1" "go.temporal.io/api/serviceerror" + taskqueuepb "go.temporal.io/api/taskqueue/v1" + "go.temporal.io/api/workflowservice/v1" temporalClient "go.temporal.io/sdk/client" "go.temporal.io/sdk/converter" + appsv1 "k8s.io/api/apps/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -31,6 +34,20 @@ type VersionInfo struct { DrainedSince *time.Time TaskQueues []temporaliov1alpha1.TaskQueue TestWorkflows []temporaliov1alpha1.WorkflowExecution + + // True if all task queues in this version have at least one unversioned poller. + // False could just mean unknown / not checked / not checked successfully. + // Only checked for Target Version when Current Version is nil and strategy is Progressive. + // Used to decide whether to fast track the rollout; rollout will be AllAtOnce if: + // - Current Version is nil + // - Strategy is Progressive, and + // - Presence of unversioned pollers in all task queues of target version cannot be confirmed. + AllTaskQueuesHaveUnversionedPoller bool + // True if all task queues in this version have no versioned pollers. + // False could just mean unknown / not checked / not checked successfully. + // Only checked for Drained versions whose controller-managed Deployment is missing or has zero replicas. + // Used to set EligibleForDeletion on the matching entry of status.DeprecatedVersions. 
+ NoTaskQueuesHaveVersionedPoller bool } // TemporalWorkerState represents the state of a worker deployment in Temporal @@ -54,6 +71,9 @@ func GetWorkerDeploymentState( client temporalClient.Client, workerDeploymentName string, namespace string, + k8sDeployments map[string]*appsv1.Deployment, + targetBuildId string, + strategy temporaliov1alpha1.DefaultVersionUpdateStrategy, controllerIdentity string, ) (*TemporalWorkerState, error) { state := &TemporalWorkerState{ @@ -137,9 +157,36 @@ func GetWorkerDeploymentState( if err == nil { drainedSince := versionResp.Info.DrainageInfo.LastChangedTime versionInfo.DrainedSince = &drainedSince + // If the deployment exists and has replicas, we assume there are versioned pollers, no need to check + deployment, ok := k8sDeployments[version.Version.BuildId] + if !ok || deployment.Status.Replicas == 0 { //revive:disable-line:max-control-nesting + versionInfo.NoTaskQueuesHaveVersionedPoller = noTaskQueuesHaveVersionedPollers(ctx, client, versionResp.Info.TaskQueuesInfos) + } } } else { versionInfo.Status = temporaliov1alpha1.VersionStatusInactive + // get unversioned poller info to decide whether to fast-track rollout + if version.Version.BuildId == targetBuildId && + routingConfig.CurrentVersion == nil && + strategy == temporaliov1alpha1.UpdateProgressive { + var desc temporalClient.WorkerDeploymentVersionDescription + describeVersion := func() error { + desc, err = deploymentHandler.DescribeVersion(ctx, temporalClient.WorkerDeploymentDescribeVersionOptions{ + BuildID: version.Version.BuildId, + }) + return err + } + // At first, version is found in DeploymentInfo.VersionSummaries but not ready for describe, so we have + // to describe with backoff. + // + // Note: We can only check whether the task queues that we know of have unversioned pollers. + // If, later on, a poll request arrives tying a new task queue to the target version, we + // don't know whether that task queue has unversioned pollers. 
+ if err = withBackoff(10*time.Second, 1*time.Second, describeVersion); err == nil { //revive:disable-line:max-control-nesting + versionInfo.AllTaskQueuesHaveUnversionedPoller = allTaskQueuesHaveUnversionedPoller(ctx, client, desc.Info.TaskQueuesInfos) + } + } + } state.Versions[version.Version.BuildId] = versionInfo @@ -148,6 +195,20 @@ func GetWorkerDeploymentState( return state, nil } +func withBackoff(timeout time.Duration, tick time.Duration, fn func() error) error { + deadline := time.Now().Add(timeout) + var lastErr error + for time.Now().Before(deadline) { + err := fn() + if err == nil { + return nil + } + lastErr = err + time.Sleep(tick) + } + return lastErr +} + // GetTestWorkflowStatus queries Temporal to get the status of test workflows for a version func GetTestWorkflowStatus( ctx context.Context, @@ -248,6 +309,24 @@ func GetTestWorkflowID(deploymentName, buildID, taskQueue string) string { return fmt.Sprintf("test-%s:%s-%s", deploymentName, buildID, taskQueue) } +func HasUnversionedPoller(ctx context.Context, + client temporalClient.Client, + taskQueueInfo temporalClient.WorkerDeploymentTaskQueueInfo, +) (bool, error) { + pollers, err := getPollers(ctx, client, taskQueueInfo) + if err != nil { + return false, fmt.Errorf("unable to confirm presence of unversioned pollers: %w", err) + } + for _, p := range pollers { + switch p.GetDeploymentOptions().GetWorkerVersioningMode() { + case temporalClient.WorkerVersioningModeUnversioned, temporalClient.WorkerVersioningModeUnspecified: + return true, nil + case temporalClient.WorkerVersioningModeVersioned: + } + } + return false, nil +} + func DeploymentShouldIgnoreLastModifier( ctx context.Context, deploymentHandler temporalClient.WorkerDeploymentHandle, @@ -289,3 +368,69 @@ func getShouldIgnoreLastModifier( } return false, nil } + +func HasNoVersionedPollers(ctx context.Context, + client temporalClient.Client, + taskQueueInfo temporalClient.WorkerDeploymentTaskQueueInfo, +) (bool, error) { + pollers, err := 
getPollers(ctx, client, taskQueueInfo) + if err != nil { + return false, fmt.Errorf("unable to confirm absence of versioned pollers: %w", err) + } + for _, p := range pollers { + switch p.GetDeploymentOptions().GetWorkerVersioningMode() { + case temporalClient.WorkerVersioningModeUnversioned, temporalClient.WorkerVersioningModeUnspecified: + case temporalClient.WorkerVersioningModeVersioned: + return false, nil + } + } + return true, nil +} + +func getPollers(ctx context.Context, + client temporalClient.Client, + taskQueueInfo temporalClient.WorkerDeploymentTaskQueueInfo, +) ([]*taskqueuepb.PollerInfo, error) { + var resp *workflowservice.DescribeTaskQueueResponse + var err error + switch taskQueueInfo.Type { + case temporalClient.TaskQueueTypeWorkflow: + resp, err = client.DescribeTaskQueue(ctx, taskQueueInfo.Name, temporalClient.TaskQueueTypeWorkflow) + case temporalClient.TaskQueueTypeActivity: + resp, err = client.DescribeTaskQueue(ctx, taskQueueInfo.Name, temporalClient.TaskQueueTypeActivity) + } + if err != nil { + return nil, fmt.Errorf("unable to describe task queue %s: %w", taskQueueInfo.Name, err) + } + return resp.GetPollers(), nil +} + +func noTaskQueuesHaveVersionedPollers( + ctx context.Context, + client temporalClient.Client, + tqs []temporalClient.WorkerDeploymentTaskQueueInfo, +) bool { + countHasNoVersionedPollers := 0 + for _, tqInfo := range tqs { + hasNoVersionedPollers, _ := HasNoVersionedPollers(ctx, client, tqInfo) // TODO(carlydf): consider logging this error + if hasNoVersionedPollers { + countHasNoVersionedPollers++ + } + } + return countHasNoVersionedPollers == len(tqs) +} + +func allTaskQueuesHaveUnversionedPoller( + ctx context.Context, + client temporalClient.Client, + tqs []temporalClient.WorkerDeploymentTaskQueueInfo, +) bool { + countHasUnversionedPoller := 0 + for _, tqInfo := range tqs { + hasUnversionedPoller, _ := HasUnversionedPoller(ctx, client, tqInfo) // TODO(carlydf): consider logging this error + if hasUnversionedPoller { 
+ countHasUnversionedPoller++ + } + } + return countHasUnversionedPoller == len(tqs) +} diff --git a/internal/testhelpers/make.go b/internal/testhelpers/make.go index 9abae59d..47eccce0 100644 --- a/internal/testhelpers/make.go +++ b/internal/testhelpers/make.go @@ -7,6 +7,7 @@ import ( "github.com/pborman/uuid" temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1" "github.com/temporalio/temporal-worker-controller/internal/k8s" + "go.temporal.io/server/common/worker_versioning" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -135,7 +136,7 @@ func MakeTWDWithName(name, namespace string) *temporaliov1alpha1.TemporalWorkerD } func MakeCurrentVersion(namespace, twdName, imageName string, healthy, createDeployment bool) *temporaliov1alpha1.CurrentWorkerDeploymentVersion { - if imageName == "" { // empty build id == nil current version == unversioned + if imageName == worker_versioning.UnversionedVersionId { // empty build id == nil current version == unversioned return nil } ret := &temporaliov1alpha1.CurrentWorkerDeploymentVersion{ diff --git a/internal/testhelpers/workers.go b/internal/testhelpers/workers.go index 8ecd229e..15187743 100644 --- a/internal/testhelpers/workers.go +++ b/internal/testhelpers/workers.go @@ -109,9 +109,6 @@ func newClient(ctx context.Context, hostPort, namespace string) (client.Client, // RunHelloWorldWorker runs one worker per replica in the pod spec. callback is a function that can be called multiple times. 
func RunHelloWorldWorker(ctx context.Context, podTemplateSpec corev1.PodTemplateSpec, callback func(stopFunc func(), err error)) { w, stopFunc, err := newVersionedWorker(ctx, podTemplateSpec) - defer func() { - callback(stopFunc, err) - }() if err != nil { return } @@ -127,6 +124,8 @@ func RunHelloWorldWorker(ctx context.Context, podTemplateSpec corev1.PodTemplate err = w.Start() if err != nil { callback(nil, err) + } else { + callback(stopFunc, nil) } }() } diff --git a/internal/tests/internal/deployment_controller.go b/internal/tests/internal/deployment_controller.go index ab904c48..9d014b81 100644 --- a/internal/tests/internal/deployment_controller.go +++ b/internal/tests/internal/deployment_controller.go @@ -31,6 +31,38 @@ func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool { } } +func startAndStopWorker(t *testing.T, ctx context.Context, k8sClient client.Client, deploymentName, namespace string) { + var deployment appsv1.Deployment + if err := k8sClient.Get(ctx, types.NamespacedName{ + Name: deploymentName, + Namespace: namespace, + }, &deployment); err != nil { + t.Fatalf("failed to get deployment: %v", err) + } + + startedCh := make(chan struct{}) + var stop func() + workerCallback := func(stopFunc func(), err error) { + if err != nil { + t.Errorf("failed to start worker: %v", err) + } else { + startedCh <- struct{}{} + stop = stopFunc + } + } + + testhelpers.RunHelloWorldWorker(ctx, deployment.Spec.Template, workerCallback) + // wait for worker to start + <-startedCh + + time.Sleep(1 * time.Second) + + // kill worker + if stop != nil { + stop() + } +} + func applyDeployment(t *testing.T, ctx context.Context, k8sClient client.Client, deploymentName, namespace string) []func() { var deployment appsv1.Deployment if err := k8sClient.Get(ctx, types.NamespacedName{ @@ -151,18 +183,16 @@ func createStatus( rampPercentage *float32, ) (workerStopFuncs []func()) { if prevVersion.Deployment != nil && prevVersion.Deployment.FieldPath == "create" { - 
deploymentName := k8s.ComputeWorkerDeploymentName(newTWD) + workerDeploymentName := k8s.ComputeWorkerDeploymentName(newTWD) v := &worker.WorkerDeploymentVersion{ - DeploymentName: deploymentName, + DeploymentName: workerDeploymentName, BuildId: prevVersion.BuildID, } prevTWD := recreateTWD(newTWD, env.ExistingDeploymentImages[v.BuildId], env.ExistingDeploymentReplicas[v.BuildId]) createWorkerDeployment(ctx, t, env, prevTWD, v.BuildId) expectedDeploymentName := k8s.ComputeVersionedDeploymentName(prevTWD.Name, k8s.ComputeBuildID(prevTWD)) waitForDeployment(t, env.K8sClient, expectedDeploymentName, prevTWD.Namespace, 30*time.Second) - if prevVersion.Status != temporaliov1alpha1.VersionStatusNotRegistered { - workerStopFuncs = applyDeployment(t, ctx, env.K8sClient, expectedDeploymentName, prevTWD.Namespace) - } + workerStopFuncs = applyDeployment(t, ctx, env.K8sClient, expectedDeploymentName, prevTWD.Namespace) switch prevVersion.Status { case temporaliov1alpha1.VersionStatusInactive, temporaliov1alpha1.VersionStatusNotRegistered: @@ -176,6 +206,9 @@ func createStatus( // TODO(carlydf): start a workflow on v that does not complete -> will never drain setRampingVersion(t, ctx, env.Ts, v.DeploymentName, "", 0) case temporaliov1alpha1.VersionStatusDrained: + if env.ExistingDeploymentReplicas[v.BuildId] == 0 { + startAndStopWorker(t, ctx, env.K8sClient, expectedDeploymentName, prevTWD.Namespace) + } setCurrentVersion(t, ctx, env.Ts, v.DeploymentName, v.BuildId) setCurrentVersion(t, ctx, env.Ts, v.DeploymentName, "") } diff --git a/internal/tests/internal/env_helpers.go b/internal/tests/internal/env_helpers.go index 719f55eb..f82aa4ff 100644 --- a/internal/tests/internal/env_helpers.go +++ b/internal/tests/internal/env_helpers.go @@ -36,6 +36,13 @@ import ( "sigs.k8s.io/controller-runtime/pkg/manager" ) +const ( + testShortPollerHistoryTTL = time.Second + testDrainageVisibilityGracePeriod = time.Second + testDrainageRefreshInterval = time.Second + 
testMaxVersionsIneligibleForDeletion = 5 +) + // setupKubebuilderAssets sets up the KUBEBUILDER_ASSETS environment variable if not already set func setupKubebuilderAssets() error { if os.Getenv("KUBEBUILDER_ASSETS") != "" { @@ -92,6 +99,9 @@ func setupTestEnvironment(t *testing.T) (*rest.Config, client.Client, manager.Ma t.Skip("Skipping because KUBEBUILDER_ASSETS not set") } + // set max versions value for testing + t.Setenv(controller.ControllerMaxDeploymentVersionsIneligibleForDeletionEnvKey, fmt.Sprintf("%d", testMaxVersionsIneligibleForDeletion)) + // Setup kubebuilder assets for IDE testing if err := setupKubebuilderAssets(); err != nil { t.Logf("Warning: Could not setup kubebuilder assets automatically: %v", err) @@ -144,6 +154,7 @@ func setupTestEnvironment(t *testing.T) (*rest.Config, client.Client, manager.Ma Scheme: mgr.GetScheme(), TemporalClientPool: clientPool, DisableRecoverPanic: true, + MaxDeploymentVersionsIneligibleForDeletion: controller.GetControllerMaxDeploymentVersionsIneligibleForDeletion(), } err = reconciler.SetupWithManager(mgr) if err != nil { @@ -193,14 +204,18 @@ func cleanupTestNamespace(t *testing.T, cfg *rest.Config, k8sClient client.Clien } } -func setupUnversionedPoller(t *testing.T, ctx context.Context, tc testhelpers.TestCase, env testhelpers.TestEnv) { +func setupUnversionedPollers(t *testing.T, ctx context.Context, tc testhelpers.TestCase, env testhelpers.TestEnv) { w, _, err := testhelpers.NewWorker(ctx, "", "", tc.GetTWD().Name, env.Ts.GetFrontendHostPort(), env.Ts.GetDefaultNamespace(), false) + if err != nil { + t.Errorf("failed to setup worker: %v", err) + } // Register a dummy workflow and activity so the worker has something to poll for w.RegisterWorkflowWithOptions(func(ctx workflow.Context) (string, error) { return "hi", nil }, workflow.RegisterOptions{Name: "dummyWorkflow"}) w.RegisterActivity(func(ctx context.Context) (string, error) { return "hi", nil }) err = w.Start() + t.Log("started unversioned worker") if err 
!= nil { t.Errorf("error starting unversioned worker %v", err) } @@ -227,6 +242,7 @@ func setupUnversionedPoller(t *testing.T, ctx context.Context, tc testhelpers.Te } return nil }) + t.Logf("confirmed that task queue %v has unversioned workflow and activity pollers", tc.GetTWD().Name) } func hasUnversionedPoller(ctx context.Context, diff --git a/internal/tests/internal/integration_test.go b/internal/tests/internal/integration_test.go index 5ec715a5..8dab67e8 100644 --- a/internal/tests/internal/integration_test.go +++ b/internal/tests/internal/integration_test.go @@ -9,6 +9,7 @@ import ( "github.com/temporalio/temporal-worker-controller/internal/k8s" "github.com/temporalio/temporal-worker-controller/internal/testhelpers" "go.temporal.io/server/common/dynamicconfig" + "go.temporal.io/server/common/worker_versioning" "go.temporal.io/server/temporal" "go.temporal.io/server/temporaltest" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -16,12 +17,6 @@ import ( "sigs.k8s.io/controller-runtime/pkg/manager" ) -const ( - testShortPollerHistoryTTL = time.Second - testDrainageVisibilityGracePeriod = time.Second - testDrainageRefreshInterval = time.Second -) - // TestIntegration runs integration tests for the Temporal Worker Controller func TestIntegration(t *testing.T) { // Set up test environment @@ -77,18 +72,17 @@ func TestIntegration(t *testing.T) { WithTargetVersion("v1.0", temporaliov1alpha1.VersionStatusCurrent, -1, true, false). WithCurrentVersion("v1.0", true, false), ), - //// TODO(carlydf): this won't work until the controller detects unversioned pollers - // "progressive-rollout-yes-unversioned-pollers-expect-first-step": testhelpers.NewTestCase(). - // WithInput( - // testhelpers.NewTemporalWorkerDeploymentBuilder(). - // WithProgressiveStrategy(testhelpers.ProgressiveStep(5, time.Hour)). - // WithTargetTemplate("v1.0"), - // ). - // WithSetupFunction(setupUnversionedPoller). - // WithExpectedStatus( - // testhelpers.NewStatusBuilder(). 
- // WithTargetVersion("v1.0", temporaliov1alpha1.VersionStatusRamping, 5, true, false), - // ), + "progressive-rollout-yes-unversioned-pollers-expect-first-step": testhelpers.NewTestCase(). + WithInput( + testhelpers.NewTemporalWorkerDeploymentBuilder(). + WithProgressiveStrategy(testhelpers.ProgressiveStep(5, time.Hour)). + WithTargetTemplate("v1"), + ). + WithSetupFunction(setupUnversionedPollers). + WithExpectedStatus( + testhelpers.NewStatusBuilder(). + WithTargetVersion("v1", temporaliov1alpha1.VersionStatusRamping, 5, true, false), + ), "nth-progressive-rollout-expect-first-step": testhelpers.NewTestCase(). WithInput( testhelpers.NewTemporalWorkerDeploymentBuilder(). @@ -191,6 +185,50 @@ func TestIntegration(t *testing.T) { testhelpers.NewDeploymentInfo("v1.0", 0), testhelpers.NewDeploymentInfo("v2.0", 1), ), + "nth-rollout-blocked-at-max-replicas": testhelpers.NewTestCase(). + WithInput( + testhelpers.NewTemporalWorkerDeploymentBuilder(). + WithAllAtOnceStrategy(). + WithTargetTemplate("v5"). + WithStatus( + testhelpers.NewStatusBuilder(). + WithTargetVersion("v4", temporaliov1alpha1.VersionStatusCurrent, -1, true, true). + WithCurrentVersion("v4", true, true). + WithDeprecatedVersions( // drained AND has no pollers -> eligible for deletion + testhelpers.NewDeprecatedVersionInfo("v0", temporaliov1alpha1.VersionStatusDrained, true, true, true), + testhelpers.NewDeprecatedVersionInfo("v1", temporaliov1alpha1.VersionStatusDrained, true, true, true), + testhelpers.NewDeprecatedVersionInfo("v2", temporaliov1alpha1.VersionStatusDrained, true, true, true), + testhelpers.NewDeprecatedVersionInfo("v3", temporaliov1alpha1.VersionStatusDrained, true, true, true), + ), + ), + ). + WithExistingDeployments( + testhelpers.NewDeploymentInfo("v0", 1), + testhelpers.NewDeploymentInfo("v1", 1), + testhelpers.NewDeploymentInfo("v2", 1), + testhelpers.NewDeploymentInfo("v3", 1), + testhelpers.NewDeploymentInfo("v4", 1), + ). + WithWaitTime(5*time.Second). 
+ WithExpectedStatus( + testhelpers.NewStatusBuilder(). // controller won't deploy v5, so it's not registered + WithTargetVersion("v5", temporaliov1alpha1.VersionStatusNotRegistered, -1, false, false). + WithCurrentVersion("v4", true, false). + WithDeprecatedVersions( // drained but has pollers, so ineligible for deletion + testhelpers.NewDeprecatedVersionInfo("v0", temporaliov1alpha1.VersionStatusDrained, true, false, true), + testhelpers.NewDeprecatedVersionInfo("v1", temporaliov1alpha1.VersionStatusDrained, true, false, true), + testhelpers.NewDeprecatedVersionInfo("v2", temporaliov1alpha1.VersionStatusDrained, true, false, true), + testhelpers.NewDeprecatedVersionInfo("v3", temporaliov1alpha1.VersionStatusDrained, true, false, true), + ), + ). + WithExpectedDeployments( + testhelpers.NewDeploymentInfo("v0", 1), + testhelpers.NewDeploymentInfo("v1", 1), + testhelpers.NewDeploymentInfo("v2", 1), + testhelpers.NewDeploymentInfo("v3", 1), + testhelpers.NewDeploymentInfo("v4", 1), + testhelpers.NewDeploymentInfo("v5", 1), + ), "nth-rollout-blocked-by-modifier": testhelpers.NewTestCase(). WithInput( testhelpers.NewTemporalWorkerDeploymentBuilder(). @@ -210,7 +248,7 @@ func TestIntegration(t *testing.T) { WithExpectedStatus( testhelpers.NewStatusBuilder(). WithTargetVersion("v1", temporaliov1alpha1.VersionStatusInactive, -1, true, false). - WithCurrentVersion("", false, false). + WithCurrentVersion(worker_versioning.UnversionedVersionId, false, false). WithDeprecatedVersions(testhelpers.NewDeprecatedVersionInfo("v0", temporaliov1alpha1.VersionStatusDrained, true, false, true)), ). WithExpectedDeployments( @@ -264,6 +302,51 @@ func TestIntegration(t *testing.T) { ) testsShortPollerTTL := map[string]*testhelpers.TestCaseBuilder{ // Note: Add tests that require pollers to expire quickly here + "nth-rollout-unblocked-after-pollers-die": testhelpers.NewTestCase(). + WithInput( + testhelpers.NewTemporalWorkerDeploymentBuilder(). + WithAllAtOnceStrategy(). 
+ WithTargetTemplate("v5"). + WithStatus( + testhelpers.NewStatusBuilder(). + WithTargetVersion("v4", temporaliov1alpha1.VersionStatusCurrent, -1, true, true). + WithCurrentVersion("v4", true, true). + WithDeprecatedVersions( // drained AND has no pollers -> eligible for deletion + testhelpers.NewDeprecatedVersionInfo("v0", temporaliov1alpha1.VersionStatusDrained, true, true, true), + testhelpers.NewDeprecatedVersionInfo("v1", temporaliov1alpha1.VersionStatusDrained, true, true, true), + testhelpers.NewDeprecatedVersionInfo("v2", temporaliov1alpha1.VersionStatusDrained, true, true, true), + testhelpers.NewDeprecatedVersionInfo("v3", temporaliov1alpha1.VersionStatusDrained, true, true, true), + ), + ), + ). + WithExistingDeployments( + testhelpers.NewDeploymentInfo("v0", 0), // 0 replicas -> no pollers + testhelpers.NewDeploymentInfo("v1", 1), + testhelpers.NewDeploymentInfo("v2", 1), + testhelpers.NewDeploymentInfo("v3", 1), + testhelpers.NewDeploymentInfo("v4", 1), + ). + WithWaitTime(5*time.Second). + WithExpectedStatus( + testhelpers.NewStatusBuilder(). + WithTargetVersion("v5", temporaliov1alpha1.VersionStatusCurrent, -1, false, false). + WithCurrentVersion("v5", true, false). + WithDeprecatedVersions( // drained AND has pollers -> eligible for deletion + testhelpers.NewDeprecatedVersionInfo("v0", temporaliov1alpha1.VersionStatusDrained, true, false, true), + testhelpers.NewDeprecatedVersionInfo("v1", temporaliov1alpha1.VersionStatusDrained, true, false, true), + testhelpers.NewDeprecatedVersionInfo("v2", temporaliov1alpha1.VersionStatusDrained, true, false, true), + testhelpers.NewDeprecatedVersionInfo("v3", temporaliov1alpha1.VersionStatusDrained, true, false, true), + testhelpers.NewDeprecatedVersionInfo("v4", temporaliov1alpha1.VersionStatusDrained, true, false, true), + ), + ). 
+ WithExpectedDeployments( + testhelpers.NewDeploymentInfo("v0", 0), // 0 replicas -> no pollers + testhelpers.NewDeploymentInfo("v1", 1), + testhelpers.NewDeploymentInfo("v2", 1), + testhelpers.NewDeploymentInfo("v3", 1), + testhelpers.NewDeploymentInfo("v4", 1), + testhelpers.NewDeploymentInfo("v5", 1), + ), } for testName, tc := range testsShortPollerTTL { @@ -318,7 +401,7 @@ func testTemporalWorkerDeploymentCreation( // apply post-status setup function if f := tc.GetSetupFunc(); f != nil { - tc.GetSetupFunc()(t, ctx, tc, env) + f(t, ctx, tc, env) } t.Log("Creating a TemporalWorkerDeployment") @@ -328,9 +411,13 @@ func testTemporalWorkerDeploymentCreation( t.Log("Waiting for the controller to reconcile") expectedDeploymentName := k8s.ComputeVersionedDeploymentName(twd.Name, k8s.ComputeBuildID(twd)) - waitForDeployment(t, k8sClient, expectedDeploymentName, twd.Namespace, 30*time.Second) - workerStopFuncs := applyDeployment(t, ctx, k8sClient, expectedDeploymentName, twd.Namespace) - defer handleStopFuncs(workerStopFuncs) + + // only wait for and create the deployment if it is expected + if expectedStatus.TargetVersion.Status != temporaliov1alpha1.VersionStatusNotRegistered { + waitForDeployment(t, k8sClient, expectedDeploymentName, twd.Namespace, 30*time.Second) + workerStopFuncs := applyDeployment(t, ctx, k8sClient, expectedDeploymentName, twd.Namespace) + defer handleStopFuncs(workerStopFuncs) + } if wait := tc.GetWaitTime(); wait != nil { time.Sleep(*wait) diff --git a/internal/tests/internal/validation_helpers.go b/internal/tests/internal/validation_helpers.go index 5db24354..14624f3d 100644 --- a/internal/tests/internal/validation_helpers.go +++ b/internal/tests/internal/validation_helpers.go @@ -7,7 +7,7 @@ import ( "time" temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1" - "github.com/temporalio/temporal-worker-controller/internal/controller" + "github.com/temporalio/temporal-worker-controller/internal/defaults" 
"github.com/temporalio/temporal-worker-controller/internal/k8s" "github.com/temporalio/temporal-worker-controller/internal/testhelpers" sdkclient "go.temporal.io/sdk/client" @@ -33,6 +33,7 @@ func waitForDeployment(t *testing.T, k8sClient client.Client, deploymentName, na Name: deploymentName, Namespace: namespace, }, &deployment); err == nil { + t.Logf("Found deployment %s in namespace %s", deployment.Name, namespace) return } time.Sleep(1 * time.Second) @@ -79,7 +80,7 @@ func setCurrentVersion( eventually(t, 30*time.Second, time.Second, func() error { _, err := deploymentHandler.SetCurrentVersion(ctx, sdkclient.WorkerDeploymentSetCurrentVersionOptions{ BuildID: buildID, - Identity: controller.DefaultControllerIdentity, + Identity: defaults.ControllerIdentity, }) if err != nil { return fmt.Errorf("unable to set build '%s' as current of worker deployment %s: %w", buildID, workerDeploymentName, err) @@ -107,7 +108,7 @@ func setRampingVersion( _, err := deploymentHandler.SetRampingVersion(ctx, sdkclient.WorkerDeploymentSetRampingVersionOptions{ BuildID: buildID, Percentage: rampPercentage, - Identity: controller.DefaultControllerIdentity, + Identity: defaults.ControllerIdentity, }) if err != nil { return fmt.Errorf("unable to set build '%s' as ramping of worker deployment %s: %w", buildID, workerDeploymentName, err)