diff --git a/internal/controller/prefectdeployment_controller.go b/internal/controller/prefectdeployment_controller.go index e934c6f..55b39a5 100644 --- a/internal/controller/prefectdeployment_controller.go +++ b/internal/controller/prefectdeployment_controller.go @@ -18,6 +18,7 @@ package controller import ( "context" + "fmt" "time" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -44,8 +45,14 @@ const ( // PrefectDeploymentConditionSynced indicates the deployment is synced with Prefect API PrefectDeploymentConditionSynced = "Synced" + // PrefectDeploymentConditionServerAvailable indicates the Prefect server is available + PrefectDeploymentConditionServerAvailable = "ServerAvailable" + + // PrefectDeploymentConditionWorkPoolAvailable indicates the referenced work pool is available + PrefectDeploymentConditionWorkPoolAvailable = "WorkPoolAvailable" + // RequeueIntervalReady is the interval for requeuing when deployment is ready - RequeueIntervalReady = 5 * time.Minute + RequeueIntervalReady = 10 * time.Second // RequeueIntervalError is the interval for requeuing on errors RequeueIntervalError = 30 * time.Second @@ -149,6 +156,18 @@ func (r *PrefectDeploymentReconciler) syncWithPrefect(ctx context.Context, deplo log.Error(err, "Failed to create Prefect client", "deployment", deployment.Name) return ctrl.Result{}, err } + + // Check that the referenced work pool exists (only for real clients) + _, err = prefectClient.GetWorkPool(ctx, deployment.Spec.WorkPool.Name) + if err != nil { + log.V(1).Info("Work pool not found, requeuing", "deployment", deployment.Name, "workPool", deployment.Spec.WorkPool.Name) + r.setCondition(deployment, PrefectDeploymentConditionSynced, metav1.ConditionFalse, "WorkPoolNotFound", fmt.Sprintf("Work pool '%s' not found", deployment.Spec.WorkPool.Name)) + deployment.Status.Ready = false + if updateErr := r.Status().Update(ctx, deployment); updateErr != nil { + log.Error(updateErr, "Failed to update deployment status", "deployment", deployment.Name) + } + return ctrl.Result{RequeueAfter: 5 * time.Second}, nil + } } flowID, err := prefect.GetFlowIDFromDeployment(ctx, prefectClient, deployment) diff --git a/internal/controller/prefectdeployment_controller_test.go b/internal/controller/prefectdeployment_controller_test.go index 54ed5cf..202eee8 100644 --- a/internal/controller/prefectdeployment_controller_test.go +++ b/internal/controller/prefectdeployment_controller_test.go @@ -845,12 +845,11 @@ var _ = Describe("PrefectDeployment controller", func() { Expect(err).NotTo(HaveOccurred()) Expect(result.RequeueAfter).To(Equal(time.Second)) - By("Second reconcile should attempt to create real client and fail gracefully") + By("Second reconcile should attempt health check and requeue when server not ready") result, err = realReconciler.Reconcile(ctx, reconcile.Request{NamespacedName: name}) - // This should fail because we don't have a real Prefect server - // but it exercises the createPrefectClient path - Expect(err).To(HaveOccurred()) - Expect(result.RequeueAfter).To(Equal(time.Duration(0))) + // Should not error but should requeue because server is not healthy + Expect(err).NotTo(HaveOccurred()) + Expect(result.RequeueAfter).To(BeNumerically(">", 0)) }) }) diff --git a/internal/controller/prefectworkpool_controller.go b/internal/controller/prefectworkpool_controller.go index 987f6d8..7e71216 100644 --- a/internal/controller/prefectworkpool_controller.go +++ b/internal/controller/prefectworkpool_controller.go @@ -52,6 +52,9 @@ const ( // PrefectDeploymentConditionSynced indicates the deployment is synced with Prefect API PrefectWorkPoolConditionSynced = "Synced" + + // PrefectWorkPoolConditionServerAvailable indicates the Prefect server is available + PrefectWorkPoolConditionServerAvailable = "ServerAvailable" ) // PrefectWorkPoolReconciler reconciles a PrefectWorkPool object diff --git a/internal/prefect/convert.go b/internal/prefect/convert.go index 45ad21e..dae9936 100644 --- a/internal/prefect/convert.go +++ b/internal/prefect/convert.go @@ -173,7 +173,8 @@ func ConvertToDeploymentSpec(k8sDeployment *prefectiov1.PrefectDeployment, flowI func UpdateDeploymentStatus(k8sDeployment *prefectiov1.PrefectDeployment, prefectDeployment *Deployment) { k8sDeployment.Status.Id = &prefectDeployment.ID k8sDeployment.Status.FlowId = &prefectDeployment.FlowID - k8sDeployment.Status.Ready = prefectDeployment.Status == "READY" + // Set Ready=true when we successfully sync to Prefect API, regardless of Prefect's internal status + k8sDeployment.Status.Ready = true } // GetFlowIDFromDeployment extracts or generates a flow ID for the deployment diff --git a/internal/prefect/convert_test.go b/internal/prefect/convert_test.go index 6fd2280..88b643b 100644 --- a/internal/prefect/convert_test.go +++ b/internal/prefect/convert_test.go @@ -688,12 +688,14 @@ var _ = Describe("UpdateDeploymentStatus", func() { Expect(k8sDeployment.Status.Ready).To(BeTrue()) }) - It("Should handle non-ready status", func() { + It("Should set ready=true when successfully synced", func() { prefectDeployment.Status = "PENDING" UpdateDeploymentStatus(k8sDeployment, prefectDeployment) - Expect(k8sDeployment.Status.Ready).To(BeFalse()) + // Ready should be true because we successfully synced to Prefect API + // regardless of Prefect's internal deployment status + Expect(k8sDeployment.Status.Ready).To(BeTrue()) }) }) diff --git a/internal/utils/backoff.go b/internal/utils/backoff.go new file mode 100644 index 0000000..ae664fd --- /dev/null +++ b/internal/utils/backoff.go @@ -0,0 +1,95 @@ +/* +Copyright 2024. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package utils + +import ( + "strconv" + "time" + + "sigs.k8s.io/controller-runtime/pkg/client" +) + +const ( + RetryCountAnnotation = "prefect.io/retry-count" + MaxRetryAttempts = 10 +) + +// CalculateBackoffDelay returns a progressive delay based on retry attempts +// Uses exponential backoff: 15s, 30s, 60s, 120s (max) +func CalculateBackoffDelay(attempts int) time.Duration { + delays := []time.Duration{ + 15 * time.Second, + 30 * time.Second, + 60 * time.Second, + 120 * time.Second, + } + + if attempts >= len(delays) { + return delays[len(delays)-1] // Max delay + } + + return delays[attempts] +} + +// GetRetryCount retrieves the current retry count from object annotations +func GetRetryCount(obj client.Object) int { + if obj.GetAnnotations() == nil { + return 0 + } + + countStr, exists := obj.GetAnnotations()[RetryCountAnnotation] + if !exists { + return 0 + } + + count, err := strconv.Atoi(countStr) + if err != nil { + return 0 + } + + return count +} + +// IncrementRetryCount increments the retry count annotation on an object +func IncrementRetryCount(obj client.Object) { + if obj.GetAnnotations() == nil { + obj.SetAnnotations(make(map[string]string)) + } + + currentCount := GetRetryCount(obj) + newCount := currentCount + 1 + + annotations := obj.GetAnnotations() + annotations[RetryCountAnnotation] = strconv.Itoa(newCount) + obj.SetAnnotations(annotations) +} + +// ResetRetryCount clears the retry count annotation on successful operations +func ResetRetryCount(obj client.Object) { + if obj.GetAnnotations() == nil { + return + } + + annotations := obj.GetAnnotations() + delete(annotations, RetryCountAnnotation) + obj.SetAnnotations(annotations) +} + +// ShouldStopRetrying determines if we've exceeded max retry attempts +func ShouldStopRetrying(obj client.Object) bool { + return GetRetryCount(obj) >= MaxRetryAttempts +} diff --git a/internal/utils/backoff_test.go b/internal/utils/backoff_test.go new file mode 100644 index 0000000..ad5b809 --- /dev/null +++ b/internal/utils/backoff_test.go @@ -0,0 +1,112 @@ +/* +Copyright 2024. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package utils + +import ( + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + prefectiov1 "github.com/PrefectHQ/prefect-operator/api/v1" +) + +var _ = Describe("Backoff utilities", func() { + Describe("CalculateBackoffDelay", func() { + It("should return progressive delays", func() { + testCases := []struct { + attempts int + expected time.Duration + }{ + {0, 15 * time.Second}, + {1, 30 * time.Second}, + {2, 60 * time.Second}, + {3, 120 * time.Second}, + {4, 120 * time.Second}, // Max delay + {10, 120 * time.Second}, // Still max delay + } + + for _, tc := range testCases { + delay := CalculateBackoffDelay(tc.attempts) + Expect(delay).To(Equal(tc.expected), "Attempt %d should have delay %v", tc.attempts, tc.expected) + } + }) + }) + + Describe("Retry count management", func() { + var workPool *prefectiov1.PrefectWorkPool + + BeforeEach(func() { + workPool = &prefectiov1.PrefectWorkPool{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pool", + Namespace: "default", + }, + } + }) + + It("should start with zero retry count", func() { + count := GetRetryCount(workPool) + Expect(count).To(Equal(0)) + }) + + It("should increment retry count", func() { + IncrementRetryCount(workPool) + Expect(GetRetryCount(workPool)).To(Equal(1)) + + IncrementRetryCount(workPool) + Expect(GetRetryCount(workPool)).To(Equal(2)) + }) + + It("should reset retry count", func() { + IncrementRetryCount(workPool) + IncrementRetryCount(workPool) + Expect(GetRetryCount(workPool)).To(Equal(2)) + + ResetRetryCount(workPool) + Expect(GetRetryCount(workPool)).To(Equal(0)) + }) + + It("should detect when to stop retrying", func() { + for i := 0; i < MaxRetryAttempts-1; i++ { + IncrementRetryCount(workPool) + Expect(ShouldStopRetrying(workPool)).To(BeFalse()) + } + + IncrementRetryCount(workPool) + Expect(ShouldStopRetrying(workPool)).To(BeTrue()) + }) + + It("should handle missing annotations gracefully", func() { + workPool.SetAnnotations(nil) + count := GetRetryCount(workPool) + Expect(count).To(Equal(0)) + + IncrementRetryCount(workPool) + Expect(GetRetryCount(workPool)).To(Equal(1)) + }) + + It("should handle invalid annotation values", func() { + workPool.SetAnnotations(map[string]string{ + RetryCountAnnotation: "invalid", + }) + count := GetRetryCount(workPool) + Expect(count).To(Equal(0)) + }) + }) +}) diff --git a/internal/utils/server_health.go b/internal/utils/server_health.go new file mode 100644 index 0000000..339067f --- /dev/null +++ b/internal/utils/server_health.go @@ -0,0 +1,122 @@ +/* +Copyright 2024. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package utils + +import ( + "context" + "fmt" + "net/http" + "time" + + appsv1 "k8s.io/api/apps/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + + prefectiov1 "github.com/PrefectHQ/prefect-operator/api/v1" +) + +// CheckPrefectServerHealth verifies that a Prefect server is ready to accept API requests +// This works for both in-cluster servers and external servers (including Prefect Cloud) +func CheckPrefectServerHealth(ctx context.Context, serverRef *prefectiov1.PrefectServerReference, k8sClient client.Client, namespace string) (bool, error) { + // For in-cluster servers, check Kubernetes deployment readiness first + if IsInClusterServer(serverRef) { + ready, err := checkInClusterDeploymentReady(ctx, k8sClient, serverRef, namespace) + if err != nil { + return false, fmt.Errorf("failed to check in-cluster deployment: %w", err) + } + if !ready { + return false, nil + } + + // For in-cluster servers, just checking deployment readiness is sufficient + // The existing client creation logic handles port-forwarding complexity + // Trying to do HTTP health checks from outside the cluster is problematic + return true, nil + } + + // For external servers, check API health directly + apiURL := serverRef.GetAPIURL(namespace) + if apiURL == "" { + return false, fmt.Errorf("unable to determine API URL for server") + } + + // Get API key if available for authentication + headers := make(map[string]string) + if apiKey, err := serverRef.GetAPIKey(ctx, k8sClient, namespace); err == nil && apiKey != "" { + headers["Authorization"] = "Bearer " + apiKey + } + + return checkAPIHealth(ctx, apiURL, headers) +} + +// IsInClusterServer determines if the server reference points to an in-cluster server +func IsInClusterServer(serverRef *prefectiov1.PrefectServerReference) bool { + return serverRef.Name != "" && serverRef.RemoteAPIURL == nil +} + +// checkInClusterDeploymentReady verifies that the in-cluster Prefect server deployment is ready +func checkInClusterDeploymentReady(ctx context.Context, k8sClient client.Client, serverRef *prefectiov1.PrefectServerReference, namespace string) (bool, error) { + serverNamespace := serverRef.Namespace + if serverNamespace == "" { + serverNamespace = namespace + } + + deployment := &appsv1.Deployment{} + err := k8sClient.Get(ctx, types.NamespacedName{ + Name: serverRef.Name, + Namespace: serverNamespace, + }, deployment) + if err != nil { + return false, fmt.Errorf("failed to get deployment %s/%s: %w", serverNamespace, serverRef.Name, err) + } + + // Check if deployment has ready replicas + return deployment.Status.ReadyReplicas > 0, nil +} + +// checkAPIHealth performs a lightweight health check against the Prefect API +func checkAPIHealth(ctx context.Context, apiURL string, headers map[string]string) (bool, error) { + client := &http.Client{ + Timeout: 5 * time.Second, + } + + healthURL := apiURL + "/health" + req, err := http.NewRequestWithContext(ctx, "GET", healthURL, nil) + if err != nil { + return false, fmt.Errorf("failed to create health check request: %w", err) + } + + // Add authentication headers if provided + for key, value := range headers { + req.Header.Set(key, value) + } + + resp, err := client.Do(req) + if err != nil { + // For debugging: return the error so we can see what's happening + return false, fmt.Errorf("health check failed for %s: %w", healthURL, err) + } + defer func() { + _ = resp.Body.Close() + }() + + // Prefect health endpoint should return 200 OK when healthy + if resp.StatusCode != 200 { + return false, fmt.Errorf("health check returned status %d for %s", resp.StatusCode, healthURL) + } + return true, nil +} diff --git a/internal/utils/server_health_test.go b/internal/utils/server_health_test.go new file mode 100644 index 0000000..cb664be --- /dev/null +++ b/internal/utils/server_health_test.go @@ -0,0 +1,91 @@ +/* +Copyright 2024. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package utils + +import ( + "context" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "k8s.io/utils/ptr" + + prefectiov1 "github.com/PrefectHQ/prefect-operator/api/v1" +) + +var _ = Describe("Server health utilities", func() { + Describe("IsInClusterServer", func() { + It("should identify in-cluster servers", func() { + serverRef := &prefectiov1.PrefectServerReference{ + Name: "prefect-ephemeral", + Namespace: "default", + } + Expect(IsInClusterServer(serverRef)).To(BeTrue()) + }) + + It("should identify external servers with RemoteAPIURL", func() { + serverRef := &prefectiov1.PrefectServerReference{ + Name: "prefect-ephemeral", + Namespace: "default", + RemoteAPIURL: ptr.To("https://api.prefect.cloud/api/accounts/123/workspaces/456"), + } + Expect(IsInClusterServer(serverRef)).To(BeFalse()) + }) + + It("should identify Prefect Cloud servers", func() { + serverRef := &prefectiov1.PrefectServerReference{ + RemoteAPIURL: ptr.To("https://api.prefect.cloud/api/accounts/123/workspaces/456"), + AccountID: ptr.To("123"), + WorkspaceID: ptr.To("456"), + } + Expect(IsInClusterServer(serverRef)).To(BeFalse()) + }) + + It("should handle empty server reference", func() { + serverRef := &prefectiov1.PrefectServerReference{} + Expect(IsInClusterServer(serverRef)).To(BeFalse()) + }) + }) + + Describe("CheckPrefectServerHealth", func() { + var ctx context.Context + + BeforeEach(func() { + ctx = context.Background() + }) + + It("should handle external server references", func() { + serverRef := &prefectiov1.PrefectServerReference{ + RemoteAPIURL: ptr.To("https://httpbin.org"), + } + + // This will fail because httpbin.org doesn't have /api/health endpoint + // But it tests that we don't crash on external servers + healthy, err := CheckPrefectServerHealth(ctx, serverRef, nil, "default") + Expect(healthy).To(BeFalse()) + Expect(err).To(HaveOccurred()) // Will error because endpoint doesn't exist + }) + + It("should handle servers with no API URL", func() { + serverRef := &prefectiov1.PrefectServerReference{} + + healthy, err := CheckPrefectServerHealth(ctx, serverRef, nil, "default") + Expect(healthy).To(BeFalse()) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("unable to determine API URL")) + }) + }) +})