Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,10 @@ export async function trigger(options: TriggerOptions) {
requestPoll();
}

export async function fetchAndLockSteps(...args: Parameters<AutomationsRepository['fetchAndLockSteps']>) {
return await repository.fetchAndLockSteps(...args);
}

export function _resetTestDatabase() {
if (process.env.NODE_ENV?.startsWith('testing')) {
testDatabase = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,31 @@ export interface EditAutomationData {
edges: AutomationEdge[];
}

type AutomationStepBase = {
id: string;
step_attempts: number;
automation_run_id: string;
automation_status: 'inactive' | 'active';
member_id: string | null;
member_email: string;
action_id: string;
};

export type AutomationStepToRun = AutomationStepBase & (
{
type: 'wait';
wait_hours: number;
} | {
type: 'send_email';
email_subject: string;
email_lexical: string;
email_sender_name: string | null;
email_sender_email: string | null;
email_sender_reply_to: string | null;
email_design_setting_id: string | null;
}
);

export interface AutomationsRepository {
browse(): Promise<Page<AutomationSummary>>;
getById(id: string): Promise<Automation | null>;
Expand All @@ -71,4 +96,21 @@ export interface AutomationsRepository {
memberId: string;
memberStatus: 'free' | 'paid';
}): Promise<void>;
/**
* Select the steps we want to run. If no steps are found, returns the time
* we should try again, if any.
*
* If we could guarantee this function would only be called once ever, it'd
* be pretty simple! However, we want to handle cases where this function is
* called multiple times simultaneously (maybe in different processes
* querying the same database). That's why we implement a row-level locking
* mechanism, hence the word "lock" in the name of this function.
*/
fetchAndLockSteps(limit: number): Promise<{
steps: AutomationStepToRun[],
nextStepReadyAt: null;
} | {
steps: never[],
nextStepReadyAt: Date | null;
}>;
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import errors from '@tryghost/errors';
import tpl from '@tryghost/tpl';
import crypto from 'node:crypto';
import ObjectId from 'bson-objectid';
import {dequal} from 'dequal';
import type {DatabaseSync} from 'node:sqlite';
Expand All @@ -9,12 +10,14 @@ import type {
AutomationAction,
AutomationEdge,
AutomationSummary,
AutomationStepToRun,
AutomationsRepository,
EditAutomationData,
Page
} from './automations-repository';
import type {ExclusifyUnion} from 'type-fest';
import type {ExclusifyUnion, ReadonlyDeep} from 'type-fest';

const LOCK_TIMEOUT_MS = 30 * 60 * 1000;
const HOUR_MS = 60 * 60 * 1000;

const messages = {
Expand Down Expand Up @@ -61,6 +64,24 @@ interface EdgeRow {
target_action_id: string;
}

type StepRow = {
id: string;
step_attempts: number;
automation_run_id: string;
automation_status: 'inactive' | 'active';
member_id: string | null;
member_email: string;
action_id: string;
type: string;
wait_hours: number | null;
email_subject: string | null;
email_lexical: string | null;
email_sender_name: string | null;
email_sender_email: string | null;
email_sender_reply_to: string | null;
email_design_setting_id: string | null;
};

type NextActionRevisionRow = {
automation_id: string;
action_id: string;
Expand Down Expand Up @@ -141,6 +162,18 @@ export function createFakeDatabaseAutomationsRepository({
const database = getDatabase();

return withTransaction(database, () => trigger(database, options));
},

async fetchAndLockSteps(limit: number): Promise<{
steps: AutomationStepToRun[],
nextStepReadyAt: null;
} | {
steps: never[],
nextStepReadyAt: null | Date;
}> {
const database = getDatabase();

return withTransaction(database, () => fetchAndLockSteps(database, limit));
}
};
}
Expand Down Expand Up @@ -205,6 +238,146 @@ function trigger(database: DatabaseSync, {
});
}

function fetchAndLockSteps(database: DatabaseSync, limit: number): {
steps: AutomationStepToRun[],
nextStepReadyAt: null;
} | {
steps: never[],
nextStepReadyAt: null | Date;
} {
const now = new Date();
const nowString = now.toISOString();
const staleLockCutoff = new Date(now.getTime() - LOCK_TIMEOUT_MS);
const staleLockCutoffString = staleLockCutoff.toISOString();
const lockId = crypto.randomUUID();

const candidates = database.prepare(`
SELECT id
FROM automation_run_steps
WHERE status = 'pending'
AND ready_at <= ?
AND (
locked_by IS NULL
OR locked_at < ?
)
ORDER BY ready_at, created_at, id
LIMIT ?
`).all(nowString, staleLockCutoffString, limit) as unknown as ReadonlyArray<{id: string}>;

if (candidates.length === 0) {
return {
steps: [],
nextStepReadyAt: findNextPendingReadyAt(database, now)
};
}

const candidateIds = candidates.map(candidate => candidate.id);

const placeholders = candidateIds.map(() => '?').join(',');
database.prepare(`
UPDATE automation_run_steps
SET locked_by = ?,
locked_at = ?,
started_at = ?,
updated_at = ?,
step_attempts = step_attempts + 1
WHERE id IN (${placeholders})
AND status = 'pending'
AND ready_at <= ?
AND (
locked_by IS NULL
OR locked_at < ?
)
`).run(lockId, nowString, nowString, nowString, ...candidateIds, nowString, staleLockCutoffString);

const rows = database.prepare(`
SELECT
step.id AS id,
step.step_attempts AS step_attempts,
step.automation_run_id AS automation_run_id,
automation.status AS automation_status,
run.member_id AS member_id,
run.member_email AS member_email,
action.id AS action_id,
action.type AS type,
revision.wait_hours AS wait_hours,
revision.email_subject AS email_subject,
revision.email_lexical AS email_lexical,
revision.email_sender_name AS email_sender_name,
revision.email_sender_email AS email_sender_email,
revision.email_sender_reply_to AS email_sender_reply_to,
revision.email_design_setting_id AS email_design_setting_id
FROM automation_run_steps step
INNER JOIN automation_runs run ON run.id = step.automation_run_id
INNER JOIN automations automation ON automation.id = run.automation_id
INNER JOIN automation_action_revisions revision ON revision.id = step.automation_action_revision_id
INNER JOIN automation_actions action ON action.id = revision.action_id
WHERE step.locked_by = ?
ORDER BY step.ready_at, step.created_at, step.id
`).all(lockId) as unknown as StepRow[];

if (rows.length === 0) {
return {
steps: [],
nextStepReadyAt: findNextPendingReadyAt(database, staleLockCutoff)
};
} else {
return {
steps: rows.map(row => buildStepToRun(row)),
nextStepReadyAt: null
};
}
}

function findNextPendingReadyAt(database: DatabaseSync, staleLockCutoff: Readonly<Date>): Date | null {
const row = database.prepare(`
SELECT MIN(ready_at) AS next_ready_at
FROM automation_run_steps
WHERE status = 'pending'
AND (
locked_by IS NULL
OR locked_at < ?
)
`).get(staleLockCutoff.toISOString()) as {next_ready_at: string | null} | undefined;
return row?.next_ready_at ? new Date(row.next_ready_at) : null;
}

function buildStepToRun(row: ReadonlyDeep<StepRow>): AutomationStepToRun {
const base = {
id: row.id,
step_attempts: row.step_attempts,
automation_run_id: row.automation_run_id,
automation_status: row.automation_status,
member_id: row.member_id,
member_email: row.member_email,
action_id: row.action_id
};

switch (row.type) {
case 'wait':
return {
...base,
type: 'wait',
wait_hours: requireValue(row, 'wait_hours')
};
case 'send_email':
return {
...base,
type: 'send_email',
email_subject: requireValue(row, 'email_subject'),
email_lexical: requireValue(row, 'email_lexical'),
email_sender_name: row.email_sender_name,
email_sender_email: row.email_sender_email,
email_sender_reply_to: row.email_sender_reply_to,
email_design_setting_id: row.email_design_setting_id
};
default:
throw new errors.InternalServerError({
message: `Unexpected action type from database: ${row.type}`
});
}
}

function findFirstActionRevision(database: DatabaseSync, memberStatus: 'free' | 'paid'): NextActionRevisionRow | null {
const automationSlug: NonNullable<string> = MEMBER_WELCOME_EMAIL_SLUGS[memberStatus];

Expand Down Expand Up @@ -653,12 +826,15 @@ function buildActionPayload(row: ActionRow): AutomationAction {
}
}

function requireValue<FieldT extends keyof ActionRow>(
row: Pick<ActionRow, 'id' | 'type' | FieldT>,
function requireValue<
RowT extends {id: string, type: string},
FieldT extends keyof RowT
>(
row: RowT,
field: FieldT
): NonNullable<ActionRow[FieldT]> {
): NonNullable<RowT[FieldT]> {
const value = row[field];
if (value === null) {
if ((value === null) || (value === undefined)) {
throw new errors.InternalServerError({
message: tpl(messages.invalidAutomationActionRevision, {
actionId: row.id,
Expand Down
25 changes: 22 additions & 3 deletions ghost/core/core/server/services/automations/poll.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,38 @@
import type * as AutomationsApi from './automations-api';

const MAX_STEPS_PER_BATCH = 100;

type PollOptions = {
automationsApi: Pick<typeof AutomationsApi,
'fetchAndLockSteps'
>;
enqueueAnotherPollAt: (date: Readonly<Date>) => unknown;
};

export const poll = async ({
// TODO(NY-1286) This ESLint suppression will be removed once we implement polling.
// eslint-disable-next-line @typescript-eslint/no-unused-vars
automationsApi,
enqueueAnotherPollAt
}: Readonly<PollOptions>): Promise<void> => {
// TODO(NY-1311) Once we're using real tables, we should remove this conditional.
// Note that unlike triggering, where we only continue if the "automations"
// flag is enabled, for polling we want to run in all cases. If an
// automation was enqueued while the flag was on, we want it to run even if
// the feature was turned off.
if (
process.env.NODE_ENV !== 'development'
&& !process.env.NODE_ENV?.startsWith('test')
) {
return;
}

// TODO(NY-1286) Implement polling. For now, this function is a skeleton.
const {steps, nextStepReadyAt} = await automationsApi.fetchAndLockSteps(MAX_STEPS_PER_BATCH);

if (steps.length === 0) {
if (nextStepReadyAt) {
enqueueAnotherPollAt(nextStepReadyAt);
}
return;
}

// TODO(NY-1286) Implement polling. For now, this function is incomplete.
};
2 changes: 2 additions & 0 deletions ghost/core/core/server/services/automations/service.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const {getSignedAdminToken} = require('../../adapters/scheduling/utils');
const StartAutomationsPollEvent = require('./events/start-automations-poll-event');
const {poll} = require('./poll');
const {welcomeEmailAutomationPoll} = require('./welcome-email-automation-poll');
const automationsApi = require('./automations-api');
const memberWelcomeEmailService = require('../member-welcome-emails/service');
/** @import DomainEvents from '@tryghost/domain-events' */

Expand Down Expand Up @@ -61,6 +62,7 @@ class AutomationsService {
};

domainEvents.subscribe(StartAutomationsPollEvent, oneAtATime(async () => poll({
automationsApi,
enqueueAnotherPollAt: enqueuePollAt
})));

Expand Down
Loading
Loading