diff --git a/frontend/docs/pages/home/cancellation.mdx b/frontend/docs/pages/home/cancellation.mdx index 18568b42e7..a4fe6c9897 100644 --- a/frontend/docs/pages/home/cancellation.mdx +++ b/frontend/docs/pages/home/cancellation.mdx @@ -33,6 +33,26 @@ When a task is canceled, Hatchet sends a cancellation signal to the task. The ta /> +### AbortError behavior + +Hatchet cancellation in TypeScript is driven by an internal `ctx.abortController.signal` (an `AbortSignal`). When a task is cancelled, Hatchet aborts that signal and cancellation-aware operations (like waiting on a child run result) will reject with an **`AbortError`**. + + + **Important:** JavaScript/TypeScript cannot make cancellation “uncatchable”. A broad `catch (e) { ... }` can swallow cancellation. Hatchet’s SDK will avoid enqueueing new child runs once the parent task is cancelled, and it will not report a cancelled task as “completed” even if user code catches the abort — but your code should still exit quickly to avoid wasted work. + + +If you must catch errors, re-throw abort/cancellation errors: + +```ts +try { + // ... work ... + await simple.run({}); +} catch (e) { + ctx.rethrowIfCancelled(e); + // ... other error handling ... +} +``` + diff --git a/sdks/typescript/.eslintrc.json b/sdks/typescript/.eslintrc.json index b3ede4100c..78a7273b2c 100644 --- a/sdks/typescript/.eslintrc.json +++ b/sdks/typescript/.eslintrc.json @@ -33,5 +33,21 @@ "class-methods-use-this": "off", "no-await-in-loop": "off", "no-restricted-syntax": "off" - } + }, + "overrides": [ + { + "files": [ + "src/**/examples/**/*.{ts,tsx,js}", + "src/examples/**/*.{ts,tsx,js}", + "tests/**/*.{ts,tsx,js}", + "src/**/*.test.{ts,tsx,js}", + "src/**/*.e2e.{ts,tsx,js}", + "src/**/__tests__/**/*.{ts,tsx,js}" + ], + "rules": { + "@typescript-eslint/no-unused-vars": "off", + "no-console": "off" + } + } + ] } diff --git a/sdks/typescript/src/clients/dispatcher/heartbeat/heartbeat-worker.ts b/sdks/typescript/src/clients/dispatcher/heartbeat/heartbeat-worker.ts index bbffcea29c..20d0f97678 100644 --- a/sdks/typescript/src/clients/dispatcher/heartbeat/heartbeat-worker.ts +++ b/sdks/typescript/src/clients/dispatcher/heartbeat/heartbeat-worker.ts @@ -66,7 +66,7 @@ class HeartbeatWorker { if (actualInterval > HEARTBEAT_INTERVAL * 1.2) { const message = `Heartbeat interval delay (${actualInterval}ms >> ${HEARTBEAT_INTERVAL}ms)`; - this.logger.warn(message); + this.logger.debug(message); postMessage({ type: 'warn', message, diff --git a/sdks/typescript/src/clients/hatchet-client/client-config.test.ts b/sdks/typescript/src/clients/hatchet-client/client-config.test.ts new file mode 100644 index 0000000000..3c0a3851ac --- /dev/null +++ b/sdks/typescript/src/clients/hatchet-client/client-config.test.ts @@ -0,0 +1,56 @@ +import { ClientConfigSchema } from './client-config'; + +function baseConfig() { + return { + token: 'token', + tls_config: {}, + host_port: 'localhost:7070', + api_url: 'http://localhost:8080', + tenant_id: 'tenant', + }; +} + +describe('ClientConfigSchema cancellation timing', () => { + it('applies defaults (milliseconds)', () => { + const cfg = ClientConfigSchema.parse(baseConfig()); + expect(cfg.cancellation_grace_period).toBe(1000); + expect(cfg.cancellation_warning_threshold).toBe(300); + }); + + it('accepts integer milliseconds', () => { + const cfg = ClientConfigSchema.parse({ + ...baseConfig(), + cancellation_grace_period: 2500, + cancellation_warning_threshold: 400, + }); + expect(cfg.cancellation_grace_period).toBe(2500); + expect(cfg.cancellation_warning_threshold).toBe(400); + }); + + it('rejects invalid values', () => { + expect(() => + ClientConfigSchema.parse({ + ...baseConfig(), + cancellation_grace_period: -1, + }) + ).toThrow(); + expect(() => + ClientConfigSchema.parse({ + ...baseConfig(), + cancellation_warning_threshold: 0.1, + }) + ).toThrow(); + expect(() => + ClientConfigSchema.parse({ + ...baseConfig(), + cancellation_warning_threshold: 'nope' as any, + }) + ).toThrow(); + expect(() => + ClientConfigSchema.parse({ + ...baseConfig(), + cancellation_grace_period: '7s' as any, + }) + ).toThrow(); + }); +}); diff --git a/sdks/typescript/src/clients/hatchet-client/client-config.ts b/sdks/typescript/src/clients/hatchet-client/client-config.ts index 48139772e3..7c282723b1 100644 --- a/sdks/typescript/src/clients/hatchet-client/client-config.ts +++ b/sdks/typescript/src/clients/hatchet-client/client-config.ts @@ -2,6 +2,9 @@ import { ChannelCredentials } from 'nice-grpc'; import { z } from 'zod'; import { Logger, LogLevel } from '@util/logger'; +// Cancellation timings are specified in integer milliseconds. +const DurationMsSchema = z.number().int().nonnegative().finite(); + const ClientTLSConfigSchema = z.object({ tls_strategy: z.enum(['tls', 'mtls', 'none']).optional(), cert_file: z.string().optional(), @@ -24,11 +27,25 @@ export const ClientConfigSchema = z.object({ log_level: z.enum(['OFF', 'DEBUG', 'INFO', 'WARN', 'ERROR']).optional(), tenant_id: z.string(), namespace: z.string().optional(), + cancellation_grace_period: DurationMsSchema.optional().default(1000), + cancellation_warning_threshold: DurationMsSchema.optional().default(300), }); export type LogConstructor = (context: string, logLevel?: LogLevel) => Logger; -export type ClientConfig = z.infer & { - credentials?: ChannelCredentials; -} & { logger: LogConstructor }; +type ClientConfigInferred = z.infer; + +// Backwards-compatible: allow callers to omit these (schema supplies defaults when parsed). +type ClientConfigCancellationCompat = { + cancellation_grace_period?: ClientConfigInferred['cancellation_grace_period']; + cancellation_warning_threshold?: ClientConfigInferred['cancellation_warning_threshold']; +}; + +export type ClientConfig = Omit< + ClientConfigInferred, + 'cancellation_grace_period' | 'cancellation_warning_threshold' +> & + ClientConfigCancellationCompat & { + credentials?: ChannelCredentials; + } & { logger: LogConstructor }; export type ClientTLSConfig = z.infer; diff --git a/sdks/typescript/src/clients/hatchet-client/hatchet-logger.ts b/sdks/typescript/src/clients/hatchet-client/hatchet-logger.ts index ddef363d17..31481abca5 100644 --- a/sdks/typescript/src/clients/hatchet-client/hatchet-logger.ts +++ b/sdks/typescript/src/clients/hatchet-client/hatchet-logger.ts @@ -72,6 +72,7 @@ export class HatchetLogger implements Logger { await this.log('ERROR', error ? `${message} ${error}` : message, '91'); } + // eslint-disable-next-line @typescript-eslint/no-unused-vars util(key: UtilKeys, message: string, extra?: LogExtra): void | Promise { if (key === 'trace') { this.log('INFO', `trace: ${message}`, '35'); diff --git a/sdks/typescript/src/clients/listeners/durable-listener/durable-listener-client.ts b/sdks/typescript/src/clients/listeners/durable-listener/durable-listener-client.ts index 93f019443e..85a3b90d66 100644 --- a/sdks/typescript/src/clients/listeners/durable-listener/durable-listener-client.ts +++ b/sdks/typescript/src/clients/listeners/durable-listener/durable-listener-client.ts @@ -33,6 +33,16 @@ export class DurableListenerClient { return this.pooledListener.subscribe(request); } + result(request: { taskId: string; signalKey: string }, opts?: { signal?: AbortSignal }) { + if (!this.pooledListener) { + this.pooledListener = new DurableEventGrpcPooledListener(this, () => { + this.pooledListener = undefined; + }); + } + + return this.pooledListener.result(request, opts); + } + registerDurableEvent(request: { taskId: string; signalKey: string; diff --git a/sdks/typescript/src/clients/listeners/durable-listener/pooled-durable-listener-client.test.ts b/sdks/typescript/src/clients/listeners/durable-listener/pooled-durable-listener-client.test.ts new file mode 100644 index 0000000000..06e6792c64 --- /dev/null +++ b/sdks/typescript/src/clients/listeners/durable-listener/pooled-durable-listener-client.test.ts @@ -0,0 +1,30 @@ +import { DurableEventStreamable } from './pooled-durable-listener-client'; + +const dummyListener: AsyncIterable = (async function* gen() { + // never yields +})(); + +describe('DurableEventStreamable.get cancellation', () => { + it('rejects with AbortError and runs cleanup when aborted', async () => { + const cleanup = jest.fn(); + const s = new DurableEventStreamable(dummyListener, 'task', 'key', 'sub-1', cleanup); + const ac = new AbortController(); + + const p = s.get({ signal: ac.signal }); + ac.abort(); + + await expect(p).rejects.toMatchObject({ name: 'AbortError' }); + expect(cleanup).toHaveBeenCalledTimes(1); + }); + + it('resolves on response and runs cleanup once', async () => { + const cleanup = jest.fn(); + const s = new DurableEventStreamable(dummyListener, 'task', 'key', 'sub-1', cleanup); + + const event: any = { taskId: 'task', signalKey: 'key', data: '{}' }; + setTimeout(() => s.responseEmitter.emit('response', event), 0); + + await expect(s.get()).resolves.toEqual(event); + expect(cleanup).toHaveBeenCalledTimes(1); + }); +}); diff --git a/sdks/typescript/src/clients/listeners/durable-listener/pooled-durable-listener-client.ts b/sdks/typescript/src/clients/listeners/durable-listener/pooled-durable-listener-client.ts index b29c60acfa..67b3d8522d 100644 --- a/sdks/typescript/src/clients/listeners/durable-listener/pooled-durable-listener-client.ts +++ b/sdks/typescript/src/clients/listeners/durable-listener/pooled-durable-listener-client.ts @@ -8,6 +8,7 @@ import { } from '@hatchet/protoc/v1/dispatcher'; import { isAbortError } from 'abort-controller-x'; import sleep from '@hatchet/util/sleep'; +import { createAbortError } from '@hatchet/util/abort-error'; import { DurableEventListenerConditions, SleepMatchCondition, @@ -19,18 +20,60 @@ export class DurableEventStreamable { listener: AsyncIterable; taskId: string; signalKey: string; + subscriptionId: string; + onCleanup: () => void; responseEmitter = new EventEmitter(); - constructor(listener: AsyncIterable, taskId: string, signalKey: string) { + constructor( + listener: AsyncIterable, + taskId: string, + signalKey: string, + subscriptionId: string, + onCleanup: () => void + ) { this.listener = listener; this.taskId = taskId; this.signalKey = signalKey; + this.subscriptionId = subscriptionId; + this.onCleanup = onCleanup; } - async get(): Promise { - return new Promise((resolve) => { - this.responseEmitter.once('response', resolve); + async get(opts?: { signal?: AbortSignal }): Promise { + const signal = opts?.signal; + + return new Promise((resolve, reject) => { + let cleanedUp = false; + + const cleanup = () => { + if (cleanedUp) return; + cleanedUp = true; + this.responseEmitter.removeListener('response', onResponse); + if (signal) { + signal.removeEventListener('abort', onAbort); + } + this.onCleanup(); + }; + + const onResponse = (event: DurableEvent) => { + cleanup(); + resolve(event); + }; + + const onAbort = () => { + cleanup(); + reject(createAbortError('Operation cancelled by AbortSignal')); + }; + + if (signal?.aborted) { + onAbort(); + return; + } + + this.responseEmitter.once('response', onResponse); + if (signal) { + signal.addEventListener('abort', onAbort, { once: true }); + } }); } } @@ -106,15 +149,7 @@ export class DurableEventGrpcPooledListener { const emitter = this.subscribers[subId]; if (emitter) { emitter.responseEmitter.emit('response', event); - delete this.subscribers[subId]; - - // Remove this subscription from the mapping - this.taskSignalKeyToSubscriptionIds[subscriptionKey] = - this.taskSignalKeyToSubscriptionIds[subscriptionKey].filter((id) => id !== subId); - - if (this.taskSignalKeyToSubscriptionIds[subscriptionKey].length === 0) { - delete this.taskSignalKeyToSubscriptionIds[subscriptionKey]; - } + this.cleanupSubscription(subId); } } } @@ -138,6 +173,28 @@ export class DurableEventGrpcPooledListener { } } + private cleanupSubscription(subscriptionId: string) { + const emitter = this.subscribers[subscriptionId]; + if (!emitter) { + return; + } + + const subscriptionKey = keyHelper(emitter.taskId, emitter.signalKey); + + delete this.subscribers[subscriptionId]; + + // Remove from the mapping + if (this.taskSignalKeyToSubscriptionIds[subscriptionKey]) { + this.taskSignalKeyToSubscriptionIds[subscriptionKey] = this.taskSignalKeyToSubscriptionIds[ + subscriptionKey + ].filter((id) => id !== subscriptionId); + + if (this.taskSignalKeyToSubscriptionIds[subscriptionKey].length === 0) { + delete this.taskSignalKeyToSubscriptionIds[subscriptionKey]; + } + } + } + subscribe(request: { taskId: string; signalKey: string }): DurableEventStreamable { const { taskId, signalKey } = request; @@ -145,7 +202,13 @@ export class DurableEventGrpcPooledListener { // eslint-disable-next-line no-plusplus const subscriptionId = (this.subscriptionCounter++).toString(); - const subscriber = new DurableEventStreamable(this.listener, taskId, signalKey); + const subscriber = new DurableEventStreamable( + this.listener, + taskId, + signalKey, + subscriptionId, + () => this.cleanupSubscription(subscriptionId) + ); this.subscribers[subscriptionId] = subscriber; @@ -159,9 +222,12 @@ export class DurableEventGrpcPooledListener { return subscriber; } - async result(request: { taskId: string; signalKey: string }): Promise { + async result( + request: { taskId: string; signalKey: string }, + opts?: { signal?: AbortSignal } + ): Promise { const subscriber = this.subscribe(request); - const event = await subscriber.get(); + const event = await subscriber.get({ signal: opts?.signal }); return event; } @@ -189,7 +255,7 @@ export class DurableEventGrpcPooledListener { const subscriptionEntries = Object.entries(this.taskSignalKeyToSubscriptionIds); this.client.logger.debug(`Replaying ${subscriptionEntries.length} requests...`); - for (const [key, _] of subscriptionEntries) { + for (const [key] of subscriptionEntries) { const [taskId, signalKey] = key.split('|'); this.requestEmitter.emit('subscribe', { taskId, signalKey }); } diff --git a/sdks/typescript/src/clients/listeners/run-listener/child-listener-client.ts b/sdks/typescript/src/clients/listeners/run-listener/child-listener-client.ts index e5d020d4c1..a104126e87 100644 --- a/sdks/typescript/src/clients/listeners/run-listener/child-listener-client.ts +++ b/sdks/typescript/src/clients/listeners/run-listener/child-listener-client.ts @@ -174,6 +174,7 @@ export class RunEventListener { this.eventEmitter.emit('event'); }); + // eslint-disable-next-line @typescript-eslint/no-unused-vars for await (const _ of on(this.eventEmitter, 'event')) { while (this.q.length > 0) { const r = this.q.shift(); diff --git a/sdks/typescript/src/clients/listeners/run-listener/pooled-child-listener-client.test.ts b/sdks/typescript/src/clients/listeners/run-listener/pooled-child-listener-client.test.ts new file mode 100644 index 0000000000..f5106cddeb --- /dev/null +++ b/sdks/typescript/src/clients/listeners/run-listener/pooled-child-listener-client.test.ts @@ -0,0 +1,16 @@ +import { Streamable } from './pooled-child-listener-client'; + +describe('RunGrpcPooledListener Streamable', () => { + it('rejects with AbortError and runs cleanup when aborted', async () => { + const onCleanup = jest.fn(); + // eslint-disable-next-line func-names, no-empty-function + const streamable = new Streamable((async function* () {})(), 'run-1', onCleanup); + + const ac = new AbortController(); + const p = streamable.get({ signal: ac.signal }); + ac.abort(); + + await expect(p).rejects.toMatchObject({ name: 'AbortError' }); + expect(onCleanup).toHaveBeenCalled(); + }); +}); diff --git a/sdks/typescript/src/clients/listeners/run-listener/pooled-child-listener-client.ts b/sdks/typescript/src/clients/listeners/run-listener/pooled-child-listener-client.ts index 8d9f348e66..88b3c5cee2 100644 --- a/sdks/typescript/src/clients/listeners/run-listener/pooled-child-listener-client.ts +++ b/sdks/typescript/src/clients/listeners/run-listener/pooled-child-listener-client.ts @@ -7,25 +7,72 @@ import { } from '@hatchet/protoc/dispatcher'; import { isAbortError } from 'abort-controller-x'; import sleep from '@hatchet/util/sleep'; +import { createAbortError } from '@hatchet/util/abort-error'; import { RunListenerClient } from './child-listener-client'; export class Streamable { listener: AsyncIterable; id: string; + onCleanup: () => void; + private cleanedUp = false; responseEmitter = new EventEmitter(); - constructor(listener: AsyncIterable, id: string) { + constructor(listener: AsyncIterable, id: string, onCleanup: () => void) { this.listener = listener; this.id = id; + this.onCleanup = onCleanup; } - async *stream(): AsyncGenerator { + private cleanupOnce() { + if (this.cleanedUp) return; + this.cleanedUp = true; + this.onCleanup(); + } + + async get(opts?: { signal?: AbortSignal }): Promise { + const signal = opts?.signal; + + return new Promise((resolve, reject) => { + const cleanupListeners = () => { + this.responseEmitter.removeListener('response', onResponse); + if (signal) { + signal.removeEventListener('abort', onAbort); + } + }; + + const onResponse = (event: WorkflowRunEvent) => { + cleanupListeners(); + resolve(event); + }; + + const onAbort = () => { + cleanupListeners(); + this.cleanupOnce(); + reject(createAbortError('Operation cancelled by AbortSignal')); + }; + + if (signal?.aborted) { + onAbort(); + return; + } + + this.responseEmitter.once('response', onResponse); + if (signal) { + signal.addEventListener('abort', onAbort, { once: true }); + } + }); + } + + async *stream(opts?: { signal?: AbortSignal }): AsyncGenerator { while (true) { - const req: WorkflowRunEvent = await new Promise((resolve) => { - this.responseEmitter.once('response', resolve); - }); - yield req; + const event = await this.get(opts); + yield event; + + if (event.eventType === WorkflowRunEventType.WORKFLOW_RUN_EVENT_TYPE_FINISHED) { + this.cleanupOnce(); + break; + } } } } @@ -99,7 +146,13 @@ export class RunGrpcPooledListener { subscribe(request: SubscribeToWorkflowRunsRequest) { if (!this.listener) throw new Error('listener not initialized'); - this.subscribers[request.workflowRunId] = new Streamable(this.listener, request.workflowRunId); + this.subscribers[request.workflowRunId] = new Streamable( + this.listener, + request.workflowRunId, + () => { + delete this.subscribers[request.workflowRunId]; + } + ); this.requestEmitter.emit('subscribe', request); return this.subscribers[request.workflowRunId]; } diff --git a/sdks/typescript/src/clients/worker/worker.ts b/sdks/typescript/src/clients/worker/worker.ts index b54bdffb73..adf8e49362 100644 --- a/sdks/typescript/src/clients/worker/worker.ts +++ b/sdks/typescript/src/clients/worker/worker.ts @@ -25,12 +25,13 @@ import { DesiredWorkerLabels, WorkflowConcurrencyOpts, } from '@hatchet/protoc/workflows'; -import { Logger } from '@hatchet/util/logger'; +import { actionMap, Logger, taskRunLog } from '@hatchet/util/logger'; import { WebhookHandler } from '@clients/worker/handler'; import { WebhookWorkerCreateRequest } from '@clients/rest/generated/data-contracts'; import { WorkflowDefinition } from '@hatchet/v1'; -import { CreateWorkflowTaskOpts, NonRetryableError } from '@hatchet/v1/task'; +import { NonRetryableError } from '@hatchet/v1/task'; import { applyNamespace } from '@hatchet/util/apply-namespace'; +import { throwIfAborted } from '@hatchet/util/abort-error'; import { V0Context, CreateStep, V0DurableContext, mapRateLimit, StepRunFunction } from '../../step'; import { WorkerLabels } from '../dispatcher/dispatcher-client'; @@ -43,6 +44,7 @@ export interface WorkerOpts { labels?: WorkerLabels; } +// TODO: can i nerf this now... export class V0Worker { client: LegacyHatchetClient; name: string; @@ -236,7 +238,7 @@ export class V0Worker { } async handleStartStepRun(action: Action) { - const { actionId } = action; + const { actionId, taskName, taskRunExternalId } = action; try { // Note: we always use a DurableContext since its a superset of the Context class @@ -252,11 +254,17 @@ export class V0Worker { } const run = async () => { + // Precheck: if cancellation already happened, don't execute user code. + throwIfAborted(context.controller?.signal); return step(context); }; const success = async (result: any) => { - this.logger.info(`Task run ${action.taskRunExternalId} succeeded`); + // If cancellation happened, do not report completion. + if (context.controller?.signal?.aborted) { + return; + } + this.logger.info(taskRunLog(taskName, taskRunExternalId, 'completed')); try { // Send the action event to the dispatcher @@ -301,7 +309,11 @@ export class V0Worker { }; const failure = async (error: any) => { - this.logger.error(`Task run ${action.taskRunExternalId} failed: ${error.message}`); + // If cancellation happened, do not report failure. + if (context.controller?.signal?.aborted) { + return; + } + this.logger.error(taskRunLog(taskName, taskRunExternalId, `failed: ${error.message}`)); if (error.stack) { this.logger.error(error.stack); @@ -340,6 +352,19 @@ export class V0Worker { await failure(e); return; } + + // Postcheck: user code may swallow AbortError; don't report completion after cancellation. + // If we reached this point and the signal is aborted, the task likely caught/ignored cancellation. + if (context.controller?.signal?.aborted) { + this.logger.warn( + `Cancellation: task run ${taskRunExternalId} returned after cancellation was signaled. ` + + `This usually means an AbortError was caught and not propagated. ` + + `See https://docs.hatchet.run/home/cancellation` + ); + return; + } + throwIfAborted(context.controller?.signal); + await success(result); })() ); @@ -362,7 +387,7 @@ export class V0Worker { } catch (e: any) { const message = e?.message || String(e); if (message.includes('Cancelled')) { - this.logger.debug(`Task run ${action.taskRunExternalId} was cancelled`); + this.logger.debug(taskRunLog(taskName, taskRunExternalId, 'was cancelled')); } else { this.logger.error( `Could not wait for task run ${action.taskRunExternalId} to finish. ` + @@ -377,7 +402,7 @@ export class V0Worker { } async handleStartGroupKeyRun(action: Action) { - const { actionId } = action; + const { actionId, taskName, taskRunExternalId } = action; try { const context = new V0Context(action, this.client, this); @@ -405,7 +430,7 @@ export class V0Worker { }; const success = (result: any) => { - this.logger.info(`Task run ${action.taskRunExternalId} succeeded`); + this.logger.info(taskRunLog(taskName, taskRunExternalId, 'succeeded')); try { // Send the action event to the dispatcher @@ -427,7 +452,7 @@ export class V0Worker { }; const failure = (error: any) => { - this.logger.error(`Task run ${key} failed: ${error.message}`); + this.logger.error(taskRunLog(taskName, taskRunExternalId, `failed: ${error.message}`)); try { // Send the action event to the dispatcher @@ -508,9 +533,7 @@ export class V0Worker { } async handleCancelStepRun(action: Action) { - const { taskRunExternalId } = action; try { - this.logger.info(`Cancelling task run ${action.taskRunExternalId}`); const future = this.futures[createActionKey(action)]; const context = this.contexts[createActionKey(action)]; @@ -520,14 +543,14 @@ export class V0Worker { if (future) { future.promise.catch(() => { - this.logger.info(`Cancelled task run ${action.taskRunExternalId}`); + this.logger.info(taskRunLog(action.taskName, action.taskRunExternalId, 'cancelled')); }); future.cancel('Cancelled by worker'); await future.promise; } } catch (e: any) { // Expected: the promise rejects when cancelled - this.logger.debug(`Task run ${taskRunExternalId} cancellation completed`); + this.logger.debug(taskRunLog(action.taskName, action.taskRunExternalId, 'cancelled')); } finally { delete this.futures[createActionKey(action)]; delete this.contexts[createActionKey(action)]; @@ -586,8 +609,9 @@ export class V0Worker { this.logger.info(`Worker ${this.name} listening for actions`); for await (const action of generator) { + const receivedType = actionMap(action.actionType); this.logger.info( - `Worker ${this.name} received action ${action.actionId}:${action.actionType}` + taskRunLog(action.taskName, action.taskRunExternalId, `received action ${receivedType}`) ); void this.handleAction(action); @@ -684,18 +708,3 @@ function toPbWorkerLabel( {} as Record ); } - -function onFailureTaskName(workflow: WorkflowDefinition) { - return `${workflow.name}:on-failure-task`; -} - -function getLeaves(tasks: CreateWorkflowTaskOpts[]): CreateWorkflowTaskOpts[] { - return tasks.filter((task) => isLeafTask(task, tasks)); -} - -function isLeafTask( - task: CreateWorkflowTaskOpts, - allTasks: CreateWorkflowTaskOpts[] -): boolean { - return !allTasks.some((t) => t.parents?.some((p) => p.name === task.name)); -} diff --git a/sdks/typescript/src/step.ts b/sdks/typescript/src/step.ts index 2f5e8bb58a..a6fce4a6c1 100644 --- a/sdks/typescript/src/step.ts +++ b/sdks/typescript/src/step.ts @@ -655,13 +655,13 @@ export class V0DurableContext extends V0Context { sleepConditions: pbConditions.sleepConditions, userEventConditions: pbConditions.userEventConditions, }); - - const listener = this.v0.durableListener.subscribe({ - taskId: this.action.taskRunExternalId, - signalKey: key, - }); - - const event = await listener.get(); + const event = await this.v0.durableListener.result( + { + taskId: this.action.taskRunExternalId, + signalKey: key, + }, + { signal: this.abortController.signal } + ); // Convert event.data from Uint8Array to string if needed const eventData = @@ -758,8 +758,9 @@ export function mapRateLimit(limits: CreateStep['rate_limits']): Creat } // Helper function to validate CEL expressions -function validateCelExpression(expr: string): boolean { - // This is a placeholder. In a real implementation, you'd need to use a CEL parser or validator. +// eslint-disable-next-line @typescript-eslint/no-unused-vars +function validateCelExpression(_expr: string): boolean { + // FIXME: this is a placeholder. In a real implementation, you'd need to use a CEL parser or validator. // For now, we'll just return true to mimic the behavior. return true; } diff --git a/sdks/typescript/src/util/abort-error.ts b/sdks/typescript/src/util/abort-error.ts new file mode 100644 index 0000000000..0c3864c737 --- /dev/null +++ b/sdks/typescript/src/util/abort-error.ts @@ -0,0 +1,89 @@ +export function createAbortError(message = 'Operation aborted'): Error { + const err: any = new Error(message); + err.name = 'AbortError'; + err.code = 'ABORT_ERR'; + return err as Error; +} + +export function isAbortError(err: unknown): err is Error { + return err instanceof Error && (err.name === 'AbortError' || (err as any).code === 'ABORT_ERR'); +} + +/** + * Helper to be used inside broad `catch` blocks so cancellation isn't accidentally swallowed. + * + * Example: + * ```ts + * try { ... } catch (e) { rethrowIfAborted(e); ... } + * ``` + */ +export function rethrowIfAborted(err: unknown): void { + if (isAbortError(err)) { + throw err; + } +} + +export type ThrowIfAbortedOpts = { + /** + * Optional: called before throwing when the signal is aborted. + * This lets callsites attach logging without coupling this util to a logger implementation. + */ + warn?: (message: string) => void; + + /** + * If true, emits a generic warning intended for "trigger/enqueue" paths. + */ + isTrigger?: boolean; + + /** + * Optional context used to make warnings consistent, e.g. "task run ". + */ + context?: string; + + /** + * Message used when the AbortSignal doesn't provide a reason. + */ + defaultMessage?: string; +}; + +/** + * Throws an AbortError if the provided signal is aborted. + * + * Notes: + * - In JS/TS, `catch` can swallow any thrown value, so this is best-effort. + * - We prefer throwing the signal's `reason` when it is already an Error. + */ +export function throwIfAborted( + signal: AbortSignal | undefined, + optsOrDefaultMessage: ThrowIfAbortedOpts | string = 'Operation cancelled by AbortSignal' +): void { + if (!signal?.aborted) { + return; + } + + const opts: ThrowIfAbortedOpts = + typeof optsOrDefaultMessage === 'string' + ? { defaultMessage: optsOrDefaultMessage } + : (optsOrDefaultMessage ?? {}); + + if (opts.isTrigger) { + const ctx = opts.context ? `${opts.context} ` : ''; + opts.warn?.( + `Cancellation: ${ctx}attempted to enqueue/trigger work after cancellation was signaled. ` + + `This usually means an AbortError was caught and not propagated. ` + + `See https://docs.hatchet.run/home/cancellation` + ); + } + + const { reason } = signal as any; + + if (reason instanceof Error) { + throw reason; + } + + if (typeof reason === 'string' && reason.length > 0) { + throw createAbortError(reason); + } + + throw createAbortError(opts.defaultMessage ?? 'Operation cancelled by AbortSignal'); +} diff --git a/sdks/typescript/src/util/config-loader/config-loader.test.ts b/sdks/typescript/src/util/config-loader/config-loader.test.ts index ed07af0a3e..c9443ce60c 100644 --- a/sdks/typescript/src/util/config-loader/config-loader.test.ts +++ b/sdks/typescript/src/util/config-loader/config-loader.test.ts @@ -1,6 +1,6 @@ import { ConfigLoader } from './config-loader'; -fdescribe('ConfigLoader', () => { +describe('ConfigLoader', () => { beforeEach(() => { process.env.HATCHET_CLIENT_TOKEN = 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJncnBjX2Jyb2FkY2FzdF9hZGRyZXNzIjoiMTI3LjAuMC4xOjgwODAiLCJzZXJ2ZXJfdXJsIjoiaHR0cDovL2xvY2FsaG9zdDo4MDgwIiwic3ViIjoiNzA3ZDA4NTUtODBhYi00ZTFmLWExNTYtZjFjNDU0NmNiZjUyIn0K.abcdef'; diff --git a/sdks/typescript/src/util/hatchet-promise/hatchet-promise.ts b/sdks/typescript/src/util/hatchet-promise/hatchet-promise.ts index d1912cb603..03298cda3c 100644 --- a/sdks/typescript/src/util/hatchet-promise/hatchet-promise.ts +++ b/sdks/typescript/src/util/hatchet-promise/hatchet-promise.ts @@ -1,12 +1,20 @@ class HatchetPromise { // eslint-disable-next-line @typescript-eslint/no-unused-vars - cancel: Function = (reason: any) => {}; + cancel: (reason?: any) => void = (_reason?: any) => {}; promise: Promise; + /** + * The original (non-cancelable) promise passed to the constructor. + * + * `promise` is a cancelable wrapper which rejects immediately when `cancel` is called. + * `inner` continues executing and will settle when the underlying work completes. + */ + inner: Promise; constructor(promise: Promise) { + this.inner = Promise.resolve(promise) as Promise; this.promise = new Promise((resolve, reject) => { this.cancel = reject; - Promise.resolve(promise).then(resolve).catch(reject); + this.inner.then(resolve).catch(reject); }); } } diff --git a/sdks/typescript/src/util/logger/index.ts b/sdks/typescript/src/util/logger/index.ts index 1ff09efd40..9a1b758a4a 100644 --- a/sdks/typescript/src/util/logger/index.ts +++ b/sdks/typescript/src/util/logger/index.ts @@ -1 +1,2 @@ export * from './logger'; +export * from './task-run-log'; diff --git a/sdks/typescript/src/util/logger/task-run-log.ts b/sdks/typescript/src/util/logger/task-run-log.ts new file mode 100644 index 0000000000..d558d845d5 --- /dev/null +++ b/sdks/typescript/src/util/logger/task-run-log.ts @@ -0,0 +1,18 @@ +import { ActionType } from '../../protoc/dispatcher'; + +export function actionMap(action: ActionType): string { + switch (action) { + case ActionType.START_STEP_RUN: + return 'starting...'; + case ActionType.CANCEL_STEP_RUN: + return 'cancelling...'; + case ActionType.START_GET_GROUP_KEY: + return 'starting to get group key...'; + default: + return 'unknown'; + } +} + +export function taskRunLog(taskName: string, taskRunExternalId: string, action: string): string { + return `Task run ${action} \t ${taskName}/${taskRunExternalId} `; +} diff --git a/sdks/typescript/src/util/workflow-run-ref.ts b/sdks/typescript/src/util/workflow-run-ref.ts index 504301484c..8f7bdf7d32 100644 --- a/sdks/typescript/src/util/workflow-run-ref.ts +++ b/sdks/typescript/src/util/workflow-run-ref.ts @@ -53,6 +53,12 @@ export default class WorkflowRunRef { private client: RunListenerClient; private runs: RunsClient | undefined; _standaloneTaskName?: string; + /** + * Optional default AbortSignal used for listener-backed waits (e.g. `.result()`). + * This is primarily set when a run is spawned from within a task so cancellations propagate + * without manually threading `{ signal }` everywhere. + */ + defaultSignal?: AbortSignal; constructor( workflowRunId: @@ -64,13 +70,15 @@ export default class WorkflowRunRef { client: RunListenerClient, runsClient?: RunsClient, parentWorkflowRunId?: string, - standaloneTaskName?: string + standaloneTaskName?: string, + defaultSignal?: AbortSignal ) { this.workflowRunId = workflowRunId; this.parentWorkflowRunId = parentWorkflowRunId; this.client = client; this.runs = runsClient; this._standaloneTaskName = standaloneTaskName; + this.defaultSignal = defaultSignal; } // TODO docstrings @@ -103,7 +111,8 @@ export default class WorkflowRunRef { return new Promise((resolve, reject) => { (async () => { - for await (const event of streamable.stream()) { + const signal = this.defaultSignal; + for await (const event of streamable.stream({ signal })) { if (event.eventType === WorkflowRunEventType.WORKFLOW_RUN_EVENT_TYPE_FINISHED) { if (event.results.some((r) => r.error !== undefined)) { // HACK: this might replace intentional empty errors but this is the more common case @@ -168,7 +177,7 @@ export default class WorkflowRunRef { return; } } - })(); + })().catch(reject); }); } diff --git a/sdks/typescript/src/v1/client/worker/context.ts b/sdks/typescript/src/v1/client/worker/context.ts index c35703f3a6..24d75c9cbc 100644 --- a/sdks/typescript/src/v1/client/worker/context.ts +++ b/sdks/typescript/src/v1/client/worker/context.ts @@ -21,6 +21,7 @@ import { Action as ConditionAction } from '@hatchet/protoc/v1/shared/condition'; import { HatchetClient } from '@hatchet/v1'; import { ContextWorker, NextStep } from '@hatchet/step'; import { applyNamespace } from '@hatchet/util/apply-namespace'; +import { createAbortError, rethrowIfAborted } from '@hatchet/util/abort-error'; import { V1Worker } from './worker-internal'; import { Duration } from '../duration'; @@ -90,6 +91,24 @@ export class Context { return this.controller.signal.aborted; } + protected throwIfCancelled(): void { + if (this.abortController.signal.aborted) { + throw createAbortError('Operation cancelled by AbortSignal'); + } + } + + /** + * Helper for broad `catch` blocks so cancellation isn't accidentally swallowed. + * + * Example: + * ```ts + * try { ... } catch (e) { ctx.rethrowIfCancelled(e); ... } + * ``` + */ + rethrowIfCancelled(err: unknown): void { + rethrowIfAborted(err); + } + async cancel() { await this.v1.runs.cancel({ ids: [this.action.taskRunExternalId], @@ -359,6 +378,8 @@ export class Context { } private spawnOptions(workflow: string | Workflow | WorkflowV1, options?: ChildRunOpts) { + this.throwIfCancelled(); + let workflowName: string; if (typeof workflow === 'string') { @@ -379,7 +400,7 @@ export class Context { const { workflowRunId, taskRunExternalId } = this.action; const finalOpts = { - ...options, + ...opts, parentId: workflowRunId, parentTaskRunExternalId: taskRunExternalId, childIndex: this.spawnIndex, @@ -410,6 +431,7 @@ export class Context { options?: ChildRunOpts; }> ) { + this.throwIfCancelled(); const workflows: Parameters>[0] = children.map( (child) => { const { workflowName, opts } = this.spawnOptions(child.workflow, child.options); @@ -432,7 +454,12 @@ export class Context { options?: ChildRunOpts; }> ): Promise[]> { - return this.spawnBulk(children); + const refs = await this.spawnBulk(children); + refs.forEach((ref) => { + // eslint-disable-next-line no-param-reassign + ref.defaultSignal = this.abortController.signal; + }); + return refs; } /** @@ -465,6 +492,9 @@ export class Context { options?: ChildRunOpts ): Promise

