Skip to content

Commit e477e28

Browse files
committed
feat(memory): voice write-path integration + dynamic backend selection
Wire the persistent-memory library into the running app:

end_conversation.py (voice write-path):
- Hoist memory imports to module top (resolve_customer_key, MemoryService, BUDDY_MEMORY_BACKEND as resolve_memory_backend, BUDDY_MEMORY_ENABLED)
- Enqueue extraction after DB write: best-effort try/except, never blocks
- Backend selection: template MemoryConfig.backend override wins, else Redis/DevCycle dynamic default via resolve_memory_backend()

dynamic.py:
- Add BUDDY_MEMORY_BACKEND() async function — re-reads BUDDY_MEMORY_BACKEND from Redis/DevCycle on every call, falls back to static env default. Ops can switch pgvector <-> supermemory at runtime without a redeploy.

main.py:
- Import drain_memory_queue from memory.worker
- Register memory_extraction_drain BackgroundTaskScheduler task, gated on BUDDY_MEMORY_ENABLED (static kill-switch, off by default)
1 parent 5619f1a commit e477e28

11 files changed

Lines changed: 217 additions & 13 deletions

File tree

app/ai/voice/agents/breeze_buddy/agent/__init__.py

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import asyncio
44
import time
55
from datetime import datetime, timezone
6-
from typing import Any, Callable, Dict, List, Optional
6+
from typing import Any, Callable, Dict, List, Optional, cast
77

