-
Notifications
You must be signed in to change notification settings - Fork 229
Expand file tree
/
Copy pathbull.js
More file actions
97 lines (83 loc) · 2.04 KB
/
bull.js
File metadata and controls
97 lines (83 loc) · 2.04 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
const {capitalize} = require('lodash');
const Bull = require('bull');
const Queue = require('./queue');
const Job = require('./job');
const JobData = require('./jobData');
// Job states accepted by `isValidState`.
const VALID_STATES = [
  'waiting',
  'active',
  'completed',
  'failed',
  'delayed',
];

// Job actions reported as available by `isActionSupported`.
const SUPPORTED_ACTIONS = [
  'remove',
  'retry',
];
/**
 * Job wrapper around a Bull job object.
 *
 * Every method delegates to the raw job held on `this._job`
 * (presumably assigned by the `Job` base constructor -- confirm in ./job).
 */
class BullJob extends Job {
  /**
   * Removes the wrapped job by calling its `remove()` method.
   * @returns {Promise<void>}
   */
  async remove() {
    const {_job: wrapped} = this;
    await wrapped.remove();
  }

  /**
   * Retries the wrapped job by calling its `retry()` method.
   * @returns {Promise<void>}
   */
  async retry() {
    const {_job: wrapped} = this;
    await wrapped.retry();
  }

  /**
   * Reports the job's current state as given by the wrapped job's `getState()`.
   * @returns {Promise<*>} whatever `getState()` resolves to
   */
  async getStatus() {
    const {_job: wrapped} = this;
    return wrapped.getState();
  }

  /**
   * Builds a `JobData` from a fixed subset of the wrapped job's `toJSON()`
   * output. Fields outside that subset are dropped.
   * @returns {Promise<JobData>}
   */
  async toJSON() {
    const raw = this._job.toJSON();
    const fields = {
      id: raw.id,
      name: raw.name,
      data: raw.data,
      attemptsMade: raw.attemptsMade,
      failedReason: raw.failedReason,
      stacktrace: raw.stacktrace,
      timestamp: raw.timestamp,
      delay: raw.delay,
      progress: raw.progress,
      // The raw job spells this key all-lowercase; JobData receives it camelCased.
      returnValue: raw.returnvalue,
    };
    return new JobData(fields);
  }
}
module.exports = class BullQueue extends Queue {
constructor(queueConfig) {
const {name} = queueConfig;
const options = BullQueue.parseConfig(queueConfig);
const queue = Bull(name, options);
super(queue);
}
static parseConfig(queueConfig) {
const options = {redis: this.parseRedisConfig(queueConfig)};
const {createClient, prefix} = queueConfig;
if (createClient) options.createClient = createClient;
if (prefix) options.prefix = prefix;
return options;
}
async getJob(id) {
const job = await this._queue.getJob(id);
return new BullJob(job);
}
async getJobCounts() {
return this._queue.getJobCounts();
}
async getJobs(state, start, size) {
const jobs = await this._queue[`get${capitalize(state)}`](start, start + size - 1);
return jobs.map((j) => new BullJob(j));
}
async addJob(data) {
const job = await this._queue.add(data, {
removeOnComplete: false,
removeOnFail: false
});
return new BullJob(job);
}
isValidState(state) {
return VALID_STATES.includes(state);
}
isActionSupported(action) {
return SUPPORTED_ACTIONS.includes(action);
}
};