diff --git a/api/v1/prefectdeployment_types.go b/api/v1/prefectdeployment_types.go index d2c04c2..db53f71 100644 --- a/api/v1/prefectdeployment_types.go +++ b/api/v1/prefectdeployment_types.go @@ -17,6 +17,9 @@ limitations under the License. package v1 import ( + "fmt" + "strings" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" ) @@ -258,6 +261,18 @@ type PrefectDeploymentList struct { Items []PrefectDeployment `json:"items"` } +// Here for future validation hooks. +func (deployment *PrefectDeployment) Validate() error { + entryPoint := deployment.Spec.Deployment.Entrypoint + + idx := strings.Index(entryPoint, ":") + if idx == -1 { + return fmt.Errorf("invalid entrypoint format (missing ':'): %s", entryPoint) + } + + return nil +} + func init() { SchemeBuilder.Register(&PrefectDeployment{}, &PrefectDeploymentList{}) } diff --git a/internal/prefect/convert.go b/internal/prefect/convert.go index dae9936..ace75d4 100644 --- a/internal/prefect/convert.go +++ b/internal/prefect/convert.go @@ -20,6 +20,7 @@ import ( "context" "encoding/json" "fmt" + "strings" "time" prefectiov1 "github.com/PrefectHQ/prefect-operator/api/v1" @@ -179,8 +180,17 @@ func UpdateDeploymentStatus(k8sDeployment *prefectiov1.PrefectDeployment, prefec // GetFlowIDFromDeployment extracts or generates a flow ID for the deployment func GetFlowIDFromDeployment(ctx context.Context, client PrefectClient, k8sDeployment *prefectiov1.PrefectDeployment) (string, error) { + entryPoint := k8sDeployment.Spec.Deployment.Entrypoint + + idx := strings.Index(entryPoint, ":") + if idx == -1 { + return "", fmt.Errorf("invalid entrypoint format (missing ':'): %s", entryPoint) + } + + flowName := entryPoint[idx+1:] + flowSpec := &FlowSpec{ - Name: k8sDeployment.Name, + Name: flowName, Tags: k8sDeployment.Spec.Deployment.Tags, Labels: k8sDeployment.Spec.Deployment.Labels, } diff --git a/internal/prefect/convert_test.go b/internal/prefect/convert_test.go index 88b643b..3a4218d 100644 --- a/internal/prefect/convert_test.go +++ b/internal/prefect/convert_test.go @@ -716,8 +716,9 @@ var _ = Describe("GetFlowIDFromDeployment", func() { }, Spec: prefectiov1.PrefectDeploymentSpec{ Deployment: prefectiov1.PrefectDeploymentConfiguration{ - Tags: []string{"test", "deployment"}, - Labels: map[string]string{"env": "test"}, + Entrypoint: "foo.py:test_flow", + Tags: []string{"test", "deployment"}, + Labels: map[string]string{"env": "test"}, }, }, } @@ -730,6 +731,26 @@ var _ = Describe("GetFlowIDFromDeployment", func() { Expect(flowID).NotTo(BeEmpty()) }) + It("Should name flows correctly", func() { + By("creating a deployment") + flowID, err := GetFlowIDFromDeployment(ctx, mockClient, k8sDeployment) + + Expect(err).NotTo(HaveOccurred()) + Expect(flowID).NotTo(BeEmpty()) + + By("fetching matching flow by name") + flowSpec := &FlowSpec{ + Name: "test_flow", + } + + flow, err := mockClient.CreateOrGetFlow(ctx, flowSpec) + Expect(err).NotTo(HaveOccurred()) + Expect(flow).NotTo(BeNil()) + + By("comparing to known flowID") + Expect(flow.ID).To(Equal(flowID)) + }) + It("Should handle flow creation error", func() { mockClient.ShouldFailFlowCreate = true mockClient.FailureMessage = "mock flow error" @@ -741,4 +762,12 @@ var _ = Describe("GetFlowIDFromDeployment", func() { Expect(err.Error()).To(ContainSubstring("mock flow error")) Expect(flowID).To(BeEmpty()) }) + + It("should handle invalid entrypoint names", func() { + k8sDeployment.Spec.Deployment.Entrypoint = "foo_test.py" + flowID, err := GetFlowIDFromDeployment(ctx, mockClient, k8sDeployment) + + Expect(err).To(HaveOccurred()) + Expect(flowID).To(BeEmpty()) + }) })