Skip to content

Commit 1fcc834

Browse files
committed
Add support for Rails 5, more tests, proc key arguments.
1 parent 6fe8aad commit 1fcc834

File tree

6 files changed

+172
-42
lines changed

6 files changed

+172
-42
lines changed

lib/active_job/traffic_control.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ def logger
3636
def client
3737
@client ||= begin
3838
logger.error "defaulting to Redis as the lock client; please set "\
39-
" `ActiveJob::TrafficControl.client` to a Redis or Memcached client,"
39+
" `ActiveJob::TrafficControl.client` to a Redis or Memcached client."
4040
@client_klass = Suo::Client::Redis
4141
Redis.new(url: ENV["REDIS_URL"])
4242
end

lib/active_job/traffic_control/base.rb

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,22 @@ def cleaned_name
1818
def cache_client
1919
ActiveJob::TrafficControl.cache_client
2020
end
21+
22+
def lock_key(prefix, job, config_attr)
23+
if config_attr
24+
if config_attr[:key].respond_to?(:call)
25+
"traffic_control:#{prefix}:#{config_attr[:key].call(job)}"
26+
else
27+
@static_job_key ||= begin
28+
if config_attr[:key].present?
29+
"traffic_control:#{prefix}:#{config_attr[:key]}"
30+
else
31+
"traffic_control:#{prefix}:#{cleaned_name}"
32+
end
33+
end
34+
end
35+
end
36+
end
2137
end
2238

2339
# convenience methods
@@ -28,12 +44,12 @@ def cleaned_name
2844
def reenqueue(range, reason)
2945
later_delay = rand(range).seconds
3046
retry_job(wait: later_delay)
31-
logger.error "Re-enqueing #{self.class.name} to run in #{later_delay}s due to #{reason}"
47+
logger.info "Re-enqueing #{self.class.name} to run in #{later_delay}s due to #{reason}"
3248
ActiveSupport::Notifications.instrument "re_enqueue.active_job", job: self, reason: reason
3349
end
3450

3551
def drop(reason)
36-
logger.error "Dropping #{self.class.name} due to #{reason}"
52+
logger.info "Dropping #{self.class.name} due to #{reason}"
3753
ActiveSupport::Notifications.instrument "dropped.active_job", job: self, reason: reason
3854
end
3955

lib/active_job/traffic_control/concurrency.rb

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,14 @@ module TrafficControl
55
module Concurrency
66
extend ::ActiveSupport::Concern
77

8+
CONCURRENCY_REENQUEUE_DELAY = ENV["RACK_ENV"] == "test" ? 1...5 : 30...(60 * 5)
9+
810
class_methods do
911
attr_accessor :job_concurrency
1012

