Skip to content

Commit 8a0cf5e

Browse files
Merge remote-tracking branch 'origin/feature/queen-worker-comm' into feature/queen-worker-comm
2 parents 0bfbf1e + 69218d5 commit 8a0cf5e

File tree

9 files changed

+872
-79
lines changed

9 files changed

+872
-79
lines changed

core/framework/graph/conversation.py

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -107,17 +107,38 @@ def _extract_spillover_filename(content: str) -> str | None:
107107
def _compact_tool_calls(tool_calls: list[dict[str, Any]]) -> list[dict[str, Any]]:
108108
"""Truncate tool_call arguments to save context tokens during compaction.
109109
110-
Preserves ``id``, ``type``, and ``function.name`` exactly. Truncates
111-
``function.arguments`` (a JSON string) to at most ``_TC_ARG_LIMIT`` chars
112-
so that large payloads (e.g. set_output with full findings) don't survive
113-
compaction and defeat the purpose of context reduction.
110+
Preserves ``id``, ``type``, and ``function.name`` exactly. When arguments
111+
exceed ``_TC_ARG_LIMIT``, replaces the full JSON string with a compact
112+
**valid** JSON summary. The Anthropic API parses tool_call arguments and
113+
rejects requests with malformed JSON (e.g. unterminated strings), so we
114+
must never produce broken JSON here.
114115
"""
115116
compact = []
116117
for tc in tool_calls:
117118
func = tc.get("function", {})
118119
args = func.get("arguments", "")
119120
if len(args) > _TC_ARG_LIMIT:
120-
args = args[:_TC_ARG_LIMIT] + "…[truncated]"
121+
# Build a valid JSON summary instead of slicing mid-string.
122+
# Try to extract top-level keys for a meaningful preview.
123+
try:
124+
parsed = json.loads(args)
125+
if isinstance(parsed, dict):
126+
# Preserve key names, truncate values
127+
summary_parts = []
128+
for k, v in parsed.items():
129+
v_str = str(v)
130+
if len(v_str) > 60:
131+
v_str = v_str[:60] + "..."
132+
summary_parts.append(f"{k}={v_str}")
133+
summary = ", ".join(summary_parts)
134+
if len(summary) > _TC_ARG_LIMIT:
135+
summary = summary[:_TC_ARG_LIMIT] + "..."
136+
args = json.dumps({"_compacted": summary})
137+
else:
138+
args = json.dumps({"_compacted": str(parsed)[:_TC_ARG_LIMIT]})
139+
except (json.JSONDecodeError, TypeError):
140+
# Args were already invalid JSON — wrap the preview safely
141+
args = json.dumps({"_compacted": args[:_TC_ARG_LIMIT]})
121142
compact.append(
122143
{
123144
"id": tc.get("id", ""),

core/framework/graph/edge.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -338,6 +338,10 @@ class AsyncEntryPointSpec(BaseModel):
338338
max_concurrent: int = Field(
339339
default=10, description="Maximum concurrent executions for this entry point"
340340
)
341+
max_resurrections: int = Field(
342+
default=3,
343+
description="Auto-restart on non-fatal failure (0 to disable)",
344+
)
341345

342346
model_config = {"extra": "allow"}
343347

core/framework/graph/event_loop_node.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -511,6 +511,7 @@ async def execute(self, ctx: NodeContext) -> NodeResult:
511511
# 5. Stall / doom loop detection state (restored from cursor if resuming)
512512
recent_responses: list[str] = _restored_recent_responses
513513
recent_tool_fingerprints: list[list[tuple[str, str]]] = _restored_tool_fingerprints
514+
_consecutive_empty_turns: int = 0
514515

515516
# 6. Main loop
516517
for iteration in range(start_iteration, self._config.max_iterations):
@@ -649,6 +650,22 @@ async def execute(self, ctx: NodeContext) -> NodeResult:
649650
error=str(e)[:500],
650651
execution_id=execution_id,
651652
)
653+
654+
# For malformed tool call errors, inject feedback into
655+
# the conversation before retrying. Retrying with the
656+
# same messages is futile — the LLM will reproduce the
657+
# same truncated JSON. The nudge tells it to shorten
658+
# its arguments.
659+
error_str = str(e).lower()
660+
if "failed to parse tool call" in error_str:
661+
await conversation.add_user_message(
662+
"[System: Your previous tool call had malformed "
663+
"JSON arguments (likely truncated). Keep your "
664+
"tool call arguments shorter and simpler. Do NOT "
665+
"repeat the same long argument — summarize or "
666+
"split into multiple calls.]"
667+
)
668+
652669
await asyncio.sleep(delay)
653670
continue # retry same iteration
654671

@@ -774,6 +791,57 @@ async def execute(self, ctx: NodeContext) -> NodeResult:
774791
latency_ms=latency_ms,
775792
conversation=conversation if _is_continuous else None,
776793
)
794+
else:
795+
# Ghost empty stream: LLM returned nothing and outputs
796+
# are still missing. The conversation hasn't changed, so
797+
# repeating the same call will produce the same empty
798+
# result. Inject a nudge to break the cycle.
799+
_consecutive_empty_turns += 1
800+
logger.warning(
801+
"[%s] iter=%d: empty response with missing outputs %s (consecutive=%d)",
802+
node_id,
803+
iteration,
804+
missing,
805+
_consecutive_empty_turns,
806+
)
807+
if _consecutive_empty_turns >= self._config.stall_detection_threshold:
808+
# Persistent ghost stream — fail the node.
809+
error_msg = (
810+
f"Ghost empty stream: {_consecutive_empty_turns} "
811+
f"consecutive empty responses with missing "
812+
f"outputs {missing}"
813+
)
814+
latency_ms = int((time.time() - start_time) * 1000)
815+
if ctx.runtime_logger:
816+
ctx.runtime_logger.log_node_complete(
817+
node_id=node_id,
818+
node_name=ctx.node_spec.name,
819+
node_type="event_loop",
820+
success=False,
821+
error=error_msg,
822+
total_steps=iteration + 1,
823+
tokens_used=total_input_tokens + total_output_tokens,
824+
input_tokens=total_input_tokens,
825+
output_tokens=total_output_tokens,
826+
latency_ms=latency_ms,
827+
exit_status="ghost_stream",
828+
accept_count=_accept_count,
829+
retry_count=_retry_count,
830+
escalate_count=_escalate_count,
831+
continue_count=_continue_count,
832+
)
833+
raise RuntimeError(error_msg)
834+
# First nudge — inject a system message to break the
835+
# empty-response cycle.
836+
await conversation.add_user_message(
837+
"[System: Your response was empty. You have required "
838+
f"outputs that are not yet set: {missing}. Review "
839+
"your task and call the appropriate tools to make "
840+
"progress.]"
841+
)
842+
continue
843+
else:
844+
_consecutive_empty_turns = 0
777845

