-
Notifications
You must be signed in to change notification settings - Fork 169
/
Copy pathhash.rb
275 lines (234 loc) · 9.04 KB
/
hash.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
require 'securerandom'
module Resque
module Plugins
module Status
# Resque::Plugins::Status::Hash is a Hash object that has helper methods for dealing with
# the common status attributes. It also has a number of class methods for
# creating/updating/retrieving status objects from Redis
class Hash < ::Hash
extend Resque::Helpers
# Create a status for the given UUID, passing the messages on to +set+, and
# record the UUID in the sorted set of known statuses, scored by the current
# epoch time. When +expire_in+ is configured, entries of that sorted set
# scored more than +expire_in+ seconds in the past are pruned.
# The caller supplies the UUID (see +generate_uuid+); none is generated here.
# Returns the UUID that was passed in.
def self.create(uuid, *messages)
  set(uuid, *messages)
  # Take the timestamp once so the new entry's score and the pruning cutoff
  # are computed from the same instant.
  now = Time.now.to_i
  redis.zadd(set_key, now, uuid)
  # Go through the accessor, as +set+ does, rather than the raw ivar.
  ttl = expire_in
  redis.zremrangebyscore(set_key, 0, now - ttl) if ttl
  uuid
end
# Look up the status stored for +uuid+.
# Returns a Resque::Plugins::Status::Hash built from the decoded value, or
# nil when nothing is stored under the UUID's status key.
def self.get(uuid)
  raw = redis.get(status_key(uuid))
  return nil unless raw

  Resque::Plugins::Status::Hash.new(uuid, decode(raw))
end
# Fetch several statuses with a single MGET.
# Returns an array aligned with +uuids+: a Resque::Plugins::Status::Hash for
# every UUID that has a stored value and nil for every one that does not.
def self.mget(uuids)
  keys = uuids.map { |uuid| status_key(uuid) }
  raw_values = redis.mget(*keys)
  uuids.zip(raw_values).map do |uuid, raw|
    next nil unless raw

    Resque::Plugins::Status::Hash.new(uuid, decode(raw))
  end
end
# Store a status for +uuid+. +messages+ can be any number of strings or
# hashes; they are handed to Resque::Plugins::Status::Hash.new, which merges
# them in order into a single status. The encoded status is written under the
# UUID's status key and, when +expire_in+ is set, the key is given that TTL.
# Returns the Resque::Plugins::Status::Hash that was stored.
def self.set(uuid, *messages)
  status = Resque::Plugins::Status::Hash.new(uuid, *messages)
  key = status_key(uuid)
  redis.set(key, encode(status))
  ttl = expire_in
  redis.expire(key, ttl) if ttl
  status
end
# Remove statuses from redis. The optional range is forwarded unchanged to
# +status_ids+; with no range every tracked status is removed.
# Returns the array of UUIDs that were passed to +remove+.
def self.clear(range_start = nil, range_end = nil)
  status_ids(range_start, range_end).each { |uuid| remove(uuid) }
end
# Remove every status in the optional range (see +status_ids+) whose job has
# completed. UUIDs that are still tracked but whose status can no longer be
# loaded (+get+ returns nil) are skipped instead of raising NoMethodError.
# Returns the array of UUIDs that were removed.
def self.clear_completed(range_start = nil, range_end = nil)
  completed_ids = status_ids(range_start, range_end).select do |id|
    status = get(id)
    # get returns nil when nothing is stored under the UUID's status key.
    status && status.completed?
  end
  completed_ids.each { |id| remove(id) }
end
# Remove every status in the optional range (see +status_ids+) whose job has
# failed. UUIDs that are still tracked but whose status can no longer be
# loaded (+get+ returns nil) are skipped instead of raising NoMethodError.
# Returns the array of UUIDs that were removed.
def self.clear_failed(range_start = nil, range_end = nil)
  failed_ids = status_ids(range_start, range_end).select do |id|
    status = get(id)
    # get returns nil when nothing is stored under the UUID's status key.
    status && status.failed?
  end
  failed_ids.each { |id| remove(id) }
end
# Delete the stored status for +uuid+ and drop the UUID from the sorted set
# of tracked statuses. Returns whatever the ZREM call returns.
def self.remove(uuid)
  store = redis
  store.del(status_key(uuid))
  store.zrem(set_key, uuid)
end
# Number of UUIDs currently held in the sorted set of tracked statuses
# (the ZCARD of +set_key+).
def self.count
  tracked_set = set_key
  redis.zcard(tracked_set)
