Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions docs/jobs-queue/jobs.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -301,3 +301,9 @@ await payload.jobs.cancel({
},
})
```

From within a task or workflow handler, you can also cancel the current job by throwing a `JobCancelledError`:

```ts
throw new JobCancelledError('Job was cancelled')
```
8 changes: 8 additions & 0 deletions docs/jobs-queue/tasks.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,14 @@ handler: async ({ input, req }) => {
}
```

#### Preventing Job Retries

From within a task or workflow handler, you can prevent the entire job from being retried by throwing a `JobCancelledError`:

```ts
throw new JobCancelledError('Job was cancelled')
```

#### Accessing Failure Information

After a task fails, you can inspect the job to understand what went wrong:
Expand Down
1 change: 1 addition & 0 deletions packages/payload/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1719,6 +1719,7 @@ export type {
WorkflowHandler,
WorkflowTypes,
} from './queues/config/types/workflowTypes.js'
export { JobCancelledError } from './queues/errors/index.js'

export { countRunnableOrActiveJobsForQueue } from './queues/operations/handleSchedules/countRunnableOrActiveJobsForQueue.js'
export { importHandlerPath } from './queues/operations/runJobs/runJob/importHandlerPath.js'
Expand Down
20 changes: 8 additions & 12 deletions packages/payload/src/queues/config/types/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { CollectionConfig, Job } from '../../../index.js'
import type { Payload, PayloadRequest, Sort } from '../../../types/index.js'
import type { MaybePromise, Payload, PayloadRequest, Sort } from '../../../types/index.js'
import type { RunJobsSilent } from '../../localAPI.js'
import type { RunJobsArgs } from '../../operations/runJobs/index.js'
import type { JobStats } from '../global.js'
Expand Down Expand Up @@ -69,7 +69,7 @@ export type RunJobAccessArgs = {
req: PayloadRequest
}

export type RunJobAccess = (args: RunJobAccessArgs) => boolean | Promise<boolean>
export type RunJobAccess = (args: RunJobAccessArgs) => MaybePromise<boolean>

