Skip to content

Commit c9782ae

Browse files
authored
feat: Coalesce reset options, reset coalesce based on result. (error, success, timeout) (#908)
* Feature: option to reset coalesce depending on result of action. * Fix: Don't crash when coalescing is disabled. * Bugfix: Fix max size for cache entries. * Fixed error whilst merging, accidentally removed cache delete function Merge branch 'main' into coalesce-reset-options * main: chore(main): release 8.3.1 (#907) fix: Incorrect default value of maxEntries for MemoryCache #904 (#906) # Conflicts: # lib/cache.js * Docs: Update readme with coalesceResetOn and Fetch example. --------- Co-authored-by: Daan <>
1 parent 64d3d40 commit c9782ae

File tree

4 files changed

+134
-1
lines changed

4 files changed

+134
-1
lines changed

README.md

+39-1
Original file line numberDiff line numberDiff line change
@@ -459,7 +459,45 @@ The code that is summing the stats samples is here:
459459

460460
### Coalesce calls
461461

462-
Circuitbreaker offers coalescing your calls. If options.coalesce is set, multiple calls to the circuitbreaker will be handled as one, within the given timeframe (options.coalesceTTL). Performance will improve when rapidly firing the circuitbreaker with the same request, especially on a slower action. This is especially useful if multiple events can trigger the same functions at the same time. Of course, caching has the same function, but will only be effective when the call has been executed once to store the return value. Coalescing and cache can be used at the same time, coalescing calls will always use the internal cache.
462+
Circuitbreaker offers coalescing your calls. If options.coalesce is set, multiple calls to the circuitbreaker will be handled as one, within the given timeframe (options.coalesceTTL). Performance will improve when rapidly firing the circuitbreaker with the same request, especially on a slower action. This is especially useful if multiple events can trigger the same functions at the same time. Of course, caching has the same function, but will only be effective when the call has been successfully executed once to store the return value. Coalescing and cache can be used at the same time, coalescing calls will always use the internal cache. Accessing cache is done prior to coalescing. When using `capacity` option, coalescing reduces the capacity used for the CircuitBreaker and will allow higher throughput of the rest of the application without actually firing the CircuitBreaker protected function. The `cacheGetKey` option is used for coalescing as well.
463+
464+
#### Finetuning Coalesce behavior
465+
466+
By default, all calls within given timeframe are coalesced, including errors and timeouts. This might be unwanted, as this twarths retry mechanisms etc. To finetune coalesce behavior, use the coalesceResetOn parameter. Some examples:
467+
468+
| coalesceResetOn value | behavior |
469+
| --------------------- | -------- |
470+
| `error`, `success`, `timeout` | coalescing is reset after every 'done' status, so only concurrent 'running' calls are coalesced. One could consider this the most essential form of coalescing. |
471+
| `error`, `timeout` | No error state is being stored for longer than the running call, you might want to start here if you use any retry mechanisms. |
472+
| `error` | Reset on errors. |
473+
| `timeout` | Reset on timeouts. |
474+
| `success` | Reset on success. |
475+
476+
You can use any combination of `error`, `success`, `timeout`.
477+
478+
#### Using CircuitBreaker with Coalescing and fetch.
479+
480+
When using the CircuitBreaker with Coalescing enabled to protect calling external services using the Fetch API, it's important to keep this in mind: The Response interface of the Fetch API does not allow reading the same body multiple times, cloning the Response will not help either as it will delay the reading of the response until the slowest reader is done. To work around this you can either choose to wrap handling of the response (e.g. parsing) in the protected function as well, keep in mind any errors and delays in this process will amount to the error thresholds configured. This might not be suitable for complexer setups. Another option would be to flatten the response and revive it. This might come in handy when working with libraries that expect a Response object. Example below:
481+
482+
```js
483+
const flattenResponse = async (r) => ({
484+
arrayBuffer: await r.arrayBuffer(),
485+
init: {
486+
headers: r.headers,
487+
ok: r.ok,
488+
redirected: r.redirected,
489+
status: r.status,
490+
statusText: r.statusText,
491+
type: r.type,
492+
url: r.url,
493+
},
494+
});
495+
496+
const reviveResponse = (r) => new Response(r.arrayBuffer, r.init);
497+
```
498+
499+
Also note, Fetch doesn't fail on HTTP errors (e.g. 50x). If you want to protect your application from calling failing APIs, check the response status and throw errors accordingly.
500+
463501

464502
### Typings
465503

lib/cache.js

+9
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,15 @@ class MemoryCache {
4444
});
4545
}
4646

