-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathstart_repair_agent_proactive.py
More file actions
180 lines (160 loc) · 8.03 KB
/
start_repair_agent_proactive.py
File metadata and controls
180 lines (160 loc) · 8.03 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
import asyncio
import uuid
import os
from shared.config import TEMPORAL_TASK_QUEUE, get_temporal_client
from workflows import RepairAgentWorkflowProactive
from dotenv import load_dotenv
import argparse
# Command-line interface for this script. The arguments are parsed at module
# level, so `args` is available to the entry-point guard at the bottom of the
# file.
parser = argparse.ArgumentParser(
    description="Run the repair agent workflow.",
)
parser.add_argument(
    "--auto-approve",
    help="Automatically approve the repair workflow without user input.",
    action="store_true",  # flag is False unless given on the command line
)
args = parser.parse_args()
# Statuses this script treats as "the current repair cycle has finished".
_CYCLE_DONE_STATUSES = (
    "REPORT-COMPLETED",
    "NO-REPAIRS-NEEDED",
    "WAITING-FOR-NEXT-CYCLE",
)
# Status this script treats as "the current repair cycle failed".
_CYCLE_FAILED_STATUS = "REPAIR-FAILED"
# While the workflow reports one of these, there is nothing new to show the
# user, so the monitor just waits for the status to change.
_BETWEEN_CYCLE_STATUSES = _CYCLE_DONE_STATUSES + (_CYCLE_FAILED_STATUS,)

_POLL_SECONDS = 5  # delay between polls while a cycle is in progress
_IDLE_POLL_SECONDS = 60  # delay between polls while waiting for the next cycle
_LOW_CONFIDENCE_THRESHOLD = 0.5  # below this, a warning is printed for the tool


def _print_tool(tool) -> None:
    """Print one proposed tool: its name, confidence, notes and arguments."""
    if not isinstance(tool, dict):
        print(f"Expected a dictionary for tool, got {type(tool)}")
        return

    confidence_score = tool.get("confidence_score", 0.0)
    additional_notes = tool.get("additional_notes", "")
    if additional_notes:
        additional_notes = f"({additional_notes})"
    tool_name = tool.get("tool_name", "Unknown Tool Name")

    if confidence_score < _LOW_CONFIDENCE_THRESHOLD:
        print(f"Low confidence score for repair: {confidence_score}. Tools with low confidence will not be executed.")
    print(f" - {tool_name}: confidence score {confidence_score} {additional_notes}")

    tool_arguments = tool.get("tool_arguments", {})
    if not isinstance(tool_arguments, dict):
        print(f"Expected a dictionary for tool arguments, got {type(tool_arguments)}")
        return  # nothing iterable as name/value pairs; skip instead of crashing
    for arg_name, arg_value in tool_arguments.items():
        print(f" - {arg_name}: {arg_value}")


def _print_proposed_tools(proposed_tools_for_all_orders) -> None:
    """Print the proposed repair tools, grouped by order id."""
    if not proposed_tools_for_all_orders:
        print("No proposed tools found for repair.")
        return
    if not isinstance(proposed_tools_for_all_orders, dict):
        print(f"Expected a dictionary of proposed tools, got {type(proposed_tools_for_all_orders)}")
        return

    print("Proposed Orders to repair:")
    for order_id, tools in proposed_tools_for_all_orders.items():
        print(f" - {order_id}: ")
        if not isinstance(tools, list):
            # Report the actual offending type and skip this order.
            print(f"Expected a list of tools for order, got {type(tools)}")
            continue
        for tool in tools:
            _print_tool(tool)


async def _wait_for_planning(handle) -> None:
    """Poll the workflow until its "IsRepairPlanned" query returns a truthy value."""
    repairs_planned = False
    while not repairs_planned:
        try:
            repairs_planned = await handle.query("IsRepairPlanned")
            status = await handle.query("GetRepairStatus")
            print(f"Current repair status: {status}")
        except Exception as e:
            print(f"Error querying repair status: {e}")
        await asyncio.sleep(_POLL_SECONDS)  # Wait before checking the status again
    print("Repair planning is complete.")


async def _fetch_proposed_tools(handle):
    """Return the "proposed_tools" entry of the planning result, or {} on error."""
    try:
        planning_result = await handle.query("GetRepairPlanningResult")
        return planning_result.get("proposed_tools", {})
    except Exception as e:
        print(f"Error querying repair planning result: {e}")
        return {}


async def _obtain_approval(handle, user: str, auto_approve: bool) -> bool:
    """Approve the planned repair, automatically or by prompting on stdin.

    Returns False only when the user explicitly answers "no"; in every other
    case (approved, already approved, or an error while querying) it returns
    True so the caller keeps monitoring.
    """
    if auto_approve:
        print("Auto-approval is enabled. Proceeding with repair workflow.")
        print("Auto-approving the repair workflow")
        await handle.signal("ApproveRepair", user)
        return True

    print("Waiting for user approval to proceed with repairs...")
    try:
        approved = await handle.query("IsRepairApproved")
        if approved:
            print("Repair has already been approved.")
            return True

        print("Repair has not been approved yet. Waiting for user input...")
        while not approved:
            # NOTE: input() blocks the event loop; nothing else runs in this
            # script while it waits for the answer.
            user_input = input("Do you approve the repair? (yes/no): ").strip().lower()
            if user_input == "yes":
                await handle.signal("ApproveRepair", user)
                approved = True
                print("Repair approved by user.")
            elif user_input == "no":
                print("Repair not approved. Exiting workflow.")
                return False
            else:
                print("Invalid input. Please enter 'yes' or 'no'.")
    except Exception as e:
        print(f"Error querying repair approval status: {e}")
    return True


