Skip to content

Commit 7577bd6

Browse files
authored
Added updatable timer sample. (#167)
1 parent 3bd017d commit 7577bd6

11 files changed

+242
-0
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
.venv
2+
.idea
23
__pycache__

README.md

+1
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ Some examples require extra dependencies. See each sample's directory for specif
7373
* [schedules](schedules) - Demonstrates a Workflow Execution that occurs according to a schedule.
7474
* [sentry](sentry) - Report errors to Sentry.
7575
* [trio_async](trio_async) - Use asyncio Temporal in Trio-based environments.
76+
* [updatable_timer](updatable_timer) - A timer that can be updated while sleeping.
7677
* [worker_specific_task_queues](worker_specific_task_queues) - Use unique task queues to ensure activities run on specific workers.
7778
* [worker_versioning](worker_versioning) - Use the Worker Versioning feature to more easily version your workflows & other code.
7879

tests/updatable_timer/__init__.py

Whitespace-only changes.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
import datetime
2+
import logging
3+
import math
4+
import uuid
5+
6+
from temporalio.client import Client, WorkflowExecutionStatus
7+
from temporalio.testing import WorkflowEnvironment
8+
from temporalio.worker import Worker
9+
10+
from updatable_timer.workflow import Workflow
11+
12+
13+
async def test_updatable_timer_workflow(client: Client):
14+
logging.basicConfig(level=logging.DEBUG)
15+
16+
task_queue_name = str(uuid.uuid4())
17+
async with await WorkflowEnvironment.start_time_skipping() as env:
18+
async with Worker(env.client, task_queue=task_queue_name, workflows=[Workflow]):
19+
in_a_day = float(
20+
(datetime.datetime.now() + datetime.timedelta(days=1)).timestamp()
21+
)
22+
in_an_hour = float(
23+
(datetime.datetime.now() + datetime.timedelta(hours=1)).timestamp()
24+
)
25+
handle = await env.client.start_workflow(
26+
Workflow.run, in_a_day, id=str(uuid.uuid4()), task_queue=task_queue_name
27+
)
28+
wake_up_time1 = await handle.query(Workflow.get_wake_up_time)
29+
assert math.isclose(wake_up_time1, in_a_day)
30+
await handle.signal(Workflow.update_wake_up_time, in_an_hour)
31+
wake_up_time2 = await handle.query(Workflow.get_wake_up_time)
32+
assert math.isclose(wake_up_time2, in_an_hour)
33+
await handle.result()

updatable_timer/README.md

+42
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
# Updatable Timer Sample
2+
3+
Demonstrates a helper class which relies on `workflow.wait_condition` to implement a blocking sleep that can be updated at any moment.
4+
5+
The sample is composed of the three executables:
6+
7+
* `worker.py` hosts the Workflow Executions.
8+
* `starter.py` starts Workflow Executions.
9+
* `wake_up_timer_updater.py` Signals the Workflow Execution with the new time to wake up.
10+
11+
First start the Worker:
12+
13+
```bash
14+
poetry run python worker.py
15+
```
16+
Check the output of the Worker window. The expected output is:
17+
18+
```
19+
Worker started, ctrl+c to exit
20+
```
21+
22+
Then in a different terminal window start the Workflow Execution:
23+
24+
```bash
25+
poetry run python starter.py
26+
```
27+
Check the output of the Worker window. The expected output is:
28+
```
29+
Workflow started: run_id=...
30+
```
31+
32+
Then run the updater as many times as you want to change timer to 10 seconds from now:
33+
34+
```bash
35+
poetry run python wake_up_time_updater.py
36+
```
37+
38+
Check the output of the worker window. The expected output is:
39+
40+
```
41+
Updated wake up time to 10 seconds from now
42+
```

updatable_timer/__init__.py

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
TASK_QUEUE = "updatable-timer"

updatable_timer/starter.py

+32
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
import asyncio
2+
import logging
3+
from datetime import datetime, timedelta
4+
from typing import Optional
5+
6+
from temporalio import exceptions
7+
from temporalio.client import Client
8+
9+
from updatable_timer import TASK_QUEUE
10+
from updatable_timer.workflow import Workflow
11+
12+
13+
async def main(client: Optional[Client] = None):
14+
logging.basicConfig(level=logging.INFO)
15+
16+
client = client or await Client.connect("localhost:7233")
17+
try:
18+
handle = await client.start_workflow(
19+
Workflow.run,
20+
(datetime.now() + timedelta(days=1)).timestamp(),
21+
id=f"updatable-timer-workflow",
22+
task_queue=TASK_QUEUE,
23+
)
24+
logging.info(f"Workflow started: run_id={handle.result_run_id}")
25+
except exceptions.WorkflowAlreadyStartedError as e:
26+
logging.info(
27+
f"Workflow already running: workflow_id={e.workflow_id}, run_id={e.run_id}"
28+
)
29+
30+
31+
if __name__ == "__main__":
32+
asyncio.run(main())
+39
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
import asyncio
2+
from datetime import datetime, timedelta
3+
4+
from temporalio import workflow
5+
6+
7+
class UpdatableTimer:
8+
def __init__(self, wake_up_time: datetime) -> None:
9+
self.wake_up_time = wake_up_time
10+
self.wake_up_time_updated = False
11+
12+
async def sleep(self) -> None:
13+
workflow.logger.info(f"sleep_until: {self.wake_up_time}")
14+
while True:
15+
now = workflow.now()
16+
17+
sleep_interval = self.wake_up_time - now
18+
if sleep_interval <= timedelta(0):
19+
break
20+
workflow.logger.info(f"Going to sleep for {sleep_interval}")
21+
22+
try:
23+
self.wake_up_time_updated = False
24+
await workflow.wait_condition(
25+
lambda: self.wake_up_time_updated,
26+
timeout=sleep_interval,
27+
)
28+
except asyncio.TimeoutError:
29+
# checks condition at the beginning of the loop
30+
continue
31+
workflow.logger.info(f"sleep_until completed")
32+
33+
def update_wake_up_time(self, wake_up_time: datetime) -> None:
34+
workflow.logger.info(f"update_wake_up_time: {wake_up_time}")
35+
self.wake_up_time = wake_up_time
36+
self.wake_up_time_updated = True
37+
38+
def get_wake_up_time(self) -> datetime:
39+
return self.wake_up_time
+26
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
import asyncio
2+
import logging
3+
from datetime import datetime, timedelta
4+
from typing import Optional
5+
6+
from temporalio.client import Client
7+
8+
from updatable_timer.workflow import Workflow
9+
10+
11+
async def main(client: Optional[Client] = None):
12+
logging.basicConfig(level=logging.INFO)
13+
14+
client = client or await Client.connect("localhost:7233")
15+
handle = client.get_workflow_handle(workflow_id="updatable-timer-workflow")
16+
# signal workflow about the wake up time change
17+
await handle.signal(
18+
Workflow.update_wake_up_time,
19+
(datetime.now() + timedelta(seconds=10)).timestamp(),
20+
)
21+
22+
logging.info("Updated wake up time to 10 seconds from now")
23+
24+
25+
if __name__ == "__main__":
26+
asyncio.run(main())

updatable_timer/worker.py

+35
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
import asyncio
2+
import logging
3+
4+
from temporalio.client import Client
5+
from temporalio.worker import Worker
6+
7+
from updatable_timer import TASK_QUEUE
8+
from updatable_timer.workflow import Workflow
9+
10+
interrupt_event = asyncio.Event()
11+
12+
13+
async def main():
14+
logging.basicConfig(level=logging.INFO)
15+
16+
client = await Client.connect("localhost:7233")
17+
async with Worker(
18+
client,
19+
task_queue=TASK_QUEUE,
20+
workflows=[Workflow],
21+
):
22+
logging.info("Worker started, ctrl+c to exit")
23+
# Wait until interrupted
24+
await interrupt_event.wait()
25+
logging.info("Interrupt received, shutting down...")
26+
27+
28+
if __name__ == "__main__":
29+
loop = asyncio.new_event_loop()
30+
asyncio.set_event_loop(loop)
31+
try:
32+
loop.run_until_complete(main())
33+
except KeyboardInterrupt:
34+
interrupt_event.set()
35+
loop.run_until_complete(loop.shutdown_asyncgens())

updatable_timer/workflow.py

+32
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
from datetime import datetime, timezone
2+
from typing import Optional
3+
4+
from temporalio import workflow
5+
6+
from updatable_timer.updatable_timer_lib import UpdatableTimer
7+
8+
9+
@workflow.defn
10+
class Workflow:
11+
@workflow.init
12+
def __init__(self, wake_up_time: float) -> None:
13+
self.timer = UpdatableTimer(
14+
datetime.fromtimestamp(wake_up_time, tz=timezone.utc)
15+
)
16+
17+
@workflow.run
18+
async def run(self, wake_up_time: float):
19+
await self.timer.sleep()
20+
21+
@workflow.signal
22+
async def update_wake_up_time(self, wake_up_time: float) -> None:
23+
workflow.logger.info(f"update_wake_up_time: {wake_up_time}")
24+
25+
self.timer.update_wake_up_time(
26+
datetime.fromtimestamp(wake_up_time, tz=timezone.utc)
27+
)
28+
29+
@workflow.query
30+
def get_wake_up_time(self) -> float:
31+
workflow.logger.info(f"get_wake_up_time")
32+
return float(self.timer.get_wake_up_time().timestamp())

0 commit comments

Comments
 (0)