Skip to content

Commit bf5aa12

Browse files
tinkerborgrmk-cimulate
authored andcommitted
add support for job template patches and configmap reference
1 parent ebed1ec commit bf5aa12

File tree

11 files changed

+389
-22
lines changed

11 files changed

+389
-22
lines changed

api/v1/prefectworkpool_types.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import (
2121

2222
corev1 "k8s.io/api/core/v1"
2323
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
24-
"k8s.io/apimachinery/pkg/runtime"
2524
"k8s.io/apimachinery/pkg/util/intstr"
2625
)
2726

@@ -55,7 +54,7 @@ type PrefectWorkPoolSpec struct {
5554
DeploymentLabels map[string]string `json:"deploymentLabels,omitempty"`
5655

5756
// Base job template for flow runs on this Work Pool
58-
BaseJobTemplate *runtime.RawExtension `json:"baseJobTemplate,omitempty"`
57+
BaseJobTemplate *RawValueSource `json:"baseJobTemplate,omitempty"`
5958
}
6059

6160
// PrefectWorkPoolStatus defines the observed state of PrefectWorkPool
@@ -84,6 +83,10 @@ type PrefectWorkPoolStatus struct {
8483
// +optional
8584
ObservedGeneration int64 `json:"observedGeneration,omitempty"`
8685

86+
// BaseJobTemplateVersion tracks the version of BaseJobTemplate ConfigMap, if any is defined
87+
// +optional
88+
BaseJobTemplateVersion string `json:"baseJobTemplateVersion,omitempty"`
89+
8790
// Conditions store the status conditions of the PrefectWorkPool instances
8891
Conditions []metav1.Condition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type" protobuf:"bytes,1,rep,name=conditions"`
8992
}

api/v1/raw_value_source.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package v1
2+
3+
import (
4+
"fmt"
5+
6+
corev1 "k8s.io/api/core/v1"
7+
"k8s.io/apimachinery/pkg/runtime"
8+
)
9+
10+
type RawValueSource struct {
11+
Value *runtime.RawExtension `json:"value,omitempty"`
12+
ConfigMap *corev1.ConfigMapKeySelector `json:"configMap,omitempty"`
13+
Patches []JsonPatch `json:"patches,omitempty"`
14+
}
15+
16+
type JsonPatch struct {
17+
Operation string `json:"op"`
18+
Path string `json:"path"`
19+
20+
// +kubebuilder:pruning:PreserveUnknownFields
21+
// +kubebuilder:validation:Schemaless
22+
Value *runtime.RawExtension `json:"value,omitempty"`
23+
}
24+
25+
// TODO - no admission webhook yet
26+
func (spec *RawValueSource) Validate() error {
27+
if spec.Value != nil && spec.ConfigMap != nil {
28+
return fmt.Errorf("value and configMap are mutually exclusive")
29+
}
30+
return nil
31+
}

api/v1/zz_generated.deepcopy.go

Lines changed: 53 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

deploy/charts/prefect-operator/crds/prefect.io_prefectworkpools.yaml

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,51 @@ spec:
6363
description: PrefectWorkPoolSpec defines the desired state of PrefectWorkPool
6464
properties:
6565
baseJobTemplate:
66-
description: Base job template for flow runs on this Work Pool
66+
description: |-
67+
Base job template for flow runs on this Work Pool
68+
BaseJobTemplate *runtime.RawExtension `json:"baseJobTemplate,omitempty"`
69+
properties:
70+
configMap:
71+
description: Selects a key from a ConfigMap.
72+
properties:
73+
key:
74+
description: The key to select.
75+
type: string
76+
name:
77+
default: ""
78+
description: |-
79+
Name of the referent.
80+
This field is effectively required, but due to backwards compatibility is
81+
allowed to be empty. Instances of this type with an empty value here are
82+
almost certainly wrong.
83+
More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names
84+
type: string
85+
optional:
86+
description: Specify whether the ConfigMap or its key must
87+
be defined
88+
type: boolean
89+
required:
90+
- key
91+
type: object
92+
x-kubernetes-map-type: atomic
93+
patches:
94+
items:
95+
properties:
96+
op:
97+
type: string
98+
path:
99+
type: string
100+
value:
101+
x-kubernetes-preserve-unknown-fields: true
102+
required:
103+
- op
104+
- path
105+
type: object
106+
type: array
107+
value:
108+
type: object
109+
x-kubernetes-preserve-unknown-fields: true
67110
type: object
68-
x-kubernetes-preserve-unknown-fields: true
69111
deploymentLabels:
70112
additionalProperties:
71113
type: string
@@ -1995,6 +2037,10 @@ spec:
19952037
status:
19962038
description: PrefectWorkPoolStatus defines the observed state of PrefectWorkPool
19972039
properties:
2040+
baseJobTemplateVersion:
2041+
description: BaseJobTemplateVersion tracks the version of BaseJobTemplate
2042+
ConfigMap, if any is defined
2043+
type: string
19982044
conditions:
19992045
description: Conditions store the status conditions of the PrefectWorkPool
20002046
instances

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ go 1.24.2
44

