Skip to content

Commit 1b87a60

Browse files
committed
feat: add real-time observer system for voicemail/hallucination detection
- Add ObserverConfig type to template configurations (reuses LLMConfiguration, FlowAction)
- Add observers package: RealtimeObserver, ObserverManager, factory
- Wire observer lifecycle in agent/__init__.py (on_user_turn_started, on_function_calls_started)
- Observers read from LLMContext, run in parallel via asyncio.gather, first-writer-wins
- Uses existing get_llm_service(pooled=True) and Pipecat run_inference() — no custom HTTP clients
- Template-configurable: add any detection by writing a system_prompt, zero code changes
- Tested with real voicemail calls — observer detects and sets outcome=VOICEMAIL
1 parent e003517 commit 1b87a60

6 files changed

Lines changed: 465 additions & 0 deletions

File tree

app/ai/voice/agents/breeze_buddy/agent/__init__.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,9 @@ def __init__(
174174
# Stream mode transcript collector (replaces LLMContext for transcription)
175175
self._transcript_collector: Optional[TranscriptCollectorProcessor] = None
176176

177+
# Real-time observers (side-LLMs for voicemail/hallucination detection)
178+
self._observer_manager: Any = None
179+
177180
# Error tracking
178181
self.errors: List[Dict[str, Any]] = []
179182

@@ -804,6 +807,23 @@ async def on_user_turn_started(aggregator, strategy):
804807
logger.debug("Post-greeting timer cancelled - user spoke")
805808
self._user_idle_callback_handler.reset_retry_count()
806809

810+
# Notify real-time observers that a turn completed
811+
if self._observer_manager:
812+
self._observer_manager.on_turn_completed()
813+
814+
# Notify real-time observers of LLM function calls
815+
if self._observer_manager and self.flow_manager:
816+
llm_service = getattr(self.flow_manager, "_llm", None)
817+
if llm_service:
818+
819+
@llm_service.event_handler("on_function_calls_started")
820+
async def _on_fn_calls_for_observer(service, function_calls):
821+
for call in function_calls:
822+
self._observer_manager.on_function_call(
823+
call.function_name,
824+
getattr(call, "arguments", {}),
825+
)
826+
807827
async def _handle_client_connected(self) -> None:
808828
"""Handle client connection and initialize flow."""
809829
if self.is_stream_mode:
@@ -1098,6 +1118,41 @@ async def run(self, runner_args: Optional[RunnerArguments] = None) -> None:
10981118
mcp_global_functions=mcp_global_functions,
10991119
)
11001120

1121+
# ── Real-time observers ──────────────────────────────────
1122+
observers_config = (
1123+
self.configurations.observers if self.configurations else None
1124+
)
1125+
logger.info(
1126+
f"Observer setup: configurations={'yes' if self.configurations else 'no'}, "
1127+
f"observers_config={observers_config is not None}, "
1128+
f"is_stream={is_stream}, "
1129+
f"observers_count={len(observers_config) if observers_config else 0}"
1130+
)
1131+
if observers_config and not is_stream:
1132+
try:
1133+
from app.ai.voice.agents.breeze_buddy.observers import (
1134+
ObserverManager,
1135+
build_observers,
1136+
)
1137+
1138+
observer_instances = await build_observers(
1139+
configs=observers_config,
1140+
template=self.template,
1141+
agent_context=self,
1142+
handler_map=self.flow_builder.handler_map,
1143+
)
1144+
if observer_instances:
1145+
self._observer_manager = ObserverManager(
1146+
observer_instances, context
1147+
)
1148+
logger.info(
1149+
f"Initialized {len(observer_instances)} "
1150+
f"real-time observer(s)"
1151+
)
1152+
except Exception as e:
1153+
logger.error(f"Failed to initialize observers: {e}")
1154+
self._observer_manager = None
1155+
11011156
self._register_event_handlers()
11021157

