Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions api/v1alpha1/temporalworker_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,6 @@ func (s *TemporalWorkerDeploymentSpec) Default(ctx context.Context) error {
s.SunsetStrategy.DeleteDelay = &v1.Duration{Duration: defaults.DeleteDelay}
}

if s.MaxVersions == nil {
maxVersions := int32(defaults.MaxVersions)
s.MaxVersions = &maxVersions
}

return nil
}

Expand Down
18 changes: 0 additions & 18 deletions api/v1alpha1/temporalworker_webhook_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,24 +159,6 @@ func TestTemporalWorkerDeployment_Default(t *testing.T) {
obj runtime.Object
expected func(t *testing.T, obj *temporaliov1alpha1.TemporalWorkerDeployment)
}{
"sets default maxVersions": {
obj: testhelpers.MakeTWDWithName("default-max-versions", ""),
expected: func(t *testing.T, obj *temporaliov1alpha1.TemporalWorkerDeployment) {
require.NotNil(t, obj.Spec.MaxVersions)
assert.Equal(t, int32(75), *obj.Spec.MaxVersions)
},
},
"preserves existing maxVersions": {
obj: testhelpers.ModifyObj(testhelpers.MakeTWDWithName("preserve-max-versions", ""), func(obj *temporaliov1alpha1.TemporalWorkerDeployment) *temporaliov1alpha1.TemporalWorkerDeployment {
maxVersions := int32(100)
obj.Spec.MaxVersions = &maxVersions
return obj
}),
expected: func(t *testing.T, obj *temporaliov1alpha1.TemporalWorkerDeployment) {
require.NotNil(t, obj.Spec.MaxVersions)
assert.Equal(t, int32(100), *obj.Spec.MaxVersions)
},
},
"sets default sunset strategy delays": {
obj: testhelpers.MakeTWDWithName("default-sunset-delays", ""),
expected: func(t *testing.T, obj *temporaliov1alpha1.TemporalWorkerDeployment) {
Expand Down
40 changes: 19 additions & 21 deletions api/v1alpha1/worker_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,18 +60,6 @@ type TemporalWorkerDeploymentSpec struct {

// TODO(jlegrone): add godoc
WorkerOptions WorkerOptions `json:"workerOptions"`

// MaxVersions defines the maximum number of worker deployment versions allowed.
// This helps prevent hitting Temporal's default limit of 100 versions per deployment.
// Defaults to 75. Users can override this by explicitly setting a higher value in
// the CRD, but should exercise caution: once the server's version limit is reached,
// Temporal attempts to delete an eligible version. If no version is eligible for deletion,
// new deployments get blocked which prevents the controller from making progress.
// This limit can be adjusted server-side by setting `matching.maxVersionsInDeployment`
// in dynamicconfig.
// +optional
// +kubebuilder:validation:Minimum=1
MaxVersions *int32 `json:"maxVersions,omitempty"`
}

// VersionStatus indicates the status of a version.
Expand Down Expand Up @@ -240,6 +228,12 @@ type DeprecatedWorkerDeploymentVersion struct {
// Only set when Status is VersionStatusDrained.
// +optional
DrainedSince *metav1.Time `json:"drainedSince"`

// A Version is eligible for deletion if it is drained and has no pollers on any task queue.
// After pollers stop polling, the server will still consider them present until `matching.PollerHistoryTTL`
// has passed.
// +optional
EligibleForDeletion bool `json:"eligibleForDeletion,omitempty"`
}

// DefaultVersionUpdateStrategy describes how to cut over new workflow executions
Expand All @@ -248,10 +242,20 @@ type DeprecatedWorkerDeploymentVersion struct {
type DefaultVersionUpdateStrategy string

const (
// UpdateManual scales worker resources up or down, but does not update the current or ramping worker deployment version.
UpdateManual DefaultVersionUpdateStrategy = "Manual"

// UpdateAllAtOnce starts 100% of new workflow executions on the new worker deployment version as soon as it's healthy.
UpdateAllAtOnce DefaultVersionUpdateStrategy = "AllAtOnce"

// UpdateProgressive ramps up the percentage of new workflow executions targeting the new worker deployment version over time.
//
// Note: If the Current Version of a Worker Deployment is nil and the controller cannot confirm that all Task Queues
// in the Target Version have at least one unversioned poller, the controller will immediately set the new worker
// deployment version to be Current and ignore the Progressive rollout steps.
// Sending a percentage of traffic to a "nil" version means that traffic will be sent to unversioned workers. If
// there are no unversioned workers, those tasks will get stuck. This behavior ensures that all traffic on the task
// queues in this worker deployment can be handled by an active poller.
UpdateProgressive DefaultVersionUpdateStrategy = "Progressive"
)

Expand All @@ -263,15 +267,9 @@ type GateWorkflowConfig struct {
type RolloutStrategy struct {
// Strategy specifies how new workflow executions are cut over to a new worker deployment version.
// Valid values are:
// - "Manual": scale worker resources up or down, but do not update the current or ramping worker deployment version;
// - "AllAtOnce": start 100% of new workflow executions on the new worker deployment version as soon as it's healthy;
// - "Progressive": ramp up the percentage of new workflow executions targeting the new worker deployment version over time;
//
// Note: If the Current Version of a Worker Deployment is nil, the controller will ignore any Progressive Rollout
// Steps and immediately set the new worker deployment version to be Current.
// Sending a percentage of traffic to a "nil" version means that traffic will be sent to unversioned workers. If
// there are no unversioned workers, those tasks will get stuck. This behavior ensures that all traffic on the task
// queues in this worker deployment can be handled by an active poller.
// - "Manual"
// - "AllAtOnce"
// - "Progressive"
Strategy DefaultVersionUpdateStrategy `json:"strategy"`

// Gate specifies a workflow type that must run once to completion on the new worker deployment version before
Expand Down
5 changes: 0 additions & 5 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func main() {
}))),
mgr.GetClient(),
),
MaxDeploymentVersionsIneligibleForDeletion: controller.GetControllerMaxDeploymentVersionsIneligibleForDeletion(),
}).SetupWithManager(mgr); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "TemporalWorkerDeployment")
os.Exit(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,6 @@ spec:
type: object
spec:
properties:
maxVersions:
format: int32
minimum: 1
type: integer
minReadySeconds:
format: int32
type: integer
Expand Down Expand Up @@ -3876,6 +3872,8 @@ spec:
drainedSince:
format: date-time
type: string
eligibleForDeletion:
type: boolean
healthySince:
format: date-time
type: string
Expand Down
4 changes: 2 additions & 2 deletions internal/controller/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,8 @@ func (r *TemporalWorkerDeploymentReconciler) executePlan(ctx context.Context, l
},
MetadataUpdate: sdkclient.WorkerDeploymentMetadataUpdate{
UpsertEntries: map[string]interface{}{
controllerIdentityKey: getControllerIdentity(),
controllerVersionKey: getControllerVersion(),
controllerIdentityMetadataKey: getControllerIdentity(),
controllerVersionMetadataKey: getControllerVersion(),
},
},
}); err != nil { // would be cool to do this atomically with the update
Expand Down
1 change: 1 addition & 0 deletions internal/controller/genplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ func (r *TemporalWorkerDeploymentReconciler) generatePlan(
connection,
plannerConfig,
workerDeploymentName,
r.MaxDeploymentVersionsIneligibleForDeletion,
)
if err != nil {
return nil, fmt.Errorf("error generating plan: %w", err)
Expand Down
14 changes: 1 addition & 13 deletions internal/controller/genstatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package controller

import (
"context"
"fmt"

"github.com/go-logr/logr"
temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1"
Expand All @@ -23,22 +22,11 @@ func (r *TemporalWorkerDeploymentReconciler) generateStatus(
req ctrl.Request,
workerDeploy *temporaliov1alpha1.TemporalWorkerDeployment,
temporalState *temporal.TemporalWorkerState,
k8sState *k8s.DeploymentState,
) (*temporaliov1alpha1.TemporalWorkerDeploymentStatus, error) {
workerDeploymentName := k8s.ComputeWorkerDeploymentName(workerDeploy)
targetBuildID := k8s.ComputeBuildID(workerDeploy)

// Fetch Kubernetes deployment state
k8sState, err := k8s.GetDeploymentState(
ctx,
r.Client,
req.Namespace,
req.Name,
workerDeploymentName,
)
if err != nil {
return nil, fmt.Errorf("unable to get Kubernetes deployment state: %w", err)
}

// Fetch test workflow status for the desired version
if targetBuildID != temporalState.CurrentBuildID {
testWorkflows, err := temporal.GetTestWorkflowStatus(
Expand Down
6 changes: 6 additions & 0 deletions internal/controller/state_mapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,11 +160,17 @@ func (m *stateMapper) mapDeprecatedWorkerDeploymentVersionByBuildID(buildID stri
return nil
}

eligibleForDeletion := false
if vInfo, exists := m.temporalState.Versions[buildID]; exists {
eligibleForDeletion = vInfo.Status == v1alpha1.VersionStatusDrained && vInfo.NoTaskQueuesHaveVersionedPoller
}

version := &v1alpha1.DeprecatedWorkerDeploymentVersion{
BaseWorkerDeploymentVersion: v1alpha1.BaseWorkerDeploymentVersion{
BuildID: buildID,
Status: v1alpha1.VersionStatusNotRegistered,
},
EligibleForDeletion: eligibleForDeletion,
}

// Set deployment reference if it exists
Expand Down
28 changes: 22 additions & 6 deletions internal/controller/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,18 @@ package controller

import (
"os"
"strconv"

"github.com/temporalio/temporal-worker-controller/internal/defaults"
)

const (
controllerIdentityKey = "temporal.io/controller"
controllerVersionKey = "temporal.io/controller-version"
DefaultControllerIdentity = "temporal-worker-controller"
controllerIdentityMetadataKey = "temporal.io/controller"
controllerVersionMetadataKey = "temporal.io/controller-version"

controllerVersionEnvKey = "CONTROLLER_VERSION"
controllerIdentityEnvKey = "CONTROLLER_IDENTITY"
ControllerMaxDeploymentVersionsIneligibleForDeletionEnvKey = "CONTROLLER_MAX_DEPLOYMENT_VERSIONS_INELIGIBLE_FOR_DELETION"
)

// Version is set by goreleaser via ldflags at build time
Expand All @@ -24,16 +30,26 @@ func getControllerVersion() string {
return Version
}
// Fall back to environment variable (set by Helm from image.tag)
if version := os.Getenv("CONTROLLER_VERSION"); version != "" {
if version := os.Getenv(controllerVersionEnvKey); version != "" {
return version
}
return "unknown"
}

// getControllerIdentity returns the identity from environment variable (set by Helm)
func getControllerIdentity() string {
if identity := os.Getenv("CONTROLLER_IDENTITY"); identity != "" {
if identity := os.Getenv(controllerIdentityEnvKey); identity != "" {
return identity
}
return DefaultControllerIdentity
return defaults.ControllerIdentity
}

// GetControllerMaxDeploymentVersionsIneligibleForDeletion returns the maximum number of
// Worker Deployment versions that may be ineligible for deletion, read from the
// CONTROLLER_MAX_DEPLOYMENT_VERSIONS_INELIGIBLE_FOR_DELETION environment variable.
//
// It falls back to defaults.MaxVersionsIneligibleForDeletion when the variable is unset,
// empty, not a base-10 integer, or outside the int32 range.
func GetControllerMaxDeploymentVersionsIneligibleForDeletion() int32 {
	maxStr := os.Getenv(ControllerMaxDeploymentVersionsIneligibleForDeletionEnvKey)
	if maxStr == "" {
		return defaults.MaxVersionsIneligibleForDeletion
	}

	// Parse with bitSize 32 so that out-of-range values are rejected rather than being
	// silently truncated by an int -> int32 conversion.
	parsed, err := strconv.ParseInt(maxStr, 10, 32)
	if err != nil {
		return defaults.MaxVersionsIneligibleForDeletion
	}
	return int32(parsed)
}
34 changes: 32 additions & 2 deletions internal/controller/worker_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,19 @@ type TemporalWorkerDeploymentReconciler struct {

// Disables panic recovery if true
DisableRecoverPanic bool

// When a Worker Deployment has the maximum number of versions (100 per Worker Deployment by default),
// the Temporal server will delete the oldest eligible version when a worker with the 101st version arrives.
// If no versions are eligible for deletion, that worker's poll will fail, which is dangerous.
// To protect against this, when a Worker Deployment has too many versions ineligible for deletion,
// the controller will stop deploying new workers in order to give the user the opportunity to adjust
// their sunset policy to avoid this situation before it actually blocks deployment of a new worker
// version on the server side.
//
// MaxDeploymentVersionsIneligibleForDeletion currently defaults to 75, which is safe for the default
// server value of `matching.maxVersionsInDeployment=100`.
// Users who reduce `matching.maxVersionsInDeployment` in their dynamicconfig should also reduce this value.
MaxDeploymentVersionsIneligibleForDeletion int32
}

//+kubebuilder:rbac:groups=temporal.io,resources=temporalworkerdeployments,verbs=get;list;watch;create;update;patch;delete
Expand Down Expand Up @@ -130,21 +143,38 @@ func (r *TemporalWorkerDeploymentReconciler) Reconcile(ctx context.Context, req
temporalClient = c
}

// Fetch Temporal worker deployment state
workerDeploymentName := k8s.ComputeWorkerDeploymentName(&workerDeploy)
targetBuildID := k8s.ComputeBuildID(&workerDeploy)

// Fetch Kubernetes deployment state
k8sState, err := k8s.GetDeploymentState(
ctx,
r.Client,
req.Namespace,
req.Name,
workerDeploymentName,
)
if err != nil {
return ctrl.Result{}, fmt.Errorf("unable to get Kubernetes deployment state: %w", err)
}

// Fetch Temporal worker deployment state
temporalState, err := temporal.GetWorkerDeploymentState(
ctx,
temporalClient,
workerDeploymentName,
workerDeploy.Spec.WorkerOptions.TemporalNamespace,
k8sState.Deployments,
targetBuildID,
workerDeploy.Spec.RolloutStrategy.Strategy,
getControllerIdentity(),
)
if err != nil {
return ctrl.Result{}, fmt.Errorf("unable to get Temporal worker deployment state: %w", err)
}

// Compute a new status from k8s and temporal state
status, err := r.generateStatus(ctx, l, temporalClient, req, &workerDeploy, temporalState)
status, err := r.generateStatus(ctx, l, temporalClient, req, &workerDeploy, temporalState, k8sState)
if err != nil {
return ctrl.Result{}, err
}
Expand Down
9 changes: 5 additions & 4 deletions internal/defaults/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ import "time"

// Default values for TemporalWorkerDeploymentSpec fields
const (
ScaledownDelay = 1 * time.Hour
DeleteDelay = 24 * time.Hour
ServerMaxVersions = 100
MaxVersions = int32(ServerMaxVersions * 0.75)
ScaledownDelay = 1 * time.Hour
DeleteDelay = 24 * time.Hour
ServerMaxVersions = 100
MaxVersionsIneligibleForDeletion = int32(ServerMaxVersions * 0.75)
ControllerIdentity = "temporal-worker-controller"
)
Loading
Loading