Skip to content

Commit 7b9bd6e

Browse files
committed
Combine into a single workflow
1 parent 51b16ee commit 7b9bd6e

File tree

3 files changed

+32
-52
lines changed

3 files changed

+32
-52
lines changed

Diff for: message_passing/introduction/starter.py

+6-2
Original file line numberDiff line numberDiff line change
@@ -21,26 +21,30 @@ async def main(client: Optional[Client] = None):
2121
task_queue=TASK_QUEUE,
2222
)
2323

24+
# 👉 Send a Query
2425
supported_languages = await wf_handle.query(
2526
GreetingWorkflow.get_languages, GetLanguagesInput(include_unsupported=False)
2627
)
2728
print(f"supported languages: {supported_languages}")
2829

30+
# 👉 Execute an Update
2931
previous_language = await wf_handle.execute_update(
3032
GreetingWorkflow.set_language, Language.CHINESE
3133
)
3234
current_language = await wf_handle.query(GreetingWorkflow.get_language)
3335
print(f"language changed: {previous_language.name} -> {current_language.name}")
3436

37+
# 👉 Start an Update and then wait for it to complete
3538
update_handle = await wf_handle.start_update(
36-
GreetingWorkflow.set_language,
37-
Language.ENGLISH,
39+
GreetingWorkflow.set_language_using_activity,
40+
Language.ARABIC,
3841
wait_for_stage=WorkflowUpdateStage.ACCEPTED,
3942
)
4043
previous_language = await update_handle.result()
4144
current_language = await wf_handle.query(GreetingWorkflow.get_language)
4245
print(f"language changed: {previous_language.name} -> {current_language.name}")
4346

47+
# 👉 Send a Signal
4448
await wf_handle.signal(GreetingWorkflow.approve, ApproveInput(name=""))
4549
print(await wf_handle.result())
4650

Diff for: message_passing/introduction/workflows.py

+20-38
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
from dataclasses import dataclass
33
from datetime import timedelta
44
from enum import IntEnum
5-
from typing import Optional
5+
from typing import List, Optional
66

77
from temporalio import activity, workflow
88
from temporalio.exceptions import ApplicationError
@@ -51,14 +51,21 @@ def __init__(self) -> None:
5151
Language.ENGLISH: "Hello, world",
5252
}
5353
self.language = Language.ENGLISH
54+
self.lock = asyncio.Lock() # used by the async handler below
5455

5556
@workflow.run
5657
async def run(self) -> str:
57-
await workflow.wait_condition(lambda: self.approved_for_release)
58+
# 👉 In addition to waiting for the `approve` Signal, we also wait for
59+
# all handlers to finish. Otherwise, the Workflow might return its
60+
# result while an async set_language_using_activity Update is in
61+
# progress.
62+
await workflow.wait_condition(
63+
lambda: self.approved_for_release and workflow.all_handlers_finished()
64+
)
5865
return self.greetings[self.language]
5966

6067
@workflow.query
61-
def get_languages(self, input: GetLanguagesInput) -> list[Language]:
68+
def get_languages(self, input: GetLanguagesInput) -> List[Language]:
6269
# 👉 A Query handler returns a value: it can inspect but must not mutate the Workflow state.
6370
if input.include_unsupported:
6471
return sorted(Language)
@@ -83,42 +90,9 @@ def validate_language(self, language: Language) -> None:
8390
# 👉 In an Update validator you raise any exception to reject the Update.
8491
raise ValueError(f"{language.name} is not supported")
8592

86-
@workflow.query
87-
def get_language(self) -> Language:
88-
return self.language
89-
90-
91-
@workflow.defn
92-
class GreetingWorkflowWithAsyncHandler(GreetingWorkflow):
93-
"""
94-
A workflow that that returns a greeting in one of many available languages.
95-
96-
It supports a Query to obtain the current language, an Update to change the
97-
current language and receive the previous language in response, and a Signal
98-
to approve the Workflow so that it is allowed to return its result.
99-
"""
100-
101-
# 👉 This workflow supports the full range of languages, because the update
102-
# handler is async and uses an activity to call a remote service.
103-
104-
def __init__(self) -> None:
105-
super().__init__()
106-
self.lock = asyncio.Lock()
107-
108-
@workflow.run
109-
async def run(self) -> str:
110-
# 👉 In addition to waiting for the `approve` Signal, we also wait for
111-
# all handlers to finish. Otherwise, the Workflow might return its
112-
# result while a set_language Update is in progress.
113-
await workflow.wait_condition(
114-
lambda: self.approved_for_release and workflow.all_handlers_finished()
115-
)
116-
return self.greetings[self.language]
117-
11893
@workflow.update
119-
async def set_language(self, language: Language) -> Language:
120-
# 👉 An Update handler can mutate the Workflow state and return a value.
121-
# 👉 Since this update handler is async, it can execute an activity.
94+
async def set_language_using_activity(self, language: Language) -> Language:
95+
# 👉 This update handler is async, so it can execute an activity.
12296
if language not in self.greetings:
12397
# 👉 We use a lock so that, if this handler is executed multiple
12498
# times, each execution can schedule the activity only when the
@@ -143,9 +117,17 @@ async def set_language(self, language: Language) -> Language:
143117
previous_language, self.language = self.language, language
144118
return previous_language
145119

120+
@workflow.query
121+
def get_language(self) -> Language:
122+
return self.language
123+
146124

147125
@activity.defn
148126
async def call_greeting_service(to_language: Language) -> Optional[str]:
127+
"""
128+
An Activity that simulates a call to a remote greeting service.
129+
The remote greeting service supports the full range of languages.
130+
"""
149131
greetings = {
150132
Language.ARABIC: "مرحبا بالعالم",
151133
Language.CHINESE: "你好,世界",

Diff for: tests/message_passing/introduction/workflow_test.py

+6-12
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
from message_passing.introduction.workflows import (
1010
GetLanguagesInput,
1111
GreetingWorkflow,
12-
GreetingWorkflowWithAsyncHandler,
1312
Language,
1413
call_greeting_service,
1514
)
@@ -107,23 +106,18 @@ async def test_set_language_that_is_only_available_via_remote_service(
107106
async with Worker(
108107
client,
109108
task_queue=TASK_QUEUE,
110-
workflows=[GreetingWorkflowWithAsyncHandler],
109+
workflows=[GreetingWorkflow],
111110
activities=[call_greeting_service],
112111
):
113112
wf_handle = await client.start_workflow(
114-
GreetingWorkflowWithAsyncHandler.run,
113+
GreetingWorkflow.run,
115114
id=str(uuid.uuid4()),
116115
task_queue=TASK_QUEUE,
117116
)
118-
assert (
119-
await wf_handle.query(GreetingWorkflowWithAsyncHandler.get_language)
120-
== Language.ENGLISH
121-
)
117+
assert await wf_handle.query(GreetingWorkflow.get_language) == Language.ENGLISH
122118
previous_language = await wf_handle.execute_update(
123-
GreetingWorkflowWithAsyncHandler.set_language, Language.ARABIC
119+
GreetingWorkflow.set_language_using_activity,
120+
Language.ARABIC,
124121
)
125122
assert previous_language == Language.ENGLISH
126-
assert (
127-
await wf_handle.query(GreetingWorkflowWithAsyncHandler.get_language)
128-
== Language.ARABIC
129-
)
123+
assert await wf_handle.query(GreetingWorkflow.get_language) == Language.ARABIC

0 commit comments

Comments
 (0)