Submission checklist
Package (Required)
Related Issues / PRs
No response
Reproduction Steps / Example Code (Python)
import asyncio
import os
import sys

from dotenv import load_dotenv
from langchain.agents import create_agent
from langchain.tools import tool
from langchain_deepseek import ChatDeepSeek
from langchain_core.messages import HumanMessage

load_dotenv()
sys.path.insert(0, os.path.join(os.path.dirname(__file__), ".."))

model = ChatDeepSeek(
    model="deepseek-v4-flash",
    api_key=os.getenv("LLM_API_KEY"),
    base_url=os.getenv("LLM_BASE_URL"),
    streaming=True,
)


def get_weather(city: str) -> str:
    """Query weather information for a specified city."""
    return f"{city}: Sunny, 26°C, humidity 50%"


weather_agent = create_agent(
    model=model,
    tools=[get_weather],
    system_prompt="You are a weather assistant. When users ask about weather, use the tool to query and return accurate weather information. Keep answers concise.",
    name="weather_agent",
)


@tool
def call_weather(query: str) -> str:
    """Delegate weather‑related queries to the weather expert sub‑agent."""
    result = weather_agent.invoke({"messages": [{"role": "user", "content": query}]})
    return result["messages"][-1].content


supervisor = create_agent(
    model=model,
    tools=[call_weather],
    system_prompt=(
        "You are a general assistant. If the user asks about weather, use the call_weather tool to delegate to the weather expert. "
        "Answer other questions directly. Respond in Chinese."
    ),
    name="supervisor",
)


def _indent(level: int) -> str:
    return " " * (level + 1)


async def _consume_messages(messages_stream, level: int, label: str):
    prefix = _indent(level)
    async for message in messages_stream:
        # Reasoning (thinking) – streamed
        first = True
        async for delta in message.reasoning:
            if first:
                print(f"{prefix}[{label} thinking] ", end="", flush=True)
                first = False
            print(delta, end="", flush=True)
        if not first:
            print()
        first = True
        # Text output – streamed
        async for token in message.text:
            if first:
                print(f"{prefix}[{label} output] ", end="", flush=True)
                first = False
            print(token, end="", flush=True)
        if not first:
            print()


async def _consume_tool_calls(tool_calls_stream, level: int):
    prefix = _indent(level)
    async for tc in tool_calls_stream:
        print(f"{prefix}[Tool] {tc.tool_name}({tc.input})")


async def _consume_subagents(subagents_stream, level: int):
    async for subagent in subagents_stream:
        child_level = level + 1
        child_prefix = _indent(level)
        agent_name = subagent.name or "subagent"
        print(f"\n{child_prefix}=== {agent_name} (level {child_level}) ===")
        await asyncio.gather(
            _consume_messages(subagent.messages, child_level, agent_name),
            _consume_tool_calls(subagent.tool_calls, child_level),
            _consume_subagents(subagent.subagents, child_level),  # recursive
        )


async def run(query: str):
    print(f">>> User: {query}")
    print("-" * 60)
    stream = await supervisor.astream_events(
        {"messages": [HumanMessage(content=query)]},
        version="v3",
    )
    # Consume main agent + all sub‑agents (recursively) in parallel
    await asyncio.gather(
        _consume_messages(stream.messages, 0, "Main Agent"),
        _consume_tool_calls(stream.tool_calls, 0),
        _consume_subagents(stream.subagents, 0),
    )


# ---------------------------------------------------------------------------
async def main():
    print("=" * 60)
    await run("What's the weather like in NewYork today? After answering the question about the weather, tell a longer story")


if __name__ == "__main__":
    asyncio.run(main())
Error Message and Stack Trace (if applicable)
Description
I'm observing a streaming behavior where intermediate outputs (reasoning, tool calls, sub-agent messages) are streamed token-by-token as expected, but the final response from the top-level agent is not streamed in real time. Instead, all tokens of the final answer are buffered and then emitted together (with the same timestamp) after a noticeable delay. In my actual project, the SSE stream to the frontend still delivers tokens sequentially, but they all share the same timestamp, which breaks the real‑time experience.
I've prepared a minimal reproducible example above. Could you please help me determine whether this is a bug in the LangChain / DeepSeek integration, or whether I'm misusing the API? Any guidance would be greatly appreciated.
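To make the "same timestamp" symptom measurable, the gaps between consecutive chunks can be recorded on the consumer side. The snippet below is only a rough sketch and does not touch LangChain at all: `arrival_gaps`, `looks_buffered`, and the two fake streams are hypothetical helpers written for illustration, and the 5 ms threshold is an arbitrary assumption. In the reproduction, an async iterator of text tokens (for example `message.text`) could be passed to `arrival_gaps` instead of the fake streams.

```python
import asyncio
import time
from typing import AsyncIterator, List


async def arrival_gaps(stream: AsyncIterator[str]) -> List[float]:
    """Return the seconds elapsed before each chunk of `stream` arrived."""
    gaps: List[float] = []
    last = time.perf_counter()
    async for _ in stream:
        now = time.perf_counter()
        gaps.append(now - last)
        last = now
    return gaps


def looks_buffered(gaps: List[float], threshold: float = 0.005) -> bool:
    """Heuristic (assumed threshold): every chunk after the first arrives almost instantly."""
    return len(gaps) > 1 and all(g < threshold for g in gaps[1:])


async def fake_live_stream(n: int, delay: float) -> AsyncIterator[str]:
    """Stand-in for real-time streaming: one token per `delay` seconds."""
    for i in range(n):
        await asyncio.sleep(delay)
        yield f"tok{i}"


async def fake_buffered_stream(n: int, delay: float) -> AsyncIterator[str]:
    """Stand-in for the reported symptom: wait first, then emit every token at once."""
    await asyncio.sleep(n * delay)
    for i in range(n):
        yield f"tok{i}"


if __name__ == "__main__":
    live = asyncio.run(arrival_gaps(fake_live_stream(5, 0.05)))
    buffered = asyncio.run(arrival_gaps(fake_buffered_stream(5, 0.05)))
    print("live gaps:    ", [round(g, 3) for g in live], looks_buffered(live))
    print("buffered gaps:", [round(g, 3) for g in buffered], looks_buffered(buffered))
```

If the final answer of the top-level agent shows one large first gap followed by near-zero gaps, while the sub-agent output shows evenly spread gaps, that would confirm the buffering happens before the tokens reach the consumer rather than in the SSE layer.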
System Info
System Information
OS: Windows
OS Version: 10.0.26220
Python Version: 3.14.5 (main, May 10 2026, 20:29:46) [MSC v.1944 64 bit (AMD64)]
Package Information
langchain_core: 1.4.8
langchain: 1.3.11
langsmith: 0.9.2
langchain_deepseek: 1.1.0
langchain_openai: 1.3.3
langchain_protocol: 0.0.18
langgraph_sdk: 0.4.2
Optional packages not installed
deepagents
deepagents-cli
Other Dependencies
anyio: 4.14.1
distro: 1.9.0
httpx: 0.28.1
jsonpatch: 1.33
langgraph: 1.2.6
openai: 2.44.0
orjson: 3.11.9
packaging: 26.2
pydantic: 2.13.4
pyyaml: 6.0.3
requests: 2.34.2
requests-toolbelt: 1.0.0
sniffio: 1.3.1
tenacity: 9.1.4
tiktoken: 0.13.0
typing-extensions: 4.15.0
uuid-utils: 0.16.2
websockets: 15.0.1
xxhash: 3.7.1
zstandard: 0.25.0