1113
def concurrency(threshold, drop: true, key: nil, wait_timeout: 0.1, stale_timeout: 60 * 10)
1214
raise ArgumentError, "Concurrent jobs needs to be an integer > 0" if threshold.to_i < 1
15+
1316
@job_concurrency = {
1417
threshold: threshold.to_i,
1518
drop: drop,
@@ -19,31 +22,23 @@ def concurrency(threshold, drop: true, key: nil, wait_timeout: 0.1, stale_timeou
1922
}
2023
end
2124

22-
def concurrency_key
23-
if job_concurrency
24-
@concurrency_key ||= begin
25-
if job_concurrency[:key].present?
26-
job_concurrency[:key]
27-
else
28-
"traffic_control:concurrency:#{cleaned_name}"
29-
end
30-
end
31-
end
25+
def concurrency_lock_key(job)
26+
lock_key("concurrency", job, job_concurrency)
3227
end
3328
end
3429

3530
included do
3631
include ActiveJob::TrafficControl::Base
3732

38-
around_perform do |_, block|
33+
around_perform do |job, block|
3934
if self.class.job_concurrency.present?
4035
lock_options = {
4136
resources: self.class.job_concurrency[:threshold],
4237
acquisition_lock: self.class.job_concurrency[:wait_timeout],
4338
stale_lock_expiration: self.class.job_concurrency[:stale_timeout]
4439
}
4540

46-
with_lock_client(self.class.concurrency_key, lock_options) do |client|
41+
with_lock_client(self.class.concurrency_lock_key(job), lock_options) do |client|
4742
locked = client.lock do
4843
block.call
4944
true

lib/active_job/traffic_control/throttle.rb

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -10,33 +10,31 @@ module Throttle
1010

1111
def throttle(threshold:, period:, drop: false, key: nil)
1212
raise ArgumentError, "Threshold needs to be an integer > 0" if threshold.to_i < 1
13-
@job_throttling = {threshold: threshold, period: period, drop: drop, key: key}
13+
14+
@job_throttling = {
15+
threshold: threshold,
16+
period: period,
17+
drop: drop,
18+
key: key
19+
}
1420
end
1521

16-
def throttling_key
17-
if job_throttling
18-
@throttling_key ||= begin
19-
if job_throttling[:key].present?
20-
job_throttling[:key]
21-
else
22-
"traffic_control:throttling:#{cleaned_name}"
23-
end
24-
end
25-
end
22+
def throttling_lock_key(job)
23+
lock_key("throttle", job, job_throttling)
2624
end
2725
end
2826

2927
included do
3028
include ActiveJob::TrafficControl::Base
3129

32-
around_perform do |_, block|
30+
around_perform do |job, block|
3331
if self.class.job_throttling.present?
3432
lock_options = {
3533
resources: self.class.job_throttling[:threshold],
3634
stale_lock_expiration: self.class.job_throttling[:period]
3735
}
3836

39-
with_lock_client(self.class.throttling_key, lock_options) do |client|
37+
with_lock_client(self.class.throttling_lock_key(job), lock_options) do |client|
4038
token = client.lock
4139

4240
if token

test/active_job/traffic_control_test.rb

Lines changed: 127 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,40 @@ class ThrottleTestJob < ActiveJob::Base
1919
throttle threshold: 2, period: 1.second, drop: true
2020

2121
def perform
22+
sleep 0.5
23+
$count += 1
24+
end
25+
end
26+
27+
class ThrottleWithKeyTestJob < ActiveJob::Base
28+
include ActiveJob::TrafficControl::Throttle
29+
30+
throttle threshold: 2, period: 1.second, drop: true, key: "throttle_test_key"
31+
32+
def perform
33+
sleep 0.5
34+
$count += 1
35+
end
36+
end
37+
38+
class ThrottleWithProcKeyTestJob < ActiveJob::Base
39+
include ActiveJob::TrafficControl::Throttle
40+
41+
throttle threshold: 2, period: 1.second, drop: true, key: -> (_) { "throttle_proc_job_name" }
42+
43+
def perform
44+
sleep 0.5
45+
$count += 1
46+
end
47+
end
48+
49+
class ThrottleNotDroppedTestJob < ActiveJob::Base
50+
include ActiveJob::TrafficControl::Throttle
51+
52+
throttle threshold: 2, period: 1.second, drop: false
53+
54+
def perform
55+
sleep 0.5
2256
$count += 1
2357
end
2458
end
@@ -34,6 +68,39 @@ def perform
3468
end
3569
end
3670

71+
class ConcurrencyNotDroppedTestJob < ActiveJob::Base
72+
include ActiveJob::TrafficControl::Concurrency
73+
74+
concurrency 1, drop: false
75+
76+
def perform
77+
sleep 0.5
78+
$count += 1
79+
end
80+
end
81+
82+
class ConcurrencyWithKeyTestJob < ActiveJob::Base
83+
include ActiveJob::TrafficControl::Concurrency
84+
85+
concurrency 1, drop: true, key: "concurrency_test_key"
86+
87+
def perform
88+
sleep 0.5
89+
$count += 1
90+
end
91+
end
92+
93+
class ConcurrencyWithProcKeyTestJob < ActiveJob::Base
94+
include ActiveJob::TrafficControl::Concurrency
95+
96+
concurrency 1, drop: true, key: -> (_) { "concurrency_proc_job_name" }
97+
98+
def perform
99+
sleep 0.5
100+
$count += 1
101+
end
102+
end
103+
37104
class InheritedConcurrencyJob < ConcurrencyTestJob
38105
def perform
39106
$count += 1
@@ -70,33 +137,80 @@ def test_disable
70137
assert_equal 2, $count
71138
end
72139

73-
def test_throttle
74-
t1 = Thread.new { ThrottleTestJob.perform_now }
75-
t2 = Thread.new { ThrottleTestJob.perform_now }
76-
t3 = Thread.new { ThrottleTestJob.perform_now }
140+
def throttle_helper(klass)
141+
t1 = Thread.new { klass.perform_now }
142+
t2 = Thread.new { klass.perform_now }
143+
t3 = Thread.new { klass.perform_now }
77144
[t1, t2, t3].map(&:join)
145+
sleep 0.5
78146
assert_equal 2, $count
79147
sleep 1
80-
ThrottleTestJob.perform_now
148+
klass.perform_now
149+
assert_equal 3, $count
150+
end
151+
152+
def test_throttle
153+
throttle_helper(ThrottleTestJob)
154+
end
155+
156+
def test_throttle_with_key
157+
throttle_helper(ThrottleWithKeyTestJob)
158+
end
159+
160+
def test_throttle_with_proc_key
161+
throttle_helper(ThrottleWithProcKeyTestJob)
162+
end
163+
164+
def test_throttle_not_dropped
165+
return unless ActiveJob::Base.queue_adapter == :async
166+
167+
t1 = Thread.new { ThrottleNotDroppedTestJob.perform_now }
168+
t2 = Thread.new { ThrottleNotDroppedTestJob.perform_now }
169+
t3 = Thread.new { ThrottleNotDroppedTestJob.perform_now }
170+
[t1, t2, t3].map(&:join)
171+
sleep 0.5
172+
assert_equal 2, $count
173+
sleep 6
81174
assert_equal 3, $count
82175
end
83176

177+
def concurrency_helper(klass)
178+
t1 = Thread.new { klass.perform_now }
179+
t2 = Thread.new { klass.perform_now }
180+
[t1, t2].map(&:join)
181+
sleep 0.5
182+
assert_equal 1, $count
183+
klass.perform_now
184+
assert_equal 2, $count
185+
end
186+
84187
def test_concurrency
85-
t1 = Thread.new { ConcurrencyTestJob.perform_now }
86-
t2 = Thread.new { ConcurrencyTestJob.perform_now }
188+
concurrency_helper(ConcurrencyTestJob)
189+
end
190+
191+
def test_concurrency_with_key
192+
concurrency_helper(ConcurrencyWithKeyTestJob)
193+
end
194+
195+
def test_concurrency_with_proc_key
196+
concurrency_helper(ConcurrencyWithProcKeyTestJob)
197+
end
198+
199+
def test_concurrent_not_dropped
200+
return unless ActiveJob::Base.queue_adapter == :async
201+
202+
t1 = Thread.new { ConcurrencyNotDroppedTestJob.perform_now }
203+
t2 = Thread.new { ConcurrencyNotDroppedTestJob.perform_now }
87204
[t1, t2].map(&:join)
88-
sleep 1
89205
assert_equal 1, $count
90-
ConcurrencyTestJob.perform_later
91-
sleep 1
206+
sleep 6
92207
assert_equal 2, $count
93208
end
94209

95210
def test_concurrency_is_not_inherited
96-
t1 = Thread.new { InheritedConcurrencyJob.perform_later }
97-
t2 = Thread.new { InheritedConcurrencyJob.perform_later }
211+
t1 = Thread.new { InheritedConcurrencyJob.perform_now }
212+
t2 = Thread.new { InheritedConcurrencyJob.perform_now }
98213
[t1, t2].map(&:join)
99-
sleep 1
100214
assert_equal 2, $count
101215
end
102216

test/test_helper.rb

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
$LOAD_PATH.unshift File.expand_path("../../lib", __FILE__)
22

3+
ENV["RACK_ENV"] = ENV["RAILS_ENV"] = "test"
4+
35
if ENV["CODECLIMATE_REPO_TOKEN"]
46
require "codeclimate-test-reporter"
57
::SimpleCov.add_filter "helper"
@@ -18,5 +20,10 @@
1820
l
1921
end
2022

21-
ActiveJob::Base.queue_adapter = :inline
23+
begin
24+
ActiveJob::Base.queue_adapter = :async # ActiveJob 5
25+
rescue => _
26+
ActiveJob::Base.queue_adapter = :inline
27+
end
28+
2229
ActiveJob::Base.logger = test_logger

0 commit comments

Comments
 (0)