-
-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy path__init__.py
More file actions
119 lines (89 loc) · 3.48 KB
/
__init__.py
File metadata and controls
119 lines (89 loc) · 3.48 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
"""Cron style scheduler for asynchronous Dramatiq tasks in Django."""
from unittest.mock import Mock
from apscheduler.schedulers.base import STATE_STOPPED
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
from django.utils import timezone
from . import _version
try:
from sentry_sdk.crons import monitor
except ImportError:
monitor = None
__version__ = _version.version
VERSION = _version.version_tuple
__all__ = ["cron", "interval", "scheduler"]
class LazyBlockingScheduler(BlockingScheduler):
"""Avoid annoying info logs for pending jobs."""
def add_job(self, *args, **kwargs):
logger = self._logger
if self.state == STATE_STOPPED:
# We don't want to schedule jobs before the scheduler is started.
self._logger = Mock()
super().add_job(*args, **kwargs)
self._logger = logger
scheduler = LazyBlockingScheduler()
def cron(schedule):
"""
Run task on a scheduler with a cron schedule.
Usage:
@cron("0 0 * * *")
@dramatiq.actor
def cron_test():
print("Cron test")
Please don't forget to set up a sentry monitor for the actor, otherwise you won't
get any notifications if the cron job fails.
The monitor slug is your actor name, the schedule should be set to the same
cron schedule as the cron decorator. The schedule type should be set to cron.
The monitors timezone should be set to Europe/Berlin.
"""
def decorator(actor):
*_, day_schedule = schedule.split(" ")
# CronTrigger uses Python's timezone dependent first weekday,
# so in Berlin monday is 0 and sunday is 6. We use literals to avoid
# confusion. Literals are also more readable and crontab conform.
if any(i.isdigit() for i in day_schedule):
raise ValueError(
"Please use a literal day of week (Mon, Tue, Wed, Thu, Fri, Sat, Sun) or *"
)
if monitor is not None:
actor.fn = monitor(actor.actor_name)(actor.fn)
scheduler.add_job(
actor.send,
CronTrigger.from_crontab(
schedule,
timezone=timezone.get_default_timezone(),
),
name=actor.actor_name,
)
# We don't add the Sentry monitor on the actor itself, because we only want to
# monitor the cron job, not the actor itself, or it's direct invocations.
return actor
return decorator
def interval(*, seconds):
"""
Run task on a periodic interval.
Usage:
@interval(seconds=30)
@dramatiq.actor
def interval_test():
print("Interval test")
Please note that the interval is relative to the time the scheduler is started. For
example, if you start the scheduler at 12:00:00, the first run will be at 12:00:30.
However, if you restart the scheduler at 12:00:15, the first run will be at
12:00:45.
For an interval that is consistent with the clock, use the `cron` decorator instead.
"""
def decorator(actor):
if monitor is not None:
actor.fn = monitor(actor.actor_name)(actor.fn)
scheduler.add_job(
actor.send,
IntervalTrigger(
seconds=seconds,
timezone=timezone.get_default_timezone(),
),
name=actor.actor_name,
)
return actor
return decorator