Skip to content

Commit e68ed47

Browse files
committed
feat(shard): 分片扩容、AI 回调与运维改进
- feat(shard): 满员自动拉起 worker、scale-only 扩容与 worker 计数修正 - fix(shard): 协议端 bot_offline 同步 WebUI、AI 回调 TTL 与 task 登记 Redis 双写 - fix(repeater): 分片下主动发言与维护任务归属 worker - fix(db): replace_answers 并发安全、孤儿 Answer 分批删除与日志降噪 - refactor(worker_count): 修复 CI - chore(db): ruff format repository_pg
1 parent a0b85f6 commit e68ed47

44 files changed

Lines changed: 1987 additions & 136 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

scripts/calc_worker_count.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
#!/usr/bin/env python3
2+
"""输出应启动的生产 worker 数量(供 run_sharded_bot.sh 调用)。"""
3+
4+
from __future__ import annotations
5+
6+
import argparse
7+
import sys
8+
from pathlib import Path
9+
10+
REPO_ROOT = Path(__file__).resolve().parents[1]
11+
if str(REPO_ROOT) not in sys.path:
12+
sys.path.insert(0, str(REPO_ROOT))
13+
14+
from src.platform.shard.registry.worker_count import calc_production_worker_count # noqa: E402
15+
16+
17+
def main() -> int:
    """Print the production worker count to stdout and return exit status 0.

    Command-line options:
        --bots-per  integer forwarded as ``bots_per_shard`` (default 5)
        --base      integer forwarded as ``worker_base_port`` (default None)
        --accounts  path to the accounts JSON file
        --registry  path to the shard registry JSON file

    A path option that does not point at an existing regular file is
    forwarded as ``None`` instead of the path.
    """
    cli = argparse.ArgumentParser(description="计算生产 worker 数量")
    cli.add_argument("--bots-per", type=int, default=5)
    cli.add_argument("--base", type=int, default=None)
    # Both path options share the same shape: a Path defaulting under REPO_ROOT.
    for flag, relative in (
        ("--accounts", "data/pallas_protocol/accounts.json"),
        ("--registry", "data/pallas_shard/registry.json"),
    ):
        cli.add_argument(flag, type=Path, default=REPO_ROOT / relative)
    opts = cli.parse_args()

    # Missing files are signalled to the calculator as None.
    accounts_file = opts.accounts if opts.accounts.is_file() else None
    registry_file = opts.registry if opts.registry.is_file() else None

    print(
        calc_production_worker_count(
            bots_per_shard=opts.bots_per,
            worker_base_port=opts.base,
            accounts_path=accounts_file,
            registry_path=registry_file,
        )
    )
    return 0


if __name__ == "__main__":
    raise SystemExit(main())

scripts/run_sharded_bot.sh

