Commit b2d397a

non retry mode (#140)
* non retry mode
* update history, fix #139
* adding max_burst_jobs
* history
1 parent 06c05d5 commit b2d397a

5 files changed: 55 additions, 8 deletions

.gitignore

Lines changed: 2 additions & 1 deletion
@@ -1,4 +1,5 @@
-/env
+/env/
+/env36/
 /.idea
 __pycache__/
 *.py[cod]

HISTORY.rst

Lines changed: 3 additions & 1 deletion
@@ -3,9 +3,11 @@
 History
 -------
 
-v0.16.1 (unreleased)
+v0.16.1 (2019-08-02)
 ....................
 * prevent duplicate ``job_id`` when job result exists, fix #137
+* add "don't retry mode" via ``worker.retry_jobs = False``, fix #139
+* add ``worker.max_burst_jobs``
 
 v0.16 (2019-07-30)
 ..................
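
The two new options can also be passed straight to the worker constructor (see the ``arq/worker.py`` diff below). A minimal sketch, assuming a placeholder job coroutine ``download`` and default Redis settings:

from arq.worker import Worker, func

async def download(ctx, url):
    ...  # placeholder job, not part of this commit

worker = Worker(
    functions=[func(download, name='download')],
    retry_jobs=False,   # new: jobs raising Retry (or cancelled jobs) fail instead of being re-queued
    max_burst_jobs=10,  # new: in burst mode, stop after 10 completed/retried/failed jobs (-1 means no limit)
)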

arq/version.py

Lines changed: 1 addition & 1 deletion
@@ -2,4 +2,4 @@
 
 __all__ = ['VERSION']
 
-VERSION = StrictVersion('0.16')
+VERSION = StrictVersion('0.16.1')

arq/worker.py

Lines changed: 17 additions & 5 deletions
@@ -88,8 +88,6 @@ class Retry(RuntimeError):
     :param defer: duration to wait before rerunning the job
     """
 
-    __slots__ = ('defer_score',)
-
     def __init__(self, defer: Optional[SecondsTimedelta] = None):
         self.defer_score = to_ms(defer)
 
@@ -164,6 +162,8 @@ def __init__(
         health_check_interval: SecondsTimedelta = 3600,
         health_check_key: Optional[str] = None,
         ctx: Optional[Dict] = None,
+        retry_jobs: bool = True,
+        max_burst_jobs: int = -1,
     ):
         self.functions: Dict[str, Union[Function, CronJob]] = {f.name: f for f in map(func, functions)}
         self.queue_name = queue_name
@@ -205,6 +205,9 @@ def __init__(
         self._add_signal_handler(signal.SIGINT, self.handle_sig)
         self._add_signal_handler(signal.SIGTERM, self.handle_sig)
         self.on_stop = None
+        # whether or not to retry jobs on Retry and CancelledError
+        self.retry_jobs = retry_jobs
+        self.max_burst_jobs = max_burst_jobs
 
     def run(self) -> None:
         """
@@ -226,13 +229,17 @@ async def async_run(self) -> None:
         self.main_task = self.loop.create_task(self.main())
         await self.main_task
 
-    async def run_check(self) -> int:
+    async def run_check(self, retry_jobs: Optional[bool] = None, max_burst_jobs: Optional[int] = None) -> int:
         """
         Run :func:`arq.worker.Worker.async_run`, check for failed jobs and raise :class:`arq.worker.FailedJobs`
         if any jobs have failed.
 
         :return: number of completed jobs
         """
+        if retry_jobs is not None:
+            self.retry_jobs = retry_jobs
+        if max_burst_jobs is not None:
+            self.max_burst_jobs = max_burst_jobs
         await self.async_run()
         if self.jobs_failed:
             failed_job_results = [r for r in await self.pool.all_job_results() if not r.success]
@@ -265,6 +272,11 @@ async def main(self):
             await self.heart_beat()
 
             if self.burst:
+                if (
+                    self.max_burst_jobs >= 0
+                    and self.jobs_complete + self.jobs_retried + self.jobs_failed >= self.max_burst_jobs
+                ):
+                    return
                 queued_jobs = await self.pool.zcard(self.queue_name)
                 if queued_jobs == 0:
                     return
@@ -405,13 +417,13 @@ async def run_job(self, job_id, score):  # noqa: C901
         except Exception as e:
             finished_ms = timestamp_ms()
             t = (finished_ms - start_ms) / 1000
-            if isinstance(e, Retry):
+            if self.retry_jobs and isinstance(e, Retry):
                 incr_score = e.defer_score
                 logger.info('%6.2fs ↻ %s retrying job in %0.2fs', t, ref, (e.defer_score or 0) / 1000)
                 if e.defer_score:
                     incr_score = e.defer_score + (timestamp_ms() - score)
                 self.jobs_retried += 1
-            elif isinstance(e, asyncio.CancelledError):
+            elif self.retry_jobs and isinstance(e, asyncio.CancelledError):
                 logger.info('%6.2fs ↻ %s cancelled, will be run again', t, ref)
                 self.jobs_retried += 1
             else:
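
The extended ``run_check()`` simply sets ``self.retry_jobs`` / ``self.max_burst_jobs`` when the corresponding argument is not None, then runs the worker. A hedged usage sketch, assuming an already-constructed burst-mode worker:

from arq.worker import FailedJobs

async def check(worker):
    try:
        # overrides the instance attributes set in __init__, then runs the worker
        completed = await worker.run_check(retry_jobs=False, max_burst_jobs=5)
    except FailedJobs as e:
        # raised when any job failed; with retry_jobs=False a Retry counts as a failure
        print('jobs failed:', e)
    else:
        print(completed, 'jobs completed')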

tests/test_worker.py

Lines changed: 32 additions & 0 deletions
@@ -110,6 +110,38 @@ async def retry(ctx):
     assert '0.XXs ← testing:retry ●' in log
 
 
+async def test_job_retry_dont_retry(arq_redis: ArqRedis, worker, caplog):
+    async def retry(ctx):
+        raise Retry(defer=0.01)
+
+    caplog.set_level(logging.INFO)
+    await arq_redis.enqueue_job('retry', _job_id='testing')
+    worker: Worker = worker(functions=[func(retry, name='retry')])
+    with pytest.raises(FailedJobs) as exc_info:
+        await worker.run_check(retry_jobs=False)
+    assert str(exc_info.value) == '1 job failed <Retry defer 0.01s>'
+
+    assert '↻' not in caplog.text
+    assert '! testing:retry failed, Retry: <Retry defer 0.01s>\n' in caplog.text
+
+
+async def test_job_retry_max_jobs(arq_redis: ArqRedis, worker, caplog):
+    async def retry(ctx):
+        raise Retry(defer=0.01)
+
+    caplog.set_level(logging.INFO)
+    await arq_redis.enqueue_job('retry', _job_id='testing')
+    worker: Worker = worker(functions=[func(retry, name='retry')])
+    assert await worker.run_check(max_burst_jobs=1) == 0
+    assert worker.jobs_complete == 0
+    assert worker.jobs_retried == 1
+    assert worker.jobs_failed == 0
+
+    log = re.sub(r'(\d+).\d\ds', r'\1.XXs', caplog.text)
+    assert '0.XXs ↻ testing:retry retrying job in 0.XXs\n' in log
+    assert '0.XXs → testing:retry() try=2\n' not in log
+
+
 async def test_job_job_not_found(arq_redis: ArqRedis, worker, caplog):
     caplog.set_level(logging.INFO)
     await arq_redis.enqueue_job('missing', _job_id='testing')
