-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathget_registry.py
More file actions
59 lines (48 loc) · 2.02 KB
/
get_registry.py
File metadata and controls
59 lines (48 loc) · 2.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import importlib
import inspect
import sys
from pathlib import Path
from typing import get_type_hints
from fluxqueue import Context
def get_registry(module_path: str, queue: str, module_dir: str | None = None):
if module_dir:
module_dir_path = Path(module_dir).resolve()
if str(module_dir_path) not in sys.path:
sys.path.insert(0, str(module_dir_path))
module = importlib.import_module(module_path)
registry = {"tasks": {}, "contexts": {}}
for _name, obj in inspect.getmembers(module):
if inspect.isfunction(obj):
task_name = getattr(obj, "task_name", None)
task_queue = getattr(obj, "queue", None)
if not task_queue or task_queue != queue:
continue
if registry["tasks"].get(task_name):
raise ValueError(f"Task '{task_name}' is duplicated")
original_func = getattr(obj, "__wrapped__", obj)
hints = get_type_hints(original_func)
sig = inspect.signature(original_func)
context_params = {
name: hints[name]
for name in sig.parameters
if name in hints
and isinstance(hints[name], type)
and issubclass(hints[name], Context)
}
if not context_params:
context_name = None
else:
context = context_params[next(iter(context_params))]
context_name = getattr(context, "__fluxqueue_context__", None)
registry["tasks"][task_name] = {
"func": original_func,
"context_name": context_name,
}
elif inspect.isclass(obj):
if not issubclass(obj, Context):
continue
context_name = getattr(obj, "__fluxqueue_context__", None)
if registry["contexts"].get(context_name):
raise ValueError(f"Context '{context_name}' is duplicated")
registry["contexts"][context_name] = obj
return registry