Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion internal/controller/prefectdeployment_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package controller

import (
"context"
"fmt"
"time"

apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -44,8 +45,14 @@ const (
// PrefectDeploymentConditionSynced indicates the deployment is synced with Prefect API
PrefectDeploymentConditionSynced = "Synced"

// PrefectDeploymentConditionServerAvailable indicates the Prefect server is available
PrefectDeploymentConditionServerAvailable = "ServerAvailable"

// PrefectDeploymentConditionWorkPoolAvailable indicates the referenced work pool is available
PrefectDeploymentConditionWorkPoolAvailable = "WorkPoolAvailable"

// RequeueIntervalReady is the interval for requeuing when deployment is ready
RequeueIntervalReady = 5 * time.Minute
RequeueIntervalReady = 10 * time.Second

// RequeueIntervalError is the interval for requeuing on errors
RequeueIntervalError = 30 * time.Second
Expand Down Expand Up @@ -149,6 +156,18 @@ func (r *PrefectDeploymentReconciler) syncWithPrefect(ctx context.Context, deplo
log.Error(err, "Failed to create Prefect client", "deployment", deployment.Name)
return ctrl.Result{}, err
}

// Check that the referenced work pool exists (only for real clients)
_, err = prefectClient.GetWorkPool(ctx, deployment.Spec.WorkPool.Name)
if err != nil {
log.V(1).Info("Work pool not found, requeuing", "deployment", deployment.Name, "workPool", deployment.Spec.WorkPool.Name)
r.setCondition(deployment, PrefectDeploymentConditionSynced, metav1.ConditionFalse, "WorkPoolNotFound", fmt.Sprintf("Work pool '%s' not found", deployment.Spec.WorkPool.Name))
deployment.Status.Ready = false
if updateErr := r.Status().Update(ctx, deployment); updateErr != nil {
log.Error(updateErr, "Failed to update deployment status", "deployment", deployment.Name)
}
return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
}
}

flowID, err := prefect.GetFlowIDFromDeployment(ctx, prefectClient, deployment)
Expand Down
9 changes: 4 additions & 5 deletions internal/controller/prefectdeployment_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -845,12 +845,11 @@ var _ = Describe("PrefectDeployment controller", func() {
Expect(err).NotTo(HaveOccurred())
Expect(result.RequeueAfter).To(Equal(time.Second))

By("Second reconcile should attempt to create real client and fail gracefully")
By("Second reconcile should attempt health check and requeue when server not ready")
result, err = realReconciler.Reconcile(ctx, reconcile.Request{NamespacedName: name})
// This should fail because we don't have a real Prefect server
// but it exercises the createPrefectClient path
Expect(err).To(HaveOccurred())
Expect(result.RequeueAfter).To(Equal(time.Duration(0)))
// Should not error but should requeue because server is not healthy
Expect(err).NotTo(HaveOccurred())
Expect(result.RequeueAfter).To(BeNumerically(">", 0))
})
})

Expand Down
3 changes: 3 additions & 0 deletions internal/controller/prefectworkpool_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ const (

// PrefectDeploymentConditionSynced indicates the deployment is synced with Prefect API
PrefectWorkPoolConditionSynced = "Synced"

// PrefectWorkPoolConditionServerAvailable indicates the Prefect server is available
PrefectWorkPoolConditionServerAvailable = "ServerAvailable"
)

