Skip to content

Commit 46744a4

Browse files
JoaoDiasAblyclaude
andcommitted
transport: accept external abort signal in newTurn
Allow server-side route handlers to pass an external AbortSignal (e.g. req.signal) into newTurn via the new `signal` option. When the signal fires — due to client disconnect or serverless function timeout — the turn's internal AbortController is aborted through the same path as Ably cancel messages, stopping LLM generation and stream piping. The listener is cleaned up on both end() and close() to avoid leaks. Tests cover: signal abort, pre-aborted signal, in-flight stream cancellation, listener cleanup on end() and close(), and non-interference with Ably cancel routing. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 3e04e96 commit 46744a4

File tree

5 files changed

+119
-2
lines changed

5 files changed

+119
-2
lines changed

demo/vercel/react/use-chat/src/app/api/chat/route.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ export async function POST(req: Request) {
3434
const channel = ably.channels.get(id);
3535

3636
const transport = createServerTransport({ channel });
37-
const turn = transport.newTurn({ turnId, clientId, parent, forkOf });
37+
const turn = transport.newTurn({ turnId, clientId, parent, forkOf, signal: req.signal });
3838

3939
await turn.start();
4040

demo/vercel/react/use-client-transport/src/app/api/chat/route.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ export async function POST(req: Request) {
179179
}
180180
};
181181

182-
const turn = transport.newTurn({ turnId, clientId, parent, forkOf, onMessage });
182+
const turn = transport.newTurn({ turnId, clientId, parent, forkOf, onMessage, signal: req.signal });
183183

184184
await turn.start();
185185

src/core/transport/server-transport.ts

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ interface RegisteredTurn {
5252
controller: AbortController;
5353
onCancel?: (request: CancelRequest) => Promise<boolean>;
5454
onError?: (error: Ably.ErrorInfo) => void;
55+
/** Removes the listener from the external signal, if one was provided. */
56+
cleanupExternalSignal?: () => void;
5557
}
5658

5759
// ---------------------------------------------------------------------------
@@ -116,6 +118,7 @@ class DefaultServerTransport<TEvent, TMessage> implements ServerTransport<TEvent
116118
this._logger?.trace('DefaultServerTransport.close();');
117119
this._channel.unsubscribe(EVENT_CANCEL, this._channelListener);
118120
for (const reg of this._registeredTurns.values()) {
121+
reg.cleanupExternalSignal?.();
119122
reg.controller.abort();
120123
}
121124
this._registeredTurns.clear();
@@ -249,19 +252,44 @@ class DefaultServerTransport<TEvent, TMessage> implements ServerTransport<TEvent
249252
onError: turnOnError,
250253
parent: turnParent,
251254
forkOf: turnForkOf,
255+
signal: externalSignal,
252256
} = turnOpts;
253257

254258
const controller = new AbortController();
255259
let started = false;
256260
let ended = false;
257261

262+
// Link external signal (e.g. req.signal) to the internal controller so
263+
// platform-level cancellation (client disconnect, function timeout) aborts
264+
// the turn through the same path as Ably cancel messages.
265+
let cleanupExternalSignal: (() => void) | undefined;
266+
if (externalSignal) {
267+
if (externalSignal.aborted) {
268+
this._logger?.debug('DefaultServerTransport._createTurn(); external signal already aborted', { turnId });
269+
controller.abort();
270+
} else {
271+
const listener = () => {
272+
this._logger?.debug('DefaultServerTransport._createTurn(); external signal fired, aborting turn', {
273+
turnId,
274+
});
275+
controller.abort();
276+
};
277+
externalSignal.addEventListener('abort', listener, { once: true });
278+
cleanupExternalSignal = () => {
279+
externalSignal.removeEventListener('abort', listener);
280+
};
281+
this._logger?.debug('DefaultServerTransport._createTurn(); linked external signal', { turnId });
282+
}
283+
}
284+
258285
// Spec: AIT-ST3a — register immediately so early cancels can fire the abort signal.
259286
const registration: RegisteredTurn = {
260287
turnId,
261288
clientId: turnClientId ?? '',
262289
controller,
263290
onCancel,
264291
onError: turnOnError,
292+
cleanupExternalSignal,
265293
};
266294
this._registeredTurns.set(turnId, registration);
267295

@@ -468,6 +496,7 @@ class DefaultServerTransport<TEvent, TMessage> implements ServerTransport<TEvent
468496
throw errInfo;
469497
} finally {
470498
registeredTurns.delete(turnId);
499+
cleanupExternalSignal?.();
471500
}
472501

