diff --git a/api/v1/prefectworkpool_types.go b/api/v1/prefectworkpool_types.go index dc9ecb7..bed019c 100644 --- a/api/v1/prefectworkpool_types.go +++ b/api/v1/prefectworkpool_types.go @@ -52,10 +52,17 @@ type PrefectWorkPoolSpec struct { // DeploymentLabels defines additional labels to add to the Prefect Server Deployment DeploymentLabels map[string]string `json:"deploymentLabels,omitempty"` + + // Base job template for flow runs on this Work Pool + BaseJobTemplate *RawValueSource `json:"baseJobTemplate,omitempty"` } // PrefectWorkPoolStatus defines the observed state of PrefectWorkPool type PrefectWorkPoolStatus struct { + // Id is the workPool ID from Prefect + // +optional + Id *string `json:"id,omitempty"` + // Version is the version of the Prefect Worker that is currently running Version string `json:"version"` @@ -65,6 +72,21 @@ type PrefectWorkPoolStatus struct { // Ready is true if the work pool is ready to accept work Ready bool `json:"ready"` + // SpecHash tracks changes to the spec to minimize API calls + SpecHash string `json:"specHash,omitempty"` + + // LastSyncTime is the last time the workPool was synced with Prefect + // +optional + LastSyncTime *metav1.Time `json:"lastSyncTime,omitempty"` + + // ObservedGeneration tracks the last processed generation + // +optional + ObservedGeneration int64 `json:"observedGeneration,omitempty"` + + // BaseJobTemplateVersion tracks the version of BaseJobTemplate ConfigMap, if any is defined + // +optional + BaseJobTemplateVersion string `json:"baseJobTemplateVersion,omitempty"` + // Conditions store the status conditions of the PrefectWorkPool instances Conditions []metav1.Condition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type" protobuf:"bytes,1,rep,name=conditions"` } diff --git a/api/v1/raw_value_source.go b/api/v1/raw_value_source.go new file mode 100644 index 0000000..5403f68 --- /dev/null +++ b/api/v1/raw_value_source.go @@ -0,0 +1,31 @@ +package v1 + +import ( + "fmt" + + corev1 
"k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" +) + +type RawValueSource struct { + Value *runtime.RawExtension `json:"value,omitempty"` + ConfigMap *corev1.ConfigMapKeySelector `json:"configMap,omitempty"` + Patches []JsonPatch `json:"patches,omitempty"` +} + +type JsonPatch struct { + Operation string `json:"op"` + Path string `json:"path"` + + // +kubebuilder:pruning:PreserveUnknownFields + // +kubebuilder:validation:Schemaless + Value *runtime.RawExtension `json:"value,omitempty"` +} + +// TODO - no admission webhook yet +func (spec *RawValueSource) Validate() error { + if spec.Value != nil && spec.ConfigMap != nil { + return fmt.Errorf("value and configMap are mutually exclusive") + } + return nil +} diff --git a/api/v1/zz_generated.deepcopy.go b/api/v1/zz_generated.deepcopy.go index 7d83036..e1d700a 100644 --- a/api/v1/zz_generated.deepcopy.go +++ b/api/v1/zz_generated.deepcopy.go @@ -68,6 +68,26 @@ func (in *EphemeralConfiguration) DeepCopy() *EphemeralConfiguration { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *JsonPatch) DeepCopyInto(out *JsonPatch) { + *out = *in + if in.Value != nil { + in, out := &in.Value, &out.Value + *out = new(runtime.RawExtension) + (*in).DeepCopyInto(*out) + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new JsonPatch. +func (in *JsonPatch) DeepCopy() *JsonPatch { + if in == nil { + return nil + } + out := new(JsonPatch) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
func (in *PostgresConfiguration) DeepCopyInto(out *PostgresConfiguration) { *out = *in @@ -789,6 +809,11 @@ func (in *PrefectWorkPoolSpec) DeepCopyInto(out *PrefectWorkPoolSpec) { (*out)[key] = val } } + if in.BaseJobTemplate != nil { + in, out := &in.BaseJobTemplate, &out.BaseJobTemplate + *out = new(RawValueSource) + (*in).DeepCopyInto(*out) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PrefectWorkPoolSpec. @@ -804,6 +829,15 @@ func (in *PrefectWorkPoolSpec) DeepCopy() *PrefectWorkPoolSpec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *PrefectWorkPoolStatus) DeepCopyInto(out *PrefectWorkPoolStatus) { *out = *in + if in.Id != nil { + in, out := &in.Id, &out.Id + *out = new(string) + **out = **in + } + if in.LastSyncTime != nil { + in, out := &in.LastSyncTime, &out.LastSyncTime + *out = (*in).DeepCopy() + } if in.Conditions != nil { in, out := &in.Conditions, &out.Conditions *out = make([]metav1.Condition, len(*in)) @@ -823,6 +857,38 @@ func (in *PrefectWorkPoolStatus) DeepCopy() *PrefectWorkPoolStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *RawValueSource) DeepCopyInto(out *RawValueSource) { + *out = *in + if in.Value != nil { + in, out := &in.Value, &out.Value + *out = new(runtime.RawExtension) + (*in).DeepCopyInto(*out) + } + if in.ConfigMap != nil { + in, out := &in.ConfigMap, &out.ConfigMap + *out = new(corev1.ConfigMapKeySelector) + (*in).DeepCopyInto(*out) + } + if in.Patches != nil { + in, out := &in.Patches, &out.Patches + *out = make([]JsonPatch, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new RawValueSource. 
+func (in *RawValueSource) DeepCopy() *RawValueSource { + if in == nil { + return nil + } + out := new(RawValueSource) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *RedisConfiguration) DeepCopyInto(out *RedisConfiguration) { *out = *in diff --git a/deploy/charts/prefect-operator/crds/prefect.io_prefectworkpools.yaml b/deploy/charts/prefect-operator/crds/prefect.io_prefectworkpools.yaml index a7f6856..3a58ae9 100644 --- a/deploy/charts/prefect-operator/crds/prefect.io_prefectworkpools.yaml +++ b/deploy/charts/prefect-operator/crds/prefect.io_prefectworkpools.yaml @@ -62,6 +62,50 @@ spec: spec: description: PrefectWorkPoolSpec defines the desired state of PrefectWorkPool properties: + baseJobTemplate: + description: Base job template for flow runs on this Work Pool + properties: + configMap: + description: Selects a key from a ConfigMap. + properties: + key: + description: The key to select. + type: string + name: + default: "" + description: |- + Name of the referent. + This field is effectively required, but due to backwards compatibility is + allowed to be empty. Instances of this type with an empty value here are + almost certainly wrong. 
+ More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names + type: string + optional: + description: Specify whether the ConfigMap or its key must + be defined + type: boolean + required: + - key + type: object + x-kubernetes-map-type: atomic + patches: + items: + properties: + op: + type: string + path: + type: string + value: + x-kubernetes-preserve-unknown-fields: true + required: + - op + - path + type: object + type: array + value: + type: object + x-kubernetes-preserve-unknown-fields: true + type: object deploymentLabels: additionalProperties: type: string @@ -1991,6 +2035,10 @@ spec: status: description: PrefectWorkPoolStatus defines the observed state of PrefectWorkPool properties: + baseJobTemplateVersion: + description: BaseJobTemplateVersion tracks the version of BaseJobTemplate + ConfigMap, if any is defined + type: string conditions: description: Conditions store the status conditions of the PrefectWorkPool instances @@ -2049,6 +2097,18 @@ spec: - type type: object type: array + id: + description: Id is the workPool ID from Prefect + type: string + lastSyncTime: + description: LastSyncTime is the last time the workPool was synced + with Prefect + format: date-time + type: string + observedGeneration: + description: ObservedGeneration tracks the last processed generation + format: int64 + type: integer ready: description: Ready is true if the work pool is ready to accept work type: boolean @@ -2057,6 +2117,9 @@ spec: ready format: int32 type: integer + specHash: + description: SpecHash tracks changes to the spec to minimize API calls + type: string version: description: Version is the version of the Prefect Worker that is currently running diff --git a/go.mod b/go.mod index d051438..b9629cf 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.24.2 require ( dario.cat/mergo v1.0.2 + github.com/evanphx/json-patch v0.5.2 github.com/go-logr/logr v1.4.3 github.com/google/uuid v1.6.0 github.com/onsi/ginkgo/v2 v2.25.2 diff 
--git a/go.sum b/go.sum index 1e263e6..60d47b9 100644 --- a/go.sum +++ b/go.sum @@ -50,6 +50,7 @@ github.com/google/pprof v0.0.0-20250403155104-27863c87afa6 h1:BHT72Gu3keYf3ZEu2J github.com/google/pprof v0.0.0-20250403155104-27863c87afa6/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= diff --git a/internal/controller/prefectworkpool_controller.go b/internal/controller/prefectworkpool_controller.go index 04efe95..987f6d8 100644 --- a/internal/controller/prefectworkpool_controller.go +++ b/internal/controller/prefectworkpool_controller.go @@ -18,28 +18,40 @@ package controller import ( "context" + "fmt" "time" "github.com/PrefectHQ/prefect-operator/internal/prefect" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" - ctrllog "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/reconcile" prefectiov1 "github.com/PrefectHQ/prefect-operator/api/v1" "github.com/PrefectHQ/prefect-operator/internal/conditions" "github.com/PrefectHQ/prefect-operator/internal/constants" + 
"github.com/PrefectHQ/prefect-operator/internal/utils" ) const ( // PrefectWorkPoolFinalizer is the finalizer used to ensure cleanup of Prefect work pools PrefectWorkPoolFinalizer = "prefect.io/work-pool-cleanup" + + // PrefectDeploymentConditionReady indicates the deployment is ready + PrefectWorkPoolConditionReady = "Ready" + + // PrefectDeploymentConditionSynced indicates the deployment is synced with Prefect API + PrefectWorkPoolConditionSynced = "Synced" ) // PrefectWorkPoolReconciler reconciles a PrefectWorkPool object @@ -59,7 +71,7 @@ type PrefectWorkPoolReconciler struct { // For more details, check Reconcile and its Result here: // - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.17.3/pkg/reconcile func (r *PrefectWorkPoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - log := ctrllog.FromContext(ctx) + log := log.FromContext(ctx) log.V(1).Info("Reconciling PrefectWorkPool") var workPool prefectiov1.PrefectWorkPool @@ -70,6 +82,21 @@ func (r *PrefectWorkPoolReconciler) Reconcile(ctx context.Context, req ctrl.Requ return ctrl.Result{}, err } + currentStatus := workPool.Status + + // Defer a final status update at the end of the reconciliation loop, so that any of the + // individual reconciliation functions can update the status as they see fit. 
+ defer func() { + // Skip status update if nothing changed, to avoid conflicts + if equality.Semantic.DeepEqual(workPool.Status, currentStatus) { + return + } + + if statusErr := r.Status().Update(ctx, &workPool); statusErr != nil { + log.Error(statusErr, "Failed to update WorkPool status") + } + }() + // Handle deletion if workPool.DeletionTimestamp != nil { return r.handleDeletion(ctx, &workPool) @@ -85,13 +112,29 @@ func (r *PrefectWorkPoolReconciler) Reconcile(ctx context.Context, req ctrl.Requ return ctrl.Result{RequeueAfter: time.Second}, nil } - // Defer a final status update at the end of the reconciliation loop, so that any of the - // individual reconciliation functions can update the status as they see fit. - defer func() { - if statusErr := r.Status().Update(ctx, &workPool); statusErr != nil { - log.Error(statusErr, "Failed to update WorkPool status") + var baseJobTemplateConfigMap corev1.ConfigMap + + if workPool.Spec.BaseJobTemplate != nil && workPool.Spec.BaseJobTemplate.ConfigMap != nil { + configMapRef := workPool.Spec.BaseJobTemplate.ConfigMap + err := r.Get(ctx, types.NamespacedName{Name: configMapRef.Name, Namespace: workPool.Namespace}, &baseJobTemplateConfigMap) + if err != nil { + return ctrl.Result{}, err } - }() + } + + specHash, err := utils.Hash(workPool.Spec, 16) + if err != nil { + log.Error(err, "Failed to calculate spec hash", "workPool", workPool.Name) + return ctrl.Result{}, err + } + + if r.needsSync(&workPool, specHash, &baseJobTemplateConfigMap) { + log.Info("Starting sync with Prefect API", "deployment", workPool.Name) + err := r.syncWithPrefect(ctx, &workPool, &baseJobTemplateConfigMap) + if err != nil { + return ctrl.Result{}, err + } + } objName := constants.Deployment @@ -187,12 +230,132 @@ func (r *PrefectWorkPoolReconciler) Reconcile(ctx context.Context, req ctrl.Requ } } + r.setCondition(&workPool, PrefectWorkPoolConditionReady, metav1.ConditionTrue, "WorkPoolReady", "Work pool is ready and operational") + return 
ctrl.Result{}, nil } +func (r *PrefectWorkPoolReconciler) needsSync(workPool *prefectiov1.PrefectWorkPool, currentSpecHash string, baseJobTemplateConfigMap *corev1.ConfigMap) bool { + if workPool.Status.Id == nil || *workPool.Status.Id == "" { + return true + } + + if workPool.Status.SpecHash != currentSpecHash { + return true + } + + if workPool.Status.ObservedGeneration < workPool.Generation { + return true + } + + if workPool.Status.BaseJobTemplateVersion != baseJobTemplateConfigMap.ResourceVersion { + return true + } + + // Drift detection: sync if last sync was too long ago + if workPool.Status.LastSyncTime == nil { + return true + } + + timeSinceLastSync := time.Since(workPool.Status.LastSyncTime.Time) + return timeSinceLastSync > 10*time.Minute +} + +func (r *PrefectWorkPoolReconciler) syncWithPrefect(ctx context.Context, workPool *prefectiov1.PrefectWorkPool, baseJobTemplateConfigMap *corev1.ConfigMap) error { + name := workPool.Name + log := log.FromContext(ctx) + + prefectClient, err := r.getPrefectClient(ctx, workPool) + if err != nil { + log.Error(err, "Failed to create Prefect client", "workPool", name) + return err + } + + prefectWorkPool, err := prefectClient.GetWorkPool(ctx, name) + if err != nil { + log.Error(err, "Failed to get work pool in Prefect", "workPool", name) + return err + } + + var baseJobTemplate []byte + + if baseJobTemplateConfigMap.Name != "" { + key := workPool.Spec.BaseJobTemplate.ConfigMap.Key + + baseJobTemplateJson, exists := baseJobTemplateConfigMap.Data[key] + if !exists { + return fmt.Errorf("can't find key %s in ConfigMap %s", key, baseJobTemplateConfigMap.Name) + } + + baseJobTemplate = []byte(baseJobTemplateJson) + } + + if prefectWorkPool == nil { + workPoolSpec, err := prefect.ConvertToWorkPoolSpec(ctx, workPool, baseJobTemplate, prefectClient) + if err != nil { + log.Error(err, "Failed to convert work pool spec", "workPool", name) + r.setCondition(workPool, PrefectWorkPoolConditionSynced, metav1.ConditionFalse, 
"ConversionError", err.Error()) + return err + } + + prefectWorkPool, err = prefectClient.CreateWorkPool(ctx, workPoolSpec) + if err != nil { + log.Error(err, "Failed to create work pool in Prefect", "workPool", name) + r.setCondition(workPool, PrefectWorkPoolConditionSynced, metav1.ConditionFalse, "SyncError", err.Error()) + return err + } + } else { + workPoolSpec, err := prefect.ConvertToWorkPoolUpdateSpec(ctx, workPool, baseJobTemplate, prefectClient) + if err != nil { + log.Error(err, "Failed to convert work pool spec", "workPool", name) + r.setCondition(workPool, PrefectWorkPoolConditionSynced, metav1.ConditionFalse, "ConversionError", err.Error()) + return err + } + + err = prefectClient.UpdateWorkPool(ctx, workPool.Name, workPoolSpec) + if err != nil { + log.Error(err, "Failed to update work pool in Prefect", "workPool", name) + r.setCondition(workPool, PrefectWorkPoolConditionSynced, metav1.ConditionFalse, "SyncError", err.Error()) + return err + } + } + + prefect.UpdateWorkPoolStatus(workPool, prefectWorkPool) + + specHash, err := utils.Hash(workPool.Spec, 16) + if err != nil { + log.Error(err, "Failed to calculate spec hash", "workPool", workPool.Name) + return err + } + + now := metav1.Now() + + workPool.Status.SpecHash = specHash + workPool.Status.ObservedGeneration = workPool.Generation + workPool.Status.LastSyncTime = &now + workPool.Status.BaseJobTemplateVersion = baseJobTemplateConfigMap.ResourceVersion + + r.setCondition(workPool, PrefectWorkPoolConditionSynced, metav1.ConditionTrue, "SyncSuccessful", "Work pool successfully synced with Prefect API") + + return nil +} + +// setCondition sets a condition on the deployment status +func (r *PrefectWorkPoolReconciler) setCondition(workPool *prefectiov1.PrefectWorkPool, conditionType string, status metav1.ConditionStatus, reason, message string) { + condition := metav1.Condition{ + Type: conditionType, + Status: status, + LastTransitionTime: metav1.Now(), + Reason: reason, + Message: message, + } + + 
meta.SetStatusCondition(&workPool.Status.Conditions, condition) +} + // handleDeletion handles the cleanup of a PrefectWorkPool that is being deleted func (r *PrefectWorkPoolReconciler) handleDeletion(ctx context.Context, workPool *prefectiov1.PrefectWorkPool) (ctrl.Result, error) { - log := ctrllog.FromContext(ctx) + log := log.FromContext(ctx) log.Info("Handling deletion of PrefectWorkPool", "workPool", workPool.Name) // If the finalizer is not present, nothing to do @@ -234,10 +397,62 @@ func (r *PrefectWorkPoolReconciler) handleDeletion(ctx context.Context, workPool return ctrl.Result{}, nil } +func (r *PrefectWorkPoolReconciler) getPrefectClient(ctx context.Context, workPool *prefectiov1.PrefectWorkPool) (prefect.PrefectClient, error) { + log := log.FromContext(ctx) + name := workPool.Name + + // Use injected client if available (for testing) + prefectClient := r.PrefectClient + + if prefectClient == nil { + var err error + prefectClient, err = prefect.NewClientFromK8s(ctx, &workPool.Spec.Server, r.Client, workPool.Namespace, log) + if err != nil { + log.Error(err, "Failed to create Prefect client", "workPool", name) + return nil, err + } + } + + return prefectClient, nil +} + +func (r *PrefectWorkPoolReconciler) mapConfigMapToWorkPools(ctx context.Context, obj client.Object) []reconcile.Request { + configMap, ok := obj.(*corev1.ConfigMap) + if !ok { + return nil + } + + workPools := &prefectiov1.PrefectWorkPoolList{} + if err := r.List(ctx, workPools, client.InNamespace(configMap.Namespace)); err != nil { + return nil + } + + if len(workPools.Items) == 0 { + return nil + } + + var requests []reconcile.Request + for _, workPool := range workPools.Items { + if workPool.Spec.BaseJobTemplate != nil && + workPool.Spec.BaseJobTemplate.ConfigMap != nil && + workPool.Spec.BaseJobTemplate.ConfigMap.Name == configMap.Name { + requests = append(requests, reconcile.Request{ + NamespacedName: types.NamespacedName{ + Name: workPool.Name, + Namespace: workPool.Namespace, + 
}, + }) + } + } + + return requests +} + // SetupWithManager sets up the controller with the Manager. func (r *PrefectWorkPoolReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&prefectiov1.PrefectWorkPool{}). Owns(&appsv1.Deployment{}). + Watches(&corev1.ConfigMap{}, handler.EnqueueRequestsFromMapFunc(r.mapConfigMapToWorkPools)). Complete(r) } diff --git a/internal/controller/prefectworkpool_controller_test.go b/internal/controller/prefectworkpool_controller_test.go index 65eb591..65ec558 100644 --- a/internal/controller/prefectworkpool_controller_test.go +++ b/internal/controller/prefectworkpool_controller_test.go @@ -18,6 +18,7 @@ package controller import ( "context" + "encoding/json" "fmt" "time" @@ -29,6 +30,7 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/utils/ptr" @@ -42,13 +44,15 @@ import ( var _ = Describe("PrefectWorkPool Controller", func() { var ( - ctx context.Context - namespace *corev1.Namespace - namespaceName string - name types.NamespacedName - prefectWorkPool *prefectiov1.PrefectWorkPool - reconciler *PrefectWorkPoolReconciler - mockClient *prefect.MockClient + ctx context.Context + namespace *corev1.Namespace + namespaceName string + name types.NamespacedName + prefectWorkPool *prefectiov1.PrefectWorkPool + reconciler *PrefectWorkPoolReconciler + mockClient *prefect.MockClient + baseJobTemplate map[string]interface{} + baseJobTemplateJson []byte ) BeforeEach(func() { @@ -66,12 +70,22 @@ var _ = Describe("PrefectWorkPool Controller", func() { } Expect(k8sClient.Create(ctx, namespace)).To(Succeed()) + mockClient = prefect.NewMockClient() + reconciler = &PrefectWorkPoolReconciler{ Client: k8sClient, Scheme: k8sClient.Scheme(), PrefectClient: mockClient, } - mockClient = prefect.NewMockClient() + + baseJobTemplate = 
map[string]interface{}{ + "foo": "bar", + "baz": []interface{}{"qux", "quux"}, + } + + var err error + baseJobTemplateJson, err = json.Marshal(baseJobTemplate) + Expect(err).NotTo(HaveOccurred()) }) AfterEach(func() { @@ -187,8 +201,12 @@ var _ = Describe("PrefectWorkPool Controller", func() { "some": "additional-label", "another": "extra-label", }, + BaseJobTemplate: &prefectiov1.RawValueSource{}, }, } + }) + + JustBeforeEach(func() { Expect(k8sClient.Create(ctx, prefectWorkPool)).To(Succeed()) By("First reconciliation - adding finalizer") @@ -220,6 +238,112 @@ var _ = Describe("PrefectWorkPool Controller", func() { Expect(deploymentReconciled.Reason).To(Equal("DeploymentCreated")) Expect(deploymentReconciled.Message).To(Equal("Deployment was created")) }) + + It("should have the Synced condition", func() { + syncedCondition := meta.FindStatusCondition(prefectWorkPool.Status.Conditions, "Synced") + Expect(syncedCondition).NotTo(BeNil()) + Expect(syncedCondition.Status).To(Equal(metav1.ConditionTrue)) + Expect(syncedCondition.Reason).To(Equal("SyncSuccessful")) + }) + + It("should have the Ready condition", func() { + readyCondition := meta.FindStatusCondition(prefectWorkPool.Status.Conditions, "Ready") + Expect(readyCondition).NotTo(BeNil()) + Expect(readyCondition.Status).To(Equal(metav1.ConditionTrue)) + Expect(readyCondition.Reason).To(Equal("WorkPoolReady")) + }) + + It("should have the default base job template", func() { + workPool, err := mockClient.GetWorkPool(ctx, prefectWorkPool.Name) + Expect(err).NotTo(HaveOccurred()) + Expect(workPool.BaseJobTemplate).To(Equal(prefect.MockDefaultBaseJobTemplate)) + }) + + Context("with inline job template", func() { + BeforeEach(func() { + prefectWorkPool.Spec.BaseJobTemplate.Value = &runtime.RawExtension{ + Raw: baseJobTemplateJson, + } + }) + + It("should have the base job template defined in the work pool spec", func() { + workPool, err := mockClient.GetWorkPool(ctx, prefectWorkPool.Name) + 
Expect(err).NotTo(HaveOccurred()) + Expect(workPool.BaseJobTemplate).To(Equal(baseJobTemplate)) + }) + }) + + Context("with base job template ConfigMap", func() { + BeforeEach(func() { + configMap := &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: name.Name, + Namespace: name.Namespace, + }, + Data: map[string]string{ + "baseJobTemplate.json": string(baseJobTemplateJson), + }, + } + + Expect(k8sClient.Create(ctx, configMap)).To(Succeed()) + + prefectWorkPool.Spec.BaseJobTemplate.ConfigMap = &corev1.ConfigMapKeySelector{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: configMap.Name, + }, + Key: "baseJobTemplate.json", + } + }) + + It("should have the base job template defined in the ConfigMap", func() { + workPool, err := mockClient.GetWorkPool(ctx, prefectWorkPool.Name) + Expect(err).NotTo(HaveOccurred()) + Expect(workPool.BaseJobTemplate).To(Equal(baseJobTemplate)) + }) + + }) + + Context("with base job template patches", func() { + BeforeEach(func() { + raw, err := json.Marshal("xyzzy") + Expect(err).NotTo(HaveOccurred()) + + prefectWorkPool.Spec.BaseJobTemplate.Patches = []prefectiov1.JsonPatch{ + {Operation: "add", Path: "/plugh", Value: &runtime.RawExtension{ + Raw: raw, + }}, + } + }) + + It("should properly apply base job template patches", func() { + workPool, err := mockClient.GetWorkPool(ctx, prefectWorkPool.Name) + Expect(err).NotTo(HaveOccurred()) + patched := prefect.MockDefaultBaseJobTemplate + patched["plugh"] = "xyzzy" + Expect(workPool.BaseJobTemplate).To(Equal(patched)) + }) + }) + }) + + Context("with base job template patches", func() { + BeforeEach(func() { + raw, err := json.Marshal("xyzzy") + Expect(err).NotTo(HaveOccurred()) + + prefectWorkPool.Spec.BaseJobTemplate.Patches = []prefectiov1.JsonPatch{ + {Operation: "add", Path: "/plugh", Value: &runtime.RawExtension{ + Raw: raw, + }}, + } + }) + + It("should properly apply base job template patches", func() { + workPool, err := mockClient.GetWorkPool(ctx, 
prefectWorkPool.Name) + Expect(err).NotTo(HaveOccurred()) + patched := prefect.MockDefaultBaseJobTemplate + patched["plugh"] = "xyzzy" + Expect(workPool.BaseJobTemplate).To(Equal(patched)) + }) }) Describe("the Deployment", func() { @@ -396,6 +520,7 @@ var _ = Describe("PrefectWorkPool Controller", func() { Image: "extra-image", }, } + Expect(k8sClient.Update(ctx, prefectWorkPool)).To(Succeed()) // Reconcile again to update the work pool @@ -483,6 +608,19 @@ var _ = Describe("PrefectWorkPool Controller", func() { Expect(after.Generation).To(Equal(before.Generation)) Expect(after).To(Equal(before)) }) + + It("should not change the prefect work pool if nothing has changed", func() { + before, err := mockClient.GetWorkPool(ctx, prefectWorkPool.Name) + Expect(err).NotTo(HaveOccurred()) + + _, err = reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: name}) + Expect(err).NotTo(HaveOccurred()) + + after, err := mockClient.GetWorkPool(ctx, prefectWorkPool.Name) + Expect(err).NotTo(HaveOccurred()) + + Expect(after).To(Equal(before)) + }) }) Context("WorkPool Status Updates", func() { @@ -906,4 +1044,39 @@ var _ = Describe("PrefectWorkPool Controller", func() { Expect(err.Error()).To(ContainSubstring("not found")) }) }) + + Context("When testing error scenarios", func() { + BeforeEach(func() { + prefectWorkPool = &prefectiov1.PrefectWorkPool{ + ObjectMeta: metav1.ObjectMeta{ + Name: name.Name, + Namespace: name.Namespace, + }, + Spec: prefectiov1.PrefectWorkPoolSpec{}, + } + }) + + It("should handle sync errors from mock client", func() { + By("Configuring mock client to fail") + mockClient.ShouldFailCreate = true + mockClient.FailureMessage = "simulated Prefect API error" + + By("Creating deployment") + Expect(k8sClient.Create(ctx, prefectWorkPool)).To(Succeed()) + + By("First reconcile - adding finalizer") + result, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: name}) + Expect(err).NotTo(HaveOccurred()) + 
Expect(result.RequeueAfter).To(Equal(time.Second))
+
+			By("Second reconcile should handle the error")
+			result, err = reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: name})
+			Expect(err).To(HaveOccurred())
+			Expect(err.Error()).To(ContainSubstring("simulated Prefect API error"))
+			Expect(result.RequeueAfter).To(Equal(time.Duration(0)))
+
+			By("Resetting mock client")
+			mockClient.ShouldFailCreate = false
+		})
+	})
 })
