-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathstart_shared_context_agent.py
More file actions
128 lines (113 loc) · 5.23 KB
/
start_shared_context_agent.py
File metadata and controls
128 lines (113 loc) · 5.23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
import asyncio
import uuid
import os
from shared.config import TEMPORAL_TASK_QUEUE, get_temporal_client
from workflows import RepairAgentWorkflowSharingContext
from dotenv import load_dotenv
import argparse
# Command-line interface for this script. Parsing happens at module level, so
# `args` exists as soon as the module body has executed.
parser = argparse.ArgumentParser(
    description="Run the repair agent workflow.",
)
# A presence-only switch: absent -> False, present -> True.
parser.add_argument(
    "--auto-approve",
    dest="auto_approve",
    default=False,
    help="Automatically approve the repair workflow without user input.",
    action="store_true",
)
args = parser.parse_args()
def _describe_proposed_tools(proposed_tools_for_all_orders: dict) -> list:
    """Render the planner's proposed tools as printable review lines.

    Malformed entries are reported as a line of text and skipped instead of
    raising, so one bad order or tool does not hide the rest of the plan.

    Args:
        proposed_tools_for_all_orders: Mapping of order id to a list of tool
            proposals. Each proposal is read with ``.get`` for the keys
            ``tool_name``, ``confidence_score``, ``additional_notes`` and
            ``tool_arguments``.

    Returns:
        The lines to print, in order.
    """
    if not isinstance(proposed_tools_for_all_orders, dict):
        return [
            f"Expected a dictionary of proposed tools, got {type(proposed_tools_for_all_orders)}"
        ]

    lines = []
    for order_id, order in proposed_tools_for_all_orders.items():
        lines.append(f" - {order_id}: ")
        if not isinstance(order, list):
            # Report the actual offending type and skip: iterating a
            # non-list here would either crash or walk the wrong thing.
            lines.append(f"Expected a list of tools for order, got {type(order)}")
            continue
        for tool in order:
            if not isinstance(tool, dict):
                lines.append(f"Expected a dictionary for tool, got {type(tool)}")
                continue
            confidence_score = tool.get("confidence_score", 0.0)
            additional_notes = tool.get("additional_notes", "")
            if additional_notes:
                additional_notes = f"({additional_notes})"
            tool_name = tool.get("tool_name", "Unknown Tool Name")
            if confidence_score < 0.5:
                lines.append(
                    f"Low confidence score for repair: {confidence_score}. Tools with low confidence will not be executed."
                )
            lines.append(f" - {tool_name}: confidence score {confidence_score} {additional_notes}")
            tool_arguments = tool.get("tool_arguments", {})
            if not isinstance(tool_arguments, dict):
                lines.append(f"Expected a dictionary for tool arguments, got {type(tool_arguments)}")
                continue
            for arg_name, arg_value in tool_arguments.items():
                lines.append(f" - {arg_name}: {arg_value}")
    return lines


async def main(auto_approve: bool) -> None:
    """Start the shared-context repair agent workflow and follow it to completion.

    It's like the normal repair workflow, but with shared context across
    agents. Used to demonstrate that sharing context between all agents may
    not be a good design pattern.

    The function starts the workflow, polls until planning is reported
    complete, prints the proposed tools per order, polls until a terminal
    repair status is reported, and finally waits for and prints the workflow
    result.

    Args:
        auto_approve: Value of the ``--auto-approve`` command-line flag. It is
            accepted to keep the entry-point signature stable but is not read
            anywhere in this function.

    Exceptions raised while connecting, starting the workflow, or awaiting its
    result are not caught here; only the status/planning queries are
    best-effort.
    """
    # Load environment variables
    load_dotenv(override=True)
    user = os.environ.get("USER_NAME", "Harry.Potter")

    # Create client connected to server at the given address
    client = await get_temporal_client()

    # Start the workflow with an initial prompt
    start_msg = {
        "prompt": "Analyze and repair the orders in the order system.",
        "metadata": {
            "user": user,
            "system": "temporal-repair-agent",
        },
        # Add this to enable the callback for a workflow
        # "callback": {
        #     "type": "signal-workflow",
        #     "name": "add_external_message",
        #     "task_queue": "agent-task-queue",
        #     "workflow_id": "agent-workflow",
        #     "args": "message: Union[str, Dict[str, Any]]"
        # },
        # Uncomment this to enable email notifications (requires email setup)
        # "callback": {
        #     "type": "email",
        #     "name": "email_callback",
        #     "subject": "Repair Agent Notification",
        #     "email": "repair-notify@yourcompany.com",
        #     "args": "message: Union[str, Dict[str, Any]]"
        # },
    }

    handle = await client.start_workflow(
        RepairAgentWorkflowSharingContext.run,
        start_msg,
        id=f"shared-context-agent-for-{user}",
        task_queue=TEMPORAL_TASK_QUEUE,
    )
    print(f"{user}'s Repair Shared Context Workflow started with ID: {handle.id}")

    # Phase 1: poll until the workflow reports that planning is done.
    repairs_planned = False
    while not repairs_planned:
        try:
            repairs_planned = await handle.query("IsRepairPlanned")
            status = await handle.query("GetRepairStatus")
            print(f"Current repair status: {status}")
        except Exception as e:
            # Best-effort polling: a failed query is reported and retried.
            print(f"Error querying repair status: {e}")
        await asyncio.sleep(5)  # Wait before checking the status again
    print("Repair planning is complete.")

    # Phase 2: show what the planner proposes for each order.
    try:
        planning_result = await handle.query("GetRepairPlanningResult")
        proposed_tools_for_all_orders = planning_result.get("proposed_tools", {})
    except Exception as e:
        print(f"Error querying repair planning result: {e}")
        # Bind the name that is read below, so a failed query falls into the
        # "no proposed tools" branch instead of raising NameError.
        proposed_tools_for_all_orders = {}

    if not proposed_tools_for_all_orders:
        print("No proposed tools found for repair.")
    else:
        print("Proposed Orders to repair:")
        for line in _describe_proposed_tools(proposed_tools_for_all_orders):
            print(line)

    # Phase 3: poll until the workflow reports a terminal repair status.
    finished_statuses = ("REPORT-COMPLETED", "NO-REPAIRS-NEEDED", "REPAIR-COMPLETED")
    while True:
        try:
            status = await handle.query("GetRepairStatus")
            if status in finished_statuses:
                break
            if status == "REPAIR-FAILED":
                print("Repair failed. Exiting workflow.")
                break
            print(f"Current repair status: {status}")
        except Exception as e:
            print(f"Error querying repair status: {e}")
        await asyncio.sleep(5)  # Wait before checking the status again

    # Wait for the workflow to complete
    result = await handle.result()
    print(f"Workflow completed with result: {result}")
    print("Review the repair report for more details.")
if __name__ == "__main__":
    # Script entry point: hand the parsed --auto-approve switch to the
    # coroutine and drive it to completion on a fresh event loop.
    workflow_coro = main(args.auto_approve)
    asyncio.run(workflow_coro)