Skip to content

Conversation

@rexa0310
Copy link
Contributor

@rexa0310 rexa0310 commented Nov 24, 2025

Summary by CodeRabbit

  • New Features

    • Analytics pipeline management with CRUD API endpoints for creating, listing, retrieving, updating, and deleting pipeline events
    • Support for scheduled executions (recurring cron and one-time epoch) and pipeline execution runner for custom analytics scripts
    • Raw analytics data collection model and helper functions to add, fetch, and mark processed data from scripts
  • Chore

    • Default chat server base URL injected into client configuration

✏️ Tip: You can customize this high-level summary in your review settings.

@coderabbitai
Copy link

coderabbitai bot commented Nov 24, 2025

Walkthrough

Adds an analytics pipeline feature: API endpoints, scheduling and execution handlers, data models for pipeline config and raw data, a restricted script runner with analytics utilities, and integration with the event/task system and pyscript execution context.

Changes

Cohort / File(s) Summary
API & Router Registration
kairon/api/app/main.py, kairon/api/app/routers/bot/analytics.py
Registers analytics router at /api/bot/{bot}/pipeline_analytics; implements CRUD endpoints for analytics pipeline events with authentication scopes.
Client Config tweak
kairon/api/app/routers/bot/bot.py
Injects config["config"]["chat_server_base_url"] = "http://localhost:5000/" into returned client config.
Persistent Models: Scheduler & Pipeline
kairon/shared/actions/data_objects.py
Adds SchedulerConfiguration (cron/epoch/timezone validation) and AnalyticsPipelineConfig (pipeline metadata, unique bot+pipeline index).
Persistent Models: Raw Data Collection
kairon/shared/cognition/data_objects.py
Adds AnalyticsCollectionData model for storing raw analytics records with normalization and validation.
Pydantic API Models
kairon/shared/data/data_models.py
Adds AnalyticsSchedulerConfig and AnalyticsPipelineEventRequest request/response models.
Pipeline Processor
kairon/shared/analytics/analytics_pipeline_processor.py
New processor class with CRUD, retrieval, scheduler handling, timezone-aware epoch logic, and pipeline code lookup.
Event Handler
kairon/events/definitions/analytic_pipeline_handler.py, kairon/events/definitions/factory.py
New AnalyticsPipelineEvent handler (validate/execute/schedule/update/delete); registers event class in factory.
Script Runner & Actor Factory
kairon/shared/concurrency/actors/analytics_runner.py, kairon/shared/concurrency/actors/factory.py
New AnalyticsRunner executing restricted pyscripts with analytics utilities; factory imports/registers actor type.
Pyscript Utilities & Execution Context
kairon/shared/pyscript/callback_pyscript_utils.py, kairon/async_callback/utils.py
Adds analytics helpers (add_analyitcs_raw_data, get_analyitcs_raw_data, processed_analyitcs_raw_data) and exposes them as predefined objects in script execution partials.
Data Processor Imports
kairon/shared/data/processor.py
Imports AnalyticsPipelineConfig, PipelineCreate, and PipelineUpdate into processor imports.
Constants
kairon/shared/constants.py
Adds EventClass.analytics_pipeline and ActorType.analytics_runner enum members.

Sequence Diagram(s)

sequenceDiagram
    actor User
    participant API as Analytics API
    participant Handler as AnalyticsPipelineEvent
    participant Processor as AnalyticsPipelineProcessor
    participant EventSrv as Event Server
    participant Queue as Task Queue
    participant Runner as AnalyticsRunner

    User->>API: POST /api/bot/{bot}/pipeline_analytics/events
    API->>API: Authenticate (TESTER_ACCESS)
    API->>Processor: add_scheduled_task(config)
    Processor-->>API: event_id

    alt immediate / async
        API->>Queue: enqueue immediate task (event_id)
    else cron / recurring
        API->>EventSrv: subscribe cron (expr, timezone, event_id)
    else one-time
        API->>EventSrv: schedule run_at (timestamp, timezone, event_id)
    end

    rect rgb(230, 245, 255)
    Note over Queue,Runner: Execution phase (triggered by scheduler/event)
    Queue->>Handler: execute(event_id)
    Handler->>Processor: retrieve_config(event_id)
    Handler->>Processor: get_pipeline_code(callback_name)
    Handler->>Runner: execute(pyscript, predefined_objects)
    Runner-->>Handler: results
    Handler->>Processor: log results / update status
    end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

  • Files requiring extra attention:
    • kairon/events/definitions/analytic_pipeline_handler.py — scheduling paths, timezone and cron/epoch handling, error/cleanup logic.
    • kairon/shared/analytics/analytics_pipeline_processor.py — persistence, uniqueness checks, and exception mapping.
    • kairon/shared/concurrency/actors/analytics_runner.py and pyscript integrations — sandboxing, predefined object exposure, and result sanitization.
    • Integration points: event server notifications and actor/task enqueueing.

Possibly related PRs

Suggested reviewers

  • hiteshghuge
  • sfahad1414
  • sushantpatade

"🐰
I hop through code and splice a stream,
Schedules, scripts, a data dream,
Pipelines hum and runners play,
Events are logged, then hop away,
Joyful bytes in analytic gleam!"

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 35.48% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title accurately summarizes the main change: adding analytics pipeline event functionality and its associated handlers across multiple files including the API router, event definitions, and related processors.
✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 10

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
kairon/api/app/routers/bot/bot.py (1)

