Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,60 @@ that for many testing scenarios, we can't assume that the operator is running
in the same cluster it is operating on. Use port-forwarding where appropriate
to support these cases.

### Local Testing

For local testing and development, first start a local Kubernetes cluster:

```bash
minikube start # Start local Kubernetes cluster (if minikube is available)
```

Then run the operator locally:

```bash
make install run # Install CRDs and run operator locally (long-running process)
```

This command should be run in a background shell as it's a long-lived process that watches and reconciles Prefect resources. The operator will:
- Install/update CRDs to the cluster
- Start the controller manager locally
- Watch for changes to PrefectServer, PrefectWorkPool, and PrefectDeployment resources
- Use port-forwarding to connect to in-cluster Prefect servers when needed

#### Testing with Sample Resources

Apply sample resources from `deploy/samples/` to test different scenarios:

```bash
# Apply a complete end-to-end example with all schedule types
kubectl apply -f deploy/samples/deployment_end-to-end.yaml

# Or test individual components
kubectl apply -f deploy/samples/v1_prefectserver_ephemeral.yaml
kubectl apply -f deploy/samples/v1_prefectworkpool_kubernetes.yaml

# List available sample configurations
ls deploy/samples/
```

#### Accessing the Prefect API

When testing with in-cluster Prefect servers, you can port-forward to access the API directly:

```bash
kubectl port-forward svc/prefect-ephemeral 4200:4200
# Prefect API now available at http://localhost:4200/api
```

This allows you to inspect deployments, schedules, and other resources created by the operator:
```bash
# View all deployments
curl -X POST http://localhost:4200/api/deployments/filter -H "Content-Type: application/json" -d '{}'

# View deployment schedules
curl -X POST http://localhost:4200/api/deployments/filter -H "Content-Type: application/json" -d '{}' | jq '.[] | {name, schedules}'
```

### Code Generation Workflow

The operator uses controller-gen for:
Expand Down
55 changes: 44 additions & 11 deletions api/v1/prefectdeployment_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,36 +120,69 @@ type PrefectVersionInfo struct {
Version *string `json:"version,omitempty"`
}

// PrefectSchedule defines a schedule for the deployment
// PrefectSchedule defines a schedule for the deployment.
// This structure exactly matches Prefect's prefect.yaml and API format.
// Exactly one of Interval, Cron, or RRule must be specified.
type PrefectSchedule struct {
// Slug is a unique identifier for the schedule
// Maps to: DeploymentScheduleCreate.slug (string)
Slug string `json:"slug"`

// Schedule defines the schedule configuration
Schedule PrefectScheduleConfig `json:"schedule"`
}
// === INTERVAL SCHEDULE FIELDS ===
// Maps to: IntervalSchedule schema in Prefect API

// PrefectScheduleConfig defines schedule timing configuration
type PrefectScheduleConfig struct {
// Interval is the schedule interval in seconds
// Interval is the schedule interval in seconds (required for interval schedules)
// Maps to: IntervalSchedule.interval (number, required)
// +optional
Interval *int `json:"interval,omitempty"`

// AnchorDate is the anchor date for the schedule
// AnchorDate is the anchor date for interval schedules in RFC3339 format
// Maps to: IntervalSchedule.anchor_date (string, format: date-time)
// Example: "2024-01-01T00:00:00Z"
// +optional
AnchorDate *string `json:"anchor_date,omitempty"`

// === CRON SCHEDULE FIELDS ===
// Maps to: CronSchedule schema in Prefect API

// Cron is a valid cron expression (required for cron schedules)
// Maps to: CronSchedule.cron (string, required)
// Examples: "0 9 * * *" (daily at 9am), "*/5 * * * *" (every 5 minutes)
// +optional
Cron *string `json:"cron,omitempty"`

// DayOr controls how croniter handles day and day_of_week entries
// Maps to: CronSchedule.day_or (boolean, default: true)
// true = OR logic (standard cron), false = AND logic (like fcron)
// +optional
AnchorDate *string `json:"anchorDate,omitempty"`
DayOr *bool `json:"day_or,omitempty"`

// === RRULE SCHEDULE FIELDS ===
// Maps to: RRuleSchedule schema in Prefect API

// RRule is a valid RFC 5545 RRULE string (required for rrule schedules)
// Maps to: RRuleSchedule.rrule (string, required)
// Examples: "RRULE:FREQ=WEEKLY;BYDAY=MO", "RRULE:FREQ=MONTHLY;BYDAY=1FR"
// +optional
RRule *string `json:"rrule,omitempty"`

// === COMMON FIELDS (shared across all schedule types) ===

// Timezone for the schedule
// Timezone for the schedule (IANA timezone string)
// Maps to: IntervalSchedule.timezone, CronSchedule.timezone, RRuleSchedule.timezone
// Examples: "America/New_York", "UTC", "Europe/London"
// +optional
Timezone *string `json:"timezone,omitempty"`

// Active indicates if the schedule is active
// Maps to: DeploymentScheduleCreate.active (boolean, default: true)
// +optional
Active *bool `json:"active,omitempty"`

// MaxScheduledRuns limits the number of scheduled runs
// Maps to: DeploymentScheduleCreate.max_scheduled_runs (integer > 0)
// +optional
MaxScheduledRuns *int `json:"maxScheduledRuns,omitempty"`
MaxScheduledRuns *int `json:"max_scheduled_runs,omitempty"`
}

