-
Notifications
You must be signed in to change notification settings - Fork 84
fixed the code to trigger callback executor when schedule action is triggered using pyscript #2148
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
fixed the code to trigger callback executor when schedule action is triggered using pyscript #2148
Conversation
…riggered using pyscript.
WalkthroughExecutorFactory.get_executor now accepts an optional executor type and may return None per the diff. Scheduling logic routes non-flow tasks to a specific EventExecutor.callback via ExecutorFactory.get_executor(EventExecutor.callback). Tests are updated accordingly to import EventExecutor and call get_executor with the selector. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
participant S as Scheduler
participant F as ExecutorFactory
participant E as Executor (callback)
participant EF as Executor (flow)
rect rgba(200,230,255,0.3)
note over S: Non-flow job
S->>F: get_executor(EventExecutor.callback)
F-->>S: executor (callback)
S->>E: execute_task(event)
E-->>S: result
end
rect rgba(200,255,220,0.3)
note over S: Flow job
S->>F: get_executor()
F-->>S: executor (from env)
S->>EF: execute_task(event)
EF-->>S: result
end
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Pre-merge checks (2 passed, 1 warning)❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Poem
Tip 👮 Agentic pre-merge checks are now available in preview!Pro plan users can now enable pre-merge checks in their settings to enforce checklists before merging PRs.
Please see the documentation for more information. Example: reviews:
pre_merge_checks:
custom_checks:
- name: "Undocumented Breaking Changes"
mode: "warning"
instructions: |
Pass/fail criteria: All breaking changes to public APIs, CLI flags, environment variables, configuration keys, database schemas, or HTTP/GraphQL endpoints must be documented in the "Breaking Change" section of the PR description and in CHANGELOG.md. Exclude purely internal or private changes (e.g., code not exported from package entry points or explicitly marked as internal).Please share your feedback with us on this Discord post. ✨ Finishing Touches
🧪 Generate unit tests
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
kairon/actions/definitions/schedule.py (1)
176-184: Unify executor invocation in add_schedule_job
Always pass the executor instance as the first arg toexecute_task(self, event_class, data), not thefuncref. For example:- if not is_flow: - func = obj_to_ref(ExecutorFactory.get_executor(EventExecutor.callback).execute_task) - else: - func = obj_to_ref(ExecutorFactory.get_executor().execute_task) - - _id = uuid7().hex - args = (func, "scheduler_evaluator", data,) - if is_flow: - args = (ExecutorFactory.get_executor(), EventClass.agentic_flow, data,) + executor = ExecutorFactory.get_executor(EventExecutor.callback) if not is_flow else ExecutorFactory.get_executor() + func = obj_to_ref(executor.execute_task) + _id = uuid7().hex + args = ( + executor, + "scheduler_evaluator" if not is_flow else EventClass.agentic_flow, + data, + )This ensures
execute_taskalways receives itsselfinstance first.kairon/events/executors/factory.py (2)
20-30: Fix enum/string mismatch and brittle config access in get_executor (currently raises even for valid config).Utility.environment['events']['executor'].get('type') returns a str (e.g., "dramatiq"), but __executors uses EventExecutor keys. The membership check and dict lookup will fail (and may KeyError if nested keys are absent). Normalize strings to EventExecutor and guard missing config.
Apply:
+from typing import Optional, Union @@ - def get_executor(executor_type: EventExecutor = None): + def get_executor(executor_type: Optional[Union[EventExecutor, str]] = None): @@ - if not executor_type: - executor_type = Utility.environment['events']['executor'].get('type') + if executor_type is None: + executor_type = ( + Utility.environment.get('events', {}) + .get('executor', {}) + .get('type') + ) + + # Normalize to Enum if provided as string (from config/callers) + if isinstance(executor_type, str): + try: + executor_type = EventExecutor(executor_type) + except ValueError: + valid_executors = [ex.value for ex in EventExecutor] + raise AppException(f"Executor type not configured in system.yaml. Valid types: {valid_executors}") - if executor_type not in ExecutorFactory.__executors.keys(): + if executor_type not in ExecutorFactory.__executors: valid_executors = [ex.value for ex in EventExecutor] raise AppException(f"Executor type not configured in system.yaml. Valid types: {valid_executors}") return ExecutorFactory.__executors[executor_type]()
37-44: get_executor_for_data will also fail due to the same enum/string mismatch—delegate to get_executor with normalization.Right now executor_type is a str from config and won’t match Enum keys. Reuse the fixed get_executor and avoid duplicating validation.
Apply:
- if data.get("task_type") == "Callback": - executor_type = Utility.environment['events']['executor'].get('callback_executor') - else: - executor_type = Utility.environment['events']['executor'].get('type') - if executor_type not in ExecutorFactory.__executors: - valid_executors = [ex.value for ex in EventExecutor] - raise AppException(f"Executor type not configured in system.yaml. Valid types: {valid_executors}") - return ExecutorFactory.__executors[executor_type]() + env = Utility.environment.get('events', {}).get('executor', {}) + raw = env.get('callback_executor') if data.get("task_type") == "Callback" else env.get('type') + return ExecutorFactory.get_executor(raw)
🧹 Nitpick comments (6)
kairon/actions/definitions/schedule.py (1)
39-41: Docstring typo.
“cakkback” → “callback”.- Initialize cakkback action. + Initialize callback action.tests/integration_test/action_service_test.py (4)
45-51: Import looks good; consider importing the class to avoid factory instantiation in tests.Importing
CallbackExecutorlets the assertions compare against the class method directly, avoiding side effects/env lookups fromExecutorFactory.get_executor(...).from kairon.shared.admin.data_objects import BotSecrets, LLMSecret -from kairon.shared.constants import KAIRON_USER_MSG_ENTITY, FORM_SLOT_SET_TYPE, EventClass, EventExecutor +from kairon.shared.constants import KAIRON_USER_MSG_ENTITY, FORM_SLOT_SET_TYPE, EventClass, EventExecutor +from kairon.events.executors.callback import CallbackExecutor
15110-15116: Stabilize assertion: compare to class method and use enum value, not magic string.This removes an executor instantiation from the test and avoids brittle string literals.
- assert job_state['args'][0] == obj_to_ref(ExecutorFactory.get_executor(EventExecutor.callback).execute_task) - assert job_state['args'][1] == 'scheduler_evaluator' + assert job_state['args'][0] == obj_to_ref(CallbackExecutor.execute_task) + assert job_state['args'][1] == EventClass.scheduler_evaluator.value
15218-15224: Apply the same stabilization here.- assert job_state['args'][0] == obj_to_ref(ExecutorFactory.get_executor(EventExecutor.callback).execute_task) - assert job_state['args'][1] == 'scheduler_evaluator' + assert job_state['args'][0] == obj_to_ref(CallbackExecutor.execute_task) + assert job_state['args'][1] == EventClass.scheduler_evaluator.value
15320-15326: Same stabilization + remove debug print.The print adds noise to CI logs.
- assert job_state['args'][0] == obj_to_ref(ExecutorFactory.get_executor(EventExecutor.callback).execute_task) - assert job_state['args'][1] == 'scheduler_evaluator' - print(job_state['args'][2]['predefined_objects']) + assert job_state['args'][0] == obj_to_ref(CallbackExecutor.execute_task) + assert job_state['args'][1] == EventClass.scheduler_evaluator.valuekairon/events/executors/factory.py (1)
27-27: Nit: no need to call .keys() for membership.executor_type in dict already checks keys.
Apply:
- if executor_type not in ExecutorFactory.__executors.keys(): + if executor_type not in ExecutorFactory.__executors:
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
kairon/actions/definitions/schedule.py(2 hunks)kairon/events/executors/factory.py(1 hunks)tests/integration_test/action_service_test.py(4 hunks)
🧰 Additional context used
🧬 Code graph analysis (3)
kairon/actions/definitions/schedule.py (3)
kairon/shared/constants.py (2)
EventClass(73-89)EventExecutor(105-109)kairon/events/executors/factory.py (2)
ExecutorFactory(10-44)get_executor(20-30)kairon/events/executors/base.py (1)
execute_task(13-14)
tests/integration_test/action_service_test.py (4)
kairon/shared/constants.py (2)
EventClass(73-89)EventExecutor(105-109)kairon/events/executors/factory.py (2)
ExecutorFactory(10-44)get_executor(20-30)kairon/events/executors/base.py (1)
execute_task(13-14)kairon/events/executors/lamda.py (1)
execute_task(12-21)
kairon/events/executors/factory.py (2)
kairon/shared/constants.py (1)
EventExecutor(105-109)kairon/shared/utils.py (1)
Utility(91-2327)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: Codacy Static Code Analysis
- GitHub Check: Analyze (python)
- GitHub Check: Python CI
🔇 Additional comments (3)
kairon/actions/definitions/schedule.py (2)
29-29: Good addition: importing EventExecutor is necessary for explicit executor selection.
171-174: Correctly routing PYSCRIPT schedules to the callback executor.
This aligns with the PR objective; non-flow now targets CallbackExecutor while flow uses the environment default.kairon/events/executors/factory.py (1)
20-20: All ExecutorFactory.get_executor call sites validated No callers pass unsupported argument types; they either use the default or pass an EventExecutor enum.
| **kwargs): | ||
| func = obj_to_ref(ExecutorFactory.get_executor().execute_task) | ||
| if not is_flow: | ||
| func = obj_to_ref(ExecutorFactory.get_executor(EventExecutor.callback).execute_task) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove if else part
Summary by CodeRabbit