-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsimple_class_workflow.py
45 lines (30 loc) · 1.01 KB
/
simple_class_workflow.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
#!/usr/bin/env python
from dotflow import DotFlow, action
@action(retry=5)
class Step:
    """Class-based workflow step made of two decorated methods.

    The class itself is registered through ``action(retry=5)``; only the
    methods carrying their own ``@action`` decorator are meant to take part
    in the workflow.
    NOTE(review): ``retry=5`` presumably allows up to five attempts on
    failure -- confirm against the dotflow documentation.
    """

    def __init__(self, initial_context):
        """Verify that a non-empty initial context arrived and set a flag.

        Args:
            initial_context: object exposing the initial data via ``.storage``.

        Raises:
            AssertionError: if ``initial_context.storage`` is falsy.
        """
        storage = initial_context.storage
        print(storage, "__init__")
        assert storage

        # Checked later by first_function to confirm that instance state
        # created here is visible to the decorated methods.
        self.variable = True

    def auxiliary_function(self):
        """Plain helper that is intentionally left without ``@action``.

        Because it is not decorated, it is not expected to be executed
        as part of the workflow.
        """

    @action
    def first_function(self, initial_context):
        """Check the initial context and instance state, then emit a payload.

        Args:
            initial_context: object whose ``.storage`` must equal
                ``{"foo": "bar"}``.

        Returns:
            The dictionary ``{"foo": "bar"}``.

        Raises:
            AssertionError: if the storage differs from the expected
                dictionary or ``self.variable`` is not ``True``.
        """
        received = initial_context.storage
        print(received, "first_function")
        assert received == {"foo": "bar"}
        assert self.variable is True
        return {"foo": "bar"}

    @action
    def second_function(self, previous_context):
        """Check the context handed over from the preceding step.

        Args:
            previous_context: object whose ``.storage`` must equal
                ``{"foo": "bar"}``.

        Returns:
            ``True`` once the check has passed.

        Raises:
            AssertionError: if the storage differs from the expected
                dictionary.
        """
        received = previous_context.storage
        print(received, "second_function")
        assert received == {"foo": "bar"}
        return True
def main():
    """Build a workflow with one class-based task, start it, and return it.

    The ``Step`` class is registered as the only task and is given
    ``{"foo": "bar"}`` as its initial context.

    Returns:
        The ``DotFlow`` instance after ``start()`` has been called.
    """
    flow = DotFlow()
    flow.task.add(step=Step, initial_context={"foo": "bar"})
    flow.start()
    return flow
# Run the example only when this file is executed directly as a script,
# so that importing the module does not start the workflow.
if __name__ == "__main__":
    main()