1
+ import asyncio
1
2
from dataclasses import dataclass
3
+ from datetime import timedelta
2
4
from enum import IntEnum
3
5
from typing import Optional
4
6
5
- from temporalio import workflow
7
+ from docutils import ApplicationError
8
+ from temporalio import activity , workflow
6
9
7
10
8
11
class Language (IntEnum ):
@@ -11,6 +14,23 @@ class Language(IntEnum):
11
14
FRENCH = 3
12
15
SPANISH = 4
13
16
PORTUGUESE = 5
17
+ ARABIC = 6
18
+ HINDI = 7
19
+
20
+
21
+ @activity .defn
22
+ async def call_greeting_service (to_language : Language ) -> Optional [str ]:
23
+ greetings = {
24
+ Language .CHINESE : "你好,世界" ,
25
+ Language .ENGLISH : "Hello, world" ,
26
+ Language .FRENCH : "Bonjour, monde" ,
27
+ Language .SPANISH : "¡Hola mundo" ,
28
+ Language .PORTUGUESE : "Olá mundo" ,
29
+ Language .ARABIC : "مرحبا بالعالم" ,
30
+ Language .HINDI : "नमस्ते दुनिया" ,
31
+ }
32
+ await asyncio .sleep (0.2 ) # Pretend to do a network call
33
+ return greetings .get (to_language )
14
34
15
35
16
36
@dataclass
@@ -33,10 +53,12 @@ def __init__(self) -> None:
33
53
Language .ENGLISH : "Hello, world" ,
34
54
Language .CHINESE : "你好,世界" ,
35
55
}
56
+ self .lock = asyncio .Lock ()
36
57
37
58
@workflow .run
38
59
async def run (self ) -> str :
39
60
await workflow .wait_condition (lambda : self .approved_for_release )
61
+ await workflow .wait_condition (workflow .all_handlers_finished )
40
62
return self .greetings [self .language ]
41
63
42
64
@workflow .query
@@ -54,17 +76,29 @@ def approve(self, input: ApproveInput) -> None:
54
76
self .approver_name = input .name
55
77
56
78
@workflow .update
57
- def set_language (self , language : Language ) -> Language :
79
+ async def set_language (self , language : Language ) -> Language :
58
80
# 👉 An Update handler can mutate the Workflow state and return a value.
81
+ # 👉 Since this update handler is async, it can execute an activity.
82
+ if language not in self .greetings :
83
+ # 👉 Use a lock here to ensure that multiple calls to set_language are processed in order.
84
+ async with self .lock :
85
+ greeting = await workflow .execute_activity (
86
+ call_greeting_service ,
87
+ language ,
88
+ start_to_close_timeout = timedelta (seconds = 10 ),
89
+ )
90
+ if greeting is None :
91
+ # 👉 An update validator cannot be async, so cannot be used to check that the remote
92
+ # call_greeting_service supports the requested language. Raising ApplicationError
93
+ # will fail the Update, but the WorkflowExecutionUpdateAccepted event will still be
94
+ # added to history.
95
+ raise ApplicationError (
96
+ f"Greeting service does not support { language .name } "
97
+ )
98
+ self .greetings [language ] = greeting
59
99
previous_language , self .language = self .language , language
60
100
return previous_language
61
101
62
- @set_language .validator
63
- def validate_language (self , language : Language ) -> None :
64
- if language not in self .greetings :
65
- # 👉 In an Update validator you raise any exception to reject the Update.
66
- raise ValueError (f"{ language .name } is not supported" )
67
-
68
102
@workflow .query
69
103
def get_language (self ) -> Language :
70
104
return self .language
0 commit comments