-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathschedule_repair_agent.py
More file actions
127 lines (108 loc) · 4.09 KB
/
schedule_repair_agent.py
File metadata and controls
127 lines (108 loc) · 4.09 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import asyncio
from datetime import timedelta
import os
from shared.config import TEMPORAL_TASK_QUEUE, get_temporal_client
from workflows import RepairAgentWorkflow
from temporalio.client import (
Client,
Schedule,
ScheduleActionStartWorkflow,
ScheduleIntervalSpec,
ScheduleSpec,
ScheduleState,
ScheduleAsyncIterator,
)
from workflows import RepairAgentWorkflow
from dotenv import load_dotenv
import argparse
from temporalio.client import WorkflowExecutionAsyncIterator
SCHEDULE_ID = "Repair-Agent-Daily-Schedule"
parser = argparse.ArgumentParser(description="Schedule a repair agent workflow.")
parser.add_argument(
"--operation",
type=str,
default="create",
help="Optional operations to perform related to schedules.",
)
args = parser.parse_args()
async def main(operation: str) -> None:
"""Schedule the repair agent workflow to run daily."""
# Load environment variables
load_dotenv(override=True)
user = os.environ.get("USER_NAME", "Harry.Potter")
print(f"Using user: {user}")
# Create client connected to server at the given address
client: Client = await get_temporal_client()
workflow_id = f"scheduled-repair-{user}"
if operation == "list":
await list_schedules(client)
elif operation == "delete":
await delete_schedule(client, SCHEDULE_ID)
elif operation == "describe":
await describe_schedule(client, SCHEDULE_ID)
elif operation == "trigger":
await trigger_schedule(user, client)
elif operation == "upsert":
await upsert_schedule(user, client, workflow_id, SCHEDULE_ID)
elif operation == "create":
await create_schedule(user, client, workflow_id, SCHEDULE_ID)
else:
await create_schedule(user, client, workflow_id, SCHEDULE_ID) # for now create is the default operation
async def trigger_schedule(user, client):
handle = client.get_schedule_handle(SCHEDULE_ID)
await handle.trigger()
print(f"Triggered schedule {SCHEDULE_ID} for user {user}.")
async def upsert_schedule(user, client, workflow_id, schedule_id=SCHEDULE_ID):
try:
handle = client.get_schedule_handle(schedule_id)
await handle.describe()
print(f"Schedule {schedule_id} already exists.")
# If the schedule exists, we can update it, for now i leave as is
except Exception as e:
print(f"Schedule {schedule_id} does not exist. Creating a new one.")
await create_schedule(user, client, workflow_id, schedule_id)
async def create_schedule(user, client, workflow_id, schedule_id=SCHEDULE_ID):
start_msg = {
"prompt": "Analyze and repair the orders in the order system.",
"metadata": {
"user": user,
"system": "temporal-repair-agent",
},
}
await client.create_schedule(
schedule_id,
Schedule(
action=ScheduleActionStartWorkflow(
RepairAgentWorkflow.run,
start_msg,
id=workflow_id,
task_queue=TEMPORAL_TASK_QUEUE,
),
spec=ScheduleSpec(
intervals=[ScheduleIntervalSpec(every=timedelta(days=1))]
),
state=ScheduleState(note="Daily repair agent workflow scheduled by user " + user),
),
)
print(f"Schedule created with ID: {schedule_id} for user {user}.")
async def list_schedules(client: Client):
"""List all schedules in our namespace."""
await client.list_schedules()
async for schedule in await client.list_schedules():
print(f"List Schedule Info: {schedule.info}.")
async def delete_schedule(client: Client, schedule_id: str):
"""Delete a schedule by ID."""
handle = client.get_schedule_handle(
schedule_id,
)
await handle.delete()
print(f"Schedule {schedule_id} deleted.")
async def describe_schedule(client: Client, schedule_id: str):
"""Describe a schedule by ID."""
handle = client.get_schedule_handle(
schedule_id
)
desc = await handle.describe()
print(f"Schedule Description: {desc.info}.")
if __name__ == "__main__":
asyncio.run(main(args.operation))