Skip to content

Commit 2dbcad9

Browse files
Make snooze and unique middleware play nice with each other
Unique middleware unlocks the keys even if the job is snoozed and not completed. Make it aware of the snooze middleware.
1 parent a5ce5b6 commit 2dbcad9

7 files changed

Lines changed: 59 additions & 15 deletions

File tree

README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -558,7 +558,9 @@ Exq comes with an `enqueue_all` method which guarantees atomicity.
558558

559559
A Job can be snoozed by returning `{:snooze, time_to_sleep_in_seconds}`
560560
from perform method. By default this feature is not enabled. Add
561-
`Exq.Middleware.Snooze` to the middleware list to enable this feature.
561+
`Exq.Middleware.Snooze` to the middleware list to enable this
562+
feature. If you use `Exq.Middleware.Unique`, it must be placed
563+
before it.
562564

563565
```elixir
564566
defmodule MyWorker do

config/test.exs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ config :exq,
2626
Exq.Middleware.Stats,
2727
Exq.Middleware.Job,
2828
Exq.Middleware.Manager,
29+
Exq.Middleware.Snooze,
2930
Exq.Middleware.Unique,
30-
Exq.Middleware.Telemetry,
31-
Exq.Middleware.Snooze
31+
Exq.Middleware.Telemetry
3232
],
3333
queue_adapter: Exq.Adapters.Queue.Mock

lib/exq/middleware/snooze.ex

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ defmodule Exq.Middleware.Snooze do
22
@behaviour Exq.Middleware.Behaviour
33
alias Exq.Redis.JobQueue
44
alias Exq.Middleware.Pipeline
5+
import Pipeline
56

67
def before_work(pipeline) do
78
pipeline
@@ -13,15 +14,19 @@ defmodule Exq.Middleware.Snooze do
1314
)
1415
when is_number(seconds) do
1516
if assigns.job do
16-
JobQueue.snooze_job(
17-
assigns.redis,
18-
assigns.namespace,
19-
assigns.job,
20-
seconds
21-
)
22-
end
17+
{:ok, _jid} =
18+
JobQueue.snooze_job(
19+
assigns.redis,
20+
assigns.namespace,
21+
assigns.job,
22+
seconds
23+
)
2324

24-
pipeline
25+
pipeline
26+
|> assign(:job_snoozed, true)
27+
else
28+
pipeline
29+
end
2530
end
2631

2732
def after_processed_work(pipeline) do

lib/exq/middleware/unique.ex

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ defmodule Exq.Middleware.Unique do
2121
pipeline
2222
end
2323

24+
def after_processed_work(%Pipeline{assigns: %{job_snoozed: true}} = pipeline) do
25+
pipeline
26+
end
27+
2428
def after_processed_work(
2529
%Pipeline{assigns: %{job_serialized: job_serialized, redis: redis, namespace: namespace}} =
2630
pipeline

test/config_test.exs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -314,9 +314,9 @@ defmodule Exq.ConfigTest do
314314
Exq.Middleware.Stats,
315315
Exq.Middleware.Job,
316316
Exq.Middleware.Manager,
317+
Exq.Middleware.Snooze,
317318
Exq.Middleware.Unique,
318-
Exq.Middleware.Telemetry,
319-
Exq.Middleware.Snooze
319+
Exq.Middleware.Telemetry
320320
]
321321

322322
assert mode == :default

test/exq_test.exs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -814,6 +814,39 @@ defmodule ExqTest do
814814
stop_process(sup)
815815
end
816816

817+
test "snooze + unique job" do
818+
Process.register(self(), :exqtest)
819+
{:ok, sup} = Exq.start_link(concurrency: 1, queues: ["q1"], scheduler_enable: true)
820+
821+
{:ok, j1} =
822+
Exq.enqueue(Exq, "q1", ExqTest.SnoozeWorker, [0.050, :snoozed],
823+
max_retries: 0,
824+
unique_for: 500,
825+
unique_token: "t1"
826+
)
827+
828+
{:conflict, ^j1} =
829+
Exq.enqueue(Exq, "q1", ExqTest.SnoozeWorker, [0.050, :snoozed],
830+
max_retries: 0,
831+
unique_for: 500,
832+
unique_token: "t1"
833+
)
834+
835+
:timer.sleep(100)
836+
assert_received {"snoozed"}
837+
838+
{:conflict, ^j1} =
839+
Exq.enqueue(Exq, "q1", ExqTest.SnoozeWorker, [0.050, :snoozed],
840+
max_retries: 0,
841+
unique_for: 500,
842+
unique_token: "t1"
843+
)
844+
845+
:timer.sleep(100)
846+
assert_received {"snoozed"}
847+
stop_process(sup)
848+
end
849+
817850
test "cancel job" do
818851
{:ok, sup} = Exq.start_link()
819852
{:ok, _} = Exq.enqueue(Exq, "default", ExqTest.SleepWorker, [60 * 1000, :worked])

test/middleware_test.exs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -239,9 +239,9 @@ defmodule MiddlewareTest do
239239
Exq.Middleware.Stats,
240240
Exq.Middleware.Job,
241241
Exq.Middleware.Manager,
242+
Exq.Middleware.Snooze,
242243
Exq.Middleware.Unique,
243-
Exq.Middleware.Telemetry,
244-
Exq.Middleware.Snooze
244+
Exq.Middleware.Telemetry
245245
]
246246

247247
assert Middleware.all(Middleware) == chain

0 commit comments

Comments
 (0)