async def _wait_for_repairs(handle) -> None:
    """Poll the repair status until the cycle is reported done or failed."""
    while True:
        try:
            status = await handle.query("GetRepairStatus")
            if status in _CYCLE_DONE_STATUSES:
                return
            if status == _CYCLE_FAILED_STATUS:
                # NOTE(review): the message says "Exiting", but the script has
                # always kept monitoring after a failure; that is preserved.
                print("Repair failed. Exiting workflow.")
                return
            print(f"Current repair status: {status}")
        except Exception as e:
            print(f"Error querying repair status: {e}")
        await asyncio.sleep(_POLL_SECONDS)  # Wait before checking the status again


async def _print_repair_report(handle) -> None:
    """Query the repair report and print its summary."""
    try:
        repair_report = await handle.query("GetRepairReport")
        report_summary = repair_report.get("report_summary", "No summary available")
        print(f"*** Repair complete*** \n Summary: {report_summary}")
    except Exception as e:
        print(f"Error querying repair report: {e}")


async def _wait_for_next_cycle(handle) -> None:
    """Wait while the workflow sits between cycles; return once it moves on.

    Every path through the loop either sleeps or returns, so a failing query
    or an unexpected status can no longer spin without delay.
    """
    while True:
        try:
            status = await handle.query("GetRepairStatus")
        except Exception as e:
            print(f"Error querying repair status: {e}")
            await asyncio.sleep(_POLL_SECONDS)
            continue
        if status not in _BETWEEN_CYCLE_STATUSES:
            return  # a new cycle has started; go back to monitoring planning
        if status in ("REPORT-COMPLETED", "WAITING-FOR-NEXT-CYCLE"):
            print(f"Current repair status: {status}, waiting for a minute before checking again.")
        await asyncio.sleep(_IDLE_POLL_SECONDS)  # Wait before checking the status again


async def main(auto_approve: bool) -> None:
    """Run the proactive repair agent workflow and monitor it.

    The workflow runs periodically, analyzing and repairing orders in the
    order system. For each cycle this script waits for planning to finish,
    prints the proposed repairs, obtains approval, waits for the repairs to
    complete, prints the report summary, and then waits for the next cycle.

    Args:
        auto_approve: When True, the first cycle is approved without asking
            the user. Later cycles always ask for approval on stdin.

    Returns:
        None. The function returns only when the user answers "no" to the
        approval prompt; otherwise it monitors indefinitely.
    """
    # Load environment variables
    load_dotenv(override=True)
    user = os.environ.get("USER_NAME", "Harry.Potter")

    # Create client connected to server at the given address
    client = await get_temporal_client()

    # Start the workflow with an initial prompt
    start_msg = {
        "prompt": "Analyze and repair the orders in the order system.",
        "metadata": {
            "user": user,
            "system": "temporal-repair-agent",
        },
        # Add this to enable the callback for a workflow
        # "callback": {
        #     "type": "signal-workflow",
        #     "name": "add_external_message",
        #     "task_queue": "agent-task-queue",
        #     "workflow_id": "agent-workflow",
        #     "args": "message: Union[str, Dict[str, Any]]"
        # },
        # Uncomment this to enable email notifications (requires email setup)
        # "callback": {
        #     "type": "email",
        #     "name": "email_callback",
        #     "subject": "Repair Agent Notification",
        #     "email": "repair-notify@yourcompany.com",
        #     "args": "message: Union[str, Dict[str, Any]]"
        # },
    }

    handle = await client.start_workflow(
        RepairAgentWorkflowProactive.run,
        start_msg,
        id=f"always-be-repairin-for-{user}",
        task_queue=TEMPORAL_TASK_QUEUE,
    )
    print(f"{user}'s Repair Workflow started with ID: {handle.id}")

    # Monitor the workflow cycle by cycle; it is always running.
    auto_approve_this_cycle = auto_approve
    while True:
        await _wait_for_planning(handle)
        _print_proposed_tools(await _fetch_proposed_tools(handle))

        if not await _obtain_approval(handle, user, auto_approve_this_cycle):
            return
        # --auto-approve only covers the first cycle.
        auto_approve_this_cycle = False

        await _wait_for_repairs(handle)
        await _print_repair_report(handle)
        await _wait_for_next_cycle(handle)
if __name__ == "__main__":
    # Script entry point: run the async monitor to completion, passing the
    # --auto-approve flag parsed at module level.
    asyncio.run(main(auto_approve=args.auto_approve))