1205-1211: Avoid hardcoding chat_server_base_url to localhost

Forcing chat_server_base_url to "http://localhost:5000/" will break non-local environments and overrides any configured/chat-server URL. This is environment-specific and should not be baked into the API response.

Consider reverting this line and, if a default is really needed, deriving it from configuration or request context instead.

 async def get_client_config(
         current_user: User = Security(Authentication.get_current_user_and_bot, scopes=TESTER_ACCESS)):
     config = mongo_processor.get_chat_client_config(current_user.get_bot(), current_user.email)
-    config = config.to_mongo().to_dict()
-    config["config"]["chat_server_base_url"] = "http://localhost:5000/"
-    return Response(data=config['config'])
+    config = config.to_mongo().to_dict()
+    return Response(data=config["config"])
kairon/shared/concurrency/actors/factory.py (1)

6-18: analytics_runner is wired to the wrong actor implementation

The registry entry for ActorType.analytics_runner starts a CallableRunner instead of an AnalyticsRunner, so calls requesting the analytics runner will be executed by the wrong actor.

 from kairon.exceptions import AppException
-from .analytics_runner import AnalyticsRunner
 from .callable_runner import CallableRunner
 from ..actors.pyscript_runner import PyScriptRunner
 from kairon.shared.constants import ActorType
+from .analytics_runner import AnalyticsRunner
 
 class ActorFactory:
     __actors = {
         ActorType.pyscript_runner.value: (PyScriptRunner, PyScriptRunner.start().proxy()),
-        ActorType.callable_runner.value: (CallableRunner, CallableRunner.start().proxy()),
-        ActorType.analytics_runner.value: (AnalyticsRunner, CallableRunner.start().proxy()),
+        ActorType.callable_runner.value: (CallableRunner, CallableRunner.start().proxy()),
+        ActorType.analytics_runner.value: (AnalyticsRunner, AnalyticsRunner.start().proxy()),
     }
kairon/shared/pyscript/callback_pyscript_utils.py (1)

327-339: Incorrect and non‑domain exception in save_as_pdf

The error path currently raises a generic Exception with an unrelated message "encryption failed-...", which looks like a copy‑paste error and will surface poorly to callers.

Consider aligning with the rest of this class and raising AppException with a relevant message, while preserving the original traceback:

-        except Exception as e:
-            raise Exception(f"encryption failed-{str(e)}")
+        except Exception as e:
+            logger.exception(e)
+            raise AppException(f"Failed to save PDF report: {e}") from e
🧹 Nitpick comments (3)
kairon/shared/cognition/data_objects.py (1)

127-150: AnalyticsCollectionData model looks good; a couple of small optional tweaks

The model and its validation/clean logic are consistent with the rest of this module. Two small, optional refinements you may consider:

  • Give data a default empty dict to avoid None handling at call sites, if appropriate for your use cases:
    data = DictField(default=dict)
  • If you expect (bot, collection_name) pairs to be unique (one collection per bot), add a compound unique index similar to others in the codebase.

These are non-blocking; current implementation is functionally fine.

kairon/shared/data/data_models.py (1)

1496-1508: Tighten types/defaults for analytics event request (optional)

The models work as-is, but a couple of tweaks would make them safer and more self-documenting:

  • Make scheduler_config explicitly optional and use Pydantic-friendly list defaults:
-class AnalyticsSchedulerConfig(BaseModel):
-    expression_type: str
-    schedule: Any
-    timezone: Optional[str] = "UTC"
+class AnalyticsSchedulerConfig(BaseModel):
+    expression_type: str  # e.g. "cron" | "epoch"
+    schedule: Any
+    timezone: Optional[str] = "UTC"
@@
-class AnalyticsPipelineEventRequest(BaseModel):
-    pipeline_name: str
-    callback_name: str
-    scheduler_config: AnalyticsSchedulerConfig = None
-    timestamp: str
-    data_deletion_policy: Optional[List[Any]] = []
-    triggers: Optional[List[Dict[str, Any]]] = []
+class AnalyticsPipelineEventRequest(BaseModel):
+    pipeline_name: str
+    callback_name: str
+    scheduler_config: Optional[AnalyticsSchedulerConfig] = None
+    timestamp: str
+    data_deletion_policy: List[Any] = []
+    triggers: List[Dict[str, Any]] = []

If you want to go further, consider restricting expression_type to a Literal["cron", "epoch"] or an Enum for better validation.

kairon/shared/concurrency/actors/analytics_runner.py (1)

60-69: Consider reusing existing cleanup helper to avoid divergence

__perform_cleanup is nearly the same as CallbackScriptUtility.perform_cleanup but without Response handling. To keep behavior consistent across runners and reduce duplication, consider delegating to the shared helper or moving cleanup to a common utility.

For example:

