-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstarter.py
More file actions
88 lines (68 loc) · 2.96 KB
/
starter.py
File metadata and controls
88 lines (68 loc) · 2.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
"""Launch an ExpenseAuditWorkflow for every employee in expenses.json.
Starts all 20 in parallel, waits up to WAIT_SECONDS for each to finish, then
prints a summary. On this branch (no external storage) the two AI-heavy
employees time out of the wait — their audits are stuck retrying with a
blob-size error visible in the UI. The workflows stay alive; a later branch
will fix the underlying issue and watch them unstick.
"""
import asyncio
import json
import time
from pathlib import Path
from temporalio.client import Client
from workflows import ExpenseAuditWorkflow
TASK_QUEUE = "expense-audit"
TARGET = "localhost:7233"
NAMESPACE = "default"
EXPENSES_PATH = Path(__file__).parent / "expenses.json"
# A little longer than one retry interval (7s), so a single transient hiccup
# still gets counted as OK rather than STUCK.
WAIT_SECONDS = 20
def load_employees() -> list[tuple[str, str, str]]:
with EXPENSES_PATH.open() as f:
data = json.load(f)
return [(e["employee_id"], e["name"], e["role"]) for e in data["employees"]]
async def main() -> None:
    """Start every employee audit in parallel, then report OK/STUCK/FAILED.

    Connects to the local Temporal server, launches one ExpenseAuditWorkflow
    per employee from expenses.json, waits up to WAIT_SECONDS for each, and
    prints a bucketed summary. Timed-out workflows are left running so they
    can be inspected (and later unstuck) in the Temporal UI.
    """
    client = await Client.connect(TARGET, namespace=NAMESPACE)
    employees = load_employees()
    # Timestamp suffix keeps workflow IDs unique across repeated script runs.
    launch_stamp = int(time.time())

    print(f"Launching {len(employees)} audits on {TASK_QUEUE}...\n")

    handles = []
    for emp_id, name, role in employees:
        wf_handle = await client.start_workflow(
            ExpenseAuditWorkflow.run,
            emp_id,
            id=f"audit-{emp_id}-{launch_stamp}",
            task_queue=TASK_QUEUE,
        )
        handles.append((emp_id, name, role, wf_handle))

    print(f"Waiting up to {WAIT_SECONDS}s for completion...\n")

    async def resolve(emp_id, name, role, handle):
        # Map one handle to an (id, name, role, status, error) row. A timeout
        # means the workflow is still retrying server-side, not failed.
        try:
            await asyncio.wait_for(handle.result(), timeout=WAIT_SECONDS)
        except asyncio.TimeoutError:
            return (emp_id, name, role, "STUCK", None)
        except Exception as exc:
            return (emp_id, name, role, "FAILED", str(exc))
        return (emp_id, name, role, "OK", None)

    rows = await asyncio.gather(*(resolve(*entry) for entry in handles))

    # Bucket the rows by status in a single pass.
    buckets = {"OK": [], "STUCK": [], "FAILED": []}
    for row in rows:
        buckets[row[3]].append(row)
    ok, stuck, failed = buckets["OK"], buckets["STUCK"], buckets["FAILED"]

    print("=== Summary ===")
    print(f"OK: {len(ok)}/{len(rows)}")
    print(f"STUCK: {len(stuck)}/{len(rows)} (still retrying - inspect in UI)")
    if failed:
        print(f"FAILED: {len(failed)}/{len(rows)}")
    if stuck:
        print("\nStuck workflows (still retrying every 7s):")
        for emp_id, name, role, *_ in stuck:
            print(f" {emp_id:8s} {name:22s} {role}")
    if failed:
        print("\nFailed workflows:")
        for emp_id, name, role, _, err in failed:
            # First line of the error only, truncated, so the table stays flat.
            summary = err.splitlines()[0] if err else ""
            print(f" {emp_id:8s} {name:22s} {role:16s} {summary[:90]}")
    print(f"\nInspect runs: http://localhost:8233/namespaces/{NAMESPACE}/workflows")
if __name__ == "__main__":
    # Script entry point: drive the async launcher to completion.
    asyncio.run(main())