47+
/**
48+
* Delete cache key
49+
* @param {string} key Cache key
50+
* @return {void}
51+
*/
52+
delete (key) {
53+
this.cache.delete(key);
54+
}
55+
4756
/**
4857
* Clear cache
4958
* @returns {void}

lib/circuit.js

+28
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,8 @@ Please use options.errorThresholdPercentage`;
111111
* in milliseconds. Set 0 for infinity cache. Default: same as options.timeout
112112
* @param {Number} options.coalesceSize the max amount of entries in the
113113
* coalescing cache. Default: max size of JS map (2^24).
114+
* @param {string[]} options.coalesceResetOn when to reset the coalesce cache.
115+
* Options: `error`, `success`, `timeout`. Default: not set, reset using TTL.
114116
* @param {AbortController} options.abortController this allows Opossum to
115117
* signal upon timeout and properly abort your on going requests instead of
116118
* leaving it in the background
@@ -193,6 +195,7 @@ class CircuitBreaker extends EventEmitter {
193195
this.options.rotateBucketController = options.rotateBucketController;
194196
this.options.coalesce = !!options.coalesce;
195197
this.options.coalesceTTL = options.coalesceTTL ?? this.options.timeout;
198+
this.options.coalesceResetOn = options.coalesceResetOn?.filter(o => ['error', 'success', 'timeout'].includes(o)) || [];
196199

197200
// Set default cache transport if not provided
198201
if (this.options.cache) {
@@ -743,6 +746,8 @@ class CircuitBreaker extends EventEmitter {
743746
*/
744747
this.emit('timeout', error, latency, args);
745748
handleError(error, this, timeout, args, latency, resolve, reject);
749+
resetCoalesce(this, cacheKey, 'timeout');
750+
746751
if (this.options.abortController) {
747752
this.options.abortController.abort();
748753
}
@@ -764,6 +769,7 @@ class CircuitBreaker extends EventEmitter {
764769
* @type {any} the return value from the circuit
765770
*/
766771
this.emit('success', result, (Date.now() - latencyStartTime));
772+
resetCoalesce(this, cacheKey, 'success');
767773
this.semaphore.release();
768774
resolve(result);
769775
if (this.options.cache) {
@@ -783,12 +789,14 @@ class CircuitBreaker extends EventEmitter {
783789
const latencyEndTime = Date.now() - latencyStartTime;
784790
handleError(
785791
error, this, timeout, args, latencyEndTime, resolve, reject);
792+
resetCoalesce(this, cacheKey, 'error');
786793
}
787794
});
788795
} catch (error) {
789796
this.semaphore.release();
790797
const latency = Date.now() - latencyStartTime;
791798
handleError(error, this, timeout, args, latency, resolve, reject);
799+
resetCoalesce(this, cacheKey, 'error');
792800
}
793801
} else {
794802
const latency = Date.now() - latencyStartTime;
@@ -801,6 +809,7 @@ class CircuitBreaker extends EventEmitter {
801809
*/
802810
this.emit('semaphoreLocked', err, latency);
803811
handleError(err, this, timeout, args, latency, resolve, reject);
812+
resetCoalesce(this, cacheKey);
804813
}
805814
});
806815

