forked from quirkey/resque-status
-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathstatus.rb
254 lines (225 loc) · 8.36 KB
/
status.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
module Resque
module Plugins
# Resque::Plugins::Status is a module your jobs will include.
# It provides helper methods for updating the status/etc from within an
# instance as well as class methods for creating and queuing the jobs.
#
# All you have to do to get this functionality is include Resque::Plugins::Status
# and then implement a <tt>perform<tt> method.
#
# For example
#
# class ExampleJob
# include Resque::Plugins::Status
#
# def perform
# num = options['num']
# i = 0
# while i < num
# i += 1
# at(i, num)
# end
# completed("Finished!")
# end
#
# end
#
# This job would iterate num times updating the status as it goes. At the end
# we update the status telling anyone listening to this job that its complete.
module Status
VERSION = '0.6.0'
STATUS_QUEUED = 'queued'
STATUS_WORKING = 'working'
STATUS_COMPLETED = 'completed'
STATUS_FAILED = 'failed'
STATUS_KILLED = 'killed'
STATUSES = [
STATUS_QUEUED,
STATUS_WORKING,
STATUS_COMPLETED,
STATUS_FAILED,
STATUS_KILLED
].freeze
autoload :Hash, 'resque/plugins/status/hash'
# The error class raised when a job is killed
class Killed < RuntimeError; end
class NotANumber < RuntimeError; end
attr_reader :uuid, :options
def self.included(base)
base.extend(ClassMethods)
end
module ClassMethods
# The default queue is :statused, this can be ovveridden in the specific job
# class to put the jobs on a specific worker queue
def queue
:statused
end
# used when displaying the Job in the resque-web UI and identifiyng the job
# type by status. By default this is the name of the job class, but can be
# ovveridden in the specific job class to present a more user friendly job
# name
def name
self.to_s
end
# Create is the primary method for adding jobs to the queue. This would be
# called on the job class to create a job of that type. Any options passed are
# passed to the Job instance as a hash of options. It returns the UUID of the
# job.
#
# == Example:
#
# class ExampleJob
# include Resque::Plugins::Status
#
# def perform
# set_status "Hey I'm a job num #{options['num']}"
# end
#
# end
#
# job_id = ExampleJob.create(:num => 100)
#
def create(options = {})
self.enqueue(self, options)
end
# Adds a job of type <tt>klass<tt> to the queue with <tt>options<tt>.
#
# Returns the UUID of the job if the job was queued, or nil if the job was
# rejected by a before_enqueue hook.
def enqueue(klass, options = {})
self.enqueue_to(Resque.queue_from_class(klass) || queue, klass, options)
end
# Adds a job of type <tt>klass<tt> to a specified queue with <tt>options<tt>.
#
# Returns the UUID of the job if the job was queued, or nil if the job was
# rejected by a before_enqueue hook.
def enqueue_to(queue, klass, options = {})
uuid = Resque::Plugins::Status::Hash.generate_uuid
Resque::Plugins::Status::Hash.create uuid, :options => options
if Resque.enqueue_to(queue, klass, uuid, options)
uuid
else
Resque::Plugins::Status::Hash.remove(uuid)
nil
end
end
# Removes a job of type <tt>klass<tt> from the queue.
#
# The initially given options are retrieved from the status hash.
# (Resque needs the options to find the correct queue entry)
def dequeue(klass, uuid)
status = Resque::Plugins::Status::Hash.get(uuid)
Resque.dequeue(klass, uuid, status.options)
end
# This is the method called by Resque::Worker when processing jobs. It
# creates a new instance of the job class and populates it with the uuid and
# options.
#
# You should not override this method, rahter the <tt>perform</tt> instance method.
def perform(uuid=nil, options = {})
uuid ||= Resque::Plugins::Status::Hash.generate_uuid
instance = new(uuid, options)
instance.safe_perform!
instance
end
# Wrapper API to forward a Resque::Job creation API call into a Resque::Plugins::Status call.
# This is needed to be used with resque scheduler
# http://github.com/bvandenbos/resque-scheduler
def scheduled(queue, klass, *args)
self.enqueue_to(queue, self, *args)
end
end
# Create a new instance with <tt>uuid</tt> and <tt>options</tt>
def initialize(uuid, options = {})
@uuid = uuid
@options = options
end
# Run by the Resque::Worker when processing this job. It wraps the <tt>perform</tt>
# method ensuring that the final status of the job is set regardless of error.
# If an error occurs within the job's work, it will set the status as failed and
# re-raise the error.
def safe_perform!
set_status({'status' => STATUS_WORKING})
perform
if status && status.failed?
on_failure(status.message) if respond_to?(:on_failure)
return
elsif status && !status.completed?
completed
end
on_success if respond_to?(:on_success)
rescue Killed
Resque::Plugins::Status::Hash.killed(uuid)
on_killed if respond_to?(:on_killed)
rescue => e
failed("The task failed because of an error: #{e}")
if respond_to?(:on_failure)
on_failure(e)
else
raise e
end
end
# Set the jobs status. Can take an array of strings or hashes that are merged
# (in order) into a final status hash.
def status=(new_status)
Resque::Plugins::Status::Hash.set(uuid, *new_status)
end
# get the Resque::Plugins::Status::Hash object for the current uuid
def status
Resque::Plugins::Status::Hash.get(uuid)
end
def name
"#{self.class.name}(#{options.inspect unless options.empty?})"
end
# Checks against the kill list if this specific job instance should be killed
# on the next iteration
def should_kill?
Resque::Plugins::Status::Hash.should_kill?(uuid)
end
# set the status of the job for the current itteration. <tt>num</tt> and
# <tt>total</tt> are passed to the status as well as any messages.
# This will kill the job if it has been added to the kill list with
# <tt>Resque::Plugins::Status::Hash.kill()</tt>
def at(num, total, *messages)
if total.to_f <= 0.0
raise(NotANumber, "Called at() with total=#{total} which is not a number")
end
tick({
'num' => num,
'total' => total
}, *messages)
end
# sets the status of the job for the current itteration. You should use
# the <tt>at</tt> method if you have actual numbers to track the iteration count.
# This will kill the job if it has been added to the kill list with
# <tt>Resque::Plugins::Status::Hash.kill()</tt>
def tick(*messages)
kill! if should_kill?
set_status({'status' => STATUS_WORKING}, *messages)
end
# set the status to 'failed' passing along any additional messages
def failed(*messages)
set_status({'status' => STATUS_FAILED}, *messages)
end
# set the status to 'completed' passing along any addional messages
def completed(*messages)
set_status({
'status' => STATUS_COMPLETED,
'message' => "Completed at #{Time.now}"
}, *messages)
end
# kill the current job, setting the status to 'killed' and raising <tt>Killed</tt>
def kill!
set_status({
'status' => STATUS_KILLED,
'message' => "Killed at #{Time.now}"
})
raise Killed
end
private
def set_status(*args)
self.status = [status, {'name' => self.name}, args].flatten
end
end
end
end