-
Notifications
You must be signed in to change notification settings - Fork 229
Refactors code to support inheritance #198
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 2 commits
010b1e4
68021e8
34b9922
0a2f7d1
584b3e0
69e0aaf
dd8532d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,91 @@ | ||
| const Queue = require('./queue'); | ||
| const Job = require('./job'); | ||
| const JobData = require('./jobData'); | ||
|
|
||
| class BeeJob extends Job { | ||
| constructor(job) { | ||
| super(job); | ||
| } | ||
|
|
||
| async remove() { | ||
| await this._job.remove(); | ||
| } | ||
|
|
||
| async getStatus() { | ||
| return Promise.resolve(this._job.status); | ||
|
altaiezior marked this conversation as resolved.
Outdated
|
||
| } | ||
|
|
||
| async toJSON() { | ||
| const {id, progress, data, options: {timestamp, stacktraces: stacktrace, delay}} = this._job; | ||
| return new JobData({id, progress, data, timestamp, stacktrace, delay}); | ||
| } | ||
| } | ||
|
|
||
| const VALID_STATES = ['waiting', 'active', 'succeeded', 'failed', 'delayed']; | ||
| const SUPPORTED_ACTIONS = ['remove']; | ||
|
|
||
| module.exports = class BeeQueue extends Queue { | ||
| constructor(queueConfig) { | ||
| const {name} = queueConfig; | ||
| const options = BeeQueue.parseConfig(queueConfig); | ||
| const queue = new BeeQueue(name, options); | ||
| super(queue); | ||
| } | ||
|
|
||
| static parseConfig(queueConfig) { | ||
| const options = { | ||
| redis: this.parseRedisConfig(queueConfig), | ||
| isWorker: false, | ||
| getEvents: false, | ||
| sendEvents: false, | ||
| storeJobs: false, | ||
| }; | ||
| const {prefix} = queueConfig; | ||
| if (prefix) options.prefix = prefix; | ||
| return options; | ||
| } | ||
|
|
||
| async getJob(id) { | ||
| const job = this._queue.getJob(id); | ||
| return new BeeJob(job); | ||
| } | ||
|
|
||
| async getJobCounts() { | ||
| const jobCounts = this._queue.checkHealth(); | ||
| delete jobCounts.newestJob; | ||
| return jobCounts; | ||
| } | ||
|
|
||
| async getJobs(state, start, size) { | ||
| const page = {}; | ||
|
|
||
| if (['failed', 'succeeded'].includes(state)) { | ||
| page.size = size; | ||
| } else { | ||
| page.start = start; | ||
| page.end = start + size - 1; | ||
| } | ||
|
|
||
| let jobs = await this._queue.getJobs(state, page); | ||
| // Filter out Bee jobs that have already been removed by the time the promise resolves | ||
| jobs = jobs.filter((job) => job); | ||
| return jobs.map((j) => new BeeJob(j)); | ||
| } | ||
|
|
||
| async addJob(data) { | ||
| const job = await this._queue.createJob(data).save(); | ||
| return new BeeJob(job); | ||
| } | ||
|
|
||
| isValidState(state) { | ||
| return VALID_STATES.includes(state); | ||
| } | ||
|
|
||
| isActionSupported(action) { | ||
| return SUPPORTED_ACTIONS.includes(action); | ||
| } | ||
|
|
||
| isPaginationSupported(state) { | ||
| return state !== 'succeeded' && state !== 'failed'; | ||
| } | ||
| }; | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,101 @@ | ||
| const {capitalize} = require('lodash'); | ||
| const Bull = require('bull'); | ||
| const Queue = require('./queue'); | ||
| const Job = require('./job'); | ||
| const JobData = require('./jobData'); | ||
|
|
||
| const VALID_STATES = ['waiting', 'active', 'completed', 'failed', 'delayed']; | ||
| const SUPPORTED_ACTIONS = ['remove', 'retry']; | ||
|
|
||
| class BullJob extends Job { | ||
| constructor(job) { | ||
| super(job); | ||
| } | ||
|
|
||
| async remove() { | ||
| await this._job.remove(); | ||
| } | ||
|
|
||
| async retry() { | ||
| await this._job.retry(); | ||
| } | ||
|
|
||
| async getStatus() { | ||
| return this._job.getState(); | ||
| } | ||
|
|
||
|
|
||
| async toJSON() { | ||
| const { | ||
| id, | ||
| name, | ||
| data, | ||
| attemptsMade, | ||
| failedReason, | ||
| stacktrace, | ||
| returnvalue: returnValue, | ||
| timestamp, | ||
| delay, | ||
| progress | ||
| } = this._job.toJSON(); | ||
| return new JobData({ | ||
| id, | ||
| name, | ||
| data, | ||
| attemptsMade, | ||
| failedReason, | ||
| stacktrace, | ||
| timestamp, | ||
| delay, | ||
| progress, | ||
| returnValue, | ||
| }); | ||
| } | ||
| } | ||
|
|
||
| module.exports = class BullQueue extends Queue { | ||
| constructor(queueConfig) { | ||
| const {name} = queueConfig; | ||
| const options = BullQueue.parseConfig(queueConfig); | ||
| const queue = Bull(name, options); | ||
| super(queue); | ||
| } | ||
|
|
||
| static parseConfig(queueConfig) { | ||
| const options = {redis: this.parseRedisConfig(queueConfig)}; | ||
| const {createClient, prefix} = queueConfig; | ||
| if (createClient) options.createClient = createClient; | ||
| if (prefix) options.prefix = prefix; | ||
| return options; | ||
| } | ||
|
|
||
| async getJob(id) { | ||
| const job = await this._queue.getJob(id); | ||
| return new BullJob(job); | ||
| } | ||
|
|
||
| async getJobCounts() { | ||
| return this._queue.getJobCounts(); | ||
| } | ||
|
|
||
| async getJobs(state, start, size) { | ||
| const jobs = await this._queue[`get${capitalize(state)}`](start, start + size - 1); | ||
| return jobs.map((j) => new BullJob(j)); | ||
| } | ||
|
|
||
| async addJob(data) { | ||
| const job = await this._queue.add(data, { | ||
| removeOnComplete: false, | ||
| removeOnFail: false | ||
| }); | ||
| return new BullJob(job); | ||
| } | ||
|
|
||
| isValidState(state) { | ||
| return VALID_STATES.includes(state); | ||
| } | ||
|
|
||
| isActionSupported(action) { | ||
| return SUPPORTED_ACTIONS.includes(action); | ||
| } | ||
| }; |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,17 @@ | ||
| module.exports = class Job { | ||
| constructor(job) { | ||
| this._job = job; | ||
| if (new.target === Job) { | ||
| throw new TypeError("Cannot construct Job instances directly"); | ||
| } | ||
| } | ||
|
|
||
| async remove() { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we have these abstract methods throw a not implemented exception? This would both make it clear that these should be overridden, and avoid weird cases down the road where we instantiate a
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done, just check once. Didn't use a message because the stacktrace should already cover the method name from where the error originated. |
||
| } | ||
|
|
||
| async getStatus() { | ||
| } | ||
|
|
||
| async toJSON() { | ||
| } | ||
| }; | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,16 @@ | ||
| module.exports = class JobData { | ||
| constructor({id, name, data, stacktrace, timestamp, progress, delay, attemptsMade, returnValue, failedReason}) { | ||
| this.id = id; | ||
| this.name = name; | ||
| this.data = data; | ||
| this.progress = progress; | ||
| this.attemptsMade = attemptsMade; | ||
| this.returnValue = returnValue; | ||
| this.failedReason = failedReason; | ||
| this.options = { | ||
| stacktrace, | ||
| timestamp, | ||
| delay, | ||
| }; | ||
| } | ||
| }; |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,43 @@ | ||
| module.exports = class Queue { | ||
| constructor(queue) { | ||
| this._queue = queue; | ||
| if (new.target === Queue) { | ||
| throw new TypeError("Cannot construct Queue instances directly"); | ||
| } | ||
| } | ||
|
|
||
| static parseRedisConfig({port, host, db, password, url, redis, tls}) { | ||
| const redisHost = {host}; | ||
| if (password) redisHost.password = password; | ||
| if (port) redisHost.port = port; | ||
| if (db) redisHost.db = db; | ||
| if (tls) redisHost.tls = tls; | ||
| return redis || url || redisHost; | ||
| } | ||
|
|
||
| get redisClient() { | ||
| return this._queue.client; | ||
| } | ||
|
|
||
| async getJob(id) { | ||
| } | ||
|
|
||
| async getJobCounts() { | ||
| } | ||
|
|
||
| async getJobs(state, start, size) { | ||
| } | ||
|
|
||
| async addJob(data, options) { | ||
| } | ||
|
|
||
| isValidState(state) { | ||
| } | ||
|
|
||
| isActionSupported(action) { | ||
| } | ||
|
|
||
| isPaginationSupported(_state) { | ||
| return true; | ||
| } | ||
| }; |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,23 +1 @@ | ||
| async function handler(req, res) { | ||
| const { queueName, queueHost, id } = req.params; | ||
|
|
||
| const {Queues} = req.app.locals; | ||
| const queue = await Queues.get(queueName, queueHost); | ||
| if (!queue) return res.status(404).send({error: 'queue not found'}); | ||
|
|
||
| const job = await queue.getJob(id); | ||
| if (!job) return res.status(404).send({error: 'job not found'}); | ||
|
|
||
| try { | ||
| await job.remove(); | ||
| return res.sendStatus(200); | ||
| } catch (e) { | ||
| const body = { | ||
| error: 'queue error', | ||
| details: e.stack | ||
| }; | ||
| return res.status(500).send(body); | ||
| } | ||
| } | ||
|
|
||
| module.exports = handler; | ||
| module.exports = require('./performAction')('remove'); |
Uh oh!
There was an error while loading. Please reload this page.