Skip to content

Commit ae9a37d

Browse files
tinkerborgrmk-cimulate
authored and committed
Add support for managing base job templates
1 parent 4af1ca2 commit ae9a37d

File tree

7 files changed

+495
-11
lines changed

7 files changed

+495
-11
lines changed

api/v1/prefectworkpool_types.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121

2222
corev1 "k8s.io/api/core/v1"
2323
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
24+
"k8s.io/apimachinery/pkg/runtime"
2425
"k8s.io/apimachinery/pkg/util/intstr"
2526
)
2627

@@ -52,10 +53,17 @@ type PrefectWorkPoolSpec struct {
5253

5354
// DeploymentLabels defines additional labels to add to the Prefect Server Deployment
5455
DeploymentLabels map[string]string `json:"deploymentLabels,omitempty"`
56+
57+
// Base job template for flow runs on this Work Pool
58+
BaseJobTemplate *runtime.RawExtension `json:"baseJobTemplate,omitempty"`
5559
}
5660

5761
// PrefectWorkPoolStatus defines the observed state of PrefectWorkPool
5862
type PrefectWorkPoolStatus struct {
63+
// Id is the workPool ID from Prefect
64+
// +optional
65+
Id *string `json:"id,omitempty"`
66+
5967
// Version is the version of the Prefect Worker that is currently running
6068
Version string `json:"version"`
6169

@@ -65,6 +73,17 @@ type PrefectWorkPoolStatus struct {
6573
// Ready is true if the work pool is ready to accept work
6674
Ready bool `json:"ready"`
6775

76+
// SpecHash tracks changes to the spec to minimize API calls
77+
SpecHash string `json:"specHash,omitempty"`
78+
79+
// LastSyncTime is the last time the workPool was synced with Prefect
80+
// +optional
81+
LastSyncTime *metav1.Time `json:"lastSyncTime,omitempty"`
82+
83+
// ObservedGeneration tracks the last processed generation
84+
// +optional
85+
ObservedGeneration int64 `json:"observedGeneration,omitempty"`
86+
6887
// Conditions store the status conditions of the PrefectWorkPool instances
6988
Conditions []metav1.Condition `json:"conditions,omitempty" patchStrategy:"merge" patchMergeKey:"type" protobuf:"bytes,1,rep,name=conditions"`
7089
}

api/v1/zz_generated.deepcopy.go

Lines changed: 14 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

deploy/charts/prefect-operator/crds/prefect.io_prefectworkpools.yaml

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,10 @@ spec:
6262
spec:
6363
description: PrefectWorkPoolSpec defines the desired state of PrefectWorkPool
6464
properties:
65+
baseJobTemplate:
66+
description: Base job template for flow runs on this Work Pool
67+
type: object
68+
x-kubernetes-preserve-unknown-fields: true
6569
deploymentLabels:
6670
additionalProperties:
6771
type: string
@@ -2049,6 +2053,18 @@ spec:
20492053
- type
20502054
type: object
20512055
type: array
2056+
id:
2057+
description: Id is the workPool ID from Prefect
2058+
type: string
2059+
lastSyncTime:
2060+
description: LastSyncTime is the last time the workPool was synced
2061+
with Prefect
2062+
format: date-time
2063+
type: string
2064+
observedGeneration:
2065+
description: ObservedGeneration tracks the last processed generation
2066+
format: int64
2067+
type: integer
20522068
ready:
20532069
description: Ready is true if the work pool is ready to accept work
20542070
type: boolean
@@ -2057,6 +2073,9 @@ spec:
20572073
ready
20582074
format: int32
20592075
type: integer
2076+
specHash:
2077+
description: SpecHash tracks changes to the spec to minimize API calls
2078+
type: string
20602079
version:
20612080
description: Version is the version of the Prefect Worker that is
20622081
currently running

internal/controller/prefectworkpool_controller.go

Lines changed: 159 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,23 +23,31 @@ import (
2323
"github.com/PrefectHQ/prefect-operator/internal/prefect"
2424
appsv1 "k8s.io/api/apps/v1"
2525
corev1 "k8s.io/api/core/v1"
26+
"k8s.io/apimachinery/pkg/api/equality"
2627
"k8s.io/apimachinery/pkg/api/errors"
2728
"k8s.io/apimachinery/pkg/api/meta"
2829
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2930
"k8s.io/apimachinery/pkg/runtime"
3031
ctrl "sigs.k8s.io/controller-runtime"
3132
"sigs.k8s.io/controller-runtime/pkg/client"
3233
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
33-
ctrllog "sigs.k8s.io/controller-runtime/pkg/log"
34+
"sigs.k8s.io/controller-runtime/pkg/log"
3435

3536
prefectiov1 "github.com/PrefectHQ/prefect-operator/api/v1"
3637
"github.com/PrefectHQ/prefect-operator/internal/conditions"
3738
"github.com/PrefectHQ/prefect-operator/internal/constants"
39+
"github.com/PrefectHQ/prefect-operator/internal/utils"
3840
)
3941

4042
const (
4143
// PrefectWorkPoolFinalizer is the finalizer used to ensure cleanup of Prefect work pools
4244
PrefectWorkPoolFinalizer = "prefect.io/work-pool-cleanup"
45+
46+
// PrefectWorkPoolConditionReady indicates the work pool is ready
47+
PrefectWorkPoolConditionReady = "Ready"
48+
49+
// PrefectWorkPoolConditionSynced indicates the work pool is synced with Prefect API
50+
PrefectWorkPoolConditionSynced = "Synced"
4351
)
4452

4553
// PrefectWorkPoolReconciler reconciles a PrefectWorkPool object
@@ -59,7 +67,7 @@ type PrefectWorkPoolReconciler struct {
5967
// For more details, check Reconcile and its Result here:
6068
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.17.3/pkg/reconcile
6169
func (r *PrefectWorkPoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
62-
log := ctrllog.FromContext(ctx)
70+
log := log.FromContext(ctx)
6371
log.V(1).Info("Reconciling PrefectWorkPool")
6472

6573
var workPool prefectiov1.PrefectWorkPool
@@ -70,6 +78,21 @@ func (r *PrefectWorkPoolReconciler) Reconcile(ctx context.Context, req ctrl.Requ
7078
return ctrl.Result{}, err
7179
}
7280

81+
currentStatus := workPool.Status
82+
83+
// Defer a final status update at the end of the reconciliation loop, so that any of the
84+
// individual reconciliation functions can update the status as they see fit.
85+
defer func() {
86+
// Skip status update if nothing changed, to avoid conflicts
87+
if equality.Semantic.DeepEqual(workPool.Status, currentStatus) {
88+
return
89+
}
90+
91+
if statusErr := r.Status().Update(ctx, &workPool); statusErr != nil {
92+
log.Error(statusErr, "Failed to update WorkPool status")
93+
}
94+
}()
95+
7396
// Handle deletion
7497
if workPool.DeletionTimestamp != nil {
7598
return r.handleDeletion(ctx, &workPool)
@@ -85,13 +108,19 @@ func (r *PrefectWorkPoolReconciler) Reconcile(ctx context.Context, req ctrl.Requ
85108
return ctrl.Result{RequeueAfter: time.Second}, nil
86109
}
87110

88-
// Defer a final status update at the end of the reconciliation loop, so that any of the
89-
// individual reconciliation functions can update the status as they see fit.
90-
defer func() {
91-
if statusErr := r.Status().Update(ctx, &workPool); statusErr != nil {
92-
log.Error(statusErr, "Failed to update WorkPool status")
111+
specHash, err := utils.Hash(workPool.Spec, 16)
112+
if err != nil {
113+
log.Error(err, "Failed to calculate spec hash", "workPool", workPool.Name)
114+
return ctrl.Result{}, err
115+
}
116+
117+
if r.needsSync(&workPool, specHash) {
118+
log.Info("Starting sync with Prefect API", "deployment", workPool.Name)
119+
err := r.syncWithPrefect(ctx, &workPool)
120+
if err != nil {
121+
return ctrl.Result{}, err
93122
}
94-
}()
123+
}
95124

96125
objName := constants.Deployment
97126

@@ -187,12 +216,114 @@ func (r *PrefectWorkPoolReconciler) Reconcile(ctx context.Context, req ctrl.Requ
187216
}
188217
}
189218

219+
r.setCondition(&workPool, PrefectWorkPoolConditionReady, metav1.ConditionTrue, "WorkPoolReady", "Work pool is ready and operational")
220+
190221
return ctrl.Result{}, nil
191222
}
192223

224+
func (r *PrefectWorkPoolReconciler) needsSync(workPool *prefectiov1.PrefectWorkPool, currentSpecHash string) bool {
225+
if workPool.Status.Id == nil || *workPool.Status.Id == "" {
226+
return true
227+
}
228+
229+
if workPool.Status.SpecHash != currentSpecHash {
230+
return true
231+
}
232+
233+
if workPool.Status.ObservedGeneration < workPool.Generation {
234+
return true
235+
}
236+
237+
// Drift detection: sync if last sync was too long ago
238+
if workPool.Status.LastSyncTime == nil {
239+
return true
240+
}
241+
242+
timeSinceLastSync := time.Since(workPool.Status.LastSyncTime.Time)
243+
return timeSinceLastSync > 10*time.Minute
244+
}
245+
246+
func (r *PrefectWorkPoolReconciler) syncWithPrefect(ctx context.Context, workPool *prefectiov1.PrefectWorkPool) error {
247+
name := workPool.Name
248+
log := log.FromContext(ctx)
249+
250+
prefectClient, err := r.getPrefectClient(ctx, workPool)
251+
if err != nil {
252+
log.Error(err, "Failed to create Prefect client", "workPool", name)
253+
return err
254+
}
255+
256+
prefectWorkPool, err := prefectClient.GetWorkPool(ctx, name)
257+
if err != nil {
258+
log.Error(err, "Failed to get work pool in Prefect", "workPool", name)
259+
return err
260+
}
261+
262+
if prefectWorkPool == nil {
263+
workPoolSpec, err := prefect.ConvertToWorkPoolSpec(workPool)
264+
if err != nil {
265+
log.Error(err, "Failed to convert work pool spec", "workPool", name)
266+
r.setCondition(workPool, PrefectWorkPoolConditionSynced, metav1.ConditionFalse, "ConversionError", err.Error())
267+
return err
268+
}
269+
270+
prefectWorkPool, err = prefectClient.CreateWorkPool(ctx, workPoolSpec)
271+
if err != nil {
272+
log.Error(err, "Failed to create work pool in Prefect", "workPool", name)
273+
r.setCondition(workPool, PrefectWorkPoolConditionSynced, metav1.ConditionFalse, "SyncError", err.Error())
274+
return err
275+
}
276+
} else {
277+
workPoolSpec, err := prefect.ConvertToWorkPoolUpdateSpec(workPool)
278+
if err != nil {
279+
log.Error(err, "Failed to convert work pool spec", "workPool", name)
280+
r.setCondition(workPool, PrefectWorkPoolConditionSynced, metav1.ConditionFalse, "ConversionError", err.Error())
281+
return err
282+
}
283+
284+
err = prefectClient.UpdateWorkPool(ctx, workPool.Name, workPoolSpec)
285+
if err != nil {
286+
log.Error(err, "Failed to update work pool in Prefect", "workPool", name)
287+
r.setCondition(workPool, PrefectWorkPoolConditionSynced, metav1.ConditionFalse, "SyncError", err.Error())
288+
return err
289+
}
290+
}
291+
292+
prefect.UpdateWorkPoolStatus(workPool, prefectWorkPool)
293+
294+
specHash, err := utils.Hash(workPool.Spec, 16)
295+
if err != nil {
296+
log.Error(err, "Failed to calculate spec hash", "workPool", workPool.Name)
297+
return err
298+
}
299+
300+
now := metav1.Now()
301+
302+
workPool.Status.SpecHash = specHash
303+
workPool.Status.ObservedGeneration = workPool.Generation
304+
workPool.Status.LastSyncTime = &now
305+
306+
r.setCondition(workPool, PrefectWorkPoolConditionSynced, metav1.ConditionTrue, "SyncSuccessful", "Work pool successfully synced with Prefect API")
307+
308+
return nil
309+
}
310+
311+
// setCondition sets a condition on the work pool status
312+
func (r *PrefectWorkPoolReconciler) setCondition(workPool *prefectiov1.PrefectWorkPool, conditionType string, status metav1.ConditionStatus, reason, message string) {
313+
condition := metav1.Condition{
314+
Type: conditionType,
315+
Status: status,
316+
LastTransitionTime: metav1.Now(),
317+
Reason: reason,
318+
Message: message,
319+
}
320+
321+
meta.SetStatusCondition(&workPool.Status.Conditions, condition)
322+
}
323+
193324
// handleDeletion handles the cleanup of a PrefectWorkPool that is being deleted
194325
func (r *PrefectWorkPoolReconciler) handleDeletion(ctx context.Context, workPool *prefectiov1.PrefectWorkPool) (ctrl.Result, error) {
195-
log := ctrllog.FromContext(ctx)
326+
log := log.FromContext(ctx)
196327
log.Info("Handling deletion of PrefectWorkPool", "workPool", workPool.Name)
197328

198329
// If the finalizer is not present, nothing to do
@@ -234,6 +365,25 @@ func (r *PrefectWorkPoolReconciler) handleDeletion(ctx context.Context, workPool
234365
return ctrl.Result{}, nil
235366
}
236367

368+
func (r *PrefectWorkPoolReconciler) getPrefectClient(ctx context.Context, workPool *prefectiov1.PrefectWorkPool) (prefect.PrefectClient, error) {
369+
log := log.FromContext(ctx)
370+
name := workPool.Name
371+
372+
// Use injected client if available (for testing)
373+
prefectClient := r.PrefectClient
374+
375+
if prefectClient == nil {
376+
var err error
377+
prefectClient, err = prefect.NewClientFromK8s(ctx, &workPool.Spec.Server, r.Client, workPool.Namespace, log)
378+
if err != nil {
379+
log.Error(err, "Failed to create Prefect client", "workPool", name)
380+
return nil, err
381+
}
382+
}
383+
384+
return prefectClient, nil
385+
}
386+
237387
// SetupWithManager sets up the controller with the Manager.
238388
func (r *PrefectWorkPoolReconciler) SetupWithManager(mgr ctrl.Manager) error {
239389
return ctrl.NewControllerManagedBy(mgr).

0 commit comments

Comments
 (0)