Skip to content

Commit 181a262

Browse files
committed
cleanup
1 parent 873e287 commit 181a262

File tree

6 files changed

+184
-30
lines changed

6 files changed

+184
-30
lines changed

frontend/docs/pages/home/cancellation.mdx

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,26 @@ When a task is canceled, Hatchet sends a cancellation signal to the task. The ta
3333

3434
/>
3535

36+
### AbortError behavior
37+
38+
Hatchet cancellation in TypeScript is driven by an internal `ctx.abortController.signal` (an `AbortSignal`). When a task is cancelled, Hatchet aborts that signal and cancellation-aware operations (like waiting on a child run result) will reject with an **`AbortError`**.
39+
40+
<Callout type="warning">
41+
**Important:** JavaScript/TypeScript cannot make cancellation “uncatchable”. A broad `catch (e) { ... }` can swallow cancellation. Hatchet’s SDK will avoid enqueueing new child runs once the parent task is cancelled, and it will not report a cancelled task as “completed” even if user code catches the abort — but your code should still exit quickly to avoid wasted work.
42+
</Callout>
43+
44+
If you must catch errors, re-throw abort/cancellation errors:
45+
46+
```ts
47+
try {
48+
// ... work ...
49+
await simple.run({});
50+
} catch (e) {
51+
ctx.rethrowIfCancelled(e);
52+
// ... other error handling ...
53+
}
54+
```
55+
3656
</Tabs.Tab>
3757
<Tabs.Tab title="Go">
3858
<Snippet src={snippets.go.cancellations.main.cancelled_task}/>

sdks/typescript/src/clients/worker/worker.ts

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import { WebhookWorkerCreateRequest } from '@clients/rest/generated/data-contrac
3131
import { WorkflowDefinition } from '@hatchet/v1';
3232
import { NonRetryableError } from '@hatchet/v1/task';
3333
import { applyNamespace } from '@hatchet/util/apply-namespace';
34+
import { throwIfAborted } from '@hatchet/util/abort-error';
3435
import { V0Context, CreateStep, V0DurableContext, mapRateLimit, StepRunFunction } from '../../step';
3536
import { WorkerLabels } from '../dispatcher/dispatcher-client';
3637

@@ -253,10 +254,16 @@ export class V0Worker {
253254
}
254255

255256
const run = async () => {
257+
// Precheck: if cancellation already happened, don't execute user code.
258+
throwIfAborted(context.controller?.signal);
256259
return step(context);
257260
};
258261

