Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 139 additions & 0 deletions app/agent_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import mimetypes
import json
from collections.abc import Mapping
mimetypes.add_type("application/javascript", ".js")
import asyncio
import uuid
Expand Down Expand Up @@ -1648,6 +1649,139 @@ async def _emit_agent_status_update(lanlan_name: Optional[str] = None) -> None:
pass


# Custom-event type passed as ``event_type`` when voice transcripts are
# dispatched to plugin custom-event subscribers.
VOICE_TRANSCRIPT_CUSTOM_EVENT_TYPE = "voice_transcript"
# Value passed as ``timeout`` to the subscriber dispatch call for a voice
# transcript (named as seconds).
VOICE_TRANSCRIPT_CUSTOM_EVENT_TIMEOUT_SECONDS = 1.0


def _voice_bridge_noop(reason: str, **extra: object) -> Dict[str, Any]:
return {
"action": "noop",
"reason": str(reason or "noop"),
**extra,
}


def _voice_transcript_request_has_text(event: Mapping[str, object] | None) -> bool:
if not isinstance(event, Mapping):
return False
return bool(str(event.get("transcript") or "").strip())


def _voice_transcript_custom_event_args(event: Mapping[str, object]) -> Dict[str, object]:
metadata = event.get("metadata")
return {
"transcript": str(event.get("transcript") or "").strip(),
"lanlan_name": str(event.get("lanlan_name") or ""),
"metadata": dict(metadata) if isinstance(metadata, Mapping) else {},
}


def _voice_bridge_action_from_dispatch_results(dispatch_results: object) -> Dict[str, Any]:
    """Reduce plugin dispatch results to a single voice-bridge action payload.

    Args:
        dispatch_results: Expected to be a non-empty list of per-subscriber
            mappings with ``success``, ``result``, ``plugin_id`` and
            ``event_id`` keys. Anything else yields a ``no_subscribers`` noop.

    Returns:
        The payload chosen by ``arbitrate_voice_transcript_results`` from the
        successful results that name an action, or a ``no_handler_result``
        noop when there are none. The count of unsuccessful entries is
        reported under ``failures``.
    """
    if not (isinstance(dispatch_results, list) and dispatch_results):
        return _voice_bridge_noop("no_subscribers")

    # Imported lazily, only once there is something to arbitrate.
    from plugin.plugins.study_companion.voice_contracts import (
        arbitrate_voice_transcript_results,
    )

    candidates: list[dict[str, object]] = []
    failed = 0
    for entry in dispatch_results:
        if not isinstance(entry, Mapping):
            continue
        if not entry.get("success"):
            failed += 1
            continue

        handler_result = entry.get("result")
        if not isinstance(handler_result, Mapping):
            continue
        action = str(handler_result.get("action") or "").strip()
        if not action:
            continue

        normalized: Dict[str, Any] = dict(handler_result)
        normalized["action"] = action

        # Provenance supplied by the handler wins; the dispatch envelope only
        # fills in values the handler did not set.
        plugin_id = str(entry.get("plugin_id") or "").strip()
        if plugin_id:
            normalized.setdefault("source_plugin", plugin_id)
        source_event_id = str(entry.get("event_id") or "").strip()
        if source_event_id:
            normalized.setdefault("source_event_id", source_event_id)

        candidate = {
            "plugin_id": normalized.get("source_plugin") or plugin_id,
            "event_id": normalized.get("source_event_id") or source_event_id,
            "success": True,
            "result": normalized,
        }
        candidates.append(candidate)

    if not candidates:
        return _voice_bridge_noop("no_handler_result", failures=failed)

    decision = arbitrate_voice_transcript_results(candidates)
    if failed:
        # Add our failures to any count the arbiter already reported; an
        # unparsable existing value is treated as zero.
        try:
            prior_failures = int(decision.get("failures") or 0)
        except (TypeError, ValueError):
            prior_failures = 0
        decision["failures"] = prior_failures + failed
    return decision


async def _dispatch_voice_transcript_custom_event(
    event: Mapping[str, object],
) -> Dict[str, Any]:
    """Send the transcript to plugin custom-event subscribers and pick an action.

    The event is converted with ``_voice_transcript_custom_event_args`` and
    dispatched through ``PluginDispatchService.trigger_custom_event_subscribers``.
    The raw dispatch results are then reduced to one payload by
    ``_voice_bridge_action_from_dispatch_results``.
    """
    from plugin.server.application.plugins.dispatch_service import PluginDispatchService

    service = PluginDispatchService()
    event_args = _voice_transcript_custom_event_args(event)
    raw_results = await service.trigger_custom_event_subscribers(
        event_type=VOICE_TRANSCRIPT_CUSTOM_EVENT_TYPE,
        args=event_args,
        timeout=VOICE_TRANSCRIPT_CUSTOM_EVENT_TIMEOUT_SECONDS,
    )
    return _voice_bridge_action_from_dispatch_results(raw_results)


