Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 56 additions & 2 deletions app/agent_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -1342,8 +1342,6 @@ def _check_agent_api_gate() -> Dict[str, Any]:
try:
cm = get_config_manager()
ok, reasons = cm.is_agent_api_ready()
# 字段名保留 is_free_version(前端/下游 gate 消费者沿用),值取 agent 维度的
# is_agent_free():判 agent 是否走内置免费模型,而非 core/assist 的版本免费。
return {"ready": ok, "reasons": reasons, "is_free_version": cm.is_agent_free()}
except Exception as e:
return {"ready": False, "reasons": [f"Agent API check failed: {e}"], "is_free_version": False}
Expand Down Expand Up @@ -1650,6 +1648,57 @@ async def _emit_agent_status_update(lanlan_name: Optional[str] = None) -> None:
pass


async def _handle_voice_transcript_request(event: Dict[str, Any]) -> None:
event_id = str((event or {}).get("event_id") or "")
lanlan_name = (event or {}).get("lanlan_name")
result: Dict[str, Any] = {"action": "noop", "reason": "unavailable"}

try:
from plugin.server.application.plugins import voice_transcript_bridge

if not voice_transcript_bridge.voice_transcript_request_has_text(event):
result = voice_transcript_bridge.voice_transcript_noop("empty_transcript")
elif not Modules.analyzer_enabled:
result = voice_transcript_bridge.voice_transcript_noop("agent_disabled")
elif not Modules.agent_flags.get("user_plugin_enabled", False):
result = voice_transcript_bridge.voice_transcript_noop(
"user_plugin_disabled"
)
else:
lifecycle_ready = bool(Modules.plugin_lifecycle_started)
if not lifecycle_ready:
lifecycle_ready = await _ensure_plugin_lifecycle_started()

if not lifecycle_ready:
result = voice_transcript_bridge.voice_transcript_noop(
"plugin_lifecycle_start_failed"
)
else:
result = await voice_transcript_bridge.resolve_voice_transcript_request(
event,
timeout=voice_transcript_bridge.VOICE_TRANSCRIPT_DISPATCH_TIMEOUT_SECONDS,
)
except Exception as exc:
logger.debug(
"[VoiceBridge] plugin dispatch failed: event_id=%s lanlan=%s err=%s",
event_id,
lanlan_name,
exc,
)
result = {
"action": "noop",
"reason": "dispatch_failed",
"error_type": type(exc).__name__,
}

await _emit_main_event(
"voice_bridge_result",
lanlan_name if isinstance(lanlan_name, str) else None,
event_id=event_id,
result=result,
)


async def _on_session_event(event: Dict[str, Any]) -> None:
event_type = (event or {}).get("event_type")
if event_type == "agent_intent_restore_signal":
Expand All @@ -1662,6 +1711,11 @@ async def _on_session_event(event: Dict[str, Any]) -> None:
# has its own once-flag, so this is safe to spam.
await _maybe_restore_agent_intent()
return
if event_type == "voice_transcript_request":
task = asyncio.create_task(_handle_voice_transcript_request(event))
Modules._background_tasks.add(task)
task.add_done_callback(Modules._background_tasks.discard)
return
if event_type == "analyze_request":
messages = event.get("messages", [])
lanlan_name = event.get("lanlan_name")
Expand Down
20 changes: 18 additions & 2 deletions app/main_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ def _resolve_user_plugin_base() -> str:
from fastapi.responses import JSONResponse, Response # noqa
from fastapi.staticfiles import StaticFiles # noqa
from main_logic import core as core, cross_server as cross_server # noqa
from main_logic.agent_event_bus import MainServerAgentBridge, notify_analyze_ack, set_main_bridge # noqa
from main_logic.agent_event_bus import MainServerAgentBridge, notify_analyze_ack, notify_voice_bridge_result, set_main_bridge # noqa
from fastapi.templating import Jinja2Templates # noqa
from dataclasses import dataclass # noqa
from typing import Any, Optional # noqa
Expand Down Expand Up @@ -608,6 +608,22 @@ async def _handle_agent_event(event: dict):
notify_analyze_ack(str(event.get("event_id") or ""))
return

if event_type == "voice_bridge_result":
event_id = str(event.get("event_id") or "")
result = event.get("result")
if not event_id:
logger.warning("[EventBus] voice_bridge_result dropped: missing event_id")
return
if not isinstance(result, dict):
logger.warning(
"[EventBus] voice_bridge_result dropped: invalid result payload for event_id=%s payload=%r",
event_id,
result,
)
return
notify_voice_bridge_result(event_id, result)
return