Lines changed: 168 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ WORKER_BASE_PORT="${PALLAS_SHARD_WORKER_BASE_PORT:-8090}"
2727
WORKER_COUNT_OVERRIDE=""
2828
WORKERS_ONLY=0
2929
HUB_ONLY=0
30+
SCALE_ONLY=0
3031
DRY_RUN=0
3132
SKIP_PORT_SYNC=0
3233
SKIP_OCCUPIED_PORTS=1
@@ -57,7 +58,7 @@ usage() {
5758
用法: ./scripts/run_sharded_bot.sh <命令> [选项]
5859
5960
命令:
60-
start 启动控制台与全部牛牛 worker(可加 --hub-only 仅启 hub)
61+
start 启动控制台与全部牛牛 worker(可加 --hub-only 仅启 hub;--workers-only 仅拉起缺失 worker)
6162
stop 停止全部分片进程(可加 --workers-only 仅停 worker,或 --hub-only 仅停 hub)
6263
status 查看进程、端口、Redis 协调与配置摘要
6364
restart 先 stop 再 start(可加 --workers-only 仅重启 worker,或 --hub-only 仅重启 hub)
@@ -75,6 +76,7 @@ usage() {
7576
./scripts/run_sharded_bot.sh status
7677
./scripts/run_sharded_bot.sh restart
7778
./scripts/run_sharded_bot.sh restart --workers-only
79+
./scripts/run_sharded_bot.sh start --workers-only
7880
./scripts/run_sharded_bot.sh start --hub-only
7981
./scripts/run_sharded_bot.sh stop --hub-only
8082
./scripts/run_sharded_bot.sh restart --hub-only
@@ -91,7 +93,8 @@ usage() {
9193
--skip-port-sync 启动前不自动改写协议端里的反向 WS 地址
9294
--no-skip-occupied-ports
9395
worker 严格使用 起点+N,不自动避开已占用端口
94-
--workers-only 仅操作生产 worker(不停/不启 hub;restart 时常用)
96+
--workers-only 仅操作生产 worker(不停/不启 hub;start 时只拉起缺失进程,restart 时全量重启)
97+
--scale-only 扩容模式:不重分配端口、不同步协议端 ws_url(已有 worker 在跑时 start --workers-only 自动启用)
9598
--hub-only 仅操作 hub 控制台(不停/不启 worker;restart 时常用)
9699
--dry-run 只显示将要执行的命令,不真正启动
97100
-h, --help 显示本帮助
@@ -251,48 +254,18 @@ read_dotenv_port() {
251254
echo "${default}"
252255
}
253256

254-
calc_worker_count() {
255-
python3 - "${REPO_ROOT}" "${BOTS_PER_SHARD}" "${ACCOUNTS_JSON}" "${REGISTRY_JSON}" <<'PY'
256-
import json
257-
import math
258-
import sys
259-
from pathlib import Path
257+
CALC_WORKERS_SCRIPT="${SCRIPT_DIR}/calc_worker_count.py"
260258

261-
root = Path(sys.argv[1])
262-
bots_per = max(1, int(sys.argv[2]))
263-
accounts_path = Path(sys.argv[3])
264-
registry_path = Path(sys.argv[4])
265-
need = 1
266-
267-
if accounts_path.is_file():
268-
try:
269-
raw = json.loads(accounts_path.read_text(encoding="utf-8"))
270-
items = raw.values() if isinstance(raw, dict) else raw
271-
enabled = 0
272-
for v in items:
273-
if isinstance(v, dict) and v.get("enabled", True):
274-
enabled += 1
275-
if enabled > 0:
276-
need = max(need, math.ceil(enabled / bots_per))
277-
except (json.JSONDecodeError, OSError, TypeError):
278-
pass
279-
280-
if registry_path.is_file():
281-
try:
282-
reg = json.loads(registry_path.read_text(encoding="utf-8"))
283-
assigns = reg.get("assignments") or {}
284-
test_cfg = reg.get("test") or {}
285-
test_sid = int(test_cfg.get("shard_id", 99))
286-
if assigns:
287-
normal_vals = [int(x) for x in assigns.values() if int(x) != test_sid]
288-
if normal_vals:
289-
max_sid = max(normal_vals)
290-
need = max(need, max_sid + 1, math.ceil(len(normal_vals) / bots_per))
291-
except (json.JSONDecodeError, OSError, ValueError, TypeError):
292-
pass
293-
294-
print(need)
295-
PY
259+
calc_worker_count() {
  # Print the number of production workers that should be running.
  #
  # Delegates to CALC_WORKERS_SCRIPT (run through `uv run python`) with the
  # configured bots-per-shard value, accounts file and registry file, plus
  # the worker base port when one is set. Falls back to printing 1 when the
  # script is missing, when it fails, or when its output is not an integer.
  local -a args=(--bots-per "${BOTS_PER_SHARD}" --accounts "${ACCOUNTS_JSON}" --registry "${REGISTRY_JSON}")
  if [[ -n "${WORKER_BASE_PORT:-}" ]]; then
    args+=(--base "${WORKER_BASE_PORT}")
  fi
  if [[ ! -f "${CALC_WORKERS_SCRIPT}" ]]; then
    echo 1
    return
  fi
  local out
  out="$(uv run python "${CALC_WORKERS_SCRIPT}" "${args[@]}" 2>/dev/null)" || out=""
  # Callers compare this value arithmetically, so only the final stdout line
  # is considered and it must be a plain non-negative integer; anything else
  # (extra stdout noise, empty output) degrades to the safe default of 1.
  out="${out##*$'\n'}"
  if [[ "${out}" =~ ^[0-9]+$ ]]; then
    echo "${out}"
  else
    echo 1
  fi
}
297270

298271
is_running() {
@@ -438,6 +411,11 @@ stop_production_workers() {
438411
stop_orphan_worker_processes
439412
}
440413

414+
worker_port_for_sid() {
  # Output the port for shard id $1, exactly as registry_port_for_shard
  # reports it; this is a naming alias with no logic of its own.
  registry_port_for_shard "$1"
}
418+
441419
start_production_workers() {
442420
local workers="$1"
443421
local -a common
@@ -449,7 +427,8 @@ start_production_workers() {
449427
IFS=',' read -r -a _worker_ports <<< "${WORKER_PORTS_CSV}"
450428
fi
451429
while [[ "${sid}" -lt "${workers}" ]]; do
452-
local wport=$((WORKER_BASE_PORT + sid))
430+
local wport
431+
wport="$(worker_port_for_sid "${sid}")"
453432
if [[ "${#_worker_ports[@]}" -gt "${sid}" && -n "${_worker_ports[sid]:-}" ]]; then
454433
wport="${_worker_ports[sid]}"
455434
fi
@@ -463,6 +442,44 @@ start_production_workers() {
463442
done
464443
}
465444

445+
start_missing_production_workers() {
  # Start a worker for every shard id in [0, $1) whose pid file does not
  # report a running process. Shards that are already running are only
  # reported, never restarted. Ports come from worker_port_for_sid.
  local workers="$1"
  local -a common
  mapfile -t common < <(shard_common_env)
  load_shard_redis_env
  local sid wport pidfile
  for ((sid = 0; sid < workers; sid++)); do
    pidfile="${RUN_DIR}/worker-${sid}.pid"
    wport="$(worker_port_for_sid "${sid}")"
    if ! is_running "${pidfile}"; then
      # Not running: launch it with the shared environment plus the
      # per-shard role, id and port.
      start_one "worker-${sid}" "worker-${sid} WS:${wport}" env \
        "${common[@]}" \
        PALLAS_BOT_ROLE=worker \
        PALLAS_SHARD_ID="${sid}" \
        PORT="${wport}" \
        "${START_CMD[@]}" bot_worker.py
    else
      echo " · worker-${sid} WS:${wport}:已在运行(无需重复启动)"
    fi
  done
}
469+
470+
count_running_production_worker_ids() {
  # Print how many worker-*.pid files under RUN_DIR belong to a running
  # process. The worker-test pid file is excluded from the count.
  local running=0
  local f
  for f in "${RUN_DIR}"/worker-*.pid; do
    # An unmatched glob is left as a literal pattern, hence the -e check.
    if [[ ! -e "${f}" || "${f##*/}" == "worker-test.pid" ]]; then
      continue
    fi
    is_running "${f}" || continue
    running=$((running + 1))
  done
  echo "${running}"
}
482+
466483
wait_worker_ports_released() {
467484
local workers="$1"
468485
if [[ "${DRY_RUN}" -eq 1 ]]; then
@@ -793,18 +810,95 @@ cmd_start_hub_only() {
793810
echo ""
794811
echo " 常用命令"
795812
echo " status 查看运行状态"
796-
echo " restart --workers-only 仅重启 worker"
813+
echo " start --workers-only 仅拉起缺失 worker"
814+
echo " restart --workers-only 全量重启 worker"
797815
echo " restart --hub-only 仅重启 hub"
798816
if [[ "${hub_ok}" -eq 0 ]]; then
799817
return 1
800818
fi
801819
}
802820

821+
cmd_start_workers_only() {
  # `start --workers-only`: bring up production workers without touching the hub.
  #
  # Two modes, selected by SCALE_ONLY:
  #   * scale-only (SCALE_ONLY=1, or forced when any production worker is
  #     already running): start only the missing workers on their registry
  #     ports; no port preparation is performed.
  #   * cold start (no production worker running): run prepare_shard_ports
  #     and start every worker.
  #
  # The worker count honours WORKER_COUNT_OVERRIDE whenever it is set, both
  # for the printed summary and for the processes actually started.
  # Returns 1 only when prepare_shard_ports fails.
  local workers="${WORKER_COUNT_OVERRIDE:-$(calc_worker_count)}"
  local running_before
  running_before="$(count_running_production_worker_ids)"
  # Existing workers keep their ports, so switch to scale-only automatically.
  if [[ "${SCALE_ONLY}" -eq 0 && "${running_before}" -gt 0 ]]; then
    SCALE_ONLY=1
  fi

  print_title "Pallas-Bot 分片模式 · 启动缺失 worker(保留已运行进程)"
  print_config_summary "${workers}"
  if is_running "${PID_HUB}"; then
    echo " hub 保持运行(:${HUB_PORT})"
  else
    echo " hub 未运行(本次不启动 hub)"
  fi
  if [[ "${SCALE_ONLY}" -eq 1 ]]; then
    echo " 端口策略 扩容模式(registry 端口,不重分配、不同步协议端)"
  else
    echo " 端口策略 冷启动(评估端口并同步协议端 ws_url)"
  fi
  load_shard_redis_env
  echo " $(shard_coord_backend_hint)"
  echo ""

  if [[ "${SCALE_ONLY}" -eq 1 ]]; then
    # Re-evaluate the count, but never discard an explicit override.
    workers="${WORKER_COUNT_OVERRIDE:-$(calc_worker_count)}"
    echo " 正在启动缺失 worker(已在运行的分片将跳过)…"
    start_missing_production_workers "${workers}"
  else
    prepare_shard_ports "${workers}" || return 1
    # NOTE(review): the count is re-evaluated after port preparation,
    # presumably because prepare_shard_ports can change its inputs — confirm.
    # An explicit override still wins.
    workers="${WORKER_COUNT_OVERRIDE:-$(calc_worker_count)}"
    echo ""
    echo " 正在启动 worker…"
    start_production_workers "${workers}"
  fi

  if [[ "${DRY_RUN}" -eq 1 ]]; then
    echo ""
    echo " (预览模式,未实际启动)"
    return 0
  fi

  echo ""
  echo " 正在确认 worker…"
  local worker_running=0 worker_fail=0
  local f
  # Tally every production pid file: running vs. no longer running.
  for f in "${RUN_DIR}"/worker-*.pid; do
    [[ -e "${f}" ]] || continue
    [[ "$(basename "${f}" .pid)" == "worker-test" ]] && continue
    if is_running "${f}"; then
      worker_running=$((worker_running + 1))
    else
      worker_fail=$((worker_fail + 1))
      local wname
      wname="$(basename "${f}" .pid)"
      echo " · ${wname} 启动后已退出,请查看: ${LOG_DIR}/${wname}.log"
    fi
  done

  print_title "worker 扩容完成"
  echo " 汇总 worker ${worker_running}/${workers} 运行 · $(shard_coord_backend_hint)"
  if [[ "${worker_fail}" -gt 0 ]]; then
    echo " 提示 部分 worker 未保持运行,请检查端口与日志"
  elif [[ "${worker_running}" -lt "${workers}" ]]; then
    echo " 提示 仍有 worker 未启动,可再次执行 start --workers-only"
  fi
}
888+
803889
cmd_start() {
890+
if [[ "${WORKERS_ONLY}" -eq 1 && "${HUB_ONLY}" -eq 1 ]]; then
891+
echo " --workers-only 与 --hub-only 不能同时使用" >&2
892+
return 1
893+
fi
804894
if [[ "${HUB_ONLY}" -eq 1 ]]; then
805895
cmd_start_hub_only
806896
return
807897
fi
898+
if [[ "${WORKERS_ONLY}" -eq 1 ]]; then
899+
cmd_start_workers_only
900+
return
901+
fi
808902
local workers="${WORKER_COUNT_OVERRIDE:-$(calc_worker_count)}"
809903
local hub_url="http://127.0.0.1:${HUB_PORT}"
810904

@@ -827,6 +921,7 @@ cmd_start() {
827921
wait_worker_ports_released "${workers}" || true
828922
echo ""
829923
prepare_shard_ports "${workers}" || return 1
924+
workers="$(calc_worker_count)"
830925
echo ""
831926
echo " 正在启动进程…"
832927

@@ -991,10 +1086,23 @@ cmd_status() {
9911086
fi
9921087
printf " %-10s %-6s WS:%s\n" "${name}" "${state}" "${port}"
9931088
done
994-
if [[ "${workers}" -eq 0 ]]; then
1089+
if [[ "${workers}" -eq 0 && "${expected_workers}" -eq 0 ]]; then
9951090
echo " (尚无生产 worker,请先 start)"
9961091
else
997-
echo " 小计 ${running_workers}/${workers} 运行"
1092+
echo " 小计 ${running_workers}/${expected_workers} 运行"
1093+
if [[ "${running_workers}" -lt "${expected_workers}" ]]; then
1094+
local sid=0 missing=0
1095+
while [[ "${sid}" -lt "${expected_workers}" ]]; do
1096+
local pf="${RUN_DIR}/worker-${sid}.pid"
1097+
if ! is_running "${pf}"; then
1098+
missing=$((missing + 1))
1099+
local mport
1100+
mport="$(registry_port_for_shard "${sid}")"
1101+
echo " worker-${sid} 未启动 WS:${mport}"
1102+
fi
1103+
sid=$((sid + 1))
1104+
done
1105+
fi
9981106
fi
9991107

10001108
echo ""
@@ -1015,11 +1123,16 @@ cmd_status() {
10151123
echo ""
10161124
echo " 日志 ${LOG_DIR}/"
10171125

1018-
if [[ "${stale_workers}" -gt 0 ]]; then
1126+
if [[ "${stale_workers}" -gt 0 || "${running_workers}" -lt "${expected_workers}" ]]; then
10191127
echo ""
1020-
echo " 提示:${stale_workers} 个 worker 未运行"
1128+
if [[ "${running_workers}" -lt "${expected_workers}" ]]; then
1129+
echo " 提示:应有 ${expected_workers} 个 worker,当前仅 ${running_workers} 个在运行"
1130+
else
1131+
echo " 提示:${stale_workers} 个 worker 未运行"
1132+
fi
10211133
if is_running "${PID_HUB}"; then
1022-
echo " ./scripts/run_sharded_bot.sh restart --workers-only"
1134+
echo " ./scripts/run_sharded_bot.sh start --workers-only"
1135+
echo " ./scripts/run_sharded_bot.sh restart --workers-only # 需全量重启时"
10231136
else
10241137
echo " ./scripts/run_sharded_bot.sh restart"
10251138
echo " ./scripts/run_sharded_bot.sh start --hub-only # 仅启 WebUI 控制台"
@@ -1097,6 +1210,7 @@ cmd_restart_workers_only() {
10971210
wait_worker_ports_released "${workers}" || true
10981211
echo ""
10991212
prepare_shard_ports "${workers}" || return 1
1213+
workers="$(calc_worker_count)"
11001214
echo ""
11011215
echo " 正在启动 worker…"
11021216
start_production_workers "${workers}"
@@ -1223,6 +1337,10 @@ while [[ $# -gt 0 ]]; do
12231337
WORKERS_ONLY=1
12241338
shift
12251339
;;
1340+
--scale-only)
1341+
SCALE_ONLY=1
1342+
shift
1343+
;;
12261344
--hub-only)
12271345
HUB_ONLY=1
12281346
shift

src/features/corpus/community_source.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,9 @@ async def replace_answers(self, keywords: str, answers: list[Answer], clear_time
161161
async def append_ban(self, keywords: str, ban: Ban) -> None:
162162
return None
163163

164+
async def find_ban_reply_target(self, group_id: int, reply_message: str) -> tuple[str, str] | None:
    """Return ``None`` unconditionally; both arguments are ignored."""
    return None
166+
164167
async def _post_contribute(self, body: dict[str, Any]) -> None:
165168
if not self._api_bases:
166169
return

src/features/corpus/composite_repo.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -319,3 +319,9 @@ async def replace_answers(self, keywords: str, answers: list[Answer], clear_time
319319

320320
async def append_ban(self, keywords: str, ban: Ban) -> None:
321321
await self._local.append_ban(keywords, ban)
322+
323+
async def find_ban_reply_target(self, group_id: int, reply_message: str) -> tuple[str, str] | None:
    """Delegate the lookup to the local repository when it supports it.

    Returns whatever ``self._local.find_ban_reply_target`` returns for the
    given arguments, or ``None`` when ``self._local`` has no callable
    attribute of that name.
    """
    delegate = getattr(self._local, "find_ban_reply_target", None)
    if callable(delegate):
        return await delegate(group_id, reply_message)
    return None

0 commit comments

Comments
 (0)