-
Notifications
You must be signed in to change notification settings - Fork 81
Expand file tree
/
Copy pathbot_worker.py
More file actions
121 lines (96 loc) · 3.91 KB
/
Copy pathbot_worker.py
File metadata and controls
121 lines (96 loc) · 3.91 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
"""Worker 分片进程入口:每进程约 5 个牛牛反向 WS。
启动前设置::
PALLAS_SHARD_ENABLED=true
PALLAS_BOT_ROLE=worker
PALLAS_SHARD_ID=0
PORT=8090
"""
import asyncio
import os
os.environ.setdefault("PALLAS_SHARD_ENABLED", "true")
os.environ.setdefault("PALLAS_BOT_ROLE", "worker")
from src.foundation.config.dotenv import apply_repo_settings_to_environ
apply_repo_settings_to_environ()
def pin_worker_listen_port() -> None:
"""覆盖 .env 中 PORT,避免 getenv 读到首个 PORT 导致各 worker 抢同一端口。"""
from src.platform.shard.registry import get_shard_registry, worker_port_for_shard
from src.platform.shard.registry.config import get_shard_registry_settings
from src.platform.shard.registry.listen_port import apply_listen_port
s = get_shard_registry_settings()
port = worker_port_for_shard(s.shard_id, registry=get_shard_registry())
apply_listen_port(port)
pin_worker_listen_port()
# ruff: noqa: E402
import nonebot
from nonebot.adapters.onebot.v11 import Adapter as ONEBOT_V11Adapter
from src.console.web import install_nonebot_log_sink
from src.features.ban_gate.snapshot import start_ban_gate_snapshot, stop_ban_gate_snapshot
from src.features.message_scrub import start_message_scrub_if_enabled
from src.foundation.db import init_db
from src.foundation.logging import apply_stdlib_logging_channel_prefix
from src.platform.bot_runtime import load_plugins_for_role
from src.platform.shard.logs.process import install_shard_process_logging
from src.platform.shard.registry import get_shard_registry, worker_port_for_shard
from src.platform.shard.registry.config import get_shard_registry_settings
from src.platform.shard.registry.listen_port import apply_listen_port
from src.shared.adapters import register_onebot_v11_custom_events
from src.shared.utils.voice_downloader import ensure_voices
apply_stdlib_logging_channel_prefix()
nonebot.init()
apply_listen_port(
worker_port_for_shard(
get_shard_registry_settings().shard_id,
registry=get_shard_registry(),
)
)
start_message_scrub_if_enabled()
install_shard_process_logging()
install_nonebot_log_sink()
driver = nonebot.get_driver()
driver.register_adapter(ONEBOT_V11Adapter)
register_onebot_v11_custom_events()
async def _ensure_worker_voices_background() -> None:
try:
ok = await ensure_voices()
if not ok:
nonebot.logger.warning("bot_worker: voice ensure failed or incomplete")
except Exception as err:
nonebot.logger.warning("bot_worker: voice ensure failed: {}", err)
@driver.on_startup
async def startup():
await init_db()
await start_ban_gate_snapshot()
from src.platform.coord.redis_settings import (
coord_redis_enabled,
ensure_coord_redis_ready_for_sharding,
resolve_coord_redis_url,
)
from src.platform.shard.coord.worker_poll import start_shard_coord_worker_watcher
ensure_coord_redis_ready_for_sharding()
s = get_shard_registry_settings()
reg = get_shard_registry()
port = worker_port_for_shard(s.shard_id, registry=reg)
bots = reg.bots_on_shard(s.shard_id)
nonebot.logger.info(
"bot_worker: shard_id={} port={} assigned_bots={} (expect WS on this port)",
s.shard_id,
port,
bots,
)
if int(os.environ.get("PORT", "0") or 0) not in (0, port):
nonebot.logger.warning(
"bot_worker: env PORT={} differs from registry port {} for shard {}",
os.environ.get("PORT"),
port,
s.shard_id,
)
if coord_redis_enabled():
nonebot.logger.info("bot_worker: cross-process claims via Redis ({})", resolve_coord_redis_url())
start_shard_coord_worker_watcher()
asyncio.create_task(_ensure_worker_voices_background(), name="worker_ensure_voices")
@driver.on_shutdown
async def shutdown():
await stop_ban_gate_snapshot()
load_plugins_for_role()
if __name__ == "__main__":
nonebot.run()