forked from temporalio/samples-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathworkflow_v1_1.py
45 lines (36 loc) · 1.46 KB
/
workflow_v1_1.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
from datetime import timedelta
from temporalio import workflow
with workflow.unsafe.imports_passed_through():
from worker_versioning.activities import greet, super_greet
@workflow.defn
class MyWorkflow:
"""
The 1.1 version of the workflow, which is compatible with the first version.
The compatible changes we've made are:
- Altering the log lines
- Using the `patched` API to properly introduce branching behavior while maintaining
compatibility
"""
should_finish: bool = False
@workflow.run
async def run(self) -> str:
workflow.logger.info("Running workflow V1.1")
await workflow.wait_condition(lambda: self.should_finish)
return "Concluded workflow on V1.1"
@workflow.signal
async def proceeder(self, inp: str):
if workflow.patched("different-activity"):
await workflow.execute_activity(
super_greet,
args=["V1.1", 100],
start_to_close_timeout=timedelta(seconds=5),
)
else:
# Note it is a valid compatible change to alter the input to an activity. However, because
# we're using the patched API, this branch would only be taken if the workflow was started on
# a v1 worker.
await workflow.execute_activity(
greet, "V1.1", start_to_close_timeout=timedelta(seconds=5)
)
if inp == "finish":
self.should_finish = True