Skip to content

Commit 495e2f7

Browse files
rohoswaggerclaude
andcommitted
fix(craft): use JSON-RPC response as sole turn completion signal
Stop breaking out of the send_message loop on `prompt_response` session/update notifications. Per the ACP spec (and Zed's impl), turn completion is determined by the JSON-RPC response to session/prompt — not by any notification. Breaking early on the notification left the actual JSON-RPC response orphaned in the queue, which caused stale message buildup and premature termination of follow-up prompts. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 189cc5b commit 495e2f7

File tree

1 file changed

+5
-78
lines changed

1 file changed

+5
-78
lines changed

backend/onyx/server/features/build/sandbox/kubernetes/internal/acp_exec_client.py

Lines changed: 5 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -630,37 +630,10 @@ def send_message(
630630
"prompt": prompt_content,
631631
}
632632

633-
# Drain stale messages left over from previous prompts.
634-
# These are typically usage_update or session_info_update notifications
635-
# that arrived after the previous prompt completed. Without draining,
636-
# a stale usage_update can be mistaken for the current prompt's end signal.
637-
drained = 0
638-
while not self._response_queue.empty():
639-
try:
640-
stale = self._response_queue.get_nowait()
641-
drained += 1
642-
update_type = (
643-
stale.get("params", {}).get("update", {}).get("sessionUpdate", "")
644-
)
645-
logger.info(
646-
f"[ACP] Drained stale message: "
647-
f"method={stale.get('method')} "
648-
f"id={stale.get('id')} "
649-
f"update={update_type} "
650-
f"keys={list(stale.keys())}"
651-
)
652-
except Empty:
653-
break
654-
if drained:
655-
logger.info(f"[ACP] Drained {drained} stale messages before prompt")
656-
657633
request_id = self._send_request("session/prompt", params)
658634
start_time = time.time()
659635
last_event_time = time.time()
660636
events_yielded = 0
661-
content_events = (
662-
0 # Track real content events (message chunks, tool calls, etc.)
663-
)
664637
keepalive_count = 0
665638
completion_reason = "unknown"
666639

@@ -772,59 +745,10 @@ def send_message(
772745
if message_data.get("method") == "session/update":
773746
params_data = message_data.get("params", {})
774747
update = params_data.get("update", {})
775-
update_type = update.get("sessionUpdate")
776-
777-
# usage_update as implicit end-of-turn: ACP sometimes sends
778-
# usage_update as the final packet without a prompt_response.
779-
# Only treat it as completion if we've already received real
780-
# content — otherwise it's likely a stale leftover.
781-
if update_type == "usage_update":
782-
if content_events > 0:
783-
completion_reason = "usage_update_after_content"
784-
elapsed_ms = (time.time() - start_time) * 1000
785-
logger.info(
786-
f"[ACP] Prompt complete: "
787-
f"reason={completion_reason} acp_session={session_id} "
788-
f"events={events_yielded} "
789-
f"content_events={content_events} "
790-
f"elapsed={elapsed_ms:.0f}ms"
791-
)
792-
yield PromptResponse(stopReason="end_turn")
793-
break
794-
else:
795-
logger.info(
796-
"[ACP] Ignoring usage_update — no content events yet"
797-
)
798-
continue
799748

800-
prompt_complete = False
801749
for event in self._process_session_update(update):
802750
events_yielded += 1
803-
# Count real content events
804-
if isinstance(
805-
event,
806-
(
807-
AgentMessageChunk,
808-
AgentThoughtChunk,
809-
ToolCallStart,
810-
ToolCallProgress,
811-
),
812-
):
813-
content_events += 1
814751
yield event
815-
if isinstance(event, PromptResponse):
816-
prompt_complete = True
817-
break
818-
819-
if prompt_complete:
820-
completion_reason = "prompt_response_via_notification"
821-
elapsed_ms = (time.time() - start_time) * 1000
822-
logger.info(
823-
f"[ACP] Prompt complete: "
824-
f"reason={completion_reason} acp_session={session_id} "
825-
f"events={events_yielded} elapsed={elapsed_ms:.0f}ms"
826-
)
827-
break
828752

829753
# Handle requests from agent - send error response
830754
elif "method" in message_data and "id" in message_data:
@@ -852,15 +776,17 @@ def _process_session_update(
852776
"""Process a session/update notification and yield typed ACP schema objects."""
853777
update_type = update.get("sessionUpdate")
854778

855-
# Map update types to their ACP schema classes
779+
# Map update types to their ACP schema classes.
780+
# Note: prompt_response is intentionally excluded here — turn completion
781+
# is determined by the JSON-RPC response to session/prompt, not by a
782+
# session/update notification. This matches the ACP spec and Zed's impl.
856783
type_map: dict[str, type] = {
857784
"agent_message_chunk": AgentMessageChunk,
858785
"agent_thought_chunk": AgentThoughtChunk,
859786
"tool_call": ToolCallStart,
860787
"tool_call_update": ToolCallProgress,
861788
"plan": AgentPlanUpdate,
862789
"current_mode_update": CurrentModeUpdate,
863-
"prompt_response": PromptResponse,
864790
}
865791

866792
model_class = type_map.get(update_type) # type: ignore[arg-type]
@@ -874,6 +800,7 @@ def _process_session_update(
874800
"available_commands_update",
875801
"session_info_update",
876802
"usage_update",
803+
"prompt_response",
877804
):
878805
logger.debug(f"[ACP] Unknown update type: {update_type}")
879806

0 commit comments

Comments
 (0)