async def _handle_voice_transcript_request(event: Dict[str, Any]) -> None:
    """Answer a ``voice_transcript_request`` with a ``voice_bridge_result`` event.

    The request is dispatched to plugins only when it carries transcript
    text, the analyzer and the ``user_plugin_enabled`` flag are on, and the
    plugin lifecycle is (or can be) started. Every other outcome, including
    an exception during dispatch, becomes a ``noop`` result carrying a
    ``reason``. Exactly one ``voice_bridge_result`` is emitted per call.

    Args:
        event: The session event; ``event_id`` and ``lanlan_name`` are echoed
            back on the emitted result.
    """
    request = event or {}
    event_id = str(request.get("event_id") or "")
    lanlan_name = request.get("lanlan_name")
    result: Dict[str, Any] = _voice_bridge_noop("unavailable")

    try:
        if not _voice_transcript_request_has_text(event):
            result = _voice_bridge_noop("empty_transcript")
        elif not Modules.analyzer_enabled:
            result = _voice_bridge_noop("agent_disabled")
        elif not Modules.agent_flags.get("user_plugin_enabled", False):
            result = _voice_bridge_noop("user_plugin_disabled")
        elif not (
            # Only attempt a lifecycle start when it is not already running.
            bool(Modules.plugin_lifecycle_started)
            or await _ensure_plugin_lifecycle_started()
        ):
            result = _voice_bridge_noop("plugin_lifecycle_start_failed")
        else:
            result = await _dispatch_voice_transcript_custom_event(event)
    except Exception as exc:
        # Best-effort boundary: a failing dispatch is reported as a noop so a
        # result event is still emitted below.
        logger.debug(
            "[VoiceBridge] plugin dispatch failed: event_id=%s lanlan=%s err=%s",
            event_id,
            lanlan_name,
            exc,
        )
        # Built with the shared helper so all noop payloads have one shape.
        result = _voice_bridge_noop("dispatch_failed", error_type=type(exc).__name__)

    await _emit_main_event(
        "voice_bridge_result",
        lanlan_name if isinstance(lanlan_name, str) else None,
        event_id=event_id,
        result=result,
    )


async def _on_session_event(event: Dict[str, Any]) -> None:
event_type = (event or {}).get("event_type")
if event_type == "agent_intent_restore_signal":
Expand All @@ -1660,6 +1794,11 @@ async def _on_session_event(event: Dict[str, Any]) -> None:
# has its own once-flag, so this is safe to spam.
await _maybe_restore_agent_intent()
return
if event_type == "voice_transcript_request":
task = asyncio.create_task(_handle_voice_transcript_request(event))
Modules._background_tasks.add(task)
task.add_done_callback(Modules._background_tasks.discard)
return
if event_type == "analyze_request":
messages = event.get("messages", [])
lanlan_name = event.get("lanlan_name")
Expand Down
9 changes: 8 additions & 1 deletion app/main_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ def _resolve_user_plugin_base() -> str:
from fastapi.responses import JSONResponse, Response # noqa
from fastapi.staticfiles import StaticFiles # noqa
from main_logic import core as core, cross_server as cross_server # noqa
from main_logic.agent_event_bus import MainServerAgentBridge, notify_analyze_ack, set_main_bridge # noqa
from main_logic.agent_event_bus import MainServerAgentBridge, notify_analyze_ack, notify_voice_bridge_result, set_main_bridge # noqa
from fastapi.templating import Jinja2Templates # noqa
from dataclasses import dataclass # noqa
from typing import Any, Optional # noqa
Expand Down Expand Up @@ -608,6 +608,13 @@ async def _handle_agent_event(event: dict):
notify_analyze_ack(str(event.get("event_id") or ""))
return

if event_type == "voice_bridge_result":
notify_voice_bridge_result(
str(event.get("event_id") or ""),
event.get("result") if isinstance(event.get("result"), dict) else {},
)
return

