@@ -5,6 +5,10 @@ defmodule Exq.Node.Server do
55 alias Exq.Support.Time
66 alias Exq.Redis.JobStat
77 alias Exq.Support.Node
8+ alias Exq.Serializers.JsonSerializer
9+ alias Exq.Worker.Metadata
10+ alias Exq.Support.Job
11+ alias Exq.Worker.Server
812
913 defmodule State do
1014 defstruct [
@@ -14,6 +18,7 @@ defmodule Exq.Node.Server do
1418 :redis ,
1519 :node_id ,
1620 :manager ,
21+ :metadata ,
1722 :workers_sup ,
1823 ping_count: 0
1924 ]
@@ -26,6 +31,7 @@ defmodule Exq.Node.Server do
2631 __MODULE__ ,
2732 % State {
2833 manager: Keyword . fetch! ( options , :manager ) ,
34+ metadata: Keyword . fetch! ( options , :metadata ) ,
2935 workers_sup: Keyword . fetch! ( options , :workers_sup ) ,
3036 node_id: node_id ,
3137 node: build_node ( node_id ) ,
@@ -58,7 +64,7 @@ defmodule Exq.Node.Server do
5864
5965 :ok =
6066 JobStat . node_ping ( redis , namespace , node )
61- |> process_signal ( state )
67+ |> process_signals ( state )
6268
6369 if Integer . mod ( state . ping_count , 10 ) == 0 do
6470 JobStat . prune_dead_nodes ( redis , namespace )
@@ -73,13 +79,50 @@ defmodule Exq.Node.Server do
7379 { :noreply , state }
7480 end
7581
76- defp process_signal ( nil , _ ) , do: :ok
82+ defp process_signals ( signals , state ) do
83+ Enum . each ( signals , fn signal ->
84+ :ok = process_signal ( signal , state )
85+ end )
86+
87+ :ok
88+ end
7789
7890 defp process_signal ( "TSTP" , state ) do
7991 Logger . info ( "Received TSTP, unsubscribing from all queues" )
8092 :ok = Exq . unsubscribe_all ( state . manager )
8193 end
8294
95+ # Make sure the process is running the jid before canceling the
96+ # job. We don't want to send cancel message to unknown process,
97+ # which could happen if we process the signals after a restart, in
98+ # that case, the pid could point to a completely unrelated process.
99+ defp process_signal ( "CANCEL:" <> args , state ) do
100+ case JsonSerializer . decode ( args ) do
101+ { :ok , % { "pid" => "#PID" <> worker_pid_string , "jid" => jid } } ->
102+ worker_pid = :erlang . list_to_pid ( ~c" #{ worker_pid_string } " )
103+
104+ case Process . info ( worker_pid , :links ) do
105+ { :links , links } when length ( links ) <= 10 ->
106+ if Enum . any? ( links , fn link ->
107+ match? ( % Job { jid: ^ jid } , Metadata . lookup ( state . metadata , link ) )
108+ end ) do
109+ Server . cancel ( worker_pid )
110+ Logger . info ( "Canceled jid #{ jid } " )
111+ else
112+ Logger . warning ( "Not able to find worker process to cancel" )
113+ end
114+
115+ _ ->
116+ Logger . warning ( "Not able to find worker process to cancel" )
117+ end
118+
119+ _ ->
120+ Logger . warning ( "Received invalid args for cancel, args: #{ args } " )
121+ end
122+
123+ :ok
124+ end
125+
83126 defp process_signal ( unknown , _ ) do
84127 Logger . warning ( "Received unsupported signal #{ unknown } " )
85128 :ok
0 commit comments