Skip to content

Commit fee9c7a

Browse files
authored
Moved gift subscription cron jobs to main thread via domain events (TryGhost#27639)
closes TryGhost#27638 - gift cleanup and reminder jobs were running in Bree worker threads with no service init, forcing each worker to manually re-init every transitive dependency (settings, stripe, members, tiers, email-address) before calling into the gift service - the two job files had drifted apart with different and incomplete init lists, encoding boot order by hand - replaced both worker bodies with a thin trigger that posts a domain event back to the main thread, where the `GiftServiceWrapper` subscribes and runs the work in the already-initialised process - this is the same pattern used by `email-analytics/fetch-latest` and `outbox-job` since 2023, and unifies the cron path with the existing admin API flush endpoint which already used `StartGiftReminderFlushEvent` - added a sibling `StartGiftCleanupEvent` for the `processConsumed` + `processExpired` flow
1 parent b2c1498 commit fee9c7a

10 files changed

Lines changed: 189 additions & 141 deletions

File tree

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
module.exports = class StartGiftCleanupEvent {
2+
/**
3+
* @param {any} data
4+
* @param {Date} timestamp
5+
*/
6+
constructor(data, timestamp) {
7+
this.data = data;
8+
this.timestamp = timestamp;
9+
}
10+
11+
/**
12+
* @param {any} [data]
13+
* @param {Date} [timestamp]
14+
*/
15+
static create(data, timestamp) {
16+
return new StartGiftCleanupEvent(data, timestamp ?? new Date());
17+
}
18+
};
Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,18 @@
11
module.exports = class StartGiftReminderFlushEvent {
22
/**
3+
* @param {any} data
34
* @param {Date} timestamp
45
*/
5-
constructor(timestamp) {
6-
this.data = null;
6+
constructor(data, timestamp) {
7+
this.data = data;
78
this.timestamp = timestamp;
89
}
910

1011
/**
11-
* @returns {StartGiftReminderFlushEvent}
12+
* @param {any} [data]
13+
* @param {Date} [timestamp]
1214
*/
13-
static create() {
14-
return new StartGiftReminderFlushEvent(new Date());
15+
static create(data, timestamp) {
16+
return new StartGiftReminderFlushEvent(data, timestamp ?? new Date());
1517
}
1618
};

ghost/core/core/server/services/gifts/gift-service-wrapper.js

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ class GiftServiceWrapper {
4141
const logging = require('@tryghost/logging');
4242
const {SubscriptionActivatedEvent} = require('../../../shared/events');
4343
const StartGiftReminderFlushEvent = require('./events/start-gift-reminder-flush-event');
44+
const StartGiftCleanupEvent = require('./events/start-gift-cleanup-event');
45+
const jobs = require('./jobs');
4446

4547
const {GhostMailer} = require('../mail');
4648
const settingsCache = require('../../../shared/settings-cache');
@@ -100,13 +102,41 @@ class GiftServiceWrapper {
100102
});
101103

102104
DomainEvents.subscribe(StartGiftReminderFlushEvent, async () => {
105+
const start = Date.now();
103106
try {
104-
await this.service.processReminders();
107+
const {remindedCount, skippedCount, failedCount} = await this.service.processReminders();
108+
109+
logging.info(`Sent ${remindedCount} gift reminders, skipped ${skippedCount}, failed ${failedCount} in ${Date.now() - start}ms`);
110+
} catch (err) {
111+
logging.error(err, 'Failed to process gift reminders');
112+
}
113+
});
114+
115+
DomainEvents.subscribe(StartGiftCleanupEvent, async () => {
116+
const consumedStart = Date.now();
117+
try {
118+
const {consumedCount, updatedMemberCount} = await this.service.processConsumed();
119+
120+
logging.info(`Consumed ${consumedCount} gifts, updated ${updatedMemberCount} members in ${Date.now() - consumedStart}ms`);
105121
} catch (err) {
106-
logging.error(err, 'Failed to flush gift reminders');
122+
logging.error(err, 'Failed to process consumed gifts');
123+
}
124+
125+
const expiredStart = Date.now();
126+
try {
127+
const {expiredCount} = await this.service.processExpired();
128+
129+
logging.info(`Expired ${expiredCount} gifts in ${Date.now() - expiredStart}ms`);
130+
} catch (err) {
131+
logging.error(err, 'Failed to process expired gifts');
107132
}
108133
});
109134

135+
if (labsService.isSet('giftSubscriptions')) {
136+
jobs.scheduleGiftCleanupJob();
137+
jobs.scheduleGiftReminderJob();
138+
}
139+
110140
this.#initialized = true;
111141
}
112142
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
const {parentPort} = require('node:worker_threads');
2+
const StartGiftCleanupEvent = require('../events/start-gift-cleanup-event');
3+
4+
// Recurring job to clean consumed and expired gifts. The actual work runs on
5+
// the main thread via a domain event so we don't have to re-initialize every
6+
// service dependency inside this worker
7+
8+
// Exit early when cancelled to prevent stalling shutdown. No cleanup needed
9+
// when cancelling as everything is idempotent and will pick up where it left
10+
// off on next run
11+
function cancel() {
12+
if (parentPort) {
13+
parentPort.postMessage('Gift cleanup job cancelled before completion');
14+
parentPort.postMessage('cancelled');
15+
} else {
16+
setTimeout(() => {
17+
process.exit(0);
18+
}, 1000);
19+
}
20+
}
21+
22+
if (parentPort) {
23+
parentPort.once('message', (message) => {
24+
if (message === 'cancel') {
25+
return cancel();
26+
}
27+
});
28+
}
29+
30+
(async () => {
31+
if (parentPort) {
32+
// Bounce to the main thread via the JobManager's event bridge - the
33+
// GiftServiceWrapper subscribes to this event and runs the work in the
34+
// already-initialised main process
35+
parentPort.postMessage({
36+
event: {
37+
type: StartGiftCleanupEvent.name
38+
}
39+
});
40+
parentPort.postMessage('done');
41+
} else {
42+
setTimeout(() => {
43+
process.exit(0);
44+
}, 1000);
45+
}
46+
})();
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
const path = require('path');
2+
const jobsService = require('../../jobs');
3+
4+
let hasScheduled = {
5+
cleanup: false,
6+
reminders: false
7+
};
8+
9+
function scheduleJob(key, name, jobFile) {
10+
if (hasScheduled[key] || process.env.NODE_ENV?.startsWith('test')) {
11+
return hasScheduled[key];
12+
}
13+
14+
// randomise the schedule so the job doesn't fire at the same instant
15+
// across every Ghost instance - spreads load across the day and avoids
16+
// DB spikes on the hour. Hour is bounded to a 0-5am off-peak window.
17+
const s = Math.floor(Math.random() * 60);
18+
const m = Math.floor(Math.random() * 60);
19+
const h = Math.floor(Math.random() * 6);
20+
21+
jobsService.addJob({
22+
at: `${s} ${m} ${h} * * *`,
23+
job: path.resolve(__dirname, jobFile),
24+
name
25+
});
26+
27+
hasScheduled[key] = true;
28+
29+
return true;
30+
}
31+
32+
module.exports = {
33+
scheduleGiftCleanupJob() {
34+
return scheduleJob('cleanup', 'clean-gifts', 'clean-gifts-job.js');
35+
},
36+
37+
scheduleGiftReminderJob() {
38+
return scheduleJob('reminders', 'send-gift-reminders', 'send-gift-reminders-job.js');
39+
}
40+
};
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
const {parentPort} = require('node:worker_threads');
2+
const StartGiftReminderFlushEvent = require('../events/start-gift-reminder-flush-event');
3+
4+
// Recurring job to send gift reminder emails. The actual work runs on the
5+
// main thread via a domain event so we don't have to re-initialize every
6+
// service dependency inside this worker
7+
8+
// Exit early when cancelled to prevent stalling shutdown. No cleanup needed
9+
// when cancelling as everything is idempotent - any gift that hasn't yet had
10+
// its reminder recorded will be picked up on the next run
11+
function cancel() {
12+
if (parentPort) {
13+
parentPort.postMessage('Gift reminder job cancelled before completion');
14+
parentPort.postMessage('cancelled');
15+
} else {
16+
setTimeout(() => {
17+
process.exit(0);
18+
}, 1000);
19+
}
20+
}
21+
22+
if (parentPort) {
23+
parentPort.once('message', (message) => {
24+
if (message === 'cancel') {
25+
return cancel();
26+
}
27+
});
28+
}
29+
30+
(async () => {
31+
if (parentPort) {
32+
// Bounce to the main thread via the JobManager's event bridge - the
33+
// GiftServiceWrapper subscribes to this event and runs the work in the
34+
// already-initialised main process
35+
parentPort.postMessage({
36+
event: {
37+
type: StartGiftReminderFlushEvent.name
38+
}
39+
});
40+
parentPort.postMessage('done');
41+
} else {
42+
setTimeout(() => {
43+
process.exit(0);
44+
}, 1000);
45+
}
46+
})();

ghost/core/core/server/services/members/jobs/clean-gifts.js

Lines changed: 0 additions & 59 deletions
This file was deleted.

ghost/core/core/server/services/members/jobs/index.js

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@ const jobsService = require('../../jobs');
33

44
let hasScheduled = {
55
expiredComped: false,
6-
giftCleanup: false,
7-
giftReminders: false,
86
tokens: false
97
};
108

@@ -33,14 +31,6 @@ module.exports = {
3331
return scheduleJob('expiredComped', 'clean-expired-comped', 'clean-expired-comped.js');
3432
},
3533

36-
async scheduleGiftCleanupJob() {
37-
return scheduleJob('giftCleanup', 'clean-gifts', 'clean-gifts.js');
38-
},
39-
40-
async scheduleGiftReminderJob() {
41-
return scheduleJob('giftReminders', 'send-gift-reminders', 'send-gift-reminders.js');
42-
},
43-
4434
async scheduleTokenCleanupJob() {
4535
return scheduleJob('tokens', 'clean-tokens', 'clean-tokens.js', 24);
4636
}

ghost/core/core/server/services/members/jobs/send-gift-reminders.js

Lines changed: 0 additions & 58 deletions
This file was deleted.

ghost/core/core/server/services/members/service.js

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -168,13 +168,6 @@ module.exports = {
168168

169169
// Schedule daily cron job to clean expired tokens
170170
memberJobs.scheduleTokenCleanupJob();
171-
172-
// Schedule daily cron jobs to clean up consumed/expired gifts and to
173-
// send gift reminder emails.
174-
if (labsService.isSet('giftSubscriptions')) {
175-
memberJobs.scheduleGiftCleanupJob();
176-
memberJobs.scheduleGiftReminderJob();
177-
}
178171
},
179172
contentGating: require('./content-gating'),
180173

0 commit comments

Comments
 (0)