|
| 1 | +import asyncio |
| 2 | +import logging |
| 3 | +import random |
| 4 | +from typing import Optional |
| 5 | + |
| 6 | +from temporalio import common, workflow |
| 7 | +from temporalio.client import Client, WorkflowHandle |
| 8 | +from temporalio.worker import Worker |
| 9 | + |
| 10 | +Payload = str |
| 11 | + |
| 12 | + |
| 13 | +@workflow.defn |
| 14 | +class MessageProcessor: |
| 15 | + def __init__(self) -> None: |
| 16 | + self.queue = asyncio.Queue() |
| 17 | + |
| 18 | + @workflow.run |
| 19 | + async def run(self, serialized_queue_state: Optional[SerializedQueueState] = None): |
| 20 | + # Initialize workflow state after CAN. Note that handler is sync, so it cannot wait for |
| 21 | + # workflow initialization. |
| 22 | + if serialized_queue_state: |
| 23 | + self.queue.update_from_serialized_state(serialized_queue_state) |
| 24 | + while True: |
| 25 | + workflow.logger.info(f"waiting for msg {self.queue.head + 1}") |
| 26 | + payload = await self.queue.next() |
| 27 | + workflow.logger.info(payload) |
| 28 | + if workflow.info().is_continue_as_new_suggested(): |
| 29 | + workflow.logger.info("CAN") |
| 30 | + workflow.continue_as_new(args=[self.queue.serialize()]) |
| 31 | + |
| 32 | + # Note: sync handler |
| 33 | + @workflow.update |
| 34 | + def process_message(self, sequence_number: int, payload: Payload): |
| 35 | + self.queue.add(payload, sequence_number) |
| 36 | + |
| 37 | + |
| 38 | +async def app(wf: WorkflowHandle): |
| 39 | + for i in range(100): |
| 40 | + print(f"sending update {i}") |
| 41 | + await wf.execute_update( |
| 42 | + MessageProcessor.process_message, args=[i, f"payload {i}"] |
| 43 | + ) |
| 44 | + |
| 45 | + |
| 46 | +async def main(): |
| 47 | + client = await Client.connect("localhost:7233") |
| 48 | + |
| 49 | + async with Worker( |
| 50 | + client, |
| 51 | + task_queue="tq", |
| 52 | + workflows=[MessageProcessor], |
| 53 | + ): |
| 54 | + wf = await client.start_workflow( |
| 55 | + MessageProcessor.run, |
| 56 | + id="wid", |
| 57 | + task_queue="tq", |
| 58 | + id_reuse_policy=common.WorkflowIDReusePolicy.TERMINATE_IF_RUNNING, |
| 59 | + ) |
| 60 | + await asyncio.gather(app(wf), wf.result()) |
| 61 | + |
| 62 | + |
| 63 | +if __name__ == "__main__": |
| 64 | + logging.basicConfig(level=logging.INFO) |
| 65 | + asyncio.run(main()) |
0 commit comments