Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert hello activities to sync #173

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
7 changes: 6 additions & 1 deletion hello/hello_activity.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from datetime import timedelta

Expand All @@ -18,7 +19,7 @@ class ComposeGreetingInput:

# Basic activity that logs and does string concatenation
@activity.defn
async def compose_greeting(input: ComposeGreetingInput) -> str:
def compose_greeting(input: ComposeGreetingInput) -> str:
activity.logger.info("Running activity with parameter %s" % input)
return f"{input.greeting}, {input.name}!"

Expand Down Expand Up @@ -50,6 +51,10 @@ async def main():
task_queue="hello-activity-task-queue",
workflows=[GreetingWorkflow],
activities=[compose_greeting],
# Non-async activities require an executor;
# a thread pool executor is recommended.
# This same thread pool could be passed to multiple workers if desired.
activity_executor=ThreadPoolExecutor(5),
Comment on lines +54 to +57
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think all of these samples are going to give warnings of:

Worker max_concurrent_activities is 100 but activity_executor's max_workers is only 5

Can you confirm you are getting these warnings? Can you up the value passed here to 100?

Copy link
Author

@GSmithApps GSmithApps Apr 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think i got those warnings. (Chad and I subsequently discussed off-PR)

Or I can just go ahead and up it to 100 anyway

Copy link
Member

@cretz cretz Apr 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting, I wonder why they aren't showing, we may have to investigate, but lack of warnings appearing is not a blocker for this issue of course

):

# While the worker is running, use the client to run the workflow and
Expand Down
71 changes: 71 additions & 0 deletions hello/hello_activity_async.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import asyncio
from dataclasses import dataclass
from datetime import timedelta

from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.worker import Worker


# While we could use multiple parameters in the activity, Temporal strongly
# encourages using a single dataclass instead which can have fields added to it
# in a backwards-compatible way.
@dataclass
class ComposeGreetingInput:
greeting: str
name: str


# Basic activity that logs and does string concatenation
@activity.defn
async def compose_greeting(input: ComposeGreetingInput) -> str:
activity.logger.info("Running activity with parameter %s" % input)
return f"{input.greeting}, {input.name}!"


# Basic workflow that logs and invokes an activity
@workflow.defn
class GreetingWorkflow:
@workflow.run
async def run(self, name: str) -> str:
workflow.logger.info("Running workflow with parameter %s" % name)
return await workflow.execute_activity(
compose_greeting,
ComposeGreetingInput("Hello", name),
start_to_close_timeout=timedelta(seconds=10),
)


async def main():
# Uncomment the lines below to see logging output
# import logging
# logging.basicConfig(level=logging.INFO)

# Start client
client = await Client.connect("localhost:7233")

# Run a worker for the workflow
async with Worker(
client,
task_queue="hello-activity-task-queue",
workflows=[GreetingWorkflow],
activities=[compose_greeting],
# If the worker is only running async activities, you don't need
# to supply an activity executor because they run in
# the worker's event loop.
):

# While the worker is running, use the client to run the workflow and
# print out its result. Note, in many production setups, the client
# would be in a completely separate process from the worker.
result = await client.execute_workflow(
GreetingWorkflow.run,
"World",
id="hello-activity-workflow-id",
task_queue="hello-activity-task-queue",
)
print(f"Result: {result}")


if __name__ == "__main__":
asyncio.run(main())
10 changes: 6 additions & 4 deletions hello/hello_activity_choice.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from datetime import timedelta
from enum import IntEnum
Expand All @@ -12,22 +13,22 @@


@activity.defn
async def order_apples(amount: int) -> str:
def order_apples(amount: int) -> str:
return f"Ordered {amount} Apples..."


@activity.defn
async def order_bananas(amount: int) -> str:
def order_bananas(amount: int) -> str:
return f"Ordered {amount} Bananas..."


@activity.defn
async def order_cherries(amount: int) -> str:
def order_cherries(amount: int) -> str:
return f"Ordered {amount} Cherries..."


@activity.defn
async def order_oranges(amount: int) -> str:
def order_oranges(amount: int) -> str:
return f"Ordered {amount} Oranges..."


Expand Down Expand Up @@ -88,6 +89,7 @@ async def main():
task_queue="hello-activity-choice-task-queue",
workflows=[PurchaseFruitsWorkflow],
activities=[order_apples, order_bananas, order_cherries, order_oranges],
activity_executor=ThreadPoolExecutor(5),
):

