
feat: implemented datasources for google sheet #829

Open
cmd-err wants to merge 2 commits into juspay:release from cmd-err:feat/data-sources-integration

Conversation

@cmd-err cmd-err commented Jun 15, 2026

Contributor

Summary by CodeRabbit

Release Notes

New Features

  • Added data source management system: create, retrieve, update, and delete external data sources
  • Integrated Google Sheets discovery with preview functionality for sheet tabs, columns, and sample rows
  • Templates can now reference and attach data sources for automatic data injection into conversations
  • Added dispatch-time prefetching of data source content into a Redis cache, so call setup can usually read from the cache instead of fetching from Google Sheets live

Copilot AI review requested due to automatic review settings June 15, 2026 07:39
coderabbitai Bot commented Jun 15, 2026

Review Change Stack

Important

Review skipped

Auto incremental reviews are disabled on this repository.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 1ebbacef-1baa-49b5-b370-37ac69b4352f

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Walkthrough

This PR introduces a Data Sources feature for Breeze Buddy: a new data_source table and full REST CRUD API, a Google Sheets integration service, Redis-cached content injection into the FlowConfigLoader, background dispatch-time prefetching into Redis, and propagation of sheet content as LLM system messages in the agent flow.

Changes

Data Sources Feature — Breeze Buddy

Layer / File(s) Summary
DB migrations: data_source table and template.data_sources column
app/database/migrations/033_create_data_source_table.sql, app/database/migrations/034_add_data_sources_column_to_template.sql
Creates the data_source table with UUID PK, reseller/merchant scoping, Google Sheets metadata columns, format CHECK constraints, and partial unique indexes on name. Adds a nullable data_sources JSONB column to the template table.
Pydantic schemas, DataSourceRef type, and scope helper
app/ai/voice/agents/breeze_buddy/template/types.py, app/schemas/breeze_buddy/data_source.py, app/services/data_sources.py
Defines DataSourceRef with Python-identifier validation and uniqueness enforcement; extends TemplateModel, CreateTemplateRequest, and ReplaceTemplateRequest with data_sources; adds all REST request/response schemas; adds DATA_SOURCE_UNAVAILABLE constant and data_source_in_template_scope helper.
Google Sheets service
app/services/google/sheets.py
Adds service-account-authenticated async wrappers for listing tabs, fetching column headers, and retrieving sheet rows. fetch_formatted returns content as markdown_table, csv, or json, falling back to DATA_SOURCE_UNAVAILABLE on empty or error.
DB query builders, accessors, and decoders
app/database/queries/breeze_buddy/data_source.py, app/database/queries/breeze_buddy/template.py, app/database/accessor/breeze_buddy/data_source.py, app/database/accessor/breeze_buddy/template.py, app/database/decoder/breeze_buddy/data_source.py, app/database/decoder/breeze_buddy/template.py
Adds parameterized SQL query builders and full async CRUD accessors for the data_source table; updates template query builders, accessors, and decoder to select, insert, update, and deserialize the new data_sources column.
Data Sources REST API and template handler integration
app/api/routers/breeze_buddy/__init__.py, app/api/routers/breeze_buddy/data_sources/__init__.py, app/api/routers/breeze_buddy/data_sources/handlers.py, app/api/routers/breeze_buddy/templates/handlers.py
Registers data_sources_router; implements RBAC-protected CRUD endpoints and Google Sheets discovery endpoints (tabs, columns, preview); adds _validate_data_source_refs and wires data_sources serialization into create_template_handler and replace_template_handler.
FlowConfigLoader data-source injection
app/ai/voice/agents/breeze_buddy/template/loader.py
Extends load_template to accept lead_id; adds _fetch_data_source_content with Redis cache lookup, 800ms-timeout live Sheets fetch, scope validation, and fallback; adds a concurrent injection phase that writes content into template_vars or _data_source_messages.
Dispatch prefetch and agent flow system message injection
app/ai/voice/agents/breeze_buddy/managers/data_source_prefetch.py, app/ai/voice/agents/breeze_buddy/dispatch/worker.py, app/ai/voice/agents/breeze_buddy/agent/flow.py
Adds the prefetch_data_sources manager (per-ref DB fetch, scope check, 5s timeout, Redis setex with 300s TTL); wires background prefetch into the dispatch worker; updates flow.py to pass lead_id into load_template, propagate _data_source_messages through build_flow_config, and prepend them to the initial node's task_messages.

Sequence Diagram(s)

sequenceDiagram
    rect rgba(100, 150, 255, 0.5)
        Note over dispatch_worker,Redis: Dispatch time (background)
        dispatch_worker->>data_source_prefetch: create_task(prefetch_data_sources(lead_id, template))
        data_source_prefetch->>DataSourceAccessor: get_data_source_by_id(ref.data_source_id)
        DataSourceAccessor-->>data_source_prefetch: DataSourceResponse
        data_source_prefetch->>sheets_service: fetch_formatted(spreadsheet_id, ...) [5s timeout]
        sheets_service-->>data_source_prefetch: formatted content
        data_source_prefetch->>Redis: setex("datasource:{lead_id}:{ref.name}", 300, content)
    end

    rect rgba(100, 200, 150, 0.5)
        Note over FlowConfigLoader,agent_flow: Call setup time
        agent_flow->>FlowConfigLoader: load_template(lead_id=lead.id)
        FlowConfigLoader->>Redis: GET "datasource:{lead_id}:{ref.name}"
        Redis-->>FlowConfigLoader: cached content (or miss → fetch_formatted [800ms timeout])
        FlowConfigLoader->>FlowConfigLoader: inject into template_vars or _data_source_messages
        FlowConfigLoader-->>agent_flow: template with _data_source_messages
        agent_flow->>agent_flow: build_flow_config → copies _data_source_messages
        agent_flow->>agent_flow: prepare_initial_node → prepends to task_messages
    end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

  • juspay/clairvoyance#706: Modifies load_template_config in the same agent/flow.py file, adjusting template loading and playground flow override behavior that this PR further extends with lead_id and data-source system messages.

Poem

🐇 A spreadsheet so vast, a rabbit must hop,
Through Redis and Sheets, never stopping to drop.
Each row becomes context, injected with flair,
System messages bloom in the LLM's lair.
The data prefetched while the call is still near—
🌿 Clairvoyant bunny brings knowledge so clear!

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

| Check name | Status | Explanation | Resolution |
| --- | --- | --- | --- |
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 77.14% which is insufficient. The required threshold is 80.00%. | Write docstrings for the functions missing them to satisfy the coverage threshold. |

✅ Passed checks (4 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title accurately describes the main feature: implementing data sources for Google Sheets integration across the codebase, including APIs, database schema, services, and agent flow modifications. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

Copilot AI left a comment

Copilot was unable to review this pull request because the user who requested the review has reached their quota limit.

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 6

🧹 Nitpick comments (6)
app/ai/voice/agents/breeze_buddy/template/loader.py (1)

295-322: ⚡ Quick win

Add strict=True to zip() for defensive iteration.

Both template_obj.data_sources and contents should always have the same length since contents is produced by asyncio.gather over the same list. Adding strict=True makes this contract explicit and will raise a ValueError if the lengths ever mismatch due to a bug, rather than silently dropping items.

♻️ Proposed fix
-            for ref, result in zip(template_obj.data_sources, contents):
+            for ref, result in zip(template_obj.data_sources, contents, strict=True):
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@app/ai/voice/agents/breeze_buddy/template/loader.py` around lines 295 - 322,
In the data source injection section, the zip() call that iterates over
template_obj.data_sources and contents should include strict=True as a parameter
to make the length contract explicit and prevent silent dropping of items if the
lists ever mismatch. Add strict=True to the zip() call in the for loop where ref
and result are unpacked.

Source: Linters/SAST tools

app/services/google/sheets.py (2)

72-90: 💤 Low value

Consider caching the authenticated session to reduce overhead.

Each call to list_tabs, get_column_headers, and fetch_sheet_data creates a new AuthorizedSession via _get_sheets_session(). Since the service account credentials don't change at runtime, a module-level cached session (with lazy initialization) would reduce credential parsing and object creation overhead on repeated calls.
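A lazily initialized module-level cache might look something like the sketch below. The names `get_cached_session` and `factory` are hypothetical; in the real module the factory would presumably be whatever `_get_sheets_session()` does today.

```python
import threading
from typing import Callable, Optional

_session: Optional[object] = None
_session_lock = threading.Lock()


def get_cached_session(factory: Callable[[], object]) -> object:
    """Create the session on first use and reuse it afterwards."""
    global _session
    if _session is None:
        with _session_lock:
            # Re-check inside the lock so concurrent first calls build one session.
            if _session is None:
                _session = factory()
    return _session
```

Whether a single shared session is safe under concurrent use depends on the underlying HTTP client, so that is worth confirming before adopting this pattern.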

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@app/services/google/sheets.py` around lines 72 - 90, Implement module-level
caching for the authenticated session to avoid creating a new session on every
call. Create a module-level variable to store a cached AuthorizedSession
instance initialized to None, then modify the session retrieval logic (likely in
or around _get_sheets_session) to implement lazy initialization: check if the
cached session exists, and if not, create it once and store it; otherwise return
the cached session. Apply this cached session across all functions that
currently call _get_sheets_session() such as list_tabs, get_column_headers, and
fetch_sheet_data, ensuring they all reuse the same authenticated session
instance rather than creating new ones on each call.

217-244: ⚡ Quick win

Rename format parameter to avoid shadowing the Python builtin.

The format parameter shadows Python's built-in format() function. While this doesn't cause a bug here, it's poor practice and trips static analysis.

♻️ Proposed fix
 async def fetch_formatted(
     spreadsheet_id: str,
     sheet_name: Optional[str] = None,
     columns: Optional[List[str]] = None,
-    format: str = "markdown_table",
+    output_format: str = "markdown_table",
     max_rows: int = 500,
 ) -> str:
     """
     Fetch sheet data and return as a formatted string for LLM injection.

     Returns DATA_SOURCE_UNAVAILABLE on any error or empty sheet.
     """
     rows = await fetch_sheet_data(spreadsheet_id, sheet_name, columns, max_rows)
     if not rows:
         logger.warning(
             f"No data fetched from spreadsheet={spreadsheet_id}, sheet={sheet_name}"
         )
         return DATA_SOURCE_UNAVAILABLE

     headers = list(rows[0].keys()) if rows else []

-    if format == "csv":
+    if output_format == "csv":
         return _rows_to_csv(headers, rows)
-    elif format == "json":
+    elif output_format == "json":
         return _rows_to_json(rows)
     else:
         return _rows_to_markdown_table(headers, rows)

Note: Update callers (e.g., loader.py line 156) to use output_format= accordingly.
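For context, the three output formats could be produced along these lines. The helper names mirror the ones in the diff, but their bodies are not shown in this review, so everything below is an assumed sketch rather than the PR's implementation.

```python
import csv
import io
import json
from typing import Dict, List


def rows_to_markdown_table(headers: List[str], rows: List[Dict[str, str]]) -> str:
    """Render rows as a pipe table; cells containing '|' are not escaped here."""
    lines = [
        "| " + " | ".join(headers) + " |",
        "| " + " | ".join("---" for _ in headers) + " |",
    ]
    for row in rows:
        lines.append("| " + " | ".join(str(row.get(h, "")) for h in headers) + " |")
    return "\n".join(lines)


def rows_to_csv(headers: List[str], rows: List[Dict[str, str]]) -> str:
    """Render rows as CSV with a header line, using the stdlib csv writer."""
    buf = io.StringIO()
    writer = csv.DictWriter(buf, fieldnames=headers, lineterminator="\n")
    writer.writeheader()
    writer.writerows(rows)
    return buf.getvalue()


def format_rows(rows: List[Dict[str, str]], output_format: str = "markdown_table") -> str:
    """Dispatch on output_format; unknown values fall back to a markdown table."""
    headers = list(rows[0].keys())
    if output_format == "csv":
        return rows_to_csv(headers, rows)
    if output_format == "json":
        return json.dumps(rows)
    return rows_to_markdown_table(headers, rows)
```

The sketch assumes `rows` is non-empty, matching the early return on empty data in `fetch_formatted`.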

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@app/services/google/sheets.py` around lines 217 - 244, The `format` parameter
in the `fetch_formatted` function shadows Python's built-in `format()` function,
which is poor practice and triggers static analysis warnings. Rename the
`format` parameter to `output_format` in the function signature, then update all
references to this parameter within the function body (the conditionals checking
`format == "csv"`, `format == "json"`, etc.) to use `output_format` instead.
Additionally, update all callers of the `fetch_formatted` function to pass the
parameter as `output_format=` instead of `format=`.

Source: Linters/SAST tools

app/ai/voice/agents/breeze_buddy/managers/data_source_prefetch.py (1)

22-23: ⚡ Quick win

Move prefetch TTL/timeout constants to static config.

These are environment-level tuning knobs and should be sourced from app/core/config/static.py instead of hardcoded file constants.

As per coding guidelines, “Load ALL configuration from app/core/config/static.py using get_required_env() for mandatory variables; never import directly from os.environ elsewhere.”

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@app/ai/voice/agents/breeze_buddy/managers/data_source_prefetch.py` around
lines 22 - 23, The constants _CACHE_TTL and _FETCH_TIMEOUT are hardcoded in the
data_source_prefetch.py file instead of being sourced from centralized
configuration. Move these constants to app/core/config/static.py using the
get_required_env() pattern (or appropriate config loading for tuning
parameters), then import and use those configuration values in
data_source_prefetch.py instead of the hardcoded definitions. This ensures all
environment-level configuration follows the standard pattern of loading from
app/core/config/static.py.

Source: Coding guidelines

app/api/routers/breeze_buddy/templates/handlers.py (1)

45-45: ⚡ Quick win

Tighten the helper type annotation for data source refs.

Use a parameterized container type instead of Optional[List] (for example Optional[List[DataSourceRef]]) so static checks enforce the expected ref shape.

As per coding guidelines, use Optional[T], List[T], Dict[str, Any], Union for type hints.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@app/api/routers/breeze_buddy/templates/handlers.py` at line 45, The
`data_sources` parameter on line 45 uses a loose `Optional[List]` type
annotation that does not specify the element type, which prevents static type
checkers from enforcing the expected shape. Replace `Optional[List]` with
`Optional[List[DataSourceRef]]` to explicitly specify that the list contains
DataSourceRef objects, enabling proper type checking and enforcement as per the
coding guidelines.

Source: Coding guidelines

app/ai/voice/agents/breeze_buddy/template/types.py (1)

2012-2013: ⚡ Quick win

Use UUID type for data_source_id at the schema boundary.

data_source_id is documented as UUID but typed as str, so malformed IDs can slip through request validation and fail later in DB/accessor paths.

Proposed change
+from uuid import UUID
...
 class DataSourceRef(BaseModel):
...
-    data_source_id: str = Field(description="UUID of the data_source entity")
+    data_source_id: UUID = Field(description="UUID of the data_source entity")
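The effect of validating at the boundary can be illustrated with the standard library alone. `DataSourceRefSketch` below is a hypothetical dataclass stand-in, not the Pydantic model from the PR; it only shows that `uuid.UUID` rejects malformed strings up front.

```python
from dataclasses import dataclass
from uuid import UUID


@dataclass(frozen=True)
class DataSourceRefSketch:
    """Hypothetical stand-in for DataSourceRef with only the two relevant fields."""

    name: str
    data_source_id: UUID

    def __post_init__(self) -> None:
        # Coerce strings to UUID; uuid.UUID raises ValueError on malformed input.
        if not isinstance(self.data_source_id, UUID):
            object.__setattr__(self, "data_source_id", UUID(str(self.data_source_id)))


def is_valid_ref(name: str, raw_id: str) -> bool:
    """Return True when raw_id parses as a UUID, False otherwise."""
    try:
        DataSourceRefSketch(name=name, data_source_id=raw_id)
        return True
    except ValueError:
        return False
```

One thing to check if the field type changes: `json.dumps` does not serialize `UUID` objects by default, so any code that writes `data_sources` into the JSONB column may need to convert the ID back to a string.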
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@app/ai/voice/agents/breeze_buddy/template/types.py` around lines 2012 - 2013,
The data_source_id field in the schema is typed as str but documented as UUID,
which allows invalid ID formats to pass validation. Change the type annotation
of the data_source_id field from str to UUID to enforce proper validation at the
schema boundary, ensuring only valid UUIDs are accepted during request
validation rather than failing later in database or accessor operations.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@app/ai/voice/agents/breeze_buddy/agent/flow.py`:
- Around line 149-151: The `_data_source_messages` variable retrieved from the
untyped runtime data `template.flow` is not validated for its shape before being
assigned to `flow_config["_data_source_messages"]`. Since this data is later
used in a concatenation operation at line 207 (ds_messages + task_messages), it
must be a list of message dicts. Add validation after retrieving `ds_messages`
to ensure it is a list before propagating it to `flow_config`, otherwise the
code will fail when attempting to concatenate it with task_messages.

In `@app/ai/voice/agents/breeze_buddy/managers/data_source_prefetch.py`:
- Around line 63-70: The `redis.setex()` call in the prefetch operation does not
validate its return value before logging success. Since `RedisService.setex()`
returns `False` on Redis write failures, the current code logs a success message
even when the cache write failed, which can mask prefetch issues. Capture the
boolean return value from the `await redis.setex(cache_key, content,
ttl_seconds=_CACHE_TTL)` call and only proceed with the logger.info success
message if the operation returned `True`. If the operation returns `False`, log
an error message instead to properly reflect the failure and enable accurate
operational debugging.

In `@app/ai/voice/agents/breeze_buddy/template/loader.py`:
- Around line 99-127: The `redis.get(cache_key)` call in the
`_fetch_data_source_content` method is missing the `namespace` parameter
required by coding guidelines to prevent key collisions across services. Add the
`namespace` parameter to the `redis.get()` call to include it alongside the
cache_key argument. Additionally, verify that the prefetch manager in the
dependent layer (data_source_prefetch.py) uses the same namespace value when
writing to Redis to ensure consistency across both read and write operations.

In `@app/api/routers/breeze_buddy/data_sources/handlers.py`:
- Around line 226-283: The three handler functions `list_tabs_handler`,
`list_columns_handler`, and `preview_handler` lack tenant-scoped authorization
checks, allowing authenticated users to potentially access Google Sheets outside
their tenant. Add caller context (user/tenant information) as a parameter to
each of these handler functions, then enforce reseller-scoped authorization
checks before calling the underlying sheet access functions (`list_tabs`,
`get_column_headers`, and `fetch_sheet_data` respectively). Verify that the
spreadsheet belongs to the user's tenant or is accessible to them before
proceeding with the sheet data operations.

In `@app/database/queries/breeze_buddy/data_source.py`:
- Line 14: The parameter name `id` shadows Python's built-in `id` function,
triggering Ruff A002 linting errors. Rename all occurrences of the `id`
parameter to `data_source_id` in the function signatures and update all
corresponding call sites throughout the file. This applies to the parameter at
line 14 and all other affected locations (lines 23 and 105 as mentioned) where
`id` is used as a parameter name. Ensure both the function/method signatures and
any internal references to this parameter are updated consistently.
- Around line 116-126: The current `is not None` guards in the update builder
prevent clients from explicitly resetting nullable fields like `sheet_name` and
`columns` to NULL because there is no way to distinguish between "parameter was
not provided" and "parameter was explicitly set to None". Use a sentinel value
(e.g., a module-level `_UNSET` object) as the default for all optional
parameters instead of None, then check against that sentinel value in the
conditional guards (e.g., `if sheet_name is not _UNSET` instead of `if
sheet_name is not None`). This allows clients to pass None explicitly to clear a
field to NULL while still omitting the parameter to leave it unchanged.
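
A sketch of the sentinel approach described in the last item, for illustration only. `build_update`, the `$n` placeholder style, and the reduced set of columns are assumptions; the real query builder's signature is not shown in this review.

```python
from typing import Any, List, Tuple

_UNSET: Any = object()  # sentinel meaning "argument not provided"


def build_update(
    data_source_id: str,
    sheet_name: Any = _UNSET,
    columns: Any = _UNSET,
) -> Tuple[str, List[Any]]:
    """Build a parameterized UPDATE that touches only explicitly passed fields."""
    assignments: List[str] = []
    params: List[Any] = []
    for column, value in (("sheet_name", sheet_name), ("columns", columns)):
        if value is not _UNSET:  # None passes this check and becomes SQL NULL
            params.append(value)
            assignments.append(f"{column} = ${len(params)}")
    if not assignments:
        raise ValueError("no fields to update")
    params.append(data_source_id)
    sql = f"UPDATE data_source SET {', '.join(assignments)} WHERE id = ${len(params)}"
    return sql, params
```

With this shape, `build_update(some_id, sheet_name=None)` clears `sheet_name`, while `build_update(some_id, columns=[...])` leaves `sheet_name` untouched.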

---

Nitpick comments:
In `@app/ai/voice/agents/breeze_buddy/managers/data_source_prefetch.py`:
- Around line 22-23: The constants _CACHE_TTL and _FETCH_TIMEOUT are hardcoded
in the data_source_prefetch.py file instead of being sourced from centralized
configuration. Move these constants to app/core/config/static.py using the
get_required_env() pattern (or appropriate config loading for tuning
parameters), then import and use those configuration values in
data_source_prefetch.py instead of the hardcoded definitions. This ensures all
environment-level configuration follows the standard pattern of loading from
app/core/config/static.py.

In `@app/ai/voice/agents/breeze_buddy/template/loader.py`:
- Around line 295-322: In the data source injection section, the zip() call that
iterates over template_obj.data_sources and contents should include strict=True
as a parameter to make the length contract explicit and prevent silent dropping
of items if the lists ever mismatch. Add strict=True to the zip() call in the
for loop where ref and result are unpacked.

In `@app/ai/voice/agents/breeze_buddy/template/types.py`:
- Around line 2012-2013: The data_source_id field in the schema is typed as str
but documented as UUID, which allows invalid ID formats to pass validation.
Change the type annotation of the data_source_id field from str to UUID to
enforce proper validation at the schema boundary, ensuring only valid UUIDs are
accepted during request validation rather than failing later in database or
accessor operations.

In `@app/api/routers/breeze_buddy/templates/handlers.py`:
- Line 45: The `data_sources` parameter on line 45 uses a loose `Optional[List]`
type annotation that does not specify the element type, which prevents static
type checkers from enforcing the expected shape. Replace `Optional[List]` with
`Optional[List[DataSourceRef]]` to explicitly specify that the list contains
DataSourceRef objects, enabling proper type checking and enforcement as per the
coding guidelines.

In `@app/services/google/sheets.py`:
- Around line 72-90: Implement module-level caching for the authenticated
session to avoid creating a new session on every call. Create a module-level
variable to store a cached AuthorizedSession instance initialized to None, then
modify the session retrieval logic (likely in or around _get_sheets_session) to
implement lazy initialization: check if the cached session exists, and if not,
create it once and store it; otherwise return the cached session. Apply this
cached session across all functions that currently call _get_sheets_session()
such as list_tabs, get_column_headers, and fetch_sheet_data, ensuring they all
reuse the same authenticated session instance rather than creating new ones on
each call.
- Around line 217-244: The `format` parameter in the `fetch_formatted` function
shadows Python's built-in `format()` function, which is poor practice and
triggers static analysis warnings. Rename the `format` parameter to
`output_format` in the function signature, then update all references to this
parameter within the function body (the conditionals checking `format == "csv"`,
`format == "json"`, etc.) to use `output_format` instead. Additionally, update
all callers of the `fetch_formatted` function to pass the parameter as
`output_format=` instead of `format=`.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 3f3ba705-d8c2-414a-b20f-00c678b7eeed

📥 Commits

Reviewing files that changed from the base of the PR and between 7dfe6a6 and b01b12a.

📒 Files selected for processing (21)
  • app/ai/voice/agents/breeze_buddy/agent/flow.py
  • app/ai/voice/agents/breeze_buddy/dispatch/worker.py
  • app/ai/voice/agents/breeze_buddy/managers/data_source_prefetch.py
  • app/ai/voice/agents/breeze_buddy/template/loader.py
  • app/ai/voice/agents/breeze_buddy/template/types.py
  • app/api/routers/breeze_buddy/__init__.py
  • app/api/routers/breeze_buddy/data_sources/__init__.py
  • app/api/routers/breeze_buddy/data_sources/handlers.py
  • app/api/routers/breeze_buddy/templates/handlers.py
  • app/database/accessor/breeze_buddy/data_source.py
  • app/database/accessor/breeze_buddy/template.py
  • app/database/decoder/breeze_buddy/data_source.py
  • app/database/decoder/breeze_buddy/template.py
  • app/database/migrations/033_create_data_source_table.sql
  • app/database/migrations/034_add_data_sources_column_to_template.sql
  • app/database/queries/breeze_buddy/data_source.py
  • app/database/queries/breeze_buddy/template.py
  • app/schemas/breeze_buddy/data_source.py
  • app/services/data_sources.py
  • app/services/google/__init__.py
  • app/services/google/sheets.py

Comment on lines +149 to +151
    ds_messages = template.flow.get("_data_source_messages")
    if ds_messages:
        flow_config["_data_source_messages"] = ds_messages

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Validate _data_source_messages shape before propagating it.

template.flow is untyped runtime data. If _data_source_messages is not a list of message dicts, it will reach Line 207 and raise at ds_messages + task_messages, failing node preparation.

💡 Suggested patch
-    ds_messages = template.flow.get("_data_source_messages")
-    if ds_messages:
-        flow_config["_data_source_messages"] = ds_messages
+    raw_ds_messages = template.flow.get("_data_source_messages")
+    if isinstance(raw_ds_messages, list):
+        ds_messages = [
+            msg
+            for msg in raw_ds_messages
+            if isinstance(msg, dict)
+            and msg.get("role") == "system"
+            and isinstance(msg.get("content"), str)
+        ]
+        if ds_messages:
+            flow_config["_data_source_messages"] = ds_messages
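The same filter can be factored into a small helper, which also makes the behaviour easy to unit test. `clean_data_source_messages` is a hypothetical name used for this sketch.

```python
from typing import Any, Dict, List


def clean_data_source_messages(raw: Any) -> List[Dict[str, str]]:
    """Keep only well-formed system messages; return [] for any other shape."""
    if not isinstance(raw, list):
        return []
    return [
        msg
        for msg in raw
        if isinstance(msg, dict)
        and msg.get("role") == "system"
        and isinstance(msg.get("content"), str)
    ]
```

Because the helper always returns a list, a later `ds_messages + task_messages` concatenation cannot fail on a type mismatch.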
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@app/ai/voice/agents/breeze_buddy/agent/flow.py` around lines 149 - 151, The
`_data_source_messages` variable retrieved from the untyped runtime data
`template.flow` is not validated for its shape before being assigned to
`flow_config["_data_source_messages"]`. Since this data is later used in a
concatenation operation at line 207 (ds_messages + task_messages), it must be a
list of message dicts. Add validation after retrieving `ds_messages` to ensure
it is a list before propagating it to `flow_config`, otherwise the code will
fail when attempting to concatenate it with task_messages.

Comment on lines +63 to +70
        await redis.setex(cache_key, content, ttl_seconds=_CACHE_TTL)
        logger.info(
            "Prefetched data source '%s' for lead=%s (%d chars, TTL=%ds)",
            ref.name,
            lead_id,
            len(content),
            _CACHE_TTL,
        )

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Check Redis setex outcome before logging prefetch success.

RedisService.setex() returns False on Redis write failures; the current path logs success even when the cache write did not persist. This can mask prefetch misses and mislead operational debugging.

💡 Suggested patch
         redis = await get_redis_service()
-        await redis.setex(cache_key, content, ttl_seconds=_CACHE_TTL)
+        written = await redis.setex(cache_key, content, ttl_seconds=_CACHE_TTL)
+        if not written:
+            logger.warning(
+                "Prefetch cache write failed for data source '%s', lead=%s",
+                ref.name,
+                lead_id,
+            )
+            return
         logger.info(
             "Prefetched data source '%s' for lead=%s (%d chars, TTL=%ds)",
             ref.name,
             lead_id,
             len(content),
             _CACHE_TTL,
         )
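A self-contained way to exercise both outcomes is sketched below. `FakeRedis` and `cache_content` are hypothetical stand-ins; the only behaviour borrowed from the comment above is that `setex` returns a boolean.

```python
import asyncio
import logging
from typing import Dict

logger = logging.getLogger("prefetch_sketch")


class FakeRedis:
    """Hypothetical stand-in whose setex reports success or failure as a bool."""

    def __init__(self, healthy: bool) -> None:
        self.healthy = healthy
        self.store: Dict[str, str] = {}

    async def setex(self, key: str, value: str, ttl_seconds: int) -> bool:
        if self.healthy:
            self.store[key] = value
        return self.healthy


async def cache_content(redis: FakeRedis, key: str, content: str, ttl: int = 300) -> bool:
    """Write to the cache and log success only when the write was acknowledged."""
    written = await redis.setex(key, content, ttl_seconds=ttl)
    if not written:
        logger.warning("Prefetch cache write failed for key=%s", key)
        return False
    logger.info("Prefetched key=%s (%d chars, TTL=%ds)", key, len(content), ttl)
    return True
```

Returning the boolean also lets a caller or a test distinguish a warmed cache from a failed write without parsing logs.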
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@app/ai/voice/agents/breeze_buddy/managers/data_source_prefetch.py` around
lines 63 - 70, The `redis.setex()` call in the prefetch operation does not
validate its return value before logging success. Since `RedisService.setex()`
returns `False` on Redis write failures, the current code logs a success message
even when the cache write failed, which can mask prefetch issues. Capture the
boolean return value from the `await redis.setex(cache_key, content,
ttl_seconds=_CACHE_TTL)` call and only proceed with the logger.info success
message if the operation returned `True`. If the operation returns `False`, log
an error message instead to properly reflect the failure and enable accurate
operational debugging.

Comment on lines +99 to +127
    async def _fetch_data_source_content(
        self,
        lead_id: Optional[str],
        ref: DataSourceRef,
        template_obj: TemplateModel,
    ) -> str:
        """
        Fetch formatted sheet content for a DataSourceRef.

        Priority:
        1. Redis cache (key ``datasource:{lead_id}:{ref.name}``, pre-warmed by prefetch manager)
        2. Live Google Sheets fetch with 800 ms timeout
        3. Fallback: DATA_SOURCE_UNAVAILABLE
        """
        # 1. Redis cache check (requires lead_id)
        if lead_id:
            cache_key = f"datasource:{lead_id}:{ref.name}"
            try:
                redis = await get_redis_service()
                cached = await redis.get(cache_key)
                if cached:
                    logger.info(
                        "Data source cache hit: lead=%s name=%s", lead_id, ref.name
                    )
                    return cached
            except Exception as exc:
                logger.warning(
                    "Redis cache check failed for datasource '%s': %s", ref.name, exc
                )

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Missing namespace parameter in Redis get() call.

The coding guidelines require using the namespace parameter in redis_get/redis_set calls to prevent key collisions across services. The cache key datasource:{lead_id}:{ref.name} should use a namespace.

🐛 Proposed fix
         if lead_id:
             cache_key = f"datasource:{lead_id}:{ref.name}"
             try:
                 redis = await get_redis_service()
-                cached = await redis.get(cache_key)
+                cached = await redis.get(cache_key, namespace="breeze_buddy")
                 if cached:
                     logger.info(
                         "Data source cache hit: lead=%s name=%s", lead_id, ref.name
                     )
                     return cached

Also ensure the prefetch manager (in the dependent layer data_source_prefetch.py) uses the same namespace when writing to Redis.

Based on coding guidelines: "Always use namespace parameter in redis_get/redis_set calls to prevent key collisions across services"
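
As a rough illustration of why the reader and the prefetch writer need to agree on the namespace, here is a minimal sketch built on an in-memory stand-in. `FakeRedisService`, its `get`/`set` signatures, the `DATA_SOURCE_NAMESPACE` constant, and the helper `data_source_cache_key` are assumptions made for this example; the project's actual Redis wrapper may expose a different API.

```python
import asyncio
from typing import Dict, Optional, Tuple

# Hypothetical constant shared by the loader (reader) and the prefetch manager (writer).
DATA_SOURCE_NAMESPACE = "breeze_buddy"


class FakeRedisService:
    """In-memory stand-in for the project's Redis wrapper (assumed API)."""

    def __init__(self) -> None:
        self._store: Dict[Tuple[str, str], str] = {}

    async def set(self, key: str, value: str, namespace: str = "") -> None:
        # Entries are isolated per namespace, so equal keys do not collide.
        self._store[(namespace, key)] = value

    async def get(self, key: str, namespace: str = "") -> Optional[str]:
        return self._store.get((namespace, key))


def data_source_cache_key(lead_id: str, name: str) -> str:
    """Build the cache key in the format quoted in the excerpt above."""
    return f"datasource:{lead_id}:{name}"


async def demo() -> Tuple[Optional[str], Optional[str]]:
    redis = FakeRedisService()
    key = data_source_cache_key("lead-1", "pricing")
    # Writer side: store under the shared namespace.
    await redis.set(key, "row data", namespace=DATA_SOURCE_NAMESPACE)
    # Reader with the same namespace hits; a reader that omits it misses.
    hit = await redis.get(key, namespace=DATA_SOURCE_NAMESPACE)
    miss = await redis.get(key)
    return hit, miss


hit, miss = asyncio.run(demo())
```

Keeping the namespace in a single shared constant is one way to stop the read and write paths from drifting apart.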

🧰 Tools
🪛 Ruff (0.15.15)

[warning] 124-124: Do not catch blind exception: Exception

(BLE001)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@app/ai/voice/agents/breeze_buddy/template/loader.py` around lines 99 - 127,
The `redis.get(cache_key)` call in the `_fetch_data_source_content` method is
missing the `namespace` parameter required by coding guidelines to prevent key
collisions across services. Add the `namespace` parameter to the `redis.get()`
call to include it alongside the cache_key argument. Additionally, verify that
the prefetch manager in the dependent layer (data_source_prefetch.py) uses the
same namespace value when writing to Redis to ensure consistency across both
read and write operations.

Source: Coding guidelines

Comment on lines +226 to +283
async def list_tabs_handler(spreadsheet_url: str) -> TabsResponse:
    """List tabs in a Google Sheet."""
    spreadsheet_id = extract_spreadsheet_id(spreadsheet_url)
    if not spreadsheet_id:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid Google Sheets URL",
        )

    tabs = await list_tabs(spreadsheet_id)
    return TabsResponse(spreadsheet_id=spreadsheet_id, tabs=tabs)


async def list_columns_handler(
    spreadsheet_url: str, sheet_name: Optional[str] = None
) -> ColumnsResponse:
    """List column headers for a sheet tab."""
    spreadsheet_id = extract_spreadsheet_id(spreadsheet_url)
    if not spreadsheet_id:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid Google Sheets URL",
        )

    columns = await get_column_headers(spreadsheet_id, sheet_name)
    return ColumnsResponse(
        spreadsheet_id=spreadsheet_id,
        sheet_name=sheet_name or "",
        columns=columns,
    )


async def preview_handler(
    spreadsheet_url: str,
    sheet_name: Optional[str] = None,
    columns: Optional[List[str]] = None,
    max_rows: int = 10,
) -> PreviewResponse:
    """Preview sheet data (first N rows)."""
    spreadsheet_id = extract_spreadsheet_id(spreadsheet_url)
    if not spreadsheet_id:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid Google Sheets URL",
        )

    rows = await fetch_sheet_data(
        spreadsheet_id, sheet_name, columns, max_rows=max_rows
    )
    col_names = list(rows[0].keys()) if rows else []

    return PreviewResponse(
        spreadsheet_id=spreadsheet_id,
        sheet_name=sheet_name,
        columns=col_names,
        rows=rows,
        total_rows=len(rows),
    )

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Enforce tenant-scoped authorization on discovery handlers.

list_tabs_handler, list_columns_handler, and preview_handler fetch Google Sheet content from user-supplied URLs without any reseller/tenant scope check. With shared backend Google credentials, this can let an authenticated user read sheets outside their tenant if they know a valid URL.

Please require caller context in these handlers and enforce reseller-scoped authorization (or temporarily admin-only access) before performing sheet reads.
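
One possible shape for such a check is sketched below in plain Python, without the web framework. `CallerContext`, `SheetAccessDenied`, `authorize_sheet_access`, and the per-reseller allow-list are all hypothetical names introduced for illustration; how the allow-list is populated, and how the caller identity reaches the handler, are left open here.

```python
from dataclasses import dataclass
from typing import Dict, Optional, Set


@dataclass(frozen=True)
class CallerContext:
    """Hypothetical caller identity passed into each discovery handler."""

    user_id: str
    reseller_id: Optional[str]
    is_admin: bool = False


class SheetAccessDenied(Exception):
    """Raised before any sheet read when the caller is out of scope."""


def authorize_sheet_access(
    caller: CallerContext,
    spreadsheet_id: str,
    allowed_by_reseller: Dict[str, Set[str]],
) -> None:
    """Return silently if access is allowed, otherwise raise SheetAccessDenied."""
    if caller.is_admin:
        # Temporary admin-only mode would keep just this branch.
        return
    if caller.reseller_id is None:
        raise SheetAccessDenied("caller has no reseller scope")
    allowed = allowed_by_reseller.get(caller.reseller_id, set())
    if spreadsheet_id not in allowed:
        raise SheetAccessDenied(
            f"spreadsheet {spreadsheet_id!r} is outside reseller "
            f"{caller.reseller_id!r}"
        )
```

In a handler, the check would run right after `extract_spreadsheet_id` succeeds and before `list_tabs`, `get_column_headers`, or `fetch_sheet_data` is called, with the exception presumably mapped to an HTTP 403 response.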

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@app/api/routers/breeze_buddy/data_sources/handlers.py` around lines 226 -
283, The three handler functions `list_tabs_handler`, `list_columns_handler`,
and `preview_handler` lack tenant-scoped authorization checks, allowing
authenticated users to potentially access Google Sheets outside their tenant.
Add caller context (user/tenant information) as a parameter to each of these
handler functions, then enforce reseller-scoped authorization checks before
calling the underlying sheet access functions (`list_tabs`,
`get_column_headers`, and `fetch_sheet_data` respectively). Verify that the
spreadsheet belongs to the user's tenant or is accessible to them before
proceeding with the sheet data operations.



def insert_data_source_query(
    id: str,

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Rename parameters that shadow Python builtins (id, format).

These names trigger Ruff A002 and will keep lint noisy/blocking depending on CI policy. Prefer data_source_id / output_format in signatures and call sites.
Based on learnings from static analysis, Ruff is already flagging these exact lines.

Also applies to: 23-23, 105-105
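
For context on what Ruff A002 guards against, the generic sketch below contrasts a signature that shadows `id` and `format` with a renamed one. Both function names are made up for this example and do not come from the PR.

```python
def builtins_reachable_when_shadowed(id: str, format: str) -> bool:  # noqa: A002
    # Inside this body, `id` and `format` refer to the string arguments,
    # so neither name is callable any more.
    return callable(id) or callable(format)


def describe_renamed(data_source_id: str, output_format: str) -> str:
    # With the renamed parameters, the builtin `format` stays usable.
    return format(len(data_source_id), "03d") + ":" + output_format


shadowed = builtins_reachable_when_shadowed("abc", "markdown")
renamed = describe_renamed("abc", "markdown")
```

The rename is mechanical, but call sites that pass `id=` or `format=` as keyword arguments need to be updated in the same change.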

🧰 Tools
🪛 Ruff (0.15.15)

[error] 14-14: Function argument id is shadowing a Python builtin

(A002)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@app/database/queries/breeze_buddy/data_source.py` at line 14, The parameter
name `id` shadows Python's built-in `id` function, triggering Ruff A002 linting
errors. Rename all occurrences of the `id` parameter to `data_source_id` in the
function signatures and update all corresponding call sites throughout the file.
This applies to the parameter at line 14 and all other affected locations (lines
23 and 105 as mentioned) where `id` is used as a parameter name. Ensure both the
function/method signatures and any internal references to this parameter are
updated consistently.

Source: Linters/SAST tools

Comment on lines +116 to +126
    if name is not None:
        _add("name", name)
    if spreadsheet_url is not None:
        _add("spreadsheet_url", spreadsheet_url)
    if spreadsheet_id is not None:
        _add("spreadsheet_id", spreadsheet_id)
    if sheet_name is not None:
        _add("sheet_name", sheet_name)
    if columns_json is not None:
        _add("columns", columns_json, "::jsonb")
    if format is not None:

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Update builder cannot clear nullable fields to NULL.

The current is not None guards mean clients cannot reset sheet_name/columns back to NULL (even when explicitly requested), which breaks the API contract for reverting to “first tab/all columns” behavior.

Suggested direction
 def update_data_source_query(
     data_source_id: str,
     name: Optional[str],
     spreadsheet_url: Optional[str],
     spreadsheet_id: Optional[str],
     sheet_name: Optional[str],
     columns_json: Optional[str],
     format: Optional[str],
     is_active: Optional[bool],
     now: datetime,
+    fields_to_update: set[str],
 ) -> Tuple[str, List[Any]]:
 ...
-    if sheet_name is not None:
+    if "sheet_name" in fields_to_update:
         _add("sheet_name", sheet_name)
-    if columns_json is not None:
+    if "columns" in fields_to_update:
         _add("columns", columns_json, "::jsonb")

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@app/database/queries/breeze_buddy/data_source.py` around lines 116 - 126, The
current `is not None` guards in the update builder prevent clients from
explicitly resetting nullable fields like `sheet_name` and `columns` to NULL
because there is no way to distinguish between "parameter was not provided" and
"parameter was explicitly set to None". Use a sentinel value (e.g., a
module-level `_UNSET` object) as the default for all optional parameters instead
of None, then check against that sentinel value in the conditional guards (e.g.,
`if sheet_name is not _UNSET` instead of `if sheet_name is not None`). This
allows clients to pass None explicitly to clear a field to NULL while still
omitting the parameter to leave it unchanged.
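
A minimal sketch of the sentinel approach follows, reduced to the two nullable fields. The function name `build_update`, the `$n` placeholder style, and the exact SQL text are assumptions for illustration; only the `_add(column, value, cast)` shape and the `data_source` table name are taken from the PR.

```python
from typing import Any, List, Tuple

# Sentinel meaning "the caller did not supply this field".
_UNSET: Any = object()


def build_update(
    data_source_id: str,
    sheet_name: Any = _UNSET,
    columns_json: Any = _UNSET,
) -> Tuple[str, List[Any]]:
    """Build an UPDATE that can set nullable fields to NULL explicitly."""
    assignments: List[str] = []
    params: List[Any] = []

    def _add(column: str, value: Any, cast: str = "") -> None:
        params.append(value)
        assignments.append(f"{column} = ${len(params)}{cast}")

    # `None` now means "set to NULL"; only `_UNSET` means "leave unchanged".
    if sheet_name is not _UNSET:
        _add("sheet_name", sheet_name)
    if columns_json is not _UNSET:
        _add("columns", columns_json, "::jsonb")

    if not assignments:
        raise ValueError("no fields to update")

    params.append(data_source_id)
    sql = (
        f"UPDATE data_source SET {', '.join(assignments)} "
        f"WHERE id = ${len(params)}"
    )
    return sql, params


# Explicit None clears sheet_name; columns is left untouched.
clear_sql, clear_params = build_update("ds-1", sheet_name=None)
```

At the API layer, the set of fields the client actually sent still has to be known (for example from the request model) so that omitted fields are passed as `_UNSET` rather than `None`.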
