Skip to content

Commit d9658d5

Browse files
authored
Waiting for handlers to finish in all exit cases + abort and compensation in a message handler (#144)
1 parent 8179fdc commit d9658d5

File tree

15 files changed

+836
-3
lines changed

15 files changed

+836
-3
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
# Waiting for message handlers
2+
3+
This workflow demonstrates how to wait for signal and update handlers to
4+
finish in the following circumstances:
5+
6+
- Before a successful return
7+
- On failure
8+
- On cancellation
9+
10+
Your workflow can also exit via Continue-As-New. In that case you would
11+
usually wait for the handlers to finish immediately before the call to
12+
continue_as_new(); that's not illustrated in this sample.
13+
14+
15+
To run, open two terminals and `cd` to this directory in them.
16+
17+
Run the worker in one terminal:
18+
19+
poetry run python worker.py
20+
21+
And run the workflow-starter code in the other terminal:
22+
23+
poetry run python starter.py
24+
25+
26+
Here's the output you'll see:
27+
28+
```
29+
workflow exit type: SUCCESS
30+
🟢 caller received update result
31+
🟢 caller received workflow result
32+
33+
34+
workflow exit type: FAILURE
35+
🟢 caller received update result
36+
🔴 caught exception while waiting for workflow result: Workflow execution failed: deliberately failing workflow
37+
38+
39+
workflow exit type: CANCELLATION
40+
🟢 caller received update result
41+
```
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
from dataclasses import dataclass
2+
from enum import IntEnum
3+
4+
TASK_QUEUE = "my-task-queue"
5+
WORKFLOW_ID = "my-workflow-id"
6+
7+
8+
class WorkflowExitType(IntEnum):
9+
SUCCESS = 0
10+
FAILURE = 1
11+
CANCELLATION = 2
12+
13+
14+
@dataclass
15+
class WorkflowInput:
16+
exit_type: WorkflowExitType
17+
18+
19+
@dataclass
20+
class WorkflowResult:
21+
data: str
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
import asyncio
2+
3+
from temporalio import activity
4+
5+
6+
@activity.defn
7+
async def activity_executed_by_update_handler():
8+
await asyncio.sleep(1)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
import asyncio
2+
3+
from temporalio import client, common
4+
5+
from message_passing.waiting_for_handlers import (
6+
TASK_QUEUE,
7+
WORKFLOW_ID,
8+
WorkflowExitType,
9+
WorkflowInput,
10+
)
11+
from message_passing.waiting_for_handlers.workflows import WaitingForHandlersWorkflow
12+
13+
14+
async def starter(exit_type: WorkflowExitType):
15+
cl = await client.Client.connect("localhost:7233")
16+
wf_handle = await cl.start_workflow(
17+
WaitingForHandlersWorkflow.run,
18+
WorkflowInput(exit_type=exit_type),
19+
id=WORKFLOW_ID,
20+
task_queue=TASK_QUEUE,
21+
id_conflict_policy=common.WorkflowIDConflictPolicy.TERMINATE_EXISTING,
22+
)
23+
await _check_run(wf_handle, exit_type)
24+
25+
26+
async def _check_run(
27+
wf_handle: client.WorkflowHandle,
28+
exit_type: WorkflowExitType,
29+
):
30+
try:
31+
up_handle = await wf_handle.start_update(
32+
WaitingForHandlersWorkflow.my_update,
33+
wait_for_stage=client.WorkflowUpdateStage.ACCEPTED,
34+
)
35+
except Exception as e:
36+
print(f" 🔴 caught exception while starting update: {e}: {e.__cause__ or ''}")
37+
38+
if exit_type == WorkflowExitType.CANCELLATION:
39+
await wf_handle.cancel()
40+
41+
try:
42+
await up_handle.result()
43+
print(" 🟢 caller received update result")
44+
except Exception as e:
45+
print(
46+
f" 🔴 caught exception while waiting for update result: {e}: {e.__cause__ or ''}"
47+
)
48+
49+
try:
50+
await wf_handle.result()
51+
print(" 🟢 caller received workflow result")
52+
except BaseException as e:
53+
print(
54+
f" 🔴 caught exception while waiting for workflow result: {e}: {e.__cause__ or ''}"
55+
)
56+
57+
58+
async def main():
59+
for exit_type in [
60+
WorkflowExitType.SUCCESS,
61+
WorkflowExitType.FAILURE,
62+
WorkflowExitType.CANCELLATION,
63+
]:
64+
print(f"\n\nworkflow exit type: {exit_type.name}")
65+
await starter(exit_type)
66+
67+
68+
if __name__ == "__main__":
69+
asyncio.run(main())
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
import asyncio
2+
import logging
3+
4+
from temporalio.client import Client
5+
from temporalio.worker import Worker
6+
7+
from message_passing.waiting_for_handlers import TASK_QUEUE
8+
from message_passing.waiting_for_handlers.activities import (
9+
activity_executed_by_update_handler,
10+
)
11+
from message_passing.waiting_for_handlers.workflows import WaitingForHandlersWorkflow
12+
13+
interrupt_event = asyncio.Event()
14+
15+
16+
async def main():
17+
logging.basicConfig(level=logging.INFO)
18+
19+
client = await Client.connect("localhost:7233")
20+
21+
async with Worker(
22+
client,
23+
task_queue=TASK_QUEUE,
24+
workflows=[WaitingForHandlersWorkflow],
25+
activities=[
26+
activity_executed_by_update_handler,
27+
],
28+
):
29+
logging.info("Worker started, ctrl+c to exit")
30+
await interrupt_event.wait()
31+
logging.info("Shutting down")
32+
33+
34+
if __name__ == "__main__":
35+
loop = asyncio.new_event_loop()
36+
try:
37+
loop.run_until_complete(main())
38+
except KeyboardInterrupt:
39+
interrupt_event.set()
40+
loop.run_until_complete(loop.shutdown_asyncgens())
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
import asyncio
2+
from datetime import timedelta
3+
4+
from temporalio import exceptions, workflow
5+
6+
from message_passing.waiting_for_handlers import (
7+
WorkflowExitType,
8+
WorkflowInput,
9+
WorkflowResult,
10+
)
11+
from message_passing.waiting_for_handlers.activities import (
12+
activity_executed_by_update_handler,
13+
)
14+
15+
16+
def is_workflow_exit_exception(e: BaseException) -> bool:
17+
"""
18+
True if the exception is of a type that will cause the workflow to exit.
19+
20+
This is as opposed to exceptions that cause a workflow task failure, which
21+
are retried automatically by Temporal.
22+
"""
23+
# 👉 If you have set additional failure_exception_types you should also
24+
# check for these here.
25+
return isinstance(e, (asyncio.CancelledError, exceptions.FailureError))
26+
27+
28+
@workflow.defn
29+
class WaitingForHandlersWorkflow:
30+
@workflow.run
31+
async def run(self, input: WorkflowInput) -> WorkflowResult:
32+
"""
33+
This workflow.run method demonstrates a pattern that can be used to wait for signal and
34+
update handlers to finish in the following circumstances:
35+
36+
- On successful workflow return
37+
- On workflow cancellation
38+
- On workflow failure
39+
40+
Your workflow can also exit via Continue-As-New. In that case you would usually wait for
41+
the handlers to finish immediately before the call to continue_as_new(); that's not
42+
illustrated in this sample.
43+
44+
If you additionally need to perform cleanup or compensation on workflow failure or
45+
cancellation, see the message_passing/waiting_for_handlers_and_compensation sample.
46+
"""
47+
try:
48+
# 👉 Use this `try...except` style, instead of waiting for message
49+
# handlers to finish in a `finally` block. The reason is that some
50+
# exception types cause a workflow task failure as opposed to
51+
# workflow exit, in which case we do *not* want to wait for message
52+
# handlers to finish.
53+
result = await self._my_workflow_application_logic(input)
54+
await workflow.wait_condition(workflow.all_handlers_finished)
55+
return result
56+
# 👉 Catch BaseException since asyncio.CancelledError does not inherit
57+
# from Exception.
58+
except BaseException as e:
59+
if is_workflow_exit_exception(e):
60+
await workflow.wait_condition(workflow.all_handlers_finished)
61+
raise
62+
63+
# Methods below this point can be ignored unless you are interested in
64+
# the implementation details of this sample.
65+
66+
def __init__(self) -> None:
67+
self._update_started = False
68+
69+
@workflow.update
70+
async def my_update(self) -> str:
71+
self._update_started = True
72+
await workflow.execute_activity(
73+
activity_executed_by_update_handler,
74+
start_to_close_timeout=timedelta(seconds=10),
75+
)
76+
return "update-result"
77+
78+
async def _my_workflow_application_logic(
79+
self, input: WorkflowInput
80+
) -> WorkflowResult:
81+
# The main workflow logic is implemented in a separate method in order
82+
# to separate "platform-level" concerns (waiting for handlers to finish
83+
# and error handling) from application logic.
84+
85+
# Wait until handlers have started, so that we are demonstrating that we
86+
# wait for them to finish.
87+
await workflow.wait_condition(lambda: self._update_started)
88+
if input.exit_type == WorkflowExitType.SUCCESS:
89+
return WorkflowResult(data="workflow-result")
90+
elif input.exit_type == WorkflowExitType.FAILURE:
91+
raise exceptions.ApplicationError("deliberately failing workflow")
92+
elif input.exit_type == WorkflowExitType.CANCELLATION:
93+
# Block forever; the starter will send a workflow cancellation request.
94+
await asyncio.Future()
95+
raise AssertionError("unreachable")
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
# Waiting for message handlers, and performing compensation and cleanup in message handlers
2+
3+
This sample demonstrates how to do the following:
4+
5+
1. Ensure that all update/signal handlers are finished before a successful
6+
workflow return, and on workflow cancellation and failure.
7+
2. Perform compensation/cleanup in an update handler when the workflow is
8+
cancelled or fails.
9+
10+
For a simpler sample showing how to do (1) without (2), see [safe_message_handlers](../safe_message_handlers/README.md).
11+
12+
To run, open two terminals and `cd` to this directory in them.
13+
14+
Run the worker in one terminal:
15+
16+
poetry run python worker.py
17+
18+
And run the workflow-starter code in the other terminal:
19+
20+
poetry run python starter.py
21+
22+
23+
Here's the output you'll see:
24+
25+
```
26+
workflow exit type: SUCCESS
27+
🟢 caller received update result
28+
🟢 caller received workflow result
29+
30+
31+
workflow exit type: FAILURE
32+
🔴 caught exception while waiting for update result: Workflow update failed: The update failed because the workflow run exited
33+
🔴 caught exception while waiting for workflow result: Workflow execution failed: deliberately failing workflow
34+
35+
36+
workflow exit type: CANCELLATION
37+
🔴 caught exception while waiting for update result: Workflow update failed: The update failed because the workflow run exited
38+
🔴 caught exception while waiting for workflow result: Workflow execution failed: Workflow cancelled
39+
```
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
from dataclasses import dataclass
2+
from enum import IntEnum
3+
4+
TASK_QUEUE = "my-task-queue"
5+
WORKFLOW_ID = "my-workflow-id"
6+
7+
8+
class WorkflowExitType(IntEnum):
9+
SUCCESS = 0
10+
FAILURE = 1
11+
CANCELLATION = 2
12+
13+
14+
@dataclass
15+
class WorkflowInput:
16+
exit_type: WorkflowExitType
17+
18+
19+
@dataclass
20+
class WorkflowResult:
21+
data: str
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
import asyncio
2+
3+
from temporalio import activity
4+
5+
6+
@activity.defn
7+
async def activity_executed_to_perform_workflow_compensation():
8+
await asyncio.sleep(1)
9+
10+
11+
@activity.defn
12+
async def activity_executed_by_update_handler():
13+
await asyncio.sleep(1)
14+
15+
16+
@activity.defn
17+
async def activity_executed_by_update_handler_to_perform_compensation():
18+
await asyncio.sleep(1)

0 commit comments

Comments
 (0)