Skip to content

Commit 742631a

Browse files
Add api to re-enqueue dead job
1 parent aa77ce3 commit 742631a

File tree

4 files changed

+37
-3
lines changed

4 files changed

+37
-3
lines changed

lib/exq/api.ex

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -303,6 +303,21 @@ defmodule Exq.Api do
303303
GenServer.call(pid, :clear_failed)
304304
end
305305

306+
@doc """
307+
Re Enqueue jobs from dead queue.
308+
309+
Expected args:
310+
* `pid` - Exq.Api process
311+
* `raw_job` - raw json encoded job value
312+
313+
Returns:
314+
* `{:ok, num_enqueued}`
315+
316+
"""
317+
def dequeue_failed_jobs(pid, raw_jobs) do
318+
GenServer.call(pid, {:dequeue_failed_jobs, raw_jobs})
319+
end
320+
306321
@doc """
307322
Number of jobs that have failed and exceeded their retry count.
308323

lib/exq/api/server.ex

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,11 @@ defmodule Exq.Api.Server do
205205
{:reply, :ok, state}
206206
end
207207

208+
def handle_call({:dequeue_failed_jobs, raw_jobs}, _from, state) do
209+
result = JobQueue.dequeue_failed_jobs(state.redis, state.namespace, raw_jobs)
210+
{:reply, result, state}
211+
end
212+
208213
def handle_call(:clear_processes, _from, state) do
209214
JobStat.clear_processes(state.redis, state.namespace)
210215
{:reply, :ok, state}

lib/exq/redis/job_queue.ex

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -411,12 +411,16 @@ defmodule Exq.Redis.JobQueue do
411411
Connection.zrem!(redis, scheduled_queue_key(namespace), raw_jobs)
412412
end
413413

414+
def dequeue_scheduled_jobs(redis, namespace, raw_jobs) do
415+
dequeue_scheduled_jobs(redis, namespace, scheduled_queue_key(namespace), raw_jobs)
416+
end
417+
414418
def remove_failed_jobs(redis, namespace, raw_jobs) do
415419
Connection.zrem!(redis, failed_queue_key(namespace), raw_jobs)
416420
end
417421

418-
def dequeue_scheduled_jobs(redis, namespace, raw_jobs) do
419-
dequeue_scheduled_jobs(redis, namespace, scheduled_queue_key(namespace), raw_jobs)
422+
def dequeue_failed_jobs(redis, namespace, raw_jobs) do
423+
dequeue_scheduled_jobs(redis, namespace, failed_queue_key(namespace), raw_jobs)
420424
end
421425

422426
def list_queues(redis, namespace) do

test/api_test.exs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -266,8 +266,8 @@ defmodule ApiTest do
266266
{:ok, [raw_job]} = Exq.Api.retries(Exq.Api, raw: true)
267267
assert {:ok, 1} = Exq.Api.dequeue_retry_jobs(Exq.Api, [raw_job])
268268
assert {:ok, nil} = Exq.Api.find_scheduled(Exq.Api, jid)
269-
270269
assert {:ok, 0} = Exq.Api.dequeue_retry_jobs(Exq.Api, [raw_job])
270+
assert {:ok, [^raw_job]} = Exq.Api.jobs(Exq.Api, "test", raw: true)
271271
end
272272

273273
test "remove job in scheduled queue" do
@@ -289,6 +289,7 @@ defmodule ApiTest do
289289
{:ok, 1} = Exq.Api.dequeue_scheduled_jobs(Exq.Api, [raw_job])
290290
assert {:ok, nil} = Exq.Api.find_scheduled(Exq.Api, jid)
291291
{:ok, 0} = Exq.Api.dequeue_scheduled_jobs(Exq.Api, [raw_job])
292+
assert {:ok, [^raw_job]} = Exq.Api.jobs(Exq.Api, "custom", raw: true)
292293
end
293294

294295
test "remove job in failed queue" do
@@ -304,6 +305,15 @@ defmodule ApiTest do
304305
{:ok, nil} = Exq.Api.find_failed(Exq.Api, "1234")
305306
end
306307

308+
test "enqueue jobs in failed queue" do
309+
JobQueue.fail_job(:testredis, 'test', %Job{jid: "1234", queue: "test"}, "this is an error")
310+
{:ok, [raw_job]} = Exq.Api.failed(Exq.Api, raw: true)
311+
{:ok, 1} = Exq.Api.dequeue_failed_jobs(Exq.Api, [raw_job])
312+
assert {:ok, nil} = Exq.Api.find_failed(Exq.Api, "1234")
313+
{:ok, 0} = Exq.Api.dequeue_failed_jobs(Exq.Api, [raw_job])
314+
assert {:ok, [^raw_job]} = Exq.Api.jobs(Exq.Api, "test", raw: true)
315+
end
316+
307317
test "clear job queue" do
308318
{:ok, jid} = Exq.enqueue(Exq, 'custom', Bogus, [])
309319
Exq.Api.remove_queue(Exq.Api, 'custom')

0 commit comments

Comments
 (0)