Skip to content

Commit 046a00b

Browse files
committed
WIP: Async and deferred loaders
1 parent d609126 commit 046a00b

File tree

3 files changed

+82
-6
lines changed

3 files changed

+82
-6
lines changed

examples/http_loader.rb

+21-5
Original file line numberDiff line numberDiff line change
@@ -43,27 +43,43 @@ def initialize(host:, size: 4, timeout: 4)
4343
@host = host
4444
@size = size
4545
@timeout = timeout
46+
@futures = {}
4647
end
4748

48-
def perform(operations)
49+
def perform_on_wait(operations)
4950
# This fans out and starts off all the concurrent work, which starts and
5051
# immediately returns Concurrent::Promises::Future` objects for each operation.
52+
operations.each do |operation|
53+
future(operation)
54+
end
55+
end
56+
57+
def perform(operations)
58+
# Defer to let other non-async loaders run to completion first.
59+
defer
60+
61+
# Collect the futures (and possibly trigger any newly added ones)
5162
futures = operations.map do |operation|
52-
Concurrent::Promises.future do
53-
pool.with { |connection| operation.call(connection) }
54-
end
63+
future(operation)
5564
end
65+
5666
# At this point, all of the concurrent work has been started.
5767

5868
# This converges back in, waiting on each concurrent future to finish, and fulfilling each
5969
# (non-concurrent) Promise.rb promise.
6070
operations.each_with_index.each do |operation, index|
61-
fulfill(operation, futures[index].value) # .value is a blocking call
71+
fulfill(operation, futures[index].value!) # .value is a blocking call
6272
end
6373
end
6474

6575
private
6676

77+
def future(operation)
78+
@futures[operation] ||= Concurrent::Promises.future do
79+
pool.with { |connection| operation.call(connection) }
80+
end
81+
end
82+
6783
def pool
6884
@pool ||= ConnectionPool.new(size: @size, timeout: @timeout) do
6985
HTTP.persistent(@host)

lib/graphql/batch/executor.rb

+13
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,19 @@ def resolve(loader)
5353
@loading = was_loading
5454
end
5555

56+
def defer(_loader)
57+
while (non_deferred_loader = @loaders.find { |_, loader| !loader.deferred })
58+
resolve(non_deferred_loader)
59+
end
60+
end
61+
62+
def on_wait
63+
# FIXME: Better name?
64+
@loaders.each do |_, loader|
65+
loader.on_any_wait
66+
end
67+
end
68+
5669
def tick
5770
resolve(@loaders.shift.last)
5871
end

lib/graphql/batch/loader.rb

+48-1
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,14 @@ def current_executor
4242
end
4343
end
4444

45-
attr_accessor :loader_key, :executor
45+
attr_accessor :loader_key, :executor, :deferred
4646

4747
def initialize
4848
@loader_key = nil
4949
@executor = nil
5050
@queue = nil
5151
@cache = nil
52+
@deferred = false
5253
end
5354

5455
def load(key)
@@ -66,6 +67,14 @@ def prime(key, value)
6667
cache[cache_key(key)] ||= ::Promise.resolve(value).tap { |p| p.source = self }
6768
end
6869

70+
def on_any_wait
71+
return if resolved?
72+
load_keys = queue # "Peek" the queue, but don't consume it.
73+
# TODO: Should we have a "peek queue" / "async queue", that we can consume here, to prevent
74+
# duplicate calls to perform_on_wait? (perform_on_wait should be idempotent anyway, but...)
75+
perform_on_wait(load_keys)
76+
end
77+
6978
def resolve # :nodoc:
7079
return if resolved?
7180
load_keys = queue
@@ -88,6 +97,7 @@ def around_perform
8897
# For Promise#sync
8998
def wait # :nodoc:
9099
if executor
100+
executor.on_wait
91101
executor.resolve(self)
92102
else
93103
resolve
@@ -126,6 +136,36 @@ def fulfilled?(key)
126136
promise.pending? && promise.source != self
127137
end
128138

139+
def perform_on_wait(keys)
140+
# FIXME: Better name?
141+
# Interface to add custom code to e.g. trigger async operations when any loader starts waiting.
142+
# Example:
143+
#
144+
# def initialize
145+
# super()
146+
# @futures = {}
147+
# end
148+
#
149+
# def perform_on_wait(keys)
150+
# keys.each do |key|
151+
# future(key)
152+
# end
153+
# end
154+
#
155+
# def perform(keys)
156+
# defer # let other non-async loaders run to completion first.
157+
# keys.each do |key|
158+
# future(key).value!
159+
# end
160+
# end
161+
#
162+
# def future(key)
163+
# @futures[key] ||= Concurrent::Promises.future do
164+
# # Perform the async operation
165+
# end
166+
# end
167+
end
168+
129169
# Must override to load the keys and call #fulfill for each key
130170
def perform(keys)
131171
raise NotImplementedError
@@ -146,6 +186,13 @@ def finish_resolve(key)
146186
end
147187
end
148188

189+
def defer
190+
@deferred = true
191+
executor.defer(self)
192+
ensure
193+
@deferred = false
194+
end
195+
149196
def cache
150197
@cache ||= {}
151198
end

0 commit comments

Comments
 (0)