feat: add streaming to direct tool calls #1955

emaan-c wants to merge 1 commit into strands-agents:main
Conversation
emaan-c force-pushed from 4eb85eb to 636cf86
/strands review
```python
class _ToolExecutor:
    def __init__(self, agent: "Agent | BidiAgent") -> None:
        """Initialize instance.
```
Do we need a separate tool executor, or could we just add the stream methods to `ToolCaller`?
Also, Strands already has a tool executor concept, so there is a name clash.
```python
events = run_async(collect_events)
yield from events
```

```python
async def stream_async(self, **kwargs: Any) -> AsyncIterator[TypedEvent]:
```
Is this path separate from `__call__`? Can we make streaming the default and have `__call__` return the final event (like how we handle agent stream and call)?
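A minimal, self-contained sketch of that pattern (the class and event shapes below are made up for illustration, not the SDK's): `__call__` simply drains `stream_async()` and returns the final event, so streaming becomes the single execution path.

```python
import asyncio
from typing import Any, AsyncIterator


class ToolCallerSketch:
    """Hypothetical caller where streaming is the only execution path."""

    async def stream_async(self, **kwargs: Any) -> AsyncIterator[dict]:
        # Stand-in for real tool-execution events.
        yield {"type": "tool_stream", "data": "working"}
        yield {"type": "tool_result", "data": kwargs}

    def __call__(self, **kwargs: Any) -> dict:
        # Consume the stream and keep only the last event,
        # mirroring how Agent.__call__ wraps Agent.stream_async.
        async def _run() -> dict:
            final: dict = {}
            async for event in self.stream_async(**kwargs):
                final = event
            return final

        return asyncio.run(_run())


result = ToolCallerSketch()(x=1)
```

This keeps one code path to maintain, at the cost of `__call__` paying the streaming overhead.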
```python
return events
```

```python
events = run_async(collect_events)
yield from events
```
Issue: stream() collects all events into a list before yielding, which defeats the purpose of streaming. Users calling stream() will not see events in real-time — they'll get all events at once after tool execution completes, making this behave identically to __call__() but with extra overhead.
Suggestion: Consider using a thread-safe queue to bridge async-to-sync streaming, or at minimum, make the docstring and PR description very explicit that stream() does NOT provide real-time streaming. Currently the docstring says "events are buffered before yielding" but the PR description positions this as enabling "real-time progress" which is misleading for the sync variant.
An alternative approach using a queue:
```python
def stream(self, **kwargs: Any) -> Iterator[TypedEvent]:
    import queue
    import threading

    q: queue.Queue[TypedEvent | None] = queue.Queue()

    async def _produce() -> None:
        try:
            async for event in self.stream_async(**kwargs):
                q.put(event)
        finally:
            q.put(None)  # sentinel

    thread = threading.Thread(target=lambda: run_async(_produce), daemon=True)
    thread.start()
    while True:
        item = q.get()
        if item is None:
            break
        yield item
    thread.join()
```

If a true sync streaming implementation is too complex for this PR, consider removing `stream()` entirely and only shipping `stream_async()`. A sync method that doesn't actually stream could confuse users (violates "the obvious path is the happy path" tenet).
```python
        RuntimeError: If called during interrupt.
    """
    if self._agent._interrupt_state.activated:
        raise RuntimeError("cannot directly call tool during interrupt")
```
Issue: stream_async() doesn't check the invocation lock or record_direct_tool_call, while __call__() does. This means streaming tool calls could execute concurrently with agent invocations even when record_direct_tool_call is True (the default), creating potential race conditions with shared agent state.
Suggestion: Either add the same concurrency guard from __call__(), or document why streaming calls intentionally skip it (e.g., because they don't record to message history). If the intent is that streaming never records, make this an explicit design decision in the docstring.
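A hedged sketch of what such a guard could look like (the `_invocation_lock` attribute name and the event shape are assumptions; the stand-in class exists only so the snippet runs): acquire the agent's invocation lock non-blocking before yielding any events, and release it when the stream closes.

```python
import threading
from typing import Any, Iterator


class GuardedCaller:
    """Hypothetical caller with the same concurrency guard as __call__."""

    def __init__(self) -> None:
        self._invocation_lock = threading.Lock()

    def stream(self, **kwargs: Any) -> Iterator[dict]:
        # Note: because this is a generator, the guard runs on first
        # iteration, not at call time.
        if not self._invocation_lock.acquire(blocking=False):
            raise RuntimeError("agent is busy; cannot stream a direct tool call")
        try:
            # Stand-in for the real event stream.
            yield {"type": "tool_result", "input": kwargs}
        finally:
            # Released even if the consumer abandons the stream early.
            self._invocation_lock.release()


caller = GuardedCaller()
events = list(caller.stream(x=1))
```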
```python
events = run_async(collect_events)
yield from events
```

```python
async def stream_async(self, **kwargs: Any) -> AsyncIterator[TypedEvent]:
```
Issue: For an `async def` containing `yield`, either `AsyncGenerator[TypedEvent, None]` or `AsyncIterator[TypedEvent]` is an acceptable return annotation, so the annotation itself is fine. However, `TypedEvent` is from the private `_events` module and is not exported from `strands` or `strands.types`. Customers wanting to type-hint the events they receive from this public API have no stable import path.
Suggestion: Either export TypedEvent from a public module (e.g., strands.types or strands), or use a more generic type like dict[str, Any] for the public API signature until TypedEvent is promoted to a public type.
```python
logger.debug("tool_name=<%s>, streaming=<True> | executing tool stream", normalized_name)
```

```python
# Create unique tool ID and set up the tool request
tool_id = f"tooluse_{self._tool_name}_{random.randint(100000000, 999999999)}"
```
Issue: The tool_use setup logic (normalize name, create tool_id, build ToolUse dict, create tool_results list) is duplicated between __call__() and stream_async(). This violates DRY and creates a maintenance burden — any future change to tool setup must be applied in both places.
Suggestion: Extract a private helper method, e.g.:
```python
def _prepare_tool_use(self, **kwargs: Any) -> tuple[ToolUse, list[ToolResult], dict]:
    normalized_name = self._find_normalized_tool_name(self._tool_name)
    tool_id = f"tooluse_{self._tool_name}_{random.randint(100000000, 999999999)}"
    tool_use: ToolUse = {"toolUseId": tool_id, "name": normalized_name, "input": kwargs.copy()}
    return tool_use, [], kwargs
```
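To illustrate the point, a self-contained sketch of both paths sharing the helper (`ToolUse` is simplified to a plain dict and name normalization is elided here; `CallerSketch` and `stream_setup` are made-up names, not the SDK's):

```python
import random
from typing import Any


class CallerSketch:
    """Stand-in for the tool caller, showing the shared setup helper."""

    def __init__(self, tool_name: str) -> None:
        self._tool_name = tool_name

    def _prepare_tool_use(self, **kwargs: Any) -> tuple[dict, list, dict]:
        # Single place where tool_id / tool_use construction lives.
        tool_id = f"tooluse_{self._tool_name}_{random.randint(100000000, 999999999)}"
        tool_use = {"toolUseId": tool_id, "name": self._tool_name, "input": kwargs.copy()}
        return tool_use, [], kwargs

    def __call__(self, **kwargs: Any) -> dict:
        tool_use, tool_results, kwargs = self._prepare_tool_use(**kwargs)
        return tool_use  # actual execution elided

    def stream_setup(self, **kwargs: Any) -> dict:
        tool_use, tool_results, kwargs = self._prepare_tool_use(**kwargs)
        return tool_use  # actual streaming elided


tool_use = CallerSketch("calculator")(expression="2+2")
```

Any future change to tool setup then lands in one method instead of two.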
Assessment: Request Changes

This PR addresses a real gap in the developer experience (issue #1436) and the architectural approach of wrapping

Review Categories

The refactoring of
```python
# TODO: https://github.com/strands-agents/sdk-python/issues/1311
if isinstance(self._agent, Agent):
    self._agent.conversation_manager.apply_management(self._agent)
```
Issue: The __call__ method applies conversation_manager.apply_management() after execution (line 134), but stream_async() does not. While this may be intentional since streaming doesn't record to history, if a user mixes streaming and non-streaming calls, the conversation management behavior could become inconsistent.
Suggestion: Document this behavioral difference explicitly, or keep the two paths symmetric.
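If symmetry is the goal, a minimal runnable sketch of the second option (the `Agent` and `ConversationManager` classes below are bare stand-ins, not the SDK's): `stream_async()` applies management after the final event, mirroring `__call__`.

```python
import asyncio
from typing import Any, AsyncIterator


class ConversationManager:
    def __init__(self) -> None:
        self.applied = 0

    def apply_management(self, agent: "Agent") -> None:
        self.applied += 1


class Agent:
    def __init__(self) -> None:
        self.conversation_manager = ConversationManager()


class CallerSketch:
    def __init__(self, agent: Agent) -> None:
        self._agent = agent

    async def stream_async(self, **kwargs: Any) -> AsyncIterator[dict]:
        yield {"type": "tool_result", "input": kwargs}
        # Mirror __call__: apply conversation management after execution.
        if isinstance(self._agent, Agent):
            self._agent.conversation_manager.apply_management(self._agent)


agent = Agent()


async def _run() -> list[dict]:
    return [e async for e in CallerSketch(agent).stream_async(x=1)]


events = asyncio.run(_run())
```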
Description

Direct tool calls (`agent.tool.tool_name()`) currently block without providing streaming events, while agent-level calls expose rich streaming from the same underlying infrastructure. This creates an inconsistent developer experience and prevents building responsive UIs for long-running operations, multi-agent systems, and debugging workflows.

This adds `stream()` and `stream_async()` methods to tool calls, enabling real-time observability without recording to message history.

Resolves: #1436
Public API Changes
Tool calls now support three execution modes:
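As an illustration only (the `DirectToolCall` stub below is hypothetical; the real entry point is `agent.tool.<tool_name>`), the three modes could look like:

```python
import asyncio
from typing import Any, AsyncIterator, Iterator


class DirectToolCall:
    """Stub standing in for agent.tool.<tool_name>."""

    def __call__(self, **kwargs: Any) -> dict:
        # 1) Blocking call: returns only the final result.
        return {"status": "success", "content": [{"text": str(kwargs)}]}

    async def stream_async(self, **kwargs: Any) -> AsyncIterator[dict]:
        # 2) Async streaming: yields intermediate events, then the result.
        yield {"type": "tool_stream"}
        yield self(**kwargs)

    def stream(self, **kwargs: Any) -> Iterator[dict]:
        # 3) Sync streaming: bridges the async stream for sync callers.
        async def _collect() -> list[dict]:
            return [e async for e in self.stream_async(**kwargs)]

        yield from asyncio.run(_collect())


tool = DirectToolCall()
result = tool(text="hi")                    # blocking
sync_events = list(tool.stream(text="hi"))  # sync streaming


async def _demo() -> list[dict]:
    return [e async for e in tool.stream_async(text="hi")]  # async streaming


async_events = asyncio.run(_demo())
```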
Streaming methods yield the same events as `ToolExecutor._stream()`, without recording to message history. 100% backward compatible.

Use Cases
Related Issues
#1436
Documentation PR
No documentation PR needed.
Type of Change
New feature
Testing
hatch run prepare

Checklist
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.