Skip to content

Commit 33f34b2

Browse files
feat(job-cleaning): add support for cleaning scheduled jobs (#1020)
* add support for cleaning scheduled jobs * clean up
1 parent 0921816 commit 33f34b2

File tree

6 files changed

+382
-11
lines changed

6 files changed

+382
-11
lines changed

packages/api/src/handlers/cleanJob.ts

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,39 @@
1-
import {
2-
BullBoardRequest,
3-
ControllerHandlerReturnType,
4-
QueueJob,
5-
} from '../../typings/app';
1+
import { BullBoardRequest, ControllerHandlerReturnType, QueueJob } from '../../typings/app';
62
import { jobProvider } from '../providers/job';
73
import { queueProvider } from '../providers/queue';
4+
import { BaseAdapter } from '../queueAdapters/base';
5+
6+
function extractRepeatJobKey(job: QueueJob): string | undefined {
7+
const key = job.repeatJobKey;
8+
9+
if (typeof key === 'string' && key.length > 0) {
10+
return key;
11+
}
12+
}
813

914
async function cleanJob(
1015
_req: BullBoardRequest,
11-
job: QueueJob
16+
job: QueueJob,
17+
queue: BaseAdapter
1218
): Promise<ControllerHandlerReturnType> {
13-
await job.remove();
19+
const repeatJobKey = extractRepeatJobKey(job);
1420

21+
if (repeatJobKey) {
22+
const removed = await queue.removeJobScheduler(repeatJobKey);
23+
24+
if (!removed) {
25+
throw new Error(
26+
`Failed to remove scheduler ${repeatJobKey} for job ${job.toJSON().id ?? 'unknown id'}.`
27+
);
28+
}
29+
30+
return {
31+
status: 204,
32+
body: {},
33+
};
34+
}
35+
36+
await job.remove();
1537
return {
1638
status: 204,
1739
body: {},

packages/api/src/queueAdapters/base.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,8 @@ export abstract class BaseAdapter {
9797

9898
public abstract promoteAll(): Promise<void>;
9999

100+
public abstract removeJobScheduler(id: string): Promise<boolean>;
101+
100102
public abstract getStatuses(): Status[];
101103

102104
public abstract getJobStatuses(): JobStatus[];

packages/api/src/queueAdapters/bull.ts

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
import { Job, Queue } from 'bull';
2-
import BullQueue from 'bull';
1+
import BullQueue, { Job, Queue } from 'bull';
2+
33
import {
44
JobCleanStatus,
55
JobCounts,
@@ -12,7 +12,10 @@ import { STATUSES } from '../constants/statuses';
1212
import { BaseAdapter } from './base';
1313

1414
export class BullAdapter extends BaseAdapter {
15-
constructor(public queue: Queue, options: Partial<QueueAdapterOptions> = {}) {
15+
constructor(
16+
public queue: Queue,
17+
options: Partial<QueueAdapterOptions> = {}
18+
) {
1619
super('bull', { ...options, allowCompletedRetries: false });
1720

1821
if (!(queue instanceof BullQueue)) {
@@ -79,6 +82,10 @@ export class BullAdapter extends BaseAdapter {
7982
await Promise.all(jobs.map((job) => job.promote()));
8083
}
8184

85+
public async removeJobScheduler(_id: string): Promise<boolean> {
86+
return false;
87+
}
88+
8289
public getStatuses(): Status<'bull'>[] {
8390
return [
8491
STATUSES.latest,

packages/api/src/queueAdapters/bullMQ.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { Job, Queue } from 'bullmq';
2+
23
import {
34
JobCleanStatus,
45
JobCounts,
@@ -11,7 +12,10 @@ import { STATUSES } from '../constants/statuses';
1112
import { BaseAdapter } from './base';
1213

1314
export class BullMQAdapter extends BaseAdapter {
14-
constructor(private queue: Queue, options: Partial<QueueAdapterOptions> = {}) {
15+
constructor(
16+
private queue: Queue,
17+
options: Partial<QueueAdapterOptions> = {}
18+
) {
1519
const libName = 'bullmq';
1620
super(libName, options);
1721
if (
@@ -80,6 +84,10 @@ export class BullMQAdapter extends BaseAdapter {
8084
}
8185
}
8286

87+
public removeJobScheduler(id: string): Promise<boolean> {
88+
return this.queue.removeJobScheduler(id);
89+
}
90+
8391
public getStatuses(): Status[] {
8492
return [
8593
STATUSES.latest,

0 commit comments

Comments
 (0)