-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathworkflows.py
More file actions
78 lines (66 loc) · 2.48 KB
/
workflows.py
File metadata and controls
78 lines (66 loc) · 2.48 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
"""ExpenseAuditWorkflow — orchestrates the three review activities.
Branch 1: plain Temporal payloads. For employees with >2MB transaction lists
the activity can't record its result, so it keeps retrying on a fixed 7s
interval forever. The workflow stays running (not failed) and will recover
automatically once external storage is wired up.
"""
import asyncio
from datetime import timedelta
from temporalio import workflow
from temporalio.common import RetryPolicy
with workflow.unsafe.imports_passed_through():
from activities import (
AuditResult,
LargePaymentFinding,
RecurringFinding,
fetch_transactions,
review_large_payments,
review_recurring_payments,
)
LARGE_PAYMENT_THRESHOLD = 1000.0
# Demo retry policy: fixed 7s interval, unlimited attempts. Workflows aren't
# designed to fail — they wait for whatever's broken to get fixed. Fixed
# interval (backoff_coefficient=1.0) keeps demo timing predictable.
ACTIVITY_OPTS = dict(
start_to_close_timeout=timedelta(seconds=30),
retry_policy=RetryPolicy(
initial_interval=timedelta(seconds=7),
backoff_coefficient=1.0,
),
)
@workflow.defn
class ExpenseAuditWorkflow:
@workflow.run
async def run(self, employee_id: str) -> AuditResult:
workflow.logger.info(f"Starting audit for {employee_id}")
transactions = await workflow.execute_activity(
fetch_transactions,
employee_id,
**ACTIVITY_OPTS,
)
# Workflow filters large payments before passing to its reviewer.
large_only = [
t for t in transactions if t["amount"] >= LARGE_PAYMENT_THRESHOLD
]
workflow.logger.info(
f"{employee_id}: {len(transactions)} total, {len(large_only)} large"
)
# Run recurring (full list) and large (filtered) reviews in parallel.
recurring_task = workflow.execute_activity(
review_recurring_payments,
transactions,
**ACTIVITY_OPTS,
)
large_task = workflow.execute_activity(
review_large_payments,
large_only,
**ACTIVITY_OPTS,
)
recurring, large = await asyncio.gather(recurring_task, large_task)
return AuditResult(
employee_id=employee_id,
employee_name="", # worker doesn't know; starter can join later
total_transactions=len(transactions),
recurring=recurring,
large_payments=large,
)