Skip to content

Commit 38718fb

Browse files
authored
Merge pull request #12 from AthennaIO/develop
fix(target): atomically update buckets with lock
2 parents c7b0eca + 31c1e81 commit 38718fb

5 files changed

Lines changed: 256 additions & 84 deletions

File tree

package-lock.json

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@athenna/ratelimiter",
3-
"version": "5.10.0",
3+
"version": "5.11.0",
44
"description": "Respect the rate limit rules of API's you need to consume.",
55
"license": "MIT",
66
"author": "João Lenon <lenon@athenna.io>",

src/ratelimiter/RateLimitStore.ts

Lines changed: 183 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -183,34 +183,39 @@ export class RateLimitStore extends Macroable {
183183
key
184184
)
185185

186-
const now = Date.now()
187-
const cache = Cache.store(this.options.store)
188-
const buckets = await this.getOrInit(key, rules)
189-
const ruleIndex = rules.findIndex(rule => rule.type === ruleType)
186+
await this.runWithLock(key, async () => {
187+
const now = Date.now()
188+
const cache = Cache.store(this.options.store)
189+
const buckets = await this.getOrInit(key, rules)
190+
const ruleIndex = rules.findIndex(rule => rule.type === ruleType)
190191

191-
if (ruleIndex === -1) {
192-
debug('rule type %s not found for key %s', ruleType, key)
193-
return
194-
}
192+
if (ruleIndex === -1) {
193+
debug('rule type %s not found for key %s', ruleType, key)
194+
return
195+
}
195196

196-
const rule = rules[ruleIndex]
197-
const bucket = buckets[ruleIndex]
198-
const used = Math.max(0, rule.limit - remaining)
197+
const rule = rules[ruleIndex]
198+
const bucket = buckets[ruleIndex]
199+
const window = this.options.windowMs[rule.type]
199200

200-
bucket.length = 0
201+
this.pruneExpiredEntries(bucket, window, now)
201202

202-
for (let i = 0; i < used; i++) {
203-
bucket.push(now)
204-
}
203+
const resetAt = bucket.length ? bucket[0] + window : now + window
204+
const boundedRemaining = this.normalizeRemaining(remaining, rule.limit)
205+
const used = rule.limit - boundedRemaining
205206

206-
await cache.set(key, JSON.stringify(buckets))
207+
this.rebuildBucket(bucket, used, resetAt, window)
207208

208-
debug(
209-
'updated bucket for rule type %s: %d used, %d remaining',
210-
ruleType,
211-
used,
212-
remaining
213-
)
209+
await cache.set(key, JSON.stringify(buckets))
210+
211+
debug(
212+
'updated bucket for rule type %s: %d used, %d remaining, resetAt %d',
213+
ruleType,
214+
used,
215+
boundedRemaining,
216+
resetAt
217+
)
218+
})
214219
}
215220