-    def __perform_cleanup(self, local_vars: Dict):
-        filtered_locals = {}
-        for key, value in local_vars.items():
-            if not isinstance(value, Callable) and not isinstance(value, ModuleType):
-                if isinstance(value, datetime):
-                    value = value.strftime("%m/%d/%Y, %H:%M:%S")
-                elif isinstance(value, date):
-                    value = value.strftime("%Y-%m-%d")
-                filtered_locals[key] = value
-        return filtered_locals
+    def __perform_cleanup(self, local_vars: Dict):
+        # Reuse existing logic to keep behavior aligned with other runners
+        from kairon.shared.pyscript.callback_pyscript_utils import CallbackScriptUtility
+        return CallbackScriptUtility.perform_cleanup(local_vars)
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between dd7ddc4 and 2bae455.

📒 Files selected for processing (14)
  • kairon/api/app/main.py (2 hunks)
  • kairon/api/app/routers/bot/analytics.py (1 hunks)
  • kairon/api/app/routers/bot/bot.py (1 hunks)
  • kairon/async_callback/utils.py (1 hunks)
  • kairon/events/definitions/analytic_pipeline_handler.py (1 hunks)
  • kairon/shared/actions/data_objects.py (2 hunks)
  • kairon/shared/analytics/analytics_pipeline_processor.py (1 hunks)
  • kairon/shared/cognition/data_objects.py (1 hunks)
  • kairon/shared/concurrency/actors/analytics_runner.py (1 hunks)
  • kairon/shared/concurrency/actors/factory.py (2 hunks)
  • kairon/shared/constants.py (2 hunks)
  • kairon/shared/data/data_models.py (1 hunks)
  • kairon/shared/data/processor.py (3 hunks)
  • kairon/shared/pyscript/callback_pyscript_utils.py (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (7)
kairon/shared/concurrency/actors/factory.py (3)
kairon/shared/concurrency/actors/analytics_runner.py (1)
  • AnalyticsRunner (20-69)
kairon/shared/constants.py (1)
  • ActorType (191-194)
kairon/shared/concurrency/actors/callable_runner.py (1)
  • CallableRunner (14-25)
kairon/shared/data/data_models.py (1)
kairon/shared/nlu/featurizer/lm_models/base.py (1)
  • BaseModel (7-27)
kairon/shared/data/processor.py (1)
kairon/shared/actions/data_objects.py (2)
  • ParallelActionConfig (1043-1056)
  • AnalyticsPipelineConfig (1101-1121)
kairon/shared/actions/data_objects.py (2)
kairon/shared/chat/broadcast/data_objects.py (1)
  • SchedulerConfiguration (13-59)
kairon/shared/chat/models.py (1)
  • SchedulerConfiguration (34-85)
kairon/shared/pyscript/callback_pyscript_utils.py (2)
kairon/shared/cognition/data_objects.py (1)
  • AnalyticsCollectionData (127-150)
kairon/exceptions.py (1)
  • AppException (1-3)
kairon/shared/cognition/data_objects.py (1)
kairon/shared/data/audit/data_objects.py (1)
  • Auditlog (24-77)
kairon/api/app/routers/bot/analytics.py (6)
kairon/events/definitions/analytic_pipeline_handler.py (3)
  • AnalyticsPipelineEvent (17-247)
  • validate (27-31)
  • delete_schedule (176-188)
kairon/shared/analytics/analytics_pipeline_processor.py (3)
  • AnalyticsPipelineProcessor (18-163)
  • get_all_analytics_pipelines (81-82)
  • get_analytics_pipeline (74-78)
kairon/shared/auth.py (2)
  • Authentication (30-465)
  • get_current_user_and_bot (83-109)
kairon/shared/constants.py (1)
  • EventRequestType (99-104)
kairon/shared/data/audit/data_objects.py (1)
  • delete (69-72)
kairon/shared/chat/cache/least_priority.py (1)
  • put (30-43)
🪛 Ruff (0.14.5)
kairon/shared/concurrency/actors/analytics_runner.py

51-51: Use of exec detected

(S102)


53-53: Consider moving this statement to an else block

(TRY300)


54-54: Do not catch blind exception: Exception

(BLE001)


56-56: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


56-56: Avoid specifying long messages outside the exception class

(TRY003)

kairon/shared/actions/data_objects.py

1056-1056: Mutable class attributes should be annotated with typing.ClassVar

(RUF012)


1078-1078: Avoid specifying long messages outside the exception class

(TRY003)


1082-1082: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


1082-1082: Avoid specifying long messages outside the exception class

(TRY003)


1086-1086: Avoid specifying long messages outside the exception class

(TRY003)


1095-1097: Avoid specifying long messages outside the exception class

(TRY003)


1112-1117: Mutable class attributes should be annotated with typing.ClassVar

(RUF012)


1119-1119: Unused method argument: clean

(ARG002)

kairon/shared/analytics/analytics_pipeline_processor.py

40-40: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


40-40: Avoid specifying long messages outside the exception class

(TRY003)


48-48: Consider moving this statement to an else block

(TRY300)


50-50: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


50-50: Avoid specifying long messages outside the exception class

(TRY003)


65-65: Unused static method argument: bot

(ARG004)


65-65: Unused static method argument: user

(ARG004)


65-65: Unused static method argument: config

(ARG004)


101-101: Avoid specifying long messages outside the exception class

(TRY003)


118-118: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


118-118: Avoid specifying long messages outside the exception class

(TRY003)


134-134: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


134-134: Avoid specifying long messages outside the exception class

(TRY003)


135-135: Do not catch blind exception: Exception

(BLE001)


137-137: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


161-161: Avoid specifying long messages outside the exception class

(TRY003)

kairon/shared/pyscript/callback_pyscript_utils.py

338-338: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


338-338: Create your own exception

(TRY002)


338-338: Avoid specifying long messages outside the exception class

(TRY003)


338-338: Use explicit conversion flag

Replace with conversion flag

(RUF010)


341-341: PEP 484 prohibits implicit Optional

Convert to T | None

(RUF013)


343-343: Create your own exception

(TRY002)


343-343: Avoid specifying long messages outside the exception class

(TRY003)


347-347: Undefined name AnalyticsCollectionData

(F821)


371-371: PEP 484 prohibits implicit Optional

Convert to T | None

(RUF013)


373-373: Create your own exception

(TRY002)


373-373: Avoid specifying long messages outside the exception class

(TRY003)


379-379: Undefined name AnalyticsCollectionData

(F821)


394-394: PEP 484 prohibits implicit Optional

Convert to T | None

(RUF013)


396-396: Create your own exception

(TRY002)


396-396: Avoid specifying long messages outside the exception class

(TRY003)


398-398: Undefined name AnalyticsCollectionData

(F821)


401-401: Avoid specifying long messages outside the exception class

(TRY003)

kairon/events/definitions/analytic_pipeline_handler.py

31-31: Avoid specifying long messages outside the exception class

(TRY003)


33-33: Unused method argument: kwargs

(ARG002)


83-83: Do not catch blind exception: Exception

(BLE001)


108-108: Consider moving this statement to an else block

(TRY300)


113-113: Use raise without specifying exception name

Remove exception name

(TRY201)


121-121: Avoid specifying long messages outside the exception class

(TRY003)


135-135: Consider moving this statement to an else block

(TRY300)


137-137: Do not catch blind exception: Exception

(BLE001)


141-141: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


167-167: Consider moving this statement to an else block

(TRY300)


169-169: Do not catch blind exception: Exception

(BLE001)


173-173: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


181-181: Abstract raise to an inner function

(TRY301)


181-181: Avoid specifying long messages outside the exception class

(TRY003)


188-188: Use raise without specifying exception name

Remove exception name

(TRY201)


198-198: Abstract raise to an inner function

(TRY301)


198-198: Avoid specifying long messages outside the exception class

(TRY003)


247-247: Use raise without specifying exception name

Remove exception name

(TRY201)

kairon/shared/cognition/data_objects.py

136-136: Mutable class attributes should be annotated with typing.ClassVar

(RUF012)


143-143: Avoid specifying long messages outside the exception class

(TRY003)


146-146: Avoid specifying long messages outside the exception class

(TRY003)

kairon/api/app/routers/bot/analytics.py

20-20: Do not perform function call Security in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)


