Skip to content

Commit 189cc5b

Browse files
rohoswagger and claude committed
fix(craft): drain stale queue messages + usage_update end-of-turn guard
Drain leftover messages from the response queue before sending a new prompt, so that stale usage_update packets cannot prematurely terminate follow-up messages. Additionally, treat usage_update as an end-of-turn signal only when real content events have already been received.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 23ec386 commit 189cc5b

File tree

1 file changed

+70
-20
lines changed

1 file changed

+70
-20
lines changed

backend/onyx/server/features/build/sandbox/kubernetes/internal/acp_exec_client.py

Lines changed: 70 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -145,7 +145,6 @@ def __init__(
145145
self._reader_thread: threading.Thread | None = None
146146
self._stop_reader = threading.Event()
147147
self._k8s_client: client.CoreV1Api | None = None
148-
self._prompt_count: int = 0 # Track how many prompts sent on this client
149148

150149
def _get_k8s_client(self) -> client.CoreV1Api:
151150
"""Get or create kubernetes client."""
@@ -305,8 +304,7 @@ def stop(self) -> None:
305304
"""Stop the exec session and clean up."""
306305
session_ids = list(self._state.sessions.keys())
307306
logger.info(
308-
f"[ACP] Stopping client: pod={self._pod_name} "
309-
f"sessions={session_ids} prompts_sent={self._prompt_count}"
307+
f"[ACP] Stopping client: pod={self._pod_name} " f"sessions={session_ids}"
310308
)
311309
self._stop_reader.set()
312310

@@ -621,13 +619,9 @@ def send_message(
621619
f"Known sessions: {list(self._state.sessions.keys())}"
622620
)
623621
packet_logger = get_packet_logger()
624-
self._prompt_count += 1
625-
prompt_num = self._prompt_count
626622

627623
logger.info(
628-
f"[ACP] Prompt #{prompt_num} start: "
629-
f"acp_session={session_id} pod={self._pod_name} "
630-
f"queue_backlog={self._response_queue.qsize()}"
624+
f"[ACP] Sending prompt: " f"acp_session={session_id} pod={self._pod_name}"
631625
)
632626

633627
prompt_content = [{"type": "text", "text": message}]
@@ -636,10 +630,37 @@ def send_message(
636630
"prompt": prompt_content,
637631
}
638632

633+
# Drain stale messages left over from previous prompts.
634+
# These are typically usage_update or session_info_update notifications
635+
# that arrived after the previous prompt completed. Without draining,
636+
# a stale usage_update can be mistaken for the current prompt's end signal.
637+
drained = 0
638+
while not self._response_queue.empty():
639+
try:
640+
stale = self._response_queue.get_nowait()
641+
drained += 1
642+
update_type = (
643+
stale.get("params", {}).get("update", {}).get("sessionUpdate", "")
644+
)
645+
logger.info(
646+
f"[ACP] Drained stale message: "
647+
f"method={stale.get('method')} "
648+
f"id={stale.get('id')} "
649+
f"update={update_type} "
650+
f"keys={list(stale.keys())}"
651+
)
652+
except Empty:
653+
break
654+
if drained:
655+
logger.info(f"[ACP] Drained {drained} stale messages before prompt")
656+
639657
request_id = self._send_request("session/prompt", params)
640658
start_time = time.time()
641659
last_event_time = time.time()
642660
events_yielded = 0
661+
content_events = (
662+
0 # Track real content events (message chunks, tool calls, etc.)
663+
)
643664
keepalive_count = 0
644665
completion_reason = "unknown"
645666

@@ -648,7 +669,7 @@ def send_message(
648669
if remaining <= 0:
649670
completion_reason = "timeout"
650671
logger.warning(
651-
f"[ACP] Prompt #{prompt_num} timeout: "
672+
f"[ACP] Prompt timeout: "
652673
f"acp_session={session_id} events={events_yielded}, "
653674
f"sending session/cancel"
654675
)
@@ -695,7 +716,7 @@ def send_message(
695716
break
696717

697718
logger.warning(
698-
f"[ACP] Reader thread dead: prompt #{prompt_num} "
719+
f"[ACP] Reader thread dead: "
699720
f"acp_session={session_id} events={events_yielded}"
700721
)
701722
break
@@ -719,7 +740,7 @@ def send_message(
719740
if "error" in message_data:
720741
error_data = message_data["error"]
721742
completion_reason = "jsonrpc_error"
722-
logger.warning(f"[ACP] Prompt #{prompt_num} error: {error_data}")
743+
logger.warning(f"[ACP] Prompt error: {error_data}")
723744
packet_logger.log_jsonrpc_response(
724745
request_id, error=error_data, context="k8s"
725746
)
@@ -741,7 +762,7 @@ def send_message(
741762

742763
elapsed_ms = (time.time() - start_time) * 1000
743764
logger.info(
744-
f"[ACP] Prompt #{prompt_num} complete: "
765+
f"[ACP] Prompt complete: "
745766
f"reason={completion_reason} acp_session={session_id} "
746767
f"events={events_yielded} elapsed={elapsed_ms:.0f}ms"
747768
)
@@ -751,10 +772,45 @@ def send_message(
751772
if message_data.get("method") == "session/update":
752773
params_data = message_data.get("params", {})
753774
update = params_data.get("update", {})
775+
update_type = update.get("sessionUpdate")
776+
777+
# usage_update as implicit end-of-turn: ACP sometimes sends
778+
# usage_update as the final packet without a prompt_response.
779+
# Only treat it as completion if we've already received real
780+
# content — otherwise it's likely a stale leftover.
781+
if update_type == "usage_update":
782+
if content_events > 0:
783+
completion_reason = "usage_update_after_content"
784+
elapsed_ms = (time.time() - start_time) * 1000
785+
logger.info(
786+
f"[ACP] Prompt complete: "
787+
f"reason={completion_reason} acp_session={session_id} "
788+
f"events={events_yielded} "
789+
f"content_events={content_events} "
790+
f"elapsed={elapsed_ms:.0f}ms"
791+
)
792+
yield PromptResponse(stopReason="end_turn")
793+
break
794+
else:
795+
logger.info(
796+
"[ACP] Ignoring usage_update — no content events yet"
797+
)
798+
continue
754799

755800
prompt_complete = False
756801
for event in self._process_session_update(update):
757802
events_yielded += 1
803+
# Count real content events
804+
if isinstance(
805+
event,
806+
(
807+
AgentMessageChunk,
808+
AgentThoughtChunk,
809+
ToolCallStart,
810+
ToolCallProgress,
811+
),
812+
):
813+
content_events += 1
758814
yield event
759815
if isinstance(event, PromptResponse):
760816
prompt_complete = True
@@ -764,7 +820,7 @@ def send_message(
764820
completion_reason = "prompt_response_via_notification"
765821
elapsed_ms = (time.time() - start_time) * 1000
766822
logger.info(
767-
f"[ACP] Prompt #{prompt_num} complete: "
823+
f"[ACP] Prompt complete: "
768824
f"reason={completion_reason} acp_session={session_id} "
769825
f"events={events_yielded} elapsed={elapsed_ms:.0f}ms"
770826
)
@@ -813,17 +869,11 @@ def _process_session_update(
813869
yield model_class.model_validate(update)
814870
except ValidationError as e:
815871
logger.warning(f"[ACP] Validation error for {update_type}: {e}")
816-
elif update_type == "usage_update":
817-
# ACP frequently sends usage_update as the final packet without
818-
# a subsequent prompt_response or JSON-RPC response, causing the
819-
# send_message loop to hang until timeout. Treat usage_update as
820-
# an implicit end-of-turn signal by synthesizing a PromptResponse.
821-
logger.info("[ACP] usage_update received — treating as end-of-turn signal")
822-
yield PromptResponse(stopReason="end_turn")
823872
elif update_type not in (
824873
"user_message_chunk",
825874
"available_commands_update",
826875
"session_info_update",
876+
"usage_update",
827877
):
828878
logger.debug(f"[ACP] Unknown update type: {update_type}")
829879

0 commit comments

Comments
 (0)