Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions frontend/docs/pages/home/cancellation.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,26 @@ When a task is canceled, Hatchet sends a cancellation signal to the task. The ta

/>

### AbortError behavior

Hatchet cancellation in TypeScript is driven by an internal `ctx.abortController.signal` (an `AbortSignal`). When a task is cancelled, Hatchet aborts that signal and cancellation-aware operations (like waiting on a child run result) will reject with an **`AbortError`**.

<Callout type="warning">
**Important:** JavaScript/TypeScript cannot make cancellation “uncatchable”. A broad `catch (e) { ... }` can swallow cancellation. Hatchet’s SDK will avoid enqueueing new child runs once the parent task is cancelled, and it will not report a cancelled task as “completed” even if user code catches the abort — but your code should still exit quickly to avoid wasted work.
</Callout>

If you must catch errors, re-throw abort/cancellation errors:

```ts
try {
// ... work ...
await simple.run({});
} catch (e) {
ctx.rethrowIfCancelled(e);
// ... other error handling ...
}
```

</Tabs.Tab>
<Tabs.Tab title="Go">
<Snippet src={snippets.go.cancellations.main.cancelled_task}/>
Expand Down
18 changes: 17 additions & 1 deletion sdks/typescript/.eslintrc.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,21 @@
"class-methods-use-this": "off",
"no-await-in-loop": "off",
"no-restricted-syntax": "off"
}
},
"overrides": [
{
"files": [
"src/**/examples/**/*.{ts,tsx,js}",
"src/examples/**/*.{ts,tsx,js}",
"tests/**/*.{ts,tsx,js}",
"src/**/*.test.{ts,tsx,js}",
"src/**/*.e2e.{ts,tsx,js}",
"src/**/__tests__/**/*.{ts,tsx,js}"
],
"rules": {
"@typescript-eslint/no-unused-vars": "off",
"no-console": "off"
}
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class HeartbeatWorker {

if (actualInterval > HEARTBEAT_INTERVAL * 1.2) {
const message = `Heartbeat interval delay (${actualInterval}ms >> ${HEARTBEAT_INTERVAL}ms)`;
this.logger.warn(message);
this.logger.debug(message);
postMessage({
type: 'warn',
message,
Expand Down
56 changes: 56 additions & 0 deletions sdks/typescript/src/clients/hatchet-client/client-config.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import { ClientConfigSchema } from './client-config';

function baseConfig() {
return {
token: 'token',
tls_config: {},
host_port: 'localhost:7070',
api_url: 'http://localhost:8080',
tenant_id: 'tenant',
};
}

describe('ClientConfigSchema cancellation timing', () => {
it('applies defaults (milliseconds)', () => {
const cfg = ClientConfigSchema.parse(baseConfig());
expect(cfg.cancellation_grace_period).toBe(1000);
expect(cfg.cancellation_warning_threshold).toBe(300);
});

it('accepts integer milliseconds', () => {
const cfg = ClientConfigSchema.parse({
...baseConfig(),
cancellation_grace_period: 2500,
cancellation_warning_threshold: 400,
});
expect(cfg.cancellation_grace_period).toBe(2500);
expect(cfg.cancellation_warning_threshold).toBe(400);
});

it('rejects invalid values', () => {
expect(() =>
ClientConfigSchema.parse({
...baseConfig(),
cancellation_grace_period: -1,
})
).toThrow();
expect(() =>
ClientConfigSchema.parse({
...baseConfig(),
cancellation_warning_threshold: 0.1,
})
).toThrow();
expect(() =>
ClientConfigSchema.parse({
...baseConfig(),
cancellation_warning_threshold: 'nope' as any,
})
).toThrow();
expect(() =>
ClientConfigSchema.parse({
...baseConfig(),
cancellation_grace_period: '7s' as any,
})
).toThrow();
});
});
23 changes: 20 additions & 3 deletions sdks/typescript/src/clients/hatchet-client/client-config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ import { ChannelCredentials } from 'nice-grpc';
import { z } from 'zod';
import { Logger, LogLevel } from '@util/logger';

// Cancellation timings are specified in integer milliseconds.
const DurationMsSchema = z.number().int().nonnegative().finite();

const ClientTLSConfigSchema = z.object({
tls_strategy: z.enum(['tls', 'mtls', 'none']).optional(),
cert_file: z.string().optional(),
Expand All @@ -24,11 +27,25 @@ export const ClientConfigSchema = z.object({
log_level: z.enum(['OFF', 'DEBUG', 'INFO', 'WARN', 'ERROR']).optional(),
tenant_id: z.string(),
namespace: z.string().optional(),
cancellation_grace_period: DurationMsSchema.optional().default(1000),
cancellation_warning_threshold: DurationMsSchema.optional().default(300),
});

export type LogConstructor = (context: string, logLevel?: LogLevel) => Logger;

export type ClientConfig = z.infer<typeof ClientConfigSchema> & {
credentials?: ChannelCredentials;
} & { logger: LogConstructor };
type ClientConfigInferred = z.infer<typeof ClientConfigSchema>;

// Backwards-compatible: allow callers to omit these (schema supplies defaults when parsed).
type ClientConfigCancellationCompat = {
cancellation_grace_period?: ClientConfigInferred['cancellation_grace_period'];
cancellation_warning_threshold?: ClientConfigInferred['cancellation_warning_threshold'];
};

export type ClientConfig = Omit<
ClientConfigInferred,
'cancellation_grace_period' | 'cancellation_warning_threshold'
> &
ClientConfigCancellationCompat & {
credentials?: ChannelCredentials;
} & { logger: LogConstructor };
export type ClientTLSConfig = z.infer<typeof ClientTLSConfigSchema>;
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ export class HatchetLogger implements Logger {
await this.log('ERROR', error ? `${message} ${error}` : message, '91');
}

// eslint-disable-next-line @typescript-eslint/no-unused-vars
util(key: UtilKeys, message: string, extra?: LogExtra): void | Promise<void> {
if (key === 'trace') {
this.log('INFO', `trace: ${message}`, '35');
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,16 @@ export class DurableListenerClient {
return this.pooledListener.subscribe(request);
}

result(request: { taskId: string; signalKey: string }, opts?: { signal?: AbortSignal }) {
if (!this.pooledListener) {
this.pooledListener = new DurableEventGrpcPooledListener(this, () => {
this.pooledListener = undefined;
});
}

return this.pooledListener.result(request, opts);
}

registerDurableEvent(request: {
taskId: string;
signalKey: string;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import { DurableEventStreamable } from './pooled-durable-listener-client';

const dummyListener: AsyncIterable<any> = (async function* gen() {
// never yields
})();

describe('DurableEventStreamable.get cancellation', () => {
it('rejects with AbortError and runs cleanup when aborted', async () => {
const cleanup = jest.fn();
const s = new DurableEventStreamable(dummyListener, 'task', 'key', 'sub-1', cleanup);
const ac = new AbortController();

const p = s.get({ signal: ac.signal });
ac.abort();

await expect(p).rejects.toMatchObject({ name: 'AbortError' });
expect(cleanup).toHaveBeenCalledTimes(1);
});

it('resolves on response and runs cleanup once', async () => {
const cleanup = jest.fn();
const s = new DurableEventStreamable(dummyListener, 'task', 'key', 'sub-1', cleanup);

const event: any = { taskId: 'task', signalKey: 'key', data: '{}' };
setTimeout(() => s.responseEmitter.emit('response', event), 0);

await expect(s.get()).resolves.toEqual(event);
expect(cleanup).toHaveBeenCalledTimes(1);
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import {
} from '@hatchet/protoc/v1/dispatcher';
import { isAbortError } from 'abort-controller-x';
import sleep from '@hatchet/util/sleep';
import { createAbortError } from '@hatchet/util/abort-error';
import {
DurableEventListenerConditions,
SleepMatchCondition,
Expand All @@ -19,18 +20,60 @@ export class DurableEventStreamable {
listener: AsyncIterable<DurableEvent>;
taskId: string;
signalKey: string;
subscriptionId: string;
onCleanup: () => void;

responseEmitter = new EventEmitter();

constructor(listener: AsyncIterable<DurableEvent>, taskId: string, signalKey: string) {
constructor(
listener: AsyncIterable<DurableEvent>,
taskId: string,
signalKey: string,
subscriptionId: string,
onCleanup: () => void
) {
this.listener = listener;
this.taskId = taskId;
this.signalKey = signalKey;
this.subscriptionId = subscriptionId;
this.onCleanup = onCleanup;
}

async get(): Promise<DurableEvent> {
return new Promise((resolve) => {
this.responseEmitter.once('response', resolve);
async get(opts?: { signal?: AbortSignal }): Promise<DurableEvent> {
const signal = opts?.signal;

return new Promise((resolve, reject) => {
let cleanedUp = false;

const cleanup = () => {
if (cleanedUp) return;
cleanedUp = true;
this.responseEmitter.removeListener('response', onResponse);
if (signal) {
signal.removeEventListener('abort', onAbort);
}
this.onCleanup();
};

const onResponse = (event: DurableEvent) => {
cleanup();
resolve(event);
};

const onAbort = () => {
cleanup();
reject(createAbortError('Operation cancelled by AbortSignal'));
};

if (signal?.aborted) {
onAbort();
return;
}

this.responseEmitter.once('response', onResponse);
if (signal) {
signal.addEventListener('abort', onAbort, { once: true });
}
});
}
}
Expand Down Expand Up @@ -106,15 +149,7 @@ export class DurableEventGrpcPooledListener {
const emitter = this.subscribers[subId];
if (emitter) {
emitter.responseEmitter.emit('response', event);
delete this.subscribers[subId];

// Remove this subscription from the mapping
this.taskSignalKeyToSubscriptionIds[subscriptionKey] =
this.taskSignalKeyToSubscriptionIds[subscriptionKey].filter((id) => id !== subId);

if (this.taskSignalKeyToSubscriptionIds[subscriptionKey].length === 0) {
delete this.taskSignalKeyToSubscriptionIds[subscriptionKey];
}
this.cleanupSubscription(subId);
}
}
}
Expand All @@ -138,14 +173,42 @@ export class DurableEventGrpcPooledListener {
}
}

private cleanupSubscription(subscriptionId: string) {
const emitter = this.subscribers[subscriptionId];
if (!emitter) {
return;
}

const subscriptionKey = keyHelper(emitter.taskId, emitter.signalKey);

delete this.subscribers[subscriptionId];

// Remove from the mapping
if (this.taskSignalKeyToSubscriptionIds[subscriptionKey]) {
this.taskSignalKeyToSubscriptionIds[subscriptionKey] = this.taskSignalKeyToSubscriptionIds[
subscriptionKey
].filter((id) => id !== subscriptionId);

if (this.taskSignalKeyToSubscriptionIds[subscriptionKey].length === 0) {
delete this.taskSignalKeyToSubscriptionIds[subscriptionKey];
}
}
}

subscribe(request: { taskId: string; signalKey: string }): DurableEventStreamable {
const { taskId, signalKey } = request;

if (!this.listener) throw new Error('listener not initialized');

// eslint-disable-next-line no-plusplus
const subscriptionId = (this.subscriptionCounter++).toString();
const subscriber = new DurableEventStreamable(this.listener, taskId, signalKey);
const subscriber = new DurableEventStreamable(
this.listener,
taskId,
signalKey,
subscriptionId,
() => this.cleanupSubscription(subscriptionId)
);

this.subscribers[subscriptionId] = subscriber;

Expand All @@ -159,9 +222,12 @@ export class DurableEventGrpcPooledListener {
return subscriber;
}

async result(request: { taskId: string; signalKey: string }): Promise<DurableEvent> {
async result(
request: { taskId: string; signalKey: string },
opts?: { signal?: AbortSignal }
): Promise<DurableEvent> {
const subscriber = this.subscribe(request);
const event = await subscriber.get();
const event = await subscriber.get({ signal: opts?.signal });
return event;
}

Expand Down Expand Up @@ -189,7 +255,7 @@ export class DurableEventGrpcPooledListener {
const subscriptionEntries = Object.entries(this.taskSignalKeyToSubscriptionIds);
this.client.logger.debug(`Replaying ${subscriptionEntries.length} requests...`);

for (const [key, _] of subscriptionEntries) {
for (const [key] of subscriptionEntries) {
const [taskId, signalKey] = key.split('|');
this.requestEmitter.emit('subscribe', { taskId, signalKey });
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ export class RunEventListener {
this.eventEmitter.emit('event');
});

// eslint-disable-next-line @typescript-eslint/no-unused-vars
for await (const _ of on(this.eventEmitter, 'event')) {
while (this.q.length > 0) {
const r = this.q.shift();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { Streamable } from './pooled-child-listener-client';

describe('RunGrpcPooledListener Streamable', () => {
it('rejects with AbortError and runs cleanup when aborted', async () => {
const onCleanup = jest.fn();
// eslint-disable-next-line func-names, no-empty-function
const streamable = new Streamable((async function* () {})(), 'run-1', onCleanup);

const ac = new AbortController();
const p = streamable.get({ signal: ac.signal });
ac.abort();

await expect(p).rejects.toMatchObject({ name: 'AbortError' });
expect(onCleanup).toHaveBeenCalled();
});
});
Loading
Loading