40-40: Do not perform function call Security in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)


49-49: Do not perform function call Security in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)


53-53: Create your own exception

(TRY002)


60-60: Do not perform function call Security in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)


74-74: Do not perform function call Security in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: Analyze (python)
  • GitHub Check: Python CI
🔇 Additional comments (5)
kairon/shared/constants.py (1)

73-91: Analytics-related enum additions are consistent

Adding EventClass.analytics_pipeline and ActorType.analytics_runner is consistent with the existing enum patterns and cleanly separates analytics concerns.

Also applies to: 191-194

kairon/api/app/main.py (1)

26-28: Analytics router registration is consistent with existing routing

Importing analytics and mounting it under /api/bot/{bot}/pipeline_analytics with the "Analytics" tag aligns with the existing router structure (bot-scoped, tagged). No issues spotted here.

Also applies to: 263-280

kairon/shared/data/processor.py (3)

62-92: AnalyticsPipelineConfig import looks consistent with existing action config models

Adding AnalyticsPipelineConfig alongside other action-related documents is consistent and does not introduce any obvious issues.


141-141: PipelineCreate/PipelineUpdate import is structurally fine

Importing PipelineCreate and PipelineUpdate from local data_models follows existing patterns in this module; no concerns here.


9324-9367: sanitize_query_filter still correctly returns validated parameters

The function continues to return the sanitized dict assembled above; behavior is intact and coherent with the validation logic.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

♻️ Duplicate comments (4)
kairon/shared/concurrency/actors/analytics_runner.py (1)

31-33: Analytics helpers reference non-existent methods—will raise AttributeError.

CallbackScriptUtility defines add_analyitcs_raw_data, get_analyitcs_raw_data, and processed_analyitcs_raw_data, but you're binding to add_data_analytics, get_data_analytics, and mark_as_processed, which do not exist.

Apply this diff:

-        safe_globals['add_analytics_raw_data'] = partial(CallbackScriptUtility.add_data_analytics, bot=bot)
-        safe_globals['get_analytics_raw_data'] = partial(CallbackScriptUtility.get_data_analytics, bot=bot)
-        safe_globals['processed_analytics_raw_data'] = partial(CallbackScriptUtility.mark_as_processed, bot=bot)
+        safe_globals['add_analytics_raw_data'] = partial(CallbackScriptUtility.add_analyitcs_raw_data, bot=bot)
+        safe_globals['get_analytics_raw_data'] = partial(CallbackScriptUtility.get_analyitcs_raw_data, bot=bot)
+        safe_globals['processed_analytics_raw_data'] = partial(CallbackScriptUtility.processed_analyitcs_raw_data, bot=bot)
kairon/shared/pyscript/callback_pyscript_utils.py (2)

