Feat/memory integration#803
Conversation
|
Important Review skippedAuto incremental reviews are disabled on this repository. Please check the settings in the CodeRabbit UI or the ⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: You can disable this status message by setting the Use the checkbox below for a quick retry:
WalkthroughThis PR implements persistent, cross-conversation user memory for Breeze Buddy voice and chat agents. The system captures durable facts via Azure LLM-driven extraction from conversation transcripts, deduplicates using embeddings, stores in Postgres with pgvector support, and provides searchable memory to future conversations. A Redis-backed background worker processes extraction jobs asynchronously. ChangesPersistent User Memory System
Sequence Diagram(s)sequenceDiagram
participant VoiceAgent
participant MemoryService
participant Redis
participant Worker
participant Database
participant LLM as Azure LLM
VoiceAgent->>MemoryService: enqueue_extraction(kind="voice_lead", customer_key)
MemoryService->>Redis: rpush job
activate Worker
Worker->>Redis: pop batch
Worker->>Database: fetch transcript & existing memories
Database-->>Worker: transcript, [UserMemory...]
Worker->>LLM: consolidate(existing_facts, transcript)
LLM-->>Worker: [{op: "ADD", fact: "..."}...]
Worker->>Database: embed_single, insert_user_memory
Database-->>Worker: [UserMemory...]
deactivate Worker
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related PRs
Poem
🚥 Pre-merge checks | ✅ 3 | ❌ 2❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
Adds the design spec and initial scaffolding for a persistent per-user memory layer (voice + chat) for Breeze Buddy, including a new pgvector-backed user_memory table, an identity resolver, an Azure-OpenAI embedding/extraction pipeline, and a Redis-queue + scheduler drain worker. Memory is opt-in via BUDDY_MEMORY_ENABLED and isolated per (reseller_id, merchant_id, customer_key).
Changes:
- New
memory/module (identity resolution, embeddings, LLM extraction, MemoryService, drain worker) + design doc. - New DB layer:
032_create_memory_tables.sqlmigration (pgvector), queries/accessors/decoders/schemas foruser_memoryandcustomer_identity, and asyncpginithook to register the vector codec. - Wire-up:
pgvectordependency, new static config, scheduler registration inmain.py, and best-effort enqueue fromend_conversation.
Reviewed changes
Copilot reviewed 20 out of 21 changed files in this pull request and generated 11 comments.
Show a summary per file
| File | Description |
|---|---|
| pyproject.toml | Adds pgvector runtime dependency. |
| docs/PERSISTENT_USER_MEMORY.md | Design + integration spec for the feature. |
| app/schemas/breeze_buddy/memory.py | Pydantic models: UserMemory, CustomerIdentity, MemoryKey. |
| app/main.py | Conditionally registers the memory_extraction_drain scheduler task. |
| app/database/migrations/032_create_memory_tables.sql | Creates pgvector extension, user_memory, customer_identity. |
| app/database/queries/breeze_buddy/user_memory.py | Insert/list/supersede/repoint query builders. |
| app/database/queries/breeze_buddy/customer_identity.py | Upsert alias + phone→id lookup queries. |
| app/database/decoder/breeze_buddy/user_memory.py | Decodes rows into UserMemory. |
| app/database/decoder/breeze_buddy/customer_identity.py | Decodes rows into CustomerIdentity. |
| app/database/accessor/breeze_buddy/user_memory.py | Async accessors incl. phone→id merge. |
| app/database/accessor/breeze_buddy/customer_identity.py | Alias upsert / phone lookup. |
| app/database/init.py | Registers pgvector codec on each pooled connection. |
| app/core/config/static.py | New env-driven settings for memory feature. |
| app/ai/voice/agents/breeze_buddy/memory/init.py | Module marker. |
| app/ai/voice/agents/breeze_buddy/memory/identity.py | resolve_customer_key fallback chain. |
| app/ai/voice/agents/breeze_buddy/memory/embeddings.py | Azure embeddings (singleton client) + best-effort wrappers. |
| app/ai/voice/agents/breeze_buddy/memory/extract.py | LLM consolidation + cosine dedup helpers. |
| app/ai/voice/agents/breeze_buddy/memory/service.py | MemoryService: profile block, enqueue, phase-2 search. |
| app/ai/voice/agents/breeze_buddy/memory/worker.py | Drain worker: dequeue → fetch transcript → consolidate → apply. |
| app/ai/voice/agents/breeze_buddy/handlers/internal/end_conversation.py | Best-effort enqueue on call end. |
| CORS_ALLOWED_ORIGINS, | ||
| DAILY_API_KEY, | ||
| DAILY_API_URL, | ||
| DAILY_ROOM_MAX_POOL_SIZE, | ||
| MEMORY_EXTRACTION_INTERVAL_SECONDS, | ||
| DAILY_ROOM_POOL_SIZE, |
| elif verb == "UPDATE": | ||
| old_fact_text = (op.get("supersedes_fact") or "").strip() | ||
| old_mem = next( | ||
| (m for m in existing if m.fact.strip() == old_fact_text), None | ||
| ) | ||
| if old_mem: | ||
| await supersede_memory(str(old_mem.id)) | ||
| embedding = await embed_single(fact) | ||
| await insert_user_memory( |
| for op in ops: | ||
| try: | ||
| await _apply_op( | ||
| op=op, | ||
| existing=existing, | ||
| reseller_id=reseller_id, | ||
| merchant_id=merchant_id, | ||
| customer_key=customer_key, | ||
| key_type=key_type, | ||
| source_channel=source_channel, | ||
| ) |
| async def search( | ||
| self, | ||
| reseller_id: str, | ||
| merchant_id: str, | ||
| customer_key: str, | ||
| query_embedding: List[float], | ||
| k: int = 5, | ||
| ) -> List[str]: |
| import math | ||
| from typing import Any, Dict, List, Optional | ||
|
|
||
| from app.ai.voice.agents.breeze_buddy.llm import _resolve_azure |
| try: | ||
| from pgvector.asyncpg import register_vector as _register_vector | ||
|
|
||
| _PGVECTOR_AVAILABLE = True | ||
| except ImportError: | ||
| _PGVECTOR_AVAILABLE = False | ||
|
|
||
|
|
||
| async def _init_connection(conn: asyncpg.Connection) -> None: | ||
| """Register the pgvector codec on each new connection.""" | ||
| if _PGVECTOR_AVAILABLE: | ||
| await _register_vector(conn) # type: ignore[arg-type] |
| customer_id = payload.get(id_field) or payload.get("customer_id") | ||
| if customer_id and str(customer_id).strip(): | ||
| return (str(customer_id).strip(), "customer_id") | ||
|
|
||
| # Step 2 & 3: phone path | ||
| phone_raw = payload.get(phone_field) or payload.get("customer_mobile_number") |
| "nltk", | ||
| "jmespath>=1.1.0", | ||
| "h2>=4.3.0", | ||
| "pgvector", |
| phone_raw = payload.get("customer_mobile_number") or payload.get( | ||
| "phone" | ||
| ) | ||
| explicit_cid = payload.get("customer_id") if key_type == "phone" else None |
| chunks = await llm.get_chat_completions(params) # type: ignore[arg-type] | ||
| parts = [ | ||
| chunk.choices[0].delta.content | ||
| async for chunk in chunks | ||
| if chunk.choices | ||
| and chunk.choices[0].delta | ||
| and chunk.choices[0].delta.content | ||
| ] | ||
| raw = "".join(parts).strip() |
There was a problem hiding this comment.
Actionable comments posted: 11
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
app/ai/voice/agents/breeze_buddy/memory/worker.py (1)
1-251: 🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick winAdd type hints to all function signatures.
The module is missing type hints on function parameters and return types, violating the coding guideline. As per coding guidelines, all function signatures must have type hints.
📝 Proposed type hint additions
from __future__ import annotations import json from typing import Any, Dict, List, Optional +from uuid import UUID from app.ai.voice.agents.breeze_buddy.memory.embeddings import embed_single-async def _process_item(item: Dict[str, Any]) -> None: +async def _process_item(item: Dict[str, Any]) -> None:-async def _fetch_transcript(kind: str, record_id: str) -> List[Dict[str, Any]]: +async def _fetch_transcript(kind: str, record_id: str) -> List[Dict[str, Any]]:async def _apply_op( op: Dict[str, Any], - existing: list, + existing: List[Any], reseller_id: str, merchant_id: str, customer_key: str, key_type: str, source_channel: str, ) -> None:async def _upsert_alias_and_merge( reseller_id: str, merchant_id: str, phone: str, customer_id: str, old_phone_key: str, ) -> None:Note:
_process_item,_fetch_transcript, and_upsert_alias_and_mergealready have correct signatures; the primary fix needed is tightening theexisting: listparameter in_apply_optoexisting: List[Any]for consistency.As per coding guidelines: "Add type hints on all function signatures."
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@app/ai/voice/agents/breeze_buddy/memory/worker.py` around lines 1 - 251, The function _apply_op uses a bare list type for the existing parameter which violates the guideline; change its signature from existing: list to existing: List[Any] (import List, Any are already present) and update any internal references if needed; ensure the other functions (drain_memory_queue, _process_item, _fetch_transcript, _upsert_alias_and_merge) retain their existing type hints and run linters/tests after making the signature change.
🧹 Nitpick comments (8)
app/database/migrations/032_create_memory_tables.sql (1)
18-18: ⚖️ Poor tradeoffConsider an ANN index on
embeddingif similarity search spans many rows.
MemoryService.searchranks by embedding distance. If queries pre-filter by(reseller_id, merchant_id, customer_key)they hit the partialidx_user_memory_identityand candidate sets stay small, so a vector index may be unnecessary. But if similarity search ever runs broadly (per merchant), add anivfflat/hnswindex to avoid full scans as the table grows.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@app/database/migrations/032_create_memory_tables.sql` at line 18, Add a migration to create an ANN vector index on the embedding column of the user_memory table (the column shown as embedding vector(1536)) so similarity searches in MemoryService.search avoid full-table scans when pre-filters are not selective; use pgvector-backed index type (ivfflat or hnsw) and specify appropriate parameters for dimension 1536 (e.g., lists for ivfflat or ef_construction for hnsw), ensure the migration checks/creates the pgvector extension if missing, and build the index CONCURRENTLY to avoid locking and run ANALYZE afterward; reference the existing partial index name idx_user_memory_identity to keep both indexes aligned with expected query filters.app/database/__init__.py (1)
8-22: ⚡ Quick winMove
_init_connectionbelow the import block to keep isort import groups contiguous
await _register_vector(conn)is correct sincepgvector.asyncpg.register_vectoris async. However,_init_connection(and the pgvector import/flag) is defined between third-party imports andapp.*imports, which breaks the contiguous import grouping expected by isort.♻️ Suggested reorganization
import asyncpg -try: - from pgvector.asyncpg import register_vector as _register_vector - - _PGVECTOR_AVAILABLE = True -except ImportError: - _PGVECTOR_AVAILABLE = False - - -async def _init_connection(conn: asyncpg.Connection) -> None: - """Register the pgvector codec on each new connection.""" - if _PGVECTOR_AVAILABLE: - await _register_vector(conn) # type: ignore[arg-type] - - from app.core.config.static import ( POSTGRES_DB, POSTGRES_HOST, POSTGRES_MAX_OVERFLOW, POSTGRES_PASSWORD, POSTGRES_POOL_SIZE, POSTGRES_PORT, POSTGRES_USER, ) from app.core.logger import logger from app.services.aws.kms import decrypt_kms + +try: + from pgvector.asyncpg import register_vector as _register_vector + + _PGVECTOR_AVAILABLE = True +except ImportError: + _PGVECTOR_AVAILABLE = False + + +async def _init_connection(conn: asyncpg.Connection) -> None: + """Register the pgvector codec on each new connection.""" + if _PGVECTOR_AVAILABLE: + await _register_vector(conn) # type: ignore[arg-type]🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@app/database/__init__.py` around lines 8 - 22, Move the pgvector-related import, flag and async helper below the app.* import block so isort sees all third-party imports grouped together; specifically relocate the try/except that defines _PGVECTOR_AVAILABLE and the async function _init_connection (which calls await _register_vector(conn) and references _PGVECTOR_AVAILABLE) to after the existing app.* imports, keeping the symbols _register_vector, _PGVECTOR_AVAILABLE and _init_connection intact and otherwise unchanged.app/ai/voice/agents/breeze_buddy/memory/service.py (1)
110-120: Consider pushing top-k similarity search into pgvector.
searchloads all of a user's memories and computes cosine in Python. For the current small per-user fact sets this is fine, but the migration enables pgvector — an indexedORDER BY embedding <=> $query LIMIT kquery would scale better and avoid transferring all rows/embeddings into the app. Worth keeping in mind if per-user memory volume grows.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@app/ai/voice/agents/breeze_buddy/memory/service.py` around lines 110 - 120, The current top-k similarity is computed in Python by loading all memories and using _cosine_similarity; change this to push the top-k search into Postgres/pgvector by adding or updating a DB helper (e.g., replace or augment list_user_memories with a new list_user_memories_topk or add a parameter top_k) that issues a parameterized SQL query using the pgvector distance operator (embedding <=> $1) with ORDER BY embedding <=> $query_embedding LIMIT $k and returns only the top k fact values; update the caller in service.py (the block using memories, _cosine_similarity, query_embedding, and k) to call that DB helper and return the facts directly.app/ai/voice/agents/breeze_buddy/memory/extract.py (1)
39-39: 💤 Low valueOptional: strip markdown fences before
json.loads.The prompt forbids fences, but models occasionally wrap output in
```jsonregardless. A defensive strip avoids discarding an otherwise valid extraction.Also applies to: 123-123
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@app/ai/voice/agents/breeze_buddy/memory/extract.py` at line 39, The JSON parsing currently assumes clean model output; before calling json.loads in this module (where json.loads is invoked in extract.py), defensively strip surrounding markdown fences and wrapping backticks (handle ```json ... ```, ``` ... ```, and inline backticks) by detecting and extracting the inner content (e.g., regex to capture between fences or strip leading/trailing backticks/whitespace) and then pass that cleaned string to json.loads so valid JSON wrapped in fences is still parsed.app/database/accessor/breeze_buddy/customer_identity.py (1)
29-35: ⚡ Quick winRe-raising deviates from the accessor-layer error convention.
The established accessor contract catches broad exceptions and returns a safe sentinel (
Nonefor single-value accessors) rather than re-raising, signalling severity via log level. These functions log thenraise, which diverges from that convention and forces every caller to wrap intry/except(asresolve_customer_keyalready does). Consider returningNoneto stay consistent.Based on learnings: accessor functions should "return a safe sentinel value on failure (
Nonefor single-value accessors,[]for list accessors) rather than re-raising."🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@app/database/accessor/breeze_buddy/customer_identity.py` around lines 29 - 35, The except block in upsert_alias currently logs the error then re-raises, which breaks the accessor-layer convention; change the behavior in customer_identity.upsert_alias to log the failure (keep exc_info=True) and return None as a safe sentinel instead of raising so callers like resolve_customer_key don't need to wrap calls in try/except; ensure the function signature and any callers that expect a value handle a None return appropriately.app/database/accessor/breeze_buddy/user_memory.py (1)
29-34: 💤 Low valueRe-raise vs. accessor sentinel convention (see
customer_identityaccessor).Same deviation as flagged in
customer_identity.py: the layer's convention returns sentinels ([]for list accessors,Nonefor single,0for the merge count) instead of re-raising. Applies to all four functions here.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@app/database/accessor/breeze_buddy/user_memory.py` around lines 29 - 34, The functions in user_memory.py currently log exceptions then re-raise, but the accessor layer convention is to return sentinels instead of propagating errors; update the four functions (list_user_memories, get_user_memory, upsert_user_memory, merge_user_memories) so that on Exception they log with exc_info=True and then return the appropriate sentinel: list_user_memories -> return [], single-accessors like get_user_memory and upsert_user_memory -> return None, and merge_user_memories (merge count) -> return 0, removing the raise.app/database/queries/breeze_buddy/user_memory.py (1)
27-27: 💤 Low valueMove
import jsonto module top.In-function import deviates from the stdlib→third-party→app import ordering.
jsonis stdlib and should sit with the top-level imports (alongsidetyping).As per coding guidelines: "Follow import order: stdlib -> third-party -> app, enforced by isort".♻️ Proposed change
"""SQL query builders for user_memory table.""" +import json from typing import Any, List, Optional, Tuple- import json - values: List[Any] = [🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@app/database/queries/breeze_buddy/user_memory.py` at line 27, The in-function "import json" should be moved to the module top to follow stdlib→third-party→app ordering; remove the inline "import json" and add "import json" alongside the other top-level imports in user_memory.py (near existing "typing" imports) so the stdlib import is declared once at module scope.app/ai/voice/agents/breeze_buddy/handlers/internal/end_conversation.py (1)
263-301: 💤 Low valueConsider removing defensive fallbacks for
reseller_idandmerchant_id.Lines 277-279 and 292-293 use
or ""fallbacks forcontext.lead.reseller_idandcontext.lead.merchant_id. Based on learnings,LeadCallTracker.reseller_idis enforced as a required non-Optional[str] at the Pydantic layer, so these fields should never be None when a lead exists. The fallbacks are defensive but may mask schema violations.♻️ Proposed cleanup
resolved = await resolve_customer_key( - reseller_id=context.lead.reseller_id or "", - merchant_id=context.lead.merchant_id or "", + reseller_id=context.lead.reseller_id, + merchant_id=context.lead.merchant_id, payload=payload, )await MemoryService().enqueue_extraction( kind="voice_lead", record_id=str(context.lead.id), customer_key=customer_key, key_type=key_type, - reseller_id=context.lead.reseller_id or "", - merchant_id=context.lead.merchant_id or "", + reseller_id=context.lead.reseller_id, + merchant_id=context.lead.merchant_id, source_channel="voice", phone=str(phone_raw) if phone_raw else None, explicit_customer_id=str(explicit_cid) if explicit_cid else None, )Based on learnings: "Enforce that LeadCallTracker.reseller_id is a required non-Optional[str] (not None) at the Pydantic layer... rely on the DB-decoded lead always carrying a non-null reseller_id."
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@app/ai/voice/agents/breeze_buddy/handlers/internal/end_conversation.py` around lines 263 - 301, Remove the defensive "or ''" fallbacks for context.lead.reseller_id and context.lead.merchant_id in the end_conversation memory enqueue block: pass context.lead.reseller_id and context.lead.merchant_id directly into resolve_customer_key and MemoryService().enqueue_extraction (and optionally add a short assert or raise if either is unexpectedly falsy) so the code relies on the Pydantic-enforced non-Optional fields; update any related local variable uses (e.g., the call to resolve_customer_key and the reseller_id/merchant_id kwargs to enqueue_extraction) to use the raw values instead of the empty-string fallbacks.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@app/ai/voice/agents/breeze_buddy/memory/embeddings.py`:
- Around line 53-57: The embeddings response may not preserve input order, so
update the code that calls client.embeddings.create (using
AZURE_OPENAI_EMBEDDING_DEPLOYMENT and input texts) to return embeddings aligned
to the original texts by sorting response.data by each item.index before mapping
to embedding (i.e., sort response.data by item.index and then extract
item.embedding) so the returned list matches the order of texts.
In `@app/ai/voice/agents/breeze_buddy/memory/extract.py`:
- Around line 128-134: The code currently only checks that `ops` is a list but
doesn't ensure each element is a dict, so non-dict elements will later cause
errors when the worker accesses `op["op"]`/`op["fact"]`; update the block that
parses `raw` to filter `ops` to only dicts (e.g., [o for o in ops if
isinstance(o, dict)]), log a warning via `logger.warning` if any non-dict items
were dropped (include a short preview or count), and return the filtered list
(or `[]` if none remain) while keeping the existing branch that returns `[]`
when `ops` is not a list.
In `@app/ai/voice/agents/breeze_buddy/memory/identity.py`:
- Around line 61-64: The warning in memory.identity currently logs the full
phone number variable normalized in logger.warning; change this to avoid PII by
omitting or masking the phone before logging (e.g., use or create a helper like
mask_phone(normalized) or log only the last 4 digits with a fixed prefix), and
update the logger.warning call that references normalized to use the masked
value (or remove it) while still including alias_err for context; ensure the
change is applied to the logger.warning invocation and any similar logging in
the identity.py alias lookup path.
In `@app/ai/voice/agents/breeze_buddy/memory/service.py`:
- Around line 38-44: The error logging in get_profile_block, enqueue_extraction,
and search currently emits the raw customer_key (which may be a phone number);
update those handlers to avoid logging PII by replacing the raw key with either
the key prefix/type (e.g., "phone") and a hashed or masked representation (e.g.,
sha256 or show last 4 digits only) before passing to logger.error, and ensure
the same masked/hashed helper is used consistently across the three functions
(get_profile_block, enqueue_extraction, search) to prevent leaking phone numbers
in logs.
In `@app/ai/voice/agents/breeze_buddy/memory/worker.py`:
- Around line 88-91: The warning currently logs raw phone PII in the except
block (logger.warning using phone and merge_err); change the message to omit the
raw phone and instead log a non-PII identifier (e.g., a redacted/masked phone,
phone length, or a stable hash) along with the merge_err; update the
logger.warning call in the memory worker except block (the handler that captures
merge_err) to emit only non-PII context and the error details.
In `@app/database/accessor/breeze_buddy/customer_identity.py`:
- Around line 30-34: The logger.error calls in upsert_alias and
get_customer_id_for_phone currently include raw PII (phone and customer_id);
modify these logging sites (logger.error in functions upsert_alias and
get_customer_id_for_phone) to avoid persisting raw identifiers by either
removing the phone/customer_id from the message or replacing them with a masked
form (e.g., only last 4 digits of phone or a consistent hash/ID), and keep
exc_info=True for diagnostic details; ensure all other log statements in these
functions follow the same masking rule so no raw PII is written to logs.
In `@app/database/accessor/breeze_buddy/user_memory.py`:
- Around line 67-72: Remove PII from logs in user_memory.py: in
insert_user_memory, list_user_memories, merge_phone_key_into_customer_id, and
the success/info logging, stop including raw customer_key or fact content (e.g.,
fact[:60]) in logger calls; instead log a non-identifying surrogate such as the
memory_id or a stable hash/UUID and rely on exc_info for error context. Update
the logger.error and logger.info calls inside functions named
insert_user_memory, list_user_memories, and merge_phone_key_into_customer_id to
omit customer_key/fact, replace with memory_id_or_hash, and ensure exc_info=True
is preserved for exceptions.
In `@app/database/decoder/breeze_buddy/user_memory.py`:
- Line 45: The current decoding uses the expression
"confidence=row.get('confidence') or 1.0" which treats 0.0 as falsy and replaces
stored zeros with 1.0; change it to only substitute the default when the DB
value is actually NULL by retrieving the raw value first and performing an
explicit None check (i.e. if the retrieved confidence is None then use 1.0 else
keep the retrieved value) so that a persisted 0.0 is preserved; update the
occurrence of the "confidence=row.get('confidence') or 1.0" assignment in
user_memory.py accordingly.
In `@app/schemas/breeze_buddy/memory.py`:
- Line 21: The schema field confidence in app/schemas/breeze_buddy/memory.py is
typed as float but the DB column in 032_create_memory_tables.sql allows NULL;
update the migration to set the column to NOT NULL DEFAULT 1.0 (preferred) so DB
and the model align, or if you opt to accept NULLs, change the schema field to
Optional[float] (and update any callers); specifically modify the confidence
column definition in 032_create_memory_tables.sql to "confidence real NOT NULL
DEFAULT 1.0" (or change the dataclass/Pydantic field typed name confidence to
Optional[float]) and run tests/migrations to verify decoding no longer fails.
In `@docs/PERSISTENT_USER_MEMORY.md`:
- Line 3: The status line currently reads "**Status:** Design / integration spec
(not yet implemented)." — update that single status to reflect the feature is
implemented and integrated across the stack (e.g., "**Status:** Implemented —
fully integrated (DB migrations, background workers, voice agent, etc.)");
ensure the phrasing mentions the implemented layers (migrations, background
workers, voice agent) and remove the "not yet implemented" note and any
contradictory wording elsewhere in PERSISTENT_USER_MEMORY.md.
- Around line 163-178: The repo lacks the Phase 1 read-path for injecting a
rendered <user_memory> block into role_messages; implement it by calling
MemoryService.get_profile_block where conversations are initialized and per-turn
for chat: 1) In the voice flow, update prepare_initial_node and
prepare_resume_node to accept memory_block: Optional[str] and append
{"role":"system","content":memory_block} to role_messages when memory_block is
present; fetch memory_block before those calls by invoking
resolve_customer_key(...) then await MemoryService().get_profile_block(key)
inside the same async initialization that awaits flow_manager.initialize (guard
behind config + key). 2) In ChatAgent, add memory_block: Optional[str] to
ChatAgent.__init__ and inject it as a system message in _seed_context before
assembling messages; in send_chat_message_handler, fetch memory_block each turn
(cheap DB read) and pass it into ChatAgent(..., memory_block=memory_block) so
per-turn memory updates are picked up. Ensure all new parameters are optional
and guarded by configuration and key presence.
---
Outside diff comments:
In `@app/ai/voice/agents/breeze_buddy/memory/worker.py`:
- Around line 1-251: The function _apply_op uses a bare list type for the
existing parameter which violates the guideline; change its signature from
existing: list to existing: List[Any] (import List, Any are already present) and
update any internal references if needed; ensure the other functions
(drain_memory_queue, _process_item, _fetch_transcript, _upsert_alias_and_merge)
retain their existing type hints and run linters/tests after making the
signature change.
---
Nitpick comments:
In `@app/ai/voice/agents/breeze_buddy/handlers/internal/end_conversation.py`:
- Around line 263-301: Remove the defensive "or ''" fallbacks for
context.lead.reseller_id and context.lead.merchant_id in the end_conversation
memory enqueue block: pass context.lead.reseller_id and context.lead.merchant_id
directly into resolve_customer_key and MemoryService().enqueue_extraction (and
optionally add a short assert or raise if either is unexpectedly falsy) so the
code relies on the Pydantic-enforced non-Optional fields; update any related
local variable uses (e.g., the call to resolve_customer_key and the
reseller_id/merchant_id kwargs to enqueue_extraction) to use the raw values
instead of the empty-string fallbacks.
In `@app/ai/voice/agents/breeze_buddy/memory/extract.py`:
- Line 39: The JSON parsing currently assumes clean model output; before calling
json.loads in this module (where json.loads is invoked in extract.py),
defensively strip surrounding markdown fences and wrapping backticks (handle
```json ... ```, ``` ... ```, and inline backticks) by detecting and extracting
the inner content (e.g., regex to capture between fences or strip
leading/trailing backticks/whitespace) and then pass that cleaned string to
json.loads so valid JSON wrapped in fences is still parsed.
In `@app/ai/voice/agents/breeze_buddy/memory/service.py`:
- Around line 110-120: The current top-k similarity is computed in Python by
loading all memories and using _cosine_similarity; change this to push the top-k
search into Postgres/pgvector by adding or updating a DB helper (e.g., replace
or augment list_user_memories with a new list_user_memories_topk or add a
parameter top_k) that issues a parameterized SQL query using the pgvector
distance operator (embedding <=> $1) with ORDER BY embedding <=>
$query_embedding LIMIT $k and returns only the top k fact values; update the
caller in service.py (the block using memories, _cosine_similarity,
query_embedding, and k) to call that DB helper and return the facts directly.
In `@app/database/__init__.py`:
- Around line 8-22: Move the pgvector-related import, flag and async helper
below the app.* import block so isort sees all third-party imports grouped
together; specifically relocate the try/except that defines _PGVECTOR_AVAILABLE
and the async function _init_connection (which calls await
_register_vector(conn) and references _PGVECTOR_AVAILABLE) to after the existing
app.* imports, keeping the symbols _register_vector, _PGVECTOR_AVAILABLE and
_init_connection intact and otherwise unchanged.
In `@app/database/accessor/breeze_buddy/customer_identity.py`:
- Around line 29-35: The except block in upsert_alias currently logs the error
then re-raises, which breaks the accessor-layer convention; change the behavior
in customer_identity.upsert_alias to log the failure (keep exc_info=True) and
return None as a safe sentinel instead of raising so callers like
resolve_customer_key don't need to wrap calls in try/except; ensure the function
signature and any callers that expect a value handle a None return
appropriately.
In `@app/database/accessor/breeze_buddy/user_memory.py`:
- Around line 29-34: The functions in user_memory.py currently log exceptions
then re-raise, but the accessor layer convention is to return sentinels instead
of propagating errors; update the four functions (list_user_memories,
get_user_memory, upsert_user_memory, merge_user_memories) so that on Exception
they log with exc_info=True and then return the appropriate sentinel:
list_user_memories -> return [], single-accessors like get_user_memory and
upsert_user_memory -> return None, and merge_user_memories (merge count) ->
return 0, removing the raise.
In `@app/database/migrations/032_create_memory_tables.sql`:
- Line 18: Add a migration to create an ANN vector index on the embedding column
of the user_memory table (the column shown as embedding vector(1536)) so
similarity searches in MemoryService.search avoid full-table scans when
pre-filters are not selective; use pgvector-backed index type (ivfflat or hnsw)
and specify appropriate parameters for dimension 1536 (e.g., lists for ivfflat
or ef_construction for hnsw), ensure the migration checks/creates the pgvector
extension if missing, and build the index CONCURRENTLY to avoid locking and run
ANALYZE afterward; reference the existing partial index name
idx_user_memory_identity to keep both indexes aligned with expected query
filters.
In `@app/database/queries/breeze_buddy/user_memory.py`:
- Line 27: The in-function "import json" should be moved to the module top to
follow stdlib→third-party→app ordering; remove the inline "import json" and add
"import json" alongside the other top-level imports in user_memory.py (near
existing "typing" imports) so the stdlib import is declared once at module
scope.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 80ca1970-7220-4520-a976-69a2edd32103
⛔ Files ignored due to path filters (1)
uv.lockis excluded by!**/*.lock
📒 Files selected for processing (20)
app/ai/voice/agents/breeze_buddy/handlers/internal/end_conversation.pyapp/ai/voice/agents/breeze_buddy/memory/__init__.pyapp/ai/voice/agents/breeze_buddy/memory/embeddings.pyapp/ai/voice/agents/breeze_buddy/memory/extract.pyapp/ai/voice/agents/breeze_buddy/memory/identity.pyapp/ai/voice/agents/breeze_buddy/memory/service.pyapp/ai/voice/agents/breeze_buddy/memory/worker.pyapp/core/config/static.pyapp/database/__init__.pyapp/database/accessor/breeze_buddy/customer_identity.pyapp/database/accessor/breeze_buddy/user_memory.pyapp/database/decoder/breeze_buddy/customer_identity.pyapp/database/decoder/breeze_buddy/user_memory.pyapp/database/migrations/032_create_memory_tables.sqlapp/database/queries/breeze_buddy/customer_identity.pyapp/database/queries/breeze_buddy/user_memory.pyapp/main.pyapp/schemas/breeze_buddy/memory.pydocs/PERSISTENT_USER_MEMORY.mdpyproject.toml
| response = await client.embeddings.create( | ||
| model=AZURE_OPENAI_EMBEDDING_DEPLOYMENT, | ||
| input=texts, | ||
| ) | ||
| return [item.embedding for item in response.data] |
There was a problem hiding this comment.
🧩 Analysis chain
🌐 Web query:
Does Azure OpenAI / openai Python SDK embeddings.create guarantee response.data is returned in the same order as the input array?
💡 Result:
No—there is no explicit guarantee in the official API docs that the elements in response.data are in the same order as the input array. Instead, the response includes an index field for each embedding that identifies the embedding’s position in the input. In practice, response.data is commonly sorted/constructed to correspond to input positions, and the Python community helper does an explicit sort by embedding["index"] “to maintain the same order as input.”[1] That means you should not rely on response.data ordering; map by index instead. Azure OpenAI uses the same /embeddings response schema (data is an array of embedding objects with an index field), so the ordering guarantee question is answered the same way for Azure OpenAI embeddings.create: use the per-item index to align outputs to inputs.[2][3]
Citations:
- 1: https://github.com/openai/openai-python/blob/cf03fe16a92cd01f2a8867537399c12e183ba58e/openai/embeddings_utils.py
- 2: https://github.com/openai/openai-python/blob/5e8f09c2/src/openai/resources/embeddings.py
- 3: https://dev.to/artemooon/building-production-ready-semantic-search-with-python-and-snowflake-cortex-42a7
Ensure embeddings align with input texts by sorting response.data via item.index
response.data has no documented guarantee of preserving input order; align outputs to inputs using each embedding object's index.
🛡️ Defensive ordering
- return [item.embedding for item in response.data]
+ ordered = sorted(response.data, key=lambda item: item.index)
+ return [item.embedding for item in ordered]🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@app/ai/voice/agents/breeze_buddy/memory/embeddings.py` around lines 53 - 57,
The embeddings response may not preserve input order, so update the code that
calls client.embeddings.create (using AZURE_OPENAI_EMBEDDING_DEPLOYMENT and
input texts) to return embeddings aligned to the original texts by sorting
response.data by each item.index before mapping to embedding (i.e., sort
response.data by item.index and then extract item.embedding) so the returned
list matches the order of texts.
| ops: List[Dict[str, Any]] = json.loads(raw) | ||
| if not isinstance(ops, list): | ||
| logger.warning( | ||
| f"[memory.extract] LLM returned non-list: {raw[:200]}" | ||
| ) | ||
| return [] | ||
| return ops |
There was a problem hiding this comment.
Validate that each op is a dict, not just that the top level is a list.
The guard only checks isinstance(ops, list). If the model returns e.g. a list of strings or nested lists, those elements flow downstream where the worker indexes op["op"]/op["fact"], causing TypeError/KeyError. Filtering to dicts keeps the "safe to ignore" contract.
🛡️ Filter to dict ops
ops: List[Dict[str, Any]] = json.loads(raw)
if not isinstance(ops, list):
logger.warning(
f"[memory.extract] LLM returned non-list: {raw[:200]}"
)
return []
- return ops
+ return [op for op in ops if isinstance(op, dict)]📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| ops: List[Dict[str, Any]] = json.loads(raw) | |
| if not isinstance(ops, list): | |
| logger.warning( | |
| f"[memory.extract] LLM returned non-list: {raw[:200]}" | |
| ) | |
| return [] | |
| return ops | |
| ops: List[Dict[str, Any]] = json.loads(raw) | |
| if not isinstance(ops, list): | |
| logger.warning( | |
| f"[memory.extract] LLM returned non-list: {raw[:200]}" | |
| ) | |
| return [] | |
| return [op for op in ops if isinstance(op, dict)] |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@app/ai/voice/agents/breeze_buddy/memory/extract.py` around lines 128 - 134,
The code currently only checks that `ops` is a list but doesn't ensure each
element is a dict, so non-dict elements will later cause errors when the worker
accesses `op["op"]`/`op["fact"]`; update the block that parses `raw` to filter
`ops` to only dicts (e.g., [o for o in ops if isinstance(o, dict)]), log a
warning via `logger.warning` if any non-dict items were dropped (include a short
preview or count), and return the filtered list (or `[]` if none remain) while
keeping the existing branch that returns `[]` when `ops` is not a list.
| logger.warning( | ||
| f"[memory.identity] alias lookup failed for phone {normalized!r}: " | ||
| f"{alias_err}" | ||
| ) |
There was a problem hiding this comment.
Avoid logging the normalized phone number.
The alias-lookup warning logs normalized (the customer's phone) — a user identifier. Replace it with a masked form or omit it.
Based on learnings: in Breeze Buddy, verify that no PII/secrets are logged, transmitted, or persisted at every access point.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@app/ai/voice/agents/breeze_buddy/memory/identity.py` around lines 61 - 64,
The warning in memory.identity currently logs the full phone number variable
normalized in logger.warning; change this to avoid PII by omitting or masking
the phone before logging (e.g., use or create a helper like
mask_phone(normalized) or log only the last 4 digits with a fixed prefix), and
update the logger.warning call that references normalized to use the masked
value (or remove it) while still including alias_err for context; ensure the
change is applied to the logger.warning invocation and any similar logging in
the identity.py alias lookup path.
| except Exception as e: | ||
| logger.error( | ||
| f"[memory.service] get_profile_block failed " | ||
| f"(key={customer_key!r}): {e}", | ||
| exc_info=True, | ||
| ) | ||
| return None |
There was a problem hiding this comment.
Avoid logging raw customer_key — it can contain a phone number (PII).
When identity resolution falls back to a provisional phone:<normalized> key, these log lines emit a raw phone number. The same pattern repeats in enqueue_extraction (Lines 80-83) and search (Lines 121-126). Consider logging the key type/prefix or a masked/hashed value instead of the full key across all three call sites.
Based on learnings (manas-narra, PR 594): for files under app/ai/voice/agents/breeze_buddy, verify no PII/secrets are logged, transmitted, or persisted.
🧰 Tools
🪛 Ruff (0.15.15)
[warning] 38-38: Do not catch blind exception: Exception
(BLE001)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@app/ai/voice/agents/breeze_buddy/memory/service.py` around lines 38 - 44, The
error logging in get_profile_block, enqueue_extraction, and search currently
emits the raw customer_key (which may be a phone number); update those handlers
to avoid logging PII by replacing the raw key with either the key prefix/type
(e.g., "phone") and a hashed or masked representation (e.g., sha256 or show last
4 digits only) before passing to logger.error, and ensure the same masked/hashed
helper is used consistently across the three functions (get_profile_block,
enqueue_extraction, search) to prevent leaking phone numbers in logs.
| except Exception as merge_err: | ||
| logger.warning( | ||
| f"[memory.worker] merge failed for phone={phone!r}: {merge_err}" | ||
| ) |
There was a problem hiding this comment.
Avoid logging phone numbers (PII).
Line 90 logs the raw phone variable in a warning message. Phone numbers are personally identifiable information and should not be logged. Based on learnings, ensure that no PII is logged, transmitted, or persisted unnecessarily.
🔒 Proposed fix to redact phone number
except Exception as merge_err:
logger.warning(
- f"[memory.worker] merge failed for phone={phone!r}: {merge_err}"
+ f"[memory.worker] merge failed for phone=<redacted>: {merge_err}"
)Based on learnings: "ensure that LLM payloads are predetermined and never contain PII or secrets... verify that no PII/secrets are logged, transmitted, or persisted."
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| except Exception as merge_err: | |
| logger.warning( | |
| f"[memory.worker] merge failed for phone={phone!r}: {merge_err}" | |
| ) | |
| except Exception as merge_err: | |
| logger.warning( | |
| f"[memory.worker] merge failed for phone=<redacted>: {merge_err}" | |
| ) |
🧰 Tools
🪛 Ruff (0.15.15)
[warning] 88-88: Do not catch blind exception: Exception
(BLE001)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@app/ai/voice/agents/breeze_buddy/memory/worker.py` around lines 88 - 91, The
warning currently logs raw phone PII in the except block (logger.warning using
phone and merge_err); change the message to omit the raw phone and instead log a
non-PII identifier (e.g., a redacted/masked phone, phone length, or a stable
hash) along with the merge_err; update the logger.warning call in the memory
worker except block (the handler that captures merge_err) to emit only non-PII
context and the error details.
| logger.error( | ||
| f"[user_memory] insert_user_memory failed " | ||
| f"(key={customer_key}, fact={fact[:60]!r}): {e}", | ||
| exc_info=True, | ||
| ) | ||
| raise |
There was a problem hiding this comment.
Don't log fact content or raw customer_key (PII).
fact[:60] is extracted user memory (durable personal facts) and customer_key can be phone:<number>; both are PII and should not be written to logs. Drop them and rely on exc_info, or log a non-identifying surrogate (e.g. memory_id/hash). The same customer_key exposure exists in list_user_memories (Line 31), merge_phone_key_into_customer_id (Lines 112-119), and the info log on success.
Based on learnings: in Breeze Buddy, verify that no PII/secrets are logged, transmitted, or persisted at every access point.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@app/database/accessor/breeze_buddy/user_memory.py` around lines 67 - 72,
Remove PII from logs in user_memory.py: in insert_user_memory,
list_user_memories, merge_phone_key_into_customer_id, and the success/info
logging, stop including raw customer_key or fact content (e.g., fact[:60]) in
logger calls; instead log a non-identifying surrogate such as the memory_id or a
stable hash/UUID and rely on exc_info for error context. Update the logger.error
and logger.info calls inside functions named insert_user_memory,
list_user_memories, and merge_phone_key_into_customer_id to omit
customer_key/fact, replace with memory_id_or_hash, and ensure exc_info=True is
preserved for exceptions.
| structured=structured, | ||
| embedding=embedding, | ||
| source_channel=row.get("source_channel"), | ||
| confidence=row.get("confidence") or 1.0, |
There was a problem hiding this comment.
confidence or 1.0 silently rewrites a stored 0.0.
or treats 0.0 as falsy, so a memory persisted with confidence = 0.0 decodes back as 1.0, inverting the meaning of the lowest-confidence fact. Only substitute the default when the column is actually NULL.
🐛 Proposed fix
- confidence=row.get("confidence") or 1.0,
+ confidence=row["confidence"] if row.get("confidence") is not None else 1.0,📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| confidence=row.get("confidence") or 1.0, | |
| confidence=row["confidence"] if row.get("confidence") is not None else 1.0, |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@app/database/decoder/breeze_buddy/user_memory.py` at line 45, The current
decoding uses the expression "confidence=row.get('confidence') or 1.0" which
treats 0.0 as falsy and replaces stored zeros with 1.0; change it to only
substitute the default when the DB value is actually NULL by retrieving the raw
value first and performing an explicit None check (i.e. if the retrieved
confidence is None then use 1.0 else keep the retrieved value) so that a
persisted 0.0 is preserved; update the occurrence of the
"confidence=row.get('confidence') or 1.0" assignment in user_memory.py
accordingly.
| structured: Dict[str, Any] = {} | ||
| embedding: Optional[List[float]] = None | ||
| source_channel: Optional[str] = None | ||
| confidence: float = 1.0 |
There was a problem hiding this comment.
confidence is non-optional here but the DB column is nullable.
In 032_create_memory_tables.sql, confidence real DEFAULT 1.0 permits NULL. If any row is written with an explicit NULL confidence, decoding into this non-optional float field will raise a validation error. Either make the column NOT NULL DEFAULT 1.0 in the migration (preferred, since 032 is new and unapplied) or relax this field to Optional[float].
🛡️ Option: tighten the column in the migration
- confidence real DEFAULT 1.0,
+ confidence real NOT NULL DEFAULT 1.0,📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| confidence: float = 1.0 | |
| confidence: Optional[float] = 1.0 |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@app/schemas/breeze_buddy/memory.py` at line 21, The schema field confidence
in app/schemas/breeze_buddy/memory.py is typed as float but the DB column in
032_create_memory_tables.sql allows NULL; update the migration to set the column
to NOT NULL DEFAULT 1.0 (preferred) so DB and the model align, or if you opt to
accept NULLs, change the schema field to Optional[float] (and update any
callers); specifically modify the confidence column definition in
032_create_memory_tables.sql to "confidence real NOT NULL DEFAULT 1.0" (or
change the dataclass/Pydantic field typed name confidence to Optional[float])
and run tests/migrations to verify decoding no longer fails.
| @@ -0,0 +1,244 @@ | |||
| # Persistent User Memory — Chat + Voice (Breeze Buddy) | |||
|
|
|||
| > **Status:** Design / integration spec (not yet implemented). | |||
There was a problem hiding this comment.
Update status to reflect actual implementation.
The status line claims this is a design spec "not yet implemented," but the review stack context shows this PR delivers a complete implementation across 13 layers, from database migrations through background workers to voice agent integration. Update the status to reflect that this documents the implemented feature.
📝 Suggested status update
-> **Status:** Design / integration spec (not yet implemented).
+> **Status:** Implementation complete. This document describes the architecture and integration.📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| > **Status:** Design / integration spec (not yet implemented). | |
| > **Status:** Implementation complete. This document describes the architecture and integration. |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@docs/PERSISTENT_USER_MEMORY.md` at line 3, The status line currently reads
"**Status:** Design / integration spec (not yet implemented)." — update that
single status to reflect the feature is implemented and integrated across the
stack (e.g., "**Status:** Implemented — fully integrated (DB migrations,
background workers, voice agent, etc.)"); ensure the phrasing mentions the
implemented layers (migrations, background workers, voice agent) and remove the
"not yet implemented" note and any contradictory wording elsewhere in
PERSISTENT_USER_MEMORY.md.
| ## 8. Read path — Phase 1 (profile block at start) | ||
|
|
||
| Symmetric; one injection point per mode, appended **after** `inject_language_rules` so memory lands in `role_messages`. | ||
|
|
||
| - **Voice** — add optional `memory_block: Optional[str] = None` to `prepare_initial_node` and `prepare_resume_node` (`agent/flow.py`); when present, append `{"role": "system", "content": memory_block}` to `role_messages`. Fetch in `agent/__init__.py` just before those call-sites: `resolve_customer_key(self.lead.reseller_id, self.lead.merchant_id, self.lead.payload, ...)`, then `memory_block = await MemoryService().get_profile_block(key)` (guarded by config + key presence). The fetch sits in the same `async` method that already `await`s `flow_manager.initialize`. | ||
| - **Chat** — add `memory_block: Optional[str] = None` to `ChatAgent.__init__` (`chat/agent.py`); inject it as a system message into `role_messages` in `_seed_context` before assembling `messages`. The handler `send_chat_message_handler` fetches it per turn (cheap DB read) where it already builds `template_vars` / passes `agent_state`, then `ChatAgent(..., memory_block=memory_block)`. Per-turn fetch also picks up memory written between turns. | ||
|
|
||
| Example injected block: | ||
|
|
||
| ```text | ||
| <user_memory> | ||
| - Prefers morning calls | ||
| - Address updated May 2026 | ||
| - Declined upsell twice | ||
| </user_memory> | ||
| ``` |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Description: Check if read-path files were modified to inject memory blocks
echo "=== Checking for memory block injection in voice agent flow ==="
rg -n -C5 'memory_block' app/ai/voice/agents/breeze_buddy/agent/flow.py app/ai/voice/agents/breeze_buddy/agent/__init__.py
echo -e "\n=== Checking for memory block injection in chat agent ==="
rg -n -C5 'memory_block' app/ai/voice/agents/breeze_buddy/chat/agent.py
echo -e "\n=== Checking for MemoryService.get_profile_block usage ==="
rg -n 'get_profile_block' app/ai/voice/agents/breeze_buddy/Repository: juspay/clairvoyance
Length of output: 449
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== Call sites for get_profile_block across breeze_buddy ==="
rg -n 'get_profile_block\s*\(' app/ai/voice/agents/breeze_buddy/
echo -e "\n=== Look for injected memory / <user_memory> markers ==="
rg -n '<user_memory>|user_memory|profile_block|MemoryService\(' app/ai/voice/agents/breeze_buddy/
echo -e "\n=== Look for role_messages system-message assembly ==="
rg -n 'role_messages' app/ai/voice/agents/breeze_buddy/Repository: juspay/clairvoyance
Length of output: 14097
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== Repo-wide call sites for get_profile_block( ==="
rg -n 'get_profile_block\s*\(' .
echo -e "\n=== Repo-wide occurrences of <user_memory> ==="
rg -n '<user_memory>' .Repository: juspay/clairvoyance
Length of output: 2195
Implement the Phase 1 <user_memory> read-path (currently only exists in docs)
MemoryService.get_profile_blockis only defined inapp/ai/voice/agents/breeze_buddy/memory/service.py; there are no code call-sites/injections ofget_profile_block(anywhere in the repo (only referenced indocs/PERSISTENT_USER_MEMORY.md).- No runtime code injects a rendered
<user_memory>...</user_memory>block intorole_messages; onlyrole_messagesassembly/in-language-rule injection exists. - As a result, the Phase 1 retrieval described in
docs/PERSISTENT_USER_MEMORY.md(§8, lines 163-178) is not implemented, so memories written/extracted won’t be retrieved at conversation start.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@docs/PERSISTENT_USER_MEMORY.md` around lines 163 - 178, The repo lacks the
Phase 1 read-path for injecting a rendered <user_memory> block into
role_messages; implement it by calling MemoryService.get_profile_block where
conversations are initialized and per-turn for chat: 1) In the voice flow,
update prepare_initial_node and prepare_resume_node to accept memory_block:
Optional[str] and append {"role":"system","content":memory_block} to
role_messages when memory_block is present; fetch memory_block before those
calls by invoking resolve_customer_key(...) then await
MemoryService().get_profile_block(key) inside the same async initialization that
awaits flow_manager.initialize (guard behind config + key). 2) In ChatAgent, add
memory_block: Optional[str] to ChatAgent.__init__ and inject it as a system
message in _seed_context before assembling messages; in
send_chat_message_handler, fetch memory_block each turn (cheap DB read) and pass
it into ChatAgent(..., memory_block=memory_block) so per-turn memory updates are
picked up. Ensure all new parameters are optional and guarded by configuration
and key presence.
d370f24 to
803d535
Compare
…backends
Adds the persistent per-user memory LIBRARY for Breeze Buddy (voice + chat)
as an orchestration layer that is OFF by default. The store + extraction
strategy sits behind a MemoryBackend interface so it is swappable. App-wiring
(the end_conversation enqueue call-site and the drain-worker registration)
lives in the stacked feat/memory-integration branch, not here.
Spec & DB layer (docs/PERSISTENT_USER_MEMORY.md, migration 032):
- Identity chain: customer_id -> alias lookup -> phone:* provisional -> None
- user_memory + customer_identity tables (pgvector ext), three-layer
queries/decoders/accessors; Pydantic UserMemory/CustomerIdentity/MemoryKey
- Queue + drain worker so extraction survives voice subprocess teardown
Pluggable backends (memory/backends/):
- base + factory: MemoryBackend ABC + MemoryIdentity + get_memory_backend()
- pgvector (default): app-side LLM extraction, Azure embeddings, cosine
dedup, supersede, phone->customer_id merge (extract + embeddings here)
- supermemory: official supermemory Python SDK (AsyncSupermemory) wrapped
with our proxy-aware httpx client. ingest -> client.add (one containerTag =
scope_tag); get_profile_block/search -> v4 client.search.memories (extracted
facts); merge_identity -> documents.list + documents.update re-tag.
supermemory owns extraction/embedding/dedup server-side
- service.py facade delegates to the backend; enqueue stamps the chosen
backend + SET NX EX dedup guard; worker.py is backend-agnostic
- template MemoryConfig.{enabled,backend}
Off by default: nothing connects unless BUDDY_MEMORY_ENABLED=true. The
pgvector codec registration is gated to (enabled && backend==pgvector)
and wrapped so a missing vector extension / un-applied migration 032
degrades with a warning instead of failing the whole DB pool. supermemory
calls fail open (no key / 401 / network error -> empty result, never blocks).
Deps: pgvector==0.4.2, supermemory>=3.45.0. Config: BUDDY_MEMORY_ENABLED,
BUDDY_MEMORY_BACKEND, AZURE_OPENAI_EMBEDDING_DEPLOYMENT, MEMORY_EXTRACTION_*,
SUPERMEMORY_API_KEY, SUPERMEMORY_BASE_URL.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
7bf737c to
e477e28
Compare
Wire the persistent-memory library into the running app: end_conversation.py (voice write-path): - Hoist memory imports to module top (resolve_customer_key, MemoryService, BUDDY_MEMORY_BACKEND as resolve_memory_backend, BUDDY_MEMORY_ENABLED) - Enqueue extraction after DB write: best-effort try/except, never blocks - Backend selection: template MemoryConfig.backend override wins, else Redis/DevCycle dynamic default via resolve_memory_backend() dynamic.py: - Add BUDDY_MEMORY_BACKEND() async function — re-reads BUDDY_MEMORY_BACKEND from Redis/DevCycle on every call, falls back to static env default. Ops can switch pgvector <-> supermemory at runtime without a redeploy. main.py: - Import drain_memory_queue from memory.worker - Register memory_extraction_drain BackgroundTaskScheduler task, gated on BUDDY_MEMORY_ENABLED (static kill-switch, off by default)
e477e28 to
d847354
Compare
Summary by CodeRabbit