WIP: Async and deferred loaders #173

Closed
wants to merge 2 commits into from
27 changes: 21 additions & 6 deletions examples/http_loader.rb
@@ -38,32 +38,47 @@
# An example loader which is blocking and synchronous as a whole, but executes all of its operations concurrently.
module Loaders
class HTTPLoader < GraphQL::Batch::Loader
include GraphQL::Batch::Async

def initialize(host:, size: 4, timeout: 4)
super()
@host = host
@size = size
@timeout = timeout
@futures = {}
end

-def perform(operations)
+def perform_early(operations)
# This fans out and starts off all the concurrent work, which starts and
# immediately returns Concurrent::Promises::Future objects for each operation.
operations.each do |operation|
future(operation)
end
end

def perform(operations)
# Collect the futures (and possibly trigger any newly added ones)
futures = operations.map do |operation|
Concurrent::Promises.future do
pool.with { |connection| operation.call(connection) }
end
future(operation)
end

# At this point, all of the concurrent work has been started.

-# This converges back in, waiting on each concurrent future to finish, and fulfilling each
+# Now it converges back in, waiting on each concurrent future to finish, and fulfilling each
# (non-concurrent) Promise.rb promise.
operations.each_with_index.each do |operation, index|
-fulfill(operation, futures[index].value) # .value is a blocking call
+fulfill(operation, futures[index].value!) # .value is a blocking call
Contributor (review comment) — Suggested change:
-fulfill(operation, futures[index].value!) # .value is a blocking call
+fulfill(operation, futures[index].value!) # .value! is a blocking call
end
end

private

def future(operation)
@futures[operation] ||= Concurrent::Promises.future do
pool.with { |connection| operation.call(connection) }
end
end

def pool
@pool ||= ConnectionPool.new(size: @size, timeout: @timeout) do
HTTP.persistent(@host)
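The split the diff introduces — `perform_early` fans out, `perform` converges — can be sketched without the HTTP, connection-pool, and concurrent-ruby dependencies. In this sketch (`FanOutSketch` is a hypothetical stand-in, not part of the PR), plain threads play the role of `Concurrent::Promises.future`:

```ruby
# Sketch of the fan-out/converge pattern used by HTTPLoader. Plain threads
# stand in for Concurrent::Promises.future; the memoized #future helper
# mirrors HTTPLoader#future so both phases share the same in-flight work.
class FanOutSketch
  def initialize
    @futures = {} # operation => Thread, memoized like HTTPLoader#future
  end

  # perform_early: start every operation and return immediately.
  def perform_early(operations)
    operations.each { |op| future(op) }
  end

  # perform: converge, blocking on each result.
  # Thread#value blocks and re-raises errors, much like Future#value!.
  def perform(operations)
    operations.map { |op| future(op).value }
  end

  private

  def future(operation)
    @futures[operation] ||= Thread.new { operation.call }
  end
end

sketch = FanOutSketch.new
ops = [-> { 1 + 1 }, -> { 2 * 2 }]
sketch.perform_early(ops)     # work starts concurrently here
results = sketch.perform(ops) # converge: [2, 4]
```

Because `#future` memoizes per operation, calling `perform` after `perform_early` never starts duplicate work — it only waits on what is already running.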
1 change: 1 addition & 0 deletions lib/graphql/batch.rb
@@ -38,5 +38,6 @@ def self.use(schema_defn, executor_class: GraphQL::Batch::Executor)

require_relative "batch/version"
require_relative "batch/loader"
require_relative "batch/async"
require_relative "batch/executor"
require_relative "batch/setup_multiplex"
25 changes: 25 additions & 0 deletions lib/graphql/batch/async.rb
@@ -0,0 +1,25 @@
module GraphQL::Batch
module Async
def resolve
defer # Let other non-async loaders run to completion first.
@peek_queue_index = 0 # The queue is consumed in super, future peeks will start from the beginning.
super
end

def on_any_loader_wait
@peek_queue_index ||= 0
peek_queue = queue[@peek_queue_index..]
return if peek_queue.empty?
@peek_queue_index = queue.size # advance past all keys seen so far
perform_early(peek_queue)
end

def perform_early(keys)
raise NotImplementedError, "Implement GraphQL::Batch::Async#perform_early to trigger async operations early"
end

def perform(keys)
raise NotImplementedError, "Implement GraphQL::Batch::Async#perform to wait on the async operations"
end
end
end
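The peek-queue bookkeeping in `on_any_loader_wait` above — hand `perform_early` only the keys enqueued since the last peek — can be exercised in isolation. This is a hypothetical simulation (`PeekQueueSketch` stubs the loader's promise queue with a plain array; the real module reads the loader's internal `queue`):

```ruby
# Self-contained sketch of the peek-queue bookkeeping in
# GraphQL::Batch::Async#on_any_loader_wait: each call hands perform_early
# only the keys added to the queue since the previous call.
class PeekQueueSketch
  attr_reader :queue, :performed_early

  def initialize
    @queue = []           # stands in for the loader's promise queue
    @performed_early = [] # records each batch handed to perform_early
  end

  def on_any_loader_wait
    @peek_queue_index ||= 0
    peek_queue = @queue[@peek_queue_index..]
    return if peek_queue.empty?
    @peek_queue_index = @queue.size # advance past everything seen so far
    @performed_early << peek_queue  # stands in for perform_early(peek_queue)
  end
end

sketch = PeekQueueSketch.new
sketch.queue.concat([:a, :b])
sketch.on_any_loader_wait # starts :a and :b early
sketch.queue << :c
sketch.on_any_loader_wait # starts only :c
sketch.performed_early    # => [[:a, :b], [:c]]
```

A third call with nothing new enqueued is a no-op, so a loader that waits repeatedly never re-triggers work it has already started.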
16 changes: 16 additions & 0 deletions lib/graphql/batch/executor.rb
@@ -53,6 +53,22 @@ def resolve(loader)
@loading = was_loading
end

# Defer the resolution of the current loader, allowing other loaders to be resolved first.
# This is useful when the current loader has kicked off async or concurrent work and doesn't need
# to block the current thread until later.
def defer_to_other_loaders
while (non_deferred_loader = @loaders.find { |_, loader| !loader.deferred && !loader.resolved? })
resolve(non_deferred_loader)
end
end

def on_wait
# FIXME: Better name?
@loaders.each do |_, loader|
loader.on_any_loader_wait
end
end

def tick
resolve(@loaders.shift.last)
end
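The loop in `defer_to_other_loaders` gives non-deferred loaders priority: a deferred loader is skipped until every other unresolved loader has been drained. A minimal simulation of that selection logic (`FakeLoader` and the free-standing function are hypothetical; the real executor resolves `GraphQL::Batch` loaders keyed by `loader_key`):

```ruby
# Simulation of Executor#defer_to_other_loaders: loaders marked deferred
# are skipped, and every non-deferred unresolved loader is resolved first.
FakeLoader = Struct.new(:name, :deferred, :resolved) do
  def resolved?
    resolved
  end
end

def defer_to_other_loaders(loaders, order)
  # Hash#find yields [key, loader] pairs, as in the executor.
  while (entry = loaders.find { |_, l| !l.deferred && !l.resolved? })
    _, loader = entry
    loader.resolved = true # stands in for Executor#resolve(loader)
    order << loader.name
  end
end

loaders = {
  a: FakeLoader.new(:async_loader, true, false), # deferred: left for later
  b: FakeLoader.new(:record_loader, false, false),
  c: FakeLoader.new(:http_loader, false, false),
}
order = []
defer_to_other_loaders(loaders, order)
order # => [:record_loader, :http_loader] — the deferred loader is untouched
```

Re-scanning with `find` on each iteration also picks up loaders that become unresolved again while others run, which a single pass over a snapshot would miss.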
18 changes: 17 additions & 1 deletion lib/graphql/batch/loader.rb
@@ -42,13 +42,14 @@ def current_executor
end
end

-attr_accessor :loader_key, :executor
+attr_accessor :loader_key, :executor, :deferred
Contributor: I'm concerned with deferred terminology conflating with GraphQL defer, to which this isn't directly related. Can this remain consistent with the "async" terminology, or be expressed on some kind of lazy priority scale?

Contributor (author): Yes, I agree. I'd like to use yield and yielded, which is what https://graphql-ruby.org/dataloader/parallelism uses, but that's because it uses fibers. We're conceptually "yielding control" here too.

WDYT @gmac @swalkinshaw @amomchilov ?

Contributor: Since this is an internal implementation detail now, why not just call it async? It's also more consistent with the Async module naming. We could even get rid of this entirely and just filter out loaders that don't include GraphQL::Batch::Async? (though maybe that's worse for perf)

Contributor (author): We can, though I think that's a worse API. yield is a nice fundamental API that can be used beyond Async; Async builds on top of it (and the "perform on any wait" thing).

Contributor: Can you put a commit up showing what it looks like?

def initialize
@loader_key = nil
@executor = nil
@queue = nil
@cache = nil
@deferred = false
end

def load(key)
@@ -66,6 +67,13 @@ def prime(key, value)
cache[cache_key(key)] ||= ::Promise.resolve(value).tap { |p| p.source = self }
end

# Called when any GraphQL::Batch::Loader starts waiting. May be called more than once per loader, if
# the loader is waiting multiple times. Will not be called once per promise.
#
# Use GraphQL::Batch::Async for the common way to use this.
def on_any_loader_wait
end

def resolve # :nodoc:
return if resolved?
load_keys = queue
@@ -88,6 +96,7 @@ def around_perform
# For Promise#sync
def wait # :nodoc:
if executor
executor.on_wait
executor.resolve(self)
else
resolve
@@ -146,6 +155,13 @@ def finish_resolve(key)
end
end

def defer
@deferred = true
executor.defer_to_other_loaders
ensure
@deferred = false
end

def cache
@cache ||= {}
end
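The `ensure` in `Loader#defer` guarantees the `deferred` flag is only raised while control is handed to the executor, and is cleared afterwards even if resolving the other loaders raises. A hedged, self-contained sketch of that contract (`DeferSketch` is hypothetical; a lambda stands in for `executor.defer_to_other_loaders`):

```ruby
# Sketch of Loader#defer's ensure semantics: the deferred flag is set only
# while other loaders run, and is reset even when they raise.
class DeferSketch
  attr_reader :deferred

  def initialize(executor)
    @executor = executor # stands in for Executor#defer_to_other_loaders
    @deferred = false
  end

  def defer
    @deferred = true
    @executor.call(self)
  ensure
    @deferred = false
  end
end

observed = nil
loader = DeferSketch.new(->(l) { observed = l.deferred })
loader.defer
observed        # => true  (flag was set while other loaders ran)
loader.deferred # => false (reset afterwards)

failing = DeferSketch.new(->(_) { raise "boom" })
begin
  failing.defer
rescue RuntimeError
end
failing.deferred # => false (reset even on error)
```

Without the `ensure`, an exception in another loader would leave this loader permanently marked deferred, and `defer_to_other_loaders` would skip it forever.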