@@ -826,6 +835,10 @@ class CircuitBreaker extends EventEmitter {
826835
if (this.options.cache) {
827836
this.options.cacheTransport.flush();
828837
}
838+
839+
if (this.options.coalesceCache) {
840+
this.options.coalesceCache.flush();
841+
}
829842
}
830843

831844
/**
@@ -940,6 +953,7 @@ function handleError (error, circuit, timeout, args, latency, resolve, reject) {
940953
const fb = fallback(circuit, error, args);
941954
if (fb) return resolve(fb);
942955
}
956+
943957
// In all other cases, reject
944958
reject(error);
945959
}
@@ -983,6 +997,20 @@ function fail (circuit, err, args, latency) {
983997
}
984998
}
985999

1000+
function resetCoalesce (circuit, cacheKey, event) {
1001+
/**
1002+
* Reset coalesce cache for this cacheKey, depending on
1003+
* options.coalesceResetOn set.
1004+
* @param {@link CircuitBreaker} circuit what circuit is to be cleared
1005+
* @param {string} cacheKey cache key to clear.
1006+
* @param {string} event optional, can be `error`, `success`, `timeout`
1007+
* @returns {void}
1008+
*/
1009+
if (!event || circuit.options.coalesceResetOn.includes(event)) {
1010+
circuit.options.coalesceCache?.delete(cacheKey);
1011+
}
1012+
}
1013+
9861014
function buildError (msg, code) {
9871015
const error = new Error(msg);
9881016
error.code = code;

test/cache.js

+58
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,64 @@ test('Using coalesce cache only.', t => {
195195
.catch(t.fail);
196196
});
197197

198+
// Test coalesce coalesceResetOn.
199+
(function () {
200+
const options = {
201+
coalesce: true,
202+
timeout: 200,
203+
coalesceResetOn: ['error', 'success', 'timeout', 'foobar'],
204+
errorThresholdPercentage: 99,
205+
allowWarmUp: true
206+
};
207+
208+
test('coalesceResetOn: expect proper parsing of options', t => {
209+
t.plan(1);
210+
const breaker = new CircuitBreaker(passFail, options);
211+
t.same(breaker.options.coalesceResetOn, ['error', 'success', 'timeout']);
212+
t.end();
213+
});
214+
215+
test('coalesceResetOn: expect no hit after success', t => {
216+
t.plan(1);
217+
const breaker = new CircuitBreaker(passFail, options);
218+
breaker
219+
.fire(1)
220+
.then(() => {
221+
breaker.fire(1).then(() => {
222+
const stats = breaker.status.stats;
223+
t.equals(stats.coalesceCacheHits, 0, 'no hits to coalesce cache, it is reset when action succeeded.');
224+
t.end();
225+
});
226+
});
227+
});
228+
229+
test('coalesceResetOn: expect no hit after error', t => {
230+
t.plan(1);
231+
const breaker = new CircuitBreaker(passFail, options);
232+
breaker
233+
.fire(-1)
234+
.catch(() => {
235+
breaker.fire(1).then(() => {
236+
const stats = breaker.status.stats;
237+
t.equals(stats.coalesceCacheHits, 0, 'no hits to coalesce cache, it is reset when action failed.');
238+
t.end();
239+
});
240+
});
241+
});
242+
243+
test('coalesceResetOn: expect no hit after timeout', t => {
244+
t.plan(1);
245+
const timedBreaker = new CircuitBreaker(common.timedFunction, options);
246+
timedBreaker.fire(1000).catch(() => {
247+
timedBreaker.fire(1).then(() => {
248+
const stats = timedBreaker.status.stats;
249+
t.equals(stats.coalesceCacheHits, 0, 'no hits to coalesce cache, it is reset when action timed out.');
250+
t.end();
251+
});
252+
});
253+
});
254+
})();
255+
198256
test('No coalesce cache.', t => {
199257
t.plan(5);
200258
const breaker = new CircuitBreaker(passFail);

0 commit comments

Comments
 (0)