forked from cadence-workflow/cadence-python-client
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path_registry.py
More file actions
176 lines (127 loc) · 5.36 KB
/
_registry.py
File metadata and controls
176 lines (127 loc) · 5.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
#!/usr/bin/env python3
"""
Workflow and Activity Registry for Cadence Python Client.
This module provides a registry system for managing workflows and activities,
similar to the Go client's registry.go implementation.
"""
import logging
from typing import Callable, Dict, Optional, Unpack, TypedDict
from cadence._internal.type_utils import validate_fn_parameters
# Module-level logger named after this module; Registry reports each
# successful workflow/activity registration through it at INFO level.
logger = logging.getLogger(__name__)
class RegisterWorkflowOptions(TypedDict, total=False):
    """Keyword options accepted by ``Registry.workflow``.

    The dictionary is declared with ``total=False``, so every key may be
    left out entirely.
    """

    # Name to register the workflow under; when missing or falsy the
    # registry falls back to the function's ``__name__``.
    name: Optional[str]

    # Extra lookup key that resolves to the registered workflow name.
    alias: Optional[str]
class RegisterActivityOptions(TypedDict, total=False):
    """Keyword options accepted by ``Registry.activity``.

    The dictionary is declared with ``total=False``, so every key may be
    left out entirely.
    """

    # Name to register the activity under; when missing or falsy the
    # registry falls back to the function's ``__name__``.
    name: Optional[str]

    # Extra lookup key that resolves to the registered activity name.
    alias: Optional[str]
class Registry:
    """
    Registry for managing workflows and activities.

    Workflows and activities live in two independent namespaces. Within
    each namespace a function is stored under a primary name and may
    additionally be reachable through one alias. Lookups resolve aliases
    first, so names and aliases of the same kind share a single key space
    and must not collide with one another.

    Example:
        registry = Registry()

        @registry.workflow
        def my_workflow(): ...

        @registry.activity(name="send", alias="send_email")
        def send_email_activity(): ...

        registry.get_activity("send_email")  # -> send_email_activity
    """

    def __init__(self) -> None:
        """Initialize an empty registry."""
        self._workflows: Dict[str, Callable] = {}
        self._activities: Dict[str, Callable] = {}
        self._workflow_aliases: Dict[str, str] = {}  # alias -> name mapping
        self._activity_aliases: Dict[str, str] = {}  # alias -> name mapping

    @staticmethod
    def _register(
        kind: str,
        entries: Dict[str, Callable],
        aliases: Dict[str, str],
        name: str,
        alias: Optional[str],
        fn: Callable,
    ) -> None:
        """
        Validate and then store ``fn`` under ``name`` (and ``alias``).

        All checks run before either mapping is touched, so a rejected
        registration leaves the registry exactly as it was.

        Args:
            kind: Capitalized label used in messages ("Workflow"/"Activity")
            entries: name -> function mapping to update
            aliases: alias -> name mapping to update
            name: Primary registration name
            alias: Optional alias; a falsy value means "no alias"
            fn: The function being registered

        Raises:
            KeyError: If the name or alias collides with an existing
                name or alias of the same kind
        """
        label = kind.lower()

        if name in entries:
            raise KeyError(f"{kind} '{name}' is already registered")
        # Aliases are resolved before names on lookup, so a name equal to an
        # existing alias could never be reached by that name.
        if name in aliases:
            raise KeyError(
                f"{kind} name '{name}' is already registered as an alias"
            )
        if alias:
            if alias in aliases:
                raise KeyError(f"{kind} alias '{alias}' is already registered")
            # An alias equal to an existing name would silently shadow it.
            if alias in entries:
                raise KeyError(
                    f"{kind} alias '{alias}' is already registered "
                    f"as a {label} name"
                )

        # Nothing can fail past this point: commit both mappings together.
        entries[name] = fn
        if alias:
            aliases[alias] = name
        logger.info("Registered %s '%s'", label, name)

    @staticmethod
    def _resolve(
        kind: str,
        entries: Dict[str, Callable],
        aliases: Dict[str, str],
        name: str,
    ) -> Callable:
        """
        Return the function registered under ``name`` or under the name
        that ``name`` aliases.

        Raises:
            KeyError: If neither a name nor an alias matches
        """
        actual_name = aliases.get(name, name)
        try:
            return entries[actual_name]
        except KeyError:
            # Report the key the caller asked for, not the resolved one.
            raise KeyError(f"{kind} '{name}' not found in registry") from None

    def workflow(
        self,
        func: Optional[Callable] = None,
        **kwargs: Unpack[RegisterWorkflowOptions]
    ) -> Callable:
        """
        Register a workflow function.

        This method can be used as a bare decorator (``@registry.workflow``),
        as a decorator factory (``@registry.workflow(name=..., alias=...)``),
        or called directly (``registry.workflow(fn, name=...)``).

        Args:
            func: The workflow function to register; omit it to obtain a
                decorator
            **kwargs: Options for registration (name, alias). When ``name``
                is missing or falsy, ``func.__name__`` is used.

        Returns:
            The registered function unchanged, or a decorator when ``func``
            is None

        Raises:
            KeyError: If the workflow name or alias is already in use. The
                registry is left unmodified in that case.
        """
        options = RegisterWorkflowOptions(**kwargs)

        def decorator(f: Callable) -> Callable:
            self._register(
                "Workflow",
                self._workflows,
                self._workflow_aliases,
                options.get('name') or f.__name__,
                options.get('alias'),
                f,
            )
            return f

        if func is None:
            return decorator
        return decorator(func)

    def activity(
        self,
        func: Optional[Callable] = None,
        **kwargs: Unpack[RegisterActivityOptions]
    ) -> Callable:
        """
        Register an activity function.

        This method can be used as a bare decorator (``@registry.activity``),
        as a decorator factory (``@registry.activity(name=..., alias=...)``),
        or called directly (``registry.activity(fn, name=...)``).

        The function is passed to ``validate_fn_parameters`` before anything
        is stored; whatever that helper raises propagates to the caller.

        Args:
            func: The activity function to register; omit it to obtain a
                decorator
            **kwargs: Options for registration (name, alias). When ``name``
                is missing or falsy, ``func.__name__`` is used.

        Returns:
            The registered function unchanged, or a decorator when ``func``
            is None

        Raises:
            KeyError: If the activity name or alias is already in use. The
                registry is left unmodified in that case.
        """
        options = RegisterActivityOptions(**kwargs)

        def decorator(f: Callable) -> Callable:
            # Validate the signature first so an invalid function is never
            # stored.
            validate_fn_parameters(f)
            self._register(
                "Activity",
                self._activities,
                self._activity_aliases,
                options.get('name') or f.__name__,
                options.get('alias'),
                f,
            )
            return f

        if func is None:
            return decorator
        return decorator(func)

    def get_workflow(self, name: str) -> Callable:
        """
        Get a registered workflow by name.

        Args:
            name: Name or alias of the workflow

        Returns:
            The workflow function

        Raises:
            KeyError: If workflow is not found
        """
        return self._resolve(
            "Workflow", self._workflows, self._workflow_aliases, name
        )

    def get_activity(self, name: str) -> Callable:
        """
        Get a registered activity by name.

        Args:
            name: Name or alias of the activity

        Returns:
            The activity function

        Raises:
            KeyError: If activity is not found
        """
        return self._resolve(
            "Activity", self._activities, self._activity_aliases, name
        )