diff --git a/README.md b/README.md index c5a94330..1fbf95a4 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ > 🚀 **Public Preview**: This project is in [Public Preview](https://docs.temporal.io/evaluate/development-production-features/release-stages) and ready for production use cases*. Core functionality is complete with stable APIs. > -> *Dynamic auto-scaling based on workflow load is not yet implemented. Use cases must work with fixed worker replica counts. +> *Autoscaling based on Temporal task queue depth is not yet built in. You can attach Horizontal Pod Autoscalers or KEDA ScaledObjects to each versioned Deployment via [`TemporalWorkerOwnedResource`](docs/owned-resources.md). **The Temporal Worker Controller makes it simple and safe to deploy Temporal workers on Kubernetes.** @@ -78,6 +78,7 @@ When you update the image, the controller automatically: - Helm [v3.0+](https://github.com/helm/helm/releases) if deploying via our Helm chart - [Temporal Server](https://docs.temporal.io/) (Cloud or self-hosted [v1.29.1](https://github.com/temporalio/temporal/releases/tag/v1.29.1)) - Basic familiarity with Temporal [Workers](https://docs.temporal.io/workers), [Workflows](https://docs.temporal.io/workflows), and [Worker Versioning](https://docs.temporal.io/production-deployment/worker-deployments/worker-versioning) +- **[cert-manager](https://cert-manager.io/docs/installation/)** *(required for `TemporalWorkerOwnedResource`)* — the controller installs a validating webhook for TWOR objects that requires TLS. cert-manager handles certificate provisioning automatically. If cert-manager is not available in your cluster, see [Webhook TLS without cert-manager](docs/owned-resources.md#webhook-tls) for the manual setup. 
### 🔧 Installation @@ -104,7 +105,7 @@ helm install temporal-worker-controller \ - ✅ **Deletion of resources** associated with drained Worker Deployment Versions - ✅ **Multiple rollout strategies**: `Manual`, `AllAtOnce`, and `Progressive` rollouts - ✅ **Gate workflows** - Test new versions with a [pre-deployment test](https://docs.temporal.io/production-deployment/worker-deployments/worker-versioning#adding-a-pre-deployment-test) before routing real traffic to them -- ⏳ **Load-based auto-scaling** - Not yet implemented (use fixed replica counts) +- ✅ **Per-version attached resources** - Attach HPAs, KEDA ScaledObjects, PodDisruptionBudgets, or any namespaced Kubernetes resource to each worker version with running workers via [`TemporalWorkerOwnedResource`](docs/owned-resources.md) — this is also the recommended path for metric-based and backlog-based autoscaling ## 💡 Why Use This? @@ -137,6 +138,7 @@ The Temporal Worker Controller eliminates this operational overhead by automatin | [Configuration](docs/configuration.md) | Complete configuration reference | | [Concepts](docs/concepts.md) | Key concepts and terminology | | [Limits](docs/limits.md) | Technical constraints and limitations | +| [TemporalWorkerOwnedResource](docs/owned-resources.md) | Attach HPAs, PDBs, and other resources to each versioned Deployment | ## 🔧 Worker Configuration diff --git a/api/v1alpha1/temporalworkerownedresource_types.go b/api/v1alpha1/temporalworkerownedresource_types.go new file mode 100644 index 00000000..27d44a4e --- /dev/null +++ b/api/v1alpha1/temporalworkerownedresource_types.go @@ -0,0 +1,104 @@ +// Unless explicitly stated otherwise all files in this repository are licensed under the MIT License. +// +// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2024 Datadog, Inc. 
+ +package v1alpha1 + +import ( + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" +) + +// WorkerDeploymentReference references a TemporalWorkerDeployment in the same namespace. +type WorkerDeploymentReference struct { + // Name of the TemporalWorkerDeployment resource in the same namespace. + // +kubebuilder:validation:Required + // +kubebuilder:validation:Pattern=`^[a-z0-9]([-a-z0-9]*[a-z0-9])?$` + Name string `json:"name"` +} + +// TemporalWorkerOwnedResourceSpec defines the desired state of TemporalWorkerOwnedResource. +type TemporalWorkerOwnedResourceSpec struct { + // WorkerRef references the TemporalWorkerDeployment to attach this resource to. + // +kubebuilder:validation:Required + WorkerRef WorkerDeploymentReference `json:"workerRef"` + + // Object is the Kubernetes resource template to attach to each versioned Deployment. + // One copy of this resource is created per active Build ID, owned by the corresponding + // versioned Deployment (so it is garbage collected when the Deployment is deleted). + // + // The object must include apiVersion, kind, and spec. The metadata.name and + // metadata.namespace are generated by the controller and must not be set by the user. + // + // String values in the spec may contain Go template expressions: + // {{ .DeploymentName }} - the controller-generated versioned Deployment name + // {{ .TemporalNamespace }} - the Temporal namespace the worker connects to + // {{ .BuildID }} - the Build ID for this version + // + // The controller also auto-injects two well-known fields. If your resource requires these fields, + // you must include them in the spec.object but leave them empty for the controller to fill in: + // scaleTargetRef - set to point at the versioned Deployment (for HPA, KEDA, WPA, etc.) + // matchLabels - set to the versioned Deployment's selector labels (for PDB, WPA, etc.) 
+ // +kubebuilder:validation:Required + // +kubebuilder:pruning:PreserveUnknownFields + Object runtime.RawExtension `json:"object"` +} + +// OwnedResourceVersionStatus describes the status of an owned resource for a single Build ID. +type OwnedResourceVersionStatus struct { + // BuildID is the Build ID of the versioned Deployment this status entry refers to. + BuildID string `json:"buildID"` + + // Applied is true if the resource was successfully applied for this Build ID. + Applied bool `json:"applied"` + + // ResourceName is the name of the applied Kubernetes resource. + // +optional + ResourceName string `json:"resourceName,omitempty"` + + // Message describes any error if Applied is false. + // +optional + Message string `json:"message,omitempty"` + + // LastTransitionTime is the last time this status entry was updated. + LastTransitionTime metav1.Time `json:"lastTransitionTime"` +} + +// TemporalWorkerOwnedResourceStatus defines the observed state of TemporalWorkerOwnedResource. +type TemporalWorkerOwnedResourceStatus struct { + // Versions describes the per-Build-ID status of owned resources. + // +optional + Versions []OwnedResourceVersionStatus `json:"versions,omitempty"` +} + +//+kubebuilder:object:root=true +//+kubebuilder:subresource:status +//+kubebuilder:resource:shortName=twor +//+kubebuilder:printcolumn:name="Worker",type="string",JSONPath=".spec.workerRef.name",description="Referenced TemporalWorkerDeployment" +//+kubebuilder:printcolumn:name="Kind",type="string",JSONPath=".spec.object.kind",description="Kind of owned resource" +//+kubebuilder:printcolumn:name="Age",type="date",JSONPath=".metadata.creationTimestamp",description="Age" + +// TemporalWorkerOwnedResource attaches an arbitrary namespaced Kubernetes resource +// (HPA, PDB, WPA, custom CRDs, etc.) to each per-Build-ID versioned Deployment +// managed by a TemporalWorkerDeployment. 
One copy of the resource is created per +// active Build ID and is owned by the corresponding versioned Deployment. +type TemporalWorkerOwnedResource struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + Spec TemporalWorkerOwnedResourceSpec `json:"spec,omitempty"` + Status TemporalWorkerOwnedResourceStatus `json:"status,omitempty"` +} + +//+kubebuilder:object:root=true + +// TemporalWorkerOwnedResourceList contains a list of TemporalWorkerOwnedResource. +type TemporalWorkerOwnedResourceList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata,omitempty"` + Items []TemporalWorkerOwnedResource `json:"items"` +} + +func init() { + SchemeBuilder.Register(&TemporalWorkerOwnedResource{}, &TemporalWorkerOwnedResourceList{}) +} diff --git a/api/v1alpha1/temporalworkerownedresource_webhook.go b/api/v1alpha1/temporalworkerownedresource_webhook.go new file mode 100644 index 00000000..67294077 --- /dev/null +++ b/api/v1alpha1/temporalworkerownedresource_webhook.go @@ -0,0 +1,463 @@ +// Unless explicitly stated otherwise all files in this repository are licensed under the MIT License. +// +// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2024 Datadog, Inc. + +package v1alpha1 + +import ( + "context" + "encoding/json" + "fmt" + "os" + "strings" + + authorizationv1 "k8s.io/api/authorization/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/validation/field" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/webhook" + "sigs.k8s.io/controller-runtime/pkg/webhook/admission" +) + +// defaultBannedKinds lists resource kinds that the controller blocks by default. +// This prevents TWOR from being misused to run arbitrary services alongside the worker. 
+// The operator can override this list via the BANNED_KINDS environment variable. +var defaultBannedKinds = []string{"Deployment", "StatefulSet", "Job", "Pod", "CronJob"} + +// TemporalWorkerOwnedResourceValidator validates TemporalWorkerOwnedResource objects. +// It holds API-dependent dependencies (client, RESTMapper, controller SA identity). +// +kubebuilder:object:generate=false +type TemporalWorkerOwnedResourceValidator struct { + Client client.Client + RESTMapper meta.RESTMapper + ControllerSAName string + ControllerSANamespace string + BannedKinds []string +} + +var _ webhook.CustomValidator = &TemporalWorkerOwnedResourceValidator{} + +// NewTemporalWorkerOwnedResourceValidator creates a validator from a manager. +// +// Three environment variables are read at startup (all injected by the Helm chart): +// +// - POD_NAMESPACE — namespace in which the controller pod runs; used as the +// service-account namespace when performing SubjectAccessReview checks for the +// controller SA. Populated via the downward API (fieldRef: metadata.namespace). +// +// - SERVICE_ACCOUNT_NAME — name of the Kubernetes ServiceAccount the controller +// pod runs as; used when performing SubjectAccessReview checks for the controller +// SA. Populated via the downward API (fieldRef: spec.serviceAccountName). +// +// - BANNED_KINDS — comma-separated list of kind names that are not allowed as +// TemporalWorkerOwnedResource objects (e.g. "Deployment,StatefulSet,Job,Pod,CronJob"). +// Configurable via the ownedResources.bannedKinds Helm value. If unset, the +// built-in defaultBannedKinds list is used. 
+func NewTemporalWorkerOwnedResourceValidator(mgr ctrl.Manager) *TemporalWorkerOwnedResourceValidator { + bannedKinds := defaultBannedKinds + if env := os.Getenv("BANNED_KINDS"); env != "" { + parts := strings.Split(env, ",") + bannedKinds = make([]string, 0, len(parts)) + for _, p := range parts { + if trimmed := strings.TrimSpace(p); trimmed != "" { + bannedKinds = append(bannedKinds, trimmed) + } + } + } + return &TemporalWorkerOwnedResourceValidator{ + Client: mgr.GetClient(), + RESTMapper: mgr.GetRESTMapper(), + ControllerSAName: os.Getenv("SERVICE_ACCOUNT_NAME"), + ControllerSANamespace: os.Getenv("POD_NAMESPACE"), + BannedKinds: bannedKinds, + } +} + +// SetupWebhookWithManager registers the validating webhook with the manager. +// +// +kubebuilder:webhook:path=/validate-temporal-io-v1alpha1-temporalworkerownedresource,mutating=false,failurePolicy=fail,sideEffects=None,groups=temporal.io,resources=temporalworkerownedresources,verbs=create;update;delete,versions=v1alpha1,name=vtemporalworkerownedresource.kb.io,admissionReviewVersions=v1 +func (v *TemporalWorkerOwnedResourceValidator) SetupWebhookWithManager(mgr ctrl.Manager) error { + return ctrl.NewWebhookManagedBy(mgr). + For(&TemporalWorkerOwnedResource{}). + WithValidator(v). + Complete() +} + +// ValidateCreate validates a new TemporalWorkerOwnedResource. +func (v *TemporalWorkerOwnedResourceValidator) ValidateCreate(ctx context.Context, obj runtime.Object) (admission.Warnings, error) { + twor, ok := obj.(*TemporalWorkerOwnedResource) + if !ok { + return nil, apierrors.NewBadRequest("expected a TemporalWorkerOwnedResource") + } + return v.validate(ctx, nil, twor, "create") +} + +// ValidateUpdate validates an updated TemporalWorkerOwnedResource. 
+func (v *TemporalWorkerOwnedResourceValidator) ValidateUpdate(ctx context.Context, oldObj, newObj runtime.Object) (admission.Warnings, error) { + old, ok := oldObj.(*TemporalWorkerOwnedResource) + if !ok { + return nil, apierrors.NewBadRequest("expected a TemporalWorkerOwnedResource for old object") + } + newTWOR, ok := newObj.(*TemporalWorkerOwnedResource) + if !ok { + return nil, apierrors.NewBadRequest("expected a TemporalWorkerOwnedResource for new object") + } + return v.validate(ctx, old, newTWOR, "update") +} + +// ValidateDelete checks that the requesting user and the controller service account +// are both authorized to delete the underlying resource kind. This prevents privilege +// escalation: a user who cannot directly delete HPAs should not be able to delete a +// TemporalWorkerOwnedResource that manages HPAs and thereby trigger their removal. +func (v *TemporalWorkerOwnedResourceValidator) ValidateDelete(ctx context.Context, obj runtime.Object) (admission.Warnings, error) { + twor, ok := obj.(*TemporalWorkerOwnedResource) + if !ok { + return nil, apierrors.NewBadRequest("expected a TemporalWorkerOwnedResource") + } + + if twor.Spec.Object.Raw == nil { + return nil, nil // nothing to check + } + + warnings, errs := v.validateWithAPI(ctx, twor, "delete") + if len(errs) > 0 { + return warnings, apierrors.NewInvalid( + twor.GroupVersionKind().GroupKind(), + twor.GetName(), + errs, + ) + } + return warnings, nil +} + +// validate runs all validation checks (pure spec + API-dependent). +// verb is the RBAC verb to check for the underlying resource ("create" on create, "update" on update). 
+func (v *TemporalWorkerOwnedResourceValidator) validate(ctx context.Context, old, new *TemporalWorkerOwnedResource, verb string) (admission.Warnings, error) { + var allErrs field.ErrorList + var warnings admission.Warnings + + // Pure spec validation (no API calls needed) + specWarnings, specErrs := validateOwnedResourceSpec(new.Spec, v.BannedKinds) + warnings = append(warnings, specWarnings...) + allErrs = append(allErrs, specErrs...) + + // Immutability: workerRef.name must not change on update + if old != nil && old.Spec.WorkerRef.Name != new.Spec.WorkerRef.Name { + allErrs = append(allErrs, field.Forbidden( + field.NewPath("spec").Child("workerRef").Child("name"), + "workerRef.name is immutable and cannot be changed after creation", + )) + } + + // Return early if pure validation failed (API checks won't help) + if len(allErrs) > 0 { + return warnings, apierrors.NewInvalid( + new.GroupVersionKind().GroupKind(), + new.GetName(), + allErrs, + ) + } + + // API-dependent checks (RESTMapper scope + SubjectAccessReview) + apiWarnings, apiErrs := v.validateWithAPI(ctx, new, verb) + warnings = append(warnings, apiWarnings...) + allErrs = append(allErrs, apiErrs...) + + if len(allErrs) > 0 { + return warnings, apierrors.NewInvalid( + new.GroupVersionKind().GroupKind(), + new.GetName(), + allErrs, + ) + } + + return warnings, nil +} + +// validateOwnedResourceSpec performs pure (no-API) validation of the spec fields. +// It checks structural constraints that can be evaluated without talking to the API server. 
+func validateOwnedResourceSpec(spec TemporalWorkerOwnedResourceSpec, bannedKinds []string) (admission.Warnings, field.ErrorList) { + var allErrs field.ErrorList + var warnings admission.Warnings + + if spec.Object.Raw == nil { + allErrs = append(allErrs, field.Required( + field.NewPath("spec").Child("object"), + "object must be specified", + )) + return warnings, allErrs + } + + var obj map[string]interface{} + if err := json.Unmarshal(spec.Object.Raw, &obj); err != nil { + allErrs = append(allErrs, field.Invalid( + field.NewPath("spec").Child("object"), + "", + fmt.Sprintf("failed to parse object: %v", err), + )) + return warnings, allErrs + } + + // 1. apiVersion and kind must be present + apiVersionStr, _ := obj["apiVersion"].(string) + kind, _ := obj["kind"].(string) + if apiVersionStr == "" { + allErrs = append(allErrs, field.Required( + field.NewPath("spec").Child("object").Child("apiVersion"), + "apiVersion must be specified", + )) + } + if kind == "" { + allErrs = append(allErrs, field.Required( + field.NewPath("spec").Child("object").Child("kind"), + "kind must be specified", + )) + } + + // 2. metadata.name and metadata.namespace must be absent or empty — the controller generates them + if embeddedMeta, ok := obj["metadata"].(map[string]interface{}); ok { + if name, ok := embeddedMeta["name"].(string); ok && name != "" { + allErrs = append(allErrs, field.Forbidden( + field.NewPath("spec").Child("object").Child("metadata").Child("name"), + "metadata.name is generated by the controller; remove it from spec.object", + )) + } + if ns, ok := embeddedMeta["namespace"].(string); ok && ns != "" { + allErrs = append(allErrs, field.Forbidden( + field.NewPath("spec").Child("object").Child("metadata").Child("namespace"), + "metadata.namespace is set by the controller; remove it from spec.object", + )) + } + } + + // 3. 
Banned kinds + if kind != "" { + for _, banned := range bannedKinds { + if strings.EqualFold(kind, banned) { + allErrs = append(allErrs, field.Forbidden( + field.NewPath("spec").Child("object").Child("kind"), + fmt.Sprintf("kind %q is not allowed; "+ + "the owned resources mechanism is to attach resources that help the worker service run (e.g. scalers), "+ + "not for running arbitrary additional services; "+ + "your controller operator can change this requirement if you have a specific use case in mind", kind), + )) + break + } + } + } + + // Check spec-level fields + if innerSpec, ok := obj["spec"].(map[string]interface{}); ok { + innerSpecPath := field.NewPath("spec").Child("object").Child("spec") + + // 4. minReplicas must not be 0. + // Scaling to zero is not safe with Temporal's approximate_backlog_count metric: that metric + // is only emitted while the task queue is loaded in memory (i.e. while at least one worker + // is polling). If all workers are scaled to zero and the task queue goes idle for ~5 minutes, + // the metric stops being emitted and resets to zero. If new tasks then arrive but no worker + // polls, the metric remains zero — making it impossible for a metric-based autoscaler to + // detect the backlog and scale back up. Until Temporal provides a reliable mechanism for + // scaling workers to zero and back, minReplicas=0 is rejected. + if minReplicas, exists := innerSpec["minReplicas"]; exists { + if v, ok := minReplicas.(float64); ok && v == 0 { + allErrs = append(allErrs, field.Invalid( + innerSpecPath.Child("minReplicas"), + 0, + "minReplicas must not be 0; scaling Temporal workers to zero is not currently safe: "+ + "Temporal's approximate_backlog_count metric stops being emitted when the task queue is idle "+ + "with no pollers, so a metric-based autoscaler cannot detect a new backlog and scale back up "+ + "from zero once all workers are gone", + )) + } + } + + // 5. scaleTargetRef: if absent, no injection. 
If present and null, the controller injects it + // to point at the versioned Deployment. If present and non-null, reject — the controller + // owns this field when it is present and any hardcoded value would point at the wrong Deployment. + checkScaleTargetRefNotSet(innerSpec, innerSpecPath, &allErrs) + + // 6. selector.matchLabels: if absent, no injection. If present and null, the controller + // injects it with the versioned Deployment's selector labels. If present and non-null, + // reject — the controller owns this field when it is present. + if selector, ok := innerSpec["selector"].(map[string]interface{}); ok { + if ml, exists := selector["matchLabels"]; exists && ml != nil { + allErrs = append(allErrs, field.Forbidden( + innerSpecPath.Child("selector").Child("matchLabels"), + "if selector.matchLabels is present, the controller owns it and will set it to the "+ + "versioned Deployment's selector labels; set it to null to opt in to auto-injection, "+ + "or remove it entirely if you do not need label-based selection", + )) + } + } + } + + return warnings, allErrs +} + +// checkScaleTargetRefNotSet recursively traverses obj looking for any scaleTargetRef that is +// non-null. If absent, no injection. If null, the controller injects it to point at the versioned +// Deployment. If non-null, reject — the controller owns this field when present, and a hardcoded +// value would point at the wrong (non-versioned) Deployment. 
+func checkScaleTargetRefNotSet(obj map[string]interface{}, path *field.Path, allErrs *field.ErrorList) {
+	for k, v := range obj {
+		if k == "scaleTargetRef" {
+			if v != nil {
+				*allErrs = append(*allErrs, field.Forbidden(
+					path.Child("scaleTargetRef"),
+					"if scaleTargetRef is present, the controller owns it and will set it to point at the "+
+						"versioned Deployment; set it to null to opt in to auto-injection, "+
+						"or remove it entirely if you do not need the scaleTargetRef field",
+				))
+			}
+			// null is the correct opt-in sentinel — leave it alone
+			continue
+		}
+		if nested, ok := v.(map[string]interface{}); ok {
+			checkScaleTargetRefNotSet(nested, path.Child(k), allErrs)
+		}
+	}
+}
+
+// validateWithAPI performs API-dependent validation: RESTMapper scope check and
+// SubjectAccessReview for both the requesting user and the controller service account.
+// verb is the RBAC verb to check ("create" on create, "update" on update, "delete" on delete).
+// It is a no-op when Client or RESTMapper is nil (e.g., in unit tests).
+func (v *TemporalWorkerOwnedResourceValidator) validateWithAPI(ctx context.Context, twor *TemporalWorkerOwnedResource, verb string) (admission.Warnings, field.ErrorList) { + var allErrs field.ErrorList + var warnings admission.Warnings + + if v.Client == nil || v.RESTMapper == nil { + return warnings, allErrs + } + + var obj map[string]interface{} + if err := json.Unmarshal(twor.Spec.Object.Raw, &obj); err != nil { + return warnings, allErrs // already caught in validateOwnedResourceSpec + } + apiVersionStr, _ := obj["apiVersion"].(string) + kind, _ := obj["kind"].(string) + if apiVersionStr == "" || kind == "" { + return warnings, allErrs // already caught in validateOwnedResourceSpec + } + + gv, err := schema.ParseGroupVersion(apiVersionStr) + if err != nil { + allErrs = append(allErrs, field.Invalid( + field.NewPath("spec").Child("object").Child("apiVersion"), + apiVersionStr, + fmt.Sprintf("invalid apiVersion: %v", err), + )) + return warnings, allErrs + } + gvk := gv.WithKind(kind) + + // 1. Namespace-scope check via RESTMapper + mappings, err := v.RESTMapper.RESTMappings(gvk.GroupKind(), gvk.Version) + if err != nil { + allErrs = append(allErrs, field.Invalid( + field.NewPath("spec").Child("object").Child("apiVersion"), + apiVersionStr, + fmt.Sprintf("could not look up %s %s in the API server: %v (ensure the resource type is installed and the apiVersion/kind are correct)", kind, apiVersionStr, err), + )) + return warnings, allErrs + } + var resource string + for _, mapping := range mappings { + if mapping.Scope.Name() != meta.RESTScopeNameNamespace { + allErrs = append(allErrs, field.Forbidden( + field.NewPath("spec").Child("object").Child("kind"), + fmt.Sprintf("kind %q is not namespace-scoped; only namespaced resources are allowed in TemporalWorkerOwnedResource", kind), + )) + return warnings, allErrs + } + if resource == "" { + resource = mapping.Resource.Resource + } + } + + // 2. 
SubjectAccessReview: requesting user + req, reqErr := admission.RequestFromContext(ctx) + if reqErr == nil && req.UserInfo.Username != "" { + userSAR := &authorizationv1.SubjectAccessReview{ + Spec: authorizationv1.SubjectAccessReviewSpec{ + User: req.UserInfo.Username, + Groups: req.UserInfo.Groups, + ResourceAttributes: &authorizationv1.ResourceAttributes{ + Namespace: twor.Namespace, + Verb: verb, + Group: gv.Group, + Version: gv.Version, + Resource: resource, + }, + }, + } + if err := v.Client.Create(ctx, userSAR); err != nil { + allErrs = append(allErrs, field.InternalError( + field.NewPath("spec").Child("object"), + fmt.Errorf("failed to check requesting user permissions: %w", err), + )) + } else if !userSAR.Status.Allowed { + reason := userSAR.Status.Reason + if reason == "" { + reason = "permission denied" + } + allErrs = append(allErrs, field.Forbidden( + field.NewPath("spec").Child("object"), + fmt.Sprintf("requesting user %q is not authorized to %s %s in namespace %q: %s", + req.UserInfo.Username, verb, kind, twor.Namespace, reason), + )) + } + } + + // 3. SubjectAccessReview: controller service account. + // When Kubernetes evaluates a real request from a service account token, it automatically + // considers the SA's username AND the groups the SA belongs to. When we construct a SAR + // manually (without a real token), we must supply those groups ourselves — Kubernetes will + // not add them. Without the groups, we could get a false negative if the cluster admin + // granted permissions to "all SAs in namespace X" via a group binding + // (e.g. system:serviceaccounts:{namespace}) rather than to the specific SA username. Including + // the groups here mirrors what Kubernetes would do for a real request from the SA. 
+ if v.ControllerSAName != "" && v.ControllerSANamespace != "" { + controllerUser := fmt.Sprintf("system:serviceaccount:%s:%s", v.ControllerSANamespace, v.ControllerSAName) + controllerSAR := &authorizationv1.SubjectAccessReview{ + Spec: authorizationv1.SubjectAccessReviewSpec{ + User: controllerUser, + Groups: []string{ + "system:serviceaccounts", + fmt.Sprintf("system:serviceaccounts:%s", v.ControllerSANamespace), + "system:authenticated", + }, + ResourceAttributes: &authorizationv1.ResourceAttributes{ + Namespace: twor.Namespace, + Verb: verb, + Group: gv.Group, + Version: gv.Version, + Resource: resource, + }, + }, + } + if err := v.Client.Create(ctx, controllerSAR); err != nil { + allErrs = append(allErrs, field.InternalError( + field.NewPath("spec").Child("object"), + fmt.Errorf("failed to check controller service account permissions: %w", err), + )) + } else if !controllerSAR.Status.Allowed { + reason := controllerSAR.Status.Reason + if reason == "" { + reason = "permission denied" + } + allErrs = append(allErrs, field.Forbidden( + field.NewPath("spec").Child("object"), + fmt.Sprintf("controller service account %q is not authorized to %s %s in namespace %q: %s; "+ + "grant it the required RBAC permissions via the Helm chart configuration", + controllerUser, verb, kind, twor.Namespace, reason), + )) + } + } + + return warnings, allErrs +} diff --git a/api/v1alpha1/temporalworkerownedresource_webhook_test.go b/api/v1alpha1/temporalworkerownedresource_webhook_test.go new file mode 100644 index 00000000..d607e876 --- /dev/null +++ b/api/v1alpha1/temporalworkerownedresource_webhook_test.go @@ -0,0 +1,391 @@ +// Unless explicitly stated otherwise all files in this repository are licensed under the MIT License. +// +// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2024 Datadog, Inc. 
+ +package v1alpha1_test + +import ( + "context" + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" +) + +// newTWOR builds a TemporalWorkerOwnedResource with an arbitrary embedded object spec. +func newTWOR(name, workerRefName string, embeddedObj map[string]interface{}) *temporaliov1alpha1.TemporalWorkerOwnedResource { + raw, _ := json.Marshal(embeddedObj) + return &temporaliov1alpha1.TemporalWorkerOwnedResource{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: "default", + }, + Spec: temporaliov1alpha1.TemporalWorkerOwnedResourceSpec{ + WorkerRef: temporaliov1alpha1.WorkerDeploymentReference{ + Name: workerRefName, + }, + Object: runtime.RawExtension{Raw: raw}, + }, + } +} + +// validHPAObject returns a minimal valid HPA embedded object spec. +func validHPAObject() map[string]interface{} { + return map[string]interface{}{ + "apiVersion": "autoscaling/v2", + "kind": "HorizontalPodAutoscaler", + "spec": map[string]interface{}{ + "minReplicas": float64(2), + "maxReplicas": float64(10), + }, + } +} + +// newValidatorNoAPI creates a validator with no API client (API checks skipped). 
+func newValidatorNoAPI() *temporaliov1alpha1.TemporalWorkerOwnedResourceValidator { + return &temporaliov1alpha1.TemporalWorkerOwnedResourceValidator{ + BannedKinds: []string{"Deployment", "StatefulSet", "Job", "Pod", "CronJob"}, + } +} + +func TestTemporalWorkerOwnedResource_ValidateCreate(t *testing.T) { + tests := map[string]struct { + obj runtime.Object + errorMsg string + }{ + "valid HPA": { + obj: newTWOR("my-hpa", "my-worker", validHPAObject()), + }, + "wrong object type": { + obj: &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod"}}, + errorMsg: "expected a TemporalWorkerOwnedResource", + }, + "nil raw object": { + obj: &temporaliov1alpha1.TemporalWorkerOwnedResource{ + ObjectMeta: metav1.ObjectMeta{Name: "empty", Namespace: "default"}, + Spec: temporaliov1alpha1.TemporalWorkerOwnedResourceSpec{ + WorkerRef: temporaliov1alpha1.WorkerDeploymentReference{Name: "my-worker"}, + Object: runtime.RawExtension{Raw: nil}, + }, + }, + errorMsg: "object must be specified", + }, + "missing apiVersion": { + obj: newTWOR("no-apiv", "my-worker", map[string]interface{}{ + "kind": "HorizontalPodAutoscaler", + "spec": map[string]interface{}{"minReplicas": float64(1)}, + }), + errorMsg: "apiVersion must be specified", + }, + "missing kind": { + obj: newTWOR("no-kind", "my-worker", map[string]interface{}{ + "apiVersion": "autoscaling/v2", + "spec": map[string]interface{}{"minReplicas": float64(1)}, + }), + errorMsg: "kind must be specified", + }, + "metadata.name set in embedded object": { + obj: newTWOR("has-name", "my-worker", map[string]interface{}{ + "apiVersion": "autoscaling/v2", + "kind": "HorizontalPodAutoscaler", + "metadata": map[string]interface{}{"name": "fixed-name"}, + "spec": map[string]interface{}{"minReplicas": float64(2)}, + }), + errorMsg: "metadata.name is generated by the controller", + }, + "metadata.namespace set in embedded object": { + obj: newTWOR("has-ns", "my-worker", map[string]interface{}{ + "apiVersion": "autoscaling/v2", + "kind": 
"HorizontalPodAutoscaler", + "metadata": map[string]interface{}{"namespace": "some-ns"}, + "spec": map[string]interface{}{"minReplicas": float64(2)}, + }), + errorMsg: "metadata.namespace is set by the controller", + }, + "banned kind Deployment": { + obj: newTWOR("bad-deploy", "my-worker", map[string]interface{}{ + "apiVersion": "apps/v1", + "kind": "Deployment", + "spec": map[string]interface{}{}, + }), + errorMsg: `kind "Deployment" is not allowed`, + }, + "banned kind StatefulSet": { + obj: newTWOR("bad-sts", "my-worker", map[string]interface{}{ + "apiVersion": "apps/v1", + "kind": "StatefulSet", + "spec": map[string]interface{}{}, + }), + errorMsg: `kind "StatefulSet" is not allowed`, + }, + "banned kind Pod": { + obj: newTWOR("bad-pod", "my-worker", map[string]interface{}{ + "apiVersion": "v1", + "kind": "Pod", + "spec": map[string]interface{}{}, + }), + errorMsg: `kind "Pod" is not allowed`, + }, + "banned kind case-insensitive": { + obj: newTWOR("bad-job", "my-worker", map[string]interface{}{ + "apiVersion": "batch/v1", + "kind": "job", // lowercase + "spec": map[string]interface{}{}, + }), + errorMsg: `kind "job" is not allowed`, + }, + "minReplicas is 0": { + obj: newTWOR("zero-replicas", "my-worker", map[string]interface{}{ + "apiVersion": "autoscaling/v2", + "kind": "HorizontalPodAutoscaler", + "spec": map[string]interface{}{ + "minReplicas": float64(0), + "maxReplicas": float64(5), + }, + }), + errorMsg: "minReplicas must not be 0", + }, + "non-null scaleTargetRef": { + obj: newTWOR("has-scale-ref", "my-worker", map[string]interface{}{ + "apiVersion": "autoscaling/v2", + "kind": "HorizontalPodAutoscaler", + "spec": map[string]interface{}{ + "scaleTargetRef": map[string]interface{}{ + "apiVersion": "apps/v1", + "kind": "Deployment", + "name": "some-other-deployment", + }, + "minReplicas": float64(2), + "maxReplicas": float64(10), + }, + }), + errorMsg: "if scaleTargetRef is present, the controller owns it", + }, + "null scaleTargetRef is valid 
(opt-in)": { + obj: newTWOR("null-scale-ref", "my-worker", map[string]interface{}{ + "apiVersion": "autoscaling/v2", + "kind": "HorizontalPodAutoscaler", + "spec": map[string]interface{}{ + "scaleTargetRef": nil, // null = opt-in sentinel + "minReplicas": float64(2), + "maxReplicas": float64(10), + }, + }), + }, + "non-null selector.matchLabels": { + obj: newTWOR("has-match-labels", "my-worker", map[string]interface{}{ + "apiVersion": "policy/v1", + "kind": "PodDisruptionBudget", + "spec": map[string]interface{}{ + "minAvailable": float64(1), + "selector": map[string]interface{}{ + "matchLabels": map[string]interface{}{ + "app": "my-worker", + }, + }, + }, + }), + errorMsg: "if selector.matchLabels is present, the controller owns it", + }, + "null selector.matchLabels is valid (opt-in)": { + obj: newTWOR("null-match-labels", "my-worker", map[string]interface{}{ + "apiVersion": "policy/v1", + "kind": "PodDisruptionBudget", + "spec": map[string]interface{}{ + "minAvailable": float64(1), + "selector": map[string]interface{}{ + "matchLabels": nil, // null = opt-in sentinel + }, + }, + }), + }, + "absent selector.matchLabels is valid": { + obj: newTWOR("no-match-labels", "my-worker", map[string]interface{}{ + "apiVersion": "policy/v1", + "kind": "PodDisruptionBudget", + "spec": map[string]interface{}{ + "minAvailable": float64(1), + "selector": map[string]interface{}{}, + }, + }), + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + ctx := context.Background() + v := newValidatorNoAPI() + + warnings, err := v.ValidateCreate(ctx, tc.obj) + + if tc.errorMsg != "" { + require.Error(t, err, "expected an error containing %q", tc.errorMsg) + assert.Contains(t, err.Error(), tc.errorMsg) + } else { + require.NoError(t, err) + } + assert.Nil(t, warnings) + }) + } +} + +func TestTemporalWorkerOwnedResource_ValidateUpdate_Immutability(t *testing.T) { + tests := map[string]struct { + oldWorkerRef string + newWorkerRef string + errorMsg string + }{ + "same 
workerRef is valid": { + oldWorkerRef: "my-worker", + newWorkerRef: "my-worker", + }, + "changing workerRef is forbidden": { + oldWorkerRef: "my-worker", + newWorkerRef: "different-worker", + errorMsg: "workerRef.name is immutable", + }, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + ctx := context.Background() + v := newValidatorNoAPI() + + oldTWOR := newTWOR("my-hpa", tc.oldWorkerRef, validHPAObject()) + newTWOR := newTWOR("my-hpa", tc.newWorkerRef, validHPAObject()) + + _, err := v.ValidateUpdate(ctx, oldTWOR, newTWOR) + + if tc.errorMsg != "" { + require.Error(t, err) + assert.Contains(t, err.Error(), tc.errorMsg) + } else { + require.NoError(t, err) + } + }) + } +} + +func TestTemporalWorkerOwnedResource_ValidateUpdate_ValidatesNewSpec(t *testing.T) { + ctx := context.Background() + v := newValidatorNoAPI() + + // Update from a valid spec to one with banned kind should fail + oldTWOR := newTWOR("my-hpa", "my-worker", validHPAObject()) + invalidNew := newTWOR("my-hpa", "my-worker", map[string]interface{}{ + "apiVersion": "apps/v1", + "kind": "Deployment", + "spec": map[string]interface{}{}, + }) + + _, err := v.ValidateUpdate(ctx, oldTWOR, invalidNew) + require.Error(t, err) + assert.Contains(t, err.Error(), `kind "Deployment" is not allowed`) +} + +func TestTemporalWorkerOwnedResource_ValidateDelete(t *testing.T) { + ctx := context.Background() + v := newValidatorNoAPI() + + // When API client is nil, delete permission checks are skipped + twor := newTWOR("my-hpa", "my-worker", validHPAObject()) + warnings, err := v.ValidateDelete(ctx, twor) + assert.NoError(t, err) + assert.Nil(t, warnings) +} + +func TestTemporalWorkerOwnedResource_ValidateDelete_WrongObjectType(t *testing.T) { + ctx := context.Background() + v := newValidatorNoAPI() + + _, err := v.ValidateDelete(ctx, &corev1.Pod{}) + require.Error(t, err) + assert.Contains(t, err.Error(), "expected a TemporalWorkerOwnedResource") +} + +func 
TestTemporalWorkerOwnedResource_ValidateDelete_NilRaw(t *testing.T) { + ctx := context.Background() + v := newValidatorNoAPI() + + // TWOR with nil spec.object should be permitted (nothing to check) + twor := &temporaliov1alpha1.TemporalWorkerOwnedResource{ + ObjectMeta: metav1.ObjectMeta{Name: "empty", Namespace: "default"}, + Spec: temporaliov1alpha1.TemporalWorkerOwnedResourceSpec{ + WorkerRef: temporaliov1alpha1.WorkerDeploymentReference{Name: "my-worker"}, + Object: runtime.RawExtension{Raw: nil}, + }, + } + _, err := v.ValidateDelete(ctx, twor) + assert.NoError(t, err) +} + +func TestTemporalWorkerOwnedResource_ValidateCreate_WrongObjectType(t *testing.T) { + ctx := context.Background() + v := newValidatorNoAPI() + + _, err := v.ValidateCreate(ctx, &corev1.Pod{}) + require.Error(t, err) + assert.Contains(t, err.Error(), "expected a TemporalWorkerOwnedResource") +} + +func TestTemporalWorkerOwnedResource_ValidateUpdate_WrongObjectType(t *testing.T) { + ctx := context.Background() + v := newValidatorNoAPI() + + _, err := v.ValidateUpdate(ctx, &corev1.Pod{}, newTWOR("x", "w", validHPAObject())) + require.Error(t, err) + assert.Contains(t, err.Error(), "expected a TemporalWorkerOwnedResource for old object") + + _, err = v.ValidateUpdate(ctx, newTWOR("x", "w", validHPAObject()), &corev1.Pod{}) + require.Error(t, err) + assert.Contains(t, err.Error(), "expected a TemporalWorkerOwnedResource for new object") +} + +func TestTemporalWorkerOwnedResource_CustomBannedKinds(t *testing.T) { + ctx := context.Background() + v := &temporaliov1alpha1.TemporalWorkerOwnedResourceValidator{ + BannedKinds: []string{"CustomBannedKind"}, + } + + // CustomBannedKind should be rejected + _, err := v.ValidateCreate(ctx, newTWOR("bad", "my-worker", map[string]interface{}{ + "apiVersion": "example.com/v1", + "kind": "CustomBannedKind", + "spec": map[string]interface{}{}, + })) + require.Error(t, err) + assert.Contains(t, err.Error(), `kind "CustomBannedKind" is not allowed`) + + // 
Default banned kinds (Deployment) should be allowed if custom list is set + _, err = v.ValidateCreate(ctx, newTWOR("deploy", "my-worker", map[string]interface{}{ + "apiVersion": "apps/v1", + "kind": "Deployment", + "spec": map[string]interface{}{}, + })) + require.NoError(t, err, "Deployment should be allowed when custom bannedKinds list does not include it") +} + +func TestTemporalWorkerOwnedResource_MultipleErrors(t *testing.T) { + ctx := context.Background() + v := newValidatorNoAPI() + + // Both metadata.name and minReplicas=0 are invalid + obj := map[string]interface{}{ + "apiVersion": "autoscaling/v2", + "kind": "HorizontalPodAutoscaler", + "metadata": map[string]interface{}{"name": "bad-name"}, + "spec": map[string]interface{}{ + "minReplicas": float64(0), + "maxReplicas": float64(5), + }, + } + _, err := v.ValidateCreate(ctx, newTWOR("multi-err", "my-worker", obj)) + require.Error(t, err) + assert.Contains(t, err.Error(), "metadata.name is generated by the controller") + assert.Contains(t, err.Error(), "minReplicas must not be 0") +} diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 1decf431..1df0a443 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -6,6 +6,7 @@ package v1alpha1 import ( "k8s.io/api/core/v1" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" ) @@ -90,9 +91,44 @@ func (in *DeprecatedWorkerDeploymentVersion) DeepCopy() *DeprecatedWorkerDeploym return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
+func (in *GateInputSource) DeepCopyInto(out *GateInputSource) { + *out = *in + if in.ConfigMapKeyRef != nil { + in, out := &in.ConfigMapKeyRef, &out.ConfigMapKeyRef + *out = new(v1.ConfigMapKeySelector) + (*in).DeepCopyInto(*out) + } + if in.SecretKeyRef != nil { + in, out := &in.SecretKeyRef, &out.SecretKeyRef + *out = new(v1.SecretKeySelector) + (*in).DeepCopyInto(*out) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new GateInputSource. +func (in *GateInputSource) DeepCopy() *GateInputSource { + if in == nil { + return nil + } + out := new(GateInputSource) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *GateWorkflowConfig) DeepCopyInto(out *GateWorkflowConfig) { *out = *in + if in.Input != nil { + in, out := &in.Input, &out.Input + *out = new(apiextensionsv1.JSON) + (*in).DeepCopyInto(*out) + } + if in.InputFrom != nil { + in, out := &in.InputFrom, &out.InputFrom + *out = new(GateInputSource) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new GateWorkflowConfig. @@ -120,6 +156,22 @@ func (in *ManualRolloutStrategy) DeepCopy() *ManualRolloutStrategy { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *OwnedResourceVersionStatus) DeepCopyInto(out *OwnedResourceVersionStatus) { + *out = *in + in.LastTransitionTime.DeepCopyInto(&out.LastTransitionTime) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new OwnedResourceVersionStatus. 
+func (in *OwnedResourceVersionStatus) DeepCopy() *OwnedResourceVersionStatus { + if in == nil { + return nil + } + out := new(OwnedResourceVersionStatus) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *RolloutStep) DeepCopyInto(out *RolloutStep) { *out = *in @@ -142,7 +194,7 @@ func (in *RolloutStrategy) DeepCopyInto(out *RolloutStrategy) { if in.Gate != nil { in, out := &in.Gate, &out.Gate *out = new(GateWorkflowConfig) - **out = **in + (*in).DeepCopyInto(*out) } if in.Steps != nil { in, out := &in.Steps, &out.Steps @@ -489,6 +541,119 @@ func (in *TemporalWorkerDeploymentStatus) DeepCopy() *TemporalWorkerDeploymentSt return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *TemporalWorkerOwnedResource) DeepCopyInto(out *TemporalWorkerOwnedResource) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + in.Spec.DeepCopyInto(&out.Spec) + in.Status.DeepCopyInto(&out.Status) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TemporalWorkerOwnedResource. +func (in *TemporalWorkerOwnedResource) DeepCopy() *TemporalWorkerOwnedResource { + if in == nil { + return nil + } + out := new(TemporalWorkerOwnedResource) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *TemporalWorkerOwnedResource) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
+func (in *TemporalWorkerOwnedResourceList) DeepCopyInto(out *TemporalWorkerOwnedResourceList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]TemporalWorkerOwnedResource, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TemporalWorkerOwnedResourceList. +func (in *TemporalWorkerOwnedResourceList) DeepCopy() *TemporalWorkerOwnedResourceList { + if in == nil { + return nil + } + out := new(TemporalWorkerOwnedResourceList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *TemporalWorkerOwnedResourceList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *TemporalWorkerOwnedResourceSpec) DeepCopyInto(out *TemporalWorkerOwnedResourceSpec) { + *out = *in + out.WorkerRef = in.WorkerRef + in.Object.DeepCopyInto(&out.Object) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TemporalWorkerOwnedResourceSpec. +func (in *TemporalWorkerOwnedResourceSpec) DeepCopy() *TemporalWorkerOwnedResourceSpec { + if in == nil { + return nil + } + out := new(TemporalWorkerOwnedResourceSpec) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
+func (in *TemporalWorkerOwnedResourceStatus) DeepCopyInto(out *TemporalWorkerOwnedResourceStatus) { + *out = *in + if in.Versions != nil { + in, out := &in.Versions, &out.Versions + *out = make([]OwnedResourceVersionStatus, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TemporalWorkerOwnedResourceStatus. +func (in *TemporalWorkerOwnedResourceStatus) DeepCopy() *TemporalWorkerOwnedResourceStatus { + if in == nil { + return nil + } + out := new(TemporalWorkerOwnedResourceStatus) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *WorkerDeploymentReference) DeepCopyInto(out *WorkerDeploymentReference) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new WorkerDeploymentReference. +func (in *WorkerDeploymentReference) DeepCopy() *WorkerDeploymentReference { + if in == nil { + return nil + } + out := new(WorkerDeploymentReference) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *WorkerOptions) DeepCopyInto(out *WorkerOptions) { *out = *in diff --git a/cmd/main.go b/cmd/main.go index 646b9f51..f16306e3 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -96,11 +96,10 @@ func main() { setupLog.Error(err, "unable to create controller", "controller", "TemporalWorkerDeployment") os.Exit(1) } - // TODO(jlegrone): Enable the webhook after fixing TLS - //if err = (&temporaliov1alpha1.TemporalWorker{}).SetupWebhookWithManager(mgr); err != nil { - // setupLog.Error(err, "unable to create webhook", "webhook", "TemporalWorker") - // os.Exit(1) - //} + if err = temporaliov1alpha1.NewTemporalWorkerOwnedResourceValidator(mgr).SetupWebhookWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create webhook", "webhook", "TemporalWorkerOwnedResource") + os.Exit(1) + } //+kubebuilder:scaffold:builder if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil { diff --git a/docs/README.md b/docs/README.md index 08de7d0f..9b1a6849 100644 --- a/docs/README.md +++ b/docs/README.md @@ -30,6 +30,9 @@ Technical constraints and limitations of the Temporal Worker Controller system, ### [Ownership](ownership.md) How the controller gets permission to manage a Worker Deployment, how a human client can take or give back control. +### [TemporalWorkerOwnedResource](owned-resources.md) +How to attach HPAs, PodDisruptionBudgets, and other Kubernetes resources to each active versioned Deployment. Covers the auto-injection model, RBAC setup, webhook TLS, and examples. 
+ --- *Note: This documentation structure is designed to grow with the project.* \ No newline at end of file diff --git a/docs/owned-resources.md b/docs/owned-resources.md new file mode 100644 index 00000000..bb529a3e --- /dev/null +++ b/docs/owned-resources.md @@ -0,0 +1,158 @@ +# TemporalWorkerOwnedResource + +`TemporalWorkerOwnedResource` lets you attach arbitrary Kubernetes resources — HPAs, PodDisruptionBudgets, KEDA ScaledObjects, custom CRDs — to each worker version that has running workers. The controller creates one copy of the resource per worker version with a running Deployment, automatically wired to the correct versioned Deployment. + +## Why you need this + +The Temporal Worker Controller creates one Kubernetes `Deployment` per worker version (Build ID). If you attach an HPA directly to a single Deployment, it breaks as versions roll over — the old HPA still targets the old Deployment, the new Deployment has no HPA, and you have to manage cleanup yourself. + +`TemporalWorkerOwnedResource` solves this by treating the attached resource as a template. The controller renders one instance per worker version with running workers, injects the correct versioned Deployment name, and cleans up automatically when the versioned Deployment is deleted (e.g., during the sunset process after traffic has drained). + +This is also the recommended mechanism for metric-based or backlog-based autoscaling: attach a KEDA `ScaledObject` (or a standard HPA with custom metrics) to your workers and the controller keeps one per running worker version, each pointing at the right Deployment. + +## How it works + +1. You create a `TemporalWorkerOwnedResource` that references a `TemporalWorkerDeployment` and contains the resource spec in `spec.object`. +2. The validating webhook checks that you have permission to manage that resource type yourself (SubjectAccessReview), and that the resource kind isn't on the banned list. +3. 
On each reconcile loop, the controller renders one copy of `spec.object` per worker version with a running Deployment, injects fields (see below), and applies it via Server-Side Apply. +4. Each copy is owned by the corresponding versioned `Deployment`, so it is garbage-collected automatically when that Deployment is deleted. +5. `TemporalWorkerOwnedResource.status.versions` is updated with the applied/failed status for each Build ID. + +## Auto-injection + +The controller auto-injects two fields when you set them to `null` in `spec.object`. Setting them to `null` is the explicit signal that you want injection: +- If you omit the field entirely, nothing is injected. +- If you set a non-null value, the webhook rejects the object because the controller owns these fields. + +| Field | Injected value | +|-------|---------------| +| `spec.scaleTargetRef` (any resource with this field) | `{apiVersion: apps/v1, kind: Deployment, name: <versioned-deployment-name>}` | +| `spec.selector.matchLabels` (any resource with this field) | `{temporal.io/build-id: <build-id>, temporal.io/deployment-name: <worker-deployment-name>}` | + +The `scaleTargetRef` injection applies to any resource type that has a `scaleTargetRef` field — not just HPAs. KEDA `ScaledObjects` and other autoscaler CRDs use the same field and benefit from the same injection. + +## Resource naming + +Each per-Build-ID copy is given a unique, DNS-safe name derived from the `(twdName, tworName, buildID)` triple. Names are capped at 47 characters to be safe for all Kubernetes resource types, including Deployment (which has pod-naming constraints that effectively limit deployment names to ~47 characters). The name always ends with an 8-character hash of the full triple, so uniqueness is guaranteed even when the human-readable prefix is truncated. + +Use `kubectl get <resource-kind>` after a reconcile to see the created resources and their names.
+ +## Banned resource kinds + +Certain resource kinds are blocked by default to prevent misuse (e.g., using `TemporalWorkerOwnedResource` to spin up arbitrary workloads). The default banned list is: + +``` +Deployment, StatefulSet, Job, Pod, CronJob +``` + +The banned list is configured via `ownedResourceConfig.bannedKinds` in Helm values and is visible as the `BANNED_KINDS` environment variable on the controller pod: + +```bash +kubectl get pod -n <controller-namespace> -l app.kubernetes.io/name=temporal-worker-controller \ + -o jsonpath='{.items[0].spec.containers[0].env[?(@.name=="BANNED_KINDS")].value}' +``` + +## RBAC + +### What the webhook checks + +When you create or update a `TemporalWorkerOwnedResource`, the webhook performs SubjectAccessReviews to verify: + +1. **You** (the requesting user) can create/update the embedded resource type in that namespace. +2. **The controller's service account** can create/update the embedded resource type in that namespace. + +If either check fails, the request is rejected. This prevents privilege escalation — you cannot use `TemporalWorkerOwnedResource` to create resources you don't already have permission to create yourself. + +### What to configure in Helm + +`ownedResourceConfig.rbac.rules` controls what resource types the controller's ClusterRole permits it to manage. The defaults cover HPAs and PodDisruptionBudgets: + +```yaml +ownedResourceConfig: + rbac: + rules: + - apiGroups: ["autoscaling"] + resources: ["horizontalpodautoscalers"] + - apiGroups: ["policy"] + resources: ["poddisruptionbudgets"] +``` + +Add entries for any other resource types you want to attach (e.g., KEDA `ScaledObjects`). For development clusters you can set `rbac.wildcard: true` to grant access to all resource types, but this is not recommended for production. + +### What to configure for your users + +Users who create `TemporalWorkerOwnedResources` also need RBAC permission to manage the embedded resource type directly.
For example, to let a team create `TemporalWorkerOwnedResources` that embed HPAs, they need the standard `autoscaling` permissions in their namespace — there is nothing `TemporalWorkerOwnedResource`-specific to configure for this. + +## Webhook TLS + +The `TemporalWorkerOwnedResource` validating webhook requires TLS. Install [cert-manager](https://cert-manager.io/docs/installation/) before deploying the controller — the Helm chart handles everything else automatically (`certmanager.enabled: true` is the default). + +## Example: HPA per worker version + +```yaml +apiVersion: temporal.io/v1alpha1 +kind: TemporalWorkerOwnedResource +metadata: + name: my-worker-hpa + namespace: my-namespace +spec: + # Reference the TemporalWorkerDeployment to attach to. + workerRef: + name: my-worker + + # The resource template. The controller creates one copy per worker version + # with a running Deployment. + object: + apiVersion: autoscaling/v2 + kind: HorizontalPodAutoscaler + spec: + # null tells the controller to auto-inject the versioned Deployment reference. + # Do not set this to a real value — the webhook will reject it. + scaleTargetRef: null + minReplicas: 2 + maxReplicas: 10 + metrics: + - type: Resource + resource: + name: cpu + target: + type: Utilization + averageUtilization: 70 +``` + +See [examples/twor-hpa.yaml](../examples/twor-hpa.yaml) for an example pre-configured for the helloworld demo. + +## Example: PodDisruptionBudget per worker version + +```yaml +apiVersion: temporal.io/v1alpha1 +kind: TemporalWorkerOwnedResource +metadata: + name: my-worker-pdb + namespace: my-namespace +spec: + workerRef: + name: my-worker + object: + apiVersion: policy/v1 + kind: PodDisruptionBudget + spec: + minAvailable: 1 + # null tells the controller to auto-inject {temporal.io/build-id, temporal.io/deployment-name}. 
+ selector: + matchLabels: null +``` + +## Checking status + +```bash +# See all TemporalWorkerOwnedResources and which TWD they reference +kubectl get temporalworkerownedresource -n my-namespace + +# See per-Build-ID apply status +kubectl get temporalworkerownedresource my-worker-hpa -n my-namespace \ + -o jsonpath='{.status.versions}' | jq . + +# See the created HPAs +kubectl get hpa -n my-namespace +``` diff --git a/examples/twor-hpa.yaml b/examples/twor-hpa.yaml new file mode 100644 index 00000000..e394a03c --- /dev/null +++ b/examples/twor-hpa.yaml @@ -0,0 +1,49 @@ +# TemporalWorkerOwnedResource — HPA example +# +# This example attaches a HorizontalPodAutoscaler to each active versioned Deployment +# of the "helloworld" TemporalWorkerDeployment (from the local demo). +# +# Prerequisites: +# - The helloworld demo is running (skaffold run --profile helloworld-worker) +# - The controller was installed with cert-manager (certmanager.enabled: true, the default) +# - You have permission to create HPAs in this namespace +# +# Apply: +# kubectl apply -f examples/twor-hpa.yaml +# +# Verify: +# kubectl get twor # shows Applied status per Build ID +# kubectl get hpa # shows one HPA per active Build ID +# +# See docs/owned-resources.md for full documentation. +apiVersion: temporal.io/v1alpha1 +kind: TemporalWorkerOwnedResource +metadata: + name: helloworld-hpa + # Deploy into the same namespace as the helloworld TemporalWorkerDeployment. + # The default Skaffold setup does not set a namespace, so resources land in "default". + namespace: default +spec: + # Must match the name of the TemporalWorkerDeployment. + workerRef: + name: helloworld + + # The resource template. The controller creates one copy per active Build ID, + # naming each one "<twdName>-<tworName>-<buildID>". + object: + apiVersion: autoscaling/v2 + kind: HorizontalPodAutoscaler + spec: + # Setting scaleTargetRef to null tells the controller to auto-inject the + # correct versioned Deployment reference for each Build ID.
Do not set this + # to a real value — the webhook will reject it. + scaleTargetRef: null + minReplicas: 1 + maxReplicas: 3 + metrics: + - type: Resource + resource: + name: cpu + target: + type: Utilization + averageUtilization: 70 diff --git a/helm/temporal-worker-controller/crds/temporal.io_temporalworkerdeployments.yaml b/helm/temporal-worker-controller/crds/temporal.io_temporalworkerdeployments.yaml index 9c9ad07c..4fdf54a8 100644 --- a/helm/temporal-worker-controller/crds/temporal.io_temporalworkerdeployments.yaml +++ b/helm/temporal-worker-controller/crds/temporal.io_temporalworkerdeployments.yaml @@ -3950,13 +3950,13 @@ spec: required: - name type: object + temporalNamespace: + minLength: 1 + type: string unsafeCustomBuildID: maxLength: 63 pattern: ^[a-zA-Z0-9]([a-zA-Z0-9._-]*[a-zA-Z0-9])?$ type: string - temporalNamespace: - minLength: 1 - type: string required: - connectionRef - temporalNamespace diff --git a/helm/temporal-worker-controller/crds/temporal.io_temporalworkerownedresources.yaml b/helm/temporal-worker-controller/crds/temporal.io_temporalworkerownedresources.yaml new file mode 100644 index 00000000..9b2da99c --- /dev/null +++ b/helm/temporal-worker-controller/crds/temporal.io_temporalworkerownedresources.yaml @@ -0,0 +1,86 @@ +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.19.0 + name: temporalworkerownedresources.temporal.io +spec: + group: temporal.io + names: + kind: TemporalWorkerOwnedResource + listKind: TemporalWorkerOwnedResourceList + plural: temporalworkerownedresources + shortNames: + - twor + singular: temporalworkerownedresource + scope: Namespaced + versions: + - additionalPrinterColumns: + - description: Referenced TemporalWorkerDeployment + jsonPath: .spec.workerRef.name + name: Worker + type: string + - description: Kind of owned resource + jsonPath: .spec.object.kind + name: Kind + type: string + - description: Age + jsonPath: 
.metadata.creationTimestamp + name: Age + type: date + name: v1alpha1 + schema: + openAPIV3Schema: + properties: + apiVersion: + type: string + kind: + type: string + metadata: + type: object + spec: + properties: + object: + type: object + x-kubernetes-preserve-unknown-fields: true + workerRef: + properties: + name: + pattern: ^[a-z0-9]([-a-z0-9]*[a-z0-9])?$ + type: string + required: + - name + type: object + required: + - object + - workerRef + type: object + status: + properties: + versions: + items: + properties: + applied: + type: boolean + buildID: + type: string + lastTransitionTime: + format: date-time + type: string + message: + type: string + resourceName: + type: string + required: + - applied + - buildID + - lastTransitionTime + type: object + type: array + type: object + type: object + served: true + storage: true + subresources: + status: {} diff --git a/helm/temporal-worker-controller/templates/certmanager.yaml b/helm/temporal-worker-controller/templates/certmanager.yaml index 3fa72673..7b3a7ddb 100644 --- a/helm/temporal-worker-controller/templates/certmanager.yaml +++ b/helm/temporal-worker-controller/templates/certmanager.yaml @@ -1,35 +1,32 @@ -# The following manifests contain a self-signed issuer CR and a certificate CR. -# More document can be found at https://docs.cert-manager.io -# WARNING: Targets CertManager v1.0. Check https://cert-manager.io/docs/installation/upgrading/ for breaking changes. {{- if .Values.certmanager.enabled }} -{{ fail "certmanager isn't supported yet" }} +# Self-signed issuer and certificate for the webhook server TLS. +# Requires cert-manager to be installed in the cluster. +# See https://cert-manager.io/docs/installation/ for installation instructions. 
apiVersion: cert-manager.io/v1 kind: Issuer metadata: + name: {{ .Release.Name }}-selfsigned-issuer + namespace: {{ .Release.Namespace }} labels: app.kubernetes.io/component: certificate - {{ include "temporal-worker-controller.labels" $ | indent 4 }} - name: selfsigned-issuer - namespace: {{ .Release.Namespace }} + {{- include "temporal-worker-controller.labels" . | nindent 4 }} spec: selfSigned: {} --- apiVersion: cert-manager.io/v1 kind: Certificate metadata: + name: {{ .Release.Name }}-serving-cert + namespace: {{ .Release.Namespace }} labels: app.kubernetes.io/component: certificate - {{ include "temporal-worker-controller.labels" $ | indent 4 }} - name: serving-cert # this name should match the one appeared in kustomizeconfig.yaml - namespace: {{ .Release.Namespace }} + {{- include "temporal-worker-controller.labels" . | nindent 4 }} spec: - # SERVICE_NAME and SERVICE_NAMESPACE will be substituted by kustomize TODO (carlydf): What should these be? dnsNames: - - SERVICE_NAME.SERVICE_NAMESPACE.svc - - SERVICE_NAME.SERVICE_NAMESPACE.svc.cluster.local + - {{ .Release.Name }}-webhook-service.{{ .Release.Namespace }}.svc + - {{ .Release.Name }}-webhook-service.{{ .Release.Namespace }}.svc.cluster.local issuerRef: kind: Issuer - name: selfsigned-issuer - secretName: webhook-server-cert # this secret will not be prefixed, since it's not managed by kustomize ---- -{{- end }} \ No newline at end of file + name: {{ .Release.Name }}-selfsigned-issuer + secretName: webhook-server-cert +{{- end }} diff --git a/helm/temporal-worker-controller/templates/manager.yaml b/helm/temporal-worker-controller/templates/manager.yaml index 50f8d7b1..ea182532 100644 --- a/helm/temporal-worker-controller/templates/manager.yaml +++ b/helm/temporal-worker-controller/templates/manager.yaml @@ -64,23 +64,37 @@ spec: value: "{{ .Release.Name }}/{{ .Release.Namespace }}" - name: CONTROLLER_VERSION value: "{{ .Values.image.tag | default .Chart.AppVersion }}" + # POD_NAMESPACE and 
SERVICE_ACCOUNT_NAME are populated automatically by + # Kubernetes from the pod's own metadata (namespace and service account name). + # The validating webhook reads them to perform SubjectAccessReview checks + # using the controller's identity. + - name: POD_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace + - name: SERVICE_ACCOUNT_NAME + valueFrom: + fieldRef: + fieldPath: spec.serviceAccountName + # BANNED_KINDS controls which resource kinds are rejected by the + # TemporalWorkerOwnedResource validating webhook. Configurable via + # ownedResourceConfig.bannedKinds in values.yaml. + - name: BANNED_KINDS + value: "{{ join "," .Values.ownedResourceConfig.bannedKinds }}" args: - --leader-elect {{- if .Values.metrics.enabled }} - "--metrics-bind-address=127.0.0.1:{{ .Values.metrics.port }}" {{- end }} - "--health-probe-bind-address=:8081" - {{- if .Values.webhook.enabled }} - {{ fail "webhooks aren't supported yet" }} ports: - - containerPort: 9443 + - containerPort: {{ .Values.webhook.port }} name: webhook-server protocol: TCP volumeMounts: - - mountPath: /tmp/k8s-webhook-server/serving-certs + - mountPath: {{ .Values.webhook.certDir }} name: cert readOnly: true - {{- end }} securityContext: allowPrivilegeEscalation: false capabilities: @@ -125,14 +139,11 @@ spec: cpu: 5m memory: 64Mi {{- end }} - {{- if .Values.webhook.enabled }} - {{ fail "webhooks aren't supported yet" }} volumes: - name: cert secret: defaultMode: 420 secretName: webhook-server-cert - {{- end }} serviceAccountName: {{ .Values.serviceAccount.name | default (printf "%s-service-account" .Release.Name) }} terminationGracePeriodSeconds: {{ .Values.terminationGracePeriodSeconds }} {{- with .Values.nodeSelector }} diff --git a/helm/temporal-worker-controller/templates/rbac.yaml b/helm/temporal-worker-controller/templates/rbac.yaml index 94b19079..7ce7a882 100644 --- a/helm/temporal-worker-controller/templates/rbac.yaml +++ b/helm/temporal-worker-controller/templates/rbac.yaml @@ -123,6 
+123,61 @@ rules: - get - patch - update + - apiGroups: + - temporal.io + resources: + - temporalworkerownedresources + verbs: + - get + - list + - watch + - patch + - update + - apiGroups: + - temporal.io + resources: + - temporalworkerownedresources/status + verbs: + - get + - patch + - update + - apiGroups: + - authorization.k8s.io + resources: + - subjectaccessreviews + verbs: + - create + # Rules for managing resources attached via TemporalWorkerOwnedResource. + # Controlled by ownedResourceConfig.rbac in values.yaml. + {{- if .Values.ownedResourceConfig.rbac.wildcard }} + - apiGroups: + - "*" + resources: + - "*" + verbs: + - create + - delete + - get + - patch + - update + {{- else }} + {{- range .Values.ownedResourceConfig.rbac.rules }} + - apiGroups: + {{- range .apiGroups }} + - {{ . | quote }} + {{- end }} + resources: + {{- range .resources }} + - {{ . | quote }} + {{- end }} + verbs: + - create + - delete + - get + - patch + - update + {{- end }} + {{- end }} --- apiVersion: rbac.authorization.k8s.io/v1 kind: ClusterRoleBinding @@ -244,4 +299,56 @@ rules: verbs: - get --- +# permissions for end users to edit temporalworkerownedresources. +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + labels: + app.kubernetes.io/component: rbac + {{- include "temporal-worker-controller.labels" . | nindent 4 }} + name: {{ .Release.Name }}-{{ .Release.Namespace }}-temporalworkerownedresource-editor-role +rules: + - apiGroups: + - temporal.io + resources: + - temporalworkerownedresources + verbs: + - create + - delete + - get + - list + - patch + - update + - watch + - apiGroups: + - temporal.io + resources: + - temporalworkerownedresources/status + verbs: + - get +--- +# permissions for end users to view temporalworkerownedresources. +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + labels: + app.kubernetes.io/component: rbac + {{- include "temporal-worker-controller.labels" . 
| nindent 4 }} + name: {{ .Release.Name }}-{{ .Release.Namespace }}-temporalworkerownedresource-viewer-role +rules: + - apiGroups: + - temporal.io + resources: + - temporalworkerownedresources + verbs: + - get + - list + - watch + - apiGroups: + - temporal.io + resources: + - temporalworkerownedresources/status + verbs: + - get +--- {{- end }} \ No newline at end of file diff --git a/helm/temporal-worker-controller/templates/webhook.yaml b/helm/temporal-worker-controller/templates/webhook.yaml index ad15f073..8712975f 100644 --- a/helm/temporal-worker-controller/templates/webhook.yaml +++ b/helm/temporal-worker-controller/templates/webhook.yaml @@ -1,85 +1,84 @@ -{{- if .Values.webhook.enabled -}} -{{ fail "webhooks aren't supported yet" }} -kind: MutatingWebhookConfiguration +# Webhook Service — routes traffic from the k8s API server to the controller's webhook server. +apiVersion: v1 +kind: Service metadata: - name: {{ .Release.Name }}-mutating-webhook-configuration + name: {{ .Release.Name }}-webhook-service + namespace: {{ .Release.Namespace }} labels: app.kubernetes.io/component: webhook - {{ include "temporal-worker-controller.labels" $ | indent 4 }} - annotations: - {{- if .Values.certmanager.enabled }} - {{ fail "certmanager isn't supported yet" }} - cert-manager.io/inject-ca-from: CERTIFICATE_NAMESPACE/CERTIFICATE_NAME # TODO (carlydf): what should these be? - {{- end }} -webhooks: -- admissionReviewVersions: - - v1 - clientConfig: - service: - name: {{ .Release.Name }}-webhook-service - namespace: system - path: /mutate-temporal-io-temporal-io-v1alpha1-temporalworkerdeployment - failurePolicy: Fail - name: mtemporalworker.kb.io - rules: - - apiGroups: - - temporal.io.temporal.io - apiVersions: - - v1alpha1 - operations: - - CREATE - - UPDATE - resources: - - temporalworkers - sideEffects: None + {{- include "temporal-worker-controller.labels" . 
| nindent 4 }} +spec: + ports: + - port: 443 + protocol: TCP + targetPort: {{ .Values.webhook.port }} + selector: + {{- include "temporal-worker-controller.selectorLabels" . | nindent 4 }} --- +# ValidatingWebhookConfiguration for TemporalWorkerOwnedResource. +# Always installed — this webhook is a required security control that validates +# all TemporalWorkerOwnedResource create/update/delete requests. apiVersion: admissionregistration.k8s.io/v1 kind: ValidatingWebhookConfiguration metadata: name: {{ .Release.Name }}-validating-webhook-configuration labels: app.kubernetes.io/component: webhook - {{ include "temporal-worker-controller.labels" $ | indent 4 }} + {{- include "temporal-worker-controller.labels" . | nindent 4 }} + {{- if .Values.certmanager.enabled }} annotations: - {{- if .Values.certmanager.enabled }} - {{ fail "certmanager isn't supported yet" }} - cert-manager.io/inject-ca-from: CERTIFICATE_NAMESPACE/CERTIFICATE_NAME # TODO (carlydf): what should these be? - {{- end }} + # cert-manager injects the CA bundle from the named Certificate into this field automatically. 
+ cert-manager.io/inject-ca-from: {{ .Release.Namespace }}/{{ .Release.Name }}-serving-cert + {{- end }} webhooks: -- admissionReviewVersions: - - v1 - clientConfig: - service: - name: {{ .Release.Name }}-webhook-service - namespace: system - path: /mutate-temporal-io-temporal-io-v1alpha1-temporalworkerdeployment - failurePolicy: Fail - name: mtemporalworker.kb.io - rules: - - apiGroups: - - temporal.io.temporal.io - apiVersions: - - v1alpha1 - operations: - - CREATE - - UPDATE - resources: - - temporalworkers - sideEffects: None + - name: vtemporalworkerownedresource.kb.io + admissionReviewVersions: ["v1"] + clientConfig: + {{- if and (not .Values.certmanager.enabled) .Values.certmanager.caBundle }} + caBundle: {{ .Values.certmanager.caBundle }} + {{- end }} + service: + name: {{ .Release.Name }}-webhook-service + namespace: {{ .Release.Namespace }} + path: /validate-temporal-io-v1alpha1-temporalworkerownedresource + failurePolicy: Fail + rules: + - apiGroups: ["temporal.io"] + apiVersions: ["v1alpha1"] + operations: ["CREATE", "UPDATE", "DELETE"] + resources: ["temporalworkerownedresources"] + sideEffects: None +{{- if .Values.webhook.enabled }} --- -apiVersion: v1 -kind: Service +# ValidatingWebhookConfiguration for TemporalWorkerDeployment. +# Optional — controlled by webhook.enabled in values.yaml. +apiVersion: admissionregistration.k8s.io/v1 +kind: ValidatingWebhookConfiguration metadata: - name: {{ .Release.Name }}-webhook-service - namespace: {{ .Release.Namespace }} + name: {{ .Release.Name }}-twd-validating-webhook-configuration labels: app.kubernetes.io/component: webhook - {{ include "temporal-worker-controller.labels" $ | indent 4 }} -spec: - ports: - - port: 443 - protocol: TCP - targetPort: {{ .Values.webhook.port }} - selector: - {{ include "temporal-worker-controller.selectorLabels" $ | indent 4 }} -{{- end }} \ No newline at end of file + {{- include "temporal-worker-controller.labels" . 
| nindent 4 }} + {{- if .Values.certmanager.enabled }} + annotations: + cert-manager.io/inject-ca-from: {{ .Release.Namespace }}/{{ .Release.Name }}-serving-cert + {{- end }} +webhooks: + - name: vtemporalworkerdeployment.kb.io + admissionReviewVersions: ["v1"] + clientConfig: + {{- if and (not .Values.certmanager.enabled) .Values.certmanager.caBundle }} + caBundle: {{ .Values.certmanager.caBundle }} + {{- end }} + service: + name: {{ .Release.Name }}-webhook-service + namespace: {{ .Release.Namespace }} + path: /validate-temporal-io-temporal-io-v1alpha1-temporalworkerdeployment + failurePolicy: Fail + rules: + - apiGroups: ["temporal.io"] + apiVersions: ["v1alpha1"] + operations: ["CREATE", "UPDATE"] + resources: ["temporalworkerdeployments"] + sideEffects: None +{{- end }} diff --git a/helm/temporal-worker-controller/values.yaml b/helm/temporal-worker-controller/values.yaml index 765321dd..b37eda5e 100644 --- a/helm/temporal-worker-controller/values.yaml +++ b/helm/temporal-worker-controller/values.yaml @@ -75,15 +75,70 @@ metrics: namespace: create: false -# Not yet supported webhook: + # enabled controls the optional TemporalWorkerDeployment validating webhook. + # The TemporalWorkerOwnedResource validating webhook is always enabled and does + # not require this flag. enabled: false port: 9443 certDir: /tmp/k8s-webhook-server/serving-certs -# Not yet supported +# Configuration for TemporalWorkerOwnedResource objects. +ownedResourceConfig: + # bannedKinds is the list of Kubernetes resource kinds that are not permitted as + # TemporalWorkerOwnedResource objects. The default list blocks workload types + # (Deployment, StatefulSet, Job, Pod, CronJob) to prevent misuse of the mechanism + # to run arbitrary services alongside the worker. Override this list if you have a + # legitimate use case for one of these kinds. 
+ bannedKinds: + - Deployment + - StatefulSet + - Job + - Pod + - CronJob + + # rbac controls which resource types the controller is permitted to manage on behalf + # of TemporalWorkerOwnedResource objects (i.e. the HPAs, PDBs, or custom CRDs that + # users attach to their versioned Deployments). + # + # The TemporalWorkerOwnedResource validating webhook enforces that requesting users + # already have permission to manage the embedded resource type, so the controller's + # permissions here act as the executor, not the gatekeeper. + rbac: + # wildcard: when true, grants the controller create/delete/get/patch/update on all + # resource types in all API groups. Convenient for development clusters or when + # users attach many different custom CRD types. Not recommended for production. + wildcard: false + + # rules: explicit list of resource types the controller may manage. Each entry + # follows standard RBAC rule syntax (apiGroups + resources). Ignored when + # wildcard is true. + # + # Add one entry per API group your TWOR objects will use. For example, to support + # KEDA ScaledObjects alongside the defaults, add: + # - apiGroups: ["keda.sh"] + # resources: ["scaledobjects"] + rules: + # HPA (autoscaling/v1 and autoscaling/v2 share the same API group) + - apiGroups: ["autoscaling"] + resources: ["horizontalpodautoscalers"] + # PodDisruptionBudget + - apiGroups: ["policy"] + resources: ["poddisruptionbudgets"] + certmanager: - enabled: false + # enabled controls creation of the cert-manager Issuer and Certificate used for + # webhook TLS. cert-manager must be installed in the cluster. + # See https://cert-manager.io/docs/installation/ + # Set to false only if you are providing your own TLS certificate (see caBundle below). + enabled: true + + # caBundle is only used when enabled: false (i.e. you are managing webhook TLS yourself). + # Set this to the base64-encoded PEM CA certificate that signed the TLS certificate in + # the "webhook-server-cert" Secret. 
The Kubernetes API server uses this to verify the + # webhook server's TLS certificate. + # Leave empty when enabled: true — cert-manager injects the CA bundle automatically. + caBundle: "" # Not yet supported prometheus: diff --git a/internal/controller/execplan.go b/internal/controller/execplan.go index df3c6769..edfeac9f 100644 --- a/internal/controller/execplan.go +++ b/internal/controller/execplan.go @@ -7,10 +7,13 @@ package controller import ( "context" "encoding/json" + "errors" "fmt" "time" "github.com/go-logr/logr" + temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1" + "github.com/temporalio/temporal-worker-controller/internal/k8s" "github.com/temporalio/temporal-worker-controller/internal/temporal" enumspb "go.temporal.io/api/enums/v1" sdkclient "go.temporal.io/sdk/client" @@ -18,6 +21,7 @@ import ( appsv1 "k8s.io/api/apps/v1" autoscalingv1 "k8s.io/api/autoscaling/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -190,5 +194,90 @@ func (r *TemporalWorkerDeploymentReconciler) executePlan(ctx context.Context, l } } - return nil + // Patch any TWORs that are missing the owner reference to this TWD. + // Failures are logged but do not block the owned-resource apply step below — + // a TWOR may have been deleted between plan generation and execution, and + // applying resources is more important than setting owner references. + for _, ownerPatch := range p.EnsureTWOROwnerRefs { + if err := r.Patch(ctx, ownerPatch.Patched, client.MergeFrom(ownerPatch.Base)); err != nil { + l.Error(err, "failed to patch TWOR with controller reference", + "namespace", ownerPatch.Patched.Namespace, + "name", ownerPatch.Patched.Name, + ) + } + } + + // Apply owned resources via Server-Side Apply. + // Partial failure isolation: all resources are attempted even if some fail; + // errors are collected and returned together. 
+ type tworKey struct{ namespace, name string } + type applyResult struct { + buildID string + resourceName string + err error + } + tworResults := make(map[tworKey][]applyResult) + + for _, apply := range p.ApplyOwnedResources { + l.Info("applying owned resource", + "name", apply.Resource.GetName(), + "kind", apply.Resource.GetKind(), + "fieldManager", apply.FieldManager, + ) + // client.Apply uses Server-Side Apply, which is a create-or-update operation: + // if the resource does not yet exist the API server creates it; if it already + // exists the API server merges only the fields owned by this field manager, + // leaving fields owned by other managers (e.g. the HPA controller) untouched. + // client.ForceOwnership allows this field manager to claim any fields that were + // previously owned by a different manager (e.g. after a field manager rename). + applyErr := r.Client.Patch( + ctx, + apply.Resource, + client.Apply, + client.ForceOwnership, + client.FieldOwner(apply.FieldManager), + ) + if applyErr != nil { + l.Error(applyErr, "unable to apply owned resource", + "name", apply.Resource.GetName(), + "kind", apply.Resource.GetKind(), + ) + } + key := tworKey{apply.TWORNamespace, apply.TWORName} + tworResults[key] = append(tworResults[key], applyResult{ + buildID: apply.BuildID, + resourceName: apply.Resource.GetName(), + err: applyErr, + }) + } + + // Write per-Build-ID status back to each TWOR. + // Done after all applies so a single failed apply does not prevent status + // updates for the other (TWOR, Build ID) pairs. 
+ var applyErrs, statusErrs []error + for key, results := range tworResults { + versions := make([]temporaliov1alpha1.OwnedResourceVersionStatus, 0, len(results)) + for _, result := range results { + var msg string + if result.err != nil { + applyErrs = append(applyErrs, result.err) + msg = result.err.Error() + } + versions = append(versions, k8s.OwnedResourceVersionStatusForBuildID( + result.buildID, result.resourceName, result.err == nil, msg, + )) + } + + twor := &temporaliov1alpha1.TemporalWorkerOwnedResource{} + if err := r.Get(ctx, types.NamespacedName{Namespace: key.namespace, Name: key.name}, twor); err != nil { + statusErrs = append(statusErrs, fmt.Errorf("get TWOR %s/%s for status update: %w", key.namespace, key.name, err)) + continue + } + twor.Status.Versions = versions + if err := r.Status().Update(ctx, twor); err != nil { + statusErrs = append(statusErrs, fmt.Errorf("update status for TWOR %s/%s: %w", key.namespace, key.name, err)) + } + } + + return errors.Join(append(applyErrs, statusErrs...)...) } diff --git a/internal/controller/genplan.go b/internal/controller/genplan.go index 050158cc..68c25e02 100644 --- a/internal/controller/genplan.go +++ b/internal/controller/genplan.go @@ -16,6 +16,7 @@ import ( appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" ) // plan holds the actions to execute during reconciliation @@ -38,6 +39,13 @@ type plan struct { // Build IDs of versions from which the controller should // remove IgnoreLastModifierKey from the version metadata RemoveIgnoreLastModifierBuilds []string + + // OwnedResources to apply via Server-Side Apply, one per (TWOR × Build ID) pair. + ApplyOwnedResources []planner.OwnedResourceApply + + // TWORs that need a controller owner reference added, as (base, patched) pairs + // ready for client.MergeFrom patching in executePlan. 
+ EnsureTWOROwnerRefs []planner.TWOROwnerRefPatch } // startWorkflowConfig defines a workflow to be started @@ -128,6 +136,16 @@ func (r *TemporalWorkerDeploymentReconciler) generatePlan( RolloutStrategy: rolloutStrategy, } + // Fetch all TemporalWorkerOwnedResources that reference this TWD so that the planner + // can render one apply action per (TWOR × active Build ID) pair. + var tworList temporaliov1alpha1.TemporalWorkerOwnedResourceList + if err := r.List(ctx, &tworList, + client.InNamespace(w.Namespace), + client.MatchingFields{tworWorkerRefKey: w.Name}, + ); err != nil { + return nil, fmt.Errorf("unable to list TemporalWorkerOwnedResources: %w", err) + } + planResult, err := planner.GeneratePlan( l, k8sState, @@ -140,6 +158,9 @@ func (r *TemporalWorkerDeploymentReconciler) generatePlan( r.MaxDeploymentVersionsIneligibleForDeletion, gateInput, isGateInputSecret, + tworList.Items, + w.Name, + w.UID, ) if err != nil { return nil, fmt.Errorf("error generating plan: %w", err) @@ -154,6 +175,8 @@ func (r *TemporalWorkerDeploymentReconciler) generatePlan( plan.UpdateVersionConfig = planResult.VersionConfig plan.RemoveIgnoreLastModifierBuilds = planResult.RemoveIgnoreLastModifierBuilds + plan.ApplyOwnedResources = planResult.ApplyOwnedResources + plan.EnsureTWOROwnerRefs = planResult.EnsureTWOROwnerRefs // Convert test workflows for _, wf := range planResult.TestWorkflows { diff --git a/internal/controller/worker_controller.go b/internal/controller/worker_controller.go index d6471890..b54b77b2 100644 --- a/internal/controller/worker_controller.go +++ b/internal/controller/worker_controller.go @@ -36,6 +36,9 @@ const ( // TODO(jlegrone): add this everywhere deployOwnerKey = ".metadata.controller" buildIDLabel = "temporal.io/build-id" + + // tworWorkerRefKey is the field index key for TemporalWorkerOwnedResource by workerRef.name. 
+ tworWorkerRefKey = ".spec.workerRef.name" ) // getAPIKeySecretName extracts the secret name from a SecretKeySelector @@ -274,11 +277,24 @@ func (r *TemporalWorkerDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) return err } + // Index TemporalWorkerOwnedResource by spec.workerRef.name for efficient listing. + if err := mgr.GetFieldIndexer().IndexField(context.Background(), &temporaliov1alpha1.TemporalWorkerOwnedResource{}, tworWorkerRefKey, func(rawObj client.Object) []string { + twor, ok := rawObj.(*temporaliov1alpha1.TemporalWorkerOwnedResource) + if !ok { + mgr.GetLogger().Error(fmt.Errorf("error indexing TemporalWorkerOwnedResources"), "could not convert raw object", rawObj) + return nil + } + return []string{twor.Spec.WorkerRef.Name} + }); err != nil { + return err + } + recoverPanic := !r.DisableRecoverPanic return ctrl.NewControllerManagedBy(mgr). For(&temporaliov1alpha1.TemporalWorkerDeployment{}). Owns(&appsv1.Deployment{}). Watches(&temporaliov1alpha1.TemporalConnection{}, handler.EnqueueRequestsFromMapFunc(r.findTWDsUsingConnection)). + Watches(&temporaliov1alpha1.TemporalWorkerOwnedResource{}, handler.EnqueueRequestsFromMapFunc(r.findTWDsForOwnedResource)). WithOptions(controller.Options{ MaxConcurrentReconciles: 100, RecoverPanic: &recoverPanic, @@ -286,6 +302,22 @@ func (r *TemporalWorkerDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) Complete(r) } +// findTWDsForOwnedResource maps a TemporalWorkerOwnedResource to the TWD reconcile request. 
+func (r *TemporalWorkerDeploymentReconciler) findTWDsForOwnedResource(ctx context.Context, twor client.Object) []reconcile.Request { + tworObj, ok := twor.(*temporaliov1alpha1.TemporalWorkerOwnedResource) + if !ok { + return nil + } + return []reconcile.Request{ + { + NamespacedName: types.NamespacedName{ + Name: tworObj.Spec.WorkerRef.Name, + Namespace: twor.GetNamespace(), + }, + }, + } +} + func (r *TemporalWorkerDeploymentReconciler) findTWDsUsingConnection(ctx context.Context, tc client.Object) []reconcile.Request { var requests []reconcile.Request diff --git a/internal/demo/README.md b/internal/demo/README.md index 44a5adf8..4ff81c50 100644 --- a/internal/demo/README.md +++ b/internal/demo/README.md @@ -10,6 +10,7 @@ This guide will help you set up and run the Temporal Worker Controller locally u - [kubectl](https://kubernetes.io/docs/tasks/tools/install-kubectl/) - Temporal Cloud account with API key or mTLS certificates - Understanding of [Worker Versioning concepts](https://docs.temporal.io/production-deployment/worker-deployments/worker-versioning) (Pinned and Auto-Upgrade versioning behaviors) +- **[cert-manager](https://cert-manager.io/docs/installation/)** — required for the `TemporalWorkerOwnedResource` validating webhook (TLS). Install it once into your Minikube cluster before deploying the controller (see step 3 below). > **Note**: This demo specifically showcases **Pinned** workflow behavior. All workflows in the demo will remain on the worker version where they started, demonstrating how the controller safely manages multiple worker versions simultaneously during deployments. @@ -63,6 +64,13 @@ This guide will help you set up and run the Temporal Worker Controller locally u - Note: Do not set both mTLS and API key for the same connection. If both present, the TemporalConnection Custom Resource Instance will not get installed in the k8s environment. +3. 
Install cert-manager into Minikube (required for the TWOR validating webhook): + ```bash + kubectl apply -f https://github.com/cert-manager/cert-manager/releases/latest/download/cert-manager.yaml + # Wait for cert-manager pods to be ready before continuing + kubectl wait --for=condition=Available deployment --all -n cert-manager --timeout=120s + ``` + 4. Build and deploy the Controller image to the local k8s cluster: ```bash skaffold run --profile worker-controller @@ -113,6 +121,34 @@ kubectl logs -n temporal-system deployments/temporal-worker-controller-manager - kubectl get twd ``` +### Testing TemporalWorkerOwnedResource (per-version HPA) + +`TemporalWorkerOwnedResource` lets you attach Kubernetes resources — HPAs, PodDisruptionBudgets, etc. — to each worker version with running workers. The controller creates one copy per worker version with a running Deployment and wires it to the correct Deployment automatically. + +The `TemporalWorkerOwnedResource` validating webhook enforces that you have permission to create the embedded resource type yourself, and it requires TLS (provided by cert-manager, installed in step 3 above). + +After deploying the helloworld worker (step 5), apply the example HPA: + +```bash +kubectl apply -f examples/twor-hpa.yaml +``` + +Watch the controller create an HPA for each worker version with running workers: + +```bash +# See TemporalWorkerOwnedResource status (Applied: true once the controller reconciles) +kubectl get temporalworkerownedresource + +# See the per-Build-ID HPAs +kubectl get hpa +``` + +You should see one HPA per worker version with running workers, with `scaleTargetRef` automatically pointing at the correct versioned Deployment. + +When you deploy a new worker version (e.g., step 8), the controller creates a new HPA for the new Build ID and keeps the old one until that versioned Deployment is deleted during the sunset process. + +See [docs/owned-resources.md](../../docs/owned-resources.md) for full documentation. 
+ ### Cleanup To clean up the demo: diff --git a/internal/k8s/deployments.go b/internal/k8s/deployments.go index 3fd30c54..22af5526 100644 --- a/internal/k8s/deployments.go +++ b/internal/k8s/deployments.go @@ -206,10 +206,7 @@ func NewDeploymentWithOwnerRef( buildID string, connection temporaliov1alpha1.TemporalConnectionSpec, ) *appsv1.Deployment { - selectorLabels := map[string]string{ - twdNameLabel: TruncateString(CleanStringForDNS(objectMeta.GetName()), 63), - BuildIDLabel: TruncateString(buildID, 63), - } + selectorLabels := ComputeSelectorLabels(objectMeta.GetName(), buildID) // Set pod labels podLabels := make(map[string]string) diff --git a/internal/k8s/ownedresources.go b/internal/k8s/ownedresources.go new file mode 100644 index 00000000..c4f6962d --- /dev/null +++ b/internal/k8s/ownedresources.go @@ -0,0 +1,294 @@ +// Unless explicitly stated otherwise all files in this repository are licensed under the MIT License. +// +// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2024 Datadog, Inc. + +package k8s + +import ( + "bytes" + "crypto/sha256" + "encoding/hex" + "encoding/json" + "fmt" + "strings" + "text/template" + + temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1" + appsv1 "k8s.io/api/apps/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" +) + +// OwnedResourceFieldManager returns the SSA field manager string for a given TWOR. +// +// The field manager identity has two requirements: +// 1. Stable across reconcile loops — the API server uses it to track which fields +// this controller "owns". Changing it abandons the old ownership records and +// causes spurious field conflicts until the old entries expire. +// 2. 
Unique per TWOR instance — if two different TWORs both render resources into +// the same namespace, their field managers must differ so each can own its own +// set of fields without conflicting with the other. +// +// Using "twc/{namespace}/{name}" satisfies both: it never changes for a given TWOR +// and is globally unique within the cluster (namespace+name is a unique identifier). +// The string is capped at 128 characters to stay within the Kubernetes API limit. +func OwnedResourceFieldManager(twor *temporaliov1alpha1.TemporalWorkerOwnedResource) string { + fm := "twc/" + twor.Namespace + "/" + twor.Name + return TruncateString(fm, 128) +} + +const ( + // ownedResourceMaxNameLen is the maximum length of a generated owned resource name. + // 47 is chosen to be safe for Deployment resources: a pod name is composed of + // "{deployment-name}-{rs-hash}-{pod-hash}" where the hashes add ~17 characters, + // and pod names must be ≤63 characters. Using 47 as the limit ensures that the + // generated names work even if the user un-bans Deployment as an owned resource + // kind, avoiding special-casing per resource type. + ownedResourceMaxNameLen = 47 + + // ownedResourceHashLen is the number of hex characters used for the uniqueness suffix. + // 8 hex chars = 4 bytes = 32 bits, giving negligible collision probability + // (< 1 in 10^7 even across thousands of resources in a cluster). + ownedResourceHashLen = 8 +) + +// ComputeOwnedResourceName generates a deterministic, DNS-safe name for the owned resource +// instance corresponding to a given (twdName, tworName, buildID) triple. +// +// The name has the form: +// +// {human-readable-prefix}-{8-char-hash} +// +// The 8-character hash is computed from the full untruncated triple BEFORE any length +// capping occurs. This guarantees that two different triples — including triples that +// differ only in the buildID — always produce different names, even if the human-readable +// prefix is truncated. 
The buildID is therefore always uniquely represented via the hash, +// regardless of how long twdName or tworName are. +func ComputeOwnedResourceName(twdName, tworName, buildID string) string { + // Hash the full triple first, before any truncation. + h := sha256.Sum256([]byte(twdName + tworName + buildID)) + hashSuffix := hex.EncodeToString(h[:ownedResourceHashLen/2]) // 4 bytes → 8 hex chars + + // Build the human-readable prefix and truncate so the total fits in maxLen. + // suffixLen = len("-") + ownedResourceHashLen + const suffixLen = 1 + ownedResourceHashLen + raw := CleanStringForDNS(twdName + ResourceNameSeparator + tworName + ResourceNameSeparator + buildID) + prefix := TruncateString(raw, ownedResourceMaxNameLen-suffixLen) + // Trim any trailing separator that results from truncating mid-segment. + prefix = strings.TrimRight(prefix, ResourceNameSeparator) + + return prefix + ResourceNameSeparator + hashSuffix +} + +// ComputeSelectorLabels returns the selector labels used by a versioned Deployment. +// These are the same labels set on the Deployment.Spec.Selector.MatchLabels. +func ComputeSelectorLabels(twdName, buildID string) map[string]string { + return map[string]string{ + twdNameLabel: TruncateString(CleanStringForDNS(twdName), 63), + BuildIDLabel: TruncateString(buildID, 63), + } +} + +// TemplateData holds the variables available in Go template expressions within spec.object. +type TemplateData struct { + // DeploymentName is the controller-generated versioned Deployment name. + DeploymentName string + // TemporalNamespace is the Temporal namespace the worker connects to. + TemporalNamespace string + // BuildID is the Build ID for this version. + BuildID string +} + +// RenderOwnedResource produces the Unstructured object to apply via SSA for a given +// TemporalWorkerOwnedResource and versioned Deployment. +// +// Processing order: +// 1. Unmarshal spec.object into an Unstructured +// 2. Auto-inject scaleTargetRef and matchLabels (Layer 1) +// 3. 
Render Go templates in all string values (Layer 2) +// 4. Set metadata (name, namespace, labels, owner reference) +func RenderOwnedResource( + twor *temporaliov1alpha1.TemporalWorkerOwnedResource, + deployment *appsv1.Deployment, + buildID string, + temporalNamespace string, +) (*unstructured.Unstructured, error) { + // Step 1: unmarshal the raw object + var raw map[string]interface{} + if err := json.Unmarshal(twor.Spec.Object.Raw, &raw); err != nil { + return nil, fmt.Errorf("failed to unmarshal spec.object: %w", err) + } + + data := TemplateData{ + DeploymentName: deployment.Name, + TemporalNamespace: temporalNamespace, + BuildID: buildID, + } + + selectorLabels := ComputeSelectorLabels(twor.Spec.WorkerRef.Name, buildID) + + // Step 2: auto-inject scaleTargetRef and matchLabels into spec subtree + if spec, ok := raw["spec"].(map[string]interface{}); ok { + autoInjectFields(spec, data.DeploymentName, selectorLabels) + } + + // Step 3: render Go templates in all string values + rendered, err := renderTemplateValues(raw, data) + if err != nil { + return nil, fmt.Errorf("failed to render templates: %w", err) + } + raw = rendered.(map[string]interface{}) + + // Step 4: set metadata + resourceName := ComputeOwnedResourceName(twor.Spec.WorkerRef.Name, twor.Name, buildID) + + meta, _ := raw["metadata"].(map[string]interface{}) + if meta == nil { + meta = make(map[string]interface{}) + } + meta["name"] = resourceName + meta["namespace"] = twor.Namespace + + // Merge labels + existingLabels, _ := meta["labels"].(map[string]interface{}) + if existingLabels == nil { + existingLabels = make(map[string]interface{}) + } + for k, v := range selectorLabels { + existingLabels[k] = v + } + meta["labels"] = existingLabels + + // Set owner reference pointing to the versioned Deployment so k8s GC cleans up + // the owned resource when the Deployment is deleted. 
+ blockOwnerDeletion := true + isController := true + meta["ownerReferences"] = []interface{}{ + map[string]interface{}{ + "apiVersion": appsv1.SchemeGroupVersion.String(), + "kind": "Deployment", + "name": deployment.Name, + "uid": string(deployment.UID), + "blockOwnerDeletion": blockOwnerDeletion, + "controller": isController, + }, + } + + raw["metadata"] = meta + + obj := &unstructured.Unstructured{Object: raw} + return obj, nil +} + +// autoInjectFields recursively traverses obj and injects scaleTargetRef and matchLabels +// wherever the key is present with a null value. Users signal intent by writing +// `scaleTargetRef: null` or `matchLabels: null` in their spec.object. This covers Layer 1 +// auto-injection without the risk of adding unknown fields to resource types that don't use them. +func autoInjectFields(obj map[string]interface{}, deploymentName string, selectorLabels map[string]string) { + for k, v := range obj { + switch k { + case "scaleTargetRef": + // Inject only when the key is present but null (user opted in) + if v == nil { + obj[k] = map[string]interface{}{ + "apiVersion": appsv1.SchemeGroupVersion.String(), + "kind": "Deployment", + "name": deploymentName, + } + } + case "matchLabels": + // Inject only when the key is present but null (user opted in) + if v == nil { + labels := make(map[string]interface{}, len(selectorLabels)) + for lk, lv := range selectorLabels { + labels[lk] = lv + } + obj[k] = labels + } + default: + // Recurse into nested objects + if nested, ok := v.(map[string]interface{}); ok { + autoInjectFields(nested, deploymentName, selectorLabels) + } else if arr, ok := v.([]interface{}); ok { + for _, item := range arr { + if nestedItem, ok := item.(map[string]interface{}); ok { + autoInjectFields(nestedItem, deploymentName, selectorLabels) + } + } + } + } + } +} + +// renderTemplateValues recursively traverses a JSON-decoded value tree and renders +// Go template expressions in all string values. Returns the modified value. 
+func renderTemplateValues(v interface{}, data TemplateData) (interface{}, error) { + switch typed := v.(type) { + case string: + rendered, err := renderString(typed, data) + if err != nil { + return nil, err + } + return rendered, nil + case map[string]interface{}: + for k, val := range typed { + rendered, err := renderTemplateValues(val, data) + if err != nil { + return nil, fmt.Errorf("field %q: %w", k, err) + } + typed[k] = rendered + } + return typed, nil + case []interface{}: + for i, item := range typed { + rendered, err := renderTemplateValues(item, data) + if err != nil { + return nil, fmt.Errorf("index %d: %w", i, err) + } + typed[i] = rendered + } + return typed, nil + default: + // numbers, booleans, nil — pass through unchanged + return v, nil + } +} + +// renderString renders a single string as a Go template with the given data. +// Strings without template expressions are returned unchanged. +func renderString(s string, data TemplateData) (string, error) { + // Fast path: skip parsing if no template markers + if !containsTemplateMarker(s) { + return s, nil + } + tmpl, err := template.New("").Option("missingkey=error").Parse(s) + if err != nil { + return "", fmt.Errorf("invalid template %q: %w", s, err) + } + var buf bytes.Buffer + if err := tmpl.Execute(&buf, data); err != nil { + return "", fmt.Errorf("template execution failed for %q: %w", s, err) + } + return buf.String(), nil +} + +// containsTemplateMarker returns true if s contains "{{" indicating a Go template expression. +func containsTemplateMarker(s string) bool { + for i := 0; i < len(s)-1; i++ { + if s[i] == '{' && s[i+1] == '{' { + return true + } + } + return false +} + +// OwnedResourceVersionStatusForBuildID is a helper to build a status entry. 
+func OwnedResourceVersionStatusForBuildID(buildID, resourceName string, applied bool, message string) temporaliov1alpha1.OwnedResourceVersionStatus { + return temporaliov1alpha1.OwnedResourceVersionStatus{ + BuildID: buildID, + Applied: applied, + ResourceName: resourceName, + Message: message, + LastTransitionTime: metav1.Now(), + } +} diff --git a/internal/k8s/ownedresources_test.go b/internal/k8s/ownedresources_test.go new file mode 100644 index 00000000..44611b08 --- /dev/null +++ b/internal/k8s/ownedresources_test.go @@ -0,0 +1,305 @@ +// Unless explicitly stated otherwise all files in this repository are licensed under the MIT License. +// +// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2024 Datadog, Inc. + +package k8s + +import ( + "crypto/sha256" + "encoding/hex" + "encoding/json" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + temporaliov1alpha1 "github.com/temporalio/temporal-worker-controller/api/v1alpha1" + appsv1 "k8s.io/api/apps/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" +) + +// expectedOwnedResourceName replicates the naming logic for use in tests. 
+func expectedOwnedResourceName(twdName, tworName, buildID string) string { + h := sha256.Sum256([]byte(twdName + tworName + buildID)) + hashSuffix := hex.EncodeToString(h[:4]) + raw := CleanStringForDNS(twdName + "-" + tworName + "-" + buildID) + prefix := strings.TrimRight(TruncateString(raw, 47-9), "-") + return prefix + "-" + hashSuffix +} + +func TestComputeOwnedResourceName(t *testing.T) { + t.Run("short names produce human-readable result with hash suffix", func(t *testing.T) { + got := ComputeOwnedResourceName("my-worker", "my-hpa", "image-abc123") + // Should start with the human-readable prefix + assert.True(t, strings.HasPrefix(got, "my-worker-my-hpa-image-abc123-"), "got: %q", got) + // Should be ≤ 47 chars + assert.LessOrEqual(t, len(got), 47) + }) + + t.Run("special chars are cleaned for DNS", func(t *testing.T) { + got := ComputeOwnedResourceName("my_worker", "my/hpa", "image:latest") + assert.True(t, strings.HasPrefix(got, "my-worker-my-hpa-image-latest-"), "got: %q", got) + assert.LessOrEqual(t, len(got), 47) + }) + + t.Run("deterministic — same inputs always produce same name", func(t *testing.T) { + a := ComputeOwnedResourceName("w", "r", "b1") + b := ComputeOwnedResourceName("w", "r", "b1") + assert.Equal(t, a, b) + }) + + t.Run("different buildIDs always produce different names (hash suffix)", func(t *testing.T) { + // Even if the prefix would be identical after truncation, the hash must differ. 
+ name1 := ComputeOwnedResourceName("my-worker", "my-hpa", "build-aaa") + name2 := ComputeOwnedResourceName("my-worker", "my-hpa", "build-bbb") + assert.NotEqual(t, name1, name2) + }) + + t.Run("very long names are still ≤ 47 chars and distinct per buildID", func(t *testing.T) { + longTWD := strings.Repeat("w", 63) + longTWOR := strings.Repeat("r", 253) // maximum k8s object name + buildID1 := "build-" + strings.Repeat("a", 57) + buildID2 := "build-" + strings.Repeat("b", 57) + + n1 := ComputeOwnedResourceName(longTWD, longTWOR, buildID1) + n2 := ComputeOwnedResourceName(longTWD, longTWOR, buildID2) + + assert.LessOrEqual(t, len(n1), 47, "name1 length: %d", len(n1)) + assert.LessOrEqual(t, len(n2), 47, "name2 length: %d", len(n2)) + assert.NotEqual(t, n1, n2, "names must differ even when prefix is fully truncated") + }) + + t.Run("name matches expected formula", func(t *testing.T) { + got := ComputeOwnedResourceName("my-worker", "my-hpa", "abc123") + assert.Equal(t, expectedOwnedResourceName("my-worker", "my-hpa", "abc123"), got) + }) +} + +func TestComputeSelectorLabels(t *testing.T) { + labels := ComputeSelectorLabels("my-worker", "abc-123") + assert.Equal(t, "my-worker", labels[twdNameLabel]) + assert.Equal(t, "abc-123", labels[BuildIDLabel]) +} + +func TestContainsTemplateMarker(t *testing.T) { + assert.True(t, containsTemplateMarker("hello {{ .DeploymentName }}")) + assert.False(t, containsTemplateMarker("hello world")) + assert.False(t, containsTemplateMarker("{ single brace }")) +} + +func TestRenderString(t *testing.T) { + data := TemplateData{ + DeploymentName: "my-worker-abc123", + TemporalNamespace: "my-temporal-ns", + BuildID: "abc123", + } + + tests := []struct { + input string + want string + }{ + {"plain string", "plain string"}, + {"{{ .DeploymentName }}", "my-worker-abc123"}, + {"{{ .TemporalNamespace }}", "my-temporal-ns"}, + {"{{ .BuildID }}", "abc123"}, + {"Monitor for build {{ .BuildID }}", "Monitor for build abc123"}, + {"{{ .DeploymentName 
}}.{{ .TemporalNamespace }}", "my-worker-abc123.my-temporal-ns"}, + } + for _, tc := range tests { + got, err := renderString(tc.input, data) + require.NoError(t, err) + assert.Equal(t, tc.want, got) + } +} + +func TestAutoInjectFields_ScaleTargetRef(t *testing.T) { + selectorLabels := map[string]string{ + BuildIDLabel: "abc123", + twdNameLabel: "my-worker", + } + + t.Run("does not inject scaleTargetRef when key is entirely absent", func(t *testing.T) { + spec := map[string]interface{}{ + "minReplicas": 1, + "maxReplicas": 5, + } + autoInjectFields(spec, "my-worker-abc123", selectorLabels) + _, hasKey := spec["scaleTargetRef"] + assert.False(t, hasKey, "scaleTargetRef should not be injected when absent (user must opt in with null)") + }) + + t.Run("injects scaleTargetRef when explicitly null (user opt-in)", func(t *testing.T) { + spec := map[string]interface{}{ + "scaleTargetRef": nil, + } + autoInjectFields(spec, "my-worker-abc123", selectorLabels) + ref, ok := spec["scaleTargetRef"].(map[string]interface{}) + require.True(t, ok) + assert.Equal(t, "my-worker-abc123", ref["name"]) + assert.Equal(t, "Deployment", ref["kind"]) + assert.Equal(t, appsv1.SchemeGroupVersion.String(), ref["apiVersion"]) + }) + + t.Run("does not overwrite existing scaleTargetRef", func(t *testing.T) { + spec := map[string]interface{}{ + "scaleTargetRef": map[string]interface{}{ + "name": "custom-deployment", + "kind": "Deployment", + }, + } + autoInjectFields(spec, "my-worker-abc123", selectorLabels) + ref := spec["scaleTargetRef"].(map[string]interface{}) + assert.Equal(t, "custom-deployment", ref["name"], "should not overwrite user-provided ref") + }) +} + +func TestAutoInjectFields_MatchLabels(t *testing.T) { + selectorLabels := map[string]string{ + BuildIDLabel: "abc123", + twdNameLabel: "my-worker", + } + + t.Run("does not inject matchLabels when key is absent", func(t *testing.T) { + spec := map[string]interface{}{ + "selector": map[string]interface{}{}, + } + autoInjectFields(spec, 
"my-worker-abc123", selectorLabels) + selector := spec["selector"].(map[string]interface{}) + _, hasKey := selector["matchLabels"] + assert.False(t, hasKey, "matchLabels should not be injected when absent (user must opt in with null)") + }) + + t.Run("injects matchLabels when explicitly null (user opt-in)", func(t *testing.T) { + spec := map[string]interface{}{ + "selector": map[string]interface{}{ + "matchLabels": nil, + }, + } + autoInjectFields(spec, "my-worker-abc123", selectorLabels) + selector := spec["selector"].(map[string]interface{}) + labels, ok := selector["matchLabels"].(map[string]interface{}) + require.True(t, ok) + assert.Equal(t, "abc123", labels[BuildIDLabel]) + assert.Equal(t, "my-worker", labels[twdNameLabel]) + }) + + t.Run("does not overwrite existing matchLabels", func(t *testing.T) { + spec := map[string]interface{}{ + "selector": map[string]interface{}{ + "matchLabels": map[string]interface{}{ + "custom": "label", + }, + }, + } + autoInjectFields(spec, "my-worker-abc123", selectorLabels) + selector := spec["selector"].(map[string]interface{}) + labels := selector["matchLabels"].(map[string]interface{}) + assert.Equal(t, "label", labels["custom"], "should not overwrite user-provided labels") + }) +} + +func TestRenderOwnedResource(t *testing.T) { + hpaSpec := map[string]interface{}{ + "apiVersion": "autoscaling/v2", + "kind": "HorizontalPodAutoscaler", + "spec": map[string]interface{}{ + "scaleTargetRef": nil, // opt in to auto-injection + "minReplicas": float64(2), + "maxReplicas": float64(10), + }, + } + rawBytes, err := json.Marshal(hpaSpec) + require.NoError(t, err) + + twor := &temporaliov1alpha1.TemporalWorkerOwnedResource{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-hpa", + Namespace: "default", + }, + Spec: temporaliov1alpha1.TemporalWorkerOwnedResourceSpec{ + WorkerRef: temporaliov1alpha1.WorkerDeploymentReference{ + Name: "my-worker", + }, + Object: runtime.RawExtension{Raw: rawBytes}, + }, + } + + deployment := 
&appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-worker-abc123", + Namespace: "default", + UID: types.UID("test-uid-123"), + }, + } + buildID := "abc123" + + obj, err := RenderOwnedResource(twor, deployment, buildID, "my-temporal-ns") + require.NoError(t, err) + + // Check metadata — name follows the hash-suffix formula + assert.Equal(t, expectedOwnedResourceName("my-worker", "my-hpa", "abc123"), obj.GetName()) + assert.Equal(t, "default", obj.GetNamespace()) + + // Check selector labels were added + labels := obj.GetLabels() + assert.Equal(t, "abc123", labels[BuildIDLabel]) + assert.Equal(t, "my-worker", labels[twdNameLabel]) + + // Check owner reference points to the Deployment + ownerRefs := obj.GetOwnerReferences() + require.Len(t, ownerRefs, 1) + assert.Equal(t, "my-worker-abc123", ownerRefs[0].Name) + assert.Equal(t, "Deployment", ownerRefs[0].Kind) + assert.Equal(t, types.UID("test-uid-123"), ownerRefs[0].UID) + + // Check scaleTargetRef was auto-injected + spec, ok := obj.Object["spec"].(map[string]interface{}) + require.True(t, ok) + ref, ok := spec["scaleTargetRef"].(map[string]interface{}) + require.True(t, ok, "scaleTargetRef should have been auto-injected") + assert.Equal(t, "my-worker-abc123", ref["name"]) +} + +func TestRenderOwnedResource_WithTemplates(t *testing.T) { + objSpec := map[string]interface{}{ + "apiVersion": "monitoring.example.com/v1", + "kind": "WorkloadMonitor", + "spec": map[string]interface{}{ + "targetWorkload": "{{ .DeploymentName }}", + "description": "Monitor for build {{ .BuildID }} in {{ .TemporalNamespace }}", + }, + } + rawBytes, err := json.Marshal(objSpec) + require.NoError(t, err) + + twor := &temporaliov1alpha1.TemporalWorkerOwnedResource{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-monitor", + Namespace: "production", + }, + Spec: temporaliov1alpha1.TemporalWorkerOwnedResourceSpec{ + WorkerRef: temporaliov1alpha1.WorkerDeploymentReference{ + Name: "my-worker", + }, + Object: runtime.RawExtension{Raw: 
rawBytes}, + }, + } + + deployment := &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "my-worker-abc123", + Namespace: "production", + UID: types.UID("uid-abc"), + }, + } + + obj, err := RenderOwnedResource(twor, deployment, "abc123", "my-temporal-ns") + require.NoError(t, err) + + spec, ok := obj.Object["spec"].(map[string]interface{}) + require.True(t, ok) + assert.Equal(t, "my-worker-abc123", spec["targetWorkload"]) + assert.Equal(t, "Monitor for build abc123 in my-temporal-ns", spec["description"]) +} diff --git a/internal/planner/planner.go b/internal/planner/planner.go index 5410d359..2843aafc 100644 --- a/internal/planner/planner.go +++ b/internal/planner/planner.go @@ -16,8 +16,19 @@ import ( appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/types" ) +// OwnedResourceApply holds a rendered owned resource to apply via Server-Side Apply. +type OwnedResourceApply struct { + Resource *unstructured.Unstructured + FieldManager string + TWORName string + TWORNamespace string + BuildID string +} + // Plan holds the actions to execute during reconciliation type Plan struct { // Which actions to take @@ -30,6 +41,19 @@ type Plan struct { // Build IDs of versions from which the controller should // remove IgnoreLastModifierKey from the version metadata RemoveIgnoreLastModifierBuilds []string + // ApplyOwnedResources holds resources to apply via SSA, one per (TWOR × Build ID) pair. + ApplyOwnedResources []OwnedResourceApply + // EnsureTWOROwnerRefs holds (base, patched) pairs for TWORs that need a + // controller owner reference added, ready for client.MergeFrom patching. + EnsureTWOROwnerRefs []TWOROwnerRefPatch +} + +// TWOROwnerRefPatch holds a TWOR pair for a single merge-patch: +// Base is the unmodified object (used as the patch base), Patched has the +// controller owner reference already appended. 
+type TWOROwnerRefPatch struct { + Base *temporaliov1alpha1.TemporalWorkerOwnedResource + Patched *temporaliov1alpha1.TemporalWorkerOwnedResource } // VersionConfig defines version configuration for Temporal @@ -78,6 +102,9 @@ func GeneratePlan( maxVersionsIneligibleForDeletion int32, gateInput []byte, isGateInputSecret bool, + twors []temporaliov1alpha1.TemporalWorkerOwnedResource, + twdName string, + twdUID types.UID, ) (*Plan, error) { plan := &Plan{ ScaleDeployments: make(map[*corev1.ObjectReference]uint32), @@ -114,9 +141,88 @@ func GeneratePlan( // TODO(jlegrone): generate warnings/events on the TemporalWorkerDeployment resource when buildIDs are reachable // but have no corresponding Deployment. + plan.ApplyOwnedResources = getOwnedResourceApplies(l, twors, k8sState, spec.WorkerOptions.TemporalNamespace) + plan.EnsureTWOROwnerRefs = getTWOROwnerRefPatches(twors, twdName, twdUID) + return plan, nil } +// getOwnedResourceApplies renders one OwnedResourceApply for each (TWOR × active Build ID) pair. +// Pairs that fail to render are logged and skipped; they do not block the rest. 
+func getOwnedResourceApplies( + l logr.Logger, + twors []temporaliov1alpha1.TemporalWorkerOwnedResource, + k8sState *k8s.DeploymentState, + temporalNamespace string, +) []OwnedResourceApply { + var applies []OwnedResourceApply + for i := range twors { + twor := &twors[i] + if twor.Spec.Object.Raw == nil { + l.Info("skipping TemporalWorkerOwnedResource with empty spec.object", "name", twor.Name) + continue + } + for buildID, deployment := range k8sState.Deployments { + rendered, err := k8s.RenderOwnedResource(twor, deployment, buildID, temporalNamespace) + if err != nil { + l.Error(err, "failed to render TemporalWorkerOwnedResource", + "twor", twor.Name, + "buildID", buildID, + ) + continue + } + applies = append(applies, OwnedResourceApply{ + Resource: rendered, + FieldManager: k8s.OwnedResourceFieldManager(twor), + TWORName: twor.Name, + TWORNamespace: twor.Namespace, + BuildID: buildID, + }) + } + } + return applies +} + +// getTWOROwnerRefPatches returns (base, patched) pairs for each TWOR that does not +// yet have a controller owner reference pointing to the given TWD. The patched copy +// has the owner reference appended so that executePlan can apply a merge-patch to +// add it without a full Update. +func getTWOROwnerRefPatches( + twors []temporaliov1alpha1.TemporalWorkerOwnedResource, + twdName string, + twdUID types.UID, +) []TWOROwnerRefPatch { + isController := true + blockOwnerDeletion := true + ownerRef := metav1.OwnerReference{ + APIVersion: temporaliov1alpha1.GroupVersion.String(), + Kind: "TemporalWorkerDeployment", + Name: twdName, + UID: twdUID, + Controller: &isController, + BlockOwnerDeletion: &blockOwnerDeletion, + } + var patches []TWOROwnerRefPatch + for i := range twors { + twor := &twors[i] + // Skip if this TWD is already the controller owner. 
+ alreadyOwned := false + for _, ref := range twor.OwnerReferences { + if ref.Controller != nil && *ref.Controller && ref.UID == twdUID { + alreadyOwned = true + break + } + } + if alreadyOwned { + continue + } + patched := twor.DeepCopy() + patched.OwnerReferences = append(patched.OwnerReferences, ownerRef) + patches = append(patches, TWOROwnerRefPatch{Base: twor, Patched: patched}) + } + return patches +} + // checkAndUpdateDeploymentConnectionSpec determines whether the Deployment for the given buildID is // out-of-date with respect to the provided TemporalConnectionSpec. If an update is required, it mutates // the existing Deployment in-place and returns a pointer to that Deployment. If no update is needed or diff --git a/internal/planner/planner_test.go b/internal/planner/planner_test.go index 9873c2b5..6c367b4a 100644 --- a/internal/planner/planner_test.go +++ b/internal/planner/planner_test.go @@ -6,6 +6,7 @@ package planner import ( "context" + "encoding/json" "slices" "testing" "time" @@ -22,6 +23,8 @@ import ( corev1 "k8s.io/api/core/v1" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" ) func TestGeneratePlan(t *testing.T) { @@ -41,6 +44,8 @@ func TestGeneratePlan(t *testing.T) { expectConfigSetCurrent *bool // pointer so we can test nil expectConfigRampPercent *int32 // pointer so we can test nil, in percentage (0-100) maxVersionsIneligibleForDeletion *int32 // set by env if non-nil, else default 75 + twors []temporaliov1alpha1.TemporalWorkerOwnedResource + expectOwnedResourceApplies int }{ { name: "empty state creates new deployment", @@ -390,6 +395,65 @@ func TestGeneratePlan(t *testing.T) { }, expectUpdate: 1, }, + { + name: "one TWOR with two deployments produces two owned resource applies", + k8sState: &k8s.DeploymentState{ + Deployments: map[string]*appsv1.Deployment{ + "build-a": 
createDeploymentWithUID("worker-build-a", "uid-a"), + "build-b": createDeploymentWithUID("worker-build-b", "uid-b"), + }, + DeploymentsByTime: []*appsv1.Deployment{}, + DeploymentRefs: map[string]*corev1.ObjectReference{}, + }, + status: &temporaliov1alpha1.TemporalWorkerDeploymentStatus{ + TargetVersion: temporaliov1alpha1.TargetWorkerDeploymentVersion{ + BaseWorkerDeploymentVersion: temporaliov1alpha1.BaseWorkerDeploymentVersion{ + BuildID: "build-a", + Status: temporaliov1alpha1.VersionStatusCurrent, + Deployment: &corev1.ObjectReference{Name: "worker-build-a"}, + }, + }, + }, + spec: &temporaliov1alpha1.TemporalWorkerDeploymentSpec{ + Replicas: func() *int32 { r := int32(1); return &r }(), + }, + state: &temporal.TemporalWorkerState{}, + config: &Config{ + RolloutStrategy: temporaliov1alpha1.RolloutStrategy{}, + }, + twors: []temporaliov1alpha1.TemporalWorkerOwnedResource{ + createTestTWOR("my-hpa", "my-worker"), + }, + expectScale: 1, + expectOwnedResourceApplies: 2, + }, + { + name: "no TWORs produces no owned resource applies", + k8sState: &k8s.DeploymentState{ + Deployments: map[string]*appsv1.Deployment{ + "build-a": createDeploymentWithUID("worker-build-a", "uid-a"), + }, + DeploymentsByTime: []*appsv1.Deployment{}, + DeploymentRefs: map[string]*corev1.ObjectReference{}, + }, + status: &temporaliov1alpha1.TemporalWorkerDeploymentStatus{ + TargetVersion: temporaliov1alpha1.TargetWorkerDeploymentVersion{ + BaseWorkerDeploymentVersion: temporaliov1alpha1.BaseWorkerDeploymentVersion{ + BuildID: "build-a", + Status: temporaliov1alpha1.VersionStatusCurrent, + Deployment: &corev1.ObjectReference{Name: "worker-build-a"}, + }, + }, + }, + spec: &temporaliov1alpha1.TemporalWorkerDeploymentSpec{ + Replicas: func() *int32 { r := int32(1); return &r }(), + }, + state: &temporal.TemporalWorkerState{}, + config: &Config{RolloutStrategy: temporaliov1alpha1.RolloutStrategy{}}, + twors: nil, + expectScale: 1, + expectOwnedResourceApplies: 0, + }, } for _, tc := range 
testCases { @@ -402,7 +466,7 @@ func TestGeneratePlan(t *testing.T) { maxV = *tc.maxVersionsIneligibleForDeletion } - plan, err := GeneratePlan(logr.Discard(), tc.k8sState, tc.status, tc.spec, tc.state, createDefaultConnectionSpec(), tc.config, "test/namespace", maxV, nil, false) + plan, err := GeneratePlan(logr.Discard(), tc.k8sState, tc.status, tc.spec, tc.state, createDefaultConnectionSpec(), tc.config, "test/namespace", maxV, nil, false, tc.twors, "test-twd", types.UID("test-twd-uid")) require.NoError(t, err) assert.Equal(t, tc.expectDelete, len(plan.DeleteDeployments), "unexpected number of deletions") @@ -411,6 +475,7 @@ func TestGeneratePlan(t *testing.T) { assert.Equal(t, tc.expectUpdate, len(plan.UpdateDeployments), "unexpected number of updates") assert.Equal(t, tc.expectWorkflow, len(plan.TestWorkflows), "unexpected number of test workflows") assert.Equal(t, tc.expectConfig, plan.VersionConfig != nil, "unexpected version config presence") + assert.Equal(t, tc.expectOwnedResourceApplies, len(plan.ApplyOwnedResources), "unexpected number of owned resource applies") if tc.expectConfig { assert.NotNil(t, plan.VersionConfig, "expected version config") @@ -1928,7 +1993,7 @@ func TestComplexVersionStateScenarios(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - plan, err := GeneratePlan(logr.Discard(), tc.k8sState, tc.status, tc.spec, tc.state, createDefaultConnectionSpec(), tc.config, "test/namespace", defaults.MaxVersionsIneligibleForDeletion, nil, false) + plan, err := GeneratePlan(logr.Discard(), tc.k8sState, tc.status, tc.spec, tc.state, createDefaultConnectionSpec(), tc.config, "test/namespace", defaults.MaxVersionsIneligibleForDeletion, nil, false, nil, "test-twd", types.UID("test-twd-uid")) require.NoError(t, err) assert.Equal(t, tc.expectDeletes, len(plan.DeleteDeployments), "unexpected number of deletes") @@ -2825,3 +2890,401 @@ func TestResolveGateInput(t *testing.T) { }) } } + +func TestGetOwnedResourceApplies(t 
*testing.T) { + testCases := []struct { + name string + twors []temporaliov1alpha1.TemporalWorkerOwnedResource + k8sState *k8s.DeploymentState + expectCount int + }{ + { + name: "no TWORs produces no applies", + twors: nil, + k8sState: &k8s.DeploymentState{ + Deployments: map[string]*appsv1.Deployment{ + "build-a": createDeploymentWithUID("worker-build-a", "uid-a"), + }, + }, + expectCount: 0, + }, + { + name: "no deployments produces no applies", + twors: []temporaliov1alpha1.TemporalWorkerOwnedResource{ + createTestTWOR("my-hpa", "my-worker"), + }, + k8sState: &k8s.DeploymentState{ + Deployments: map[string]*appsv1.Deployment{}, + }, + expectCount: 0, + }, + { + name: "1 TWOR × 1 deployment produces 1 apply", + twors: []temporaliov1alpha1.TemporalWorkerOwnedResource{ + createTestTWOR("my-hpa", "my-worker"), + }, + k8sState: &k8s.DeploymentState{ + Deployments: map[string]*appsv1.Deployment{ + "build-a": createDeploymentWithUID("worker-build-a", "uid-a"), + }, + }, + expectCount: 1, + }, + { + name: "1 TWOR × 2 deployments produces 2 applies", + twors: []temporaliov1alpha1.TemporalWorkerOwnedResource{ + createTestTWOR("my-hpa", "my-worker"), + }, + k8sState: &k8s.DeploymentState{ + Deployments: map[string]*appsv1.Deployment{ + "build-a": createDeploymentWithUID("worker-build-a", "uid-a"), + "build-b": createDeploymentWithUID("worker-build-b", "uid-b"), + }, + }, + expectCount: 2, + }, + { + name: "2 TWORs × 1 deployment produces 2 applies", + twors: []temporaliov1alpha1.TemporalWorkerOwnedResource{ + createTestTWOR("my-hpa", "my-worker"), + createTestTWOR("my-pdb", "my-worker"), + }, + k8sState: &k8s.DeploymentState{ + Deployments: map[string]*appsv1.Deployment{ + "build-a": createDeploymentWithUID("worker-build-a", "uid-a"), + }, + }, + expectCount: 2, + }, + { + name: "2 TWORs × 2 deployments produces 4 applies", + twors: []temporaliov1alpha1.TemporalWorkerOwnedResource{ + createTestTWOR("my-hpa", "my-worker"), + createTestTWOR("my-pdb", "my-worker"), + }, + 
k8sState: &k8s.DeploymentState{ + Deployments: map[string]*appsv1.Deployment{ + "build-a": createDeploymentWithUID("worker-build-a", "uid-a"), + "build-b": createDeploymentWithUID("worker-build-b", "uid-b"), + }, + }, + expectCount: 4, + }, + { + name: "TWOR with nil Raw is skipped without blocking others", + twors: []temporaliov1alpha1.TemporalWorkerOwnedResource{ + { + ObjectMeta: metav1.ObjectMeta{Name: "nil-raw", Namespace: "default"}, + Spec: temporaliov1alpha1.TemporalWorkerOwnedResourceSpec{ + WorkerRef: temporaliov1alpha1.WorkerDeploymentReference{Name: "my-worker"}, + Object: runtime.RawExtension{Raw: nil}, + }, + }, + createTestTWOR("my-hpa", "my-worker"), + }, + k8sState: &k8s.DeploymentState{ + Deployments: map[string]*appsv1.Deployment{ + "build-a": createDeploymentWithUID("worker-build-a", "uid-a"), + }, + }, + expectCount: 1, // only the valid TWOR + }, + { + name: "TWOR with invalid template is skipped without blocking others", + twors: []temporaliov1alpha1.TemporalWorkerOwnedResource{ + createTestTWORWithInvalidTemplate("bad-template", "my-worker"), + createTestTWOR("my-hpa", "my-worker"), + }, + k8sState: &k8s.DeploymentState{ + Deployments: map[string]*appsv1.Deployment{ + "build-a": createDeploymentWithUID("worker-build-a", "uid-a"), + }, + }, + expectCount: 1, // only the valid TWOR + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + applies := getOwnedResourceApplies(logr.Discard(), tc.twors, tc.k8sState, "test-temporal-ns") + assert.Equal(t, tc.expectCount, len(applies), "unexpected number of owned resource applies") + }) + } +} + +func TestGetOwnedResourceApplies_ApplyContents(t *testing.T) { + twor := createTestTWOR("my-hpa", "my-worker") + deployment := createDeploymentWithUID("my-worker-build-abc", "uid-abc") + k8sState := &k8s.DeploymentState{ + Deployments: map[string]*appsv1.Deployment{ + "build-abc": deployment, + }, + } + + applies := getOwnedResourceApplies(logr.Discard(), 
[]temporaliov1alpha1.TemporalWorkerOwnedResource{twor}, k8sState, "test-temporal-ns") + require.Len(t, applies, 1) + + apply := applies[0] + + // Field manager must be the stable TWOR-scoped identifier + assert.Equal(t, k8s.OwnedResourceFieldManager(&twor), apply.FieldManager) + + // Resource kind and apiVersion must come from the template + assert.Equal(t, "HorizontalPodAutoscaler", apply.Resource.GetKind()) + assert.Equal(t, "autoscaling/v2", apply.Resource.GetAPIVersion()) + + // Resource must be owned by the versioned Deployment + ownerRefs := apply.Resource.GetOwnerReferences() + require.Len(t, ownerRefs, 1) + assert.Equal(t, deployment.Name, ownerRefs[0].Name) + assert.Equal(t, "Deployment", ownerRefs[0].Kind) + assert.Equal(t, types.UID("uid-abc"), ownerRefs[0].UID) + + // Resource name must be deterministic + assert.Equal(t, k8s.ComputeOwnedResourceName("my-worker", "my-hpa", "build-abc"), apply.Resource.GetName()) +} + +func TestGetOwnedResourceApplies_FieldManagerDistinctPerTWOR(t *testing.T) { + twor1 := createTestTWOR("my-hpa", "my-worker") + twor2 := createTestTWOR("my-pdb", "my-worker") + k8sState := &k8s.DeploymentState{ + Deployments: map[string]*appsv1.Deployment{ + "build-a": createDeploymentWithUID("worker-build-a", "uid-a"), + }, + } + + applies := getOwnedResourceApplies(logr.Discard(), []temporaliov1alpha1.TemporalWorkerOwnedResource{twor1, twor2}, k8sState, "test-temporal-ns") + require.Len(t, applies, 2) + + fms := make(map[string]bool) + for _, a := range applies { + fms[a.FieldManager] = true + } + assert.Len(t, fms, 2, "each TWOR must produce a distinct field manager") +} + +// createTestTWOR builds a minimal valid TemporalWorkerOwnedResource for use in tests. +// The embedded object is a stub HPA with scaleTargetRef opted in for auto-injection. 
+func createTestTWOR(name, workerRefName string) temporaliov1alpha1.TemporalWorkerOwnedResource {
+	hpaSpec := map[string]interface{}{
+		"apiVersion": "autoscaling/v2",
+		"kind":       "HorizontalPodAutoscaler",
+		"spec": map[string]interface{}{
+			"scaleTargetRef": nil, // opt in to auto-injection
+			"minReplicas":    float64(1),
+			"maxReplicas":    float64(5),
+		},
+	}
+	raw, _ := json.Marshal(hpaSpec) // error impossible: marshaling a literal map of JSON-safe values
+	return temporaliov1alpha1.TemporalWorkerOwnedResource{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      name,
+			Namespace: "default",
+		},
+		Spec: temporaliov1alpha1.TemporalWorkerOwnedResourceSpec{
+			WorkerRef: temporaliov1alpha1.WorkerDeploymentReference{Name: workerRefName},
+			Object:    runtime.RawExtension{Raw: raw},
+		},
+	}
+}
+
+// createDeploymentWithUID builds a Deployment with the given name and UID, with the default
+// connection spec hash annotation pre-set so it does not trigger an update during plan generation.
+func createDeploymentWithUID(name, uid string) *appsv1.Deployment {
+	return &appsv1.Deployment{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      name,
+			Namespace: "default",
+			UID:       types.UID(uid),
+		},
+		Spec: appsv1.DeploymentSpec{
+			Template: corev1.PodTemplateSpec{
+				ObjectMeta: metav1.ObjectMeta{
+					Annotations: map[string]string{
+						k8s.ConnectionSpecHashAnnotation: k8s.ComputeConnectionSpecHash(createDefaultConnectionSpec()),
+					},
+				},
+			},
+		},
+	}
+}
+
+func TestGetOwnedResourceApplies_MatchLabelsInjection(t *testing.T) {
+	// PDB with matchLabels opted in for auto-injection via null sentinel.
+	pdbSpec := map[string]interface{}{
+		"apiVersion": "policy/v1",
+		"kind":       "PodDisruptionBudget",
+		"spec": map[string]interface{}{
+			"selector": map[string]interface{}{
+				"matchLabels": nil, // null = opt in to auto-injection
+			},
+			"minAvailable": float64(1),
+		},
+	}
+	raw, _ := json.Marshal(pdbSpec) // error impossible: marshaling a literal map of JSON-safe values
+	twor := temporaliov1alpha1.TemporalWorkerOwnedResource{
+		ObjectMeta: metav1.ObjectMeta{Name: "my-pdb", Namespace: "default"},
+		Spec: temporaliov1alpha1.TemporalWorkerOwnedResourceSpec{
+			WorkerRef: temporaliov1alpha1.WorkerDeploymentReference{Name: "my-worker"},
+			Object:    runtime.RawExtension{Raw: raw},
+		},
+	}
+
+	deployment := createDeploymentWithUID("my-worker-build-abc", "uid-abc")
+	k8sState := &k8s.DeploymentState{
+		Deployments: map[string]*appsv1.Deployment{"build-abc": deployment}, // keyed by Build ID
+	}
+
+	applies := getOwnedResourceApplies(logr.Discard(), []temporaliov1alpha1.TemporalWorkerOwnedResource{twor}, k8sState, "test-temporal-ns")
+	require.Len(t, applies, 1)
+
+	spec, ok := applies[0].Resource.Object["spec"].(map[string]interface{})
+	require.True(t, ok)
+	selector, ok := spec["selector"].(map[string]interface{})
+	require.True(t, ok)
+	matchLabels, ok := selector["matchLabels"].(map[string]interface{})
+	require.True(t, ok, "matchLabels should have been auto-injected")
+
+	// The injected labels must equal ComputeSelectorLabels(workerRef, buildID).
+	expected := k8s.ComputeSelectorLabels("my-worker", "build-abc")
+	for k, v := range expected {
+		assert.Equal(t, v, matchLabels[k], "injected matchLabels[%q]", k)
+	}
+	assert.Len(t, matchLabels, len(expected), "no extra keys should be injected")
+}
+
+func TestGetOwnedResourceApplies_GoTemplateRendering(t *testing.T) {
+	// Arbitrary CRD that uses all three template variables.
+	objSpec := map[string]interface{}{
+		"apiVersion": "monitoring.example.com/v1",
+		"kind":       "WorkloadMonitor",
+		"spec": map[string]interface{}{
+			"targetWorkload":    "{{ .DeploymentName }}",
+			"versionLabel":      "build-{{ .BuildID }}",
+			"temporalNamespace": "{{ .TemporalNamespace }}",
+		},
+	}
+	raw, _ := json.Marshal(objSpec) // error impossible: marshaling a literal map of JSON-safe values
+	twor := temporaliov1alpha1.TemporalWorkerOwnedResource{
+		ObjectMeta: metav1.ObjectMeta{Name: "my-monitor", Namespace: "k8s-production"}, // k8s namespace; distinct from the temporal namespace passed below
+		Spec: temporaliov1alpha1.TemporalWorkerOwnedResourceSpec{
+			WorkerRef: temporaliov1alpha1.WorkerDeploymentReference{Name: "my-worker"},
+			Object:    runtime.RawExtension{Raw: raw},
+		},
+	}
+
+	deployment := createDeploymentWithUID("my-worker-build-abc", "uid-abc")
+	k8sState := &k8s.DeploymentState{
+		Deployments: map[string]*appsv1.Deployment{"build-abc": deployment},
+	}
+
+	applies := getOwnedResourceApplies(logr.Discard(), []temporaliov1alpha1.TemporalWorkerOwnedResource{twor}, k8sState, "temporal-production")
+	require.Len(t, applies, 1)
+
+	spec, ok := applies[0].Resource.Object["spec"].(map[string]interface{})
+	require.True(t, ok)
+	assert.Equal(t, "my-worker-build-abc", spec["targetWorkload"], ".DeploymentName not rendered")
+	assert.Equal(t, "build-build-abc", spec["versionLabel"], ".BuildID not rendered")
+	assert.Equal(t, "temporal-production", spec["temporalNamespace"], ".TemporalNamespace not rendered")
+}
+
+// createTestTWORWithInvalidTemplate builds a TWOR whose spec.object contains a broken Go
+// template expression, causing RenderOwnedResource to return an error.
+func createTestTWORWithInvalidTemplate(name, workerRefName string) temporaliov1alpha1.TemporalWorkerOwnedResource {
+	badSpec := map[string]interface{}{
+		"apiVersion": "autoscaling/v2",
+		"kind":       "HorizontalPodAutoscaler",
+		"spec": map[string]interface{}{
+			"description": "{{ .NonExistentField }}", // will fail with missingkey=error
+		},
+	}
+	raw, _ := json.Marshal(badSpec) // error impossible: marshaling a literal map of JSON-safe values
+	return temporaliov1alpha1.TemporalWorkerOwnedResource{
+		ObjectMeta: metav1.ObjectMeta{
+			Name:      name,
+			Namespace: "default",
+		},
+		Spec: temporaliov1alpha1.TemporalWorkerOwnedResourceSpec{
+			WorkerRef: temporaliov1alpha1.WorkerDeploymentReference{Name: workerRefName},
+			Object:    runtime.RawExtension{Raw: raw},
+		},
+	}
+}
+
+func TestGetTWOROwnerRefPatches(t *testing.T) {
+	const twdName = "my-worker"
+	const twdUID = types.UID("twd-uid-123")
+
+	newTWOR := func(name string, ownerRefs ...metav1.OwnerReference) temporaliov1alpha1.TemporalWorkerOwnedResource {
+		return temporaliov1alpha1.TemporalWorkerOwnedResource{
+			ObjectMeta: metav1.ObjectMeta{
+				Name:            name,
+				Namespace:       "default",
+				OwnerReferences: ownerRefs,
+			},
+		}
+	}
+
+	t.Run("all TWORs need owner ref", func(t *testing.T) {
+		twors := []temporaliov1alpha1.TemporalWorkerOwnedResource{
+			newTWOR("twor-a"),
+			newTWOR("twor-b"),
+		}
+		patches := getTWOROwnerRefPatches(twors, twdName, twdUID)
+		require.Len(t, patches, 2)
+
+		// Base should be unchanged
+		assert.Empty(t, patches[0].Base.OwnerReferences)
+		assert.Empty(t, patches[1].Base.OwnerReferences)
+
+		// Patched should have the owner ref appended
+		require.Len(t, patches[0].Patched.OwnerReferences, 1)
+		assert.Equal(t, twdUID, patches[0].Patched.OwnerReferences[0].UID)
+		assert.True(t, *patches[0].Patched.OwnerReferences[0].Controller)
+	})
+
+	t.Run("TWOR already owned by this TWD is skipped", func(t *testing.T) {
+		twors := []temporaliov1alpha1.TemporalWorkerOwnedResource{
+			newTWOR("already-owned", metav1.OwnerReference{
+				APIVersion: "temporal.io/v1alpha1", Kind: "TemporalWorkerDeployment",
+				Name: twdName, UID: twdUID, Controller: func() *bool { b := true; return &b }(),
+			}),
+			newTWOR("needs-ref"),
+		}
+		patches := getTWOROwnerRefPatches(twors, twdName, twdUID)
+		require.Len(t, patches, 1)
+		assert.Equal(t, "needs-ref", patches[0].Patched.Name)
+	})
+
+	t.Run("TWOR with different controller owner still gets patched", func(t *testing.T) {
+		otherUID := types.UID("other-uid")
+		otherController := true
+		otherRef := metav1.OwnerReference{UID: otherUID, Controller: &otherController}
+		twors := []temporaliov1alpha1.TemporalWorkerOwnedResource{
+			newTWOR("other-owner", otherRef),
+		}
+		patches := getTWOROwnerRefPatches(twors, twdName, twdUID)
+		// The other controller has a different UID so we still add our ref
+		require.Len(t, patches, 1)
+		require.Len(t, patches[0].Patched.OwnerReferences, 2)
+	})
+
+	t.Run("empty TWOR list returns nil", func(t *testing.T) {
+		patches := getTWOROwnerRefPatches(nil, twdName, twdUID)
+		assert.Nil(t, patches)
+	})
+
+	t.Run("non-controller owner ref with same UID does not skip", func(t *testing.T) {
+		notController := false
+		nonControllerRef := metav1.OwnerReference{
+			UID:        twdUID,
+			Controller: &notController,
+		}
+		twors := []temporaliov1alpha1.TemporalWorkerOwnedResource{
+			newTWOR("non-controller-ref", nonControllerRef),
+		}
+		patches := getTWOROwnerRefPatches(twors, twdName, twdUID)
+		// controller=false with matching UID should still get the controller ref added
+		require.Len(t, patches, 1)
+	})
+}
diff --git a/internal/tests/internal/integration_test.go b/internal/tests/internal/integration_test.go
index 953208c0..5b6dba66 100644
--- a/internal/tests/internal/integration_test.go
+++ b/internal/tests/internal/integration_test.go
@@ -13,6 +13,7 @@ import (
 	"go.temporal.io/server/temporal"
 	"go.temporal.io/server/temporaltest"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
 	"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager" ) @@ -839,6 +840,108 @@ func TestIntegration(t *testing.T) { }) } + // TemporalWorkerOwnedResource integration test: + // Creates a TWOR with an HPA spec and verifies that the controller applies one HPA per active Build ID. + t.Run("twor-creates-hpa-per-build-id", func(t *testing.T) { + ctx := context.Background() + twdName := "twor-hpa-test" + tworName := "test-hpa" + + // Build the TWD using the existing builder (sets connection ref, temporal namespace, task queue). + tc := testhelpers.NewTestCase(). + WithInput( + testhelpers.NewTemporalWorkerDeploymentBuilder(). + WithAllAtOnceStrategy(). + WithTargetTemplate("v1.0"), + ). + WithExpectedStatus( + testhelpers.NewStatusBuilder(). + WithTargetVersion("v1.0", temporaliov1alpha1.VersionStatusCurrent, -1, true, false). + WithCurrentVersion("v1.0", true, false), + ). + BuildWithValues(twdName, testNamespace.Name, ts.GetDefaultNamespace()) + twd := tc.GetTWD() + + t.Log("Creating TemporalConnection") + temporalConnection := &temporaliov1alpha1.TemporalConnection{ + ObjectMeta: metav1.ObjectMeta{ + Name: twd.Spec.WorkerOptions.TemporalConnectionRef.Name, + Namespace: twd.Namespace, + }, + Spec: temporaliov1alpha1.TemporalConnectionSpec{ + HostPort: ts.GetFrontendHostPort(), + }, + } + if err := k8sClient.Create(ctx, temporalConnection); err != nil { + t.Fatalf("failed to create TemporalConnection: %v", err) + } + + env := testhelpers.TestEnv{ + K8sClient: k8sClient, + Mgr: mgr, + Ts: ts, + Connection: temporalConnection, + ExistingDeploymentReplicas: make(map[string]int32), + ExistingDeploymentImages: make(map[string]string), + ExpectedDeploymentReplicas: make(map[string]int32), + } + + t.Log("Creating TemporalWorkerDeployment") + if err := k8sClient.Create(ctx, twd); err != nil { + t.Fatalf("failed to create TemporalWorkerDeployment: %v", err) + } + + // Wait for the controller to create the versioned Deployment, then start workers + // and mark it healthy so that the 
reconciler sees it as an active Build ID. + waitForExpectedTargetDeployment(t, twd, env, 30*time.Second) + buildID := k8s.ComputeBuildID(twd) + depName := k8s.ComputeVersionedDeploymentName(twd.Name, buildID) + stopFuncs := applyDeployment(t, ctx, k8sClient, depName, testNamespace.Name) + defer handleStopFuncs(stopFuncs) + + // Wait for TWD status to reach Current before creating the TWOR, + // so that k8sState.Deployments already contains the active Build ID + // when the reconciler next runs. + verifyTemporalWorkerDeploymentStatusEventually(t, ctx, env, twd.Name, twd.Namespace, tc.GetExpectedStatus(), 30*time.Second, 5*time.Second) + + t.Log("Creating TemporalWorkerOwnedResource with HPA spec") + twor := &temporaliov1alpha1.TemporalWorkerOwnedResource{ + ObjectMeta: metav1.ObjectMeta{ + Name: tworName, + Namespace: testNamespace.Name, + }, + Spec: temporaliov1alpha1.TemporalWorkerOwnedResourceSpec{ + WorkerRef: temporaliov1alpha1.WorkerDeploymentReference{Name: twd.Name}, + // scaleTargetRef is set to null to trigger auto-injection by the controller. + Object: runtime.RawExtension{Raw: []byte(`{ + "apiVersion": "autoscaling/v2", + "kind": "HorizontalPodAutoscaler", + "spec": { + "scaleTargetRef": null, + "minReplicas": 2, + "maxReplicas": 5, + "metrics": [] + } + }`)}, + }, + } + if err := k8sClient.Create(ctx, twor); err != nil { + t.Fatalf("failed to create TemporalWorkerOwnedResource: %v", err) + } + + // Compute expected HPA name using the same function the controller uses. + hpaName := k8s.ComputeOwnedResourceName(twd.Name, tworName, buildID) + expectedDeploymentName := k8s.ComputeVersionedDeploymentName(twd.Name, buildID) + + // Poll until the HPA appears and verify scaleTargetRef was auto-injected. + waitForOwnedHPAWithInjectedScaleTargetRef(t, ctx, k8sClient, testNamespace.Name, hpaName, expectedDeploymentName, 30*time.Second) + + // Poll until TWOR.Status.Versions shows Applied: true for the build ID. 
+		waitForTWORStatusApplied(t, ctx, k8sClient, testNamespace.Name, tworName, buildID, 30*time.Second)
+
+		// Assert that the TWOR has the TWD as a controller owner reference.
+		assertTWORControllerOwnerRef(t, ctx, k8sClient, testNamespace.Name, tworName, twd.Name)
+	})
 }
 
 // testTemporalWorkerDeploymentCreation tests the creation of a TemporalWorkerDeployment and waits for the expected status
diff --git a/internal/tests/internal/validation_helpers.go b/internal/tests/internal/validation_helpers.go
index 395e5474..18b510ee 100644
--- a/internal/tests/internal/validation_helpers.go
+++ b/internal/tests/internal/validation_helpers.go
@@ -13,8 +13,10 @@ import (
 	sdkclient "go.temporal.io/sdk/client"
 	"go.temporal.io/sdk/worker"
 	"go.temporal.io/server/temporaltest"
 	appsv1 "k8s.io/api/apps/v1"
+	autoscalingv2 "k8s.io/api/autoscaling/v2"
 	"k8s.io/apimachinery/pkg/types"
+	"sigs.k8s.io/controller-runtime/pkg/client"
 )
 
 var (
@@ -433,3 +435,78 @@ func eventually(t *testing.T, timeout, interval time.Duration, check func() erro
 		t.Fatalf("eventually failed after %s: %v", timeout, lastErr)
 	}
 }
+
+// waitForOwnedHPAWithInjectedScaleTargetRef polls until the named HPA exists in namespace and
+// verifies that the controller auto-injected scaleTargetRef to point at expectedDeploymentName.
+func waitForOwnedHPAWithInjectedScaleTargetRef(
+	t *testing.T,
+	ctx context.Context,
+	k8sClient client.Client,
+	namespace, hpaName, expectedDeploymentName string,
+	timeout time.Duration,
+) {
+	t.Helper()
+	t.Logf("Waiting for HPA %q to be created in namespace %q", hpaName, namespace)
+	var hpa autoscalingv2.HorizontalPodAutoscaler
+	eventually(t, timeout, time.Second, func() error {
+		return k8sClient.Get(ctx, types.NamespacedName{Name: hpaName, Namespace: namespace}, &hpa)
+	})
+	if hpa.Spec.ScaleTargetRef.Name != expectedDeploymentName {
+		t.Errorf("HPA scaleTargetRef.name = %q, want %q", hpa.Spec.ScaleTargetRef.Name, expectedDeploymentName)
+	}
+	if hpa.Spec.ScaleTargetRef.Kind != "Deployment" {
+		t.Errorf("HPA scaleTargetRef.kind = %q, want %q", hpa.Spec.ScaleTargetRef.Kind, "Deployment")
+	}
+	if hpa.Spec.ScaleTargetRef.APIVersion != "apps/v1" {
+		t.Errorf("HPA scaleTargetRef.apiVersion = %q, want %q", hpa.Spec.ScaleTargetRef.APIVersion, "apps/v1")
+	}
+	t.Logf("HPA scaleTargetRef correctly injected: %s/%s %s",
+		hpa.Spec.ScaleTargetRef.APIVersion, hpa.Spec.ScaleTargetRef.Kind, hpa.Spec.ScaleTargetRef.Name)
+}
+
+// waitForTWORStatusApplied polls until TWOR.Status.Versions contains an entry for buildID with Applied: true.
+func waitForTWORStatusApplied(
+	t *testing.T,
+	ctx context.Context,
+	k8sClient client.Client,
+	namespace, tworName, buildID string,
+	timeout time.Duration,
+) {
+	t.Helper()
+	eventually(t, timeout, time.Second, func() error {
+		var twor temporaliov1alpha1.TemporalWorkerOwnedResource
+		if err := k8sClient.Get(ctx, types.NamespacedName{Name: tworName, Namespace: namespace}, &twor); err != nil {
+			return err
+		}
+		for _, v := range twor.Status.Versions {
+			if v.BuildID == buildID && v.Applied {
+				return nil
+			}
+		}
+		return fmt.Errorf("TWOR status not yet updated for build ID %q (current versions: %+v)", buildID, twor.Status.Versions)
+	})
+	t.Log("TWOR status shows Applied: true for build ID")
+}
+
+// assertTWORControllerOwnerRef asserts that the named TWOR has a controller owner reference
+// pointing to the TemporalWorkerDeployment named twdName.
+func assertTWORControllerOwnerRef(
+	t *testing.T,
+	ctx context.Context,
+	k8sClient client.Client,
+	namespace, tworName, twdName string,
+) {
+	t.Helper()
+	var twor temporaliov1alpha1.TemporalWorkerOwnedResource
+	if err := k8sClient.Get(ctx, types.NamespacedName{Name: tworName, Namespace: namespace}, &twor); err != nil {
+		t.Fatalf("failed to re-fetch TWOR: %v", err)
+	}
+	for _, ref := range twor.OwnerReferences {
+		if ref.Kind == "TemporalWorkerDeployment" && ref.Name == twdName && ref.Controller != nil && *ref.Controller {
+			t.Logf("TWOR correctly has controller owner reference to TWD %q", twdName)
+			return
+		}
+	}
+	t.Errorf("TWOR %s/%s missing controller owner reference to TWD %s (refs: %+v)",
+		namespace, tworName, twdName, twor.OwnerReferences)
+}