-
Notifications
You must be signed in to change notification settings - Fork 64
/
Copy pathstarter.py
78 lines (66 loc) · 2.56 KB
/
starter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import asyncio
import uuid
from typing import Optional, Tuple
from temporalio import common
from temporalio.client import (
Client,
WithStartWorkflowOperation,
WorkflowHandle,
WorkflowUpdateFailedError,
)
from temporalio.exceptions import ApplicationError
from message_passing.update_with_start.lazy_initialization import TASK_QUEUE
from message_passing.update_with_start.lazy_initialization.workflows import (
ShoppingCartItem,
ShoppingCartWorkflow,
)
async def handle_add_item_request(
    session_id: str, item_id: str, quantity: int, temporal_client: Client
) -> Tuple[Optional[int], WorkflowHandle]:
    """
    Handle a client request to add an item to the shopping cart.

    The user is not logged in, but a session ID is available from a cookie, and we use this as the
    cart ID. The Temporal client was created at service-start time and is shared by all request
    handlers.

    A Workflow Type exists that can be used to represent a shopping cart. The method uses
    update-with-start to add an item to the shopping cart, creating the cart if it doesn't already
    exist.

    Note that the workflow handle is available, even if the Update fails.

    Args:
        session_id: Session identifier; the workflow ID is derived from it as ``cart-<session_id>``.
        item_id: SKU of the item to add.
        quantity: Number of units of the item to add.
        temporal_client: Connected Temporal client used to send the update-with-start request.

    Returns:
        A ``(price, workflow_handle)`` tuple. ``price`` is the value returned by the ``add_item``
        update, or ``None`` when the update was rejected with an ``ItemUnavailableError``
        application error. ``workflow_handle`` refers to the cart workflow.

    Raises:
        WorkflowUpdateFailedError: If the update failed for any reason other than
            ``ItemUnavailableError``.
    """
    cart_id = f"cart-{session_id}"
    start_op = WithStartWorkflowOperation(
        ShoppingCartWorkflow.run,
        id=cart_id,
        # Reuse the cart workflow for this session if one is already running.
        id_conflict_policy=common.WorkflowIDConflictPolicy.USE_EXISTING,
        task_queue=TASK_QUEUE,
    )

    price: Optional[int]
    try:
        price = await temporal_client.execute_update_with_start_workflow(
            ShoppingCartWorkflow.add_item,
            ShoppingCartItem(sku=item_id, quantity=quantity),
            start_workflow_operation=start_op,
        )
    except WorkflowUpdateFailedError as err:
        cause = err.cause
        item_unavailable = (
            isinstance(cause, ApplicationError)
            and cause.type == "ItemUnavailableError"
        )
        if not item_unavailable:
            # Bare ``raise`` re-raises the in-flight exception with its original traceback.
            raise
        # An unavailable item is an expected outcome: report "no price" instead of failing.
        price = None

    # The handle is fetched after the update attempt, including when the item was unavailable.
    workflow_handle = await start_op.workflow_handle()
    return price, workflow_handle
async def main():
    """
    Run the sample end to end.

    Connects to a Temporal server at ``localhost:7233``, adds two items to a cart keyed by a freshly
    generated session ID, prints the values returned for each addition, then signals checkout and
    prints the workflow result.
    """
    print("🛒")
    session_id = f"session-{uuid.uuid4()}"
    temporal_client = await Client.connect("localhost:7233")

    # Both requests share the session ID, so they address the same cart workflow ID.
    subtotals = []
    for sku in ("sku-123", "sku-456"):
        subtotal, cart_handle = await handle_add_item_request(
            session_id, sku, 1, temporal_client
        )
        subtotals.append(subtotal)
    print(f"subtotals were, {subtotals}")

    # The handle from the last request is used to check out and await the final order.
    await cart_handle.signal(ShoppingCartWorkflow.checkout)
    order = await cart_handle.result()
    print(f"final order: {order}")
# Run the demo only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    asyncio.run(main())