diff --git a/ghost/core/core/server/services/automations/automations-api.ts b/ghost/core/core/server/services/automations/automations-api.ts index a87499031ae..67d47ffd0cf 100644 --- a/ghost/core/core/server/services/automations/automations-api.ts +++ b/ghost/core/core/server/services/automations/automations-api.ts @@ -264,6 +264,10 @@ export async function trigger(options: TriggerOptions) { requestPoll(); } +export async function fetchAndLockSteps(...args: Parameters) { + return await repository.fetchAndLockSteps(...args); +} + export function _resetTestDatabase() { if (process.env.NODE_ENV?.startsWith('testing')) { testDatabase = null; diff --git a/ghost/core/core/server/services/automations/automations-repository.ts b/ghost/core/core/server/services/automations/automations-repository.ts index bc9e3a184b9..cc5a9ec808a 100644 --- a/ghost/core/core/server/services/automations/automations-repository.ts +++ b/ghost/core/core/server/services/automations/automations-repository.ts @@ -62,6 +62,31 @@ export interface EditAutomationData { edges: AutomationEdge[]; } +type AutomationStepBase = { + id: string; + step_attempts: number; + automation_run_id: string; + automation_status: 'inactive' | 'active'; + member_id: string | null; + member_email: string; + action_id: string; +}; + +export type AutomationStepToRun = AutomationStepBase & ( + { + type: 'wait'; + wait_hours: number; + } | { + type: 'send_email'; + email_subject: string; + email_lexical: string; + email_sender_name: string | null; + email_sender_email: string | null; + email_sender_reply_to: string | null; + email_design_setting_id: string | null; + } +); + export interface AutomationsRepository { browse(): Promise>; getById(id: string): Promise; @@ -71,4 +96,21 @@ export interface AutomationsRepository { memberId: string; memberStatus: 'free' | 'paid'; }): Promise; + /** + * Select the steps we want to run. If no steps are found, returns the time + * we should try again, if any. + * + * If we could guarantee this function would only be called once ever, it'd + * be pretty simple! However, we want to handle cases where this function is + * called multiple times simultaneously (maybe in different processes + * querying the same database). That's why we implement a row-level locking + * mechanism, hence the word "lock" in the name of this function. + */ + fetchAndLockSteps(limit: number): Promise<{ + steps: AutomationStepToRun[], + nextStepReadyAt: null; + } | { + steps: never[], + nextStepReadyAt: Date | null; + }>; } diff --git a/ghost/core/core/server/services/automations/fake-database-automations-repository.ts b/ghost/core/core/server/services/automations/fake-database-automations-repository.ts index 4612590fd84..c1b7cd2bc89 100644 --- a/ghost/core/core/server/services/automations/fake-database-automations-repository.ts +++ b/ghost/core/core/server/services/automations/fake-database-automations-repository.ts @@ -1,5 +1,6 @@ import errors from '@tryghost/errors'; import tpl from '@tryghost/tpl'; +import crypto from 'node:crypto'; import ObjectId from 'bson-objectid'; import {dequal} from 'dequal'; import type {DatabaseSync} from 'node:sqlite'; @@ -9,12 +10,14 @@ import type { AutomationAction, AutomationEdge, AutomationSummary, + AutomationStepToRun, AutomationsRepository, EditAutomationData, Page } from './automations-repository'; -import type {ExclusifyUnion} from 'type-fest'; +import type {ExclusifyUnion, ReadonlyDeep} from 'type-fest'; +const LOCK_TIMEOUT_MS = 30 * 60 * 1000; const HOUR_MS = 60 * 60 * 1000; const messages = { @@ -61,6 +64,24 @@ interface EdgeRow { target_action_id: string; } +type StepRow = { + id: string; + step_attempts: number; + automation_run_id: string; + automation_status: 'inactive' | 'active'; + member_id: string | null; + member_email: string; + action_id: string; + type: string; + wait_hours: number | null; + email_subject: string | null; + email_lexical: string | null; + email_sender_name: string | null; + email_sender_email: string | null; + email_sender_reply_to: string | null; + email_design_setting_id: string | null; +}; + type NextActionRevisionRow = { automation_id: string; action_id: string; @@ -141,6 +162,18 @@ export function createFakeDatabaseAutomationsRepository({ const database = getDatabase(); return withTransaction(database, () => trigger(database, options)); + }, + + async fetchAndLockSteps(limit: number): Promise<{ + steps: AutomationStepToRun[], + nextStepReadyAt: null; + } | { + steps: never[], + nextStepReadyAt: null | Date; + }> { + const database = getDatabase(); + + return withTransaction(database, () => fetchAndLockSteps(database, limit)); } }; } @@ -205,6 +238,146 @@ function trigger(database: DatabaseSync, { }); } +function fetchAndLockSteps(database: DatabaseSync, limit: number): { + steps: AutomationStepToRun[], + nextStepReadyAt: null; +} | { + steps: never[], + nextStepReadyAt: null | Date; +} { + const now = new Date(); + const nowString = now.toISOString(); + const staleLockCutoff = new Date(now.getTime() - LOCK_TIMEOUT_MS); + const staleLockCutoffString = staleLockCutoff.toISOString(); + const lockId = crypto.randomUUID(); + + const candidates = database.prepare(` + SELECT id + FROM automation_run_steps + WHERE status = 'pending' + AND ready_at <= ? + AND ( + locked_by IS NULL + OR locked_at < ? + ) + ORDER BY ready_at, created_at, id + LIMIT ? + `).all(nowString, staleLockCutoffString, limit) as unknown as ReadonlyArray<{id: string}>; + + if (candidates.length === 0) { + return { + steps: [], + nextStepReadyAt: findNextPendingReadyAt(database, now) + }; + } + + const candidateIds = candidates.map(candidate => candidate.id); + + const placeholders = candidateIds.map(() => '?').join(','); + database.prepare(` + UPDATE automation_run_steps + SET locked_by = ?, + locked_at = ?, + started_at = ?, + updated_at = ?, + step_attempts = step_attempts + 1 + WHERE id IN (${placeholders}) + AND status = 'pending' + AND ready_at <= ? + AND ( + locked_by IS NULL + OR locked_at < ? + ) + `).run(lockId, nowString, nowString, nowString, ...candidateIds, nowString, staleLockCutoffString); + + const rows = database.prepare(` + SELECT + step.id AS id, + step.step_attempts AS step_attempts, + step.automation_run_id AS automation_run_id, + automation.status AS automation_status, + run.member_id AS member_id, + run.member_email AS member_email, + action.id AS action_id, + action.type AS type, + revision.wait_hours AS wait_hours, + revision.email_subject AS email_subject, + revision.email_lexical AS email_lexical, + revision.email_sender_name AS email_sender_name, + revision.email_sender_email AS email_sender_email, + revision.email_sender_reply_to AS email_sender_reply_to, + revision.email_design_setting_id AS email_design_setting_id + FROM automation_run_steps step + INNER JOIN automation_runs run ON run.id = step.automation_run_id + INNER JOIN automations automation ON automation.id = run.automation_id + INNER JOIN automation_action_revisions revision ON revision.id = step.automation_action_revision_id + INNER JOIN automation_actions action ON action.id = revision.action_id + WHERE step.locked_by = ? + ORDER BY step.ready_at, step.created_at, step.id + `).all(lockId) as unknown as StepRow[]; + + if (rows.length === 0) { + return { + steps: [], + nextStepReadyAt: findNextPendingReadyAt(database, staleLockCutoff) + }; + } else { + return { + steps: rows.map(row => buildStepToRun(row)), + nextStepReadyAt: null + }; + } +} + +function findNextPendingReadyAt(database: DatabaseSync, staleLockCutoff: Readonly): Date | null { + const row = database.prepare(` + SELECT MIN(ready_at) AS next_ready_at + FROM automation_run_steps + WHERE status = 'pending' + AND ( + locked_by IS NULL + OR locked_at < ? + ) + `).get(staleLockCutoff.toISOString()) as {next_ready_at: string | null} | undefined; + return row?.next_ready_at ? new Date(row.next_ready_at) : null; +} + +function buildStepToRun(row: ReadonlyDeep): AutomationStepToRun { + const base = { + id: row.id, + step_attempts: row.step_attempts, + automation_run_id: row.automation_run_id, + automation_status: row.automation_status, + member_id: row.member_id, + member_email: row.member_email, + action_id: row.action_id + }; + + switch (row.type) { + case 'wait': + return { + ...base, + type: 'wait', + wait_hours: requireValue(row, 'wait_hours') + }; + case 'send_email': + return { + ...base, + type: 'send_email', + email_subject: requireValue(row, 'email_subject'), + email_lexical: requireValue(row, 'email_lexical'), + email_sender_name: row.email_sender_name, + email_sender_email: row.email_sender_email, + email_sender_reply_to: row.email_sender_reply_to, + email_design_setting_id: row.email_design_setting_id + }; + default: + throw new errors.InternalServerError({ + message: `Unexpected action type from database: ${row.type}` + }); + } +} + function findFirstActionRevision(database: DatabaseSync, memberStatus: 'free' | 'paid'): NextActionRevisionRow | null { const automationSlug: NonNullable = MEMBER_WELCOME_EMAIL_SLUGS[memberStatus]; @@ -653,12 +826,15 @@ function buildActionPayload(row: ActionRow): AutomationAction { } } -function requireValue( - row: Pick, +function requireValue< + RowT extends {id: string, type: string}, + FieldT extends keyof RowT +>( + row: RowT, field: FieldT -): NonNullable { +): NonNullable { const value = row[field]; - if (value === null) { + if ((value === null) || (value === undefined)) { throw new errors.InternalServerError({ message: tpl(messages.invalidAutomationActionRevision, { actionId: row.id, diff --git a/ghost/core/core/server/services/automations/poll.ts b/ghost/core/core/server/services/automations/poll.ts index b94ebe3b090..0b0aaa98033 100644 --- a/ghost/core/core/server/services/automations/poll.ts +++ b/ghost/core/core/server/services/automations/poll.ts @@ -1,13 +1,23 @@ +import type * as AutomationsApi from './automations-api'; + +const MAX_STEPS_PER_BATCH = 100; + type PollOptions = { + automationsApi: Pick; enqueueAnotherPollAt: (date: Readonly) => unknown; }; export const poll = async ({ - // TODO(NY-1286) This ESLint suppression will be removed once we implement polling. - // eslint-disable-next-line @typescript-eslint/no-unused-vars + automationsApi, enqueueAnotherPollAt }: Readonly): Promise => { // TODO(NY-1311) Once we're using real tables, we should remove this conditional. + // Note that unlike triggering, where we only continue if the "automations" + // flag is enabled, for polling we want to run in all cases. If an + // automation was enqueued while the flag was on, we want it to run even if + // the feature was turned off. if ( process.env.NODE_ENV !== 'development' && !process.env.NODE_ENV?.startsWith('test') @@ -15,5 +25,14 @@ export const poll = async ({ return; } - // TODO(NY-1286) Implement polling. For now, this function is a skeleton. + const {steps, nextStepReadyAt} = await automationsApi.fetchAndLockSteps(MAX_STEPS_PER_BATCH); + + if (steps.length === 0) { + if (nextStepReadyAt) { + enqueueAnotherPollAt(nextStepReadyAt); + } + return; + } + + // TODO(NY-1286) Implement polling. For now, this function is incomplete. }; diff --git a/ghost/core/core/server/services/automations/service.js b/ghost/core/core/server/services/automations/service.js index 2da4e7311cd..16b906c0b19 100644 --- a/ghost/core/core/server/services/automations/service.js +++ b/ghost/core/core/server/services/automations/service.js @@ -6,6 +6,7 @@ const {getSignedAdminToken} = require('../../adapters/scheduling/utils'); const StartAutomationsPollEvent = require('./events/start-automations-poll-event'); const {poll} = require('./poll'); const {welcomeEmailAutomationPoll} = require('./welcome-email-automation-poll'); +const automationsApi = require('./automations-api'); const memberWelcomeEmailService = require('../member-welcome-emails/service'); /** @import DomainEvents from '@tryghost/domain-events' */ @@ -61,6 +62,7 @@ class AutomationsService { }; domainEvents.subscribe(StartAutomationsPollEvent, oneAtATime(async () => poll({ + automationsApi, enqueueAnotherPollAt: enqueuePollAt }))); diff --git a/ghost/core/test/unit/server/services/automations/automations-repository.test.ts b/ghost/core/test/unit/server/services/automations/automations-repository.test.ts index 19eff2a6a59..02592dd504c 100644 --- a/ghost/core/test/unit/server/services/automations/automations-repository.test.ts +++ b/ghost/core/test/unit/server/services/automations/automations-repository.test.ts @@ -1,4 +1,5 @@ import assert from 'node:assert/strict'; +import sinon from 'sinon'; import ObjectId from 'bson-objectid'; import {createTemporaryFakeAutomationsDatabase} from '../../../../../core/server/services/automations/temporary-fake-database'; import {createFakeDatabaseAutomationsRepository} from '../../../../../core/server/services/automations/fake-database-automations-repository'; @@ -72,6 +73,116 @@ describe('automations repository', function () { return Number((row as {count: number}).count); }; + const getActionByIndex = (automationId: SQLInputValue, index: SQLInputValue) => { + const result = database!.prepare(` + SELECT + automation_actions.id AS action_id, + automation_actions.type AS action_type, + automation_action_revisions.id AS revision_id, + automation_action_revisions.wait_hours AS wait_hours + FROM automation_actions + INNER JOIN automation_action_revisions ON automation_action_revisions.action_id = automation_actions.id + WHERE automation_actions.automation_id = ? + AND automation_actions.deleted_at IS NULL + ORDER BY automation_actions.created_at, automation_actions.id + LIMIT 1 OFFSET ? + `).get(automationId, index); + assert(result, 'Expected action to exist'); + return result; + }; + + const insertRun = (automationId: SQLInputValue, attrs = {}) => { + const now = new Date().toISOString(); + const run = { + id: ObjectId().toHexString(), + created_at: now, + updated_at: now, + automation_id: automationId, + member_id: ObjectId().toHexString(), + member_email: 'member@example.com', + ...attrs + }; + + database!.prepare(` + INSERT INTO automation_runs + (id, created_at, updated_at, automation_id, member_id, member_email) VALUES + (:id, :created_at, :updated_at, :automation_id, :member_id, :member_email) + `).run(run); + + return run; + }; + + const insertStep = (runId: SQLInputValue, revisionId: SQLInputValue, attrs = {}) => { + const now = new Date().toISOString(); + const step = { + id: ObjectId().toHexString(), + created_at: now, + updated_at: now, + automation_run_id: runId, + automation_action_revision_id: revisionId, + ready_at: now, + step_attempts: 0, + started_at: null, + finished_at: null, + status: 'pending', + locked_by: null, + locked_at: null, + ...attrs + }; + + database!.prepare(` + INSERT INTO automation_run_steps + ( + id, + created_at, + updated_at, + automation_run_id, + automation_action_revision_id, + ready_at, + step_attempts, + started_at, + finished_at, + status, + locked_by, + locked_at + ) VALUES ( + :id, + :created_at, + :updated_at, + :automation_run_id, + :automation_action_revision_id, + :ready_at, + :step_attempts, + :started_at, + :finished_at, + :status, + :locked_by, + :locked_at + ) + `).run(step); + + return step; + }; + + const getStepById = (id: SQLInputValue) => { + const result = database!.prepare(` + SELECT * + FROM automation_run_steps + WHERE id = ? + `).get(id); + assert(result, 'Expected step to exist'); + return result; + }; + + const assertSingleBatchLock = (steps: ReadonlyArray<{id: string}>): string => { + assert(steps.length > 0, 'Expected at least one locked step'); + const lockedSteps = steps.map(step => getStepById(step.id)); + const lockId = lockedSteps[0]?.locked_by; + assert(typeof lockId === 'string'); + assert(lockedSteps.every(step => step.locked_by === lockId)); + return lockId; + }; + const changeWaitHours = (action: AutomationAction, waitHours: number): AutomationAction => { assert.equal(action.type, 'wait'); return { @@ -90,6 +201,7 @@ describe('automations repository', function () { }); afterEach(function () { + sinon.restore(); database.close(); }); @@ -291,4 +403,202 @@ describe('automations repository', function () { assert.equal(getRevisionCount(addedActionId), 1); }); }); + + describe('fetchAndLockSteps', function () { + it('locks ready and steps with stale locks, but skips future and recently-locked steps', async function () { + const automation = await getAutomationBySlug('member-welcome-email-free'); + const action = getActionByIndex(automation.id, 0); + const run = insertRun(automation.id); + const readyStep = insertStep(run.id, action.revision_id, { + ready_at: new Date(Date.now() - 1000).toISOString() + }); + const staleLockStep = insertStep(run.id, action.revision_id, { + locked_at: new Date(Date.now() - (31 * 60 * 1000)).toISOString(), + ready_at: new Date(Date.now() - 1000).toISOString(), + locked_by: 'old-lock', + step_attempts: 2 + }); + const finishedStep = insertStep(run.id, action.revision_id, { + finished_at: new Date(Date.now() - 1000).toISOString(), + locked_at: new Date(Date.now() - (31 * 60 * 1000)).toISOString(), + ready_at: new Date(Date.now() - 1000).toISOString(), + locked_by: 'finished-lock', + status: 'finished', + step_attempts: 4 + }); + const notReadyYetStep = insertStep(run.id, action.revision_id, { + ready_at: new Date(Date.now() + 60 * 1000).toISOString() + }); + const recentlyLockedStep = insertStep(run.id, action.revision_id, { + locked_at: new Date(Date.now() - (29 * 60 * 1000)).toISOString(), + ready_at: new Date(Date.now() - 1000).toISOString(), + locked_by: 'fresh-lock' + }); + + const result = await repo.fetchAndLockSteps(10); + + const actualStepIds = new Set(result.steps.map(step => step.id)); + const expectedStepIds = new Set([readyStep.id, staleLockStep.id]); + assert.deepEqual(actualStepIds, expectedStepIds); + assert.equal(result.nextStepReadyAt, null); + + const lockId = assertSingleBatchLock(result.steps); + + const lockedReady = getStepById(readyStep.id); + assert.equal(lockedReady.status, 'pending'); + assert.equal(lockedReady.step_attempts, 1); + assert.equal(lockedReady.locked_by, lockId); + + const lockedStaleLock = getStepById(staleLockStep.id); + assert.equal(lockedStaleLock.status, 'pending'); + assert.equal(lockedStaleLock.step_attempts, 3); + assert.equal(lockedStaleLock.locked_by, lockId); + + const skippedFinished = getStepById(finishedStep.id); + assert.equal(skippedFinished.status, 'finished'); + assert.equal(skippedFinished.step_attempts, 4); + assert.equal(skippedFinished.locked_by, 'finished-lock'); + + const skippedNotReadyYet = getStepById(notReadyYetStep.id); + assert.equal(skippedNotReadyYet.step_attempts, 0); + assert.equal(skippedNotReadyYet.locked_by, null); + + const skippedRecentlyLocked = getStepById(recentlyLockedStep.id); + assert.equal(skippedRecentlyLocked.step_attempts, 0); + assert.equal(skippedRecentlyLocked.locked_by, 'fresh-lock'); + }); + + it('returns the next future pending ready_at when no steps can be locked', async function () { + const automation = await getAutomationBySlug('member-welcome-email-free'); + const action = getActionByIndex(automation.id, 0); + const run = insertRun(automation.id); + const later = new Date(Date.now() + 60 * 1000); + const sooner = new Date(Date.now() + 30 * 1000); + + insertStep(run.id, action.revision_id, {ready_at: later.toISOString()}); + insertStep(run.id, action.revision_id, {ready_at: sooner.toISOString()}); + + const result = await repo.fetchAndLockSteps(10); + + assert.deepEqual(result.steps, []); + assert(result.nextStepReadyAt); + assert.equal(result.nextStepReadyAt.toISOString(), sooner.toISOString()); + }); + + it('respects the limit argument', async function () { + const automation = await getAutomationBySlug('member-welcome-email-free'); + const action = getActionByIndex(automation.id, 0); + const run = insertRun(automation.id); + const readyAt = new Date(Date.now() - 1000).toISOString(); + const firstStep = insertStep(run.id, action.revision_id, {ready_at: readyAt}); + const secondStep = insertStep(run.id, action.revision_id, {ready_at: readyAt}); + const thirdStep = insertStep(run.id, action.revision_id, {ready_at: readyAt}); + + const result = await repo.fetchAndLockSteps(2); + + assert.equal(result.steps.length, 2); + assert.equal(result.nextStepReadyAt, null); + + const lockId = assertSingleBatchLock(result.steps); + + const first = getStepById(firstStep.id); + const second = getStepById(secondStep.id); + const third = getStepById(thirdStep.id); + const allSteps = [first, second, third]; + + const lockedSteps = allSteps.filter(step => step.locked_by === lockId); + assert.equal(lockedSteps.length, 2); + + const notLockedSteps = allSteps.filter(step => step.locked_by !== lockId); + assert.equal(notLockedSteps.length, 1); + const [notLockedStep] = notLockedSteps; + assert(notLockedStep); + assert.equal(notLockedStep.locked_by, null); + assert.equal(notLockedStep.step_attempts, 0); + }); + + it('does not return the same steps to concurrent callers', async function () { + const automation = await getAutomationBySlug('member-welcome-email-free'); + const action = getActionByIndex(automation.id, 0); + const run = insertRun(automation.id); + const readyAt = new Date(Date.now() - 1000).toISOString(); + const readySteps = [ + insertStep(run.id, action.revision_id, {ready_at: readyAt}), + insertStep(run.id, action.revision_id, {ready_at: readyAt}), + insertStep(run.id, action.revision_id, {ready_at: readyAt}), + insertStep(run.id, action.revision_id, {ready_at: readyAt}) + ]; + + const [firstResult, secondResult] = await Promise.all([ + repo.fetchAndLockSteps(2), + repo.fetchAndLockSteps(2) + ]); + + const firstStepIds = new Set(firstResult.steps.map(step => step.id)); + const secondStepIds = new Set(secondResult.steps.map(step => step.id)); + assert.equal(firstStepIds.size, firstResult.steps.length); + assert.equal(secondStepIds.size, secondResult.steps.length); + assert.equal([...firstStepIds].some(id => secondStepIds.has(id)), false); + + const firstLockId = assertSingleBatchLock(firstResult.steps); + const secondLockId = assertSingleBatchLock(secondResult.steps); + assert.notEqual(firstLockId, secondLockId); + + const allSteps = readySteps.map(step => getStepById(step.id)); + const lockedSteps = allSteps.filter(step => step.locked_by !== null); + assert.equal(lockedSteps.length, firstResult.steps.length + secondResult.steps.length); + assert(lockedSteps.length <= readySteps.length); + }); + + it('handles concurrent locks in the same transaction', async function () { + const automation = await getAutomationBySlug('member-welcome-email-free'); + const action = getActionByIndex(automation.id, 0); + const run = insertRun(automation.id); + const readyAt = new Date(Date.now() - 1000).toISOString(); + const availableStep = insertStep(run.id, action.revision_id, {ready_at: readyAt}); + const contendedStep = insertStep(run.id, action.revision_id, {ready_at: readyAt}); + + let hasSimulatedLock = false; + const originalPrepare = database.prepare.bind(database); + sinon.stub(database, 'prepare').callsFake((source) => { + const statement = originalPrepare(source); + + const shouldSimulateLockBySomeoneElse = ( + !hasSimulatedLock && + source.includes('SELECT') && + source.includes('FROM automation_run_steps') + ); + if (!shouldSimulateLockBySomeoneElse) { + return statement; + } + + const originalAll = statement.all.bind(statement); + sinon.stub(statement, 'all').callsFake((...args) => { + const result = originalAll(...args); + + hasSimulatedLock = true; + + const lockedAt = new Date().toISOString(); + originalPrepare(` + UPDATE automation_run_steps + SET locked_by = ?, + locked_at = ?, + started_at = ?, + updated_at = ? + WHERE id = ? + `).run('contending-lock', lockedAt, lockedAt, lockedAt, contendedStep.id); + + return result; + }); + + return statement; + }); + + const result = await repo.fetchAndLockSteps(2); + + const actualStepIds = new Set(result.steps.map(step => step.id)); + const expectedStepIds = new Set([availableStep.id]); + assert.deepEqual(actualStepIds, expectedStepIds); + }); + }); }); diff --git a/ghost/core/test/unit/server/services/automations/poll.test.ts b/ghost/core/test/unit/server/services/automations/poll.test.ts new file mode 100644 index 00000000000..05a1fd966f4 --- /dev/null +++ b/ghost/core/test/unit/server/services/automations/poll.test.ts @@ -0,0 +1,69 @@ +import sinon from 'sinon'; + +import {poll} from '../../../../../core/server/services/automations/poll'; + +const MAX_STEPS_PER_BATCH = 100; + +type PollOptions = Parameters[0]; +type AutomationsApi = PollOptions['automationsApi']; + +type StubbedFunction unknown> = sinon.SinonStub, ReturnType>; + +type AutomationsApiStubs = { + [Method in keyof PollOptions['automationsApi']]: StubbedFunction; +}; + +type PollOptionsStubs = PollOptions & { + automationsApi: AutomationsApiStubs; + enqueueAnotherPollAt: StubbedFunction; +}; + +const fake = unknown>(): StubbedFunction => ( + sinon.stub, ReturnType>() +); + +describe('automations poll', function () { + let automationsApi: AutomationsApiStubs; + let options: PollOptionsStubs; + + beforeEach(function () { + automationsApi = { + fetchAndLockSteps: fake().resolves({steps: [], nextStepReadyAt: null}) + }; + + options = { + automationsApi, + enqueueAnotherPollAt: fake() + }; + }); + + afterEach(function () { + sinon.restore(); + }); + + it('does nothing in production (for now)', async function () { + // NOTE: We'll remove this test once we go live. + sinon.stub(process.env, 'NODE_ENV').value('production'); + + await poll(options); + + sinon.assert.notCalled(automationsApi.fetchAndLockSteps); + sinon.assert.notCalled(options.enqueueAnotherPollAt); + }); + + it('does nothing when no steps are ready', async function () { + await poll(options); + + sinon.assert.calledOnceWithExactly(automationsApi.fetchAndLockSteps, MAX_STEPS_PER_BATCH); + sinon.assert.notCalled(options.enqueueAnotherPollAt); + }); + + it('enqueues the next future poll when no steps are ready', async function () { + const nextStepReadyAt = new Date(Date.now() + 60 * 1000); + automationsApi.fetchAndLockSteps.resolves({steps: [], nextStepReadyAt}); + + await poll(options); + + sinon.assert.calledOnceWithExactly(options.enqueueAnotherPollAt, nextStepReadyAt); + }); +});