diff --git a/internal/prefect/client.go b/internal/prefect/client.go
index fe1db68..d5026ea 100644
--- a/internal/prefect/client.go
+++ b/internal/prefect/client.go
@@ -48,8 +48,16 @@ type PrefectClient interface {
 	CreateOrGetFlow(ctx context.Context, flow *FlowSpec) (*Flow, error)
 	// GetFlowByName retrieves a flow by name
 	GetFlowByName(ctx context.Context, name string) (*Flow, error)
+	// CreateWorkPool creates a new work pool
+	CreateWorkPool(ctx context.Context, workPool *WorkPoolSpec) (*WorkPool, error)
+	// GetWorkPool retrieves a work pool by name
+	GetWorkPool(ctx context.Context, name string) (*WorkPool, error)
+	// UpdateWorkPool updates an existing work pool
+	UpdateWorkPool(ctx context.Context, name string, workPool *WorkPoolSpec) error
 	// DeleteWorkPool deletes a work pool
 	DeleteWorkPool(ctx context.Context, id string) error
+	// GetWorkerMetadata retrieves aggregate metadata for all worker types
+	GetWorkerMetadata(ctx context.Context) (map[string]WorkerMetadata, error)
 }
 
 // HTTPClient represents an HTTP client interface for testing
@@ -216,6 +224,31 @@ type Flow struct {
 	Labels map[string]string `json:"labels"`
 }
 
