jobs_relation.rb
# A relation of jobs that can be filtered and acted on.
#
# Relations of jobs are normally fetched via +ActiveJob.jobs+
# or through a given queue (+ActiveJob::Queue#jobs+).
#
# This class offers a fluid interface to query a subset of jobs. For
# example:
#
#   queue = ActiveJob.queues[:default]
#   queue.jobs.limit(10).where(job_class_name: "DummyJob").last
#
# Relations are enumerable, so you can use +Enumerable+ methods on them.
# Note, however, that using these methods implies loading the whole relation
# into memory, which could introduce performance concerns.
#
# Internally, +ActiveJob+ will always use paginated queries to the underlying
# queue adapter. The page size can be controlled via +config.active_job.default_page_size+
# (1000 by default).
#
# There are additional performance concerns depending on the configured
# adapter. Please check +ActiveJob::JobsRelation#where+ and +ActiveJob::JobsRelation#count+.
class ActiveJob::JobsRelation
  include Enumerable

  STATUSES = %i[ pending failed in_progress blocked scheduled finished ]
  FILTERS = %i[ queue_name job_class_name finished_at scheduled_at enqueued_at ]
  PROPERTIES = %i[ queue_name status offset_value limit_value job_class_name worker_id recurring_task_id finished_at scheduled_at enqueued_at ]

  attr_reader *PROPERTIES, :default_page_size

  delegate :last, :[], :reverse, to: :to_a
  delegate :logger, to: MissionControl::Jobs

  ALL_JOBS_LIMIT = 100_000_000 # When no limit value is provided, it defaults to "all jobs"

  def initialize(queue_adapter: ActiveJob::Base.queue_adapter, default_page_size: ActiveJob::Base.default_page_size)
    @queue_adapter = queue_adapter
    @default_page_size = default_page_size

    set_defaults
  end

  # Returns a +ActiveJob::JobsRelation+ with the configured filtering options.
  #
  # === Options
  #
  # * <tt>:job_class_name</tt> - To only include the jobs of a given class.
  #   Depending on the configured queue adapter, this will perform the
  #   filtering in memory, which could introduce performance concerns
  #   for large sets of jobs.
  # * <tt>:queue_name</tt> - To only include the jobs in the provided queue.
  # * <tt>:worker_id</tt> - To only include the jobs processed by the provided worker.
  # * <tt>:recurring_task_id</tt> - To only include the jobs corresponding to runs of a recurring task.
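  #
  # === Example
  #
  # An illustrative sketch (+DummyJob+ is a placeholder job class):
  #
  #   ActiveJob.jobs.where(job_class_name: "DummyJob", queue_name: "default")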
  def where(job_class_name: nil, queue_name: nil, worker_id: nil, recurring_task_id: nil, finished_at: nil, scheduled_at: nil, enqueued_at: nil)
    # Remove nil arguments to avoid overriding parameters when concatenating +where+ clauses
    arguments = { job_class_name: job_class_name,
                  queue_name: queue_name,
                  worker_id: worker_id,
                  recurring_task_id: recurring_task_id,
                  finished_at: finished_at,
                  scheduled_at: scheduled_at,
                  enqueued_at: enqueued_at
                }.compact.collect { |key, value| [ key, value.to_s ] }.to_h

    clone_with **arguments
  end
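
  # Returns a new relation filtered by the given status (one of +STATUSES+).
  # Unknown statuses leave the relation unchanged. For example:
  #
  #   ActiveJob.jobs.with_status(:failed) # equivalent to ActiveJob.jobs.failed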
  def with_status(status)
    if status.to_sym.in? STATUSES
      clone_with status: status.to_sym
    else
      self
    end
  end
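
  # Define one query method and one predicate per status. For example:
  #
  #   ActiveJob.jobs.failed         # relation filtered to failed jobs
  #   ActiveJob.jobs.failed.failed? # => true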
  STATUSES.each do |status|
    define_method status do
      with_status(status)
    end

    define_method "#{status}?" do
      self.status == status
    end
  end

  # Sets an offset for the jobs-fetching query. The first position is 0.
  def offset(offset)
    clone_with offset_value: offset
  end

  # Sets the max number of jobs to fetch in the query.
  def limit(limit)
    clone_with limit_value: limit
  end

  # Returns the number of jobs in the relation.
  #
  # When filtering jobs, if the adapter doesn't support the filter(s)
  # directly, this will load all the jobs in memory to filter them.
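  #
  # A minimal illustrative sketch (+DummyJob+ is a placeholder job class):
  #
  #   ActiveJob.jobs.where(job_class_name: "DummyJob").count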
  def count
    if loaded? || filtering_needed?
      to_a.length
    else
      query_count
    end
  end

  alias length count
  alias size count

  def empty?
    count == 0
  end

  def to_s
    properties_with_values = PROPERTIES.collect do |name|
      value = public_send(name)
      "#{name}: #{value}" unless value.nil?
    end.compact.join(", ")

    "<Jobs with [#{properties_with_values}]> (loaded: #{loaded?})"
  end

  alias inspect to_s

  def each(&block)
    loaded_jobs&.each(&block) || load_jobs(&block)
  end

  # Retry all the jobs in the relation.
  #
  # This operation is only valid for sets of failed jobs. It will
  # raise an error +ActiveJob::Errors::InvalidOperation+ otherwise.
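  #
  # For example:
  #
  #   ActiveJob.jobs.failed.retry_all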
  def retry_all
    ensure_failed_status
    queue_adapter.retry_all_jobs(self)
    nil
  end

  # Retry the provided job.
  #
  # This operation is only valid for sets of failed jobs. It will
  # raise an error +ActiveJob::Errors::InvalidOperation+ otherwise.
  def retry_job(job)
    ensure_failed_status
    queue_adapter.retry_job(job, self)
  end

  # Discard all the jobs in the relation.
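  #
  # For example, to discard all finished jobs:
  #
  #   ActiveJob.jobs.finished.discard_all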
  def discard_all
    queue_adapter.discard_all_jobs(self)
    nil
  end

  # Discard the provided job.
  def discard_job(job)
    queue_adapter.discard_job(job, self)
  end

  # Dispatch the provided job.
  def dispatch_job(job)
    queue_adapter.dispatch_job(job, self)
  end

  # Find a job by id.
  #
  # Returns nil when not found.
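  #
  # An illustrative sketch (+job_id+ is a placeholder):
  #
  #   job = ActiveJob.jobs.find_by_id(job_id)
  #   job&.job_class_name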
  def find_by_id(job_id)
    queue_adapter.find_job(job_id, self)
  end

  # Find a job by id.
  #
  # Raises +ActiveJob::Errors::JobNotFoundError+ when not found.
  def find_by_id!(job_id)
    queue_adapter.find_job(job_id, self) or raise ActiveJob::Errors::JobNotFoundError.new(job_id, self)
  end

  # Returns an array of job class names in the first +from_first+ jobs.
  def job_class_names(from_first: 500)
    first(from_first).collect(&:job_class_name).uniq
  end

  def reload
    @count = nil
    @loaded_jobs = nil
    @filters = nil

    self
  end
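
  # Iterates over the relation in batches of +of+ jobs, yielding each batch
  # as a +ActiveJob::JobsRelation+. Raises +ActiveJob::Errors::InvalidOperation+
  # when an offset or limit has been set on the relation.
  #
  # A minimal, illustrative sketch:
  #
  #   ActiveJob.jobs.finished.in_batches(of: 100) do |batch|
  #     batch.discard_all
  #   end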
  def in_batches(of: default_page_size, order: :asc, &block)
    validate_looping_in_batches_is_possible

    case order
    when :asc
      in_ascending_batches(of: of, &block)
    when :desc
      in_descending_batches(of: of, &block)
    else
      raise "Unsupported order: #{order}. Valid values: :asc, :desc."
    end
  end

  def paginated?
    offset_value > 0 || limit_value_provided?
  end

  def limit_value_provided?
    limit_value.present? && limit_value != ActiveJob::JobsRelation::ALL_JOBS_LIMIT
  end

  def filtering_needed?
    filters.any?
  end

  private
    attr_reader :queue_adapter, :loaded_jobs
    attr_writer *PROPERTIES

    def set_defaults
      self.offset_value = 0
      self.limit_value = ALL_JOBS_LIMIT
    end

    def clone_with(**properties)
      dup.reload.tap do |relation|
        properties.each do |key, value|
          relation.send("#{key}=", value)
        end
      end
    end

    def query_count
      @count ||= queue_adapter.jobs_count(self)
    end

    def load_jobs
      @loaded_jobs = []
      perform_each do |job|
        @loaded_jobs << job
        yield job
      end
    end

    # Iterates over the relation, fetching jobs from the adapter in pages
    # of +default_page_size+ until the limit is reached or no jobs are left.
    def perform_each
      current_offset = offset_value
      pending_count = limit_value || Float::INFINITY

      begin
        limit = [ pending_count, default_page_size ].min
        page = offset(current_offset).limit(limit)
        jobs = queue_adapter.fetch_jobs(page)
        finished = jobs.empty?

        jobs = filter(jobs) if filtering_needed?
        Array(jobs).each { |job| yield job }

        current_offset += limit
        pending_count -= jobs.length
      end until finished || pending_count.zero?
    end

    def loaded?
      !@loaded_jobs.nil?
    end

    # Filters that the adapter doesn't support natively are applied in memory.
    def filter(jobs)
      jobs.filter { |job| satisfy_filter?(job) }
    end

    def satisfy_date_filter?(filter_value, job_value)
      return false if job_value.nil?

      # Treat date ranges
      if filter_value.include?("..")
        start_date, end_date = filter_value.split("..").map { |date| Time.zone.parse(date) }
        filter_range = (start_date..end_date)
        return filter_range.cover?(job_value)
      end

      filter = Time.zone.parse(filter_value)
      job_value >= filter
    end

    def satisfy_filter?(job)
      filters.all? do |property|
        filter_value = public_send(property)
        job_value = job.public_send(property)

        is_date_filter?(property) ? satisfy_date_filter?(filter_value, job_value) : filter_value == job_value
      end
    end

    def is_date_filter?(property)
      [ :finished_at, :scheduled_at, :enqueued_at ].include?(property)
    end

    def filters
      @filters ||= FILTERS.select { |property| public_send(property).present? && !queue_adapter.supports_job_filter?(self, property) }
    end

    def ensure_failed_status
      raise ActiveJob::Errors::InvalidOperation, "This operation can only be performed on failed jobs, but these jobs are #{status}" unless failed?
    end

    def validate_looping_in_batches_is_possible
      raise ActiveJob::Errors::InvalidOperation, "Looping in batches is not compatible with providing offset or limit" if paginated?
    end

    def in_ascending_batches(of:)
      current_offset = 0
      max = count

      begin
        page = offset(current_offset).limit(of)
        current_offset += of
        logger.info page
        yield page
        wait_batch_delay
      end until current_offset >= max
    end

    def in_descending_batches(of:)
      current_offset = count - of

      begin
        # The final batch, at the head of the relation, may contain fewer than +of+ jobs
        limit = current_offset < 0 ? of + current_offset : of
        page = offset([ current_offset, 0 ].max).limit(limit)
        current_offset -= of
        logger.info page
        yield page
        wait_batch_delay
      end until current_offset + of <= 0
    end

    def wait_batch_delay
      sleep MissionControl::Jobs.delay_between_bulk_operation_batches if MissionControl::Jobs.delay_between_bulk_operation_batches.to_i > 0
    end
end