-
Notifications
You must be signed in to change notification settings - Fork 5.2k
Expand file tree
/
Copy pathqueen_lifecycle_tools.py
More file actions
1661 lines (1451 loc) · 64 KB
/
queen_lifecycle_tools.py
File metadata and controls
1661 lines (1451 loc) · 64 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""Queen lifecycle tools for worker management.
These tools give the Queen agent control over the worker agent's lifecycle.
They close over a session-like object that provides ``worker_runtime``,
allowing late-binding access to the worker (which may be loaded/unloaded
dynamically).
Usage::
from framework.tools.queen_lifecycle_tools import register_queen_lifecycle_tools
# Server path — pass a Session object
register_queen_lifecycle_tools(
registry=queen_tool_registry,
session=session,
session_id=session.id,
)
# TUI path — wrap bare references in an adapter
from framework.tools.queen_lifecycle_tools import WorkerSessionAdapter
adapter = WorkerSessionAdapter(
worker_runtime=runtime,
event_bus=event_bus,
worker_path=storage_path,
)
register_queen_lifecycle_tools(
registry=queen_tool_registry,
session=adapter,
session_id=session_id,
)
"""
from __future__ import annotations
import asyncio
import json
import logging
from dataclasses import dataclass, field
from datetime import UTC
from pathlib import Path
from typing import TYPE_CHECKING, Any
from framework.credentials.models import CredentialError
from framework.runner.preload_validation import credential_errors_to_json, validate_credentials
from framework.runtime.event_bus import AgentEvent, EventType
from framework.server.app import validate_agent_path
if TYPE_CHECKING:
from framework.runner.tool_registry import ToolRegistry
from framework.runtime.agent_runtime import AgentRuntime
from framework.runtime.event_bus import EventBus
logger = logging.getLogger(__name__)
@dataclass
class WorkerSessionAdapter:
    """Adapter for TUI compatibility.

    Wraps bare worker_runtime + event_bus + storage_path into a
    session-like object that queen lifecycle tools can use.

    Exposes the same attribute surface the lifecycle tools read off a
    server Session: ``worker_runtime``, ``event_bus`` and ``worker_path``.
    """

    worker_runtime: Any  # AgentRuntime — the worker being managed
    event_bus: Any  # EventBus — for history queries and event publication
    worker_path: Path | None = None  # on-disk agent path, if known
@dataclass
class QueenPhaseState:
    """Mutable state container for queen operating phase.

    Three phases: building → staging → running.
    Shared between the dynamic_tools_provider callback and tool handlers
    that trigger phase transitions.
    """

    phase: str = "building"  # one of "building", "staging", "running"
    building_tools: list = field(default_factory=list)  # list[Tool]
    staging_tools: list = field(default_factory=list)  # list[Tool]
    running_tools: list = field(default_factory=list)  # list[Tool]
    inject_notification: Any = None  # async (str) -> None
    event_bus: Any = None  # EventBus — for emitting QUEEN_PHASE_CHANGED events
    # Phase-specific prompts (set by session_manager after construction)
    prompt_building: str = ""
    prompt_staging: str = ""
    prompt_running: str = ""

    def get_current_tools(self) -> list:
        """Return a copy of the tool list for the current phase."""
        per_phase = {
            "running": self.running_tools,
            "staging": self.staging_tools,
        }
        # Unknown/"building" phase falls back to the building tool set.
        return list(per_phase.get(self.phase, self.building_tools))

    def get_current_prompt(self) -> str:
        """Return the system prompt for the current phase."""
        per_phase = {
            "running": self.prompt_running,
            "staging": self.prompt_staging,
        }
        return per_phase.get(self.phase, self.prompt_building)

    async def _emit_phase_event(self) -> None:
        """Publish a QUEEN_PHASE_CHANGED event so the frontend updates the tag."""
        if self.event_bus is None:
            return
        event = AgentEvent(
            type=EventType.QUEEN_PHASE_CHANGED,
            stream_id="queen",
            data={"phase": self.phase},
        )
        await self.event_bus.publish(event)

    async def switch_to_running(self, source: str = "tool") -> None:
        """Switch to running phase and notify the queen.

        Args:
            source: Who triggered the switch — "tool" (queen LLM),
                "frontend" (user clicked Run), or "auto" (system).
        """
        if self.phase == "running":
            return  # already there — no-op
        self.phase = "running"
        names = [tool.name for tool in self.running_tools]
        logger.info("Queen phase → running (source=%s, tools: %s)", source, names)
        await self._emit_phase_event()
        if not self.inject_notification:
            return
        joined = ", ".join(names)
        if source == "frontend":
            notice = (
                "[PHASE CHANGE] The user clicked Run in the UI. Switched to RUNNING phase. "
                f"Worker is now executing. You have monitoring/lifecycle tools: {joined}."
            )
        else:
            notice = (
                "[PHASE CHANGE] Switched to RUNNING phase. "
                f"Worker is executing. You now have monitoring/lifecycle tools: {joined}."
            )
        await self.inject_notification(notice)

    async def switch_to_staging(self, source: str = "tool") -> None:
        """Switch to staging phase and notify the queen.

        Args:
            source: Who triggered the switch — "tool", "frontend", or "auto".
        """
        if self.phase == "staging":
            return  # already there — no-op
        self.phase = "staging"
        names = [tool.name for tool in self.staging_tools]
        logger.info("Queen phase → staging (source=%s, tools: %s)", source, names)
        await self._emit_phase_event()
        if not self.inject_notification:
            return
        joined = ", ".join(names)
        if source == "frontend":
            notice = (
                "[PHASE CHANGE] The user stopped the worker from the UI. "
                "Switched to STAGING phase. Agent is still loaded. "
                f"Available tools: {joined}."
            )
        elif source == "auto":
            notice = (
                "[PHASE CHANGE] Worker execution completed. Switched to STAGING phase. "
                "Agent is still loaded. Call run_agent_with_input(task) to run again. "
                f"Available tools: {joined}."
            )
        else:
            notice = (
                "[PHASE CHANGE] Switched to STAGING phase. "
                "Agent loaded and ready. Call run_agent_with_input(task) to start, "
                "or stop_worker_and_edit() to go back to building. "
                f"Available tools: {joined}."
            )
        await self.inject_notification(notice)

    async def switch_to_building(self, source: str = "tool") -> None:
        """Switch to building phase and notify the queen.

        Args:
            source: Who triggered the switch — "tool", "frontend", or "auto".
        """
        if self.phase == "building":
            return  # already there — no-op
        self.phase = "building"
        names = [tool.name for tool in self.building_tools]
        logger.info("Queen phase → building (source=%s, tools: %s)", source, names)
        await self._emit_phase_event()
        if self.inject_notification:
            await self.inject_notification(
                "[PHASE CHANGE] Switched to BUILDING phase. "
                "Lifecycle tools removed. Full coding tools restored. "
                "Call load_built_agent(path) when ready to stage."
            )
def build_worker_profile(runtime: AgentRuntime, agent_path: Path | str | None = None) -> str:
    """Build a worker capability profile from its graph/goal definition.

    Injected into the queen's system prompt so it knows what the worker
    can and cannot do — enabling correct delegation decisions.

    Args:
        runtime: The worker's AgentRuntime (provides graph, goal, graph_id).
        agent_path: Optional on-disk path to include in the profile.

    Returns:
        A markdown-ish profile string, starting with a "# Worker Profile" header.
    """
    graph = runtime.graph
    goal = runtime.goal

    out: list[str] = ["\n\n# Worker Profile", f"Agent: {runtime.graph_id}"]
    if agent_path:
        out.append(f"Path: {agent_path}")
    out.append(f"Goal: {goal.name}")
    if goal.description:
        out.append(f"Description: {goal.description}")

    if goal.success_criteria:
        out.append("\n## Success Criteria")
        out.extend(f"- {sc.description}" for sc in goal.success_criteria)
    if goal.constraints:
        out.append("\n## Constraints")
        out.extend(f"- {c.description}" for c in goal.constraints)
    if graph.nodes:
        out.append("\n## Processing Stages")
        out.extend(f"- {node.id}: {node.description or node.name}" for node in graph.nodes)

    # Union of every node's tool list → the worker's full tool surface.
    tool_names: set[str] = set()
    for node in graph.nodes:
        if node.tools:
            tool_names.update(node.tools)
    if tool_names:
        out.append(f"\n## Worker Tools\n{', '.join(sorted(tool_names))}")

    out.append("\nStatus at session start: idle (not started).")
    return "\n".join(out)
def register_queen_lifecycle_tools(
registry: ToolRegistry,
session: Any = None,
session_id: str | None = None,
# Legacy params — used by TUI when not passing a session object
worker_runtime: AgentRuntime | None = None,
event_bus: EventBus | None = None,
storage_path: Path | None = None,
# Server context — enables load_built_agent tool
session_manager: Any = None,
manager_session_id: str | None = None,
# Mode switching
phase_state: QueenPhaseState | None = None,
) -> int:
"""Register queen lifecycle tools.
Args:
session: A Session or WorkerSessionAdapter with ``worker_runtime``
attribute. The tools read ``session.worker_runtime`` on each
call, supporting late-binding (worker loaded/unloaded).
session_id: Shared session ID so the worker uses the same session
scope as the queen and judge.
worker_runtime: (Legacy) Direct runtime reference. If ``session``
is not provided, a WorkerSessionAdapter is created from
worker_runtime + event_bus + storage_path.
session_manager: (Server only) The SessionManager instance, needed
for ``load_built_agent`` to hot-load a worker.
manager_session_id: (Server only) The session's ID in the manager,
used with ``session_manager.load_worker()``.
phase_state: (Optional) Mutable phase state for building/running
phase switching. When provided, load_built_agent switches to
running phase and stop_worker_and_edit switches to building phase.
Returns the number of tools registered.
"""
# Build session adapter from legacy params if needed
if session is None:
if worker_runtime is None:
raise ValueError("Either session or worker_runtime must be provided")
session = WorkerSessionAdapter(
worker_runtime=worker_runtime,
event_bus=event_bus,
worker_path=storage_path,
)
from framework.llm.provider import Tool
tools_registered = 0
def _get_runtime():
"""Get current worker runtime from session (late-binding)."""
return getattr(session, "worker_runtime", None)
# --- start_worker ---------------------------------------------------------
# How long to wait for credential validation + MCP resync before
# proceeding with trigger anyway. These are pre-flight checks that
# should not block the queen indefinitely.
_START_PREFLIGHT_TIMEOUT = 15 # seconds
async def start_worker(task: str) -> str:
    """Start the worker agent with a task description.

    Triggers the worker's default entry point with the given task.
    Returns immediately — the worker runs asynchronously.

    Returns:
        JSON string: ``{status, execution_id, task}`` on success; an
        ``{"error": ...}`` payload on failure (credential failures carry
        per-credential detail plus the agent path).
    """
    runtime = _get_runtime()
    if runtime is None:
        return json.dumps({"error": "No worker loaded in this session."})
    try:
        # Pre-flight: validate credentials and resync MCP servers.
        # Both are blocking I/O (HTTP health-checks, subprocess spawns)
        # so they run in a thread-pool executor. We cap the total
        # preflight time so the queen never hangs waiting.
        loop = asyncio.get_running_loop()

        async def _preflight():
            # Validate credentials first but defer raising, so the MCP
            # resync below still runs even when credentials are missing.
            cred_error: CredentialError | None = None
            try:
                await loop.run_in_executor(
                    None,
                    lambda: validate_credentials(
                        runtime.graph.nodes,
                        interactive=False,
                        skip=False,
                    ),
                )
            except CredentialError as e:
                cred_error = e
            runner = getattr(session, "runner", None)
            if runner:
                try:
                    await loop.run_in_executor(
                        None,
                        lambda: runner._tool_registry.resync_mcp_servers_if_needed(),
                    )
                except Exception as e:
                    # Best-effort: a resync failure is logged, not fatal.
                    logger.warning("MCP resync failed: %s", e)
            # Re-raise CredentialError after MCP resync so both steps
            # get a chance to run before we bail.
            if cred_error is not None:
                raise cred_error

        try:
            await asyncio.wait_for(_preflight(), timeout=_START_PREFLIGHT_TIMEOUT)
        except TimeoutError:
            # Preflight is advisory — never block the trigger on it.
            logger.warning(
                "start_worker preflight timed out after %ds — proceeding with trigger",
                _START_PREFLIGHT_TIMEOUT,
            )
        except CredentialError:
            raise  # handled below

        # Resume timers in case they were paused by a previous stop_worker
        runtime.resume_timers()
        # Get session state from any prior execution for memory continuity
        session_state = runtime._get_primary_session_state("default") or {}
        # Use the shared session ID so queen, judge, and worker all
        # scope their conversations to the same session.
        if session_id:
            session_state["resume_session_id"] = session_id
        exec_id = await runtime.trigger(
            entry_point_id="default",
            input_data={"user_request": task},
            session_state=session_state,
        )
        return json.dumps(
            {
                "status": "started",
                "execution_id": exec_id,
                "task": task,
            }
        )
    except CredentialError as e:
        # Build structured error with per-credential details so the
        # queen can report exactly what's missing and how to fix it.
        error_payload = credential_errors_to_json(e)
        error_payload["agent_path"] = str(getattr(session, "worker_path", "") or "")
        # Emit SSE event so the frontend opens the credentials modal
        bus = getattr(session, "event_bus", None)
        if bus is not None:
            await bus.publish(
                AgentEvent(
                    type=EventType.CREDENTIALS_REQUIRED,
                    stream_id="queen",
                    data=error_payload,
                )
            )
        return json.dumps(error_payload)
    except Exception as e:
        return json.dumps({"error": f"Failed to start worker: {e}"})
_start_tool = Tool(
name="start_worker",
description=(
"Start the worker agent with a task description. The worker runs "
"autonomously in the background. Returns an execution ID for tracking."
),
parameters={
"type": "object",
"properties": {
"task": {
"type": "string",
"description": "Description of the task for the worker to perform",
},
},
"required": ["task"],
},
)
registry.register("start_worker", _start_tool, lambda inputs: start_worker(**inputs))
tools_registered += 1
# --- stop_worker ----------------------------------------------------------
async def stop_worker() -> str:
    """Cancel all active worker executions across all graphs.

    Stops the worker immediately. Returns the IDs of cancelled executions.

    Returns:
        JSON string with ``status`` ("stopped" or "no_active_executions"),
        the list of ``cancelled`` execution IDs, and ``timers_paused``
        (always true — timers are paused even when nothing was running).
    """
    runtime = _get_runtime()
    if runtime is None:
        return json.dumps({"error": "No worker loaded in this session."})
    cancelled = []
    # Iterate ALL registered graphs — multiple entrypoint requests
    # can spawn executions in different graphs within the same session.
    for graph_id in runtime.list_graphs():
        reg = runtime.get_graph_registration(graph_id)
        if reg is None:
            continue
        for _ep_id, stream in reg.streams.items():
            # Signal shutdown on all active EventLoopNodes first so they
            # exit cleanly and cancel their in-flight LLM streams.
            for executor in stream._active_executors.values():
                for node in executor.node_registry.values():
                    if hasattr(node, "signal_shutdown"):
                        node.signal_shutdown()
                    if hasattr(node, "cancel_current_turn"):
                        node.cancel_current_turn()
            # Snapshot the IDs — cancel_execution mutates the active set.
            for exec_id in list(stream.active_execution_ids):
                try:
                    ok = await stream.cancel_execution(exec_id)
                    if ok:
                        cancelled.append(exec_id)
                except Exception as e:
                    # Best-effort: keep cancelling the rest even if one fails.
                    logger.warning("Failed to cancel %s: %s", exec_id, e)
    # Pause timers so the next tick doesn't restart execution
    runtime.pause_timers()
    return json.dumps(
        {
            "status": "stopped" if cancelled else "no_active_executions",
            "cancelled": cancelled,
            "timers_paused": True,
        }
    )
_stop_tool = Tool(
name="stop_worker",
description=(
"Cancel the worker agent's active execution and pause its timers. "
"The worker stops gracefully. No parameters needed."
),
parameters={"type": "object", "properties": {}},
)
registry.register("stop_worker", _stop_tool, lambda inputs: stop_worker())
tools_registered += 1
# --- stop_worker_and_edit -------------------------------------------------
async def stop_worker_and_edit() -> str:
    """Stop the worker and switch to building phase for editing the agent.

    Returns the stop_worker() JSON payload augmented with ``phase`` and a
    guidance ``message`` for the queen.
    """
    payload = json.loads(await stop_worker())
    # Switch to building phase
    if phase_state is not None:
        await phase_state.switch_to_building()
    payload["phase"] = "building"
    payload["message"] = (
        "Worker stopped. You are now in building phase. "
        "Use your coding tools to modify the agent, then call "
        "load_built_agent(path) to stage it again."
    )
    return json.dumps(payload)
_stop_edit_tool = Tool(
name="stop_worker_and_edit",
description=(
"Stop the running worker and switch to building phase. "
"Use this when you need to modify the agent's code, nodes, or configuration. "
"After editing, call load_built_agent(path) to reload and run."
),
parameters={"type": "object", "properties": {}},
)
registry.register(
"stop_worker_and_edit", _stop_edit_tool, lambda inputs: stop_worker_and_edit()
)
tools_registered += 1
# --- stop_worker (Running → Staging) -------------------------------------
async def stop_worker_to_staging() -> str:
    """Stop the running worker and switch to staging phase.

    After stopping, ask the user whether they want to:
      1. Re-run the agent with new input → call run_agent_with_input(task)
      2. Edit the agent code → call stop_worker_and_edit() to go to building phase

    Returns the stop_worker() JSON payload augmented with ``phase`` and a
    guidance ``message`` for the queen.
    """
    payload = json.loads(await stop_worker())
    # Switch to staging phase
    if phase_state is not None:
        await phase_state.switch_to_staging()
    payload["phase"] = "staging"
    payload["message"] = (
        "Worker stopped. You are now in staging phase. "
        "Ask the user: would they like to re-run with new input, "
        "or edit the agent code?"
    )
    return json.dumps(payload)
_stop_worker_tool = Tool(
name="stop_worker",
description=(
"Stop the running worker and switch to staging phase. "
"After stopping, ask the user whether they want to re-run "
"with new input or edit the agent code."
),
parameters={"type": "object", "properties": {}},
)
registry.register("stop_worker", _stop_worker_tool, lambda inputs: stop_worker_to_staging())
tools_registered += 1
# --- get_worker_status ----------------------------------------------------
def _get_event_bus():
    """Get the session's event bus for querying history.

    Returns None when the session object exposes no ``event_bus``.
    """
    return getattr(session, "event_bus", None)
# Tiered cooldowns: summary is free, detail has short cooldown, full keeps 30s
_COOLDOWN_FULL = 30.0
_COOLDOWN_DETAIL = 10.0
_status_last_called: dict[str, float] = {} # tier -> monotonic time
def _format_elapsed(seconds: float) -> str:
"""Format seconds as human-readable duration."""
s = int(seconds)
if s < 60:
return f"{s}s"
m, rem = divmod(s, 60)
if m < 60:
return f"{m}m {rem}s"
h, m = divmod(m, 60)
return f"{h}h {m}m"
def _format_time_ago(ts) -> str:
"""Format a datetime as relative time ago."""
from datetime import datetime
now = datetime.now(UTC)
if ts.tzinfo is None:
ts = ts.replace(tzinfo=UTC)
delta = (now - ts).total_seconds()
if delta < 60:
return f"{int(delta)}s ago"
if delta < 3600:
return f"{int(delta / 60)}m ago"
return f"{int(delta / 3600)}h ago"
def _preview_value(value: Any, max_len: int = 120) -> str:
"""Format a memory value for display, truncating if needed."""
if value is None:
return "null (not yet set)"
if isinstance(value, list):
preview = str(value)[:max_len]
return f"[{len(value)} items] {preview}"
if isinstance(value, dict):
preview = str(value)[:max_len]
return f"{{{len(value)} keys}} {preview}"
s = str(value)
if len(s) > max_len:
return s[:max_len] + "..."
return s
def _build_preamble(
    runtime: AgentRuntime,
) -> dict[str, Any]:
    """Build the lightweight preamble: status, node, elapsed, iteration.

    Always cheap to compute. Returns a dict with:
    - status: idle / running / waiting_for_input
    - current_node, current_iteration, elapsed_seconds (when applicable)
    - pending_question (when waiting)
    - _active_execs (internal, stripped before return)
    """
    from datetime import datetime

    graph_id = runtime.graph_id
    reg = runtime.get_graph_registration(graph_id)
    if reg is None:
        # Runtime exists but the graph isn't registered yet.
        return {"status": "not_loaded"}
    preamble: dict[str, Any] = {}
    # Execution state
    active_execs = []
    for ep_id, stream in reg.streams.items():
        for exec_id in stream.active_execution_ids:
            exec_info: dict[str, Any] = {
                "execution_id": exec_id,
                "entry_point": ep_id,
            }
            ctx = stream.get_context(exec_id)
            if ctx:
                # NOTE(review): naive datetime.now() is compared against
                # ctx.started_at — assumes both are the same wall clock; confirm.
                elapsed = (datetime.now() - ctx.started_at).total_seconds()
                exec_info["elapsed_seconds"] = round(elapsed, 1)
            active_execs.append(exec_info)
    preamble["_active_execs"] = active_execs
    if not active_execs:
        preamble["status"] = "idle"
    else:
        # Any node waiting on client input flips status to waiting_for_input.
        waiting_nodes = []
        for _ep_id, stream in reg.streams.items():
            waiting_nodes.extend(stream.get_waiting_nodes())
        preamble["status"] = "waiting_for_input" if waiting_nodes else "running"
        if active_execs:
            # Surface the first execution's elapsed time at the top level.
            preamble["elapsed_seconds"] = active_execs[0].get("elapsed_seconds", 0)
    # Enrich with EventBus basics (cheap limit=1 queries)
    bus = _get_event_bus()
    if bus:
        if preamble["status"] == "waiting_for_input":
            input_events = bus.get_history(event_type=EventType.CLIENT_INPUT_REQUESTED, limit=1)
            if input_events:
                prompt = input_events[0].data.get("prompt", "")
                if prompt:
                    # Truncated: pending questions can be arbitrarily long.
                    preamble["pending_question"] = prompt[:200]
        edge_events = bus.get_history(event_type=EventType.EDGE_TRAVERSED, limit=1)
        if edge_events:
            # The last traversed edge's target is the current node.
            target = edge_events[0].data.get("target_node")
            if target:
                preamble["current_node"] = target
        iter_events = bus.get_history(event_type=EventType.NODE_LOOP_ITERATION, limit=1)
        if iter_events:
            preamble["current_iteration"] = iter_events[0].data.get("iteration")
    return preamble
def _detect_red_flags(bus: EventBus) -> int:
    """Count how many issue categories have at least one event.

    Uses cheap limit=1 history queries; the result is a category count
    (0-3), not a total event count.
    """
    categories = (
        EventType.NODE_STALLED,
        EventType.NODE_TOOL_DOOM_LOOP,
        EventType.CONSTRAINT_VIOLATION,
    )
    return sum(
        1 for evt_type in categories if bus.get_history(event_type=evt_type, limit=1)
    )
def _format_summary(preamble: dict[str, Any], red_flags: int) -> str:
    """Generate a 1-2 sentence prose summary from the preamble."""
    status = preamble["status"]
    if status == "idle":
        return "Worker is idle. No active executions."
    if status == "not_loaded":
        return "No worker loaded."
    if status == "waiting_for_input":
        question = preamble.get("pending_question", "")
        if question:
            return f'Worker is waiting for input: "{question}"'
        return "Worker is waiting for input."
    # Running: stitch elapsed time, current location, and issue count together.
    sentences = []
    seconds = preamble.get("elapsed_seconds", 0)
    sentences.append(f"Worker is running ({_format_elapsed(seconds)})")
    node = preamble.get("current_node")
    if node:
        location = f"Currently in {node}"
        iteration = preamble.get("current_iteration")
        if iteration is not None:
            location += f", iteration {iteration}"
        sentences.append(location)
    if red_flags:
        sentences.append(f"{red_flags} issue type(s) detected — use focus='issues' for details")
    else:
        sentences.append("No issues detected")
    return ". ".join(sentences) + "."
def _format_activity(bus: EventBus, preamble: dict[str, Any], last_n: int) -> str:
    """Format current activity: node, iteration, transitions, LLM output.

    Args:
        bus: Event bus to query for history.
        preamble: Output of _build_preamble (current node/iteration/elapsed).
        last_n: How many recent edge traversals to include.
    """
    lines = []
    node = preamble.get("current_node", "unknown")
    iteration = preamble.get("current_iteration")
    elapsed = preamble.get("elapsed_seconds", 0)
    node_desc = f"Current node: {node}"
    if iteration is not None:
        node_desc += f" (iteration {iteration}, {_format_elapsed(elapsed)} elapsed)"
    else:
        node_desc += f" ({_format_elapsed(elapsed)} elapsed)"
    lines.append(node_desc)
    # Latest LLM output snippet
    text_events = bus.get_history(event_type=EventType.LLM_TEXT_DELTA, limit=1)
    if text_events:
        snapshot = text_events[0].data.get("snapshot", "") or ""
        # Tail of the rolling snapshot: the most recent ~300 characters.
        snippet = snapshot[-300:].strip()
        if snippet:
            # Show last meaningful chunk
            lines.append(f'Last LLM output: "{snippet}"')
    # Recent node transitions
    edges = bus.get_history(event_type=EventType.EDGE_TRAVERSED, limit=last_n)
    if edges:
        lines.append("")
        lines.append("Recent transitions:")
        for evt in edges:
            src = evt.data.get("source_node", "?")
            tgt = evt.data.get("target_node", "?")
            cond = evt.data.get("edge_condition", "")
            ago = _format_time_ago(evt.timestamp)
            lines.append(f" {src} -> {tgt} ({cond}, {ago})")
    return "\n".join(lines)
async def _format_memory(runtime: AgentRuntime) -> str:
    """Format the worker's shared memory snapshot and recent changes.

    Reads SHARED-isolation memory from the first active execution and
    appends up to 5 recent state changes (most recent first).
    """
    from framework.runtime.shared_state import IsolationLevel

    lines = []
    active_streams = runtime.get_active_streams()
    if not active_streams:
        return "Worker has no active executions. No memory to inspect."
    # Read memory from the first active execution
    stream_info = active_streams[0]
    exec_ids = stream_info.get("active_execution_ids", [])
    stream_id = stream_info.get("stream_id", "")
    if not exec_ids:
        return "No active execution found."
    exec_id = exec_ids[0]
    memory = runtime.state_manager.create_memory(exec_id, stream_id, IsolationLevel.SHARED)
    state = await memory.read_all()
    if not state:
        lines.append("Worker's shared memory is empty.")
    else:
        lines.append(f"Worker's shared memory ({len(state)} keys):")
        for key, value in state.items():
            lines.append(f" {key}: {_preview_value(value)}")
    # Recent state changes
    changes = runtime.state_manager.get_recent_changes(limit=5)
    if changes:
        lines.append("")
        lines.append(f"Recent changes (last {len(changes)}):")
        for change in reversed(changes):  # most recent first
            from datetime import datetime

            # change.timestamp is a unix epoch; render as relative time.
            ago = _format_time_ago(datetime.fromtimestamp(change.timestamp, tz=UTC))
            if change.old_value is None:
                # No prior value — the key was newly set.
                lines.append(f" {change.key} set ({ago})")
            else:
                old_preview = _preview_value(change.old_value, 40)
                new_preview = _preview_value(change.new_value, 40)
                lines.append(f" {change.key}: {old_preview} -> {new_preview} ({ago})")
    return "\n".join(lines)
def _format_tools(bus: EventBus, last_n: int) -> str:
    """Format running and recent tool calls.

    "Running" is inferred by pairing TOOL_CALL_STARTED with
    TOOL_CALL_COMPLETED events via tool_use_id: started-but-not-completed
    means still in flight.
    """
    lines = []
    # Running tools (started but not yet completed)
    tool_started = bus.get_history(event_type=EventType.TOOL_CALL_STARTED, limit=last_n * 2)
    tool_completed = bus.get_history(event_type=EventType.TOOL_CALL_COMPLETED, limit=last_n * 2)
    completed_ids = {
        evt.data.get("tool_use_id") for evt in tool_completed if evt.data.get("tool_use_id")
    }
    running = [
        evt
        for evt in tool_started
        if evt.data.get("tool_use_id") and evt.data.get("tool_use_id") not in completed_ids
    ]
    if running:
        names = [evt.data.get("tool_name", "?") for evt in running]
        lines.append(f"{len(running)} tool(s) running: {', '.join(names)}.")
        for evt in running:
            name = evt.data.get("tool_name", "?")
            node = evt.node_id or "?"
            ago = _format_time_ago(evt.timestamp)
            # Clip the input preview — tool inputs can be large.
            inp = str(evt.data.get("tool_input", ""))[:150]
            lines.append(f" {name} ({node}, started {ago})")
            if inp:
                lines.append(f" Input: {inp}")
    else:
        lines.append("No tools currently running.")
    # Recent completed calls
    if tool_completed:
        lines.append("")
        lines.append(f"Recent calls (last {min(last_n, len(tool_completed))}):")
        for evt in tool_completed[:last_n]:
            name = evt.data.get("tool_name", "?")
            node = evt.node_id or "?"
            is_error = bool(evt.data.get("is_error"))
            status = "error" if is_error else "ok"
            duration = evt.data.get("duration_s")
            # NOTE(review): a 0.0 duration renders with no time suffix — confirm intended.
            dur_str = f", {duration:.1f}s" if duration else ""
            lines.append(f" {name} ({node}) — {status}{dur_str}")
    else:
        lines.append("No recent tool calls.")
    return "\n".join(lines)
def _format_issues(bus: EventBus) -> str:
    """Format retries, stalls, doom loops, and constraint violations.

    Queries four issue event types from the bus history; returns a
    summary header plus per-event detail lines, or an all-clear message.
    """
    lines = []
    total = 0
    # Retries
    retries = bus.get_history(event_type=EventType.NODE_RETRY, limit=20)
    if retries:
        total += len(retries)
        lines.append(f"{len(retries)} retry event(s):")
        # Detail only the first 5, though all 20 count toward the total.
        for evt in retries[:5]:
            node = evt.node_id or "?"
            count = evt.data.get("retry_count", "?")
            error = evt.data.get("error", "")[:120]
            ago = _format_time_ago(evt.timestamp)
            lines.append(f" {node} (attempt {count}, {ago}): {error}")
    # Stalls
    stalls = bus.get_history(event_type=EventType.NODE_STALLED, limit=5)
    if stalls:
        total += len(stalls)
        lines.append(f"{len(stalls)} stall(s):")
        for evt in stalls:
            node = evt.node_id or "?"
            reason = evt.data.get("reason", "")[:150]
            ago = _format_time_ago(evt.timestamp)
            lines.append(f" {node} ({ago}): {reason}")
    # Doom loops
    doom_loops = bus.get_history(event_type=EventType.NODE_TOOL_DOOM_LOOP, limit=5)
    if doom_loops:
        total += len(doom_loops)
        lines.append(f"{len(doom_loops)} tool doom loop(s):")
        for evt in doom_loops:
            node = evt.node_id or "?"
            desc = evt.data.get("description", "")[:150]
            ago = _format_time_ago(evt.timestamp)
            lines.append(f" {node} ({ago}): {desc}")
    # Constraint violations
    violations = bus.get_history(event_type=EventType.CONSTRAINT_VIOLATION, limit=5)
    if violations:
        total += len(violations)
        lines.append(f"{len(violations)} constraint violation(s):")
        for evt in violations:
            cid = evt.data.get("constraint_id", "?")
            desc = evt.data.get("description", "")[:150]
            ago = _format_time_ago(evt.timestamp)
            lines.append(f" {cid} ({ago}): {desc}")
    if total == 0:
        return "No issues detected. No retries, stalls, or constraint violations."
    header = f"{total} issue(s) detected."
    return header + "\n\n" + "\n".join(lines)
async def _format_progress(runtime: AgentRuntime, bus: EventBus) -> str:
    """Format goal progress, token consumption, and execution outcomes."""
    lines = []
    # Goal progress
    try:
        progress = await runtime.get_goal_progress()
        if progress:
            criteria = progress.get("criteria_status", {})
            if criteria:
                met = sum(1 for c in criteria.values() if c.get("met"))
                total_c = len(criteria)
                lines.append(f"Goal: {met}/{total_c} criteria met.")
                for cid, cdata in criteria.items():
                    marker = "met" if cdata.get("met") else "not met"
                    desc = cdata.get("description", cid)
                    evidence = cdata.get("evidence", [])
                    # Show only the first evidence item per criterion.
                    ev_str = f" — {evidence[0]}" if evidence else ""
                    lines.append(f" [{marker}] {desc}{ev_str}")
            rec = progress.get("recommendation")
            if rec:
                lines.append(f"Recommendation: {rec}.")
    except Exception:
        # Progress is best-effort; any failure degrades to a placeholder line.
        lines.append("Goal progress unavailable.")
    # Token summary
    llm_events = bus.get_history(event_type=EventType.LLM_TURN_COMPLETE, limit=200)
    if llm_events:
        # `or 0` guards against explicit None token counts in event data.
        total_in = sum(evt.data.get("input_tokens", 0) or 0 for evt in llm_events)
        total_out = sum(evt.data.get("output_tokens", 0) or 0 for evt in llm_events)
        total_tok = total_in + total_out
        lines.append("")
        lines.append(
            f"Tokens: {len(llm_events)} LLM turns, "
            f"{total_tok:,} total ({total_in:,} in + {total_out:,} out)."
        )
    # Execution outcomes
    exec_completed = bus.get_history(event_type=EventType.EXECUTION_COMPLETED, limit=5)
    exec_failed = bus.get_history(event_type=EventType.EXECUTION_FAILED, limit=5)
    completed_n = len(exec_completed)
    failed_n = len(exec_failed)
    active_n = len(runtime.get_active_streams())
    lines.append(
        f"Executions: {completed_n} completed, {failed_n} failed"
        + (f" ({active_n} active)." if active_n else ".")
    )
    if exec_failed:
        for evt in exec_failed[:3]:
            error = evt.data.get("error", "")[:150]
            ago = _format_time_ago(evt.timestamp)
            lines.append(f" Failed ({ago}): {error}")
    return "\n".join(lines)
def _build_full_json(
runtime: AgentRuntime,
bus: EventBus,
preamble: dict[str, Any],
last_n: int,
) -> dict[str, Any]:
"""Build the legacy full JSON response (backward compat for focus='full')."""
graph_id = runtime.graph_id
goal = runtime.goal
result: dict[str, Any] = {
"worker_graph_id": graph_id,
"worker_goal": getattr(goal, "name", graph_id),
"status": preamble["status"],
}
active_execs = preamble.get("_active_execs", [])
if active_execs:
result["active_executions"] = active_execs
if preamble.get("pending_question"):
result["pending_question"] = preamble["pending_question"]
result["agent_idle_seconds"] = round(runtime.agent_idle_seconds, 1)
for key in ("current_node", "current_iteration"):
if key in preamble:
result[key] = preamble[key]
# Running + completed tool calls
tool_started = bus.get_history(event_type=EventType.TOOL_CALL_STARTED, limit=last_n * 2)
tool_completed = bus.get_history(event_type=EventType.TOOL_CALL_COMPLETED, limit=last_n * 2)
completed_ids = {
evt.data.get("tool_use_id") for evt in tool_completed if evt.data.get("tool_use_id")
}
running = [
evt
for evt in tool_started
if evt.data.get("tool_use_id") and evt.data.get("tool_use_id") not in completed_ids
]
if running:
result["running_tools"] = [
{
"tool": evt.data.get("tool_name"),