Skip to content

Commit cfe1bc3

Browse files
authored
Merge pull request #1934 from wheels-dev/peter/job-worker-1913
Add job worker daemon with CLI commands (#1913)
2 parents 3b51644 + bc94bdd commit cfe1bc3

File tree

23 files changed

+2074
-86
lines changed

23 files changed

+2074
-86
lines changed

.ai/wheels/jobs/overview.md

Lines changed: 47 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# Background Jobs
22

33
## Description
4-
Wheels provides a database-backed job queue system for running tasks asynchronously. Jobs are persisted to a `_wheels_jobs` table, processed in priority order, and automatically retried with exponential backoff on failure.
4+
Wheels provides a database-backed job queue system for running tasks asynchronously. Jobs are persisted to a `wheels_jobs` table, processed in priority order, and automatically retried with exponential backoff on failure.
55

66
## Key Points
77
- Jobs extend `wheels.Job` and implement a `perform()` method
@@ -17,7 +17,7 @@ Wheels provides a database-backed job queue system for running tasks asynchronou
1717
```bash
1818
wheels dbmigrate latest
1919
```
20-
This creates the `_wheels_jobs` table. The migration is at `app/migrator/migrations/20260221000001_create_wheels_jobs_table.cfc`.
20+
This creates the `wheels_jobs` table. The migration is at `app/migrator/migrations/20260221000001_createwheels_jobs_table.cfc`.
2121

2222
### 2. Create a Job
2323
```cfm
@@ -177,7 +177,7 @@ count = job.purgeCompleted(days=30, queue="reports");
177177

178178
## Database Schema
179179

180-
The `_wheels_jobs` table has these columns:
180+
The `wheels_jobs` table has these columns:
181181

182182
| Column | Type | Description |
183183
|--------|------|-------------|
@@ -198,7 +198,35 @@ The `_wheels_jobs` table has these columns:
198198

199199
## Running Workers
200200

201-
### Scheduled Task (Recommended)
201+
### Job Worker Daemon (Recommended)
202+
Use the CLI worker daemon for persistent job processing:
203+
204+
```bash
205+
# Start a worker for all queues
206+
wheels jobs work
207+
208+
# Process specific queues with custom interval
209+
wheels jobs work --queue=mailers,default --interval=3
210+
211+
# Run multiple workers for parallelism
212+
wheels jobs work --queue=mailers &
213+
wheels jobs work --queue=default &
214+
```
215+
216+
Workers claim jobs using optimistic locking (`UPDATE WHERE status='pending' AND id=:id`), so multiple workers can safely run concurrently without processing duplicates.
217+
218+
### CLI Management Commands
219+
```bash
220+
wheels jobs status # Per-queue breakdown
221+
wheels jobs status --format=json # JSON output
222+
wheels jobs retry # Reset failed jobs to pending
223+
wheels jobs retry --queue=mailers --limit=10
224+
wheels jobs purge # Purge completed jobs > 7 days
225+
wheels jobs purge --completed --failed --older-than=30
226+
wheels jobs monitor # Live dashboard
227+
```
228+
229+
### Scheduled Task (Alternative)
202230
Set up a CFML scheduled task to call `processQueue()` periodically:
203231

204232
```cfm
@@ -220,10 +248,21 @@ function processJobs() {
220248
}
221249
```
222250

223-
### CLI Command
224-
```bash
225-
# Process jobs via a controller endpoint
226-
curl http://localhost:8080/jobs/process?queue=default&limit=50
251+
## JobWorker Engine
252+
The `wheels.JobWorker` CFC provides the core worker logic:
253+
- `processNext(queues, timeout)` — Claim and process one job with optimistic locking
254+
- `checkTimeouts(timeout)` — Recover jobs stuck in 'processing'
255+
- `getStats(queue)` — Per-queue breakdown with totals
256+
- `getMonitorData(queue, minutes)` — Throughput, error rates, recent jobs
257+
- `retryFailed(queue, limit)` — Reset failed jobs to pending
258+
- `purge(status, days, queue)` — Delete old completed/failed jobs
259+
260+
## Configurable Backoff
261+
Jobs support configurable exponential backoff via `this.baseDelay` and `this.maxDelay`:
262+
```cfm
263+
this.baseDelay = 2; // Base delay in seconds (default: 2)
264+
this.maxDelay = 3600; // Maximum delay cap (default: 3600)
265+
// Formula: Min(baseDelay * 2^attempt, maxDelay)
227266
```
228267

229268
## Best Practices

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ All historical references to "CFWheels" in this changelog have been preserved fo
2121
## [Unreleased]
2222

2323
### Added
24+
- Job worker daemon with CLI commands (`wheels jobs work/status/retry/purge/monitor`) for persistent background job processing with optimistic locking, timeout recovery, and live monitoring
25+
- Configurable exponential backoff for jobs via `this.baseDelay` and `this.maxDelay` with formula `Min(baseDelay * 2^attempt, maxDelay)`
2426
- Rate limiting middleware with `wheels.middleware.RateLimiter` supporting fixed window, sliding window, and token bucket strategies with in-memory and database storage
2527
- Composable pagination view helpers: `paginationInfo()`, `previousPageLink()`, `nextPageLink()`, `firstPageLink()`, `lastPageLink()`, `pageNumberLinks()`, and `paginationNav()` for building custom pagination UIs
2628
- Route model binding with `binding=true` on resource routes or `set(routeModelBinding=true)` globally to auto-resolve model instances from route key parameters

CLAUDE.md

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -454,7 +454,20 @@ job.retryFailed(queue="mailers"); // retry all failed jobs
454454
job.purgeCompleted(days=7); // clean up old completed jobs
455455
```
456456

457-
Requires migration: `20260221000001_create_wheels_jobs_table.cfc`. Run with `wheels dbmigrate latest`.
457+
**Job Worker CLI** — persistent daemon for processing jobs:
458+
```bash
459+
wheels jobs work # process all queues
460+
wheels jobs work --queue=mailers --interval=3 # specific queue, 3s poll
461+
wheels jobs status # per-queue breakdown
462+
wheels jobs status --format=json # JSON output
463+
wheels jobs retry --queue=mailers # retry failed jobs
464+
wheels jobs purge --completed --failed --older-than=30
465+
wheels jobs monitor # live dashboard
466+
```
467+
468+
**Configurable backoff**: `this.baseDelay = 2` and `this.maxDelay = 3600` in job `config()`. Formula: `Min(baseDelay * 2^attempt, maxDelay)`.
469+
470+
Requires migration: `20260221000001_createwheels_jobs_table.cfc`. Run with `wheels dbmigrate latest`.
458471

459472
## Server-Sent Events (SSE) Quick Reference
460473

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/**
2+
* Live monitoring dashboard for the job queue.
3+
* Refreshes at a configurable interval showing throughput, errors, and recent jobs.
4+
*
5+
* {code:bash}
6+
* wheels jobs monitor
7+
* wheels jobs monitor --interval=5
8+
* wheels jobs monitor --queue=mailers
9+
* {code}
10+
*/
11+
component extends="../../base" {
12+
13+
/**
14+
* @interval Refresh interval in seconds (default: 3)
15+
* @queue Filter by queue name (default: all queues)
16+
*/
17+
public void function run(
18+
numeric interval = 3,
19+
string queue = ""
20+
) {
21+
local.appPath = getCWD();
22+
23+
if (!isWheelsApp(local.appPath)) {
24+
error("This command must be run from a Wheels application directory");
25+
return;
26+
}
27+
28+
print.line();
29+
print.boldMagentaLine("Wheels Job Monitor");
30+
print.line("Press Ctrl+C to stop");
31+
print.line();
32+
33+
while (true) {
34+
// Build URL parameters
35+
local.urlParams = "&command=jobsMonitor";
36+
if (Len(arguments.queue)) {
37+
local.urlParams &= "&queue=#arguments.queue#";
38+
}
39+
40+
try {
41+
local.result = $sendToCliCommand(urlstring = local.urlParams);
42+
43+
if (StructKeyExists(local.result, "success") && local.result.success) {
44+
// Clear previous output with separator
45+
print.line(RepeatString("=", 60));
46+
print.boldCyanLine("Job Queue Dashboard - #TimeFormat(Now(), "HH:mm:ss")#");
47+
print.line(RepeatString("-", 60));
48+
49+
// Queue statistics
50+
if (StructKeyExists(local.result, "stats") && StructKeyExists(local.result.stats, "totals")) {
51+
local.t = local.result.stats.totals;
52+
print.line();
53+
print.boldLine("Queue Summary:");
54+
print.yellowLine(" Pending: #local.t.pending#");
55+
print.cyanLine(" Processing: #local.t.processing#");
56+
print.greenLine(" Completed: #local.t.completed#");
57+
if (local.t.failed > 0) {
58+
print.redLine(" Failed: #local.t.failed#");
59+
} else {
60+
print.line(" Failed: #local.t.failed#");
61+
}
62+
print.line(" Total: #local.t.total#");
63+
}
64+
65+
// Throughput
66+
if (StructKeyExists(local.result, "monitor")) {
67+
local.m = local.result.monitor;
68+
69+
print.line();
70+
print.boldLine("Throughput (last 60 min):");
71+
print.greenLine(" Completed: #local.m.throughput.completed#");
72+
if (local.m.throughput.failed > 0) {
73+
print.redLine(" Failed: #local.m.throughput.failed#");
74+
} else {
75+
print.line(" Failed: #local.m.throughput.failed#");
76+
}
77+
if (local.m.errorRate > 0) {
78+
print.redLine(" Error rate: #local.m.errorRate#%");
79+
} else {
80+
print.greenLine(" Error rate: 0%");
81+
}
82+
83+
if (Len(local.m.oldestPending)) {
84+
print.line();
85+
print.line("Oldest pending job: #DateTimeFormat(local.m.oldestPending, 'yyyy-mm-dd HH:mm:ss')#");
86+
}
87+
88+
// Recent jobs
89+
if (ArrayLen(local.m.recentJobs)) {
90+
print.line();
91+
print.boldLine("Recent Jobs:");
92+
local.count = Min(ArrayLen(local.m.recentJobs), 5);
93+
for (local.i = 1; local.i <= local.count; local.i++) {
94+
local.job = local.m.recentJobs[local.i];
95+
local.statusColor = "line";
96+
if (local.job.status == "completed") local.statusColor = "greenLine";
97+
else if (local.job.status == "failed") local.statusColor = "redLine";
98+
else if (local.job.status == "processing") local.statusColor = "cyanLine";
99+
else if (local.job.status == "pending") local.statusColor = "yellowLine";
100+
101+
print["#local.statusColor#"](
102+
" [#local.job.status#] #local.job.jobClass# (#local.job.queue#) - #DateTimeFormat(local.job.updatedAt, 'HH:mm:ss')#"
103+
);
104+
}
105+
}
106+
}
107+
108+
// Timeout recoveries
109+
if (StructKeyExists(local.result, "timeoutsRecovered") && local.result.timeoutsRecovered > 0) {
110+
print.line();
111+
print.yellowLine("Recovered #local.result.timeoutsRecovered# timed-out job(s)");
112+
}
113+
}
114+
} catch (any e) {
115+
print.redLine("[#TimeFormat(Now(), "HH:mm:ss")#] Monitor error: #e.message#");
116+
}
117+
118+
sleep(arguments.interval * 1000);
119+
}
120+
}
121+
122+
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/**
2+
* Purge old completed or failed jobs from the queue.
3+
*
4+
* {code:bash}
5+
* wheels jobs purge
6+
* wheels jobs purge --completed --older-than=30
7+
* wheels jobs purge --failed --queue=mailers
8+
* wheels jobs purge --completed --failed --force
9+
* {code}
10+
*/
11+
component extends="../../base" {
12+
13+
/**
14+
* @completed Purge completed jobs (default: true)
15+
* @failed Purge failed jobs (default: false)
16+
* @olderThan Delete jobs older than this many days (default: 7)
17+
* @queue Filter by queue name (default: all queues)
18+
* @force Skip confirmation prompt
19+
*/
20+
public void function run(
21+
boolean completed = true,
22+
boolean failed = false,
23+
numeric olderThan = 7,
24+
string queue = "",
25+
boolean force = false
26+
) {
27+
local.appPath = getCWD();
28+
29+
if (!isWheelsApp(local.appPath)) {
30+
error("This command must be run from a Wheels application directory");
31+
return;
32+
}
33+
34+
print.line();
35+
print.boldBlueLine("Purge Jobs");
36+
print.line();
37+
38+
local.totalPurged = 0;
39+
40+
// Purge completed jobs
41+
if (arguments.completed) {
42+
local.urlParams = "&command=jobsPurge&status=completed&days=#arguments.olderThan#";
43+
if (Len(arguments.queue)) {
44+
local.urlParams &= "&queue=#arguments.queue#";
45+
}
46+
47+
local.result = $sendToCliCommand(urlstring = local.urlParams);
48+
if (StructKeyExists(local.result, "success") && local.result.success && StructKeyExists(local.result, "purged")) {
49+
local.totalPurged += local.result.purged;
50+
if (local.result.purged > 0) {
51+
print.greenLine("Purged #local.result.purged# completed job(s) older than #arguments.olderThan# day(s).");
52+
} else {
53+
print.line("No completed jobs to purge.");
54+
}
55+
}
56+
}
57+
58+
// Purge failed jobs
59+
if (arguments.failed) {
60+
local.urlParams = "&command=jobsPurge&status=failed&days=#arguments.olderThan#";
61+
if (Len(arguments.queue)) {
62+
local.urlParams &= "&queue=#arguments.queue#";
63+
}
64+
65+
local.result = $sendToCliCommand(urlstring = local.urlParams);
66+
if (StructKeyExists(local.result, "success") && local.result.success && StructKeyExists(local.result, "purged")) {
67+
local.totalPurged += local.result.purged;
68+
if (local.result.purged > 0) {
69+
print.greenLine("Purged #local.result.purged# failed job(s) older than #arguments.olderThan# day(s).");
70+
} else {
71+
print.line("No failed jobs to purge.");
72+
}
73+
}
74+
}
75+
76+
if (local.totalPurged > 0) {
77+
print.line();
78+
print.boldGreenLine("Total purged: #local.totalPurged# job(s)");
79+
}
80+
81+
print.line();
82+
}
83+
84+
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
/**
2+
* Retry failed jobs by resetting them to pending status.
3+
*
4+
* {code:bash}
5+
* wheels jobs retry
6+
* wheels jobs retry --queue=mailers
7+
* wheels jobs retry --limit=10
8+
* {code}
9+
*/
10+
component extends="../../base" {
11+
12+
/**
13+
* @queue Filter by queue name (default: all queues)
14+
* @limit Maximum number of jobs to retry (default: 0 = all)
15+
*/
16+
public void function run(
17+
string queue = "",
18+
numeric limit = 0
19+
) {
20+
local.appPath = getCWD();
21+
22+
if (!isWheelsApp(local.appPath)) {
23+
error("This command must be run from a Wheels application directory");
24+
return;
25+
}
26+
27+
print.line();
28+
print.boldBlueLine("Retry Failed Jobs");
29+
print.line();
30+
31+
// Build URL parameters
32+
local.urlParams = "&command=jobsRetry";
33+
if (Len(arguments.queue)) {
34+
local.urlParams &= "&queue=#arguments.queue#";
35+
}
36+
if (arguments.limit > 0) {
37+
local.urlParams &= "&limit=#arguments.limit#";
38+
}
39+
40+
local.result = $sendToCliCommand(urlstring = local.urlParams);
41+
if (!local.result.success) {
42+
return;
43+
}
44+
45+
if (StructKeyExists(local.result, "retried")) {
46+
if (local.result.retried > 0) {
47+
print.greenLine("Retried #local.result.retried# failed job(s).");
48+
print.line("Jobs have been reset to 'pending' and will be processed on the next cycle.");
49+
} else {
50+
print.line("No failed jobs to retry.");
51+
}
52+
}
53+
54+
print.line();
55+
}
56+
57+
}

0 commit comments

Comments
 (0)