Skip to content

Commit bff4aa7

Browse files
committed
fix: prevent mid-stream retry from duplicating published tokens to event bus
Fixes #5923

LiteLLMProvider.stream() retried on transient errors and rate limits without checking whether TextDeltaEvents had already been yielded and published to the event bus. When an error fired after K chunks had streamed, the retry replayed the full response from token 1 — permanently concatenating the partial first attempt with the complete second attempt in the client UI stream. EventBus.publish() is fire-and-forget with no retract mechanism, making the corruption irreversible. With RATE_LIMIT_MAX_RETRIES=10, up to 11 concatenated partial attempts could reach the client before a terminal error. Tool-call-only streams were unaffected (tool deltas are buffered, never yielded as TextDeltaEvents).

Fix: add a guard in both exception handlers — if accumulated_text is non-empty when an error fires, yield StreamErrorEvent(recoverable=True) and return instead of retrying. EventLoopNode._do_stream() commits the partial text to conversation history and does not trigger an outer retry (the line-1706 condition requires accumulated_text == '' to raise ConnectionError). This gives a clean restart without touching the already-published stream. The guard uses accumulated_text only, not tool_calls_acc — tool deltas are buffered locally and never published before stream completion, so mid-tool-stream errors remain safe to retry internally.

Tests added (5 new, 74/74 passing):
- test_mid_stream_error_no_duplicate_deltas_3_chunks: 3 chunks + error -> exactly 3 deltas on bus, no outer retry
- test_mid_stream_error_no_duplicate_deltas_50_chunks: 50 chunks + error -> exactly 50 deltas, no outer retry
- test_mid_stream_error_at_chunk_0_triggers_outer_retry: error before first chunk -> outer retry fires, exactly 2 deltas from success path
- test_mid_stream_tool_only_error_inner_retry_unaffected: tool-only error -> inner retry safe, no duplication
- test_mid_stream_recoverable_error_partial_text_committed: partial text committed to history, call_index == 1
1 parent 30a58b0 commit bff4aa7

File tree

2 files changed

+289
-0
lines changed

2 files changed

+289
-0
lines changed