end
# Return Resque::Plugins::Status::Hash objects in reverse chronological order.
# By default returns the entire set. Tracked UUIDs whose status can no longer
# be loaded (+get+ returns nil) are skipped rather than raising NoMethodError.
# @param [Numeric] range_start The optional starting range (see +status_ids+)
# @param [Numeric] range_end The optional ending range (see +status_ids+)
# @param [String, Array<String>, nil] filter Status value(s) to keep; nil or
#   omitted means every value in STATUSES
# @example returning the most recent statuses, ranks 0 through 20
#   Resque::Plugins::Status::Hash.statuses(0, 20)
def self.statuses(range_start = nil, range_end = nil, filter = STATUSES)
  wanted = [filter || STATUSES].flatten
  loaded = status_ids(range_start, range_end).map { |id| get(id) }
  # A nil entry means the id is still tracked but its status key is gone.
  loaded.select { |status| status && wanted.include?(status.status) }
end
# Return tracked status/job UUIDs, most recent first, by running ZREVRANGE
# over the sorted set that +create+ scores with the creation time.
# When both +range_start+ and +range_end+ are given, their absolute values
# are used as the start and stop ranks; ZREVRANGE treats both ranks as
# inclusive, so status_ids(0, 19) yields at most 20 UUIDs. When either bound
# is missing the whole set (ranks 0 to -1) is returned.
# A nil reply from redis is turned into an empty array.
def self.status_ids(range_start = nil, range_end = nil)
  if range_start && range_end
    first = range_start.abs
    last = range_end.abs
  else
    first = 0
    last = -1
  end
  redis.zrevrange(set_key, first, last) || []
end
# Request that the job at UUID be killed by adding the UUID to the kill list
# (the redis set stored under +kill_key+). Nothing is stopped here: per the
# original documentation, a running job checks the list on each iteration via
# <tt>tick</tt> or <tt>at</tt>, raises <tt>Resque::Plugins::Status::Killed</tt>
# and sets its status to 'killed' — that logic lives outside this file.
# Returns whatever the SADD call returns.
def self.kill(uuid)
  kill_list = kill_key
  redis.sadd(kill_list, uuid)
end
# Take the job at UUID off the kill list (SREM on the set under +kill_key+).
# Returns whatever the SREM call returns.
def self.killed(uuid)
  kill_list = kill_key
  redis.srem(kill_list, uuid)
end
# List the UUIDs currently on the kill list (SMEMBERS of the set under
# +kill_key+).
def self.kill_ids
  kill_list = kill_key
  redis.smembers(kill_list)
end
# Put every job in the given range on the kill list, most recent first.
# By default (no range) every tracked job is added.
# The same conditions apply as for <tt>kill</tt>: the UUIDs are only added to
# the kill list here.
# @param [Numeric] range_start The optional starting range (see +status_ids+)
# @param [Numeric] range_end The optional ending range (see +status_ids+)
# @return [Array] the result of <tt>kill</tt> for each UUID, in order
# @example killing the most recently submitted jobs, ranks 0 through 20
#   Resque::Plugins::Status::Hash.killall(0, 20)
def self.killall(range_start = nil, range_end = nil)
  status_ids(range_start, range_end).map { |uuid| kill(uuid) }
end
# Is the job with UUID on the kill list? Returns the result of SISMEMBER on
# the set under +kill_key+.
def self.should_kill?(uuid)
  kill_list = kill_key
  redis.sismember(kill_list, uuid)
end
# The configured lifetime, in seconds, applied to stored statuses (+set+
# uses it as the TTL of the status key). Returns nil when no expiry has been
# configured.
def self.expire_in
  defined?(@expire_in) ? @expire_in : nil
end
# Configure the <tt>expire_in</tt> lifetime. Any non-nil value is converted
# with +to_i+; nil clears the setting.
def self.expire_in=(seconds)
  @expire_in = (seconds.to_i unless seconds.nil?)
end
# Key under which the status for +uuid+ is stored: "status:" followed by the
# UUID's string form.
def self.status_key(uuid)
  format('status:%s', uuid)
end
# Key of the sorted set that tracks the UUIDs of known statuses.
def self.set_key
  '_statuses'
end
# Key of the set that holds the UUIDs of jobs on the kill list.
def self.kill_key
  '_kill'
end
# Generate a fresh identifier for a status: a random hexadecimal String from
# SecureRandom.hex.
def self.generate_uuid
  SecureRandom.hex