+type WorkPoolSpec struct {
+	Name             string                 `json:"name,omitempty"`
+	Description      *string                `json:"description,omitempty"`
+	Type             string                 `json:"type,omitempty"`
+	BaseJobTemplate  map[string]interface{} `json:"base_job_template,omitempty"`
+	IsPaused         *bool                  `json:"is_paused,omitempty"`
+	ConcurrencyLimit *int                   `json:"concurrency_limit,omitempty"`
+	// StorageConfiguration map[string]interface{} `json:"storage_configuration,omitempty"`
+}
+
+type WorkPool struct {
+	ID               string                 `json:"id"`
+	Created          time.Time              `json:"created"`
+	Updated          time.Time              `json:"updated"`
+	Name             string                 `json:"name"`
+	Type             string                 `json:"type"`
+	Description      *string                `json:"description"`
+	BaseJobTemplate  map[string]interface{} `json:"base_job_template"`
+	IsPaused         bool                   `json:"is_paused"`
+	ConcurrencyLimit *int                   `json:"concurrency_limit"`
+	Status           string                 `json:"status"`
+	DefaultQueueID   *string                `json:"default_queue_id"`
+	// StorageConfiguration map[string]interface{} `json:"storage_configuration,omitempty"`
+}
+
 // CreateOrUpdateDeployment creates or updates a deployment using the Prefect API
 func (c *Client) CreateOrUpdateDeployment(ctx context.Context, deployment *DeploymentSpec) (*Deployment, error) {
 	url := fmt.Sprintf("%s/deployments/", c.BaseURL)
@@ -493,6 +526,134 @@ func (c *Client) GetFlowByName(ctx context.Context, name string) (*Flow, error)
 	return &result, nil
 }
 