11031158
runner = PipelineRunner(handle_sigint=False, force_gc=True)
@@ -1113,6 +1168,9 @@ async def run(self, runner_args: Optional[RunnerArguments] = None) -> None:
11131168
except asyncio.CancelledError:
11141169
logger.info(f"{log_prefix}Pipeline task cancelled. Exiting gracefully.")
11151170
finally:
1171+
if self._observer_manager:
1172+
await self._observer_manager.stop()
1173+
self._observer_manager = None
11161174
clear_log_context()
11171175

11181176
async def _handle_unexpected_disconnect(self, reason: str) -> None:
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
from .factory import build_observers
2+
from .manager import ObserverManager
3+
from .observer import RealtimeObserver
4+
5+
__all__ = ["build_observers", "ObserverManager", "RealtimeObserver"]
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
"""Observer factory — builds RealtimeObserver instances from template config.
2+
3+
Uses existing ``get_llm_service()`` for LLM service creation and existing
4+
``LLMConfiguration`` for config merging (inherit with override).
5+
"""
6+
7+
from typing import Any, Dict, List, Optional
8+
9+
from app.ai.voice.agents.breeze_buddy.llm import get_llm_service
10+
from app.ai.voice.agents.breeze_buddy.template.types import ObserverConfig
11+
from app.ai.voice.llm.types import LLMConfiguration
12+
from app.core.logger import logger
13+
14+
from .observer import RealtimeObserver
15+
16+
17+
def merge_llm_config(
    override: Optional[LLMConfiguration],
    base: LLMConfiguration,
) -> LLMConfiguration:
    """Layer an observer's optional LLM settings over the template's settings.

    Connection fields (provider, sdk, region, endpoint, api_key_name) are taken
    from ``override`` when it supplies a truthy value and from ``base``
    otherwise. The model falls back to ``gpt-4o-mini``, temperature to 0.1 and
    max_tokens to 100; none of those three is inherited from ``base``.

    Args:
        override: Observer-specific settings, or ``None`` to use the fallbacks.
        base: The template-level LLM configuration.

    Returns:
        A new ``LLMConfiguration``; neither input is modified.
    """
    fallback_model = "gpt-4o-mini"
    fallback_temperature = 0.1
    fallback_max_tokens = 100

    if override is not None:
        return LLMConfiguration(
            provider=override.provider or base.provider,
            sdk=override.sdk or base.sdk,
            model=override.model or fallback_model,
            # ``base`` is read with getattr, so a base without a region is fine.
            region=override.region or getattr(base, "region", None),
            endpoint=override.endpoint or base.endpoint,
            api_key_name=override.api_key_name or base.api_key_name,
            # Explicit None checks: a temperature of 0.0 is a valid override.
            temperature=(
                fallback_temperature
                if override.temperature is None
                else override.temperature
            ),
            max_tokens=(
                fallback_max_tokens
                if override.max_tokens is None
                else override.max_tokens
            ),
        )

    # No observer-level settings: reuse the template's connection details with
    # the fallback model and sampling parameters.
    return LLMConfiguration(
        provider=base.provider,
        sdk=base.sdk,
        model=fallback_model,
        region=getattr(base, "region", None),
        endpoint=base.endpoint,
        api_key_name=base.api_key_name,
        temperature=fallback_temperature,
        max_tokens=fallback_max_tokens,
    )
