Skip to content

Commit 8f8f03a

Browse files
committed
precheck signal
1 parent ac17a0f commit 8f8f03a

File tree

7 files changed

+53
-18
lines changed

7 files changed

+53
-18
lines changed

sdks/typescript/src/clients/listeners/durable-listener/pooled-durable-listener-client.ts

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,14 @@ import {
88
} from '@hatchet/protoc/v1/dispatcher';
99
import { isAbortError } from 'abort-controller-x';
1010
import sleep from '@hatchet/util/sleep';
11+
import { createAbortError } from '@hatchet/util/abort-error';
1112
import {
1213
DurableEventListenerConditions,
1314
SleepMatchCondition,
1415
UserEventMatchCondition,
1516
} from '@hatchet/protoc/v1/shared/condition';
1617
import { DurableListenerClient } from './durable-listener-client';
1718

18-
function createAbortError(message = 'Operation aborted'): Error {
19-
const err: any = new Error(message);
20-
err.name = 'AbortError';
21-
err.code = 'ABORT_ERR';
22-
return err as Error;
23-
}
24-
2519
export class DurableEventStreamable {
2620
listener: AsyncIterable<DurableEvent>;
2721
taskId: string;

sdks/typescript/src/clients/listeners/run-listener/pooled-child-listener-client.ts

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,9 @@ import {
77
} from '@hatchet/protoc/dispatcher';
88
import { isAbortError } from 'abort-controller-x';
99
import sleep from '@hatchet/util/sleep';
10+
import { createAbortError } from '@hatchet/util/abort-error';
1011
import { RunListenerClient } from './child-listener-client';
1112

12-
function createAbortError(message = 'Operation aborted'): Error {
13-
const err: any = new Error(message);
14-
err.name = 'AbortError';
15-
err.code = 'ABORT_ERR';
16-
return err as Error;
17-
}
18-
1913
export class Streamable {
2014
listener: AsyncIterable<WorkflowRunEvent>;
2115
id: string;

sdks/typescript/src/clients/worker/worker.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -520,7 +520,7 @@ export class V0Worker {
520520
if (future) {
521521
future.promise.catch(() => {
522522
this.logger.info(
523-
taskRunLog(action.taskName, action.taskRunExternalId, 'cancellation completed')
523+
taskRunLog(action.taskName, action.taskRunExternalId, 'cancelled')
524524
);
525525
});
526526
future.cancel('Cancelled by worker');
@@ -529,7 +529,7 @@ export class V0Worker {
529529
} catch (e: any) {
530530
// Expected: the promise rejects when cancelled
531531
this.logger.debug(
532-
taskRunLog(action.taskName, action.taskRunExternalId, 'cancellation completed')
532+
taskRunLog(action.taskName, action.taskRunExternalId, 'cancelled')
533533
);
534534
} finally {
535535
delete this.futures[createActionKey(action)];
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
export function createAbortError(message = 'Operation aborted'): Error {
2+
const err: any = new Error(message);
3+
err.name = 'AbortError';
4+
err.code = 'ABORT_ERR';
5+
return err as Error;
6+
}

sdks/typescript/src/v1/client/worker/context.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import { Action as ConditionAction } from '@hatchet/protoc/v1/shared/condition';
2121
import { HatchetClient } from '@hatchet/v1';
2222
import { ContextWorker, NextStep } from '@hatchet/step';
2323
import { applyNamespace } from '@hatchet/util/apply-namespace';
24+
import { createAbortError } from '@hatchet/util/abort-error';
2425
import { V1Worker } from './worker-internal';
2526
import { Duration } from '../duration';
2627

@@ -90,6 +91,12 @@ export class Context<T, K = {}> {
9091
return this.controller.signal.aborted;
9192
}
9293

94+
protected throwIfCancelled(): void {
95+
if (this.abortController.signal.aborted) {
96+
throw createAbortError('Operation cancelled by AbortSignal');
97+
}
98+
}
99+
93100
async cancel() {
94101
await this.v1.runs.cancel({
95102
ids: [this.action.taskRunExternalId],
@@ -359,6 +366,8 @@ export class Context<T, K = {}> {
359366
}
360367

361368
private spawnOptions(workflow: string | Workflow | WorkflowV1<any, any>, options?: ChildRunOpts) {
369+
this.throwIfCancelled();
370+
362371
let workflowName: string;
363372

364373
if (typeof workflow === 'string') {
@@ -410,6 +419,7 @@ export class Context<T, K = {}> {
410419
options?: ChildRunOpts;
411420
}>
412421
) {
422+
this.throwIfCancelled();
413423
const workflows: Parameters<typeof this.v1.admin.runWorkflows<Q, P>>[0] = children.map(
414424
(child) => {
415425
const { workflowName, opts } = this.spawnOptions(child.workflow, child.options);
@@ -595,6 +605,7 @@ export class Context<T, K = {}> {
595605
options?: ChildRunOpts;
596606
}>
597607
): Promise<WorkflowRunRef<P>[]> {
608+
this.throwIfCancelled();
598609
const { workflowRunId, taskRunExternalId } = this.action;
599610

600611
const workflowRuns = workflows.map(({ workflow, input, options }) => {
@@ -677,6 +688,7 @@ export class Context<T, K = {}> {
677688
input: Q,
678689
options?: ChildRunOpts
679690
): Promise<WorkflowRunRef<P>> {
691+
this.throwIfCancelled();
680692
const { workflowRunId, taskRunExternalId } = this.action;
681693

682694
let workflowName: string = '';
@@ -747,6 +759,7 @@ export class DurableContext<T, K = {}> extends Context<T, K> {
747759
* @returns A promise that resolves with the event that satisfied the conditions.
748760
*/
749761
async waitFor(conditions: Conditions | Conditions[]): Promise<Record<string, any>> {
762+
this.throwIfCancelled();
750763
const pbConditions = conditionsToPb(Render(ConditionAction.CREATE, conditions));
751764

752765
// eslint-disable-next-line no-plusplus

sdks/typescript/src/v1/client/worker/worker-internal.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -886,7 +886,7 @@ export class V1Worker {
886886
]);
887887

888888
if (winner === 'done') {
889-
this.logger.info(taskRunLog(taskName, taskRunExternalId, 'cancellation completed'));
889+
this.logger.info(taskRunLog(taskName, taskRunExternalId, 'cancelled'));
890890
} else {
891891
const totalElapsedMs = Date.now() - start;
892892
this.logger.warn(

sdks/typescript/src/v1/declaration.ts

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,26 @@ import { MetricsClient } from './client/features/metrics';
2525
import { InputType, OutputType, UnknownInputType, JsonObject } from './types';
2626
import { Context, DurableContext } from './client/worker/context';
2727
import { parentRunContextManager } from './parent-run-context-vars';
28+
import { createAbortError } from '@hatchet/util/abort-error';
2829

2930
const UNBOUND_ERR = new Error('workflow unbound to hatchet client, hint: use client.run instead');
3031

32+
function throwIfAborted(signal: AbortSignal | undefined): void {
33+
if (!signal?.aborted) {
34+
return;
35+
}
36+
37+
const reason = (signal as any).reason;
38+
39+
if (reason instanceof Error) {
40+
throw reason;
41+
}
42+
43+
throw createAbortError(
44+
typeof reason === 'string' && reason.length > 0 ? reason : 'Operation cancelled by AbortSignal'
45+
);
46+
}
47+
3148
// eslint-disable-next-line no-shadow
3249
export enum Priority {
3350
LOW = 1,
@@ -312,7 +329,6 @@ export class BaseWorkflowDeclaration<
312329

313330
// set the parent run context
314331
const parentRunContext = parentRunContextManager.getContext();
315-
parentRunContextManager.incrementChildIndex(Array.isArray(input) ? input.length : 1);
316332

317333
if (!parentRunContext && (options?.childKey || options?.sticky)) {
318334
this.client.admin.logger.warn(
@@ -322,6 +338,12 @@ export class BaseWorkflowDeclaration<
322338

323339
const inheritedSignal = parentRunContext?.signal;
324340

341+
// Precheck: if we're being called from a cancelled parent task, do not enqueue more work.
342+
// The signal is inherited from the parent task's `ctx.abortController.signal`.
343+
throwIfAborted(inheritedSignal);
344+
345+
parentRunContextManager.incrementChildIndex(Array.isArray(input) ? input.length : 1);
346+
325347
const runOpts = {
326348
...(options ?? {}),
327349
parentId: parentRunContext?.parentId,
@@ -438,6 +460,9 @@ export class BaseWorkflowDeclaration<
438460
throw UNBOUND_ERR;
439461
}
440462

463+
// If called from within a cancelled parent task, do not enqueue scheduled work.
464+
throwIfAborted(parentRunContextManager.getContext()?.signal);
465+
441466
const scheduled = this.client.scheduled.create(this.definition.name, {
442467
triggerAt: enqueueAt,
443468
input: input as JsonObject,
@@ -480,6 +505,9 @@ export class BaseWorkflowDeclaration<
480505
throw UNBOUND_ERR;
481506
}
482507

508+
// If called from within a cancelled parent task, do not enqueue cron work.
509+
throwIfAborted(parentRunContextManager.getContext()?.signal);
510+
483511
const cronDef = this.client.crons.create(this.definition.name, {
484512
expression,
485513
input: input as JsonObject,

0 commit comments

Comments
 (0)