forked from cadence-workflow/cadence-python-client
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy patherror.py
More file actions
131 lines (100 loc) · 3.61 KB
/
error.py
File metadata and controls
131 lines (100 loc) · 3.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
from datetime import timedelta
from typing import Any
import grpc
class ContinueAsNewError(Exception):
def __init__(
self,
*args: Any,
workflow_type: str | None = None,
task_list: str | None = None,
execution_start_to_close_timeout: timedelta | None = None,
task_start_to_close_timeout: timedelta | None = None,
):
super().__init__("ContinueAsNew")
self.workflow_args = args
self.workflow_type = workflow_type
self.task_list = task_list
self.execution_start_to_close_timeout = execution_start_to_close_timeout
self.task_start_to_close_timeout = task_start_to_close_timeout
class ActivityFailure(Exception):
    """Exception constructed from exactly one message string."""

    def __init__(self, message: str) -> None:
        """Pass ``message`` through to ``Exception`` as its only argument.

        Args:
            message: Text that becomes the sole entry of ``args``.
        """
        super().__init__(message)
class WorkflowFailure(Exception):
    """Exception constructed from exactly one message string."""

    def __init__(self, message: str) -> None:
        """Pass ``message`` through to ``Exception`` as its only argument.

        Args:
            message: Text that becomes the sole entry of ``args``.
        """
        super().__init__(message)
class CadenceRpcError(Exception):
    """Base exception for the RPC error classes in this module.

    All constructor values are forwarded to ``Exception`` so they appear in
    ``args``; the status code is additionally exposed as ``code``.
    """

    def __init__(self, message: str, code: grpc.StatusCode, *args):
        """Record the status code and forward everything to ``Exception``.

        Args:
            message: First entry of the resulting ``args`` tuple.
            code: Status code; stored on ``code`` and as the second entry
                of ``args``.
            *args: Extra values appended to ``args`` after ``code``.
        """
        self.code = code
        # Keep message and code at the front so ``args`` has a stable layout
        # regardless of how many extra values a subclass supplies.
        forwarded = (message, code, *args)
        super().__init__(*forwarded)
class WorkflowExecutionAlreadyStartedError(CadenceRpcError):
    """RPC error that additionally carries a start request id and a run id."""

    def __init__(
        self,
        message: str,
        code: grpc.StatusCode,
        start_request_id: str,
        run_id: str,
    ) -> None:
        """Store the two identifiers and forward all values to the base class.

        Args:
            message: Error message passed to ``CadenceRpcError``.
            code: Status code passed to ``CadenceRpcError``.
            start_request_id: Stored on ``start_request_id``.
            run_id: Stored on ``run_id``.
        """
        self.run_id = run_id
        self.start_request_id = start_request_id
        # The identifiers are also forwarded so they end up in ``args``.
        super().__init__(message, code, start_request_id, run_id)
class EntityNotExistsError(CadenceRpcError):
    """RPC error that additionally carries cluster information."""

    def __init__(
        self,
        message: str,
        code: grpc.StatusCode,
        current_cluster: str,
        active_cluster: str,
        active_clusters: list[str],
    ) -> None:
        """Store the cluster fields and forward all values to the base class.

        Args:
            message: Error message passed to ``CadenceRpcError``.
            code: Status code passed to ``CadenceRpcError``.
            current_cluster: Stored on ``current_cluster``.
            active_cluster: Stored on ``active_cluster``.
            active_clusters: Stored on ``active_clusters``; the same list
                object is kept, it is not copied.
        """
        cluster_details = (current_cluster, active_cluster, active_clusters)
        # The details are forwarded as well so they end up in ``args``.
        super().__init__(message, code, *cluster_details)
        self.active_clusters = active_clusters
        self.active_cluster = active_cluster
        self.current_cluster = current_cluster
class WorkflowExecutionAlreadyCompletedError(CadenceRpcError):
    """``CadenceRpcError`` subclass that adds no fields of its own."""
class DomainNotActiveError(CadenceRpcError):
    """RPC error that additionally carries a domain and cluster information."""

    def __init__(
        self,
        message: str,
        code: grpc.StatusCode,
        domain: str,
        current_cluster: str,
        active_cluster: str,
        active_clusters: list[str],
    ) -> None:
        """Store the domain/cluster fields and forward all values upward.

        Args:
            message: Error message passed to ``CadenceRpcError``.
            code: Status code passed to ``CadenceRpcError``.
            domain: Stored on ``domain``.
            current_cluster: Stored on ``current_cluster``.
            active_cluster: Stored on ``active_cluster``.
            active_clusters: Stored on ``active_clusters``; the same list
                object is kept, it is not copied.
        """
        self.active_clusters = active_clusters
        self.active_cluster = active_cluster
        self.current_cluster = current_cluster
        self.domain = domain
        # Forward the details in declaration order so ``args`` mirrors the
        # constructor signature.
        details = (domain, current_cluster, active_cluster, active_clusters)
        super().__init__(message, code, *details)
class ClientVersionNotSupportedError(CadenceRpcError):
    """RPC error that additionally carries client version details."""

    def __init__(
        self,
        message: str,
        code: grpc.StatusCode,
        feature_version: str,
        client_impl: str,
        supported_versions: str,
    ) -> None:
        """Store the version fields and forward all values to the base class.

        Args:
            message: Error message passed to ``CadenceRpcError``.
            code: Status code passed to ``CadenceRpcError``.
            feature_version: Stored on ``feature_version``.
            client_impl: Stored on ``client_impl``.
            supported_versions: Stored on ``supported_versions``.
        """
        version_details = (feature_version, client_impl, supported_versions)
        # The details are forwarded as well so they end up in ``args``.
        super().__init__(message, code, *version_details)
        self.supported_versions = supported_versions
        self.client_impl = client_impl
        self.feature_version = feature_version
class FeatureNotEnabledError(CadenceRpcError):
    """RPC error that additionally carries a feature flag name."""

    def __init__(
        self,
        message: str,
        code: grpc.StatusCode,
        feature_flag: str,
    ) -> None:
        """Store the feature flag and forward all values to the base class.

        Args:
            message: Error message passed to ``CadenceRpcError``.
            code: Status code passed to ``CadenceRpcError``.
            feature_flag: Stored on ``feature_flag`` and appended to ``args``.
        """
        self.feature_flag = feature_flag
        super().__init__(message, code, feature_flag)
class CancellationAlreadyRequestedError(CadenceRpcError):
    """``CadenceRpcError`` subclass that adds no fields of its own."""
class DomainAlreadyExistsError(CadenceRpcError):
    """``CadenceRpcError`` subclass that adds no fields of its own."""
class LimitExceededError(CadenceRpcError):
    """``CadenceRpcError`` subclass that adds no fields of its own."""
class QueryFailedError(CadenceRpcError):
    """``CadenceRpcError`` subclass that adds no fields of its own."""
class ServiceBusyError(CadenceRpcError):
    """RPC error that additionally carries a reason string."""

    def __init__(
        self,
        message: str,
        code: grpc.StatusCode,
        reason: str,
    ) -> None:
        """Store the reason and forward all values to the base class.

        Args:
            message: Error message passed to ``CadenceRpcError``.
            code: Status code passed to ``CadenceRpcError``.
            reason: Stored on ``reason`` and appended to ``args``.
        """
        self.reason = reason
        super().__init__(message, code, reason)