Skip to content

Commit 2105f14

Browse files
authored
add run_check (#116)
* add run_check, fix #115
* fix job results
* better exception message
1 parent 9848edd commit 2105f14

File tree

6 files changed

+106
-10
lines changed

6 files changed

+106
-10
lines changed

HISTORY.rst

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,10 @@
33
History
44
-------
55

6+
v0.16.0a4 (unreleased)
7+
......................
8+
* add ``Worker.run_check``, fix #115
9+
610
v0.16.0a3 (2019-03-12)
711
......................
812
* fix ``Worker`` with custom redis settings

arq/jobs.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,10 @@ async def result(self, timeout: Optional[float] = None, *, pole_delay: float = 0
4747
info = await self.result_info()
4848
if info:
4949
result = info['result']
50-
if isinstance(result, Exception):
51-
raise result
52-
else:
50+
if info['success']:
5351
return result
52+
else:
53+
raise result
5454
if timeout is not None and delay > timeout:
5555
raise asyncio.TimeoutError()
5656

@@ -81,14 +81,15 @@ async def result_info(self) -> Optional[Dict[str, Any]]:
8181
"""
8282
v = await self._redis.get(result_key_prefix + self.job_id, encoding=None)
8383
if v:
84-
enqueue_time_ms, job_try, function, args, kwargs, result, start_time_ms, finish_time_ms = pickle.loads(v)
84+
enqueue_time_ms, job_try, function, args, kwargs, s, r, start_time_ms, finish_time_ms = pickle.loads(v)
8585
return dict(
8686
enqueue_time=ms_to_datetime(enqueue_time_ms),
8787
job_try=job_try,
8888
function=function,
8989
args=args,
9090
kwargs=kwargs,
91-
result=result,
91+
result=r,
92+
success=s,
9293
start_time=ms_to_datetime(start_time_ms),
9394
finish_time=ms_to_datetime(finish_time_ms),
9495
)

arq/version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@
22

33
__all__ = ['VERSION']
44

5-
VERSION = StrictVersion('0.16a3')
5+
VERSION = StrictVersion('0.16a4')

arq/worker.py

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,22 @@ def __str__(self):
100100
return repr(self)
101101

102102

103+
class FailedJobs(RuntimeError):
    """
    Raised by ``Worker.run_check`` when one or more jobs failed.

    :param count: number of jobs that failed
    :param job_results: list of result dicts for the failed jobs (each dict holds
        the raised exception under the ``'result'`` key)
    """

    def __init__(self, count, job_results):
        self.count = count
        self.job_results = job_results

    def __str__(self):
        if self.count == 1 and self.job_results:
            # single failure with details available: show the exception itself
            exc = self.job_results[0]['result']
            return f'1 job failed "{exc.__class__.__name__}: {exc}"'
        else:
            # fix: use the correct singular/plural even when count == 1 but no
            # job details were supplied (previously rendered "1 jobs failed")
            plural = '' if self.count == 1 else 's'
            return f'{self.count} job{plural} failed'

    def __repr__(self):
        return f'<{str(self)}>'
117+
118+
103119
class Worker:
104120
"""
105121
Main class for running jobs.
@@ -174,7 +190,10 @@ def __init__(
174190
self._add_signal_handler(signal.SIGTERM, self.handle_sig)
175191
self.on_stop = None
176192

177-
def run(self):
193+
def run(self) -> None:
194+
"""
195+
Sync function to run the worker, finally closes worker connections.
196+
"""
178197
self.main_task = self.loop.create_task(self.main())
179198
try:
180199
self.loop.run_until_complete(self.main_task)
@@ -184,10 +203,27 @@ def run(self):
184203
finally:
185204
self.loop.run_until_complete(self.close())
186205

187-
async def async_run(self):
206+
async def async_run(self) -> None:
    """
    Run the worker asynchronously; connections are left open afterwards,
    which makes this variant convenient in tests.
    """
    task = self.loop.create_task(self.main())
    self.main_task = task
    await task
190212

213+
async def run_check(self) -> int:
    """
    Run :func:`arq.worker.Worker.async_run`, then verify no jobs failed,
    raising :class:`arq.worker.FailedJobs` with the failed job results otherwise.

    :return: number of completed jobs
    """
    await self.async_run()
    if not self.jobs_failed:
        return self.jobs_complete
    # gather the stored results of the unsuccessful jobs for the exception
    all_results = await self.pool.all_job_results()
    failures = [r for r in all_results if not r['success']]
    raise FailedJobs(self.jobs_failed, failures)
226+
191227
async def main(self):
192228
if self.pool is None:
193229
self.pool = await create_pool(self.redis_settings)
@@ -293,6 +329,7 @@ async def run_job(self, job_id, score): # noqa: C901
293329
}
294330
ctx = {**self.ctx, **job_ctx}
295331
start_ms = timestamp_ms()
332+
success = False
296333
try:
297334
s = args_to_string(args, kwargs)
298335
extra = f' try={job_try}' if job_try > 1 else ''
@@ -330,6 +367,7 @@ async def run_job(self, job_id, score): # noqa: C901
330367
finish = True
331368
self.jobs_failed += 1
332369
else:
370+
success = True
333371
finished_ms = timestamp_ms()
334372
logger.info('%6.2fs ← %s ● %s', (finished_ms - start_ms) / 1000, ref, result_str)
335373
finish = True
@@ -338,7 +376,7 @@ async def run_job(self, job_id, score): # noqa: C901
338376
result_timeout_s = self.keep_result_s if function.keep_result_s is None else function.keep_result_s
339377
result_data = None
340378
if result is not no_result and result_timeout_s > 0:
341-
d = enqueue_time_ms, job_try, function_name, args, kwargs, result, start_ms, finished_ms
379+
d = enqueue_time_ms, job_try, function_name, args, kwargs, success, result, start_ms, finished_ms
342380
try:
343381
result_data = pickle.dumps(d)
344382
except AttributeError:

tests/test_jobs.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ async def foobar(ctx, *args, **kwargs):
4848
'function': 'foobar',
4949
'args': (1, 2),
5050
'kwargs': {'c': 3},
51+
'success': True,
5152
'result': 42,
5253
'start_time': CloseToNow(),
5354
'finish_time': CloseToNow(),
@@ -61,6 +62,7 @@ async def foobar(ctx, *args, **kwargs):
6162
'function': 'foobar',
6263
'args': (1, 2),
6364
'kwargs': {'c': 3},
65+
'success': True,
6466
'result': 42,
6567
'start_time': CloseToNow(),
6668
'finish_time': CloseToNow(),

tests/test_worker.py

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,22 @@
44
import signal
55
from unittest.mock import MagicMock
66

7+
import pytest
78
from aioredis import create_redis_pool
89

910
from arq.connections import ArqRedis
1011
from arq.constants import health_check_key, job_key_prefix
11-
from arq.worker import Retry, Worker, async_check_health, check_health, func, run_worker
12+
from arq.worker import FailedJobs, Retry, Worker, async_check_health, check_health, func, run_worker
1213

1314

1415
async def foobar(ctx):
    """Trivial job coroutine used throughout these tests; always returns 42."""
    answer = 42
    return answer
1617

1718

19+
async def fails(ctx):
    """Job coroutine that always raises, used to exercise failure handling."""
    err = TypeError('my type error')
    raise err
21+
22+
1823
def test_no_jobs(arq_redis: ArqRedis, loop):
1924
class Settings:
2025
functions = [func(foobar, name='foobar')]
@@ -287,3 +292,49 @@ async def test_remain_keys_no_results(arq_redis: ArqRedis, worker):
287292
worker: Worker = worker(functions=[func(foobar, keep_result=0)])
288293
await worker.main()
289294
assert sorted(await arq_redis.keys('*')) == ['arq:health-check']
295+
296+
297+
async def test_run_check_passes(arq_redis: ArqRedis, worker):
    """run_check returns the number of completed jobs when none fail."""
    for _ in range(2):
        await arq_redis.enqueue_job('foobar')
    w: Worker = worker(functions=[func(foobar, name='foobar')])
    assert await w.run_check() == 2
302+
303+
304+
async def test_run_check_error(arq_redis: ArqRedis, worker):
    """A single failed job makes run_check raise FailedJobs with the exception text."""
    await arq_redis.enqueue_job('fails')
    w: Worker = worker(functions=[func(fails, name='fails')])
    with pytest.raises(FailedJobs, match='1 job failed "TypeError: my type error"'):
        await w.run_check()
309+
310+
311+
async def test_run_check_error2(arq_redis: ArqRedis, worker):
    """Multiple failures are reported with a count and carried on the exception."""
    for _ in range(2):
        await arq_redis.enqueue_job('fails')
    w: Worker = worker(functions=[func(fails, name='fails')])
    with pytest.raises(FailedJobs, match='2 jobs failed') as exc_info:
        await w.run_check()
    assert len(exc_info.value.job_results) == 2
318+
319+
320+
async def test_return_exception(arq_redis: ArqRedis, worker):
    """A job that *returns* (not raises) an exception instance counts as a success."""
    async def return_error(ctx):
        return TypeError('xxx')

    job = await arq_redis.enqueue_job('return_error')
    w: Worker = worker(functions=[func(return_error, name='return_error')])
    await w.async_run()
    assert (w.jobs_complete, w.jobs_failed, w.jobs_retried) == (1, 0, 0)
    result = await job.result(pole_delay=0)
    assert isinstance(result, TypeError)
    info = await job.result_info()
    assert info['success'] is True
332+
333+
334+
async def test_error_success(arq_redis: ArqRedis, worker):
    """A raising job is counted as failed and its stored result has success=False."""
    job = await arq_redis.enqueue_job('fails')
    w: Worker = worker(functions=[func(fails, name='fails')])
    await w.async_run()
    assert (w.jobs_complete, w.jobs_failed, w.jobs_retried) == (0, 1, 0)
    info = await job.result_info()
    assert info['success'] is False

0 commit comments

Comments
 (0)