Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions app/agent_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import mimetypes
import json
from collections.abc import Mapping
mimetypes.add_type("application/javascript", ".js")
import asyncio
import uuid
Expand Down Expand Up @@ -49,6 +50,14 @@
from utils.config_manager import get_config_manager
from utils.tokenize import truncate_to_tokens as _tt
from main_logic.agent_event_bus import AgentServerEventBridge
from plugin.server.application.plugins.voice_contracts import (
VOICE_TRANSCRIPT_EVENT_ID,
VOICE_TRANSCRIPT_EVENT_TYPE,
arbitrate_voice_transcript_results,
voice_transcript_custom_event_args,
voice_transcript_noop,
voice_transcript_request_has_text,
)
try:
from brain.computer_use import ComputerUseAdapter
from brain.browser_use_adapter import BrowserUseAdapter
Expand Down Expand Up @@ -1650,6 +1659,65 @@ async def _emit_agent_status_update(lanlan_name: Optional[str] = None) -> None:
pass


VOICE_TRANSCRIPT_CUSTOM_EVENT_TIMEOUT_SECONDS = 1.0


def _voice_bridge_action_from_dispatch_results(dispatch_results: object) -> Dict[str, Any]:
return arbitrate_voice_transcript_results(dispatch_results)


async def _dispatch_voice_transcript_custom_event(
event: Mapping[str, object],
) -> Dict[str, Any]:
from plugin.server.application.plugins.dispatch_service import PluginDispatchService

dispatch_results = await PluginDispatchService().trigger_custom_event_subscribers(
event_type=VOICE_TRANSCRIPT_EVENT_TYPE,
event_id=VOICE_TRANSCRIPT_EVENT_ID,
args=voice_transcript_custom_event_args(event),
timeout=VOICE_TRANSCRIPT_CUSTOM_EVENT_TIMEOUT_SECONDS,
)
return _voice_bridge_action_from_dispatch_results(dispatch_results)


async def _handle_voice_transcript_request(event: Dict[str, Any]) -> None:
event_id = str((event or {}).get("event_id") or "")
lanlan_name = (event or {}).get("lanlan_name")
result: Dict[str, Any] = voice_transcript_noop("unavailable")

try:
if not voice_transcript_request_has_text(event):
result = voice_transcript_noop("empty_transcript")
elif not Modules.analyzer_enabled:
result = voice_transcript_noop("agent_disabled")
elif not Modules.agent_flags.get("user_plugin_enabled", False):
result = voice_transcript_noop("user_plugin_disabled")
elif not bool(Modules.plugin_lifecycle_started):
result = voice_transcript_noop("plugin_warming_up")
else:
result = await _dispatch_voice_transcript_custom_event(event)
except asyncio.CancelledError:
raise
except Exception as exc:
logger.debug(
"[VoiceBridge] plugin dispatch failed: event_id=%s lanlan=%s err=%s",
event_id,
lanlan_name,
exc,
)
result = voice_transcript_noop(
"dispatch_failed",
error_type=type(exc).__name__,
)

await _emit_main_event(
"voice_bridge_result",
lanlan_name if isinstance(lanlan_name, str) else None,
event_id=event_id,
result=result,
)


async def _on_session_event(event: Dict[str, Any]) -> None:
event_type = (event or {}).get("event_type")
if event_type == "agent_intent_restore_signal":
Expand All @@ -1662,6 +1730,20 @@ async def _on_session_event(event: Dict[str, Any]) -> None:
# has its own once-flag, so this is safe to spam.
await _maybe_restore_agent_intent()
return
if event_type == "voice_transcript_request":
seen_task = asyncio.create_task(
_emit_main_event(
"voice_bridge_request_seen",
event.get("lanlan_name") if isinstance(event.get("lanlan_name"), str) else None,
event_id=str(event.get("event_id") or ""),
)
)
Modules._background_tasks.add(seen_task)
seen_task.add_done_callback(Modules._background_tasks.discard)
task = asyncio.create_task(_handle_voice_transcript_request(event))
Modules._background_tasks.add(task)
task.add_done_callback(Modules._background_tasks.discard)
return
if event_type == "analyze_request":
messages = event.get("messages", [])
lanlan_name = event.get("lanlan_name")
Expand Down
19 changes: 18 additions & 1 deletion app/main_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,13 @@ def _resolve_user_plugin_base() -> str:
from fastapi.responses import JSONResponse, Response # noqa
from fastapi.staticfiles import StaticFiles # noqa
from main_logic import core as core, cross_server as cross_server # noqa
from main_logic.agent_event_bus import MainServerAgentBridge, notify_analyze_ack, set_main_bridge # noqa
from main_logic.agent_event_bus import (
MainServerAgentBridge,
notify_analyze_ack,
notify_voice_bridge_request_seen,
notify_voice_bridge_result,
set_main_bridge,
) # noqa
from fastapi.templating import Jinja2Templates # noqa
from dataclasses import dataclass # noqa
from typing import Any, Optional # noqa
Expand Down Expand Up @@ -608,6 +614,17 @@ async def _handle_agent_event(event: dict):
notify_analyze_ack(str(event.get("event_id") or ""))
return

