-
Notifications
You must be signed in to change notification settings - Fork 0
Best Practices
Follow these guidelines to get the most out of Senna.
Senna serializes job arguments to JSON before storing them in Redis. Pass simple identifiers, not complex objects:
// Don't do this - pass the whole object
user, _ := db.GetUser(userID)
c.Enqueue(ctx, "send_welcome", map[string]any{
"user": user, // Complex struct, may not serialize correctly
})
// Do this - pass just the ID
c.Enqueue(ctx, "send_welcome", map[string]any{
"user_id": userID,
})In your handler, look up the data when you need it:
w.Register("send_welcome", func(ctx context.Context, job *senna.Job) error {
userID := int(job.Args["user_id"].(float64))
user, err := db.GetUser(userID)
if err != nil {
return err
}
// Use fresh data...
return nil
})Warning
Jobs may not run immediately. A job enqueued now might execute minutes or hours later if queues back up, workers restart, or the job is scheduled/retried. Any data you embed in job arguments is a snapshot in time that becomes stale the moment it's enqueued.
This approach has two benefits:
- Fresh data - You always work with the current state, not a stale snapshot from when the job was enqueued
- Smaller payloads - IDs are tiny compared to full objects, reducing Redis memory usage
Note
Job arguments must be JSON-serializable: string, int/float, bool, nil, slice, and map. Remember that all numbers become float64 after JSON round-trip, so cast accordingly in your handlers.
Idempotent means your job can safely run multiple times with the same result. This is critical because:
- Jobs retry automatically on failure
- A job might partially complete before an error
- Network issues can cause a job to run twice
- Worker crashes can leave jobs in an uncertain state
// Problematic: What if this runs twice?
w.Register("refund_order", func(ctx context.Context, job *senna.Job) error {
orderID := int(job.Args["order_id"].(float64))
order, _ := db.GetOrder(orderID)
payment.Refund(order.ChargeID) // Could refund twice!
email.Send(order.UserEmail, "refund") // Could email twice!
return nil
})
// Better: Check state before acting
w.Register("refund_order", func(ctx context.Context, job *senna.Job) error {
orderID := int(job.Args["order_id"].(float64))
order, _ := db.GetOrder(orderID)
if order.Status == "refunded" {
return nil // Already processed
}
if err := payment.Refund(order.ChargeID); err != nil {
return err
}
order.Status = "refunded"
if err := db.SaveOrder(order); err != nil {
return err
}
// Email is idempotent-ish (user might get duplicates, but that's acceptable)
email.Send(order.UserEmail, "refund")
return nil
})Important
Senna provides at-least-once delivery, not exactly-once. Even a job that completed successfully can theoretically run again if Redis fails between job completion and acknowledgment. Design accordingly.
Senna is designed for parallel execution. Design your jobs to run concurrently without issues:
- Avoid global mutable state
- Use database transactions for data consistency
- Don't assume job execution order
If you have resource constraints (like API rate limits or connection pools), use the built-in tools:
// Limit concurrent API calls across all workers
w.Register("sync_external", handler, worker.WithMaxConcurrency(5))
// Or use distributed rate limiting
limiter := ratelimit.Concurrent(w.Redis(), ratelimit.ConcurrentConfig{
Name: "api-connections",
Limit: 10,
})
w.Register("call_api", handler, worker.WithJobRateLimiter(limiter))Tip
Your architecture is simpler if jobs don't require special concurrency handling. Reserve WithMaxConcurrency and rate limiters for genuinely constrained resources like external APIs.
Clear terminology prevents confusion:
| Term | Meaning |
|---|---|
| Job | A unit of work with a type and arguments |
| Handler | A function that processes a specific job type |
| Worker | A process that fetches and executes jobs |
| Queue | A Redis list holding jobs waiting to be processed |
| Goroutine | A concurrent execution slot within a worker (controlled by Concurrency) |
Example usage:
- "The worker has 10 goroutines processing jobs from 3 queues"
- "Register a handler for the
send_emailjob type" - "Enqueue a job to the
criticalqueue"
Not all errors should trigger retries:
w.Register("process_upload", func(ctx context.Context, job *senna.Job) error {
uploadID := job.Args["upload_id"].(string)
upload, err := db.GetUpload(uploadID)
if errors.Is(err, db.ErrNotFound) {
// Record doesn't exist - retrying won't help
log.Printf("upload %s not found, skipping", uploadID)
return nil
}
if err != nil {
// Transient DB error - retry
return err
}
if err := processFile(upload.Path); err != nil {
var parseErr *ParseError
if errors.As(err, &parseErr) {
// Invalid file format - retrying won't help
return &senna.MaxRetriesExceededError{
Job: job,
Cause: err,
}
}
// Other error - retry
return err
}
return nil
})Tip
Log skipped jobs so you can investigate later. A job that silently returns nil on error is hard to debug.
Jobs are just functions—test them like any other code:
func TestSendWelcomeEmail(t *testing.T) {
// Create a test job
job := &senna.Job{
ID: "test-123",
Type: "send_welcome",
Args: map[string]any{
"user_id": float64(42), // Remember: numbers are float64
},
}
// Call the handler directly
err := sendWelcomeHandler(context.Background(), job)
assert.NoError(t, err)
// Assert side effects...
}- Error Handling - Retries, backoff strategies
- Rate Limiters - Control throughput
- Batches - Group related jobs