Skip to content

Commit 4ce797d

Browse files
wehosHongzhi Wenclaude
authored
feat(proactive): 通用主动投递框架 — 前端播放门控 + 优先级排序 + 合并 (Project-N-E-K-O#1545)
* feat(proactive): 通用主动投递框架 —— 前端播放门控 + 优先级排序 + 合并 把所有主动消息(push_message ai_behavior=respond / agent 任务结果)的投递从 "response.done(生成完成)即放行"升级为前端真实播放完成(voice_play_end)才放行, 并新增优先级排序、可选合并、单飞 + 最小间隔节流,修复语音模式下说不停 / 自我打断。 - main_logic/lifecycle_bus.py:进程内命名事件总线(voice_play_start/end, text_start/end) - main_logic/proactive_delivery.py:ProactiveDeliveryManager 前置层。不替换 pending_agent_callbacks(保留其竞态测试覆盖),坐落于 EventBus proactive 接缝。 优先级低=更紧急;未指定(0)归一化到中性档(100),不让既有高数值插件被反转/饿死。 合并 opt-in:未设 coalesce_key 即不合并,保证不丢任何插件的不同 cue。 - core.py:语音 inject gate 增加 _voice_playback_active(键于前端播放而非生成); on_voice_playback_signal 在 voice_play_end 重投;text_start/end 包住 prompt_ephemeral - push_message 贯通 coalesce_key:schema → context._build_wire_payload → bridge → callback → manager - websocket_router:分发 voice_play_start / voice_play_end → on_voice_playback_signal - 前端 app-audio-playback.js:真实播放开始 / 队列排空 / 打断时上报 voice_play_start/end (走同一 ws,含 Electron chat.html WSProxy/IPC 路径) - mc 插件:alert=1 / completion=2 / in_progress=3 / keep_going=4 + 按类合并键; 周期状态突发降级为 ai_behavior=read;in_progress 仅叙述不派任务; keep_going 不在 master 刚下达指令时抢方向盘(zh/en/ja/ko/ru/es/pt 同步) - 跨插件无回归:read/passive/blind 不经 manager;HUD toast 路径独立;优先级仅重排不丢弃 新增 tests/unit/test_proactive_delivery.py(优先级/合并/门控/节流/过期)。 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * review: 空 except 补说明/日志(code-quality bot Project-N-E-K-O#1545) lifecycle_bus emit 守卫改记 debug 日志并注明:总线已逐 handler 隔离异常, 此守卫仅防 emit 自身抛错,绝不让生命周期信号打断投递路径。 unsubscribe 的 ValueError pass 补注幂等性说明。 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * review(proactive): 播放门控自愈 + 边界 priority OverflowError 兜底(Project-N-E-K-O#1545) CodeRabbit/Codex review: - manager:voice_play_start 后若 voice_play_end 丢失(前端断线/刷新 mid-play), _playing 会永久 True 卡死队列 → 加 max_play_s=30s 看门狗超时自动重开门; release 后按 inflight_timeout 主动排一次 _pump,避免 deliver 被内层吞掉且 无后续信号时后续 cue 卡死;新增 reset() 清门控+队列。 - core:_voice_playback_active 改为时间受限读取 _is_voice_playing()(30s 自愈); start_session/end_session 调 _reset_proactive_gate() 做会话级重置(切角色/断线)。 - main_server + proactive_bridge:int(priority) 兜住 OverflowError(JSON Infinity 边界输入),不让坏 priority 丢掉整条回调/消息。 - 新增看门狗 + reset 单测。 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * review(proactive): gate reset 移到 stale-session guard 之后(Project-N-E-K-O#1545 Codex P1) _reset_proactive_gate() 原放在 end_session/start_session 入口,早于 stale-session / 熔断 / "正在启动中" 去重等早退判断 → 过期或被拒的调用会误清掉当前活跃会话的 播放门控并丢掉其排队 cue(Codex P1)。 - end_session:移到 self.lock 块之后(stale 分支已 return),仅真正 teardown 时 reset。 - start_session:移到 _starting_session_count++ 之后(熔断/去重早退之后),仅确定 要起新会话时 reset。 - 测试:passthrough respond 用例补 enqueue_agent_callback.assert_not_called() 负向 断言,锁死「不走旧直投路径」防双路投递回归。 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * feat(proactive): 恢复批量投递 + teardown 不丢 cue(保留播放门控) 按产品判断调整两处行为,核心修复(播放门控修自我打断 / 优先级 / coalesce)保留: - 单飞 → 批量:manager 门开时一次性放行整批(按优先级排序),交 trigger 渲染成 一轮 LLM,恢复"几条几乎同时到的主动 cue 合进一轮"的旧语义。manager 只决定 "何时放这一批"(等 voice_play_end + min_gap),不再拆成一条一轮。 _deliver_proactive_callback → _deliver_proactive_batch(list)。 - teardown 不丢:reset() 拆成 reset_gate()(只清门控/单飞状态,不清队列)+ drain_pending()(取出排队 cue)。_reset_proactive_gate 把未投递的 cue 转入 pending_agent_callbacks(持久、重连补投),不再丢弃——proactive 一般是重要消息。 - 测试改为批量语义:deliver 收 list;新增"第二批等下次 play_end""drain 不投递" "reset_gate 不丢队列"用例。 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * fix(proactive): 把 respond 的图片随 callback 延迟到 release 再 stream(Project-N-E-K-O#1545 Codex P2) push_message 带图片 + ai_behavior=respond(如 mc in_progress 截图 cue)时,图片原本 在 EventBus 里立即 stream 进当前 session,而文本 callback 被 pacing manager 延迟 → 她正在说话/min-gap内/还没 session 时,图片落进了上一/当前轮(或无 session 被丢), 等几秒后 manager 放行文本触发回应时,视觉上下文已对不上。 - main_server:respond 的 image 不再立即 stream,改为挂到 callback["media_images"]; read(被动上下文)维持立即 stream 不变。 - core._deliver_proactive_batch:在 release(enqueue+trigger)前先把 callback 携带的 图片 stream 进 session,保证同一轮 trigger 产生的回应能看到匹配的图。 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * review(proactive): media 绑定到投递点 + drain 保优先级 + deliver 类型契约(Project-N-E-K-O#1545) - Codex P2:proactive 图片的 stream 从 release 点(_deliver_proactive_batch,彼时 self.session 可能为 None / 将被 hot-swap 换掉,且根本不覆盖重连补投/turn-end 重试 路径)移到真正投递点。新增 _stream_cb_media,在 trigger 的语音 inject 前 / 文本 prompt_ephemeral 前 best-effort stream;无 session 则保留 media 待下次投递(不丢), stream 后 pop 防 defer-retry 重复。现覆盖 release / 重连补投 / turn-end 重试 全路径。 - CodeRabbit:drain_pending 改为按 sort_key(优先级升序 + 同优先级 FIFO)导出, teardown 补投不再丢失优先级语义。 - CodeRabbit(outside-diff):deliver 类型注解 Callable[[dict]] → Callable[[list]],与 批量交付契约一致。 - 测试:drain 顺序断言。 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * review(proactive): deliver 类型精确化 list[dict] + drain/reset_gate 测试加固(Project-N-E-K-O#1545) - CodeRabbit(outside-diff):deliver 注解 Callable[[list]] → Callable[[list[dict]]], _run_deliver 参数标 list[dict],与批量交付实际入参一致。 - CodeRabbit(nitpick):drain 用例补"开门后仍不释放"断言(确认队列确实清空)。 - CodeRabbit(nitpick):reset_gate 用例改为先入队再 reset,验证旧队列未丢失——但用 符合设计的断言:reset_gate 只清门控、不 auto-pump,旧 cue 在下次 submit 触发的 批次里按优先级随新 cue 一起放出(非其建议补丁假设的"reset 后自动投递")。 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * review(proactive): media 失败保留/不丢图 + coalesce_key 透传 SDK facade(Project-N-E-K-O#1545 Codex×3) - Codex P2 + CodeRabbit:_stream_cb_media 流图失败时不再 pop media_images,保留待 下次投递(重连/重试)重新 stream;仅全部成功才 pop,避免 defer-retry 重复。 - Codex P2:image-only 的 respond(无 text 载体走 manager)退回立即 stream(如 read), 不再 defer 后丢图——旧行为本就是立即 stream,此前重构把它丢了,是回归。 - Codex P2:coalesce_key 透传 SDK v2 facade(plugin/sdk/shared/core/context.py)及 Protocol 签名(sdk/shared/core/types.py、_types/protocols.py),契约完整、任意插件可 opt-in。 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * docs(proactive): 注明 effective_priority 的全局约定与 legacy 高数值插件已知限制(Project-N-E-K-O#1545) Codex P2:现有 higher=important 插件(bilibili gift=9)在 lower=urgent 单一约定下, 批内会排在低数值 cue 之后。这是任务规范要求的全局约定与现有插件约定的张力,非 bug—— 影响仅限单轮已合批的叙述顺序(不丢,HUD toast 独立触发);不盲目 remap legacy(各插件 数值语义不同),迁移现有 producer 列为后续。注释记录之。 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * review(proactive): media 部分成功后只保留未发送的尾部,不重发已落地图片(Project-N-E-K-O#1545) CodeRabbit:上一版失败时保留整份 media_images,若前几张已 stream 成功、后面失败, 重试会从头重放已成功的图,与"don't re-stream what already landed"注释矛盾且重复塞图。 改为只保留 images[streamed:](失败那张及之后),已成功的丢弃。 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * refactor(proactive): 优先级统一为 higher=important(最大公约数)+ 整改 mc 按 maintainer 决策:取现有插件的最大公约数约定,而非沿用我最初设的 lower=urgent。 - 约定:HIGHER number = 更重要(与 bilibili gift/SC=9、memo=8、study answer=5、 HUD priority_min 过滤一致);未指定/非法 → 0 = 最低(不抢占设了优先级的 cue)。 effective_priority 去掉 NEUTRAL 映射、0 默认;_QueuedCue.sort_key 改 (-eff_priority, seq) 即重要性降序 + FIFO。OverflowError 也兜(Infinity → 0)。 - mc(唯一不一致的、且属本人)整改到同尺度:alert 1→9(最重要)、completion 2→7、 in_progress 3→4、keep_going 4→3,保持 alert>completion>in_progress>keep_going。 - 0 回退:现有插件数值不动,翻转后它们的相对顺序恢复其本意(gift9 先于 guidance8), 顺带彻底解决 Codex 之前指出的"高价值 gift 被低值 cue 抢先"反转(不再是婉拒)。 - 测试断言改 higher-first;去 NEUTRAL_PRIORITY 引用。 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * fix(proactive): 文本模式 prompt_ephemeral 消费 _pending_images,图文同轮(Project-N-E-K-O#1545 Codex P2) OmniOfflineClient.stream_image 只塞 _pending_images(由 stream_text 消费),而 prompt_ephemeral 原来只用 text 构建 messages_to_send,不读 _pending_images → 文本模式下 proactive(respond)+图 的回应看不到图,图漏到下次 user turn(错误轮次上下文)。 改为 prompt_ephemeral 在有 _pending_images 时构建多模态 HumanMessage(图+instruction)、 按需切 vision model、清空 _pending_images,镜像 stream_text 行为。 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * revert(proactive): 文本模式 prompt_ephemeral 不再强行消费图/切 vision(Project-N-E-K-O#1545) 回退 4c1e4d4 在 prompt_ephemeral 里消费 _pending_images + switch_model 的做法, 它引入两个问题: - CodeRabbit:switch_model 永久改 session 级 self.llm/model,一次带图 proactive 后 后续普通用户轮也被切到 vision 模型。 - Codex:ephemeral 消费 _pending_images 后若 LLM 调用失败/无输出重试,图已 clear, 重试无图。 改为:文本模式 proactive 的图留在 _pending_images,由下一轮 stream_text 正常带出 (已处理多模态+vision 切换并持久化进 history,之后所有轮含 proactive 都能看到)。 图可能晚一轮可见,属合理退化;严格同轮保证仅适用语音模式(stream_image→持久 conversation.item,不受影响)。 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * fix(proactive): 文本模式 proactive 不注入图,避免泄漏到下一条 user 消息(Project-N-E-K-O#1545 Codex P2) Codex:回退后文本模式 _stream_cb_media 仍调 OmniOfflineClient.stream_image,把图 stage 进 _pending_images——而它绑的是下一条 stream_text(user turn),于是 respond cue 的截图会附到用户接下来某条无关消息上(错误视觉关联)。 根因:stream_image 在语音(持久 conversation.item,同轮)与文本(_pending_images,绑下条 user text)语义不同。_stream_cb_media 现仅对 OmniRealtimeClient 注入图;文本 session 直接丢弃 media_images(文本模式 proactive 纯文本),不污染 _pending_images。语音模式 仍精确同轮、不丢。一并消除此前文本模式 media 的 vision 永久切换/重试丢图/泄漏诸 edge。 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * fix(proactive): deferred voice cue 重试遵守 min-gap(Project-N-E-K-O#1545 Codex P2) cue 在 realtime busy 时被 release → trigger defer 留 pending_agent_callbacks; on_voice_playback_signal 原在 voice_play_end 立即 re-fire trigger 重投,绕过 manager 的 min_gap,导致她零间隔开始下一个 proactive turn。改为延迟 manager.min_gap_s 再 re-fire(与 manager 自身 on_playback_end → pump(min_gap) 节奏一致)。manager 暴露 只读 min_gap_s。trigger 自带 re-gate,延迟期间又开始说话则自然 defer。 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * review: 去掉多余的 delay 初始赋值(Project-N-E-K-O#1545 code-quality) try/except 两分支都给 delay 赋值,初始 delay=0.0 是 dead store。 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * feat(proactive): 文本模式 proactive 带图也切 vision(maintainer 决策) 按 maintainer:文本会话切 vision 本就是 stream_text 既定行为(带图即永久切,注释明说 cannot switch back),proactive 带图同理应切,不该砍成纯文本。恢复 prompt_ephemeral 消费 _pending_images + 切 vision,文本模式 proactive 图同轮可见。 之前回避的两个真问题这次正解、而非退缩: - 重试丢图:_stream_cb_media 改 preserve-until-success —— media_images 保留在 cb 上 直到投递成功并被 prune,失败/重试重新 stage(文本重填 _pending_images / 语音重加 conversation.item);部分 stream 失败只保留未发送尾部。 - 泄漏到下条 user 消息:prompt_ephemeral 现消费 _pending_images,不再残留给下条 stream_text。 - _stream_cb_media 现对语音+文本都注入图(之前只语音)。 已知副作用:文本会话一旦收到 proactive 截图即永久切 vision 端点(与 stream_text 带图 语义一致;vision model 亦能处理后续纯文本轮)。 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * fix(proactive): manager release gate 对齐 trigger,busy 时 cue 留队列(Project-N-E-K-O#1545 Codex P2) manager 的 gate 只看 _playing(voice_play 信号),覆盖不到"response 还在生成 (is_active_response,voice_play_start 之前)"或 SM 非 IDLE 的 busy 窗口。此时 manager 会 release → 内层 trigger 立刻 defer → cue 落入 pending_agent_callbacks(manager 之外), 后续同 key/更高优先级 cue 无法对它合并/排序。 加 can_release 谓词(= trigger defer 条件取反:playback / phase!=IDLE / is_active_response): manager release 前先 re-gate,busy 则把 cue 留在队列、每 0.5s 重检,不再过早 dump 到 pending。谓词异常默认放行(不卡死)。core 提供 _can_release_proactive。 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * fix(proactive): can_release 用 _is_voice_playing() 自愈 + 空 except 补日志(Project-N-E-K-O#1545) - Codex P1:_can_release_proactive 原读 raw _voice_playback_active,前端丢 voice_play_end 时 flag 永久 True → can_release 永远 False → 0.5s busy 重检无限循环、cue 卡死,绕过了 30s watchdog。改用 _is_voice_playing()(时间受限、自愈)。 - code-quality:phase / is_active_response 两处空 except 改记 debug 日志并注明降级行为。 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * fix(proactive): media stream 失败时 defer 整批投递,不 text-only 投递丢图(Project-N-E-K-O#1545 Codex P2) _stream_cb_media 之前失败保留 media 但 caller 仍继续投文本;若文本投递成功 cb 被 prune, 保留的 media 就永远不会重试(preserve 白做)。改为 _stream_cb_media 返回 bool,任一图 stream 失败返回 False: - 语音分支:inject 前若 media 失败则 return(不 inject),cb 仍在 pending 未 prune, 下次 trigger 带图重试。 - 文本分支:prompt 前若 media 失败则 requeue + return(文本 append 不抛,防御性对偶)。 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * fix(proactive): 文本模式 proactive 图与 user 的 _pending_images 队列分离(Project-N-E-K-O#1545 Codex P2) _pending_images 是 USER 的 screen/camera 待投队列(下一条 stream_text 消费)。我之前让 proactive 复用它(stream_image stage + prompt_ephemeral 消费),会偷走 user 刚 stage 的 图:proactive turn 错误看到 user 图,user 下条消息又丢了视觉上下文。 改为彻底分离: - prompt_ephemeral 加显式 images 参数,用它构建多模态 + 切 vision,**不碰 _pending_images**。 - _deliver_agent_callbacks_text 从 cb.media_images 收集图、显式传 prompt_ephemeral, 不再走 _stream_cb_media(那是语音路径,用 realtime 持久 conversation.item)。 - _stream_cb_media 现仅语音用;docstring 更新。media 仍保留在 cb 上至投递成功被 prune。 - 测试 fake prompt_ephemeral 接受 images kwarg。 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * fix(proactive): voice media 部分失败时保留全份,不 trim(Project-N-E-K-O#1545 Codex P2) 与之前为 CodeRabbit 做的"部分失败保留尾部"相反——经独立判断采纳 Codex:voice stream_image 失败几乎总意味 session 正在关闭,重试会落到新 session(新 conversation 没有前面已成功 stream 的图),trim 尾部会把那些图永久丢掉。改为失败保留全份 media_images, 让新 session 重试时整组重 stream。同 session 重试的重复风险在失败路径罕见且无害。 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * fix(proactive): can_release 也 gate 文本 session 的 _is_responding(Project-N-E-K-O#1545 Codex P2) _can_release_proactive 之前只查 realtime 的 is_active_response,漏了文本(OmniOfflineClient) user response 生成中(_is_responding)。文本 user 回应进行时谓词返回 True → manager release → trigger try_start_proactive 拒 claim → cue 落 pending、逃脱 manager 合并/排序。改用通用 getattr(sess,"_is_responding",False),同时覆盖语音(response.created→voice_play_start 窗口) 与文本(活跃 user response)。 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * fix(proactive): 播放 watchdog 30s→180s,避免误杀合法长回复(Project-N-E-K-O#1545 Codex P2) 30s 太短:response-length 允许的长 TTS 回复播超 30s 时,watchdog 清 _playing + _can_release 放行 → 插件 cue 注入打断她自己(正是 PR 要修的问题反被 watchdog 制造)。 watchdog 的真正目的是兜底"voice_play_end 永不来"——而其主因(前端断线/刷新)已由 session teardown 的 _reset_proactive_gate 覆盖,时间 ceiling 只剩"连接在但信号丢"这种 极罕见情况要兜,故应远大于任何合法单回复。manager.max_play_s 与 core._VOICE_PLAYBACK_ STALE_S 同步调到 180s。完全事件驱动(前端 per-turn 心跳)是更大改动,留作后续。 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * tweak(proactive): 播放 watchdog 180s→45s(maintainer 决策) maintainer:单回复不会超 45s,45s 足够覆盖正常回复且能更快从丢失的 voice_play_end 恢复。manager.max_play_s + core._VOICE_PLAYBACK_STALE_S 同步 45s。 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * fix(proactive): voice media-fail defer 时 re-arm 延迟重试(Project-N-E-K-O#1545 Codex P2) media-stream 失败 defer 后 cb 留 pending_agent_callbacks,但 manager 队列已空、其 inflight timeout 只 pump manager 队列,且 response.create 未发→无 response.done/ voice_play_end 重驱 trigger,cue 会卡到下次无关 turn。defer 时改为 _schedule_proactive_retry(min_gap) 延迟重投,瞬时 media/WS 失败可自恢复。 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * tweak(mc-prompt): KEEP_GOING 去掉多余的"别乱填坐标"(7 语言同步) maintainer:该具体例子多余。7 个 locale 同步删掉括号短语,各语言衔接自然 (逗号直接连接,无直译残渣);占位符 {MASTER_NAME} 完好。RETROACTIVE 里 "改坐标(换思路重试)"是正向建议、语义不同,保留。 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * fix(proactive): cue 图 bypass native-vision 帧率节流,避免被静默丢(Project-N-E-K-O#1545 Codex P2) OmniRealtimeClient.stream_image 对 native-vision session 在 NATIVE_IMAGE_MIN_INTERVAL 内会静默 return(不发不抛);mc 持续 stream 截图(read)紧接 respond cue 带图时,proactive 图会被这节流吞掉、且 inject 只发 text→回应没图。stream_image 加 bypass_rate_limit: proactive cue 图是单张刻意图(非高频流),bypass 帧率检查必发(仍更新时间戳)。 _stream_cb_media 传 True;offline.stream_image 加同名参数(忽略,签名对齐)。 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * docs(proactive): 注明 bypass 帧也计入限流窗口(Project-N-E-K-O#1545 CodeRabbit nitpick) bypass_rate_limit 跳过 interval 检查但仍 stamp _last_native_image_time——补注释说明 原因:确实向服务器发了一帧,应计入节流窗口,防连续 bypass cue 图 flood native vision。 Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Hongzhi Wen <cartabio.coder1@gmail.com> Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent f34fed1 commit 4ce797d

21 files changed

Lines changed: 1199 additions & 65 deletions

app/main_server.py

Lines changed: 56 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -710,6 +710,14 @@ async def _send_play(target_mgr):
710710
# ai_behavior=blind suppresses injection entirely.
711711
media_parts = event.get("media_parts") if isinstance(event.get("media_parts"), list) else []
712712
ai_behavior_v2 = event.get("ai_behavior")
713+
# Images that must travel WITH a proactive (respond) callback so they
714+
# can be streamed at the moment the pacing manager releases the cue
715+
# (see LLMSessionManager._deliver_proactive_batch). Streaming them
716+
# here immediately would land the image in the previous/current turn
717+
# (or drop it when no session exists yet) while the text is held back
718+
# by the manager — the eventual proactive response would then lack
719+
# its matching visual context.
720+
deferred_proactive_images: list[str] = []
713721
if media_parts and ai_behavior_v2 in ("respond", "read"):
714722
sess = getattr(mgr, "session", None)
715723
stream_image = getattr(sess, "stream_image", None) if sess else None
@@ -730,13 +738,24 @@ async def _send_play(target_mgr):
730738
part_type, mime,
731739
)
732740
continue
733-
if stream_image is None:
734-
logger.debug(
735-
"[EventBus] image media_part dropped: session=%s has no stream_image",
736-
type(sess).__name__ if sess else "None",
737-
)
738-
continue
739741
if isinstance(b64, str) and b64:
742+
if ai_behavior_v2 == "respond" and text:
743+
# Defer: stream when the manager releases this cue so
744+
# the image shares the proactive response's context.
745+
# (Only when there's text — the callback that carries
746+
# these images is built in the ``if text:`` block.)
747+
deferred_proactive_images.append(b64)
748+
continue
749+
# read (passive), OR image-only respond with no text to
750+
# carry it through the pacing manager: inject now so it
751+
# isn't lost (image-only respond has no text cue to drive
752+
# a proactive turn anyway).
753+
if stream_image is None:
754+
logger.debug(
755+
"[EventBus] image media_part dropped: session=%s has no stream_image",
756+
type(sess).__name__ if sess else "None",
757+
)
758+
continue
740759
# ``stream_image`` takes a base64 STRING (not bytes); pass through
741760
try:
742761
await stream_image(b64)
@@ -843,6 +862,18 @@ async def _send_play(target_mgr):
843862
# producer that lands on this branch); see the (event_type in
844863
# {"task_result", "proactive_message"}) gate above.
845864
origin = "event"
865+
# Proactive-delivery hints from push_message (priority +
866+
# coalesce_key). Lower priority = more urgent; unspecified
867+
# (0) is normalised to a neutral band by the manager.
868+
try:
869+
# OverflowError: JSON Infinity/-Infinity → float → int() raises;
870+
# must not let a malformed priority drop the whole callback.
871+
cb_priority = int(event.get("priority", 0) or 0)
872+
except (TypeError, ValueError, OverflowError):
873+
cb_priority = 0
874+
cb_coalesce_key = event.get("coalesce_key")
875+
if not isinstance(cb_coalesce_key, str):
876+
cb_coalesce_key = ""
846877
callback = {
847878
"event": "agent_task_callback",
848879
"origin": origin,
@@ -856,30 +887,38 @@ async def _send_play(target_mgr):
856887
"source_kind": source_kind,
857888
"source_name": source_name,
858889
"delivery_mode": delivery_mode,
890+
"priority": cb_priority,
891+
"coalesce_key": cb_coalesce_key,
892+
# Images to stream at manager-release time (respond only;
893+
# empty for read, which already streamed above).
894+
"media_images": deferred_proactive_images,
859895
"timestamp": event.get("timestamp") or "",
860896
"metadata": event_metadata,
861897
"context_type": event_metadata.get("context_type") or "",
862898
}
863899
if delivery_mode != "silent":
864-
mgr.enqueue_agent_callback(callback)
865900
if delivery_mode == "passive":
901+
# Passive cues keep the direct enqueue-only path:
902+
# they must NOT interrupt; the next user turn drains
903+
# them. The pacing manager only governs proactive.
904+
mgr.enqueue_agent_callback(callback)
866905
logger.info(
867906
"[EventBus] %s enqueued callback (passive); next user turn will carry it",
868907
event_type,
869908
)
870909
else:
910+
# Proactive: hand to the delivery manager, which
911+
# orders by priority, coalesces by key, and paces
912+
# release on the frontend playback gate + min-gap.
871913
logger.info(
872-
"[EventBus] %s enqueued callback, scheduling trigger_agent_callbacks",
873-
event_type,
914+
"[EventBus] %s submitting proactive callback to delivery manager (priority=%s key=%r)",
915+
event_type, cb_priority, cb_coalesce_key or "(source)",
916+
)
917+
mgr.submit_proactive_callback(
918+
callback,
919+
priority=cb_priority,
920+
coalesce_key=cb_coalesce_key or None,
874921
)
875-
876-
# Create task with exception logging
877-
async def _run_trigger_with_logging():
878-
try:
879-
await mgr.trigger_agent_callbacks()
880-
except Exception as e:
881-
logger.error("[EventBus] trigger_agent_callbacks task failed: %s", e)
882-
mgr._pending_agent_callback_task = asyncio.create_task(_run_trigger_with_logging())
883922
else:
884923
logger.info(
885924
"[EventBus] %s delivery=silent: skipping LLM channel (frontend HUD still fires)",

0 commit comments

Comments
 (0)