+// GetWorkPool retrieves a work pool by name; returns (nil, nil) when it does not exist
+func (c *Client) GetWorkPool(ctx context.Context, name string) (*WorkPool, error) {
+	url := fmt.Sprintf("%s/work_pools/%s", c.BaseURL, name)
+	c.log.V(1).Info("Getting work pool", "url", url, "name", name)
+
+	req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create request: %w", err)
+	}
+
+	req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", c.APIKey))
+
+	resp, err := c.HTTPClient.Do(req)
+	if err != nil {
+		return nil, fmt.Errorf("failed to make request: %w", err)
+	}
+	defer func() {
+		_ = resp.Body.Close()
+	}()
+
+	body, err := io.ReadAll(resp.Body)
+	if err != nil {
+		return nil, fmt.Errorf("failed to read response body: %w", err)
+	}
+
+	if resp.StatusCode == http.StatusNotFound {
+		return nil, nil
+	}
+
+	if resp.StatusCode != http.StatusOK {
+		return nil, fmt.Errorf("API request failed with status %d: %s", resp.StatusCode, string(body))
+	}
+
+	var result WorkPool
+	if err := json.Unmarshal(body, &result); err != nil {
+		return nil, fmt.Errorf("failed to unmarshal response: %w", err)
+	}
+
+	c.log.V(1).Info("Work pool retrieved successfully", "workPool", result.ID)
+	return &result, nil
+}
+
+func (c *Client) CreateWorkPool(ctx context.Context, workPool *WorkPoolSpec) (*WorkPool, error) {
+	url := fmt.Sprintf("%s/work_pools/", c.BaseURL)
+	c.log.V(1).Info("Creating work pool", "url", url, "workPool", workPool.Name)
+
+	jsonData, err := json.Marshal(workPool)
+	if err != nil {
+		return nil, fmt.Errorf("failed to marshal work pool: %w", err)
+	}
+
+	req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(jsonData))
+	if err != nil {
+		return nil, fmt.Errorf("failed to create request: %w", err)
+	}
+
+	req.Header.Set("Content-Type", "application/json")
+	req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", c.APIKey))
+
+	resp, err := c.HTTPClient.Do(req)
+	if err != nil {
+		return nil, fmt.Errorf("failed to make request: %w", err)
+	}
+	defer func() {
+		_ = resp.Body.Close()
+	}()
+
+	body, err := io.ReadAll(resp.Body)
+	if err != nil {
+		return nil, fmt.Errorf("failed to read response body: %w", err)
+	}
+
+	if resp.StatusCode != http.StatusCreated {
+		return nil, fmt.Errorf("API request failed with status %d: %s", resp.StatusCode, string(body))
+	}
+
+	var result WorkPool
+	if err := json.Unmarshal(body, &result); err != nil {
+		return nil, fmt.Errorf("failed to unmarshal response: %w", err)
+	}
+
+	c.log.V(1).Info("Work pool created successfully", "workPoolID", result.ID)
+	return &result, nil
+}
+
+// UpdateWorkPool updates an existing work pool
+func (c *Client) UpdateWorkPool(ctx context.Context, name string, workPool *WorkPoolSpec) error {
+	url := fmt.Sprintf("%s/work_pools/%s", c.BaseURL, name)
+	c.log.V(1).Info("Updating work pool", "url", url, "name", name)
+
+	jsonData, err := json.Marshal(workPool)
+	if err != nil {
+		return fmt.Errorf("failed to marshal work pool updates: %w", err)
+	}
+
+	req, err := http.NewRequestWithContext(ctx, "PATCH", url, bytes.NewBuffer(jsonData))
+	if err != nil {
+		return fmt.Errorf("failed to create request: %w", err)
+	}
+
+	req.Header.Set("Content-Type", "application/json")
+	if c.APIKey != "" {
+		req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", c.APIKey))
+	}
+
+	resp, err := c.HTTPClient.Do(req)
+	if err != nil {
+		return fmt.Errorf("failed to make request: %w", err)
+	}
+	defer func() {
+		if err := resp.Body.Close(); err != nil {
+			c.log.Error(err, "failed to close response body")
+		}
+	}()
+
+	body, err := io.ReadAll(resp.Body)
+	if err != nil {
+		return fmt.Errorf("failed to read response body: %w", err)
+	}
+
+	if resp.StatusCode != http.StatusNoContent {
+		return fmt.Errorf("API request failed with status %d: %s", resp.StatusCode, string(body))
+	}
+
+	c.log.V(1).Info("Work pool updated successfully", "name", name)
+	return nil
+}
+
 // DeleteWorkPool deletes a work pool by ID
 func (c *Client) DeleteWorkPool(ctx context.Context, name string) error {
 	url := fmt.Sprintf("%s/work_pools/%s", c.BaseURL, name)
@@ -531,3 +692,65 @@ func (c *Client) isRunningInCluster() bool {
 	_, err := rest.InClusterConfig()
 	return err == nil
 }
+
+type WorkerMetadata struct {
+	Type                   string                 `json:"type"`
+	Description            string                 `json:"description"`
+	DisplayName            string                 `json:"display_name"`
+	DocumentationURL       string                 `json:"documentation_url"`
+	InstallCommand         string                 `json:"install_command"`
+	IsBeta                 bool                   `json:"is_beta"`
+	LogoURL                string                 `json:"logo_url"`
+	DefaultBaseJobTemplate map[string]interface{} `json:"default_base_job_configuration"`
+}
+
+// GetWorkerMetadata retrieves aggregate metadata for all worker types
+func (c *Client) GetWorkerMetadata(ctx context.Context) (map[string]WorkerMetadata, error) {
+	url := fmt.Sprintf("%s/collections/views/aggregate-worker-metadata", c.BaseURL)
+	c.log.V(1).Info("Getting aggregate worker metadata", "url", url)
+
+	req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create request: %w", err)
+	}
+
+	req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", c.APIKey))
+
+	resp, err := c.HTTPClient.Do(req)
+	if err != nil {
+		return nil, fmt.Errorf("failed to make request: %w", err)
+	}
+	defer func() {
+		_ = resp.Body.Close()
+	}()
+
+	body, err := io.ReadAll(resp.Body)
+	if err != nil {
+		return nil, fmt.Errorf("failed to read response body: %w", err)
+	}
+
+	if resp.StatusCode == http.StatusNotFound {
+		return nil, nil
+	}
+
+	if resp.StatusCode != http.StatusOK {
+		return nil, fmt.Errorf("API request failed with status %d: %s", resp.StatusCode, string(body))
+	}
+
+	var result map[string]map[string]WorkerMetadata
+	if err := json.Unmarshal(body, &result); err != nil {
+		return nil, fmt.Errorf("failed to unmarshal response: %w", err)
+	}
+
+	c.log.V(1).Info("Worker metadata retrieved successfully")
+
+	metadata := map[string]WorkerMetadata{}
+
+	for _, integration := range result {
+		for workerType, worker := range integration {
+			metadata[workerType] = worker
+		}
+	}
+
+	return metadata, nil
+}
diff --git a/internal/prefect/convert.go b/internal/prefect/convert.go
index 46689c6..0a01b16 100644
--- a/internal/prefect/convert.go
+++ b/internal/prefect/convert.go
@@ -23,6 +23,7 @@ import (
 	"time"
 
 	prefectiov1 "github.com/PrefectHQ/prefect-operator/api/v1"
+	jsonpatch "github.com/evanphx/json-patch"
 )
 
 // ConvertToDeploymentSpec converts a K8s PrefectDeployment to a Prefect API DeploymentSpec
