@@ -218,20 +218,26 @@ def _read_responses(self) -> None:
218218 """Background thread to read responses from the exec stream."""
219219 buffer = ""
220220 packet_logger = get_packet_logger ()
221- # Stale cycle counter: when the buffer has unterminated content
222- # and no new data arrives, we try to parse after a few cycles.
221+ messages_enqueued = 0
223222 buffer_stale_cycles = 0
223+ empty_reads = 0
224+
225+ logger .info (f"[ACP] Reader thread started: pod={ self ._pod_name } " )
224226
225227 try :
226228 while not self ._stop_reader .is_set ():
227229 if self ._ws_client is None :
230+ logger .warning (
231+ f"[ACP] Reader: ws_client is None, exiting: "
232+ f"pod={ self ._pod_name } "
233+ )
228234 break
229235
230236 try :
231237 if self ._ws_client .is_open ():
232238 self ._ws_client .update (timeout = 0.1 )
233239
234- # Read stderr - log any agent errors
240+ # Read stderr
235241 stderr_data = self ._ws_client .read_stderr (timeout = 0.01 )
236242 if stderr_data :
237243 logger .warning (
@@ -244,51 +250,135 @@ def _read_responses(self) -> None:
244250 if data :
245251 buffer += data
246252 buffer_stale_cycles = 0
253+ empty_reads = 0
254+
255+ # Log raw data chunks for debugging
256+ newline_count = data .count ("\n " )
257+ logger .info (
258+ f"[ACP] Reader: received { len (data )} bytes, "
259+ f"{ newline_count } newlines, "
260+ f"buffer_total={ len (buffer )} b "
261+ f"pod={ self ._pod_name } "
262+ )
247263
248264 while "\n " in buffer :
249265 line , buffer = buffer .split ("\n " , 1 )
250266 line = line .strip ()
251267 if line :
252268 try :
253269 message = json .loads (line )
270+ messages_enqueued += 1
271+
272+ # Identify the message for logging
273+ msg_id = message .get ("id" )
274+ method = message .get ("method" )
275+ update_type = (
276+ message .get ("params" , {})
277+ .get ("update" , {})
278+ .get ("sessionUpdate" , "" )
279+ )
280+ has_result = "result" in message
281+ has_error = "error" in message
282+
283+ logger .info (
284+ f"[ACP] Reader enqueue #{ messages_enqueued } : "
285+ f"id={ msg_id } method={ method } "
286+ f"update={ update_type } "
287+ f"has_result={ has_result } "
288+ f"has_error={ has_error } "
289+ f"queue_size={ self ._response_queue .qsize ()} "
290+ f"pod={ self ._pod_name } "
291+ )
292+
254293 packet_logger .log_jsonrpc_raw_message (
255294 "IN" , message , context = "k8s"
256295 )
257296 self ._response_queue .put (message )
258297 except json .JSONDecodeError :
259298 logger .warning (
260299 f"[ACP] Invalid JSON from agent: "
261- f"{ line [:100 ]} "
300+ f"{ line [:200 ]} "
262301 )
263- elif buffer .strip ():
264- # No new data but buffer has unterminated content.
265- # After a few cycles (~0.5s), try to parse it —
266- # the agent may have omitted the trailing newline.
267- buffer_stale_cycles += 1
268- if buffer_stale_cycles >= 3 :
269- try :
270- message = json .loads (buffer .strip ())
271- packet_logger .log_jsonrpc_raw_message (
272- "IN" , message , context = "k8s-unterminated"
302+
303+ # Log if there's partial data left in the buffer
304+ if buffer .strip ():
305+ logger .info (
306+ f"[ACP] Reader: partial buffer after parse: "
307+ f"{ len (buffer )} b "
308+ f"preview={ buffer .strip ()[:200 ]} "
309+ f"pod={ self ._pod_name } "
310+ )
311+ else :
312+ empty_reads += 1
313+
314+ if buffer .strip ():
315+ buffer_stale_cycles += 1
316+ if buffer_stale_cycles == 1 :
317+ logger .info (
318+ f"[ACP] Reader: unterminated buffer: "
319+ f"{ len (buffer )} b "
320+ f"preview={ buffer .strip ()[:200 ]} "
321+ f"pod={ self ._pod_name } "
273322 )
274- self ._response_queue .put (message )
275- buffer = ""
276- buffer_stale_cycles = 0
277- except json .JSONDecodeError :
278- pass
323+ if buffer_stale_cycles >= 3 :
324+ try :
325+ message = json .loads (buffer .strip ())
326+ messages_enqueued += 1
327+ logger .info (
328+ f"[ACP] Reader: parsed unterminated "
329+ f"buffer as message "
330+ f"#{ messages_enqueued } : "
331+ f"id={ message .get ('id' )} "
332+ f"method={ message .get ('method' )} "
333+ f"pod={ self ._pod_name } "
334+ )
335+ packet_logger .log_jsonrpc_raw_message (
336+ "IN" ,
337+ message ,
338+ context = "k8s-unterminated" ,
339+ )
340+ self ._response_queue .put (message )
341+ buffer = ""
342+ buffer_stale_cycles = 0
343+ except json .JSONDecodeError :
344+ pass
345+
346+ # Log every ~3s of silence
347+ if empty_reads > 0 and empty_reads % 30 == 0 :
348+ logger .info (
349+ f"[ACP] Reader idle: "
350+ f"no data for { empty_reads } cycles "
351+ f"(~{ empty_reads * 0.1 :.0f} s) "
352+ f"ws_open={ self ._ws_client .is_open ()} "
353+ f"buffer={ len (buffer )} b "
354+ f"enqueued={ messages_enqueued } "
355+ f"queue_size={ self ._response_queue .qsize ()} "
356+ f"pod={ self ._pod_name } "
357+ )
279358
280359 else :
281- logger .warning (f"[ACP] WebSocket closed: pod={ self ._pod_name } " )
360+ logger .warning (
361+ f"[ACP] WebSocket closed: pod={ self ._pod_name } "
362+ f"enqueued={ messages_enqueued } "
363+ )
282364 break
283365
284366 except Exception as e :
285367 if not self ._stop_reader .is_set ():
286- logger .warning (f"[ACP] Reader error: { e } , pod={ self ._pod_name } " )
368+ logger .warning (
369+ f"[ACP] Reader error: { e } , "
370+ f"enqueued={ messages_enqueued } "
371+ f"pod={ self ._pod_name } "
372+ )
287373 break
288374 finally :
289- # Flush any remaining data in buffer
290375 remaining = buffer .strip ()
291376 if remaining :
377+ logger .info (
378+ f"[ACP] Reader: flushing buffer on exit: "
379+ f"{ len (remaining )} b preview={ remaining [:200 ]} "
380+ f"pod={ self ._pod_name } "
381+ )
292382 try :
293383 message = json .loads (remaining )
294384 packet_logger .log_jsonrpc_raw_message (
@@ -300,6 +390,12 @@ def _read_responses(self) -> None:
300390 f"[ACP] Buffer flush failed (not JSON): { remaining [:200 ]} "
301391 )
302392
393+ logger .info (
394+ f"[ACP] Reader thread exiting: pod={ self ._pod_name } "
395+ f"enqueued={ messages_enqueued } "
396+ f"queue_size={ self ._response_queue .qsize ()} "
397+ )
398+
303399 def stop (self ) -> None :
304400 """Stop the exec session and clean up."""
305401 session_ids = list (self ._state .sessions .keys ())
0 commit comments