-
Notifications
You must be signed in to change notification settings - Fork 135
Expand file tree
/
Copy pathai.py
More file actions
198 lines (174 loc) · 6.17 KB
/
ai.py
File metadata and controls
198 lines (174 loc) · 6.17 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
import json
from urllib import request
from sqlalchemy import extract, func
from ..config import Settings
from ..extensions import db
from ..models import Expense
# Module-level settings instance; the default Gemini API key and model name
# are read from it when a caller does not pass its own.
_settings = Settings()
# Persona text placed at the start of the AI prompt (and echoed back in the
# response payload) when the caller does not supply a persona.
DEFAULT_PERSONA = (
"You are FinMind's pragmatic financial coach. Be concise, non-judgmental, "
"data-driven, and action-oriented. Return actionable, realistic guidance."
)
def _monthly_totals(uid: int, ym: str) -> tuple[float, float]:
    """Return ``(income, expenses)`` totals for one user in one month.

    ``ym`` is split on ``"-"`` into an integer year and month. Rows whose
    ``expense_type`` equals ``"INCOME"`` are summed as income; rows whose
    type differs from ``"INCOME"`` are summed as expenses. A total is
    ``0.0`` when no rows match.
    """
    year, month = map(int, ym.split("-"))

    def _sum_where(type_clause) -> float:
        # SUM over zero rows yields NULL; COALESCE maps that to 0.
        total = (
            db.session.query(func.coalesce(func.sum(Expense.amount), 0))
            .filter(
                Expense.user_id == uid,
                extract("year", Expense.spent_at) == year,
                extract("month", Expense.spent_at) == month,
                type_clause,
            )
            .scalar()
        )
        return float(total or 0)

    income_total = _sum_where(Expense.expense_type == "INCOME")
    expense_total = _sum_where(Expense.expense_type != "INCOME")
    return income_total, expense_total
def _category_spend(uid: int, ym: str) -> dict[str, float]:
    """Return the month's non-income spending summed per category.

    Keys are the stringified ``category_id``; rows with a falsy category id
    are reported under the key ``"uncat"``. Values are floats.
    """
    year, month = map(int, ym.split("-"))
    summed_amount = func.coalesce(func.sum(Expense.amount), 0)
    month_filters = (
        Expense.user_id == uid,
        extract("year", Expense.spent_at) == year,
        extract("month", Expense.spent_at) == month,
        Expense.expense_type != "INCOME",
    )
    grouped = (
        db.session.query(Expense.category_id, summed_amount)
        .filter(*month_filters)
        .group_by(Expense.category_id)
        .all()
    )

    spend: dict[str, float] = {}
    for category_id, amount in grouped:
        spend[str(category_id or "uncat")] = float(amount)
    return spend
def _previous_month(ym: str) -> str:
year, month = map(int, ym.split("-"))
if month == 1:
return f"{year - 1:04d}-12"
return f"{year:04d}-{month - 1:02d}"
def _build_analytics(uid: int, ym: str) -> dict:
    """Summarise the month's spending for inclusion in a suggestion payload.

    The result holds the month-over-month percentage change in expenses
    (``0.0`` when the previous month had no expenses), the rounded expense
    totals for the current and previous month, and the three categories
    with the highest spend.
    """
    current_expenses = _monthly_totals(uid, ym)[1]
    prev_expenses = _monthly_totals(uid, _previous_month(ym))[1]

    # Guard against division by zero when there is no prior-month baseline.
    mom = 0.0
    if prev_expenses > 0:
        delta = current_expenses - prev_expenses
        mom = round((delta / prev_expenses) * 100, 2)

    by_category = _category_spend(uid, ym)
    ranked = sorted(by_category.items(), key=lambda item: item[1], reverse=True)
    top_categories = [
        {"category_id": category_id, "amount": round(amount, 2)}
        for category_id, amount in ranked[:3]
    ]

    return {
        "month_over_month_change_pct": mom,
        "current_month_expenses": round(current_expenses, 2),
        "previous_month_expenses": round(prev_expenses, 2),
        "top_categories": top_categories,
    }
def _heuristic_budget(
    uid: int, ym: str, persona: str, warnings: list[str] | None = None
):
    """Build a rule-based budget suggestion without calling any AI service.

    The suggested total is 90% of the month's expenses, or ``500.0`` when
    there are no expenses, split 50/30/20 into needs, wants and savings.
    ``warnings``, when non-empty, is attached under the ``"warnings"`` key.
    The payload also carries the month's analytics and the net flow
    (income minus expenses).
    """
    income, expenses = _monthly_totals(uid, ym)

    if expenses:
        target = round(expenses * 0.9, 2)
    else:
        target = round(500.0, 2)

    shares = {"needs": 0.5, "wants": 0.3, "savings": 0.2}
    breakdown = {bucket: round(target * share, 2) for bucket, share in shares.items()}

    payload = {
        "month": ym,
        "suggested_total": target,
        "breakdown": breakdown,
        "tips": [
            "Cap discretionary spending in the highest category by 10%.",
            "Set one automatic transfer to savings on payday.",
        ],
        "analytics": _build_analytics(uid, ym),
        "persona": persona,
        "method": "heuristic",
    }
    if warnings:
        payload["warnings"] = warnings
    payload["net_flow"] = round(income - expenses, 2)
    return payload