# While the worker is running, use the client to run the workflow and
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
Expand All @@ -21,7 +20,7 @@ def compose_greeting(input: ComposeGreetingInput) -> str:
# We'll wait for 3 seconds, heartbeating in between (like all long-running
# activities should do), then return the greeting
for _ in range(0, 3):
print(f"Heartbeating activity on thread {threading.get_ident()}")
print(f"Heartbeating activity")
activity.heartbeat()
time.sleep(1)
return f"{input.greeting}, {input.name}!"
Expand All @@ -47,12 +46,9 @@ async def main():
# Run a worker for the workflow
async with Worker(
client,
task_queue="hello-activity-threaded-task-queue",
task_queue="hello-activity-heartbeating-task-queue",
workflows=[GreetingWorkflow],
activities=[compose_greeting],
# Synchronous activities are not allowed unless we provide some kind of
# executor. This same thread pool could be passed to multiple workers if
# desired.
activity_executor=ThreadPoolExecutor(5),
):

Expand All @@ -62,10 +58,10 @@ async def main():
result = await client.execute_workflow(
GreetingWorkflow.run,
"World",
id="hello-activity-threaded-workflow-id",
task_queue="hello-activity-threaded-task-queue",
id="hello-activity-heartbeating-workflow-id",
task_queue="hello-activity-heartbeating-task-queue",
)
print(f"Result on thread {threading.get_ident()}: {result}")
print(f"Result: {result}")


if __name__ == "__main__":
Expand Down
4 changes: 3 additions & 1 deletion hello/hello_activity_retry.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from datetime import timedelta

Expand All @@ -15,7 +16,7 @@ class ComposeGreetingInput:


@activity.defn
async def compose_greeting(input: ComposeGreetingInput) -> str:
def compose_greeting(input: ComposeGreetingInput) -> str:
print(f"Invoking activity, attempt number {activity.info().attempt}")
# Fail the first 3 attempts, succeed the 4th
if activity.info().attempt < 4:
Expand Down Expand Up @@ -52,6 +53,7 @@ async def main():
task_queue="hello-activity-retry-task-queue",
workflows=[GreetingWorkflow],
activities=[compose_greeting],
activity_executor=ThreadPoolExecutor(5),
):

# While the worker is running, use the client to run the workflow and
Expand Down
23 changes: 13 additions & 10 deletions hello/hello_async_activity_completion.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from datetime import timedelta
from threading import Thread

from temporalio import activity, workflow
from temporalio.client import Client
Expand All @@ -18,7 +21,7 @@ def __init__(self, client: Client) -> None:
self.client = client

@activity.defn
async def compose_greeting(self, input: ComposeGreetingInput) -> str:
def compose_greeting(self, input: ComposeGreetingInput) -> str:
# Schedule a task to complete this asynchronously. This could be done in
# a completely different process or system.
print("Completing activity asynchronously")
Expand All @@ -27,29 +30,28 @@ async def compose_greeting(self, input: ComposeGreetingInput) -> str:
# So we store the tasks ourselves.
# See https://docs.python.org/3/library/asyncio-task.html#creating-tasks,
# https://bugs.python.org/issue21163 and others.
_ = asyncio.create_task(
self.complete_greeting(activity.info().task_token, input)
)
Thread(
target=self.complete_greeting,
args=(activity.info().task_token, input),
).start()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this definitely our recommendation or is this a case where it feels more natural for the user to use an asyncio task than to have to explicitly create a thread, seeing as they use asyncio elsewhere with the SDK?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This whole approach is not our recommendation (to just async complete in the background and raise complete). It's meant to demonstrate that it can be done somewhere else. So hacking in a whole new thread like this is probably ok since it's a demonstration of something you would never do anyways, but I would suggest removing the asyncio comments above that are asyncio specific and don't apply to this new code.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm okay either way on this! Up to you folks. I went ahead and removed the comments though ✅


# Raise the complete-async error which will complete this function but
# does not consider the activity complete from the workflow perspective
activity.raise_complete_async()

async def complete_greeting(
self, task_token: bytes, input: ComposeGreetingInput
) -> None:
def complete_greeting(self, task_token: bytes, input: ComposeGreetingInput) -> None:
# Let's wait three seconds, heartbeating each second. Note, heartbeating
# during async activity completion is done via the client directly. It
# is often important to heartbeat so the server can know when an
# activity has crashed.
handle = self.client.get_async_activity_handle(task_token=task_token)
for _ in range(0, 3):
print("Waiting one second...")
await handle.heartbeat()
await asyncio.sleep(1)
asyncio.run(handle.heartbeat())
time.sleep(1)