# Agent status updates may be broadcast (lanlan_name omitted).
if event_type == "agent_status_update":
payload = {
Expand Down
11 changes: 8 additions & 3 deletions frontend/plugin-manager/scripts/check-hosted-tsx.mjs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { existsSync, mkdtempSync, readdirSync, readFileSync, rmSync, statSync, writeFileSync } from 'node:fs'
import { existsSync, mkdirSync, mkdtempSync, readdirSync, readFileSync, rmSync, statSync, writeFileSync } from 'node:fs'
import { tmpdir } from 'node:os'
import { dirname, isAbsolute, join, resolve } from 'node:path'
import { dirname, isAbsolute, join, relative, resolve } from 'node:path'
import { fileURLToPath } from 'node:url'
import process from 'node:process'
import ts from 'typescript'
Expand Down Expand Up @@ -189,8 +189,12 @@ function createCheckFile(entryPath, tempDir, index, surface, tomlPath) {
const stripped = source
.replace(/^\s*import[\s\S]*?from\s+['"](?:@neko\/plugin-ui|neko:ui)['"]\s*;?\s*/gm, '')
.replace(/^\s*import\s+['"](?:@neko\/plugin-ui|neko:ui)['"]\s*;?\s*/gm, '')
const checkPath = join(tempDir, `surface-${index}.tsx`)
const relativeEntryPath = relative(repoRoot, entryPath)
const checkPath = relativeEntryPath.startsWith('..') || isAbsolute(relativeEntryPath)
? join(tempDir, `surface-${index}.tsx`)
: join(tempDir, relativeEntryPath)
const prefixLines = 6
mkdirSync(dirname(checkPath), { recursive: true })
writeFileSync(
checkPath,
`/// <reference path="${hostedUiGlobalsPath}" />\nimport * as NekoUi from "@neko/plugin-ui";\nimport type { PluginSurfaceProps, HostedAction, JsonSchema, HostedApi } from "@neko/plugin-ui";\nconst { ${[
Expand Down Expand Up @@ -276,6 +280,7 @@ function main() {
target: ts.ScriptTarget.ES2020,
moduleResolution: ts.ModuleResolutionKind.Bundler,
baseUrl: repoRoot,
rootDirs: [tempDir, repoRoot],
paths: {
'@neko/plugin-ui': ['plugin/sdk/hosted-ui'],
},
Expand Down
119 changes: 119 additions & 0 deletions main_logic/agent_event_bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ def _zmq_addr(env_key: str, default_port: int) -> str:
_main_bridge_ref: Optional["MainServerAgentBridge"] = None
_ack_waiters: dict[str, asyncio.Future] = {}
_ack_waiters_lock = threading.Lock()
# Pending voice-bridge requests: event_id -> future awaited by the publisher.
_voice_bridge_waiters: dict[str, asyncio.Future] = {}
# event_ids whose result has been handed to the waiter's loop via
# call_soon_threadsafe but not yet applied to the future.
_voice_bridge_waiters_resolving: set[str] = set()
# Held around every read or write of the two containers above.
_voice_bridge_waiters_lock = threading.Lock()


# ---------------------------------------------------------------------------
Expand Down Expand Up @@ -316,6 +319,42 @@ def _resolve() -> None:
loop.call_soon_threadsafe(_resolve)


def notify_voice_bridge_result(event_id: str, result: Dict[str, Any]) -> None:
    """Resolve a pending voice-bridge request sent from main_server to agent_server.

    The waiter future is completed on its own event loop via
    ``call_soon_threadsafe``. While that callback is queued, ``event_id`` is
    kept in ``_voice_bridge_waiters_resolving`` so the publisher can tell a
    result is on its way.

    Args:
        event_id: Identifier of the request being answered; empty ids are ignored.
        result: Payload handed to the waiter; non-dict values become ``{}``.
    """
    if not event_id:
        return
    payload = result if isinstance(result, dict) else {}

    with _voice_bridge_waiters_lock:
        waiter = _voice_bridge_waiters.get(event_id)
        if waiter is not None and not waiter.done():
            _voice_bridge_waiters_resolving.add(event_id)
        else:
            _voice_bridge_waiters_resolving.discard(event_id)
    if waiter is None or waiter.done():
        return
    loop = waiter.get_loop()

    def _resolve() -> None:
        with _voice_bridge_waiters_lock:
            if _voice_bridge_waiters.get(event_id) is not waiter:
                # The entry was replaced or removed before this callback ran.
                _voice_bridge_waiters_resolving.discard(event_id)
                return
            _voice_bridge_waiters.pop(event_id, None)
            _voice_bridge_waiters_resolving.discard(event_id)
        if not waiter.done():
            waiter.set_result(payload)

    try:
        loop.call_soon_threadsafe(_resolve)
    except RuntimeError:
        # The waiter's loop is closed, so the result can never be delivered.
        with _voice_bridge_waiters_lock:
            owns_entry = _voice_bridge_waiters.get(event_id) is waiter
            if owns_entry:
                # Clean the registry BEFORE cancelling: cancel() may itself
                # raise and must not leave stale entries behind.
                _voice_bridge_waiters.pop(event_id, None)
            _voice_bridge_waiters_resolving.discard(event_id)
            if owns_entry and not waiter.done():
                try:
                    waiter.cancel()
                except RuntimeError as exc:
                    # Cancelling schedules the future's done-callbacks on its
                    # loop, which raises again when that loop is closed.
                    logger.debug(
                        "[EventBus] voice bridge waiter cancel failed: event_id=%s err=%s",
                        event_id,
                        exc,
                    )


# ---------------------------------------------------------------------------
# Layering-inversion sinks: lower layers (main_logic) emit, higher layers
# (plugin / main_routers) register.
Expand Down Expand Up @@ -492,3 +531,83 @@ async def publish_analyze_request_reliably(
)

return False


async def publish_voice_transcript_request_reliably(
    lanlan_name: str,
    transcript: str,
    *,
    timeout_s: float = 1.2,
    retries: int = 0,
    metadata: Optional[Dict[str, Any]] = None,
) -> Optional[Dict[str, Any]]:
    """Ask agent_server/plugins how a realtime voice transcript should be handled.

    A ``voice_transcript_request`` session event is published up to
    ``retries + 1`` times under one ``event_id``. After each successful
    publish the call waits up to ``timeout_s`` for the matching result.

    Returns:
        The plugin-produced action payload (``{}`` if the result is not a
        dict), or ``None`` when the transcript is blank, the event could not
        be published, or no response arrived within the bounded timeout.
    """
    text = str(transcript or "").strip()
    if not text:
        return None

    event_id = uuid.uuid4().hex
    waiter: asyncio.Future = asyncio.get_running_loop().create_future()
    with _voice_bridge_waiters_lock:
        _voice_bridge_waiters[event_id] = waiter

    async def _await_result(limit: float) -> Dict[str, Any]:
        # shield() keeps a timeout from cancelling the shared waiter, so the
        # same future can be awaited again on the next attempt.
        outcome = await asyncio.wait_for(asyncio.shield(waiter), timeout=limit)
        return outcome if isinstance(outcome, dict) else {}

    try:
        total_attempts = max(retries, 0) + 1
        for attempt in range(total_attempts):
            request = {
                "event_type": "voice_transcript_request",
                "event_id": event_id,
                "lanlan_name": lanlan_name,
                "transcript": text,
                "metadata": dict(metadata or {}),
                "attempt": attempt,
            }
            if not await publish_session_event(request):
                logger.debug(
                    "[EventBus] voice_transcript_request not sent: no main bridge lanlan=%s attempt=%s",
                    lanlan_name,
                    attempt,
                )
                continue

            try:
                return await _await_result(timeout_s)
            except asyncio.TimeoutError:
                pass

            # Timed out: if a result is already queued on the loop, allow it
            # a short grace period to land before giving up on this attempt.
            with _voice_bridge_waiters_lock:
                result_is_queued = (
                    event_id in _voice_bridge_waiters_resolving
                    and _voice_bridge_waiters.get(event_id) is waiter
                )
            if result_is_queued:
                try:
                    return await _await_result(0.05)
                except asyncio.TimeoutError:
                    pass

            logger.debug(
                "[EventBus] voice_transcript_request timed out: event_id=%s lanlan=%s attempt=%s",
                event_id,
                lanlan_name,
                attempt,
            )
        return None
    finally:
        with _voice_bridge_waiters_lock:
            # Unregister and cancel only a waiter that is still registered
            # and has no result queued for it.
            abandon = (
                _voice_bridge_waiters.get(event_id) is waiter
                and event_id not in _voice_bridge_waiters_resolving
            )
            if abandon:
                _voice_bridge_waiters.pop(event_id, None)
            _voice_bridge_waiters_resolving.discard(event_id)
        if abandon and not waiter.done():
            waiter.cancel()
Loading
Loading