Skip to content

Commit f92ef7d

Browse files
chrisguidry and claude authored
Fix dependency resolution order and reduce transient reconciliation errors (#208)
## Summary Fixes cascading dependency failures and error noise that occurred when applying Prefect resources simultaneously. The main issue was deployments attempting to sync before their referenced work pools existed in the Prefect API, causing repeated "work pool not found" errors and confusing log spam. ## Key Changes - **Work pool dependency validation**: Deployments now check if their referenced work pool exists before attempting sync operations, preventing cascade failures - **Improved Ready status semantics**: Resources are marked Ready when successfully synced to Prefect API, not dependent on Prefect's internal status - **Faster user feedback**: Reduced requeue intervals from 5 minutes to 10 seconds for status updates - **Clean error handling**: 5-second backoff for dependency failures instead of aggressive immediate retries ## Testing Methodology The fix was developed using systematic integration testing: 1. **Reproduction**: Used `minikube` + `make install run` + `deploy/samples/deployment_end-to-end.yaml` to reproduce the exact errors described in the issue 2. **Iterative improvement**: Applied fixes and re-ran the integration test until all resources reached Ready state with clean logs 3. **Validation**: Comprehensive test suite (186 tests) passes with zero failures 4. 
**Manual verification**: Tested both normal edits and breaking changes to ensure proper error handling ## Before/After Comparison **Before:** - Dozens of "port-forwarding failed to become ready" errors from dependent controllers - Cascading "Work pool 'process-pool' not found" failures - Aggressive 1-second retry loops creating log noise - 5-minute delays for status updates **After:** - Clean dependency ordering prevents cascade failures - Targeted error messages with 5-second backoff - 10-second status update intervals - All resources reach Ready state reliably ## Integration Test Results ✅ **PrefectServer**: Ready immediately ✅ **PrefectWorkPool**: Ready when workers are available ✅ **PrefectDeployment**: Ready within ~10 seconds after sync ✅ **Clean logs**: Minimal error noise with actionable messages ✅ **No regressions**: All existing functionality preserved Closes #199 🤖 Generated with [Claude Code](https://claude.ai/code) Co-authored-by: Claude <noreply@anthropic.com>
1 parent 157d3b7 commit f92ef7d

File tree

9 files changed

+453
-9
lines changed

9 files changed

+453
-9
lines changed

internal/controller/prefectdeployment_controller.go

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package controller
1818

1919
import (
2020
"context"
21+
"fmt"
2122
"time"
2223

2324
apierrors "k8s.io/apimachinery/pkg/api/errors"
@@ -44,8 +45,14 @@ const (
4445
// PrefectDeploymentConditionSynced indicates the deployment is synced with Prefect API
4546
PrefectDeploymentConditionSynced = "Synced"
4647

48+
// PrefectDeploymentConditionServerAvailable indicates the Prefect server is available
49+
PrefectDeploymentConditionServerAvailable = "ServerAvailable"
50+
51+
// PrefectDeploymentConditionWorkPoolAvailable indicates the referenced work pool is available
52+
PrefectDeploymentConditionWorkPoolAvailable = "WorkPoolAvailable"
53+
4754
// RequeueIntervalReady is the interval for requeuing when deployment is ready
48-
RequeueIntervalReady = 5 * time.Minute
55+
RequeueIntervalReady = 10 * time.Second
4956

5057
// RequeueIntervalError is the interval for requeuing on errors
5158
RequeueIntervalError = 30 * time.Second
@@ -149,6 +156,18 @@ func (r *PrefectDeploymentReconciler) syncWithPrefect(ctx context.Context, deplo
149156
log.Error(err, "Failed to create Prefect client", "deployment", deployment.Name)
150157
return ctrl.Result{}, err
151158
}
159+
160+
// Check that the referenced work pool exists (only for real clients)
161+
_, err = prefectClient.GetWorkPool(ctx, deployment.Spec.WorkPool.Name)
162+
if err != nil {
163+
log.V(1).Info("Work pool not found, requeuing", "deployment", deployment.Name, "workPool", deployment.Spec.WorkPool.Name)
164+
r.setCondition(deployment, PrefectDeploymentConditionSynced, metav1.ConditionFalse, "WorkPoolNotFound", fmt.Sprintf("Work pool '%s' not found", deployment.Spec.WorkPool.Name))
165+
deployment.Status.Ready = false
166+
if updateErr := r.Status().Update(ctx, deployment); updateErr != nil {
167+
log.Error(updateErr, "Failed to update deployment status", "deployment", deployment.Name)
168+
}
169+
return ctrl.Result{RequeueAfter: 5 * time.Second}, nil
170+
}
152171
}
153172

154173
flowID, err := prefect.GetFlowIDFromDeployment(ctx, prefectClient, deployment)

internal/controller/prefectdeployment_controller_test.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -845,12 +845,11 @@ var _ = Describe("PrefectDeployment controller", func() {
845845
Expect(err).NotTo(HaveOccurred())
846846
Expect(result.RequeueAfter).To(Equal(time.Second))
847847

848-
By("Second reconcile should attempt to create real client and fail gracefully")
848+
By("Second reconcile should attempt health check and requeue when server not ready")
849849
result, err = realReconciler.Reconcile(ctx, reconcile.Request{NamespacedName: name})
850-
// This should fail because we don't have a real Prefect server
851-
// but it exercises the createPrefectClient path
852-
Expect(err).To(HaveOccurred())
853-
Expect(result.RequeueAfter).To(Equal(time.Duration(0)))
850+
// Should not error but should requeue because server is not healthy
851+
Expect(err).NotTo(HaveOccurred())
852+
Expect(result.RequeueAfter).To(BeNumerically(">", 0))
854853
})
855854
})
856855

