Skip to content

Commit 2f3f2ac

Browse files
authored
sleep-for-days sample (#160)
* sleep-for-days sample * added race between signal and sleep to terminate immediately on signal * added test, removed input param for workflow * fixed text string * address pr comments
1 parent d9658d5 commit 2f3f2ac

File tree

8 files changed

+177
-0
lines changed

8 files changed

+177
-0
lines changed

sleep_for_days/README.md

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# Sleep for Days
2+
3+
This sample demonstrates how to create a Temporal workflow that runs forever, sending an email every 30 days.
4+
5+
To run, first see the main [README.md](../../README.md) for prerequisites.
6+
7+
Then create two terminals and `cd` to this directory.
8+
9+
Run the worker in one terminal:
10+
11+
poetry run python worker.py
12+
13+
And execute the workflow in the other terminal:
14+
15+
poetry run python starter.py
16+
17+
This sample will run indefinitely until you send a signal to `complete`. See how to send a signal via Temporal CLI [here](https://docs.temporal.io/cli/workflow#signal).
18+

sleep_for_days/__init__.py

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
TASK_QUEUE = "sleep-for-days-task-queue"

sleep_for_days/activities.py

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
from dataclasses import dataclass
2+
3+
from temporalio import activity
4+
5+
6+
@dataclass
7+
class SendEmailInput:
8+
email_msg: str
9+
10+
11+
@activity.defn()
12+
async def send_email(input: SendEmailInput) -> str:
13+
"""
14+
A stub Activity for sending an email.
15+
"""
16+
result = f"Email message: {input.email_msg}, sent"
17+
activity.logger.info(result)
18+
return result

sleep_for_days/starter.py

+23
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
import asyncio
2+
import uuid
3+
from typing import Optional
4+
5+
from temporalio.client import Client
6+
7+
from sleep_for_days import TASK_QUEUE
8+
from sleep_for_days.workflows import SleepForDaysWorkflow
9+
10+
11+
async def main(client: Optional[Client] = None):
12+
client = client or await Client.connect("localhost:7233")
13+
wf_handle = await client.start_workflow(
14+
SleepForDaysWorkflow.run,
15+
id=f"sleep-for-days-workflow-id-{uuid.uuid4()}",
16+
task_queue=TASK_QUEUE,
17+
)
18+
# Wait for workflow completion (runs indefinitely until it receives a signal)
19+
print(await wf_handle.result())
20+
21+
22+
if __name__ == "__main__":
23+
asyncio.run(main())

sleep_for_days/worker.py

+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
import asyncio
2+
import logging
3+
4+
from temporalio.client import Client
5+
from temporalio.worker import Worker
6+
7+
from sleep_for_days import TASK_QUEUE
8+
from sleep_for_days.activities import send_email
9+
from sleep_for_days.workflows import SleepForDaysWorkflow
10+
11+
12+
async def main():
13+
client = await Client.connect("localhost:7233")
14+
15+
worker = Worker(
16+
client,
17+
task_queue=TASK_QUEUE,
18+
workflows=[SleepForDaysWorkflow],
19+
activities=[send_email],
20+
)
21+
22+
await worker.run()
23+
24+
25+
if __name__ == "__main__":
26+
logging.basicConfig(level=logging.INFO)
27+
asyncio.run(main())

sleep_for_days/workflows.py

+37
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
import asyncio
2+
from dataclasses import dataclass
3+
from datetime import timedelta
4+
5+
from temporalio import workflow
6+
7+
with workflow.unsafe.imports_passed_through():
8+
from sleep_for_days.activities import SendEmailInput, send_email
9+
10+
11+
@workflow.defn()
12+
class SleepForDaysWorkflow:
13+
def __init__(self) -> None:
14+
self.is_complete = False
15+
16+
@workflow.run
17+
async def run(self) -> str:
18+
while not self.is_complete:
19+
await workflow.execute_activity(
20+
send_email,
21+
SendEmailInput("30 days until the next email"),
22+
start_to_close_timeout=timedelta(seconds=10),
23+
)
24+
await workflow.wait(
25+
[
26+
asyncio.create_task(workflow.sleep(timedelta(days=30))),
27+
asyncio.create_task(
28+
workflow.wait_condition(lambda: self.is_complete)
29+
),
30+
],
31+
return_when=asyncio.FIRST_COMPLETED,
32+
)
33+
return "done!"
34+
35+
@workflow.signal
36+
def complete(self):
37+
self.is_complete = True

tests/sleep_for_days/__init__.py

Whitespace-only changes.

tests/sleep_for_days/workflow_test.py

+53
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
import uuid
2+
from datetime import timedelta
3+
4+
from temporalio import activity
5+
from temporalio.testing import WorkflowEnvironment
6+
from temporalio.worker import Worker
7+
8+
from sleep_for_days.starter import TASK_QUEUE
9+
from sleep_for_days.workflows import SendEmailInput, SleepForDaysWorkflow
10+
11+
12+
async def test_sleep_for_days_workflow():
13+
num_activity_executions = 0
14+
15+
# Mock out the activity to count executions
16+
@activity.defn(name="send_email")
17+
async def send_email_mock(input: SendEmailInput) -> str:
18+
nonlocal num_activity_executions
19+
num_activity_executions += 1
20+
return input.email_msg
21+
22+
async with await WorkflowEnvironment.start_time_skipping() as env:
23+
# if env.supports_time_skipping:
24+
# pytest.skip(
25+
# "Java test server: https://github.com/temporalio/sdk-java/issues/1903"
26+
# )
27+
async with Worker(
28+
env.client,
29+
task_queue=TASK_QUEUE,
30+
workflows=[SleepForDaysWorkflow],
31+
activities=[send_email_mock],
32+
):
33+
handle = await env.client.start_workflow(
34+
SleepForDaysWorkflow.run,
35+
id=str(uuid.uuid4()),
36+
task_queue=TASK_QUEUE,
37+
)
38+
39+
start_time = await env.get_current_time()
40+
# Time-skip 5 minutes.
41+
await env.sleep(timedelta(minutes=5))
42+
# Check that the activity has been called, we're now waiting for the sleep to finish.
43+
assert num_activity_executions == 1
44+
# Time-skip 3 days.
45+
await env.sleep(timedelta(days=90))
46+
# Expect 3 more activity calls.
47+
assert num_activity_executions == 4
48+
# Send the signal to complete the workflow.
49+
await handle.signal(SleepForDaysWorkflow.complete)
50+
# Expect no more activity calls to have been made - workflow is complete.
51+
assert num_activity_executions == 4
52+
# Expect more than 90 days to have passed.
53+
assert (await env.get_current_time() - start_time) > timedelta(days=90)

0 commit comments

Comments
 (0)