-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy paththread_pool.rb
105 lines (86 loc) · 2.72 KB
/
thread_pool.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# frozen_string_literal: true
require 'grpc_kit/rpc_dispatcher/auto_trimmer'
module Griffin
class ThreadPool
DEFAULT_MAX = 5
DEFAULT_MIN = 1
QUEUE_SIZE = 128
def initialize(interval: 60, max: DEFAULT_MAX, min: DEFAULT_MIN, &block)
@max_pool_size = max
@min_pool_size = min
@block = block
@shutdown = false
@tasks = SizedQueue.new(QUEUE_SIZE)
@spawned = 0
@workers = []
@mutex = Mutex.new
@waiting = 0
@min_pool_size.times { spawn_thread }
@auto_trimmer = GrpcKit::RpcDispatcher::AutoTrimmer.new(self, interval: interval + rand(10)).tap(&:start!)
end
def schedule(task, &block)
if task.nil?
return
end
if @shutdown
raise "scheduling new task isn't allowed during shutdown"
end
# TODO: blocking now..
@tasks.push(block || task)
if @mutex.synchronize { (@waiting < @tasks.size) && (@spawned < @max_pool_size) }
spawn_thread
end
end
def resouce_available?
(@waiting != 0) || (@spawned != @max_pool_size)
end
def shutdown
@shutdown = true
@max_pool_size.times { @tasks.push(nil) }
@auto_trimmer.stop
until @workers.empty?
Griffin.logger.debug("Shutdown waiting #{@waiting} workers")
sleep 1
end
end
# For GrpcKit::ThreadPool::AutoTrimmer
# TODO: Change interface to use a keyword argument.
def trim(force = false) # rubocop:disable Style/OptionalBooleanParameter
if @mutex.synchronize { (force || (@waiting > 0)) && (@spawned > @min_pool_size) }
GrpcKit.logger.info("Trim worker! Next worker size #{@spawned - 1}")
@tasks.push(nil)
end
end
private
def spawn_thread
@spawned += 1
worker = Thread.new(@spawned) do |i|
Thread.current.name = "Griffin worker thread #{i}"
Griffin.logger.debug("#{Thread.current.name} started")
loop do
if @shutdown
break
end
@mutex.synchronize { @waiting += 1 }
task = @tasks.pop
@mutex.synchronize { @waiting -= 1 }
if task.nil?
break
end
begin
@block.call(task)
rescue Exception => e # rubocop:disable Lint/RescueException
backtrace = Thread.current.backtrace.join("\n")
Griffin.logger.error("An error occured on top level in worker #{Thread.current.name}: #{e.message} (#{e.class})\n #{backtrace} ")
end
end
Griffin.logger.debug("worker thread #{Thread.current.name} is stopping")
@mutex.synchronize do
@spawned -= 1
@workers.delete(worker)
end
end
@workers.push(worker)
end
end
end