Skip to content

Commit

Permalink
Rename failure handling API
Browse files Browse the repository at this point in the history
  • Loading branch information
longquanzheng committed Dec 10, 2024
1 parent c3bd12a commit ee3f968
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 13 deletions.
2 changes: 1 addition & 1 deletion iwf/tests/test_state_failure_recovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def execute(
def get_state_options(self) -> WorkflowStateOptions:
return WorkflowStateOptions(
execute_api_retry_policy=RetryPolicy(maximum_attempts=1),
execute_failure_handling_state=RecoveryState,
execute_failure_handling_state_when_retry_exhausted=RecoveryState,
)


Expand Down
27 changes: 15 additions & 12 deletions iwf/workflow_state_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
WaitUntilApiFailurePolicy,
WorkflowStateOptions as IdlWorkflowStateOptions,
)
from iwf.workflow_state import WorkflowState


@dataclass
Expand All @@ -30,17 +31,19 @@ class WorkflowStateOptions:
execute_api_timeout_seconds: Optional[int] = None
execute_api_retry_policy: Optional[RetryPolicy] = None
"""
note that the failing handling state will take the same input as the failed state
the type is Optional[type[WorkflowState]] but there is an issue with type hint...
TODO fix this type hint
Set the state to proceed to the specified state after the execute API exhausted all retries
This is useful for some advanced use cases like SAGA pattern.
RetryPolicy is required to be set with maximumAttempts or maximumAttemptsDurationSeconds for execute API.
Note that the failing handling state will take the same input as the failed state
TODO the type should be the type is Optional[type[WorkflowState]] but -- there is an issue with type hint...
"""
execute_failure_handling_state: Optional[type] = None
execute_api_data_attributes_loading_policy: Optional[PersistenceLoadingPolicy] = (
None
)
execute_api_search_attributes_loading_policy: Optional[PersistenceLoadingPolicy] = (
None
)
execute_failure_handling_state_when_retry_exhausted: Optional[type[WorkflowState]] = None
execute_api_data_attributes_loading_policy: Optional[
PersistenceLoadingPolicy
] = None
execute_api_search_attributes_loading_policy: Optional[
PersistenceLoadingPolicy
] = None


def _to_idl_state_options(
Expand Down Expand Up @@ -87,14 +90,14 @@ def _to_idl_state_options(
res.execute_api_retry_policy = options.execute_api_retry_policy
if options.execute_api_timeout_seconds is not None:
res.execute_api_timeout_seconds = options.execute_api_timeout_seconds
if options.execute_failure_handling_state is not None:
if options.execute_failure_handling_state_when_retry_exhausted is not None:
res.execute_api_failure_policy = (
ExecuteApiFailurePolicy.PROCEED_TO_CONFIGURED_STATE
)
from iwf.workflow_state import get_state_id_by_class

res.execute_api_failure_proceed_state_id = get_state_id_by_class(
options.execute_failure_handling_state
options.execute_failure_handling_state_when_retry_exhausted
)
state = state_store[res.execute_api_failure_proceed_state_id]
proceed_state_options = state.get_state_options()
Expand Down

0 comments on commit ee3f968

Please sign in to comment.