Skip to content

Commit 7c1a161

Browse files
authored
Revert rate limiter changes (#6237)
Signed-off-by: Andrey Sobolev <[email protected]>
1 parent 29b082f commit 7c1a161

File tree

1 file changed

+10
-14
lines changed

1 file changed

+10
-14
lines changed

Diff for: packages/core/src/utils.ts

+10-14
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,8 @@
1313
// limitations under the License.
1414
//
1515

16-
import { getEmbeddedLabel, IntlString, PlatformError, unknownError } from '@hcengineering/platform'
16+
import { getEmbeddedLabel, IntlString } from '@hcengineering/platform'
1717
import { deepEqual } from 'fast-equals'
18-
import { DOMAIN_BENCHMARK } from './benchmark'
1918
import {
2019
Account,
2120
AccountRole,
@@ -47,6 +46,7 @@ import { TxOperations } from './operations'
4746
import { isPredicate } from './predicate'
4847
import { DocumentQuery, FindResult } from './storage'
4948
import { DOMAIN_TX } from './tx'
49+
import { DOMAIN_BENCHMARK } from './benchmark'
5050

5151
function toHex (value: number, chars: number): string {
5252
const result = value.toString(16)
@@ -355,6 +355,7 @@ export class DocManager<T extends Doc> implements IDocManager<T> {
355355

356356
export class RateLimiter {
357357
idCounter: number = 0
358+
processingQueue = new Map<number, Promise<void>>()
358359
last: number = 0
359360
rate: number
360361

@@ -365,21 +366,21 @@ export class RateLimiter {
365366
}
366367

367368
notify: (() => void)[] = []
368-
finished: boolean = false
369369

370370
async exec<T, B extends Record<string, any> = any>(op: (args?: B) => Promise<T>, args?: B): Promise<T> {
371-
if (this.finished) {
372-
throw new PlatformError(unknownError('No Possible to add/exec on finished queue'))
373-
}
374-
while (this.notify.length >= this.rate) {
371+
const processingId = this.idCounter++
372+
373+
while (this.processingQueue.size >= this.rate) {
375374
await new Promise<void>((resolve) => {
376375
this.notify.push(resolve)
377376
})
378377
}
379378
try {
380379
const p = op(args)
380+
this.processingQueue.set(processingId, p as Promise<void>)
381381
return await p
382382
} finally {
383+
this.processingQueue.delete(processingId)
383384
const n = this.notify.shift()
384385
if (n !== undefined) {
385386
n()
@@ -388,20 +389,15 @@ export class RateLimiter {
388389
}
389390

390391
async add<T, B extends Record<string, any> = any>(op: (args?: B) => Promise<T>, args?: B): Promise<void> {
391-
if (this.notify.length < this.rate) {
392+
if (this.processingQueue.size < this.rate) {
392393
void this.exec(op, args)
393394
} else {
394395
await this.exec(op, args)
395396
}
396397
}
397398

398399
async waitProcessing (): Promise<void> {
399-
this.finished = true
400-
while (this.notify.length > 0) {
401-
await new Promise<void>((resolve) => {
402-
this.notify.push(resolve)
403-
})
404-
}
400+
await Promise.all(this.processingQueue.values())
405401
}
406402
}
407403

0 commit comments

Comments
 (0)