{ const run = await this.spawn(workflow, input, options); + // Ensure waiting for the child result aborts when this task is cancelled. + // eslint-disable-next-line no-param-reassign + run.defaultSignal = this.abortController.signal; return run.output; } @@ -482,6 +512,7 @@ export class Context { options?: ChildRunOpts ): Promise> { const ref = await this.spawn(workflow, input, options); + ref.defaultSignal = this.abortController.signal; return ref; } @@ -586,6 +617,7 @@ export class Context { options?: ChildRunOpts; }> ): Promise[]> { + this.throwIfCancelled(); const { workflowRunId, taskRunExternalId } = this.action; const workflowRuns = workflows.map(({ workflow, input, options }) => { @@ -608,11 +640,16 @@ export class Context { ); } + // `signal` must never be sent over the wire. + const optsWithoutSignal: Omit & { signal?: never } = { ...opts }; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + delete (optsWithoutSignal as any).signal; + const resp = { workflowName: name, input, options: { - ...opts, + ...optsWithoutSignal, parentId: workflowRunId, parentTaskRunExternalId: taskRunExternalId, childIndex: this.spawnIndex, @@ -663,6 +700,7 @@ export class Context { input: Q, options?: ChildRunOpts ): Promise> { + this.throwIfCancelled(); const { workflowRunId, taskRunExternalId } = this.action; let workflowName: string = ''; @@ -733,6 +771,7 @@ export class DurableContext extends Context { * @returns A promise that resolves with the event that satisfied the conditions. */ async waitFor(conditions: Conditions | Conditions[]): Promise> { + this.throwIfCancelled(); const pbConditions = conditionsToPb(Render(ConditionAction.CREATE, conditions)); // eslint-disable-next-line no-plusplus @@ -743,13 +782,13 @@ export class DurableContext extends Context { sleepConditions: pbConditions.sleepConditions, userEventConditions: pbConditions.userEventConditions, }); - - const listener = this.v1._v0.durableListener.subscribe({ - taskId: this.action.taskRunExternalId, - signalKey: key, - }); - - const event = await listener.get(); + const event = await this.v1._v0.durableListener.result( + { + taskId: this.action.taskRunExternalId, + signalKey: key, + }, + { signal: this.abortController.signal } + ); // Convert event.data from Uint8Array to string if needed const eventData = diff --git a/sdks/typescript/src/v1/client/worker/worker-cancel-supervision.test.ts b/sdks/typescript/src/v1/client/worker/worker-cancel-supervision.test.ts new file mode 100644 index 0000000000..6ff90fbd11 --- /dev/null +++ b/sdks/typescript/src/v1/client/worker/worker-cancel-supervision.test.ts @@ -0,0 +1,94 @@ +import { V1Worker } from '@hatchet/v1/client/worker/worker-internal'; +import HatchetPromise from '@util/hatchet-promise/hatchet-promise'; + +describe('V1Worker handleCancelStepRun cancellation supervision', () => { + beforeEach(() => { + jest.useFakeTimers(); + }); + + afterEach(() => { + jest.useRealTimers(); + }); + + it('logs warnings after threshold and grace period, then returns', async () => { + const logger = { + info: jest.fn(), + warn: jest.fn(), + debug: jest.fn(), + error: jest.fn(), + }; + + const taskExternalId = 'task-1'; + + // Use the real HatchetPromise behavior: cancel rejects the wrapper immediately, + // while the underlying work (`inner`) continues. + const inner = new Promise(() => { + // never resolves + }); + const future = new HatchetPromise(inner); + const originalCancel = future.cancel; + const cancelSpy = jest.fn((reason: any) => originalCancel(reason)); + future.cancel = cancelSpy; + + const ctx = { + abortController: new AbortController(), + }; + + const fakeThis: any = { + logger, + client: { + config: { + cancellation_warning_threshold: 300, + cancellation_grace_period: 1000, + }, + }, + cancellingTaskRuns: new Set(), + futures: { [taskExternalId]: future }, + contexts: { [taskExternalId]: ctx }, + }; + + const action: any = { taskRunExternalId: taskExternalId }; + + const p = V1Worker.prototype.handleCancelStepRun.call(fakeThis, action); + + await jest.advanceTimersByTimeAsync(1500); + await p; + + expect(ctx.abortController.signal.aborted).toBe(true); + expect(cancelSpy).toHaveBeenCalled(); + expect(logger.warn).toHaveBeenCalled(); + + expect(fakeThis.futures[taskExternalId]).toBeUndefined(); + expect(fakeThis.contexts[taskExternalId]).toBeUndefined(); + }); + + it('suppresses "was cancelled" debug log when cancellation is supervised', async () => { + const logger = { + info: jest.fn(), + warn: jest.fn(), + debug: jest.fn(), + error: jest.fn(), + }; + + const taskExternalId = 'task-2'; + + const fakeThis: any = { + logger, + cancellingTaskRuns: new Set([taskExternalId]), + }; + + // Reproduce the log suppression logic from the step execution path: + // we only log "was cancelled" if the cancellation isn't currently supervised. + const maybeLog = (e: any) => { + const message = e?.message || String(e); + if (message.includes('Cancelled')) { + if (!fakeThis.cancellingTaskRuns.has(taskExternalId)) { + fakeThis.logger.debug(`Task run ${taskExternalId} was cancelled`); + } + } + }; + + maybeLog(new Error('Cancelled by worker')); + expect(logger.debug).not.toHaveBeenCalled(); + }); +}); diff --git a/sdks/typescript/src/v1/client/worker/worker-internal.ts b/sdks/typescript/src/v1/client/worker/worker-internal.ts index c5dc58c94d..75ecebc417 100644 --- a/sdks/typescript/src/v1/client/worker/worker-internal.ts +++ b/sdks/typescript/src/v1/client/worker/worker-internal.ts @@ -19,7 +19,7 @@ import { DesiredWorkerLabels, WorkflowConcurrencyOpts, } from '@hatchet/protoc/workflows'; -import { Logger } from '@hatchet/util/logger'; +import { actionMap, Logger, taskRunLog } from '@hatchet/util/logger'; import { WebhookWorkerCreateRequest } from '@clients/rest/generated/data-contracts'; import { BaseWorkflowDeclaration, WorkflowDefinition, HatchetClient } from '@hatchet/v1'; import { CreateTaskOpts } from '@hatchet/protoc/v1/workflows'; @@ -36,6 +36,8 @@ import { zodToJsonSchema } from 'zod-to-json-schema'; import { WorkerLabels } from '@hatchet/clients/dispatcher/dispatcher-client'; import { CreateStep, mapRateLimit, StepRunFunction } from '@hatchet/step'; import { applyNamespace } from '@hatchet/util/apply-namespace'; +import sleep from '@hatchet/util/sleep'; +import { throwIfAborted } from '@hatchet/util/abort-error'; import { Context, DurableContext } from './context'; import { parentRunContextManager } from '../../parent-run-context-vars'; import { HealthServer, workerStatus, type WorkerStatus } from './health-server'; @@ -182,21 +184,6 @@ export class V1Worker { }; } - getHandler(workflows: Workflow[]) { - throw new Error('Not implemented'); - // TODO v1 - // for (const workflow of workflows) { - // const wf: Workflow = { - // ...workflow, - // id: this.client.config.namespace + workflow.id, - // }; - - // this.registerActions(wf); - // } - - // return new WebhookHandler(this, workflows); - } - async registerWebhook(webhook: WebhookWorkerCreateRequest) { return this.client._v0.admin.registerWebhook({ ...webhook }); } @@ -534,7 +521,7 @@ export class V1Worker { } async handleStartStepRun(action: Action) { - const { actionId, taskRunExternalId } = action; + const { actionId, taskRunExternalId, taskName } = action; try { // Note: we always use a DurableContext since its a superset of the Context class @@ -550,13 +537,20 @@ export class V1Worker { } const run = async () => { - parentRunContextManager.setContext({ - parentId: action.workflowRunId, - parentTaskRunExternalId: taskRunExternalId, - childIndex: 0, - desiredWorkerId: this.workerId || '', - }); - return step(context); + return parentRunContextManager.runWithContext( + { + parentId: action.workflowRunId, + parentTaskRunExternalId: taskRunExternalId, + childIndex: 0, + desiredWorkerId: this.workerId || '', + signal: context.abortController.signal, + }, + () => { + // Precheck: if cancellation already happened, don't execute user code. + throwIfAborted(context.abortController.signal); + return step(context); + } + ); }; const success = async (result: any) => { @@ -565,7 +559,7 @@ export class V1Worker { return; } - this.logger.info(`Task run ${taskRunExternalId} succeeded`); + this.logger.info(taskRunLog(taskName, taskRunExternalId, 'completed')); // Send the action event to the dispatcher const event = this.getStepActionEvent( @@ -616,7 +610,7 @@ export class V1Worker { return; } - this.logger.error(`Task run ${taskRunExternalId} failed: ${error.message}`); + this.logger.error(taskRunLog(taskName, taskRunExternalId, `failed: ${error.message}`)); if (error.stack) { this.logger.error(error.stack); @@ -652,6 +646,19 @@ export class V1Worker { await failure(e); return; } + + // Postcheck: user code may swallow AbortError; don't report completion after cancellation. + // If we reached this point and the signal is aborted, the task likely caught/ignored cancellation. + if (context.abortController.signal.aborted) { + this.logger.warn( + `Cancellation: task run ${taskRunExternalId} returned after cancellation was signaled. ` + + `This usually means an AbortError was caught and not propagated. ` + + `See https://docs.hatchet.run/home/cancellation` + ); + return; + } + throwIfAborted(context.abortController.signal); + await success(result); })() ); @@ -673,9 +680,8 @@ export class V1Worker { await future.promise; } catch (e: any) { const message = e?.message || String(e); - if (message.includes('Cancelled')) { - this.logger.debug(`Task run ${taskRunExternalId} was cancelled`); - } else { + // TODO is this cased correctly... + if (!message.includes('Cancelled')) { this.logger.error( `Could not wait for task run ${taskRunExternalId} to finish. ` + `See https://docs.hatchet.run/home/cancellation for best practices on handling cancellation: `, @@ -689,7 +695,7 @@ export class V1Worker { } async handleStartGroupKeyRun(action: Action) { - const { actionId, getGroupKeyRunId, taskRunExternalId } = action; + const { actionId, getGroupKeyRunId, taskRunExternalId, taskName } = action; this.logger.error( 'Concurrency Key Functions have been deprecated and will be removed in a future release. Use Concurrency Expressions instead.' @@ -721,7 +727,7 @@ export class V1Worker { }; const success = (result: any) => { - this.logger.info(`Task run ${taskRunExternalId} succeeded`); + this.logger.info(taskRunLog(taskName, taskRunExternalId, 'completed')); try { // Send the action event to the dispatcher @@ -743,7 +749,7 @@ export class V1Worker { }; const failure = (error: any) => { - this.logger.error(`Task run ${key} failed: ${error.message}`); + this.logger.error(taskRunLog(taskName, taskRunExternalId, `failed: ${error.message}`)); try { // Send the action event to the dispatcher @@ -824,26 +830,73 @@ export class V1Worker { } async handleCancelStepRun(action: Action) { - const { taskRunExternalId } = action; + const { taskRunExternalId, taskName } = action; + try { - this.logger.info(`Cancelling task run ${taskRunExternalId}`); const future = this.futures[taskRunExternalId]; const context = this.contexts[taskRunExternalId]; if (context && context.abortController) { - context.abortController.abort('Cancelled by worker'); + context.abortController.abort('Cancelled by worker'); // TODO this reason is nonsensical } if (future) { - future.promise.catch(() => { - this.logger.info(`Cancelled task run ${taskRunExternalId}`); - }); - future.cancel('Cancelled by worker'); - await future.promise; + const start = Date.now(); + const warningThresholdMs = this.client.config.cancellation_warning_threshold ?? 300; + const gracePeriodMs = this.client.config.cancellation_grace_period ?? 1000; + const warningMs = Math.max(0, warningThresholdMs); + const graceMs = Math.max(0, gracePeriodMs); + + // Ensure cancelling this future doesn't create an unhandled rejection in cases + // where the main action handler isn't currently awaiting `future.promise`. + future.promise.catch(() => undefined); + + // Cancel the future (rejects the wrapper); user code must still cooperate with AbortSignal. + future.cancel('Cancelled by worker'); // TODO this reason is nonsensical + + // Track completion of the underlying work (not the cancelable wrapper). + // Ensure this promise never throws into our supervision flow. + const completion = (future.inner ?? future.promise).catch(() => undefined); + + // Wait until warning threshold, then log if still running. + if (warningMs > 0) { + const winner = await Promise.race([ + completion.then(() => 'done' as const), + sleep(warningMs).then(() => 'warn' as const), + ]); + + if (winner === 'warn') { + const milliseconds = Date.now() - start; + this.logger.warn( + `Cancellation: task run ${taskRunExternalId} has not cancelled after ${milliseconds}ms. Consider checking for blocking operations. ` + + `See https://docs.hatchet.run/home/cancellation` + ); + } + } + + // Wait until grace period (total), then log if still running. + const elapsedMs = Date.now() - start; + const remainingMs = graceMs - elapsedMs; + const winner = await Promise.race([ + completion.then(() => 'done' as const), + sleep(Math.max(0, remainingMs)).then(() => 'grace' as const), + ]); + + if (winner === 'done') { + this.logger.info(taskRunLog(taskName, taskRunExternalId, 'cancelled')); + } else { + const totalElapsedMs = Date.now() - start; + this.logger.error( + `Cancellation: task run ${taskRunExternalId} still running after cancellation grace period ` + + `${totalElapsedMs}ms.\n` + + `JavaScript cannot force-kill user code; see: https://docs.hatchet.run/home/cancellation` + ); + } } } catch (e: any) { - // Expected: the promise rejects when cancelled - this.logger.debug(`Task run ${taskRunExternalId} cancellation completed`); + this.logger.error( + `Cancellation: error while supervising cancellation for task run ${taskRunExternalId}: ${e?.message || e}` + ); } finally { delete this.futures[taskRunExternalId]; delete this.contexts[taskRunExternalId]; @@ -924,9 +977,9 @@ export class V1Worker { this.logger.info(`Worker ${this.name} listening for actions`); for await (const action of generator) { - this.logger.info( - `Worker ${this.name} received action ${action.actionId}:${action.actionType}` - ); + const receivedType = actionMap(action.actionType); + + this.logger.info(taskRunLog(action.taskName, action.taskRunExternalId, `${receivedType}`)); void this.handleAction(action); } diff --git a/sdks/typescript/src/v1/declaration.ts b/sdks/typescript/src/v1/declaration.ts index 090b86dc67..be3e15bd99 100644 --- a/sdks/typescript/src/v1/declaration.ts +++ b/sdks/typescript/src/v1/declaration.ts @@ -9,6 +9,7 @@ import { } from '@hatchet/clients/rest/generated/data-contracts'; import { Workflow as WorkflowV0 } from '@hatchet/workflow'; import { z } from 'zod'; +import { throwIfAborted } from '@hatchet/util/abort-error'; import { IHatchetClient } from './client/client.interface'; import { CreateWorkflowTaskOpts, @@ -312,7 +313,6 @@ export class BaseWorkflowDeclaration< // set the parent run context const parentRunContext = parentRunContextManager.getContext(); - parentRunContextManager.incrementChildIndex(Array.isArray(input) ? input.length : 1); if (!parentRunContext && (options?.childKey || options?.sticky)) { this.client.admin.logger.warn( @@ -320,8 +320,22 @@ export class BaseWorkflowDeclaration< ); } + const inheritedSignal = parentRunContext?.signal; + + // Precheck: if we're being called from a cancelled parent task, do not enqueue more work. + // The signal is inherited from the parent task's `ctx.abortController.signal`. + throwIfAborted(inheritedSignal, { + isTrigger: true, + context: parentRunContext?.parentTaskRunExternalId + ? `task run ${parentRunContext.parentTaskRunExternalId}` + : undefined, + warn: (message) => this.client!.admin.logger.warn(message), + }); + + parentRunContextManager.incrementChildIndex(Array.isArray(input) ? input.length : 1); + const runOpts = { - ...options, + ...(options ?? {}), parentId: parentRunContext?.parentId, parentTaskRunExternalId: parentRunContext?.parentTaskRunExternalId, childIndex: parentRunContext?.childIndex, @@ -357,6 +371,9 @@ export class BaseWorkflowDeclaration< // eslint-disable-next-line no-param-reassign ref._standaloneTaskName = _standaloneTaskName; } + // Ensure result subscriptions inherit cancellation if no signal is provided explicitly. + // eslint-disable-next-line no-param-reassign + ref.defaultSignal = inheritedSignal; res.push(ref); }); return res; @@ -368,6 +385,7 @@ export class BaseWorkflowDeclaration< res._standaloneTaskName = _standaloneTaskName; } + res.defaultSignal = inheritedSignal; return res; } @@ -432,10 +450,16 @@ export class BaseWorkflowDeclaration< throw UNBOUND_ERR; } + // If called from within a cancelled parent task, do not enqueue scheduled work. + throwIfAborted(parentRunContextManager.getContext()?.signal, { + isTrigger: true, + warn: (message) => this.client!.admin.logger.warn(message), + }); + const scheduled = this.client.scheduled.create(this.definition.name, { triggerAt: enqueueAt, input: input as JsonObject, - ...options, + ...(options ?? {}), }); return scheduled; @@ -474,10 +498,16 @@ export class BaseWorkflowDeclaration< throw UNBOUND_ERR; } + // If called from within a cancelled parent task, do not enqueue cron work. + throwIfAborted(parentRunContextManager.getContext()?.signal, { + isTrigger: true, + warn: (message) => this.client!.admin.logger.warn(message), + }); + const cronDef = this.client.crons.create(this.definition.name, { expression, input: input as JsonObject, - ...options, + ...(options ?? {}), additionalMetadata: options?.additionalMetadata, name, }); diff --git a/sdks/typescript/src/v1/examples/on_event/event.e2e.ts b/sdks/typescript/src/v1/examples/on_event/event.e2e.ts index 612082037c..3d081e0a14 100644 --- a/sdks/typescript/src/v1/examples/on_event/event.e2e.ts +++ b/sdks/typescript/src/v1/examples/on_event/event.e2e.ts @@ -56,6 +56,7 @@ xdescribe('events-e2e', () => { const maxAttempts = 15; const eventToRuns: Record = {}; + // eslint-disable-next-line no-constant-condition while (true) { console.log('Waiting for event runs to complete...'); if (attempts > maxAttempts) { diff --git a/sdks/typescript/src/v1/parent-run-context-vars.ts b/sdks/typescript/src/v1/parent-run-context-vars.ts index e02d45ccdb..8518d23f8a 100644 --- a/sdks/typescript/src/v1/parent-run-context-vars.ts +++ b/sdks/typescript/src/v1/parent-run-context-vars.ts @@ -8,6 +8,12 @@ export interface ParentRunContext { parentTaskRunExternalId: string; desiredWorkerId: string; childIndex?: number; + + /** + * (optional) AbortSignal inherited by nested `run()` calls. + * Used to cancel local "wait for result" subscriptions when the parent task is cancelled. + */ + signal?: AbortSignal; } export class ParentRunContextManager { @@ -17,6 +23,15 @@ export class ParentRunContextManager { this.storage = new AsyncLocalStorage(); } + runWithContext(opts: ParentRunContext, fn: () => T): T { + return this.storage.run( + { + ...opts, + }, + fn + ); + } + setContext(opts: ParentRunContext): void { this.storage.enterWith({ ...opts,