394-411: Use AppException for consistency.

Both error checks raise bare Exception, but the rest of this module uses AppException for domain errors, which ensures proper API error responses.

Apply this diff:

     @staticmethod
-    def processed_analyitcs_raw_data(user: str, collection_name: str, bot: str = None):
+    def processed_analyitcs_raw_data(user: str, collection_name: str, bot: str | None = None):
         if not bot:
-            raise Exception("Missing bot id")
+            raise AppException("Missing bot id")
 
         queryset = AnalyticsCollectionData.objects(bot=bot, collection_name=collection_name)
 
         if not queryset:
-            raise Exception("No records found for given bot and collection_name")
+            raise AppException("No records found for given bot and collection_name")

341-369: Use AppException for consistency and fix type hints.

The method raises bare Exception for missing bot, but the rest of this module uses AppException for domain errors. Additionally, the type hint for bot should be Text | None per PEP 484.

Apply this diff:

     @staticmethod
-    def get_analyitcs_raw_data(collection_name: str, bot: Text = None):
+    def get_analyitcs_raw_data(collection_name: str, bot: Text | None = None):
         if not bot:
-            raise Exception("Missing bot id")
+            raise AppException("Missing bot id")
kairon/shared/analytics/analytics_pipeline_processor.py (1)

18-25: Confirm: Missing name field causes runtime KeyError.

The review comment is verified as correct. AnalyticsPipelineEventRequest lacks a name field (it contains only pipeline_name, callback_name, scheduler_config, timestamp, data_deletion_policy, triggers), and AnalyticsPipelineConfig schema also has no name field, with a unique constraint on ["bot", "pipeline_name"] instead.

The current code at line 21-22 attempts Utility.is_exist(..., name=config['name'], ...) which will raise KeyError when the API endpoint (kairon/api/app/routers/bot/analytics.py) passes an AnalyticsPipelineEventRequest converted to a dict.

Fix: Use pipeline_name for the uniqueness check. Update lines 21-22:

-        Utility.is_exist(AnalyticsPipelineConfig, f"Schedule with name '{config['pipeline_name']}' exists!", bot=bot,
-                         name=config['name'], status=True)
+        Utility.is_exist(AnalyticsPipelineConfig, f"Schedule with name '{config['pipeline_name']}' exists!", bot=bot,
+                         pipeline_name=config['pipeline_name'], status=True)
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2bae455 and b330e29.

