Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 64 additions & 14 deletions ddev/src/ddev/ai/react/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,27 @@
from ddev.ai.agent.build import AgentRuntime
from ddev.ai.agent.exceptions import AgentError
from ddev.ai.agent.scope import AgentScope
from ddev.ai.agent.types import AgentResponse, StopReason, ToolResultMessage
from ddev.ai.agent.types import AgentResponse, StopReason, ToolCall, ToolResultMessage
from ddev.ai.callbacks.callbacks import Callbacks
from ddev.ai.react.types import ReActResult
from ddev.ai.tools.core.types import ToolResult

# A turn that stops on MAX_TOKENS while a tool call is pending was truncated mid-call: the
# tool_use block is incomplete and was never executed. We must still answer every tool_use
# with a tool_result, otherwise the next send() replays a dangling tool_use and the provider
# rejects the request. This synthetic result repairs the conversation and nudges the model
# toward a smaller follow-up. Each tool owns the specifics of that nudge via
# BaseTool.truncated_call_hint; this is only the shared, mechanical part of the message.
# Shared leading part of every synthetic "truncated tool call" error message. It ends with a
# trailing space because the tool-specific (or generic) hint is concatenated directly after it.
TRUNCATED_TOOL_CALL_PREFIX = (
    "This tool call was NOT executed: your previous response was truncated after reaching the "
    "maximum output token limit, so the tool call is incomplete. "
)
# Hint used when the truncated call's tool is not registered or provides no hint of its own.
GENERIC_TRUNCATED_TOOL_CALL_HINT = "Retry with a smaller, more targeted change."

# Upper bound on back-to-back truncated turns before we give up, to avoid an unrecoverable loop
# where the model keeps emitting an oversized tool call that never fits in the output budget.
# The loop aborts once the running count exceeds this value, so this many consecutive truncated
# turns are tolerated and the next one raises.
MAX_CONSECUTIVE_TRUNCATIONS = 2


class ReActProcess:
"""
Expand Down Expand Up @@ -59,7 +75,7 @@ async def compact(self, response: AgentResponse | None = None) -> tuple[int, int
await self._callbacks.fire_before_compact(self._scope)

compact_response = None
if response is None or response.stop_reason != StopReason.TOOL_USE:
if response is None or not response.tool_calls:
compact_response = await self._agent.compact()
else:
compact_response = await self._agent.compact_preserving_last_turn()
Expand All @@ -77,6 +93,31 @@ def _is_compact_needed(self, response: AgentResponse) -> bool:
return False
return True

async def _execute_tool_calls(self, tool_calls: list[ToolCall]) -> list[ToolResult]:
    """Execute every requested tool call concurrently and return one result per call.

    An exception raised by an individual call is captured instead of propagated and is
    reported as a failed ToolResult whose error reads "ExceptionType: message". The
    returned list follows the order of ``tool_calls``.
    """
    pending = [self._tool_registry.run(call.name, call.input) for call in tool_calls]
    outcomes = await asyncio.gather(*pending, return_exceptions=True)

    results: list[ToolResult] = []
    for outcome in outcomes:
        if isinstance(outcome, ToolResult):
            results.append(outcome)
        else:
            # Anything that is not a ToolResult is surfaced to the model as a failure.
            results.append(ToolResult(success=False, error=f"{type(outcome).__name__}: {outcome}"))
    return results

def _truncated_tool_results(self, tool_calls: list[ToolCall]) -> list[ToolResult]:
    """Build one synthetic failure result per tool call of a turn cut off by the token limit.

    Nothing is executed. Every result's error is the shared truncation prefix followed by a
    recovery hint: the registered tool's own ``truncated_call_hint`` when the tool is found
    and the hint is non-empty, otherwise the generic hint. Results follow the order of
    ``tool_calls``.
    """
    synthetic: list[ToolResult] = []
    for call in tool_calls:
        registered = self._tool_registry.get(call.name)
        specific_hint = registered.truncated_call_hint if registered else None
        if not specific_hint:
            # Unknown tool, or a tool that does not supply its own guidance.
            specific_hint = GENERIC_TRUNCATED_TOOL_CALL_HINT
        synthetic.append(ToolResult(success=False, error=TRUNCATED_TOOL_CALL_PREFIX + specific_hint))
    return synthetic

async def start(self, prompt: str, allowed_tools: list[str] | None = None) -> ReActResult:
"""
Run the ReAct loop for a single task.
Expand Down Expand Up @@ -104,18 +145,24 @@ async def start(self, prompt: str, allowed_tools: list[str] | None = None) -> Re
await self._callbacks.fire_agent_response(self._scope, response, iterations)