# Complete using the handle
await handle.complete(f"{input.greeting}, {input.name}!")
asyncio.run(handle.complete(f"{input.greeting}, {input.name}!"))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using asyncio.run twice in a function in a expositional example surprises me (but I'm new to the sync vs async activity recommendation discussion)

Copy link
Member

@cretz cretz Apr 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is actually bad and a bug. This can cause our client to run on two different event loops which is wrong (but may technically work most of the time). To do this right, you should pass the event loop alongside the client in the constructor of this class, and then use run_coroutine_threadsafe on the loop per https://docs.python.org/3/library/asyncio-dev.html#asyncio-multithreading.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks so much for catching this! Should be fixed 👍



@workflow.defn
Expand Down Expand Up @@ -77,6 +79,7 @@ async def main():
task_queue="hello-async-activity-completion-task-queue",
workflows=[GreetingWorkflow],
activities=[composer.compose_greeting],
activity_executor=ThreadPoolExecutor(5),
):

# While the worker is running, use the client to run the workflow and
Expand Down
12 changes: 8 additions & 4 deletions hello/hello_cancellation.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,32 @@
import asyncio
import time
import traceback
from concurrent.futures import ThreadPoolExecutor
from datetime import timedelta
from typing import NoReturn

from temporalio import activity, workflow
from temporalio.client import Client, WorkflowFailureError
from temporalio.exceptions import CancelledError
from temporalio.worker import Worker


@activity.defn
async def never_complete_activity() -> NoReturn:
def never_complete_activity() -> NoReturn:
# All long-running activities should heartbeat. Heartbeat is how
# cancellation is delivered from the server.
try:
while True:
print("Heartbeating activity")
activity.heartbeat()
await asyncio.sleep(1)
except asyncio.CancelledError:
time.sleep(1)
except CancelledError:
print("Activity cancelled")
raise


@activity.defn
async def cleanup_activity() -> None:
def cleanup_activity() -> None:
print("Executing cleanup activity")


Expand Down Expand Up @@ -56,6 +59,7 @@ async def main():
task_queue="hello-cancellation-task-queue",
workflows=[CancellationWorkflow],
activities=[never_complete_activity, cleanup_activity],
activity_executor=ThreadPoolExecutor(5),
):

# While the worker is running, use the client to start the workflow.
Expand Down
4 changes: 3 additions & 1 deletion hello/hello_cron.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from datetime import timedelta

Expand All @@ -14,7 +15,7 @@ class ComposeGreetingInput:


@activity.defn
async def compose_greeting(input: ComposeGreetingInput) -> str:
def compose_greeting(input: ComposeGreetingInput) -> str:
return f"{input.greeting}, {input.name}!"


Expand All @@ -40,6 +41,7 @@ async def main():
task_queue="hello-cron-task-queue",
workflows=[GreetingWorkflow],
activities=[compose_greeting],
activity_executor=ThreadPoolExecutor(5),
):

print("Running workflow once a minute")
Expand Down
4 changes: 3 additions & 1 deletion hello/hello_exception.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import logging
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from datetime import timedelta
from typing import NoReturn, Optional
Expand All @@ -18,7 +19,7 @@ class ComposeGreetingInput:


@activity.defn
async def compose_greeting(input: ComposeGreetingInput) -> NoReturn:
def compose_greeting(input: ComposeGreetingInput) -> NoReturn:
# Always raise exception
raise RuntimeError(f"Greeting exception: {input.greeting}, {input.name}!")

Expand Down Expand Up @@ -46,6 +47,7 @@ async def main():
task_queue="hello-exception-task-queue",
workflows=[GreetingWorkflow],
activities=[compose_greeting],
activity_executor=ThreadPoolExecutor(5),
):

# While the worker is running, use the client to run the workflow and
Expand Down
4 changes: 3 additions & 1 deletion hello/hello_local_activity.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from datetime import timedelta

Expand All @@ -14,7 +15,7 @@ class ComposeGreetingInput:


@activity.defn
async def compose_greeting(input: ComposeGreetingInput) -> str:
def compose_greeting(input: ComposeGreetingInput) -> str:
return f"{input.greeting}, {input.name}!"


Expand All @@ -39,6 +40,7 @@ async def main():
task_queue="hello-local-activity-task-queue",
workflows=[GreetingWorkflow],
activities=[compose_greeting],
activity_executor=ThreadPoolExecutor(5),
):

# While the worker is running, use the client to run the workflow and
Expand Down
Loading
Loading