Skip to content

Commit aa77ce3

Browse files
Merge pull request #461 from akira/more-api
Add api to immediatly enqeueue jobs from retry/scheduled queue
2 parents 23e3de6 + aa3455b commit aa77ce3

File tree

5 files changed

+94
-0
lines changed

5 files changed

+94
-0
lines changed

lib/exq/api.ex

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -375,6 +375,21 @@ defmodule Exq.Api do
375375
GenServer.call(pid, :clear_retries)
376376
end
377377

378+
@doc """
379+
Re enqueue jobs from retry queue immediatly.
380+
381+
Expected args:
382+
* `pid` - Exq.Api process
383+
* `raw_job` - raw json encoded job value
384+
385+
Returns:
386+
* `{:ok, num_enqueued}`
387+
388+
"""
389+
def dequeue_retry_jobs(pid, raw_jobs) do
390+
GenServer.call(pid, {:dequeue_retry_jobs, raw_jobs})
391+
end
392+
378393
@doc """
379394
Number of jobs in the retry queue.
380395
@@ -447,6 +462,21 @@ defmodule Exq.Api do
447462
GenServer.call(pid, :clear_scheduled)
448463
end
449464

465+
@doc """
466+
Enqueue jobs from scheduled queue immediatly.
467+
468+
Expected args:
469+
* `pid` - Exq.Api process
470+
* `raw_job` - raw json encoded job value
471+
472+
Returns:
473+
* `{:ok, num_enqueued}`
474+
475+
"""
476+
def dequeue_scheduled_jobs(pid, raw_jobs) do
477+
GenServer.call(pid, {:dequeue_scheduled_jobs, raw_jobs})
478+
end
479+
450480
@doc """
451481
Number of scheduled jobs enqueued.
452482

lib/exq/api/server.ex

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,11 @@ defmodule Exq.Api.Server do
170170
{:reply, :ok, state}
171171
end
172172

173+
def handle_call({:dequeue_retry_jobs, raw_jobs}, _from, state) do
174+
result = JobQueue.dequeue_retry_jobs(state.redis, state.namespace, raw_jobs)
175+
{:reply, result, state}
176+
end
177+
173178
def handle_call({:remove_scheduled, jid}, _from, state) do
174179
JobQueue.remove_scheduled(state.redis, state.namespace, jid)
175180
{:reply, :ok, state}
@@ -180,6 +185,11 @@ defmodule Exq.Api.Server do
180185
{:reply, :ok, state}
181186
end
182187

188+
def handle_call({:dequeue_scheduled_jobs, raw_jobs}, _from, state) do
189+
result = JobQueue.dequeue_scheduled_jobs(state.redis, state.namespace, raw_jobs)
190+
{:reply, result, state}
191+
end
192+
183193
def handle_call({:remove_failed, jid}, _from, state) do
184194
JobStat.remove_failed(state.redis, state.namespace, jid)
185195
{:reply, :ok, state}

lib/exq/redis/job_queue.ex

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -398,6 +398,10 @@ defmodule Exq.Redis.JobQueue do
398398
Connection.zrem!(redis, retry_queue_key(namespace), raw_jobs)
399399
end
400400

401+
def dequeue_retry_jobs(redis, namespace, raw_jobs) do
402+
dequeue_scheduled_jobs(redis, namespace, retry_queue_key(namespace), raw_jobs)
403+
end
404+
401405
def remove_scheduled(redis, namespace, jid) do
402406
{:ok, job} = find_job(redis, namespace, jid, :scheduled, false)
403407
Connection.zrem!(redis, scheduled_queue_key(namespace), job)
@@ -411,6 +415,10 @@ defmodule Exq.Redis.JobQueue do
411415
Connection.zrem!(redis, failed_queue_key(namespace), raw_jobs)
412416
end
413417

