Skip to content

Commit 51b16ee

Browse files
committed
Message passing introduction sample (used in docs)
1 parent d0cdd30 commit 51b16ee

File tree

8 files changed

+392
-2
lines changed

8 files changed

+392
-2
lines changed

Diff for: message_passing/introduction/README.md

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# Introduction to message-passing
2+
3+
This sample provides an introduction to using Query, Signal, and Update.
4+
5+
See https://docs.temporal.io/develop/python/message-passing.
6+
7+
To run, first see the main [README.md](../../README.md) for prerequisites.
8+
9+
Then create two terminals and `cd` to this directory.
10+
11+
Run the worker in one terminal:
12+
13+
poetry run python worker.py
14+
15+
And execute the workflow in the other terminal:
16+
17+
poetry run python starter.py
18+

Diff for: message_passing/introduction/__init__.py

Whitespace-only changes.

Diff for: message_passing/introduction/starter.py

+49
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
import asyncio
2+
from typing import Optional
3+
4+
from temporalio.client import Client, WorkflowUpdateStage
5+
6+
from message_passing.introduction.workflows import (
7+
ApproveInput,
8+
GetLanguagesInput,
9+
GreetingWorkflow,
10+
Language,
11+
)
12+
13+
TASK_QUEUE = "message-passing-introduction-task-queue"
14+
15+
16+
async def main(client: Optional[Client] = None):
17+
client = client or await Client.connect("localhost:7233")
18+
wf_handle = await client.start_workflow(
19+
GreetingWorkflow.run,
20+
id="greeting-workflow-1234",
21+
task_queue=TASK_QUEUE,
22+
)
23+
24+
supported_languages = await wf_handle.query(
25+
GreetingWorkflow.get_languages, GetLanguagesInput(include_unsupported=False)
26+
)
27+
print(f"supported languages: {supported_languages}")
28+
29+
previous_language = await wf_handle.execute_update(
30+
GreetingWorkflow.set_language, Language.CHINESE
31+
)
32+
current_language = await wf_handle.query(GreetingWorkflow.get_language)
33+
print(f"language changed: {previous_language.name} -> {current_language.name}")
34+
35+
update_handle = await wf_handle.start_update(
36+
GreetingWorkflow.set_language,
37+
Language.ENGLISH,
38+
wait_for_stage=WorkflowUpdateStage.ACCEPTED,
39+
)
40+
previous_language = await update_handle.result()
41+
current_language = await wf_handle.query(GreetingWorkflow.get_language)
42+
print(f"language changed: {previous_language.name} -> {current_language.name}")
43+
44+
await wf_handle.signal(GreetingWorkflow.approve, ApproveInput(name=""))
45+
print(await wf_handle.result())
46+
47+
48+
if __name__ == "__main__":
49+
asyncio.run(main())

Diff for: message_passing/introduction/worker.py

+37
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
import asyncio
2+
import logging
3+
4+
from temporalio.client import Client
5+
from temporalio.worker import Worker
6+
7+
from message_passing.introduction.workflows import (
8+
GreetingWorkflow,
9+
call_greeting_service,
10+
)
11+
12+
interrupt_event = asyncio.Event()
13+
14+
15+
async def main():
16+
logging.basicConfig(level=logging.INFO)
17+
18+
client = await Client.connect("localhost:7233")
19+
20+
async with Worker(
21+
client,
22+
task_queue="message-passing-introduction-task-queue",
23+
workflows=[GreetingWorkflow],
24+
activities=[call_greeting_service],
25+
):
26+
logging.info("Worker started, ctrl+c to exit")
27+
await interrupt_event.wait()
28+
logging.info("Shutting down")
29+
30+
31+
if __name__ == "__main__":
32+
loop = asyncio.new_event_loop()
33+
try:
34+
loop.run_until_complete(main())
35+
except KeyboardInterrupt:
36+
interrupt_event.set()
37+
loop.run_until_complete(loop.shutdown_asyncgens())

Diff for: message_passing/introduction/workflows.py

