-
Notifications
You must be signed in to change notification settings - Fork 37
Expand file tree
/
Copy pathbackground.py
More file actions
271 lines (225 loc) · 9.97 KB
/
background.py
File metadata and controls
271 lines (225 loc) · 9.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
"""
后台任务模块
- 消息监听器 (动态轮询)
- 心跳监控 (自动重连)
- 会话保存
- 文件清理
"""
from __future__ import annotations
import asyncio
import mimetypes
import time
from collections import deque
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from direct_bot import WeChatHelperBot
from processor import CommandProcessor
class BackgroundTasks:
    """Supervise the bot's long-running asyncio background loops.

    Loops managed here:

    - message listener (adaptive polling, re-login while logged out)
    - heartbeat monitor (logout detection and session-based reconnect)
    - periodic session saver
    - expired-file cleanup (only when ``file_retention_days`` > 0)
    """

    # Maximum number of records kept in ``stability_state["errors"]``.
    MAX_ERROR_HISTORY = 20
    # Seconds between two session saves.
    SESSION_SAVE_INTERVAL = 60
    # Seconds between two expired-file cleanup passes.
    CLEANUP_INTERVAL = 3600
    # File name used when a message supplies no usable name component.
    FALLBACK_FILE_NAME = "download"

    def __init__(
        self,
        bot: "WeChatHelperBot",
        processor: "CommandProcessor",
        download_dir: Path,
        stability_state: dict[str, Any],
        *,
        auto_download: bool = True,
        file_date_subdir: bool = True,
        heartbeat_interval: int = 30,
        reconnect_delay: int = 5,
        max_reconnect_attempts: int = 10,
        file_retention_days: int = 0,
    ):
        """Store collaborators and configuration; no task is started here.

        Args:
            bot: Bot used to poll, send, download and manage the login session.
            processor: Command processor; its ``process`` coroutine produces the
                reply for a message and its ``message_store`` records files.
            download_dir: Base directory for downloaded images/files.
            stability_state: Shared mutable dict updated by the loops (keys
                written here: ``errors``, ``reconnect_attempts``,
                ``last_message_time``, ``total_messages``, ``last_heartbeat``).
            auto_download: Download image/file messages automatically.
            file_date_subdir: Save downloads under a ``YYYY-MM-DD`` sub-directory.
            heartbeat_interval: Seconds between heartbeat checks.
            reconnect_delay: Seconds to wait before a reconnect attempt.
            max_reconnect_attempts: Reconnect attempts allowed before giving up.
            file_retention_days: Retention passed to the cleanup; ``0`` disables
                the cleanup loop.
        """
        self.bot = bot
        self.processor = processor
        self.download_dir = download_dir
        self.stability_state = stability_state

        # Configuration
        self.auto_download = auto_download
        self.file_date_subdir = file_date_subdir
        self.heartbeat_interval = heartbeat_interval
        self.reconnect_delay = reconnect_delay
        self.max_reconnect_attempts = max_reconnect_attempts
        self.file_retention_days = file_retention_days

        # Task handles (None while the corresponding loop is not running)
        self._listener_task: asyncio.Task | None = None
        self._session_saver_task: asyncio.Task | None = None
        self._heartbeat_task: asyncio.Task | None = None
        self._cleanup_task: asyncio.Task | None = None

    @staticmethod
    def _is_running(task: "asyncio.Task | None") -> bool:
        """Return True when *task* exists and has not finished."""
        return task is not None and not task.done()

    def start_all(self) -> None:
        """Start every background loop that is not already running.

        Uses ``asyncio.create_task``, so it must be called while an event loop
        is running. Calling it again while loops are alive is a no-op for those
        loops, which prevents duplicate listeners (and duplicate replies).
        """
        if not self._is_running(self._listener_task):
            self._listener_task = asyncio.create_task(self._background_listener())
        if not self._is_running(self._session_saver_task):
            self._session_saver_task = asyncio.create_task(self._periodic_session_saver())
        if not self._is_running(self._heartbeat_task):
            self._heartbeat_task = asyncio.create_task(self._heartbeat_monitor())
        if self.file_retention_days > 0 and not self._is_running(self._cleanup_task):
            self._cleanup_task = asyncio.create_task(self._file_cleanup_task())

    async def stop_all(self) -> None:
        """Cancel all background loops and wait until they have finished.

        Every task is cancelled before any of them is awaited, and the results
        are collected with ``return_exceptions=True`` so that one task that
        ended with an error cannot prevent the others from being stopped.
        """
        handles = [
            task
            for task in (
                self._listener_task,
                self._session_saver_task,
                self._heartbeat_task,
                self._cleanup_task,
            )
            if task is not None
        ]
        # Clear the handles so start_all() can be used again afterwards.
        self._listener_task = None
        self._session_saver_task = None
        self._heartbeat_task = None
        self._cleanup_task = None

        for task in handles:
            task.cancel()
        if not handles:
            return

        results = await asyncio.gather(*handles, return_exceptions=True)
        for result in results:
            # Cancellation is the expected outcome; anything else is reported.
            if isinstance(result, Exception) and not isinstance(result, asyncio.CancelledError):
                print(f"[BackgroundTasks] Task ended with error: {result!r}")

    @classmethod
    def _sanitize_file_name(cls, file_name: str) -> str:
        """Reduce *file_name* to a single safe path component.

        File names come from incoming messages, so directory parts (both ``/``
        and ``\\`` separators), NUL characters and the special names ``.`` /
        ``..`` are removed to keep downloads inside the download directory.
        """
        cleaned = str(file_name).replace("\x00", "").replace("\\", "/")
        base = cleaned.rsplit("/", 1)[-1].strip()
        if base in {"", ".", ".."}:
            return cls.FALLBACK_FILE_NAME
        return base

    def _get_file_save_path(self, file_name: str) -> Path:
        """Return the save path for *file_name*, creating the target directory.

        With ``file_date_subdir`` enabled the file goes into a ``YYYY-MM-DD``
        sub-directory (local date) of ``download_dir``; otherwise directly into
        ``download_dir``. The name is sanitised to its last path component.
        """
        target_dir = self.download_dir
        if self.file_date_subdir:
            target_dir = target_dir / datetime.now().strftime("%Y-%m-%d")
        target_dir.mkdir(parents=True, exist_ok=True)
        return target_dir / self._sanitize_file_name(file_name)

    def _add_error(self, error: str) -> None:
        """Append an error record, keeping only the most recent entries.

        The list is created on demand and trimmed in place, so other holders
        of the same list object keep seeing the current history.
        """
        errors = self.stability_state.setdefault("errors", [])
        errors.append({
            "time": datetime.now().isoformat(),
            "error": error,
        })
        if len(errors) > self.MAX_ERROR_HISTORY:
            del errors[:-self.MAX_ERROR_HISTORY]

    @staticmethod
    def _message_identity(msg: dict) -> tuple[str, str, str]:
        """Return ``(content, msg_id, unique_key)`` for a message dict.

        Missing or ``None`` values become empty strings (never the literal
        ``"None"``). ``unique_key`` is the id when present, else the text.
        """
        raw_text = msg.get("text")
        raw_id = msg.get("id")
        content = "" if raw_text is None else str(raw_text).strip()
        msg_id = "" if raw_id is None else str(raw_id).strip()
        return content, msg_id, msg_id or content

    async def _background_listener(self) -> None:
        """Poll for new messages, process them and send replies.

        Runs until cancelled. While logged out it polls the login status; while
        logged in it fetches the latest messages, skips ones already seen and
        echoes of its own replies, optionally downloads attachments, and sends
        the processor's reply. The polling interval drops to the minimum after
        activity and backs off towards the maximum while idle or after errors.
        """
        # Bounded history of handled message keys; the set mirrors the deque
        # for O(1) membership tests.
        processed_order: deque[str] = deque(maxlen=5000)
        processed_set: set[str] = set()
        # Recently sent replies, used to ignore our own messages when polled.
        sent_buffer: deque[str] = deque(maxlen=40)

        # Adaptive polling interval (seconds)
        poll_interval = 1.0
        min_interval = 0.5
        max_interval = 3.0

        print("[Listener] Started")
        while True:
            try:
                had_messages = False

                if not self.bot.is_logged_in:
                    await self.bot.check_login_status(poll=True)
                    if self.bot.is_logged_in:
                        self.stability_state["reconnect_attempts"] = 0
                        print("[Listener] Login restored")

                if self.bot.is_logged_in:
                    messages = await self.bot.get_latest_messages(limit=12)
                    # NOTE(review): reversed() presumably yields oldest first,
                    # i.e. the bot returns newest first - confirm.
                    for msg in reversed(messages or []):
                        content, msg_id, unique_key = self._message_identity(msg)
                        if not unique_key or unique_key in processed_set:
                            continue

                        # Drop the key the full deque is about to evict so the
                        # set and the deque always hold the same keys.
                        if len(processed_order) == processed_order.maxlen:
                            processed_set.discard(processed_order[0])
                        processed_set.add(unique_key)
                        processed_order.append(unique_key)

                        # Ignore echoes of text this bot sent itself.
                        if content and content in sent_buffer:
                            continue

                        had_messages = True

                        # Automatic file download
                        file_feedback = None
                        if self.auto_download and msg.get("type") in {"image", "file"}:
                            file_feedback = await self._handle_file_download(
                                msg, msg_id, unique_key, len(processed_order)
                            )

                        # Command processing
                        reply = await self.processor.process(msg)

                        # Merge download feedback with the command reply
                        if file_feedback and reply:
                            reply = f"{file_feedback}\n\n{reply}"
                        elif file_feedback:
                            reply = file_feedback

                        if reply:
                            ok = await self.bot.send_text(reply)
                            if ok:
                                # Incoming content is compared stripped, so the
                                # reply is stored stripped as well.
                                sent_buffer.append(str(reply).strip())

                        self.stability_state["last_message_time"] = time.time()
                        self.stability_state["total_messages"] += 1

                # Poll fast after activity, back off gradually while idle.
                if had_messages:
                    poll_interval = min_interval
                else:
                    poll_interval = min(poll_interval * 1.2, max_interval)

            except Exception as exc:
                error_msg = f"Listener error: {exc}"
                print(f"[Listener] {error_msg}")
                self._add_error(error_msg)
                poll_interval = max_interval

            await asyncio.sleep(poll_interval)

    async def _handle_file_download(
        self, msg: dict, msg_id: str, unique_key: str, order_len: int
    ) -> str | None:
        """Download the attachment of *msg* and record its metadata.

        On success ``msg`` gains ``file_path`` and ``file_size`` and the file is
        registered in the processor's message store.

        Args:
            msg: Message dict; ``file_name`` and ``type`` are read from it.
            msg_id: Message id (may be empty).
            unique_key: Fallback identifier used for the download when
                ``msg_id`` is empty.
            order_len: Counter used in the fallback file name when the message
                has neither a file name nor an id.

        Returns:
            The user-facing confirmation text, or ``None`` if the download
            did not succeed.
        """
        raw_name = msg.get("file_name") or f"download_{msg_id[:8] or order_len}"
        # The name originates from the message, so never trust its path parts.
        file_name = self._sanitize_file_name(raw_name)
        if msg.get("type") == "image" and "." not in file_name:
            file_name += ".jpg"

        save_path = self._get_file_save_path(file_name)
        success = await self.bot.download_message_content(msg_id or unique_key, str(save_path))
        if not success:
            return None

        # Expose the stored location and size on the message itself.
        msg["file_path"] = str(save_path)
        try:
            msg["file_size"] = save_path.stat().st_size
        except OSError:
            msg["file_size"] = 0

        # Persist the file metadata
        mime_type, _ = mimetypes.guess_type(file_name)
        self.processor.message_store.save_file(
            msg_id=msg_id,
            file_name=file_name,
            file_path=str(save_path),
            file_size=msg.get("file_size", 0),
            mime_type=mime_type,
        )

        # Confirmation sent back to the user
        file_type = "图片" if msg.get("type") == "image" else "文件"
        return f"已接收{file_type}: {file_name}"

    async def _periodic_session_saver(self) -> None:
        """Save the bot session at a fixed interval while logged in.

        Runs until cancelled; save errors are printed and the loop continues.
        """
        while True:
            await asyncio.sleep(self.SESSION_SAVE_INTERVAL)
            try:
                if self.bot.is_logged_in:
                    await self.bot.save_session()
            except Exception as exc:
                print(f"[SessionSaver] Error: {exc}")

    async def _heartbeat_monitor(self) -> None:
        """Detect logouts periodically and try to restore the session.

        Every ``heartbeat_interval`` seconds the heartbeat timestamp is updated
        and, while logged in, the sync check is queried. A ``"loginout"`` status
        marks the bot as logged out and, while the attempt budget allows,
        reloads the saved session and re-checks the login. Runs until
        cancelled; errors are recorded and the loop continues.
        """
        while True:
            await asyncio.sleep(self.heartbeat_interval)
            try:
                self.stability_state["last_heartbeat"] = time.time()

                if not self.bot.is_logged_in:
                    continue

                # Connection status check
                status = await self.bot._synccheck()
                if status != "loginout":
                    continue

                print("[Heartbeat] Detected logout, will reconnect")
                self.bot.is_logged_in = False
                self.stability_state["reconnect_attempts"] += 1

                if self.stability_state["reconnect_attempts"] <= self.max_reconnect_attempts:
                    await asyncio.sleep(self.reconnect_delay)
                    # Try to log in again using the saved session.
                    await self.bot._load_session()
                    await self.bot.check_login_status(poll=True)
                    if self.bot.is_logged_in:
                        # Same reset the listener does on recovery; without it
                        # unrelated logouts would add up to the attempt limit.
                        self.stability_state["reconnect_attempts"] = 0
                else:
                    self._add_error(
                        f"Max reconnect attempts ({self.max_reconnect_attempts}) reached"
                    )
            except Exception as exc:
                self._add_error(f"Heartbeat error: {exc}")

    async def _file_cleanup_task(self) -> None:
        """Delete expired files through the message store at a fixed interval.

        Runs until cancelled; the first pass happens after one full interval.
        Errors are recorded and the loop continues.
        """
        while True:
            await asyncio.sleep(self.CLEANUP_INTERVAL)
            try:
                deleted_count = self.processor.message_store.cleanup_old_files(
                    days=self.file_retention_days,
                    delete_files=True,
                )
                if deleted_count > 0:
                    print(f"[Cleanup] Deleted {deleted_count} old files")
            except Exception as exc:
                self._add_error(f"Cleanup error: {exc}")