259262
const success = async (result: any) => {
263+
// If cancellation happened, do not report completion.
264+
if (context.controller?.signal?.aborted) {
265+
return;
266+
}
260267
this.logger.info(taskRunLog(taskName, taskRunExternalId, 'completed'));
261268

262269
try {
@@ -302,6 +309,10 @@ export class V0Worker {
302309
};
303310

304311
const failure = async (error: any) => {
312+
// If cancellation happened, do not report failure.
313+
if (context.controller?.signal?.aborted) {
314+
return;
315+
}
305316
this.logger.error(taskRunLog(taskName, taskRunExternalId, `failed: ${error.message}`));
306317

307318
if (error.stack) {
@@ -341,6 +352,19 @@ export class V0Worker {
341352
await failure(e);
342353
return;
343354
}
355+
356+
// Postcheck: user code may swallow AbortError; don't report completion after cancellation.
357+
// If we reached this point and the signal is aborted, the task likely caught/ignored cancellation.
358+
if (context.controller?.signal?.aborted) {
359+
this.logger.warn(
360+
`Cancellation: task run ${taskRunExternalId} returned after cancellation was signaled. ` +
361+
`This usually means an AbortError was caught and not propagated. ` +
362+
`See https://docs.hatchet.run/home/cancellation`
363+
);
364+
return;
365+
}
366+
throwIfAborted(context.controller?.signal);
367+
344368
await success(result);
345369
})()
346370
);

sdks/typescript/src/util/abort-error.ts

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,89 @@ export function createAbortError(message = 'Operation aborted'): Error {
44
err.code = 'ABORT_ERR';
55
return err as Error;
66
}
7+
8+
export function isAbortError(err: unknown): err is Error {
9+
return (
10+
err instanceof Error &&
11+
(err.name === 'AbortError' || (err as any).code === 'ABORT_ERR')
12+
);
13+
}
14+
15+
/**
16+
* Helper to be used inside broad `catch` blocks so cancellation isn't accidentally swallowed.
17+
*
18+
* Example:
19+
* ```ts
20+
* try { ... } catch (e) { rethrowIfAborted(e); ... }
21+
* ```
22+
*/
23+
export function rethrowIfAborted(err: unknown): void {
24+
if (isAbortError(err)) {
25+
throw err;
26+
}
27+
}
28+
29+
export type ThrowIfAbortedOpts = {
30+
/**
31+
* Optional: called before throwing when the signal is aborted.
32+
* This lets callsites attach logging without coupling this util to a logger implementation.
33+
*/
34+
warn?: (message: string) => void;
35+
36+
/**
37+
* If true, emits a generic warning intended for "trigger/enqueue" paths.
38+
*/
39+
isTrigger?: boolean;
40+
41+
/**
42+
* Optional context used to make warnings consistent, e.g. "task run <id>".
43+
*/
44+
context?: string;
45+
46+
/**
47+
* Message used when the AbortSignal doesn't provide a reason.
48+
*/
49+
defaultMessage?: string;
50+
};
51+
52+
/**
53+
* Throws an AbortError if the provided signal is aborted.
54+
*
55+
* Notes:
56+
* - In JS/TS, `catch` can swallow any thrown value, so this is best-effort.
57+
* - We prefer throwing the signal's `reason` when it is already an Error.
58+
*/
59+
export function throwIfAborted(
60+
signal: AbortSignal | undefined,
61+
optsOrDefaultMessage: ThrowIfAbortedOpts | string = 'Operation cancelled by AbortSignal'
62+
): void {
63+
if (!signal?.aborted) {
64+
return;
65+
}
66+
67+
const opts: ThrowIfAbortedOpts =
68+
typeof optsOrDefaultMessage === 'string'
69+
? { defaultMessage: optsOrDefaultMessage }
70+
: (optsOrDefaultMessage ?? {});
71+
72+
if (opts.isTrigger) {
73+
const ctx = opts.context ? `${opts.context} ` : '';
74+
opts.warn?.(
75+
`Cancellation: ${ctx}attempted to enqueue/trigger work after cancellation was signaled. ` +
76+
`This usually means an AbortError was caught and not propagated. ` +
77+
`See https://docs.hatchet.run/home/cancellation`
78+
);
79+
}
80+
81+
const reason = (signal as any).reason;
82+
83+
if (reason instanceof Error) {
84+
throw reason;
85+
}
86+
87+
if (typeof reason === 'string' && reason.length > 0) {
88+
throw createAbortError(reason);
89+
}
90+
91+
throw createAbortError(opts.defaultMessage ?? 'Operation cancelled by AbortSignal');
92+
}

sdks/typescript/src/v1/client/worker/context.ts

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import { Action as ConditionAction } from '@hatchet/protoc/v1/shared/condition';
2121
import { HatchetClient } from '@hatchet/v1';
2222
import { ContextWorker, NextStep } from '@hatchet/step';
2323
import { applyNamespace } from '@hatchet/util/apply-namespace';
24-
import { createAbortError } from '@hatchet/util/abort-error';
24+
import { createAbortError, rethrowIfAborted } from '@hatchet/util/abort-error';
2525
import { V1Worker } from './worker-internal';
2626
import { Duration } from '../duration';
2727

@@ -97,6 +97,18 @@ export class Context<T, K = {}> {
9797
}
9898
}
9999

100+
/**
101+
* Helper for broad `catch` blocks so cancellation isn't accidentally swallowed.
102+
*
103+
* Example:
104+
* ```ts
105+
* try { ... } catch (e) { ctx.rethrowIfCancelled(e); ... }
106+
* ```
107+
*/
108+
rethrowIfCancelled(err: unknown): void {
109+
rethrowIfAborted(err);
110+
}
111+
100112
async cancel() {
101113
await this.v1.runs.cancel({
102114
ids: [this.action.taskRunExternalId],

sdks/typescript/src/v1/client/worker/worker-internal.ts

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import { WorkerLabels } from '@hatchet/clients/dispatcher/dispatcher-client';
3737
import { CreateStep, mapRateLimit, StepRunFunction } from '@hatchet/step';
3838
import { applyNamespace } from '@hatchet/util/apply-namespace';
3939
import sleep from '@hatchet/util/sleep';
40+
import { throwIfAborted } from '@hatchet/util/abort-error';
4041
import { Context, DurableContext } from './context';
4142
import { parentRunContextManager } from '../../parent-run-context-vars';
4243
import { HealthServer, workerStatus, type WorkerStatus } from './health-server';
@@ -544,7 +545,11 @@ export class V1Worker {
544545
desiredWorkerId: this.workerId || '',
545546
signal: context.abortController.signal,
546547
},
547-
() => step(context)
548+
() => {
549+
// Precheck: if cancellation already happened, don't execute user code.
550+
throwIfAborted(context.abortController.signal);
551+
return step(context);
552+
}
548553
);
549554
};
550555

@@ -641,6 +646,19 @@ export class V1Worker {
641646
await failure(e);
642647
return;
643648
}
649+
650+
// Postcheck: user code may swallow AbortError; don't report completion after cancellation.
651+
// If we reached this point and the signal is aborted, the task likely caught/ignored cancellation.
652+
if (context.abortController.signal.aborted) {
653+
this.logger.warn(
654+
`Cancellation: task run ${taskRunExternalId} returned after cancellation was signaled. ` +
655+
`This usually means an AbortError was caught and not propagated. ` +
656+
`See https://docs.hatchet.run/home/cancellation`
657+
);
658+
return;
659+
}
660+
throwIfAborted(context.abortController.signal);
661+
644662
await success(result);
645663
})()
646664
);
@@ -848,11 +866,9 @@ export class V1Worker {
848866
]);
849867

