37
37
[2] https://github.com/Rykian/clockwork
38
38
[3] https://adam.herokuapp.com/past/2010/6/30/replace_cron_with_clockwork/
39
39
"""
40
+ import asyncio
41
+ import inspect
40
42
from collections .abc import Hashable
41
43
import datetime
42
44
import functools
@@ -87,7 +89,7 @@ def __init__(self) -> None:
87
89
88
90
def run_pending (self ) -> None :
89
91
"""
90
- Run all jobs that are scheduled to run.
92
+ Run all sync jobs that are scheduled to run.
91
93
92
94
Please note that it is *intended behavior that run_pending()
93
95
does not run missed jobs*. For example, if you've registered a job
@@ -99,9 +101,22 @@ def run_pending(self) -> None:
99
101
for job in sorted (runnable_jobs ):
100
102
self ._run_job (job )
101
103
104
+ async def run_pending_async (self ):
105
+ """
106
+ Run all sync and async jobs that are scheduled to run.
107
+
108
+ Please note that it is *intended behavior that run_pending_async()
109
+ does not run missed jobs*. For example, if you've registered a job
110
+ that should run every minute and you only call run_pending()
111
+ in one hour increments then your job won't be run 60 times in
112
+ between but only once.
113
+ """
114
+ runnable_jobs = (job for job in self .jobs if job .should_run )
115
+ await asyncio .gather (* [self ._run_job_async (job ) for job in runnable_jobs ])
116
+
102
117
def run_all (self , delay_seconds : int = 0 ) -> None :
103
118
"""
104
- Run all jobs regardless if they are scheduled to run or not.
119
+ Run all sync jobs regardless if they are scheduled to run or not.
105
120
106
121
A delay of `delay` seconds is added between each job. This helps
107
122
distribute system load generated by the jobs more evenly
@@ -118,6 +133,25 @@ def run_all(self, delay_seconds: int = 0) -> None:
118
133
self ._run_job (job )
119
134
time .sleep (delay_seconds )
120
135
136
+ async def run_all_async (self , delay_seconds = 0 ):
137
+ """
138
+ Run all sync and async jobs regardless if they are scheduled to run or not.
139
+
140
+ A delay of `delay` seconds is added between each job. This helps
141
+ distribute system load generated by the jobs more evenly
142
+ over time.
143
+
144
+ :param delay_seconds: A delay added between every executed job
145
+ """
146
+ logger .debug (
147
+ "Running *all* %i jobs with %is delay in between" ,
148
+ len (self .jobs ),
149
+ delay_seconds ,
150
+ )
151
+ for job in self .jobs [:]:
152
+ await self ._run_job_async (job )
153
+ await asyncio .sleep (delay_seconds )
154
+
121
155
def get_jobs (self , tag : Optional [Hashable ] = None ) -> List ["Job" ]:
122
156
"""
123
157
Gets scheduled jobs marked with the given tag, or all jobs
@@ -173,6 +207,17 @@ def _run_job(self, job: "Job") -> None:
173
207
if isinstance (ret , CancelJob ) or ret is CancelJob :
174
208
self .cancel_job (job )
175
209
210
+ async def _run_job_async (self , job : "Job" ) -> None :
211
+ ret = job .run ()
212
+ if inspect .isawaitable (ret ):
213
+ ret = await ret
214
+
215
+ self ._process_job_return_value (job , ret )
216
+
217
+ def _process_job_return_value (self , job : "Job" , ret : any ):
218
+ if isinstance (ret , CancelJob ) or ret is CancelJob :
219
+ self .cancel_job (job )
220
+
176
221
def get_next_run (
177
222
self , tag : Optional [Hashable ] = None
178
223
) -> Optional [datetime .datetime ]:
@@ -855,6 +900,13 @@ def run_pending() -> None:
855
900
default_scheduler .run_pending ()
856
901
857
902
903
+ async def run_pending_async () -> None :
904
+ """Calls :meth:`run_pending_async <Scheduler.run_pending_async>` on the
905
+ :data:`default scheduler instance <default_scheduler>`.
906
+ """
907
+ await default_scheduler .run_pending_async ()
908
+
909
+
858
910
def run_all (delay_seconds : int = 0 ) -> None :
859
911
"""Calls :meth:`run_all <Scheduler.run_all>` on the
860
912
:data:`default scheduler instance <default_scheduler>`.
0 commit comments