Skip to content

Commit a28c322

Browse files
committed
honor-completion-with-immediate-effect workflows
1 parent ba5fd0f commit a28c322

File tree

1 file changed

+152
-0
lines changed

1 file changed

+152
-0
lines changed

Diff for: dan/honor-completion-with-immediate-effect.py

+152
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
import asyncio
2+
3+
from temporalio import common, workflow
4+
from temporalio.client import Client, WorkflowUpdateStage
5+
from temporalio.exceptions import ApplicationError
6+
from temporalio.worker import Worker
7+
8+
wid = __file__
9+
tq = "tq"
10+
11+
12+
@workflow.defn
13+
class MainCoroutineExitShouldHavePriorityOverHandlerWorkflow:
14+
def __init__(self) -> None:
15+
self.seen_first_signal = False
16+
self.seen_second_signal = False
17+
self.should_can = True # a signal could toggle this
18+
19+
@workflow.run
20+
async def run(self):
21+
await workflow.wait_condition(
22+
lambda: self.seen_first_signal and self.seen_second_signal
23+
)
24+
25+
@workflow.signal
26+
async def this_signal_always_executes_first(self):
27+
self.seen_first_signal = True
28+
if self.should_can:
29+
workflow.continue_as_new()
30+
31+
@workflow.signal
32+
async def this_signal_always_executes_second(self):
33+
await workflow.wait_condition(lambda: self.seen_first_signal)
34+
self.seen_second_signal = True
35+
raise ApplicationError("I don't expect this to be raised if signal 1 did CAN")
36+
37+
38+
@workflow.defn
39+
class SecretDetectorWorkflow:
40+
def __init__(self) -> None:
41+
self.data: list[str] = []
42+
43+
@workflow.run
44+
async def run(self) -> None:
45+
await workflow.wait_condition(lambda: False)
46+
47+
@workflow.signal
48+
def send_data(self, data: str):
49+
self.data.append(data)
50+
self.check_data()
51+
52+
def check_data(self):
53+
for d in self.data:
54+
if d.startswith("<SECRET>"):
55+
raise ApplicationError("secret detected, shutting down immediately")
56+
57+
@workflow.update
58+
async def read_data(self) -> list[str]:
59+
await workflow.wait_condition(lambda: len(self.data) > 0)
60+
return self.data
61+
62+
63+
@workflow.defn
64+
class ShutdownImmediatelyWorkflow:
65+
def __init__(self) -> None:
66+
self.data: list[str] = []
67+
68+
@workflow.run
69+
async def run(self) -> None:
70+
await workflow.wait_condition(lambda: False)
71+
72+
@workflow.signal
73+
async def shutdown_immediately(self):
74+
self.data.append("immediate shutdown requested")
75+
raise ApplicationError("immediate shutdown requested")
76+
77+
@workflow.update
78+
async def read_data(self) -> list[str]:
79+
await workflow.wait_condition(lambda: len(self.data) > 0)
80+
return self.data
81+
82+
83+
async def run_main_coroutine_exit_should_have_priority_over_handler():
84+
client = await Client.connect("localhost:7233")
85+
async with Worker(
86+
client,
87+
task_queue=tq,
88+
workflows=[MainCoroutineExitShouldHavePriorityOverHandlerWorkflow],
89+
):
90+
wf_handle = await client.start_workflow(
91+
MainCoroutineExitShouldHavePriorityOverHandlerWorkflow.run,
92+
id=wid,
93+
task_queue=tq,
94+
id_reuse_policy=common.WorkflowIDReusePolicy.TERMINATE_IF_RUNNING,
95+
)
96+
await wf_handle.signal(
97+
MainCoroutineExitShouldHavePriorityOverHandlerWorkflow.this_signal_always_executes_second
98+
)
99+
await wf_handle.signal(
100+
MainCoroutineExitShouldHavePriorityOverHandlerWorkflow.this_signal_always_executes_first
101+
)
102+
103+
104+
async def run_secret_detector():
105+
client = await Client.connect("localhost:7233")
106+
async with Worker(
107+
client,
108+
task_queue=tq,
109+
workflows=[SecretDetectorWorkflow],
110+
):
111+
wf_handle = await client.start_workflow(
112+
SecretDetectorWorkflow.run,
113+
id=wid,
114+
task_queue=tq,
115+
id_reuse_policy=common.WorkflowIDReusePolicy.TERMINATE_IF_RUNNING,
116+
)
117+
118+
upd_handle = await wf_handle.start_update(
119+
SecretDetectorWorkflow.read_data,
120+
wait_for_stage=WorkflowUpdateStage.ACCEPTED,
121+
)
122+
await wf_handle.signal(
123+
SecretDetectorWorkflow.send_data,
124+
"<SECRET> Updates must not return this! </SECRET>",
125+
)
126+
print(await upd_handle.result())
127+
128+
129+
async def run_shutdown_immediately():
130+
client = await Client.connect("localhost:7233")
131+
async with Worker(
132+
client,
133+
task_queue=tq,
134+
workflows=[ShutdownImmediatelyWorkflow],
135+
):
136+
wf_handle = await client.start_workflow(
137+
ShutdownImmediatelyWorkflow.run,
138+
id=wid,
139+
task_queue=tq,
140+
id_reuse_policy=common.WorkflowIDReusePolicy.TERMINATE_IF_RUNNING,
141+
)
142+
143+
upd_handle = await wf_handle.start_update(
144+
ShutdownImmediatelyWorkflow.read_data,
145+
wait_for_stage=WorkflowUpdateStage.ACCEPTED,
146+
)
147+
await wf_handle.signal(ShutdownImmediatelyWorkflow.shutdown_immediately)
148+
print(await upd_handle.result())
149+
150+
151+
if __name__ == "__main__":
152+
asyncio.run(run_secret_detector())

0 commit comments

Comments
 (0)