// PrefectGlobalConcurrencyLimit defines global concurrency limit configuration
Expand Down
178 changes: 160 additions & 18 deletions api/v1/prefectdeployment_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,37 +167,179 @@ var _ = Describe("PrefectDeployment type", func() {
Expect(deploymentConfig.Paused).To(Equal(ptr.To(false)))
})

It("should support deployment with schedules", func() {
It("should support deployment with legacy nested schedules", func() {
// This test verifies backward compatibility - we'll remove this once we migrate
deploymentConfig := PrefectDeploymentConfiguration{
Entrypoint: "flows.py:my_flow",
Schedules: []PrefectSchedule{
{
Slug: "daily-schedule",
Schedule: PrefectScheduleConfig{
Interval: ptr.To(86400), // 24 hours in seconds
AnchorDate: ptr.To("2024-01-01T00:00:00Z"),
Timezone: ptr.To("UTC"),
Active: ptr.To(true),
MaxScheduledRuns: ptr.To(10),
},
Slug: "daily-schedule",
Interval: ptr.To(86400), // 24 hours in seconds
AnchorDate: ptr.To("2024-01-01T00:00:00Z"),
Timezone: ptr.To("UTC"),
Active: ptr.To(true),
MaxScheduledRuns: ptr.To(10),
},
{
Slug: "hourly-schedule",
Schedule: PrefectScheduleConfig{
Interval: ptr.To(3600), // 1 hour in seconds
AnchorDate: ptr.To("2024-01-01T00:00:00Z"),
Timezone: ptr.To("UTC"),
Active: ptr.To(false),
},
Slug: "hourly-schedule",
Interval: ptr.To(3600), // 1 hour in seconds
AnchorDate: ptr.To("2024-01-01T00:00:00Z"),
Timezone: ptr.To("UTC"),
Active: ptr.To(false),
},
},
}

Expect(deploymentConfig.Schedules).To(HaveLen(2))
Expect(deploymentConfig.Schedules[0].Slug).To(Equal("daily-schedule"))
Expect(deploymentConfig.Schedules[0].Schedule.Interval).To(Equal(ptr.To(86400)))
Expect(deploymentConfig.Schedules[0].Interval).To(Equal(ptr.To(86400)))
Expect(deploymentConfig.Schedules[1].Slug).To(Equal("hourly-schedule"))
Expect(deploymentConfig.Schedules[1].Schedule.Active).To(Equal(ptr.To(false)))
Expect(deploymentConfig.Schedules[1].Active).To(Equal(ptr.To(false)))
})

It("should support deployment with flattened interval schedules", func() {
deploymentConfig := PrefectDeploymentConfiguration{
Entrypoint: "flows.py:my_flow",
Schedules: []PrefectSchedule{
{
Slug: "daily-interval",
Interval: ptr.To(86400), // 24 hours in seconds
AnchorDate: ptr.To("2024-01-01T00:00:00Z"),
Timezone: ptr.To("UTC"),
Active: ptr.To(true),
MaxScheduledRuns: ptr.To(10),
},
{
Slug: "hourly-interval",
Interval: ptr.To(3600), // 1 hour in seconds
AnchorDate: ptr.To("2024-01-01T00:00:00Z"),
Timezone: ptr.To("UTC"),
Active: ptr.To(false),
},
},
}

Expect(deploymentConfig.Schedules).To(HaveLen(2))
Expect(deploymentConfig.Schedules[0].Slug).To(Equal("daily-interval"))
Expect(deploymentConfig.Schedules[0].Interval).To(Equal(ptr.To(86400)))
Expect(deploymentConfig.Schedules[0].AnchorDate).To(Equal(ptr.To("2024-01-01T00:00:00Z")))
Expect(deploymentConfig.Schedules[0].Timezone).To(Equal(ptr.To("UTC")))
Expect(deploymentConfig.Schedules[0].Active).To(Equal(ptr.To(true)))
Expect(deploymentConfig.Schedules[0].MaxScheduledRuns).To(Equal(ptr.To(10)))

Expect(deploymentConfig.Schedules[1].Slug).To(Equal("hourly-interval"))
Expect(deploymentConfig.Schedules[1].Interval).To(Equal(ptr.To(3600)))
Expect(deploymentConfig.Schedules[1].Active).To(Equal(ptr.To(false)))
})

It("should support deployment with cron schedules", func() {
deploymentConfig := PrefectDeploymentConfiguration{
Entrypoint: "flows.py:my_flow",
Schedules: []PrefectSchedule{
{
Slug: "daily-9am",
Cron: ptr.To("0 9 * * *"),
DayOr: ptr.To(true),
Timezone: ptr.To("America/New_York"),
Active: ptr.To(true),
},
{
Slug: "every-5-minutes",
Cron: ptr.To("*/5 * * * *"),
Timezone: ptr.To("UTC"),
Active: ptr.To(true),
MaxScheduledRuns: ptr.To(100),
},
},
}

Expect(deploymentConfig.Schedules).To(HaveLen(2))
Expect(deploymentConfig.Schedules[0].Slug).To(Equal("daily-9am"))
Expect(deploymentConfig.Schedules[0].Cron).To(Equal(ptr.To("0 9 * * *")))
Expect(deploymentConfig.Schedules[0].DayOr).To(Equal(ptr.To(true)))
Expect(deploymentConfig.Schedules[0].Timezone).To(Equal(ptr.To("America/New_York")))
Expect(deploymentConfig.Schedules[0].Active).To(Equal(ptr.To(true)))

Expect(deploymentConfig.Schedules[1].Slug).To(Equal("every-5-minutes"))
Expect(deploymentConfig.Schedules[1].Cron).To(Equal(ptr.To("*/5 * * * *")))
Expect(deploymentConfig.Schedules[1].Timezone).To(Equal(ptr.To("UTC")))
Expect(deploymentConfig.Schedules[1].MaxScheduledRuns).To(Equal(ptr.To(100)))
})

It("should support deployment with rrule schedules", func() {
deploymentConfig := PrefectDeploymentConfiguration{
Entrypoint: "flows.py:my_flow",
Schedules: []PrefectSchedule{
{
Slug: "weekly-monday",
RRule: ptr.To("RRULE:FREQ=WEEKLY;BYDAY=MO"),
Timezone: ptr.To("UTC"),
Active: ptr.To(true),
},
{
Slug: "monthly-first-friday",
RRule: ptr.To("RRULE:FREQ=MONTHLY;BYDAY=1FR"),
Timezone: ptr.To("America/Los_Angeles"),
Active: ptr.To(true),
MaxScheduledRuns: ptr.To(12),
},
},
}

Expect(deploymentConfig.Schedules).To(HaveLen(2))
Expect(deploymentConfig.Schedules[0].Slug).To(Equal("weekly-monday"))
Expect(deploymentConfig.Schedules[0].RRule).To(Equal(ptr.To("RRULE:FREQ=WEEKLY;BYDAY=MO")))
Expect(deploymentConfig.Schedules[0].Timezone).To(Equal(ptr.To("UTC")))
Expect(deploymentConfig.Schedules[0].Active).To(Equal(ptr.To(true)))

Expect(deploymentConfig.Schedules[1].Slug).To(Equal("monthly-first-friday"))
Expect(deploymentConfig.Schedules[1].RRule).To(Equal(ptr.To("RRULE:FREQ=MONTHLY;BYDAY=1FR")))
Expect(deploymentConfig.Schedules[1].Timezone).To(Equal(ptr.To("America/Los_Angeles")))
Expect(deploymentConfig.Schedules[1].MaxScheduledRuns).To(Equal(ptr.To(12)))
})

It("should support deployment with mixed schedule types", func() {
deploymentConfig := PrefectDeploymentConfiguration{
Entrypoint: "flows.py:my_flow",
Schedules: []PrefectSchedule{
{
Slug: "hourly-interval",
Interval: ptr.To(3600),
AnchorDate: ptr.To("2024-01-01T00:00:00Z"),
Timezone: ptr.To("UTC"),
Active: ptr.To(true),
},
{
Slug: "daily-cron",
Cron: ptr.To("0 9 * * *"),
Timezone: ptr.To("America/New_York"),
Active: ptr.To(true),
},
{
Slug: "weekly-rrule",
RRule: ptr.To("RRULE:FREQ=WEEKLY;BYDAY=MO,WE,FR"),
Timezone: ptr.To("Europe/London"),
Active: ptr.To(true),
},
},
}

Expect(deploymentConfig.Schedules).To(HaveLen(3))

// Interval schedule
Expect(deploymentConfig.Schedules[0].Interval).To(Equal(ptr.To(3600)))
Expect(deploymentConfig.Schedules[0].Cron).To(BeNil())
Expect(deploymentConfig.Schedules[0].RRule).To(BeNil())

// Cron schedule
Expect(deploymentConfig.Schedules[1].Cron).To(Equal(ptr.To("0 9 * * *")))
Expect(deploymentConfig.Schedules[1].Interval).To(BeNil())
Expect(deploymentConfig.Schedules[1].RRule).To(BeNil())

// RRule schedule
Expect(deploymentConfig.Schedules[2].RRule).To(Equal(ptr.To("RRULE:FREQ=WEEKLY;BYDAY=MO,WE,FR")))
Expect(deploymentConfig.Schedules[2].Interval).To(BeNil())
Expect(deploymentConfig.Schedules[2].Cron).To(BeNil())
})

It("should support deployment with concurrency limits", func() {
Expand Down
Loading
Loading