Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RFC 2: Skeleton for ExecutionContext #15350

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 31 additions & 10 deletions src/concurrent.cr
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
require "fiber"
require "channel"
require "crystal/scheduler"
require "crystal/tracing"

{% if flag?(:execution_context) %}
require "execution_context"
{% else %}
require "crystal/scheduler"
{% end %}

# Blocks the current fiber for the specified number of seconds.
#
# While this fiber is waiting this time, other ready-to-execute
Expand All @@ -12,25 +17,36 @@ def sleep(seconds : Number) : Nil
if seconds < 0
raise ArgumentError.new "Sleep seconds must be positive"
end

Crystal::Scheduler.sleep(seconds.seconds)
sleep(seconds.seconds)
end

# Blocks the current Fiber for the specified time span.
#
# While this fiber is waiting this time, other ready-to-execute
# fibers might start their execution.
def sleep(time : Time::Span) : Nil
Crystal::Scheduler.sleep(time)
Crystal.trace :sched, "sleep", for: time

{% if flag?(:execution_context) %}
Fiber.current.resume_event.add(time)
ExecutionContext.reschedule
ysbaddaden marked this conversation as resolved.
Show resolved Hide resolved
{% else %}
Crystal::Scheduler.sleep(time)
{% end %}
end

# Blocks the current fiber forever.
#
# Meanwhile, other ready-to-execute fibers might start their execution.
def sleep : Nil
Crystal::Scheduler.reschedule
{% if flag?(:execution_context) %}
ExecutionContext.reschedule
{% else %}
Crystal::Scheduler.reschedule
{% end %}
end

{% begin %}
# Spawns a new fiber.
#
# NOTE: The newly created fiber doesn't run as soon as spawned.
Expand Down Expand Up @@ -64,12 +80,17 @@ end
# wg.wait
# ```
def spawn(*, name : String? = nil, same_thread = false, &block)
fiber = Fiber.new(name, &block)
Crystal.trace :sched, "spawn", fiber: fiber
{% if flag?(:preview_mt) %} fiber.set_current_thread if same_thread {% end %}
fiber.enqueue
fiber
{% if flag?(:execution_context) %}
ExecutionContext::Scheduler.current.spawn(name: name, same_thread: same_thread, &block)
{% else %}
fiber = Fiber.new(name, &block)
Crystal.trace :sched, "spawn", fiber: fiber
{% if flag?(:preview_mt) %} fiber.set_current_thread if same_thread {% end %}
fiber.enqueue
fiber
{% end %}
end
{% end %}

# Spawns a fiber by first creating a `Proc`, passing the *call*'s
# expressions to it, and letting the `Proc` finally invoke the *call*.
Expand Down
14 changes: 11 additions & 3 deletions src/crystal/event_loop.cr
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,20 @@ abstract class Crystal::EventLoop

@[AlwaysInline]
def self.current : self
Crystal::Scheduler.event_loop
{% if flag?(:execution_context) %}
ExecutionContext.current.event_loop
{% else %}
Crystal::Scheduler.event_loop
{% end %}
end

@[AlwaysInline]
def self.current? : self?
Crystal::Scheduler.event_loop?
def self.current? : self | Nil
{% if flag?(:execution_context) %}
ExecutionContext.current.event_loop
{% else %}
Crystal::Scheduler.event_loop?
{% end %}
end

# Runs the loop.
Expand Down
18 changes: 15 additions & 3 deletions src/crystal/event_loop/polling.cr
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,11 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop
# NOTE: thread unsafe
def run(blocking : Bool) : Bool
system_run(blocking) do |fiber|
Crystal::Scheduler.enqueue(fiber)
{% if flag?(:execution_context) %}
fiber.execution_context.enqueue(fiber)
{% else %}
Crystal::Scheduler.enqueue(fiber)
{% end %}
end
true
end
Expand Down Expand Up @@ -303,13 +307,21 @@ abstract class Crystal::EventLoop::Polling < Crystal::EventLoop
Polling.arena.free(index) do |pd|
[email protected]_all do |event|
pd.value.@event_loop.try(&.unsafe_resume_io(event) do |fiber|
Crystal::Scheduler.enqueue(fiber)
{% if flag?(:execution_context) %}
fiber.execution_context.enqueue(fiber)
{% else %}
Crystal::Scheduler.enqueue(fiber)
{% end %}
end)
end

[email protected]_all do |event|
pd.value.@event_loop.try(&.unsafe_resume_io(event) do |fiber|
Crystal::Scheduler.enqueue(fiber)
{% if flag?(:execution_context) %}
fiber.execution_context.enqueue(fiber)
{% else %}
Crystal::Scheduler.enqueue(fiber)
{% end %}
end)
end

Expand Down
3 changes: 2 additions & 1 deletion src/crystal/scheduler.cr
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
{% skip_file if flag?(:execution_context) %}

require "crystal/event_loop"
require "crystal/system/print_error"
require "fiber"
Expand Down Expand Up @@ -66,7 +68,6 @@ class Crystal::Scheduler
end

def self.sleep(time : Time::Span) : Nil
Crystal.trace :sched, "sleep", for: time
Thread.current.scheduler.sleep(time)
end

Expand Down
41 changes: 33 additions & 8 deletions src/crystal/system/thread.cr
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,39 @@ class Thread

getter name : String?

{% if flag?(:execution_context) %}
# :nodoc:
getter! execution_context : ExecutionContext