// PrefectWorkPoolReconciler reconciles a PrefectWorkPool object
Expand Down
3 changes: 2 additions & 1 deletion internal/prefect/convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,8 @@ func ConvertToDeploymentSpec(k8sDeployment *prefectiov1.PrefectDeployment, flowI
// UpdateDeploymentStatus copies identifiers from the Prefect API deployment
// onto the Kubernetes object's status and marks the deployment ready.
func UpdateDeploymentStatus(k8sDeployment *prefectiov1.PrefectDeployment, prefectDeployment *Deployment) {
	k8sDeployment.Status.Id = &prefectDeployment.ID
	k8sDeployment.Status.FlowId = &prefectDeployment.FlowID
	// Set Ready=true when we successfully sync to Prefect API, regardless of Prefect's internal status.
	// (The earlier `Ready = prefectDeployment.Status == "READY"` assignment was dead code:
	// it was unconditionally overwritten on the next line, so it has been removed.)
	k8sDeployment.Status.Ready = true
}

// GetFlowIDFromDeployment extracts or generates a flow ID for the deployment
Expand Down
6 changes: 4 additions & 2 deletions internal/prefect/convert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -688,12 +688,14 @@ var _ = Describe("UpdateDeploymentStatus", func() {
Expect(k8sDeployment.Status.Ready).To(BeTrue())
})

It("Should handle non-ready status", func() {
It("Should set ready=true when successfully synced", func() {
prefectDeployment.Status = "PENDING"

UpdateDeploymentStatus(k8sDeployment, prefectDeployment)

Expect(k8sDeployment.Status.Ready).To(BeFalse())
// Ready should be true because we successfully synced to Prefect API
// regardless of Prefect's internal deployment status
Expect(k8sDeployment.Status.Ready).To(BeTrue())
})
})

Expand Down
95 changes: 95 additions & 0 deletions internal/utils/backoff.go
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(non-blocking thought): I wonder if either of these packages would help here to avoid writing our own backoff logic.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's smart, captured it here: #210

Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
Copyright 2024.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package utils

import (
"strconv"
"time"

"sigs.k8s.io/controller-runtime/pkg/client"
)

const (
	// RetryCountAnnotation is the object annotation key used to store the
	// current retry count as a decimal string.
	RetryCountAnnotation = "prefect.io/retry-count"
	// MaxRetryAttempts is the number of retries after which
	// ShouldStopRetrying reports true.
	MaxRetryAttempts = 10
)

// CalculateBackoffDelay returns a progressive delay based on retry attempts
// Uses exponential backoff: 15s, 30s, 60s, 120s (max)
func CalculateBackoffDelay(attempts int) time.Duration {
delays := []time.Duration{
15 * time.Second,
30 * time.Second,
60 * time.Second,
120 * time.Second,
}

if attempts >= len(delays) {
return delays[len(delays)-1] // Max delay
}

return delays[attempts]
}

// GetRetryCount retrieves the current retry count from object annotations
func GetRetryCount(obj client.Object) int {
if obj.GetAnnotations() == nil {
return 0
}

countStr, exists := obj.GetAnnotations()[RetryCountAnnotation]
if !exists {
return 0
}

count, err := strconv.Atoi(countStr)
if err != nil {
return 0
}

return count
}

// IncrementRetryCount increments the retry count annotation on an object
func IncrementRetryCount(obj client.Object) {
if obj.GetAnnotations() == nil {
obj.SetAnnotations(make(map[string]string))
}

currentCount := GetRetryCount(obj)
newCount := currentCount + 1

annotations := obj.GetAnnotations()
annotations[RetryCountAnnotation] = strconv.Itoa(newCount)
obj.SetAnnotations(annotations)
}

// ResetRetryCount clears the retry count annotation on successful operations
func ResetRetryCount(obj client.Object) {
if obj.GetAnnotations() == nil {
return
}

annotations := obj.GetAnnotations()
delete(annotations, RetryCountAnnotation)
obj.SetAnnotations(annotations)
}

// ShouldStopRetrying determines if we've exceeded max retry attempts
func ShouldStopRetrying(obj client.Object) bool {
return GetRetryCount(obj) >= MaxRetryAttempts
}
112 changes: 112 additions & 0 deletions internal/utils/backoff_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
Copyright 2024.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package utils

import (
"time"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

prefectiov1 "github.com/PrefectHQ/prefect-operator/api/v1"
)

// Tests for the backoff-delay table and the annotation-based retry counter.
var _ = Describe("Backoff utilities", func() {
	Describe("CalculateBackoffDelay", func() {
		It("should return progressive delays", func() {
			// Maps an attempt number to the delay we expect back.
			expectations := map[int]time.Duration{
				0:  15 * time.Second,
				1:  30 * time.Second,
				2:  60 * time.Second,
				3:  120 * time.Second,
				4:  120 * time.Second, // Max delay
				10: 120 * time.Second, // Still max delay
			}

			for attempt, want := range expectations {
				Expect(CalculateBackoffDelay(attempt)).To(Equal(want), "Attempt %d should have delay %v", attempt, want)
			}
		})
	})

	Describe("Retry count management", func() {
		var pool *prefectiov1.PrefectWorkPool

		BeforeEach(func() {
			pool = &prefectiov1.PrefectWorkPool{
				ObjectMeta: metav1.ObjectMeta{
					Name:      "test-pool",
					Namespace: "default",
				},
			}
		})

		It("should start with zero retry count", func() {
			Expect(GetRetryCount(pool)).To(Equal(0))
		})

		It("should increment retry count", func() {
			for want := 1; want <= 2; want++ {
				IncrementRetryCount(pool)
				Expect(GetRetryCount(pool)).To(Equal(want))
			}
		})

		It("should reset retry count", func() {
			IncrementRetryCount(pool)
			IncrementRetryCount(pool)
			Expect(GetRetryCount(pool)).To(Equal(2))

			ResetRetryCount(pool)
			Expect(GetRetryCount(pool)).To(Equal(0))
		})

		It("should detect when to stop retrying", func() {
			// Every increment below the limit must still allow retries...
			for attempt := 1; attempt < MaxRetryAttempts; attempt++ {
				IncrementRetryCount(pool)
				Expect(ShouldStopRetrying(pool)).To(BeFalse())
			}

			// ...and the final increment reaches the cap.
			IncrementRetryCount(pool)
			Expect(ShouldStopRetrying(pool)).To(BeTrue())
		})

		It("should handle missing annotations gracefully", func() {
			pool.SetAnnotations(nil)
			Expect(GetRetryCount(pool)).To(Equal(0))

			IncrementRetryCount(pool)
			Expect(GetRetryCount(pool)).To(Equal(1))
		})

		It("should handle invalid annotation values", func() {
			pool.SetAnnotations(map[string]string{
				RetryCountAnnotation: "invalid",
			})
			Expect(GetRetryCount(pool)).To(Equal(0))
		})
	})
})
Loading
Loading