Skip to content

Commit 6a0f141

Browse files
authored
Removed job queue manager class (#23167)
ref https://linear.app/ghost/issue/ENG-2028/ We created this persisted queue using a MySQL-based queue to prototype a more robust queuing system, intending to use Redis, RabbitMQ, or the like. We didn't find enough benefit from this and found other constraints, primarily r/w to the db being burdensome, and not enough benefit to justify the move to RMQ/Redis at this time. Given that, we're removing the job queue manager. We've previously removed any calling code, so now we're pulling the class itself and will follow up with a migration (or two) to clean up the jobs table schema and entries.
1 parent 8c48bb0 commit 6a0f141

11 files changed

+7
-1196
lines changed

ghost/core/test/integration/jobs/job-queue.test.js

-119
This file was deleted.

ghost/core/test/integration/jobs/test-job-events.js

-19
This file was deleted.

ghost/core/test/integration/jobs/test-job.js

-7
This file was deleted.

ghost/job-manager/README.md

+3-99
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
# Job Manager
22

3-
A manager for background jobs in Ghost, supporting one-off tasks, recurring jobs, and persistent job queues.
3+
A manager for background jobs in Ghost, supporting one-off tasks and recurring jobs.
44

55
## Table of Contents
66
- [Quick Start](#quick-start)
77
- [Job Types](#job-types)
88
- [Background Job Requirements](#background-job-requirements)
9-
- [Configuration](#configuration)
109
- [Advanced Usage](#advanced-usage)
1110
- [Technical Details](#technical-details)
1211

@@ -29,20 +28,11 @@ jobManager.addJob({
2928
at: 'every 5 minutes',
3029
job: './jobs/check-emails.js'
3130
});
32-
33-
// Persisted background job
34-
jobManager.addQueuedJob({
35-
name: 'update-member-analytics-123',
36-
metadata: {
37-
job: './jobs/update-analytics.js',
38-
data: { memberId: '123' }
39-
}
40-
});
4131
```
4232

4333
## Job Types
4434

45-
Ghost supports three types of jobs:
35+
Ghost supports two types of jobs:
4636

4737
1. **Inline Jobs**
4838
- Run in the main Ghost process
@@ -54,39 +44,16 @@ Ghost supports three types of jobs:
5444
- Good for CPU-intensive tasks
5545
- Can be scheduled or recurring
5646

57-
3. **Persisted Queue Jobs**
58-
- Stored in database
59-
- Survive server restarts
60-
- Best for background tasks
61-
- Can be monitored and retried
62-
6347
## Background Job Requirements
6448

65-
Both offloaded and queued jobs must:
49+
Offloaded jobs must:
6650

6751
- Have a unique name
6852
- Be idempotent (safe to run multiple times)
6953
- Use minimal parameters (prefer using IDs rather than full objects)
7054
- Import their own dependencies
7155
- Primarily use DB or API calls
7256

73-
## Configuration
74-
75-
The job queue can be configured through Ghost's config:
76-
77-
```js
78-
jobs: {
79-
queue: {
80-
enabled: true,
81-
maxWorkers: 1,
82-
pollMinInterval: 1000, // 1 sec
83-
pollMaxInterval: 60000, // 1 min
84-
queueCapacity: 500, // Max queued jobs
85-
fetchCount: 500 // Max jobs per poll
86-
}
87-
}
88-
```
89-
9057
## Advanced Usage
9158

9259
Below is a sample code to wire up job manger and initialize jobs. This is the simplest way to interact with the job manager - these jobs do not persist after reboot:
@@ -182,66 +149,3 @@ Offloaded jobs are running on dedicated worker threads which makes their lifecyc
182149
4. When **exceptions** happen and expected outcome is to terminate current job, leave the exception unhandled allowing it to bubble up to the job manager. Unhandled exceptions [terminate current thread](https://nodejs.org/dist/latest-v14.x/docs/api/worker_threads.html#worker_threads_event_error) and allow for next scheduled job execution to happen.
183150

184151
For more nuances on job structure best practices check [bree documentation](https://github.com/breejs/bree#writing-jobs-with-promises-and-async-await).
185-
186-
### Implementation Notes
187-
For any persisted tasks, the Job Manager has a queue based on the `jobs` table. This table is polled regularly and processed with a single worker, and is ideal for background tasks, e.g. updating member analytics. (see notes below about job types)
188-
189-
```js
190-
// the job manager is typically injected into the consumer service via the service wrapper
191-
// from there
192-
const JobManager = require('../../services/jobs');
193-
...
194-
195-
// ** job submission should be handled by the wrapper service **
196-
// this could be via subscription to events, emitted by the wrapped service(s)
197-
domainEvents.subscribe(MemberEmailAnalyticsUpdateEvent, async (event) => {
198-
const memberId = event.data.memberId;
199-
await JobManager.addQueuedJob({
200-
name: `update-member-email-analytics-${memberId}`,
201-
metadata: {
202-
job: path.resolve(__dirname, 'jobs/update-member-email-analytics'),
203-
name: 'update-member-email-analytics',
204-
data: {
205-
memberId
206-
}
207-
}
208-
});
209-
});
210-
211-
// or it could be passed through as a hook
212-
const handleAnalyticsJobSubmission = async (memberId) => {
213-
await JobManager.addQueuedJob({
214-
name: `update-member-email-analytics-${memberId}`,
215-
metadata: {
216-
job: path.resolve(__dirname, 'jobs/update-member-email-analytics'),
217-
name: 'update-member-email-analytics',
218-
data: {
219-
memberId
220-
}
221-
}
222-
});
223-
}
224-
```
225-
In most cases, jobs should not be submitted by services directly. Because they must import what is needed, it would require too many injected dependencies.
226-
227-
### Adjusting the Job Queue
228-
The queue manager will poll the `jobs` table every minute unless jobs are being actively or were recently processed, where it will instead poll every second.
229-
230-
The job queue has a few other config flags for the number of workers, polling rate, max jobs to process, etc.
231-
232-
```
233-
services: {
234-
jobs: {
235-
queue: {
236-
enabled: true,
237-
reportStats: true,
238-
maxWorkers: 1,
239-
logLevel: 'info' | 'error',
240-
pollMinInterval: 1000, // 1 sec
241-
pollMaxInterval: 60000, // 1 min
242-
queueCapacity: 500, // # of jobs in the process queue at any time
243-
fetchCount: 500 // max # of jobs fetched in each poll
244-
}
245-
}
246-
}
247-
```

ghost/job-manager/lib/JobManager.js

+1-52
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,9 @@ const Bree = require('bree');
77
const pWaitFor = require('p-wait-for');
88
const {UnhandledJobError, IncorrectUsageError} = require('@tryghost/errors');
99
const logging = require('@tryghost/logging');
10-
const metrics = require('@tryghost/metrics');
1110
const isCronExpression = require('./is-cron-expression');
1211
const assembleBreeJob = require('./assemble-bree-job');
1312
const JobsRepository = require('./JobsRepository');
14-
const JobQueueManager = require('./JobQueueManager');
1513

1614
const worker = async (task, callback) => {
1715
try {
@@ -40,7 +38,6 @@ const ALL_STATUSES = {
4038
class JobManager {
4139
#domainEvents;
4240
#completionPromises = new Map();
43-
#jobQueueManager = null;
4441
#config;
4542
#JobModel;
4643
#events;
@@ -52,11 +49,9 @@ class JobManager {
5249
* @param {Object} [options.JobModel] - a model which can persist job data in the storage
5350
* @param {Object} [options.domainEvents] - domain events emitter
5451
* @param {Object} [options.config] - config
55-
* @param {boolean} [options.isDuplicate] - if true, the job manager will not initialize the job queue
56-
* @param {JobQueueManager} [options.jobQueueManager] - job queue manager instance (for testing)
5752
* @param {Object} [options.events] - events instance (for testing)
5853
*/
59-
constructor({errorHandler, workerMessageHandler, JobModel, domainEvents, config, isDuplicate = false, jobQueueManager = null, events = null}) {
54+
constructor({errorHandler, workerMessageHandler, JobModel, domainEvents, config, events = null}) {
6055
this.inlineQueue = fastq(this, worker, 3);
6156
this._jobMessageHandler = this._jobMessageHandler.bind(this);
6257
this._jobErrorHandler = this._jobErrorHandler.bind(this);
@@ -95,19 +90,6 @@ class JobManager {
9590
if (JobModel) {
9691
this._jobsRepository = new JobsRepository({JobModel});
9792
}
98-
99-
if (jobQueueManager) {
100-
this.#jobQueueManager = jobQueueManager;
101-
} else if (!isDuplicate) {
102-
this.#initializeJobQueueManager();
103-
}
104-
}
105-
106-
#initializeJobQueueManager() {
107-
if (this.#config?.get('services:jobs:queue:enabled') === true && !this.#jobQueueManager) {
108-
this.#jobQueueManager = new JobQueueManager({JobModel: this.#JobModel, config: this.#config, eventEmitter: this.#events, metricLogger: metrics});
109-
this.#jobQueueManager.init();
110-
}
11193
}
11294

11395
inlineJobHandler(jobName) {
@@ -128,35 +110,6 @@ class JobManager {
128110
};
129111
}
130112

131-
/**
132-
* @typedef {Object} QueuedJob
133-
* @property {string} name - The name or identifier of the job.
134-
* @property {Object} metadata - Metadata associated with the job.
135-
* @property {string} metadata.job - The absolute path to the job to execute.
136-
* @property {string} metadata.name - The name of the job. Used for metrics.
137-
* @property {Object} metadata.data - The data associated with the job.
138-
*/
139-
140-
/**
141-
* @method addQueuedJob
142-
* @async
143-
* @description Adds a new job to the job repository, which will be polled and executed by the job queue manager.
144-
* @param {QueuedJob} job - The job to be added to the queue.
145-
* @returns {Promise<Object>} The added job model.
146-
*/
147-
async addQueuedJob({name, metadata}) {
148-
// Try to initialize JobQueueManager if it's missing
149-
if (!this.#jobQueueManager) {
150-
this.#initializeJobQueueManager();
151-
}
152-
153-
if (this.#config?.get('services:jobs:queue:enabled') === true && this.#jobQueueManager) {
154-
const model = await this.#jobQueueManager.addJob({name, metadata});
155-
return model;
156-
}
157-
return undefined;
158-
}
159-
160113
async _jobMessageHandler({name, message}) {
161114
if (name) {
162115
if (message === ALL_STATUSES.started) {
@@ -459,10 +412,6 @@ class JobManager {
459412
async shutdown(options) {
460413
await this.bree.stop();
461414

462-
if (this.#jobQueueManager) {
463-
await this.#jobQueueManager.shutdown();
464-
}
465-
466415
if (this.inlineQueue.idle()) {
467416
return;
468417
}

0 commit comments

Comments
 (0)