-
Notifications
You must be signed in to change notification settings - Fork 64
/
Copy pathworkflow.py
32 lines (24 loc) · 935 Bytes
/
workflow.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from datetime import datetime, timezone
from typing import Optional
from temporalio import workflow
from updatable_timer.updatable_timer_lib import UpdatableTimer
@workflow.defn
class Workflow:
    """Workflow that waits on an ``UpdatableTimer`` with an adjustable deadline.

    The wake-up time is exchanged with callers as a POSIX timestamp (float
    seconds since the epoch) and handed to the timer as a timezone-aware UTC
    ``datetime``. A signal replaces the wake-up time and a query reads it back.
    """

    @workflow.init
    def __init__(self, wake_up_time: float) -> None:
        """Build the timer from the initial wake-up time.

        Args:
            wake_up_time: POSIX timestamp, converted to a UTC ``datetime``
                before being passed to ``UpdatableTimer``.
        """
        self.timer = UpdatableTimer(
            datetime.fromtimestamp(wake_up_time, tz=timezone.utc)
        )

    @workflow.run
    async def run(self, wake_up_time: float) -> None:
        """Await the timer's ``sleep()`` and then finish.

        Args:
            wake_up_time: Not read here; the value is already consumed by
                ``__init__``. NOTE(review): kept so the signature mirrors
                ``__init__``, which ``@workflow.init`` is understood to
                require -- confirm against the Temporal SDK docs.
        """
        # The completion condition is owned by UpdatableTimer.sleep();
        # presumably it returns once the current wake-up time has passed.
        await self.timer.sleep()

    @workflow.signal
    async def update_wake_up_time(self, wake_up_time: float) -> None:
        """Signal handler: forward a new wake-up time to the timer.

        Args:
            wake_up_time: New POSIX timestamp, converted to a UTC ``datetime``.
        """
        workflow.logger.info(f"update_wake_up_time: {wake_up_time}")
        self.timer.update_wake_up_time(
            datetime.fromtimestamp(wake_up_time, tz=timezone.utc)
        )

    @workflow.query
    def get_wake_up_time(self) -> float:
        """Query handler: return the timer's wake-up time as a POSIX timestamp."""
        # Plain literal: the message has no placeholders, so no f-prefix.
        workflow.logger.info("get_wake_up_time")
        return float(self.timer.get_wake_up_time().timestamp())