-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathai_agent.py
More file actions
executable file
·265 lines (241 loc) · 12.4 KB
/
ai_agent.py
File metadata and controls
executable file
·265 lines (241 loc) · 12.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
import requests
import json
import random
import re
import unicodedata
import os
class AIAgent:
GEMINI_API_KEYS = []
MIMO_KEY = ""
CUSTOM_GEMINI_KEY = "sk-demo"
@classmethod
def load_api_keys(cls):
# Default keys from environment variables
cls.GEMINI_API_KEYS = os.getenv("GEMINI_API_KEYS", "").split(",")
cls.GEMINI_API_KEYS = [k.strip() for k in cls.GEMINI_API_KEYS if k.strip()]
cls.MIMO_KEY = os.getenv("MIMO_KEY", "")
cls.CUSTOM_GEMINI_KEY = os.getenv("CUSTOM_GEMINI_KEY", "sk-demo")
# Load from config.json if keys are still empty
config_paths = ['config.json', '/etc/aiagent/config.json']
for path in config_paths:
if os.path.exists(path):
try:
with open(path, 'r') as f:
config = json.load(f)
if not cls.GEMINI_API_KEYS:
keys = config.get("gemini_keys", [])
cls.GEMINI_API_KEYS = [k for k in keys if "YOUR_" not in k]
if not cls.MIMO_KEY:
key = config.get("mimo_key", "")
if "YOUR_" not in key:
cls.MIMO_KEY = key
if "custom_gemini_key" in config:
cls.CUSTOM_GEMINI_KEY = config.get("custom_gemini_key")
except: pass
AUTHORS = [
{"id": 1, "name": "Nguyên Minh Tuân", "job": "Chuyên gia công nghệ"},
{"id": 2, "name": "Trân Quôc Huy", "job": "Nhà báo"},
{"id": 3, "name": "Lê Hoàng Anh", "job": "Chuyên gia marketing"},
{"id": 4, "name": "Phạm Ngọc Lan", "job": "Giảng viên"},
{"id": 5, "name": "Võ Thanh Long", "job": "Kỹ sư hệ thông"},
{"id": 6, "name": "Nguyên Đức Anh", "job": "Chuyên gia tài chính"},
{"id": 7, "name": "Hoàng Tuân Kiệt", "job": "Nhiêp ảnh gia"},
{"id": 8, "name": "Phan Minh Quân", "job": "Biên tâp viên"},
{"id": 9, "name": "Bùi Thị Lan", "job": "Blogger"},
{"id": 10, "name": "Đặng Thu Hà", "job": "Chuyên gia sức khỏe"},
{"id": 11, "name": "Lý Quôc Khánh", "job": "Chuyên gia âm thanh"},
{"id": 12, "name": "Trân Minh Phúc", "job": "Chuyên gia dữ liệu"},
{"id": 13, "name": "Nguyên Thị Yên", "job": "Nhà nghiên cứu"},
{"id": 14, "name": "Võ Quôc Nam", "job": "Chuyên gia logistics"},
{"id": 15, "name": "Hoàng Gia Bảo", "job": "Doanh nhân"},
{"id": 16, "name": "Trịnh Minh Khoa", "job": "Chuyên gia pháp lý"},
{"id": 17, "name": "Lâm Thanh Tùng", "job": "Chuyên gia CNTT"},
{"id": 18, "name": "Phạm Thúy An", "job": "Chuyên gia nhân sự"},
{"id": 19, "name": "Nguyên Quôc Bảo", "job": "Nhà phân tích"},
{"id": 20, "name": "Đỗ Thanh Bình", "job": "Cô vân chiên lược"},
{"id": 21, "name": "Phan Hoàng Long", "job": "Chuyên gia AI"},
{"id": 22, "name": "Nguyên Hữu Phúc", "job": "Kỹ sư phân mêm"},
{"id": 23, "name": "Lê Thị Mai", "job": "Chuyên gia giáo dục"},
{"id": 24, "name": "Trương Quôc Khánh", "job": "Chuyên gia thương mại"},
{"id": 25, "name": "Nguyên Văn Hùng", "job": "Chuyên gia xây dựng"},
{"id": 26, "name": "Hoàng Thị Ngọc", "job": "Chuyên gia thời trang"},
{"id": 27, "name": "Phạm Quôc Dũng", "job": "Chuyên gia an ninh"},
{"id": 28, "name": "Lý Minh Châu", "job": "Chuyên gia UX/UI"},
{"id": 29, "name": "Trân Văn Phát", "job": "Chuyên gia đâu tư"},
{"id": 30, "name": "Nguyên Thu Hương", "job": "Chuyên gia truyên thông"},
{"id": 31, "name": "Đặng Quôc Việt", "job": "Chuyên gia blockchain"},
{"id": 32, "name": "Phạm Thanh Tùng", "job": "Chuyên gia thương mại điên tử"},
{"id": 33, "name": "Hoàng Đức Long", "job": "Chuyên gia SEO"},
{"id": 34, "name": "Nguyên Bảo Trân", "job": "Chuyên gia nôi dung"},
{"id": 35, "name": "Trịnh Văn An", "job": "Chuyên gia đào tạo"},
{"id": 36, "name": "Lê Minh Đức", "job": "Chuyên gia sản xuât"},
{"id": 37, "name": "Phan Thị Kim", "job": "Chuyên gia bán hàng"},
{"id": 38, "name": "Nguyên Quang Vinh", "job": "Chuyên gia startup"},
{"id": 39, "name": "Đỗ Mỹ Linh", "job": "Chuyên gia thương hiêu"},
{"id": 40, "name": "Trân Thanh Hải", "job": "Chuyên gia vân hành"},
{"id": 41, "name": "Nguyên Nhật Minh", "job": "Chuyên gia phân tích"},
{"id": 42, "name": "Lê Quôc Thịnh", "job": "Chuyên gia xuât nhâp khâu"},
{"id": 43, "name": "Phạm Anh Khoa", "job": "Chuyên gia đào tạo online"},
{"id": 44, "name": "Võ Minh Tâm", "job": "Chuyên gia quản lý dự án"},
{"id": 45, "name": "Nguyên Hồng Phúc", "job": "Chuyên gia tư vân"},
{"id": 46, "name": "Lê Thanh Bình", "job": "Chuyên gia nghiên cứu thị trường"},
]
@staticmethod
def strip_blog_tags(text):
start_tag = "[startblog]"
end_tag = "[endblog]"
start_index = text.find(start_tag)
end_index = text.find(end_tag)
if start_index != -1 and end_index != -1:
content = text[start_index + len(start_tag):end_index]
return content.strip()
return text.replace(start_tag, "").replace(end_tag, "").strip()
@staticmethod
def slugify(text):
if not text:
return ""
text = unicodedata.normalize('NFD', str(text))
text = text.encode('ascii', 'ignore').decode('utf-8')
text = text.lower()
text = re.sub(r'\s+', '-', text)
text = re.sub(r'[^\w\-]+', '', text)
text = re.sub(r'\-\-+', '-', text)
text = text.strip('-')
return text
@classmethod
def select_author(cls, title):
authors_text = "\n".join([f"{a['id']}: {a['name']} ({a['job']})" for a in cls.AUTHORS])
prompt = (
f"Dựa vào tiêu đề bài viết sau, hãy chọn ra ID của tác giả phù hợp nhất trong danh sách bên dưới.\n"
f"Tiêu đề: {title}\n\n"
f"Danh sách tác giả:\n{authors_text}\n\n"
f"Chỉ trả về duy nhất con số ID của tác giả, không giải thích gì thêm."
)
res = cls.call_ai_agent(prompt, "Bạn là một biên tập viên chuyên nghiệp.")
if res:
match = re.search(r'\d+', res)
if match:
selected_id = int(match.group())
if any(a['id'] == selected_id for a in cls.AUTHORS):
return selected_id
return 1
@classmethod
def call_ai_agent(cls, prompt, system_prompt=None, temperature=0.7, max_tokens=8000, model=None):
# Refresh keys if empty
if not cls.GEMINI_API_KEYS and not cls.MIMO_KEY:
cls.load_api_keys()
messages = []
if system_prompt:
messages.append({"role": "system", "content": system_prompt})
messages.append({"role": "user", "content": prompt})
# List of available models and their functions
# This allows us to call a specific model if requested
def call_gemini():
api_key = random.choice(cls.GEMINI_API_KEYS)
print(f" [AI] Đang gọi Gemini (API Key kết thúc bằng ...{api_key[-4:]})...")
url = f"https://generativelanguage.googleapis.com/v1beta/models/gemini-2.5-flash:generateContent?key={api_key}"
gemini_prompt = (system_prompt + "\n\n" + prompt) if system_prompt else prompt
payload = {
"contents": [{"role": "user", "parts": [{"text": gemini_prompt}]}],
"generationConfig": {"temperature": temperature, "maxOutputTokens": max_tokens}
}
try:
response = requests.post(url, headers={"Content-Type": "application/json"}, json=payload, timeout=60)
res_json = response.json()
if 'candidates' in res_json and res_json['candidates']:
return cls.strip_blog_tags(res_json['candidates'][0]['content']['parts'][0]['text'])
except: pass
return None
def call_custom_gemini():
print(" [AI] Đang gọi Custom Gemini (aiautotool.com)...")
url = "https://gemini.aiautotool.com/v1/chat/completions"
payload = {
"model": "gemini-2.5-flash",
"messages": messages,
"stream": False, "temperature": temperature, "max_tokens": max_tokens
}
try:
response = requests.post(url, headers={"Content-Type": "application/json", "Authorization": f"Bearer {cls.CUSTOM_GEMINI_KEY}"}, json=payload, timeout=60)
res_json = response.json()
if 'choices' in res_json and len(res_json['choices']) > 0:
return cls.strip_blog_tags(res_json['choices'][0]['message']['content'])
except: pass
return None
def call_pollinations():
print(" [AI] Đang gọi Pollinations AI...")
url = "https://text.pollinations.ai/openai"
payload = {
"model": "openai",
"messages": messages,
"temperature": 1.0,
"max_tokens": max_tokens, "stream": False
}
try:
response = requests.post(url, headers={"Content-Type": "application/json"}, json=payload, timeout=90)
res_json = response.json()
if 'choices' in res_json and len(res_json['choices']) > 0:
return cls.strip_blog_tags(res_json['choices'][0]['message']['content'])
except: pass
return None
def call_mimo():
print(" [AI] Đang gọi Xiaomi MiMo (mimo-v2-flash)...")
url = "https://api.xiaomimimo.com/v1/chat/completions"
headers = {
"Authorization": f"Bearer {cls.MIMO_KEY or 'sk-demo'}",
"Content-Type": "application/json"
}
payload = {
"model": "mimo-v2-flash",
"messages": messages,
"max_completion_tokens": 1024,
"temperature": 0.3,
"top_p": 0.95,
"stream": False,
"stop": None,
"frequency_penalty": 0,
"presence_penalty": 0,
"thinking": {
"type": "disabled"
}
}
try:
response = requests.post(url, headers=headers, json=payload, timeout=90)
res_json = response.json()
if 'choices' in res_json and len(res_json['choices']) > 0:
return cls.strip_blog_tags(res_json['choices'][0]['message']['content'])
except: pass
return None
def call_llm7():
print(" [AI] Đang gọi LLM7.io...")
try:
import openai
client = openai.OpenAI(base_url="https://api.llm7.io/v1", api_key="unused")
response = client.chat.completions.create(
model="default",
messages=messages,
temperature=temperature,
max_tokens=max_tokens
)
return cls.strip_blog_tags(response.choices[0].message.content)
except: pass
return None
model_map = {
"gemini": call_gemini,
"custom-gemini": call_custom_gemini,
"pollinations": call_pollinations,
"mimo": call_mimo,
"llm7": call_llm7
}
# If a specific model is requested, try it first
if model and model.lower() in model_map:
result = model_map[model.lower()]()
if result: return result
# Fallback sequence (default behavior)
sequence = [call_gemini, call_custom_gemini, call_pollinations, call_mimo, call_llm7]
for func in sequence:
# Skip if we already tried it via 'model' param to avoid double call if it failed
if model and model.lower() in model_map and func == model_map[model.lower()]:
continue
res = func()
if res: return res
return None