Skip to content

Commit e003517

Browse files
swaroopvarma2359swaroopvarma1
authored andcommitted
fix(chat): repair dangling tool_use/tool_result pairs in replayed history
A crash or cancel between the assistant-row persist and the tool-result persist leaves a dangling tool_use in chat history; window truncation (CHAT_HISTORY_REPLAY_LIMIT takes the last N rows) can orphan leading tool_result rows. Both shapes make every later turn fail with provider 400s — the session is bricked. repair_dangling_tool_uses() makes replayed history provider-safe in both directions: synthetic error results for unmatched tool_use ids (first-answer-wins dedupe for duplicated answers), and orphan leading tool_result messages dropped. Applied on the /message replay path; no-op on well-formed history.
1 parent a1f975f commit e003517

3 files changed

Lines changed: 292 additions & 2 deletions

File tree

app/ai/voice/agents/breeze_buddy/chat/block_codec.py

Lines changed: 119 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
from __future__ import annotations
4040

4141
import json
42-
from typing import Any, Dict, List, cast
42+
from typing import Any, Dict, List, Optional, Set, cast
4343

4444
from pipecat.frames.frames import FunctionCallFromLLM
4545
from pipecat.processors.aggregators.llm_context import LLMContextMessage
@@ -186,6 +186,123 @@ def blocks_to_llm_context_messages(
186186
return out
187187

188188

189+
_LOST_RESULT_PAYLOAD = json.dumps(
190+
{
191+
"status": "error",
192+
"reason": (
193+
"the execution result for this call was lost; " "treat this call as failed"
194+
),
195+
}
196+
)
197+
198+
199+
def repair_dangling_tool_uses(
200+
messages: List[LLMContextMessage],
201+
exclude_ids: Optional[Set[str]] = None,
202+
) -> List[LLMContextMessage]:
203+
"""Make a replayed history provider-safe in BOTH directions.
204+
205+
1. Dangling tool_use: an assistant message carries ``tool_calls`` with
206+
no answering ``{role:"tool"}`` message anywhere later — inject a
207+
synthetic error result right after that batch's contiguous tool-run.
208+
Covers crash/cancel windows where an approval was DECIDED but its
209+
result write was lost (those rows are NOT re-claimable by
210+
resolve_dangling_approvals, which only touches PENDING rows).
211+
2. Orphan tool_result: a ``{role:"tool"}`` message whose tool_call_id
212+
has no preceding assistant ``tool_calls`` in the (windowed) list —
213+
dropped. Happens when CHAT_HISTORY_REPLAY_LIMIT cuts a batch in
214+
half at the window boundary.
215+
216+
``exclude_ids`` are tool_call ids the CALLER is about to answer itself
217+
and must stay unanswered here: the approval handler passes the claimed
218+
id plus the still-PENDING sibling ids. The ``/message`` path passes
219+
nothing (every pending row was resolved + persisted before the history
220+
load). Never exclude all decided ids — a decided-but-lost row must be
221+
repaired or the session bricks.
222+
"""
223+
exclude: Set[str] = exclude_ids or set()
224+
# Ids answered ANYWHERE in the window — a non-contiguous real answer is
225+
# already-broken history we won't make worse by adding a duplicate.
226+
answered_global: Set[str] = {
227+
cast(str, m.get("tool_call_id"))
228+
for m in messages
229+
if isinstance(m, dict) and m.get("role") == "tool" and m.get("tool_call_id")
230+
}
231+
232+
repaired: List[LLMContextMessage] = []
233+
declared_so_far: Set[str] = set()
234+
# First answer wins — a second tool_result for the same id (e.g. a
235+
# historical cancel-race double write) would make providers reject the
236+
# whole conversation on every replay, so later duplicates are dropped.
237+
answered_kept: Set[str] = set()
238+
239+
def _keep_tool(tmsg: Dict[str, Any]) -> None:
240+
tcid = tmsg.get("tool_call_id")
241+
if tcid not in declared_so_far:
242+
logger.warning(
243+
f"[block_codec] dropping orphan tool_result "
244+
f"{tcid!r} (no preceding tool_use in replay window)"
245+
)
246+
return
247+
if tcid in answered_kept:
248+
logger.warning(
249+
f"[block_codec] dropping duplicate tool_result for {tcid!r} "
250+
"(first answer wins)"
251+
)
252+
return
253+
answered_kept.add(cast(str, tcid))
254+
repaired.append(cast(LLMContextMessage, tmsg))
255+
256+
n = len(messages)
257+
i = 0
258+
while i < n:
259+
msg = cast(Dict[str, Any], messages[i])
260+
role = msg.get("role")
261+
262+
if role == "tool":
263+
_keep_tool(msg)
264+
i += 1
265+
continue
266+
267+
repaired.append(cast(LLMContextMessage, msg))
268+
tool_calls = msg.get("tool_calls") if role == "assistant" else None
269+
if tool_calls:
270+
batch_ids = [tc.get("id") for tc in tool_calls if tc.get("id")]
271+
declared_so_far.update(batch_ids)
272+
# Consume the contiguous run of answering tool messages.
273+
j = i + 1
274+
while (
275+
j < n
276+
and isinstance(messages[j], dict)
277+
and cast(Dict[str, Any], messages[j]).get("role") == "tool"
278+
):
279+
_keep_tool(cast(Dict[str, Any], messages[j]))
280+
j += 1
281+
for tcid in batch_ids:
282+
if tcid in exclude or tcid in answered_global:
283+
continue
284+
logger.warning(
285+
f"[block_codec] injecting synthetic result for dangling "
286+
f"tool_use {tcid!r}"
287+
)
288+
repaired.append(
289+
cast(
290+
LLMContextMessage,
291+
{
292+
"role": "tool",
293+
"tool_call_id": tcid,
294+
"content": _LOST_RESULT_PAYLOAD,
295+
},
296+
)
297+
)
298+
i = j
299+
continue
300+
301+
i += 1
302+
303+
return repaired
304+
305+
189306
def _assistant_row_to_openai(blocks: List[Dict[str, Any]]) -> LLMContextMessage:
190307
"""Anthropic assistant blocks → one OpenAI-shape assistant message.
191308
@@ -280,4 +397,5 @@ def _user_row_to_openai(
280397
"internal_text_block",
281398
"filter_visible_blocks",
282399
"blocks_to_llm_context_messages",
400+
"repair_dangling_tool_uses",
283401
]

app/api/routers/breeze_buddy/chat/handlers.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
import asyncio
1010
import time
1111
from datetime import datetime
12-
from typing import Any, AsyncIterator, Callable, Dict, Optional
12+
from typing import Any, AsyncIterator, Callable, Dict, Optional, cast
1313

1414
from fastapi import HTTPException, status
1515
from fastapi.responses import StreamingResponse
@@ -18,6 +18,7 @@
1818
from app.ai.voice.agents.breeze_buddy.chat.block_codec import (
1919
blocks_to_llm_context_messages,
2020
filter_visible_blocks,
21+
repair_dangling_tool_uses,
2122
)
2223
from app.ai.voice.agents.breeze_buddy.chat.client_context import (
2324
ClientContextTooLarge,
@@ -491,6 +492,11 @@ async def send_chat_message_handler(
491492
if row.role in (ChatMessageRole.USER, ChatMessageRole.ASSISTANT)
492493
]
493494
)
495+
# Defensive both-direction repair: an unmatched tool_use (crash or
496+
# cancel between the assistant-row persist and the tool-result
497+
# persist) gets a synthetic error result; orphan tool_results from
498+
# window truncation are dropped. No-op on well-formed history.
499+
history = cast(list, repair_dangling_tool_uses(history))
494500

495501
# Load per-session agent state (cart_id, customer_id, etc. for
496502
# commerce templates). Generic — the runtime doesn't read the

tests/test_block_codec_repair.py

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
# pyrefly: ignore-errors
2+
# Same TypedDict-union narrowing limitation as test_block_codec_visibility.py.
3+
"""repair_dangling_tool_uses — both-direction provider-safety repair.
4+
5+
The replay invariant: every assistant tool_calls id must be answered by a
6+
{role:"tool"} message, and every tool message must answer a preceding
7+
tool_calls. The repair covers the two failure directions:
8+
9+
- decided-but-lost rows (crash/cancel between approval claim and result
10+
persist) → synthetic error result injected;
11+
- window-truncation orphans (CHAT_HISTORY_REPLAY_LIMIT cuts a batch in
12+
half) → leading orphan tool messages dropped.
13+
14+
``exclude_ids`` semantics are load-bearing (plan review blocker): only the
15+
ids the caller is about to answer itself may stay unanswered.
16+
"""
17+
18+
from __future__ import annotations
19+
20+
import json
21+
22+
from app.ai.voice.agents.breeze_buddy.chat.block_codec import (
23+
repair_dangling_tool_uses,
24+
)
25+
26+
27+
def _assistant(tool_ids, content=None):
28+
return {
29+
"role": "assistant",
30+
"content": content,
31+
"tool_calls": [
32+
{
33+
"id": tid,
34+
"type": "function",
35+
"function": {"name": f"fn_{tid}", "arguments": "{}"},
36+
}
37+
for tid in tool_ids
38+
],
39+
}
40+
41+
42+
def _tool(tid, content="{}"):
43+
return {"role": "tool", "tool_call_id": tid, "content": content}
44+
45+
46+
def test_valid_history_passes_through_unchanged():
47+
messages = [
48+
{"role": "user", "content": "hi"},
49+
_assistant(["t1", "t2"]),
50+
_tool("t1"),
51+
_tool("t2"),
52+
{"role": "assistant", "content": "done"},
53+
]
54+
assert repair_dangling_tool_uses(list(messages)) == messages
55+
56+
57+
def test_dangling_tool_use_gets_synthetic_result():
58+
messages = [
59+
_assistant(["t1"]),
60+
{"role": "user", "content": "hello?"},
61+
]
62+
repaired = repair_dangling_tool_uses(messages)
63+
assert repaired[1]["role"] == "tool"
64+
assert repaired[1]["tool_call_id"] == "t1"
65+
payload = json.loads(repaired[1]["content"])
66+
assert payload["status"] == "error"
67+
assert repaired[2] == {"role": "user", "content": "hello?"}
68+
69+
70+
def test_partial_batch_gets_synthetic_for_missing_sibling_only():
71+
messages = [
72+
_assistant(["t1", "t2"]),
73+
_tool("t1", '{"ok": true}'),
74+
]
75+
repaired = repair_dangling_tool_uses(messages)
76+
assert [m["tool_call_id"] for m in repaired[1:]] == ["t1", "t2"]
77+
assert json.loads(repaired[2]["content"])["status"] == "error"
78+
79+
80+
def test_exclude_ids_stay_unanswered():
81+
"""The approval handler excludes the claimed id + pending siblings —
82+
those must remain dangling for the resume turn to answer."""
83+
messages = [
84+
_assistant(["claimed", "pending_sib", "lost"]),
85+
]
86+
repaired = repair_dangling_tool_uses(
87+
messages, exclude_ids={"claimed", "pending_sib"}
88+
)
89+
answered = [m["tool_call_id"] for m in repaired if m["role"] == "tool"]
90+
assert answered == ["lost"]
91+
92+
93+
def test_orphan_leading_tool_result_dropped():
94+
"""Window truncation can cut the assistant row off the top, leaving
95+
tool messages that answer nothing — they must be dropped."""
96+
messages = [
97+
_tool("from_truncated_batch"),
98+
{"role": "user", "content": "next question"},
99+
_assistant(["t9"]),
100+
_tool("t9"),
101+
]
102+
repaired = repair_dangling_tool_uses(messages)
103+
assert repaired[0] == {"role": "user", "content": "next question"}
104+
assert [m.get("tool_call_id") for m in repaired if m["role"] == "tool"] == ["t9"]
105+
106+
107+
def test_already_answered_id_not_duplicated():
108+
"""A non-contiguous real answer is already-broken history; the repair
109+
must not add a duplicate answer for that id."""
110+
messages = [
111+
_assistant(["t1"]),
112+
{"role": "user", "content": "wedged"},
113+
_tool("t1"),
114+
]
115+
repaired = repair_dangling_tool_uses(messages)
116+
answers = [m for m in repaired if m["role"] == "tool"]
117+
assert len(answers) == 1
118+
119+
120+
def test_duplicate_tool_results_deduped_first_wins():
121+
"""A historical cancel-race double write must self-heal on replay —
122+
providers reject two answers for one tool_use id."""
123+
messages = [
124+
_assistant(["t1"]),
125+
_tool("t1", '{"real": true}'),
126+
_tool("t1", '{"status": "error"}'),
127+
]
128+
repaired = repair_dangling_tool_uses(messages)
129+
answers = [m for m in repaired if m["role"] == "tool"]
130+
assert len(answers) == 1
131+
assert answers[0]["content"] == '{"real": true}'
132+
133+
134+
def test_consecutive_assistant_batches():
135+
"""Back-to-back assistant tool_calls messages: the synthetic answer for
136+
the first batch must land between them, not after the second."""
137+
messages = [
138+
_assistant(["a1"]),
139+
_assistant(["b1"]),
140+
_tool("b1"),
141+
]
142+
repaired = repair_dangling_tool_uses(messages)
143+
roles_and_ids = [(m["role"], m.get("tool_call_id")) for m in repaired]
144+
assert roles_and_ids == [
145+
("assistant", None),
146+
("tool", "a1"),
147+
("assistant", None),
148+
("tool", "b1"),
149+
]
150+
151+
152+
def test_multiple_batches_repaired_independently():
153+
messages = [
154+
_assistant(["a1"]),
155+
_tool("a1"),
156+
{"role": "user", "content": "more"},
157+
_assistant(["b1", "b2"]),
158+
_tool("b1"),
159+
]
160+
repaired = repair_dangling_tool_uses(messages)
161+
b_answers = [
162+
m["tool_call_id"]
163+
for m in repaired
164+
if m["role"] == "tool" and m["tool_call_id"].startswith("b")
165+
]
166+
assert b_answers == ["b1", "b2"]

0 commit comments

Comments
 (0)