Skip to content

Commit 6408709

Browse files
Add ability to snooze job
Implementation is same as how retry is implemented, just that retry_count is not incremented, so a job can be snoozed unlimited times
1 parent 2a23a33 commit 6408709

File tree

9 files changed

+124
-4
lines changed

9 files changed

+124
-4
lines changed

README.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -554,6 +554,20 @@ Exq comes with an `enqueue_all` method which guarantees atomicity.
554554
])
555555
```
556556

557+
## Snooze
558+
559+
A Job can be snoozed by returning `{:snooze, time_to_sleep_in_seconds}`
560+
from perform method. By default this feature is not enabled. Add
561+
`Exq.Middleware.Snooze` to the middleware list to enable this feature.
562+
563+
```elixir
564+
defmodule MyWorker do
565+
def perform do
566+
{:snooze, 10}
567+
end
568+
end
569+
```
570+
557571
## Web UI
558572

559573
Exq has a separate repo, exq_ui which provides with a Web UI to monitor your workers:

config/test.exs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ config :exq,
2727
Exq.Middleware.Job,
2828
Exq.Middleware.Manager,
2929
Exq.Middleware.Unique,
30-
Exq.Middleware.Telemetry
30+
Exq.Middleware.Telemetry,
31+
Exq.Middleware.Snooze
3132
],
3233
queue_adapter: Exq.Adapters.Queue.Mock

lib/exq/middleware/pipeline.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ defmodule Exq.Middleware.Pipeline do
4747
end
4848

4949
@doc """
50-
Sets `terminated` to true
50+
Sets `terminated` to true
5151
"""
5252
def terminate(%Pipeline{} = pipeline) do
5353
%{pipeline | terminated: true}

lib/exq/middleware/snooze.ex

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
defmodule Exq.Middleware.Snooze do
2+
@behaviour Exq.Middleware.Behaviour
3+
alias Exq.Redis.JobQueue
4+
alias Exq.Middleware.Pipeline
5+
6+
def before_work(pipeline) do
7+
pipeline
8+
end
9+
10+
def after_processed_work(
11+
%Pipeline{assigns: %{result: {:snooze, seconds}} = assigns} =
12+
pipeline
13+
)
14+
when is_number(seconds) do
15+
if assigns.job do
16+
JobQueue.snooze_job(
17+
assigns.redis,
18+
assigns.namespace,
19+
assigns.job,
20+
seconds
21+
)
22+
end
23+
24+
pipeline
25+
end
26+
27+
def after_processed_work(pipeline) do
28+
pipeline
29+
end
30+
31+
def after_failed_work(pipeline) do
32+
pipeline
33+
end
34+
end

lib/exq/redis/job_queue.ex

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,26 @@ defmodule Exq.Redis.JobQueue do
317317
end
318318
end
319319

320+
def snooze_job(redis, namespace, job, offset) do
321+
job =
322+
%{job | error_message: "Snoozed for #{offset} seconds"}
323+
|> add_failure_timestamp()
324+
325+
time = Time.offset_from_now(offset)
326+
Logger.info("Queueing job #{job.jid} to retry in #{offset} seconds")
327+
328+
{:ok, _jid} =
329+
do_enqueue_job_at(
330+
redis,
331+
namespace,
332+
job,
333+
Job.encode(job),
334+
job.jid,
335+
time,
336+
retry_queue_key(namespace)
337+
)
338+
end
339+
320340
def retry_or_fail_job(redis, namespace, job, error) do
321341
if dead?(job) do
322342
Logger.info("Max retries on job #{job.jid} exceeded")

test/config_test.exs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,8 @@ defmodule Exq.ConfigTest do
315315
Exq.Middleware.Job,
316316
Exq.Middleware.Manager,
317317
Exq.Middleware.Unique,
318-
Exq.Middleware.Telemetry
318+
Exq.Middleware.Telemetry,
319+
Exq.Middleware.Snooze
319320
]
320321

321322
assert mode == :default

test/exq_test.exs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,13 @@ defmodule ExqTest do
4747
end
4848
end
4949

50+
defmodule SnoozeWorker do
51+
def perform(time, message) do
52+
send(:exqtest, {message})
53+
{:snooze, time}
54+
end
55+
end
56+
5057
setup do
5158
TestRedis.setup()
5259

@@ -766,6 +773,17 @@ defmodule ExqTest do
766773
stop_process(sup)
767774
end
768775

776+
test "snooze job" do
777+
Process.register(self(), :exqtest)
778+
{:ok, sup} = Exq.start_link(concurrency: 1, queues: ["q1"], scheduler_enable: true)
779+
{:ok, _} = Exq.enqueue(Exq, "q1", ExqTest.SnoozeWorker, [0.050, :snoozed], max_retries: 0)
780+
:timer.sleep(100)
781+
assert_received {"snoozed"}
782+
:timer.sleep(100)
783+
assert_received {"snoozed"}
784+
stop_process(sup)
785+
end
786+
769787
defp enqueue_fail_job(count) do
770788
for _ <- 0..(count - 1) do
771789
{:ok, _} =

test/job_queue_test.exs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,37 @@ defmodule JobQueueTest do
171171
end)
172172
end
173173

174+
test "snooze job" do
175+
with_application_env(:exq, :max_retries, 1, fn ->
176+
JobQueue.snooze_job(
177+
:testredis,
178+
"test",
179+
%{
180+
retry_count: 0,
181+
retry: true,
182+
queue: "default",
183+
class: "MyWorker",
184+
jid: UUID.uuid4(),
185+
error_class: nil,
186+
error_message: "failed",
187+
retried_at: Time.unix_seconds(),
188+
failed_at: Time.unix_seconds(),
189+
enqueued_at: Time.unix_seconds(),
190+
finished_at: nil,
191+
processor: nil,
192+
args: [],
193+
unique_for: nil,
194+
unique_until: nil,
195+
unique_token: nil,
196+
unlocks_at: nil
197+
},
198+
10
199+
)
200+
201+
assert JobQueue.queue_size(:testredis, "test", :retry) == 1
202+
end)
203+
end
204+
174205
test "scheduler_dequeue max_score" do
175206
add_usecs = fn time, offset ->
176207
base = time |> DateTime.to_unix(:microsecond)

test/middleware_test.exs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,8 @@ defmodule MiddlewareTest do
240240
Exq.Middleware.Job,
241241
Exq.Middleware.Manager,
242242
Exq.Middleware.Unique,
243-
Exq.Middleware.Telemetry
243+
Exq.Middleware.Telemetry,
244+
Exq.Middleware.Snooze
244245
]
245246

246247
assert Middleware.all(Middleware) == chain

0 commit comments

Comments
 (0)