-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathprefectdeployment_controller.go
More file actions
279 lines (231 loc) · 10.9 KB
/
prefectdeployment_controller.go
File metadata and controls
279 lines (231 loc) · 10.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
/*
Copyright 2024.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"context"
"fmt"
"time"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/log"
prefectiov1 "github.com/PrefectHQ/prefect-operator/api/v1"
"github.com/PrefectHQ/prefect-operator/internal/prefect"
"github.com/PrefectHQ/prefect-operator/internal/utils"
)
// Finalizer name, status condition types, and requeue intervals used by the
// PrefectDeployment reconciler.
const (
// PrefectDeploymentFinalizer is the finalizer that Reconcile adds to a
// PrefectDeployment and that the deletion handler removes once cleanup of the
// corresponding Prefect deployment has been attempted.
PrefectDeploymentFinalizer = "prefect.io/deployment-cleanup"
// PrefectDeploymentConditionReady is the condition type set to True after a
// successful sync with the Prefect API.
PrefectDeploymentConditionReady = "Ready"
// PrefectDeploymentConditionSynced is the condition type recording whether the
// last sync with the Prefect API succeeded (True) or failed (False).
PrefectDeploymentConditionSynced = "Synced"
// PrefectDeploymentConditionServerAvailable indicates the Prefect server is available.
// NOTE(review): not set by any code in this file.
PrefectDeploymentConditionServerAvailable = "ServerAvailable"
// PrefectDeploymentConditionWorkPoolAvailable indicates the referenced work pool is available.
// NOTE(review): not set by any code in this file; a failed work pool lookup is
// currently reported through the Synced condition instead.
PrefectDeploymentConditionWorkPoolAvailable = "WorkPoolAvailable"
// RequeueIntervalReady is the requeue delay returned when the deployment is
// already in sync and after a successful sync.
RequeueIntervalReady = 10 * time.Second
// RequeueIntervalError is the interval for requeuing on errors.
// NOTE(review): not referenced by the code in this file.
RequeueIntervalError = 30 * time.Second
// RequeueIntervalSync is the interval for requeuing during sync operations.
// NOTE(review): not referenced by the code in this file.
RequeueIntervalSync = 10 * time.Second
)
// PrefectDeploymentReconciler reconciles a PrefectDeployment object by keeping
// the matching deployment in the Prefect API in step with the resource's spec.
type PrefectDeploymentReconciler struct {
// Client is the embedded Kubernetes client; the reconciler uses it to get and
// update PrefectDeployment objects and their status.
client.Client
// Scheme is not referenced by the methods in this file.
// NOTE(review): presumably populated from the manager at wiring time — confirm.
Scheme *runtime.Scheme
// PrefectClient is an optional pre-built Prefect API client (the sync path
// describes it as intended for testing). When it is nil, a client is built on
// demand from the deployment's server spec via prefect.NewClientFromK8s.
PrefectClient prefect.PrefectClient
}
//+kubebuilder:rbac:groups=prefect.io,resources=prefectdeployments,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=prefect.io,resources=prefectdeployments/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=prefect.io,resources=prefectdeployments/finalizers,verbs=update
// Reconcile brings a single PrefectDeployment in line with the Prefect API.
//
// The flow is:
//   - fetch the object; a missing object is ignored without error,
//   - if it is being deleted, delegate to handleDeletion,
//   - if the cleanup finalizer is missing, add it and requeue after one second,
//   - hash the spec and, when needsSync reports a sync is due, delegate to
//     syncWithPrefect; otherwise requeue after RequeueIntervalReady.
//
// Errors from the Kubernetes API or from hashing are logged and returned so
// controller-runtime retries the request.
func (r *PrefectDeploymentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	logger := log.FromContext(ctx)
	logger.V(1).Info("Reconciling PrefectDeployment", "request", req)

	deployment := prefectiov1.PrefectDeployment{}
	if err := r.Get(ctx, req.NamespacedName, &deployment); apierrors.IsNotFound(err) {
		// The object is gone; there is nothing left to reconcile.
		logger.V(1).Info("PrefectDeployment not found, ignoring", "request", req)
		return ctrl.Result{}, nil
	} else if err != nil {
		logger.Error(err, "Failed to get PrefectDeployment", "request", req)
		return ctrl.Result{}, err
	}

	// Deletion takes precedence over everything else.
	if deployment.DeletionTimestamp != nil {
		return r.handleDeletion(ctx, &deployment)
	}

	// Register the finalizer before doing any work against Prefect so that
	// cleanup is guaranteed a chance to run on deletion.
	if !controllerutil.ContainsFinalizer(&deployment, PrefectDeploymentFinalizer) {
		controllerutil.AddFinalizer(&deployment, PrefectDeploymentFinalizer)
		if err := r.Update(ctx, &deployment); err != nil {
			logger.Error(err, "Failed to add finalizer", "deployment", deployment.Name)
			return ctrl.Result{}, err
		}
		return ctrl.Result{RequeueAfter: time.Second}, nil
	}

	currentHash, err := utils.Hash(deployment.Spec, 16)
	if err != nil {
		logger.Error(err, "Failed to calculate spec hash", "deployment", deployment.Name)
		return ctrl.Result{}, err
	}

	// Nothing to do right now: check again after the ready interval.
	if !r.needsSync(&deployment, currentHash) {
		return ctrl.Result{RequeueAfter: RequeueIntervalReady}, nil
	}

	logger.Info("Starting sync with Prefect API", "deployment", deployment.Name)
	return r.syncWithPrefect(ctx, &deployment)
}
// needsSync reports whether the deployment must be pushed to the Prefect API.
//
// A sync is due when any of the following holds:
//   - the status carries no Prefect deployment ID,
//   - the stored spec hash differs from currentSpecHash,
//   - the observed generation is behind the object's generation,
//   - no sync time has been recorded, or the last sync is more than ten
//     minutes old (periodic drift detection).
func (r *PrefectDeploymentReconciler) needsSync(deployment *prefectiov1.PrefectDeployment, currentSpecHash string) bool {
	// Maximum age of the last sync before a re-sync is forced.
	const driftResyncInterval = 10 * time.Minute

	status := &deployment.Status
	switch {
	case status.Id == nil || *status.Id == "":
		return true
	case status.SpecHash != currentSpecHash:
		return true
	case status.ObservedGeneration < deployment.Generation:
		return true
	case status.LastSyncTime == nil:
		return true
	}

	return time.Since(status.LastSyncTime.Time) > driftResyncInterval
}
// syncWithPrefect creates or updates the Prefect deployment that corresponds
// to the given PrefectDeployment and records the outcome on its status.
//
// Steps, in order:
//  1. Pick the Prefect client: the injected r.PrefectClient if set, otherwise
//     one built from the deployment's server spec. Only in the latter case is
//     the referenced work pool looked up first; if that lookup returns an
//     error, the Synced condition is set to False, Status.Ready is cleared,
//     the status is written, and the request is requeued after 5 seconds
//     without returning an error.
//  2. Resolve the flow ID, convert the spec, and call CreateOrUpdateDeployment.
//  3. Copy the result into the status, store the spec hash and observed
//     generation, mark Synced and Ready as True, and persist the status.
//
// A failure in step 2 is recorded as a Synced=False condition and written to
// the API server before the error is returned, so the failure reason is
// visible on the resource rather than only in the controller log.
//
// On success the returned result requeues after RequeueIntervalReady.
func (r *PrefectDeploymentReconciler) syncWithPrefect(ctx context.Context, deployment *prefectiov1.PrefectDeployment) (ctrl.Result, error) {
	logger := log.FromContext(ctx)

	// failSync marks the sync as failed with the given reason, persists that
	// condition on a best-effort basis, and returns the original error so the
	// request is retried. A failed status write is only logged: the sync error
	// is the more useful one to surface to controller-runtime.
	failSync := func(reason string, cause error) (ctrl.Result, error) {
		r.setCondition(deployment, PrefectDeploymentConditionSynced, metav1.ConditionFalse, reason, cause.Error())
		if updateErr := r.Status().Update(ctx, deployment); updateErr != nil {
			logger.Error(updateErr, "Failed to update deployment status", "deployment", deployment.Name)
		}
		return ctrl.Result{}, cause
	}

	// Use injected client if available (for testing)
	prefectClient := r.PrefectClient
	if prefectClient == nil {
		var err error
		prefectClient, err = prefect.NewClientFromK8s(ctx, &deployment.Spec.Server, r.Client, deployment.Namespace, logger)
		if err != nil {
			logger.Error(err, "Failed to create Prefect client", "deployment", deployment.Name)
			return ctrl.Result{}, err
		}

		// Check that the referenced work pool exists (only for real clients).
		// NOTE(review): every lookup error is reported as "not found"; the error
		// is included in the log entry so other failures can be told apart.
		if _, err = prefectClient.GetWorkPool(ctx, deployment.Spec.WorkPool.Name); err != nil {
			logger.V(1).Info("Work pool not found, requeuing", "deployment", deployment.Name, "workPool", deployment.Spec.WorkPool.Name, "error", err)
			r.setCondition(deployment, PrefectDeploymentConditionSynced, metav1.ConditionFalse, "WorkPoolNotFound", fmt.Sprintf("Work pool '%s' not found", deployment.Spec.WorkPool.Name))
			deployment.Status.Ready = false
			if updateErr := r.Status().Update(ctx, deployment); updateErr != nil {
				logger.Error(updateErr, "Failed to update deployment status", "deployment", deployment.Name)
			}
			// Requeue without an error: the pool may simply not exist yet.
			return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
		}
	}

	flowID, err := prefect.GetFlowIDFromDeployment(ctx, prefectClient, deployment)
	if err != nil {
		logger.Error(err, "Failed to get flow ID", "deployment", deployment.Name)
		return failSync("FlowIDError", err)
	}

	deploymentSpec, err := prefect.ConvertToDeploymentSpec(deployment, flowID)
	if err != nil {
		logger.Error(err, "Failed to convert deployment spec", "deployment", deployment.Name)
		return failSync("ConversionError", err)
	}

	prefectDeployment, err := prefectClient.CreateOrUpdateDeployment(ctx, deploymentSpec)
	if err != nil {
		logger.Error(err, "Failed to create or update deployment in Prefect", "deployment", deployment.Name)
		return failSync("SyncError", err)
	}

	prefect.UpdateDeploymentStatus(deployment, prefectDeployment)

	// Store the hash of the spec that was just synced so needsSync can detect
	// later spec changes.
	specHash, err := utils.Hash(deployment.Spec, 16)
	if err != nil {
		logger.Error(err, "Failed to calculate spec hash", "deployment", deployment.Name)
		return ctrl.Result{}, err
	}
	deployment.Status.SpecHash = specHash
	deployment.Status.ObservedGeneration = deployment.Generation

	r.setCondition(deployment, PrefectDeploymentConditionSynced, metav1.ConditionTrue, "SyncSuccessful", "Deployment successfully synced with Prefect API")
	r.setCondition(deployment, PrefectDeploymentConditionReady, metav1.ConditionTrue, "DeploymentReady", "Deployment is ready and operational")

	if err := r.Status().Update(ctx, deployment); err != nil {
		logger.Error(err, "Failed to update deployment status", "deployment", deployment.Name)
		return ctrl.Result{}, err
	}

	logger.Info("Successfully synced deployment with Prefect", "deploymentId", prefectDeployment.ID)
	return ctrl.Result{RequeueAfter: RequeueIntervalReady}, nil
}
// setCondition records a condition of the given type on the deployment's
// in-memory status via meta.SetStatusCondition. It does not write anything to
// the API server; callers must persist the status themselves.
func (r *PrefectDeploymentReconciler) setCondition(deployment *prefectiov1.PrefectDeployment, conditionType string, status metav1.ConditionStatus, reason, message string) {
	var next metav1.Condition
	next.Type = conditionType
	next.Status = status
	next.Reason = reason
	next.Message = message
	next.LastTransitionTime = metav1.Now()

	meta.SetStatusCondition(&deployment.Status.Conditions, next)
}
// handleDeletion cleans up after a PrefectDeployment that is being deleted.
//
// If the cleanup finalizer is absent there is nothing to do. Otherwise, when
// the status carries a Prefect deployment ID, the deployment is deleted from
// the Prefect API using the injected r.PrefectClient if set, or a client built
// from the deployment's server spec. Cleanup is best-effort: failure to build
// a client or to delete the remote deployment is logged but does not stop the
// finalizer from being removed, so Kubernetes deletion is never blocked
// indefinitely.
//
// An error is returned only when removing the finalizer fails.
func (r *PrefectDeploymentReconciler) handleDeletion(ctx context.Context, deployment *prefectiov1.PrefectDeployment) (ctrl.Result, error) {
	logger := log.FromContext(ctx)
	logger.Info("Handling deletion of PrefectDeployment", "deployment", deployment.Name)

	// If finalizer is not present, nothing to do
	if !controllerutil.ContainsFinalizer(deployment, PrefectDeploymentFinalizer) {
		return ctrl.Result{}, nil
	}

	// Only attempt cleanup if we have a deployment ID in Prefect
	if id := deployment.Status.Id; id != nil && *id != "" {
		// Choose the client first, then delete with whichever client is in
		// use. The delete call must not be nested inside the nil check, or an
		// injected client would skip the remote cleanup entirely.
		prefectClient := r.PrefectClient
		var clientErr error
		if prefectClient == nil {
			prefectClient, clientErr = prefect.NewClientFromK8s(ctx, &deployment.Spec.Server, r.Client, deployment.Namespace, logger)
		}

		if clientErr != nil {
			// Continue with finalizer removal even if client creation fails
			// to avoid blocking deletion indefinitely
			logger.Error(clientErr, "Failed to create Prefect client for deletion", "deployment", deployment.Name)
		} else if err := prefectClient.DeleteDeployment(ctx, *id); err != nil {
			// Continue with finalizer removal even if Prefect deletion fails
			// to avoid blocking Kubernetes deletion indefinitely
			logger.Error(err, "Failed to delete deployment from Prefect API", "deployment", deployment.Name, "prefectId", *id)
		} else {
			logger.Info("Successfully deleted deployment from Prefect API", "deployment", deployment.Name, "prefectId", *id)
		}
	}

	// Remove finalizer to allow Kubernetes to complete deletion
	controllerutil.RemoveFinalizer(deployment, PrefectDeploymentFinalizer)
	if err := r.Update(ctx, deployment); err != nil {
		logger.Error(err, "Failed to remove finalizer", "deployment", deployment.Name)
		return ctrl.Result{}, err
	}

	logger.Info("Finalizer removed, deletion will proceed", "deployment", deployment.Name)
	return ctrl.Result{}, nil
}
// SetupWithManager registers this reconciler with mgr as the controller for
// PrefectDeployment resources and returns any error from building it.
func (r *PrefectDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error {
	bldr := ctrl.NewControllerManagedBy(mgr)
	bldr = bldr.For(&prefectiov1.PrefectDeployment{})
	return bldr.Complete(r)
}