Skip to content

Commit 1bd3146

Browse files
authored
feat: ability to cancel current job from within workflow or task handlers (#15119)
This PR exports a new `JobCancelledError` error class. Throwing it from within a task or workflow handler behaves similarly to throwing any other error, with the difference that the job won't retry anymore.
1 parent e214deb commit 1bd3146

File tree

17 files changed

+374
-44
lines changed

17 files changed

+374
-44
lines changed

docs/jobs-queue/jobs.mdx

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -301,3 +301,9 @@ await payload.jobs.cancel({
301301
},
302302
})
303303
```
304+
305+
From within a task or workflow handler, you can also cancel the current job by throwing a `JobCancelledError`:
306+
307+
```ts
308+
throw new JobCancelledError('Job was cancelled')
309+
```

docs/jobs-queue/tasks.mdx

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,14 @@ handler: async ({ input, req }) => {
252252
}
253253
```
254254

255+
#### Preventing Job Retries
256+
257+
From within a task or workflow handler, you can prevent the entire job from being retried by throwing a `JobCancelledError`:
258+
259+
```ts
260+
throw new JobCancelledError('Job was cancelled')
261+
```
262+
255263
#### Accessing Failure Information
256264

257265
After a task fails, you can inspect the job to understand what went wrong:

packages/payload/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1719,6 +1719,7 @@ export type {
17191719
WorkflowHandler,
17201720
WorkflowTypes,
17211721
} from './queues/config/types/workflowTypes.js'
1722+
export { JobCancelledError } from './queues/errors/index.js'
17221723

17231724
export { countRunnableOrActiveJobsForQueue } from './queues/operations/handleSchedules/countRunnableOrActiveJobsForQueue.js'
17241725
export { importHandlerPath } from './queues/operations/runJobs/runJob/importHandlerPath.js'

packages/payload/src/queues/config/types/index.ts

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import type { CollectionConfig, Job } from '../../../index.js'
2-
import type { Payload, PayloadRequest, Sort } from '../../../types/index.js'
2+
import type { MaybePromise, Payload, PayloadRequest, Sort } from '../../../types/index.js'
33
import type { RunJobsSilent } from '../../localAPI.js'
44
import type { RunJobsArgs } from '../../operations/runJobs/index.js'
55
import type { JobStats } from '../global.js'
@@ -69,7 +69,7 @@ export type RunJobAccessArgs = {
6969
req: PayloadRequest
7070
}
7171

72-
export type RunJobAccess = (args: RunJobAccessArgs) => boolean | Promise<boolean>
72+
export type RunJobAccess = (args: RunJobAccessArgs) => MaybePromise<boolean>
7373

