Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 31 additions & 12 deletions internal/controller/prefectserver_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,15 @@ func (r *PrefectServerReconciler) Reconcile(ctx context.Context, req ctrl.Reques
}
}()

desiredDeployment, desiredPVC, desiredMigrationJob := r.prefectServerDeployment(server)
desiredService := r.prefectServerService(server)
desiredDeployment, desiredPVC, desiredMigrationJob, err := r.prefectServerDeployment(server)
if err != nil {
return ctrl.Result{}, err
}

desiredService, err := r.prefectServerService(server)
if err != nil {
return ctrl.Result{}, err
}

var result *ctrl.Result

Expand Down Expand Up @@ -285,7 +292,7 @@ func isMigrationJobFinished(foundMigrationJob *batchv1.Job) bool {
}
}

func (r *PrefectServerReconciler) prefectServerDeployment(server *prefectiov1.PrefectServer) (appsv1.Deployment, *corev1.PersistentVolumeClaim, *batchv1.Job) {
func (r *PrefectServerReconciler) prefectServerDeployment(server *prefectiov1.PrefectServer) (appsv1.Deployment, *corev1.PersistentVolumeClaim, *batchv1.Job, error) {
var pvc *corev1.PersistentVolumeClaim
var migrationJob *batchv1.Job
var deploymentSpec appsv1.DeploymentSpec
Expand Down Expand Up @@ -318,15 +325,23 @@ func (r *PrefectServerReconciler) prefectServerDeployment(server *prefectiov1.Pr
}

// Set PrefectServer instance as the owner and controller
// TODO: handle errors from SetControllerReference.
_ = ctrl.SetControllerReference(server, dep, r.Scheme)
if r.Scheme == nil {
return appsv1.Deployment{}, nil, nil, fmt.Errorf("scheme is nil, cannot set controller reference for deployment")
}
if err := ctrl.SetControllerReference(server, dep, r.Scheme); err != nil {
return appsv1.Deployment{}, nil, nil, fmt.Errorf("failed to set controller reference for deployment: %w", err)
}
if pvc != nil {
_ = ctrl.SetControllerReference(server, pvc, r.Scheme)
if err := ctrl.SetControllerReference(server, pvc, r.Scheme); err != nil {
return appsv1.Deployment{}, nil, nil, fmt.Errorf("failed to set controller reference for PVC: %w", err)
}
}
if migrationJob != nil {
_ = ctrl.SetControllerReference(server, migrationJob, r.Scheme)
if err := ctrl.SetControllerReference(server, migrationJob, r.Scheme); err != nil {
return appsv1.Deployment{}, nil, nil, fmt.Errorf("failed to set controller reference for migration job: %w", err)
}
}
return *dep, pvc, migrationJob
return *dep, pvc, migrationJob, nil
}

func (r *PrefectServerReconciler) ephemeralDeploymentSpec(server *prefectiov1.PrefectServer) appsv1.DeploymentSpec {
Expand Down Expand Up @@ -561,7 +576,7 @@ func (r *PrefectServerReconciler) postgresMigrationJob(server *prefectiov1.Prefe
}
}

func (r *PrefectServerReconciler) prefectServerService(server *prefectiov1.PrefectServer) corev1.Service {
func (r *PrefectServerReconciler) prefectServerService(server *prefectiov1.PrefectServer) (corev1.Service, error) {
service := corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Namespace: server.Namespace,
Expand All @@ -585,10 +600,14 @@ func (r *PrefectServerReconciler) prefectServerService(server *prefectiov1.Prefe
service.Spec.Ports = append(service.Spec.Ports, server.Spec.ExtraServicePorts...)
}

// TODO: handle errors from SetControllerReference.
_ = ctrl.SetControllerReference(server, &service, r.Scheme)
if r.Scheme == nil {
return corev1.Service{}, fmt.Errorf("scheme is nil, cannot set controller reference for service")
}
if err := ctrl.SetControllerReference(server, &service, r.Scheme); err != nil {
return corev1.Service{}, fmt.Errorf("failed to set controller reference for service: %w", err)
}

return service
return service, nil
}

func (r *PrefectServerReconciler) initContainerWaitForPostgres(server *prefectiov1.PrefectServer) corev1.Container {
Expand Down
128 changes: 122 additions & 6 deletions internal/controller/prefectserver_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1258,7 +1258,7 @@ var _ = Describe("PrefectServer controller", func() {
Expect(err).NotTo(HaveOccurred())
Expect(k8sClient.Get(ctx, name, prefectserver)).To(Succeed())

_, _, desiredMigrationJob := controllerReconciler.prefectServerDeployment(prefectserver)
_, _, desiredMigrationJob, _ := controllerReconciler.prefectServerDeployment(prefectserver)
migrateJob = &batchv1.Job{}
Eventually(func() error {
return k8sClient.Get(ctx, types.NamespacedName{
Expand Down Expand Up @@ -1447,7 +1447,7 @@ var _ = Describe("PrefectServer controller", func() {
})
Expect(err).NotTo(HaveOccurred())

_, _, desiredMigrationJob := controllerReconciler.prefectServerDeployment(prefectserver)
_, _, desiredMigrationJob, _ := controllerReconciler.prefectServerDeployment(prefectserver)
migrateJob = &batchv1.Job{}
Eventually(func() error {
return k8sClient.Get(ctx, types.NamespacedName{
Expand Down Expand Up @@ -1558,7 +1558,7 @@ var _ = Describe("PrefectServer controller", func() {
Expect(k8sClient.Get(ctx, name, prefectserver)).To(Succeed())

// Set the first migration Job to be complete
_, _, desiredMigrationJob := controllerReconciler.prefectServerDeployment(prefectserver)
_, _, desiredMigrationJob, _ := controllerReconciler.prefectServerDeployment(prefectserver)
migrateJob := &batchv1.Job{}
Eventually(func() error {
return k8sClient.Get(ctx, types.NamespacedName{
Expand All @@ -1581,7 +1581,7 @@ var _ = Describe("PrefectServer controller", func() {
Expect(err).NotTo(HaveOccurred())

// Set the second migration Job to be complete
_, _, desiredMigrationJob = controllerReconciler.prefectServerDeployment(prefectserver)
_, _, desiredMigrationJob, _ = controllerReconciler.prefectServerDeployment(prefectserver)
migrateJob = &batchv1.Job{}
Eventually(func() error {
return k8sClient.Get(ctx, types.NamespacedName{
Expand Down Expand Up @@ -1611,7 +1611,7 @@ var _ = Describe("PrefectServer controller", func() {
})

It("should create new migration Job with the new setting", func() {
_, _, desiredMigrationJob := controllerReconciler.prefectServerDeployment(prefectserver)
_, _, desiredMigrationJob, _ := controllerReconciler.prefectServerDeployment(prefectserver)
job := &batchv1.Job{}
Eventually(func() error {
return k8sClient.Get(ctx, types.NamespacedName{
Expand All @@ -1628,7 +1628,7 @@ var _ = Describe("PrefectServer controller", func() {
})

It("should do nothing if an active migration Job already exists", func() {
_, _, desiredMigrationJob := controllerReconciler.prefectServerDeployment(prefectserver)
_, _, desiredMigrationJob, _ := controllerReconciler.prefectServerDeployment(prefectserver)
migrateJob := &batchv1.Job{}
Eventually(func() error {
return k8sClient.Get(ctx, types.NamespacedName{
Expand Down Expand Up @@ -1831,4 +1831,120 @@ var _ = Describe("PrefectServer controller", func() {
Expect(updatedPrefectServer.Status.Ready).To(Equal(true))
})
})

Context("error handling scenarios", func() {
var (
ctx context.Context
namespace *corev1.Namespace
namespaceName string
name types.NamespacedName
prefectserver *prefectiov1.PrefectServer
)

BeforeEach(func() {
ctx = context.Background()
namespaceName = fmt.Sprintf("error-ns-%d", time.Now().UnixNano())

namespace = &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{Name: namespaceName},
}
Expect(k8sClient.Create(ctx, namespace)).To(Succeed())

name = types.NamespacedName{
Namespace: namespaceName,
Name: "prefect-server-test",
}

prefectserver = &prefectiov1.PrefectServer{
ObjectMeta: metav1.ObjectMeta{
Name: name.Name,
Namespace: namespaceName,
},
Spec: prefectiov1.PrefectServerSpec{
Image: ptr.To("prefecthq/prefect:2.11.0-python3.11"),
},
}
})

AfterEach(func() {
Expect(k8sClient.Delete(ctx, namespace)).To(Succeed())
})

It("should handle SetControllerReference errors when creating deployment specs", func() {
// Create a server without a proper scheme (simulates SetControllerReference error)
badReconciler := &PrefectServerReconciler{
Client: k8sClient,
Scheme: nil, // This will cause SetControllerReference to fail
}

// This test verifies that when the helper functions are called with an invalid scheme,
// errors are now properly handled and returned
_, _, _, err := badReconciler.prefectServerDeployment(prefectserver)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("scheme is nil, cannot set controller reference"))

// Test service creation with invalid scheme
_, err = badReconciler.prefectServerService(prefectserver)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("scheme is nil, cannot set controller reference"))
})

It("should handle SetControllerReference errors in PostgreSQL configurations", func() {
prefectserver.Spec.Postgres = &prefectiov1.PostgresConfiguration{
Host: ptr.To("localhost"),
Port: ptr.To(5432),
User: ptr.To("prefect"),
Password: ptr.To("password"),
Database: ptr.To("prefect"),
}

badReconciler := &PrefectServerReconciler{
Client: k8sClient,
Scheme: nil, // This will cause SetControllerReference to fail
}

_, _, _, err := badReconciler.prefectServerDeployment(prefectserver)

// Verify that SetControllerReference error is now properly handled
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("scheme is nil, cannot set controller reference"))
})

It("should handle SetControllerReference errors in SQLite configurations", func() {
prefectserver.Spec.SQLite = &prefectiov1.SQLiteConfiguration{
StorageClassName: "standard",
Size: resource.MustParse("1Gi"),
}

badReconciler := &PrefectServerReconciler{
Client: k8sClient,
Scheme: nil, // This will cause SetControllerReference to fail
}

_, _, _, err := badReconciler.prefectServerDeployment(prefectserver)

// Verify that SetControllerReference error is now properly handled
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("scheme is nil, cannot set controller reference"))
})

It("should demonstrate proper error propagation from helper functions", func() {
// Create server and attempt reconciliation
Expect(k8sClient.Create(ctx, prefectserver)).To(Succeed())

// Create a reconciler with invalid scheme to trigger SetControllerReference errors
badReconciler := &PrefectServerReconciler{
Client: k8sClient,
Scheme: nil,
}

// Now this reconciliation should fail early when the helper functions encounter
// SetControllerReference errors, rather than panicking later
_, err := badReconciler.Reconcile(ctx, reconcile.Request{NamespacedName: name})

// The error should be properly propagated from the helper functions
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("scheme is nil, cannot set controller reference"))
})
})
})