# No iteration cap — this is an interactive CLI tool; the user can Ctrl+C to stop.
while response.stop_reason == StopReason.TOOL_USE:
if not response.tool_calls:
raise AgentError("Agent returned stop_reason=TOOL_USE with no tool calls")

raw_results = await asyncio.gather(
*[self._tool_registry.run(tc.name, tc.input) for tc in response.tool_calls],
return_exceptions=True,
)
tool_results: list[ToolResult] = [
r if isinstance(r, ToolResult) else ToolResult(success=False, error=f"{type(r).__name__}: {r}")
for r in raw_results
]
# Loop while a tool call is pending. A MAX_TOKENS turn can also carry a (truncated)
# tool_use that must be answered, so we key off tool_calls rather than the stop reason.
consecutive_truncations = 0
while response.tool_calls:
truncated = response.stop_reason == StopReason.MAX_TOKENS
Comment on lines +151 to +152

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Preserve pending max-token tool calls before compacting

When a follow-up response contains tool_calls with StopReason.MAX_TOKENS, this new loop keeps it pending, but the auto-compaction block below can still run before the synthetic tool_results are sent. If that response is over the compaction threshold, compact(response) treats it as a normal non-TOOL_USE turn and sends a summary request after an unresolved tool_use, which recreates the provider 400 this change is trying to avoid. This can happen on truncated retries, or on any tool-result turn that is truncated while requesting another tool in a high-context conversation.

Useful? React with 👍 / 👎.

if truncated:
consecutive_truncations += 1
if consecutive_truncations > MAX_CONSECUTIVE_TRUNCATIONS:
raise AgentError(
"Agent response was truncated by the output token limit on "
f"{consecutive_truncations} consecutive turns while a tool call was "
"pending; aborting to avoid an unrecoverable loop. Reduce the amount "
"of work attempted in a single tool call."
)
tool_results: list[ToolResult] = self._truncated_tool_results(response.tool_calls)
else:
consecutive_truncations = 0
tool_results = await self._execute_tool_calls(response.tool_calls)
total_input += sum(result.total_input_tokens for result in tool_results)
total_output += sum(result.total_output_tokens for result in tool_results)

Expand All @@ -140,6 +187,9 @@ async def start(self, prompt: str, allowed_tools: list[str] | None = None) -> Re
total_input += compact_in
total_output += compact_out

if response.stop_reason == StopReason.TOOL_USE:
raise AgentError("Agent returned stop_reason=TOOL_USE with no tool calls")

