Skip to content

Commit c7da2bb

Browse files
cursoragentpablo
andcommitted
feat: Add multi-rule throttler and tests
Implement a multi-rule throttler with backward compatibility and add corresponding tests. Co-authored-by: pablo <pablo@ciudadela.eu>
1 parent 8eba307 commit c7da2bb

3 files changed

Lines changed: 185 additions & 6 deletions

File tree

ts/src/base/functions/throttle.ts

Lines changed: 103 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@
55
import { now, sleep } from './time.js';
66
/* ------------------------------------------------------------------------ */
77

8+
// Multi-rule throttler with backward compatibility for single-rule configs
9+
// Rule fields: id, refillRate, delay (global), capacity, maxCapacity (global), tokens, cost
10+
811
class Throttler {
912
constructor (config) {
1013
this.config = {
@@ -16,16 +19,101 @@ class Throttler {
1619
'cost': 1.0,
1720
};
1821
Object.assign (this.config, config);
22+
// If multi-rule config provided, normalize rules
23+
this.rules = undefined;
24+
if (Array.isArray (this.config['rules'])) {
25+
// deep-clone minimal fields for internal use
26+
this.rules = this.config['rules'].map ((r, idx) => ({
27+
'id': (r['id'] !== undefined) ? r['id'] : ('rule' + idx.toString ()),
28+
'refillRate': (r['refillRate'] !== undefined) ? r['refillRate'] : this.config['refillRate'],
29+
'capacity': (r['capacity'] !== undefined) ? r['capacity'] : this.config['capacity'],
30+
'tokens': (r['tokens'] !== undefined) ? r['tokens'] : 0,
31+
'cost': (r['cost'] !== undefined) ? r['cost'] : this.config['cost'],
32+
}));
33+
// ensure stable ids
34+
const seen = {};
35+
for (let i = 0; i < this.rules.length; i++) {
36+
const id = this.rules[i]['id'];
37+
if (seen[id]) {
38+
this.rules[i]['id'] = this.rules[i]['id'] + ':' + i.toString ();
39+
}
40+
seen[this.rules[i]['id']] = true;
41+
}
42+
}
1943
this.queue = [];
2044
this.running = false;
2145
}
2246

47+
// determine if the head of queue can run given current tokens across rules
48+
canRunHead () {
49+
if (this.queue.length === 0) {
50+
return false;
51+
}
52+
const head = this.queue[0];
53+
if (!this.rules) { // legacy single-rule mode
54+
return this.config['tokens'] >= 0;
55+
}
56+
// multi-rule mode
57+
const costs = head.cost;
58+
for (let i = 0; i < this.rules.length; i++) {
59+
const rule = this.rules[i];
60+
const ruleId = rule['id'];
61+
let cost = 0;
62+
if (costs === undefined) {
63+
cost = rule['cost'];
64+
} else if (typeof costs === 'number') {
65+
// if a specific 'default' rule exists, apply to that; otherwise apply to all rules
66+
const hasDefault = this.rules.find ((r) => r['id'] === 'default') !== undefined;
67+
if (hasDefault) {
68+
cost = (ruleId === 'default') ? costs : 0;
69+
} else {
70+
cost = costs;
71+
}
72+
} else if (typeof costs === 'object') {
73+
cost = (costs[ruleId] !== undefined) ? costs[ruleId] : 0;
74+
}
75+
if (cost > 0 && rule['tokens'] < 0) {
76+
return false;
77+
}
78+
}
79+
return true;
80+
}
81+
2382
async loop () {
2483
let lastTimestamp = now ();
2584
while (this.running) {
85+
if (this.queue.length === 0) {
86+
this.running = false;
87+
break;
88+
}
2689
const { resolver, cost } = this.queue[0];
27-
if (this.config['tokens'] >= 0) {
28-
this.config['tokens'] -= cost;
90+
if (this.canRunHead ()) {
91+
// consume tokens and resolve
92+
if (!this.rules) {
93+
const consume = (cost === undefined) ? this.config['cost'] : cost;
94+
this.config['tokens'] -= consume;
95+
} else {
96+
for (let i = 0; i < this.rules.length; i++) {
97+
const rule = this.rules[i];
98+
const ruleId = rule['id'];
99+
let consume = 0;
100+
if (cost === undefined) {
101+
consume = rule['cost'];
102+
} else if (typeof cost === 'number') {
103+
const hasDefault = this.rules.find ((r) => r['id'] === 'default') !== undefined;
104+
if (hasDefault) {
105+
consume = (ruleId === 'default') ? cost : 0;
106+
} else {
107+
consume = cost;
108+
}
109+
} else if (typeof cost === 'object') {
110+
consume = (cost[ruleId] !== undefined) ? cost[ruleId] : 0;
111+
}
112+
if (consume !== 0) {
113+
rule['tokens'] -= consume;
114+
}
115+
}
116+
}
29117
resolver ();
30118
this.queue.shift ();
31119
// contextswitch
@@ -38,8 +126,16 @@ class Throttler {
38126
const current = now ();
39127
const elapsed = current - lastTimestamp;
40128
lastTimestamp = current;
41-
const tokens = this.config['tokens'] + (this.config['refillRate'] * elapsed);
42-
this.config['tokens'] = Math.min (tokens, this.config['capacity']);
129+
if (!this.rules) {
130+
const tokens = this.config['tokens'] + (this.config['refillRate'] * elapsed);
131+
this.config['tokens'] = Math.min (tokens, this.config['capacity']);
132+
} else {
133+
for (let i = 0; i < this.rules.length; i++) {
134+
const rule = this.rules[i];
135+
const tokens = rule['tokens'] + (rule['refillRate'] * elapsed);
136+
rule['tokens'] = Math.min (tokens, rule['capacity']);
137+
}
138+
}
43139
}
44140
}
45141
}
@@ -52,8 +148,9 @@ class Throttler {
52148
if (this.queue.length > this.config['maxCapacity']) {
53149
throw new Error ('throttle queue is over maxCapacity (' + this.config['maxCapacity'].toString () + '), see https://github.com/ccxt/ccxt/issues/11645#issuecomment-1195695526');
54150
}
55-
cost = (cost === undefined) ? this.config['cost'] : cost;
56-
this.queue.push ({ resolver, cost });
151+
// in multi-rule mode, cost can be a number or a map of ruleId->cost
152+
const effectiveCost = (cost === undefined) ? undefined : cost;
153+
this.queue.push ({ resolver, cost: effectiveCost });
57154
if (!this.running) {
58155
this.running = true;
59156
this.loop ();

ts/src/test/base/language_specific/test.languageSpecific.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import ccxt from '../../../../ccxt.js';
77
import testCamelCase from './test.camelcase.js';
88
import testUnCamelCase from './test.uncamelcase.js';
99
import testThrottle from './test.throttle.js';
10+
import testThrottleMultiRule from './test.throttle.multirule.js';
1011
import testCalculateFee from './test.calculateFee.js';
1112
import testAggregate from './test.aggregate.js';
1213
import testSafeBalance from './test.safeBalance.js';
@@ -20,6 +21,7 @@ function testLanguageSpecific () {
2021
testCamelCase ();
2122
testUnCamelCase ();
2223
testThrottle ();
24+
testThrottleMultiRule ();
2325
testCalculateFee ();
2426
testAggregate ();
2527
testSafeBalance ();
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
// @ts-nocheck
2+
/* eslint-disable */
3+
import { Throttler } from '../../../base/functions/throttle.js'
4+
5+
function testThrottleMultiRule () {
6+
const delta = 15
7+
8+
const testCases = [
9+
{
10+
name: 'two-rules-same-cost-slowest-dominates',
11+
rules: [
12+
{ id: 'default', tokens: 0, refillRate: 1 / 50, cost: 1, capacity: 1 },
13+
{ id: 'orders', tokens: 0, refillRate: 1 / 80, cost: 1, capacity: 1 },
14+
],
15+
runs: 40,
16+
cost: { default: 1, orders: 1 },
17+
},
18+
{
19+
name: 'two-rules-different-costs',
20+
rules: [
21+
{ id: 'default', tokens: 0, refillRate: 1 / 30, cost: 1, capacity: 1 },
22+
{ id: 'weight', tokens: 0, refillRate: 1 / 60, cost: 2, capacity: 1 },
23+
],
24+
runs: 25,
25+
cost: { default: 1, weight: 2 },
26+
},
27+
{
28+
name: 'number-cost-applies-to-all-when-no-default',
29+
rules: [
30+
{ id: 'ip', tokens: 0, refillRate: 1 / 40, cost: 1, capacity: 1 },
31+
{ id: 'uid', tokens: 0, refillRate: 1 / 70, cost: 1, capacity: 1 },
32+
],
33+
runs: 30,
34+
cost: 1,
35+
},
36+
]
37+
38+
function expectedMs (test) {
39+
// initial run happens immediately, then bottleneck per-run time is max(cost_i / refillRate_i)
40+
const perRuleTimes = []
41+
for (const rule of test.rules) {
42+
let c = 0
43+
if (typeof test.cost === 'number') {
44+
c = test.cost
45+
} else if (typeof test.cost === 'object') {
46+
c = (test.cost[rule.id] !== undefined) ? test.cost[rule.id] : 0
47+
}
48+
if (c > 0) {
49+
perRuleTimes.push(c / rule.refillRate)
50+
}
51+
}
52+
const perRun = perRuleTimes.length ? Math.max(...perRuleTimes) : 0
53+
const runsAfterFirst = Math.max(0, test.runs - 1)
54+
return runsAfterFirst * perRun
55+
}
56+
57+
async function runner (test) {
58+
const throttler = new Throttler({
59+
rules: test.rules,
60+
delay: 0.001,
61+
maxCapacity: 2000,
62+
})
63+
const start = performance.now()
64+
for (let i = 0; i < test.runs; i++) {
65+
await throttler.throttle(test.cost)
66+
}
67+
const end = performance.now()
68+
const elapsed = end - start
69+
const expected = expectedMs(test)
70+
const ok = Math.abs(elapsed - expected) < delta
71+
console.log(`multi ${test.name} ${ok ? 'succeeded' : 'failed'} in ${elapsed}ms expected ${expected}ms`)
72+
}
73+
74+
for (const test of testCases) {
75+
runner(test)
76+
}
77+
}
78+
79+
export default testThrottleMultiRule
80+

0 commit comments

Comments
 (0)