📒 Files selected for processing (8)
  • kairon/api/app/routers/bot/analytics.py (1 hunks)
  • kairon/async_callback/utils.py (1 hunks)
  • kairon/events/definitions/analytic_pipeline_handler.py (1 hunks)
  • kairon/events/definitions/factory.py (2 hunks)
  • kairon/shared/analytics/analytics_pipeline_processor.py (1 hunks)
  • kairon/shared/concurrency/actors/analytics_runner.py (1 hunks)
  • kairon/shared/concurrency/actors/factory.py (2 hunks)
  • kairon/shared/pyscript/callback_pyscript_utils.py (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • kairon/shared/concurrency/actors/factory.py
🧰 Additional context used
🧬 Code graph analysis (6)
kairon/events/definitions/factory.py (2)
kairon/events/definitions/analytic_pipeline_handler.py (1)
  • AnalyticsPipelineEvent (18-218)
kairon/shared/constants.py (1)
  • EventClass (73-90)
kairon/shared/analytics/analytics_pipeline_processor.py (6)
kairon/shared/utils.py (1)
  • Utility (93-2358)
kairon/exceptions.py (1)
  • AppException (1-3)
kairon/shared/actions/data_objects.py (1)
  • AnalyticsPipelineConfig (1101-1121)
kairon/shared/callback/data_objects.py (1)
  • CallbackConfig (89-237)
kairon/shared/data/data_models.py (1)
  • AnalyticsSchedulerConfig (1496-1499)
tests/unit_test/utility_test.py (1)
  • bot (3357-3358)
kairon/async_callback/utils.py (1)
kairon/shared/pyscript/callback_pyscript_utils.py (3)
  • add_analyitcs_raw_data (372-392)
  • get_analyitcs_raw_data (342-369)
  • processed_analyitcs_raw_data (395-411)
kairon/events/definitions/analytic_pipeline_handler.py (5)
kairon/exceptions.py (1)
  • AppException (1-3)
kairon/shared/analytics/analytics_pipeline_processor.py (6)
  • get_analytics_pipeline (52-56)
  • retrieve_config (41-48)
  • get_pipeline_code (130-153)
  • delete_task (28-38)
  • add_scheduled_task (19-25)
  • update_scheduled_task (86-127)
kairon/shared/concurrency/actors/analytics_runner.py (2)
  • AnalyticsRunner (17-62)
  • execute (19-47)
kairon/shared/constants.py (1)
  • EventClass (73-90)
kairon/shared/data/constant.py (1)
  • EVENT_STATUS (93-111)
kairon/api/app/routers/bot/analytics.py (5)
kairon/shared/data/data_models.py (1)
  • AnalyticsPipelineEventRequest (1502-1508)
kairon/events/definitions/analytic_pipeline_handler.py (2)
  • AnalyticsPipelineEvent (18-218)
  • validate (28-32)
kairon/shared/analytics/analytics_pipeline_processor.py (2)
  • AnalyticsPipelineProcessor (16-153)
  • get_all_analytics_pipelines (59-72)
kairon/shared/auth.py (2)
  • Authentication (30-465)
  • get_current_user_and_bot (83-109)
kairon/shared/constants.py (1)
  • EventRequestType (99-104)
kairon/shared/pyscript/callback_pyscript_utils.py (1)
kairon/shared/cognition/data_objects.py (1)
  • AnalyticsCollectionData (127-150)
🪛 Ruff (0.14.5)
kairon/shared/analytics/analytics_pipeline_processor.py

38-38: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


38-38: Avoid specifying long messages outside the exception class

(TRY003)


46-46: Consider moving this statement to an else block

(TRY300)


48-48: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


48-48: Avoid specifying long messages outside the exception class

(TRY003)


91-91: Avoid specifying long messages outside the exception class

(TRY003)


108-108: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


108-108: Avoid specifying long messages outside the exception class

(TRY003)


124-124: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


124-124: Avoid specifying long messages outside the exception class

(TRY003)


125-125: Do not catch blind exception: Exception

(BLE001)


127-127: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


151-151: Avoid specifying long messages outside the exception class

(TRY003)

kairon/events/definitions/analytic_pipeline_handler.py

32-32: Avoid specifying long messages outside the exception class

(TRY003)


34-34: Unused method argument: kwargs

(ARG002)


67-67: Local variable output is assigned to but never used

Remove assignment to unused variable output

(F841)


70-70: Do not catch blind exception: Exception

(BLE001)


93-93: Avoid specifying long messages outside the exception class

(TRY003)


107-107: Consider moving this statement to an else block

(TRY300)


109-109: Do not catch blind exception: Exception

(BLE001)


113-113: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


139-139: Consider moving this statement to an else block

(TRY300)


141-141: Do not catch blind exception: Exception

(BLE001)


145-145: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


153-153: Abstract raise to an inner function

(TRY301)


153-153: Avoid specifying long messages outside the exception class

(TRY003)


160-160: Use raise without specifying exception name

Remove exception name

(TRY201)


170-170: Abstract raise to an inner function

(TRY301)


170-170: Avoid specifying long messages outside the exception class

(TRY003)


218-218: Use raise without specifying exception name

Remove exception name

(TRY201)

kairon/shared/concurrency/actors/analytics_runner.py

19-19: Unused method argument: kwargs

(ARG002)


41-41: Use of exec detected

(S102)


45-45: Do not catch blind exception: Exception

(BLE001)


47-47: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


47-47: Avoid specifying long messages outside the exception class

(TRY003)

kairon/api/app/routers/bot/analytics.py

20-20: Do not perform function call Security in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)


38-38: Do not perform function call Security in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)


47-47: Do not perform function call Security in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)


56-56: Do not perform function call Security in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)


67-67: Do not perform function call Security in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)

kairon/shared/pyscript/callback_pyscript_utils.py

339-339: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


339-339: Create your own exception

(TRY002)


339-339: Avoid specifying long messages outside the exception class

(TRY003)


339-339: Use explicit conversion flag

Replace with conversion flag

(RUF010)


342-342: PEP 484 prohibits implicit Optional

Convert to T | None

(RUF013)


344-344: Create your own exception

(TRY002)


344-344: Avoid specifying long messages outside the exception class

(TRY003)


372-372: PEP 484 prohibits implicit Optional

Convert to T | None

(RUF013)


374-374: Create your own exception

(TRY002)


374-374: Avoid specifying long messages outside the exception class

(TRY003)


395-395: PEP 484 prohibits implicit Optional

Convert to T | None

(RUF013)


397-397: Create your own exception

(TRY002)


397-397: Avoid specifying long messages outside the exception class

(TRY003)


402-402: Create your own exception

(TRY002)


402-402: Avoid specifying long messages outside the exception class

(TRY003)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: Codacy Static Code Analysis
  • GitHub Check: Python CI
  • GitHub Check: Analyze (python)
🔇 Additional comments (17)
kairon/events/definitions/factory.py (1)

2-2: LGTM!

The import and event factory registration are correct. The mapping properly wires EventClass.analytics_pipeline to AnalyticsPipelineEvent, enabling analytics pipeline events to be instantiated via the factory.

Also applies to: 32-33

kairon/async_callback/utils.py (1)

85-87: LGTM!

The analytics utility bindings correctly reference the implemented methods on CallbackScriptUtility (add_analyitcs_raw_data, get_analyitcs_raw_data, processed_analyitcs_raw_data), making them available in the script execution environment with the bot context properly bound via partial.

kairon/shared/concurrency/actors/analytics_runner.py (1)

19-25: LGTM on the defensive copy pattern.

The code correctly copies _safe_globals before adding bot-specific partials, preventing cross-request pollution in concurrent scenarios.