# :nodoc:
property! current_scheduler : ExecutionContext::Scheduler

# :nodoc:
def execution_context=(@execution_context : ExecutionContext) : ExecutionContext
main_fiber.execution_context = execution_context
end

# :nodoc:
def dead_fiber=(@dead_fiber : Fiber) : Fiber
end
ysbaddaden marked this conversation as resolved.
Show resolved Hide resolved

# :nodoc:
def dead_fiber? : Fiber?
if fiber = @dead_fiber
@dead_fiber = nil
fiber
end
end
{% else %}
# :nodoc:
getter scheduler : Crystal::Scheduler { Crystal::Scheduler.new(self) }

# :nodoc:
def scheduler? : ::Crystal::Scheduler?
@scheduler
end
{% end %}

def self.unsafe_each(&)
# nothing to iterate when @@threads is nil + don't lazily allocate in a
# method called from a GC collection callback!
Expand Down Expand Up @@ -154,14 +187,6 @@ class Thread
thread.name = name
end

# :nodoc:
getter scheduler : Crystal::Scheduler { Crystal::Scheduler.new(self) }

# :nodoc:
def scheduler? : ::Crystal::Scheduler?
@scheduler
end

protected def start
Thread.threads.push(self)
Thread.current = self
Expand Down
1 change: 1 addition & 0 deletions src/crystal/system/unix/signal.cr
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ require "c/signal"
require "c/stdio"
require "c/sys/wait"
require "c/unistd"
require "../print_error"
ysbaddaden marked this conversation as resolved.
Show resolved Hide resolved

module Crystal::System::Signal
# The number of libc functions that can be called safely from a signal(2)
Expand Down
10 changes: 10 additions & 0 deletions src/crystal/tracing.cr
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,16 @@ module Crystal
write value.name || '?'
end

{% if flag?(:execution_context) %}
def write(value : ExecutionContext) : Nil
write value.name
end

def write(value : ExecutionContext::Scheduler) : Nil
write value.name
end
{% end %}

def write(value : Pointer) : Nil
write "0x"
System.to_int_slice(value.address, 16, true, 2) { |bytes| write(bytes) }
Expand Down
100 changes: 100 additions & 0 deletions src/execution_context/execution_context.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
require "../crystal/event_loop"
require "../crystal/system/thread"
require "../crystal/system/thread_linked_list"
require "../fiber"
require "../fiber/stack_pool"
require "./scheduler"

{% raise "ERROR: execution contexts require the `preview_mt` compilation flag" unless flag?(:preview_mt) %}

module ExecutionContext
ysbaddaden marked this conversation as resolved.
Show resolved Hide resolved
@@default : ExecutionContext?

@[AlwaysInline]
def self.default : ExecutionContext
@@default.not_nil!("expected default execution context to have been setup")
end

# :nodoc:
def self.init_default_context : Nil
raise NotImplementedError.new("No execution context implementations (yet)")
end

# Returns the default number of workers to start in the execution context.
def self.default_workers_count : Int32
ENV["CRYSTAL_WORKERS"]?.try(&.to_i?) || Math.min(System.cpu_count.to_i, 32)
end

# :nodoc:
protected class_getter(execution_contexts) { Thread::LinkedList(ExecutionContext).new }

# :nodoc:
property next : ExecutionContext?

# :nodoc:
property previous : ExecutionContext?

# :nodoc:
def self.unsafe_each(&) : Nil
@@execution_contexts.try(&.unsafe_each { |execution_context| yield execution_context })
end

def self.each(&) : Nil
execution_contexts.each { |execution_context| yield execution_context }
end

@[AlwaysInline]
def self.current : ExecutionContext
Thread.current.execution_context
end

# Tells the current scheduler to suspend the current fiber and resume the
# next runnable fiber. The current fiber will never be resumed; you're
# responsible to reenqueue it.
#
# This method is safe as it only operates on the current `ExecutionContext`
# and `Scheduler`.
@[AlwaysInline]
def self.reschedule : Nil
Scheduler.current.reschedule
end

# Tells the current scheduler to suspend the current fiber and to resume
# *fiber* instead. The current fiber will never be resumed; you're responsible
# to reenqueue it.
#
# Raises `RuntimeError` if the fiber doesn't belong to the current execution
# context.
#
# This method is safe as it only operates on the current `ExecutionContext`
# and `Scheduler`.
def self.resume(fiber : Fiber) : Nil
if fiber.execution_context == current
Scheduler.current.resume(fiber)
else
raise RuntimeError.new("Can't resume fiber from #{fiber.execution_context} into #{current}")
end
end

# Creates a new fiber then calls `#enqueue` to add it to the execution
# context.
#
# May be called from any `ExecutionContext` (i.e. must be thread-safe).
def spawn(*, name : String? = nil, &block : ->) : Fiber
Fiber.new(name, self, &block).tap { |fiber| enqueue(fiber) }
end

# Legacy support for the `same_thread` argument. Each execution context may
# decide to support it or not (e.g. a single threaded context can accept it).
abstract def spawn(*, name : String? = nil, same_thread : Bool, &block : ->) : Fiber

abstract def stack_pool : Fiber::StackPool
abstract def stack_pool? : Fiber::StackPool?

abstract def event_loop : Crystal::EventLoop

# Enqueues a fiber to be resumed inside the execution context.
#
# May be called from any ExecutionContext (i.e. must be thread-safe).
abstract def enqueue(fiber : Fiber) : Nil
end
Loading