47+
48+
49+
async def build_observers(
    configs: List[ObserverConfig],
    template: Any,
    agent_context: Any,
    handler_map: Dict[str, Any],
) -> List[RealtimeObserver]:
    """Create one ``RealtimeObserver`` per entry in ``configs``.

    Each observer gets an LLM service obtained from
    ``get_llm_service(..., pooled=True)`` using the observer's settings merged
    over the template's ``llm_configurations``. An observer that fails to build
    is logged and left out; the remaining observers are still returned.

    Args:
        configs: Observer definitions from the template.
        template: Template object; ``template.configurations.llm_configurations``
            is read as the base LLM configuration.
        agent_context: Passed through to every observer.
        handler_map: Passed through to every observer.

    Returns:
        The successfully built observers, in ``configs`` order.
    """
    base_llm = template.configurations.llm_configurations
    if base_llm is None:
        # The template relies on global env defaults, so start from an empty
        # configuration and let merge_llm_config() fill in the fallbacks.
        logger.info(
            "Template has no llm_configurations — "
            "observers will use env defaults with gpt-4o-mini"
        )
        base_llm = LLMConfiguration()

    built: List[RealtimeObserver] = []

    for observer_cfg in configs:
        # Best-effort per observer: one bad config must not block the others.
        try:
            effective_llm = merge_llm_config(observer_cfg.llm, base_llm)
            service = await get_llm_service(effective_llm, pooled=True)
            observer = RealtimeObserver(
                observer_cfg, service, agent_context, handler_map
            )
            built.append(observer)
            logger.info(
                f"Built observer '{observer_cfg.name}' with model="
                f"{effective_llm.model}, start_after_turn={observer_cfg.start_after_turn}"
            )
        except Exception as e:
            logger.error(f"Failed to build observer '{observer_cfg.name}': {e}")

    return built
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
"""ObserverManager — coordinates N real-time observers.
2+
3+
Reads the conversation transcript from the pipeline's existing LLMContext,
4+
builds a formatted transcript string, and feeds it to all eligible observers
5+
in parallel after every LLM turn.
6+
7+
Not a pipeline processor. Completely separate async system.
8+
"""
9+
10+
import asyncio
11+
import json
12+
from typing import Any, List
13+
14+
from pipecat.processors.aggregators.llm_context import LLMContext
15+
16+
from app.core.logger import logger
17+
18+
from .observer import RealtimeObserver
19+
20+
21+
class ObserverManager:
    """Coordinate a set of real-time observers for one call.

    The manager is not a pipeline processor. Pipeline event hooks call
    ``on_turn_completed`` and ``on_function_call``; each call schedules a
    background "check round" in which every eligible observer is asked, in
    parallel via ``asyncio.gather``, whether it detects its condition in the
    current transcript. The first observer (in list order) that returns
    ``True`` has its action executed, after which no further rounds run.
    """

    def __init__(
        self,
        observers: List[RealtimeObserver],
        llm_context: LLMContext,
    ):
        """Store the observers and the shared conversation context.

        Args:
            observers: Observers to consult on every check round.
            llm_context: Context whose ``messages`` are read to build the
                transcript. It is only read, never modified, by this class.
        """
        self._observers = observers
        self._llm_context = llm_context
        self._function_calls: List[str] = []
        self._turn_count: int = 0
        self._action_taken: bool = False
        # Serialises check rounds so two rounds never run observers at once.
        self._check_lock = asyncio.Lock()
        # Strong references to in-flight check rounds. The event loop only
        # keeps weak references to tasks, so an unreferenced task may be
        # garbage-collected before it finishes.
        self._background_tasks: set = set()

    # ------------------------------------------------------------------
    # Data ingestion (called by pipeline event hooks in agent/__init__.py)
    # ------------------------------------------------------------------

    def on_turn_completed(self):
        """Count a turn and schedule a check round in the background.

        Does nothing once an action has been taken or ``stop`` was called.
        Must be called from within a running event loop.
        """
        if self._action_taken:
            return
        self._turn_count += 1
        self._spawn_check_round("observer:check_round")

    def on_function_call(self, function_name: str, arguments: Any):
        """Record a bot function call and schedule a check round.

        The call is always recorded for the transcript; a check round is
        only scheduled while no action has been taken yet.

        Args:
            function_name: Name of the function the bot invoked.
            arguments: The call's arguments; rendered as JSON when possible.
        """
        args_str = self._format_arguments(arguments)
        self._function_calls.append(f"{function_name}({args_str})")
        if not self._action_taken:
            self._spawn_check_round("observer:check_round_fn")

    @staticmethod
    def _format_arguments(arguments: Any) -> str:
        """Render function-call arguments as text without ever raising.

        Falsy arguments render as an empty string. Values that JSON cannot
        encode are stringified, and if encoding still fails (for example
        unsupported dict keys or circular references) the plain ``str()`` of
        the arguments is used. The text only feeds the observers' transcript,
        so a lossy rendering is preferable to an exception inside a pipeline
        event handler.
        """
        if not arguments:
            return ""
        try:
            return json.dumps(arguments, default=str)
        except (TypeError, ValueError):
            return str(arguments)

    def _spawn_check_round(self, task_name: str) -> None:
        """Start ``_run_checks`` as a task and keep it referenced until done."""
        task = asyncio.create_task(self._run_checks(), name=task_name)
        self._background_tasks.add(task)
        task.add_done_callback(self._on_check_round_done)

    def _on_check_round_done(self, task: Any) -> None:
        """Release a finished check round and surface any unhandled error."""
        self._background_tasks.discard(task)
        if task.cancelled():
            return
        # Retrieving the exception also stops asyncio from reporting
        # "Task exception was never retrieved" later on.
        error = task.exception()
        if error is not None:
            logger.error(f"Observer check round '{task.get_name()}' failed: {error}")

    # ------------------------------------------------------------------
    # Check execution
    # ------------------------------------------------------------------

    async def _run_checks(self):
        """Run all eligible observers in parallel; the first detection wins.

        An observer is eligible when it has not already detected and the
        turn count has reached its ``config.start_after_turn``.
        """
        if self._action_taken:
            return

        async with self._check_lock:
            # Re-check: an earlier round may have acted while we were waiting.
            if self._action_taken:
                return

            transcript = self._build_transcript()

            eligible = [
                obs
                for obs in self._observers
                if not obs._detected and self._turn_count >= obs.config.start_after_turn
            ]
            if not eligible:
                return

            # gather() returns results in input order, so zip() below pairs
            # every result with the observer that produced it.
            results = await asyncio.gather(
                *[obs.check(transcript) for obs in eligible],
                return_exceptions=True,
            )

            for obs, result in zip(eligible, results):
                # stop() may have been called while the checks were running.
                if self._action_taken:
                    return
                if isinstance(result, Exception):
                    logger.warning(f"Observer '{obs.name}' check failed: {result}")
                    continue
                if result is True:
                    # Set the flag before awaiting so no other round can act.
                    self._action_taken = True
                    await obs.execute_action()
                    return

    # ------------------------------------------------------------------
    # Transcript building
    # ------------------------------------------------------------------

    @staticmethod
    def _content_to_text(content: Any) -> str:
        """Reduce a message's ``content`` to plain text.

        Strings are returned unchanged. For list content, the ``text`` of
        every ``{"type": "text", ...}`` dict is joined with spaces and other
        parts are dropped, so non-text payloads never reach the transcript.
        NOTE(review): assumes list content uses the OpenAI-style content-part
        layout; confirm against the messages the pipeline actually stores.
        """
        if isinstance(content, str):
            return content
        if isinstance(content, list):
            texts = [
                part["text"]
                for part in content
                if isinstance(part, dict)
                and part.get("type") == "text"
                and isinstance(part.get("text"), str)
            ]
            return " ".join(text for text in texts if text)
        return str(content) if content else ""

    def _build_transcript(self) -> str:
        """Build the transcript from context messages plus recorded calls.

        User messages become ``[customer]`` lines and assistant messages
        ``[bot]`` lines; other roles, non-dict messages and messages without
        text are skipped. Recorded function calls are appended afterwards as
        ``[bot_action]`` lines.
        """
        role_labels = {"user": "[customer]", "assistant": "[bot]"}
        lines: List[str] = []

        for msg in self._llm_context.messages:
            if not isinstance(msg, dict):
                continue
            label = role_labels.get(msg.get("role", ""))
            if label is None:
                continue
            text = self._content_to_text(msg.get("content", ""))
            if not text:
                continue
            lines.append(f"{label} {text}")

        for fc in self._function_calls:
            lines.append(f"[bot_action] {fc}")

        return "\n".join(lines)

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------

    async def stop(self):
        """Stop scheduling and acting. Called when the call ends.

        In-flight check rounds are not cancelled; they see the flag after
        their observers return and exit without executing an action.
        """
        self._action_taken = True

0 commit comments

Comments
 (0)