core/framework/llm/litellm.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -988,6 +988,16 @@ async def stream(
988988

989989
except RateLimitError as e:
990990
if attempt < RATE_LIMIT_MAX_RETRIES:
991+
if accumulated_text:
992+
# Text chunks were already yielded to the caller and
993+
# published to the event bus — they cannot be recalled.
994+
# Retrying would re-stream from token 1, duplicating
995+
# content the client has already received. Yield a
996+
# recoverable error instead; EventLoopNode will commit
997+
# the partial text and skip the outer retry (because
998+
# accumulated_text is non-empty at line 1706).
999+
yield StreamErrorEvent(error=str(e), recoverable=True)
1000+
return
9911001
wait = _compute_retry_delay(attempt, exception=e)
9921002
logger.warning(
9931003
f"[stream-retry] {self.model} rate limited (429): {e!s}. "
@@ -1001,6 +1011,12 @@ async def stream(
10011011

10021012
except Exception as e:
10031013
if _is_stream_transient_error(e) and attempt < RATE_LIMIT_MAX_RETRIES:
1014+
if accumulated_text:
1015+
# Same guard as RateLimitError: text already published
1016+
# to the event bus cannot be recalled. Stop retrying
1017+
# to prevent client-visible duplication.
1018+
yield StreamErrorEvent(error=str(e), recoverable=True)
1019+
return
10041020
wait = _compute_retry_delay(attempt, exception=e)
10051021
logger.warning(
10061022
f"[stream-retry] {self.model} transient error "

core/tests/test_event_loop_node.py

Lines changed: 273 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1285,6 +1285,39 @@ def complete(self, messages, system="", **kwargs) -> LLMResponse:
12851285
return LLMResponse(content="ok", model="mock", stop_reason="stop")
12861286

12871287

1288+
class PartialStreamThenErrorLLM(LLMProvider):
    """Fake provider that models the fixed LiteLLMProvider.stream() mid-stream-error path.

    The first call emits every string in ``chunks`` as a TextDeltaEvent and
    then a StreamErrorEvent(recoverable=True) — mirroring the litellm.py guard
    that stops retrying once text has been published to the event bus.

    Any later call replays ``success_scenario`` (nothing when None), so tests
    can detect whether an outer retry fired by counting stream() invocations.
    """

    def __init__(self, chunks: list[str], success_scenario: list | None = None):
        # Text pieces emitted on the first (failing) attempt.
        self.chunks = chunks
        # Events replayed on subsequent attempts; defaults to no events.
        self.success_scenario = success_scenario or []
        # Number of times stream() has been entered — asserted by tests.
        self._call_index = 0

    async def stream(self, messages, system="", tools=None, max_tokens=4096):
        attempt = self._call_index
        self._call_index = attempt + 1
        if attempt > 0:
            # Retry path: replay the configured success events verbatim.
            for event in self.success_scenario:
                yield event
            return
        # First attempt: stream every chunk, then fail recoverably.
        running_text = ""
        for piece in self.chunks:
            running_text = running_text + piece
            yield TextDeltaEvent(content=piece, snapshot=running_text)
        yield StreamErrorEvent(error="connection reset mid-stream", recoverable=True)

    def complete(self, messages, system="", **kwargs) -> LLMResponse:
        # Non-streaming completion is irrelevant to these tests; canned reply.
        return LLMResponse(content="ok", model="mock", stop_reason="stop")
12881321
class TestTransientErrorRetry:
12891322
"""Test retry-with-backoff for transient LLM errors in EventLoopNode."""
12901323

@@ -1516,6 +1549,246 @@ def test_runtime_error_without_transient_keywords(self):
15161549
assert EventLoopNode._is_transient_error(RuntimeError("invalid JSON in response")) is False
15171550

15181551

1552+
# ===========================================================================
1553+
# Mid-stream retry duplication (#5923)
1554+
# ===========================================================================
1555+
1556+
1557+
class TestMidStreamRetryNoDuplication:
    """Verify that mid-stream errors with accumulated text do not duplicate
    client-visible events.

    The fix in litellm.py guards the retry path: when ``accumulated_text``
    is non-empty at the time an exception fires, it yields
    StreamErrorEvent(recoverable=True) instead of continuing the retry loop.
    EventLoopNode then commits the partial text and does NOT trigger an outer
    retry (because ``accumulated_text != ""`` makes the line-1706 guard false).

    These tests simulate the fixed LiteLLMProvider.stream() behavior via
    PartialStreamThenErrorLLM, then assert exact event-bus emission counts.

    NOTE(review): the ``runtime``, ``node_spec`` and ``memory`` fixtures, plus
    ``build_ctx``, ``EventBus``, ``EventLoopNode``, ``LoopConfig``,
    ``ErrorThenSuccessLLM``, ``tool_call_scenario`` and ``text_scenario`` are
    defined elsewhere in this module — presumed available at collection time.
    """

    @pytest.mark.asyncio
    async def test_mid_stream_error_no_duplicate_deltas_3_chunks(
        self, runtime, node_spec, memory
    ):
        """3 text chunks yielded, then recoverable error — bus gets exactly 3 deltas.

        Before the fix: internal retry would re-stream from token 1, producing
        6 or more LLM_TEXT_DELTA events. After the fix: exactly 3.
        """
        node_spec.output_keys = []
        chunks = ["Hello", " world", "!"]
        llm = PartialStreamThenErrorLLM(chunks=chunks)
        # Capture every LLM_TEXT_DELTA the node publishes so we can count them.
        bus = EventBus()
        delta_events: list = []
        bus.subscribe(
            event_types=[EventType.LLM_TEXT_DELTA],
            handler=lambda e: delta_events.append(e),
        )
        ctx = build_ctx(runtime, node_spec, memory, llm)
        node = EventLoopNode(
            event_bus=bus,
            config=LoopConfig(
                max_iterations=5,
                max_stream_retries=3,
                # Small backoff keeps the test fast if a retry ever does fire.
                stream_retry_backoff_base=0.01,
            ),
        )
        result = await node.execute(ctx)
        assert result.success is True
        # Exactly K delta events — no second wave from retry
        assert len(delta_events) == 3
        # Confirm content is not doubled
        content = "".join(e.data["content"] for e in delta_events)
        assert content == "Hello world!"
        # No outer retry fired — stream() was called exactly once
        assert llm._call_index == 1

    @pytest.mark.asyncio
    async def test_mid_stream_error_no_duplicate_deltas_50_chunks(
        self, runtime, node_spec, memory
    ):
        """50 text chunks yielded, then recoverable error — bus gets exactly 50 deltas.

        Covers the 'chunk 50' scenario from the forensic analysis where a
        mid-response rate limit would previously replay the full response.
        """
        node_spec.output_keys = []
        chunks = [f"t{i}" for i in range(50)]
        llm = PartialStreamThenErrorLLM(chunks=chunks)
        bus = EventBus()
        delta_events: list = []
        bus.subscribe(
            event_types=[EventType.LLM_TEXT_DELTA],
            handler=lambda e: delta_events.append(e),
        )
        ctx = build_ctx(runtime, node_spec, memory, llm)
        node = EventLoopNode(
            event_bus=bus,
            config=LoopConfig(
                max_iterations=5,
                max_stream_retries=3,
                stream_retry_backoff_base=0.01,
            ),
        )
        result = await node.execute(ctx)
        assert result.success is True
        # One delta per chunk — a replay would have produced up to 100.
        assert len(delta_events) == 50
        # Single stream() call proves no outer retry fired.
        assert llm._call_index == 1

    @pytest.mark.asyncio
    async def test_mid_stream_error_at_chunk_0_triggers_outer_retry(
        self, runtime, node_spec, memory
    ):
        """Error before any text chunk — outer retry fires; bus gets success deltas only.

        When accumulated_text == '' at error time, the guard in litellm.py does
        NOT fire. The inner retry proceeds (or outer retry handles it). The
        client sees exactly the deltas from the successful attempt — no partial
        first chunk to duplicate.
        """
        node_spec.output_keys = []
        success_events = [
            TextDeltaEvent(content="A", snapshot="A"),
            TextDeltaEvent(content="B", snapshot="AB"),
            FinishEvent(stop_reason="stop", input_tokens=5, output_tokens=2, model="mock"),
        ]
        # ErrorThenSuccessLLM: fails the first call, then replays success_events.
        llm = ErrorThenSuccessLLM(
            error=ConnectionError("connection reset before first chunk"),
            fail_count=1,
            success_scenario=success_events,
        )
        bus = EventBus()
        delta_events: list = []
        bus.subscribe(
            event_types=[EventType.LLM_TEXT_DELTA],
            handler=lambda e: delta_events.append(e),
        )
        ctx = build_ctx(runtime, node_spec, memory, llm)
        node = EventLoopNode(
            event_bus=bus,
            config=LoopConfig(
                max_iterations=5,
                max_stream_retries=3,
                stream_retry_backoff_base=0.01,
            ),
        )
        result = await node.execute(ctx)
        assert result.success is True
        # Outer retry fired — two stream() calls
        assert llm._call_index == 2
        # Exactly 2 deltas from the successful retry, zero from attempt 0
        assert len(delta_events) == 2
        assert delta_events[0].data["content"] == "A"
        assert delta_events[1].data["content"] == "B"

    @pytest.mark.asyncio
    async def test_mid_stream_tool_only_error_inner_retry_unaffected(
        self, runtime, node_spec, memory
    ):
        """Error with no text emitted (tool-call-only stream) — outer retry fires safely.

        Tool call argument deltas are buffered inside LiteLLMProvider.stream()
        and never published to the event bus. So an error mid-tool-stream has
        accumulated_text == '' and the guard does NOT fire. The retry proceeds
        (inner or outer) without any duplication risk.

        Simulated here as: LLM call 0 raises ConnectionError before yielding
        any TextDeltaEvent; LLM call 1 succeeds with set_output.
        """
        node_spec.output_keys = ["result"]
        call_index = 0

        # Local fake provider: scripted per-call behavior keyed on call_index.
        class ToolOnlyThenErrorLLM(LLMProvider):
            async def stream(self, messages, system="", tools=None, max_tokens=4096):
                nonlocal call_index
                idx = call_index
                call_index += 1
                if idx == 0:
                    # Raise immediately — no TextDeltaEvent emitted
                    raise ConnectionError("network error during tool stream")
                elif idx == 1:
                    for event in tool_call_scenario(
                        "set_output", {"key": "result", "value": "tool-done"}
                    ):
                        yield event
                else:
                    # Post-tool-result turn: a single plain-text reply.
                    for event in text_scenario("done"):
                        yield event

            def complete(self, messages, system="", **kwargs) -> LLMResponse:
                return LLMResponse(content="ok", model="mock", stop_reason="stop")

        llm = ToolOnlyThenErrorLLM()
        bus = EventBus()
        delta_events: list = []
        bus.subscribe(
            event_types=[EventType.LLM_TEXT_DELTA],
            handler=lambda e: delta_events.append(e),
        )
        ctx = build_ctx(runtime, node_spec, memory, llm)
        node = EventLoopNode(
            event_bus=bus,
            config=LoopConfig(
                max_iterations=5,
                max_stream_retries=3,
                stream_retry_backoff_base=0.01,
            ),
        )
        result = await node.execute(ctx)
        assert result.success is True
        assert result.output.get("result") == "tool-done"
        # Outer retry fired (call 0 failed, call 1 succeeded with set_output)
        assert call_index >= 2
        # Call 0 raised before yielding any text — zero deltas from the error call.
        # Call 1 is a pure tool call (no TextDeltaEvent).
        # Call 2 (inner loop after tool result) returns text_scenario("done") → 1 delta.
        # The key property: no duplication from call 0; the count matches only
        # the legitimate post-tool-call LLM turn.
        assert len(delta_events) == 1
        assert delta_events[0].data["content"] == "done"

    @pytest.mark.asyncio
    async def test_mid_stream_recoverable_error_partial_text_committed(
        self, runtime, node_spec, memory
    ):
        """Partial text from before the error is committed as the turn output.

        When the guard fires (accumulated_text != ''), EventLoopNode receives
        StreamErrorEvent(recoverable=True) after the partial text. Because
        accumulated_text is non-empty, the line-1706 check does not raise
        ConnectionError. The partial text is committed as final_text and the
        node completes successfully — no duplication, no crash.
        """
        node_spec.output_keys = []
        chunks = ["partial", " response"]
        llm = PartialStreamThenErrorLLM(chunks=chunks)
        bus = EventBus()
        delta_events: list = []
        bus.subscribe(
            event_types=[EventType.LLM_TEXT_DELTA],
            handler=lambda e: delta_events.append(e),
        )
        ctx = build_ctx(runtime, node_spec, memory, llm)
        node = EventLoopNode(
            event_bus=bus,
            config=LoopConfig(
                max_iterations=5,
                max_stream_retries=3,
                stream_retry_backoff_base=0.01,
            ),
        )
        result = await node.execute(ctx)
        assert result.success is True
        # Exactly 2 deltas — no second wave
        assert len(delta_events) == 2
        assert delta_events[0].data["content"] == "partial"
        assert delta_events[1].data["content"] == " response"
        # Stream called exactly once — no outer retry
        assert llm._call_index == 1
15191792
# ===========================================================================
15201793
# Tool doom loop detection (ITEM 1)
15211794
# ===========================================================================

0 commit comments

Comments
 (0)