diff --git a/hello/hello_activity.py b/hello/hello_activity.py index 9801ee1b..13b5fcbb 100644 --- a/hello/hello_activity.py +++ b/hello/hello_activity.py @@ -1,4 +1,5 @@ import asyncio +from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass from datetime import timedelta @@ -18,7 +19,7 @@ class ComposeGreetingInput: # Basic activity that logs and does string concatenation @activity.defn -async def compose_greeting(input: ComposeGreetingInput) -> str: +def compose_greeting(input: ComposeGreetingInput) -> str: activity.logger.info("Running activity with parameter %s" % input) return f"{input.greeting}, {input.name}!" @@ -50,6 +51,10 @@ async def main(): task_queue="hello-activity-task-queue", workflows=[GreetingWorkflow], activities=[compose_greeting], + # Non-async activities require an executor; + # a thread pool executor is recommended. + # This same thread pool could be passed to multiple workers if desired. + activity_executor=ThreadPoolExecutor(5), ): # While the worker is running, use the client to run the workflow and diff --git a/hello/hello_activity_async.py b/hello/hello_activity_async.py new file mode 100644 index 00000000..fd14a2cf --- /dev/null +++ b/hello/hello_activity_async.py @@ -0,0 +1,71 @@ +import asyncio +from dataclasses import dataclass +from datetime import timedelta + +from temporalio import activity, workflow +from temporalio.client import Client +from temporalio.worker import Worker + + +# While we could use multiple parameters in the activity, Temporal strongly +# encourages using a single dataclass instead which can have fields added to it +# in a backwards-compatible way. +@dataclass +class ComposeGreetingInput: + greeting: str + name: str + + +# Basic activity that logs and does string concatenation +@activity.defn +async def compose_greeting(input: ComposeGreetingInput) -> str: + activity.logger.info("Running activity with parameter %s" % input) + return f"{input.greeting}, {input.name}!" + + +# Basic workflow that logs and invokes an activity +@workflow.defn +class GreetingWorkflow: + @workflow.run + async def run(self, name: str) -> str: + workflow.logger.info("Running workflow with parameter %s" % name) + return await workflow.execute_activity( + compose_greeting, + ComposeGreetingInput("Hello", name), + start_to_close_timeout=timedelta(seconds=10), + ) + + +async def main(): + # Uncomment the lines below to see logging output + # import logging + # logging.basicConfig(level=logging.INFO) + + # Start client + client = await Client.connect("localhost:7233") + + # Run a worker for the workflow + async with Worker( + client, + task_queue="hello-activity-task-queue", + workflows=[GreetingWorkflow], + activities=[compose_greeting], + # If the worker is only running async activities, you don't need + # to supply an activity executor because they run in + # the worker's event loop. + ): + + # While the worker is running, use the client to run the workflow and + # print out its result. Note, in many production setups, the client + # would be in a completely separate process from the worker. + result = await client.execute_workflow( + GreetingWorkflow.run, + "World", + id="hello-activity-workflow-id", + task_queue="hello-activity-task-queue", + ) + print(f"Result: {result}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/hello/hello_activity_choice.py b/hello/hello_activity_choice.py index 6d15af53..7d01b019 100644 --- a/hello/hello_activity_choice.py +++ b/hello/hello_activity_choice.py @@ -1,4 +1,5 @@ import asyncio +from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass from datetime import timedelta from enum import IntEnum @@ -12,22 +13,22 @@ @activity.defn -async def order_apples(amount: int) -> str: +def order_apples(amount: int) -> str: return f"Ordered {amount} Apples..." @activity.defn -async def order_bananas(amount: int) -> str: +def order_bananas(amount: int) -> str: return f"Ordered {amount} Bananas..." @activity.defn -async def order_cherries(amount: int) -> str: +def order_cherries(amount: int) -> str: return f"Ordered {amount} Cherries..." @activity.defn -async def order_oranges(amount: int) -> str: +def order_oranges(amount: int) -> str: return f"Ordered {amount} Oranges..." @@ -88,6 +89,7 @@ async def main(): task_queue="hello-activity-choice-task-queue", workflows=[PurchaseFruitsWorkflow], activities=[order_apples, order_bananas, order_cherries, order_oranges], + activity_executor=ThreadPoolExecutor(5), ): # While the worker is running, use the client to run the workflow and diff --git a/hello/hello_activity_threaded.py b/hello/hello_activity_heartbeat.py similarity index 77% rename from hello/hello_activity_threaded.py rename to hello/hello_activity_heartbeat.py index e7bf8344..230621d3 100644 --- a/hello/hello_activity_threaded.py +++ b/hello/hello_activity_heartbeat.py @@ -1,5 +1,4 @@ import asyncio -import threading import time from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass @@ -21,7 +20,7 @@ def compose_greeting(input: ComposeGreetingInput) -> str: # We'll wait for 3 seconds, heartbeating in between (like all long-running # activities should do), then return the greeting for _ in range(0, 3): - print(f"Heartbeating activity on thread {threading.get_ident()}") + print(f"Heartbeating activity") activity.heartbeat() time.sleep(1) return f"{input.greeting}, {input.name}!" @@ -47,12 +46,9 @@ async def main(): # Run a worker for the workflow async with Worker( client, - task_queue="hello-activity-threaded-task-queue", + task_queue="hello-activity-heartbeating-task-queue", workflows=[GreetingWorkflow], activities=[compose_greeting], - # Synchronous activities are not allowed unless we provide some kind of - # executor. This same thread pool could be passed to multiple workers if - # desired. activity_executor=ThreadPoolExecutor(5), ): @@ -62,10 +58,10 @@ async def main(): result = await client.execute_workflow( GreetingWorkflow.run, "World", - id="hello-activity-threaded-workflow-id", - task_queue="hello-activity-threaded-task-queue", + id="hello-activity-heartbeating-workflow-id", + task_queue="hello-activity-heartbeating-task-queue", ) - print(f"Result on thread {threading.get_ident()}: {result}") + print(f"Result: {result}") if __name__ == "__main__": diff --git a/hello/hello_activity_retry.py b/hello/hello_activity_retry.py index 233f9613..f1acd529 100644 --- a/hello/hello_activity_retry.py +++ b/hello/hello_activity_retry.py @@ -1,4 +1,5 @@ import asyncio +from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass from datetime import timedelta @@ -15,7 +16,7 @@ class ComposeGreetingInput: @activity.defn -async def compose_greeting(input: ComposeGreetingInput) -> str: +def compose_greeting(input: ComposeGreetingInput) -> str: print(f"Invoking activity, attempt number {activity.info().attempt}") # Fail the first 3 attempts, succeed the 4th if activity.info().attempt < 4: @@ -52,6 +53,7 @@ async def main(): task_queue="hello-activity-retry-task-queue", workflows=[GreetingWorkflow], activities=[compose_greeting], + activity_executor=ThreadPoolExecutor(5), ): # While the worker is running, use the client to run the workflow and diff --git a/hello/hello_async_activity_completion.py b/hello/hello_async_activity_completion.py index 10aa89df..22fc10f5 100644 --- a/hello/hello_async_activity_completion.py +++ b/hello/hello_async_activity_completion.py @@ -1,6 +1,9 @@ import asyncio +import time +from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass from datetime import timedelta +from threading import Thread from temporalio import activity, workflow from temporalio.client import Client @@ -14,30 +17,25 @@ class ComposeGreetingInput: class GreetingComposer: - def __init__(self, client: Client) -> None: + def __init__(self, client: Client, loop: asyncio.AbstractEventLoop) -> None: self.client = client + self.loop = loop @activity.defn - async def compose_greeting(self, input: ComposeGreetingInput) -> str: - # Schedule a task to complete this asynchronously. This could be done in + def compose_greeting(self, input: ComposeGreetingInput) -> str: + # Make a thread to complete this externally. This could be done in # a completely different process or system. print("Completing activity asynchronously") - # Tasks stored by asyncio are weak references and therefore can get GC'd - # which can cause warnings like "Task was destroyed but it is pending!". - # So we store the tasks ourselves. - # See https://docs.python.org/3/library/asyncio-task.html#creating-tasks, - # https://bugs.python.org/issue21163 and others. - _ = asyncio.create_task( - self.complete_greeting(activity.info().task_token, input) - ) + Thread( + target=self.complete_greeting, + args=(activity.info().task_token, input), + ).start() # Raise the complete-async error which will complete this function but # does not consider the activity complete from the workflow perspective activity.raise_complete_async() - async def complete_greeting( - self, task_token: bytes, input: ComposeGreetingInput - ) -> None: + def complete_greeting(self, task_token: bytes, input: ComposeGreetingInput) -> None: # Let's wait three seconds, heartbeating each second. Note, heartbeating # during async activity completion is done via the client directly. It # is often important to heartbeat so the server can know when an @@ -45,11 +43,13 @@ async def complete_greeting( handle = self.client.get_async_activity_handle(task_token=task_token) for _ in range(0, 3): print("Waiting one second...") - await handle.heartbeat() - await asyncio.sleep(1) + asyncio.run_coroutine_threadsafe(handle.heartbeat(), self.loop).result() + time.sleep(1) # Complete using the handle - await handle.complete(f"{input.greeting}, {input.name}!") + asyncio.run_coroutine_threadsafe( + handle.complete(f"{input.greeting}, {input.name}!"), self.loop + ).result() @workflow.defn @@ -70,13 +70,16 @@ async def main(): # Start client client = await Client.connect("localhost:7233") + loop = asyncio.get_event_loop() + # Run a worker for the workflow - composer = GreetingComposer(client) + composer = GreetingComposer(client, loop) async with Worker( client, task_queue="hello-async-activity-completion-task-queue", workflows=[GreetingWorkflow], activities=[composer.compose_greeting], + activity_executor=ThreadPoolExecutor(5), ): # While the worker is running, use the client to run the workflow and diff --git a/hello/hello_cancellation.py b/hello/hello_cancellation.py index 3467893c..5bf38a66 100644 --- a/hello/hello_cancellation.py +++ b/hello/hello_cancellation.py @@ -1,29 +1,32 @@ import asyncio +import time import traceback +from concurrent.futures import ThreadPoolExecutor from datetime import timedelta from typing import NoReturn from temporalio import activity, workflow from temporalio.client import Client, WorkflowFailureError +from temporalio.exceptions import CancelledError from temporalio.worker import Worker @activity.defn -async def never_complete_activity() -> NoReturn: +def never_complete_activity() -> NoReturn: # All long-running activities should heartbeat. Heartbeat is how # cancellation is delivered from the server. try: while True: print("Heartbeating activity") activity.heartbeat() - await asyncio.sleep(1) - except asyncio.CancelledError: + time.sleep(1) + except CancelledError: print("Activity cancelled") raise @activity.defn -async def cleanup_activity() -> None: +def cleanup_activity() -> None: print("Executing cleanup activity") @@ -56,6 +59,7 @@ async def main(): task_queue="hello-cancellation-task-queue", workflows=[CancellationWorkflow], activities=[never_complete_activity, cleanup_activity], + activity_executor=ThreadPoolExecutor(5), ): # While the worker is running, use the client to start the workflow. diff --git a/hello/hello_cron.py b/hello/hello_cron.py index 68b26099..dbb5cba6 100644 --- a/hello/hello_cron.py +++ b/hello/hello_cron.py @@ -1,4 +1,5 @@ import asyncio +from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass from datetime import timedelta @@ -14,7 +15,7 @@ class ComposeGreetingInput: @activity.defn -async def compose_greeting(input: ComposeGreetingInput) -> str: +def compose_greeting(input: ComposeGreetingInput) -> str: return f"{input.greeting}, {input.name}!" @@ -40,6 +41,7 @@ async def main(): task_queue="hello-cron-task-queue", workflows=[GreetingWorkflow], activities=[compose_greeting], + activity_executor=ThreadPoolExecutor(5), ): print("Running workflow once a minute") diff --git a/hello/hello_exception.py b/hello/hello_exception.py index bfb198d5..628c10c5 100644 --- a/hello/hello_exception.py +++ b/hello/hello_exception.py @@ -1,5 +1,6 @@ import asyncio import logging +from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass from datetime import timedelta from typing import NoReturn, Optional @@ -18,7 +19,7 @@ class ComposeGreetingInput: @activity.defn -async def compose_greeting(input: ComposeGreetingInput) -> NoReturn: +def compose_greeting(input: ComposeGreetingInput) -> NoReturn: # Always raise exception raise RuntimeError(f"Greeting exception: {input.greeting}, {input.name}!") @@ -46,6 +47,7 @@ async def main(): task_queue="hello-exception-task-queue", workflows=[GreetingWorkflow], activities=[compose_greeting], + activity_executor=ThreadPoolExecutor(5), ): # While the worker is running, use the client to run the workflow and diff --git a/hello/hello_local_activity.py b/hello/hello_local_activity.py index 08c1d9f2..374c29c5 100644 --- a/hello/hello_local_activity.py +++ b/hello/hello_local_activity.py @@ -1,4 +1,5 @@ import asyncio +from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass from datetime import timedelta @@ -14,7 +15,7 @@ class ComposeGreetingInput: @activity.defn -async def compose_greeting(input: ComposeGreetingInput) -> str: +def compose_greeting(input: ComposeGreetingInput) -> str: return f"{input.greeting}, {input.name}!" @@ -39,6 +40,7 @@ async def main(): task_queue="hello-local-activity-task-queue", workflows=[GreetingWorkflow], activities=[compose_greeting], + activity_executor=ThreadPoolExecutor(5), ): # While the worker is running, use the client to run the workflow and diff --git a/hello/hello_mtls.py b/hello/hello_mtls.py index 1ac6f9d8..3ed15354 100644 --- a/hello/hello_mtls.py +++ b/hello/hello_mtls.py @@ -1,5 +1,6 @@ import argparse import asyncio +from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass from datetime import timedelta from typing import Optional @@ -18,7 +19,7 @@ class ComposeGreetingInput: # Basic activity that logs and does string concatenation @activity.defn -async def compose_greeting(input: ComposeGreetingInput) -> str: +def compose_greeting(input: ComposeGreetingInput) -> str: return f"{input.greeting}, {input.name}!" @@ -79,6 +80,7 @@ async def main(): task_queue="hello-mtls-task-queue", workflows=[GreetingWorkflow], activities=[compose_greeting], + activity_executor=ThreadPoolExecutor(5), ): # While the worker is running, use the client to run the workflow and diff --git a/hello/hello_parallel_activity.py b/hello/hello_parallel_activity.py index 9de09846..b32b02bb 100644 --- a/hello/hello_parallel_activity.py +++ b/hello/hello_parallel_activity.py @@ -1,4 +1,5 @@ import asyncio +from concurrent.futures import ThreadPoolExecutor from datetime import timedelta from typing import List @@ -8,7 +9,7 @@ @activity.defn -async def say_hello_activity(name: str) -> str: +def say_hello_activity(name: str) -> str: return f"Hello, {name}!" @@ -48,6 +49,7 @@ async def main(): task_queue="hello-parallel-activity-task-queue", workflows=[SayHelloWorkflow], activities=[say_hello_activity], + activity_executor=ThreadPoolExecutor(10), ): # While the worker is running, use the client to run the workflow and diff --git a/hello/hello_patch.py b/hello/hello_patch.py index 0ed19b2f..e511ad5b 100644 --- a/hello/hello_patch.py +++ b/hello/hello_patch.py @@ -1,5 +1,6 @@ import asyncio import sys +from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass from datetime import timedelta @@ -19,7 +20,7 @@ class ComposeGreetingInput: # Basic activity that logs and does string concatenation @activity.defn -async def compose_greeting(input: ComposeGreetingInput) -> str: +def compose_greeting(input: ComposeGreetingInput) -> str: activity.logger.info("Running activity with parameter %s" % input) return f"{input.greeting}, {input.name}!" @@ -123,6 +124,7 @@ async def main(): task_queue="hello-patch-task-queue", workflows=[workflow_class], # type: ignore activities=[compose_greeting], + activity_executor=ThreadPoolExecutor(5), ): try: result = await client.execute_workflow(