feat(core): add content-block-centric streaming (v2) #36834
Open
Nick Hollon (nick-hollon-lc) wants to merge 29 commits into master from
Conversation
Add `BaseChatModel.stream_v2()` / `astream_v2()` returning a `ChatModelStream` with typed projections (`.text`, `.reasoning`, `.tool_calls`, `.usage`, `.output`) plus raw protocol event iteration. Providers that only implement `_stream()` get a compat bridge that converts `AIMessageChunk`s to the content-block protocol lifecycle, preserving usage and response metadata for v1 parity.
- New module `chat_model_stream.py` with `ChatModelStream`, `AsyncChatModelStream`, and push/pull projection hierarchy (`SyncProjection`, `SyncTextProjection`, `AsyncProjection`).
- New module `_compat_bridge.py` that converts chunk streams to protocol events, with `response_metadata` preserved via `MessageStartData.metadata` and `MessageFinishData.metadata`.
- `stream_v2` wires `on_chat_model_start` / `on_llm_end` / `on_llm_error` callbacks into the pump; `astream_v2` spawns a producer task and awaits it alongside the output so `on_llm_end` fires before `await stream` returns.
- tool_use finish-reason inference runs after finalization so malformed tool-call JSON (finalized as `invalid_tool_call`) does not flip `finish_reason` to `"tool_use"`.
- Add `langchain-protocol>=0.0.6` dependency (local path override retained for dev).
Tests cover projection semantics, tool-call streaming (single + parallel + malformed args), async/sync event replay, callback firing, and v1 parity (text, tool calls, usage, response metadata, reasoning+text ordering, error propagation).
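To make the push/pull projection idea concrete, here is a minimal self-contained sketch of the pull-driven pattern the commit describes (simplified event shapes and a toy `TextProjection`; not the real `langchain_core` classes): the producer pushes deltas, and iterating the projection pumps the underlying event stream on demand via a `set_request_more` callback.

```python
from typing import Callable, Iterator


class TextProjection:
    """Toy stand-in for a sync text projection: the producer pushes deltas,
    and the consumer pulls more via a request_more callback when it runs dry."""

    def __init__(self) -> None:
        self._deltas: list[str] = []
        self._done = False
        self._request_more: Callable[[], bool] = lambda: False

    def push(self, delta: str) -> None:
        self._deltas.append(delta)

    def complete(self) -> None:
        self._done = True

    def set_request_more(self, cb: Callable[[], bool]) -> None:
        # Mirrors SyncProjection.set_request_more(cb) from the PR.
        self._request_more = cb

    def __iter__(self) -> Iterator[str]:
        i = 0
        while True:
            if i < len(self._deltas):
                yield self._deltas[i]
                i += 1
            elif self._done or not self._request_more():
                return


def demo() -> str:
    # A fake event pump: each call advances the "stream" by one event.
    events = iter([("delta", "Hel"), ("delta", "lo"), ("finish", None)])
    proj = TextProjection()

    def pump() -> bool:
        kind, payload = next(events, ("eof", None))
        if kind == "delta":
            proj.push(payload)
            return True
        if kind == "finish":
            proj.complete()
        return False

    proj.set_request_more(pump)
    return "".join(proj)
```

The same shape generalizes to the other typed projections: each accumulates its own slice of the event stream and pulls the shared pump only when its local buffer is exhausted.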
Add explicit `stream_v2` / `astream_v2` overrides on `RunnableBinding` that merge `self.kwargs` into the delegated call, mirroring the existing `stream` / `astream` / `invoke` overrides. Without these, calls that chained through `bind` or `bind_tools` fell through `__getattr__` (which merges `self.config` but not `self.kwargs`) and silently dropped bound tools, stop sequences, and other runtime kwargs. The returns are typed as `Any` to avoid pulling chat-model types into `langchain_core.runnables.base`; the method only makes sense when the bound runnable is a chat model, and `AttributeError` propagates unchanged if it isn't. Adds tests covering bound-kwarg forwarding for both sync and async paths plus the call-time kwarg override semantics.
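A hypothetical minimal sketch of the kwarg-merging delegation these overrides perform (class and method names simplified; not the real `langchain_core.runnables` code). The key rule from the commit: bound kwargs are merged in first, so explicit call-time kwargs override them.

```python
class Bound:
    """Stand-in for a bound chat model; echoes kwargs for illustration."""

    def stream_v2(self, input, **kwargs):
        return kwargs


class Binding:
    """Stand-in for RunnableBinding: holds bound kwargs from bind()/bind_tools()."""

    def __init__(self, bound, **kwargs):
        self.bound = bound
        self.kwargs = kwargs

    def stream_v2(self, input, **kwargs):
        # Merge bound kwargs first so call-time kwargs win on conflict.
        # Without an explicit override like this, attribute lookup on the
        # binding would reach the bound object directly and drop self.kwargs.
        return self.bound.stream_v2(input, **{**self.kwargs, **kwargs})


b = Binding(Bound(), stop=["\n"], tools=["search"])
merged = b.stream_v2("hi", stop=["END"])  # call-time stop overrides bound stop
```

This is exactly the failure mode described above: a plain `__getattr__` fallthrough delegates the call but never sees `self.kwargs`, so bound tools and stop sequences silently vanish.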
Drop the local path override in `[tool.uv.sources]` now that 0.0.8 is published, and raise the lower bound to match the APIs the compat bridge relies on (notably `MessageFinishData.metadata`).
- `stream_v2` / `astream_v2` now pass the assembled `AIMessage` to `on_llm_end` via `LLMResult(generations=[[ChatGeneration(message=...)]])`, so LangSmith and other tracers see the final response on v2 calls (was previously `generations=[]`).
- `astream_v2`'s producer re-raises `asyncio.CancelledError` ahead of the generic handler, so cancellation propagates normally instead of being converted into `on_llm_error` + a swallowed exception.
- New `message_to_events` / `amessage_to_events` in `_compat_bridge` replay a finalized `AIMessage` as a synthetic content-block lifecycle. Intended for the langgraph-side handler that emits protocol events for non-streamed node outputs (cache hits, `model.invoke()` inside a node, checkpointed state). Gives `_extract_final_blocks` a real caller instead of leaving it a dangling helper.
- Document the optional `_stream_chat_model_events` / `_astream_chat_model_events` provider hooks inline at the getattr sites so integrators can discover the expected signature.
Adds a new `on_stream_event` hook on `LLMManagerMixin` / `AsyncCallbackHandler` that fires once per `MessagesData` event produced by `stream_v2` / `astream_v2`, with dispatch methods on `CallbackManagerForLLMRun` and `AsyncCallbackManagerForLLMRun`. This is v2's observer hook, analogous to `on_llm_new_token` in v1 but at event granularity rather than chunk. It fires uniformly whether the provider emits events natively via `_stream_chat_model_events` or goes through the chunk-to-event compat bridge — observers see the same event stream regardless of how the underlying model produces output. Primary consumer: langgraph's forthcoming `StreamProtocolMessagesHandler`, which can now be a one-line forwarder (lookup namespace metadata by run_id, push `(ns, "messages", (event, meta))` to the graph's output stream) instead of re-implementing the chunks-to-events state machine internally. Does not fire from v1 `stream()` / `astream()`. Purely additive — `on_chat_model_start`, `on_llm_end`, and `on_llm_error` continue to bracket a v2 call as they do a v1 call.
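An illustrative sketch of the observer contract (hypothetical collector class; the real hook lives on `LLMManagerMixin` / `AsyncCallbackHandler`): the handler receives one call per protocol event, and the event names match the lifecycle described in this PR.

```python
class EventCollector:
    """Hypothetical on_stream_event observer, standing in for something like
    langgraph's forthcoming StreamProtocolMessagesHandler."""

    def __init__(self):
        self.events = []

    def on_stream_event(self, event, *, run_id=None):
        # Fires once per protocol event, whether the provider streamed
        # natively or went through the chunk-to-event compat bridge.
        self.events.append(event["type"])


def drive(handler, events):
    # Stand-in for the v2 pump dispatching to attached observers.
    for ev in events:
        handler.on_stream_event(ev)


h = EventCollector()
drive(h, [
    {"type": "message-start"},
    {"type": "content-block-delta"},
    {"type": "message-finish"},
])
```

The point of the uniform contract: observers never need to know whether `_stream_chat_model_events` or the compat bridge produced the events.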
Replaces the list-of-futures + `_wake()` pattern with a single `asyncio.Event` shared by all waiters (the awaitable plus every async iterator cursor). Each waiter clears the event before awaiting and re-checks its own condition on wake, so stale notifications don't cause spin loops. Single-loop only — if cross-thread wake is ever required, revert to the list-of-futures pattern with `call_soon_threadsafe`. Noted in the AsyncProjection docstring. Net -9 lines; drops `import contextlib` and the per-iteration `create_future`/`append` boilerplate.
Renames the stream's and projections' "private" producer-side methods to public names, since they are the intended call surface for anyone driving the stream (the pump, langgraph's forthcoming handler, tests). Removes ~36 `noqa: SLF001` suppressions along the way.
On `_ProjectionBase`:
- `_push` -> `push`
- `_finish` -> `complete`
- `_fail` -> `fail`
- adds `done` / `error` read-only properties for the iterator side
- `SyncProjection.set_request_more(cb)` replaces direct `_request_more` assignment
On `ChatModelStream`:
- `_bind_pump` -> `bind_pump`
- `_fail` -> `fail`
- adds `output_message` property (non-blocking peek)
- new `dispatch(event)` method replaces the module-level `dispatch_event` helper (kept as a thin deprecated wrapper for back-compat)
The genuinely internal helpers (`_record_event`, `_push_*`, `_finish` on the stream, `_drain`, `_assemble_message`) stay private — each has a single caller, inside the class. Remaining SLF001 suppressions in this file are intentional `_AsyncProjectionIterator` coupling to its projection's `_deltas` and `_event`; annotated with a comment.
The compat bridge produces InvalidToolCallBlock when tool-call JSON parse
fails, but ChatModelStream had no handler for it. The finish event was
silently ignored, the stale chunk stayed in _tool_call_chunks, and
_finish's sweep re-parsed (failed again), fell back to args={}, and
appended a valid-looking ToolCallBlock — so the protocol said "invalid"
while the assembled AIMessage said "valid with empty args". An agent layer
downstream could then dispatch the malformed call.
The finish handler now routes invalid_tool_call blocks into
_invalid_tool_calls_acc and deletes the stale chunk entry; _finish's sweep
emits InvalidToolCallBlock on JSON failure instead of an empty-args tool
call; _assemble_message passes invalid_tool_calls through to AIMessage.
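The finalization rule this fixes can be sketched with a hypothetical helper (not the real `_finish` sweep, but the same decision): accumulated tool-call args that fail to parse become an explicit `invalid_tool_call` block rather than a valid-looking call with `args={}`.

```python
import json


def finalize_tool_call(name: str, args_text: str, call_id: str) -> dict:
    """Hypothetical sketch of the sweep's parse-or-invalidate decision."""
    try:
        args = json.loads(args_text)
    except json.JSONDecodeError as e:
        # Preserve the raw text and the error; never fabricate args={}.
        return {
            "type": "invalid_tool_call",
            "name": name,
            "args": args_text,
            "id": call_id,
            "error": str(e),
        }
    return {"type": "tool_call", "name": name, "args": args, "id": call_id}


good = finalize_tool_call("search", '{"q": "llamas"}', "call_1")
bad = finalize_tool_call("search", '{"q": ', "call_2")  # truncated JSON
```

Keeping the raw unparsed text on the invalid block is what lets a downstream agent layer surface the failure instead of silently dispatching an empty-args call.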
Extend the v2 stream and compat bridge to handle every protocol ContentBlock variant end-to-end — server tool calls, invalid tool calls, images, audio, video, file, and non-standard blocks — not just text, reasoning, and regular tool calls. Previously these were silently dropped at the bridge's extractor, had no handler in ChatModelStream, and could not appear in .output.content.
The stream now keeps an index-ordered `_blocks` snapshot as the single source of truth for .output.content, alongside the existing typed accumulators that drive the public projections. `_assemble_message` builds content from that snapshot, emitting protocol-shape `tool_call` blocks instead of the legacy `tool_use` shape, and collapses to a bare string only when the message contains exactly one text block.
Bridge extractors (_extract_blocks_from_chunk, _extract_final_blocks) now pass through any protocol-shape block in msg.content, _accumulate_block and _delta_block handle server_tool_call_chunk and self-contained types, and _finalize_block promotes server_tool_call_chunk to server_tool_call (falling back to invalid_tool_call on JSON failure, symmetric with regular tool calls). The standard `invalid_tool_calls` field on AIMessage is also surfaced by the final-block extractor.
Forward-looking: today's partners keep provider-native shapes in msg.content and expose protocol blocks lazily via the `.content_blocks` property, so these paths are latent until partners either populate msg.content with protocol shape or override _stream_chat_model_events. The bridge is ready.
Collapse _compat_bridge to a single path that reads msg.content_blocks and emits protocol events. The translator / best-effort / tool_call_chunks extraction all live in content_blocks already — the legacy branch, _PROTOCOL_PASS_THROUGH_TYPES, _SELF_CONTAINED_BLOCK_TYPES skeleton handling, and manual reasoning-variant sniffing were duplicating work.
Side fixes picked up along the way:
- No-provider chunks with both text content and tool_call_chunks silently dropped the tool call because the legacy extractor put both at index 0. content_blocks places them on distinct indices.
- "server_tool_call_result" (typo) replaced with "server_tool_result" in ChatModelStream's finish dispatch and the test that exercises it — matches the protocol type that every translator actually emits.
Also collapses duplicated tool_call_chunk / server_tool_call_chunk handling in chat_model_stream into shared merge/sweep helpers so the two code paths can't drift apart again (which is how the typo survived).
_compat_bridge.py: 855 -> 581 lines. No public API changes.
Reduce the cast count in _compat_bridge from 9 to 2. The casts exist because langchain_core.messages.content.ContentBlock and langchain_protocol.protocol.ContentBlock are two nominally distinct TypedDict Unions that are structurally near-identical. msg.content_blocks returns the core Union; event payloads want the protocol Union; the bridge launders between them through dict[str, Any].
- Remove redundant casts (isinstance-narrowed dict; getattr Any).
- Use TypedDict constructors (ServerToolCallChunkBlock, ToolCallBlock, ServerToolCallBlock) where we build fresh blocks — no cast needed for constructor output.
- Introduce _to_protocol_block and _to_finalized_block helpers that each hold a single cast with a docstring explaining the seam and pointing at the cross-module refactor that would retire them.
CompatBlock's docstring now explains the laundering role.
…ckHandler
Adds `_V2StreamingCallbackHandler`, a marker class in `tracers/_streaming.py` that handlers can inherit to signal they consume `on_stream_event` rather than `on_llm_new_token`. Extracts the shared event-producing logic from `stream_v2` / `astream_v2` into `_iter_v2_events` / `_aiter_v2_events` helpers, which pick the native `_stream_chat_model_events` hook or fall back to `chunks_to_events` bridged from `_stream`.
`BaseChatModel.invoke` / `ainvoke` now route through the v2 event generator when any attached handler inherits the marker: `_generate_with_cache` / `_agenerate_with_cache` gain a v2 branch, parallel to the existing v1 streaming branch, that drains the helper into a `ChatModelStream` and wraps the assembled `AIMessage` as a `ChatResult`. Caching, rate limiting, run lifecycle, and `llm_output` merging stay on the existing generate path — the v2 and v1 branches diverge only on which callback fires per chunk.
The marker is a concrete class rather than a `runtime_checkable` `Protocol` on purpose: an empty Protocol matches every object and would misroute every call.
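The routing check reduces to an `isinstance` scan over attached handlers, which only works because the marker is a concrete class — a sketch (simplified handler classes for illustration; `_V2StreamingCallbackHandler` is the name from this PR):

```python
class _V2StreamingCallbackHandler:
    """Marker base class: "I consume on_stream_event, not on_llm_new_token".

    A concrete class makes isinstance a real opt-in test. An empty
    runtime_checkable Protocol would structurally match every object
    (no members to check), routing *all* calls down the v2 branch.
    """


class V2Handler(_V2StreamingCallbackHandler):
    pass


class V1Handler:
    def on_llm_new_token(self, token):
        pass


def should_stream_v2(handlers) -> bool:
    # Sketch of the branch selector: any marked handler flips invoke()
    # onto the v2 event-generator path.
    return any(isinstance(h, _V2StreamingCallbackHandler) for h in handlers)
```

Inheritance as an explicit opt-in keeps v1 handlers (which match nothing here) on the existing chunk-callback path untouched.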
Under caller-driven async streaming, `AsyncChatModelStream` projections deadlocked when iterated inside an outer `async for stream in run.messages` loop: the projection's `asyncio.Event` was only set by external dispatch, but no task was driving the pump while the consumer was suspended in the inner iteration.
Mirror the sync `Projection._request_more` path on the async side:
- `AsyncProjection.set_arequest_more` stores an async pull callback.
- `_AsyncProjectionIterator.__anext__` drains the callback in an inner loop when wired, falling back to the event wait otherwise.
- `_await_impl` drives the callback too so `await stream.output` and `await stream.usage` advance the producer.
- `AsyncChatModelStream.set_arequest_more` fans the callback out to every projection so langgraph's `AsyncGraphRunStream` can wire it on stream construction via a transformer `_bind_apump` hook.
Pump-exhaustion-without-completion ends iteration cleanly rather than hanging — matches the pragmatic contract for graphs that exhaust mid-stream.
_assemble_message builds AIMessage content from v1 protocol blocks (tool calls typed "tool_call"). Without the output_version marker, provider request builders that gate v1->provider translation on that flag (e.g. ChatAnthropic._get_request_payload) pass the v1 blocks through unconverted and the API rejects them.
…ntric-streaming
# Conflicts:
#	libs/langchain/uv.lock
#	libs/partners/huggingface/uv.lock
Earlier lock regens picked up editable langgraph paths from local dev setup, inflating langgraph to 1.1.7a2 in openai/model-profiles/langchain locks. Rebase against master's baseline via uv lock --no-sources-package for langgraph* so the only diff vs master is the langchain-protocol addition from core.
…-protocol
CI was failing with 'No module named langchain_protocol' because these two lockfiles weren't regenerated when core added the langchain-protocol dep. Regenerated from master's baseline using --no-sources-package for langgraph* to avoid the local editable path pollution.
Self-contained content blocks (image, audio, video, file, non_standard, finalized tool_call) were emitting their full payload on both content-block-start and content-block-finish, doubling wire bandwidth and JSON parse cost when providers emit large inline base64 media. Emit a minimal skeleton on content-block-start — correlation fields (id, name, toolCallId) and small metadata (mime_type, url, status) are preserved; heavy fields (data, args, output, transcript, value) are stripped and carried by content-block-finish only. Required CDDL fields get minimal placeholders so start still validates.
Streams where content_blocks carries string index identifiers (e.g. OpenAI responses/v1 mode with 'lc_rs_305f30', 'lc_txt_1') collapsed all blocks to wire index 0 because _iter_protocol_blocks fell back to positional i for non-int indices. Every block appeared as a delta of block 0, and only one content-block-finish event fired at the end of the stream. Keep the raw block key (int or string) internally, allocate sequential uint wire indices per distinct block, and finish the previously-open block when a new block key appears — matching the protocol's no-interleave rule.
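The allocation rule can be sketched as follows (toy event tuples instead of real protocol events; assumes a block key does not reappear after a new key opens, per the no-interleave rule): raw keys stay internal, wire indices are sequential uints per distinct key, and the open block is finished when a new key shows up.

```python
def allocate_wire_indices(keys: list) -> list[tuple[str, int]]:
    """Map raw block keys (int or string) to sequential uint wire indices,
    emitting start/delta/finish events that obey the no-interleave rule."""
    wire: dict[object, int] = {}   # raw key -> allocated wire index
    events: list[tuple[str, int]] = []
    open_key = None
    for key in keys:
        if key not in wire:
            if open_key is not None:
                # New block key: finish the previously open block first.
                events.append(("finish", wire[open_key]))
            wire[key] = len(wire)  # next sequential uint index
            events.append(("start", wire[key]))
            open_key = key
        events.append(("delta", wire[key]))
    if open_key is not None:
        events.append(("finish", wire[open_key]))
    return events


# String identifiers like OpenAI responses/v1 'lc_rs_…' / 'lc_txt_…' keys
# now get distinct indices instead of all collapsing to wire index 0.
seq = allocate_wire_indices(["lc_rs_305f30", "lc_rs_305f30", "lc_txt_1"])
```

Under the old positional-`i` fallback, both string keys would have been treated as index 0 and only one finish would have fired at end of stream.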
New `langchain_tests.utils.stream_lifecycle.assert_valid_event_stream` helper enforces the protocol contract on any event stream:
- single message-start / message-finish envelope
- blocks do not interleave (each block finishes before the next starts)
- sequential uint wire indices from 0
- accumulated deltas match the finish payload for deltaable types
Applied at three levels:
- core/test_compat_bridge: provider-style emission patterns exercised directly through chunks_to_events / message_to_events (openai chat completions int indices, openai responses/v1 string identifiers, anthropic-style per-chunk int indices, inline image, invalid tool call, empty stream)
- openai partner: validator applied to stream_v2 against the existing responses-api mock and to a new chat-completions stream_v2 test
- anthropic partner: new mock stream of RawMessageStartEvent + RawContentBlock* events threaded through _stream via `_create` patch; covers thinking + text + tool_use lifecycle with tool-use stop_reason
Enabling thinking on the anthropic test flips coerce_content_to_string off so every block carries a proper integer index — the structured path the bridge actually exercises. Default mode (no tools / thinking / docs) coerces text to a plain string and strips per-chunk indices; the bridge handles that branch by collapsing to positional-0, and it is a known separate code path, intentionally not covered here.
When a provider emits content_blocks without an `index` field (e.g. anthropic `_stream` with coerce_content_to_string=True for pure text), the bridge's positional-0 fallback merges successive chunks into one block. This works correctly today because the only coerced-string path in the anthropic integration is mutually exclusive with any structured (indexed) emission. Document the scenario that would surface the gap and the two ways to close it (native _stream_chat_model_events hook or 'continue on anonymous key' bridge rule) so a future integration doesn't discover the edge case the hard way.
Anthropic's thinking stream emits a `signature_delta` after the
reasoning text finishes. The adapter surfaces this as a reasoning
delta carrying `extras.signature` (and no new text). Two places
were dropping those fields while assembling the accumulated block:
- `_compat_bridge._accumulate` only concatenated the `reasoning`
text, silently discarding any other keys (including `extras`) on
later deltas.
- `chat_model_stream._push_content_block_finish` rebuilt the
finalized reasoning block as `{"type": "reasoning", "reasoning": ...}`,
dropping everything the finish event carried.
Together, these stripped Claude's `extras.signature` from the
assembled `AIMessage`, and the next turn in a `create_agent` loop
failed with `messages.<n>.content.<k>.thinking.signature: Field
required`.
The bridge now merges `extras` (so earlier keys survive later
deltas) and replaces other non-text fields; `ChatModelStream`
spreads the incoming finish block before overwriting the two
fields it owns.
Covered by the new
`test_lifecycle_validator_anthropic_reasoning_preserves_signature`
case.
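The merge rule can be sketched with a hypothetical accumulator (simplified block dicts, not the real `_accumulate`): `reasoning` text concatenates, `extras` is merged key-wise so an earlier signature survives a later delta, and other fields are replaced.

```python
def accumulate_reasoning(acc: dict, delta: dict) -> dict:
    """Hypothetical sketch of the reasoning-delta merge described above."""
    for k, v in delta.items():
        if k == "reasoning":
            # Text deltas concatenate.
            acc["reasoning"] = acc.get("reasoning", "") + v
        elif k == "extras":
            # Merge so keys from earlier deltas survive later ones
            # (e.g. a signature arriving on a trailing signature_delta).
            acc.setdefault("extras", {}).update(v)
        else:
            # Other non-text fields: last write wins.
            acc[k] = v
    return acc


acc = {"type": "reasoning", "reasoning": "Thinking"}
acc = accumulate_reasoning(acc, {"reasoning": " hard"})
acc = accumulate_reasoning(acc, {"extras": {"signature": "sig_abc"}})
```

The old behavior corresponded to keeping only the `reasoning` key on each merge, which is exactly how `extras.signature` was lost before the assembled `AIMessage` reached the next turn.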
    handlers = run_manager.handlers if run_manager else []
    return any(isinstance(h, _StreamingCallbackHandler) for h in handlers)


def _should_stream_v2(
Collaborator
(nit) is it possible for _should_stream to be False but _should_stream_v2 to be True? (should we call _should_stream here to deduplicate logic)
    Async variant of `stream_v2`. See that method for the full rationale.
    """
    return await self.bound.astream_v2(  # type: ignore[attr-defined]
Collaborator
do we need a NotImplementedError on Runnable
    Returns an :class:`AsyncChatModelStream` whose projections are
    async-iterable and awaitable.

    .. warning::
Collaborator
shouldn't use sphinx here
Summary
Adds a content-block-centric streaming protocol to `BaseChatModel` — `stream_v2()` / `astream_v2()` — alongside the existing `stream()` / `astream()`. Gives consumers explicit lifecycle events (`message-start`, `content-block-start`/`delta`/`finish`, `message-finish`) instead of merge-your-own `AIMessageChunk`s, plus typed projections (`.text`, `.reasoning`, `.tool_calls`, `.usage`, `.output`) on a `ChatModelStream` object. Uses shared types from the published `langchain-protocol` package. V1 remains unchanged; nothing is deprecated.
Motivation
V1 streaming hands consumers `Iterator[AIMessageChunk]` with no lifecycle boundaries — you merge chunks yourself, parse partial tool-call JSON yourself, and deal with provider-specific content shapes (Anthropic `content_block_start`/`delta`/`stop`, OpenAI deltas, reasoning under three different keys, etc.) because the chunk is the only streaming primitive. V2 compiles all of that into a normalized lifecycle with explicit start/finish boundaries per content block, finalized tool calls with parsed args (or an explicit `InvalidToolCallBlock` on parse failure), and a shared wire format via `langchain-protocol`.
The long-term target is for providers to implement `_stream_chat_model_events` natively. Until they do, the compat bridge in this PR translates legacy `AIMessageChunk` streams into protocol events transparently — every provider participates in v2 without being rewritten.
Key files
- `langchain_core/language_models/_compat_bridge.py` — the transitional translation layer. `chunks_to_events` / `achunks_to_events` turn `AIMessageChunk` streams into protocol events; `message_to_events` / `amessage_to_events` do the same for finalized `AIMessage`s (cache hits, checkpoint replay, `model.invoke()` inside a graph node). Shrinks as providers adopt the native hook; the non-streamed replay path survives forever.
- `langchain_core/language_models/chat_model_stream.py` — `ChatModelStream` / `AsyncChatModelStream` with typed projections, replay-buffer-backed raw event iteration, and the `dispatch(event)` entry point for driving the stream from outside.
- `langchain_core/language_models/chat_models.py` — `stream_v2()` / `astream_v2()` on `BaseChatModel`. Wires callbacks (`on_chat_model_start`, `on_llm_end` with the assembled `AIMessage`, `on_llm_error`, and the new `on_stream_event`), the rate limiter, and the compat bridge. An optional provider hook `_stream_chat_model_events` / `_astream_chat_model_events` bypasses the bridge — documented inline at the `getattr` call sites.
- `langchain_core/callbacks/base.py` and `callbacks/manager.py` — new `on_stream_event` callback on `LLMManagerMixin` / `AsyncCallbackHandler` plus dispatch on `CallbackManagerForLLMRun`. Fires once per protocol event from `stream_v2` / `astream_v2`, additive to the existing lifecycle callbacks. See the docstring for contract details (event granularity vs v1 chunk granularity, transparency across native/compat providers).
- `langchain_core/runnables/base.py` — `RunnableBinding.stream_v2` / `astream_v2` overrides that merge bound kwargs into the delegated call, matching the existing `stream` / `astream` overrides. Without these, chains built via `bind` / `bind_tools` would silently drop their bound kwargs on v2 calls.
Example