Skip to content

Commit 0b79dd1

Browse files
committed
✨ feat(job): introduce a robust background task processing system
- Added a developer-friendly job package, supporting delayed, scheduled, and automatic retry tasks with exponential backoff.
- Implemented features like dead letter queues, unique jobs, health checks, multiple queues, and flexible scheduling.
- Provided integration tests to ensure reliability across dispatch, retry, uniqueness, scheduling, and queue prioritization scenarios.
- Introduced a fluent API (`JobBuilder` and `ScheduledJobBuilder`) for expressive job creation and dispatching.
- Documented usage, configuration, and advanced scheduling options in `doc.go`.
1 parent 3ace171 commit 0b79dd1

File tree

15 files changed

+3662
-0
lines changed

15 files changed

+3662
-0
lines changed

contract/job.go

Lines changed: 320 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,320 @@
1+
package contract
2+
3+
import (
4+
"context"
5+
"time"
6+
)
7+
8+
// JobStatus is the string-typed lifecycle state of a job.
type JobStatus string

// Lifecycle states a job can be in.
const (
	// JobStatusPending marks a job that is waiting to be executed.
	JobStatusPending JobStatus = "pending"

	// JobStatusScheduled marks a job that is scheduled for future execution.
	JobStatusScheduled JobStatus = "scheduled"

	// JobStatusRunning marks a job that is currently being executed.
	JobStatusRunning JobStatus = "running"

	// JobStatusCompleted marks a job that completed successfully.
	JobStatusCompleted JobStatus = "completed"

	// JobStatusFailed marks a job that failed after all retries.
	JobStatusFailed JobStatus = "failed"

	// JobStatusCancelled marks a job that was cancelled.
	JobStatusCancelled JobStatus = "cancelled"
)
19+
20+
// Job represents a job in the system
21+
type Job interface {
22+
// ID returns the unique job ID
23+
ID() string
24+
25+
// Name returns the job name/type
26+
Name() string
27+
28+
// Queue returns the queue name this job belongs to
29+
Queue() string
30+
31+
// Payload returns the job payload data
32+
Payload() any
33+
34+
// Status returns the current job status
35+
Status() JobStatus
36+
37+
// Attempts returns the number of execution attempts
38+
Attempts() int
39+
40+
// MaxAttempts returns the maximum number of retry attempts
41+
MaxAttempts() int
42+
43+
// CreatedAt returns when the job was created
44+
CreatedAt() time.Time
45+
46+
// ScheduledAt returns when the job is scheduled to run
47+
ScheduledAt() time.Time
48+
49+
// StartedAt returns when the job started executing (zero if not started)
50+
StartedAt() time.Time
51+
52+
// CompletedAt returns when the job completed (zero if not completed)
53+
CompletedAt() time.Time
54+
55+
// LastError returns the last error message if any
56+
LastError() string
57+
58+
// Context returns the job context
59+
Context() context.Context
60+
61+
// Tags returns job metadata tags for filtering/grouping
62+
Tags() map[string]string
63+
}
64+
65+
// JobHandler handles job execution
66+
type JobHandler interface {
67+
// Handle executes the job and returns an error if processing fails
68+
Handle(ctx context.Context, job Job) error
69+
}
70+
71+
// JobHandlerFunc is a function adapter for JobHandler
72+
type JobHandlerFunc func(ctx context.Context, job Job) error
73+
74+
// Handle implements JobHandler interface
75+
func (f JobHandlerFunc) Handle(ctx context.Context, job Job) error {
76+
return f(ctx, job)
77+
}
78+
79+
// Schedule describes when a recurring job should run.
// It lets callers define schedules without having to know cron syntax.
type Schedule interface {
	// Next reports the next execution time after the given time.
	Next(after time.Time) time.Time

	// String reports a human-readable description of the schedule.
	String() string

	// Cron reports the cron expression when one applies, and the empty
	// string otherwise.
	Cron() string
}
91+
92+
// ScheduledJob represents a recurring job definition
93+
type ScheduledJob struct {
94+
// Name is the unique identifier for this scheduled job
95+
Name string
96+
97+
// Handler is the job handler function
98+
Handler JobHandler
99+
100+
// Schedule defines when the job should run
101+
Schedule Schedule
102+
103+
// Queue is the queue to use (default: "default")
104+
Queue string
105+
106+
// Payload is the static payload for the job (optional)
107+
Payload any
108+
109+
// Timeout is the maximum execution time (default: 30 minutes)
110+
Timeout time.Duration
111+
112+
// MaxAttempts is the maximum retry attempts (default: 3)
113+
MaxAttempts int
114+
115+
// Overlap determines if the job can run while a previous instance is still running
116+
// If false (default), the next scheduled run will be skipped if the job is still running
117+
Overlap bool
118+
119+
// Tags are metadata tags for filtering/grouping
120+
Tags map[string]string
121+
}
122+
123+
// JobManager provides the main interface for job management
124+
type JobManager interface {
125+
// Dispatch dispatches a job for immediate or delayed execution
126+
Dispatch(ctx context.Context, job *JobDefinition) (string, error)
127+
128+
// DispatchMany dispatches multiple jobs in a batch
129+
DispatchMany(ctx context.Context, jobs []*JobDefinition) ([]string, error)
130+
131+
// Cancel cancels a pending or scheduled job
132+
Cancel(ctx context.Context, jobID string) error
133+
134+
// Get retrieves a job by ID
135+
Get(ctx context.Context, jobID string) (Job, error)
136+
137+
// RegisterHandler registers a handler for a job name
138+
RegisterHandler(name string, handler JobHandler)
139+
140+
// RegisterSchedule registers a recurring scheduled job
141+
RegisterSchedule(job *ScheduledJob) error
142+
143+
// UnregisterSchedule removes a scheduled job
144+
UnregisterSchedule(name string) error
145+
146+
// Start starts the job workers and scheduler
147+
Start(ctx context.Context) error
148+
149+
// Stop gracefully stops all workers
150+
Stop(ctx context.Context) error
151+
152+
// Health checks the health of the job system
153+
Health(ctx context.Context) error
154+
155+
// Stats returns job system statistics
156+
Stats(ctx context.Context) (*JobStats, error)
157+
}
158+
159+
// JobDefinition describes a job that is about to be dispatched.
type JobDefinition struct {
	// Name is the name of the job handler to use.
	Name string

	// Payload is the job payload data.
	Payload any

	// Queue is the queue name (default: "default").
	Queue string

	// Delay is how long to wait before the job should be executed.
	Delay time.Duration

	// ScheduledAt is a specific time at which to execute the job; it
	// overrides Delay.
	ScheduledAt time.Time

	// MaxAttempts is the maximum number of retry attempts (default: 3).
	MaxAttempts int

	// Timeout is the maximum execution time (default: 30 minutes).
	Timeout time.Duration

	// UniqueKey is an optional deduplication key. Jobs sharing a UniqueKey
	// won't be duplicated within the UniqueFor duration.
	UniqueKey string

	// UniqueFor is how long uniqueness is enforced (default: 1 hour).
	UniqueFor time.Duration

	// Tags are metadata tags used for filtering/grouping.
	Tags map[string]string
}
192+
193+
// JobStats contains job system statistics
194+
type JobStats struct {
195+
// Queue statistics
196+
Queues map[string]*QueueStats
197+
198+
// Worker statistics
199+
Workers *WorkerStats
200+
201+
// Scheduler statistics
202+
Scheduler *SchedulerStats
203+
}
204+
205+
// QueueStats holds the statistics for one specific queue.
type QueueStats struct {
	// Name is the queue name.
	Name string

	// Pending counts jobs waiting to be processed.
	Pending int64

	// Scheduled counts jobs scheduled for future execution.
	Scheduled int64

	// Running counts jobs currently being processed.
	Running int64

	// Completed is the total number of completed jobs.
	Completed int64

	// Failed is the total number of failed jobs.
	Failed int64

	// ProcessedPerSecond is the average processing rate.
	ProcessedPerSecond float64
}
228+
229+
// WorkerStats holds counts describing the worker pool.
type WorkerStats struct {
	// Total is the total number of workers.
	Total int

	// Active is the number of workers currently processing jobs.
	Active int

	// Idle is the number of idle workers.
	Idle int
}
240+
241+
// SchedulerStats holds statistics about the scheduler.
type SchedulerStats struct {
	// RegisteredJobs is the number of registered scheduled jobs.
	RegisteredJobs int

	// NextRun is the time of the next scheduled job.
	NextRun time.Time

	// Running indicates whether the scheduler is running.
	Running bool
}
252+
253+
// JobOption configures a job dispatch
254+
type JobOption func(*JobDefinition)
255+
256+
// WithQueue sets the queue for the job
257+
func WithJobQueue(queue string) JobOption {
258+
return func(j *JobDefinition) {
259+
j.Queue = queue
260+
}
261+
}
262+
263+
// WithDelay sets the delay for the job
264+
func WithJobDelay(delay time.Duration) JobOption {
265+
return func(j *JobDefinition) {
266+
j.Delay = delay
267+
}
268+
}
269+
270+
// WithScheduledAt sets the scheduled execution time
271+
func WithJobScheduledAt(t time.Time) JobOption {
272+
return func(j *JobDefinition) {
273+
j.ScheduledAt = t
274+
}
275+
}
276+
277+
// WithMaxAttempts sets the maximum retry attempts
278+
func WithJobMaxAttempts(attempts int) JobOption {
279+
return func(j *JobDefinition) {
280+
j.MaxAttempts = attempts
281+
}
282+
}
283+
284+
// WithTimeout sets the job timeout
285+
func WithJobTimeout(timeout time.Duration) JobOption {
286+
return func(j *JobDefinition) {
287+
j.Timeout = timeout
288+
}
289+
}
290+
291+
// WithUniqueKey sets the unique key for deduplication
292+
func WithJobUniqueKey(key string, duration time.Duration) JobOption {
293+
return func(j *JobDefinition) {
294+
j.UniqueKey = key
295+
j.UniqueFor = duration
296+
}
297+
}
298+
299+
// WithTags sets the job tags
300+
func WithJobTags(tags map[string]string) JobOption {
301+
return func(j *JobDefinition) {
302+
j.Tags = tags
303+
}
304+
}
305+
306+
// NewJobDefinition creates a new job definition with options
307+
func NewJobDefinition(name string, payload any, opts ...JobOption) *JobDefinition {
308+
j := &JobDefinition{
309+
Name: name,
310+
Payload: payload,
311+
Queue: "default",
312+
MaxAttempts: 3,
313+
Timeout: 30 * time.Minute,
314+
UniqueFor: time.Hour,
315+
}
316+
for _, opt := range opts {
317+
opt(j)
318+
}
319+
return j
320+
}

