diff --git a/AGENTS.md b/AGENTS.md index c095ca6..67187cf 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -101,6 +101,60 @@ that for many testing scenarios, we can't assume that the operator is running in the same cluster it is operating on. Use port-forwarding where appropriate to support these cases. +### Local Testing + +For local testing and development, first start a local Kubernetes cluster: + +```bash +minikube start # Start local Kubernetes cluster (if minikube is available) +``` + +Then run the operator locally: + +```bash +make install run # Install CRDs and run operator locally (long-running process) +``` + +This command should be run in a background shell as it's a long-lived process that watches and reconciles Prefect resources. The operator will: +- Install/update CRDs to the cluster +- Start the controller manager locally +- Watch for changes to PrefectServer, PrefectWorkPool, and PrefectDeployment resources +- Use port-forwarding to connect to in-cluster Prefect servers when needed + +#### Testing with Sample Resources + +Apply sample resources from `deploy/samples/` to test different scenarios: + +```bash +# Apply a complete end-to-end example with all schedule types +kubectl apply -f deploy/samples/deployment_end-to-end.yaml + +# Or test individual components +kubectl apply -f deploy/samples/v1_prefectserver_ephemeral.yaml +kubectl apply -f deploy/samples/v1_prefectworkpool_kubernetes.yaml + +# List available sample configurations +ls deploy/samples/ +``` + +#### Accessing the Prefect API + +When testing with in-cluster Prefect servers, you can port-forward to access the API directly: + +```bash +kubectl port-forward svc/prefect-ephemeral 4200:4200 +# Prefect API now available at http://localhost:4200/api +``` + +This allows you to inspect deployments, schedules, and other resources created by the operator: +```bash +# View all deployments +curl -X POST http://localhost:4200/api/deployments/filter -H "Content-Type: application/json" -d '{}' + +# View deployment schedules +curl -X POST http://localhost:4200/api/deployments/filter -H "Content-Type: application/json" -d '{}' | jq '.[] | {name, schedules}' +``` + ### Code Generation Workflow The operator uses controller-gen for: diff --git a/api/v1/prefectdeployment_types.go b/api/v1/prefectdeployment_types.go index 0111971..d2c04c2 100644 --- a/api/v1/prefectdeployment_types.go +++ b/api/v1/prefectdeployment_types.go @@ -120,36 +120,69 @@ type PrefectVersionInfo struct { Version *string `json:"version,omitempty"` } -// PrefectSchedule defines a schedule for the deployment +// PrefectSchedule defines a schedule for the deployment. +// This structure exactly matches Prefect's prefect.yaml and API format. +// Exactly one of Interval, Cron, or RRule must be specified. type PrefectSchedule struct { // Slug is a unique identifier for the schedule + // Maps to: DeploymentScheduleCreate.slug (string) Slug string `json:"slug"` - // Schedule defines the schedule configuration - Schedule PrefectScheduleConfig `json:"schedule"` -} + // === INTERVAL SCHEDULE FIELDS === + // Maps to: IntervalSchedule schema in Prefect API -// PrefectScheduleConfig defines schedule timing configuration -type PrefectScheduleConfig struct { - // Interval is the schedule interval in seconds + // Interval is the schedule interval in seconds (required for interval schedules) + // Maps to: IntervalSchedule.interval (number, required) // +optional Interval *int `json:"interval,omitempty"` - // AnchorDate is the anchor date for the schedule + // AnchorDate is the anchor date for interval schedules in RFC3339 format + // Maps to: IntervalSchedule.anchor_date (string, format: date-time) + // Example: "2024-01-01T00:00:00Z" + // +optional + AnchorDate *string `json:"anchor_date,omitempty"` + + // === CRON SCHEDULE FIELDS === + // Maps to: CronSchedule schema in Prefect API + + // Cron is a valid cron expression (required for cron schedules) + // Maps to: CronSchedule.cron (string, required) + // Examples: "0 9 * * *" (daily at 9am), "*/5 * * * *" (every 5 minutes) + // +optional + Cron *string `json:"cron,omitempty"` + + // DayOr controls how croniter handles day and day_of_week entries + // Maps to: CronSchedule.day_or (boolean, default: true) + // true = OR logic (standard cron), false = AND logic (like fcron) // +optional - AnchorDate *string `json:"anchorDate,omitempty"` + DayOr *bool `json:"day_or,omitempty"` + + // === RRULE SCHEDULE FIELDS === + // Maps to: RRuleSchedule schema in Prefect API + + // RRule is a valid RFC 5545 RRULE string (required for rrule schedules) + // Maps to: RRuleSchedule.rrule (string, required) + // Examples: "RRULE:FREQ=WEEKLY;BYDAY=MO", "RRULE:FREQ=MONTHLY;BYDAY=1FR" + // +optional + RRule *string `json:"rrule,omitempty"` + + // === COMMON FIELDS (shared across all schedule types) === - // Timezone for the schedule + // Timezone for the schedule (IANA timezone string) + // Maps to: IntervalSchedule.timezone, CronSchedule.timezone, RRuleSchedule.timezone + // Examples: "America/New_York", "UTC", "Europe/London" // +optional Timezone *string `json:"timezone,omitempty"` // Active indicates if the schedule is active + // Maps to: DeploymentScheduleCreate.active (boolean, default: true) // +optional Active *bool `json:"active,omitempty"` // MaxScheduledRuns limits the number of scheduled runs + // Maps to: DeploymentScheduleCreate.max_scheduled_runs (integer > 0) // +optional - MaxScheduledRuns *int `json:"maxScheduledRuns,omitempty"` + MaxScheduledRuns *int `json:"max_scheduled_runs,omitempty"` } // PrefectGlobalConcurrencyLimit defines global concurrency limit configuration diff --git a/api/v1/prefectdeployment_types_test.go b/api/v1/prefectdeployment_types_test.go index c362618..2ac87bd 100644 --- a/api/v1/prefectdeployment_types_test.go +++ b/api/v1/prefectdeployment_types_test.go @@ -167,37 +167,179 @@ var _ = Describe("PrefectDeployment type", func() { Expect(deploymentConfig.Paused).To(Equal(ptr.To(false))) }) - It("should support deployment with schedules", func() { + It("should support deployment with legacy nested schedules", func() { + // This test verifies backward compatibility - we'll remove this once we migrate deploymentConfig := PrefectDeploymentConfiguration{ Entrypoint: "flows.py:my_flow", Schedules: []PrefectSchedule{ { - Slug: "daily-schedule", - Schedule: PrefectScheduleConfig{ - Interval: ptr.To(86400), // 24 hours in seconds - AnchorDate: ptr.To("2024-01-01T00:00:00Z"), - Timezone: ptr.To("UTC"), - Active: ptr.To(true), - MaxScheduledRuns: ptr.To(10), - }, + Slug: "daily-schedule", + Interval: ptr.To(86400), // 24 hours in seconds + AnchorDate: ptr.To("2024-01-01T00:00:00Z"), + Timezone: ptr.To("UTC"), + Active: ptr.To(true), + MaxScheduledRuns: ptr.To(10), }, { - Slug: "hourly-schedule", - Schedule: PrefectScheduleConfig{ - Interval: ptr.To(3600), // 1 hour in seconds - AnchorDate: ptr.To("2024-01-01T00:00:00Z"), - Timezone: ptr.To("UTC"), - Active: ptr.To(false), - }, + Slug: "hourly-schedule", + Interval: ptr.To(3600), // 1 hour in seconds + AnchorDate: ptr.To("2024-01-01T00:00:00Z"), + Timezone: ptr.To("UTC"), + Active: ptr.To(false), }, }, } Expect(deploymentConfig.Schedules).To(HaveLen(2)) Expect(deploymentConfig.Schedules[0].Slug).To(Equal("daily-schedule")) - Expect(deploymentConfig.Schedules[0].Schedule.Interval).To(Equal(ptr.To(86400))) + Expect(deploymentConfig.Schedules[0].Interval).To(Equal(ptr.To(86400))) Expect(deploymentConfig.Schedules[1].Slug).To(Equal("hourly-schedule")) - Expect(deploymentConfig.Schedules[1].Schedule.Active).To(Equal(ptr.To(false))) + Expect(deploymentConfig.Schedules[1].Active).To(Equal(ptr.To(false))) + }) + + It("should support deployment with flattened interval schedules", func() { + deploymentConfig := PrefectDeploymentConfiguration{ + Entrypoint: "flows.py:my_flow", + Schedules: []PrefectSchedule{ + { + Slug: "daily-interval", + Interval: ptr.To(86400), // 24 hours in seconds + AnchorDate: ptr.To("2024-01-01T00:00:00Z"), + Timezone: ptr.To("UTC"), + Active: ptr.To(true), + MaxScheduledRuns: ptr.To(10), + }, + { + Slug: "hourly-interval", + Interval: ptr.To(3600), // 1 hour in seconds + AnchorDate: ptr.To("2024-01-01T00:00:00Z"), + Timezone: ptr.To("UTC"), + Active: ptr.To(false), + }, + }, + } + + Expect(deploymentConfig.Schedules).To(HaveLen(2)) + Expect(deploymentConfig.Schedules[0].Slug).To(Equal("daily-interval")) + Expect(deploymentConfig.Schedules[0].Interval).To(Equal(ptr.To(86400))) + Expect(deploymentConfig.Schedules[0].AnchorDate).To(Equal(ptr.To("2024-01-01T00:00:00Z"))) + Expect(deploymentConfig.Schedules[0].Timezone).To(Equal(ptr.To("UTC"))) + Expect(deploymentConfig.Schedules[0].Active).To(Equal(ptr.To(true))) + Expect(deploymentConfig.Schedules[0].MaxScheduledRuns).To(Equal(ptr.To(10))) + + Expect(deploymentConfig.Schedules[1].Slug).To(Equal("hourly-interval")) + Expect(deploymentConfig.Schedules[1].Interval).To(Equal(ptr.To(3600))) + Expect(deploymentConfig.Schedules[1].Active).To(Equal(ptr.To(false))) + }) + + It("should support deployment with cron schedules", func() { + deploymentConfig := PrefectDeploymentConfiguration{ + Entrypoint: "flows.py:my_flow", + Schedules: []PrefectSchedule{ + { + Slug: "daily-9am", + Cron: ptr.To("0 9 * * *"), + DayOr: ptr.To(true), + Timezone: ptr.To("America/New_York"), + Active: ptr.To(true), + }, + { + Slug: "every-5-minutes", + Cron: ptr.To("*/5 * * * *"), + Timezone: ptr.To("UTC"), + Active: ptr.To(true), + MaxScheduledRuns: ptr.To(100), + }, + }, + } + + Expect(deploymentConfig.Schedules).To(HaveLen(2)) + Expect(deploymentConfig.Schedules[0].Slug).To(Equal("daily-9am")) + Expect(deploymentConfig.Schedules[0].Cron).To(Equal(ptr.To("0 9 * * *"))) + Expect(deploymentConfig.Schedules[0].DayOr).To(Equal(ptr.To(true))) + Expect(deploymentConfig.Schedules[0].Timezone).To(Equal(ptr.To("America/New_York"))) + Expect(deploymentConfig.Schedules[0].Active).To(Equal(ptr.To(true))) + + Expect(deploymentConfig.Schedules[1].Slug).To(Equal("every-5-minutes")) + Expect(deploymentConfig.Schedules[1].Cron).To(Equal(ptr.To("*/5 * * * *"))) + Expect(deploymentConfig.Schedules[1].Timezone).To(Equal(ptr.To("UTC"))) + Expect(deploymentConfig.Schedules[1].MaxScheduledRuns).To(Equal(ptr.To(100))) + }) + + It("should support deployment with rrule schedules", func() { + deploymentConfig := PrefectDeploymentConfiguration{ + Entrypoint: "flows.py:my_flow", + Schedules: []PrefectSchedule{ + { + Slug: "weekly-monday", + RRule: ptr.To("RRULE:FREQ=WEEKLY;BYDAY=MO"), + Timezone: ptr.To("UTC"), + Active: ptr.To(true), + }, + { + Slug: "monthly-first-friday", + RRule: ptr.To("RRULE:FREQ=MONTHLY;BYDAY=1FR"), + Timezone: ptr.To("America/Los_Angeles"), + Active: ptr.To(true), + MaxScheduledRuns: ptr.To(12), + }, + }, + } + + Expect(deploymentConfig.Schedules).To(HaveLen(2)) + Expect(deploymentConfig.Schedules[0].Slug).To(Equal("weekly-monday")) + Expect(deploymentConfig.Schedules[0].RRule).To(Equal(ptr.To("RRULE:FREQ=WEEKLY;BYDAY=MO"))) + Expect(deploymentConfig.Schedules[0].Timezone).To(Equal(ptr.To("UTC"))) + Expect(deploymentConfig.Schedules[0].Active).To(Equal(ptr.To(true))) + + Expect(deploymentConfig.Schedules[1].Slug).To(Equal("monthly-first-friday")) + Expect(deploymentConfig.Schedules[1].RRule).To(Equal(ptr.To("RRULE:FREQ=MONTHLY;BYDAY=1FR"))) + Expect(deploymentConfig.Schedules[1].Timezone).To(Equal(ptr.To("America/Los_Angeles"))) + Expect(deploymentConfig.Schedules[1].MaxScheduledRuns).To(Equal(ptr.To(12))) + }) + + It("should support deployment with mixed schedule types", func() { + deploymentConfig := PrefectDeploymentConfiguration{ + Entrypoint: "flows.py:my_flow", + Schedules: []PrefectSchedule{ + { + Slug: "hourly-interval", + Interval: ptr.To(3600), + AnchorDate: ptr.To("2024-01-01T00:00:00Z"), + Timezone: ptr.To("UTC"), + Active: ptr.To(true), + }, + { + Slug: "daily-cron", + Cron: ptr.To("0 9 * * *"), + Timezone: ptr.To("America/New_York"), + Active: ptr.To(true), + }, + { + Slug: "weekly-rrule", + RRule: ptr.To("RRULE:FREQ=WEEKLY;BYDAY=MO,WE,FR"), + Timezone: ptr.To("Europe/London"), + Active: ptr.To(true), + }, + }, + } + + Expect(deploymentConfig.Schedules).To(HaveLen(3)) + + // Interval schedule + Expect(deploymentConfig.Schedules[0].Interval).To(Equal(ptr.To(3600))) + Expect(deploymentConfig.Schedules[0].Cron).To(BeNil()) + Expect(deploymentConfig.Schedules[0].RRule).To(BeNil()) + + // Cron schedule + Expect(deploymentConfig.Schedules[1].Cron).To(Equal(ptr.To("0 9 * * *"))) + Expect(deploymentConfig.Schedules[1].Interval).To(BeNil()) + Expect(deploymentConfig.Schedules[1].RRule).To(BeNil()) + + // RRule schedule + Expect(deploymentConfig.Schedules[2].RRule).To(Equal(ptr.To("RRULE:FREQ=WEEKLY;BYDAY=MO,WE,FR"))) + Expect(deploymentConfig.Schedules[2].Interval).To(BeNil()) + Expect(deploymentConfig.Schedules[2].Cron).To(BeNil()) }) It("should support deployment with concurrency limits", func() { diff --git a/api/v1/zz_generated.deepcopy.go b/api/v1/zz_generated.deepcopy.go index e1d700a..5cb5895 100644 --- a/api/v1/zz_generated.deepcopy.go +++ b/api/v1/zz_generated.deepcopy.go @@ -394,22 +394,6 @@ func (in *PrefectGlobalConcurrencyLimit) DeepCopy() *PrefectGlobalConcurrencyLim // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *PrefectSchedule) DeepCopyInto(out *PrefectSchedule) { - *out = *in - in.Schedule.DeepCopyInto(&out.Schedule) -} - -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PrefectSchedule. -func (in *PrefectSchedule) DeepCopy() *PrefectSchedule { - if in == nil { - return nil - } - out := new(PrefectSchedule) - in.DeepCopyInto(out) - return out -} - -// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. -func (in *PrefectScheduleConfig) DeepCopyInto(out *PrefectScheduleConfig) { *out = *in if in.Interval != nil { in, out := &in.Interval, &out.Interval @@ -421,6 +405,21 @@ func (in *PrefectScheduleConfig) DeepCopyInto(out *PrefectScheduleConfig) { *out = new(string) **out = **in } + if in.Cron != nil { + in, out := &in.Cron, &out.Cron + *out = new(string) + **out = **in + } + if in.DayOr != nil { + in, out := &in.DayOr, &out.DayOr + *out = new(bool) + **out = **in + } + if in.RRule != nil { + in, out := &in.RRule, &out.RRule + *out = new(string) + **out = **in + } if in.Timezone != nil { in, out := &in.Timezone, &out.Timezone *out = new(string) @@ -438,12 +437,12 @@ func (in *PrefectScheduleConfig) DeepCopyInto(out *PrefectScheduleConfig) { } } -// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PrefectScheduleConfig. -func (in *PrefectScheduleConfig) DeepCopy() *PrefectScheduleConfig { +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PrefectSchedule. +func (in *PrefectSchedule) DeepCopy() *PrefectSchedule { if in == nil { return nil } - out := new(PrefectScheduleConfig) + out := new(PrefectSchedule) in.DeepCopyInto(out) return out } diff --git a/deploy/charts/prefect-operator/crds/prefect.io_prefectdeployments.yaml b/deploy/charts/prefect-operator/crds/prefect.io_prefectdeployments.yaml index aaa27b1..73c12d2 100644 --- a/deploy/charts/prefect-operator/crds/prefect.io_prefectdeployments.yaml +++ b/deploy/charts/prefect-operator/crds/prefect.io_prefectdeployments.yaml @@ -129,33 +129,62 @@ spec: schedules: description: Schedules defines when the deployment should run items: - description: PrefectSchedule defines a schedule for the deployment + description: |- + PrefectSchedule defines a schedule for the deployment. + This structure exactly matches Prefect's prefect.yaml and API format. + Exactly one of Interval, Cron, or RRule must be specified. properties: - schedule: - description: Schedule defines the schedule configuration - properties: - active: - description: Active indicates if the schedule is active - type: boolean - anchorDate: - description: AnchorDate is the anchor date for the schedule - type: string - interval: - description: Interval is the schedule interval in seconds - type: integer - maxScheduledRuns: - description: MaxScheduledRuns limits the number of scheduled - runs - type: integer - timezone: - description: Timezone for the schedule - type: string - type: object + active: + description: |- + Active indicates if the schedule is active + Maps to: DeploymentScheduleCreate.active (boolean, default: true) + type: boolean + anchor_date: + description: |- + AnchorDate is the anchor date for interval schedules in RFC3339 format + Maps to: IntervalSchedule.anchor_date (string, format: date-time) + Example: "2024-01-01T00:00:00Z" + type: string + cron: + description: |- + Cron is a valid cron expression (required for cron schedules) + Maps to: CronSchedule.cron (string, required) + Examples: "0 9 * * *" (daily at 9am), "*/5 * * * *" (every 5 minutes) + type: string + day_or: + description: |- + DayOr controls how croniter handles day and day_of_week entries + Maps to: CronSchedule.day_or (boolean, default: true) + true = OR logic (standard cron), false = AND logic (like fcron) + type: boolean + interval: + description: |- + Interval is the schedule interval in seconds (required for interval schedules) + Maps to: IntervalSchedule.interval (number, required) + type: integer + max_scheduled_runs: + description: |- + MaxScheduledRuns limits the number of scheduled runs + Maps to: DeploymentScheduleCreate.max_scheduled_runs (integer > 0) + type: integer + rrule: + description: |- + RRule is a valid RFC 5545 RRULE string (required for rrule schedules) + Maps to: RRuleSchedule.rrule (string, required) + Examples: "RRULE:FREQ=WEEKLY;BYDAY=MO", "RRULE:FREQ=MONTHLY;BYDAY=1FR" + type: string slug: - description: Slug is a unique identifier for the schedule + description: |- + Slug is a unique identifier for the schedule + Maps to: DeploymentScheduleCreate.slug (string) + type: string + timezone: + description: |- + Timezone for the schedule (IANA timezone string) + Maps to: IntervalSchedule.timezone, CronSchedule.timezone, RRuleSchedule.timezone + Examples: "America/New_York", "UTC", "Europe/London" type: string required: - - schedule - slug type: object type: array diff --git a/deploy/samples/deployment_end-to-end.yaml b/deploy/samples/deployment_end-to-end.yaml index 780fadd..26ded6d 100644 --- a/deploy/samples/deployment_end-to-end.yaml +++ b/deploy/samples/deployment_end-to-end.yaml @@ -44,3 +44,21 @@ spec: type: string required: [] definitions: {} + schedules: + # Interval schedule - every minute (60 seconds) + - slug: "every-minute-interval" + interval: 60 + timezone: "UTC" + active: true + + # Cron schedule - every minute using cron syntax + - slug: "every-minute-cron" + cron: "* * * * *" + timezone: "UTC" + active: true + + # RRule schedule - every minute using RFC 5545 RRULE + - slug: "every-minute-rrule" + rrule: "RRULE:FREQ=MINUTELY;INTERVAL=1" + timezone: "UTC" + active: true diff --git a/internal/controller/prefectdeployment_controller_test.go b/internal/controller/prefectdeployment_controller_test.go index e0c2c82..54ed5cf 100644 --- a/internal/controller/prefectdeployment_controller_test.go +++ b/internal/controller/prefectdeployment_controller_test.go @@ -754,11 +754,9 @@ var _ = Describe("PrefectDeployment controller", func() { Entrypoint: "flows.py:my_flow", Schedules: []prefectiov1.PrefectSchedule{ { - Slug: "invalid-schedule", - Schedule: prefectiov1.PrefectScheduleConfig{ - Interval: ptr.To(3600), - AnchorDate: ptr.To("invalid-date-format"), - }, + Slug: "invalid-schedule", + Interval: ptr.To(3600), + AnchorDate: ptr.To("invalid-date-format"), }, }, }, @@ -785,7 +783,7 @@ var _ = Describe("PrefectDeployment controller", func() { }) // Should fail due to invalid anchor date format Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("failed to parse anchor date")) + Expect(err.Error()).To(ContainSubstring("failed to parse anchor_date for interval schedule")) Expect(result.RequeueAfter).To(Equal(time.Duration(0))) }) diff --git a/internal/prefect/client.go b/internal/prefect/client.go index d5026ea..6db7b22 100644 --- a/internal/prefect/client.go +++ b/internal/prefect/client.go @@ -32,6 +32,11 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" ) +// isSuccessStatusCode returns true if the HTTP status code indicates success (2xx range) +func isSuccessStatusCode(statusCode int) bool { + return 200 <= statusCode && statusCode < 300 +} + // PrefectClient defines the interface for interacting with the Prefect API type PrefectClient interface { // CreateOrUpdateDeployment creates a new deployment or updates an existing one @@ -161,7 +166,7 @@ type DeploymentSpec struct { WorkQueueName *string `json:"work_queue_name,omitempty"` WorkPoolName *string `json:"work_pool_name,omitempty"` Paused *bool `json:"paused,omitempty"` - Schedules []Schedule `json:"schedules,omitempty"` + Schedules []DeploymentSchedule `json:"schedules,omitempty"` ConcurrencyLimit *int `json:"concurrency_limit,omitempty"` GlobalConcurrencyLimits []string `json:"global_concurrency_limits,omitempty"` Entrypoint *string `json:"entrypoint,omitempty"` @@ -187,7 +192,7 @@ type Deployment struct { WorkQueueName *string `json:"work_queue_name"` WorkPoolName *string `json:"work_pool_name"` Status string `json:"status"` - Schedules []Schedule `json:"schedules"` + Schedules []DeploymentSchedule `json:"schedules"` ConcurrencyLimit *int `json:"concurrency_limit"` GlobalConcurrencyLimits []string `json:"global_concurrency_limits"` Entrypoint *string `json:"entrypoint"` @@ -197,14 +202,56 @@ type Deployment struct { EnforceParameterSchema bool `json:"enforce_parameter_schema"` } -// Schedule represents a deployment schedule +// Schedule represents a Prefect deployment schedule. +// Supports interval, cron, and rrule schedule types. +// Exactly one of Interval, Cron, or RRule should be specified. type Schedule struct { - ID string `json:"id,omitempty"` - Interval *int `json:"interval,omitempty"` - AnchorDate *time.Time `json:"anchor_date,omitempty"` - Timezone *string `json:"timezone,omitempty"` - Active *bool `json:"active,omitempty"` - MaxScheduledRuns *int `json:"max_scheduled_runs,omitempty"` + ID string `json:"id,omitempty"` + + // === INTERVAL SCHEDULE FIELDS === + // Maps to: IntervalSchedule schema in Prefect API + Interval *float64 `json:"interval,omitempty"` // seconds (required for interval) + AnchorDate *time.Time `json:"anchor_date,omitempty"` // anchor date for interval schedules + + // === CRON SCHEDULE FIELDS === + // Maps to: CronSchedule schema in Prefect API + Cron *string `json:"cron,omitempty"` // cron expression (required for cron) + DayOr *bool `json:"day_or,omitempty"` // day/day_of_week connection logic + + // === RRULE SCHEDULE FIELDS === + // Maps to: RRuleSchedule schema in Prefect API + RRule *string `json:"rrule,omitempty"` // RFC 5545 RRULE string (required for rrule) + + // === COMMON FIELDS === + // Shared across all schedule types + Timezone *string `json:"timezone,omitempty"` + Active *bool `json:"active,omitempty"` + MaxScheduledRuns *int `json:"max_scheduled_runs,omitempty"` +} + +// DeploymentSchedule represents a deployment schedule in the Prefect API. +// This matches the DeploymentScheduleCreate schema which wraps the schedule object. +type DeploymentSchedule struct { + // Slug is a unique identifier for the schedule + Slug *string `json:"slug,omitempty"` + + // Schedule contains the actual schedule configuration (interval, cron, or rrule) + Schedule Schedule `json:"schedule"` + + // Active indicates if the schedule is active + Active *bool `json:"active,omitempty"` + + // MaxScheduledRuns limits the number of scheduled runs + MaxScheduledRuns *int `json:"max_scheduled_runs,omitempty"` + + // MaxActiveRuns limits the number of active runs + MaxActiveRuns *int `json:"max_active_runs,omitempty"` + + // Catchup indicates if the worker should catch up on late runs + Catchup *bool `json:"catchup,omitempty"` + + // Parameters are schedule-specific parameters + Parameters map[string]interface{} `json:"parameters,omitempty"` } // FlowSpec represents the request payload for creating flows @@ -280,7 +327,7 @@ func (c *Client) CreateOrUpdateDeployment(ctx context.Context, deployment *Deplo return nil, fmt.Errorf("failed to read response body: %w", err) } - if resp.StatusCode != http.StatusOK { + if !isSuccessStatusCode(resp.StatusCode) { return nil, fmt.Errorf("API request failed with status %d: %s", resp.StatusCode, string(body)) } @@ -318,7 +365,7 @@ func (c *Client) GetDeployment(ctx context.Context, id string) (*Deployment, err return nil, fmt.Errorf("failed to read response body: %w", err) } - if resp.StatusCode != http.StatusOK { + if !isSuccessStatusCode(resp.StatusCode) { return nil, fmt.Errorf("API request failed with status %d: %s", resp.StatusCode, string(body)) } @@ -374,7 +421,7 @@ func (c *Client) UpdateDeployment(ctx context.Context, id string, deployment *De return nil, fmt.Errorf("failed to read response body: %w", err) } - if resp.StatusCode != http.StatusOK { + if !isSuccessStatusCode(resp.StatusCode) { return nil, fmt.Errorf("API request failed with status %d: %s", resp.StatusCode, string(body)) } @@ -412,7 +459,7 @@ func (c *Client) DeleteDeployment(ctx context.Context, id string) error { return fmt.Errorf("failed to read response body: %w", err) } - if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent { + if !isSuccessStatusCode(resp.StatusCode) { return fmt.Errorf("API request failed with status %d: %s", resp.StatusCode, string(body)) } @@ -466,7 +513,7 @@ func (c *Client) CreateOrGetFlow(ctx context.Context, flow *FlowSpec) (*Flow, er return nil, fmt.Errorf("failed to read response body: %w", err) } - if resp.StatusCode != http.StatusOK { + if !isSuccessStatusCode(resp.StatusCode) { return nil, fmt.Errorf("API request failed with status %d: %s", resp.StatusCode, string(body)) } @@ -513,7 +560,7 @@ func (c *Client) GetFlowByName(ctx context.Context, name string) (*Flow, error) return nil, nil // Flow doesn't exist } - if resp.StatusCode != http.StatusOK { + if !isSuccessStatusCode(resp.StatusCode) { return nil, fmt.Errorf("API request failed with status %d: %s", resp.StatusCode, string(body)) } @@ -555,7 +602,7 @@ func (c *Client) GetWorkPool(ctx context.Context, name string) (*WorkPool, error return nil, nil } - if resp.StatusCode != http.StatusOK { + if !isSuccessStatusCode(resp.StatusCode) { return nil, fmt.Errorf("API request failed with status %d: %s", resp.StatusCode, string(body)) } @@ -598,7 +645,7 @@ func (c *Client) CreateWorkPool(ctx context.Context, workPool *WorkPoolSpec) (*W return nil, fmt.Errorf("failed to read response body: %w", err) } - if resp.StatusCode != http.StatusCreated { + if !isSuccessStatusCode(resp.StatusCode) { return nil, fmt.Errorf("API request failed with status %d: %s", resp.StatusCode, string(body)) } @@ -646,7 +693,7 @@ func (c *Client) UpdateWorkPool(ctx context.Context, name string, workPool *Work return fmt.Errorf("failed to read response body: %w", err) } - if resp.StatusCode != http.StatusNoContent { + if !isSuccessStatusCode(resp.StatusCode) { return fmt.Errorf("API request failed with status %d: %s", resp.StatusCode, string(body)) } @@ -679,7 +726,7 @@ func (c *Client) DeleteWorkPool(ctx context.Context, name string) error { return fmt.Errorf("failed to read response body: %w", err) } - if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent { + if !isSuccessStatusCode(resp.StatusCode) { return fmt.Errorf("API request failed with status %d: %s", resp.StatusCode, string(body)) } @@ -733,7 +780,7 @@ func (c *Client) GetWorkerMetadata(ctx context.Context) (map[string]WorkerMetada return nil, nil } - if resp.StatusCode != http.StatusOK { + if !isSuccessStatusCode(resp.StatusCode) { return nil, fmt.Errorf("API request failed with status %d: %s", resp.StatusCode, string(body)) } diff --git a/internal/prefect/client_test.go b/internal/prefect/client_test.go index 21477ca..b10097b 100644 --- a/internal/prefect/client_test.go +++ b/internal/prefect/client_test.go @@ -662,4 +662,148 @@ var _ = Describe("Prefect HTTP Client", func() { Expect(deployment).To(BeNil()) }) }) + + Describe("HTTP Status Code Handling", func() { + var mockServer *httptest.Server + + AfterEach(func() { + if mockServer != nil { + mockServer.Close() + } + }) + + Context("Success status codes (2xx)", func() { + It("Should accept 200 OK", func() { + mockServer = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte(`{"id":"test-123","name":"test","flow_id":"flow-123","status":"READY"}`)) + })) + + client = NewClient(mockServer.URL, "test-api-key", logger) + deployment, err := client.GetDeployment(ctx, "test-123") + + Expect(err).NotTo(HaveOccurred()) + Expect(deployment).NotTo(BeNil()) + Expect(deployment.ID).To(Equal("test-123")) + }) + + It("Should accept 201 Created for deployment creation", func() { + mockServer = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusCreated) // 201 Created + _, _ = w.Write([]byte(`{"id":"new-deployment-123","name":"test","flow_id":"flow-123","status":"READY"}`)) + })) + + client = NewClient(mockServer.URL, "test-api-key", logger) + spec := &DeploymentSpec{ + Name: "test-deployment", + FlowID: "flow-123", + } + deployment, err := client.CreateOrUpdateDeployment(ctx, spec) + + Expect(err).NotTo(HaveOccurred()) + Expect(deployment).NotTo(BeNil()) + Expect(deployment.ID).To(Equal("new-deployment-123")) + }) + + It("Should accept 202 Accepted", func() { + mockServer = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusAccepted) // 202 Accepted + _, _ = w.Write([]byte(`{"id":"test-123","name":"test","flow_id":"flow-123","status":"READY"}`)) + })) + + client = NewClient(mockServer.URL, "test-api-key", logger) + deployment, err := client.GetDeployment(ctx, "test-123") + + Expect(err).NotTo(HaveOccurred()) + Expect(deployment).NotTo(BeNil()) + }) + + It("Should accept 204 No Content for deletions", func() { + mockServer = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusNoContent) // 204 No Content + })) + + client = NewClient(mockServer.URL, "test-api-key", logger) + err := client.DeleteDeployment(ctx, "test-123") + + Expect(err).NotTo(HaveOccurred()) + }) + }) + + Context("Error status codes (non-2xx)", func() { + It("Should reject 400 Bad Request", func() { + mockServer = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusBadRequest) // 400 Bad Request + _, _ = w.Write([]byte(`{"error":"Invalid request"}`)) + })) + + client = NewClient(mockServer.URL, "test-api-key", logger) + deployment, err := client.GetDeployment(ctx, "test-123") + + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("API request failed with status 400")) + Expect(deployment).To(BeNil()) + }) + + It("Should reject 401 Unauthorized", func() { + mockServer = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusUnauthorized) // 401 Unauthorized + _, _ = w.Write([]byte(`{"error":"Unauthorized"}`)) + })) + + client = NewClient(mockServer.URL, "test-api-key", logger) + deployment, err := client.GetDeployment(ctx, "test-123") + + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("API request failed with status 401")) + Expect(deployment).To(BeNil()) + }) + + It("Should reject 404 Not Found (when not handling explicitly)", func() { + mockServer = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusNotFound) // 404 Not Found + _, _ = w.Write([]byte(`{"error":"Not found"}`)) + })) + + client = NewClient(mockServer.URL, "test-api-key", logger) + deployment, err := client.GetDeployment(ctx, "test-123") + + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("API request failed with status 404")) + Expect(deployment).To(BeNil()) + }) + + It("Should reject 422 Unprocessable Entity", func() { + mockServer = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusUnprocessableEntity) // 422 Unprocessable Entity + _, _ = w.Write([]byte(`{"error":"Validation failed"}`)) + })) + + client = NewClient(mockServer.URL, "test-api-key", logger) + spec := &DeploymentSpec{ + Name: "invalid-deployment", + FlowID: "flow-123", + } + deployment, err := client.CreateOrUpdateDeployment(ctx, spec) + + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("API request failed with status 422")) + Expect(deployment).To(BeNil()) + }) + + It("Should reject 500 Internal Server Error", func() { + mockServer = httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) // 500 Internal Server Error + _, _ = w.Write([]byte(`{"error":"Server error"}`)) + })) + + client = NewClient(mockServer.URL, "test-api-key", logger) + deployment, err := client.GetDeployment(ctx, "test-123") + + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("API request failed with status 500")) + Expect(deployment).To(BeNil()) + }) + }) + }) }) diff --git a/internal/prefect/convert.go b/internal/prefect/convert.go index 0a01b16..45ad21e 100644 --- a/internal/prefect/convert.go +++ b/internal/prefect/convert.go @@ -97,27 +97,66 @@ func ConvertToDeploymentSpec(k8sDeployment *prefectiov1.PrefectDeployment, flowI spec.PullSteps = pullSteps } - // Schedules + // Schedules - support interval, cron, and rrule schedules if deployment.Schedules != nil { - schedules := make([]Schedule, len(deployment.Schedules)) + schedules := make([]DeploymentSchedule, len(deployment.Schedules)) for i, k8sSchedule := range deployment.Schedules { + // Validate that exactly one schedule type is specified + scheduleTypes := []bool{ + k8sSchedule.Interval != nil, + k8sSchedule.Cron != nil, + k8sSchedule.RRule != nil, + } + activeTypes := 0 + for _, isActive := range scheduleTypes { + if isActive { + activeTypes++ + } + } + if activeTypes != 1 { + return nil, fmt.Errorf("schedule %d (%s): exactly one of interval, cron, or rrule must be specified", i, k8sSchedule.Slug) + } + + // Create the inner schedule object with type-specific fields schedule := Schedule{ - Interval: k8sSchedule.Schedule.Interval, - Timezone: k8sSchedule.Schedule.Timezone, - Active: k8sSchedule.Schedule.Active, - MaxScheduledRuns: k8sSchedule.Schedule.MaxScheduledRuns, + // Note: Common fields like Active, MaxScheduledRuns go on the outer DeploymentSchedule + Timezone: k8sSchedule.Timezone, } - // Parse anchor date if provided - if k8sSchedule.Schedule.AnchorDate != nil { - anchorDate, err := time.Parse(time.RFC3339, *k8sSchedule.Schedule.AnchorDate) - if err != nil { - return nil, fmt.Errorf("failed to parse anchor date for schedule %d: %w", i, err) + // Handle interval schedule fields + if k8sSchedule.Interval != nil { + interval := float64(*k8sSchedule.Interval) + schedule.Interval = &interval + // Parse anchor date if provided for interval schedules + if k8sSchedule.AnchorDate != nil { + anchorDate, err := time.Parse(time.RFC3339, *k8sSchedule.AnchorDate) + if err != nil { + return nil, fmt.Errorf("failed to parse anchor_date for interval schedule %d (%s): %w", i, k8sSchedule.Slug, err) + } + schedule.AnchorDate = &anchorDate } - schedule.AnchorDate = &anchorDate } - schedules[i] = schedule + // Handle cron schedule fields + if k8sSchedule.Cron != nil { + schedule.Cron = k8sSchedule.Cron + schedule.DayOr = k8sSchedule.DayOr + } + + // Handle rrule schedule fields + if k8sSchedule.RRule != nil { + schedule.RRule = k8sSchedule.RRule + } + + // Create the deployment schedule wrapper + deploymentSchedule := DeploymentSchedule{ + Slug: &k8sSchedule.Slug, + Schedule: schedule, + Active: k8sSchedule.Active, + MaxScheduledRuns: k8sSchedule.MaxScheduledRuns, + } + + schedules[i] = deploymentSchedule } spec.Schedules = schedules } diff --git a/internal/prefect/convert_test.go b/internal/prefect/convert_test.go index 5eea830..6fd2280 100644 --- a/internal/prefect/convert_test.go +++ b/internal/prefect/convert_test.go @@ -306,115 +306,264 @@ var _ = Describe("ConvertToDeploymentSpec", func() { Expect(spec.Schedules).To(BeNil()) }) - It("Should handle valid schedule without anchor date", func() { - k8sDeployment.Spec.Deployment.Schedules = []prefectiov1.PrefectSchedule{ - { - Slug: "daily-schedule", - Schedule: prefectiov1.PrefectScheduleConfig{ + Context("Interval schedules", func() { + It("Should handle interval schedule without anchor date", func() { + k8sDeployment.Spec.Deployment.Schedules = []prefectiov1.PrefectSchedule{ + { + Slug: "daily-interval", Interval: ptr.To(86400), // 1 day in seconds Timezone: ptr.To("UTC"), Active: ptr.To(true), }, - }, - } + } + + spec, err := ConvertToDeploymentSpec(k8sDeployment, flowID) + + Expect(err).NotTo(HaveOccurred()) + Expect(spec.Schedules).To(HaveLen(1)) + Expect(spec.Schedules[0].Schedule.Interval).To(Equal(ptr.To(float64(86400)))) + Expect(spec.Schedules[0].Schedule.Timezone).To(Equal(ptr.To("UTC"))) + Expect(spec.Schedules[0].Active).To(Equal(ptr.To(true))) + Expect(spec.Schedules[0].Schedule.AnchorDate).To(BeNil()) + // Ensure other schedule types are nil + Expect(spec.Schedules[0].Schedule.Cron).To(BeNil()) + Expect(spec.Schedules[0].Schedule.RRule).To(BeNil()) + }) + + It("Should handle interval schedule with anchor date", func() { + k8sDeployment.Spec.Deployment.Schedules = []prefectiov1.PrefectSchedule{ + { + Slug: "daily-interval", + Interval: ptr.To(86400), + AnchorDate: ptr.To("2024-01-01T00:00:00Z"), + Timezone: ptr.To("UTC"), + MaxScheduledRuns: ptr.To(10), + }, + } - spec, err := ConvertToDeploymentSpec(k8sDeployment, flowID) + spec, err := ConvertToDeploymentSpec(k8sDeployment, flowID) - Expect(err).NotTo(HaveOccurred()) - Expect(spec.Schedules).To(HaveLen(1)) - Expect(spec.Schedules[0].Interval).To(Equal(ptr.To(86400))) - Expect(spec.Schedules[0].Timezone).To(Equal(ptr.To("UTC"))) - Expect(spec.Schedules[0].Active).To(Equal(ptr.To(true))) - Expect(spec.Schedules[0].AnchorDate).To(BeNil()) - }) + Expect(err).NotTo(HaveOccurred()) + Expect(spec.Schedules).To(HaveLen(1)) + Expect(spec.Schedules[0].Schedule.Interval).To(Equal(ptr.To(float64(86400)))) + Expect(spec.Schedules[0].Schedule.AnchorDate).NotTo(BeNil()) + Expect(spec.Schedules[0].Schedule.AnchorDate.Year()).To(Equal(2024)) + Expect(spec.Schedules[0].MaxScheduledRuns).To(Equal(ptr.To(10))) + }) - It("Should handle valid schedule with anchor date", func() { - k8sDeployment.Spec.Deployment.Schedules = []prefectiov1.PrefectSchedule{ - { - Slug: "daily-schedule", - Schedule: prefectiov1.PrefectScheduleConfig{ + It("Should return error for invalid anchor date format", func() { + k8sDeployment.Spec.Deployment.Schedules = []prefectiov1.PrefectSchedule{ + { + Slug: "daily-interval", Interval: ptr.To(86400), - AnchorDate: ptr.To("2024-01-01T00:00:00Z"), - Timezone: ptr.To("UTC"), + AnchorDate: ptr.To("invalid-date-format"), }, - }, - } + } - spec, err := ConvertToDeploymentSpec(k8sDeployment, flowID) + spec, err := ConvertToDeploymentSpec(k8sDeployment, flowID) - Expect(err).NotTo(HaveOccurred()) - Expect(spec.Schedules).To(HaveLen(1)) - Expect(spec.Schedules[0].AnchorDate).NotTo(BeNil()) - Expect(spec.Schedules[0].AnchorDate.Year()).To(Equal(2024)) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("failed to parse anchor_date for interval schedule 0")) + Expect(spec).To(BeNil()) + }) }) - It("Should handle multiple schedules", func() { - k8sDeployment.Spec.Deployment.Schedules = []prefectiov1.PrefectSchedule{ - { - Slug: "daily-schedule", - Schedule: prefectiov1.PrefectScheduleConfig{ - Interval: ptr.To(86400), + Context("Cron schedules", func() { + It("Should handle cron schedule", func() { + k8sDeployment.Spec.Deployment.Schedules = []prefectiov1.PrefectSchedule{ + { + Slug: "daily-9am", + Cron: ptr.To("0 9 * * *"), + DayOr: ptr.To(true), + Timezone: ptr.To("America/New_York"), + Active: ptr.To(true), }, - }, - { - Slug: "hourly-schedule", - Schedule: prefectiov1.PrefectScheduleConfig{ - Interval: ptr.To(3600), - MaxScheduledRuns: ptr.To(10), + } + + spec, err := ConvertToDeploymentSpec(k8sDeployment, flowID) + + Expect(err).NotTo(HaveOccurred()) + Expect(spec.Schedules).To(HaveLen(1)) + Expect(spec.Schedules[0].Schedule.Cron).To(Equal(ptr.To("0 9 * * *"))) + Expect(spec.Schedules[0].Schedule.DayOr).To(Equal(ptr.To(true))) + Expect(spec.Schedules[0].Schedule.Timezone).To(Equal(ptr.To("America/New_York"))) + Expect(spec.Schedules[0].Active).To(Equal(ptr.To(true))) + // Ensure other schedule types are nil + Expect(spec.Schedules[0].Schedule.Interval).To(BeNil()) + Expect(spec.Schedules[0].Schedule.AnchorDate).To(BeNil()) + Expect(spec.Schedules[0].Schedule.RRule).To(BeNil()) + }) + + It("Should handle cron schedule without day_or field", func() { + k8sDeployment.Spec.Deployment.Schedules = []prefectiov1.PrefectSchedule{ + { + Slug: "every-5-minutes", + Cron: ptr.To("*/5 * * * *"), + Timezone: ptr.To("UTC"), + Active: ptr.To(true), + MaxScheduledRuns: ptr.To(100), }, - }, - } + } - spec, err := ConvertToDeploymentSpec(k8sDeployment, flowID) + spec, err := ConvertToDeploymentSpec(k8sDeployment, flowID) - Expect(err).NotTo(HaveOccurred()) - Expect(spec.Schedules).To(HaveLen(2)) - Expect(spec.Schedules[0].Interval).To(Equal(ptr.To(86400))) - Expect(spec.Schedules[1].Interval).To(Equal(ptr.To(3600))) - Expect(spec.Schedules[1].MaxScheduledRuns).To(Equal(ptr.To(10))) + Expect(err).NotTo(HaveOccurred()) + Expect(spec.Schedules).To(HaveLen(1)) + Expect(spec.Schedules[0].Schedule.Cron).To(Equal(ptr.To("*/5 * * * *"))) + Expect(spec.Schedules[0].Schedule.DayOr).To(BeNil()) // Should be nil when not specified + Expect(spec.Schedules[0].MaxScheduledRuns).To(Equal(ptr.To(100))) + }) }) - It("Should return error for invalid anchor date format", func() { - k8sDeployment.Spec.Deployment.Schedules = []prefectiov1.PrefectSchedule{ - { - Slug: "daily-schedule", - Schedule: prefectiov1.PrefectScheduleConfig{ - Interval: ptr.To(86400), - AnchorDate: ptr.To("invalid-date-format"), + Context("RRule schedules", func() { + It("Should handle rrule schedule", func() { + k8sDeployment.Spec.Deployment.Schedules = []prefectiov1.PrefectSchedule{ + { + Slug: "weekly-monday", + RRule: ptr.To("RRULE:FREQ=WEEKLY;BYDAY=MO"), + Timezone: ptr.To("UTC"), + Active: ptr.To(true), }, - }, - } + } + + spec, err := ConvertToDeploymentSpec(k8sDeployment, flowID) + + Expect(err).NotTo(HaveOccurred()) + Expect(spec.Schedules).To(HaveLen(1)) + Expect(spec.Schedules[0].Schedule.RRule).To(Equal(ptr.To("RRULE:FREQ=WEEKLY;BYDAY=MO"))) + Expect(spec.Schedules[0].Schedule.Timezone).To(Equal(ptr.To("UTC"))) + Expect(spec.Schedules[0].Active).To(Equal(ptr.To(true))) + // Ensure other schedule types are nil + Expect(spec.Schedules[0].Schedule.Interval).To(BeNil()) + Expect(spec.Schedules[0].Schedule.AnchorDate).To(BeNil()) + Expect(spec.Schedules[0].Schedule.Cron).To(BeNil()) + Expect(spec.Schedules[0].Schedule.DayOr).To(BeNil()) + }) + + It("Should handle complex rrule schedule", func() { + k8sDeployment.Spec.Deployment.Schedules = []prefectiov1.PrefectSchedule{ + { + Slug: "monthly-first-friday", + RRule: ptr.To("RRULE:FREQ=MONTHLY;BYDAY=1FR"), + Timezone: ptr.To("America/Los_Angeles"), + Active: ptr.To(true), + MaxScheduledRuns: ptr.To(12), + }, + } - spec, err := ConvertToDeploymentSpec(k8sDeployment, flowID) + spec, err := ConvertToDeploymentSpec(k8sDeployment, flowID) - Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("failed to parse anchor date for schedule 0")) - Expect(spec).To(BeNil()) + Expect(err).NotTo(HaveOccurred()) + Expect(spec.Schedules).To(HaveLen(1)) + Expect(spec.Schedules[0].Schedule.RRule).To(Equal(ptr.To("RRULE:FREQ=MONTHLY;BYDAY=1FR"))) + Expect(spec.Schedules[0].Schedule.Timezone).To(Equal(ptr.To("America/Los_Angeles"))) + Expect(spec.Schedules[0].MaxScheduledRuns).To(Equal(ptr.To(12))) + }) }) - It("Should return error for invalid anchor date in second schedule", func() { - k8sDeployment.Spec.Deployment.Schedules = []prefectiov1.PrefectSchedule{ - { - Slug: "daily-schedule", - Schedule: prefectiov1.PrefectScheduleConfig{ - Interval: ptr.To(86400), + Context("Mixed schedule types", func() { + It("Should handle multiple schedules of different types", func() { + k8sDeployment.Spec.Deployment.Schedules = []prefectiov1.PrefectSchedule{ + { + Slug: "hourly-interval", + Interval: ptr.To(3600), AnchorDate: ptr.To("2024-01-01T00:00:00Z"), + Timezone: ptr.To("UTC"), + Active: ptr.To(true), }, - }, - { - Slug: "hourly-schedule", - Schedule: prefectiov1.PrefectScheduleConfig{ - Interval: ptr.To(3600), - AnchorDate: ptr.To("invalid-date"), + { + Slug: "daily-cron", + Cron: ptr.To("0 9 * * *"), + DayOr: ptr.To(false), + Timezone: ptr.To("America/New_York"), + Active: ptr.To(true), }, - }, - } + { + Slug: "weekly-rrule", + RRule: ptr.To("RRULE:FREQ=WEEKLY;BYDAY=MO,WE,FR"), + Timezone: ptr.To("Europe/London"), + Active: ptr.To(true), + }, + } + + spec, err := ConvertToDeploymentSpec(k8sDeployment, flowID) + + Expect(err).NotTo(HaveOccurred()) + Expect(spec.Schedules).To(HaveLen(3)) + + // Interval schedule + Expect(spec.Schedules[0].Schedule.Interval).To(Equal(ptr.To(float64(3600)))) + Expect(spec.Schedules[0].Schedule.AnchorDate).NotTo(BeNil()) + Expect(spec.Schedules[0].Schedule.Cron).To(BeNil()) + Expect(spec.Schedules[0].Schedule.RRule).To(BeNil()) + + // Cron schedule + Expect(spec.Schedules[1].Schedule.Cron).To(Equal(ptr.To("0 9 * * *"))) + Expect(spec.Schedules[1].Schedule.DayOr).To(Equal(ptr.To(false))) + Expect(spec.Schedules[1].Schedule.Interval).To(BeNil()) + Expect(spec.Schedules[1].Schedule.RRule).To(BeNil()) + + // RRule schedule + Expect(spec.Schedules[2].Schedule.RRule).To(Equal(ptr.To("RRULE:FREQ=WEEKLY;BYDAY=MO,WE,FR"))) + Expect(spec.Schedules[2].Schedule.Interval).To(BeNil()) + Expect(spec.Schedules[2].Schedule.Cron).To(BeNil()) + }) + }) - spec, err := ConvertToDeploymentSpec(k8sDeployment, flowID) + Context("Schedule validation", func() { + It("Should return error when no schedule type is specified", func() { + k8sDeployment.Spec.Deployment.Schedules = []prefectiov1.PrefectSchedule{ + { + Slug: "empty-schedule", + Timezone: ptr.To("UTC"), + Active: ptr.To(true), + // No interval, cron, or rrule specified + }, + } - Expect(err).To(HaveOccurred()) - Expect(err.Error()).To(ContainSubstring("failed to parse anchor date for schedule 1")) - Expect(spec).To(BeNil()) + spec, err := ConvertToDeploymentSpec(k8sDeployment, flowID) + + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("schedule 0 (empty-schedule): exactly one of interval, cron, or rrule must be specified")) + Expect(spec).To(BeNil()) + }) + + It("Should return error when multiple schedule types are specified", func() { + k8sDeployment.Spec.Deployment.Schedules = []prefectiov1.PrefectSchedule{ + { + Slug: "invalid-schedule", + Interval: ptr.To(3600), // interval specified + Cron: ptr.To("0 9 * * *"), // cron also specified - invalid! + Timezone: ptr.To("UTC"), + Active: ptr.To(true), + }, + } + + spec, err := ConvertToDeploymentSpec(k8sDeployment, flowID) + + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("schedule 0 (invalid-schedule): exactly one of interval, cron, or rrule must be specified")) + Expect(spec).To(BeNil()) + }) + + It("Should return error when all three schedule types are specified", func() { + k8sDeployment.Spec.Deployment.Schedules = []prefectiov1.PrefectSchedule{ + { + Slug: "invalid-schedule", + Interval: ptr.To(3600), // interval specified + Cron: ptr.To("0 9 * * *"), // cron specified + RRule: ptr.To("RRULE:FREQ=WEEKLY;BYDAY=MO"), // rrule specified - all three invalid! + Timezone: ptr.To("UTC"), + Active: ptr.To(true), + }, + } + + spec, err := ConvertToDeploymentSpec(k8sDeployment, flowID) + + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("schedule 0 (invalid-schedule): exactly one of interval, cron, or rrule must be specified")) + Expect(spec).To(BeNil()) + }) }) }) @@ -473,14 +622,12 @@ var _ = Describe("ConvertToDeploymentSpec", func() { }, Schedules: []prefectiov1.PrefectSchedule{ { - Slug: "test-schedule", - Schedule: prefectiov1.PrefectScheduleConfig{ - Interval: ptr.To(3600), - AnchorDate: ptr.To("2024-01-01T00:00:00Z"), - Timezone: ptr.To("UTC"), - Active: ptr.To(true), - MaxScheduledRuns: ptr.To(10), - }, + Slug: "test-schedule", + Interval: ptr.To(3600), + AnchorDate: ptr.To("2024-01-01T00:00:00Z"), + Timezone: ptr.To("UTC"), + Active: ptr.To(true), + MaxScheduledRuns: ptr.To(10), }, }, } @@ -507,7 +654,7 @@ var _ = Describe("ConvertToDeploymentSpec", func() { Expect(spec.GlobalConcurrencyLimits).To(Equal([]string{"global-limit"})) Expect(spec.WorkQueueName).To(Equal(ptr.To("test-queue"))) Expect(spec.Schedules).To(HaveLen(1)) - Expect(spec.Schedules[0].Interval).To(Equal(ptr.To(3600))) + Expect(spec.Schedules[0].Schedule.Interval).To(Equal(ptr.To(float64(3600)))) }) }) }) diff --git a/internal/prefect/mock.go b/internal/prefect/mock.go index 2224ee8..76bba4a 100644 --- a/internal/prefect/mock.go +++ b/internal/prefect/mock.go @@ -134,7 +134,7 @@ func (m *MockClient) CreateOrUpdateDeployment(ctx context.Context, deployment *D newDeployment.JobVariables = make(map[string]interface{}) } if newDeployment.Schedules == nil { - newDeployment.Schedules = []Schedule{} + newDeployment.Schedules = []DeploymentSchedule{} } if newDeployment.GlobalConcurrencyLimits == nil { newDeployment.GlobalConcurrencyLimits = []string{} @@ -310,7 +310,7 @@ func (m *MockClient) copyDeployment(d *Deployment) *Deployment { } if d.Schedules != nil { - copy.Schedules = make([]Schedule, len(d.Schedules)) + copy.Schedules = make([]DeploymentSchedule, len(d.Schedules)) for i, schedule := range d.Schedules { copy.Schedules[i] = schedule }