Skip to content

Commit 06c05d5

Browse files
authored
prevent duplicate jobs when result exists (#138)
1 parent c4e89b3 commit 06c05d5

File tree

3 files changed

+20
-1
lines changed

3 files changed

+20
-1
lines changed

HISTORY.rst

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,10 @@
33
History
44
-------
55

6+
v0.16.1 (unreleased)
7+
....................
8+
* prevent duplicate ``job_id`` when job result exists, fix #137
9+
610
v0.16 (2019-07-30)
711
..................
812
* improved error when a job is aborted (eg. function not found)

arq/connections.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,9 @@ async def enqueue_job(
8383
pipe.unwatch()
8484
pipe.watch(job_key)
8585
job_exists = pipe.exists(job_key)
86+
job_result_exists = pipe.exists(result_key_prefix + job_id)
8687
await pipe.execute()
87-
if await job_exists:
88+
if await job_exists or await job_result_exists:
8889
return
8990

9091
enqueue_time_ms = timestamp_ms()

tests/test_worker.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
from arq.connections import ArqRedis
1111
from arq.constants import default_queue_name, health_check_key_suffix, job_key_prefix
12+
from arq.jobs import Job, JobStatus
1213
from arq.worker import FailedJobs, JobExecutionFailed, Retry, Worker, async_check_health, check_health, func, run_worker
1314

1415

@@ -403,3 +404,16 @@ async def test_many_jobs_expire(arq_redis: ArqRedis, worker, caplog):
403404
log = '\n'.join(r.message for r in caplog.records)
404405
assert 'job testing-0 expired' in log
405406
assert log.count(' expired') == 100
407+
408+
409+
async def test_repeat_job_result(arq_redis: ArqRedis, worker):
410+
j1 = await arq_redis.enqueue_job('foobar', _job_id='job_id')
411+
assert isinstance(j1, Job)
412+
assert await j1.status() == JobStatus.queued
413+
414+
assert await arq_redis.enqueue_job('foobar', _job_id='job_id') is None
415+
416+
await worker(functions=[foobar]).run_check()
417+
assert await j1.status() == JobStatus.complete
418+
419+
assert await arq_redis.enqueue_job('foobar', _job_id='job_id') is None

0 commit comments

Comments
 (0)