Skip to content

Commit 2249213

Browse files
Add ability to cancel running jobs
1 parent ee79ce8 commit 2249213

File tree

9 files changed

+133
-24
lines changed

9 files changed

+133
-24
lines changed

lib/exq/api.ex

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -560,6 +560,7 @@ defmodule Exq.Api do
560560
561561
Supported Signals
562562
* TSTP - unsubscribe from all queues
563+
* CANCEL:{"pid":"#PID<0.42.0>","jid":"uuid"} - cancel the job process
563564
564565
Returns:
565566
* :ok

lib/exq/middleware/job.ex

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ defmodule Exq.Middleware.Job do
99

1010
pipeline
1111
|> assign(:job, job)
12+
|> assign(:job_canceled, false)
1213
|> assign(:worker_module, Exq.Support.Coercion.to_module(job.class))
1314
end
1415

@@ -22,12 +23,21 @@ defmodule Exq.Middleware.Job do
2223

2324
defp retry_or_fail_job(%Pipeline{assigns: assigns} = pipeline) do
2425
if assigns.job do
25-
JobQueue.retry_or_fail_job(
26-
assigns.redis,
27-
assigns.namespace,
28-
assigns.job,
29-
to_string(assigns.error_message)
30-
)
26+
if assigns.job_canceled do
27+
JobQueue.fail_job(
28+
assigns.redis,
29+
assigns.namespace,
30+
assigns.job,
31+
"Canceled"
32+
)
33+
else
34+
JobQueue.retry_or_fail_job(
35+
assigns.redis,
36+
assigns.namespace,
37+
assigns.job,
38+
to_string(assigns.error_message)
39+
)
40+
end
3141
end
3242

3343
pipeline

lib/exq/node/server.ex

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ defmodule Exq.Node.Server do
55
alias Exq.Support.Time
66
alias Exq.Redis.JobStat
77
alias Exq.Support.Node
8+
alias Exq.Serializers.JsonSerializer
9+
alias Exq.Worker.Metadata
10+
alias Exq.Support.Job
11+
alias Exq.Worker.Server
812