216221
/**
@@ -323,7 +328,8 @@ export class RateLimitStore extends Macroable {
323328

324329
/**
325330
* Manually update the reset time for a specific rule type based on API headers.
326-
* This shifts all timestamps in the bucket to align with the API's reset schedule.
331+
* This rebuilds the bucket so its reset window matches the external schedule
332+
* without changing the current used count.
327333
*/
328334
public async setResetAt(
329335
key: string,
@@ -339,85 +345,114 @@ export class RateLimitStore extends Macroable {
339345
key
340346
)
341347

342-
const now = Date.now()
343-
const cache = Cache.store(this.options.store)
344-
const buckets = await this.getOrInit(key, rules)
345-
const ruleIndex = rules.findIndex(rule => rule.type === ruleType)
348+
await this.runWithLock(key, async () => {
349+
const now = Date.now()
350+
const cache = Cache.store(this.options.store)
351+
const buckets = await this.getOrInit(key, rules)
352+
const ruleIndex = rules.findIndex(rule => rule.type === ruleType)
346353

347-
if (ruleIndex === -1) {
348-
debug('rule type %s not found for key %s', ruleType, key)
349-
return
350-
}
354+
if (ruleIndex === -1) {
355+
debug('rule type %s not found for key %s', ruleType, key)
356+
return
357+
}
351358

352-
const rule = rules[ruleIndex]
353-
const bucket = buckets[ruleIndex]
354-
const window = this.options.windowMs[rule.type]
359+
const rule = rules[ruleIndex]
360+
const bucket = buckets[ruleIndex]
361+
const window = this.options.windowMs[rule.type]
355362

356-
while (bucket.length && bucket[0] <= now - window) {
357-
bucket.shift()
358-
}
363+
this.pruneExpiredEntries(bucket, window, now)
359364

360-
if (bucket.length === 0) {
361-
debug('bucket empty, nothing to shift')
362-
return
363-
}
365+
if (bucket.length === 0) {
366+
debug('bucket empty, nothing to shift')
367+
return
368+
}
364369

365-
const maxSeconds = 365 * 24 * 60 * 60
370+
const targetResetAt = this.getTargetResetAt(now, secondsUntilReset)
366371

367-
if (secondsUntilReset < 0 || secondsUntilReset > maxSeconds) {
368-
debug(
369-
'invalid secondsUntilReset (%d), must be between 0 and %d',
370-
secondsUntilReset,
371-
maxSeconds
372-
)
372+
if (!targetResetAt) {
373+
return
374+
}
373375

374-
return
375-
}
376+
this.rebuildBucket(bucket, bucket.length, targetResetAt, window)
376377

377-
const targetResetAt = now + secondsUntilReset * 1000
378-
const earliestTimestamp = bucket[0]
379-
const oneYearAgo = now - 365 * 86_400_000
380-
const oneYearFromNow = now + 365 * 86_400_000
378+
await cache.set(key, JSON.stringify(buckets))
381379

382-
if (earliestTimestamp < oneYearAgo || earliestTimestamp > oneYearFromNow) {
383380
debug(
384-
'corrupted timestamp detected in bucket (%d), skipping shift',
385-
earliestTimestamp
381+
'rebuilt bucket for rule type %s with %d used requests and resetAt %d',
382+
ruleType,
383+
bucket.length,
384+
targetResetAt
386385
)
386+
})
387+
}
387388

388-
return
389-
}
390-
391-
const currentResetAt = earliestTimestamp + window
392-
const timeDiff = targetResetAt - currentResetAt
389+
/**
390+
* Atomically sync the current rate limit state for a specific rule type.
391+
* This is the safest way to mirror external API headers because it updates
392+
* the bucket count and reset window in a single locked operation.
393+
*/
394+
public async syncState(
395+
key: string,
396+
ruleType: RateLimitRule['type'],
397+
state: {
398+
remaining: number
399+
secondsUntilReset?: number
400+
},
401+
rules: RateLimitRule[]
402+
) {
403+
debug(
404+
'syncing rate limit state for rule type %s in %s store for key %s with state %o',
405+
ruleType,
406+
this.options.store,
407+
key,
408+
state
409+
)
393410

394-
if (Math.abs(timeDiff) > 365 * 86_400_000) {
395-
debug(
396-
'time difference too large (%d ms), skipping shift to prevent corruption',
397-
timeDiff
398-
)
411+
await this.runWithLock(key, async () => {
412+
const now = Date.now()
413+
const cache = Cache.store(this.options.store)
414+
const buckets = await this.getOrInit(key, rules)
415+
const ruleIndex = rules.findIndex(rule => rule.type === ruleType)
399416

400-
return
401-
}
417+
if (ruleIndex === -1) {
418+
debug('rule type %s not found for key %s', ruleType, key)
419+
return
420+
}
402421

403-
for (let i = 0; i < bucket.length; i++) {
404-
const newTimestamp = bucket[i] + timeDiff
422+
const rule = rules[ruleIndex]
423+
const bucket = buckets[ruleIndex]
424+
const window = this.options.windowMs[rule.type]
405425

406-
if (newTimestamp < oneYearAgo || newTimestamp > oneYearFromNow + window) {
407-
debug(
408-
'shifted timestamp would be invalid (%d), aborting operation',
409-
newTimestamp
410-
)
426+
this.pruneExpiredEntries(bucket, window, now)
411427

428+
const boundedRemaining = this.normalizeRemaining(
429+
state.remaining,
430+
rule.limit
431+
)
432+
const used = rule.limit - boundedRemaining
433+
const targetResetAt =
434+
state.secondsUntilReset !== undefined
435+
? this.getTargetResetAt(now, state.secondsUntilReset)
436+
: bucket.length
437+
? bucket[0] + window
438+
: now + window
439+
440+
if (!targetResetAt) {
412441
return
413442
}
414443

415-
bucket[i] = newTimestamp
416-
}
444+
this.rebuildBucket(bucket, used, targetResetAt, window)
417445

418-
await cache.set(key, JSON.stringify(buckets))
446+
await cache.set(key, JSON.stringify(buckets))
419447

420-
debug('shifted timestamps by %d ms for rule type %s', timeDiff, ruleType)
448+
debug(
449+
'synced bucket for rule type %s: %d used, %d remaining, resetAt %d',
450+
ruleType,
451+
used,
452+
boundedRemaining,
453+
targetResetAt
454+
)
455+
})
421456
}
422457

423458
/**
@@ -503,4 +538,72 @@ export class RateLimitStore extends Macroable {
503538
debug('error releasing lock for key %s: %o', lockKey, error)
504539
}
505540
}
541+
542+
/**
543+
* Serialize write operations for a target key so remaining and resetAt
544+
* updates cannot corrupt each other when requests finish concurrently.
545+
*/
546+
private async runWithLock(key: string, callback: () => Promise<any>) {
547+
const lockAcquired = await this.acquireLock(key)
548+
549+
if (!lockAcquired) {
550+
debug('failed to acquire mutation lock for key %s', key)
551+
return
552+
}
553+
554+
try {
555+
await callback()
556+
} finally {
557+
await this.releaseLock(key)
558+
}
559+
}
560+
561+
private pruneExpiredEntries(bucket: number[], window: number, now: number) {
562+
while (bucket.length && bucket[0] <= now - window) {
563+
bucket.shift()
564+
}
565+
}
566+
567+
private normalizeRemaining(remaining: number, limit: number) {
568+
if (!Number.isFinite(remaining)) {
569+
return limit
570+
}
571+
572+
return Math.min(limit, Math.max(0, Math.trunc(remaining)))
573+
}
574+
575+
private getTargetResetAt(now: number, secondsUntilReset: number) {
576+
const maxSeconds = 365 * 24 * 60 * 60
577+
578+
if (
579+
!Number.isFinite(secondsUntilReset) ||
580+
secondsUntilReset < 0 ||
581+
secondsUntilReset > maxSeconds
582+
) {
583+
debug(
584+
'invalid secondsUntilReset (%d), must be between 0 and %d',
585+
secondsUntilReset,
586+
maxSeconds
587+
)
588+
589+
return null
590+
}
591+
592+
return now + Math.trunc(secondsUntilReset * 1000)
593+
}
594+
595+
private rebuildBucket(
596+
bucket: number[],
597+
used: number,
598+
resetAt: number,
599+
window: number
600+
) {
601+
const timestamp = resetAt - window
602+
603+
bucket.length = 0
604+
605+
for (let i = 0; i < used; i++) {
606+
bucket.push(timestamp)
607+
}
608+
}
506609
}

src/ratelimiter/RateLimitTarget.ts

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,8 @@ export class RateLimitTarget extends Macroable {
102102

103103
/**
104104
* Manually update the reset time for a specific rule type based on API headers.
105-
* This shifts all timestamps in the bucket to align with the API's reset schedule.
105+
* This rebuilds the bucket so its reset window matches the external schedule
106+
* without changing the current used count.
106107
*/
107108
public async updateResetAt(seconds: number, type: RateLimitRule['type']) {
108109
await this.options.store!.setResetAt(
@@ -112,4 +113,18 @@ export class RateLimitTarget extends Macroable {
112113
this.rules
113114
)
114115
}
116+
117+
/**
118+
* Atomically sync the target rate limit state from external metadata, such
119+
* as API headers, so remaining and resetAt stay aligned.
120+
*/
121+
public async syncState(
122+
type: RateLimitRule['type'],
123+
state: {
124+
remaining: number
125+
secondsUntilReset?: number
126+
}
127+
) {
128+
await this.options.store!.syncState(this.getKey(), type, state, this.rules)
129+
}
115130
}

0 commit comments

Comments
 (0)