From 0213238a3f9d6381cd0b333a137ab303c450348a Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Mon, 27 Jan 2025 08:00:17 -0800 Subject: [PATCH 1/5] sleep-for-days sample --- sleep_for_days/README.md | 18 ++++++++++++++++++ sleep_for_days/__init__.py | 1 + sleep_for_days/activities.py | 15 +++++++++++++++ sleep_for_days/starter.py | 25 +++++++++++++++++++++++++ sleep_for_days/worker.py | 23 +++++++++++++++++++++++ sleep_for_days/workflows.py | 26 ++++++++++++++++++++++++++ 6 files changed, 108 insertions(+) create mode 100644 sleep_for_days/README.md create mode 100644 sleep_for_days/__init__.py create mode 100644 sleep_for_days/activities.py create mode 100644 sleep_for_days/starter.py create mode 100644 sleep_for_days/worker.py create mode 100644 sleep_for_days/workflows.py diff --git a/sleep_for_days/README.md b/sleep_for_days/README.md new file mode 100644 index 00000000..97e565e6 --- /dev/null +++ b/sleep_for_days/README.md @@ -0,0 +1,18 @@ +# Sleep for Days + +This sample demonstrates how to use Temporal to run a workflow that periodically sleeps for a number of days. + +To run, first see the main [README.md](../../README.md) for prerequisites. + +Then create two terminals and `cd` to this directory. + +Run the worker in one terminal: + + poetry run python worker.py + +And execute the workflow in the other terminal: + + poetry run python starter.py + +This sample will run indefinitely until you send a signal to `complete`. See how to send a signal via Temporal CLI [here](https://docs.temporal.io/cli/workflow#signal). + diff --git a/sleep_for_days/__init__.py b/sleep_for_days/__init__.py new file mode 100644 index 00000000..33f064c3 --- /dev/null +++ b/sleep_for_days/__init__.py @@ -0,0 +1 @@ +TASK_QUEUE = "sleep-for-days-task-queue" \ No newline at end of file diff --git a/sleep_for_days/activities.py b/sleep_for_days/activities.py new file mode 100644 index 00000000..fa13fd55 --- /dev/null +++ b/sleep_for_days/activities.py @@ -0,0 +1,15 @@ +from dataclasses import dataclass +from temporalio import activity + +@dataclass +class SendEmailInput: + email_msg: str + +@activity.defn(name="send_email") +async def send_email(input: SendEmailInput) -> str: + """ + A stub Activity for sending an email. + """ + result = f"Email message: {input.email_msg}, sent" + print(result) + return result \ No newline at end of file diff --git a/sleep_for_days/starter.py b/sleep_for_days/starter.py new file mode 100644 index 00000000..71fc9739 --- /dev/null +++ b/sleep_for_days/starter.py @@ -0,0 +1,25 @@ +import asyncio +from typing import Optional +import uuid + +from temporalio.client import Client + +from sleep_for_days import TASK_QUEUE +from sleep_for_days.workflows import ( + SleepForDaysWorkflow, + SleepForDaysInput, +) + +async def main(client: Optional[Client] = None): + client = client or await Client.connect("localhost:7233") + wf_handle = await client.start_workflow( + SleepForDaysWorkflow.run, + SleepForDaysInput(numOfDays=3), + id=f"sleep-for-days-workflow-id-{uuid.uuid4()}", + task_queue=TASK_QUEUE, + ) + # Wait for workflow completion (runs indefinitely until it receives a signal) + print(await wf_handle.result()); + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/sleep_for_days/worker.py b/sleep_for_days/worker.py new file mode 100644 index 00000000..5b7f6dfb --- /dev/null +++ b/sleep_for_days/worker.py @@ -0,0 +1,23 @@ +import asyncio + +from temporalio.client import Client +from temporalio.worker import Worker + +from sleep_for_days import TASK_QUEUE +from sleep_for_days.activities import send_email +from sleep_for_days.workflows import SleepForDaysWorkflow + +async def main(): + client = await Client.connect("localhost:7233") + + worker = Worker( + client, + task_queue=TASK_QUEUE, + workflows=[SleepForDaysWorkflow], + activities=[send_email], + ) + + await worker.run() + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/sleep_for_days/workflows.py b/sleep_for_days/workflows.py new file mode 100644 index 00000000..a260f500 --- /dev/null +++ b/sleep_for_days/workflows.py @@ -0,0 +1,26 @@ +import asyncio +from datetime import timedelta +from temporalio import workflow +from dataclasses import dataclass +from sleep_for_days.activities import send_email, SendEmailInput + +@dataclass +class SleepForDaysInput: + numOfDays: int + +@workflow.defn(name="SleepForDaysWorkflow") +class SleepForDaysWorkflow: + + def __init__(self) -> None: + self.is_complete = False + + @workflow.run + async def run(self, input: SleepForDaysInput) -> str: + while(not self.is_complete): + await workflow.execute_activity(send_email, SendEmailInput(f"{input.numOfDays} until the next email"), start_to_close_timeout=timedelta(seconds=10)) + await asyncio.sleep(timedelta(days=input.numOfDays).seconds) + return "done!" + + @workflow.signal + def complete(self): + self.is_complete = True \ No newline at end of file From ab65e33e7ee0eb73daba01e82ecdb51e6a67c941 Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Mon, 27 Jan 2025 10:24:07 -0800 Subject: [PATCH 2/5] added race between signal and sleep to terminate immediately on signal --- sleep_for_days/workflows.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sleep_for_days/workflows.py b/sleep_for_days/workflows.py index a260f500..24b86264 100644 --- a/sleep_for_days/workflows.py +++ b/sleep_for_days/workflows.py @@ -18,7 +18,10 @@ def __init__(self) -> None: async def run(self, input: SleepForDaysInput) -> str: while(not self.is_complete): await workflow.execute_activity(send_email, SendEmailInput(f"{input.numOfDays} until the next email"), start_to_close_timeout=timedelta(seconds=10)) - await asyncio.sleep(timedelta(days=input.numOfDays).seconds) + await workflow.wait([ + asyncio.create_task(workflow.sleep(input.numOfDays * 24 * 60 * 60)), + asyncio.create_task(workflow.wait_condition(lambda: self.is_complete)) + ], return_when=asyncio.FIRST_COMPLETED) return "done!" @workflow.signal From 9d330ca91c820506fe19eb1d131ece82c08130f8 Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Wed, 29 Jan 2025 20:51:25 -0800 Subject: [PATCH 3/5] added test, removed input param for workflow --- sleep_for_days/__init__.py | 2 +- sleep_for_days/activities.py | 7 +++- sleep_for_days/starter.py | 12 +++--- sleep_for_days/worker.py | 4 +- sleep_for_days/workflows.py | 40 ++++++++++++-------- tests/sleep_for_days/__init__.py | 0 tests/sleep_for_days/workflow_test.py | 53 +++++++++++++++++++++++++++ 7 files changed, 91 insertions(+), 27 deletions(-) create mode 100644 tests/sleep_for_days/__init__.py create mode 100644 tests/sleep_for_days/workflow_test.py diff --git a/sleep_for_days/__init__.py b/sleep_for_days/__init__.py index 33f064c3..04611d30 100644 --- a/sleep_for_days/__init__.py +++ b/sleep_for_days/__init__.py @@ -1 +1 @@ -TASK_QUEUE = "sleep-for-days-task-queue" \ No newline at end of file +TASK_QUEUE = "sleep-for-days-task-queue" diff --git a/sleep_for_days/activities.py b/sleep_for_days/activities.py index fa13fd55..c38670f2 100644 --- a/sleep_for_days/activities.py +++ b/sleep_for_days/activities.py @@ -1,15 +1,18 @@ from dataclasses import dataclass + from temporalio import activity + @dataclass class SendEmailInput: email_msg: str -@activity.defn(name="send_email") + +@activity.defn() async def send_email(input: SendEmailInput) -> str: """ A stub Activity for sending an email. """ result = f"Email message: {input.email_msg}, sent" print(result) - return result \ No newline at end of file + return result diff --git a/sleep_for_days/starter.py b/sleep_for_days/starter.py index 71fc9739..765842b2 100644 --- a/sleep_for_days/starter.py +++ b/sleep_for_days/starter.py @@ -1,25 +1,23 @@ import asyncio -from typing import Optional import uuid +from typing import Optional from temporalio.client import Client from sleep_for_days import TASK_QUEUE -from sleep_for_days.workflows import ( - SleepForDaysWorkflow, - SleepForDaysInput, -) +from sleep_for_days.workflows import SleepForDaysWorkflow + async def main(client: Optional[Client] = None): client = client or await Client.connect("localhost:7233") wf_handle = await client.start_workflow( SleepForDaysWorkflow.run, - SleepForDaysInput(numOfDays=3), id=f"sleep-for-days-workflow-id-{uuid.uuid4()}", task_queue=TASK_QUEUE, ) # Wait for workflow completion (runs indefinitely until it receives a signal) - print(await wf_handle.result()); + print(await wf_handle.result()) + if __name__ == "__main__": asyncio.run(main()) diff --git a/sleep_for_days/worker.py b/sleep_for_days/worker.py index 5b7f6dfb..8d0ce278 100644 --- a/sleep_for_days/worker.py +++ b/sleep_for_days/worker.py @@ -7,6 +7,7 @@ from sleep_for_days.activities import send_email from sleep_for_days.workflows import SleepForDaysWorkflow + async def main(): client = await Client.connect("localhost:7233") @@ -19,5 +20,6 @@ async def main(): await worker.run() + if __name__ == "__main__": - asyncio.run(main()) \ No newline at end of file + asyncio.run(main()) diff --git a/sleep_for_days/workflows.py b/sleep_for_days/workflows.py index 24b86264..c05febcb 100644 --- a/sleep_for_days/workflows.py +++ b/sleep_for_days/workflows.py @@ -1,29 +1,37 @@ import asyncio +from dataclasses import dataclass from datetime import timedelta + from temporalio import workflow -from dataclasses import dataclass -from sleep_for_days.activities import send_email, SendEmailInput -@dataclass -class SleepForDaysInput: - numOfDays: int +with workflow.unsafe.imports_passed_through(): + from sleep_for_days.activities import SendEmailInput, send_email -@workflow.defn(name="SleepForDaysWorkflow") -class SleepForDaysWorkflow: +@workflow.defn() +class SleepForDaysWorkflow: def __init__(self) -> None: self.is_complete = False @workflow.run - async def run(self, input: SleepForDaysInput) -> str: - while(not self.is_complete): - await workflow.execute_activity(send_email, SendEmailInput(f"{input.numOfDays} until the next email"), start_to_close_timeout=timedelta(seconds=10)) - await workflow.wait([ - asyncio.create_task(workflow.sleep(input.numOfDays * 24 * 60 * 60)), - asyncio.create_task(workflow.wait_condition(lambda: self.is_complete)) - ], return_when=asyncio.FIRST_COMPLETED) + async def run(self) -> str: + while not self.is_complete: + await workflow.execute_activity( + send_email, + SendEmailInput("30 until the next email"), + start_to_close_timeout=timedelta(seconds=10), + ) + await workflow.wait( + [ + asyncio.create_task(workflow.sleep(timedelta(days=30))), + asyncio.create_task( + workflow.wait_condition(lambda: self.is_complete) + ), + ], + return_when=asyncio.FIRST_COMPLETED, + ) return "done!" - + @workflow.signal def complete(self): - self.is_complete = True \ No newline at end of file + self.is_complete = True diff --git a/tests/sleep_for_days/__init__.py b/tests/sleep_for_days/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/sleep_for_days/workflow_test.py b/tests/sleep_for_days/workflow_test.py new file mode 100644 index 00000000..fa3f0686 --- /dev/null +++ b/tests/sleep_for_days/workflow_test.py @@ -0,0 +1,53 @@ +import uuid +from datetime import timedelta + +from temporalio import activity +from temporalio.testing import WorkflowEnvironment +from temporalio.worker import Worker + +from sleep_for_days.starter import TASK_QUEUE +from sleep_for_days.workflows import SendEmailInput, SleepForDaysWorkflow + + +async def test_sleep_for_days_workflow(): + num_activity_executions = 0 + + # Mock out the activity to count executions + @activity.defn(name="send_email") + async def send_email_mock(input: SendEmailInput) -> str: + nonlocal num_activity_executions + num_activity_executions += 1 + return input.email_msg + + async with await WorkflowEnvironment.start_time_skipping() as env: + # if env.supports_time_skipping: + # pytest.skip( + # "Java test server: https://github.com/temporalio/sdk-java/issues/1903" + # ) + async with Worker( + env.client, + task_queue=TASK_QUEUE, + workflows=[SleepForDaysWorkflow], + activities=[send_email_mock], + ): + handle = await env.client.start_workflow( + SleepForDaysWorkflow.run, + id=str(uuid.uuid4()), + task_queue=TASK_QUEUE, + ) + + start_time = await env.get_current_time() + # Time-skip 5 minutes. + await env.sleep(timedelta(minutes=5)) + # Check that the activity has been called, we're now waiting for the sleep to finish. + assert num_activity_executions == 1 + # Time-skip 3 days. + await env.sleep(timedelta(days=90)) + # Expect 3 more activity calls. + assert num_activity_executions == 4 + # Send the signal to complete the workflow. + await handle.signal(SleepForDaysWorkflow.complete) + # Expect no more activity calls to have been made - workflow is complete. + assert num_activity_executions == 4 + # Expect more than 90 days to have passed. + assert (await env.get_current_time() - start_time) > timedelta(days=90) From 8173e7244239b51b1a88ec622a4c36094872d8b3 Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Thu, 30 Jan 2025 10:18:15 -0800 Subject: [PATCH 4/5] fixed text string --- sleep_for_days/workflows.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sleep_for_days/workflows.py b/sleep_for_days/workflows.py index c05febcb..eff0b273 100644 --- a/sleep_for_days/workflows.py +++ b/sleep_for_days/workflows.py @@ -18,7 +18,7 @@ async def run(self) -> str: while not self.is_complete: await workflow.execute_activity( send_email, - SendEmailInput("30 until the next email"), + SendEmailInput("30 days until the next email"), start_to_close_timeout=timedelta(seconds=10), ) await workflow.wait( From 88c6e9c845e95f7dd78ba53dad042499ed286bf5 Mon Sep 17 00:00:00 2001 From: Thomas Hardy Date: Thu, 30 Jan 2025 10:52:55 -0800 Subject: [PATCH 5/5] address pr comments --- sleep_for_days/README.md | 2 +- sleep_for_days/activities.py | 2 +- sleep_for_days/worker.py | 2 ++ 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/sleep_for_days/README.md b/sleep_for_days/README.md index 97e565e6..4ae1d2dc 100644 --- a/sleep_for_days/README.md +++ b/sleep_for_days/README.md @@ -1,6 +1,6 @@ # Sleep for Days -This sample demonstrates how to use Temporal to run a workflow that periodically sleeps for a number of days. +This sample demonstrates how to create a Temporal workflow that runs forever, sending an email every 30 days. To run, first see the main [README.md](../../README.md) for prerequisites. diff --git a/sleep_for_days/activities.py b/sleep_for_days/activities.py index c38670f2..30972098 100644 --- a/sleep_for_days/activities.py +++ b/sleep_for_days/activities.py @@ -14,5 +14,5 @@ async def send_email(input: SendEmailInput) -> str: A stub Activity for sending an email. """ result = f"Email message: {input.email_msg}, sent" - print(result) + activity.logger.info(result) return result diff --git a/sleep_for_days/worker.py b/sleep_for_days/worker.py index 8d0ce278..d03ec726 100644 --- a/sleep_for_days/worker.py +++ b/sleep_for_days/worker.py @@ -1,4 +1,5 @@ import asyncio +import logging from temporalio.client import Client from temporalio.worker import Worker @@ -22,4 +23,5 @@ async def main(): if __name__ == "__main__": + logging.basicConfig(level=logging.INFO) asyncio.run(main())