codec/anthropic: add Anthropic Agent SDK codec implementation #17
JoaoDiasAbly wants to merge 14 commits into main from
// -- Complete assistant message (non-streaming or post-stream).
// Publishes the full BetaMessage as data. For typical responses this is
// well under Ably's 64 KB message limit, but very large tool inputs or
// multi-block responses could approach it. Streaming mode avoids this
// because content arrives as small deltas instead.
case 'assistant': {
  const messageId = event.message.id;
  const h = headerWriter()
    .str('messageId', messageId)
    .str('uuid', event.uuid)
    .str('sessionId', event.session_id)
    .str('parentToolUseId', event.parent_tool_use_id ?? undefined)
    .build();
  await this._core.publishDiscrete({ name: 'assistant-message', data: event.message, headers: h }, perWrite);
  break;
I don't think we want to re-publish the full assistant response as a separate message after we're done streaming it.
I was a bit confused about what to do here after coming back to it a few days later. I do understand the problem, and at first I thought I would remove this case, but I don't think that's what we want here. I think the correct thing is to know when to skip it, so that's the approach I've taken now. LMK what you think.
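As a rough sketch of that skip logic (not the code in this PR): the encoder could remember which message IDs it saw in `message_start` and skip the discrete publish for those. The event shapes and the `StreamingGuard` name are simplified assumptions for illustration.

```typescript
// Simplified stand-ins for the SDK event types (assumptions, not the real definitions).
type MessageStart = { type: 'message_start'; messageId: string };
type AssistantMessage = { type: 'assistant'; messageId: string; content: string };
type EncoderEvent = MessageStart | AssistantMessage;

// Hypothetical helper: records which message IDs were streamed so the trailing
// complete assistant message is not re-published as a discrete message.
class StreamingGuard {
  private readonly streamedIds = new Set<string>();

  /** Returns true when the event should be published as a discrete message. */
  shouldPublishDiscrete(event: EncoderEvent): boolean {
    if (event.type === 'message_start') {
      this.streamedIds.add(event.messageId);
      return false; // handled by the streaming path instead
    }
    // delete() returns true when the ID was present, i.e. the message was streamed.
    return !this.streamedIds.delete(event.messageId);
  }
}
```

Deleting the ID on the way out keeps the set from growing across a long session; whether the real encoder needs that cleanup is a judgment call.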
// Default implementation
// ---------------------------------------------------------------------------

class DefaultAgentAccumulator implements MessageAccumulator<AgentCodecEvent, AgentMessage> {
I think this class is missing initMessage and completeMessage? I get errors compiling the TS.
// Discrete message parts from writeMessages: identified by x-ably-role header.
// Only applies to user-message and assistant-message names — other discrete
// events (message-start, message-delta, result, etc.) also carry x-ably-role
// but must be dispatched by name, not role.
if (HEADER_ROLE in h && (input.name === 'user-message' || input.name === 'assistant-message')) {
  const role = h[HEADER_ROLE];
  if (role === 'user') {
    return decodeUserMessage(input);
  }
  return decodeAssistantMessage(input);
}

switch (input.name) {
  case 'message-start': {
    return decodeMessageStart(input, turnId, lifecycle);
  }
  case 'message-delta': {
    return decodeMessageDelta(input);
  }
  case 'message-stop': {
    return decodeMessageStop(input);
  }
  case 'assistant-message': {
    return decodeAssistantMessage(input);
  }
  case 'user-message': {
    return decodeUserMessage(input);
  }
It's a bit confusing that user-message and assistant-message are handled both at the top and in this switch statement.
switch (eventType) {
  case 'message_start': {
    // CAST: message_start carries .message; cast through unknown to Record.
    const message = (streamEvent as unknown as Record<string, unknown>).message as Record<string, unknown>;
I think we should cast to an object type that has the fields that we are expecting to use on it. e.g. message.id and message.model, etc
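One way to do this, sketched under assumptions: declare an interface with only the fields the encoder reads and narrow to it with a type guard instead of a double cast. The `MessageStartPayload` name and its field list are illustrative, taken from the fields mentioned in this comment rather than from the SDK typings.

```typescript
// Only the fields the encoder reads (assumed from the comment above).
interface MessageStartPayload {
  message: { id: string; model: string };
}

function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === 'object' && value !== null;
}

// Runtime guard, so the narrowing is checked rather than merely asserted.
function isMessageStartPayload(value: unknown): value is MessageStartPayload {
  if (!isRecord(value)) return false;
  const message = value['message'];
  if (!isRecord(message)) return false;
  return typeof message['id'] === 'string' && typeof message['model'] === 'string';
}

// Returns the typed message, or undefined when the payload has an unexpected shape.
function readMessageStart(streamEvent: unknown): MessageStartPayload['message'] | undefined {
  return isMessageStartPayload(streamEvent) ? streamEvent.message : undefined;
}
```

A plain `as MessageStartPayload` cast would also satisfy the review comment; the guard only adds a runtime check on top.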
content[index] = {
  type: 'thinking',
  thinking: '',
  signature: '',
I think we need to include the signature here, or the api will reject future inference with thinking blocks in the history
Had to do a bit of extra work on this one, but I think it's fully addressed now. Here's what's been done:
- Added signature_delta handler to the accumulator that appends to block.signature on thinking blocks
- Added signature_delta handler to the encoder that buffers signature data instead of streaming it (streaming would mix it with thinking text on the wire)
- Encoder includes the buffered signature as an x-domain-signature header on closeStream
- Decoder reads the signature header and emits a synthetic signature_delta event before content_block_stop
- Signatures now survive the full encode/Ably/decode roundtrip, required for multi-turn API continuity with thinking blocks
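The buffering approach in the list above could look roughly like the sketch below. The wire operation shapes, class name, and function names are simplified assumptions for illustration; only the `x-domain-signature` header name and the event names come from this thread.

```typescript
// Hypothetical, simplified shapes; the real encoder and decoder cores differ.
type ThinkingDelta = { type: 'thinking_delta'; thinking: string };
type SignatureDelta = { type: 'signature_delta'; signature: string };
type AppendOp = { op: 'append'; data: string };
type CloseOp = { op: 'close'; headers: Record<string, string> };
type DecodedEvent = SignatureDelta | { type: 'content_block_stop' };

const SIGNATURE_HEADER = 'x-domain-signature';

class ThinkingBlockEncoder {
  private signature = '';

  // Thinking text is appended to the stream; signature data is only buffered.
  encodeDelta(delta: ThinkingDelta | SignatureDelta): AppendOp | undefined {
    if (delta.type === 'signature_delta') {
      this.signature += delta.signature;
      return undefined;
    }
    return { op: 'append', data: delta.thinking };
  }

  // The buffered signature travels as a header on the close operation.
  close(): CloseOp {
    const headers: Record<string, string> = {};
    if (this.signature !== '') headers[SIGNATURE_HEADER] = this.signature;
    return { op: 'close', headers };
  }
}

// Decoder side: emit a synthetic signature_delta before content_block_stop.
function decodeClose(headers: Record<string, string>): DecodedEvent[] {
  const events: DecodedEvent[] = [];
  const signature = headers[SIGNATURE_HEADER];
  if (signature !== undefined && signature !== '') {
    events.push({ type: 'signature_delta', signature });
  }
  events.push({ type: 'content_block_stop' });
  return events;
}
```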
// Other delta types (e.g. citations_delta): no-op
default: {
  break;
}
handle signature_delta here
addressed - comment above
Implement AgentCodec, a Codec<AgentCodecEvent, AgentMessage> that maps Anthropic Agent SDK streaming events and messages to/from Ably message primitives.

Components:
- Encoder: converts SDKPartialAssistantMessage stream events into Ably publish/append operations, tracking open content blocks by index
- Decoder: reconstructs SDKPartialAssistantMessage events from Ably messages, with lifecycle tracker for mid-stream join handling
- Accumulator: builds SDKAssistantMessage state from decoder outputs, tracking concurrent in-progress messages with tool input buffering
- AgentCodec: wires the three factories together with isTerminal (SDKResultMessage) and getMessageKey helpers
- Transport factories: createClientTransport and createServerTransport pre-bound to AgentCodec

Adds @anthropic-ai/sdk as a peer dependency for proper type resolution of transitive types from @anthropic-ai/claude-agent-sdk.

Unit tests cover all code paths including content block streaming (text, tool_use, thinking), abort handling, lifecycle tracker phases, lazy message creation for mid-stream joins, and concurrent message tracking. Integration tests validate encode/decode roundtrips over real Ably channels for 9 scenarios.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Add src/anthropic/vite.config.ts and build:anthropic script; update package.json exports to point to dist/ artifacts instead of raw .ts
- Exclude anthropic/ from core dts plugin, add demo to formatter paths
- Wire turn.abortSignal to Agent SDK AbortController in demo route
- Consolidate per-line eslint-disable unicorn/no-null into block-level disables in accumulator, decoder lifecycle tracker, and decoder abort
- Remove duplicate CAST comments in decoder
- Add doc comments on updateMessage identity limitation and encoder message size consideration
- Update INVESTIGATION.md issues table and conclusions

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Investigation notes have been distilled into the PR description.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Encoder: skip redundant discrete publish of SDKAssistantMessage when the message was already streamed (tracks message IDs via message_start). Add signature_delta support so thinking block signatures are streamed. Cast message_start payload to a typed interface instead of Record.

Accumulator: implement initMessage and completeMessage (required by the MessageAccumulator interface, used by the core transport for cross-turn amendments and history hydration). Add signature_delta handler so thinking blocks have valid signatures for multi-turn API continuity.

Decoder: remove redundant HEADER_ROLE early guard in decodeDiscretePayload; the switch statement already dispatches by message name.

Demo: update Anthropic React demo to current library API (useView instead of non-existent useHistory/useConversationTree, TransportProvider instead of ChannelProvider, MessageNode instead of ConversationNode). Fix SDK version mismatch via overrides. Configure webpack source aliases to avoid Rolldown CJS shim issues with Next.js.

Tests cover streaming guard, signature_delta in encoder and accumulator, and initMessage/completeMessage lifecycle.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The already-active branch of initMessage replaced the message reference but left contentBlocks, toolInputBuffers, and streamStatus stale. If a cross-turn amendment changed the content blocks, subsequent processOutputs could misroute events against outdated tracking state. Now clears and rebuilds all three maps from the synced message, matching the Vercel accumulator's pattern.

Also tightens the list lookup in the not-active branch: assistant messages now match by BetaMessage ID (stable, unique) instead of the weak uuid/session_id heuristic that could collide across messages in the same session.

Tests cover tracking rebuild after sync and correct matching when multiple assistant messages share a session_id.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
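The clear-and-rebuild step in this commit might be sketched as follows. The three map names come from the commit message; the block shapes, the index keying, the `TrackingState` class, and treating every synced block as finished are all assumptions for illustration.

```typescript
// Simplified stand-ins (assumptions): the real blocks and messages are SDK types.
type Block = { type: 'text'; text: string } | { type: 'tool_use'; id: string; input: unknown };
type SyncedMessage = { id: string; content: Block[] };

class TrackingState {
  readonly contentBlocks = new Map<number, Block>();
  readonly toolInputBuffers = new Map<number, string>();
  readonly streamStatus = new Map<number, 'streaming' | 'finished'>();

  /** Drops every stale entry, then re-derives tracking state from the synced message. */
  rebuildFrom(message: SyncedMessage): void {
    this.contentBlocks.clear();
    this.toolInputBuffers.clear();
    this.streamStatus.clear();
    message.content.forEach((block, index) => {
      this.contentBlocks.set(index, block);
      if (block.type === 'tool_use') {
        // Assumption: the buffer holds the JSON text of the tool input.
        this.toolInputBuffers.set(index, JSON.stringify(block.input ?? {}));
      }
      // Assumption: blocks in a synced message are complete.
      this.streamStatus.set(index, 'finished');
    });
  }
}
```

Clearing before rebuilding is what prevents entries for blocks that no longer exist from lingering after an amendment.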
The encoder was streaming signature_delta data through the same Ably message stream as thinking_delta. Since the decoder maps all appends on a thinking stream to thinking_delta events, signatures were lost on the client side: they were appended to block.thinking instead of block.signature.

Fix: the encoder now buffers signature_delta data internally and includes the accumulated signature as an x-domain-signature header on the closeStream call. The decoder reads this header and emits a synthetic signature_delta event before content_block_stop, so the accumulator correctly populates block.signature on the receiving end.

This preserves the signature through the full encode → Ably → decode → accumulate roundtrip, which is required for multi-turn API continuity with thinking blocks.

Tests cover buffering (not appending), concatenation of multiple signature deltas, header presence/absence on close, separation from thinking text, and decoder reconstruction from close headers.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Thanks for reviewing, @zknill
Implement `AgentCodec`, a `Codec<AgentCodecEvent, AgentMessage>` that maps Anthropic Agent SDK streaming events and messages to/from Ably message primitives.

Components:
- Encoder: converts `SDKPartialAssistantMessage` stream events into Ably publish/append operations, tracking open content blocks by index
- Decoder: reconstructs `SDKPartialAssistantMessage` events from Ably messages, with lifecycle tracker for mid-stream join handling
- Accumulator: builds `SDKAssistantMessage` state from decoder outputs, tracking concurrent in-progress messages with tool input buffering
- `AgentCodec`: wires the three factories together with `isTerminal` (`SDKResultMessage`)
- Transport factories: `createClientTransport` and `createServerTransport` pre-bound to `AgentCodec`

Adds `@anthropic-ai/sdk` as a peer dependency for proper type resolution of transitive types from `@anthropic-ai/claude-agent-sdk`.

Unit tests cover all code paths including content block streaming (text, tool_use, thinking), abort handling, lifecycle tracker phases, lazy message creation for mid-stream joins, and concurrent message tracking. Integration tests validate encode/decode roundtrips over real Ably channels for 6 scenarios.
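To illustrate the encoder's index-based tracking of open content blocks, here is a minimal sketch. The event shapes are simplified, and the `OpenBlockTracker` name and the `messageId:index` stream ID format are assumptions made for this example, not the codec's actual scheme.

```typescript
// Simplified event shapes (assumptions); the real events come from the Anthropic SDK.
type BlockEvent =
  | { type: 'content_block_start'; index: number; blockType: string }
  | { type: 'content_block_delta'; index: number; text: string }
  | { type: 'content_block_stop'; index: number };

type OpenBlock = { name: string; streamId: string };

class OpenBlockTracker {
  private readonly open = new Map<number, OpenBlock>();
  private readonly messageId: string;

  constructor(messageId: string) {
    this.messageId = messageId;
  }

  /** Resolves the stream a block event belongs to, or undefined if no such block is open. */
  resolve(event: BlockEvent): OpenBlock | undefined {
    switch (event.type) {
      case 'content_block_start': {
        // Hypothetical stream ID format, chosen only for this sketch.
        const block: OpenBlock = { name: event.blockType, streamId: `${this.messageId}:${event.index}` };
        this.open.set(event.index, block);
        return block;
      }
      case 'content_block_delta': {
        return this.open.get(event.index);
      }
      case 'content_block_stop': {
        // The stop event carries only an index, so the type must come from the map.
        const block = this.open.get(event.index);
        this.open.delete(event.index);
        return block;
      }
    }
  }
}
```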
Key Differences from Vercel Codec
- Vercel: `UIMessageChunk` -- Vercel's own streaming type
  Anthropic: `SDKMessage` subset -- wraps Anthropic API stream events
- Vercel: `UIMessage` -- single type with `role` field
  Anthropic: `SDKAssistantMessage | SDKUserMessage` -- union of two structurally different types
- Vercel: chunks carry their own identity (`chunk.id`, `chunk.toolCallId`)
  Anthropic: `content_block_stop` has only `index`, no type. Encoder must track open blocks by index.
- Vercel: `_aborted` flag
  Anthropic: `Map<number, { name, streamId }>` to track open content blocks by index
- Anthropic: `includePartialMessages: true`. Extended thinking disables streaming entirely.
- Vercel: `UIMessageChunk` has ~20 variants, all conversation-relevant
  Anthropic: `SDKMessage` has ~20 variants, only ~5 are conversation-relevant. Need a filtered subset type.
- Vercel: `start`, `start-step`, `finish-step`, `finish` (Vercel-specific)
  Anthropic: `message_start`, `message_stop`, `message_delta` (Anthropic API events)
- Vercel: `tool-input-start/delta/available` events with `toolCallId`
  Anthropic: `content_block_start/delta/stop` with `index` + `input_json_delta`
- Vercel: `tool-input-available` without prior start -> discrete publish
  Anthropic: `AssistantMessage` when `includePartialMessages` is false
- Vercel: `ai` package
  Anthropic: `@anthropic-ai/claude-agent-sdk` package
- Vercel: (`useChat`, React hooks from `@ai-sdk/react`)
  Anthropic: `@ably/ai-transport/react` used directly.

Known Issues & Limitations
- Lifecycle tracker synthetic events: for Vercel these are trivial (`{ type: 'start', messageId }` -- flat objects). For Anthropic, synthetic events require constructing nested `BetaMessage` objects with many required fields. This is fragile -- if the Anthropic SDK adds a required field, the synthetic construction breaks at runtime (the `as unknown as` cast bypasses compile-time checks). Mitigated with block-level `eslint-disable` for `unicorn/no-null` and clear CAST comments. The fundamental friction remains: the lifecycle tracker forces codecs to construct full `TEvent` objects.
- `.d.ts`: broken SDK type declarations, described in the section below.
- `accumulator.updateMessage()`: uses `uuid ?? session_id` as identity key. `SDKUserMessage.uuid` is optional and `session_id` can be empty, so the key can produce false-positive matches. Not currently called by the core transport. If it becomes load-bearing, the interface should be extended to pass `x-ably-msg-id`.

Anthropic Agent SDK broken type declarations
The `@anthropic-ai/claude-agent-sdk` package ships a `sdk.d.ts` with two problems:

- Missing transitive dependency types: It imports from `@modelcontextprotocol/sdk/types.js` but doesn't declare it as a dependency. Without `@modelcontextprotocol/sdk` installed, these imports fail.
- Broken internal references: `sdk.d.ts` defines a `SDKControlRequestInner` union type that references 12+ type names that are never defined in the `.d.ts` (e.g. `SDKControlChannelEnableRequest`, `SDKControlEndSessionRequest`). No dependency can fix this -- the types simply don't exist.

With `skipLibCheck: true`, TypeScript ignores both problems. But ESLint's `@typescript-eslint` rules create their own TypeScript program instance, and depending on the environment (Node version, npm resolution strategy), the broken `.d.ts` may cause some or all exported types to resolve as `error`.

What we did:
- `@anthropic-ai/sdk` as a peer + dev dependency (fixes `BetaMessage`/`MessageParam` resolution)
- `@modelcontextprotocol/sdk` as a dev dependency (fixes MCP type resolution)
- `SDKControl*Request` types on one line -- types we never use
- `eslint-disable` on the 3 test lines that CI flags

Why it passes locally but fails in CI: Locally (macOS, Node 24), the 15 broken internal refs don't cascade. In CI (Ubuntu, Node 20/22/24, `npm ci`), they may cause the module's exports to be treated as unresolvable. This is a difference in TypeScript's error recovery behavior across environments.

Conclusions
How easy was it to add a new codec?
The architecture delivered on its promise. The two-layer split (generic transport + pluggable codec) meant we never touched a single line of transport code. The EncoderCore and DecoderCore handled all Ably wire protocol concerns. We wrote ~2,100 lines of codec source code and ~3,800 lines of tests, with zero changes to the existing codebase (aside from `package.json` for deps and build config).

The custom-codec example was an effective starting template. The Vercel codec served as a comprehensive reference for every edge case. The documentation in `docs/internals/` was accurate and thorough enough to build a mental model before reading any source code.

Overall verdict: moderately easy for the codec itself, but significantly harder than expected due to SDK type friction.
What went well
EncoderCore and DecoderCore are excellent abstractions. The domain encoder is just a switch statement mapping events to four core operations. The domain decoder provides four hooks. All Ably-specific complexity (serial tracking, append batching, flush/recovery, prefix-match, first-contact) is handled by the cores. A codec author never needs to understand Ably message actions.

The header utilities (`headerWriter`/`headerReader`) are clean and ergonomic. The `x-domain-` prefix is handled automatically. The fluent builder pattern makes header construction readable.

The lifecycle tracker is a good generic solution for the mid-stream join problem. Configuring it with phases is straightforward.

The test infrastructure is reusable. Sandbox provisioning, unique channel names, client lifecycle helpers -- all worked immediately for our codec's integration tests.

The transport factory pattern (Omit codec from options) is trivially implementable. The Anthropic transport factories are ~50 lines.
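For readers unfamiliar with the "Omit codec from options" pattern, a minimal sketch follows. Every interface and function name here is a hypothetical stand-in; the real transport options and factories in the library are richer.

```typescript
// Hypothetical, minimal shapes for illustration only.
interface SketchCodec<TEvent> {
  encode(event: TEvent): string;
  decode(wire: string): TEvent;
}
interface SketchTransportOptions<TEvent> {
  channelName: string;
  codec: SketchCodec<TEvent>;
}
interface SketchTransport<TEvent> {
  channelName: string;
  codec: SketchCodec<TEvent>;
}

// Stand-in for the generic, codec-agnostic factory.
function createSketchTransport<TEvent>(options: SketchTransportOptions<TEvent>): SketchTransport<TEvent> {
  return { channelName: options.channelName, codec: options.codec };
}

type SketchAgentEvent = { type: string };

const sketchAgentCodec: SketchCodec<SketchAgentEvent> = {
  encode: (event) => JSON.stringify(event),
  decode: (wire) => JSON.parse(wire) as SketchAgentEvent,
};

// Callers pass every option except the codec, which is pre-bound here.
function createAgentSketchTransport(
  options: Omit<SketchTransportOptions<SketchAgentEvent>, 'codec'>,
): SketchTransport<SketchAgentEvent> {
  return createSketchTransport({ ...options, codec: sketchAgentCodec });
}
```

Because `Omit` removes `codec` from the parameter type, passing a different codec to the pre-bound factory is a compile-time error rather than a silent override.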
What was challenging
Transitive SDK type resolution. The Anthropic Agent SDK (`@anthropic-ai/claude-agent-sdk`) depends on `@anthropic-ai/sdk` for types like `BetaRawMessageStreamEvent` and `BetaMessage`. Without `@anthropic-ai/sdk` installed, TypeScript resolves these as error types, causing every property access to require `eslint-disable` comments. Adding `@anthropic-ai/sdk` as an explicit peer dependency fixed the cascading failures. This is not a problem with the AI Transport SDK's design, but it is a real pain point for codec authors whose framework SDK has deep type dependency chains.

Complex nested SDK types require `as unknown as` casts. The Anthropic codec has ~30 `as unknown as` casts in source files (vs 0 in Vercel). These exist because the decoder and accumulator construct SDK types (`BetaRawMessageStreamEvent`, `BetaMessage`, content blocks) from decoded wire data. The object literals don't structurally satisfy the full union types -- for example, `BetaContentBlock` is a union of 14 variants, and constructing any one of them requires all fields for that variant. The root cause is using complex SDK types as `TEvent`/`TMessage`. If we defined our own simpler types (like the custom-codec example's flat `AgentMessage { id, role, text, toolCalls }`), there'd be zero casts -- but consumers couldn't work with familiar SDK types. The casts are at well-defined trust boundaries and validated by the switch-based type narrowing, so they're correct -- just not ideal for maintainability. Additionally, the Anthropic SDK uses `null` where the project linter prefers `undefined`, requiring `eslint-disable` for `unicorn/no-null`. These are consolidated into block-level disables around shell object constructions to minimize noise.

Events that don't self-identify. Vercel's `UIMessageChunk` types carry their own identity (e.g. `text-delta` has `chunk.id`). Anthropic's `content_block_delta` and `content_block_stop` only carry an `index`, so the encoder must track open blocks to map index to stream ID. This is a minor annoyance, not a fundamental problem, but it adds state that the Vercel encoder doesn't need.

Union `TMessage` types. The Vercel codec has a single `UIMessage` type for both roles. The Anthropic codec has `SDKAssistantMessage | SDKUserMessage` -- structurally different types. This complicates `updateMessage`, `writeMessages`, and any code that switches on the union. It works, but it's less ergonomic.

Preserving SDK metadata through the wire. Fields like `uuid`, `session_id`, `parent_tool_use_id` need to survive encode -> Ably -> decode. Each requires a domain header. The initial implementation missed `uuid` and `session_id`, which were caught during the deep audit. The Vercel codec doesn't have this problem because `UIMessage` has fewer metadata fields.

Known limitation: `session_id` on streaming events. Discrete messages carry `session_id` via the `x-domain-sessionId` header and are reconstructed correctly. But streaming events (`SDKPartialAssistantMessage` reconstructed by the decoder from streamed content blocks) get `session_id: ''` because `session_id` is not included in the persistent headers passed to `startStream`. The accumulated `SDKAssistantMessage` has the correct `session_id` (from the `message_start` event data). Only the transient wrappers produced by the decoder have `session_id: ''`, and nothing currently reads that field from streaming events. Fix path: thread the outer `SDKPartialAssistantMessage` through `_handleStreamEvent` so `session_id` can be added as a persistent header.

Suggestions to improve the base primitives
These are improvements to the core SDK identified while building this codec. None are blockers.
Lifecycle tracker should not force codecs to construct full `TEvent` objects.

The lifecycle tracker synthesizes missing "startup" events for mid-stream joins. Each phase has a `build()` function that must return `TEvent[]`.

For Vercel, this is trivial: the synthetic events are flat objects such as `{ type: 'start', messageId }`.

For Anthropic, `TEvent` includes `SDKPartialAssistantMessage` -- a wrapper around deeply nested Anthropic SDK types. The `build()` function must construct a ~25-line object with multiple casts and block-level `eslint-disable`. The `as unknown as` cast bypasses compile-time safety, so if the SDK adds a required field, the synthetic construction breaks at runtime.

A simpler alternative: the lifecycle tracker could just report which phases are missing and let the decoder's own hooks handle construction. The hooks already know how to build events -- they do it for every stream start. The tracker doesn't need to build events itself; it just needs to track what's been emitted and what hasn't.
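A minimal sketch of that alternative, assuming a tracker that only records and reports phases. `PhaseTracker`, `markEmitted`, and `missingBefore` are hypothetical names; nothing like this exists in the library today.

```typescript
// Hypothetical API: the tracker reports missing phases and never builds events.
class PhaseTracker<TPhase extends string> {
  private readonly emitted = new Set<TPhase>();
  private readonly order: readonly TPhase[];

  constructor(order: readonly TPhase[]) {
    this.order = order;
  }

  markEmitted(phase: TPhase): void {
    this.emitted.add(phase);
  }

  /** Phases that precede `phase` in the configured order but have not been emitted yet. */
  missingBefore(phase: TPhase): TPhase[] {
    const end = this.order.indexOf(phase);
    // An unknown phase (indexOf === -1) yields an empty list.
    return this.order.slice(0, Math.max(end, 0)).filter((p) => !this.emitted.has(p));
  }
}
```

A decoder joining mid-stream would call `missingBefore` with the phase it is about to emit, build the missing events with its existing hooks, and then mark them emitted.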
The accumulator contract should specify that `messages` returns a stable reference. The `MessageAccumulator` interface says `messages` returns `TMessage[]` but doesn't say whether it's the same array every time or a new copy. This matters because the transport reads `accumulator.messages` on every streaming token (potentially hundreds per response) -- allocating a new array each time is wasteful. Both the Vercel accumulator and the custom-codec example return the same array (mutated in place), but nothing in the interface documents this. Our initial Anthropic accumulator accidentally created a new array on every access. A one-line JSDoc addition would prevent this.
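The documented contract could look something like the sketch below. `MessageAccumulatorLike` is a cut-down stand-in for the real `MessageAccumulator` interface, and the JSDoc wording is only a suggestion.

```typescript
// Cut-down stand-in for the real interface (assumption), showing the suggested JSDoc.
interface MessageAccumulatorLike<TMessage> {
  /**
   * Returns the same array instance on every access. The array is mutated in
   * place as messages change; callers must not mutate it themselves.
   */
  readonly messages: TMessage[];
}

class StableAccumulator<TMessage> implements MessageAccumulatorLike<TMessage> {
  private readonly list: TMessage[] = [];

  get messages(): TMessage[] {
    return this.list; // no copy, so reading on every streamed token allocates nothing
  }

  add(message: TMessage): void {
    this.list.push(message);
  }
}
```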