Add base and decision task handler#28
Conversation
Signed-off-by: Tim Li <ltim@uber.com>
Signed-off-by: Tim Li <ltim@uber.com>
8d7af58 to
7818de9
Compare
fe6773d to
5d759c3
Compare
Signed-off-by: Tim Li <ltim@uber.com>
f6a8ed8 to
5637a8f
Compare
Signed-off-by: Tim Li <ltim@uber.com>
5637a8f to
08b0e3b
Compare
| force_create_new_decision_task: bool = False | ||
| query_results: Optional[dict] = None |
There was a problem hiding this comment.
not needed at the moment. This is related to local activities and workflow query. We might have better solutions later than copying java legacy
There was a problem hiding this comment.
those fields are used in RespondDecisionTaskCompletedRequest but we can remove them for now
| workflow_type = task.workflow_type | ||
|
|
||
| if not workflow_execution or not workflow_type: | ||
| logger.error("Decision task missing workflow execution or type") |
| cause = DecisionTaskFailedCause.DECISION_TASK_FAILED_CAUSE_BAD_SCHEDULE_ACTIVITY_ATTRIBUTES | ||
|
|
||
| # Create error details | ||
| error_message = str(error).encode('utf-8') |
There was a problem hiding this comment.
add a TODO here. I think we need a data converter for this.
|
|
||
| logger.info("Decision task failure response sent") | ||
|
|
||
| except Exception: |
There was a problem hiding this comment.
maybe just don't handle exception at all and let worker to handle it
| return_new_decision_task=decision_result.force_create_new_decision_task, | ||
| force_create_new_decision_task=decision_result.force_create_new_decision_task |
There was a problem hiding this comment.
nit: these are for locally dispatched activities. We can add it later so it's cleaner. Same thing for query related logic
| logger.exception("Error responding to decision task completion") | ||
| raise | ||
|
|
||
| def cleanup_workflow_engine(self, workflow_id: str, run_id: str) -> None: |
There was a problem hiding this comment.
This method is not used anywhere
|
|
||
| # Get or create workflow engine for this workflow execution | ||
| engine_key = f"{workflow_id}:{run_id}" | ||
| if engine_key not in self._workflow_engines: |
There was a problem hiding this comment.
since we don't support sticky cache yet. This will cause memory leak without exit logic. Maybe just remove _workflow_engines for now and add it when sticky cache is implemented.
Signed-off-by: Tim Li <ltim@uber.com>
What changed?
Add base and decision task handler
Why?
We need task handler to handle polled tasks
How did you test it?
unit test
Potential risks
Release notes
Documentation Changes