Skip to content

Commit 50227fe

Browse files
committed
Add GraphQL::Batch::Async
1 parent 046a00b commit 50227fe

File tree

5 files changed

+42
-45
lines changed

5 files changed

+42
-45
lines changed

examples/http_loader.rb

+4-5
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@
3838
# An example loader which is blocking and synchronous as a whole, but executes all of its operations concurrently.
3939
module Loaders
4040
class HTTPLoader < GraphQL::Batch::Loader
41+
include GraphQL::Batch::Async
42+
4143
def initialize(host:, size: 4, timeout: 4)
4244
super()
4345
@host = host
@@ -46,7 +48,7 @@ def initialize(host:, size: 4, timeout: 4)
4648
@futures = {}
4749
end
4850

49-
def perform_on_wait(operations)
51+
def perform_early(operations)
5052
# This fans out and starts off all the concurrent work, which starts and
5153
# immediately returns Concurrent::Promises::Future` objects for each operation.
5254
operations.each do |operation|
@@ -55,17 +57,14 @@ def perform_on_wait(operations)
5557
end
5658

5759
def perform(operations)
58-
# Defer to let other non-async loaders run to completion first.
59-
defer
60-
6160
# Collect the futures (and possibly trigger any newly added ones)
6261
futures = operations.map do |operation|
6362
future(operation)
6463
end
6564

6665
# At this point, all of the concurrent work has been started.
6766

68-
# This converges back in, waiting on each concurrent future to finish, and fulfilling each
67+
# Now it converges back in, waiting on each concurrent future to finish, and fulfilling each
6968
# (non-concurrent) Promise.rb promise.
7069
operations.each_with_index.each do |operation, index|
7170
fulfill(operation, futures[index].value!) # .value is a blocking call

lib/graphql/batch.rb

+1
Original file line numberDiff line numberDiff line change
@@ -38,5 +38,6 @@ def self.use(schema_defn, executor_class: GraphQL::Batch::Executor)
3838

3939
require_relative "batch/version"
4040
require_relative "batch/loader"
41+
require_relative "batch/async"
4142
require_relative "batch/executor"
4243
require_relative "batch/setup_multiplex"

lib/graphql/batch/async.rb

+25
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
module GraphQL::Batch
2+
module Async
3+
def resolve
4+
defer # Let other non-async loaders run to completion first.
5+
@peek_queue_index = 0 # The queue is consumed in super, future peeks will start from the beinning.
6+
super
7+
end
8+
9+
def on_any_loader_wait
10+
@peek_queue_index ||= 0
11+
peek_queue = queue[@peek_queue_index..]
12+
return if peek_queue.empty?
13+
@peek_queue_index = peek_queue.size
14+
perform_early(peek_queue)
15+
end
16+
17+
def perform_early(keys)
18+
raise NotImplementedError, "Implement GraphQL::Batch::Async#perform_early to trigger async operations early"
19+
end
20+
21+
def perform(keys)
22+
raise NotImplementedError, "Implement GraphQL::Batch::Async#perform to wait on the async operations"
23+
end
24+
end
25+
end

lib/graphql/batch/executor.rb

+6-3
Original file line numberDiff line numberDiff line change
@@ -53,16 +53,19 @@ def resolve(loader)
5353
@loading = was_loading
5454
end
5555

56-
def defer(_loader)
57-
while (non_deferred_loader = @loaders.find { |_, loader| !loader.deferred })
56+
# Defer the resolution of the current loader, allowing other loaders to be resolved first.
57+
# This is useful when the current loader has kicked off async or concurrent work, and don't need to
58+
# block execution of the current thread until later.
59+
def defer_to_other_loaders
60+
while (non_deferred_loader = @loaders.find { |_, loader| !loader.deferred && !loader.resolved? })
5861
resolve(non_deferred_loader)
5962
end
6063
end
6164

6265
def on_wait
6366
# FIXME: Better name?
6467
@loaders.each do |_, loader|
65-
loader.on_any_wait
68+
loader.on_any_loader_wait
6669
end
6770
end
6871

lib/graphql/batch/loader.rb

+6-37
Original file line numberDiff line numberDiff line change
@@ -67,12 +67,11 @@ def prime(key, value)
6767
cache[cache_key(key)] ||= ::Promise.resolve(value).tap { |p| p.source = self }
6868
end
6969

70-
def on_any_wait
71-
return if resolved?
72-
load_keys = queue # "Peek" the queue, but don't consume it.
73-
# TODO: Should we have a "peek queue" / "async queue", that we can consume here, to prevent
74-
# duplicate calls to perform_on_wait? (perform_on_wait should be idempotent anyway, but...)
75-
perform_on_wait(load_keys)
70+
# Called when any GraphQL::Batch::Loader starts waiting. May be called more than once per loader, if
71+
# the loader is waiting multiple times. Will not be called once per promise.
72+
#
73+
# Use GraphQL::Batch::Async for the common way to use this.
74+
def on_any_loader_wait
7675
end
7776

7877
def resolve # :nodoc:
@@ -136,36 +135,6 @@ def fulfilled?(key)
136135
promise.pending? && promise.source != self
137136
end
138137

139-
def perform_on_wait(keys)
140-
# FIXME: Better name?
141-
# Interface to add custom code to e.g. trigger async operations when any loader starts waiting.
142-
# Example:
143-
#
144-
# def initialize
145-
# super()
146-
# @futures = {}
147-
# end
148-
#
149-
# def perform_on_wait(keys)
150-
# keys.each do |key|
151-
# future(key)
152-
# end
153-
# end
154-
#
155-
# def perform(keys)
156-
# defer # let other non-async loaders run to completion first.
157-
# keys.each do |key|
158-
# future(key).value!
159-
# end
160-
# end
161-
#
162-
# def future(key)
163-
# @futures[key] ||= Concurrent::Promises.future do
164-
# # Perform the async operation
165-
# end
166-
# end
167-
end
168-
169138
# Must override to load the keys and call #fulfill for each key
170139
def perform(keys)
171140
raise NotImplementedError
@@ -188,7 +157,7 @@ def finish_resolve(key)
188157

189158
def defer
190159
@deferred = true
191-
executor.defer(self)
160+
executor.defer_to_other_loaders
192161
ensure
193162
@deferred = false
194163
end

0 commit comments

Comments
 (0)