Skip to content

Commit 25bab17

Browse files
committed
RFC 0002: Skeleton (crystal-lang#15350)
1 parent a09040a commit 25bab17

15 files changed

+486
-52
lines changed

src/concurrent.cr

+31-10
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
11
require "fiber"
22
require "channel"
3-
require "crystal/scheduler"
43
require "crystal/tracing"
54

5+
{% if flag?(:execution_context) %}
6+
require "fiber/execution_context"
7+
{% else %}
8+
require "crystal/scheduler"
9+
{% end %}
10+
611
# Blocks the current fiber for the specified number of seconds.
712
#
813
# While this fiber is waiting this time, other ready-to-execute
@@ -12,25 +17,36 @@ def sleep(seconds : Number) : Nil
1217
if seconds < 0
1318
raise ArgumentError.new "Sleep seconds must be positive"
1419
end
15-
16-
Crystal::Scheduler.sleep(seconds.seconds)
20+
sleep(seconds.seconds)
1721
end
1822

1923
# Blocks the current Fiber for the specified time span.
2024
#
2125
# While this fiber is waiting this time, other ready-to-execute
2226
# fibers might start their execution.
2327
def sleep(time : Time::Span) : Nil
24-
Crystal::Scheduler.sleep(time)
28+
Crystal.trace :sched, "sleep", for: time
29+
30+
{% if flag?(:execution_context) %}
31+
Fiber.current.resume_event.add(time)
32+
Fiber::ExecutionContext.reschedule
33+
{% else %}
34+
Crystal::Scheduler.sleep(time)
35+
{% end %}
2536
end
2637

2738
# Blocks the current fiber forever.
2839
#
2940
# Meanwhile, other ready-to-execute fibers might start their execution.
3041
def sleep : Nil
31-
Crystal::Scheduler.reschedule
42+
{% if flag?(:execution_context) %}
43+
Fiber::ExecutionContext.reschedule
44+
{% else %}
45+
Crystal::Scheduler.reschedule
46+
{% end %}
3247
end
3348

49+
{% begin %}
3450
# Spawns a new fiber.
3551
#
3652
# NOTE: The newly created fiber doesn't run as soon as spawned.
@@ -64,12 +80,17 @@ end
6480
# wg.wait
6581
# ```
6682
def spawn(*, name : String? = nil, same_thread = false, &block)
67-
fiber = Fiber.new(name, &block)
68-
Crystal.trace :sched, "spawn", fiber: fiber
69-
{% if flag?(:preview_mt) %} fiber.set_current_thread if same_thread {% end %}
70-
fiber.enqueue
71-
fiber
83+
{% if flag?(:execution_context) %}
84+
Fiber::ExecutionContext::Scheduler.current.spawn(name: name, same_thread: same_thread, &block)
85+
{% else %}
86+
fiber = Fiber.new(name, &block)
87+
Crystal.trace :sched, "spawn", fiber: fiber
88+
{% if flag?(:preview_mt) %} fiber.set_current_thread if same_thread {% end %}
89+
fiber.enqueue
90+
fiber
91+
{% end %}
7292
end
93+
{% end %}
7394

7495
# Spawns a fiber by first creating a `Proc`, passing the *call*'s
7596
# expressions to it, and letting the `Proc` finally invoke the *call*.

src/crystal/event_loop.cr

+18-3
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,20 @@ abstract class Crystal::EventLoop
2727

2828
@[AlwaysInline]
2929
def self.current : self
30-
Crystal::Scheduler.event_loop
30+
{% if flag?(:execution_context) %}
31+
Fiber::ExecutionContext.current.event_loop
32+
{% else %}
33+
Crystal::Scheduler.event_loop
34+
{% end %}
3135
end
3236

3337
@[AlwaysInline]
34-
def self.current? : self?
35-
Crystal::Scheduler.event_loop?
38+
def self.current? : self | Nil
39+
{% if flag?(:execution_context) %}
40+
Fiber::ExecutionContext.current.event_loop
41+
{% else %}
42+
Crystal::Scheduler.event_loop?
43+
{% end %}
3644
end
3745

3846
# Runs the loop.
@@ -46,6 +54,13 @@ abstract class Crystal::EventLoop
4654
# events.
4755
abstract def run(blocking : Bool) : Bool
4856

57+
{% if flag?(:execution_context) %}
58+
# Same as `#run` but collects runnable fibers into *queue* instead of
59+
# enqueueing in parallel, so the caller is responsible and in control for
60+
# when and how the fibers will be enqueued.
61+
abstract def run(queue : Fiber::List*, blocking : Bool) : Nil
62+
{% end %}
63+
4964
# Tells a blocking run loop to no longer wait for events to activate. It may
5065
# for example enqueue a NOOP event with an immediate (or past) timeout. Having
5166
# activated an event, the loop shall return, allowing the blocked thread to

src/crystal/event_loop/iocp.cr

+8
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ class Crystal::EventLoop::IOCP < Crystal::EventLoop
5555
iocp
5656
end
5757

58+
# thread unsafe
5859
def run(blocking : Bool) : Bool
5960
enqueued = false
6061

@@ -66,6 +67,13 @@ class Crystal::EventLoop::IOCP < Crystal::EventLoop
6667
enqueued
6768
end
6869

70+
{% if flag?(:execution_context) %}
71+
# thread unsafe
72+
def run(queue : Fiber::List*, blocking : Bool) : Nil
73+
run_impl(blocking) { |fiber| queue.value.push(fiber) }
74+
end
75+
{% end %}
76+
6977
# Runs the event loop and enqueues the fiber for the next upcoming event or
7078
# completion.
7179
private def run_impl(blocking : Bool, &) : Nil

src/crystal/event_loop/libevent.cr

+36-7
Original file line numberDiff line numberDiff line change
@@ -20,26 +20,55 @@ class Crystal::EventLoop::LibEvent < Crystal::EventLoop
2020
event_base.loop(flags)
2121
end
2222

23+
{% if flag?(:execution_context) %}
24+
def run(queue : Fiber::List*, blocking : Bool) : Nil
25+
Crystal.trace :evloop, "run", fiber: fiber, blocking: blocking
26+
@runnables = queue
27+
run(blocking)
28+
ensure
29+
@runnables = nil
30+
end
31+
32+
def callback_enqueue(fiber : Fiber) : Nil
33+
if queue = @runnables
34+
queue.value.push(fiber)
35+
else
36+
raise "BUG: libevent callback executed outside of #run(queue*, blocking) call"
37+
end
38+
end
39+
{% end %}
40+
2341
def interrupt : Nil
2442
event_base.loop_exit
2543
end
2644

27-
# Create a new resume event for a fiber.
45+
# Create a new resume event for a fiber (sleep).
2846
def create_resume_event(fiber : Fiber) : Crystal::EventLoop::LibEvent::Event
2947
event_base.new_event(-1, LibEvent2::EventFlags::None, fiber) do |s, flags, data|
30-
data.as(Fiber).enqueue
48+
f = data.as(Fiber)
49+
{% if flag?(:execution_context) %}
50+
event_loop = Crystal::EventLoop.current.as(Crystal::EventLoop::LibEvent)
51+
event_loop.callback_enqueue(f)
52+
{% else %}
53+
f.enqueue
54+
{% end %}
3155
end
3256
end
3357

34-
# Creates a timeout_event.
58+
# Creates a timeout event (timeout action of select expression).
3559
def create_timeout_event(fiber) : Crystal::EventLoop::LibEvent::Event
3660
event_base.new_event(-1, LibEvent2::EventFlags::None, fiber) do |s, flags, data|
3761
f = data.as(Fiber)
38-
if (select_action = f.timeout_select_action)
62+
if select_action = f.timeout_select_action
3963
f.timeout_select_action = nil
40-
select_action.time_expired(f)
41-
else
42-
f.enqueue
64+
if select_action.time_expired?
65+
{% if flag?(:execution_context) %}
66+
event_loop = Crystal::EventLoop.current.as(Crystal::EventLoop::LibEvent)
67+
event_loop.callback_enqueue(f)
68+
{% else %}
69+
f.enqueue
70+
{% end %}
71+
end
4372
end
4473
end
4574
end

src/crystal/event_loop/polling.cr

+23-4
Original file line numberDiff line numberDiff line change
@@ -112,14 +112,25 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop
112112
end
113113
{% end %}
114114

115-
# NOTE: thread unsafe
115+
# thread unsafe
116116
def run(blocking : Bool) : Bool
117117
system_run(blocking) do |fiber|
118-
Crystal::Scheduler.enqueue(fiber)
118+
{% if flag?(:execution_context) %}
119+
fiber.execution_context.enqueue(fiber)
120+
{% else %}
121+
Crystal::Scheduler.enqueue(fiber)
122+
{% end %}
119123
end
120124
true
121125
end
122126

127+
{% if flag?(:execution_context) %}
128+
# thread unsafe
129+
def run(queue : Fiber::List*, blocking : Bool) : Nil
130+
system_run(blocking) { |fiber| queue.value.push(fiber) }
131+
end
132+
{% end %}
133+
123134
# fiber interface, see Crystal::EventLoop
124135

125136
def create_resume_event(fiber : Fiber) : FiberEvent
@@ -327,13 +338,21 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop
327338
Polling.arena.free(index) do |pd|
328339
pd.value.@readers.ready_all do |event|
329340
pd.value.@event_loop.try(&.unsafe_resume_io(event) do |fiber|
330-
Crystal::Scheduler.enqueue(fiber)
341+
{% if flag?(:execution_context) %}
342+
fiber.execution_context.enqueue(fiber)
343+
{% else %}
344+
Crystal::Scheduler.enqueue(fiber)
345+
{% end %}
331346
end)
332347
end
333348

334349
pd.value.@writers.ready_all do |event|
335350
pd.value.@event_loop.try(&.unsafe_resume_io(event) do |fiber|
336-
Crystal::Scheduler.enqueue(fiber)
351+
{% if flag?(:execution_context) %}
352+
fiber.execution_context.enqueue(fiber)
353+
{% else %}
354+
Crystal::Scheduler.enqueue(fiber)
355+
{% end %}
337356
end)
338357
end
339358

src/crystal/scheduler.cr

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
{% skip_file if flag?(:execution_context) %}
2+
13
require "crystal/event_loop"
24
require "crystal/system/print_error"
35
require "fiber"
@@ -66,7 +68,6 @@ class Crystal::Scheduler
6668
end
6769

6870
def self.sleep(time : Time::Span) : Nil
69-
Crystal.trace :sched, "sleep", for: time
7071
Thread.current.scheduler.sleep(time)
7172
end
7273

src/crystal/system/thread.cr

+41-8
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,47 @@ class Thread
7979

8080
getter name : String?
8181

82+
{% if flag?(:execution_context) %}
83+
# :nodoc:
84+
getter! execution_context : Fiber::ExecutionContext
85+
86+
# :nodoc:
87+
property! scheduler : Fiber::ExecutionContext::Scheduler
88+
89+
# :nodoc:
90+
def execution_context=(@execution_context : Fiber::ExecutionContext) : Fiber::ExecutionContext
91+
main_fiber.execution_context = execution_context
92+
end
93+
94+
# When a fiber terminates we can't release its stack until we swap context
95+
# to another fiber. We can't free/unmap nor push it to a shared stack pool,
96+
# that would result in a segfault.
97+
@dead_fiber_stack : Fiber::Stack?
98+
99+
# :nodoc:
100+
def dying_fiber(fiber : Fiber) : Fiber::Stack?
101+
stack = @dead_fiber_stack
102+
@dead_fiber_stack = fiber.@stack
103+
stack
104+
end
105+
106+
# :nodoc:
107+
def dead_fiber_stack? : Fiber::Stack?
108+
if stack = @dead_fiber_stack
109+
@dead_fiber_stack = nil
110+
stack
111+
end
112+
end
113+
{% else %}
114+
# :nodoc:
115+
getter scheduler : Crystal::Scheduler { Crystal::Scheduler.new(self) }
116+
117+
# :nodoc:
118+
def scheduler? : ::Crystal::Scheduler?
119+
@scheduler
120+
end
121+
{% end %}
122+
82123
def self.unsafe_each(&)
83124
# nothing to iterate when @@threads is nil + don't lazily allocate in a
84125
# method called from a GC collection callback!
@@ -165,14 +206,6 @@ class Thread
165206
thread.name = name
166207
end
167208

168-
# :nodoc:
169-
getter scheduler : Crystal::Scheduler { Crystal::Scheduler.new(self) }
170-
171-
# :nodoc:
172-
def scheduler? : ::Crystal::Scheduler?
173-
@scheduler
174-
end
175-
176209
protected def start
177210
Thread.threads.push(self)
178211
Thread.current = self

src/crystal/tracing.cr

+10
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,16 @@ module Crystal
8181
write value.name || '?'
8282
end
8383

84+
{% if flag?(:execution_context) %}
85+
def write(value : Fiber::ExecutionContext) : Nil
86+
write value.name
87+
end
88+
89+
def write(value : Fiber::ExecutionContext::Scheduler) : Nil
90+
write value.name
91+
end
92+
{% end %}
93+
8494
def write(value : Pointer) : Nil
8595
write "0x"
8696
System.to_int_slice(value.address, 16, true, 2) { |bytes| write(bytes) }

0 commit comments

Comments
 (0)