Skip to content

Commit fe9b4aa

Browse files
committed
WIP: Async and deferred loaders
1 parent d609126 commit fe9b4aa

File tree

3 files changed

+80
-6
lines changed

3 files changed

+80
-6
lines changed

examples/http_loader.rb

+21-5
Original file line numberDiff line numberDiff line change
@@ -43,27 +43,43 @@ def initialize(host:, size: 4, timeout: 4)
4343
@host = host
4444
@size = size
4545
@timeout = timeout
46+
@futures = {}
4647
end
4748

48-
def perform(operations)
49+
def perform_on_wait(operations)
4950
# This fans out and starts off all the concurrent work, which starts and
5051
# immediately returns Concurrent::Promises::Future` objects for each operation.
52+
operations.each do |operation|
53+
future(operation)
54+
end
55+
end
56+
57+
def perform(operations)
58+
# Defer to let other non-async loaders run to completion first.
59+
defer
60+
61+
# Collect the futures (and possibly trigger any newly added ones)
5162
futures = operations.map do |operation|
52-
Concurrent::Promises.future do
53-
pool.with { |connection| operation.call(connection) }
54-
end
63+
future(operation)
5564
end
65+
5666
# At this point, all of the concurrent work has been started.
5767

5868
# This converges back in, waiting on each concurrent future to finish, and fulfilling each
5969
# (non-concurrent) Promise.rb promise.
6070
operations.each_with_index.each do |operation, index|
61-
fulfill(operation, futures[index].value) # .value is a blocking call
71+
fulfill(operation, futures[index].value!) # .value is a blocking call
6272
end
6373
end
6474

6575
private
6676

77+
def future(operation)
78+
@futures[operation] ||= Concurrent::Promises.future do
79+
pool.with { |connection| operation.call(connection) }
80+
end
81+
end
82+
6783
def pool
6884
@pool ||= ConnectionPool.new(size: @size, timeout: @timeout) do
6985
HTTP.persistent(@host)

lib/graphql/batch/executor.rb

+13
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,19 @@ def resolve(loader)
5353
@loading = was_loading
5454
end
5555

56+
def defer(loader)
57+
while (non_deferred_loader = @loaders.find { |_, loader| !loader.deferred })
58+
resolve(non_deferred_loader)
59+
end
60+
end
61+
62+
def on_wait
63+
# FIXME: Better name?
64+
@loaders.each do |_, loader|
65+
loader.on_any_wait
66+
end
67+
end
68+
5669
def tick
5770
resolve(@loaders.shift.last)
5871
end

lib/graphql/batch/loader.rb

+46-1
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,14 @@ def current_executor
4242
end
4343
end
4444

45-
attr_accessor :loader_key, :executor
45+
attr_accessor :loader_key, :executor, :deferred
4646

4747
def initialize
4848
@loader_key = nil
4949
@executor = nil
5050
@queue = nil
5151
@cache = nil
52+
@deferred = false
5253
end
5354

5455
def load(key)
@@ -66,6 +67,12 @@ def prime(key, value)
6667
cache[cache_key(key)] ||= ::Promise.resolve(value).tap { |p| p.source = self }
6768
end
6869

70+
def on_any_wait
71+
return if resolved?
72+
load_keys = queue # "Peek" the queue, but don't consume it.
73+
perform_on_wait(load_keys)
74+
end
75+
6976
def resolve # :nodoc:
7077
return if resolved?
7178
load_keys = queue
@@ -88,6 +95,7 @@ def around_perform
8895
# For Promise#sync
8996
def wait # :nodoc:
9097
if executor
98+
executor.on_wait
9199
executor.resolve(self)
92100
else
93101
resolve
@@ -126,6 +134,36 @@ def fulfilled?(key)
126134
promise.pending? && promise.source != self
127135
end
128136

137+
def perform_on_wait(keys)
138+
# FIXME: Better name?
139+
# Interface to add custom code to e.g. trigger async operations when any loader starts waiting.
140+
# Example:
141+
#
142+
# def initialize
143+
# super()
144+
# @futures = {}
145+
# end
146+
#
147+
# def perform_on_wait(keys)
148+
# keys.each do |key|
149+
# future(key)
150+
# end
151+
# end
152+
#
153+
# def perform(keys)
154+
# defer # let other non-async loaders run to completion first.
155+
# keys.each do |key|
156+
# future(key).value!
157+
# end
158+
# end
159+
#
160+
# def future(key)
161+
# @futures[key] ||= Concurrent::Promises.future do
162+
# # Perform the async operation
163+
# end
164+
# end
165+
end
166+
129167
# Must override to load the keys and call #fulfill for each key
130168
def perform(keys)
131169
raise NotImplementedError
@@ -146,6 +184,13 @@ def finish_resolve(key)
146184
end
147185
end
148186

187+
def defer
188+
@deferred = true
189+
executor.defer(self)
190+
ensure
191+
@deferred = false
192+
end
193+
149194
def cache
150195
@cache ||= {}
151196
end

0 commit comments

Comments
 (0)