Commit 213aa36

fix(vercel): return real turn.stream to useChat
Fixes an issue where `sendMessages()` returned an empty `ReadableStream` to `useChat`. The stream returned here powers important features of `useChat`, such as:

- status changes (submitted -> streaming -> ready)
- turn lifecycle hooks (e.g. `onToolCall`, `onData`, `onFinish`)
- evaluating whether `sendAutomaticallyWhen` should send

With an empty stream, none of these features worked. This transport aims to be, as far as possible, a drop-in replacement for the default transport, so these features need to work. This commit returns the real stream from `turn.stream` to `useChat`, which allows them to work as expected.

As a result of this change, both `useChat` and `useMessageSync` now accumulate messages in parallel, but both produce identical messages from the same underlying Ably events. When the AI SDK doesn't set `messageId` on the `start` chunk (currently this is logically equivalent to not being in 'persistence mode'), `useChat` and the transport would assign different IDs. This commit therefore adds `messageId` to `EncoderOptions` so that the server transport can pass its generated ID to the Vercel encoder, which uses it as a fallback, ensuring both sides converge on the same ID.
1 parent 6e24371 commit 213aa36
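
The ID-convergence behaviour described in the commit message comes down to a single nullish-coalescing fallback. The helper below is purely illustrative (it is not part of the library's API): the chunk's own `messageId` wins when the AI SDK provides one, and the transport-generated ID fills the gap otherwise.

```typescript
// Illustrative helper (not the library API): how the encoder picks the ID
// stamped on the `start` chunk so useChat and the transport agree.
function resolveMessageId(
  chunkMessageId: string | undefined,
  transportMessageId: string | undefined,
): string | undefined {
  // Prefer the AI SDK's own ID (persistence mode); otherwise fall back to
  // the transport-assigned one so both accumulators converge.
  return chunkMessageId ?? transportMessageId;
}

console.log(resolveMessageId('chunk-id', 'fallback-id')); // 'chunk-id'
console.log(resolveMessageId(undefined, 'fallback-id')); // 'fallback-id'
```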

8 files changed

Lines changed: 369 additions & 42 deletions

docs/internals/chat-transport.md

Lines changed: 6 additions & 4 deletions
```diff
@@ -9,7 +9,7 @@ Vercel's `useChat` manages message state internally. When the user submits a mes
 1. Determine which messages are new vs history
 2. Compute fork metadata for regeneration
 3. Delegate to the core transport's `send()`
-4. Return a stream that signals completion without duplicating state
+4. Return the turn stream so `useChat` can drive status and callbacks
 
 ## sendMessages
 
@@ -28,11 +28,13 @@ The `prepareSendMessagesRequest` hook (optional) lets the server app customize t
 
 Without the hook, the adapter builds a default body with `history` (including per-message Ably headers), `id`, `trigger`, and fork metadata fields.
 
-### Empty stream return
+### Real stream return
 
-The adapter returns an **empty stream** that closes when the turn ends - not the real event stream. This is intentional: `useChat` consumes the returned stream to accumulate the assistant message, but `useMessageSync` (the companion React hook) already pushes the transport's authoritative message state into `useChat` via `setMessages`. Returning the real event stream would cause `useChat` to accumulate a duplicate assistant message.
+The adapter returns the real turn stream from `sendMessages()`. `useChat` consumes this stream to drive status transitions (`submitted` -> `streaming` -> `ready`), fire callbacks (`onToolCall`, `onData`, `onFinish`), and evaluate `sendAutomaticallyWhen`.
 
-The empty stream is created via a `TransformStream` whose writable side closes when the turn's real stream finishes.
+Both `useChat` and `useMessageSync` accumulate messages in parallel: `useChat` builds from the stream, while `useMessageSync` pushes from the transport's message store via `setMessages` (a full replacement). The transport's version is always authoritative - both accumulators produce identical messages from the same chunks, and `setMessages` overwrites `useChat`'s state on every transport event.
+
+The server encoder ensures `messageId` alignment by stamping the transport-assigned `x-ably-msg-id` as a fallback domain `messageId` on the `start` chunk. This ensures both accumulators assign the same message ID.
 
 ### Abort signal
```
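
The parallel-accumulation claim in the docs can be sketched concretely. The chunk shapes and accumulator below are assumptions for illustration (not the AI SDK's real types): two independent accumulators fed the same chunks and the same fallback ID produce identical messages, which is why `useChat` and `useMessageSync` can build state in parallel without diverging.

```typescript
// Illustrative sketch (assumed chunk shapes, not the AI SDK's real types).
type Chunk =
  | { type: 'start'; messageId?: string }
  | { type: 'text-delta'; delta: string }
  | { type: 'finish' };

// A toy accumulator: collects text deltas and resolves the message ID,
// falling back to the transport-assigned ID when `start` carries none.
function accumulate(chunks: Chunk[], fallbackId: string): { id: string; text: string } {
  let id = fallbackId;
  let text = '';
  for (const chunk of chunks) {
    if (chunk.type === 'start' && chunk.messageId !== undefined) id = chunk.messageId;
    if (chunk.type === 'text-delta') text += chunk.delta;
  }
  return { id, text };
}

const chunks: Chunk[] = [
  { type: 'start' }, // no messageId: both sides use the transport fallback
  { type: 'text-delta', delta: 'Hel' },
  { type: 'text-delta', delta: 'lo' },
  { type: 'finish' },
];
const fromStream = accumulate(chunks, 'transport-id'); // plays the useChat role
const fromStore = accumulate(chunks, 'transport-id'); // plays the useMessageSync role
console.log(fromStream.id === fromStore.id && fromStream.text === fromStore.text); // true
```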

src/core/codec/types.ts

Lines changed: 6 additions & 0 deletions
```diff
@@ -223,6 +223,12 @@ export interface EncoderOptions {
   extras?: Extras;
   /** Hook called before each Ably message is published. Mutate the message in place to add transport-level headers. */
   onMessage?: (message: Ably.Message) => void;
+  /**
+   * Domain-level message identity. Domain encoders use this as a fallback
+   * messageId when a lifecycle chunk (e.g. `start`) does not provide one,
+   * ensuring useChat and the transport accumulator assign the same ID.
+   */
+  messageId?: string;
 }
 
 /**
```

src/core/transport/server-transport.ts

Lines changed: 3 additions & 1 deletion
```diff
@@ -381,17 +381,19 @@ class DefaultServerTransport<TEvent, TMessage> implements ServerTransport<TEvent
     const assistantParent =
       streamOpts?.parent === undefined ? (turnParent ?? undefined) : (streamOpts.parent ?? undefined);
 
+    const msgId = crypto.randomUUID();
     const defaultHeaders = buildTransportHeaders({
       role: 'assistant',
       turnId,
-      msgId: crypto.randomUUID(),
+      msgId,
       turnClientId: turnOwnerClientId,
       parent: assistantParent,
       forkOf: streamOpts?.forkOf ?? turnForkOf,
     });
     const encoder = codec.createEncoder(channel, {
       extras: { headers: defaultHeaders },
       onMessage,
+      messageId: msgId,
     });
 
     const result = await pipeStream(stream, encoder, signal, onAbort, logger);
```
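
The pattern in the diff above is "generate once, thread everywhere". The sketch below uses hypothetical shapes (the real `buildTransportHeaders` and `createEncoder` take more fields): because the UUID is generated exactly once and handed to both the transport headers and the encoder options, the two identities can never diverge.

```typescript
// Sketch of the generate-once pattern (hypothetical shapes, not the real API).
import { randomUUID } from 'node:crypto';

function buildPublishIdentity(): {
  headers: Record<string, string>;
  encoderOptions: { messageId: string };
} {
  const msgId = randomUUID(); // generated once, shared by both consumers
  return {
    headers: { 'x-ably-msg-id': msgId }, // transport-level identity
    encoderOptions: { messageId: msgId }, // domain-level fallback for `start`
  };
}

const identity = buildPublishIdentity();
console.log(identity.headers['x-ably-msg-id'] === identity.encoderOptions.messageId); // true
```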

src/vercel/codec/encoder.ts

Lines changed: 3 additions & 1 deletion
```diff
@@ -46,10 +46,12 @@ import { headerWriter } from '../../utils.js';
 
 class DefaultUIMessageEncoder implements StreamEncoder<AI.UIMessageChunk, AI.UIMessage> {
   private readonly _core: EncoderCore;
+  private readonly _messageId: string | undefined;
   private _aborted = false;
 
   constructor(writer: ChannelWriter, options: EncoderCoreOptions = {}) {
     this._core = createEncoderCore(writer, options);
+    this._messageId = options.messageId;
   }
 
   async appendEvent(chunk: AI.UIMessageChunk, perWrite?: WriteOptions): Promise<void> {
@@ -144,7 +146,7 @@
 
       case 'start': {
         const h = headerWriter()
-          .str('messageId', chunk.messageId)
+          .str('messageId', chunk.messageId ?? this._messageId)
           .json('messageMetadata', chunk.messageMetadata)
           .build();
         await this._core.publishDiscrete({ name: 'start', data: '', headers: h }, perWrite);
```

src/vercel/transport/chat-transport.ts

Lines changed: 9 additions & 28 deletions
```diff
@@ -242,34 +242,15 @@ export const createChatTransport = (
       });
     }
 
-    // Return an empty stream that closes when the turn ends.
-    // useChat consumes the returned stream to accumulate the assistant message,
-    // but useMessageSync already pushes the transport's authoritative message
-    // state into useChat via setMessages. Returning the real event stream would
-    // cause useChat to accumulate a duplicate assistant message. Instead, we
-    // return a stream that produces no chunks and closes when the turn's stream
-    // finishes, so useChat knows when streaming is done without duplicating state.
-    const { readable, writable } = new TransformStream<AI.UIMessageChunk>();
-    const writer = writable.getWriter();
-    // Fire-and-forget: we only care about the close/abort signal, not the piped data.
-    // Errors on the turn stream are surfaced via transport.on('error'), not here.
-    /* eslint-disable @typescript-eslint/no-empty-function -- swallow: writer.close() rejection after stream teardown is unrecoverable */
-    turn.stream
-      .pipeTo(
-        new WritableStream({
-          close: () => {
-            writer.close().catch(() => {});
-          },
-          abort: () => {
-            writer.close().catch(() => {});
-          },
-        }),
-      )
-      .catch(() => {
-        writer.close().catch(() => {});
-      });
-    /* eslint-enable @typescript-eslint/no-empty-function */
-    return readable;
+    // Return the real turn stream. useChat reads chunks from this stream to
+    // drive status transitions (submitted → streaming → ready), fire callbacks
+    // (onToolCall, onData, onFinish), and evaluate sendAutomaticallyWhen.
+    //
+    // Both useChat and useMessageSync accumulate messages in parallel:
+    // useChat builds from the stream, useMessageSync pushes from the
+    // transport's message store via setMessages (a full replacement).
+    // The transport's version is always authoritative.
+    return turn.stream;
   },
 
   // Observer mode handles in-progress streams automatically.
```
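
The behavioural difference removed by this diff is easy to demonstrate with plain WHATWG streams (global in Node 18+; the example assumes an ESM runtime with top-level await, and string chunks stand in for real `UIMessageChunk`s). A consumer counting chunks sees every event on the real stream, while the old empty `TransformStream` closes without ever emitting one, so chunk-driven status changes and callbacks never fired.

```typescript
// Count how many chunks a consumer (playing the useChat role) observes.
async function countChunks(stream: ReadableStream<string>): Promise<number> {
  let count = 0;
  const reader = stream.getReader();
  for (;;) {
    const { done } = await reader.read();
    if (done) return count;
    count += 1;
  }
}

// Stand-in for turn.stream: three lifecycle chunks, then close.
const realStream = new ReadableStream<string>({
  start(controller) {
    for (const chunk of ['start', 'text-delta', 'finish']) controller.enqueue(chunk);
    controller.close();
  },
});

// Old approach: an empty TransformStream whose writable side is closed
// when the turn ends, so the readable side produces no chunks at all.
const { readable: emptyStream, writable } = new TransformStream<string, string>();
void writable.getWriter().close();

const [realCount, emptyCount] = await Promise.all([
  countChunks(realStream),
  countChunks(emptyStream),
]);
console.log(realCount, emptyCount); // 3 0
```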

test/vercel/codec/encoder.test.ts

Lines changed: 17 additions & 1 deletion
```diff
@@ -234,14 +234,30 @@ describe('Vercel encoder', () => {
     expect(headersOf(msg)[`${D}messageId`]).toBe('msg-1');
   });
 
-  it('omits messageId domain header when start chunk has no messageId', async () => {
+  it('omits messageId domain header when neither chunk nor options provide it', async () => {
     const encoder = createEncoder(writer);
     await encoder.appendEvent({ type: 'start' });
 
     const msg = firstPublish(writer);
     expect(headersOf(msg)[`${D}messageId`]).toBeUndefined();
   });
 
+  it('falls back to options.messageId when start chunk has no messageId', async () => {
+    const encoder = createEncoder(writer, { messageId: 'fallback-id' });
+    await encoder.appendEvent({ type: 'start' });
+
+    const msg = firstPublish(writer);
+    expect(headersOf(msg)[`${D}messageId`]).toBe('fallback-id');
+  });
+
+  it('prefers chunk.messageId over options.messageId', async () => {
+    const encoder = createEncoder(writer, { messageId: 'fallback-id' });
+    await encoder.appendEvent({ type: 'start', messageId: 'chunk-id' });
+
+    const msg = firstPublish(writer);
+    expect(headersOf(msg)[`${D}messageId`]).toBe('chunk-id');
+  });
+
   it('stamps x-ably-msg-id from WriteOptions on all publishes', async () => {
     const encoder = createEncoder(writer);
     const perWrite = { messageId: 'msg-1' };
```