55
require (
66
dario.cat/mergo v1.0.2
7+
github.com/evanphx/json-patch v0.5.2
78
github.com/go-logr/logr v1.4.3
89
github.com/google/uuid v1.6.0
910
github.com/onsi/ginkgo/v2 v2.25.2

go.sum

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ github.com/google/pprof v0.0.0-20250403155104-27863c87afa6 h1:BHT72Gu3keYf3ZEu2J
5050
github.com/google/pprof v0.0.0-20250403155104-27863c87afa6/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA=
5151
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
5252
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
53+
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
5354
github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY=
5455
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
5556
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=

internal/controller/prefectworkpool_controller.go

Lines changed: 77 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package controller
1818

1919
import (
2020
"context"
21+
"fmt"
2122
"time"
2223

2324
"github.com/PrefectHQ/prefect-operator/internal/prefect"
@@ -28,10 +29,13 @@ import (
2829
"k8s.io/apimachinery/pkg/api/meta"
2930
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3031
"k8s.io/apimachinery/pkg/runtime"
32+
"k8s.io/apimachinery/pkg/types"
3133
ctrl "sigs.k8s.io/controller-runtime"
3234
"sigs.k8s.io/controller-runtime/pkg/client"
3335
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
36+
"sigs.k8s.io/controller-runtime/pkg/handler"
3437
"sigs.k8s.io/controller-runtime/pkg/log"
38+
"sigs.k8s.io/controller-runtime/pkg/reconcile"
3539

3640
prefectiov1 "github.com/PrefectHQ/prefect-operator/api/v1"
3741
"github.com/PrefectHQ/prefect-operator/internal/conditions"
@@ -108,15 +112,25 @@ func (r *PrefectWorkPoolReconciler) Reconcile(ctx context.Context, req ctrl.Requ
108112
return ctrl.Result{RequeueAfter: time.Second}, nil
109113
}
110114

115+
var baseJobTemplateConfigMap corev1.ConfigMap
116+
117+
if workPool.Spec.BaseJobTemplate != nil && workPool.Spec.BaseJobTemplate.ConfigMap != nil {
118+
configMapRef := workPool.Spec.BaseJobTemplate.ConfigMap
119+
err := r.Get(ctx, types.NamespacedName{Name: configMapRef.Name, Namespace: workPool.Namespace}, &baseJobTemplateConfigMap)
120+
if err != nil {
121+
return ctrl.Result{}, err
122+
}
123+
}
124+
111125
specHash, err := utils.Hash(workPool.Spec, 16)
112126
if err != nil {
113127
log.Error(err, "Failed to calculate spec hash", "workPool", workPool.Name)
114128
return ctrl.Result{}, err
115129
}
116130

117-
if r.needsSync(&workPool, specHash) {
131+
if r.needsSync(&workPool, specHash, &baseJobTemplateConfigMap) {
118132
log.Info("Starting sync with Prefect API", "deployment", workPool.Name)
119-
err := r.syncWithPrefect(ctx, &workPool)
133+
err := r.syncWithPrefect(ctx, &workPool, &baseJobTemplateConfigMap)
120134
if err != nil {
121135
return ctrl.Result{}, err
122136
}
@@ -221,7 +235,7 @@ func (r *PrefectWorkPoolReconciler) Reconcile(ctx context.Context, req ctrl.Requ
221235
return ctrl.Result{}, nil
222236
}
223237

224-
func (r *PrefectWorkPoolReconciler) needsSync(workPool *prefectiov1.PrefectWorkPool, currentSpecHash string) bool {
238+
func (r *PrefectWorkPoolReconciler) needsSync(workPool *prefectiov1.PrefectWorkPool, currentSpecHash string, baseJobTemplateConfigMap *corev1.ConfigMap) bool {
225239
if workPool.Status.Id == nil || *workPool.Status.Id == "" {
226240
return true
227241
}
@@ -234,6 +248,10 @@ func (r *PrefectWorkPoolReconciler) needsSync(workPool *prefectiov1.PrefectWorkP
234248
return true
235249
}
236250

251+
if workPool.Status.BaseJobTemplateVersion != baseJobTemplateConfigMap.ResourceVersion {
252+
return true
253+
}
254+
237255
// Drift detection: sync if last sync was too long ago
238256
if workPool.Status.LastSyncTime == nil {
239257
return true
@@ -243,7 +261,7 @@ func (r *PrefectWorkPoolReconciler) needsSync(workPool *prefectiov1.PrefectWorkP
243261
return timeSinceLastSync > 10*time.Minute
244262
}
245263

246-
func (r *PrefectWorkPoolReconciler) syncWithPrefect(ctx context.Context, workPool *prefectiov1.PrefectWorkPool) error {
264+
func (r *PrefectWorkPoolReconciler) syncWithPrefect(ctx context.Context, workPool *prefectiov1.PrefectWorkPool, baseJobTemplateConfigMap *corev1.ConfigMap) error {
247265
name := workPool.Name
248266
log := log.FromContext(ctx)
249267

@@ -259,8 +277,21 @@ func (r *PrefectWorkPoolReconciler) syncWithPrefect(ctx context.Context, workPoo
259277
return err
260278
}
261279

280+
var baseJobTemplate []byte
281+
282+
if baseJobTemplateConfigMap.Name != "" {
283+
key := workPool.Spec.BaseJobTemplate.ConfigMap.Key
284+
285+
baseJobTemplateJson, exists := baseJobTemplateConfigMap.Data[key]
286+
if !exists {
287+
return fmt.Errorf("can't find key %s in ConfigMap %s", key, baseJobTemplateConfigMap.Name)
288+
}
289+
290+
baseJobTemplate = []byte(baseJobTemplateJson)
291+
}
292+
262293
if prefectWorkPool == nil {
263-
workPoolSpec, err := prefect.ConvertToWorkPoolSpec(workPool)
294+
workPoolSpec, err := prefect.ConvertToWorkPoolSpec(ctx, workPool, baseJobTemplate, prefectClient)
264295
if err != nil {
265296
log.Error(err, "Failed to convert work pool spec", "workPool", name)
266297
r.setCondition(workPool, PrefectWorkPoolConditionSynced, metav1.ConditionFalse, "ConversionError", err.Error())
@@ -274,7 +305,7 @@ func (r *PrefectWorkPoolReconciler) syncWithPrefect(ctx context.Context, workPoo
274305
return err
275306
}
276307
} else {
277-
workPoolSpec, err := prefect.ConvertToWorkPoolUpdateSpec(workPool)
308+
workPoolSpec, err := prefect.ConvertToWorkPoolUpdateSpec(ctx, workPool, baseJobTemplate, prefectClient)
278309
if err != nil {
279310
log.Error(err, "Failed to convert work pool spec", "workPool", name)
280311
r.setCondition(workPool, PrefectWorkPoolConditionSynced, metav1.ConditionFalse, "ConversionError", err.Error())
@@ -302,6 +333,7 @@ func (r *PrefectWorkPoolReconciler) syncWithPrefect(ctx context.Context, workPoo
302333
workPool.Status.SpecHash = specHash
303334
workPool.Status.ObservedGeneration = workPool.Generation
304335
workPool.Status.LastSyncTime = &now
336+
workPool.Status.BaseJobTemplateVersion = baseJobTemplateConfigMap.ResourceVersion
305337

306338
r.setCondition(workPool, PrefectWorkPoolConditionSynced, metav1.ConditionTrue, "SyncSuccessful", "Work pool successfully synced with Prefect API")
307339

@@ -384,10 +416,49 @@ func (r *PrefectWorkPoolReconciler) getPrefectClient(ctx context.Context, workPo
384416
return prefectClient, nil
385417
}
386418

419+
// func (r *PrefectWorkPoolReconciler) mapConfigMapToWorkPools() handler.MapFunc {
420+
// return func(ctx context.Context, obj handler.MapObject) []reconcile.Request {
421+
422+
// func (r *PrefectWorkPoolReconciler) mapConfigMapToWorkPools(obj client.Object) []reconcile.Request {
423+
func (r *PrefectWorkPoolReconciler) mapConfigMapToWorkPools(ctx context.Context, obj client.Object) []reconcile.Request {
424+
configMap, ok := obj.(*corev1.ConfigMap)
425+
if !ok {
426+
return nil
427+
}
428+
429+
// List all CRs in the ConfigMap's namespace
430+
workPools := &prefectiov1.PrefectWorkPoolList{}
431+
if err := r.List(ctx, workPools, client.InNamespace(configMap.Namespace)); err != nil {
432+
// Log error if needed
433+
return nil
434+
}
435+
436+
if len(workPools.Items) == 0 {
437+
return nil
438+
}
439+
440+
var requests []reconcile.Request
441+
for _, workPool := range workPools.Items {
442+
if workPool.Spec.BaseJobTemplate != nil &&
443+
workPool.Spec.BaseJobTemplate.ConfigMap != nil &&
444+
workPool.Spec.BaseJobTemplate.ConfigMap.Name == configMap.Name {
445+
requests = append(requests, reconcile.Request{
446+
NamespacedName: types.NamespacedName{
447+
Name: workPool.Name,
448+
Namespace: workPool.Namespace,
449+
},
450+
})
451+
}
452+
}
453+
454+
return requests
455+
}
456+
387457
// SetupWithManager sets up the controller with the Manager.
388458
func (r *PrefectWorkPoolReconciler) SetupWithManager(mgr ctrl.Manager) error {
389459
return ctrl.NewControllerManagedBy(mgr).
390460
For(&prefectiov1.PrefectWorkPool{}).
391461
Owns(&appsv1.Deployment{}).
462+
Watches(&corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Namespace: "prefect"}}, handler.EnqueueRequestsFromMapFunc(r.mapConfigMapToWorkPools)).
392463
Complete(r)
393464
}

0 commit comments

Comments
 (0)