core/health/checkers.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,45 @@ func (c *EventChecker) Check(ctx context.Context) contract.HealthCheckResult {
201201
}
202202
}
203203

204+
// JobChecker checks the health of the job system
205+
type JobChecker struct {
206+
name string
207+
job contract.JobManager
208+
}
209+
210+
// NewJobChecker creates a new job system health checker
211+
func NewJobChecker(job contract.JobManager) *JobChecker {
212+
return &JobChecker{
213+
name: "job",
214+
job: job,
215+
}
216+
}
217+
218+
// Name returns the checker name
219+
func (c *JobChecker) Name() string {
220+
return c.name
221+
}
222+
223+
// Check performs the job system health check
224+
func (c *JobChecker) Check(ctx context.Context) contract.HealthCheckResult {
225+
start := time.Now()
226+
227+
if err := c.job.Health(ctx); err != nil {
228+
return contract.HealthCheckResult{
229+
Status: contract.HealthStatusDown,
230+
Latency: time.Since(start),
231+
Message: "health check failed: " + err.Error(),
232+
Timestamp: time.Now(),
233+
}
234+
}
235+
236+
return contract.HealthCheckResult{
237+
Status: contract.HealthStatusUp,
238+
Latency: time.Since(start),
239+
Timestamp: time.Now(),
240+
}
241+
}
242+
204243
// CustomChecker is a helper for creating custom health checkers
205244
type CustomChecker struct {
206245
name string

0 commit comments

Comments
 (0)