Skip to content

Commit ef81a3a

Browse files
committed
style: run oxfmt once
1 parent 72851b2 commit ef81a3a

File tree

3 files changed

+95
-72
lines changed

3 files changed

+95
-72
lines changed

README.md

Lines changed: 35 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -123,12 +123,10 @@ Organize related jobs together for monitoring and filtering:
123123

124124
```typescript
125125
// Group newsletter jobs
126-
await SendEmailJob.dispatch({ to: '[email protected]' })
127-
.group('newsletter-jan-2025')
126+
await SendEmailJob.dispatch({ to: '[email protected]' }).group('newsletter-jan-2025')
128127

129128
// Group with bulk dispatch
130-
await SendEmailJob.dispatchMany(recipients)
131-
.group('newsletter-jan-2025')
129+
await SendEmailJob.dispatchMany(recipients).group('newsletter-jan-2025')
132130
```
133131

134132
The `groupId` is stored with job data and accessible via `job.data.groupId`.
@@ -153,7 +151,7 @@ export default class ImportantJob extends Job<Payload> {
153151
<summary><strong>Retention options</strong></summary>
154152

155153
| Value | Behavior |
156-
|-----------------------------|--------------------|
154+
| --------------------------- | ------------------ |
157155
| `true` (default) | Remove immediately |
158156
| `false` | Keep forever |
159157
| `{ count: n }` | Keep last n jobs |
@@ -164,9 +162,9 @@ Query job history:
164162

165163
```typescript
166164
const job = await adapter.getJob('job-id', 'queue-name')
167-
console.log(job.status) // 'completed' | 'failed'
168-
console.log(job.finishedAt) // timestamp
169-
console.log(job.error) // error message (if failed)
165+
console.log(job.status) // 'completed' | 'failed'
166+
console.log(job.finishedAt) // timestamp
167+
console.log(job.error) // error message (if failed)
170168
```
171169

172170
</details>
@@ -256,24 +254,24 @@ const adapter = sync() // Jobs execute immediately
256254
```typescript
257255
export default class MyJob extends Job<Payload> {
258256
static options: JobOptions = {
259-
queue: 'email', // Queue name (default: 'default')
260-
priority: 1, // Lower = higher priority (default: 5)
261-
maxRetries: 3, // Retry attempts before failing
262-
timeout: '30s', // Max execution time
263-
failOnTimeout: true, // Fail permanently on timeout (default: retry)
264-
removeOnComplete: { count: 100 }, // Keep last 100 completed
265-
removeOnFail: { age: '7d' }, // Keep failed for 7 days
257+
queue: 'email', // Queue name (default: 'default')
258+
priority: 1, // Lower = higher priority (default: 5)
259+
maxRetries: 3, // Retry attempts before failing
260+
timeout: '30s', // Max execution time
261+
failOnTimeout: true, // Fail permanently on timeout (default: retry)
262+
removeOnComplete: { count: 100 }, // Keep last 100 completed
263+
removeOnFail: { age: '7d' }, // Keep failed for 7 days
266264
}
267265
}
268266
```
269267

270268
## Delayed Jobs
271269

272270
```typescript
273-
await SendEmailJob.dispatch(payload).in('30s') // 30 seconds
274-
await SendEmailJob.dispatch(payload).in('5m') // 5 minutes
275-
await SendEmailJob.dispatch(payload).in('2h') // 2 hours
276-
await SendEmailJob.dispatch(payload).in('1d') // 1 day
271+
await SendEmailJob.dispatch(payload).in('30s') // 30 seconds
272+
await SendEmailJob.dispatch(payload).in('5m') // 5 minutes
273+
await SendEmailJob.dispatch(payload).in('2h') // 2 hours
274+
await SendEmailJob.dispatch(payload).in('1d') // 1 day
277275
```
278276

279277
## Retry & Backoff
@@ -285,12 +283,13 @@ export default class ReliableJob extends Job<Payload> {
285283
static options: JobOptions = {
286284
maxRetries: 5,
287285
retry: {
288-
backoff: () => exponentialBackoff({
289-
baseDelay: '1s',
290-
maxDelay: '1m',
291-
multiplier: 2,
292-
jitter: true,
293-
}),
286+
backoff: () =>
287+
exponentialBackoff({
288+
baseDelay: '1s',
289+
maxDelay: '1m',
290+
multiplier: 2,
291+
jitter: true,
292+
}),
294293
},
295294
}
296295
}
@@ -356,13 +355,12 @@ Run jobs on a recurring basis:
356355

357356
```typescript
358357
// Every 10 seconds
359-
await MetricsJob.schedule({ endpoint: '/health' })
360-
.every('10s')
358+
await MetricsJob.schedule({ endpoint: '/health' }).every('10s')
361359

362360
// Cron schedule
363361
await CleanupJob.schedule({ days: 30 })
364362
.id('daily-cleanup')
365-
.cron('0 0 * * *') // Midnight daily
363+
.cron('0 0 * * *') // Midnight daily
366364
.timezone('Europe/Paris')
367365
```
368366

@@ -376,7 +374,7 @@ import { Schedule } from '@boringnode/queue'
376374
const schedule = await Schedule.find('daily-cleanup')
377375
await schedule.pause()
378376
await schedule.resume()
379-
await schedule.trigger() // Run now
377+
await schedule.trigger() // Run now
380378
await schedule.delete()
381379

382380
// List schedules
@@ -387,7 +385,7 @@ const active = await Schedule.list({ status: 'active' })
387385
**Schedule options:**
388386

389387
| Method | Description |
390-
|---------------------|-----------------------------------|
388+
| ------------------- | --------------------------------- |
391389
| `.id(string)` | Unique identifier |
392390
| `.every(duration)` | Fixed interval ('5s', '1m', '1h') |
393391
| `.cron(expression)` | Cron schedule |
@@ -437,13 +435,13 @@ export default class SendEmailJob extends Job<SendEmailPayload> {
437435
```typescript
438436
const config = {
439437
worker: {
440-
concurrency: 5, // Parallel jobs
441-
idleDelay: '2s', // Poll interval when idle
442-
timeout: '1m', // Default job timeout
438+
concurrency: 5, // Parallel jobs
439+
idleDelay: '2s', // Poll interval when idle
440+
timeout: '1m', // Default job timeout
443441
stalledThreshold: '30s', // When to consider job stalled
444-
stalledInterval: '30s', // How often to check
445-
maxStalledCount: 1, // Max recoveries before failing
446-
gracefulShutdown: true, // Wait for jobs on SIGTERM
442+
stalledInterval: '30s', // How often to check
443+
maxStalledCount: 1, // Max recoveries before failing
444+
gracefulShutdown: true, // Wait for jobs on SIGTERM
447445
},
448446
}
449447
```
@@ -464,7 +462,7 @@ await QueueManager.init({
464462
Performance comparison with BullMQ (5ms simulated work per job):
465463

466464
| Jobs | Concurrency | @boringnode/queue | BullMQ | Diff |
467-
|------|-------------|-------------------|--------|-------------|
465+
| ---- | ----------- | ----------------- | ------ | ----------- |
468466
| 1000 | 5 | 1096ms | 1116ms | 1.8% faster |
469467
| 1000 | 10 | 565ms | 579ms | 2.4% faster |
470468
| 100K | 10 | 56.2s | 57.5s | 2.1% faster |

compose.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,13 @@ services:
22
redis:
33
image: redis:alpine
44
ports:
5-
- "6379:6379"
5+
- '6379:6379'
66

77
postgres:
88
image: postgres:alpine
99
ports:
10-
- "5432:5432"
10+
- '5432:5432'
1111
environment:
1212
POSTGRES_USER: postgres
1313
POSTGRES_PASSWORD: postgres
14-
POSTGRES_DB: queue_test
14+
POSTGRES_DB: queue_test

src/drivers/knex_adapter.ts

Lines changed: 57 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,10 @@ export class KnexAdapter implements Adapter {
123123
table.integer('run_count').unsigned().notNullable().defaultTo(0)
124124
table.timestamp('next_run_at').nullable()
125125
table.timestamp('last_run_at').nullable()
126-
table.timestamp('created_at').notNullable().defaultTo(this.#connection.fn.now())
126+
table
127+
.timestamp('created_at')
128+
.notNullable()
129+
.defaultTo(this.#connection.fn.now())
127130
// Indexes
128131
table.index(['status', 'next_run_at'])
129132
})
@@ -178,7 +181,9 @@ export class KnexAdapter implements Adapter {
178181

179182
// Update job to active status
180183
// For SQLite (no SKIP LOCKED), add status='pending' guard to prevent double-claim
181-
const updateQuery = trx(this.#jobsTable).where('id', job.id).where('queue', queue)
184+
const updateQuery = trx(this.#jobsTable)
185+
.where('id', job.id)
186+
.where('queue', queue)
182187

183188
if (!this.#supportsSkipLocked()) {
184189
updateQuery.where('status', 'pending')
@@ -237,11 +242,14 @@ export class KnexAdapter implements Adapter {
237242
const priority = jobData.priority ?? DEFAULT_PRIORITY
238243
const score = calculateScore(priority, now)
239244

240-
await trx(this.#jobsTable).where('id', job.id).where('queue', queue).update({
241-
status: 'pending',
242-
score,
243-
execute_at: null,
244-
})
245+
await trx(this.#jobsTable)
246+
.where('id', job.id)
247+
.where('queue', queue)
248+
.update({
249+
status: 'pending',
250+
score,
251+
execute_at: null,
252+
})
245253
}
246254
})
247255
}
@@ -395,27 +403,33 @@ export class KnexAdapter implements Adapter {
395403

396404
if (retryAt && retryAt.getTime() > now) {
397405
// Move to delayed
398-
await this.#connection(this.#jobsTable).where('id', jobId).where('queue', queue).update({
399-
status: 'delayed',
400-
data: updatedData,
401-
worker_id: null,
402-
acquired_at: null,
403-
score: null,
404-
execute_at: retryAt.getTime(),
405-
})
406+
await this.#connection(this.#jobsTable)
407+
.where('id', jobId)
408+
.where('queue', queue)
409+
.update({
410+
status: 'delayed',
411+
data: updatedData,
412+
worker_id: null,
413+
acquired_at: null,
414+
score: null,
415+
execute_at: retryAt.getTime(),
416+
})
406417
} else {
407418
// Move back to pending
408419
const priority = jobData.priority ?? DEFAULT_PRIORITY
409420
const score = calculateScore(priority, now)
410421

411-
await this.#connection(this.#jobsTable).where('id', jobId).where('queue', queue).update({
412-
status: 'pending',
413-
data: updatedData,
414-
worker_id: null,
415-
acquired_at: null,
416-
score,
417-
execute_at: null,
418-
})
422+
await this.#connection(this.#jobsTable)
423+
.where('id', jobId)
424+
.where('queue', queue)
425+
.update({
426+
status: 'pending',
427+
data: updatedData,
428+
worker_id: null,
429+
acquired_at: null,
430+
score,
431+
execute_at: null,
432+
})
419433
}
420434
}
421435

@@ -526,7 +540,10 @@ export class KnexAdapter implements Adapter {
526540

527541
if (currentStalledCount >= maxStalledCount) {
528542
// Fail permanently - remove the job
529-
await trx(this.#jobsTable).where('id', row.id).where('queue', queue).delete()
543+
await trx(this.#jobsTable)
544+
.where('id', row.id)
545+
.where('queue', queue)
546+
.delete()
530547
} else {
531548
// Recover: increment stalledCount and put back in pending
532549
jobData.stalledCount = currentStalledCount + 1
@@ -596,7 +613,9 @@ export class KnexAdapter implements Adapter {
596613
async getSchedule(id: string): Promise<ScheduleData | null> {
597614
await this.#ensureTables()
598615

599-
const row = await this.#connection(this.#schedulesTable).where('id', id).first()
616+
const row = await this.#connection(this.#schedulesTable)
617+
.where('id', id)
618+
.first()
600619
if (!row) return null
601620

602621
return this.#rowToScheduleData(row)
@@ -629,14 +648,18 @@ export class KnexAdapter implements Adapter {
629648
if (updates.runCount !== undefined) data.run_count = updates.runCount
630649

631650
if (Object.keys(data).length > 0) {
632-
await this.#connection(this.#schedulesTable).where('id', id).update(data)
651+
await this.#connection(this.#schedulesTable)
652+
.where('id', id)
653+
.update(data)
633654
}
634655
}
635656

636657
async deleteSchedule(id: string): Promise<void> {
637658
await this.#ensureTables()
638659

639-
await this.#connection(this.#schedulesTable).where('id', id).delete()
660+
await this.#connection(this.#schedulesTable)
661+
.where('id', id)
662+
.delete()
640663
}
641664

642665
async claimDueSchedule(): Promise<ScheduleData | null> {
@@ -693,11 +716,13 @@ export class KnexAdapter implements Adapter {
693716
}
694717

695718
// Update atomically
696-
await trx(this.#schedulesTable).where('id', row.id).update({
697-
next_run_at: nextRunAt,
698-
last_run_at: now,
699-
run_count: newRunCount,
700-
})
719+
await trx(this.#schedulesTable)
720+
.where('id', row.id)
721+
.update({
722+
next_run_at: nextRunAt,
723+
last_run_at: now,
724+
run_count: newRunCount,
725+
})
701726

702727
// Return schedule data (before update state for payload)
703728
return this.#rowToScheduleData(row)

0 commit comments

Comments
 (0)