
fix: prevent mid-stream retry from duplicating published tokens (#5923) #6070

Open

EsraaKamel11 wants to merge 5 commits into aden-hive:main from EsraaKamel11:fix/stream-retry-duplication-5923

Conversation

@EsraaKamel11

Summary

Fixes #5923 — when the LiteLLM streaming layer retried after a mid-stream
RateLimitError or other transient error, it re-streamed from the first token,
duplicating content that had already been yielded to callers and published to
the event bus. Since published events cannot be recalled, the retry must be
abandoned once a partial stream has been emitted.

Root Cause

LiteLLMProvider.stream() has an internal retry loop for transient errors. Both
the RateLimitError handler and the generic transient-error handler would
unconditionally continue — restarting the entire stream from the beginning —
even when accumulated_text was non-empty (i.e., chunks had already been
yielded upstream and emitted on the event bus).

Before — both handlers did this unconditionally:

```python
except RateLimitError as e:
    if attempt < RATE_LIMIT_MAX_RETRIES:
        wait = _compute_retry_delay(...)
        await asyncio.sleep(wait)
        continue   # re-streams from token 0, duplicating all prior output
```

The event bus publishes token deltas eagerly as they stream in. There is no
mechanism to retract already-published events, so retrying produced a second
copy of every token the client had already received.
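To make the failure mode concrete, here is a minimal, self-contained sketch (not the actual LiteLLMProvider code) of a streaming generator whose unconditional retry re-yields everything already delivered to the caller:

```python
def stream_with_naive_retry(chunks, fail_after, max_retries=1):
    """Yield chunks; simulate a transient error after `fail_after` chunks."""
    attempt = 0
    while True:
        try:
            for i, chunk in enumerate(chunks):
                if attempt < max_retries and i == fail_after:
                    raise ConnectionError("transient mid-stream error")
                yield chunk
            return
        except ConnectionError:
            attempt += 1
            if attempt > max_retries:
                raise
            # Restarts from the first chunk: everything already yielded
            # upstream (and published to the bus) is emitted a second time.
            continue

received = list(stream_with_naive_retry(["a", "b", "c"], fail_after=2))
# → ["a", "b", "a", "b", "c"]: the first two chunks are duplicated
```

The caller has no way to distinguish the replayed prefix from new content, which is exactly the corruption the client UI displayed.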

Fix

In both exception handlers, check accumulated_text before retrying. If any
text has already been yielded, emit a recoverable=True StreamErrorEvent and
return immediately. EventLoopNode's existing empty-response guard at line 1706
detects the non-empty accumulated_text and suppresses the outer retry,
preserving the partial turn.

```python
except RateLimitError as e:
    if attempt < RATE_LIMIT_MAX_RETRIES:
        if accumulated_text:
            # Text already published to event bus — cannot be recalled.
            # Yield recoverable error; EventLoopNode will commit the partial
            # text and skip the outer retry (accumulated_text non-empty).
            yield StreamErrorEvent(error=str(e), recoverable=True)
            return
        wait = _compute_retry_delay(attempt, exception=e)
        ...
        continue

except Exception as e:
    if _is_stream_transient_error(e) and attempt < RATE_LIMIT_MAX_RETRIES:
        if accumulated_text:
            yield StreamErrorEvent(error=str(e), recoverable=True)
            return
        ...
        continue
```
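For contrast with the naive behaviour, here is a runnable toy version of the guarded generator. StreamErrorEvent below is a stand-in dataclass mirroring the event named in the diff; the generator scaffolding is illustrative, not the real provider:

```python
from dataclasses import dataclass

@dataclass
class StreamErrorEvent:
    error: str
    recoverable: bool

def stream_with_guard(chunks, fail_after, max_retries=1):
    accumulated_text = ""
    attempt = 0
    while True:
        try:
            for i, chunk in enumerate(chunks):
                if attempt < max_retries and i == fail_after:
                    raise ConnectionError("transient mid-stream error")
                accumulated_text += chunk
                yield chunk
            return
        except ConnectionError as e:
            if accumulated_text:
                # Already-yielded text cannot be recalled: surface a
                # recoverable error and stop instead of re-streaming.
                yield StreamErrorEvent(error=str(e), recoverable=True)
                return
            attempt += 1
            if attempt > max_retries:
                raise
            continue

events = list(stream_with_guard(["a", "b", "c"], fail_after=2))
# → ["a", "b", StreamErrorEvent(...)]: no duplicated prefix
```

Note that an error before the first chunk (fail_after=0) still retries cleanly, because nothing has been published yet.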

Changes

| File | Change |
| --- | --- |
| core/framework/llm/litellm.py | Add accumulated_text guard to RateLimitError handler (L989) and transient Exception handler (L1012) |
| core/tests/test_event_loop_node.py | Add PartialStreamThenErrorLLM helper and TestMidStreamRetryNoDuplication class with 5 new tests |

Tests

5 new automated tests added to TestMidStreamRetryNoDuplication in core/tests/test_event_loop_node.py — all passing (74/74 total):

  • test_mid_stream_error_no_duplicate_deltas_3_chunks — 3 chunks + error -> exactly 3 deltas on bus, no outer retry
  • test_mid_stream_error_no_duplicate_deltas_50_chunks — 50 chunks + error -> exactly 50 deltas, no outer retry
  • test_mid_stream_error_at_chunk_0_triggers_outer_retry — error before first chunk -> outer retry fires correctly, no duplication
  • test_mid_stream_tool_only_error_inner_retry_unaffected — tool-only error -> inner retry safe, guard does not block
  • test_mid_stream_recoverable_error_partial_text_committed — partial text committed to history, _call_index == 1

Notes

  • Guard uses accumulated_text only, not tool_calls_acc — tool deltas are buffered locally and never published before stream completion, so mid-tool-stream errors remain safe to retry internally
  • The empty-stream retry path (L1019-1020) is correctly guarded by not has_content and is unaffected
  • EventLoopNode empty-response guard at runtime_logger.py:1706 is the cooperating mechanism that absorbs the early exit without crashing the turn

EsraaKamel11 and others added 5 commits March 9, 2026 16:29
Implements hive replay CLI command that re-executes a previous session
using cached LLM responses and tool results from L3 tool_logs.jsonl,
enabling root-cause analysis of failed runs without live LLM/tool calls.

Closes aden-hive#4669

Changes:
- schemas/replay.py: ReplayConfig, NodeReplayDiff, ReplayResult models
- runtime/replay_runtime.py: ReplayCache, ReplayInterceptor, ReplayLLMProvider
- graph/executor.py: replay_config injection into execute()
- runner/runner.py: AgentRunner.run_replay()
- runner/cli.py: hive replay command with diff table output
- tests/test_replay.py: 28 tests across 5 test classes
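As a rough illustration of the replay mechanism this commit describes, the sketch below shows one plausible shape for a cache keyed on (node_id, call_index). The real ReplayCache lives in runtime/replay_runtime.py; the JSONL field names used here are assumptions, not taken from the diff:

```python
import json
import tempfile
from pathlib import Path

class ReplayCache:
    """Loads cached results from an L3 tool_logs.jsonl file and serves
    them back by (node_id, call_index). Illustrative sketch only: the
    real ReplayCache's fields and API may differ."""

    def __init__(self, log_path: Path):
        self._entries = {}
        for line in log_path.read_text().splitlines():
            rec = json.loads(line)
            self._entries[(rec["node_id"], rec["call_index"])] = rec["result"]

    def lookup(self, node_id: str, call_index: int):
        # Returns the cached result, or None on a cache miss.
        return self._entries.get((node_id, call_index))

# Demo with a throwaway log file
log = Path(tempfile.mkdtemp()) / "tool_logs.jsonl"
log.write_text(json.dumps(
    {"node_id": "n1", "call_index": 0, "result": {"ok": True}}) + "\n")
cache = ReplayCache(log)
```

A cache miss (rather than an exception) is what lets the diff table report which nodes diverged from the recorded session.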

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Fixes aden-hive#5760

Long prompts passed via ?prompt= URL query param were silently truncated
at browser URL length limits (~2KB). encodeURIComponent expands non-ASCII
chars to 3x bytes, making the limit easy to hit. Truncated multi-byte
sequences decode to corrupted text, not empty string.

Changes:
- home.tsx: switch handleSubmit and handlePromptHint to navigate() with
  location.state instead of URL query param
- workspace.tsx: read prompt from location.state first, fall back to
  searchParams for backward compat with bookmarked/shared URLs
- workspace.tsx: pass state: null to cleanup navigate() to prevent
  state leaking on history replace
- prompt-navigation.test.ts: 15 vitest tests covering priority chain,
  large prompt integrity, and navigate call shape
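The 3x byte expansion is easy to demonstrate. Python's urllib.parse.quote percent-encodes multi-byte characters the same way JavaScript's encodeURIComponent does, so it serves as a stand-in here:

```python
from urllib.parse import quote  # stand-in for JS encodeURIComponent

# A single 3-byte UTF-8 character percent-encodes to 9 ASCII characters.
assert quote("日") == "%E6%97%A5"

prompt = "日" * 300           # a modest 300-character CJK prompt
encoded = quote(prompt)
# 300 chars balloon to 2700 encoded chars — already past a ~2KB URL
# budget, which is why truncation was so easy to trigger.
assert len(encoded) == 2700
```

Passing the prompt through location.state instead of the URL sidesteps the length limit entirely.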
Fixes aden-hive#5855

EncryptedFileStorage had five failure modes causing silent credential
loss and full store corruption in production:

1. _update_index() had no lock — concurrent saves from MCP server and
   HTTP server (separate instances, same directory) could silently drop
   credentials from the index while their .enc files remained on disk

2. json.dump() wrote directly to index.json — a SIGKILL mid-write left
   a truncated empty file, making all credentials inaccessible on restart

3. save() opened .enc with open('wb') — truncating the existing file
   before writing new ciphertext; crash mid-write permanently corrupted
   the credential (InvalidToken on every subsequent load)

4. list_all() had no error handling on json.load() — index corruption
   from bugs 1-2 raised JSONDecodeError unconditionally, breaking the
   credential list UI, agent launch, and all OAuth tool calls

5. delete() called unlink() before _update_index() — a failure between
   the two left a ghost index entry pointing to a deleted .enc file

Changes:
- utils/io.py: atomic_write now supports binary mode (conditional encoding)
- storage.py: add threading.RLock + FileLock on index.json for both
  in-process and cross-instance serialization
- storage.py: save() uses atomic_write(mode='wb') for .enc files
- storage.py: list_all() catches JSONDecodeError/OSError and recovers
  by scanning .enc files directly
- storage.py: delete() updates index first, then unlinks .enc
- storage.py: _update_index() split into public dispatcher +
  _update_index_locked() which holds both locks and writes via atomic_write
- pyproject.toml: filelock>=3.12 added as explicit direct dependency
- test_credential_store.py: 7 new tests covering concurrency,
  crash recovery, and atomicity (70/70 passing)
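The write-then-rename pattern this commit relies on can be sketched as follows. This is an illustrative stand-in for the atomic_write helper in utils/io.py, whose actual signature is not shown in the commit:

```python
import os
import tempfile

def atomic_write(path: str, data: bytes) -> None:
    """Write to a temp file in the same directory, fsync, then
    os.replace. A crash mid-write leaves the old file intact instead
    of a truncated one (bugs 2 and 3 above)."""
    dirname = os.path.dirname(path) or "."
    fd, tmp = tempfile.mkstemp(dir=dirname)
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(data)
            f.flush()
            os.fsync(f.fileno())       # durable before the rename
        os.replace(tmp, path)          # atomic on POSIX and Windows
    except BaseException:
        os.unlink(tmp)
        raise

# Demo: a second write replaces the first without ever truncating it
d = tempfile.mkdtemp()
p = os.path.join(d, "cred.enc")
atomic_write(p, b"ciphertext-v1")
atomic_write(p, b"ciphertext-v2")
```

The temp file must live in the same directory as the target, because os.replace is only atomic within a single filesystem.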

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…L2 entries

Fixes aden-hive#5918

ensure_node_logged() had two non-contiguous critical sections — the
idempotency check ran inside with self._lock, the lock was released,
then self.log_node_complete() re-acquired it independently. In parallel
fan-out execution (asyncio.gather at executor.py:2365), a concurrent
log_node_complete() call from EventLoopNode could slip into the gap,
causing a duplicate NodeDetail entry in details.jsonl with no exception
or warning.

Downstream effect: end_run() aggregates every line in details.jsonl,
doubling total_input_tokens and total_output_tokens for affected nodes,
inflating total_nodes_executed, and triggering spurious needs_attention
flags (a node at 60k tokens trips the 100k threshold when summed twice).

Fix:
- runtime_logger.py:54: threading.Lock() -> threading.RLock() to allow
  the same thread to re-enter the lock without deadlocking
- runtime_logger.py:253-278: move self.log_node_complete() call inside
  the with self._lock block, making check-and-write atomic

log_node_complete() is intentionally left without a guard -- it is a raw
unconditional append used legitimately across multiple verdict exits in
EventLoopNode. The idempotency contract belongs in ensure_node_logged().

Tests added (3 new, 37/37 passing):
- test_ensure_node_logged_concurrent_with_log_node_complete: 10 threads
  race via threading.Barrier, assert exactly 1 entry
- test_two_concurrent_ensure_node_logged_same_node_id: monkeypatched
  append stall forces both threads through the check before either writes
- test_ensure_node_logged_asyncio_gather_no_duplicate: asyncio.gather
  integration test with sleep(0) yield point between log and ensure
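A toy model shows why the RLock plus a single critical section closes the race. Class and method names mirror the commit, but the implementation below is a sketch, not the real RuntimeLogger:

```python
import threading

class RuntimeLogger:
    def __init__(self):
        self._lock = threading.RLock()  # re-entrant: ensure_node_logged
        self._logged = set()            # may call log_node_complete
        self.entries = []

    def log_node_complete(self, node_id):
        with self._lock:                # raw unconditional append
            self._logged.add(node_id)
            self.entries.append(node_id)

    def ensure_node_logged(self, node_id):
        with self._lock:                # check-and-write in ONE section
            if node_id in self._logged:
                return
            self.log_node_complete(node_id)  # re-enters the RLock safely

# 10 threads race through a barrier; exactly one entry should land
barrier = threading.Barrier(10)
logger = RuntimeLogger()

def worker():
    barrier.wait()                      # maximize contention
    logger.ensure_node_logged("node-A")

threads = [threading.Thread(target=worker) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

With a plain Lock, the nested log_node_complete call would deadlock; with the old split critical sections, two threads could both pass the check before either wrote.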
…ent bus

Fixes aden-hive#5923

LiteLLMProvider.stream() retried on transient errors and rate limits
without checking whether TextDeltaEvents had already been yielded and
published to the event bus. When an error fired after K chunks had
streamed, the retry replayed the full response from the first token — permanently
concatenating the partial first attempt with the complete second attempt
in the client UI stream. EventBus.publish() is fire-and-forget with no
retract mechanism, making the corruption irreversible.

With RATE_LIMIT_MAX_RETRIES=10, up to 11 concatenated partial attempts
could reach the client before a terminal error. Tool-call-only streams
were unaffected (tool deltas are buffered, never yielded as
TextDeltaEvents).

Fix: add a guard in both exception handlers — if accumulated_text is
non-empty when an error fires, yield StreamErrorEvent(recoverable=True)
and return instead of retrying. EventLoopNode._do_stream() commits the
partial text to conversation history and does not trigger an outer retry
(line 1706 condition requires accumulated_text == '' to raise
ConnectionError). Clean restart without touching the already-published
stream.

Guard uses accumulated_text only, not tool_calls_acc — tool deltas are
buffered locally and never published before stream completion, so
mid-tool-stream errors remain safe to retry internally.

Tests added (5 new, 74/74 passing):
- test_mid_stream_error_no_duplicate_deltas_3_chunks: 3 chunks + error
  -> exactly 3 deltas on bus, no outer retry
- test_mid_stream_error_no_duplicate_deltas_50_chunks: 50 chunks + error
  -> exactly 50 deltas, no outer retry
- test_mid_stream_error_at_chunk_0_triggers_outer_retry: error before
  first chunk -> outer retry fires, exactly 2 deltas from success path
- test_mid_stream_tool_only_error_inner_retry_unaffected: tool-only
  error -> inner retry safe, no duplication
- test_mid_stream_recoverable_error_partial_text_committed: partial text
  committed to history, call_index == 1
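The assertion style of these tests can be sketched with stand-ins. PartialStreamThenErrorLLM and the bus API below are hypothetical simplifications, not the real test helpers:

```python
class FakeBus:
    """Fire-and-forget bus: publish() has no retract."""
    def __init__(self):
        self.deltas = []

    def publish(self, delta):
        self.deltas.append(delta)

def run_partial_stream(bus, chunks, fail_after):
    """Stand-in for streaming through a partial-then-error LLM: publish
    deltas until the injected error point, then stop without retrying."""
    accumulated = ""
    for i, chunk in enumerate(chunks):
        if i == fail_after:
            # Guard path: partial text exists, so no retry, no replay.
            return accumulated, "recoverable_error"
        bus.publish(chunk)
        accumulated += chunk
    return accumulated, "done"

bus = FakeBus()
text, status = run_partial_stream(bus, ["t1 ", "t2 ", "t3 ", "t4"],
                                  fail_after=3)
# Exactly 3 deltas reached the bus and none were replayed
```

Counting bus deltas rather than inspecting the return value is what makes the no-duplication property directly observable in the tests.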
@EsraaKamel11 force-pushed the fix/stream-retry-duplication-5923 branch from bff4aa7 to e3ff1be on March 9, 2026 at 14:32


Development

Successfully merging this pull request may close these issues.

[Bug]: LiteLLMProvider.stream() retries mid-stream without guard — clients receive duplicated partial response concatenated with full retry response

1 participant