feat: Google Sheets data sources for Breeze Buddy templates#813
feat: Google Sheets data sources for Breeze Buddy templates#813cmd-err wants to merge 2 commits into
Conversation
- Migrations: 026 create data_source table, 027 add template.data_sources JSONB - Service: app/services/google/sheets.py — fetch_formatted, list_tabs, get_column_headers - Schemas: DataSourceCreate/Update/Response, DataSourceRef on TemplateModel - DB layer: data_source queries + decoder + accessor (CRUD + pagination) - DB layer: template queries/decoder/accessor updated to thread data_sources JSONB - API: POST/GET/PUT/DELETE /data-sources + discovery /sheets/tabs|columns|preview - Loader: Layer 5 data_source injection; _fetch_data_source_content (Redis→live→fallback) - flow.py: propagate _data_source_messages; prepend in prepare_initial_node - Prefetch: data_source_prefetch.py wired into dispatch worker alongside greeting TTS
WalkthroughThis PR introduces a comprehensive data source integration system for Breeze Buddy templates, enabling templates to reference external Google Sheets and inject their content during rendering. It spans database schema, Google Sheets API integration, template loading enhancements, a prefetch manager for Redis caching, REST APIs with RBAC, and agent flow wiring. ChangesData Source Feature
Sequence DiagramsequenceDiagram
participant Client as API Client
participant Handler as Data Source Handler
participant Accessor as Database Accessor
participant GoogleAPI as Google Sheets API
participant DB as PostgreSQL
Client->>Handler: POST /data-sources<br/>{spreadsheet_url, ...}
Handler->>Handler: Validate non-admin reseller
Handler->>GoogleAPI: extract_spreadsheet_id
GoogleAPI-->>Handler: spreadsheet_id
Handler->>Accessor: create_data_source(...)
Accessor->>GoogleAPI: fetch_sheet_data (validate)
Accessor->>DB: INSERT into data_source
DB-->>Accessor: inserted row
Accessor-->>Handler: DataSourceResponse
Handler-->>Client: 201 with response
Client->>Handler: GET /data-sources/sheets/tabs?url=...
Handler->>GoogleAPI: list_tabs(spreadsheet_id)
GoogleAPI-->>Handler: [tab names]
Handler-->>Client: TabsResponse
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes The PR spans multiple layers (API, database, services, template loading, prefetch) with heterogeneous logic including RBAC validation, Google Sheets integration, Redis caching, and agent flow wiring. While many individual components are straightforward, the interconnected nature requires reviewing the full data flow, error handling, and integration points across system boundaries. Possibly related PRs
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
Adds “Data Sources” (Google Sheets-backed) that can be attached to Breeze Buddy templates, fetched and injected into LLM context (as variables or system messages) with Redis prefetch/caching to reduce call-start latency.
Changes:
- Introduce
data_sourcepersistence (migrations + query/accessor/decoder layers) and REST CRUD + discovery endpoints (tabs/columns/preview). - Extend templates to store
data_sourcesreferences and inject fetched content during template load / initial node preparation. - Update dependencies to support Google Sheets API access; apply multiple import-format refactors.
Reviewed changes
Copilot reviewed 30 out of 32 changed files in this pull request and generated 12 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/breeze_buddy/dispatch/test_end_to_end.py | Import formatting changes for dispatch modules. |
| tests/breeze_buddy/dispatch/test_chaos_and_edges.py | Import formatting changes for dispatch modules. |
| tests/breeze_buddy/dispatch/conftest.py | Import formatting changes for datetime + dispatch modules. |
| pyproject.toml | Adjust dependencies (Google API client, pipecat unpinning) and tooling configs. |
| app/services/redis/client.py | Import formatting changes for Redis exceptions. |
| app/services/google/sheets.py | New Google Sheets service for tab/header/data fetch + formatting helpers. |
| app/schemas/breeze_buddy/data_source.py | New Pydantic schemas for data source API. |
| app/schemas/breeze_buddy/init.py | Export new data source schemas; adjust user imports. |
| app/database/queries/breeze_buddy/template.py | Add data_sources column to template queries (select/insert/update). |
| app/database/queries/breeze_buddy/data_source.py | New SQL query builders for data_source table. |
| app/database/migrations/027_add_data_sources_column_to_template.sql | Add template.data_sources JSONB column. |
| app/database/migrations/026_create_data_source_table.sql | Create data_source table + indexes/uniqueness constraints. |
| app/database/decoder/breeze_buddy/template.py | Decode data_sources from JSONB into template model. |
| app/database/decoder/breeze_buddy/data_source.py | New decoder mapping data_source rows to schema models. |
| app/database/accessor/breeze_buddy/template.py | Persist data_sources for create/replace template operations. |
| app/database/accessor/breeze_buddy/data_source.py | New accessors for create/get/list/update/delete data sources. |
| app/api/routers/breeze_buddy/templates/handlers.py | Wire template data_sources to template create/replace calls. |
| app/api/routers/breeze_buddy/signup/handlers.py | Import formatting changes for accessors. |
| app/api/routers/breeze_buddy/data_sources/handlers.py | New handler layer for data source CRUD + Google Sheets discovery. |
| app/api/routers/breeze_buddy/data_sources/init.py | New FastAPI router exposing data source endpoints. |
| app/api/routers/breeze_buddy/init.py | Register the data-sources router under Breeze Buddy API. |
| app/ai/voice/agents/breeze_buddy/template/types.py | Add DataSourceRef; extend template requests/models with data_sources. |
| app/ai/voice/agents/breeze_buddy/template/loader.py | Fetch/cache data source content and inject into vars/system messages. |
| app/ai/voice/agents/breeze_buddy/managers/data_source_prefetch.py | New prefetch manager to pre-warm Redis at dispatch time. |
| app/ai/voice/agents/breeze_buddy/dispatch/worker.py | Run greeting prep + data source prefetch concurrently. |
| app/ai/voice/agents/breeze_buddy/dispatch/reconcilers.py | Import formatting change for dispatch accessors. |
| app/ai/voice/agents/breeze_buddy/agent/flow.py | Propagate/consume data-source system messages in initial node context. |
| app/ai/voice/agents/automatic/tools/breeze/init.py | Import formatting changes for configuration tool exports. |
| app/ai/voice/agents/automatic/tools/init.py | Import formatting changes for dummy/system tool exports. |
| app/ai/voice/agents/automatic/services/mcp/utils.py | Import formatting change for internet_tools. |
| "pipecat-ai[daily,google,assemblyai,silero,openai,azure,elevenlabs,aic,anthropic,deepgram,soniox,mcp,sarvam,cartesia]", | ||
| "pipecat-ai-flows", |
| "black", | ||
| "isort", | ||
| "autoflake", | ||
| "pyrefly", | ||
| "pytest>=9.0.3", | ||
| "pytest-asyncio>=1.3.0", | ||
| "pyrefly" | ||
| ] |
| logger.error(f"Unexpected error listing tabs for {spreadsheet_id}: {e}") | ||
| return [] | ||
|
|
||
| return await asyncio.get_event_loop().run_in_executor(None, _fetch) |
| ) | ||
| return [] | ||
|
|
||
| return await asyncio.get_event_loop().run_in_executor(None, _fetch) |
| ) | ||
| return [] | ||
|
|
||
| return await asyncio.get_event_loop().run_in_executor(None, _fetch) |
| async def update_data_source( | ||
| data_source_id: str, | ||
| name: Optional[str] = None, | ||
| spreadsheet_url: Optional[str] = None, | ||
| sheet_name: Optional[str] = None, | ||
| columns: Optional[List[str]] = None, | ||
| format: Optional[str] = None, | ||
| is_active: Optional[bool] = None, | ||
| ) -> Optional[DataSourceResponse]: | ||
| """Update an existing data source. Only provided fields are updated.""" | ||
| try: | ||
| new_spreadsheet_id = None | ||
| if spreadsheet_url: | ||
| new_spreadsheet_id = extract_spreadsheet_id(spreadsheet_url) | ||
| if not new_spreadsheet_id: | ||
| logger.error( | ||
| f"Cannot extract spreadsheet_id from URL: {spreadsheet_url}" | ||
| ) | ||
| return None | ||
|
|
||
| now = datetime.now(timezone.utc) | ||
| columns_json = json.dumps(columns) if columns is not None else None | ||
|
|
||
| query, values = update_data_source_query( | ||
| data_source_id=data_source_id, | ||
| name=name, | ||
| spreadsheet_url=spreadsheet_url, | ||
| spreadsheet_id=new_spreadsheet_id, | ||
| sheet_name=sheet_name, | ||
| columns_json=columns_json, | ||
| format=format, | ||
| is_active=is_active, | ||
| now=now, | ||
| ) |
| await asyncio.gather( | ||
| prepare_and_store_initial_greeting( | ||
| lead_id=locked.id, | ||
| payload=locked.payload or {}, | ||
| template=template, | ||
| ), | ||
| prefetch_data_sources( | ||
| lead_id=locked.id, | ||
| template=template, | ||
| ), | ||
| return_exceptions=True, | ||
| ) |
| redis = await get_redis_service() | ||
| cached = await redis.get(cache_key) | ||
| if cached: | ||
| logger.info( | ||
| "Data source cache hit: lead=%s name=%s", lead_id, ref.name | ||
| ) | ||
| return cached |
| data_source_id: str = Field(description="UUID of the data_source entity") | ||
| name: str = Field( | ||
| description="Variable name used as {name} placeholder in template prompts" | ||
| ) | ||
| inject_as: str = Field( | ||
| default="var", | ||
| description=( | ||
| '"var" — sheet content injected into template_vars as {name}. ' | ||
| '"message" — prepended as a system message to the initial node.' | ||
| ), | ||
| ) |
| if columns: | ||
| col_set = set(columns) | ||
| records = [ | ||
| {col: r.get(col, "") for col in columns if col in col_set} | ||
| for r in records | ||
| ] |
There was a problem hiding this comment.
Actionable comments posted: 9
🧹 Nitpick comments (6)
app/services/google/sheets.py (2)
189-194: ⚡ Quick winSimplify redundant column filtering logic.
The condition
if col in col_setis redundant since you're already iteratingfor col in columns, andcol_set = set(columns). Every column fromcolumnsis by definition incol_set.♻️ Simplified logic
if columns: - col_set = set(columns) records = [ - {col: r.get(col, "") for col in columns if col in col_set} + {col: r.get(col, "") for col in columns} for r in records ]🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@app/services/google/sheets.py` around lines 189 - 194, The code constructs col_set = set(columns) and then uses a list comprehension that checks "if col in col_set" while iterating "for col in columns", which is redundant; remove the col_set variable and the conditional and simplify the comprehension to build each record with "{col: r.get(col, "") for col in columns}", updating the "columns", "col_set", and "records" usage accordingly.
230-230: ⚡ Quick winCSV comma replacement is lossy and may break structured data.
Replacing commas with semicolons in cell values silently corrupts data containing commas (e.g., addresses, formatted numbers). Standard CSV handling should escape commas by quoting cells or using Python's
csvmodule.🛡️ Proposed fix using proper CSV escaping
+import csv +import io + def _rows_to_csv(headers: List[str], rows: List[dict]) -> str: if not headers or not rows: return "(no data)" - lines = [",".join(headers)] - for row in rows: - cells = [str(row.get(h, "")).replace(",", ";") for h in headers] - lines.append(",".join(cells)) - return "\n".join(lines) + output = io.StringIO() + writer = csv.writer(output) + writer.writerow(headers) + for row in rows: + writer.writerow([str(row.get(h, "")) for h in headers]) + return output.getvalue()🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@app/services/google/sheets.py` at line 230, The current list comprehension that builds cells by doing str(row.get(h, "")).replace(",", ";") (the variable cells using headers) corrupts data by replacing commas; instead use Python's csv module to produce a properly escaped/quoted CSV row: map None to empty string, convert values to str, then write the row via csv.writer (or csv.writer on an io.StringIO) so commas are escaped/quoted rather than replaced; update the code that constructs cells (the cells list and any downstream join or file write) to use this csv-based serialization.app/ai/voice/agents/breeze_buddy/template/loader.py (1)
133-133: 💤 Low valueConsider aligning fallback sentinel with
fetch_formatted's value.The fallback string
"[Data unavailable]"differs fromfetch_formatted's documented return value"[No data available]"(seeapp/services/google/sheets.py:241-267). If any downstream code checks for a specific sentinel value, this inconsistency could cause subtle bugs. Consider using a shared constant or aligning the strings.Also applies to: 152-152, 157-157
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@app/ai/voice/agents/breeze_buddy/template/loader.py` at line 133, The loader.py fallback string "[Data unavailable]" is inconsistent with fetch_formatted's sentinel "[No data available]" in app/services/google/sheets.py; update the returns in loader.py (the fallback branches around the loader functions) to use the same sentinel or, better, import/use a shared constant from the sheets module so both loader functions and fetch_formatted return exactly the same "[No data available]" value.app/schemas/breeze_buddy/__init__.py (1)
63-71: 💤 Low valueConsider consolidating user imports for consistency.
The user imports are now split across three blocks (lines 63-65, 66, 67-70, 71), whereas other modules use a single
from ... import (...)block. While functionally correct, this diverges from the file's established pattern and increases visual noise.♻️ Optional consolidation
from app.schemas.breeze_buddy.users import ( + UserCreate as UserAccountCreate, + UserUpdate as UserAccountUpdate, DeleteUserResponse, -) -from app.schemas.breeze_buddy.users import UserCreate as UserAccountCreate -from app.schemas.breeze_buddy.users import ( UserListResponse, UserResponse, ) -from app.schemas.breeze_buddy.users import UserUpdate as UserAccountUpdate🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@app/schemas/breeze_buddy/__init__.py` around lines 63 - 71, Consolidate the multiple user import statements into a single grouped import to match the module's existing style: replace the separate imports of DeleteUserResponse, UserCreate as UserAccountCreate, UserListResponse, UserResponse, and UserUpdate as UserAccountUpdate with one from app.schemas.breeze_buddy.users import (...) block listing those identifiers together so the file follows the established consistent pattern.app/api/routers/breeze_buddy/data_sources/handlers.py (2)
35-41: ⚡ Quick winRemove unused helper function.
_resolve_reseller_idsis defined but never called. Thelist_data_sources_handlerimplements the RBAC logic inline instead (lines 95-110).🧹 Proposed removal
-def _resolve_reseller_ids(current_user: UserInfo) -> List[str]: - """Return the reseller IDs the caller is allowed to access.""" - from app.schemas.breeze_buddy.auth import UserRole - - if current_user.role == UserRole.ADMIN: - return [] # admin can see all — no filter applied in list_data_sources - return current_user.reseller_ids - - async def create_data_source_handler(🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@app/api/routers/breeze_buddy/data_sources/handlers.py` around lines 35 - 41, Remove the unused helper function _resolve_reseller_ids from the file since list_data_sources_handler implements the same RBAC logic inline; delete the entire _resolve_reseller_ids function definition and any now-unused imports that only served it (e.g., UserRole import in that scope), or alternatively replace the inline RBAC block inside list_data_sources_handler with a call to _resolve_reseller_ids if you prefer reuse — but do not keep both implementations to avoid dead code.
196-223: Clarify deletion policy for data sources referenced by templates
Templates storedata_sourcesJSONB refs, andapp/ai/voice/agents/breeze_buddy/template/loader.pyintentionally degrades when a referenced data source can’t be fetched by returning the literal"[Data unavailable]"(including timeout/exception paths). Confirm whetherdelete_data_source_handlershould still:
- block deletion / clean up template
data_sourcesreferences to avoid degraded template output, or- allow deletion as-is (and optionally prefer soft delete via
is_active=falseso inactive sources can be handled more intentionally).🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@app/api/routers/breeze_buddy/data_sources/handlers.py` around lines 196 - 223, Clarify and enforce a deletion policy in delete_data_source_handler: before calling delete_data_source(data_source_id) check templates that reference the data_source_id (JSONB field data_sources used by app/ai/voice/agents/breeze_buddy/template/loader.py) and either (A) block hard deletes by returning a 409 Conflict with a clear message if any template references it, or (B) perform a safe removal/soft-delete flow — prefer setting is_active=false on the data source (update via the same persistence layer used by delete_data_source) or, if you choose to allow hard delete, atomically remove the id from all templates' data_sources JSONB arrays (update templates in DB) before deleting; implement one chosen policy consistently and document it in the handler and in code comments referencing delete_data_source_handler, delete_data_source, and the template loader behavior.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@app/ai/voice/agents/breeze_buddy/managers/data_source_prefetch.py`:
- Line 49: The Redis setex call is missing the required namespace parameter;
update the call that uses redis.setex(cache_key, content,
ttl_seconds=_CACHE_TTL) to include the namespace argument (e.g.,
namespace=<appropriate_namespace_variable_or_constant>) so keys are scoped
properly, ensuring you pass the same namespace used elsewhere in this
module/service; adjust the call in data_source_prefetch.py where redis,
cache_key and _CACHE_TTL are referenced to include namespace.
In `@app/ai/voice/agents/breeze_buddy/template/types.py`:
- Around line 2036-2039: The data_sources list on the template lacks enforcement
that each DataSourceRef.name is unique, so add a Pydantic validator (either a
`@validator`("data_sources") or a `@root_validator` in the template model in
types.py) that collects DataSourceRef.name values from data_sources, detects
duplicates, and raises a ValidationError/ValueError with a clear message listing
the duplicate names; reference the DataSourceRef type and the data_sources field
so the check runs whenever a template is constructed or parsed.
- Around line 2010-2016: Update the inject_as field to enforce only the allowed
values by changing its type annotation to a Literal of "var" and "message"
(e.g., Literal["var", "message"]) and ensure the typing import (from typing or
typing_extensions) is added; keep the default="var" and existing description, so
Pydantic will validate at model creation and prevent arbitrary strings from
being accepted by the inject_as Field used by the loader/flow logic (refer to
the inject_as Field declaration in types.py).
- Around line 2007-2009: The name Field on the template variable lacks
validation for being a safe template identifier; add a Pydantic validator for
the model that defines name (the class that contains the name: str = Field(...))
to enforce a regex like ^[a-zA-Z_][a-zA-Z0-9_]*$ and raise a ValueError for
invalid names so template substitution won't break; reference the existing field
name and add a `@validator`("name") method (or pydantic root_validator if inside a
dataclass-style model) that checks the pattern and returns the cleaned value or
raises an error.
In `@app/api/routers/breeze_buddy/data_sources/__init__.py`:
- Around line 50-82: The three endpoints get_sheet_tabs, get_sheet_columns, and
preview_sheet call list_tabs_handler, list_columns_handler, and preview_handler
without using current_user; update these routes to validate the incoming
spreadsheet_url (extract spreadsheet_id) against the caller’s allowed
reseller/merchant or against the data_sources the user can access before calling
the shared Google Sheets service, and deny access if not authorized. Add rate
limiting to these handlers (e.g., decorator/middleware applied to
get_sheet_tabs/get_sheet_columns/preview_sheet) to prevent abuse. Add
audit-style logging prior to Google API calls that logs current_user identity
plus the derived spreadsheet_id and sheet_name (avoid logging sheet content),
and ensure Google API errors still log contextual audit fields. Ensure the
authorization check and audit logging occur inside or immediately before
invoking list_tabs_handler/list_columns_handler/preview_handler.
In `@app/schemas/breeze_buddy/data_source.py`:
- Around line 22-35: Replace the free-text fields with enum types: define enums
(e.g., DataSourceType with values like "google_sheet","file","url","text" and
DataSourceFormat with "markdown_table","csv","json") and change the source_type
and format Field types to those enums in this module; also update the related
Pydantic models DataSourceUpdate and DataSourceResponse to use the same enum
types so validation occurs at parse time and consumers like fetch_formatted()
will only receive valid values.
In `@app/services/google/sheets.py`:
- Line 245: The parameter named format in the function signature shadows
Python's built-in format(); rename it (e.g., to output_format: str =
"markdown_table") in the function declaration and update every reference inside
the function (and any callers) from format to output_format, keeping the same
default value and type annotation to preserve behavior.
In `@pyproject.toml`:
- Around line 15-16: Add pytest to the project's development extras and update
the lockfile: in pyproject.toml add "pytest" under
[project.optional-dependencies].dev (so dev installs include pytest), then
regenerate uv.lock (run uv sync/uv lock) so the lock contains a pytest entry;
ensure CI or Docker build that uses uv sync/--frozen continues to rely on
uv.lock for reproducible installs and/or update
.github/workflows/pr-build-check.yml to install and run tests if you want CI to
execute them.
- Around line 40-44: The dev extras in pyproject.toml are missing pytest, so add
"pytest" to the [project.optional-dependencies].dev list (alongside "black",
"isort", "autoflake", "pyrefly") to restore the test runner used by the code
(references: the dev extras block in pyproject.toml and tests that import
pytest); after updating the list, regenerate or update the lock/install (e.g.,
run your lockfile tool or pip install -e ".[dev]") so pytest is available for
the test suite—do not re-add pytest-asyncio since tests don't reference it.
---
Nitpick comments:
In `@app/ai/voice/agents/breeze_buddy/template/loader.py`:
- Line 133: The loader.py fallback string "[Data unavailable]" is inconsistent
with fetch_formatted's sentinel "[No data available]" in
app/services/google/sheets.py; update the returns in loader.py (the fallback
branches around the loader functions) to use the same sentinel or, better,
import/use a shared constant from the sheets module so both loader functions and
fetch_formatted return exactly the same "[No data available]" value.
In `@app/api/routers/breeze_buddy/data_sources/handlers.py`:
- Around line 35-41: Remove the unused helper function _resolve_reseller_ids
from the file since list_data_sources_handler implements the same RBAC logic
inline; delete the entire _resolve_reseller_ids function definition and any
now-unused imports that only served it (e.g., UserRole import in that scope), or
alternatively replace the inline RBAC block inside list_data_sources_handler
with a call to _resolve_reseller_ids if you prefer reuse — but do not keep both
implementations to avoid dead code.
- Around line 196-223: Clarify and enforce a deletion policy in
delete_data_source_handler: before calling delete_data_source(data_source_id)
check templates that reference the data_source_id (JSONB field data_sources used
by app/ai/voice/agents/breeze_buddy/template/loader.py) and either (A) block
hard deletes by returning a 409 Conflict with a clear message if any template
references it, or (B) perform a safe removal/soft-delete flow — prefer setting
is_active=false on the data source (update via the same persistence layer used
by delete_data_source) or, if you choose to allow hard delete, atomically remove
the id from all templates' data_sources JSONB arrays (update templates in DB)
before deleting; implement one chosen policy consistently and document it in the
handler and in code comments referencing delete_data_source_handler,
delete_data_source, and the template loader behavior.
In `@app/schemas/breeze_buddy/__init__.py`:
- Around line 63-71: Consolidate the multiple user import statements into a
single grouped import to match the module's existing style: replace the separate
imports of DeleteUserResponse, UserCreate as UserAccountCreate,
UserListResponse, UserResponse, and UserUpdate as UserAccountUpdate with one
from app.schemas.breeze_buddy.users import (...) block listing those identifiers
together so the file follows the established consistent pattern.
In `@app/services/google/sheets.py`:
- Around line 189-194: The code constructs col_set = set(columns) and then uses
a list comprehension that checks "if col in col_set" while iterating "for col in
columns", which is redundant; remove the col_set variable and the conditional
and simplify the comprehension to build each record with "{col: r.get(col, "")
for col in columns}", updating the "columns", "col_set", and "records" usage
accordingly.
- Line 230: The current list comprehension that builds cells by doing
str(row.get(h, "")).replace(",", ";") (the variable cells using headers)
corrupts data by replacing commas; instead use Python's csv module to produce a
properly escaped/quoted CSV row: map None to empty string, convert values to
str, then write the row via csv.writer (or csv.writer on an io.StringIO) so
commas are escaped/quoted rather than replaced; update the code that constructs
cells (the cells list and any downstream join or file write) to use this
csv-based serialization.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 980bb686-f000-4d99-9103-f2ae50537344
⛔ Files ignored due to path filters (1)
uv.lockis excluded by!**/*.lock
📒 Files selected for processing (31)
app/ai/voice/agents/automatic/services/mcp/utils.pyapp/ai/voice/agents/automatic/tools/__init__.pyapp/ai/voice/agents/automatic/tools/breeze/__init__.pyapp/ai/voice/agents/breeze_buddy/agent/flow.pyapp/ai/voice/agents/breeze_buddy/dispatch/reconcilers.pyapp/ai/voice/agents/breeze_buddy/dispatch/worker.pyapp/ai/voice/agents/breeze_buddy/managers/data_source_prefetch.pyapp/ai/voice/agents/breeze_buddy/template/loader.pyapp/ai/voice/agents/breeze_buddy/template/types.pyapp/api/routers/breeze_buddy/__init__.pyapp/api/routers/breeze_buddy/data_sources/__init__.pyapp/api/routers/breeze_buddy/data_sources/handlers.pyapp/api/routers/breeze_buddy/signup/handlers.pyapp/api/routers/breeze_buddy/templates/handlers.pyapp/database/accessor/breeze_buddy/data_source.pyapp/database/accessor/breeze_buddy/template.pyapp/database/decoder/breeze_buddy/data_source.pyapp/database/decoder/breeze_buddy/template.pyapp/database/migrations/026_create_data_source_table.sqlapp/database/migrations/027_add_data_sources_column_to_template.sqlapp/database/queries/breeze_buddy/data_source.pyapp/database/queries/breeze_buddy/template.pyapp/schemas/breeze_buddy/__init__.pyapp/schemas/breeze_buddy/data_source.pyapp/services/google/__init__.pyapp/services/google/sheets.pyapp/services/redis/client.pypyproject.tomltests/breeze_buddy/dispatch/conftest.pytests/breeze_buddy/dispatch/test_chaos_and_edges.pytests/breeze_buddy/dispatch/test_end_to_end.py
| ) | ||
|
|
||
| redis = await get_redis_service() | ||
| await redis.setex(cache_key, content, ttl_seconds=_CACHE_TTL) |
There was a problem hiding this comment.
Missing required namespace parameter in Redis setex call.
As per coding guidelines, all Redis operations must use the namespace parameter to prevent key collisions across services. The current call writes directly to the global key space.
🔧 Proposed fix
redis = await get_redis_service()
- await redis.setex(cache_key, content, ttl_seconds=_CACHE_TTL)
+ await redis.setex(cache_key, content, ttl_seconds=_CACHE_TTL, namespace="breeze_buddy")🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@app/ai/voice/agents/breeze_buddy/managers/data_source_prefetch.py` at line
49, The Redis setex call is missing the required namespace parameter; update the
call that uses redis.setex(cache_key, content, ttl_seconds=_CACHE_TTL) to
include the namespace argument (e.g.,
namespace=<appropriate_namespace_variable_or_constant>) so keys are scoped
properly, ensuring you pass the same namespace used elsewhere in this
module/service; adjust the call in data_source_prefetch.py where redis,
cache_key and _CACHE_TTL are referenced to include namespace.
Source: Coding guidelines
| name: str = Field( | ||
| description="Variable name used as {name} placeholder in template prompts" | ||
| ) |
There was a problem hiding this comment.
Validate name field for template variable compatibility.
The name field is used as a {name} placeholder in template prompts, but there's no validation ensuring it's a valid variable name. Names with spaces, special characters, or reserved words could break template substitution or cause confusion.
Consider adding a validator to enforce alphanumeric + underscore characters (e.g., ^[a-zA-Z_][a-zA-Z0-9_]*$).
🛡️ Proposed validation
+import re
+from pydantic import field_validator
+
class DataSourceRef(BaseModel):
"""
Reference to a data_source entity attached to a template.
data_source_id: FK to the data_source table
name: the {variable_name} placeholder (must be unique per template)
inject_as: how to land in LLM context
"""
data_source_id: str = Field(description="UUID of the data_source entity")
name: str = Field(
description="Variable name used as {name} placeholder in template prompts"
)
inject_as: Literal["var", "message"] = Field(
default="var",
description=(
'"var" — sheet content injected into template_vars as {name}. '
'"message" — prepended as a system message to the initial node.'
),
)
+
+ `@field_validator`("name")
+ `@classmethod`
+ def validate_name(cls, v: str) -> str:
+ if not re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*$', v):
+ raise ValueError(
+ "name must be a valid variable identifier (alphanumeric + underscore, cannot start with digit)"
+ )
+ return v🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@app/ai/voice/agents/breeze_buddy/template/types.py` around lines 2007 - 2009,
The name Field on the template variable lacks validation for being a safe
template identifier; add a Pydantic validator for the model that defines name
(the class that contains the name: str = Field(...)) to enforce a regex like
^[a-zA-Z_][a-zA-Z0-9_]*$ and raise a ValueError for invalid names so template
substitution won't break; reference the existing field name and add a
`@validator`("name") method (or pydantic root_validator if inside a
dataclass-style model) that checks the pattern and returns the cleaned value or
raises an error.
| inject_as: str = Field( | ||
| default="var", | ||
| description=( | ||
| '"var" — sheet content injected into template_vars as {name}. ' | ||
| '"message" — prepended as a system message to the initial node.' | ||
| ), | ||
| ) |
There was a problem hiding this comment.
Validate inject_as values using a Literal type.
The inject_as field defaults to "var" and the description mentions "var" or "message", but there's no type-level or runtime validation preventing arbitrary strings. Invalid values would break the content injection logic at runtime (see loader.py lines 95-157 and flow.py lines 128-159).
🔒 Proposed fix to enforce valid values
+from typing import Literal
+
class DataSourceRef(BaseModel):
"""
Reference to a data_source entity attached to a template.
data_source_id: FK to the data_source table
name: the {variable_name} placeholder (must be unique per template)
inject_as: how to land in LLM context
"""
data_source_id: str = Field(description="UUID of the data_source entity")
name: str = Field(
description="Variable name used as {name} placeholder in template prompts"
)
- inject_as: str = Field(
+ inject_as: Literal["var", "message"] = Field(
default="var",
description=(
'"var" — sheet content injected into template_vars as {name}. '
'"message" — prepended as a system message to the initial node.'
),
)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| inject_as: str = Field( | |
| default="var", | |
| description=( | |
| '"var" — sheet content injected into template_vars as {name}. ' | |
| '"message" — prepended as a system message to the initial node.' | |
| ), | |
| ) | |
| from typing import Literal | |
| class DataSourceRef(BaseModel): | |
| """ | |
| Reference to a data_source entity attached to a template. | |
| data_source_id: FK to the data_source table | |
| name: the {variable_name} placeholder (must be unique per template) | |
| inject_as: how to land in LLM context | |
| """ | |
| data_source_id: str = Field(description="UUID of the data_source entity") | |
| name: str = Field( | |
| description="Variable name used as {name} placeholder in template prompts" | |
| ) | |
| inject_as: Literal["var", "message"] = Field( | |
| default="var", | |
| description=( | |
| '"var" — sheet content injected into template_vars as {name}. ' | |
| '"message" — prepended as a system message to the initial node.' | |
| ), | |
| ) |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@app/ai/voice/agents/breeze_buddy/template/types.py` around lines 2010 - 2016,
Update the inject_as field to enforce only the allowed values by changing its
type annotation to a Literal of "var" and "message" (e.g., Literal["var",
"message"]) and ensure the typing import (from typing or typing_extensions) is
added; keep the default="var" and existing description, so Pydantic will
validate at model creation and prevent arbitrary strings from being accepted by
the inject_as Field used by the loader/flow logic (refer to the inject_as Field
declaration in types.py).
| data_sources: Optional[List["DataSourceRef"]] = Field( | ||
| None, | ||
| description="List of data source references attached to this template", | ||
| ) |
There was a problem hiding this comment.
🛠️ Refactor suggestion | 🟠 Major | ⚡ Quick win
Consider validating name uniqueness across data source refs.
The DataSourceRef.name field description (line 2008) states that the name "must be unique per template", but there's no validation enforcing this constraint. Duplicate names would cause cache key collisions (see prefetch manager at datasource:{lead_id}:{ref.name}) or variable overwrites during template substitution.
♻️ Proposed validation
+from pydantic import model_validator
+
class TemplateModel(BaseModel):
# ... existing fields ...
data_sources: Optional[List["DataSourceRef"]] = Field(
None,
description="List of data source references attached to this template",
)
+
+ `@model_validator`(mode="after")
+ def validate_data_source_names_unique(self) -> "TemplateModel":
+ if self.data_sources:
+ names = [ref.name for ref in self.data_sources]
+ if len(names) != len(set(names)):
+ raise ValueError("data_sources: name must be unique per template")
+ return self🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@app/ai/voice/agents/breeze_buddy/template/types.py` around lines 2036 - 2039,
The data_sources list on the template lacks enforcement that each
DataSourceRef.name is unique, so add a Pydantic validator (either a
`@validator`("data_sources") or a `@root_validator` in the template model in
types.py) that collects DataSourceRef.name values from data_sources, detects
duplicates, and raises a ValidationError/ValueError with a clear message listing
the duplicate names; reference the DataSourceRef type and the data_sources field
so the check runs whenever a template is constructed or parsed.
| @router.get("/data-sources/sheets/tabs", response_model=TabsResponse) | ||
| async def get_sheet_tabs( | ||
| spreadsheet_url: str = Query(..., description="Full Google Sheets URL"), | ||
| current_user: UserInfo = Depends(get_current_user_with_rbac), | ||
| ): | ||
| """List all tab names in a Google Spreadsheet.""" | ||
| return await list_tabs_handler(spreadsheet_url) | ||
|
|
||
|
|
||
| @router.get("/data-sources/sheets/columns", response_model=ColumnsResponse) | ||
| async def get_sheet_columns( | ||
| spreadsheet_url: str = Query(..., description="Full Google Sheets URL"), | ||
| sheet_name: Optional[str] = Query( | ||
| None, description="Tab name (default: first tab)" | ||
| ), | ||
| current_user: UserInfo = Depends(get_current_user_with_rbac), | ||
| ): | ||
| """List column headers for a sheet tab.""" | ||
| return await list_columns_handler(spreadsheet_url, sheet_name) | ||
|
|
||
|
|
||
| @router.get("/data-sources/sheets/preview", response_model=PreviewResponse) | ||
| async def preview_sheet( | ||
| spreadsheet_url: str = Query(..., description="Full Google Sheets URL"), | ||
| sheet_name: Optional[str] = Query( | ||
| None, description="Tab name (default: first tab)" | ||
| ), | ||
| columns: Optional[List[str]] = Query(None, description="Columns to include"), | ||
| max_rows: int = Query(10, ge=1, le=100, description="Max rows to return"), | ||
| current_user: UserInfo = Depends(get_current_user_with_rbac), | ||
| ): | ||
| """Preview up to N rows from a sheet.""" | ||
| return await preview_handler(spreadsheet_url, sheet_name, columns, max_rows) |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Search for rate limiting middleware or decorators
rg -n "rate_limit|RateLimit|Limiter" --type py
# Search for audit logging patterns in routers
rg -n "audit.*log|logger.*audit" app/api/routers/ --type pyRepository: juspay/clairvoyance
Length of output: 16496
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# 1) Inspect the router file around the discovery endpoints and the full file sections with handlers calls
FILE="app/api/routers/breeze_buddy/data_sources/__init__.py"
if [ -f "$FILE" ]; then
echo "---- $FILE (lines 1-220) ----"
sed -n '1,220p' "$FILE" | cat -n
else
echo "Missing $FILE"
fi
# 2) Find handler implementations
echo "---- searching for list_tabs_handler ----"
rg -n "list_tabs_handler" app/api/routers/breeze_buddy/data_sources -S || true
rg -n "list_tabs_handler" app -S -g'*.py'
echo "---- searching for list_columns_handler ----"
rg -n "list_columns_handler" app -S -g'*.py'
echo "---- searching for preview_handler ----"
rg -n "preview_handler" app -S -g'*.py'
# 3) Search for any audit / structured logging to Langfuse around routers (broader than previous regex)
echo "---- searching for audit / authorization / langfuse logging in routers ----"
rg -n "langfuse|Langfuse|audit|authorized|RBAC|reseller.*merchant|merchant.*reseller" app/api/routers/breeze_buddy -S -g'*.py'Repository: juspay/clairvoyance
Length of output: 41227
🏁 Script executed:
#!/bin/bash
set -euo pipefail
FILE="app/api/routers/breeze_buddy/data_sources/__init__.py"
if [ -f "$FILE" ]; then
echo "---- $FILE (lines 1-220) ----"
sed -n '1,220p' "$FILE" | cat -n
else
echo "Missing $FILE"
fi
echo "---- searching for list_tabs_handler ----"
rg -n "list_tabs_handler" app -S -g'*.py'
echo "---- searching for list_columns_handler ----"
rg -n "list_columns_handler" app -S -g'*.py'
echo "---- searching for preview_handler ----"
rg -n "preview_handler" app -S -g'*.py'
echo "---- searching for audit / langfuse logging and reseller/merchant RBAC checks in breeze_buddy routers ----"
rg -n "langfuse|Langfuse|audit|RBAC|reseller_id|merchant_id" app/api/routers/breeze_buddy -S -g'*.py'Repository: juspay/clairvoyance
Length of output: 50376
🏁 Script executed:
#!/bin/bash
set -euo pipefail
FILE="app/api/routers/breeze_buddy/data_sources/__init__.py"
if [ -f "$FILE" ]; then
echo "---- $FILE (lines 1-220) ----"
sed -n '1,220p' "$FILE" | cat -n
else
echo "Missing $FILE"
fi
echo "---- searching for list_tabs_handler ----"
rg -n "list_tabs_handler" app -S -g'*.py'
echo "---- searching for list_columns_handler ----"
rg -n "list_columns_handler" app -S -g'*.py'
echo "---- searching for preview_handler ----"
rg -n "preview_handler" app -S -g'*.py'
echo "---- searching for audit / langfuse logging and reseller/merchant RBAC checks in breeze_buddy routers ----"
rg -n "langfuse|Langfuse|audit|RBAC|reseller_id|merchant_id" app/api/routers/breeze_buddy -S -g'*.py'Repository: juspay/clairvoyance
Length of output: 50376
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Discovery endpoints: check whether any rate limiting / audit-style logging is applied in this router package
rg -n "rate_limit|check_rate_limit|Limiter|audit|langfuse|Langfuse" app/api/routers/breeze_buddy/data_sources -S -g'*.py' || true
# Locate and inspect handler implementations used by discovery endpoints
rg -n "list_tabs_handler|list_columns_handler|preview_handler" app/api/routers/breeze_buddy/data_sources -S -g'*.py' || true
# Print small relevant sections from any handlers module(s) in this package
for f in $(fd -e py -p "app/api/routers/breeze_buddy/data_sources" handlers -t f 2>/dev/null || true); do
echo "---- $f (tail/head around handler funcs) ----"
sed -n '1,240p' "$f" | cat -n
done
# Also search globally (scoped) for handler definitions if they live outside this package
rg -n "async def (list_tabs_handler|list_columns_handler|preview_handler)\b" app/api/routers/breeze_buddy -S -g'*.py' || trueRepository: juspay/clairvoyance
Length of output: 1275
🏁 Script executed:
#!/bin/bash
set -euo pipefail
FILE="app/api/routers/breeze_buddy/data_sources/handlers.py"
echo "---- ${FILE} (search rate/audit/logging) ----"
rg -n "rate_limit|check_rate_limit|audit|Langfuse|langfuse|logger|loguru" "$FILE" -S || true
echo "---- ${FILE} (functions bodies: list_tabs_handler/list_columns_handler/preview_handler) ----"
# Print just the relevant sections (avoid dumping entire file)
sed -n '180,340p' "$FILE" | cat -nRepository: juspay/clairvoyance
Length of output: 4251
🏁 Script executed:
#!/bin/bash
set -euo pipefail
# Find where the discovery helpers live and inspect whether they enforce any access checks / use shared SA
rg -n "def extract_spreadsheet_id|extract_spreadsheet_id\(" app -S -g'*.py'
rg -n "fetch_sheet_data\(|def fetch_sheet_data" app -S -g'*.py'
rg -n "list_tabs\(|def list_tabs" app -S -g'*.py'
rg -n "get_column_headers\(|def get_column_headers" app -S -g'*.py'
# Inspect the modules containing these functions (show small relevant sections)
for f in $(rg -l "extract_spreadsheet_id\(" app -g'*.py' || true); do
echo "---- file: $f (extract_spreadsheet_id usage) ----"
sed -n '1,220p' "$f" | cat -n
done
for f in $(rg -l "fetch_sheet_data\(" app -g'*.py' || true); do
echo "---- file: $f (fetch_sheet_data usage) ----"
sed -n '1,260p' "$f" | cat -n
done
for f in $(rg -l "async def list_tabs|def list_tabs" app -g'*.py' || true); do
echo "---- file: $f (list_tabs implementation) ----"
sed -n '1,260p' "$f" | cat -n
done
for f in $(rg -l "get_column_headers\(" app -g'*.py' || true); do
echo "---- file: $f (get_column_headers implementation) ----"
sed -n '1,260p' "$f" | cat -n
doneRepository: juspay/clairvoyance
Length of output: 50375
Enforce authorization + abuse controls for sheet discovery endpoints
/data-sources/sheets/tabs, /columns, and /preview require auth but the current_user is not used: the handlers only take spreadsheet_url and fetch tabs/headers/rows via the shared platform Google Sheets service account (app/services/google/sheets.py, read-only scope). This enables any authenticated user to query arbitrary spreadsheet structures/content for any spreadsheet the shared SA can view.
- Validate the provided
spreadsheet_url(spreadsheet_id) is tied to the caller’s allowed reseller/merchant (or restrict discovery to spreadsheets behind data_sources they can access) before hitting Google APIs. - Add rate limiting for these discovery routes.
- Add audit-style logging for discovery requests including user identity and the derived spreadsheet_id/sheet_name (not just Google API error logs).
🧰 Tools
🪛 Ruff (0.15.15)
[warning] 53-53: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable
(B008)
[warning] 65-65: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable
(B008)
[warning] 77-77: Do not perform function call Query in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable
(B008)
[warning] 79-79: Do not perform function call Depends in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable
(B008)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@app/api/routers/breeze_buddy/data_sources/__init__.py` around lines 50 - 82,
The three endpoints get_sheet_tabs, get_sheet_columns, and preview_sheet call
list_tabs_handler, list_columns_handler, and preview_handler without using
current_user; update these routes to validate the incoming spreadsheet_url
(extract spreadsheet_id) against the caller’s allowed reseller/merchant or
against the data_sources the user can access before calling the shared Google
Sheets service, and deny access if not authorized. Add rate limiting to these
handlers (e.g., decorator/middleware applied to
get_sheet_tabs/get_sheet_columns/preview_sheet) to prevent abuse. Add
audit-style logging prior to Google API calls that logs current_user identity
plus the derived spreadsheet_id and sheet_name (avoid logging sheet content),
and ensure Google API errors still log contextual audit fields. Ensure the
authorization check and audit logging occur inside or immediately before
invoking list_tabs_handler/list_columns_handler/preview_handler.
| source_type: str = Field( | ||
| default="google_sheet", description="Currently: 'google_sheet'" | ||
| ) | ||
| spreadsheet_url: str = Field(description="Full Google Sheets URL") | ||
| sheet_name: Optional[str] = Field( | ||
| None, description="Tab name. NULL = first tab in spreadsheet" | ||
| ) | ||
| columns: Optional[List[str]] = Field( | ||
| None, description="Columns to include. NULL = all columns" | ||
| ) | ||
| format: str = Field( | ||
| default="markdown_table", | ||
| description="Output format: 'markdown_table' | 'csv' | 'json'", | ||
| ) |
There was a problem hiding this comment.
Use enum types for source_type and format fields.
Both source_type and format are currently free-text strings with defaults, but their descriptions imply a fixed set of valid values. Free-text allows invalid values that would fail at runtime:
source_type: PR objectives mention a CHECK constraint in the DB for future extensibility (file,url,text). An enum provides type safety and self-documentation.format: The Google Sheets service expects specific format values (markdown_table,csv,json). Invalid formats would cause errors infetch_formatted().
🔒 Proposed enum types
+from enum import Enum
+
+class DataSourceType(str, Enum):
+ """Supported data source types."""
+ GOOGLE_SHEET = "google_sheet"
+ # Future: FILE = "file", URL = "url", TEXT = "text"
+
+class DataSourceFormat(str, Enum):
+ """Output formats for data sources."""
+ MARKDOWN_TABLE = "markdown_table"
+ CSV = "csv"
+ JSON = "json"
+
class DataSourceCreate(BaseModel):
"""Request body for POST /data-sources"""
reseller_id: str = Field(description="Reseller that owns this data source")
merchant_id: Optional[str] = Field(
None,
description="Scope to a specific merchant. NULL = all merchants of reseller",
)
name: str = Field(
description="Human-readable name; also becomes the {variable_name} placeholder"
)
- source_type: str = Field(
- default="google_sheet", description="Currently: 'google_sheet'"
+ source_type: DataSourceType = Field(
+ default=DataSourceType.GOOGLE_SHEET, description="Currently: 'google_sheet'"
)
spreadsheet_url: str = Field(description="Full Google Sheets URL")
sheet_name: Optional[str] = Field(
None, description="Tab name. NULL = first tab in spreadsheet"
)
columns: Optional[List[str]] = Field(
None, description="Columns to include. NULL = all columns"
)
- format: str = Field(
- default="markdown_table",
+ format: DataSourceFormat = Field(
+ default=DataSourceFormat.MARKDOWN_TABLE,
description="Output format: 'markdown_table' | 'csv' | 'json'",
)
is_active: bool = Field(default=True)Apply the same enum types to DataSourceUpdate and DataSourceResponse.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@app/schemas/breeze_buddy/data_source.py` around lines 22 - 35, Replace the
free-text fields with enum types: define enums (e.g., DataSourceType with values
like "google_sheet","file","url","text" and DataSourceFormat with
"markdown_table","csv","json") and change the source_type and format Field types
to those enums in this module; also update the related Pydantic models
DataSourceUpdate and DataSourceResponse to use the same enum types so validation
occurs at parse time and consumers like fetch_formatted() will only receive
valid values.
| spreadsheet_id: str, | ||
| sheet_name: Optional[str] = None, | ||
| columns: Optional[List[str]] = None, | ||
| format: str = "markdown_table", |
There was a problem hiding this comment.
Rename format parameter to avoid shadowing Python builtin.
The parameter name format shadows the built-in format() function, which can cause confusion and breaks the ability to call the builtin within this function's scope.
📝 Proposed fix
async def fetch_formatted(
spreadsheet_id: str,
sheet_name: Optional[str] = None,
columns: Optional[List[str]] = None,
- format: str = "markdown_table",
+ output_format: str = "markdown_table",
max_rows: int = 500,
) -> str:
"""
Fetch sheet data and return as a formatted string for LLM injection.
Returns "[No data available]" on any error or empty sheet.
"""
rows = await fetch_sheet_data(spreadsheet_id, sheet_name, columns, max_rows)
if not rows:
logger.warning(
f"No data fetched from spreadsheet={spreadsheet_id}, sheet={sheet_name}"
)
return "[No data available]"
headers = list(rows[0].keys()) if rows else []
- if format == "csv":
+ if output_format == "csv":
return _rows_to_csv(headers, rows)
- elif format == "json":
+ elif output_format == "json":
return _rows_to_json(rows)
else:
return _rows_to_markdown_table(headers, rows)🧰 Tools
🪛 Ruff (0.15.15)
[error] 245-245: Function argument format is shadowing a Python builtin
(A002)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@app/services/google/sheets.py` at line 245, The parameter named format in the
function signature shadows Python's built-in format(); rename it (e.g., to
output_format: str = "markdown_table") in the function declaration and update
every reference inside the function (and any callers) from format to
output_format, keeping the same default value and type annotation to preserve
behavior.
| "pipecat-ai[daily,google,assemblyai,silero,openai,azure,elevenlabs,aic,anthropic,deepgram,soniox,mcp,sarvam,cartesia]", | ||
| "pipecat-ai-flows", |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Verify uv.lock exists and CI uses uv sync instead of pip install
echo "=== Checking for uv.lock file ==="
if [ -f "uv.lock" ]; then
echo "✓ uv.lock found"
echo "Lock file size: $(wc -l < uv.lock) lines"
else
echo "✗ uv.lock NOT found - unpinned dependencies are not reproducible!"
fi
echo -e "\n=== Checking CI/CD for uv sync usage ==="
rg -n "uv sync|uv pip|pip install" .github/ .gitlab-ci.yml Dockerfile docker-compose.yml 2>/dev/null || echo "No CI config found in common locations"
echo -e "\n=== Checking pipecat-ai in lock file ==="
if [ -f "uv.lock" ]; then
rg -A2 "name = \"pipecat-ai\"" uv.lock | head -20
fiRepository: juspay/clairvoyance
Length of output: 1363
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== pyproject.toml: show relevant dependency/dev-dependency sections ==="
python - <<'PY'
import itertools, pathlib, re
p = pathlib.Path("pyproject.toml")
txt = p.read_text().splitlines()
for i,line in enumerate(txt, start=1):
if "dependencies" in line or "dev" in line or "optional-dependencies" in line:
if any(k in line for k in ["dependencies","optional-dependencies","dev-dependencies"]):
pass
# print around lines 1-120 for context
start=1; end=120
for i in range(start, min(end, len(txt))+1):
print(f"{i:4d}: {txt[i-1]}")
PY
echo
echo "=== Search repo for pytest usage/deps ==="
rg -n --hidden --no-ignore -S "pytest" pyproject.toml uv.lock .github/workflows .gitlab-ci.yml Dockerfile docker-compose.yml || true
echo
echo "=== Check CI workflows for running tests or pytest install ==="
rg -n --hidden --no-ignore -S "uv sync|pytest -|pytest\b|python -m pytest" .github/workflows || true
echo
echo "=== uv.lock: locate pipecat-ai-flows pinned version ==="
if [ -f "uv.lock" ]; then
# show a bit more context around the pipecat-ai-flows entry
rg -n -C 2 'name = "pipecat-ai-flows"' uv.lock | head -50
echo
# try to extract the version field near that block
python - <<'PY'
import pathlib, re
txt = pathlib.Path("uv.lock").read_text()
# heuristic: find the first block with name pipecat-ai-flows and capture subsequent 'version = "...'
m = re.search(r'name = "pipecat-ai-flows".{0,800}?version = "([^"]+)"', txt, flags=re.S)
print("pipecat-ai-flows pinned version:", m.group(1) if m else "NOT_FOUND")
PY
fiRepository: juspay/clairvoyance
Length of output: 3045
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== List likely test directories/files ==="
find . -maxdepth 4 -type f \( -name "test_*.py" -o -name "*_test.py" -o -name "*tests*.py" \) 2>/dev/null | head -200
echo
echo "=== Search for pytest invocation in workflows/scripts ==="
rg -n --hidden --no-ignore -S "pytest\b|python -m pytest|uv run .*pytest|poetry run .*pytest" .github/workflows . || true
echo
echo "=== Inspect .github/workflows/pr-build-check.yml ==="
sed -n '1,200p' .github/workflows/pr-build-check.yml
echo
echo "=== List all workflow ymls and grep for 'test' or 'run' steps ==="
ls -1 .github/workflows 2>/dev/null || true
rg -n "pytest|test\b|uv sync|uv run|tox|nox" .github/workflows || trueRepository: juspay/clairvoyance
Length of output: 7815
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== Search uv.lock for pytest ==="
if [ -f uv.lock ]; then
rg -n 'name = "pytest"' uv.lock | head -50 || echo "No pytest package entry in uv.lock"
rg -n 'pytest' uv.lock | head -50 || true
else
echo "uv.lock missing"
fi
echo
echo "=== Locate ALL pipecat-ai-flows entries in uv.lock ==="
rg -n 'name = "pipecat-ai-flows"' uv.lock | head -50
echo
echo "=== Show surrounding package block(s) for pipecat-ai-flows ==="
# Print from the first matched line back a few lines to include [[package]] header
python - <<'PY'
import pathlib, re
txt = pathlib.Path("uv.lock").read_text().splitlines()
indices = [i for i,l in enumerate(txt) if l.strip()=='name = "pipecat-ai-flows"']
print("count:", len(indices))
for idx in indices[:10]:
start = max(0, idx-5)
end = min(len(txt), idx+15)
print("\n--- block excerpt around line", idx+1, "---")
for j in range(start,end):
print(f"{j+1:5d}: {txt[j]}")
PY
echo
echo "=== Inspect tests/test_tool_result_normalizer.py around the uv run pytest line ==="
sed -n '1,120p' tests/test_tool_result_normalizer.py
echo
echo "=== Search workflows/ scripts for pytest execution ==="
rg -n --hidden --no-ignore -S "uv run pytest|python -m pytest|pytest tests/|pytest\b" .github/workflows . || trueRepository: juspay/clairvoyance
Length of output: 8324
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== Search for pytest dependency declaration ==="
rg -n "pytest" pyproject.toml uv.lock requirements*.txt .github/workflows .gitlab-ci.yml Dockerfile 2>/dev/null || true
echo
echo "=== Search for running tests in workflows (pytest/uv run) ==="
rg -n --hidden --no-ignore -S "uv run .*pytest|python -m pytest|pytest tests/|pytest\b" .github/workflows 2>/dev/null || trueRepository: juspay/clairvoyance
Length of output: 175
Fix dev testability: add pytest (and update uv.lock); uv.lock already covers reproducible pipecat-ai installs
pipecat-ai/pipecat-ai-flowsno longer pinned inpyproject.tomlis fine:uv.lockis present and CI/build usesuv sync(including--frozenin Dockerfile), so exact versions come from the lock.pytestis missing from[project.optional-dependencies].devinpyproject.tomland there is nopytestentry inuv.lock;.github/workflows/pr-build-check.ymldoesn’t run tests (only black/isort/autoflake/pyrefly). Sincetests/contains manyimport pytesttests, addpytestto dev deps (and regenerateuv.lock) or update CI to install/run tests accordingly.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@pyproject.toml` around lines 15 - 16, Add pytest to the project's development
extras and update the lockfile: in pyproject.toml add "pytest" under
[project.optional-dependencies].dev (so dev installs include pytest), then
regenerate uv.lock (run uv sync/uv lock) so the lock contains a pytest entry;
ensure CI or Docker build that uses uv sync/--frozen continues to rely on
uv.lock for reproducible installs and/or update
.github/workflows/pr-build-check.yml to install and run tests if you want CI to
execute them.
| dev = [ | ||
| "black", | ||
| "isort", | ||
| "autoflake", | ||
| "pyrefly", | ||
| "pytest>=9.0.3", | ||
| "pytest-asyncio>=1.3.0", | ||
| "pyrefly" |
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Check if pytest is specified in uv.lock or other dependency sources
echo "=== Searching for pytest in dependency files ==="
rg -n "pytest" pyproject.toml uv.lock requirements*.txt 2>/dev/null || echo "pytest not found in common dependency files"
echo -e "\n=== Checking for alternative test runners ==="
rg -n "unittest|nose|tox" pyproject.toml
echo -e "\n=== Listing test files that require pytest ==="
fd -e py "^test_" tests/ | head -10Repository: juspay/clairvoyance
Length of output: 705
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== pyproject.toml dev optional-dependencies ==="
python3 - <<'PY'
import tomllib, pathlib
p = pathlib.Path("pyproject.toml")
data = tomllib.loads(p.read_text())
deps = data.get("project", {}).get("optional-dependencies", {}).get("dev", [])
print("dev deps:", deps)
PY
echo
echo "=== Any pytest usage in tests/ ==="
rg -n "import pytest|from pytest|pytest\.|pytest_asyncio" tests || true
echo
echo "=== Check dependency/lock files for pytest mentions ==="
for f in uv.lock requirements*.txt requirements*.in poetry.lock Pipfile* Pipfile.lock; do
if [ -f "$f" ]; then
echo "--- $f ---"
rg -n "pytest" "$f" || true
fi
doneRepository: juspay/clairvoyance
Length of output: 2852
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== Any pytest_asyncio usage in tests/ ==="
rg -n "pytest_asyncio|pytest-asyncio|pytest\.mark\.asyncio" tests || true
echo
echo "=== Any pytest usage in conftest/roots outside tests/ (quick check) ==="
rg -n "import pytest|pytest\.fixture|pytest\.raises" -S . || trueRepository: juspay/clairvoyance
Length of output: 2767
🏁 Script executed:
#!/bin/bash
set -euo pipefail
echo "=== Any 'pytest' mentions in pyproject.toml (all sections) ==="
rg -n "pytest(-asyncio)?|pytest_asyncio" pyproject.toml || true
echo
echo "=== List optional-dependency groups in pyproject.toml ==="
python3 - <<'PY'
import tomllib, pathlib
data = tomllib.loads(pathlib.Path("pyproject.toml").read_text())
opt = data.get("project", {}).get("optional-dependencies", {})
print("optional dependency groups:", list(opt.keys()))
for k,v in opt.items():
if any("pytest" in s for s in v) or any("pytest" in s.lower() for s in v):
print("group", k, "contains pytest:", v)
PYRepository: juspay/clairvoyance
Length of output: 221
Add pytest back to [project.optional-dependencies].dev
pyproject.toml’s dev extras include only black, isort, autoflake, and pyrefly (no pytest), but the test suite imports and uses pytest (import pytest, pytest.fixture, pytest.raises, pytest.skip/xfail). uv.lock also contains no pytest entries, so pip install -e ".[dev]" won’t provide a working test runner. pytest-asyncio isn’t referenced by the tests, so it doesn’t need to be restored.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@pyproject.toml` around lines 40 - 44, The dev extras in pyproject.toml are
missing pytest, so add "pytest" to the [project.optional-dependencies].dev list
(alongside "black", "isort", "autoflake", "pyrefly") to restore the test runner
used by the code (references: the dev extras block in pyproject.toml and tests
that import pytest); after updating the list, regenerate or update the
lock/install (e.g., run your lockfile tool or pip install -e ".[dev]") so pytest
is available for the test suite—do not re-add pytest-asyncio since tests don't
reference it.
Summary
Adds a Google Sheets data source feature to Breeze Buddy. Merchants attach a Google Sheet to a template; sheet content is fetched pre-call and injected into LLM context either as a
{variable}placeholder or as a prepended system message.What Changed
Database
026_create_data_source_table.sql— new standalonedata_sourceentity (reseller-scoped, reusable across templates)027_add_data_sources_column_to_template.sql—template.data_sources JSONBstores[{data_source_id, name, inject_as}]refsService Layer
app/services/google/sheets.py— Google Sheets API v4 wrapper;fetch_formatted(markdown_table/csv/json),list_tabs,get_column_headers,fetch_sheet_data; sync SDK wrapped inrun_in_executor; uses existingGOOGLE_CREDENTIALS_JSONSADB Layer (three-layer pattern)
queries/breeze_buddy/data_source.py— SQL builders for CRUD + paginated listdecoder/breeze_buddy/data_source.py— asyncpg Record →DataSourceResponseaccessor/breeze_buddy/data_source.py— business logic; extractsspreadsheet_idfrom URL at write timequeries/breeze_buddy/template.py—data_sourcesadded to all SELECT/INSERT/UPDATE queriesdecoder/breeze_buddy/template.py— parsesdata_sourcesJSONB →List[DataSourceRef]accessor/breeze_buddy/template.py— threadsdata_sources_jsonthrough create/replaceSchemas
schemas/breeze_buddy/data_source.py—DataSourceCreate,DataSourceUpdate,DataSourceResponse,DataSourceListResponse,TabsResponse,ColumnsResponse,PreviewResponsetemplate/types.py—DataSourceRefmodel;data_sourcesfield onTemplateModel,CreateTemplateRequest,ReplaceTemplateRequestAPI
routers/breeze_buddy/data_sources/— 8 endpoints:GET /data-sources/sheets/tabs|columns|preview— discovery (declared before/{id}to avoid FastAPI path conflict)POST/GET /data-sources— CRUDGET/PUT/DELETE /data-sources/{id}— single resourcereseller_idsRuntime — Pre-warm
managers/data_source_prefetch.py— fetches allDataSourceRefs concurrently, writes to Redis (datasource:{lead_id}:{name}, TTL=300s)dispatch/worker.py— wired intoasyncio.gatheralongside greeting TTS; both complete before dialRuntime — Call Time
template/loader.py— Layer 5 inload_template():_fetch_data_source_content: Redis hit → live fetch (800ms timeout) →"[Data unavailable]"fallbackinject_as="var"→template_vars[name](rendered as{name}in prompts)inject_as="message"→template.flow["_data_source_messages"]agent/flow.py—build_flow_configpropagates_data_source_messages;prepare_initial_nodeprepends totask_messagesInjection Modes
inject_as"var"(default){name}substituted inline in prompts"message"Extensibility
source_typeCHECK constraint designed for future sources (file,url,text). Adding a new type touches only: new migration + new service + dispatch switch in_fetch_data_source_content. All inject/cache/API layers reused unchanged.Not in Scope (Phase 2)
GOOGLE_CREDENTIALS_JSON)Summary by CodeRabbit
Release Notes
New Features
Chores