Skip to content

Commit b5d7bf5

Browse files
authored
Refactor batching logic (#220)
1 parent a3dd591 commit b5d7bf5

File tree

3 files changed

+86
-70
lines changed

3 files changed

+86
-70
lines changed

README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -321,7 +321,7 @@ Create a new `DataLoader` given a batch loading function and options.
321321

322322
| Option Key | Type | Default | Description |
323323
| ---------- | ---- | ------- | ----------- |
324-
| *batch* | Boolean | `true` | Set to `false` to disable batching, invoking `batchLoadFn` with a single load key.
324+
| *batch* | Boolean | `true` | Set to `false` to disable batching, invoking `batchLoadFn` with a single load key. This is equivalent to setting `maxBatchSize` to `1`.
325325
| *maxBatchSize* | Number | `Infinity` | Limits the number of items that get passed in to the `batchLoadFn`.
326326
| *cache* | Boolean | `true` | Set to `false` to disable memoization caching, creating a new Promise and new key in the `batchLoadFn` for every load of the same key.
327327
| *cacheKeyFn* | Function | `key => key` | Produces cache key for a given load key. Useful when objects are keys and two objects should be considered equivalent.

src/__tests__/dataloader.test.js

+18
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,24 @@ describe('Primary API', () => {
120120
expect(loadCalls).toEqual([ [ 1 ] ]);
121121
});
122122

123+
it('coalesces identical requests across sized batches', async () => {
124+
const [ identityLoader, loadCalls ] = idLoader<number>({ maxBatchSize: 2 });
125+
126+
const promise1a = identityLoader.load(1);
127+
const promise2 = identityLoader.load(2);
128+
const promise1b = identityLoader.load(1);
129+
const promise3 = identityLoader.load(3);
130+
131+
const [ value1a, value2, value1b, value3 ] =
132+
await Promise.all([ promise1a, promise2, promise1b, promise3 ]);
133+
expect(value1a).toBe(1);
134+
expect(value2).toBe(2);
135+
expect(value1b).toBe(1);
136+
expect(value3).toBe(3);
137+
138+
expect(loadCalls).toEqual([ [ 1, 2 ], [ 3 ] ]);
139+
});
140+
123141
it('caches repeated requests', async () => {
124142
const [ identityLoader, loadCalls ] = idLoader<string>();
125143

src/index.js

+67-69
Original file line numberDiff line numberDiff line change
@@ -54,14 +54,14 @@ class DataLoader<K, V, C = K> {
5454
this._batchLoadFn = batchLoadFn;
5555
this._options = options;
5656
this._promiseCache = getValidCacheMap(options);
57-
this._queue = [];
57+
this._batch = null;
5858
}
5959

6060
// Private
6161
_batchLoadFn: BatchLoadFn<K, V>;
6262
_options: ?Options<K, V, C>;
6363
_promiseCache: ?CacheMap<C, Promise<V>>;
64-
_queue: LoaderQueue<K, V>;
64+
_batch: Batch<K, V> | null;
6565

6666
/**
6767
* Loads a key, returning a `Promise` for the value represented by that key.
@@ -76,7 +76,7 @@ class DataLoader<K, V, C = K> {
7676

7777
// Determine options
7878
var options = this._options;
79-
var shouldBatch = !options || options.batch !== false;
79+
var batch = getCurrentBatch(this);
8080
var cache = this._promiseCache;
8181
var cacheKey = getCacheKey(options, key);
8282

@@ -88,23 +88,11 @@ class DataLoader<K, V, C = K> {
8888
}
8989
}
9090

91-
// Otherwise, produce a new Promise for this value.
91+
// Otherwise, produce a new Promise for this key, and enqueue it to be
92+
// dispatched along with the current batch.
93+
batch.keys.push(key);
9294
var promise = new Promise((resolve, reject) => {
93-
// Enqueue this Promise to be dispatched.
94-
this._queue.push({ key, resolve, reject });
95-
96-
// Determine if a dispatch of this queue should be scheduled.
97-
// A single dispatch should be scheduled per queue at the time when the
98-
// queue changes from "empty" to "full".
99-
if (this._queue.length === 1) {
100-
if (shouldBatch) {
101-
// If batching, schedule a task to dispatch the queue.
102-
enqueuePostPromiseJob(() => dispatchQueue(this));
103-
} else {
104-
// Otherwise dispatch the (queue of one) immediately.
105-
dispatchQueue(this);
106-
}
107-
}
95+
batch.callbacks.push({ resolve, reject });
10896
});
10997

11098
// If caching, cache this promise.
@@ -246,43 +234,61 @@ var enqueuePostPromiseJob =
246234
// Private: cached resolved Promise instance
247235
var resolvedPromise;
248236

249-
// Private: given the current state of a Loader instance, perform a batch load
250-
// from its current queue.
251-
function dispatchQueue<K, V>(loader: DataLoader<K, V, any>) {
252-
// Take the current loader queue, replacing it with an empty queue.
253-
var queue = loader._queue;
254-
loader._queue = [];
255-
256-
// If a maxBatchSize was provided and the queue is longer, then segment the
257-
// queue into multiple batches, otherwise treat the queue as a single batch.
258-
var maxBatchSize = loader._options && loader._options.maxBatchSize;
259-
if (maxBatchSize && maxBatchSize > 0 && maxBatchSize < queue.length) {
260-
for (var i = 0; i < queue.length / maxBatchSize; i++) {
261-
dispatchQueueBatch(
262-
loader,
263-
queue.slice(i * maxBatchSize, (i + 1) * maxBatchSize)
264-
);
265-
}
266-
} else {
267-
dispatchQueueBatch(loader, queue);
237+
// Private: Describes a batch of requests
238+
type Batch<K, V> = {
239+
hasDispatched: boolean,
240+
keys: Array<K>,
241+
callbacks: Array<{
242+
resolve: (value: V) => void;
243+
reject: (error: Error) => void;
244+
}>
245+
}
246+
247+
// Private: Either returns the current batch, or creates and schedules a
248+
// dispatch of a new batch for the given loader.
249+
function getCurrentBatch<K, V>(loader: DataLoader<K, V, any>): Batch<K, V> {
250+
var options = loader._options;
251+
var maxBatchSize =
252+
(options && options.maxBatchSize) ||
253+
(options && options.batch === false ? 1 : 0);
254+
255+
// If there is an existing batch which has not yet dispatched and is within
256+
// the limit of the batch size, then return it.
257+
var existingBatch = loader._batch;
258+
if (
259+
existingBatch !== null &&
260+
!existingBatch.hasDispatched &&
261+
(maxBatchSize === 0 || existingBatch.keys.length < maxBatchSize)
262+
) {
263+
return existingBatch;
268264
}
265+
266+
// Otherwise, create a new batch for this loader.
267+
var newBatch = { hasDispatched: false, keys: [], callbacks: [] };
268+
269+
// Store it on the loader so it may be reused.
270+
loader._batch = newBatch;
271+
272+
// Then schedule a task to dispatch this batch of requests.
273+
enqueuePostPromiseJob(() => dispatchBatch(loader, newBatch));
274+
275+
return newBatch;
269276
}
270277

271-
function dispatchQueueBatch<K, V>(
278+
function dispatchBatch<K, V>(
272279
loader: DataLoader<K, V, any>,
273-
queue: LoaderQueue<K, V>
280+
batch: Batch<K, V>
274281
) {
275-
// Collect all keys to be loaded in this dispatch
276-
var keys = queue.map(({ key }) => key);
282+
// Mark this batch as having been dispatched.
283+
batch.hasDispatched = true;
277284

278-
// Call the provided batchLoadFn for this loader with the loader queue's keys.
279-
var batchLoadFn = loader._batchLoadFn;
280-
// Call with the loader as the `this` context.
281-
var batchPromise = batchLoadFn.call(loader, keys);
285+
// Call the provided batchLoadFn for this loader with the batch's keys and
286+
// with the loader as the `this` context.
287+
var batchPromise = loader._batchLoadFn(batch.keys);
282288

283289
// Assert the expected response from batchLoadFn
284290
if (!batchPromise || typeof batchPromise.then !== 'function') {
285-
return failedDispatch(loader, queue, new TypeError(
291+
return failedDispatch(loader, batch, new TypeError(
286292
'DataLoader must be constructed with a function which accepts ' +
287293
'Array<key> and returns Promise<Array<value>>, but the function did ' +
288294
`not return a Promise: ${String(batchPromise)}.`
@@ -300,41 +306,40 @@ function dispatchQueueBatch<K, V>(
300306
`not return a Promise of an Array: ${String(values)}.`
301307
);
302308
}
303-
if (values.length !== keys.length) {
309+
if (values.length !== batch.keys.length) {
304310
throw new TypeError(
305311
'DataLoader must be constructed with a function which accepts ' +
306312
'Array<key> and returns Promise<Array<value>>, but the function did ' +
307313
'not return a Promise of an Array of the same length as the Array ' +
308314
'of keys.' +
309-
`\n\nKeys:\n${String(keys)}` +
315+
`\n\nKeys:\n${String(batch.keys)}` +
310316
`\n\nValues:\n${String(values)}`
311317
);
312318
}
313319

314-
// Step through the values, resolving or rejecting each Promise in the
315-
// loaded queue.
316-
queue.forEach(({ resolve, reject }, index) => {
317-
var value = values[index];
320+
// Step through values, resolving or rejecting each Promise in the batch.
321+
for (var i = 0; i < batch.callbacks.length; i++) {
322+
var value = values[i];
318323
if (value instanceof Error) {
319-
reject(value);
324+
batch.callbacks[i].reject(value);
320325
} else {
321-
resolve(value);
326+
batch.callbacks[i].resolve(value);
322327
}
323-
});
324-
}).catch(error => failedDispatch(loader, queue, error));
328+
}
329+
}).catch(error => failedDispatch(loader, batch, error));
325330
}
326331

327332
// Private: do not cache individual loads if the entire batch dispatch fails,
328333
// but still reject each request so they do not hang.
329334
function failedDispatch<K, V>(
330335
loader: DataLoader<K, V, any>,
331-
queue: LoaderQueue<K, V>,
336+
batch: Batch<K, V>,
332337
error: Error
333338
) {
334-
queue.forEach(({ key, reject }) => {
335-
loader.clear(key);
336-
reject(error);
337-
});
339+
for (var i = 0; i < batch.keys.length; i++) {
340+
loader.clear(batch.keys[i]);
341+
batch.callbacks[i].reject(error);
342+
}
338343
}
339344

340345
// Private: produce a cache key for a given key (and options)
@@ -369,13 +374,6 @@ function getValidCacheMap<K, V, C>(
369374
return cacheMap;
370375
}
371376

372-
// Private
373-
type LoaderQueue<K, V> = Array<{
374-
key: K;
375-
resolve: (value: V) => void;
376-
reject: (error: Error) => void;
377-
}>;
378-
379377
// Private
380378
function isArrayLike(x: mixed): boolean {
381379
return (

0 commit comments

Comments
 (0)