913
defmodule State do
1014
defstruct [
@@ -14,6 +18,7 @@ defmodule Exq.Node.Server do
1418
:redis,
1519
:node_id,
1620
:manager,
21+
:metadata,
1722
:workers_sup,
1823
ping_count: 0
1924
]
@@ -26,6 +31,7 @@ defmodule Exq.Node.Server do
2631
__MODULE__,
2732
%State{
2833
manager: Keyword.fetch!(options, :manager),
34+
metadata: Keyword.fetch!(options, :metadata),
2935
workers_sup: Keyword.fetch!(options, :workers_sup),
3036
node_id: node_id,
3137
node: build_node(node_id),
@@ -58,7 +64,7 @@ defmodule Exq.Node.Server do
5864

5965
:ok =
6066
JobStat.node_ping(redis, namespace, node)
61-
|> process_signal(state)
67+
|> process_signals(state)
6268

6369
if Integer.mod(state.ping_count, 10) == 0 do
6470
JobStat.prune_dead_nodes(redis, namespace)
@@ -73,13 +79,53 @@ defmodule Exq.Node.Server do
7379
{:noreply, state}
7480
end
7581

76-
defp process_signal(nil, _), do: :ok
82+
defp process_signals(signals, state) do
83+
Enum.each(signals, fn signal ->
84+
:ok = process_signal(signal, state)
85+
end)
86+
87+
:ok
88+
end
7789

7890
defp process_signal("TSTP", state) do
7991
Logger.info("Received TSTP, unsubscribing from all queues")
8092
:ok = Exq.unsubscribe_all(state.manager)
8193
end
8294

95+
# Make sure the process is running the jid before canceling the
96+
# job. We don't want to send a cancel message to an unknown process,
97+
# which could happen if we process the signals after a restart; in
98+
# that case, the pid could point to a completely unrelated process.
99+
defp process_signal("CANCEL:" <> args, state) do
100+
case JsonSerializer.decode(args) do
101+
{:ok, %{"pid" => "#PID" <> worker_pid_string, "jid" => jid}} ->
102+
worker_pid = :erlang.list_to_pid(~c"#{worker_pid_string}")
103+
104+
case Process.info(worker_pid, :links) do
105+
{:links, links} when length(links) <= 10 ->
106+
job_pid =
107+
Enum.find(links, fn link ->
108+
match?(%Job{jid: ^jid}, Metadata.lookup(state.metadata, link))
109+
end)
110+
111+
if job_pid do
112+
Server.cancel(worker_pid)
113+
Logger.info("Canceled jid #{jid}")
114+
else
115+
Logger.warning("Not able to find worker process to cancel")
116+
end
117+
118+
nil ->
119+
Logger.warning("Not able to find worker process to cancel")
120+
end
121+
122+
_ ->
123+
Logger.warning("Received invalid args for cancel, args: #{args}")
124+
end
125+
126+
:ok
127+
end
128+
83129
defp process_signal(unknown, _) do
84130
Logger.warning("Received unsupported signal #{unknown}")
85131
:ok

lib/exq/redis/job_stat.ex

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ defmodule Exq.Redis.JobStat do
7070

7171
def node_ping(redis, namespace, node) do
7272
key = node_info_key(namespace, node.identity)
73+
max_signals = 100
7374

7475
case Connection.qp(
7576
redis,
@@ -89,12 +90,13 @@ defmodule Exq.Redis.JobStat do
8990
node.quiet
9091
],
9192
["EXPIRE", key, 60],
92-
["RPOP", "#{key}-signals"],
93+
["LRANGE", "#{key}-signals", 0, max_signals - 1],
94+
["LTRIM", "#{key}-signals", max_signals, -1],
9395
["EXEC"]
9496
]
9597
) do
96-
{:ok, ["OK", "QUEUED", "QUEUED", "QUEUED", "QUEUED", [_, "OK", 1, signal]]} ->
97-
signal
98+
{:ok, ["OK", "QUEUED", "QUEUED", "QUEUED", "QUEUED", "QUEUED", [_, "OK", 1, signals, "OK"]]} ->
99+
signals
98100

99101
error ->
100102
Logger.error("Failed to send node stats. Unexpected error from redis: #{inspect(error)}")

lib/exq/worker/metadata.ex

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,10 @@ defmodule Exq.Worker.Metadata do
1818
end
1919

2020
def lookup(server, pid) when is_pid(pid) do
21-
:ets.lookup_element(server, pid, 3)
21+
case :ets.lookup(server, pid) do
22+
[{_pid, _ref, value}] -> value
23+
_ -> nil
24+
end
2225
end
2326

2427
## ===========================================================

lib/exq/worker/server.ex

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ defmodule Exq.Worker.Server do
3333
middleware: nil,
3434
pipeline: nil,
3535
metadata: nil,
36-
middleware_state: nil
36+
middleware_state: nil,
37+
task_pid: nil
3738
end
3839