kairon/api/app/routers/bot/analytics.py (5)

17-33: LGTM!

The create endpoint properly validates the pipeline, derives the event type from scheduler config, and enqueues the event. The conditional logic correctly maps cron/epoch schedules to the appropriate event types.


36-41: LGTM!

The list endpoint delegates to the processor's get_all_analytics_pipelines, which handles formatting.


44-50: LGTM!

The get endpoint now correctly uses retrieve_config with event_id and bot, aligning with the id-based semantics used by delete/update/execute paths.


53-60: LGTM!

The delete endpoint correctly instantiates the event handler and delegates to delete_analytics_event.


63-75: LGTM!

The update endpoint enqueues an update-schedule request with the provided config and event_id.

kairon/shared/analytics/analytics_pipeline_processor.py (4)

51-56: LGTM on query methods.

The get/list/delete methods correctly query AnalyticsPipelineConfig, format responses, and handle not-found cases with appropriate exceptions.

Also applies to: 58-72, 75-82


85-127: LGTM on update logic.

The update method properly validates scheduler config, handles epoch-to-datetime conversion with timezone awareness, and saves all relevant fields.


129-153: LGTM on callback code retrieval.

The method correctly fetches pyscript code from CallbackConfig by callback name and raises AppException if not found.


40-48: No changes needed—return value is correct.

All callers of AnalyticsPipelineProcessor.retrieve_config() expect and use a single dict, not a tuple:

  • analytic_pipeline_handler.py:44-46 accesses config["pipeline_name"] and config["callback_name"]
  • analytic_pipeline_handler.py:172 stores result as dict
  • kairon/api/app/routers/bot/analytics.py:49 returns result in a Response object

The tuple unpacking pattern (config, reference_id) seen elsewhere is from a different private __retrieve_config() method in message_broadcast.py, not this one.

kairon/events/definitions/analytic_pipeline_handler.py (5)

23-32: LGTM on initialization and validation.

The constructor properly initializes bot/user, and validate checks pipeline existence and active status before allowing operations.


88-113: LGTM on cron schedule creation.

The method validates config, creates the scheduled task, notifies the event server with the correct event class and cron parameters, and performs cleanup on error.


116-145: LGTM on one-time schedule creation.

The method handles epoch-to-datetime conversion with timezone awareness, creates the task, and correctly uses run_at.isoformat() for the event server.


148-160: LGTM on event deletion.

The method validates existence, deletes from the event server, and removes the task from storage.


163-218: LGTM on schedule updates.

The method correctly uses EventClass.analytics_pipeline for both cron and epoch updates (past issue resolved), handles timezone-aware epoch conversion, and performs rollback on error.

Comment on lines +34 to +86
def execute(self, event_id: Text, **kwargs):
"""
Worker executes this method when event server triggers it.
"""
config = None
reference_id = None
status = EVENT_STATUS.INITIATED.value
exception = None

try:
config = AnalyticsPipelineProcessor.retrieve_config(event_id, self.bot)

pipeline_name = config["pipeline_name"]
callback_name = config["callback_name"]

logger.info(f"Executing analytics pipeline: {pipeline_name}")

source_code = AnalyticsPipelineProcessor.get_pipeline_code(
bot=self.bot,
callback_name=callback_name
)

predefined_objects = {
"config": config,
"bot": self.bot,
"user": self.user,
"pipeline_name": pipeline_name,
"callback_name": callback_name,
"event_id": event_id,
"slot": {"bot": self.bot},
}

runner = AnalyticsRunner()
output = runner.execute(source_code, predefined_objects=predefined_objects)
status = EVENT_STATUS.COMPLETED.value

except Exception as e:
logger.exception(e)
status = EVENT_STATUS.FAIL.value
exception = str(e)

finally:
AnalyticsPipelineProcessor.add_event_log(
bot=self.bot,
event_id=event_id,
reference_id=reference_id,
status=status,
exception=exception,
config=config
)
if config and config.get("scheduler_config", {}).get("expression_type") != "cron":
AnalyticsPipelineProcessor.delete_task(event_id, self.bot)

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Missing add_event_log method will cause AttributeError.

Line 76 calls AnalyticsPipelineProcessor.add_event_log(...), but this method is not defined in the processor class. This will raise AttributeError in the finally block after every execution.

You need to add add_event_log to AnalyticsPipelineProcessor. Would you like me to generate a stub implementation?

Additionally, note that the output variable on Line 67 is assigned but never used—consider removing it or logging it if needed.

🧰 Tools
🪛 Ruff (0.14.5)

34-34: Unused method argument: kwargs

(ARG002)


67-67: Local variable output is assigned to but never used

Remove assignment to unused variable output

(F841)


70-70: Do not catch blind exception: Exception

(BLE001)


predefined_objects = predefined_objects or {}
local_vars = predefined_objects.copy()
safe_globals = _safe_globals.copy()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Avoid mutating the shared _safe_globals.

_safe_globals is module-level. Since this actor can be reused concurrently, bot-specific partials may leak between runs. Copy it first.

The code already does .copy() on line 23, so this is correct. Let me approve instead.

Actually, looking more carefully, the code DOES copy: safe_globals = _safe_globals.copy() on line 23. So this is fine.