internal/controller/prefectworkpool_controller.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,9 @@ const (
5252

5353
// PrefectWorkPoolConditionSynced indicates the work pool is synced with Prefect API
5454
PrefectWorkPoolConditionSynced = "Synced"
55+
56+
// PrefectWorkPoolConditionServerAvailable indicates the Prefect server is available
57+
PrefectWorkPoolConditionServerAvailable = "ServerAvailable"
5558
)
5659

5760
// PrefectWorkPoolReconciler reconciles a PrefectWorkPool object

internal/prefect/convert.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,8 @@ func ConvertToDeploymentSpec(k8sDeployment *prefectiov1.PrefectDeployment, flowI
173173
// UpdateDeploymentStatus copies the identifiers from the Prefect API
// deployment onto the Kubernetes resource's status.
func UpdateDeploymentStatus(k8sDeployment *prefectiov1.PrefectDeployment, prefectDeployment *Deployment) {
	// Ready reflects a successful sync to the Prefect API, regardless of
	// Prefect's internal deployment status.
	k8sDeployment.Status.Ready = true
	k8sDeployment.Status.Id = &prefectDeployment.ID
	k8sDeployment.Status.FlowId = &prefectDeployment.FlowID
}
178179

179180
// GetFlowIDFromDeployment extracts or generates a flow ID for the deployment

internal/prefect/convert_test.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -688,12 +688,14 @@ var _ = Describe("UpdateDeploymentStatus", func() {
688688
Expect(k8sDeployment.Status.Ready).To(BeTrue())
689689
})
690690

691-
It("Should handle non-ready status", func() {
691+
It("Should set ready=true when successfully synced", func() {
692692
prefectDeployment.Status = "PENDING"
693693

694694
UpdateDeploymentStatus(k8sDeployment, prefectDeployment)
695695

696-
Expect(k8sDeployment.Status.Ready).To(BeFalse())
696+
// Ready should be true because we successfully synced to Prefect API
697+
// regardless of Prefect's internal deployment status
698+
Expect(k8sDeployment.Status.Ready).To(BeTrue())
697699
})
698700
})
699701

internal/utils/backoff.go

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
Copyright 2024.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package utils
18+
19+
import (
20+
"strconv"
21+
"time"
22+
23+
"sigs.k8s.io/controller-runtime/pkg/client"
24+
)
25+
26+
const (
27+
RetryCountAnnotation = "prefect.io/retry-count"
28+
MaxRetryAttempts = 10
29+
)
30+
31+
// CalculateBackoffDelay returns a progressive delay based on retry attempts
32+
// Uses exponential backoff: 15s, 30s, 60s, 120s (max)
33+
func CalculateBackoffDelay(attempts int) time.Duration {
34+
delays := []time.Duration{
35+
15 * time.Second,
36+
30 * time.Second,
37+
60 * time.Second,
38+
120 * time.Second,
39+
}
40+
41+
if attempts >= len(delays) {
42+
return delays[len(delays)-1] // Max delay
43+
}
44+
45+
return delays[attempts]
46+
}
47+
48+
// GetRetryCount retrieves the current retry count from object annotations
49+
func GetRetryCount(obj client.Object) int {
50+
if obj.GetAnnotations() == nil {
51+
return 0
52+
}
53+
54+
countStr, exists := obj.GetAnnotations()[RetryCountAnnotation]
55+
if !exists {
56+
return 0
57+
}
58+
59+
count, err := strconv.Atoi(countStr)
60+
if err != nil {
61+
return 0
62+
}
63+
64+
return count
65+
}
66+
67+
// IncrementRetryCount increments the retry count annotation on an object
68+
func IncrementRetryCount(obj client.Object) {
69+
if obj.GetAnnotations() == nil {
70+
obj.SetAnnotations(make(map[string]string))
71+
}
72+
73+
currentCount := GetRetryCount(obj)
74+
newCount := currentCount + 1
75+
76+
annotations := obj.GetAnnotations()
77+
annotations[RetryCountAnnotation] = strconv.Itoa(newCount)
78+
obj.SetAnnotations(annotations)
79+
}
80+
81+
// ResetRetryCount clears the retry count annotation on successful operations
82+
func ResetRetryCount(obj client.Object) {
83+
if obj.GetAnnotations() == nil {
84+
return
85+
}
86+
87+
annotations := obj.GetAnnotations()
88+
delete(annotations, RetryCountAnnotation)
89+
obj.SetAnnotations(annotations)
90+
}
91+
92+
// ShouldStopRetrying determines if we've exceeded max retry attempts
93+
func ShouldStopRetrying(obj client.Object) bool {
94+
return GetRetryCount(obj) >= MaxRetryAttempts
95+
}

internal/utils/backoff_test.go

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
Copyright 2024.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package utils
18+
19+
import (
20+
"time"
21+
22+
. "github.com/onsi/ginkgo/v2"
23+
. "github.com/onsi/gomega"
24+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
25+
26+
prefectiov1 "github.com/PrefectHQ/prefect-operator/api/v1"
27+
)
28+
29+
var _ = Describe("Backoff utilities", func() {
30+
Describe("CalculateBackoffDelay", func() {
31+
It("should return progressive delays", func() {
32+
testCases := []struct {
33+
attempts int
34+
expected time.Duration
35+
}{
36+
{0, 15 * time.Second},
37+
{1, 30 * time.Second},
38+
{2, 60 * time.Second},
39+
{3, 120 * time.Second},
40+
{4, 120 * time.Second}, // Max delay
41+
{10, 120 * time.Second}, // Still max delay
42+
}
43+
44+
for _, tc := range testCases {
45+
delay := CalculateBackoffDelay(tc.attempts)
46+
Expect(delay).To(Equal(tc.expected), "Attempt %d should have delay %v", tc.attempts, tc.expected)
47+
}
48+
})
49+
})
50+
51+
Describe("Retry count management", func() {
52+
var workPool *prefectiov1.PrefectWorkPool
53+
54+
BeforeEach(func() {
55+
workPool = &prefectiov1.PrefectWorkPool{
56+
ObjectMeta: metav1.ObjectMeta{
57+
Name: "test-pool",
58+
Namespace: "default",
59+
},
60+
}
61+
})
62+
63+
It("should start with zero retry count", func() {
64+
count := GetRetryCount(workPool)
65+
Expect(count).To(Equal(0))
66+
})
67+
68+
It("should increment retry count", func() {
69+
IncrementRetryCount(workPool)
70+
Expect(GetRetryCount(workPool)).To(Equal(1))
71+
72+
IncrementRetryCount(workPool)
73+
Expect(GetRetryCount(workPool)).To(Equal(2))
74+
})
75+
76+
It("should reset retry count", func() {
77+
IncrementRetryCount(workPool)
78+
IncrementRetryCount(workPool)
79+
Expect(GetRetryCount(workPool)).To(Equal(2))
80+
81+
ResetRetryCount(workPool)
82+
Expect(GetRetryCount(workPool)).To(Equal(0))
83+
})
84+
85+
It("should detect when to stop retrying", func() {
86+
for i := 0; i < MaxRetryAttempts-1; i++ {
87+
IncrementRetryCount(workPool)
88+
Expect(ShouldStopRetrying(workPool)).To(BeFalse())
89+
}
90+
91+
IncrementRetryCount(workPool)
92+
Expect(ShouldStopRetrying(workPool)).To(BeTrue())
93+
})
94+
95+
It("should handle missing annotations gracefully", func() {
96+
workPool.SetAnnotations(nil)
97+
count := GetRetryCount(workPool)
98+
Expect(count).To(Equal(0))
99+
100+
IncrementRetryCount(workPool)
101+
Expect(GetRetryCount(workPool)).To(Equal(1))
102+
})
103+
104+
It("should handle invalid annotation values", func() {
105+
workPool.SetAnnotations(map[string]string{
106+
RetryCountAnnotation: "invalid",
107+
})
108+
count := GetRetryCount(workPool)
109+
Expect(count).To(Equal(0))
110+
})
111+
})
112+
})

0 commit comments

Comments
 (0)