if event_type == "voice_bridge_result":
notify_voice_bridge_result(
str(event.get("event_id") or ""),
event.get("result") if isinstance(event.get("result"), dict) else {},
)
return

if event_type == "voice_bridge_request_seen":
notify_voice_bridge_request_seen(str(event.get("event_id") or ""))
return

# Agent status updates may be broadcast (lanlan_name omitted).
if event_type == "agent_status_update":
payload = {
Expand Down
201 changes: 201 additions & 0 deletions main_logic/agent_event_bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,16 @@ def _zmq_addr(env_key: str, default_port: int) -> str:
_main_bridge_ref: Optional["MainServerAgentBridge"] = None
_ack_waiters: dict[str, asyncio.Future] = {}
_ack_waiters_lock = threading.Lock()
_voice_bridge_waiters: dict[str, asyncio.Future] = {}
_voice_bridge_request_seen_waiters: dict[str, asyncio.Future] = {}
# Voice bridge waiter state is protected by _voice_bridge_waiters_lock.
# queued means a result has been enqueued on the main loop but resolution has
# not started; resolving means notify_voice_bridge_result is actively resolving
# it. Normal flow is queued -> resolving -> removed.
_voice_bridge_waiters_queued: set[str] = set()
_voice_bridge_waiters_resolving: set[str] = set()
_voice_bridge_waiters_lock = threading.Lock()
VOICE_BRIDGE_AGENT_SEEN_TIMEOUT_SECONDS = 0.08


# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -103,6 +113,8 @@ def _recv_thread_fn(self) -> None:
try:
msg = orjson.loads(self.pull.recv())
if isinstance(msg, dict) and self.owner_loop is not None:
if msg.get("event_type") == "voice_bridge_result":
mark_voice_bridge_result_queued(str(msg.get("event_id") or ""))
asyncio.run_coroutine_threadsafe(
self.on_agent_event(msg), self.owner_loop,
)
Expand Down Expand Up @@ -316,6 +328,92 @@ def _resolve() -> None:
loop.call_soon_threadsafe(_resolve)


def _resolve_voice_bridge_seen_waiter(event_id: str, waiter: asyncio.Future | None) -> None:
if waiter is None or waiter.done():
return
loop = waiter.get_loop()

def _resolve() -> None:
if not waiter.done():
waiter.set_result(True)

try:
loop.call_soon_threadsafe(_resolve)
except RuntimeError:
with _voice_bridge_waiters_lock:
_voice_bridge_request_seen_waiters.pop(event_id, None)
if not waiter.done():
waiter.cancel()


def notify_voice_bridge_request_seen(event_id: str) -> None:
"""Mark that agent_server received a voice-bridge request."""
if not event_id:
return
with _voice_bridge_waiters_lock:
waiter = _voice_bridge_request_seen_waiters.get(event_id)
_resolve_voice_bridge_seen_waiter(event_id, waiter)


def mark_voice_bridge_result_queued(event_id: str) -> None:
"""Mark that a voice result is queued on the main loop but not resolved yet."""
if not event_id:
return
with _voice_bridge_waiters_lock:
if event_id in _voice_bridge_waiters:
_voice_bridge_waiters_queued.add(event_id)
waiter = _voice_bridge_request_seen_waiters.get(event_id)
else:
waiter = None
_resolve_voice_bridge_seen_waiter(event_id, waiter)


def notify_voice_bridge_result(event_id: str, result: Dict[str, Any]) -> None:
"""Resolve a pending voice-bridge request sent from main_server to agent_server."""
if not event_id:
return
payload = result if isinstance(result, dict) else {}
with _voice_bridge_waiters_lock:
waiter = _voice_bridge_waiters.get(event_id)
if waiter is not None and not waiter.done():
_voice_bridge_waiters_resolving.add(event_id)
_voice_bridge_waiters_queued.discard(event_id)
seen_waiter = _voice_bridge_request_seen_waiters.get(event_id)
else:
_voice_bridge_waiters_resolving.discard(event_id)
_voice_bridge_waiters_queued.discard(event_id)
seen_waiter = None
_resolve_voice_bridge_seen_waiter(event_id, seen_waiter)
if waiter is None or waiter.done():
return
loop = waiter.get_loop()

def _resolve() -> None:
with _voice_bridge_waiters_lock:
if _voice_bridge_waiters.get(event_id) is not waiter:
_voice_bridge_waiters_resolving.discard(event_id)
_voice_bridge_waiters_queued.discard(event_id)
return
_voice_bridge_waiters.pop(event_id, None)
_voice_bridge_request_seen_waiters.pop(event_id, None)
_voice_bridge_waiters_resolving.discard(event_id)
_voice_bridge_waiters_queued.discard(event_id)
if not waiter.done():
waiter.set_result(payload)

try:
loop.call_soon_threadsafe(_resolve)
except RuntimeError:
with _voice_bridge_waiters_lock:
if _voice_bridge_waiters.get(event_id) is waiter:
if not waiter.done():
waiter.cancel()
_voice_bridge_waiters.pop(event_id, None)
_voice_bridge_request_seen_waiters.pop(event_id, None)
_voice_bridge_waiters_resolving.discard(event_id)
_voice_bridge_waiters_queued.discard(event_id)


# ---------------------------------------------------------------------------
# Layering-inversion sinks: lower layers (main_logic) emit, higher layers
# (plugin / main_routers) register.
Expand Down Expand Up @@ -492,3 +590,106 @@ async def publish_analyze_request_reliably(
)

return False


async def publish_voice_transcript_request_reliably(
lanlan_name: str,
transcript: str,
*,
timeout_s: float = 1.2,
agent_seen_timeout_s: float = VOICE_BRIDGE_AGENT_SEEN_TIMEOUT_SECONDS,
retries: int = 0,
metadata: Optional[Dict[str, Any]] = None,
) -> Optional[Dict[str, Any]]:
"""Ask agent_server/plugins how a realtime voice transcript should be handled.

Returns a plugin-produced action payload, or ``None`` when the bridge is not
running or no response arrives within the bounded timeout.
"""
text = str(transcript or "").strip()
if not text:
return None
event_id = uuid.uuid4().hex
loop = asyncio.get_running_loop()
waiter: asyncio.Future = loop.create_future()
seen_waiter: asyncio.Future = loop.create_future()
with _voice_bridge_waiters_lock:
_voice_bridge_waiters[event_id] = waiter
_voice_bridge_request_seen_waiters[event_id] = seen_waiter

try:
for attempt in range(max(retries, 0) + 1):
event = {
"event_type": "voice_transcript_request",
"event_id": event_id,
"lanlan_name": lanlan_name,
"transcript": text,
"metadata": dict(metadata or {}),
"attempt": attempt,
}
sent = await publish_session_event(event)
if not sent:
logger.debug(
"[EventBus] voice_transcript_request not sent: no main bridge lanlan=%s attempt=%s",
lanlan_name,
attempt,
)
continue
await asyncio.wait(
{asyncio.shield(waiter), asyncio.shield(seen_waiter)},
timeout=max(0.0, float(agent_seen_timeout_s)),
return_when=asyncio.FIRST_COMPLETED,
)
if waiter.done():
result = waiter.result()
return result if isinstance(result, dict) else {}
if not seen_waiter.done():
logger.debug(
"[EventBus] voice_transcript_request not acknowledged by agent: event_id=%s lanlan=%s attempt=%s",
event_id,
lanlan_name,
attempt,
)
continue
try:
result = await asyncio.wait_for(asyncio.shield(waiter), timeout=timeout_s)
return result if isinstance(result, dict) else {}
except asyncio.TimeoutError:
with _voice_bridge_waiters_lock:
result_is_queued = (
_voice_bridge_waiters.get(event_id) is waiter
and (
event_id in _voice_bridge_waiters_resolving
or event_id in _voice_bridge_waiters_queued
)
)
if result_is_queued:
try:
result = await asyncio.wait_for(
asyncio.shield(waiter),
timeout=0.05,
)
return result if isinstance(result, dict) else {}
except asyncio.TimeoutError:
pass
logger.debug(
"[EventBus] voice_transcript_request timed out: event_id=%s lanlan=%s attempt=%s",
event_id,
lanlan_name,
attempt,
)
continue
return None
finally:
should_cancel = False
with _voice_bridge_waiters_lock:
if _voice_bridge_waiters.get(event_id) is waiter:
_voice_bridge_waiters.pop(event_id, None)
should_cancel = True
_voice_bridge_request_seen_waiters.pop(event_id, None)
_voice_bridge_waiters_queued.discard(event_id)
_voice_bridge_waiters_resolving.discard(event_id)
if should_cancel and not waiter.done():
waiter.cancel()
if not seen_waiter.done():
seen_waiter.cancel()
Loading
Loading