Periodic Jobs
Periodic jobs run on a schedule using cron expressions. Senna uses distributed coordination to ensure only one worker enqueues each scheduled job, even when running multiple worker instances.
First, enable periodic jobs in your worker configuration:
w, err := worker.New(&worker.Config{
	Redis:     redisConfig,
	Namespace: "myapp",
	Settings: senna.WorkerSettings{
		Concurrency:          10,
		PeriodicEnabled:      true,             // Enable periodic scheduling
		PeriodicPollInterval: 15 * time.Second, // Check schedules every 15s
	},
})

Use the Periodic method to register jobs that run on a schedule:
// Run every day at 9:00 AM
err := w.Periodic("0 9 * * *", "daily_report")
if err != nil {
	log.Fatal(err) // Invalid cron expression
}

// Run every 5 minutes
err = w.Periodic("*/5 * * * *", "check_status")

// Run every Sunday at midnight
err = w.Periodic("0 0 * * 0", "weekly_cleanup")

Senna uses standard 5-field cron expressions:
┌───────────── minute (0-59)
│ ┌───────────── hour (0-23)
│ │ ┌───────────── day of month (1-31)
│ │ │ ┌───────────── month (1-12)
│ │ │ │ ┌───────────── day of week (0-6, 0 = Sunday)
│ │ │ │ │
* * * * *
| Expression | Description |
|---|---|
| `0 9 * * *` | Every day at 9:00 AM |
| `*/5 * * * *` | Every 5 minutes |
| `0 */4 * * *` | Every 4 hours |
| `30 2 * * *` | Every day at 2:30 AM |
| `0 0 * * 0` | Every Sunday at midnight |
| `0 0 1 * *` | First day of every month at midnight |
| `0 12 * * 1-5` | Weekdays at noon |
| `0,30 * * * *` | Every hour at :00 and :30 |
| Character | Description | Example |
|---|---|---|
| `*` | Any value | `* * * * *` (every minute) |
| `,` | Value list | `0,30 * * * *` (at :00 and :30) |
| `-` | Range | `0 9-17 * * *` (on the hour, from 9 AM through 5 PM) |
| `/` | Step | `*/15 * * * *` (every 15 minutes) |
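To make the field syntax concrete, here is a minimal sketch of how a 5-field expression could be matched against a point in time. It is an illustration only, not Senna's parser: the names `fieldMatches`, `matches`, and `monday` are hypothetical, and it simplifies standard cron by requiring every field to match (classic cron treats day-of-month and day-of-week as alternatives when both are restricted).

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
	"time"
)

// fieldMatches reports whether v satisfies one cron field whose legal
// values are fmin..fmax. Supports "*", lists, ranges, and steps.
func fieldMatches(field string, v, fmin, fmax int) (bool, error) {
	for _, part := range strings.Split(field, ",") {
		lo, hi, step := fmin, fmax, 1
		base, stepStr, hasStep := strings.Cut(part, "/")
		if hasStep {
			s, err := strconv.Atoi(stepStr)
			if err != nil || s <= 0 {
				return false, fmt.Errorf("bad step in %q", part)
			}
			step = s
		}
		if base != "*" {
			from, to, isRange := strings.Cut(base, "-")
			a, err := strconv.Atoi(from)
			if err != nil {
				return false, fmt.Errorf("bad value in %q", part)
			}
			lo, hi = a, a
			if isRange {
				if hi, err = strconv.Atoi(to); err != nil {
					return false, fmt.Errorf("bad range in %q", part)
				}
			} else if hasStep {
				hi = fmax // "5/10" means from 5 to the field maximum, every 10
			}
		}
		if lo < fmin || hi > fmax || lo > hi {
			return false, fmt.Errorf("%q outside %d-%d", part, fmin, fmax)
		}
		if v >= lo && v <= hi && (v-lo)%step == 0 {
			return true, nil
		}
	}
	return false, nil
}

// matches reports whether t falls on the schedule described by expr.
// Simplification: all five fields must match.
func matches(expr string, t time.Time) (bool, error) {
	fields := strings.Fields(expr)
	if len(fields) != 5 {
		return false, fmt.Errorf("expected 5 fields, got %d", len(fields))
	}
	checks := []struct{ v, fmin, fmax int }{
		{t.Minute(), 0, 59},
		{t.Hour(), 0, 23},
		{t.Day(), 1, 31},
		{int(t.Month()), 1, 12},
		{int(t.Weekday()), 0, 6}, // time.Sunday == 0, as in cron
	}
	for i, c := range checks {
		ok, err := fieldMatches(fields[i], c.v, c.fmin, c.fmax)
		if err != nil || !ok {
			return false, err
		}
	}
	return true, nil
}

// monday returns Monday 2024-01-01 at the given hour and minute (UTC).
func monday(hour, minute int) time.Time {
	return time.Date(2024, time.January, 1, hour, minute, 0, 0, time.UTC)
}

func main() {
	for _, expr := range []string{"0 12 * * 1-5", "*/5 * * * *", "0 9 * * *"} {
		ok, err := matches(expr, monday(12, 0))
		fmt.Printf("%-14s matches Monday 12:00: %v (err: %v)\n", expr, ok, err)
	}
}
```

In practice `w.Periodic` validates the expression for you; a matcher like this is mainly useful for reasoning about when a schedule will fire.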
Pass arguments to each scheduled job:
w.Periodic("0 9 * * *", "daily_report",
	periodic.WithArgs(map[string]any{
		"type":   "daily",
		"format": "pdf",
	}),
)

Route periodic jobs to a specific queue:
w.Periodic("*/5 * * * *", "check_critical",
	periodic.WithQueue("critical"),
)

Set retry count for the enqueued jobs:
w.Periodic("0 2 * * *", "nightly_backup",
	periodic.WithRetry(3),
)

Combine multiple options in a single call:

w.Periodic("0 8 * * 1-5", "weekday_report",
	periodic.WithArgs(map[string]any{"day": "weekday"}),
	periodic.WithQueue("reports"),
	periodic.WithRetry(5),
)

When multiple workers are running, only one should enqueue each scheduled job. Senna uses Redis SetNX (set-if-not-exists) for atomic coordination:
- Scheduler calculates next execution time
- Worker tries to acquire lock: `SetNX(key, "1", TTL)`
- If successful, this worker enqueues the job
- If failed, another worker already claimed it
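The steps above can be sketched with an in-memory stand-in for Redis. This is not Senna's implementation: `memStore`, `setNX`, and `race` are hypothetical names, the key is an arbitrary placeholder, and TTL expiry is left out. The point is only that set-if-not-exists lets exactly one of several concurrent workers win.

```go
package main

import (
	"fmt"
	"sync"
)

// memStore is an in-memory stand-in for Redis, used only for illustration.
type memStore struct {
	mu   sync.Mutex
	keys map[string]string
}

// setNX stores value under key only if the key is absent, and reports
// whether it did so. TTL handling is omitted in this sketch.
func (s *memStore) setNX(key, value string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, exists := s.keys[key]; exists {
		return false
	}
	s.keys[key] = value
	return true
}

// race starts n workers that compete for the same lock key and returns
// how many of them acquired it (and would therefore enqueue the job).
func race(n int, key string) int {
	store := &memStore{keys: make(map[string]string)}
	var wg sync.WaitGroup
	var mu sync.Mutex
	enqueued := 0
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if store.setNX(key, "1") {
				mu.Lock()
				enqueued++ // this worker claimed the slot
				mu.Unlock()
			}
			// Otherwise another worker already claimed it; do nothing.
		}()
	}
	wg.Wait()
	return enqueued
}

func main() {
	fmt.Println("workers that enqueued:", race(5, "example-lock-key"))
}
```

With real Redis the atomicity comes from the server rather than a mutex, and the TTL ensures stale keys eventually disappear.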
The lock key pattern is:
namespace:periodic:{job_name}:{minute_timestamp}
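As a rough illustration of that pattern, the sketch below builds a key from a namespace, a job name, and a time. It assumes the minute timestamp is the Unix time in seconds truncated to the minute; the exact encoding Senna uses is not stated here. `lockKey` and `at` are hypothetical helpers.

```go
package main

import (
	"fmt"
	"time"
)

// lockKey builds namespace:periodic:{job_name}:{minute_timestamp}.
// Assumption: the minute timestamp is Unix seconds truncated to the minute.
func lockKey(namespace, jobName string, t time.Time) string {
	minute := t.Truncate(time.Minute).Unix()
	return fmt.Sprintf("%s:periodic:%s:%d", namespace, jobName, minute)
}

// at returns 2024-01-01 09:mm:ss UTC for the demo.
func at(minute, second int) time.Time {
	return time.Date(2024, time.January, 1, 9, minute, second, 0, time.UTC)
}

func main() {
	// Two instants inside the same minute share a key...
	fmt.Println(lockKey("myapp", "daily_report", at(0, 3)))
	fmt.Println(lockKey("myapp", "daily_report", at(0, 48)))
	// ...while the next minute gets a new one.
	fmt.Println(lockKey("myapp", "daily_report", at(1, 0)))
}
```

Because every worker derives the same key for the same minute, the key itself identifies "this job, this scheduled minute", which is what makes the set-if-not-exists check sufficient.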
- The scheduler checks every `PeriodicPollInterval` (default 15 seconds)
- Jobs are scheduled at minute precision (not second)
- All times are in the system's local timezone
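The interaction between the poll interval and minute precision can be seen in a small simulation. The sketch below is an assumption-laden model, not Senna's scheduler: it polls on a fixed interval with a fake clock, treats a job as due every minute, and allows each minute slot to be claimed once. `countEnqueues` is a hypothetical name.

```go
package main

import (
	"fmt"
	"time"
)

// countEnqueues simulates polling every pollSeconds for windowMinutes and
// returns the number of polls and the number of enqueues for a job that is
// due every minute, when each minute slot can be claimed only once.
func countEnqueues(windowMinutes, pollSeconds int) (polls, enqueues int) {
	if pollSeconds <= 0 {
		return 0, 0
	}
	start := time.Date(2024, time.January, 1, 9, 0, 0, 0, time.UTC)
	end := start.Add(time.Duration(windowMinutes) * time.Minute)
	step := time.Duration(pollSeconds) * time.Second

	claimed := make(map[int64]bool) // minute slot -> already enqueued
	for t := start; t.Before(end); t = t.Add(step) {
		polls++
		slot := t.Truncate(time.Minute).Unix()
		if !claimed[slot] {
			claimed[slot] = true
			enqueues++
		}
	}
	return polls, enqueues
}

func main() {
	polls, enqueues := countEnqueues(5, 15)
	fmt.Printf("polls: %d, enqueues: %d\n", polls, enqueues)
}
```

A shorter poll interval reduces the delay between a minute boundary and the enqueue, but it does not make jobs fire more often than once per minute.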
Note
Periodic jobs have minute-level precision. For sub-minute scheduling (every few seconds), consider using a goroutine with a ticker instead.
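For the sub-minute case, a ticker loop might look like the following. This is a generic Go sketch that uses only the standard library and is independent of Senna; `runEvery` and `demo` are hypothetical names, and the short intervals are chosen only to keep the demo quick.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// runEvery calls fn on each tick until ctx is cancelled, then returns the
// number of completed calls.
func runEvery(ctx context.Context, interval time.Duration, fn func()) int {
	ticker := time.NewTicker(interval)
	defer ticker.Stop() // release the ticker's resources on exit

	calls := 0
	for {
		select {
		case <-ctx.Done():
			return calls
		case <-ticker.C:
			fn()
			calls++
		}
	}
}

// demo runs a 50ms ticker for roughly 250ms and returns the call count.
func demo() int {
	ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
	defer cancel()
	return runEvery(ctx, 50*time.Millisecond, func() {
		fmt.Println("tick")
	})
}

func main() {
	fmt.Println("calls:", demo())
}
```

Note that a ticker like this runs in every process that starts it. If only one instance should act, you still need some form of coordination between workers.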
Periodic jobs use the same handlers as regular jobs:
// Register the periodic schedule
w.Periodic("0 9 * * *", "daily_report")

// Register the handler
w.Register("daily_report", func(ctx context.Context, job *senna.Job) error {
	// job.Args contains the arguments from periodic.WithArgs()
	fmt.Println("Generating daily report...")
	// Generate report...
	return nil
})

List the registered periodic jobs:

jobs := w.PeriodicJobs()
for _, job := range jobs {
	fmt.Printf("%s: %s\n", job.Name, job.Schedule)
}

A complete example:

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/mgomes/senna"
	"github.com/mgomes/senna/periodic"
	"github.com/mgomes/senna/worker"
)

func main() {
	w, err := worker.New(&worker.Config{
		Redis: senna.RedisConfig{
			Addr: "localhost:6379",
		},
		Namespace: "myapp",
		Settings: senna.WorkerSettings{
			Concurrency:     10,
			PeriodicEnabled: true,
		},
	})
	if err != nil {
		log.Fatal(err)
	}

	// Register periodic jobs
	w.Periodic("0 9 * * *", "daily_report",
		periodic.WithArgs(map[string]any{"type": "daily"}),
	)
	w.Periodic("*/5 * * * *", "health_check",
		periodic.WithQueue("monitoring"),
	)
	w.Periodic("0 0 * * 0", "weekly_cleanup",
		periodic.WithRetry(3),
	)

	// Register handlers
	w.Register("daily_report", func(ctx context.Context, job *senna.Job) error {
		// Comma-ok assertion: a missing or non-string argument returns an
		// error instead of panicking.
		reportType, ok := job.Args["type"].(string)
		if !ok {
			return fmt.Errorf("daily_report: missing or non-string %q argument", "type")
		}
		fmt.Printf("Generating %s report...\n", reportType)
		return nil
	})
	w.Register("health_check", func(ctx context.Context, job *senna.Job) error {
		fmt.Println("Running health check...")
		return nil
	})
	w.Register("weekly_cleanup", func(ctx context.Context, job *senna.Job) error {
		fmt.Println("Running weekly cleanup...")
		return nil
	})

	// Run the worker
	ctx := context.Background()
	if err := w.Run(ctx); err != nil {
		log.Fatal(err)
	}
}

Important
Periodic jobs may occasionally run twice (for example, during deployments or under clock drift). Always make your handlers idempotent: they should produce the same result whether run once or multiple times.
Ensure handlers are idempotent:
w.Register("daily_report", func(ctx context.Context, job *senna.Job) error {
	date := time.Now().Format("2006-01-02")

	// Check if report already exists for today
	exists, err := reportExists(date)
	if err != nil {
		return err // Surface the lookup failure instead of ignoring it
	}
	if exists {
		return nil // Already generated
	}
	return generateReport(date)
})

Warning
When running multiple workers, ensure system clocks are synchronized using NTP. Clock skew between workers can cause missed or duplicate job executions.
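To see why skew matters, consider how two workers would derive the current minute at the same real instant. The sketch below assumes each worker truncates its own clock reading to the minute; `minuteSlot` and `slotsAgree` are hypothetical names. Whether a disagreement leads to a missed or a duplicated run depends on the lock TTL and scheduler details that are not specified here.

```go
package main

import (
	"fmt"
	"time"
)

// minuteSlot returns the minute-truncated Unix timestamp a worker would
// compute, given the true time and that worker's clock offset.
func minuteSlot(trueTime time.Time, clockOffset time.Duration) int64 {
	return trueTime.Add(clockOffset).Truncate(time.Minute).Unix()
}

// slotsAgree reports whether two workers whose clocks are off by the given
// numbers of seconds compute the same minute slot at true time 09:00:05 UTC.
func slotsAgree(offsetASeconds, offsetBSeconds int) bool {
	trueTime := time.Date(2024, time.January, 1, 9, 0, 5, 0, time.UTC)
	a := minuteSlot(trueTime, time.Duration(offsetASeconds)*time.Second)
	b := minuteSlot(trueTime, time.Duration(offsetBSeconds)*time.Second)
	return a == b
}

func main() {
	fmt.Println("in sync:          ", slotsAgree(0, 0))
	fmt.Println("B is 10s behind:  ", slotsAgree(0, -10))
	fmt.Println("B is 10s ahead:   ", slotsAgree(0, 10))
}
```

Even a few seconds of skew can put two workers in different minutes near a boundary, which is why keeping clocks synchronized is recommended.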
Track when periodic jobs run:
w.Register("daily_report", func(ctx context.Context, job *senna.Job) error {
	metrics.RecordPeriodicJobRun("daily_report")
	// ...
	return nil
})

For very frequent tasks (every few seconds), consider using a goroutine or timer instead of periodic jobs. Periodic jobs have minute-level precision.
- Add Middleware to log periodic job execution
- Handle Errors in periodic jobs
- Use Rate Limiters to control job throughput