778846
# 6f. Stall detection
779847
recent_responses.append(assistant_text)
@@ -2502,6 +2570,7 @@ def _is_transient_error(exc: BaseException) -> bool:
25022570
"service unavailable",
25032571
"bad gateway",
25042572
"overloaded",
2573+
"failed to parse tool call",
25052574
]
25062575
return any(kw in error_str for kw in transient_keywords)
25072576

core/framework/llm/litellm.py

Lines changed: 28 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,11 @@ def _is_stream_transient_error(exc: BaseException) -> bool:
237237
238238
Transient errors (recoverable=True): network issues, server errors, timeouts.
239239
Permanent errors (recoverable=False): auth, bad request, context window, etc.
240+
241+
NOTE: "Failed to parse tool call arguments" (malformed LLM output) is NOT
242+
transient at the stream level — retrying with the same messages produces the
243+
same malformed output. This error is handled at the EventLoopNode level
244+
where the conversation can be modified before retrying.
240245
"""
241246
try:
242247
from litellm.exceptions import (
@@ -917,30 +922,6 @@ async def stream(
917922
# and we skip the retry path — nothing was yielded in vain.)
918923
has_content = accumulated_text or tool_calls_acc
919924
if not has_content:
920-
# If the conversation ends with an assistant or tool
921-
# message, an empty stream is expected — the LLM has
922-
# nothing new to say. Don't burn retries on this;
923-
# let the caller (EventLoopNode) decide what to do.
924-
# Typical case: client_facing node where the LLM set
925-
# all outputs via set_output tool calls, and the tool
926-
# results are the last messages.
927-
last_role = next(
928-
(m["role"] for m in reversed(full_messages) if m.get("role") != "system"),
929-
None,
930-
)
931-
if last_role in ("assistant", "tool"):
932-
logger.warning(
933-
"[stream] %s returned empty stream after %s message "
934-
"(no text, no tool calls). Treating as a no-op turn. "
935-
"If this repeats, the agent may be stuck — check for "
936-
"ghost empty assistant messages in conversation history.",
937-
self.model,
938-
last_role,
939-
)
940-
for event in tail_events:
941-
yield event
942-
return
943-
944925
# finish_reason=length means the model exhausted
945926
# max_tokens before producing content. Retrying with
946927
# the same max_tokens will never help.
@@ -958,10 +939,16 @@ async def stream(
958939
yield event
959940
return
960941

961-
# Empty stream after a user message — use short fixed
962-
# retries, not the rate-limit backoff. This is likely
963-
# a deterministic conversation-structure issue, so long
964-
# exponential waits don't help.
942+
# Empty stream — always retry regardless of last message
943+
# role. Ghost empty streams after tool results are NOT
944+
# expected no-ops; they create infinite loops when the
945+
# conversation doesn't change between iterations.
946+
# After retries, return the empty result and let the
947+
# caller (EventLoopNode) decide how to handle it.
948+
last_role = next(
949+
(m["role"] for m in reversed(full_messages) if m.get("role") != "system"),
950+
None,
951+
)
965952
if attempt < EMPTY_STREAM_MAX_RETRIES:
966953
token_count, token_method = _estimate_tokens(
967954
self.model,
@@ -974,7 +961,8 @@ async def stream(
974961
attempt=attempt,
975962
)
976963
logger.warning(
977-
f"[stream-retry] {self.model} returned empty stream — "
964+
f"[stream-retry] {self.model} returned empty stream "
965+
f"after {last_role} message — "
978966
f"~{token_count} tokens ({token_method}). "
979967
f"Request dumped to: {dump_path}. "
980968
f"Retrying in {EMPTY_STREAM_RETRY_DELAY}s "
@@ -983,7 +971,17 @@ async def stream(
983971
await asyncio.sleep(EMPTY_STREAM_RETRY_DELAY)
984972
continue
985973

986-
# Success (or final attempt) — flush remaining events.
974+
# All retries exhausted — log and return the empty
975+
# result. EventLoopNode's empty response guard will
976+
# accept if all outputs are set, or handle the ghost
977+
# stream case if outputs are still missing.
978+
logger.error(
979+
f"[stream] {self.model} returned empty stream after "
980+
f"{EMPTY_STREAM_MAX_RETRIES} retries "
981+
f"(last_role={last_role}). Returning empty result."
982+
)
983+
984+
# Success (or empty after exhausted retries) — flush events.
987985
for event in tail_events:
988986
yield event
989987
return

core/framework/runner/runner.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1274,6 +1274,7 @@ def _setup_agent_runtime(
12741274
isolation_level=async_ep.isolation_level,
12751275
priority=async_ep.priority,
12761276
max_concurrent=async_ep.max_concurrent,
1277+
max_resurrections=async_ep.max_resurrections,
12771278
)
12781279
entry_points.append(ep)
12791280

core/framework/runtime/event_bus.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,9 @@ class EventType(StrEnum):
130130
WORKER_ESCALATION_TICKET = "worker_escalation_ticket"
131131
QUEEN_INTERVENTION_REQUESTED = "queen_intervention_requested"
132132

133+
# Execution resurrection (auto-restart on non-fatal failure)
134+
EXECUTION_RESURRECTED = "execution_resurrected"
135+
133136
# Worker lifecycle (session manager → frontend)
134137
WORKER_LOADED = "worker_loaded"
135138
CREDENTIALS_REQUIRED = "credentials_required"

0 commit comments

Comments (0)