Skip to content

Commit 19f0a88

Browse files
committed
get_redis_pool lock to avoid multiple pool being created
1 parent c764765 commit 19f0a88

File tree

5 files changed

+19
-7
lines changed

5 files changed

+19
-7
lines changed

HISTORY.rst

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,12 @@
33
History
44
-------
55

6-
v0.10.2 (2017-XX-XX)
6+
v0.10.3 (2017-08-19)
7+
....................
8+
* fix bug with ``RedisMixin.get_redis_pool`` creating multiple queues
9+
* tweak drain logs
10+
11+
v0.10.2 (2017-08-17)
712
....................
813
* only save job on task in drain if re-enqueuing
914
* add semaphore timeout to drains

arq/drain.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
__all__ = ['Drain']
1919

20+
# these loggers could do with more sensible names
2021
work_logger = logging.getLogger('arq.work')
2122
jobs_logger = logging.getLogger('arq.jobs')
2223

@@ -97,6 +98,7 @@ async def iter(self, *raw_queues: bytes, pop_timeout=1):
9798
await self.redis.rpush(quit_queue, b'1')
9899
raw_queues = tuple(raw_queues) + (quit_queue,)
99100
while True:
101+
work_logger.debug('task semaphore locked: %r', self.task_semaphore.locked())
100102
try:
101103
with timeout(self.semaphore_timeout):
102104
await self.task_semaphore.acquire()
@@ -115,7 +117,7 @@ async def iter(self, *raw_queues: bytes, pop_timeout=1):
115117
if self.burst_mode and raw_queue == quit_queue:
116118
work_logger.debug('got job from the quit queue, stopping')
117119
break
118-
work_logger.debug('jobs in progress %d', self.jobs_in_progress)
120+
work_logger.debug('yielding job, jobs in progress %d', self.jobs_in_progress)
119121
yield raw_queue, raw_data
120122

121123
def add(self, coro, job, re_enqueue=False):

arq/utils.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ def __init__(self, *,
6666
self.loop = loop or getattr(self, 'loop', None) or asyncio.get_event_loop()
6767
self.redis_settings = redis_settings or getattr(self, 'redis_settings', None) or RedisSettings()
6868
self._redis_pool = existing_pool
69+
self._create_pool_lock = asyncio.Lock(loop=self.loop)
6970

7071
async def create_redis_pool(self, *, _retry=0) -> RedisPool:
7172
"""
@@ -93,8 +94,9 @@ async def get_redis_pool(self) -> RedisPool:
9394
"""
9495
Get the redis pool, if a pool is already initialised it's returned, else one is crated.
9596
"""
96-
if self._redis_pool is None:
97-
self._redis_pool = await self.create_redis_pool()
97+
async with self._create_pool_lock:
98+
if self._redis_pool is None:
99+
self._redis_pool = await self.create_redis_pool()
98100
return self._redis_pool
99101

100102
async def get_redis_conn(self):

arq/version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@
22

33
__all__ = ['VERSION']
44

5-
VERSION = StrictVersion('0.10.2')
5+
VERSION = StrictVersion('0.10.3')

tests/test_main.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,10 +75,13 @@ async def test_dispatch_work(tmpworkdir, loop, caplog, redis_conn):
7575
'recording health: <date time2> j_complete=0 j_failed=0 j_timedout=0 j_ongoing=0 q_high=1 q_dft=1 q_low=0\n'
7676
'starting main blpop loop\n'
7777
'populating quit queue to prompt exit: arq:quit-<random>\n'
78-
'jobs in progress 1\n'
78+
'task semaphore locked: False\n'
79+
'yielding job, jobs in progress 1\n'
7980
'scheduling job <Job __id__ MockRedisDemoActor.high_add_numbers(3, 4, c=5) on high>, re-enqueue: False\n'
80-
'jobs in progress 2\n'
81+
'task semaphore locked: False\n'
82+
'yielding job, jobs in progress 2\n'
8183
'scheduling job <Job __id__ MockRedisDemoActor.add_numbers(1, 2) on dft>, re-enqueue: False\n'
84+
'task semaphore locked: False\n'
8285
'got job from the quit queue, stopping\n'
8386
'drain waiting 5.0s for 2 tasks to finish\n'
8487
'high queued 0.0XXs → __id__ MockRedisDemoActor.high_add_numbers(3, 4, c=5)\n'

0 commit comments

Comments
 (0)