-
Notifications
You must be signed in to change notification settings - Fork 66
/
Copy pathworkflows.py
37 lines (31 loc) · 1.07 KB
/
workflows.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import asyncio
from dataclasses import dataclass
from datetime import timedelta
from temporalio import workflow
with workflow.unsafe.imports_passed_through():
from sleep_for_days.activities import SendEmailInput, send_email
@workflow.defn()
class SleepForDaysWorkflow:
def __init__(self) -> None:
self.is_complete = False
@workflow.run
async def run(self) -> str:
while not self.is_complete:
await workflow.execute_activity(
send_email,
SendEmailInput("30 days until the next email"),
start_to_close_timeout=timedelta(seconds=10),
)
await workflow.wait(
[
asyncio.create_task(workflow.sleep(timedelta(days=30))),
asyncio.create_task(
workflow.wait_condition(lambda: self.is_complete)
),
],
return_when=asyncio.FIRST_COMPLETED,
)
return "done!"
@workflow.signal
def complete(self):
self.is_complete = True