diff --git a/sleep_for_days/README.md b/sleep_for_days/README.md new file mode 100644 index 00000000..4ae1d2dc --- /dev/null +++ b/sleep_for_days/README.md @@ -0,0 +1,18 @@ +# Sleep for Days + +This sample demonstrates how to create a Temporal workflow that runs forever, sending an email every 30 days. + +To run, first see the main [README.md](../../README.md) for prerequisites. + +Then create two terminals and `cd` to this directory. + +Run the worker in one terminal: + + poetry run python worker.py + +And execute the workflow in the other terminal: + + poetry run python starter.py + +This sample will run indefinitely until you send a signal to `complete`. See how to send a signal via Temporal CLI [here](https://docs.temporal.io/cli/workflow#signal). + diff --git a/sleep_for_days/__init__.py b/sleep_for_days/__init__.py new file mode 100644 index 00000000..04611d30 --- /dev/null +++ b/sleep_for_days/__init__.py @@ -0,0 +1 @@ +TASK_QUEUE = "sleep-for-days-task-queue" diff --git a/sleep_for_days/activities.py b/sleep_for_days/activities.py new file mode 100644 index 00000000..30972098 --- /dev/null +++ b/sleep_for_days/activities.py @@ -0,0 +1,18 @@ +from dataclasses import dataclass + +from temporalio import activity + + +@dataclass +class SendEmailInput: + email_msg: str + + +@activity.defn() +async def send_email(input: SendEmailInput) -> str: + """ + A stub Activity for sending an email. + """ + result = f"Email message: {input.email_msg}, sent" + activity.logger.info(result) + return result diff --git a/sleep_for_days/starter.py b/sleep_for_days/starter.py new file mode 100644 index 00000000..765842b2 --- /dev/null +++ b/sleep_for_days/starter.py @@ -0,0 +1,23 @@ +import asyncio +import uuid +from typing import Optional + +from temporalio.client import Client + +from sleep_for_days import TASK_QUEUE +from sleep_for_days.workflows import SleepForDaysWorkflow + + +async def main(client: Optional[Client] = None): + client = client or await Client.connect("localhost:7233") + wf_handle = await client.start_workflow( + SleepForDaysWorkflow.run, + id=f"sleep-for-days-workflow-id-{uuid.uuid4()}", + task_queue=TASK_QUEUE, + ) + # Wait for workflow completion (runs indefinitely until it receives a signal) + print(await wf_handle.result()) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/sleep_for_days/worker.py b/sleep_for_days/worker.py new file mode 100644 index 00000000..d03ec726 --- /dev/null +++ b/sleep_for_days/worker.py @@ -0,0 +1,27 @@ +import asyncio +import logging + +from temporalio.client import Client +from temporalio.worker import Worker + +from sleep_for_days import TASK_QUEUE +from sleep_for_days.activities import send_email +from sleep_for_days.workflows import SleepForDaysWorkflow + + +async def main(): + client = await Client.connect("localhost:7233") + + worker = Worker( + client, + task_queue=TASK_QUEUE, + workflows=[SleepForDaysWorkflow], + activities=[send_email], + ) + + await worker.run() + + +if __name__ == "__main__": + logging.basicConfig(level=logging.INFO) + asyncio.run(main()) diff --git a/sleep_for_days/workflows.py b/sleep_for_days/workflows.py new file mode 100644 index 00000000..eff0b273 --- /dev/null +++ b/sleep_for_days/workflows.py @@ -0,0 +1,37 @@ +import asyncio +from dataclasses import dataclass +from datetime import timedelta + +from temporalio import workflow + +with workflow.unsafe.imports_passed_through(): + from sleep_for_days.activities import SendEmailInput, send_email + + +@workflow.defn() +class SleepForDaysWorkflow: + def __init__(self) -> None: + self.is_complete = False + + @workflow.run + async def run(self) -> str: + while not self.is_complete: + await workflow.execute_activity( + send_email, + SendEmailInput("30 days until the next email"), + start_to_close_timeout=timedelta(seconds=10), + ) + await workflow.wait( + [ + asyncio.create_task(workflow.sleep(timedelta(days=30))), + asyncio.create_task( + workflow.wait_condition(lambda: self.is_complete) + ), + ], + return_when=asyncio.FIRST_COMPLETED, + ) + return "done!" + + @workflow.signal + def complete(self): + self.is_complete = True diff --git a/tests/sleep_for_days/__init__.py b/tests/sleep_for_days/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/sleep_for_days/workflow_test.py b/tests/sleep_for_days/workflow_test.py new file mode 100644 index 00000000..fa3f0686 --- /dev/null +++ b/tests/sleep_for_days/workflow_test.py @@ -0,0 +1,53 @@ +import uuid +from datetime import timedelta + +from temporalio import activity +from temporalio.testing import WorkflowEnvironment +from temporalio.worker import Worker + +from sleep_for_days.starter import TASK_QUEUE +from sleep_for_days.workflows import SendEmailInput, SleepForDaysWorkflow + + +async def test_sleep_for_days_workflow(): + num_activity_executions = 0 + + # Mock out the activity to count executions + @activity.defn(name="send_email") + async def send_email_mock(input: SendEmailInput) -> str: + nonlocal num_activity_executions + num_activity_executions += 1 + return input.email_msg + + async with await WorkflowEnvironment.start_time_skipping() as env: + # if env.supports_time_skipping: + # pytest.skip( + # "Java test server: https://github.com/temporalio/sdk-java/issues/1903" + # ) + async with Worker( + env.client, + task_queue=TASK_QUEUE, + workflows=[SleepForDaysWorkflow], + activities=[send_email_mock], + ): + handle = await env.client.start_workflow( + SleepForDaysWorkflow.run, + id=str(uuid.uuid4()), + task_queue=TASK_QUEUE, + ) + + start_time = await env.get_current_time() + # Time-skip 5 minutes. + await env.sleep(timedelta(minutes=5)) + # Check that the activity has been called, we're now waiting for the sleep to finish. + assert num_activity_executions == 1 + # Time-skip 3 days. + await env.sleep(timedelta(days=90)) + # Expect 3 more activity calls. + assert num_activity_executions == 4 + # Send the signal to complete the workflow. + await handle.signal(SleepForDaysWorkflow.complete) + # Expect no more activity calls to have been made - workflow is complete. + assert num_activity_executions == 4 + # Expect more than 90 days to have passed. + assert (await env.get_current_time() - start_time) > timedelta(days=90)