418+
def dequeue_scheduled_jobs(redis, namespace, raw_jobs) do
419+
dequeue_scheduled_jobs(redis, namespace, scheduled_queue_key(namespace), raw_jobs)
420+
end
421+
414422
def list_queues(redis, namespace) do
415423
Connection.smembers!(redis, full_key(namespace, "queues"))
416424
end
@@ -492,6 +500,10 @@ defmodule Exq.Redis.JobQueue do
492500
{jid, Config.serializer().encode!(job)}
493501
end
494502

503+
defp dequeue_scheduled_jobs(redis, namespace, queue_key, raw_jobs) do
504+
Script.eval!(redis, :scheduler_dequeue_jobs, [queue_key, full_key(namespace, "")], raw_jobs)
505+
end
506+
495507
defp get_max_retries do
496508
:max_retries
497509
|> Config.get()

lib/exq/redis/script.ex

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,22 @@ defmodule Exq.Redis.Script do
1212
end
1313

1414
@scripts %{
15+
scheduler_dequeue_jobs:
16+
Prepare.script("""
17+
local schedule_queue, namespace_prefix = KEYS[1], KEYS[2]
18+
local jobs = ARGV
19+
local dequeued = 0
20+
for _, job in ipairs(jobs) do
21+
local job_queue = cjson.decode(job)['queue']
22+
local count = redis.call('ZREM', schedule_queue, job)
23+
if count == 1 then
24+
redis.call('SADD', namespace_prefix .. 'queues', job_queue)
25+
redis.call('LPUSH', namespace_prefix .. 'queue:' .. job_queue, job)
26+
dequeued = dequeued + 1
27+
end
28+
end
29+
return dequeued
30+
"""),
1531
scheduler_dequeue:
1632
Prepare.script("""
1733
local schedule_queue = KEYS[1]

test/api_test.exs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,24 @@ defmodule ApiTest do
252252
assert {:ok, nil} = Exq.Api.find_scheduled(Exq.Api, jid)
253253
end
254254

255+
test "re enqueue jobs in retry queue" do
256+
jid = "1234"
257+
258+
JobQueue.retry_job(
259+
:testredis,
260+
'test',
261+
%Job{jid: "1234", queue: "test"},
262+
1,
263+
"this is an error"
264+
)
265+
266+
{:ok, [raw_job]} = Exq.Api.retries(Exq.Api, raw: true)
267+
assert {:ok, 1} = Exq.Api.dequeue_retry_jobs(Exq.Api, [raw_job])
268+
assert {:ok, nil} = Exq.Api.find_scheduled(Exq.Api, jid)
269+
270+
assert {:ok, 0} = Exq.Api.dequeue_retry_jobs(Exq.Api, [raw_job])
271+
end
272+
255273
test "remove job in scheduled queue" do
256274
{:ok, jid} = Exq.enqueue_in(Exq, 'custom', 1000, Bogus, [])
257275
Exq.Api.remove_scheduled(Exq.Api, jid)
@@ -265,6 +283,14 @@ defmodule ApiTest do
265283
assert {:ok, nil} = Exq.Api.find_scheduled(Exq.Api, jid)
266284
end
267285

286+
test "enqueue jobs in scheduled queue" do
287+
{:ok, jid} = Exq.enqueue_in(Exq, "custom", 1000, Bogus, [])
288+
{:ok, [raw_job]} = Exq.Api.scheduled(Exq.Api, raw: true)
289+
{:ok, 1} = Exq.Api.dequeue_scheduled_jobs(Exq.Api, [raw_job])
290+
assert {:ok, nil} = Exq.Api.find_scheduled(Exq.Api, jid)
291+
{:ok, 0} = Exq.Api.dequeue_scheduled_jobs(Exq.Api, [raw_job])
292+
end
293+
268294
test "remove job in failed queue" do
269295
JobQueue.fail_job(:testredis, 'test', %Job{jid: "1234"}, "this is an error")
270296
Exq.Api.remove_failed(Exq.Api, "1234")

0 commit comments

Comments
 (0)