88
from fastapi import WebSocket
99
from opentelemetry import trace
@@ -61,6 +61,9 @@
6161
prepare_and_store_initial_greeting,
6262
)
6363
from app.ai.voice.agents.breeze_buddy.mcp import get_mcp_global_functions
64+
from app.ai.voice.agents.breeze_buddy.memory.backends import get_memory_backend
65+
from app.ai.voice.agents.breeze_buddy.memory.identity import resolve_customer_key
66+
from app.ai.voice.agents.breeze_buddy.memory.service import MemoryService
6467
from app.ai.voice.agents.breeze_buddy.observability.tracing_setup import (
6568
create_root_span,
6669
)
@@ -90,6 +93,10 @@
9093
close_websocket_safely,
9194
)
9295
from app.ai.voice.agents.breeze_buddy.utils.warm_transfer import set_transfer_flag
96+
from app.core.config.dynamic import (
97+
BUDDY_MEMORY_BACKEND as resolve_memory_backend,
98+
BUDDY_MEMORY_ENABLED as is_memory_enabled,
99+
)
93100
from app.core.config.static import ENABLE_BREEZE_BUDDY_TRACING
94101
from app.core.logger import logger
95102
from app.core.logger.context import (
@@ -870,6 +877,51 @@ async def _handle_client_connected(self) -> None:
870877
context = TemplateContext(self)
871878
context.record_node_entry(initial_node_name)
872879

880+
# ── Memory read: inject user profile into LLM context ────────────────
881+
# Runs after node config is finalised, before flow_manager.initialize()
882+
# so the <user_memory> block lands in the initial LLM context RESET.
883+
# Gates: global kill-switch AND per-template opt-in (MemoryConfig.enabled).
884+
# Best-effort — any failure logs a warning and the call proceeds normally.
885+
_mem_cfg = getattr(self.configurations, "memory", None)
886+
if await is_memory_enabled() and _mem_cfg and _mem_cfg.enabled and self.lead:
887+
try:
888+
_payload = self.lead.payload or {}
889+
_resolved = await resolve_customer_key(
890+
reseller_id=self.lead.reseller_id or "",
891+
merchant_id=self.lead.merchant_id or "",
892+
payload=_payload,
893+
)
894+
if _resolved:
895+
_customer_key, _key_type = _resolved
896+
_backend_name = _mem_cfg.backend or await resolve_memory_backend()
897+
_memory_block = await MemoryService(
898+
backend=get_memory_backend(_backend_name)
899+
).get_profile_block(
900+
reseller_id=self.lead.reseller_id or "",
901+
merchant_id=self.lead.merchant_id or "",
902+
customer_key=_customer_key,
903+
key_type=_key_type,
904+
max_facts=_mem_cfg.max_facts,
905+
)
906+
if _memory_block:
907+
_role_msgs = list(
908+
cast(Dict[str, Any], initial_node_config).get(
909+
"role_messages", []
910+
)
911+
)
912+
_role_msgs.append({"role": "system", "content": _memory_block})
913+
cast(Dict[str, Any], initial_node_config)[
914+
"role_messages"
915+
] = _role_msgs
916+
logger.info(
917+
f"[memory] injected profile for lead {self.lead.id}: "
918+
f"key={_customer_key!r} chars={len(_memory_block)}"
919+
)
920+
except Exception as _mem_err:
921+
logger.warning(
922+
f"[memory] read-path failed for lead {self.lead.id}: {_mem_err}"
923+
)
924+
873925
await self.flow_manager.initialize(initial_node_config)
874926
logger.info(
875927
f"FlowManager initialized at node: {initial_node_name}"

app/ai/voice/agents/breeze_buddy/handlers/internal/end_conversation.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
from app.ai.voice.agents.breeze_buddy.callbacks import (
88
service_callback,
99
)
10+
from app.ai.voice.agents.breeze_buddy.memory.identity import resolve_customer_key
11+
from app.ai.voice.agents.breeze_buddy.memory.service import MemoryService
1012
from app.ai.voice.agents.breeze_buddy.observability.tracing_setup import (
1113
update_span_with_evaluation_data,
1214
)
@@ -15,6 +17,10 @@
1517
publish_hold_transfer_result,
1618
summarize_transcription,
1719
)
20+
from app.core.config.dynamic import (
21+
BUDDY_MEMORY_BACKEND as resolve_memory_backend,
22+
BUDDY_MEMORY_ENABLED as is_memory_enabled,
23+
)
1824
from app.core.logger import logger
1925
from app.core.logger.context import clear_log_context
2026
from app.database.accessor.breeze_buddy.chat_session import (
@@ -259,6 +265,48 @@ async def end_conversation(context: TemplateContext, args, transition_to=None):
259265
# so context.lead.outcome reflects the final persisted value.
260266
update_span_with_evaluation_data(context)
261267

268+
# ── Memory extraction enqueue ─────────────────────────────────────
269+
# Best-effort, must never block end_conversation.
270+
# The drain worker re-reads the transcript from DB (already saved above).
271+
# Gates: global kill-switch AND per-template opt-in (MemoryConfig.enabled).
272+
mem_cfg = getattr(getattr(context.bot, "configurations", None), "memory", None)
273+
if await is_memory_enabled() and mem_cfg and mem_cfg.enabled and context.lead:
274+
try:
275+
resolved = await resolve_customer_key(
276+
reseller_id=context.lead.reseller_id or "",
277+
merchant_id=context.lead.merchant_id or "",
278+
payload=payload,
279+
)
280+
if resolved:
281+
customer_key, key_type = resolved
282+
phone_raw = payload.get("customer_mobile_number") or payload.get(
283+
"phone"
284+
)
285+
explicit_cid = (
286+
payload.get("customer_id") if key_type == "phone" else None
287+
)
288+
# Backend: per-template override wins, else Redis/DevCycle dynamic default.
289+
backend_name = mem_cfg.backend or await resolve_memory_backend()
290+
await MemoryService().enqueue_extraction(
291+
kind="voice_lead",
292+
record_id=str(context.lead.id),
293+
customer_key=customer_key,
294+
key_type=key_type,
295+
reseller_id=context.lead.reseller_id or "",
296+
merchant_id=context.lead.merchant_id or "",
297+
source_channel="voice",
298+
phone=str(phone_raw) if phone_raw else None,
299+
explicit_customer_id=(
300+
str(explicit_cid) if explicit_cid else None
301+
),
302+
backend=backend_name,
303+
extraction_prompt=mem_cfg.extraction_prompt,
304+
)
305+
except Exception as mem_err:
306+
logger.warning(
307+
f"[memory] enqueue failed for lead {context.lead.id}: {mem_err}"
308+
)
309+
262310
# Execute end_conversation_callbacks
263311
if context.end_conversation_callbacks:
264312
logger.info(

app/ai/voice/agents/breeze_buddy/memory/backends/base.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,12 +66,16 @@ async def ingest(
6666
identity: MemoryIdentity,
6767
transcript: List[Dict[str, Any]],
6868
source_channel: str,
69+
extraction_prompt: Optional[str] = None,
6970
) -> None:
7071
"""Persist durable memory from a conversation transcript.
7172
7273
The backend owns extraction: pgvector runs the LLM consolidation +
7374
embedding + dedup itself; supermemory hands the transcript off and
7475
lets the service extract.
76+
77+
`extraction_prompt` overrides the default LLM system prompt when set
78+
(pgvector only; supermemory ignores it as extraction is server-side).
7579
"""
7680

7781
@abstractmethod

app/ai/voice/agents/breeze_buddy/memory/backends/pgvector/backend.py

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ async def ingest(
6565
identity: MemoryIdentity,
6666
transcript: List[Dict[str, Any]],
6767
source_channel: str,
68+
extraction_prompt: Optional[str] = None,
6869
) -> None:
6970
"""Extract durable facts from the transcript and upsert them."""
7071
if not transcript:
@@ -78,7 +79,11 @@ async def ingest(
7879
logger.error(f"[memory.pgvector] fetch existing facts failed: {e}")
7980
existing = []
8081

81-
ops = await consolidate(existing_facts=existing, transcript=transcript)
82+
ops = await consolidate(
83+
existing_facts=existing,
84+
transcript=transcript,
85+
extraction_prompt=extraction_prompt,
86+
)
8287
if not ops:
8388
return
8489

@@ -200,9 +205,30 @@ async def _apply_op(
200205

201206
elif verb == "UPDATE":
202207
old_fact_text = (op.get("supersedes_fact") or "").strip()
208+
209+
# 1. Exact match (free, no embedding call)
203210
old_mem = next(
204211
(m for m in existing if m.fact.strip() == old_fact_text), None
205212
)
213+
214+
# 2. Embedding similarity fallback — catches LLM paraphrases of the
215+
# stored fact (the LLM rarely reproduces exact stored text).
216+
if not old_mem and old_fact_text and existing:
217+
old_embedding = await embed_single(old_fact_text)
218+
if old_embedding:
219+
best_sim, best_mem = 0.0, None
220+
for m in existing:
221+
if m.embedding:
222+
sim = _cosine_similarity(old_embedding, m.embedding)
223+
if sim > best_sim:
224+
best_sim, best_mem = sim, m
225+
if best_mem and best_sim >= 0.80:
226+
old_mem = best_mem
227+
logger.debug(
228+
f"[memory.pgvector] UPDATE fuzzy-matched supersedes_fact "
229+
f"sim={best_sim:.3f} old={old_mem.fact!r}"
230+
)
231+
206232
if old_mem:
207233
await supersede_memory(str(old_mem.id))
208234
embedding = await embed_single(fact)

app/ai/voice/agents/breeze_buddy/memory/backends/pgvector/extract.py

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,17 +25,25 @@
2525
2626
Output a JSON list of memory operations. Each operation has:
2727
"op": "ADD" | "UPDATE" | "DELETE"
28-
"fact": short sentence (one durable personalization/preference/attribute/outcome)
29-
"category": one of ["preference","attribute","outcome","context"] (optional)
30-
"structured": optional dict with machine-readable fields (optional)
31-
"supersedes_fact": (UPDATE/DELETE only) the exact text of the KNOWN_FACT being replaced/removed
28+
"fact": short sentence (one durable fact about the customer)
29+
"category": one of ["preference", "attribute", "outcome", "context"]
30+
"structured": optional dict with machine-readable fields, e.g. {"name": "Amreet"}
31+
"supersedes_fact": (UPDATE/DELETE only) the closest matching text from KNOWN_FACTS
3232
3333
Rules:
34-
- Only capture durable facts worth remembering across future conversations.
35-
- Ignore small talk, greetings, PII (passwords, full card numbers, OTPs).
36-
- If a new fact contradicts a known fact, emit UPDATE (not ADD).
37-
- If a known fact is confirmed still true, emit nothing (no op).
38-
- If no new facts are worth storing, return an empty list [].
34+
- Only capture facts durable enough to be useful in a future conversation.
35+
- ALWAYS capture: the customer's name or preferred form of address, any explicit
36+
corrections they make to previously stated information, travel preferences,
37+
stated outcomes, and personal attributes they volunteer.
38+
- A customer stating their name is NOT a greeting to ignore — it is a high-value
39+
attribute. Capture it with category "attribute" and structured {"name": "<value>"}.
40+
- Ignore: passwords, full card numbers, OTPs, bank account numbers, and one-time
41+
transactional details with no future value. Do NOT treat a customer's name or
42+
identity as PII to ignore.
43+
- If a new fact contradicts or corrects a KNOWN_FACT, emit UPDATE (not ADD).
44+
Set supersedes_fact to the closest matching text from KNOWN_FACTS.
45+
- If a known fact is confirmed still true, emit nothing.
46+
- If no facts are worth storing, return [].
3947
- Keep each fact concise (one sentence).
4048
4149
Return ONLY valid JSON — a list of operation objects, no markdown fences."""
@@ -77,9 +85,12 @@ def _find_duplicate(
7785
async def consolidate(
7886
existing_facts: List[UserMemory],
7987
transcript: List[Dict[str, Any]],
88+
extraction_prompt: Optional[str] = None,
8089
) -> List[Dict[str, Any]]:
8190
"""Run the LLM extraction and return raw op dicts.
8291
92+
Uses `extraction_prompt` if provided (template-level override), otherwise
93+
falls back to the built-in `_SYSTEM_PROMPT`.
8394
The worker applies these ops against the DB (insert/supersede).
8495
Returns [] on failure (safe to ignore).
8596
"""
@@ -89,6 +100,8 @@ async def consolidate(
89100
try:
90101
llm = await _resolve_azure(None)
91102

103+
system_prompt = extraction_prompt or _SYSTEM_PROMPT
104+
92105
known_lines = (
93106
"\n".join(f"- [{m.category or 'fact'}] {m.fact}" for m in existing_facts)
94107
if existing_facts
@@ -102,7 +115,7 @@ async def consolidate(
102115

103116
params = OpenAILLMInvocationParams( # type: ignore[call-overload]
104117
messages=[ # type: ignore[arg-type]
105-
{"role": "system", "content": _SYSTEM_PROMPT},
118+
{"role": "system", "content": system_prompt},
106119
{
107120
"role": "user",
108121
"content": (

app/ai/voice/agents/breeze_buddy/memory/backends/supermemory/backend.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,9 @@ async def ingest(
8282
identity: MemoryIdentity,
8383
transcript: List[dict],
8484
source_channel: str,
85+
extraction_prompt: Optional[str] = None,
8586
) -> None:
87+
# extraction_prompt is ignored — supermemory extraction is server-side.
8688
if not transcript:
8789
return
8890
try:

app/ai/voice/agents/breeze_buddy/memory/service.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ async def enqueue_extraction(
7676
phone: Optional[str] = None,
7777
explicit_customer_id: Optional[str] = None,
7878
backend: Optional[str] = None,
79+
extraction_prompt: Optional[str] = None,
7980
) -> None:
8081
"""Push an extraction job onto the Redis queue.
8182
@@ -111,6 +112,7 @@ async def enqueue_extraction(
111112
"phone": phone,
112113
"explicit_customer_id": explicit_customer_id,
113114
"backend": backend,
115+
"extraction_prompt": extraction_prompt,
114116
}
115117
)
116118
await client.rpush(_QUEUE_KEY, payload) # type: ignore[union-attr]

app/ai/voice/agents/breeze_buddy/memory/worker.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ async def _process_item(item: Dict[str, Any]) -> None:
6767
phone: Optional[str] = item.get("phone") or None
6868
explicit_customer_id: Optional[str] = item.get("explicit_customer_id") or None
6969
backend_name: Optional[str] = item.get("backend") or None
70+
extraction_prompt: Optional[str] = item.get("extraction_prompt") or None
7071

7172
if not (kind and record_id and customer_key and reseller_id and merchant_id):
7273
logger.warning(f"[memory.worker] incomplete item, skipping: {item}")
@@ -94,7 +95,7 @@ async def _process_item(item: Dict[str, Any]) -> None:
9495
if phone and explicit_customer_id and key_type == "phone":
9596
identity = await backend.merge_identity(identity)
9697

97-
await backend.ingest(identity, transcript, source_channel)
98+
await backend.ingest(identity, transcript, source_channel, extraction_prompt)
9899

99100

100101
async def _fetch_transcript(kind: str, record_id: str) -> List[Dict[str, Any]]:

app/ai/voice/agents/breeze_buddy/template/types.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1271,6 +1271,23 @@ class MemoryConfig(BaseModel):
12711271
"When None, falls back to the global BUDDY_MEMORY_BACKEND env."
12721272
),
12731273
)
1274+
max_facts: int = Field(
1275+
20,
1276+
description=(
1277+
"Maximum number of memory facts injected into the LLM context at "
1278+
"call start. Increase for richer recall; decrease to keep the "
1279+
"context window tight. Default 20."
1280+
),
1281+
)
1282+
extraction_prompt: Optional[str] = Field(
1283+
None,
1284+
description=(
1285+
"Override the default LLM extraction prompt used by the pgvector "
1286+
"backend when consolidating facts from a conversation. When None, "
1287+
"the built-in prompt is used. Has no effect on the supermemory "
1288+
"backend (extraction is server-side)."
1289+
),
1290+
)
12741291

12751292

12761293
class ConfigurationModel(BaseModel):

app/core/config/dynamic.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
import json
22

3+
from app.core.config.static import (
4+
BUDDY_MEMORY_BACKEND as _STATIC_MEMORY_BACKEND,
5+
BUDDY_MEMORY_ENABLED as _STATIC_MEMORY_ENABLED,
6+
)
37
from app.core.logger import logger
48
from app.services.live_config.store import get_config
59

@@ -46,6 +50,28 @@ async def ENABLE_BACKGROUND_TASKS() -> bool:
4650
return await get_config("ENABLE_BACKGROUND_TASKS", "false", bool)
4751

4852

53+
async def BUDDY_MEMORY_ENABLED() -> bool:
54+
"""Global persistent-memory kill-switch, Redis/DevCycle-overridable.
55+
56+
Defaults to the static BUDDY_MEMORY_ENABLED env (off unless explicitly
57+
set). Ops can enable or disable memory across all calls at runtime by
58+
flipping this key in Redis/DevCycle without a pod restart.
59+
Note: the drain-worker registration in main.py and the pgvector codec
60+
gate in database/__init__.py still read the static value at startup.
61+
"""
62+
return await get_config("BUDDY_MEMORY_ENABLED", _STATIC_MEMORY_ENABLED, bool)
63+
64+
65+
async def BUDDY_MEMORY_BACKEND() -> str:
66+
"""Active persistent-memory backend ("pgvector" | "supermemory").
67+
68+
Redis/DevCycle-overridable so ops can switch backends at runtime without a
69+
redeploy; defaults to the static BUDDY_MEMORY_BACKEND env. A template's
70+
MemoryConfig.backend still takes precedence over this at the call-site.
71+
"""
72+
return await get_config("BUDDY_MEMORY_BACKEND", _STATIC_MEMORY_BACKEND, str)
73+
74+
4975
# ----------------------------------------------------------------------------
5076
# Dispatcher dials. Re-read on every invocation, so DevCycle / Redis changes
5177
# propagate without a pod restart. See docs/BACKLOG_DISPATCHER_REDESIGN.md

0 commit comments

Comments
 (0)