473502
logger?.debug('Turn.end(); turn ended', { turnId, reason });

src/core/transport/types.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,14 @@ export interface NewTurnOptions<TEvent> {
158158
* publish failure, encoder recovery failure, stream encoding errors.
159159
*/
160160
onError?: (error: Ably.ErrorInfo) => void;
161+
162+
/**
163+
* An external abort signal (typically the HTTP request's `req.signal`) that,
164+
* when fired, aborts this turn. This allows platform-level cancellation —
165+
* client disconnect, serverless function timeout — to stop LLM generation
166+
* and stream piping gracefully.
167+
*/
168+
signal?: AbortSignal;
161169
}
162170

163171
// ---------------------------------------------------------------------------

test/core/transport/server-transport.test.ts

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -574,6 +574,75 @@ describe('ServerTransport', () => {
574574
});
575575
});
576576

577+
describe('external signal', () => {
578+
it('aborts the turn when the external signal fires', async () => {
579+
const externalController = new AbortController();
580+
const turn = transport.newTurn({ turnId: 'turn-1', signal: externalController.signal });
581+
await turn.start();
582+
583+
externalController.abort();
584+
585+
expect(turn.abortSignal.aborted).toBe(true);
586+
});
587+
588+
it('aborts the turn immediately when the external signal is already aborted', () => {
589+
const turn = transport.newTurn({ turnId: 'turn-1', signal: AbortSignal.abort() });
590+
591+
expect(turn.abortSignal.aborted).toBe(true);
592+
});
593+
594+
it('start() throws when the external signal was already aborted', async () => {
595+
const turn = transport.newTurn({ turnId: 'turn-1', signal: AbortSignal.abort() });
596+
597+
await expect(turn.start()).rejects.toBeErrorInfoWithCode(ErrorCode.InvalidArgument);
598+
});
599+
600+
it('cancels an in-flight streamResponse when the external signal fires', async () => {
601+
const externalController = new AbortController();
602+
const turn = transport.newTurn({ turnId: 'turn-1', signal: externalController.signal });
603+
await turn.start();
604+
605+
// A stream that never closes on its own — waits for cancellation.
606+
const stream = new ReadableStream<TestEvent>({
607+
start: (controller) => {
608+
controller.enqueue({ type: 'text', text: 'partial' });
609+
},
610+
});
611+
612+
const resultPromise = turn.streamResponse(stream);
613+
externalController.abort();
614+
615+
const result = await resultPromise;
616+
expect(result.reason).toBe('cancelled');
617+
});
618+
619+
it('removes the external signal listener when the turn ends', async () => {
620+
const externalController = new AbortController();
621+
const signal = externalController.signal;
622+
const removeSpy = vi.spyOn(signal, 'removeEventListener');
623+
624+
const turn = transport.newTurn({ turnId: 'turn-1', signal });
625+
await turn.start();
626+
await turn.end('complete');
627+
628+
expect(removeSpy).toHaveBeenCalledWith('abort', expect.any(Function));
629+
});
630+
631+
it('does not interfere with Ably cancel routing', async () => {
632+
const externalController = new AbortController();
633+
const turn = transport.newTurn({ turnId: 'turn-1', clientId: 'user-a', signal: externalController.signal });
634+
await turn.start();
635+
636+
// Cancel via Ably channel message (not external signal)
637+
simulateCancel(channel, { [HEADER_CANCEL_TURN_ID]: 'turn-1' });
638+
await new Promise((r) => setTimeout(r, 10));
639+
640+
expect(turn.abortSignal.aborted).toBe(true);
641+
// External signal was NOT fired
642+
expect(externalController.signal.aborted).toBe(false);
643+
});
644+
});
645+
577646
describe('close', () => {
578647
it('aborts all registered turns', async () => {
579648
const turn1 = transport.newTurn({ turnId: 'turn-1' });
@@ -591,5 +660,16 @@ describe('ServerTransport', () => {
591660
transport.close();
592661
expect(channel.unsubscribe).toHaveBeenCalledWith(EVENT_CANCEL, expect.any(Function));
593662
});
663+
664+
it('removes external signal listeners on close', () => {
665+
const externalController = new AbortController();
666+
const removeSpy = vi.spyOn(externalController.signal, 'removeEventListener');
667+
668+
transport.newTurn({ turnId: 'turn-1', signal: externalController.signal });
669+
670+
transport.close();
671+
672+
expect(removeSpy).toHaveBeenCalledWith('abort', expect.any(Function));
673+
});
594674
});
595675
});

0 commit comments

Comments
 (0)