WIP: Async and deferred loaders #173

Closed · wants to merge 2 commits
26 changes: 21 additions & 5 deletions examples/http_loader.rb
@@ -43,27 +43,43 @@ def initialize(host:, size: 4, timeout: 4)
@host = host
@size = size
@timeout = timeout
@futures = {}
end

-  def perform(operations)
+  def perform_on_wait(operations)
Contributor:
Should this perhaps be:

Suggested change:
-  def perform_on_wait(operations)
+  def perform_on_any_wait(operations)

To match the naming of Loader#on_any_wait
    # This fans out and starts off all the concurrent work, which immediately
    # returns Concurrent::Promises::Future objects for each operation.
operations.each do |operation|
future(operation)
end
end

def perform(operations)
# Defer to let other non-async loaders run to completion first.
defer

# Collect the futures (and possibly trigger any newly added ones)
    futures = operations.map do |operation|
-     Concurrent::Promises.future do
-       pool.with { |connection| operation.call(connection) }
-     end
+     future(operation)
    end

# At this point, all of the concurrent work has been started.

# This converges back in, waiting on each concurrent future to finish, and fulfilling each
# (non-concurrent) Promise.rb promise.
operations.each_with_index.each do |operation, index|
-      fulfill(operation, futures[index].value) # .value is a blocking call
+      fulfill(operation, futures[index].value!) # .value is a blocking call
Contributor:
Suggested change:
-      fulfill(operation, futures[index].value!) # .value is a blocking call
+      fulfill(operation, futures[index].value!) # .value! is a blocking call
end
end

private

def future(operation)
@futures[operation] ||= Concurrent::Promises.future do
pool.with { |connection| operation.call(connection) }
end
end

def pool
@pool ||= ConnectionPool.new(size: @size, timeout: @timeout) do
HTTP.persistent(@host)
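The fan-out/converge flow in the http_loader.rb diff above can be sketched self-contained with stdlib Threads standing in for the `Concurrent::Promises` futures and the HTTP connection pool (the `SketchLoader` class and the doubling lambdas are made-up stand-ins, not part of the PR):

```ruby
# Sketch of the fan-out/converge pattern: perform_on_wait starts all the work
# without blocking, and perform later blocks on each result.
class SketchLoader
  def initialize
    @futures = {}
  end

  # Fan out: start all concurrent work; returns immediately.
  def perform_on_wait(operations)
    operations.each { |operation| future(operation) }
  end

  # Converge: wait on each piece of started work and collect its result.
  def perform(operations)
    operations.map { |operation| future(operation).value } # Thread#value blocks
  end

  private

  # Memoized per operation so repeated calls never start duplicate work.
  def future(operation)
    @futures[operation] ||= Thread.new { operation.call }
  end
end

ops = [1, 2, 3].map { |x| -> { x * 2 } } # stand-ins for the HTTP operations
loader = SketchLoader.new
loader.perform_on_wait(ops) # all three threads started here
loader.perform(ops)         # => [2, 4, 6]
```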
13 changes: 13 additions & 0 deletions lib/graphql/batch/executor.rb
@@ -53,6 +53,19 @@ def resolve(loader)
@loading = was_loading
end

def defer(_loader)
Contributor:
This isn't used; is it needed at all?

Contributor Author:
Yes, probably not needed; I added it to make it similar to resolve.

while (non_deferred_loader = @loaders.find { |_, loader| !loader.deferred })
      resolve(non_deferred_loader.last) # find returns a [key, loader] pair
end
end

def on_wait
# FIXME: Better name?
@loaders.each do |_, loader|
loader.on_any_wait
end
end

def tick
resolve(@loaders.shift.last)
end
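Worth noting for the defer loop above: `@loaders` is a hash, and `Hash#find` returns a two-element `[key, value]` array, so the loader itself is the pair's last element. A quick stdlib check (the loader names and `deferred:` hashes here are illustrative stand-ins):

```ruby
# Hash#find yields [key, value] pairs and returns the first matching pair.
loaders = {
  "UserLoader"    => { deferred: true },
  "ProductLoader" => { deferred: false },
}

entry = loaders.find { |_, loader| !loader[:deferred] }
entry      # => ["ProductLoader", { deferred: false }]
entry.last # => { deferred: false } -- the loader itself
```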
49 changes: 48 additions & 1 deletion lib/graphql/batch/loader.rb
@@ -42,13 +42,14 @@ def current_executor
end
end

-  attr_accessor :loader_key, :executor
+  attr_accessor :loader_key, :executor, :deferred
Contributor:
I'm concerned with deferred terminology conflating with GraphQL defer, to which this isn't directly related. Can this remain consistent with the "async" terminology, or be expressed on some kind of lazy priority scale?

Contributor Author:
Yes, I agree. I'd like to use yield and yielded, which is what https://graphql-ruby.org/dataloader/parallelism uses, but that's because it uses fibers. We're conceptually "yielding control" here too.

WDYT @gmac @swalkinshaw @amomchilov ?

Contributor:
Since this is an internal implementation detail now, why not just call it async? It's also more consistent with the Async module naming.

We could even get rid of this entirely and just filter out loaders that don't include GraphQL::Batch::Async? (though maybe that's worse for perf)

Contributor Author:
We can, though I think that's a worse API. yield is a nice fundamental API that can be used beyond Async; Async builds on top of it (and the "perform on any wait" thing).

Contributor:
Can you put a commit up showing what it looks like?


def initialize
@loader_key = nil
@executor = nil
@queue = nil
@cache = nil
@deferred = false
end

def load(key)
@@ -66,6 +67,14 @@ def prime(key, value)
cache[cache_key(key)] ||= ::Promise.resolve(value).tap { |p| p.source = self }
end

def on_any_wait
return if resolved?
load_keys = queue # "Peek" the queue, but don't consume it.
# TODO: Should we have a "peek queue" / "async queue", that we can consume here, to prevent
# duplicate calls to perform_on_wait? (perform_on_wait should be idempotent anyway, but...)
perform_on_wait(load_keys)
end
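Because on_any_wait only peeks the queue, perform_on_wait can be invoked repeatedly with the same keys, which is why the TODO above stresses idempotency. A minimal stdlib sketch (the `IdempotentStarter` class is hypothetical, not from the PR) of keeping it idempotent by memoizing per key:

```ruby
# perform_on_wait may fire more than once for the same keys, since the queue is
# peeked rather than consumed; memoizing started work per key keeps it safe.
class IdempotentStarter
  attr_reader :starts

  def initialize
    @starts = 0
    @futures = {}
  end

  def perform_on_wait(keys)
    keys.each { |key| future(key) }
  end

  private

  def future(key)
    @futures[key] ||= begin
      @starts += 1
      Thread.new { key.to_s.upcase } # stand-in for the real async call
    end
  end
end

starter = IdempotentStarter.new
starter.perform_on_wait([:a, :b])
starter.perform_on_wait([:a, :b]) # duplicate call: no new work started
starter.starts # => 2
```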

def resolve # :nodoc:
return if resolved?
load_keys = queue
Expand All @@ -88,6 +97,7 @@ def around_perform
# For Promise#sync
def wait # :nodoc:
if executor
executor.on_wait
executor.resolve(self)
else
resolve
@@ -126,6 +136,36 @@ def fulfilled?(key)
promise.pending? && promise.source != self
end

def perform_on_wait(keys)
Contributor:
If a loader opts into using perform_on_wait, should defer then always be called in perform? Or is there a situation where defer wouldn't be needed?

Contributor Author:
Hmm, good question.

I can't think of any such situation. Given that the perform_on_wait hook is always called first, all those async loaders will have their opportunity to run first, and then they can always defer and run at the very end. Perhaps we could simplify it so that the loader doesn't have to call defer, and that's done automatically. Perhaps a new module that a loader can include, like AsyncAndDeferred, which adds the on_any_wait hook etc.

Not entirely sure that's better.

Contributor:
I like that better, especially because it avoids a potential footgun of not calling defer manually. @amomchilov also mentioned expressing this as a separate class, so maybe the module is the solution.

Contributor Author:
I pushed a commit with a GraphQL::Batch::Async module. What do you think @amomchilov @swalkinshaw ?
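A rough stdlib sketch of how such a module could hang together (the `AsyncSketch` module and the event-recording harness are made up for illustration; this is not the actual GraphQL::Batch::Async commit): prepending a module lets perform always defer before the loader's own work runs, removing the footgun of each async loader having to remember it.

```ruby
# Hypothetical sketch: a prepended module that calls defer automatically
# before delegating to the loader's own perform.
module AsyncSketch
  def perform(keys)
    defer # let non-async loaders run to completion first
    super
  end
end

# Tiny stand-in loader that just records what happened, in order.
class RecordingLoader
  attr_reader :events

  def initialize
    @events = []
  end

  def defer
    @events << :defer
  end

  def perform(keys)
    @events << [:perform, keys]
  end

  prepend AsyncSketch
end

loader = RecordingLoader.new
loader.perform([:k1, :k2])
loader.events # => [:defer, [:perform, [:k1, :k2]]]
```

With `prepend`, `AsyncSketch#perform` sits ahead of the class's own method in the lookup chain, so `super` reaches the loader's real work after the automatic defer.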

# FIXME: Better name?
# Interface to add custom code to e.g. trigger async operations when any loader starts waiting.
Contributor:
Could you move these comments into the method docs above the def? That way they show on hover, in generated docs, etc.

# Example:
#
# def initialize
# super()
# @futures = {}
# end
#
# def perform_on_wait(keys)
# keys.each do |key|
# future(key)
# end
# end
#
# def perform(keys)
# defer # let other non-async loaders run to completion first.
# keys.each do |key|
# future(key).value!
# end
# end
#
# def future(key)
# @futures[key] ||= Concurrent::Promises.future do
# # Perform the async operation
# end
# end
end

# Must override to load the keys and call #fulfill for each key
def perform(keys)
raise NotImplementedError
@@ -146,6 +186,13 @@ def finish_resolve(key)
end
end

def defer
@deferred = true
executor.defer(self)
ensure
@deferred = false
end
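The ensure clause above guarantees the deferred flag resets even if resolving the other loaders raises. A small stand-in (the `FakeExecutor` and `FlagLoader` classes are hypothetical, for illustration only) showing the flag's lifecycle:

```ruby
# The deferred flag is only true while the executor runs other loaders,
# and ensure resets it even when resolution raises.
class FakeExecutor
  def defer(loader)
    raise "boom" if loader.explode # simulate a failure mid-resolution
  end
end

class FlagLoader
  attr_reader :deferred
  attr_accessor :explode, :executor

  def initialize(executor)
    @deferred = false
    @explode = false
    @executor = executor
  end

  def defer
    @deferred = true
    executor.defer(self)
  ensure
    @deferred = false
  end
end

loader = FlagLoader.new(FakeExecutor.new)
loader.defer
loader.deferred # => false again after a clean run

loader.explode = true
begin
  loader.defer
rescue RuntimeError
end
loader.deferred # => false even after the failure
```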

def cache
@cache ||= {}
end