diff --git a/docs/jobs-queue/jobs.mdx b/docs/jobs-queue/jobs.mdx index 5c6c2082b30..f103ffac973 100644 --- a/docs/jobs-queue/jobs.mdx +++ b/docs/jobs-queue/jobs.mdx @@ -301,3 +301,9 @@ await payload.jobs.cancel({ }, }) ``` + +From within a task or workflow handler, you can also cancel the current job by throwing a `JobCancelledError`: + +```ts +throw new JobCancelledError('Job was cancelled') +``` diff --git a/docs/jobs-queue/tasks.mdx b/docs/jobs-queue/tasks.mdx index c88c731d077..5792bb376fa 100644 --- a/docs/jobs-queue/tasks.mdx +++ b/docs/jobs-queue/tasks.mdx @@ -252,6 +252,14 @@ handler: async ({ input, req }) => { } ``` +#### Preventing Job Retries + +From within a task or workflow handler, you can prevent the entire job from being retried by throwing a `JobCancelledError`: + +```ts +throw new JobCancelledError('Job was cancelled') +``` + #### Accessing Failure Information After a task fails, you can inspect the job to understand what went wrong: diff --git a/packages/payload/src/index.ts b/packages/payload/src/index.ts index f3b74fa6b49..1b7baee83c1 100644 --- a/packages/payload/src/index.ts +++ b/packages/payload/src/index.ts @@ -1719,6 +1719,7 @@ export type { WorkflowHandler, WorkflowTypes, } from './queues/config/types/workflowTypes.js' +export { JobCancelledError } from './queues/errors/index.js' export { countRunnableOrActiveJobsForQueue } from './queues/operations/handleSchedules/countRunnableOrActiveJobsForQueue.js' export { importHandlerPath } from './queues/operations/runJobs/runJob/importHandlerPath.js' diff --git a/packages/payload/src/queues/config/types/index.ts b/packages/payload/src/queues/config/types/index.ts index 0e5e7699e43..8d08675e660 100644 --- a/packages/payload/src/queues/config/types/index.ts +++ b/packages/payload/src/queues/config/types/index.ts @@ -1,5 +1,5 @@ import type { CollectionConfig, Job } from '../../../index.js' -import type { Payload, PayloadRequest, Sort } from '../../../types/index.js' +import type { MaybePromise, Payload, PayloadRequest, Sort } from '../../../types/index.js' import type { RunJobsSilent } from '../../localAPI.js' import type { RunJobsArgs } from '../../operations/runJobs/index.js' import type { JobStats } from '../global.js' @@ -69,7 +69,7 @@ export type RunJobAccessArgs = { req: PayloadRequest } -export type RunJobAccess = (args: RunJobAccessArgs) => boolean | Promise +export type RunJobAccess = (args: RunJobAccessArgs) => MaybePromise export type QueueJobAccessArgs = { req: PayloadRequest @@ -78,8 +78,8 @@ export type QueueJobAccessArgs = { export type CancelJobAccessArgs = { req: PayloadRequest } -export type CancelJobAccess = (args: CancelJobAccessArgs) => boolean | Promise -export type QueueJobAccess = (args: QueueJobAccessArgs) => boolean | Promise +export type CancelJobAccess = (args: CancelJobAccessArgs) => MaybePromise +export type QueueJobAccess = (args: QueueJobAccessArgs) => MaybePromise export type SanitizedJobsConfig = { /** @@ -130,9 +130,7 @@ export type JobsConfig = { * * @remark this property should not be used on serverless platforms like Vercel */ - autoRun?: - | ((payload: Payload) => AutorunCronConfig[] | Promise) - | AutorunCronConfig[] + autoRun?: ((payload: Payload) => MaybePromise) | AutorunCronConfig[] /** * Determine whether or not to delete a job after it has successfully completed. */ @@ -186,7 +184,7 @@ export type JobsConfig = { * @param payload * @returns boolean */ - shouldAutoRun?: (payload: Payload) => boolean | Promise + shouldAutoRun?: (payload: Payload) => MaybePromise /** * Define all possible tasks here */ @@ -205,8 +203,6 @@ export type Queueable = { workflowConfig?: WorkflowConfig } -type OptionalPromise = Promise | T - export type BeforeScheduleFn = (args: { defaultBeforeSchedule: BeforeScheduleFn /** @@ -215,7 +211,7 @@ export type BeforeScheduleFn = (args: { jobStats: JobStats queueable: Queueable req: PayloadRequest -}) => OptionalPromise<{ +}) => MaybePromise<{ input?: object shouldSchedule: boolean waitUntil?: Date @@ -250,7 +246,7 @@ export type AfterScheduleFn = ( status: 'skipped' } ), -) => OptionalPromise +) => MaybePromise export type ScheduleConfig = { /** diff --git a/packages/payload/src/queues/config/types/taskTypes.ts b/packages/payload/src/queues/config/types/taskTypes.ts index 2f51b951b2d..c53e5e0416d 100644 --- a/packages/payload/src/queues/config/types/taskTypes.ts +++ b/packages/payload/src/queues/config/types/taskTypes.ts @@ -1,4 +1,11 @@ -import type { Field, Job, PayloadRequest, StringKeyOf, TypedJobs } from '../../../index.js' +import type { + Field, + Job, + MaybePromise, + PayloadRequest, + StringKeyOf, + TypedJobs, +} from '../../../index.js' import type { ScheduleConfig } from './index.js' import type { SingleTaskStatus } from './workflowTypes.js' @@ -99,8 +106,6 @@ export type RunTaskFunctions = { [TTaskSlug in keyof TypedJobs['tasks']]: RunTaskFunction } -type MaybePromise = Promise | T - export type RunInlineTaskFunction = ( taskID: string, taskArgs: { @@ -151,7 +156,7 @@ export type TaskCallbackArgs = { export type ShouldRestoreFn = ( args: { taskStatus: SingleTaskStatus } & Omit, -) => boolean | Promise +) => MaybePromise export type TaskCallbackFn = (args: TaskCallbackArgs) => MaybePromise export type RetryConfig = { diff --git a/packages/payload/src/queues/config/types/workflowTypes.ts b/packages/payload/src/queues/config/types/workflowTypes.ts index 8c168251020..adf5c4ea520 100644 --- a/packages/payload/src/queues/config/types/workflowTypes.ts +++ b/packages/payload/src/queues/config/types/workflowTypes.ts @@ -1,6 +1,7 @@ import type { Field } from '../../../fields/config/types.js' import type { Job, + MaybePromise, PayloadRequest, StringKeyOf, TypedCollection, @@ -105,7 +106,7 @@ export type WorkflowHandler< job: Job req: PayloadRequest tasks: RunTaskFunctions -}) => Promise +}) => MaybePromise export type SingleTaskStatus = { complete: boolean diff --git a/packages/payload/src/queues/errors/index.ts b/packages/payload/src/queues/errors/index.ts index 56b4b46ec5a..5622b221636 100644 --- a/packages/payload/src/queues/errors/index.ts +++ b/packages/payload/src/queues/errors/index.ts @@ -39,13 +39,12 @@ export class WorkflowError extends Error { } } +/** + * Throw this error from within a task or workflow handler to cancel the job. + * Unlike failing a job (e.g. by throwing any other error), a cancelled job will not be retried. + */ export class JobCancelledError extends Error { - args: { - job: Job - } - - constructor(args: { job: Job }) { - super(`Job ${args.job.id} was cancelled`) - this.args = args + constructor(message: string) { + super(message) } } diff --git a/packages/payload/src/queues/operations/runJobs/index.ts b/packages/payload/src/queues/operations/runJobs/index.ts index 6c982bb538a..f5da8fddde6 100644 --- a/packages/payload/src/queues/operations/runJobs/index.ts +++ b/packages/payload/src/queues/operations/runJobs/index.ts @@ -70,7 +70,7 @@ export type RunJobsArgs = { export type RunJobsResult = { jobStatus?: Record /** - * If this is false, there for sure are no jobs remaining, regardless of the limit + * If this is true, there for sure are no jobs remaining, regardless of the limit */ noJobsRemaining?: boolean /** @@ -211,6 +211,13 @@ export const runJobs = async (args: RunJobsArgs): Promise => { } } + if (!jobs.length) { + return { + noJobsRemaining: true, + remainingJobsFromQueried: 0, + } + } + /** * Just for logging purposes, we want to know how many jobs are new and how many are existing (= already been tried). * This is only for logs - in the end we still want to run all jobs, regardless of whether they are new or existing. @@ -227,13 +234,6 @@ export const runJobs = async (args: RunJobsArgs): Promise => { { existingJobs: [] as Job[], newJobs: [] as Job[] }, ) - if (!jobs.length) { - return { - noJobsRemaining: true, - remainingJobsFromQueried: 0, - } - } - if (!silent || (typeof silent === 'object' && !silent.info)) { payload.logger.info({ msg: `Running ${jobs.length} jobs.`, @@ -348,6 +348,35 @@ export const runJobs = async (args: RunJobsArgs): Promise => { } } catch (error) { if (error instanceof JobCancelledError) { + if ( + // @ts-expect-error error is not typed + !job.error?.cancelled || + !job.hasError || + job.processing || + job.completedAt || + job.waitUntil + ) { + // When using the local API to cancel jobs, the local API will update the job data for us to ensure the job is cancelled. + // But when throwing a JobCancelledError within a task or workflow handler, we are responsible for updating the job data ourselves. + await updateJob({ + id: job.id, + data: { + completedAt: null, + error: { + cancelled: true, + message: error.message, + }, + hasError: true, + processing: false, + waitUntil: null, + }, + depth: 0, + disableTransaction: true, + req, + returning: false, + }) + } + return { id: job.id, result: { diff --git a/packages/payload/src/queues/operations/runJobs/runJob/getRunTaskFunction.ts b/packages/payload/src/queues/operations/runJobs/runJob/getRunTaskFunction.ts index d34786164e3..bb55e4d4e84 100644 --- a/packages/payload/src/queues/operations/runJobs/runJob/getRunTaskFunction.ts +++ b/packages/payload/src/queues/operations/runJobs/runJob/getRunTaskFunction.ts @@ -20,7 +20,7 @@ import type { } from '../../../config/types/workflowTypes.js' import type { UpdateJobFunction } from './getUpdateJobFunction.js' -import { TaskError } from '../../../errors/index.js' +import { JobCancelledError, TaskError } from '../../../errors/index.js' import { getCurrentDate } from '../../../utilities/getCurrentDate.js' import { getTaskHandlerFromConfig } from './importHandlerPath.js' @@ -146,6 +146,10 @@ export const getRunTaskFunction = ( }), }) } catch (err: any) { + if (err instanceof JobCancelledError) { + // Re-throw JobCancelledError to be handled by the top-level error handler + throw err + } throw new TaskError({ executedAt, input: input!, diff --git a/packages/payload/src/queues/operations/runJobs/runJob/getUpdateJobFunction.ts b/packages/payload/src/queues/operations/runJobs/runJob/getUpdateJobFunction.ts index 0751f1ba034..5e5a5cbbe1c 100644 --- a/packages/payload/src/queues/operations/runJobs/runJob/getUpdateJobFunction.ts +++ b/packages/payload/src/queues/operations/runJobs/runJob/getUpdateJobFunction.ts @@ -41,7 +41,7 @@ export function getUpdateJobFunction(job: Job, req: PayloadRequest): UpdateJobFu } if ((updatedJob?.error as Record)?.cancelled) { - throw new JobCancelledError({ job }) + throw new JobCancelledError(`Job ${job.id} was cancelled`) } return updatedJob diff --git a/packages/payload/src/types/index.ts b/packages/payload/src/types/index.ts index e3fa3cc8c92..a42f8837da3 100644 --- a/packages/payload/src/types/index.ts +++ b/packages/payload/src/types/index.ts @@ -281,3 +281,5 @@ export type PickPreserveOptional = Partial< Pick>> > & Pick>> + +export type MaybePromise = Promise | T diff --git a/test/queues/getConfig.ts b/test/queues/getConfig.ts index 7630b847a83..3539a7133dd 100644 --- a/test/queues/getConfig.ts +++ b/test/queues/getConfig.ts @@ -14,6 +14,7 @@ import { DoNothingTask } from './tasks/DoNothingTask.js' import { ExternalTask } from './tasks/ExternalTask.js' import { ReturnCustomErrorTask } from './tasks/ReturnCustomErrorTask.js' import { ReturnErrorTask } from './tasks/ReturnErrorTask.js' +import { SelfCancelTask } from './tasks/SelfCancelTask.js' import { ThrowErrorTask } from './tasks/ThrowErrorTask.js' import { UpdatePostStep2Task } from './tasks/UpdatePostStep2Task.js' import { UpdatePostTask } from './tasks/UpdatePostTask.js' @@ -30,6 +31,7 @@ import { retriesBackoffTestWorkflow } from './workflows/retriesBackoffTest.js' import { retriesRollbackTestWorkflow } from './workflows/retriesRollbackTest.js' import { retriesTestWorkflow } from './workflows/retriesTest.js' import { retriesWorkflowLevelTestWorkflow } from './workflows/retriesWorkflowLevelTest.js' +import { selfCancelWorkflow } from './workflows/selfCancel.js' import { subTaskWorkflow } from './workflows/subTask.js' import { subTaskFailsWorkflow } from './workflows/subTaskFails.js' import { updatePostWorkflow } from './workflows/updatePost.js' @@ -144,8 +146,10 @@ export const getConfig: () => Partial = () => ({ ReturnErrorTask, ReturnCustomErrorTask, DoNothingTask, + SelfCancelTask, ], workflows: [ + selfCancelWorkflow, updatePostWorkflow, updatePostJSONWorkflow, retriesTestWorkflow, diff --git a/test/queues/int.spec.ts b/test/queues/int.spec.ts index 05a4c432db0..be2149c9b65 100644 --- a/test/queues/int.spec.ts +++ b/test/queues/int.spec.ts @@ -626,9 +626,8 @@ describe('Queues - Payload', () => { expect(jobAfterRun.input.amountRetried).toBe(0) }) - /* // Task rollbacks are not supported in the current version of Payload. This test will be re-enabled when task rollbacks are supported once we figure out the transaction issues - it('ensure failed tasks are rolled back via transactions', async () => { + it.skip('ensure failed tasks are rolled back via transactions', async () => { const job = await payload.jobs.queue({ workflow: 'retriesRollbackTest', input: { @@ -639,7 +638,7 @@ describe('Queues - Payload', () => { let hasJobsRemaining = true while (hasJobsRemaining) { - const response = await payload.jobs.run({silent: true}) + const response = await payload.jobs.run({ silent: true }) if (response.noJobsRemaining) { hasJobsRemaining = false @@ -660,7 +659,7 @@ describe('Queues - Payload', () => { // @ts-expect-error amountRetried is new arbitrary data and not in the type expect(jobAfterRun.input.amountRetried).toBe(4) - })*/ + }) it('ensure backoff strategy of task is respected', async () => { payload.config.jobs.deleteJobOnComplete = false @@ -965,9 +964,8 @@ describe('Queues - Payload', () => { payload.config.jobs.workflows = workflowsRef }) - /* // Task rollbacks are not supported in the current version of Payload. This test will be re-enabled when task rollbacks are supported once we figure out the transaction issues - it('transaction test against payload-jobs collection', async () => { + it.skip('transaction test against payload-jobs collection', async () => { // This kinds of emulates what happens when multiple jobs are queued and then run in parallel. const runWorkflowFN = async (i: number) => { const { id } = await payload.create({ @@ -1002,7 +1000,7 @@ describe('Queues - Payload', () => { /** * T1 start */ - /* + const t2Req = isolateObjectProperty(t1Req, 'transactionID') delete t2Req.transactionID // @@ -1047,7 +1045,7 @@ describe('Queues - Payload', () => { /** * T1 end */ - /* + await payload.update({ collection: 'payload-jobs', id, @@ -1075,7 +1073,7 @@ describe('Queues - Payload', () => { }) expect(allSimples.totalDocs).toBe(30) - })*/ + }) it('can queue single tasks 8 times', async () => { for (let i = 0; i < 8; i++) { @@ -1480,6 +1478,14 @@ describe('Queues - Payload', () => { void payload.jobs.run({ silent: true }).catch((_ignored) => {}) await new Promise((resolve) => setTimeout(resolve, 1000)) + // Should be in processing - ensure job is running + const jobAfterRunProcessing = await payload.findByID({ + collection: 'payload-jobs', + id: job.id, + depth: 0, + }) + expect(jobAfterRunProcessing.processing).toBe(true) + // Should be in processing - cancel job await payload.jobs.cancelByID({ id: job.id, @@ -1502,6 +1508,11 @@ describe('Queues - Payload', () => { // @ts-expect-error error is not typed expect(jobAfterRun.error?.cancelled).toBe(true) expect(jobAfterRun.processing).toBe(false) + + // Ensure job is not retried + const runResponse = await payload.jobs.run({ silent: true }) + expect(runResponse.noJobsRemaining).toBe(true) + expect(runResponse.jobStatus).toBeUndefined() }) it('ensure jobs can be cancelled using payload.jobs.cancel', async () => { @@ -1542,6 +1553,202 @@ describe('Queues - Payload', () => { expect(jobAfterRun.processing).toBe(false) }) + it('ensure jobs can cancel themselves by throwing a JobCancelledError in workflow handler', async () => { + payload.config.jobs.deleteJobOnComplete = false + + /** + * First, verify that this job is retried if it simply failed + */ + { + const job = await payload.jobs.queue({ + workflow: 'selfCancel', + input: { + shouldCancel: false, + }, + }) + const runResponse = await payload.jobs.run({ silent: true }) + expect(runResponse.remainingJobsFromQueried).toBe(1) + expect(runResponse.jobStatus?.[job.id]?.status).toBe('error') + + const jobAfterRun = await payload.findByID({ + collection: 'payload-jobs', + id: job.id, + depth: 0, + }) + + // @ts-expect-error error is not typed + expect(jobAfterRun.error?.message).toBe('Failed, not cancelled') + expect(jobAfterRun.totalTried).toBe(1) + expect(jobAfterRun.hasError).toBe(false) + + const runResponse2 = await payload.jobs.run({ silent: true }) + expect(runResponse2.remainingJobsFromQueried).toBe(1) + expect(runResponse2.jobStatus?.[job.id]?.status).toBe('error') + + const jobAfterRun2 = await payload.findByID({ + collection: 'payload-jobs', + id: job.id, + depth: 0, + }) + + expect(jobAfterRun2.totalTried).toBe(2) + expect(jobAfterRun2.hasError).toBe(false) + } + + /** + * Cleanup + */ + await payload.db.deleteMany({ + collection: 'payload-jobs', + where: { + id: { + exists: true, + }, + }, + }) + + /** + * Now, verify the behavior when the job is cancelled by throwing a JobCancelledError in workflow handler + */ + { + const job = await payload.jobs.queue({ + workflow: 'selfCancel', + input: { + shouldCancel: true, + }, + }) + + const runResponse = await payload.jobs.run({ silent: true }) + expect(runResponse.remainingJobsFromQueried).toBe(0) + expect(runResponse.jobStatus?.[job.id]?.status).toBe('error-reached-max-retries') + + const jobAfterRun = await payload.findByID({ + collection: 'payload-jobs', + id: job.id, + depth: 0, + }) + + expect(Boolean(jobAfterRun.completedAt)).toBe(false) + expect(jobAfterRun.hasError).toBe(true) + // @ts-expect-error error is not typed + expect(jobAfterRun.error?.cancelled).toBe(true) + expect(jobAfterRun.processing).toBe(false) + + // Run again to ensure the job is not retried + const runResponse2 = await payload.jobs.run({ silent: true }) + expect(runResponse2.remainingJobsFromQueried).toBe(0) + expect(runResponse2.jobStatus).toBeUndefined() + + const jobAfterRun2 = await payload.findByID({ + collection: 'payload-jobs', + id: job.id, + depth: 0, + }) + + expect(jobAfterRun2.totalTried).toBe(jobAfterRun.totalTried) + expect(jobAfterRun2.hasError).toBe(true) + } + }) + + it('ensure jobs can cancel themselves by throwing a JobCancelledError in task handler', async () => { + payload.config.jobs.deleteJobOnComplete = false + + /** + * First, verify that this job is retried if it simply failed + */ + { + const job = await payload.jobs.queue({ + task: 'SelfCancel', + input: { + shouldCancel: false, + }, + }) + const runResponse = await payload.jobs.run({ silent: true }) + expect(runResponse.remainingJobsFromQueried).toBe(1) + expect(runResponse.jobStatus?.[job.id]?.status).toBe('error') + + const jobAfterRun = await payload.findByID({ + collection: 'payload-jobs', + id: job.id, + depth: 0, + }) + + expect(jobAfterRun.log?.length).toBe(1) + expect(jobAfterRun?.log?.[0]?.error?.message).toBe('Failed, not cancelled') + expect(jobAfterRun.totalTried).toBe(1) + expect(jobAfterRun.hasError).toBe(false) + + const runResponse2 = await payload.jobs.run({ silent: true }) + expect(runResponse2.remainingJobsFromQueried).toBe(1) + expect(runResponse2.jobStatus?.[job.id]?.status).toBe('error') + + const jobAfterRun2 = await payload.findByID({ + collection: 'payload-jobs', + id: job.id, + depth: 0, + }) + + expect(jobAfterRun2.totalTried).toBe(2) + expect(jobAfterRun2.hasError).toBe(false) + } + + /** + * Cleanup + */ + await payload.db.deleteMany({ + collection: 'payload-jobs', + where: { + id: { + exists: true, + }, + }, + }) + + /** + * Now, verify the behavior when the job is cancelled by throwing a JobCancelledError in task handler + */ + { + const job = await payload.jobs.queue({ + task: 'SelfCancel', + input: { + shouldCancel: true, + }, + }) + console.log('running job') + + const runResponse = await payload.jobs.run({ silent: true }) + console.log('runResponse', runResponse) + expect(runResponse.remainingJobsFromQueried).toBe(0) + expect(runResponse.jobStatus?.[job.id]?.status).toBe('error-reached-max-retries') + + const jobAfterRun = await payload.findByID({ + collection: 'payload-jobs', + id: job.id, + depth: 0, + }) + + expect(Boolean(jobAfterRun.completedAt)).toBe(false) + expect(jobAfterRun.hasError).toBe(true) + // @ts-expect-error error is not typed + expect(jobAfterRun.error?.cancelled).toBe(true) + expect(jobAfterRun.processing).toBe(false) + + // Run again to ensure the job is not retried + const runResponse2 = await payload.jobs.run({ silent: true }) + expect(runResponse2.remainingJobsFromQueried).toBe(0) + expect(runResponse2.jobStatus).toBeUndefined() + + const jobAfterRun2 = await payload.findByID({ + collection: 'payload-jobs', + id: job.id, + depth: 0, + }) + + expect(jobAfterRun2.totalTried).toBe(jobAfterRun.totalTried) + expect(jobAfterRun2.hasError).toBe(true) + } + }) + it('can tasks throw error', async () => { payload.config.jobs.deleteJobOnComplete = false diff --git a/test/queues/payload-types.ts b/test/queues/payload-types.ts index 34f847e9ab1..b18276a844a 100644 --- a/test/queues/payload-types.ts +++ b/test/queues/payload-types.ts @@ -110,12 +110,14 @@ export interface Config { ReturnError: TaskReturnError; ReturnCustomError: TaskReturnCustomError; DoNothingTask: TaskDoNothingTask; + SelfCancel: TaskSelfCancel; inline: { input: unknown; output: unknown; }; }; workflows: { + selfCancel: WorkflowSelfCancel; updatePost: MyUpdatePostWorkflowType; updatePostJSONWorkflow: WorkflowUpdatePostJSONWorkflow; retriesTest: WorkflowRetriesTest; @@ -299,7 +301,8 @@ export interface PayloadJob { | 'ThrowError' | 'ReturnError' | 'ReturnCustomError' - | 'DoNothingTask'; + | 'DoNothingTask' + | 'SelfCancel'; taskID: string; input?: | { @@ -334,6 +337,7 @@ export interface PayloadJob { | null; workflowSlug?: | ( + | 'selfCancel' | 'updatePost' | 'updatePostJSONWorkflow' | 'retriesTest' @@ -370,6 +374,7 @@ export interface PayloadJob { | 'ReturnError' | 'ReturnCustomError' | 'DoNothingTask' + | 'SelfCancel' ) | null; queue?: string | null; @@ -678,6 +683,25 @@ export interface TaskDoNothingTask { }; output?: unknown; } +/** + * This interface was referenced by `Config`'s JSON-Schema + * via the `definition` "TaskSelfCancel". + */ +export interface TaskSelfCancel { + input: { + shouldCancel?: boolean | null; + }; + output?: unknown; +} +/** + * This interface was referenced by `Config`'s JSON-Schema + * via the `definition` "WorkflowSelfCancel". + */ +export interface WorkflowSelfCancel { + input: { + shouldCancel?: boolean | null; + }; +} /** * This interface was referenced by `Config`'s JSON-Schema * via the `definition` "MyUpdatePostWorkflowType". diff --git a/test/queues/tasks/SelfCancelTask.ts b/test/queues/tasks/SelfCancelTask.ts new file mode 100644 index 00000000000..ac002e09e20 --- /dev/null +++ b/test/queues/tasks/SelfCancelTask.ts @@ -0,0 +1,22 @@ +import { JobCancelledError, type TaskConfig } from 'payload' + +export const SelfCancelTask: TaskConfig<'SelfCancel'> = { + slug: 'SelfCancel', + inputSchema: [ + { + name: 'shouldCancel', + type: 'checkbox', + defaultValue: false, + }, + ], + outputSchema: [], + // Set to 4, to test that this job is not retried despite the task level retries being set to 4 + retries: 4, + handler: ({ input }) => { + if (input.shouldCancel) { + console.log('222throwing error') + throw new JobCancelledError('Task was cancelled') + } + throw new Error('Failed, not cancelled') + }, +} diff --git a/test/queues/workflows/longRunning.ts b/test/queues/workflows/longRunning.ts index 34c0ce265c5..8d3f1d0a8e0 100644 --- a/test/queues/workflows/longRunning.ts +++ b/test/queues/workflows/longRunning.ts @@ -6,6 +6,8 @@ import type { WorkflowConfig } from 'payload' export const longRunningWorkflow: WorkflowConfig<'longRunning'> = { slug: 'longRunning', inputSchema: [], + // Set to 4, to test that this job is not retried despite the workflow level retries being set to 4 + retries: 4, handler: async ({ inlineTask }) => { for (let i = 0; i < 4; i += 1) { await inlineTask(String(i), { diff --git a/test/queues/workflows/selfCancel.ts b/test/queues/workflows/selfCancel.ts new file mode 100644 index 00000000000..423afb40320 --- /dev/null +++ b/test/queues/workflows/selfCancel.ts @@ -0,0 +1,20 @@ +import { JobCancelledError, type WorkflowConfig } from 'payload' + +export const selfCancelWorkflow: WorkflowConfig<'selfCancel'> = { + slug: 'selfCancel', + inputSchema: [ + { + name: 'shouldCancel', + type: 'checkbox', + defaultValue: false, + }, + ], + // Set to 4, to test that this job is not retried despite the workflow level retries being set to 4 + retries: 4, + handler: ({ job }) => { + if (job.input.shouldCancel) { + throw new JobCancelledError('Job was cancelled') + } + throw new Error('Failed, not cancelled') + }, +}