end
# Define three instance methods backed by the string key +name+:
#
# * <tt>name</tt>   - reads self['name']; when the stored value is truthy it
#   is returned (after calling the +:coerce+ method on it, if one was given),
#   otherwise the +:default+ is returned. If a block is given the value is
#   also yielded to it.
# * <tt>name=</tt>  - writes self['name'].
# * <tt>name?</tt>  - true when self['name'] is truthy.
#
# Options:
# * <tt>:default</tt> - value returned when nothing truthy is stored. It is
#   embedded in the generated code via +inspect+, so it must be a literal-like
#   value. A false default is treated as nil.
# * <tt>:coerce</tt>  - name of a method to call on the stored value.
#
# The +options+ hash is only read, never modified, so frozen or shared hashes
# may be passed safely.
def self.hash_accessor(name, options = {})
  default = options[:default] || nil
  coerce = options[:coerce] ? ".#{options[:coerce]}" : ""
  # Pass file and line so errors raised inside the generated methods point
  # back here instead of at "(eval)".
  module_eval <<-EOT, __FILE__, __LINE__ + 1
    def #{name}
      value = (self['#{name}'] ? self['#{name}']#{coerce} : #{default.inspect})
      yield value if block_given?
      value
    end
    def #{name}=(value)
      self['#{name}'] = value
    end
    def #{name}?
      !!self['#{name}']
    end
  EOT
end
# Status values known to this class. Used as the default filter in +statuses+
# and to generate the <tt>queued?</tt>-style predicates further down.
STATUSES = %w[queued working completed failed killed].freeze

# Declare the common status attributes as hash-backed accessors; each call
# to hash_accessor defines a reader, a writer and a predicate.
[:uuid, :name, :status, :message, :time, :options, :num, :total].each do |attribute|
  hash_accessor attribute
end
# Build a new Resque::Plugins::Status::Hash. The status starts out with
# 'time' set to the current epoch seconds and 'status' set to 'queued'.
# When more than one argument is passed, the first is taken as the UUID and
# the rest are status objects; a single argument is treated as a status
# object only. Status objects are merged in order, later ones winning.
# Strings are treated as the 'message'; nil/false arguments contribute
# nothing.
def initialize(*args)
  super nil
  merged = {
    'time' => Time.now.to_i,
    'status' => 'queued'
  }
  merged['uuid'] = args.shift if args.length > 1
  args.each do |part|
    part = { 'message' => part } if part.is_a?(String)
    merged.merge!(part) if part
  end
  replace(merged)
end
# Percentage of the job that is done, as an Integer, derived from
# <tt>status</tt>, <tt>num</tt> and <tt>total</tt>:
# 'completed' is always 100 and 'queued' is always 0; any other status
# yields num / total * 100 truncated to an integer, where a missing +num+
# counts as 0 and a nil or zero +total+ counts as 1.
def pct_complete
  current = status
  return 100 if 'completed' == current
  return 0 if 'queued' == current

  denominator = total
  denominator = 1 if denominator.nil? || denominator == 0
  done = num || 0
  ((done.to_f / denominator.to_f) * 100).to_i
end
# The value stored under 'time' converted with Time.at. Returns a
# <tt>Time</tt> when <tt>time?</tt> is true and nil otherwise.
def time
  return nil unless time?

  Time.at(self['time'])
end
# Define one predicate per entry of STATUSES (queued?, working?, completed?,
# failed?, killed?). Each compares the value stored under 'status' with that
# entry using ===.
STATUSES.each do |expected|
  define_method(:"#{expected}?") { self['status'] === expected }
end
# Can the job be killed? Jobs whose status is 'failed', 'completed' or
# 'killed' cannot; every other status (including none at all) can.
def killable?
  case status
  when 'failed', 'completed', 'killed' then false
  else true
  end
end
# Define +to_json+ only when this class has no +to_json+ instance method at
# the time this file is loaded (presumably one supplied by a JSON library
# that was required earlier — confirm against the load order).
# The fallback ignores its arguments and delegates to +json+.
unless method_defined?(:to_json)
def to_json(*args)
json
end
end
# Encode the status with the class-level +encode+ (presumably JSON, going by
# this method's name). The encoded copy carries an extra 'pct_complete' entry;
# the receiver itself is left unchanged.
def json
  snapshot = dup.tap { |copy| copy['pct_complete'] = pct_complete }
  self.class.encode(snapshot)
end
# Human-readable form: the superclass's inspect output wrapped in
# "#<Resque::Plugins::Status::Hash ...>".
def inspect
  contents = super
  "#<Resque::Plugins::Status::Hash " + contents + ">"
end
end
end
end
end