Skip to content

Commit 6a750a2

Browse files
authored
implement queued_jobs, fix #145 (#146)
1 parent 3b35919 commit 6a750a2

File tree

3 files changed

+60
-6
lines changed

3 files changed

+60
-6
lines changed

HISTORY.rst

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,11 @@
33
History
44
-------
55

6-
v0.17.0 (unreleased)
7-
....................
6+
v0.17 (unreleased)
7+
..................
88
* add ``worker.queue_read_limit``, fix #141, by @rubik
99
* custom serializers, eg. to use msgpack rather than pickle, #143 by @rubik
10+
* add ``ArqRedis.queued_jobs`` utility method for getting queued jobs while testing, fix #145 by @samuelcolvin
1011

1112
v0.16.1 (2019-08-02)
1213
....................

arq/connections.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from aioredis import MultiExecError, Redis
1212

1313
from .constants import default_queue_name, job_key_prefix, result_key_prefix
14-
from .jobs import Deserializer, Job, JobResult, Serializer, serialize_job
14+
from .jobs import Deserializer, Job, JobDef, JobResult, Serializer, deserialize_job, serialize_job
1515
from .utils import timestamp_ms, to_ms, to_unix_ms
1616

1717
logger = logging.getLogger('arq.connections')
@@ -126,7 +126,7 @@ async def enqueue_job(
126126
return
127127
return Job(job_id, redis=self, _deserializer=self.job_deserializer)
128128

129-
async def _get_job_result(self, key):
129+
async def _get_job_result(self, key) -> JobResult:
130130
job_id = key[len(result_key_prefix) :]
131131
job = Job(job_id, self, _deserializer=self.job_deserializer)
132132
r = await job.result_info()
@@ -141,6 +141,19 @@ async def all_job_results(self) -> List[JobResult]:
141141
results = await asyncio.gather(*[self._get_job_result(k) for k in keys])
142142
return sorted(results, key=attrgetter('enqueue_time'))
143143

144+
async def _get_job_def(self, job_id, score) -> JobDef:
145+
v = await self.get(job_key_prefix + job_id, encoding=None)
146+
jd = deserialize_job(v, deserializer=self.job_deserializer)
147+
jd.score = score
148+
return jd
149+
150+
async def queued_jobs(self, *, queue_name: str = default_queue_name) -> List[JobDef]:
151+
"""
152+
Get information about queued, mostly useful when testing.
153+
"""
154+
jobs = await self.zrange(queue_name, withscores=True)
155+
return await asyncio.gather(*[self._get_job_def(job_id, score) for job_id, score in jobs])
156+
144157

145158
async def create_pool(
146159
settings: RedisSettings = None,

tests/test_main.py

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
11
import asyncio
2+
import dataclasses
23
from collections import Counter
34
from datetime import datetime
45
from random import shuffle
56
from time import time
67

78
import pytest
8-
from pytest_toolbox.comparison import CloseToNow
9+
from pytest_toolbox.comparison import AnyInt, CloseToNow
910

1011
from arq.connections import ArqRedis
1112
from arq.constants import default_queue_name
12-
from arq.jobs import Job, SerializationError
13+
from arq.jobs import Job, JobDef, SerializationError
1314
from arq.utils import timestamp_ms
1415
from arq.worker import Retry, Worker, func
1516

@@ -162,3 +163,42 @@ async def foobar(ctx):
162163
await w.main()
163164
with pytest.raises(SerializationError):
164165
await j1.result(pole_delay=0)
166+
167+
168+
async def test_get_jobs(arq_redis: ArqRedis):
169+
await arq_redis.enqueue_job('foobar', a=1, b=2, c=3)
170+
await asyncio.sleep(0.01)
171+
await arq_redis.enqueue_job('second', 4, b=5, c=6)
172+
await asyncio.sleep(0.01)
173+
await arq_redis.enqueue_job('third', 7, b=8)
174+
jobs = await arq_redis.queued_jobs()
175+
assert [dataclasses.asdict(j) for j in jobs] == [
176+
{
177+
'function': 'foobar',
178+
'args': (),
179+
'kwargs': {'a': 1, 'b': 2, 'c': 3},
180+
'job_try': None,
181+
'enqueue_time': CloseToNow(),
182+
'score': AnyInt(),
183+
},
184+
{
185+
'function': 'second',
186+
'args': (4,),
187+
'kwargs': {'b': 5, 'c': 6},
188+
'job_try': None,
189+
'enqueue_time': CloseToNow(),
190+
'score': AnyInt(),
191+
},
192+
{
193+
'function': 'third',
194+
'args': (7,),
195+
'kwargs': {'b': 8},
196+
'job_try': None,
197+
'enqueue_time': CloseToNow(),
198+
'score': AnyInt(),
199+
},
200+
]
201+
assert jobs[0].score < jobs[1].score < jobs[2].score
202+
assert isinstance(jobs[0], JobDef)
203+
assert isinstance(jobs[1], JobDef)
204+
assert isinstance(jobs[2], JobDef)

0 commit comments

Comments
 (0)