850868
if (winner === 'warn') {
851-
const elapsedSeconds = (Date.now() - start) / 1000;
869+
const milliseconds = Date.now() - start;
852870
this.logger.warn(
853-
`Cancellation: task run ${taskRunExternalId} has not cancelled after ${elapsedSeconds.toFixed(
854-
1
855-
)}s. Consider checking for blocking operations. ` +
871+
`Cancellation: task run ${taskRunExternalId} has not cancelled after ${milliseconds}ms. Consider checking for blocking operations. ` +
856872
`See https://docs.hatchet.run/home/cancellation`
857873
);
858874
}
@@ -870,10 +886,10 @@ export class V1Worker {
870886
this.logger.info(taskRunLog(taskName, taskRunExternalId, 'cancelled'));
871887
} else {
872888
const totalElapsedMs = Date.now() - start;
873-
this.logger.warn(
874-
`Cancellation: task run ${taskRunExternalId} still running after grace period ` +
875-
`${gracePeriodMs}ms (elapsed ${totalElapsedMs.toFixed(1)}ms). ` +
876-
`JavaScript cannot force-kill user code; ensure your tasks honor ctx.abortController.signal.`
889+
this.logger.error(
890+
`Cancellation: task run ${taskRunExternalId} still running after cancellation grace period ` +
891+
`${totalElapsedMs}ms.\n` +
892+
`JavaScript cannot force-kill user code; see: https://docs.hatchet.run/home/cancellation`
877893
);
878894
}
879895
}

sdks/typescript/src/v1/declaration.ts

Lines changed: 16 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -25,26 +25,10 @@ import { MetricsClient } from './client/features/metrics';
2525
import { InputType, OutputType, UnknownInputType, JsonObject } from './types';
2626
import { Context, DurableContext } from './client/worker/context';
2727
import { parentRunContextManager } from './parent-run-context-vars';
28-
import { createAbortError } from '@hatchet/util/abort-error';
28+
import { throwIfAborted } from '@hatchet/util/abort-error';
2929

3030
const UNBOUND_ERR = new Error('workflow unbound to hatchet client, hint: use client.run instead');
3131

32-
function throwIfAborted(signal: AbortSignal | undefined): void {
33-
if (!signal?.aborted) {
34-
return;
35-
}
36-
37-
const reason = (signal as any).reason;
38-
39-
if (reason instanceof Error) {
40-
throw reason;
41-
}
42-
43-
throw createAbortError(
44-
typeof reason === 'string' && reason.length > 0 ? reason : 'Operation cancelled by AbortSignal'
45-
);
46-
}
47-
4832
// eslint-disable-next-line no-shadow
4933
export enum Priority {
5034
LOW = 1,
@@ -340,7 +324,13 @@ export class BaseWorkflowDeclaration<
340324

341325
// Precheck: if we're being called from a cancelled parent task, do not enqueue more work.
342326
// The signal is inherited from the parent task's `ctx.abortController.signal`.
343-
throwIfAborted(inheritedSignal);
327+
throwIfAborted(inheritedSignal, {
328+
isTrigger: true,
329+
context: parentRunContext?.parentTaskRunExternalId
330+
? `task run ${parentRunContext.parentTaskRunExternalId}`
331+
: undefined,
332+
warn: (message) => this.client!.admin.logger.warn(message),
333+
});
344334

345335
parentRunContextManager.incrementChildIndex(Array.isArray(input) ? input.length : 1);
346336

@@ -461,7 +451,10 @@ export class BaseWorkflowDeclaration<
461451
}
462452

463453
// If called from within a cancelled parent task, do not enqueue scheduled work.
464-
throwIfAborted(parentRunContextManager.getContext()?.signal);
454+
throwIfAborted(parentRunContextManager.getContext()?.signal, {
455+
isTrigger: true,
456+
warn: (message) => this.client!.admin.logger.warn(message),
457+
});
465458

466459
const scheduled = this.client.scheduled.create(this.definition.name, {
467460
triggerAt: enqueueAt,
@@ -506,7 +499,10 @@ export class BaseWorkflowDeclaration<
506499
}
507500

508501
// If called from within a cancelled parent task, do not enqueue cron work.
509-
throwIfAborted(parentRunContextManager.getContext()?.signal);
502+
throwIfAborted(parentRunContextManager.getContext()?.signal, {
503+
isTrigger: true,
504+
warn: (message) => this.client!.admin.logger.warn(message),
505+
});
510506

511507
const cronDef = this.client.crons.create(this.definition.name, {
512508
expression,

0 commit comments

Comments
 (0)