Skip to content

Commit 05836f6

Browse files
committed
fix: 定时任务超时问题优化
Fixes #141 定时分析任务间隔启动,放宽超时配置限制
1 parent 8ba5f69 commit 05836f6

6 files changed

Lines changed: 59 additions & 28 deletions

File tree

_conf_schema.json

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -507,14 +507,20 @@
507507
"max_concurrent_llm": {
508508
"type": "int",
509509
"description": "最大 LLM 请求并发数(API 闸口)",
510-
"default": 2,
510+
"default": 3,
511511
"hint": "限制同时发起的 AI 分析请求数量。主要用于【遵守 API 频率限制(RPM)】,避免因为请求过快导致 API 被封号或报错。"
512512
},
513513
"max_concurrent_t2i": {
514514
"type": "int",
515515
"description": "最大 T2I渲染并发数",
516516
"default": 1,
517517
"hint": "限制同时开启的 T2I 渲染进程数。"
518+
},
519+
"stagger_seconds": {
520+
"type": "int",
521+
"description": "多群分析交错间隔(秒)",
522+
"default": 10,
523+
"hint": "当同时启动多个群组的定时分析任务时,每个任务之间的延迟间隔(秒),建议根据实际的 LLM 服务速度填写,过短可能导致请求堆积,过长则分析效率降低。"
518524
}
519525
}
520526
}

main.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
from .src.shared.trace_context import TraceContext, TraceLogFilter
5050
from .src.utils.logger import logger
5151
from .src.utils.pdf_utils import PDFInstaller
52+
from .src.utils.resilience import GlobalRateLimiter
5253

5354

5455
class GroupDailyAnalysis(Star):
@@ -148,6 +149,9 @@ def __init__(self, context: Context, config: AstrBotConfig):
148149
plugin_instance=self,
149150
)
150151

152+
# 同步全局限流并进行初始化配置
153+
GlobalRateLimiter.get_instance(self.config_manager.get_llm_max_concurrent())
154+
151155
self._initialized = False
152156
self._terminating = False # 生命周期标志
153157
self._init_lock = asyncio.Lock()

src/infrastructure/analysis/utils/llm_utils.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import asyncio
77

88
from ....utils.logger import logger
9-
from ....utils.resilience import CircuitBreaker, global_llm_rate_limiter
9+
from ....utils.resilience import CircuitBreaker, GlobalRateLimiter
1010
from .structured_output_schema import JSONObject
1111

1212
_circuit_breakers = {}
@@ -273,7 +273,7 @@ async def call_provider_with_retry(
273273
# 使用全局限流器 + 熔断器记录
274274
# 超时由 Provider 内部控制,无需外层 wait_for
275275
try:
276-
async with global_llm_rate_limiter:
276+
async with GlobalRateLimiter.get_instance().semaphore:
277277
llm_kwargs: dict[str, object] = {
278278
"chat_provider_id": provider_id,
279279
"prompt": prompt,

src/infrastructure/config/config_manager.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -387,6 +387,10 @@ def get_t2i_max_concurrent(self) -> int:
387387
"""获取全局图片渲染(T2I)最大并发数"""
388388
return self._get_group("performance").get("max_concurrent_t2i", 1)
389389

390+
def get_stagger_seconds(self) -> int:
391+
"""获取多群分析任务启动时的交错间隔(秒)"""
392+
return self._get_group("performance").get("stagger_seconds", 2)
393+
390394
def set_max_concurrent_tasks(self, count: int):
391395
"""设置自动分析最大并发数"""
392396
self._ensure_group("performance")["max_concurrent_groups"] = count

src/infrastructure/scheduler/auto_scheduler.py

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -376,10 +376,17 @@ async def dispatch_group(gid, pid, mode):
376376
)
377377

378378
tasks = []
379-
for gid, pid, mode in all_targets:
379+
stagger = self.config_manager.get_stagger_seconds() or 2
380+
# 针对定时大任务加入交错等待,减少瞬间峰值延迟
381+
for idx, (gid, pid, mode) in enumerate(all_targets):
380382
if self._terminating:
381383
logger.info("检测到插件正在停止,取消后续任务创建")
382384
break
385+
386+
# Wait the configured stagger interval before launching every task after the first one, spreading API load evenly
387+
if idx > 0 and stagger > 0:
388+
await asyncio.sleep(stagger)
389+
383390
task = asyncio.create_task(
384391
dispatch_group(gid, pid, mode),
385392
name=f"report_{mode}_{gid}",
@@ -418,13 +425,13 @@ async def _perform_auto_analysis_for_group_with_timeout(
418425
):
419426
"""为指定群执行自动分析(带超时控制)"""
420427
try:
421-
# 为每个群聊设置独立的超时时间(20分钟)
428+
# 为每个群聊设置独立的超时时间,适当放宽到 30 分钟以支持大型批次
422429
await asyncio.wait_for(
423430
self._perform_auto_analysis_for_group(group_id, target_platform_id),
424-
timeout=1200,
431+
timeout=1800,
425432
)
426433
except asyncio.TimeoutError:
427-
logger.error(f"群 {group_id} 分析超时(20分钟),跳过该群分析")
434+
logger.error(f"群 {group_id} 分析超时(30分钟),跳过该群分析")
428435
except Exception as e:
429436
logger.error(f"群 {group_id} 分析任务执行失败: {e}")
430437

@@ -664,7 +671,7 @@ async def _perform_incremental_final_report_for_group_with_timeout(
664671
self._perform_incremental_final_report_for_group(
665672
group_id, target_platform_id
666673
),
667-
timeout=1200,
674+
timeout=1800,
668675
)
669676

670677
# 判定是否需要触发回退 (例如:无增量数据等)
@@ -684,7 +691,7 @@ async def _perform_incremental_final_report_for_group_with_timeout(
684691
return result
685692

686693
except asyncio.TimeoutError:
687-
logger.error(f"群 {group_id} 最终报告超时(20分钟)")
694+
logger.error(f"群 {group_id} 最终报告超时(30分钟)")
688695
if self.config_manager.get_incremental_fallback_enabled():
689696
logger.warning(f"群 {group_id} 增量报告超时,正在回退到传统全量分析...")
690697
return await self._fallback_to_traditional(group_id, target_platform_id)

src/utils/resilience.py

Lines changed: 29 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -111,40 +111,50 @@ class GlobalRateLimiter:
111111
_instance: "GlobalRateLimiter | None" = None
112112
_semaphore: asyncio.Semaphore | None = None
113113

114+
def __new__(cls):
115+
if cls._instance is None:
116+
cls._instance = super().__new__(cls)
117+
return cls._instance
118+
114119
@classmethod
115-
def get_instance(cls, max_concurrency: int = 3) -> "GlobalRateLimiter":
120+
def get_instance(cls, max_concurrency: int | None = None) -> "GlobalRateLimiter":
116121
"""
117122
获取或创建限流器单例。
118123
119124
Args:
120-
max_concurrency (int): 允许的最大并发行数
125+
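max_concurrency (int, optional): Maximum allowed concurrency. When provided, the semaphore is created if it does not exist yet, or replaced with a new one if its current internal counter (`_value`) differs from this value.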
121126
122127
Returns:
123128
GlobalRateLimiter: 唯一实例
124129
"""
125-
if cls._instance is None:
126-
cls._instance = cls()
127-
cls._semaphore = asyncio.Semaphore(max_concurrency)
128-
elif (
129-
cls._semaphore is not None and cls._semaphore._value != max_concurrency # type: ignore
130+
instance = cls()
131+
if max_concurrency is not None:
132+
instance.reconfigure(max_concurrency)
133+
elif cls._semaphore is None:
134+
# 默认兜底
135+
cls._semaphore = asyncio.Semaphore(3)
136+
return instance
137+
138+
def reconfigure(self, max_concurrency: int):
139+
"""重新配置并发上限。注意:这会替换信号量对象。"""
140+
if self._semaphore is None or (
141+
hasattr(self._semaphore, "_value")
142+
and self._semaphore._value != max_concurrency # type: ignore
130143
):
131-
# 如果请求的并发数发生变化,重新创建信号量
144+
old_val = (
145+
getattr(self._semaphore, "_value", "None")
146+
if self._semaphore
147+
else "None"
148+
)
132149
logger.info(
133-
f"GlobalRateLimiter 重新配置:{cls._semaphore._value} -> {max_concurrency}"
150+
f"GlobalRateLimiter 重新配置并发上限:{old_val} -> {max_concurrency}"
134151
)
135-
cls._semaphore = asyncio.Semaphore(max_concurrency)
136-
return cls._instance
152+
self.__class__._semaphore = asyncio.Semaphore(max_concurrency)
137153

138154
@property
139155
def semaphore(self) -> asyncio.Semaphore:
140156
"""返回核心的异步信号量对象。"""
141157
if self._semaphore is None:
142-
# 兜底:若直接通过属性访问则初始化默认值
143-
self._semaphore = asyncio.Semaphore(3)
158+
self.__class__._semaphore = asyncio.Semaphore(3)
159+
assert self._semaphore is not None
144160
return self._semaphore
145-
146-
147-
# 导出默认实例:用于 LLM 调用的全局限流
148-
global_llm_rate_limiter: asyncio.Semaphore = GlobalRateLimiter.get_instance(
149-
max_concurrency=3
150-
).semaphore

0 commit comments

Comments
 (0)