Commit 2756932

feat(shard): integrate Redis coordination storage and the observability pipeline
1 parent b088ac3 commit 2756932

46 files changed

Lines changed: 1519 additions & 1445 deletions

bot_worker.py

Lines changed: 11 additions & 1 deletion
@@ -8,6 +8,7 @@
 PORT=8090
 """

+import asyncio
 import os

 os.environ.setdefault("PALLAS_SHARD_ENABLED", "true")
@@ -64,6 +65,15 @@ def pin_worker_listen_port() -> None:
 register_onebot_v11_custom_events()


+async def _ensure_worker_voices_background() -> None:
+    try:
+        ok = await ensure_voices()
+        if not ok:
+            nonebot.logger.warning("bot_worker: voice ensure failed or incomplete")
+    except Exception as err:
+        nonebot.logger.warning("bot_worker: voice ensure failed: {}", err)
+
+
 @driver.on_startup
 async def startup():
     await init_db()
@@ -97,7 +107,7 @@ async def startup():
     if coord_redis_enabled():
         nonebot.logger.info("bot_worker: cross-process claims via Redis ({})", resolve_coord_redis_url())
     start_shard_coord_worker_watcher()
-    await ensure_voices()
+    asyncio.create_task(_ensure_worker_voices_background(), name="worker_ensure_voices")


 @driver.on_shutdown

docs/architecture/bot_process_sharding.md

Lines changed: 9 additions & 9 deletions
@@ -179,25 +179,25 @@ WebUI: `GET /pallas/api/shard-registry` shows shard and QQ ownership (requires control

 ## Cross-worker coordination (shared `data/pallas_shard/`)

-| Capability | Directory / module | Description |
+| Capability | Redis key / module | Description |
 |------|-------------|------|
-| 牛牛 roll-call order | `coord/bot_count/` | Each shard registers → collection window → the smallest QQ finalizes the order |
-| Duel QTE proxy answer | `coord/duel_qte/` | The hosting shard writes the session; the watcher on the answering bot's shard writes back |
-| Same-group duel mutual exclusion | `coord/duel_group/` | Cross-shard hold |
-| Send-via-specified-bot API | `coord/bot_action/` | Cross-shard request + watcher |
-| Repeater recent corpus | `coord/repeater_buffer/` + Redis Pub/Sub | The shard that wins ingress broadcasts; with Redis, subscribers merge immediately, otherwise fall back to file polling |
-| Group-level short hold | `coord/group_gate/` | `draw` owned gate |
+| 牛牛 roll-call order | `pallas:coord:bot_count:*` | Each shard registers → collection window → the smallest QQ finalizes the order |
+| Duel QTE proxy answer | `pallas:duel_qte:session:*` + Pub/Sub | The hosting shard writes the session; the listener on the answering bot's shard writes back |
+| Same-group duel mutual exclusion | `pallas:coord:duel_group:*` | Cross-shard hold |
+| Send-via-specified-bot API | `pallas:coord:bot_action:*` + Pub/Sub | Cross-shard request + listener |
+| Repeater recent corpus | `pallas:repeater_buffer` + Pub/Sub | The shard that wins ingress broadcasts and merges immediately |
+| Group-level short hold | `pallas:coord:group_gate:*` | `draw` owned gate |
 | Online presence / WebUI | `worker_presence.json` | The hub reads each worker's connections |
 | Console metrics | `stats/worker-{N}.json` | The hub merges and displays them |
 | registry / fleet | `registry.json` + mtime | `shard_data_sync` invalidates the cache |

-Workers poll pending `duel_qte` and `bot_action` items via `src/platform/shard/coord/worker_poll.py`
+Workers start a Redis listener via `src/platform/shard/coord/worker_poll.py` that listens for `duel_qte`, `bot_action`, and repeater buffer broadcasts

 ## WebUI and logs (hub)

 - `GET /pallas/api/bots`: reads **worker_presence.json** (the hub has no reverse WS).
 - `GET /pallas/api/logs`: merges the tail lines of `data/pallas_shard/logs/hub.log` and each `worker-*.log` (`sharded_logs: true`).
-- `GET /pallas/api/shard-observability`: merges each worker's `stats/worker-*.json`, the **ingress hit rate**, the **coord backlog** (live scan of `coord/`), and a **rough PG connection-pool estimate**; in shard mode the **WebUI home page** shows the "Shard observability" panel.
+- `GET /pallas/api/shard-observability`: merges each worker's `stats/worker-*.json`, the **ingress hit rate**, the **coord backlog** (live count over the Redis keyspace), and a **rough PG connection-pool estimate**; in shard mode the **WebUI home page** shows the "Shard observability" panel.
 - `./scripts/run_sharded_bot.sh status`: prints a **shard observability** summary (same data source as the API).
 - `uv run python scripts/shard_observability_snapshot.py`: full JSON; `scripts/shard_observability_status.py`: terminal summary.
 - `plugin-run-stats` / `message-stats`: merge each worker's `stats/worker-*.json`; ERROR logs merge the on-disk worker/hub files.
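
The Redis key patterns in the coordination table suggest how a "coord backlog" count might be grouped by capability. The sketch below is only an illustration under stated assumptions: it classifies an in-memory list of key names with `fnmatchcase` instead of talking to Redis, the sample key suffixes are hypothetical, and the project's real counting logic is not shown in this commit.

```python
from collections import Counter
from fnmatch import fnmatchcase

# Patterns copied from the coordination table; the part matched by "*" is not documented.
COORD_KEY_PATTERNS = {
    "bot_count": "pallas:coord:bot_count:*",
    "duel_qte": "pallas:duel_qte:session:*",
    "duel_group": "pallas:coord:duel_group:*",
    "bot_action": "pallas:coord:bot_action:*",
    "repeater_buffer": "pallas:repeater_buffer",
    "group_gate": "pallas:coord:group_gate:*",
}


def count_backlog(keys):
    """Group key names by the first documented pattern they match."""
    counts = Counter()
    for key in keys:
        for name, pattern in COORD_KEY_PATTERNS.items():
            if fnmatchcase(key, pattern):
                counts[name] += 1
                break
    return dict(counts)


# Hypothetical key names, for illustration only.
sample_keys = [
    "pallas:coord:bot_action:req-1",
    "pallas:coord:bot_action:req-2",
    "pallas:duel_qte:session:abc",
    "pallas:repeater_buffer",
    "unrelated:key",
]
backlog = count_backlog(sample_keys)
```

Against a live server the key names would typically come from an incremental `SCAN` rather than `KEYS`, which is presumably why the commit makes a full keyspace scan opt-in.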

docs/plugins/maa/README.md

Lines changed: 1 addition & 1 deletion
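@@ -45,7 +45,7 @@
 | Symptom | Resolution |
 | --- | --- |
 | No polling detected | The MAA endpoint is unreachable or the URL is wrong; in shard mode the hub needs `maa_public_base_url` and all workers must share `data/` |
-| Status shows items pending pull, MAA has no tasks | In shard mode the queue lives in `data/pallas_shard/coord/maa_pending/`; the hub must be able to reach each worker port |
+| Status shows items pending pull, MAA has no tasks | In shard mode the queue goes through Redis `pallas:coord:maa_pending:*`; the hub must be able to reach each worker port and Redis must be available |
 | No task after dispatch | Not bound, or the user identifier is not a QQ number; check `牛牛MAA状态` |
 | Queue has items, MAA has none | The device id does not match the "currently selected" one; clear the queue and retry |
 | Screenshot fails | Increase the reverse proxy's `client_max_body_size` |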

scripts/detect_shard_redis.py

Lines changed: 1 addition & 1 deletion
@@ -48,7 +48,7 @@ def main() -> int:
     pkg = "yes" if redis_package_installed() else "no"
     reachable = "yes" if url and coord_redis_enabled() else "no"
     active = "yes" if reachable == "yes" else "no"
-    backend = "redis" if active == "yes" else "file"
+    backend = "redis" if active == "yes" else "unavailable"

     if args.status:
         print(f"policy={mode}")
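
With the file fallback gone, the detection script reports `unavailable` instead of `file` when Redis is not active. The decision can be restated as a small pure function, sketched below. The function name and the dict return shape are assumptions made for illustration; only the three conditional expressions come from the diff.

```python
from typing import Optional


def coord_backend_status(url: Optional[str], redis_enabled: bool) -> dict:
    """Mirror the yes/no flags from the diff; no file backend remains."""
    reachable = "yes" if url and redis_enabled else "no"
    active = "yes" if reachable == "yes" else "no"
    backend = "redis" if active == "yes" else "unavailable"
    return {"reachable": reachable, "active": active, "backend": backend}
```

A caller could treat `backend == "unavailable"` as a hard error at startup, though the commit does not show how the launcher reacts to it.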

scripts/prune_shard_coord.py

Lines changed: 16 additions & 38 deletions
@@ -1,5 +1,5 @@
 #!/usr/bin/env python3
-"""One-off cleanup of stale coord JSON (bot_action / duel_qte, etc.). In shard mode it can be run on the hub or an ops machine."""
+"""coord cleanup script (coord is now Redis-backed; this script only outputs a snapshot for ops compatibility)."""

 from __future__ import annotations

@@ -14,51 +14,29 @@
 sys.path.insert(0, str(REPO_ROOT))


-def prune_all_done_bot_action() -> int:
-    from src.platform.shard.coord.bot_action import _coord_dir, _read
-
-    removed = 0
-    for path in _coord_dir().glob("*.json"):
-        if ".lock" in path.name:
-            continue
-        row = _read(path)
-        if isinstance(row, dict) and row.get("done"):
-            try:
-                path.unlink(missing_ok=True)
-                removed += 1
-            except OSError:
-                pass
-    return removed
-
-
 async def main() -> int:
-    parser = argparse.ArgumentParser(description="Clean up stale JSON under data/pallas_shard/coord")
+    parser = argparse.ArgumentParser(description="coord Redis-mode snapshot (legacy file cleanup is disabled)")
     parser.add_argument(
         "--purge-done",
         action="store_true",
-        help="Delete all bot_action done files (one-off ops relief; does not affect in-progress open entries)",
+        help="Kept for compatibility with the old flag; there are no files to clean up in Redis mode",
+    )
+    parser.add_argument(
+        "--live-scan",
+        action="store_true",
+        help="Explicitly scan the Redis keyspace; by default only a lightweight snapshot is output",
     )
     args = parser.parse_args()
-    from src.platform.shard.coord.bot_action import prune_stale_bot_action_files
-    from src.platform.shard.coord.bot_count import prune_stale_bot_count_files
-    from src.platform.shard.coord.cage_duel import prune_stale_cage_duel_files
-    from src.platform.shard.coord.duel_group import prune_stale_duel_group_files
-    from src.platform.shard.coord.duel_qte import prune_stale_duel_qte_files
-    from src.platform.shard.coord.repeater_buffer import prune_stale_repeater_buffer_files
     from src.platform.shard.coord_pending import coord_pending_snapshot_sync

-    before = coord_pending_snapshot_sync()
-    purged_done = prune_all_done_bot_action() if args.purge_done else 0
-    bot_stats = await prune_stale_bot_action_files()
-    if purged_done:
-        bot_stats["purged_all_done"] = purged_done
-    await prune_stale_duel_qte_files()
-    await prune_stale_repeater_buffer_files()
-    await prune_stale_cage_duel_files()
-    await prune_stale_bot_count_files()
-    await prune_stale_duel_group_files()
-    after = coord_pending_snapshot_sync()
-    out = {"before": before, "bot_action": bot_stats, "after": after}
+    snap = coord_pending_snapshot_sync(live=bool(args.live_scan))
+    out = {
+        "storage": snap.get("storage"),
+        "snapshot": snap,
+        "purge_done_requested": bool(args.purge_done),
+        "live_scan_requested": bool(args.live_scan),
+        "note": "coord data is stored in Redis; no JSON file polling/prune",
+    }
     json.dump(out, sys.stdout, ensure_ascii=False, indent=2)
     sys.stdout.write("\n")
     return 0
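
After this change the prune script is effectively a reporting tool: `--purge-done` still parses so that old invocations keep working, and `--live-scan` opts in to a full keyspace scan. The sketch below reproduces that shape with a stand-in for `coord_pending_snapshot_sync`. The stand-in's return fields are assumptions (only `storage` and `scan_skipped` are hinted at elsewhere in the commit), and `build_report` is a hypothetical helper name.

```python
import argparse
import json


def coord_pending_snapshot_sync(live: bool = False) -> dict:
    """Hypothetical stand-in: the real function reads Redis."""
    return {"storage": "redis", "scan_skipped": not live}


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(description="coord Redis-mode snapshot")
    # Legacy flag: still accepted, but it no longer deletes anything.
    parser.add_argument("--purge-done", action="store_true")
    # Opt-in flag for the more expensive keyspace scan.
    parser.add_argument("--live-scan", action="store_true")
    return parser


def build_report(argv: list) -> dict:
    """Parse argv and assemble the JSON-serializable report."""
    args = build_parser().parse_args(argv)
    snap = coord_pending_snapshot_sync(live=args.live_scan)
    return {
        "storage": snap.get("storage"),
        "snapshot": snap,
        "purge_done_requested": args.purge_done,
        "live_scan_requested": args.live_scan,
    }


report_text = json.dumps(build_report(["--purge-done"]), indent=2)
```

Echoing the requested flags back in the report lets an operator see that `--purge-done` was received and ignored, rather than assuming a cleanup happened.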

scripts/run_sharded_bot.sh

Lines changed: 36 additions & 2 deletions
@@ -39,6 +39,7 @@ WAIT_PORTS_SCRIPT="${SCRIPT_DIR}/wait_shard_worker_ports.py"
 SHARD_TEST_SCRIPT="${SCRIPT_DIR}/shard_test_worker.py"
 DETECT_REDIS_SCRIPT="${SCRIPT_DIR}/detect_shard_redis.py"
 PORT_RELEASE_TIMEOUT="${PALLAS_SHARD_PORT_RELEASE_TIMEOUT:-60}"
+FORCE_STOP=0
 PID_WORKER_TEST="${RUN_DIR}/worker-test.pid"
 TEST_SHARD_ID="${PALLAS_SHARD_TEST_ID:-99}"

@@ -96,6 +97,7 @@ usage() {
   --workers-only   Operate on production workers only (do not stop/start the hub; on start only launch missing processes, on restart restart all of them)
   --scale-only     Scale-out mode: do not reassign ports or sync the protocol side's ws_url (enabled automatically by start --workers-only when workers are already running)
   --hub-only       Operate on the hub console only (do not stop/start workers; commonly used with restart)
+  --force          On stop/restart, kill with SIGKILL and skip the long port wait (faster restart)
   --dry-run        Only show the commands that would run; do not actually start anything
   -h, --help       Show this help

@@ -330,6 +332,13 @@ stop_one() {
   fi
   local pid
   pid="$(cat "${pidfile}")"
+  if [[ "${FORCE_STOP}" -eq 1 ]]; then
+    kill -KILL "${pid}" 2>/dev/null || true
+    pkill -KILL -P "${pid}" 2>/dev/null || true
+    rm -f "${pidfile}"
+    echo " · ${label}: force-stopped"
+    return 0
+  fi
   kill -TERM "${pid}" 2>/dev/null || true
   local i=0
   while kill -0 "${pid}" 2>/dev/null && [[ "${i}" -lt 30 ]]; do
@@ -338,6 +347,7 @@ stop_one() {
   done
   if kill -0 "${pid}" 2>/dev/null; then
     kill -KILL "${pid}" 2>/dev/null || true
+    pkill -KILL -P "${pid}" 2>/dev/null || true
   fi
   rm -f "${pidfile}"
   echo " · ${label}: stopped"
@@ -347,8 +357,15 @@ stop_one() {
 stop_orphan_shard_processes() {
   local pat
   for pat in "${REPO_ROOT}/.venv/bin/python3 bot_hub.py" "${REPO_ROOT}/.venv/bin/python3 bot_worker.py"; do
-    pkill -TERM -f "${pat}" 2>/dev/null || true
+    if [[ "${FORCE_STOP}" -eq 1 ]]; then
+      pkill -KILL -f "${pat}" 2>/dev/null || true
+    else
+      pkill -TERM -f "${pat}" 2>/dev/null || true
+    fi
   done
+  if [[ "${FORCE_STOP}" -eq 1 ]]; then
+    return 0
+  fi
   sleep 1
   for pat in "${REPO_ROOT}/.venv/bin/python3 bot_hub.py" "${REPO_ROOT}/.venv/bin/python3 bot_worker.py"; do
     pkill -KILL -f "${pat}" 2>/dev/null || true
@@ -357,13 +374,21 @@ stop_orphan_shard_processes() {

 stop_orphan_worker_processes() {
   local pat="${REPO_ROOT}/.venv/bin/python3 bot_worker.py"
+  if [[ "${FORCE_STOP}" -eq 1 ]]; then
+    pkill -KILL -f "${pat}" 2>/dev/null || true
+    return 0
+  fi
   pkill -TERM -f "${pat}" 2>/dev/null || true
   sleep 1
   pkill -KILL -f "${pat}" 2>/dev/null || true
 }

 stop_orphan_hub_processes() {
   local pat="${REPO_ROOT}/.venv/bin/python3 bot_hub.py"
+  if [[ "${FORCE_STOP}" -eq 1 ]]; then
+    pkill -KILL -f "${pat}" 2>/dev/null || true
+    return 0
+  fi
   pkill -TERM -f "${pat}" 2>/dev/null || true
   sleep 1
   pkill -KILL -f "${pat}" 2>/dev/null || true
@@ -482,6 +507,11 @@ count_running_production_worker_ids() {

 wait_worker_ports_released() {
   local workers="$1"
+  if [[ "${FORCE_STOP}" -eq 1 ]]; then
+    echo " · worker ports: force mode, short 3s wait"
+    sleep 3
+    return 0
+  fi
   if [[ "${DRY_RUN}" -eq 1 ]]; then
     echo " · worker ports: will wait for release before starting (preview)"
     return 0
@@ -1024,7 +1054,7 @@ cmd_stop() {
     echo " Production workers have been handled (test worker-test was not stopped)."
     return 0
   fi
-  print_title "Pallas-Bot shard mode · stop"
+  print_title "Pallas-Bot shard mode · stop$([[ "${FORCE_STOP}" -eq 1 ]] && echo ' (forced)')"
   stop_one "worker-test" "test worker-test"
   stop_production_workers
   stop_hub_process
@@ -1345,6 +1375,10 @@ while [[ $# -gt 0 ]]; do
       HUB_ONLY=1
       shift
       ;;
+    --force)
+      FORCE_STOP=1
+      shift
+      ;;
     -h|--help)
       usage
       exit 0
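
The `--force` flag gives `stop_one` two paths: the default sends SIGTERM, waits for a bounded grace period, and escalates to SIGKILL only if the process is still alive, while force mode sends SIGKILL at once. The Python sketch below illustrates that policy under stated assumptions; it is not a port of the script. It manages a `subprocess.Popen` handle instead of a pid file, does not signal child processes, and `stop_process`, `spawn_sleeper`, and the 3-second default are hypothetical.

```python
import subprocess
import sys


def stop_process(proc: subprocess.Popen, force: bool = False, grace_seconds: float = 3.0) -> str:
    """Stop proc; return "forced" if it had to be killed, else "graceful"."""
    if force:
        proc.kill()  # SIGKILL on POSIX: no chance to clean up
        proc.wait()
        return "forced"
    proc.terminate()  # SIGTERM on POSIX: ask the process to exit
    try:
        proc.wait(timeout=grace_seconds)
        return "graceful"
    except subprocess.TimeoutExpired:
        proc.kill()  # grace period expired, escalate
        proc.wait()
        return "forced"


def spawn_sleeper() -> subprocess.Popen:
    """Start a throwaway child process that would otherwise sleep for a minute."""
    return subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])
```

Force mode trades a clean shutdown for a faster restart, because a process killed with SIGKILL cannot run its shutdown hooks. That is presumably why the script makes it opt-in and still waits a few seconds for ports to be released.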

scripts/shard_observability_status.py

Lines changed: 4 additions & 2 deletions
@@ -50,15 +50,17 @@ def main() -> int:
         f"early_dropped={int(ing.get('early_fleet', 0)) + int(ing.get('early_not_at_target', 0))}"
     )
     print(
-        f"coord JSON total={coord.get('total_json', 0)} "
+        f"coord Redis keys={coord.get('total_json', 0)} "
         f"actionable={coord.get('actionable_total', coord.get('bot_action_open', 0))} "
         f"historical={coord.get('historical_retained', 0)} "
         f"bot_action_open={coord.get('bot_action_open', 0)} "
         f"stale_open={coord.get('bot_action_stale_open', 0)}"
     )
+    if coord.get("scan_skipped"):
+        print("coord live scan skipped (Redis is not scanned by default; for a full check use prune_shard_coord.py --live-scan)")
     ba = (coord.get("by_dir") or {}).get("bot_action", 0)
     if ba:
-        print(f"coord bot_action files {ba} (prune_shard_coord.py --purge-done)")
+        print(f"coord bot_action {ba} (prune_shard_coord.py only outputs a snapshot)")

     peak = pg.get("estimated_pg_connections_peak")
     proc = pg.get("estimated_processes")