+159
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
import asyncio
2+
from dataclasses import dataclass
3+
from datetime import timedelta
4+
from enum import IntEnum
5+
from typing import Optional
6+
7+
from temporalio import activity, workflow
8+
from temporalio.exceptions import ApplicationError
9+
10+
11+
class Language(IntEnum):
12+
ARABIC = 1
13+
CHINESE = 2
14+
ENGLISH = 3
15+
FRENCH = 4
16+
HINDI = 5
17+
PORTUGUESE = 6
18+
SPANISH = 7
19+
20+
21+
@dataclass
22+
class GetLanguagesInput:
23+
include_unsupported: bool
24+
25+
26+
@dataclass
27+
class ApproveInput:
28+
name: str
29+
30+
31+
@workflow.defn
32+
class GreetingWorkflow:
33+
"""
34+
A workflow that that returns a greeting in one of two languages.
35+
36+
It supports a Query to obtain the current language, an Update to change the
37+
current language and receive the previous language in response, and a Signal
38+
to approve the Workflow so that it is allowed to return its result.
39+
"""
40+
41+
# 👉 This Workflow does not use any async handlers and so cannot use any
42+
# Activities. It only supports two languages, whose greetings are hardcoded
43+
# in the Workflow definition. See GreetingWorkflowWithAsyncHandler below for
44+
# a Workflow that uses an async Update handler to call an Activity.
45+
46+
def __init__(self) -> None:
47+
self.approved_for_release = False
48+
self.approver_name: Optional[str] = None
49+
self.greetings = {
50+
Language.CHINESE: "你好,世界",
51+
Language.ENGLISH: "Hello, world",
52+
}
53+
self.language = Language.ENGLISH
54+
55+
@workflow.run
56+
async def run(self) -> str:
57+
await workflow.wait_condition(lambda: self.approved_for_release)
58+
return self.greetings[self.language]
59+
60+
@workflow.query
61+
def get_languages(self, input: GetLanguagesInput) -> list[Language]:
62+
# 👉 A Query handler returns a value: it can inspect but must not mutate the Workflow state.
63+
if input.include_unsupported:
64+
return sorted(Language)
65+
else:
66+
return sorted(self.greetings)
67+
68+
@workflow.signal
69+
def approve(self, input: ApproveInput) -> None:
70+
# 👉 A Signal handler mutates the Workflow state but cannot return a value.
71+
self.approved_for_release = True
72+
self.approver_name = input.name
73+
74+
@workflow.update
75+
def set_language(self, language: Language) -> Language:
76+
# 👉 An Update handler can mutate the Workflow state and return a value.
77+
previous_language, self.language = self.language, language
78+
return previous_language
79+
80+
@set_language.validator
81+
def validate_language(self, language: Language) -> None:
82+
if language not in self.greetings:
83+
# 👉 In an Update validator you raise any exception to reject the Update.
84+
raise ValueError(f"{language.name} is not supported")
85+
86+
@workflow.query
87+
def get_language(self) -> Language:
88+
return self.language
89+
90+
91+
@workflow.defn
92+
class GreetingWorkflowWithAsyncHandler(GreetingWorkflow):
93+
"""
94+
A workflow that that returns a greeting in one of many available languages.
95+
96+
It supports a Query to obtain the current language, an Update to change the
97+
current language and receive the previous language in response, and a Signal
98+
to approve the Workflow so that it is allowed to return its result.
99+
"""
100+
101+
# 👉 This workflow supports the full range of languages, because the update
102+
# handler is async and uses an activity to call a remote service.
103+
104+
def __init__(self) -> None:
105+
super().__init__()
106+
self.lock = asyncio.Lock()
107+
108+
@workflow.run
109+
async def run(self) -> str:
110+
# 👉 In addition to waiting for the `approve` Signal, we also wait for
111+
# all handlers to finish. Otherwise, the Workflow might return its
112+
# result while a set_language Update is in progress.
113+
await workflow.wait_condition(
114+
lambda: self.approved_for_release and workflow.all_handlers_finished()
115+
)
116+
return self.greetings[self.language]
117+
118+
@workflow.update
119+
async def set_language(self, language: Language) -> Language:
120+
# 👉 An Update handler can mutate the Workflow state and return a value.
121+
# 👉 Since this update handler is async, it can execute an activity.
122+
if language not in self.greetings:
123+
# 👉 We use a lock so that, if this handler is executed multiple
124+
# times, each execution can schedule the activity only when the
125+
# previously scheduled activity has completed. This ensures that
126+
# multiple calls to set_language are processed in order.
127+
async with self.lock:
128+
greeting = await workflow.execute_activity(
129+
call_greeting_service,
130+
language,
131+
start_to_close_timeout=timedelta(seconds=10),
132+
)
133+
if greeting is None:
134+
# 👉 An update validator cannot be async, so cannot be used
135+
# to check that the remote call_greeting_service supports
136+
# the requested language. Raising ApplicationError will fail
137+
# the Update, but the WorkflowExecutionUpdateAccepted event
138+
# will still be added to history.
139+
raise ApplicationError(
140+
f"Greeting service does not support {language.name}"
141+
)
142+
self.greetings[language] = greeting
143+
previous_language, self.language = self.language, language
144+
return previous_language
145+
146+
147+
@activity.defn
148+
async def call_greeting_service(to_language: Language) -> Optional[str]:
149+
greetings = {
150+
Language.ARABIC: "مرحبا بالعالم",
151+
Language.CHINESE: "你好,世界",
152+
Language.ENGLISH: "Hello, world",
153+
Language.FRENCH: "Bonjour, monde",
154+
Language.HINDI: "नमस्ते दुनिया",
155+
Language.PORTUGUESE: "Olá mundo",
156+
Language.SPANISH: "¡Hola mundo",
157+
}
158+
await asyncio.sleep(0.2) # Simulate a network call
159+
return greetings.get(to_language)

Diff for: message_passing/safe_message_handlers/worker.py

-2
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515

1616

1717
async def main():
18-
# Connect client
1918
client = await Client.connect("localhost:7233")
2019

2120
async with Worker(
@@ -24,7 +23,6 @@ async def main():
2423
workflows=[ClusterManagerWorkflow],
2524
activities=[assign_nodes_to_job, unassign_nodes_for_job, find_bad_nodes],
2625
):
27-
# Wait until interrupted
2826
logging.info("ClusterManagerWorkflow worker started, ctrl+c to exit")
2927
await interrupt_event.wait()
3028
logging.info("Shutting down")

0 commit comments

Comments
 (0)