7474
export type QueueJobAccessArgs = {
7575
req: PayloadRequest
@@ -78,8 +78,8 @@ export type QueueJobAccessArgs = {
7878
export type CancelJobAccessArgs = {
7979
req: PayloadRequest
8080
}
81-
export type CancelJobAccess = (args: CancelJobAccessArgs) => boolean | Promise<boolean>
82-
export type QueueJobAccess = (args: QueueJobAccessArgs) => boolean | Promise<boolean>
81+
export type CancelJobAccess = (args: CancelJobAccessArgs) => MaybePromise<boolean>
82+
export type QueueJobAccess = (args: QueueJobAccessArgs) => MaybePromise<boolean>
8383

8484
export type SanitizedJobsConfig = {
8585
/**
@@ -130,9 +130,7 @@ export type JobsConfig = {
130130
*
131131
* @remark this property should not be used on serverless platforms like Vercel
132132
*/
133-
autoRun?:
134-
| ((payload: Payload) => AutorunCronConfig[] | Promise<AutorunCronConfig[]>)
135-
| AutorunCronConfig[]
133+
autoRun?: ((payload: Payload) => MaybePromise<AutorunCronConfig[]>) | AutorunCronConfig[]
136134
/**
137135
* Determine whether or not to delete a job after it has successfully completed.
138136
*/
@@ -186,7 +184,7 @@ export type JobsConfig = {
186184
* @param payload
187185
* @returns boolean
188186
*/
189-
shouldAutoRun?: (payload: Payload) => boolean | Promise<boolean>
187+
shouldAutoRun?: (payload: Payload) => MaybePromise<boolean>
190188
/**
191189
* Define all possible tasks here
192190
*/
@@ -205,8 +203,6 @@ export type Queueable = {
205203
workflowConfig?: WorkflowConfig
206204
}
207205

208-
type OptionalPromise<T> = Promise<T> | T
209-
210206
export type BeforeScheduleFn = (args: {
211207
defaultBeforeSchedule: BeforeScheduleFn
212208
/**
@@ -215,7 +211,7 @@ export type BeforeScheduleFn = (args: {
215211
jobStats: JobStats
216212
queueable: Queueable
217213
req: PayloadRequest
218-
}) => OptionalPromise<{
214+
}) => MaybePromise<{
219215
input?: object
220216
shouldSchedule: boolean
221217
waitUntil?: Date
@@ -250,7 +246,7 @@ export type AfterScheduleFn = (
250246
status: 'skipped'
251247
}
252248
),
253-
) => OptionalPromise<void>
249+
) => MaybePromise<void>
254250

255251
export type ScheduleConfig = {
256252
/**

packages/payload/src/queues/config/types/taskTypes.ts

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,11 @@
1-
import type { Field, Job, PayloadRequest, StringKeyOf, TypedJobs } from '../../../index.js'
1+
import type {
2+
Field,
3+
Job,
4+
MaybePromise,
5+
PayloadRequest,
6+
StringKeyOf,
7+
TypedJobs,
8+
} from '../../../index.js'
29
import type { ScheduleConfig } from './index.js'
310
import type { SingleTaskStatus } from './workflowTypes.js'
411

@@ -99,8 +106,6 @@ export type RunTaskFunctions = {
99106
[TTaskSlug in keyof TypedJobs['tasks']]: RunTaskFunction<TTaskSlug>
100107
}
101108

102-
type MaybePromise<T> = Promise<T> | T
103-
104109
export type RunInlineTaskFunction = <TTaskInput extends object, TTaskOutput extends object>(
105110
taskID: string,
106111
taskArgs: {
@@ -151,7 +156,7 @@ export type TaskCallbackArgs = {
151156

152157
export type ShouldRestoreFn = (
153158
args: { taskStatus: SingleTaskStatus<string> } & Omit<TaskCallbackArgs, 'taskStatus'>,
154-
) => boolean | Promise<boolean>
159+
) => MaybePromise<boolean>
155160
export type TaskCallbackFn = (args: TaskCallbackArgs) => MaybePromise<void>
156161

157162
export type RetryConfig = {

packages/payload/src/queues/config/types/workflowTypes.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import type { Field } from '../../../fields/config/types.js'
22
import type {
33
Job,
4+
MaybePromise,
45
PayloadRequest,
56
StringKeyOf,
67
TypedCollection,
@@ -105,7 +106,7 @@ export type WorkflowHandler<
105106
job: Job<TWorkflowSlugOrInput>
106107
req: PayloadRequest
107108
tasks: RunTaskFunctions
108-
}) => Promise<void>
109+
}) => MaybePromise<void>
109110

110111
export type SingleTaskStatus<T extends keyof TypedJobs['tasks']> = {
111112
complete: boolean

packages/payload/src/queues/errors/index.ts

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,12 @@ export class WorkflowError extends Error {
3939
}
4040
}
4141

42+
/**
43+
* Throw this error from within a task or workflow handler to cancel the job.
44+
* Unlike failing a job (e.g. by throwing any other error), a cancelled job will not be retried.
45+
*/
4246
export class JobCancelledError extends Error {
43-
args: {
44-
job: Job
45-
}
46-
47-
constructor(args: { job: Job }) {
48-
super(`Job ${args.job.id} was cancelled`)
49-
this.args = args
47+
constructor(message: string) {
48+
super(message)
5049
}
5150
}

packages/payload/src/queues/operations/runJobs/index.ts

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ export type RunJobsArgs = {
7070
export type RunJobsResult = {
7171
jobStatus?: Record<string, RunJobResult>
7272
/**
73-
* If this is false, there for sure are no jobs remaining, regardless of the limit
73+
* If this is true, there for sure are no jobs remaining, regardless of the limit
7474
*/
7575
noJobsRemaining?: boolean
7676
/**
@@ -211,6 +211,13 @@ export const runJobs = async (args: RunJobsArgs): Promise<RunJobsResult> => {
211211
}
212212
}
213213

214+
if (!jobs.length) {
215+
return {
216+
noJobsRemaining: true,
217+
remainingJobsFromQueried: 0,
218+
}
219+
}
220+
214221
/**
215222
* Just for logging purposes, we want to know how many jobs are new and how many are existing (= already been tried).
216223
* This is only for logs - in the end we still want to run all jobs, regardless of whether they are new or existing.
@@ -227,13 +234,6 @@ export const runJobs = async (args: RunJobsArgs): Promise<RunJobsResult> => {
227234
{ existingJobs: [] as Job[], newJobs: [] as Job[] },
228235
)
229236

230-
if (!jobs.length) {
231-
return {
232-
noJobsRemaining: true,
233-
remainingJobsFromQueried: 0,
234-
}
235-
}
236-
237237
if (!silent || (typeof silent === 'object' && !silent.info)) {
238238
payload.logger.info({
239239
msg: `Running ${jobs.length} jobs.`,
@@ -348,6 +348,35 @@ export const runJobs = async (args: RunJobsArgs): Promise<RunJobsResult> => {
348348
}
349349
} catch (error) {
350350
if (error instanceof JobCancelledError) {
351+
if (
352+
// @ts-expect-error error is not typed
353+
!job.error?.cancelled ||
354+
!job.hasError ||
355+
job.processing ||
356+
job.completedAt ||
357+
job.waitUntil
358+
) {
359+
// When using the local API to cancel jobs, the local API will update the job data for us to ensure the job is cancelled.
360+
// But when throwing a JobCancelledError within a task or workflow handler, we are responsible for updating the job data ourselves.
361+
await updateJob({
362+
id: job.id,
363+
data: {
364+
completedAt: null,
365+
error: {
366+
cancelled: true,
367+
message: error.message,
368+
},
369+
hasError: true,
370+
processing: false,
371+
waitUntil: null,
372+
},
373+
depth: 0,
374+
disableTransaction: true,
375+
req,
376+
returning: false,
377+
})
378+
}
379+
351380
return {
352381
id: job.id,
353382
result: {

packages/payload/src/queues/operations/runJobs/runJob/getRunTaskFunction.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ import type {
2020
} from '../../../config/types/workflowTypes.js'
2121
import type { UpdateJobFunction } from './getUpdateJobFunction.js'
2222

23-
import { TaskError } from '../../../errors/index.js'
23+
import { JobCancelledError, TaskError } from '../../../errors/index.js'
2424
import { getCurrentDate } from '../../../utilities/getCurrentDate.js'
2525
import { getTaskHandlerFromConfig } from './importHandlerPath.js'
2626

@@ -146,6 +146,10 @@ export const getRunTaskFunction = <TIsInline extends boolean>(
146146
}),
147147
})
148148
} catch (err: any) {
149+
if (err instanceof JobCancelledError) {
150+
// Re-throw JobCancelledError to be handled by the top-level error handler
151+
throw err
152+
}
149153
throw new TaskError({
150154
executedAt,
151155
input: input!,

packages/payload/src/queues/operations/runJobs/runJob/getUpdateJobFunction.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ export function getUpdateJobFunction(job: Job, req: PayloadRequest): UpdateJobFu
4141
}
4242

4343
if ((updatedJob?.error as Record<string, unknown>)?.cancelled) {
44-
throw new JobCancelledError({ job })
44+
throw new JobCancelledError(`Job ${job.id} was cancelled`)
4545
}
4646

4747
return updatedJob

0 commit comments

Comments
 (0)