def _extract_json_object(raw: str) -> dict:
"""
[NÂNG CẤP V101.3] Trích xuất JSON thông minh, chống nhiễu văn bản thừa.
Học hỏi từ kinh nghiệm xử lý lỗi AI của Quân sư LinhChu.
"""
import re
text = (raw or "").strip()
# 1. Loại bỏ các khối Markdown Code Block
text = re.sub(r'```(?:json)?\s*([\s\S]*?)\s*```', r'\1', text)
# 2. Tìm khối ngoặc nhọn { ... } xa nhất
match = re.search(r'(\{[\s\S]*\})', text)
if not match:
raise ValueError("AI Engine did not return a valid JSON object")
clean_json = match.group(1)
try:
return json.loads(clean_json)
except json.JSONDecodeError as e:
raise ValueError(f"AI JSON Parsing failed: {str(e)}")
def _gemini_budget_suggestion(
    uid: int, ym: str, api_key: str, model: str, persona: str
) -> dict:
    """Request a budget suggestion from the Gemini ``generateContent`` API.

    The prompt contains the persona, the month, the per-category spend and
    the analytics summary, and asks for strict JSON. The JSON object found
    in the first candidate's first text part is returned, with ``month``,
    ``analytics``, ``persona`` and ``method`` (``"gemini"``) set on it.

    Args:
        uid: User whose expenses are analysed.
        ym: Month as ``"YYYY-MM"``.
        api_key: Gemini API key.
        model: Gemini model name inserted into the endpoint URL.
        persona: Persona text placed at the start of the prompt.

    Returns:
        The parsed suggestion dict, augmented as described above.

    Raises:
        ValueError: If the response text contains no decodable JSON object
            (including when the response has no candidates or parts).
        Exception: Errors raised by ``urlopen`` or by decoding the HTTP
            response body propagate unchanged.
    """
    categories = _category_spend(uid, ym)
    analytics = _build_analytics(uid, ym)
    prompt = (
        f"{persona}\n"
        "Use this month data and return strict JSON only with keys: "
        "suggested_total, breakdown(needs,wants,savings), tips(list <=3).\n"
        f"month={ym}\n"
        f"category_spend={categories}\n"
        f"analytics={analytics}"
    )
    url = (
        "https://generativelanguage.googleapis.com/v1beta/models/"
        f"{model}:generateContent"
    )
    body = json.dumps(
        {
            "contents": [{"parts": [{"text": prompt}]}],
            "generationConfig": {"temperature": 0.2},
        }
    ).encode("utf-8")
    req = request.Request(
        url=url,
        data=body,
        headers={
            "Content-Type": "application/json",
            # Send the key as a header instead of a query parameter so it
            # does not appear in URLs captured by logs or proxies.
            "x-goog-api-key": api_key,
        },
        method="POST",
    )
    with request.urlopen(req, timeout=10) as resp:  # nosec B310
        payload = json.loads(resp.read().decode("utf-8"))

    # Tolerate missing, null, or empty "candidates"/"content"/"parts": fall
    # through with empty text so the failure surfaces as a ValueError from
    # the JSON extraction step rather than an IndexError/AttributeError.
    candidates = payload.get("candidates") or [{}]
    content = candidates[0].get("content") or {}
    parts = content.get("parts") or [{}]
    text = parts[0].get("text", "")

    parsed = _extract_json_object(text)
    parsed["month"] = ym
    parsed["analytics"] = analytics
    parsed["persona"] = persona
    parsed["method"] = "gemini"
    return parsed
def monthly_budget_suggestion(
    uid: int,
    ym: str,
    gemini_api_key: str | None = None,
    gemini_model: str | None = None,
    persona: str | None = None,
):
    """Return a monthly budget suggestion, preferring Gemini when a key exists.

    Args:
        uid: User whose expenses are analysed.
        ym: Month as ``"YYYY-MM"``.
        gemini_api_key: Optional per-call API key; when blank, the key from
            the module settings is used.
        gemini_model: Optional model name; defaults to the settings value.
        persona: Optional persona text; defaults to ``DEFAULT_PERSONA``.

    Returns:
        The Gemini-backed suggestion when a key is available and the call
        succeeds. Otherwise the heuristic suggestion, which carries
        ``warnings=["gemini_unavailable"]`` if the Gemini attempt failed.
    """
    import logging

    key = (gemini_api_key or "").strip() or (_settings.gemini_api_key or "")
    model = gemini_model or _settings.gemini_model
    persona_text = (persona or DEFAULT_PERSONA).strip()

    if not key:
        return _heuristic_budget(uid, ym, persona_text)

    try:
        return _gemini_budget_suggestion(uid, ym, key, model, persona_text)
    except Exception:
        # Deliberate best-effort boundary: any failure of the AI path must
        # still yield a usable suggestion. Log the cause so the fallback is
        # diagnosable instead of silently discarded.
        logging.getLogger(__name__).warning(
            "Gemini budget suggestion failed; falling back to heuristic",
            exc_info=True,
        )
        return _heuristic_budget(
            uid, ym, persona_text, warnings=["gemini_unavailable"]
        )