Skip to content

Commit ae2143f

Browse files
Only lock unique lock if it's acquired by the given job
Currently any job can unlock unique token, even if it is set by different job. 1) j1 sets lock (expires) 2) j2 sets lock 3) j1 unlocks the lock set by j2 This creates more problem, as more jobs can be enqueued with same unique token. Be conservative and only unlock if the lock is acquired by the same jid. Incase of expired lock, unlock will become a no-op instead of unlock lock set by another jid.
1 parent 9aa3f0c commit ae2143f

File tree

4 files changed

+63
-11
lines changed

4 files changed

+63
-11
lines changed

lib/exq/middleware/unique.ex

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@ defmodule Exq.Middleware.Unique do
1010
job = Exq.Support.Job.decode(job_serialized)
1111

1212
case job do
13-
%{unique_until: "start", unique_token: unique_token, retry_count: retry_count}
13+
%{unique_until: "start", unique_token: unique_token, retry_count: retry_count, jid: jid}
1414
when retry_count in [0, nil] ->
15-
{:ok, _} = JobQueue.unlock(redis, namespace, unique_token)
15+
{:ok, _} = JobQueue.unlock(redis, namespace, unique_token, jid)
1616

1717
_ ->
1818
:ok
@@ -28,8 +28,8 @@ defmodule Exq.Middleware.Unique do
2828
job = Exq.Support.Job.decode(job_serialized)
2929

3030
case job do
31-
%{unique_until: "success", unique_token: unique_token} ->
32-
{:ok, _} = JobQueue.unlock(redis, namespace, unique_token)
31+
%{unique_until: "success", unique_token: unique_token, jid: jid} ->
32+
{:ok, _} = JobQueue.unlock(redis, namespace, unique_token, jid)
3333

3434
_ ->
3535
:ok
@@ -45,9 +45,9 @@ defmodule Exq.Middleware.Unique do
4545
job = Exq.Support.Job.decode(job_serialized)
4646

4747
case job do
48-
%{unique_until: "success", unique_token: unique_token} ->
48+
%{unique_until: "success", unique_token: unique_token, jid: jid} ->
4949
if JobQueue.dead?(job) do
50-
{:ok, _} = JobQueue.unlock(redis, namespace, unique_token)
50+
{:ok, _} = JobQueue.unlock(redis, namespace, unique_token, jid)
5151
end
5252

5353
_ ->

lib/exq/redis/job_queue.ex

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -58,14 +58,25 @@ defmodule Exq.Redis.JobQueue do
5858
end
5959

6060
def unlock_jobs(redis, namespace, raw_jobs) do
61-
for job <- raw_jobs,
62-
unique_token = Job.decode(job).unique_token do
63-
unlock(redis, namespace, unique_token)
61+
for raw_job <- raw_jobs,
62+
job = Job.decode(raw_job),
63+
unique_token = job.unique_token do
64+
unlock(redis, namespace, unique_token, job.jid)
6465
end
6566
end
6667

67-
def unlock(redis, namespace, unique_token) do
68-
Connection.del!(redis, unique_key(namespace, unique_token), retry_on_connection_error: 3)
68+
def unlock(redis, namespace, unique_token, jid) do
69+
Exq.Support.Redis.with_retry_on_connection_error(
70+
fn ->
71+
Script.eval!(
72+
redis,
73+
:compare_and_delete,
74+
[unique_key(namespace, unique_token)],
75+
[jid]
76+
)
77+
end,
78+
3
79+
)
6980
end
7081

7182
def enqueue_in(redis, namespace, queue, offset, worker, args, options)

lib/exq/redis/script.ex

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,17 @@ defmodule Exq.Redis.Script do
1212
end
1313

1414
@scripts %{
15+
compare_and_delete:
16+
Prepare.script("""
17+
local key = KEYS[1]
18+
local expected_value = ARGV[1]
19+
local current_value = redis.call("get", key)
20+
if current_value == expected_value then
21+
return redis.call("del", key)
22+
else
23+
return 0
24+
end
25+
"""),
1526
enqueue:
1627
Prepare.script("""
1728
local queues_key, job_queue_key, unique_key = KEYS[1], KEYS[2], KEYS[3]

test/exq_test.exs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -599,6 +599,36 @@ defmodule ExqTest do
599599
stop_process(sup)
600600
end
601601

602+
test "handle lock expiry gracefully" do
603+
Process.register(self(), :exqtest)
604+
{:ok, sup} = Exq.start_link(concurrency: 1, queues: ["q1"])
605+
606+
{:ok, j1} =
607+
Exq.enqueue(Exq, "q1", ExqTest.SleepWorker, [3000, :worked1],
608+
unique_for: 1,
609+
unique_token: "t1"
610+
)
611+
612+
:timer.sleep(2000)
613+
614+
{:ok, j2} =
615+
Exq.enqueue(Exq, "q1", ExqTest.SleepWorker, [3000, :worked],
616+
unique_for: 60,
617+
unique_token: "t1"
618+
)
619+
620+
:timer.sleep(1500)
621+
assert_received {"worked1"}
622+
623+
{:conflict, ^j2} =
624+
Exq.enqueue(Exq, "q1", ExqTest.SleepWorker, [3000, :worked],
625+
unique_for: 60,
626+
unique_token: "t1"
627+
)
628+
629+
stop_process(sup)
630+
end
631+
602632
defmodule ConstantBackoff do
603633
@behaviour Exq.Backoff.Behaviour
604634

0 commit comments

Comments
 (0)