@@ -23,23 +23,31 @@ import (
2323 "github.com/PrefectHQ/prefect-operator/internal/prefect"
2424 appsv1 "k8s.io/api/apps/v1"
2525 corev1 "k8s.io/api/core/v1"
26+ "k8s.io/apimachinery/pkg/api/equality"
2627 "k8s.io/apimachinery/pkg/api/errors"
2728 "k8s.io/apimachinery/pkg/api/meta"
2829 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2930 "k8s.io/apimachinery/pkg/runtime"
3031 ctrl "sigs.k8s.io/controller-runtime"
3132 "sigs.k8s.io/controller-runtime/pkg/client"
3233 "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
33- ctrllog "sigs.k8s.io/controller-runtime/pkg/log"
34+ "sigs.k8s.io/controller-runtime/pkg/log"
3435
3536 prefectiov1 "github.com/PrefectHQ/prefect-operator/api/v1"
3637 "github.com/PrefectHQ/prefect-operator/internal/conditions"
3738 "github.com/PrefectHQ/prefect-operator/internal/constants"
39+ "github.com/PrefectHQ/prefect-operator/internal/utils"
3840)
3941
4042const (
4143 // PrefectWorkPoolFinalizer is the finalizer used to ensure cleanup of Prefect work pools
4244 PrefectWorkPoolFinalizer = "prefect.io/work-pool-cleanup"
45+
46+ // PrefectDeploymentConditionReady indicates the deployment is ready
47+ PrefectWorkPoolConditionReady = "Ready"
48+
49+ // PrefectDeploymentConditionSynced indicates the deployment is synced with Prefect API
50+ PrefectWorkPoolConditionSynced = "Synced"
4351)
4452
4553// PrefectWorkPoolReconciler reconciles a PrefectWorkPool object
@@ -59,7 +67,7 @@ type PrefectWorkPoolReconciler struct {
5967// For more details, check Reconcile and its Result here:
6068// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.17.3/pkg/reconcile
6169func (r * PrefectWorkPoolReconciler ) Reconcile (ctx context.Context , req ctrl.Request ) (ctrl.Result , error ) {
62- log := ctrllog .FromContext (ctx )
70+ log := log .FromContext (ctx )
6371 log .V (1 ).Info ("Reconciling PrefectWorkPool" )
6472
6573 var workPool prefectiov1.PrefectWorkPool
@@ -70,6 +78,21 @@ func (r *PrefectWorkPoolReconciler) Reconcile(ctx context.Context, req ctrl.Requ
7078 return ctrl.Result {}, err
7179 }
7280
81+ currentStatus := workPool .Status
82+
83+ // Defer a final status update at the end of the reconciliation loop, so that any of the
84+ // individual reconciliation functions can update the status as they see fit.
85+ defer func () {
86+ // Skip status update if nothing changed, to avoid conflicts
87+ if equality .Semantic .DeepEqual (workPool .Status , currentStatus ) {
88+ return
89+ }
90+
91+ if statusErr := r .Status ().Update (ctx , & workPool ); statusErr != nil {
92+ log .Error (statusErr , "Failed to update WorkPool status" )
93+ }
94+ }()
95+
7396 // Handle deletion
7497 if workPool .DeletionTimestamp != nil {
7598 return r .handleDeletion (ctx , & workPool )
@@ -85,13 +108,19 @@ func (r *PrefectWorkPoolReconciler) Reconcile(ctx context.Context, req ctrl.Requ
85108 return ctrl.Result {RequeueAfter : time .Second }, nil
86109 }
87110
88- // Defer a final status update at the end of the reconciliation loop, so that any of the
89- // individual reconciliation functions can update the status as they see fit.
90- defer func () {
91- if statusErr := r .Status ().Update (ctx , & workPool ); statusErr != nil {
92- log .Error (statusErr , "Failed to update WorkPool status" )
111+ specHash , err := utils .Hash (workPool .Spec , 16 )
112+ if err != nil {
113+ log .Error (err , "Failed to calculate spec hash" , "workPool" , workPool .Name )
114+ return ctrl.Result {}, err
115+ }
116+
117+ if r .needsSync (& workPool , specHash ) {
118+ log .Info ("Starting sync with Prefect API" , "deployment" , workPool .Name )
119+ err := r .syncWithPrefect (ctx , & workPool )
120+ if err != nil {
121+ return ctrl.Result {}, err
93122 }
94- }()
123+ }
95124
96125 objName := constants .Deployment
97126
@@ -187,12 +216,114 @@ func (r *PrefectWorkPoolReconciler) Reconcile(ctx context.Context, req ctrl.Requ
187216 }
188217 }
189218
219+ r .setCondition (& workPool , PrefectWorkPoolConditionReady , metav1 .ConditionTrue , "WorkPoolReady" , "Work pool is ready and operational" )
220+
190221 return ctrl.Result {}, nil
191222}
192223
224+ func (r * PrefectWorkPoolReconciler ) needsSync (workPool * prefectiov1.PrefectWorkPool , currentSpecHash string ) bool {
225+ if workPool .Status .Id == nil || * workPool .Status .Id == "" {
226+ return true
227+ }
228+
229+ if workPool .Status .SpecHash != currentSpecHash {
230+ return true
231+ }
232+
233+ if workPool .Status .ObservedGeneration < workPool .Generation {
234+ return true
235+ }
236+
237+ // Drift detection: sync if last sync was too long ago
238+ if workPool .Status .LastSyncTime == nil {
239+ return true
240+ }
241+
242+ timeSinceLastSync := time .Since (workPool .Status .LastSyncTime .Time )
243+ return timeSinceLastSync > 10 * time .Minute
244+ }
245+
246+ func (r * PrefectWorkPoolReconciler ) syncWithPrefect (ctx context.Context , workPool * prefectiov1.PrefectWorkPool ) error {
247+ name := workPool .Name
248+ log := log .FromContext (ctx )
249+
250+ prefectClient , err := r .getPrefectClient (ctx , workPool )
251+ if err != nil {
252+ log .Error (err , "Failed to create Prefect client" , "workPool" , name )
253+ return err
254+ }
255+
256+ prefectWorkPool , err := prefectClient .GetWorkPool (ctx , name )
257+ if err != nil {
258+ log .Error (err , "Failed to get work pool in Prefect" , "workPool" , name )
259+ return err
260+ }
261+
262+ if prefectWorkPool == nil {
263+ workPoolSpec , err := prefect .ConvertToWorkPoolSpec (workPool )
264+ if err != nil {
265+ log .Error (err , "Failed to convert work pool spec" , "workPool" , name )
266+ r .setCondition (workPool , PrefectWorkPoolConditionSynced , metav1 .ConditionFalse , "ConversionError" , err .Error ())
267+ return err
268+ }
269+
270+ prefectWorkPool , err = prefectClient .CreateWorkPool (ctx , workPoolSpec )
271+ if err != nil {
272+ log .Error (err , "Failed to create work pool in Prefect" , "workPool" , name )
273+ r .setCondition (workPool , PrefectWorkPoolConditionSynced , metav1 .ConditionFalse , "SyncError" , err .Error ())
274+ return err
275+ }
276+ } else {
277+ workPoolSpec , err := prefect .ConvertToWorkPoolUpdateSpec (workPool )
278+ if err != nil {
279+ log .Error (err , "Failed to convert work pool spec" , "workPool" , name )
280+ r .setCondition (workPool , PrefectWorkPoolConditionSynced , metav1 .ConditionFalse , "ConversionError" , err .Error ())
281+ return err
282+ }
283+
284+ err = prefectClient .UpdateWorkPool (ctx , workPool .Name , workPoolSpec )
285+ if err != nil {
286+ log .Error (err , "Failed to update work pool in Prefect" , "workPool" , name )
287+ r .setCondition (workPool , PrefectWorkPoolConditionSynced , metav1 .ConditionFalse , "SyncError" , err .Error ())
288+ return err
289+ }
290+ }
291+
292+ prefect .UpdateWorkPoolStatus (workPool , prefectWorkPool )
293+
294+ specHash , err := utils .Hash (workPool .Spec , 16 )
295+ if err != nil {
296+ log .Error (err , "Failed to calculate spec hash" , "workPool" , workPool .Name )
297+ return err
298+ }
299+
300+ now := metav1 .Now ()
301+
302+ workPool .Status .SpecHash = specHash
303+ workPool .Status .ObservedGeneration = workPool .Generation
304+ workPool .Status .LastSyncTime = & now
305+
306+ r .setCondition (workPool , PrefectWorkPoolConditionSynced , metav1 .ConditionTrue , "SyncSuccessful" , "Work pool successfully synced with Prefect API" )
307+
308+ return nil
309+ }
310+
311+ // setCondition sets a condition on the deployment status
312+ func (r * PrefectWorkPoolReconciler ) setCondition (workPool * prefectiov1.PrefectWorkPool , conditionType string , status metav1.ConditionStatus , reason , message string ) {
313+ condition := metav1.Condition {
314+ Type : conditionType ,
315+ Status : status ,
316+ LastTransitionTime : metav1 .Now (),
317+ Reason : reason ,
318+ Message : message ,
319+ }
320+
321+ meta .SetStatusCondition (& workPool .Status .Conditions , condition )
322+ }
323+
193324// handleDeletion handles the cleanup of a PrefectWorkPool that is being deleted
194325func (r * PrefectWorkPoolReconciler ) handleDeletion (ctx context.Context , workPool * prefectiov1.PrefectWorkPool ) (ctrl.Result , error ) {
195- log := ctrllog .FromContext (ctx )
326+ log := log .FromContext (ctx )
196327 log .Info ("Handling deletion of PrefectWorkPool" , "workPool" , workPool .Name )
197328
198329 // If the finalizer is not present, nothing to do
@@ -234,6 +365,25 @@ func (r *PrefectWorkPoolReconciler) handleDeletion(ctx context.Context, workPool
234365 return ctrl.Result {}, nil
235366}
236367
368+ func (r * PrefectWorkPoolReconciler ) getPrefectClient (ctx context.Context , workPool * prefectiov1.PrefectWorkPool ) (prefect.PrefectClient , error ) {
369+ log := log .FromContext (ctx )
370+ name := workPool .Name
371+
372+ // Use injected client if available (for testing)
373+ prefectClient := r .PrefectClient
374+
375+ if prefectClient == nil {
376+ var err error
377+ prefectClient , err = prefect .NewClientFromK8s (ctx , & workPool .Spec .Server , r .Client , workPool .Namespace , log )
378+ if err != nil {
379+ log .Error (err , "Failed to create Prefect client" , "workPool" , name )
380+ return nil , err
381+ }
382+ }
383+
384+ return prefectClient , nil
385+ }
386+
237387// SetupWithManager sets up the controller with the Manager.
238388func (r * PrefectWorkPoolReconciler ) SetupWithManager (mgr ctrl.Manager ) error {
239389 return ctrl .NewControllerManagedBy (mgr ).
0 commit comments