export type QueueJobAccessArgs = {
req: PayloadRequest
Expand All @@ -78,8 +78,8 @@ export type QueueJobAccessArgs = {
export type CancelJobAccessArgs = {
req: PayloadRequest
}
export type CancelJobAccess = (args: CancelJobAccessArgs) => boolean | Promise<boolean>
export type QueueJobAccess = (args: QueueJobAccessArgs) => boolean | Promise<boolean>
export type CancelJobAccess = (args: CancelJobAccessArgs) => MaybePromise<boolean>
export type QueueJobAccess = (args: QueueJobAccessArgs) => MaybePromise<boolean>

export type SanitizedJobsConfig = {
/**
Expand Down Expand Up @@ -130,9 +130,7 @@ export type JobsConfig = {
*
* @remark this property should not be used on serverless platforms like Vercel
*/
autoRun?:
| ((payload: Payload) => AutorunCronConfig[] | Promise<AutorunCronConfig[]>)
| AutorunCronConfig[]
autoRun?: ((payload: Payload) => MaybePromise<AutorunCronConfig[]>) | AutorunCronConfig[]
/**
* Determine whether or not to delete a job after it has successfully completed.
*/
Expand Down Expand Up @@ -186,7 +184,7 @@ export type JobsConfig = {
* @param payload
* @returns boolean
*/
shouldAutoRun?: (payload: Payload) => boolean | Promise<boolean>
shouldAutoRun?: (payload: Payload) => MaybePromise<boolean>
/**
* Define all possible tasks here
*/
Expand All @@ -205,8 +203,6 @@ export type Queueable = {
workflowConfig?: WorkflowConfig
}

type OptionalPromise<T> = Promise<T> | T

export type BeforeScheduleFn = (args: {
defaultBeforeSchedule: BeforeScheduleFn
/**
Expand All @@ -215,7 +211,7 @@ export type BeforeScheduleFn = (args: {
jobStats: JobStats
queueable: Queueable
req: PayloadRequest
}) => OptionalPromise<{
}) => MaybePromise<{
input?: object
shouldSchedule: boolean
waitUntil?: Date
Expand Down Expand Up @@ -250,7 +246,7 @@ export type AfterScheduleFn = (
status: 'skipped'
}
),
) => OptionalPromise<void>
) => MaybePromise<void>

export type ScheduleConfig = {
/**
Expand Down
13 changes: 9 additions & 4 deletions packages/payload/src/queues/config/types/taskTypes.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
import type { Field, Job, PayloadRequest, StringKeyOf, TypedJobs } from '../../../index.js'
import type {
Field,
Job,
MaybePromise,
PayloadRequest,
StringKeyOf,
TypedJobs,
} from '../../../index.js'
import type { ScheduleConfig } from './index.js'
import type { SingleTaskStatus } from './workflowTypes.js'

Expand Down Expand Up @@ -99,8 +106,6 @@ export type RunTaskFunctions = {
[TTaskSlug in keyof TypedJobs['tasks']]: RunTaskFunction<TTaskSlug>
}

type MaybePromise<T> = Promise<T> | T

export type RunInlineTaskFunction = <TTaskInput extends object, TTaskOutput extends object>(
taskID: string,
taskArgs: {
Expand Down Expand Up @@ -151,7 +156,7 @@ export type TaskCallbackArgs = {

export type ShouldRestoreFn = (
args: { taskStatus: SingleTaskStatus<string> } & Omit<TaskCallbackArgs, 'taskStatus'>,
) => boolean | Promise<boolean>
) => MaybePromise<boolean>
export type TaskCallbackFn = (args: TaskCallbackArgs) => MaybePromise<void>

export type RetryConfig = {
Expand Down
3 changes: 2 additions & 1 deletion packages/payload/src/queues/config/types/workflowTypes.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { Field } from '../../../fields/config/types.js'
import type {
Job,
MaybePromise,
PayloadRequest,
StringKeyOf,
TypedCollection,
Expand Down Expand Up @@ -105,7 +106,7 @@ export type WorkflowHandler<
job: Job<TWorkflowSlugOrInput>
req: PayloadRequest
tasks: RunTaskFunctions
}) => Promise<void>
}) => MaybePromise<void>

export type SingleTaskStatus<T extends keyof TypedJobs['tasks']> = {
complete: boolean
Expand Down
13 changes: 6 additions & 7 deletions packages/payload/src/queues/errors/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,12 @@ export class WorkflowError extends Error {
}
}

/**
* Throw this error from within a task or workflow handler to cancel the job.
* Unlike failing a job (e.g. by throwing any other error), a cancelled job will not be retried.
*/
export class JobCancelledError extends Error {
args: {
job: Job
}

constructor(args: { job: Job }) {
super(`Job ${args.job.id} was cancelled`)
this.args = args
constructor(message: string) {
super(message)
}
}
45 changes: 37 additions & 8 deletions packages/payload/src/queues/operations/runJobs/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ export type RunJobsArgs = {
export type RunJobsResult = {
jobStatus?: Record<string, RunJobResult>
/**
* If this is false, there for sure are no jobs remaining, regardless of the limit
* If this is true, there for sure are no jobs remaining, regardless of the limit
*/
noJobsRemaining?: boolean
/**
Expand Down Expand Up @@ -211,6 +211,13 @@ export const runJobs = async (args: RunJobsArgs): Promise<RunJobsResult> => {
}
}

if (!jobs.length) {
return {
noJobsRemaining: true,
remainingJobsFromQueried: 0,
}
}

/**
* Just for logging purposes, we want to know how many jobs are new and how many are existing (= already been tried).
* This is only for logs - in the end we still want to run all jobs, regardless of whether they are new or existing.
Expand All @@ -227,13 +234,6 @@ export const runJobs = async (args: RunJobsArgs): Promise<RunJobsResult> => {
{ existingJobs: [] as Job[], newJobs: [] as Job[] },
)

if (!jobs.length) {
return {
noJobsRemaining: true,
remainingJobsFromQueried: 0,
}
}

if (!silent || (typeof silent === 'object' && !silent.info)) {
payload.logger.info({
msg: `Running ${jobs.length} jobs.`,
Expand Down Expand Up @@ -348,6 +348,35 @@ export const runJobs = async (args: RunJobsArgs): Promise<RunJobsResult> => {
}
} catch (error) {
if (error instanceof JobCancelledError) {
if (
// @ts-expect-error error is not typed
!job.error?.cancelled ||
!job.hasError ||
job.processing ||
job.completedAt ||
job.waitUntil
) {
// When using the local API to cancel jobs, the local API will update the job data for us to ensure the job is cancelled.
// But when throwing a JobCancelledError within a task or workflow handler, we are responsible for updating the job data ourselves.
await updateJob({
id: job.id,
data: {
completedAt: null,
error: {
cancelled: true,
message: error.message,
},
hasError: true,
processing: false,
waitUntil: null,
},
depth: 0,
disableTransaction: true,
req,
returning: false,
})
}

return {
id: job.id,
result: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import type {
} from '../../../config/types/workflowTypes.js'
import type { UpdateJobFunction } from './getUpdateJobFunction.js'

import { TaskError } from '../../../errors/index.js'
import { JobCancelledError, TaskError } from '../../../errors/index.js'
import { getCurrentDate } from '../../../utilities/getCurrentDate.js'
import { getTaskHandlerFromConfig } from './importHandlerPath.js'

Expand Down Expand Up @@ -146,6 +146,10 @@ export const getRunTaskFunction = <TIsInline extends boolean>(
}),
})
} catch (err: any) {
if (err instanceof JobCancelledError) {
// Re-throw JobCancelledError to be handled by the top-level error handler
throw err
}
throw new TaskError({
executedAt,
input: input!,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ export function getUpdateJobFunction(job: Job, req: PayloadRequest): UpdateJobFu
}

if ((updatedJob?.error as Record<string, unknown>)?.cancelled) {
throw new JobCancelledError({ job })
throw new JobCancelledError(`Job ${job.id} was cancelled`)
}

return updatedJob
Expand Down
2 changes: 2 additions & 0 deletions packages/payload/src/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -281,3 +281,5 @@ export type PickPreserveOptional<T, K extends keyof T> = Partial<
Pick<T, Extract<K, OptionalKeys<T>>>
> &
Pick<T, Extract<K, RequiredKeys<T>>>

export type MaybePromise<T> = Promise<T> | T
4 changes: 4 additions & 0 deletions test/queues/getConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { DoNothingTask } from './tasks/DoNothingTask.js'
import { ExternalTask } from './tasks/ExternalTask.js'
import { ReturnCustomErrorTask } from './tasks/ReturnCustomErrorTask.js'
import { ReturnErrorTask } from './tasks/ReturnErrorTask.js'
import { SelfCancelTask } from './tasks/SelfCancelTask.js'
import { ThrowErrorTask } from './tasks/ThrowErrorTask.js'
import { UpdatePostStep2Task } from './tasks/UpdatePostStep2Task.js'
import { UpdatePostTask } from './tasks/UpdatePostTask.js'
Expand All @@ -30,6 +31,7 @@ import { retriesBackoffTestWorkflow } from './workflows/retriesBackoffTest.js'
import { retriesRollbackTestWorkflow } from './workflows/retriesRollbackTest.js'
import { retriesTestWorkflow } from './workflows/retriesTest.js'
import { retriesWorkflowLevelTestWorkflow } from './workflows/retriesWorkflowLevelTest.js'
import { selfCancelWorkflow } from './workflows/selfCancel.js'
import { subTaskWorkflow } from './workflows/subTask.js'
import { subTaskFailsWorkflow } from './workflows/subTaskFails.js'
import { updatePostWorkflow } from './workflows/updatePost.js'
Expand Down Expand Up @@ -144,8 +146,10 @@ export const getConfig: () => Partial<Config> = () => ({
ReturnErrorTask,
ReturnCustomErrorTask,
DoNothingTask,
SelfCancelTask,
],
workflows: [
selfCancelWorkflow,
updatePostWorkflow,
updatePostJSONWorkflow,
retriesTestWorkflow,
Expand Down
Loading
Loading