3940
def start_link(
@@ -61,6 +62,13 @@ defmodule Exq.Worker.Server do
6162
GenServer.cast(pid, :work)
6263
end
6364

65+
@doc """
66+
Cancel the current job
67+
"""
68+
def cancel(pid) do
69+
GenServer.cast(pid, :cancel)
70+
end
71+
6472
## ===========================================================
6573
## GenServer callbacks
6674
## ===========================================================
@@ -106,12 +114,14 @@ defmodule Exq.Worker.Server do
106114

107115
# Dispatch work to the target module (call :perform method of target).
108116
def handle_cast(:dispatch, state) do
109-
dispatch_work(
110-
state.pipeline.assigns.worker_module,
111-
state.pipeline.assigns.job,
112-
state.metadata
113-
)
114-
117+
task_pid =
118+
dispatch_work(
119+
state.pipeline.assigns.worker_module,
120+
state.pipeline.assigns.job,
121+
state.metadata
122+
)
123+
124+
state = %{state | task_pid: task_pid}
115125
{:noreply, state}
116126
end
117127

@@ -127,6 +137,20 @@ defmodule Exq.Worker.Server do
127137
{:stop, :normal, state}
128138
end
129139

140+
def handle_cast(:cancel, state) do
141+
task_pid = state.task_pid
142+
143+
state =
144+
if task_pid do
145+
Process.exit(task_pid, :kill)
146+
%{state | pipeline: Pipeline.assign(state.pipeline, :job_canceled, true)}
147+
else
148+
state
149+
end
150+
151+
{:noreply, state}
152+
end
153+
130154
def handle_info({:DOWN, _, _, _, :normal}, state) do
131155
state =
132156
if !has_pipeline_after_work_ran?(state.pipeline) do
@@ -177,6 +201,7 @@ defmodule Exq.Worker.Server do
177201
end)
178202

179203
Process.monitor(pid)
204+
pid
180205
end
181206

182207
defp before_work(state) do

test/api_test.exs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,9 +100,10 @@ defmodule ApiTest do
100100
end
101101

102102
test "send signal" do
103-
assert nil == JobStat.node_ping(:testredis, "test", %Node{identity: "host1", busy: 1})
103+
assert [] == JobStat.node_ping(:testredis, "test", %Node{identity: "host1", busy: 1})
104104
assert :ok = Exq.Api.send_signal(Exq.Api, "host1", "TSTP")
105-
assert "TSTP" == JobStat.node_ping(:testredis, "test", %Node{identity: "host1", busy: 1})
105+
assert ["TSTP"] == JobStat.node_ping(:testredis, "test", %Node{identity: "host1", busy: 1})
106+
assert [] == JobStat.node_ping(:testredis, "test", %Node{identity: "host1", busy: 1})
106107
end
107108

108109
test "jobs when empty" do

test/exq_test.exs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -814,6 +814,29 @@ defmodule ExqTest do
814814
stop_process(sup)
815815
end
816816

817+
test "cancel job" do
818+
{:ok, sup} = Exq.start_link()
819+
{:ok, _} = Exq.enqueue(Exq, "default", ExqTest.SleepWorker, [60 * 1000, :worked])
820+
821+
Process.sleep(100)
822+
host = Exq.NodeIdentifier.HostnameIdentifier.node_id()
823+
JobStat.node_ping(:testredis, "test", %Node{identity: host, busy: 1})
824+
825+
{:ok, [%{payload: %{jid: jid}, pid: pid}]} = Exq.Api.processes(Exq.Api)
826+
assert {:ok, []} = Exq.Api.failed(Exq.Api)
827+
828+
:ok =
829+
Exq.Api.send_signal(
830+
Exq.Api,
831+
host,
832+
"CANCEL:#{Jason.encode!(%{jid: jid, pid: pid})}"
833+
)
834+
835+
Process.sleep(5200)
836+
assert {:ok, [%{jid: ^jid, error_message: "Canceled"}]} = Exq.Api.failed(Exq.Api)
837+
{:ok, []} = Exq.Api.processes(Exq.Api)
838+
end
839+
817840
defp enqueue_fail_job(count) do
818841
for _ <- 0..(count - 1) do
819842
{:ok, _} =

test/metadata_test.exs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,7 @@ defmodule MetadataTest do
2525
send(pid, :fetch_and_quit)
2626
Process.sleep(50)
2727

28-
assert_raise ArgumentError, fn ->
29-
Metadata.lookup(metadata, pid)
30-
end
28+
assert Metadata.lookup(metadata, pid) == nil
3129
end
3230

3331
test "custom name" do

0 commit comments

Comments
 (0)