Skip to content

Commit f435c01

Browse files
fix(repeater): 分片下主动发言与维护任务归属 worker
主动发言仅选本 worker 已连接牛;speak_up 未连接则跳过;sync/context 清理仅在 shard-0 执行。 Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 778a18e commit f435c01

5 files changed

Lines changed: 126 additions & 3 deletions

File tree

src/plugins/repeater/__init__.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -390,18 +390,24 @@ async def speak_up():
390390

391391
bot_id, group_id, messages, target_id = ret
392392

393+
try:
394+
bot = get_bot(str(bot_id))
395+
except (KeyError, ValueError):
396+
logger.debug("speak_up skip bot [{}] not connected on this worker", bot_id)
397+
return
398+
393399
for msg in messages:
394400
logger.info(f"bot [{bot_id}] ready to speak [{msg}] to group [{group_id}]")
395401
try:
396-
await get_bot(str(bot_id)).call_api(
402+
await bot.call_api(
397403
"send_group_msg",
398404
**{
399405
"message": msg,
400406
"group_id": group_id,
401407
},
402408
)
403409
if target_id:
404-
await get_bot(str(bot_id)).call_api(
410+
await bot.call_api(
405411
"group_poke",
406412
**{
407413
"user_id": target_id,
@@ -421,5 +427,9 @@ async def speak_up():
421427

422428
@scheduler.scheduled_job("cron", hour=4)
423429
async def update_data():
430+
from .shard_opt import repeater_maintenance_runs_on_worker
431+
432+
if not repeater_maintenance_runs_on_worker():
433+
return
424434
await Chat.sync()
425435
await Chat.clearup_context()

src/plugins/repeater/shard_opt.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,15 @@ def repeater_worker_handles_message(bot_id: int) -> bool:
1212
return True
1313

1414

15+
def local_connected_bot_ids() -> frozenset[int]:
16+
"""本 worker 当前已连接的牛牛 QQ 集合。"""
17+
try:
18+
from nonebot import get_bots
19+
except Exception:
20+
return frozenset()
21+
return frozenset(int(key) for key in get_bots() if str(key).isdigit())
22+
23+
1524
def repeater_scheduler_runs_on_worker() -> bool:
1625
"""主动发言定时任务:分片时仅代表牛所在 worker 执行,减少重复扫描。"""
1726
from src.platform.shard.registry.config import is_sharding_active
@@ -21,3 +30,12 @@ def repeater_scheduler_runs_on_worker() -> bool:
2130
from src.platform.shard.local_representative import local_worker_representative_bot_id
2231

2332
return local_worker_representative_bot_id() is not None
33+
34+
35+
def repeater_maintenance_runs_on_worker() -> bool:
36+
"""跨 worker 全局维护(sync / context 清理):分片时仅 shard 0 执行。"""
37+
from src.platform.shard.registry.config import get_shard_registry_settings, is_sharding_active
38+
39+
if not is_sharding_active():
40+
return True
41+
return int(get_shard_registry_settings().shard_id) == 0

src/plugins/repeater/speaker.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,17 @@ def cmp(a: int | float, b: int | float) -> int:
9999
"reply_keywords": Speaker.SPEAK_FLAG,
100100
})
101101

102-
bot_id = random.choice([bid for bid in group_replies.keys() if bid])
102+
from src.platform.shard.registry.config import is_sharding_active
103+
104+
from .shard_opt import local_connected_bot_ids
105+
106+
bot_ids = [bid for bid in group_replies.keys() if bid]
107+
if is_sharding_active():
108+
local_bots = local_connected_bot_ids()
109+
bot_ids = [bid for bid in bot_ids if bid in local_bots]
110+
if not bot_ids:
111+
continue
112+
bot_id = random.choice(bot_ids)
103113

104114
ban_keywords = await BanManager.find_ban_keywords(context=None, group_id=group_id)
105115

tests/plugins/repeater/test_speaker.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,55 @@ async def test_speak_filters_banned_keywords(beanie_fixture):
9797
reply_dict.clear()
9898

9999

100+
@pytest.mark.asyncio
101+
async def test_speak_skips_remote_bot_when_sharded(beanie_fixture):
102+
from src.plugins.repeater.message_store import MessageStore
103+
from src.plugins.repeater.speaker import Speaker
104+
105+
MessageStore._message_dict = defaultdict(list)
106+
Speaker._recent_speak = defaultdict(lambda: deque(maxlen=Speaker.DUPLICATE_REPLY))
107+
108+
reply_dict = defaultdict(lambda: defaultdict(list))
109+
reply_lock = asyncio.Lock()
110+
recent_topics = defaultdict(lambda: deque(maxlen=16))
111+
topics_lock = asyncio.Lock()
112+
113+
group_id = 30004
114+
local_bot_id = 10001
115+
remote_bot_id = 10002
116+
msg_list = [_build_message(group_id, 20001 + i, f"warmup-{i}", f"warmup-{i}", i + 1) for i in range(10)]
117+
MessageStore._message_dict[group_id] = msg_list
118+
reply_dict[group_id][remote_bot_id] = [{"time": 1, "reply": "x", "reply_keywords": "x"}]
119+
reply_dict[group_id][local_bot_id] = [{"time": 1, "reply": "y", "reply_keywords": "y"}]
120+
121+
chosen_msg = msg_list[-1]
122+
try:
123+
with (
124+
patch("src.platform.shard.registry.config.is_sharding_active", return_value=True),
125+
patch("src.plugins.repeater.shard_opt.local_connected_bot_ids", return_value=frozenset({local_bot_id})),
126+
patch("src.plugins.repeater.speaker.time.time", return_value=10000),
127+
patch(
128+
"src.plugins.repeater.speaker.random.choice",
129+
side_effect=[local_bot_id, [chosen_msg], chosen_msg],
130+
),
131+
patch("src.plugins.repeater.speaker.random.random", return_value=1.0),
132+
patch(
133+
"src.plugins.repeater.speaker.BanManager.find_ban_keywords",
134+
new_callable=AsyncMock,
135+
return_value=set(),
136+
),
137+
patch("src.plugins.repeater.speaker.BotConfig.taken_name", new_callable=AsyncMock, return_value=-1),
138+
patch("src.plugins.repeater.reply_record_sync.publish_reply_record"),
139+
):
140+
result = await Speaker.speak(reply_dict, reply_lock, recent_topics, topics_lock)
141+
assert result is not None
142+
assert result[0] == local_bot_id
143+
finally:
144+
MessageStore._message_dict.clear()
145+
Speaker._recent_speak.clear()
146+
reply_dict.clear()
147+
148+
100149
@pytest.mark.asyncio
101150
async def test_speak_recent_dedup_avoids_same_message_twice(beanie_fixture):
102151
from src.plugins.repeater.message_store import MessageStore

tests/plugins/test_repeater_shard_opt.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
import pytest
44

55
from src.plugins.repeater.shard_opt import (
6+
local_connected_bot_ids,
7+
repeater_maintenance_runs_on_worker,
68
repeater_scheduler_runs_on_worker,
79
repeater_worker_handles_message,
810
)
@@ -84,3 +86,37 @@ def test_scheduler_skips_worker_without_rep(monkeypatch):
8486
lambda: None,
8587
)
8688
assert repeater_scheduler_runs_on_worker() is False
89+
90+
91+
def test_maintenance_runs_only_on_shard_zero_when_sharded(monkeypatch):
92+
monkeypatch.setattr(
93+
"src.platform.shard.registry.config.is_sharding_active",
94+
lambda: True,
95+
)
96+
monkeypatch.setattr(
97+
"src.platform.shard.registry.config.get_shard_registry_settings",
98+
lambda: type("S", (), {"shard_id": 0})(),
99+
)
100+
assert repeater_maintenance_runs_on_worker() is True
101+
102+
monkeypatch.setattr(
103+
"src.platform.shard.registry.config.get_shard_registry_settings",
104+
lambda: type("S", (), {"shard_id": 3})(),
105+
)
106+
assert repeater_maintenance_runs_on_worker() is False
107+
108+
109+
def test_maintenance_runs_when_not_sharded(monkeypatch):
110+
monkeypatch.setattr(
111+
"src.platform.shard.registry.config.is_sharding_active",
112+
lambda: False,
113+
)
114+
assert repeater_maintenance_runs_on_worker() is True
115+
116+
117+
def test_local_connected_bot_ids_reads_nonebot(monkeypatch):
118+
monkeypatch.setattr(
119+
"nonebot.get_bots",
120+
lambda: {"1001": object(), "2002": object(), "bad": object()},
121+
)
122+
assert local_connected_bot_ids() == frozenset({1001, 2002})

0 commit comments

Comments
 (0)