-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathactivities.py
More file actions
108 lines (89 loc) · 3.14 KB
/
activities.py
File metadata and controls
108 lines (89 loc) · 3.14 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
"""Activities for the expense audit workflow.
Branch 1: no external storage. fetch_transactions returns the whole list and
review_recurring_payments takes the whole list. Both fail for AI-heavy
employees whose transaction arrays exceed the 2MB payload limit.
"""
import json
from collections import defaultdict
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from temporalio import activity
EXPENSES_PATH = Path(__file__).parent / "expenses.json"
# Tunables for review rules
RECURRING_MIN_OCCURRENCES = 3
LARGE_PAYMENT_THRESHOLD = 1000.0
@dataclass
class RecurringFinding:
merchant_id: str
merchant_name: str
occurrence_count: int
total_amount: float
flagged_for_review: bool
@dataclass
class LargePaymentFinding:
transaction_id: str
merchant_name: str
amount: float
category: str
description: str
@dataclass
class AuditResult:
employee_id: str
employee_name: str
total_transactions: int
recurring: list[RecurringFinding]
large_payments: list[LargePaymentFinding]
@activity.defn
async def fetch_transactions(employee_id: str) -> list[dict[str, Any]]:
"""Load an employee's transaction history from the expenses store."""
activity.logger.info(f"Fetching transactions for {employee_id}")
with EXPENSES_PATH.open() as f:
data = json.load(f)
for emp in data["employees"]:
if emp["employee_id"] == employee_id:
activity.logger.info(
f"{employee_id}: {len(emp['transactions'])} transactions"
)
return emp["transactions"]
raise ValueError(f"Unknown employee: {employee_id}")
@activity.defn
async def review_recurring_payments(
transactions: list[dict[str, Any]],
) -> list[RecurringFinding]:
"""Group by merchant_id; flag merchants charging >=N times for human review."""
groups: dict[str, list[dict[str, Any]]] = defaultdict(list)
for t in transactions:
groups[t["merchant_id"]].append(t)
findings: list[RecurringFinding] = []
for merchant_id, txns in groups.items():
if len(txns) < RECURRING_MIN_OCCURRENCES:
continue
findings.append(
RecurringFinding(
merchant_id=merchant_id,
merchant_name=txns[0]["merchant_name"],
occurrence_count=len(txns),
total_amount=round(sum(t["amount"] for t in txns), 2),
flagged_for_review=True,
)
)
findings.sort(key=lambda f: f.total_amount, reverse=True)
activity.logger.info(f"Recurring review: flagged {len(findings)} merchants")
return findings
@activity.defn
async def review_large_payments(
transactions: list[dict[str, Any]],
) -> list[LargePaymentFinding]:
"""Review a pre-filtered list of large payments."""
activity.logger.info(f"Large-payment review: {len(transactions)} transactions")
return [
LargePaymentFinding(
transaction_id=t["transaction_id"],
merchant_name=t["merchant_name"],
amount=t["amount"],
category=t["category"],
description=t["description"],
)
for t in transactions
]