react_result = ReActResult(
final_response=response,
iterations=iterations,
Expand Down
6 changes: 6 additions & 0 deletions ddev/src/ddev/ai/tools/core/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@ def name(self) -> str:
def description(self) -> str:
return inspect.cleandoc(self.__class__.__doc__) if self.__class__.__doc__ else ""

@property
def truncated_call_hint(self) -> str | None:
    """Tool-specific guidance appended to the synthetic failure result when a call to
    this tool is truncated by the output token limit. None falls back to a generic hint.

    This base implementation always returns None; a tool provides its own guidance by
    overriding the property.
    """
    return None

@property
def input_schema(self) -> dict[str, object]:
return _get_input_type(type(self)).to_input_schema()
Expand Down
2 changes: 2 additions & 0 deletions ddev/src/ddev/ai/tools/core/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,7 @@ def name(self) -> str: ...
@property
def description(self) -> str: ...
@property
def truncated_call_hint(self) -> str | None: ...
@property
def definition(self) -> ToolParam: ...
async def run(self, raw: dict[str, object]) -> ToolResult: ...
4 changes: 4 additions & 0 deletions ddev/src/ddev/ai/tools/fs/append_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ class AppendFileTool(FileRegistryTool[AppendFileInput]):
def name(self) -> str:
return "append_file"

@property
def truncated_call_hint(self) -> str:
    """Recovery guidance for a truncated append_file call: append less content per call."""
    return "Append a smaller chunk instead — split the remaining content across multiple append_file calls."

async def __call__(self, tool_input: AppendFileInput) -> ToolResult:
try:
path = self._assert_writable(tool_input.path)
Expand Down
7 changes: 7 additions & 0 deletions ddev/src/ddev/ai/tools/fs/create_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ class CreateFileTool(FileRegistryTool[CreateFileInput]):
def name(self) -> str:
return "create_file"

@property
def truncated_call_hint(self) -> str:
    """Recovery guidance for a truncated create_file call: write a smaller first chunk,
    then add the remainder with append_file."""
    return (
        "Write a smaller initial chunk of the file now, then use append_file to add the "
        "remaining content across one or more follow-up calls."
    )

async def __call__(self, tool_input: CreateFileInput) -> ToolResult:
try:
path = self._assert_writable(tool_input.path)
Expand Down
15 changes: 13 additions & 2 deletions ddev/src/ddev/ai/tools/fs/edit_file.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ class EditFileInput(BaseToolInput):
Field(
description=(
"Exact non-empty text to replace. Must appear exactly once in the file "
"(hint: include surrounding context if needed)."
"(hint: include surrounding context if needed). Keep this to the smallest "
"unique region that needs changing — do not paste the whole file. To rewrite "
"a file extensively, apply several small edits rather than one huge one."
),
min_length=1,
),
Expand All @@ -30,12 +32,21 @@ class EditFileInput(BaseToolInput):
class EditFileTool(FileRegistryTool[EditFileInput]):
"""Edits a file by replacing an exact string with a new one.
Fails if the file was modified since the last read.
old_string must appear exactly once in the file — if it appears multiple times, the call fails."""
old_string must appear exactly once in the file — if it appears multiple times, the call fails.
Prefer small, targeted edits over large ones: a single edit that spans most of the file can
exceed the response token limit and be truncated. Break big rewrites into several edits."""

@property
def name(self) -> str:
return "edit_file"

@property
def truncated_call_hint(self) -> str:
    """Recovery guidance for a truncated edit_file call: edit one small region, or
    rebuild the file with create_file followed by append_file."""
    return (
        "Edit a single small unique region instead of rewriting a whole file. For a full "
        "rewrite, use create_file with a smaller initial chunk, then append_file for the rest."
    )

async def __call__(self, tool_input: EditFileInput) -> ToolResult:
try:
path = self._assert_writable(tool_input.path)
Expand Down
4 changes: 4 additions & 0 deletions ddev/src/ddev/ai/tools/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,10 @@ async def run(self, name: str, raw: dict[str, object]) -> ToolResult:
return ToolResult(success=False, error=f"Unknown tool: {name!r}")
return await tool.run(raw)

def get(self, name: str) -> ToolProtocol | None:
    """Look up a registered tool instance by name, or None if not registered.

    Unlike ``run``, this does not execute anything; it only returns the entry stored
    under ``name`` in the registry's internal tool mapping.
    """
    return self._tools.get(name)


def filter_read_only(tool_names: list[str]) -> list[str]:
"""Return only the read-only names. Unknown names raise."""
Expand Down
61 changes: 23 additions & 38 deletions ddev/tests/ai/agent/test_anthropic_client.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
# (C) Datadog, Inc. 2026-present
# All rights reserved
# Licensed under a 3-clause BSD style license (see LICENSE)
from __future__ import annotations

from types import SimpleNamespace
from typing import TYPE_CHECKING
from unittest.mock import AsyncMock, MagicMock

import anthropic
Expand All @@ -21,6 +23,9 @@
from ddev.ai.tools.core.types import ToolResult
from ddev.ai.tools.registry import NATIVE_TOOL_NAMES, ToolRegistry

if TYPE_CHECKING:
from tests.ai.conftest import FakeToolFactory

# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -189,28 +194,8 @@ async def test_max_tokens_is_not_an_error() -> None:
# ---------------------------------------------------------------------------


class FakeTool:
def __init__(self, name: str) -> None:
self._name = name

@property
def name(self) -> str:
return self._name

@property
def description(self) -> str:
return ""

@property
def definition(self) -> dict:
return {"name": self._name, "description": "", "input_schema": {}}

async def run(self, raw: dict) -> ToolResult:
pass


async def test_allowed_tools_filters_to_subset() -> None:
registry = ToolRegistry([FakeTool(n) for n in ["read_file", "grep", "mkdir"]])
async def test_allowed_tools_filters_to_subset(fake_tool: FakeToolFactory) -> None:
registry = ToolRegistry([fake_tool(n) for n in ["read_file", "grep", "mkdir"]])
resp = make_response("end_turn", [make_text_block("ok")])
agent, create_mock = make_agent(tools=registry, mock_response=resp)

Expand All @@ -220,8 +205,8 @@ async def test_allowed_tools_filters_to_subset() -> None:
assert sent_names == ["read_file"]


async def test_allowed_tools_none_passes_all() -> None:
registry = ToolRegistry([FakeTool(n) for n in ["a", "b"]])
async def test_allowed_tools_none_passes_all(fake_tool: FakeToolFactory) -> None:
registry = ToolRegistry([fake_tool(n) for n in ["a", "b"]])
resp = make_response("end_turn", [make_text_block("ok")])
agent, create_mock = make_agent(tools=registry, mock_response=resp)

Expand Down Expand Up @@ -477,8 +462,8 @@ async def test_web_fetch_injected_with_citations_enabled() -> None:
assert web_fetch["max_uses"] == MAX_CONTINUATIONS - 1


async def test_both_native_tools_injected_together() -> None:
registry = ToolRegistry([FakeTool("read_file")], native_tool_names=["web_search", "web_fetch"])
async def test_both_native_tools_injected_together(fake_tool: FakeToolFactory) -> None:
registry = ToolRegistry([fake_tool("read_file")], native_tool_names=["web_search", "web_fetch"])
resp = make_response("end_turn", [make_text_block("ok")])
agent, create_mock = make_agent(tools=registry, mock_response=resp)

Expand All @@ -504,8 +489,8 @@ async def test_create_until_complete_returns_completion_result() -> None:
assert result.all_responses == [final]


async def test_native_tool_appended_after_client_tools() -> None:
registry = ToolRegistry([FakeTool("read_file")], native_tool_names=["web_search"])
async def test_native_tool_appended_after_client_tools(fake_tool: FakeToolFactory) -> None:
registry = ToolRegistry([fake_tool("read_file")], native_tool_names=["web_search"])
resp = make_response("end_turn", [make_text_block("ok")])
agent, create_mock = make_agent(tools=registry, mock_response=resp)

Expand All @@ -519,8 +504,8 @@ async def test_native_tool_appended_after_client_tools() -> None:
assert sent_tools[-1]["cache_control"] == {"type": "ephemeral", "ttl": "1h"}


async def test_allowed_tools_gates_native_tool() -> None:
registry = ToolRegistry([FakeTool("read_file")], native_tool_names=["web_search"])
async def test_allowed_tools_gates_native_tool(fake_tool: FakeToolFactory) -> None:
registry = ToolRegistry([fake_tool("read_file")], native_tool_names=["web_search"])
resp = make_response("end_turn", [make_text_block("ok")])
agent, create_mock = make_agent(tools=registry, mock_response=resp)

Expand All @@ -531,8 +516,8 @@ async def test_allowed_tools_gates_native_tool() -> None:
assert "read_file" in sent_names


async def test_allowed_tools_none_passes_all_including_native() -> None:
registry = ToolRegistry([FakeTool("read_file")], native_tool_names=["web_search"])
async def test_allowed_tools_none_passes_all_including_native(fake_tool: FakeToolFactory) -> None:
registry = ToolRegistry([fake_tool("read_file")], native_tool_names=["web_search"])
resp = make_response("end_turn", [make_text_block("ok")])
agent, create_mock = make_agent(tools=registry, mock_response=resp)

Expand All @@ -543,8 +528,8 @@ async def test_allowed_tools_none_passes_all_including_native() -> None:
assert "web_search" in sent_names


async def test_no_native_tools_request_unchanged() -> None:
registry = ToolRegistry([FakeTool("read_file")])
async def test_no_native_tools_request_unchanged(fake_tool: FakeToolFactory) -> None:
registry = ToolRegistry([fake_tool("read_file")])
resp = make_response("end_turn", [make_text_block("ok")])
agent, create_mock = make_agent(tools=registry, mock_response=resp)

Expand Down Expand Up @@ -974,8 +959,8 @@ async def test_system_prompt_sent_as_block_with_static_cache_control() -> None:
[["only"], ["a", "b"], ["a", "b", "c", "d"]],
ids=["single_tool", "two_tools", "four_tools"],
)
async def test_only_last_tool_carries_static_cache_control(tool_names: list[str]) -> None:
registry = ToolRegistry([FakeTool(n) for n in tool_names])
async def test_only_last_tool_carries_static_cache_control(tool_names: list[str], fake_tool: FakeToolFactory) -> None:
registry = ToolRegistry([fake_tool(n) for n in tool_names])
resp = make_response("end_turn", [make_text_block("ok")])
agent, create_mock = make_agent(tools=registry, mock_response=resp)

Expand All @@ -986,8 +971,8 @@ async def test_only_last_tool_carries_static_cache_control(tool_names: list[str]
assert sent_tools[-1]["cache_control"] == {"type": "ephemeral", "ttl": "1h"}


async def test_allowed_tools_subset_places_cache_control_on_last_of_subset() -> None:
registry = ToolRegistry([FakeTool(n) for n in ["a", "b", "c"]])
async def test_allowed_tools_subset_places_cache_control_on_last_of_subset(fake_tool: FakeToolFactory) -> None:
registry = ToolRegistry([fake_tool(n) for n in ["a", "b", "c"]])
resp = make_response("end_turn", [make_text_block("ok")])
agent, create_mock = make_agent(tools=registry, mock_response=resp)

Expand Down
Loading
Loading