# Agent status updates may be broadcast (lanlan_name omitted).
if event_type == "agent_status_update":
payload = {
Expand Down Expand Up @@ -1621,6 +1637,7 @@ async def get_response(self, path, scope):
# --- 初始化共享状态并挂载路由 ---
# 显式从各子模块导入 router,避免与包级模块导出产生同名遮蔽。
from main_routers.agent_router import router as agent_router # noqa
from main_routers.card_assist_router import router as card_assist_router # noqa
from main_routers.capture_router import router as capture_router # noqa
from main_routers.characters_router import router as characters_router # noqa
from main_routers.cloudsave_router import router as cloudsave_router # noqa
Expand All @@ -1641,7 +1658,6 @@ async def get_response(self, path, scope):
from main_routers.workshop_router import router as workshop_router # noqa
from main_routers.cookies_login_router import router as cookies_login_router # noqa
from main_routers.game_router import router as game_router # noqa
from main_routers.card_assist_router import router as card_assist_router # noqa
from main_routers.debug_router import router as debug_router, start_watchdog as _start_debug_health_watchdog # noqa
from main_routers.shared_state import init_shared_state, set_steamworks_initializer # noqa

Expand Down
119 changes: 119 additions & 0 deletions main_logic/agent_event_bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ def _zmq_addr(env_key: str, default_port: int) -> str:
_main_bridge_ref: Optional["MainServerAgentBridge"] = None
_ack_waiters: dict[str, asyncio.Future] = {}
_ack_waiters_lock = threading.Lock()
_voice_bridge_waiters: dict[str, asyncio.Future] = {}
_voice_bridge_waiters_resolving: set[str] = set()
_voice_bridge_waiters_lock = threading.Lock()


# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -316,6 +319,42 @@ def _resolve() -> None:
loop.call_soon_threadsafe(_resolve)


def notify_voice_bridge_result(event_id: str, result: Dict[str, Any]) -> None:
"""Resolve a pending voice-bridge request sent from main_server to agent_server."""
if not event_id:
return
payload = result if isinstance(result, dict) else {}
with _voice_bridge_waiters_lock:
waiter = _voice_bridge_waiters.get(event_id)
if waiter is not None and not waiter.done():
_voice_bridge_waiters_resolving.add(event_id)
else:
_voice_bridge_waiters_resolving.discard(event_id)
if waiter is None or waiter.done():
return
loop = waiter.get_loop()

def _resolve() -> None:
with _voice_bridge_waiters_lock:
if _voice_bridge_waiters.get(event_id) is not waiter:
_voice_bridge_waiters_resolving.discard(event_id)
return
_voice_bridge_waiters.pop(event_id, None)
_voice_bridge_waiters_resolving.discard(event_id)
if not waiter.done():
waiter.set_result(payload)

try:
loop.call_soon_threadsafe(_resolve)
except RuntimeError:
with _voice_bridge_waiters_lock:
if _voice_bridge_waiters.get(event_id) is waiter:
if not waiter.done():
waiter.cancel()
_voice_bridge_waiters.pop(event_id, None)
_voice_bridge_waiters_resolving.discard(event_id)


# ---------------------------------------------------------------------------
# Layering-inversion sinks: lower layers (main_logic) emit, higher layers
# (plugin / main_routers) register.
Expand Down Expand Up @@ -492,3 +531,83 @@ async def publish_analyze_request_reliably(
)

return False


async def publish_voice_transcript_request_reliably(
lanlan_name: str,
transcript: str,
*,
timeout_s: float = 1.2,
retries: int = 0,
metadata: Optional[Dict[str, Any]] = None,
) -> Optional[Dict[str, Any]]:
"""Ask agent_server/plugins how a realtime voice transcript should be handled.

Returns a plugin-produced action payload, or ``None`` when the bridge is not
running or no response arrives within the bounded timeout.
"""
text = str(transcript or "").strip()
if not text:
return None
event_id = uuid.uuid4().hex
loop = asyncio.get_running_loop()
waiter: asyncio.Future = loop.create_future()
with _voice_bridge_waiters_lock:
_voice_bridge_waiters[event_id] = waiter

try:
for attempt in range(max(retries, 0) + 1):
event = {
"event_type": "voice_transcript_request",
"event_id": event_id,
"lanlan_name": lanlan_name,
"transcript": text,
"metadata": dict(metadata or {}),
"attempt": attempt,
}
sent = await publish_session_event(event)
if not sent:
logger.debug(
"[EventBus] voice_transcript_request not sent: no main bridge lanlan=%s attempt=%s",
lanlan_name,
attempt,
)
continue
try:
result = await asyncio.wait_for(asyncio.shield(waiter), timeout=timeout_s)
return result if isinstance(result, dict) else {}
except asyncio.TimeoutError:
with _voice_bridge_waiters_lock:
result_is_queued = (
_voice_bridge_waiters.get(event_id) is waiter
and event_id in _voice_bridge_waiters_resolving
)
if result_is_queued:
try:
result = await asyncio.wait_for(
asyncio.shield(waiter),
timeout=0.05,
)
return result if isinstance(result, dict) else {}
except asyncio.TimeoutError:
pass
logger.debug(
"[EventBus] voice_transcript_request timed out: event_id=%s lanlan=%s attempt=%s",
event_id,
lanlan_name,
attempt,
)
continue
return None
finally:
should_cancel = False
with _voice_bridge_waiters_lock:
if (
_voice_bridge_waiters.get(event_id) is waiter
and event_id not in _voice_bridge_waiters_resolving
):
_voice_bridge_waiters.pop(event_id, None)
should_cancel = True
_voice_bridge_waiters_resolving.discard(event_id)
if should_cancel and not waiter.done():
waiter.cancel()
82 changes: 82 additions & 0 deletions main_logic/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
from main_logic.agent_event_bus import (
dispatch_text_user_message,
dispatch_user_utterance,
publish_voice_transcript_request_reliably,
)
from utils.preferences import load_global_conversation_settings, aload_global_conversation_settings
from config import (
Expand Down Expand Up @@ -1966,6 +1967,75 @@ def _publish_user_utterance_to_plugin_bus(
# swallowed inside the dispatcher.
dispatch_user_utterance(bucket, event)

async def _dispatch_voice_transcript_bridge(self, transcript: str) -> str:
"""Let plugin-side voice filters decide whether to cancel or prime context."""
session_snapshot = self.session
try:
result = await publish_voice_transcript_request_reliably(
self.lanlan_name,
transcript,
metadata={
"session_type": type(session_snapshot).__name__ if session_snapshot else "",
"voice_source": True,
},
)
except Exception as exc:
logger.debug("[%s] voice bridge request failed: %s", self.lanlan_name, exc)
return ""
if not isinstance(result, dict) or not result:
return ""

def _session_changed() -> bool:
if self.session is session_snapshot:
return False
logger.debug("[%s] voice bridge result ignored after session change", self.lanlan_name)
return True

if _session_changed():
return ""

action = str(result.get("action") or "").strip()
if action == "cancel_response":
if _session_changed():
return ""
cancel_response = getattr(session_snapshot, "cancel_response", None)
if not callable(cancel_response):
return ""
try:
if _session_changed():
return ""
await cancel_response()
logger.debug("[%s] voice bridge cancelled current response", self.lanlan_name)
except Exception as exc:
logger.debug("[%s] voice bridge cancel skipped/failed: %s", self.lanlan_name, exc)
return ""
return action
Comment thread
coderabbitai[bot] marked this conversation as resolved.

if action == "prime_context":
context_text = str(result.get("context") or "").strip()
if not context_text:
return action
if _session_changed():
return ""
prime_context = getattr(session_snapshot, "prime_context", None)
if not callable(prime_context):
return action
skipped = bool(result.get("skipped", False))
try:
if _session_changed():
return ""
await prime_context(context_text, skipped=skipped)
logger.debug(
"[%s] voice bridge primed context len=%d skipped=%s",
self.lanlan_name,
len(context_text),
skipped,
)
except Exception as exc:
logger.debug("[%s] voice bridge prime skipped/failed: %s", self.lanlan_name, exc)
return action
return action

def _reset_voice_echo_suppression_cache(self) -> None:
self._recent_ai_voice_echo_text = ''
self._recent_ai_voice_echo_at = 0.0
Expand Down Expand Up @@ -2162,6 +2232,18 @@ async def handle_input_transcript(self, transcript: str, *, is_voice_source: boo
# 维持 voice_engaged 状态。
self._activity_tracker.on_voice_rms()

if is_voice_source and transcript_text:
session_snapshot = self.session
voice_bridge_action = await self._dispatch_voice_transcript_bridge(transcript_text)
if session_snapshot is not self.session:
logger.debug(
"[%s] voice bridge action ignored after session change",
self.lanlan_name,
)
voice_bridge_action = ""
if voice_bridge_action == "cancel_response":
return

if is_voice_source:
# 仅非空转录才算"用户消息":on_user_message 会清掉 unfinished_thread、
# bump _conv_seq(让 open_threads 缓存失效)、把文本进 buffer 给
Expand Down
Loading
Loading