Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 72 additions & 6 deletions internal/controller/prefectworkpool_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ package controller

import (
"context"
"time"

"github.com/PrefectHQ/prefect-operator/internal/prefect"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -35,10 +37,16 @@ import (
"github.com/PrefectHQ/prefect-operator/internal/constants"
)

const (
// PrefectWorkPoolFinalizer is the finalizer used to ensure cleanup of Prefect work pools
PrefectWorkPoolFinalizer = "prefect.io/work-pool-cleanup"
)

// PrefectWorkPoolReconciler reconciles a PrefectWorkPool object
type PrefectWorkPoolReconciler struct {
client.Client
Scheme *runtime.Scheme
Scheme *runtime.Scheme
PrefectClient prefect.PrefectClient
}

//+kubebuilder:rbac:groups=prefect.io,resources=prefectworkpools,verbs=get;list;watch;create;update;patch;delete
Expand All @@ -52,21 +60,35 @@ type PrefectWorkPoolReconciler struct {
// - https://pkg.go.dev/sigs.k8s.io/controller-runtime@v0.17.3/pkg/reconcile
func (r *PrefectWorkPoolReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
log := ctrllog.FromContext(ctx)

log.V(1).Info("Reconciling PrefectWorkPool")

workPool := &prefectiov1.PrefectWorkPool{}
err := r.Get(ctx, req.NamespacedName, workPool)
var workPool prefectiov1.PrefectWorkPool
err := r.Get(ctx, req.NamespacedName, &workPool)
if errors.IsNotFound(err) {
return ctrl.Result{}, nil
} else if err != nil {
return ctrl.Result{}, err
}

// Handle deletion
if workPool.DeletionTimestamp != nil {
return r.handleDeletion(ctx, &workPool)
}

// Ensure a finalizer is present
if !controllerutil.ContainsFinalizer(&workPool, PrefectWorkPoolFinalizer) {
controllerutil.AddFinalizer(&workPool, PrefectWorkPoolFinalizer)
if err := r.Update(ctx, &workPool); err != nil {
log.Error(err, "Failed to add finalizer", "workPool", workPool.Name)
return ctrl.Result{}, err
}
return ctrl.Result{RequeueAfter: time.Second}, nil
}

// Defer a final status update at the end of the reconciliation loop, so that any of the
// individual reconciliation functions can update the status as they see fit.
defer func() {
if statusErr := r.Status().Update(ctx, workPool); statusErr != nil {
if statusErr := r.Status().Update(ctx, &workPool); statusErr != nil {
log.Error(statusErr, "Failed to update WorkPool status")
}
}()
Expand All @@ -82,7 +104,7 @@ func (r *PrefectWorkPoolReconciler) Reconcile(ctx context.Context, req ctrl.Requ
}

result, err := controllerutil.CreateOrUpdate(ctx, r.Client, deploy, func() error {
if err := ctrl.SetControllerReference(workPool, deploy, r.Scheme); err != nil {
if err := ctrl.SetControllerReference(&workPool, deploy, r.Scheme); err != nil {
return err
}

Expand Down Expand Up @@ -168,6 +190,50 @@ func (r *PrefectWorkPoolReconciler) Reconcile(ctx context.Context, req ctrl.Requ
return ctrl.Result{}, nil
}

// handleDeletion handles the cleanup of a PrefectWorkPool that is being deleted
func (r *PrefectWorkPoolReconciler) handleDeletion(ctx context.Context, workPool *prefectiov1.PrefectWorkPool) (ctrl.Result, error) {
log := ctrllog.FromContext(ctx)
log.Info("Handling deletion of PrefectWorkPool", "workPool", workPool.Name)

// If the finalizer is not present, nothing to do
if !controllerutil.ContainsFinalizer(workPool, PrefectWorkPoolFinalizer) {
return ctrl.Result{}, nil
}

if workPool.Name != "" {
// Create a Prefect client for cleanup
prefectClient := r.PrefectClient
if prefectClient == nil {
var err error
prefectClient, err = prefect.NewClientFromK8s(ctx, &workPool.Spec.Server, r.Client, workPool.Namespace, log)
if err != nil {
log.Error(err, "Failed to create Prefect client for deletion", "workPool", workPool.Name)
// Continue with finalizer removal even if client creation fails
// to avoid blocking deletion indefinitely
} else {
// Attempt to delete from Prefect API
if err := prefectClient.DeleteWorkPool(ctx, workPool.Name); err != nil {
log.Error(err, "Failed to delete workPool from Prefect API", "workPool", workPool.Name)
// Continue with finalizer removal even if Prefect deletion fails
// to avoid blocking Kubernetes deletion indefinitely
} else {
log.Info("Successfully deleted workPool from Prefect API", "workPool", workPool.Name)
}
}
}
}

// Remove the finalizer to allow Kubernetes to complete deletion
controllerutil.RemoveFinalizer(workPool, PrefectWorkPoolFinalizer)
if err := r.Update(ctx, workPool); err != nil {
log.Error(err, "Failed to remove finalizer", "deployment", workPool.Name)
return ctrl.Result{RequeueAfter: 15 * time.Second}, err
}

log.Info("Finalizer removed, deletion will proceed", "deployment", workPool.Name)
return ctrl.Result{}, nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *PrefectWorkPoolReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
Expand Down
Loading