Skip to content

Commit a09040a

Browse files
committed
RFC 0002: Queues (crystal-lang#15345)
1 parent cb7782d commit a09040a

File tree

10 files changed

+1132
-0
lines changed

10 files changed

+1132
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,225 @@
1+
require "./spec_helper"
2+
3+
describe Fiber::ExecutionContext::GlobalQueue do
4+
it "#initialize" do
5+
q = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new)
6+
q.empty?.should be_true
7+
end
8+
9+
it "#unsafe_push and #unsafe_pop" do
10+
f1 = new_fake_fiber("f1")
11+
f2 = new_fake_fiber("f2")
12+
f3 = new_fake_fiber("f3")
13+
14+
q = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new)
15+
q.unsafe_push(f1)
16+
q.size.should eq(1)
17+
18+
q.unsafe_push(f2)
19+
q.unsafe_push(f3)
20+
q.size.should eq(3)
21+
22+
q.unsafe_pop?.should be(f3)
23+
q.size.should eq(2)
24+
25+
q.unsafe_pop?.should be(f2)
26+
q.unsafe_pop?.should be(f1)
27+
q.unsafe_pop?.should be_nil
28+
q.size.should eq(0)
29+
q.empty?.should be_true
30+
end
31+
32+
describe "#unsafe_grab?" do
33+
it "can't grab from empty queue" do
34+
q = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new)
35+
runnables = Fiber::ExecutionContext::Runnables(6).new(q)
36+
q.unsafe_grab?(runnables, 4).should be_nil
37+
end
38+
39+
it "grabs fibers" do
40+
q = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new)
41+
fibers = 10.times.map { |i| new_fake_fiber("f#{i}") }.to_a
42+
fibers.each { |f| q.unsafe_push(f) }
43+
44+
runnables = Fiber::ExecutionContext::Runnables(6).new(q)
45+
fiber = q.unsafe_grab?(runnables, 4)
46+
47+
# returned the last enqueued fiber
48+
fiber.should be(fibers[9])
49+
50+
# enqueued the next 2 fibers
51+
runnables.size.should eq(2)
52+
runnables.shift?.should be(fibers[8])
53+
runnables.shift?.should be(fibers[7])
54+
55+
# the remaining fibers are still there:
56+
6.downto(0).each do |i|
57+
q.unsafe_pop?.should be(fibers[i])
58+
end
59+
end
60+
61+
it "can't grab more than available" do
62+
f = new_fake_fiber
63+
q = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new)
64+
q.unsafe_push(f)
65+
66+
# dequeues the unique fiber
67+
runnables = Fiber::ExecutionContext::Runnables(6).new(q)
68+
fiber = q.unsafe_grab?(runnables, 4)
69+
fiber.should be(f)
70+
71+
# had nothing left to dequeue
72+
runnables.size.should eq(0)
73+
end
74+
75+
it "clamps divisor to 1" do
76+
f = new_fake_fiber
77+
q = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new)
78+
q.unsafe_push(f)
79+
80+
# dequeues the unique fiber
81+
runnables = Fiber::ExecutionContext::Runnables(6).new(q)
82+
fiber = q.unsafe_grab?(runnables, 0)
83+
fiber.should be(f)
84+
85+
# had nothing left to dequeue
86+
runnables.size.should eq(0)
87+
end
88+
end
89+
90+
# interpreter doesn't support threads yet (#14287)
91+
pending_interpreted describe: "thread safety" do
92+
it "one by one" do
93+
fibers = StaticArray(Fiber::ExecutionContext::FiberCounter, 763).new do |i|
94+
Fiber::ExecutionContext::FiberCounter.new(new_fake_fiber("f#{i}"))
95+
end
96+
97+
n = 7
98+
increments = 15
99+
queue = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new)
100+
ready = Thread::WaitGroup.new(n)
101+
shutdown = Thread::WaitGroup.new(n)
102+
103+
n.times do |i|
104+
Thread.new("ONE-#{i}") do |thread|
105+
slept = 0
106+
ready.done
107+
108+
loop do
109+
if fiber = queue.pop?
110+
fc = fibers.find { |x| x.@fiber == fiber }.not_nil!
111+
queue.push(fiber) if fc.increment < increments
112+
slept = 0
113+
elsif slept < 100
114+
slept += 1
115+
Thread.sleep(1.nanosecond) # don't burn CPU
116+
else
117+
break
118+
end
119+
end
120+
rescue exception
121+
Crystal::System.print_error "\nthread: #{thread.name}: exception: #{exception}"
122+
ensure
123+
shutdown.done
124+
end
125+
end
126+
ready.wait
127+
128+
fibers.each_with_index do |fc, i|
129+
queue.push(fc.@fiber)
130+
Thread.sleep(10.nanoseconds) if i % 10 == 9
131+
end
132+
133+
shutdown.wait
134+
135+
# must have dequeued each fiber exactly X times
136+
fibers.each { |fc| fc.counter.should eq(increments) }
137+
end
138+
139+
it "bulk operations" do
140+
n = 7
141+
increments = 15
142+
143+
fibers = StaticArray(Fiber::ExecutionContext::FiberCounter, 765).new do |i| # 765 can be divided by 3 and 5
144+
Fiber::ExecutionContext::FiberCounter.new(new_fake_fiber("f#{i}"))
145+
end
146+
147+
queue = Fiber::ExecutionContext::GlobalQueue.new(Thread::Mutex.new)
148+
ready = Thread::WaitGroup.new(n)
149+
shutdown = Thread::WaitGroup.new(n)
150+
151+
n.times do |i|
152+
Thread.new("BULK-#{i}") do |thread|
153+
slept = 0
154+
155+
r = Fiber::ExecutionContext::Runnables(3).new(queue)
156+
157+
batch = Fiber::List.new
158+
size = 0
159+
160+
reenqueue = -> {
161+
if size > 0
162+
queue.bulk_push(pointerof(batch))
163+
names = [] of String?
164+
batch.each { |f| names << f.name }
165+
batch.clear
166+
size = 0
167+
end
168+
}
169+
170+
execute = ->(fiber : Fiber) {
171+
fc = fibers.find { |x| x.@fiber == fiber }.not_nil!
172+
173+
if fc.increment < increments
174+
batch.push(fc.@fiber)
175+
size += 1
176+
end
177+
}
178+
179+
ready.done
180+
181+
loop do
182+
if fiber = r.shift?
183+
execute.call(fiber)
184+
slept = 0
185+
next
186+
end
187+
188+
if fiber = queue.grab?(r, 1)
189+
reenqueue.call
190+
execute.call(fiber)
191+
slept = 0
192+
next
193+
end
194+
195+
if slept >= 100
196+
break
197+
end
198+
199+
reenqueue.call
200+
slept += 1
201+
Thread.sleep(1.nanosecond) # don't burn CPU
202+
end
203+
rescue exception
204+
Crystal::System.print_error "\nthread #{thread.name} raised: #{exception}"
205+
ensure
206+
shutdown.done
207+
end
208+
end
209+
ready.wait
210+
211+
# enqueue in batches of 5
212+
0.step(to: fibers.size - 1, by: 5) do |i|
213+
list = Fiber::List.new
214+
5.times { |j| list.push(fibers[i + j].@fiber) }
215+
queue.bulk_push(pointerof(list))
216+
Thread.sleep(10.nanoseconds) if i % 4 == 3
217+
end
218+
219+
shutdown.wait
220+
221+
# must have dequeued each fiber exactly X times (no less, no more)
222+
fibers.each { |fc| fc.counter.should eq(increments) }
223+
end
224+
end
225+
end

0 commit comments

Comments
 (0)