Skip to content

Commit d175693

Browse files
committed
Added "fetch and lock steps", a necessary operation for automations
towards https://linear.app/ghost/issue/NY-1286 ref #28120 To run an automation, we will: 1. Fetch (and lock) up to 100 steps. 2. If no steps are found, enqueue another poll for later, and stop. 3. Run all the steps. 4. Possibly enqueue another poll for later. This patch implements the first of those procedures. (If you want to see more fully how it will be used, see [this PR][0].) [0]: #28120
1 parent 24244bb commit d175693

7 files changed

Lines changed: 630 additions & 8 deletions

File tree

ghost/core/core/server/services/automations/automations-api.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,6 +264,10 @@ export async function trigger(options: TriggerOptions) {
264264
requestPoll();
265265
}
266266

267+
export async function fetchAndLockSteps(...args: Parameters<AutomationsRepository['fetchAndLockSteps']>) {
268+
return await repository.fetchAndLockSteps(...args);
269+
}
270+
267271
export function _resetTestDatabase() {
268272
if (process.env.NODE_ENV?.startsWith('testing')) {
269273
testDatabase = null;

ghost/core/core/server/services/automations/automations-repository.ts

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,31 @@ export interface EditAutomationData {
6262
edges: AutomationEdge[];
6363
}
6464

65+
type AutomationStepBase = {
66+
id: string;
67+
step_attempts: number;
68+
automation_run_id: string;
69+
automation_status: 'inactive' | 'active';
70+
member_id: string | null;
71+
member_email: string;
72+
action_id: string;
73+
};
74+
75+
export type AutomationStepToRun = AutomationStepBase & (
76+
{
77+
type: 'wait';
78+
wait_hours: number;
79+
} | {
80+
type: 'send_email';
81+
email_subject: string;
82+
email_lexical: string;
83+
email_sender_name: string | null;
84+
email_sender_email: string | null;
85+
email_sender_reply_to: string | null;
86+
email_design_setting_id: string | null;
87+
}
88+
);
89+
6590
export interface AutomationsRepository {
6691
browse(): Promise<Page<AutomationSummary>>;
6792
getById(id: string): Promise<Automation | null>;
@@ -71,4 +96,21 @@ export interface AutomationsRepository {
7196
memberId: string;
7297
memberStatus: 'free' | 'paid';
7398
}): Promise<void>;
99+
/**
100+
* Select the steps we want to run. If no steps are found, returns the time
101+
* we should try again, if any.
102+
*
103+
* If we could guarantee this function would only be called once ever, it'd
104+
* be pretty simple! However, we want to handle cases where this function is
105+
* called multiple times simultaneously (maybe in different processes
106+
* querying the same database). That's why we implement a row-level locking
107+
* mechanism, hence the word "lock" in the name of this function.
108+
*/
109+
fetchAndLockSteps(limit: number): Promise<{
110+
steps: AutomationStepToRun[],
111+
nextStepReadyAt: null;
112+
} | {
113+
steps: never[],
114+
nextStepReadyAt: Date | null;
115+
}>;
74116
}

ghost/core/core/server/services/automations/fake-database-automations-repository.ts

Lines changed: 181 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import errors from '@tryghost/errors';
22
import tpl from '@tryghost/tpl';
3+
import crypto from 'node:crypto';
34
import ObjectId from 'bson-objectid';
45
import {dequal} from 'dequal';
56
import type {DatabaseSync} from 'node:sqlite';
@@ -9,12 +10,14 @@ import type {
910
AutomationAction,
1011
AutomationEdge,
1112
AutomationSummary,
13+
AutomationStepToRun,
1214
AutomationsRepository,
1315
EditAutomationData,
1416
Page
1517
} from './automations-repository';
16-
import type {ExclusifyUnion} from 'type-fest';
18+
import type {ExclusifyUnion, ReadonlyDeep} from 'type-fest';
1719

20+
const LOCK_TIMEOUT_MS = 30 * 60 * 1000;
1821
const HOUR_MS = 60 * 60 * 1000;
1922

2023
const messages = {
@@ -61,6 +64,24 @@ interface EdgeRow {
6164
target_action_id: string;
6265
}
6366

67+
type StepRow = {
68+
id: string;
69+
step_attempts: number;
70+
automation_run_id: string;
71+
automation_status: 'inactive' | 'active';
72+
member_id: string | null;
73+
member_email: string;
74+
action_id: string;
75+
type: string;
76+
wait_hours: number | null;
77+
email_subject: string | null;
78+
email_lexical: string | null;
79+
email_sender_name: string | null;
80+
email_sender_email: string | null;
81+
email_sender_reply_to: string | null;
82+
email_design_setting_id: string | null;
83+
};
84+
6485
type NextActionRevisionRow = {
6586
automation_id: string;
6687
action_id: string;
@@ -141,6 +162,18 @@ export function createFakeDatabaseAutomationsRepository({
141162
const database = getDatabase();
142163

143164
return withTransaction(database, () => trigger(database, options));
165+
},
166+
167+
async fetchAndLockSteps(limit: number): Promise<{
168+
steps: AutomationStepToRun[],
169+
nextStepReadyAt: null;
170+
} | {
171+
steps: never[],
172+
nextStepReadyAt: null | Date;
173+
}> {
174+
const database = getDatabase();
175+
176+
return withTransaction(database, () => fetchAndLockSteps(database, limit));
144177
}
145178
};
146179
}
@@ -205,6 +238,146 @@ function trigger(database: DatabaseSync, {
205238
});
206239
}
207240

241+
function fetchAndLockSteps(database: DatabaseSync, limit: number): {
242+
steps: AutomationStepToRun[],
243+
nextStepReadyAt: null;
244+
} | {
245+
steps: never[],
246+
nextStepReadyAt: null | Date;
247+
} {
248+
const now = new Date();
249+
const nowString = now.toISOString();
250+
const staleLockCutoff = new Date(now.getTime() - LOCK_TIMEOUT_MS);
251+
const staleLockCutoffString = staleLockCutoff.toISOString();
252+
const lockId = crypto.randomUUID();
253+
254+
const candidates = database.prepare(`
255+
SELECT id
256+
FROM automation_run_steps
257+
WHERE status = 'pending'
258+
AND ready_at <= ?
259+
AND (
260+
locked_by IS NULL
261+
OR locked_at < ?
262+
)
263+
ORDER BY ready_at, created_at, id
264+
LIMIT ?
265+
`).all(nowString, staleLockCutoffString, limit) as unknown as ReadonlyArray<{id: string}>;
266+
267+
if (candidates.length === 0) {
268+
return {
269+
steps: [],
270+
nextStepReadyAt: findNextPendingReadyAt(database, now)
271+
};
272+
}
273+
274+
const candidateIds = candidates.map(candidate => candidate.id);
275+
276+
const placeholders = candidateIds.map(() => '?').join(',');
277+
database.prepare(`
278+
UPDATE automation_run_steps
279+
SET locked_by = ?,
280+
locked_at = ?,
281+
started_at = ?,
282+
updated_at = ?,
283+
step_attempts = step_attempts + 1
284+
WHERE id IN (${placeholders})
285+
AND status = 'pending'
286+
AND ready_at <= ?
287+
AND (
288+
locked_by IS NULL
289+
OR locked_at < ?
290+
)
291+
`).run(lockId, nowString, nowString, nowString, ...candidateIds, nowString, staleLockCutoffString);
292+
293+
const rows = database.prepare(`
294+
SELECT
295+
step.id AS id,
296+
step.step_attempts AS step_attempts,
297+
step.automation_run_id AS automation_run_id,
298+
automation.status AS automation_status,
299+
run.member_id AS member_id,
300+
run.member_email AS member_email,
301+
action.id AS action_id,
302+
action.type AS type,
303+
revision.wait_hours AS wait_hours,
304+
revision.email_subject AS email_subject,
305+
revision.email_lexical AS email_lexical,
306+
revision.email_sender_name AS email_sender_name,
307+
revision.email_sender_email AS email_sender_email,
308+
revision.email_sender_reply_to AS email_sender_reply_to,
309+
revision.email_design_setting_id AS email_design_setting_id
310+
FROM automation_run_steps step
311+
INNER JOIN automation_runs run ON run.id = step.automation_run_id
312+
INNER JOIN automations automation ON automation.id = run.automation_id
313+
INNER JOIN automation_action_revisions revision ON revision.id = step.automation_action_revision_id
314+
INNER JOIN automation_actions action ON action.id = revision.action_id
315+
WHERE step.locked_by = ?
316+
ORDER BY step.ready_at, step.created_at, step.id
317+
`).all(lockId) as unknown as StepRow[];
318+
319+
if (rows.length === 0) {
320+
return {
321+
steps: [],
322+
nextStepReadyAt: findNextPendingReadyAt(database, staleLockCutoff)
323+
};
324+
} else {
325+
return {
326+
steps: rows.map(row => buildStepToRun(row)),
327+
nextStepReadyAt: null
328+
};
329+
}
330+
}
331+
332+
function findNextPendingReadyAt(database: DatabaseSync, staleLockCutoff: Readonly<Date>): Date | null {
333+
const row = database.prepare(`
334+
SELECT MIN(ready_at) AS next_ready_at
335+
FROM automation_run_steps
336+
WHERE status = 'pending'
337+
AND (
338+
locked_by IS NULL
339+
OR locked_at < ?
340+
)
341+
`).get(staleLockCutoff.toISOString()) as {next_ready_at: string | null} | undefined;
342+
return row?.next_ready_at ? new Date(row.next_ready_at) : null;
343+
}
344+
345+
function buildStepToRun(row: ReadonlyDeep<StepRow>): AutomationStepToRun {
346+
const base = {
347+
id: row.id,
348+
step_attempts: row.step_attempts,
349+
automation_run_id: row.automation_run_id,
350+
automation_status: row.automation_status,
351+
member_id: row.member_id,
352+
member_email: row.member_email,
353+
action_id: row.action_id
354+
};
355+
356+
switch (row.type) {
357+
case 'wait':
358+
return {
359+
...base,
360+
type: 'wait',
361+
wait_hours: requireValue(row, 'wait_hours')
362+
};
363+
case 'send_email':
364+
return {
365+
...base,
366+
type: 'send_email',
367+
email_subject: requireValue(row, 'email_subject'),
368+
email_lexical: requireValue(row, 'email_lexical'),
369+
email_sender_name: row.email_sender_name,
370+
email_sender_email: row.email_sender_email,
371+
email_sender_reply_to: row.email_sender_reply_to,
372+
email_design_setting_id: row.email_design_setting_id
373+
};
374+
default:
375+
throw new errors.InternalServerError({
376+
message: `Unexpected action type from database: ${row.type}`
377+
});
378+
}
379+
}
380+
208381
function findFirstActionRevision(database: DatabaseSync, memberStatus: 'free' | 'paid'): NextActionRevisionRow | null {
209382
const automationSlug: NonNullable<string> = MEMBER_WELCOME_EMAIL_SLUGS[memberStatus];
210383

@@ -653,12 +826,15 @@ function buildActionPayload(row: ActionRow): AutomationAction {
653826
}
654827
}
655828

656-
function requireValue<FieldT extends keyof ActionRow>(
657-
row: Pick<ActionRow, 'id' | 'type' | FieldT>,
829+
function requireValue<
830+
RowT extends {id: string, type: string},
831+
FieldT extends keyof RowT
832+
>(
833+
row: RowT,
658834
field: FieldT
659-
): NonNullable<ActionRow[FieldT]> {
835+
): NonNullable<RowT[FieldT]> {
660836
const value = row[field];
661-
if (value === null) {
837+
if ((value === null) || (value === undefined)) {
662838
throw new errors.InternalServerError({
663839
message: tpl(messages.invalidAutomationActionRevision, {
664840
actionId: row.id,
Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,38 @@
1+
import type * as AutomationsApi from './automations-api';
2+
3+
const MAX_STEPS_PER_BATCH = 100;
4+
15
type PollOptions = {
6+
automationsApi: Pick<typeof AutomationsApi,
7+
'fetchAndLockSteps'
8+
>;
29
enqueueAnotherPollAt: (date: Readonly<Date>) => unknown;
310
};
411

512
export const poll = async ({
6-
// TODO(NY-1286) This ESLint suppression will be removed once we implement polling.
7-
// eslint-disable-next-line @typescript-eslint/no-unused-vars
13+
automationsApi,
814
enqueueAnotherPollAt
915
}: Readonly<PollOptions>): Promise<void> => {
1016
// TODO(NY-1311) Once we're using real tables, we should remove this conditional.
17+
// Note that unlike triggering, where we only continue if the "automations"
18+
// flag is enabled, for polling we want to run in all cases. If an
19+
// automation was enqueued while the flag was on, we want it to run even if
20+
// the feature was turned off.
1121
if (
1222
process.env.NODE_ENV !== 'development'
1323
&& !process.env.NODE_ENV?.startsWith('test')
1424
) {
1525
return;
1626
}
1727

18-
// TODO(NY-1286) Implement polling. For now, this function is a skeleton.
28+
const {steps, nextStepReadyAt} = await automationsApi.fetchAndLockSteps(MAX_STEPS_PER_BATCH);
29+
30+
if (steps.length === 0) {
31+
if (nextStepReadyAt) {
32+
enqueueAnotherPollAt(nextStepReadyAt);
33+
}
34+
return;
35+
}
36+
37+
// TODO(NY-1286) Implement polling. For now, this function is incomplete.
1938
};

ghost/core/core/server/services/automations/service.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ const {getSignedAdminToken} = require('../../adapters/scheduling/utils');
66
const StartAutomationsPollEvent = require('./events/start-automations-poll-event');
77
const {poll} = require('./poll');
88
const {welcomeEmailAutomationPoll} = require('./welcome-email-automation-poll');
9+
const automationsApi = require('./automations-api');
910
const memberWelcomeEmailService = require('../member-welcome-emails/service');
1011
/** @import DomainEvents from '@tryghost/domain-events' */
1112

@@ -61,6 +62,7 @@ class AutomationsService {
6162
};
6263

6364
domainEvents.subscribe(StartAutomationsPollEvent, oneAtATime(async () => poll({
65+
automationsApi,
6466
enqueueAnotherPollAt: enqueuePollAt
6567
})));
6668

0 commit comments

Comments
 (0)