Skip to content

Commit 12456ee

Browse files
Raise ResultNotFound when Job.result() finds no job and no result (#364)
* Raise ResultNotFound when Job.result() finds no job and no result Job.result() assumed that a result will eventually show up in Redis. This assumption does not hold for jobs which are not in the queue and jobs which do not keep result. In such cases Job.result() would hang forever (or until the optional timeout is reached). This PR changes Job.result() so that it raises a new exception ResultNotFound, rather than waiting forever, if result is not found and the job is not in the queue. * Subclass ResultNotFound from RuntimeError Co-authored-by: Samuel Colvin <[email protected]> * Add message when raising ResultNotFound * Improve docstring for Job.result() * Handle ResultNotFound in Job.abort() * Update arq/jobs.py Co-authored-by: Samuel Colvin <[email protected]>
1 parent e9452c5 commit 12456ee

File tree

3 files changed

+67
-11
lines changed

3 files changed

+67
-11
lines changed

arq/jobs.py

+27-9
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@
1818
Deserializer = Callable[[bytes], Dict[str, Any]]
1919

2020

21+
class ResultNotFound(RuntimeError):
22+
pass
23+
24+
2125
class JobStatus(str, Enum):
2226
"""
2327
Enum of job statuses.
@@ -82,8 +86,10 @@ async def result(
8286
self, timeout: Optional[float] = None, *, poll_delay: float = 0.5, pole_delay: float = None
8387
) -> Any:
8488
"""
85-
Get the result of the job, including waiting if it's not yet available. If the job raised an exception,
86-
it will be raised here.
89+
Get the result of the job or, if the job raised an exception, reraise it.
90+
91+
This function waits for the result if it's not yet available and the job is
92+
present in the queue. Otherwise ``ResultNotFound`` is raised.
8793
8894
:param timeout: maximum time to wait for the job result before raising ``TimeoutError``, will wait forever
8995
:param poll_delay: how often to poll redis for the job result
@@ -96,15 +102,24 @@ async def result(
96102
poll_delay = pole_delay
97103

98104
async for delay in poll(poll_delay):
99-
info = await self.result_info()
100-
if info:
101-
result = info.result
105+
async with self._redis.pipeline(transaction=True):
106+
v = await self._redis.get(result_key_prefix + self.job_id)
107+
s = await self._redis.zscore(self._queue_name, self.job_id)
108+
109+
if v:
110+
info = deserialize_result(v, deserializer=self._deserializer)
102111
if info.success:
103-
return result
104-
elif isinstance(result, (Exception, asyncio.CancelledError)):
105-
raise result
112+
return info.result
113+
elif isinstance(info.result, (Exception, asyncio.CancelledError)):
114+
raise info.result
106115
else:
107-
raise SerializationError(result)
116+
raise SerializationError(info.result)
117+
elif s is None:
118+
raise ResultNotFound(
119+
'Not waiting for job result because the job is not in queue. '
120+
'Is the worker function configured to keep result?'
121+
)
122+
108123
if timeout is not None and delay > timeout:
109124
raise asyncio.TimeoutError()
110125

@@ -174,6 +189,9 @@ async def abort(self, *, timeout: Optional[float] = None, poll_delay: float = 0.
174189
await self.result(timeout=timeout, poll_delay=poll_delay)
175190
except asyncio.CancelledError:
176191
return True
192+
except ResultNotFound:
193+
# We do not know if the job was cancelled or not
194+
return False
177195
else:
178196
return False
179197

tests/test_jobs.py

+35-2
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,15 @@
77
from arq import Worker, func
88
from arq.connections import ArqRedis, RedisSettings, create_pool
99
from arq.constants import default_queue_name, in_progress_key_prefix, job_key_prefix, result_key_prefix
10-
from arq.jobs import DeserializationError, Job, JobResult, JobStatus, deserialize_job_raw, serialize_result
10+
from arq.jobs import (
11+
DeserializationError,
12+
Job,
13+
JobResult,
14+
JobStatus,
15+
ResultNotFound,
16+
deserialize_job_raw,
17+
serialize_result,
18+
)
1119

1220

1321
async def test_job_in_progress(arq_redis: ArqRedis):
@@ -25,11 +33,36 @@ async def test_unknown(arq_redis: ArqRedis):
2533

2634

2735
async def test_result_timeout(arq_redis: ArqRedis):
28-
j = Job('foobar', arq_redis)
36+
j = await arq_redis.enqueue_job('foobar')
2937
with pytest.raises(asyncio.TimeoutError):
3038
await j.result(0.1, poll_delay=0)
3139

3240

41+
async def test_result_not_found(arq_redis: ArqRedis):
42+
j = Job('foobar', arq_redis)
43+
with pytest.raises(ResultNotFound):
44+
await j.result()
45+
46+
47+
async def test_result_when_job_does_not_keep_result(arq_redis: ArqRedis, worker):
48+
async def foobar(ctx):
49+
pass
50+
51+
worker: Worker = worker(functions=[func(foobar, name='foobar', keep_result=0)])
52+
j = await arq_redis.enqueue_job('foobar')
53+
54+
result_call = asyncio.Task(j.result())
55+
56+
_, pending = await asyncio.wait([result_call], timeout=0.1)
57+
assert pending == {result_call}
58+
59+
await worker.main()
60+
61+
with pytest.raises(ResultNotFound):
62+
# Job has completed and did not store any result
63+
await asyncio.wait_for(result_call, timeout=5)
64+
65+
3366
async def test_enqueue_job(arq_redis: ArqRedis, worker, queue_name=default_queue_name):
3467
async def foobar(ctx, *args, **kwargs):
3568
return 42

tests/test_worker.py

+5
Original file line numberDiff line numberDiff line change
@@ -810,6 +810,11 @@ async def wait_and_abort(job, delay=0.1):
810810
assert worker.job_tasks == {}
811811

812812

813+
async def test_abort_job_which_is_not_in_queue(arq_redis: ArqRedis):
814+
job = Job(job_id='testing', redis=arq_redis)
815+
assert await job.abort() is False
816+
817+
813818
async def test_abort_job_before(arq_redis: ArqRedis, worker, caplog, loop):
814819
async def longfunc(ctx):
815820
await asyncio.sleep(3600)

0 commit comments

Comments
 (0)