@@ -149,3 +150,83 @@ func GetFlowIDFromDeployment(ctx context.Context, client PrefectClient, k8sDeplo
 	}
 	return flow.ID, nil
 }
+
+// ConvertToWorkPoolUpdateSpec converts a K8s PrefectWorkPool to a Prefect API work pool patch payload
+func ConvertToWorkPoolUpdateSpec(ctx context.Context, k8sWorkPool *prefectiov1.PrefectWorkPool, baseJobTemplate []byte, client PrefectClient) (*WorkPoolSpec, error) {
+	return convertToWorkPoolSpec(ctx, k8sWorkPool, baseJobTemplate, client, true)
+}
+
+func ConvertToWorkPoolSpec(ctx context.Context, k8sWorkPool *prefectiov1.PrefectWorkPool, baseJobTemplate []byte, client PrefectClient) (*WorkPoolSpec, error) {
+	return convertToWorkPoolSpec(ctx, k8sWorkPool, baseJobTemplate, client, false)
+}
+
+func convertToWorkPoolSpec(ctx context.Context, k8sWorkPool *prefectiov1.PrefectWorkPool, baseJobTemplate []byte, client PrefectClient, update bool) (*WorkPoolSpec, error) {
+	spec := &WorkPoolSpec{}
+
+	workPool := k8sWorkPool.Spec
+
+	if !update {
+		spec.Name = k8sWorkPool.Name
+		spec.Type = workPool.Type
+	}
+
+	if baseJobTemplate == nil {
+		// if no template was passed, try to source it from the workpool spec
+		if workPool.BaseJobTemplate != nil && workPool.BaseJobTemplate.Value != nil {
+			baseJobTemplate = workPool.BaseJobTemplate.Value.Raw
+		} else {
+			// if no template was specified, retrieve the default template
+			metadata, err := client.GetWorkerMetadata(ctx)
+
+			if err != nil {
+				return nil, fmt.Errorf("failed to retrieve worker metadata: %w", err)
+			}
+
+			workPoolType := "kubernetes"
+
+			if workPool.Type != "" {
+				workPoolType = workPool.Type
+			}
+
+			worker, exists := metadata[workPoolType]
+
+			if !exists {
+				return nil, fmt.Errorf("worker type not found in worker metadata: %s", workPoolType)
+			}
+
+			baseJobTemplate, err = json.Marshal(worker.DefaultBaseJobTemplate)
+			if err != nil {
+				return nil, fmt.Errorf("failed to marshal default base job template: %w", err)
+			}
+		}
+	}
+
+	if workPool.BaseJobTemplate != nil && workPool.BaseJobTemplate.Patches != nil {
+		patchSource, err := json.Marshal(workPool.BaseJobTemplate.Patches)
+		if err != nil {
+			return nil, fmt.Errorf("can't marshal job template patches: %w", err)
+		}
+
+		patch, err := jsonpatch.DecodePatch(patchSource)
+		if err != nil {
+			return nil, fmt.Errorf("can't decode RFC6902 patch: %w", err)
+		}
+
+		baseJobTemplate, err = patch.Apply(baseJobTemplate)
+		if err != nil {
+			return nil, fmt.Errorf("job template patch failed: %w", err)
+		}
+	}
+
+	if err := json.Unmarshal(baseJobTemplate, &spec.BaseJobTemplate); err != nil {
+		return nil, fmt.Errorf("failed to unmarshal patched job template: %w", err)
+	}
+
+	return spec, nil
+}
+
+// UpdateWorkPoolStatus updates the K8s PrefectWorkPool status from a Prefect API WorkPool
+func UpdateWorkPoolStatus(k8sWorkPool *prefectiov1.PrefectWorkPool, prefectWorkPool *WorkPool) {
+	k8sWorkPool.Status.Id = &prefectWorkPool.ID
+	k8sWorkPool.Status.Ready = prefectWorkPool.Status == "READY"
+}
diff --git a/internal/prefect/mock.go b/internal/prefect/mock.go
index a7d97a3..2224ee8 100644
--- a/internal/prefect/mock.go
+++ b/internal/prefect/mock.go
@@ -30,6 +30,7 @@ type MockClient struct {
 	mu          sync.RWMutex
 	deployments map[string]*Deployment
 	flows       map[string]*Flow
+	workPools   map[string]*WorkPool
 
 	// Test configuration
 	ShouldFailCreate bool
@@ -45,6 +46,7 @@ func NewMockClient() *MockClient {
 	return &MockClient{
 		deployments: make(map[string]*Deployment),
 		flows:       make(map[string]*Flow),
+		workPools:   make(map[string]*WorkPool),
 	}
 }
 
@@ -434,10 +436,113 @@ func (m *MockClient) copyFlow(f *Flow) *Flow {
 	return &copy
 }
 
-// DeleteWorkPool removes a work pool
+func (m *MockClient) CreateWorkPool(ctx context.Context, workPool *WorkPoolSpec) (*WorkPool, error) {
+	if m.ShouldFailCreate {
+		return nil, fmt.Errorf("mock error: %s", m.FailureMessage)
+	}
+
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	_, exists := m.workPools[workPool.Name]
+	if exists {
+		return nil, fmt.Errorf("work pool already exists: %s", workPool.Name)
+	}
+
+	now := time.Now()
+
+	newWorkPool := &WorkPool{
+		ID:               uuid.New().String(),
+		Created:          now,
+		Updated:          now,
+		Name:             workPool.Name,
+		Type:             workPool.Type,
+		Description:      workPool.Description,
+		BaseJobTemplate:  workPool.BaseJobTemplate,
+		IsPaused:         workPool.IsPaused != nil && *workPool.IsPaused,
+		ConcurrencyLimit: workPool.ConcurrencyLimit,
+		Status:           "READY", // Default status
+		DefaultQueueID:   nil,
+	}
+
+	if newWorkPool.BaseJobTemplate == nil {
+		newWorkPool.BaseJobTemplate = make(map[string]interface{})
+	}
+
+	m.workPools[newWorkPool.Name] = newWorkPool
+	return newWorkPool, nil
+}
+
+func (m *MockClient) GetWorkPool(ctx context.Context, name string) (*WorkPool, error) {
+	if m.ShouldFailGet {
+		return nil, fmt.Errorf("mock error: %s", m.FailureMessage)
+	}
+
+	m.mu.RLock()
+	defer m.mu.RUnlock()
+
+	workPool, exists := m.workPools[name]
+	if !exists {
+		return nil, nil
+	}
+
+	// Return a copy to avoid race conditions
+	return m.copyWorkPool(workPool), nil
+}
+
+func (m *MockClient) UpdateWorkPool(ctx context.Context, name string, workPool *WorkPoolSpec) error {
+	if m.ShouldFailUpdate {
+		return fmt.Errorf("mock error: %s", m.FailureMessage)
+	}
+
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	existing, ok := m.workPools[name]
+	if !ok {
+		return fmt.Errorf("work pool not found")
+	}
+
+	existing.Updated = time.Now()
+	existing.Description = workPool.Description
+	if workPool.IsPaused != nil {
+		existing.IsPaused = *workPool.IsPaused
+	}
+	existing.BaseJobTemplate = workPool.BaseJobTemplate
+	existing.ConcurrencyLimit = workPool.ConcurrencyLimit
+
+	return nil
+}
+
 func (m *MockClient) DeleteWorkPool(ctx context.Context, name string) error {
 	if m.ShouldFailDelete {
 		return fmt.Errorf("mock error: %s", m.FailureMessage)
 	}
 	return nil
 }
+
+func (m *MockClient) copyWorkPool(w *WorkPool) *WorkPool {
+	out := *w
+
+	if w.BaseJobTemplate != nil {
+		out.BaseJobTemplate = make(map[string]interface{})
+		for k, v := range w.BaseJobTemplate {
+			out.BaseJobTemplate[k] = v
+		}
+	}
+
+	return &out
+}
+
+var MockDefaultBaseJobTemplate = map[string]interface{}{
+	"foo":  "bar",
+	"quux": true,
+	"boz":  []interface{}{"baz", "bot", "biz"},
+}
+
+// TODO - implement when implementing unit tests
+func (m *MockClient) GetWorkerMetadata(ctx context.Context) (map[string]WorkerMetadata, error) {
+	return map[string]WorkerMetadata{
+		"kubernetes": {DefaultBaseJobTemplate: MockDefaultBaseJobTemplate},
+	}, nil
+}