Skip to content

Commit e942b93

Browse files
committed
Serialized, non-ordered handling of n messages
1 parent 4a4ab8c commit e942b93

File tree

1 file changed

+120
-0
lines changed

1 file changed

+120
-0
lines changed
+120
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
import asyncio
2+
import logging
3+
from datetime import timedelta
4+
from typing import Optional
5+
6+
from temporalio import activity, common, workflow
7+
from temporalio.client import Client, WorkflowHandle
8+
from temporalio.worker import Worker
9+
10+
Arg = str
11+
Result = str
12+
13+
# Problem:
14+
# -------
15+
# - Your workflow receives an unbounded number of updates.
16+
# - Each update must be processed by calling two activities.
17+
# - The next update may not start processing until the previous has completed.
18+
19+
# Solution:
20+
# --------
21+
# Enqueue updates, and process items from the queue in a single coroutine (the main workflow
22+
# coroutine).
23+
24+
# Discussion:
25+
# ----------
26+
# The queue is necessary because Temporal's async update & signal handlers will interleave if they
27+
# contain multiple yield points.
28+
29+
30+
class Queue(asyncio.Queue[tuple[Arg, asyncio.Future[Result]]]):
31+
def __init__(self, serialized_queue_state: list[Arg]) -> None:
32+
super().__init__()
33+
for arg in serialized_queue_state:
34+
self.put_nowait((arg, asyncio.Future()))
35+
36+
def serialize(self) -> list[Arg]:
37+
args = []
38+
while True:
39+
try:
40+
args.append(self.get_nowait())
41+
except asyncio.QueueEmpty:
42+
return args
43+
44+
45+
@workflow.defn
46+
class MessageProcessor:
47+
48+
@workflow.run
49+
async def run(self, serialized_queue_state: Optional[list[Arg]] = None):
50+
print(f"run(): serialized_queue_state = {serialized_queue_state}")
51+
self.queue = Queue(serialized_queue_state or [])
52+
while True:
53+
arg, fut = await self.queue.get()
54+
fut.set_result(await self.process_task(arg))
55+
if workflow.info().is_continue_as_new_suggested():
56+
# Footgun: If we don't let the event loop tick, then CAN will end the workflow
57+
# before the update handler is notified that the result future has completed.
58+
# See https://github.com/temporalio/features/issues/481
59+
await asyncio.sleep(0) # Let update handler complete
60+
return workflow.continue_as_new(args=[self.queue.serialize()])
61+
62+
# Note: handler must be async if we are both enqueuing, and returning an update result
63+
# => We could add SDK APIs to manually complete updates.
64+
@workflow.update
65+
async def add_task(self, arg: Arg) -> Result:
66+
# Footgun: handler must wait for workflow initialization
67+
# See https://github.com/temporalio/features/issues/400
68+
await workflow.wait_condition(lambda: hasattr(self, "queue"))
69+
fut = asyncio.Future[Result]()
70+
self.queue.put_nowait((arg, fut)) # Note: update validation gates enqueue
71+
return await fut
72+
73+
async def process_task(self, arg):
74+
t1, t2 = [
75+
await workflow.execute_activity(
76+
get_current_time, start_to_close_timeout=timedelta(seconds=10)
77+
)
78+
for _ in range(2)
79+
]
80+
return f"{arg}-result-{t1}-{t2}"
81+
82+
83+
time = 0
84+
85+
86+
@activity.defn
87+
async def get_current_time() -> int:
88+
global time
89+
time += 1
90+
return time
91+
92+
93+
async def app(wf: WorkflowHandle):
94+
for i in range(20):
95+
print(f"app(): sending update {i}")
96+
result = await wf.execute_update(MessageProcessor.add_task, f"arg {i}")
97+
print(f"app(): {result}")
98+
99+
100+
async def main():
101+
client = await Client.connect("localhost:7233")
102+
103+
async with Worker(
104+
client,
105+
task_queue="tq",
106+
workflows=[MessageProcessor],
107+
activities=[get_current_time],
108+
):
109+
wf = await client.start_workflow(
110+
MessageProcessor.run,
111+
id="wid",
112+
task_queue="tq",
113+
id_reuse_policy=common.WorkflowIDReusePolicy.TERMINATE_IF_RUNNING,
114+
)
115+
await asyncio.gather(app(wf), wf.result())
116+
117+
118+
if __name__ == "__main__":
119+
logging.basicConfig(level=logging.INFO)
120+
asyncio.run(main())

0 commit comments

Comments
 (0)