Description
Summary
LiteLLMProvider.stream() in core/framework/llm/litellm.py:816–1015 retries on transient errors and rate limits without checking whether TextDeltaEvents were already yielded and published to the event bus. When an error fires after K chunks have streamed, the retry replays the full response from the beginning — permanently concatenating the partial first attempt with the complete second attempt in the client's UI stream.
EventBus.publish() is fire-and-forget with no retract, reset, or replay mechanism. The corruption is irreversible once the first chunk hits the bus.
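A minimal sketch of the fire-and-forget shape (hypothetical class and event names, not the real EventBus API) shows why the corruption is irreversible:

```python
from dataclasses import dataclass, field

# Hypothetical, simplified bus for illustration only: publish() hands the
# event to every subscriber and returns. There is no ack, retract, reset,
# or replay, so any chunk that reached a subscriber is final.
@dataclass
class FireAndForgetBus:
    subscribers: list = field(default_factory=list)

    def subscribe(self, handler):
        self.subscribers.append(handler)

    def publish(self, event):
        for handler in self.subscribers:
            handler(event)  # no return value, no undo

bus = FireAndForgetBus()
seen = []
bus.subscribe(seen.append)
bus.publish({"type": "text_delta", "content": "I'll analyze"})
# `seen` now permanently holds the partial chunk; a retry can only
# append more events after it, never remove it.
```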
Root Cause
The retry loop resets accumulators at the top of each attempt:
```python
# litellm.py:816–826
for attempt in range(RATE_LIMIT_MAX_RETRIES + 1):
    accumulated_text = ""  # ← reset on every attempt
    tool_calls_acc = {}    # ← reset on every attempt
```

TextDeltaEvents are yielded immediately as chunks arrive — before the stream completes:
```python
# litellm.py:839–844
yield TextDeltaEvent(  # ← published to event bus immediately, NOT buffered
    content=delta.content,
    snapshot=accumulated_text,
)
```

Both exception handlers retry unconditionally with no guard on accumulated_text:
```python
# litellm.py:989 — RateLimitError
except RateLimitError as e:
    if attempt < RATE_LIMIT_MAX_RETRIES:
        await asyncio.sleep(wait)
        continue  # ← no check: were chunks already published?

# litellm.py:1002 — transient errors
except Exception as e:
    if _is_stream_transient_error(e) and attempt < RATE_LIMIT_MAX_RETRIES:
        await asyncio.sleep(wait)
        continue  # ← same: no guard
```

User-Visible Effect
When a connection reset or 529 fires after 30-50 tokens have streamed:
```
Attempt 0 (partial, before error):
  "I'll analyze your repository and check—"

Attempt 1 (full retry from beginning):
  "I'll analyze your repository and check for any issues. Let me start by searching for..."

Client receives:
  "I'll analyze your repository and check—I'll analyze your repository and check for any issues. Let me start by searching for..."
```
Users see this as the agent "repeating itself" with no error message or indication a retry occurred.
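The duplication can be reproduced with a toy version of the retry loop (hypothetical names; this mirrors the reset-and-replay structure of the buggy loop, not the real provider code):

```python
class TransientError(Exception):
    pass

def fake_provider(fail_at_chunk):
    """Returns a stream that raises once at `fail_at_chunk`, then succeeds."""
    state = {"failed": False}
    def stream():
        chunks = ["I'll analyze your repository and check", " for any issues."]
        for i, chunk in enumerate(chunks):
            if not state["failed"] and i == fail_at_chunk:
                state["failed"] = True
                raise TransientError("connection reset")
            yield chunk
    return stream

def buggy_stream(stream, max_retries=1):
    """Mirrors the bug: accumulators reset per attempt, chunks yielded
    (published) immediately, and retry has no published-chunk guard."""
    for attempt in range(max_retries + 1):
        accumulated_text = ""  # reset on every attempt
        try:
            for chunk in stream():
                accumulated_text += chunk
                yield chunk  # published immediately; cannot be retracted
            return
        except TransientError:
            if attempt < max_retries:
                continue
            raise

stream = fake_provider(fail_at_chunk=1)
client_view = "".join(buggy_stream(stream))
print(client_view)
# → "I'll analyze your repository and checkI'll analyze your repository and check for any issues."
```

The first chunk is replayed verbatim because attempt 1 restarts the response from the beginning while attempt 0's chunk is already on the wire.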
Blast Radius
| Scenario | Trigger | Client effect |
|---|---|---|
| RateLimitError at chunk 50 | Provider streams 49 chunks then raises 429 | Partial + full response concatenated |
| Transient error at chunk 1 | ConnectionReset after first character | Single character + full response |
| 10 retries, each failing after K chunks | Persistent transient failure | K-chunk partial × 11 before terminal error |
| Tool-call-only response | Exception mid-argument delta | Safe — tool deltas are buffered, never yielded as TextDeltaEvent |
Internal state is always correct. `accumulated_text = event.snapshot` in _do_stream() (line 1658) is an assignment, not an append, so after the retry it holds exactly the full text of the last successful attempt. Conversation history, tool call results, and L3 logs are unaffected. Only the client-facing event bus stream is corrupted.
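The assignment-vs-append distinction is why only the UI stream corrupts. A toy illustration (event shapes are assumptions, modeled as `(content, snapshot)` pairs):

```python
# Events across a buggy retry: attempt 0 yields a partial chunk,
# then errors; attempt 1 replays the response in full.
events = [
    ("I'll analyze", "I'll analyze"),            # attempt 0, then error
    ("I'll analyze", "I'll analyze"),            # attempt 1 replay
    (" your repo.", "I'll analyze your repo."),  # attempt 1 continues
]

ui_stream = ""         # client view: concatenates every delta (corrupted)
accumulated_text = ""  # internal view: assigned from snapshot (self-healing)
for content, snapshot in events:
    ui_stream += content             # += : the duplicate delta sticks
    accumulated_text = snapshot      # =  : overwritten by the fresh snapshot

print(ui_stream)         # → "I'll analyzeI'll analyze your repo."
print(accumulated_text)  # → "I'll analyze your repo."
```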
RATE_LIMIT_MAX_RETRIES = 10 means up to 11 concatenated partial attempts before a terminal error — worst case on a flapping connection.
Complete Call Chain
```
LiteLLMProvider.stream()             litellm.py:816
  yield TextDeltaEvent()             litellm.py:841
    EventLoopNode._do_stream()       event_loop_node.py:1658
      _publish_text_delta()          event_loop_node.py:1659
        emit_client_output_delta()   event_bus.py:692   ← UI stream (permanent)
        OR emit_llm_text_delta()     event_bus.py:571   ← observability pipeline
          EventBus.publish()         fire-and-forget, no retract exists
```
No Existing Tests Cover Mid-Stream Retry
All six tests in TestTransientErrorRetry (test_event_loop_node.py:1288–1486) use a mock LLM that raises before yielding any TextDeltaEvent — the safe, non-duplicating case. test_litellm_streaming.py is entirely @pytest.mark.skip(reason="Requires valid live API keys").
Proposed Fix
Add a guard in both exception handlers before continue. If content was already published, yield StreamErrorEvent(recoverable=True) instead of retrying — the existing outer retry loop in EventLoopNode (event_loop_node.py:661) resets accumulated_text and restarts the turn cleanly without touching the already-published stream:
```python
# litellm.py:989 — RateLimitError handler
except RateLimitError as e:
    if accumulated_text or tool_calls_acc:
        yield StreamErrorEvent(error=str(e), recoverable=True)
        return
    if attempt < RATE_LIMIT_MAX_RETRIES:
        await asyncio.sleep(wait)
        continue
    yield StreamErrorEvent(error=str(e), recoverable=False)
    return

# litellm.py:1002 — transient error handler
except Exception as e:
    if (_is_stream_transient_error(e)
            and attempt < RATE_LIMIT_MAX_RETRIES
            and not accumulated_text
            and not tool_calls_acc):
        await asyncio.sleep(wait)
        continue
    recoverable = _is_stream_transient_error(e)
    yield StreamErrorEvent(error=str(e), recoverable=recoverable)
    return
```

StreamErrorEvent(recoverable=True) reaches _do_stream():1678–1682, sets _stream_error, raises ConnectionError at line 1706, and the outer retry loop at event_loop_node.py:661 handles clean restart — no duplication, no new infrastructure needed.
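Applying the same guard to a toy version of the loop shows both behaviors: a pre-yield error still retries silently, while a mid-stream error surfaces a recoverable signal instead of replaying (hypothetical names; StreamError is raised rather than yielded purely to keep the sketch short):

```python
class TransientError(Exception):
    pass

class StreamError(Exception):
    """Stand-in for StreamErrorEvent, for illustration only."""
    def __init__(self, recoverable):
        self.recoverable = recoverable

def guarded_stream(stream, max_retries=1):
    for attempt in range(max_retries + 1):
        accumulated_text = ""
        try:
            for chunk in stream():
                accumulated_text += chunk
                yield chunk
            return
        except TransientError as e:
            if accumulated_text:
                # Chunks already published: never replay from the start;
                # let the outer loop restart the turn cleanly.
                raise StreamError(recoverable=True) from e
            if attempt < max_retries:
                continue
            raise StreamError(recoverable=False) from e

def flaky(fail_at):
    state = {"failed": False}
    def stream():
        for i, chunk in enumerate(["partial ", "rest."]):
            if not state["failed"] and i == fail_at:
                state["failed"] = True
                raise TransientError()
            yield chunk
    return stream

# Error before any chunk (fail_at=0): nothing published, safe to retry.
print("".join(guarded_stream(flaky(0))))  # → "partial rest."

# Error mid-stream (fail_at=1): no replay; a recoverable error escapes so
# the outer loop (event_loop_node.py:661 in the real code) can restart.
out = []
try:
    for chunk in guarded_stream(flaky(1)):
        out.append(chunk)
except StreamError as e:
    assert e.recoverable
print("".join(out))  # → "partial "
```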
Files Affected
- core/framework/llm/litellm.py:989–1015 — both exception handlers missing the guard
- core/tests/test_event_loop_node.py:1288–1486 — existing retry tests cover pre-yield errors only