🤖 Prompt for AI Agents
In kairon/shared/concurrency/actors/analytics_runner.py around line 23, the code
correctly avoids mutating the module-level _safe_globals by assigning
safe_globals = _safe_globals.copy(); keep this copy behavior as-is to prevent
cross-run leakage of bot-specific partials and do not modify _safe_globals
in-place.

return media_id
except Exception as e:
raise Exception(f"encryption failed-{str(e)}")
raise Exception(f"encryption failed-{str(e)}")
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Fix the error message.

The exception message says "encryption failed" but this method is save_as_pdf, which deals with PDF generation, not encryption.

Apply this diff:

-            raise Exception(f"encryption failed-{str(e)}")
+            raise Exception(f"PDF generation failed-{str(e)}")
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
raise Exception(f"encryption failed-{str(e)}")
raise Exception(f"PDF generation failed-{str(e)}")
🧰 Tools
🪛 Ruff (0.14.5)

339-339: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


339-339: Create your own exception

(TRY002)


339-339: Avoid specifying long messages outside the exception class

(TRY003)


339-339: Use explicit conversion flag

Replace with conversion flag

(RUF010)

🤖 Prompt for AI Agents
In kairon/shared/pyscript/callback_pyscript_utils.py around line 339, the raised
exception incorrectly says "encryption failed" even though the method is
save_as_pdf; change the message to accurately reflect the operation and include
the original exception details (e.g., "save_as_pdf failed - {e}") so the error
indicates PDF generation failed and preserves the underlying error for
debugging.

Comment on lines +371 to +392
@staticmethod
def add_analyitcs_raw_data(user: str, payload: dict, bot: str = None):
if not bot:
raise Exception("Missing bot id")

collection_name = payload.get("collection_name", None)
data = payload.get('data')
source = payload.get('source')
received_at = payload.get('received_at')
collection_obj = AnalyticsCollectionData()
collection_obj.bot = bot
collection_obj.data = data
collection_obj.collection_name = collection_name
collection_obj.source = source
collection_obj.received_at = received_at
collection_obj.user = user
collection_id = collection_obj.save().to_mongo().to_dict()["_id"].__str__()

return {
"message": "Record saved!",
"data": {"_id": collection_id}
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion | 🟠 Major

Use AppException and preserve model defaults for received_at.

Two issues:

  1. Bare Exception instead of AppException for consistency
  2. Explicitly assigning received_at = None overrides the model's default=datetime.utcnow, breaking the default timestamp

Apply this diff:

     @staticmethod
-    def add_analyitcs_raw_data(user: str, payload: dict, bot: str = None):
+    def add_analyitcs_raw_data(user: str, payload: dict, bot: str | None = None):
         if not bot:
-            raise Exception("Missing bot id")
+            raise AppException("Missing bot id")
 
         collection_name = payload.get("collection_name", None)
         data = payload.get('data')
         source = payload.get('source')
-        received_at = payload.get('received_at')
         collection_obj = AnalyticsCollectionData()
         collection_obj.bot = bot
         collection_obj.data = data
         collection_obj.collection_name = collection_name
         collection_obj.source = source
-        collection_obj.received_at = received_at
+        if 'received_at' in payload:
+            collection_obj.received_at = payload['received_at']
         collection_obj.user = user
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
@staticmethod
def add_analyitcs_raw_data(user: str, payload: dict, bot: str = None):
if not bot:
raise Exception("Missing bot id")
collection_name = payload.get("collection_name", None)
data = payload.get('data')
source = payload.get('source')
received_at = payload.get('received_at')
collection_obj = AnalyticsCollectionData()
collection_obj.bot = bot
collection_obj.data = data
collection_obj.collection_name = collection_name
collection_obj.source = source
collection_obj.received_at = received_at
collection_obj.user = user
collection_id = collection_obj.save().to_mongo().to_dict()["_id"].__str__()
return {
"message": "Record saved!",
"data": {"_id": collection_id}
}
@staticmethod
def add_analyitcs_raw_data(user: str, payload: dict, bot: str | None = None):
if not bot:
raise AppException("Missing bot id")
collection_name = payload.get("collection_name", None)
data = payload.get('data')
source = payload.get('source')
collection_obj = AnalyticsCollectionData()
collection_obj.bot = bot
collection_obj.data = data
collection_obj.collection_name = collection_name
collection_obj.source = source
if 'received_at' in payload:
collection_obj.received_at = payload['received_at']
collection_obj.user = user
collection_id = collection_obj.save().to_mongo().to_dict()["_id"].__str__()
return {
"message": "Record saved!",
"data": {"_id": collection_id}
}
🧰 Tools
🪛 Ruff (0.14.5)

372-372: PEP 484 prohibits implicit Optional

Convert to T | None

(RUF013)


374-374: Create your own exception

(TRY002)


374-374: Avoid specifying long messages outside the exception class

(TRY003)

🤖 Prompt for AI Agents
In kairon/shared/pyscript/callback_pyscript_utils.py around lines 371-392,
replace the bare Exception with AppException for the missing bot case and avoid
overwriting the model's default received_at timestamp by only setting
collection_obj.received_at when payload supplies a non-null value (i.e., check
if received_at is present before assigning); keep all other field assignments
and return value unchanged.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant