Skip to content

Commit bd5f61e

Browse files
committed
fix: worker index identification at tasks stealing under back pressure
Signed-off-by: Jérôme Benoit <jerome.benoit@piment-noir.org>
1 parent 3dcd2cf commit bd5f61e

6 files changed

Lines changed: 81 additions & 85 deletions

File tree

deno.json

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,7 @@
33
"version": "0.5.8",
44
"exports": "./src/mod.ts",
55
"compilerOptions": {
6-
"lib": [
7-
"deno.worker"
8-
],
6+
"lib": ["deno.worker"],
97
"strict": true
108
},
119
"tasks": {
@@ -27,9 +25,7 @@
2725
"documentation": "deno doc ./src/mod.ts"
2826
},
2927
"test": {
30-
"include": [
31-
"./tests/**/*.test.mjs"
32-
]
28+
"include": ["./tests/**/*.test.mjs"]
3329
},
3430
"fmt": {
3531
"semiColons": false,
@@ -42,18 +38,8 @@
4238
"@std/testing": "jsr:@std/testing@^1.0.15"
4339
},
4440
"publish": {
45-
"include": [
46-
"LICENSE",
47-
"README.md",
48-
"deno.json",
49-
"src/**/*.ts"
50-
]
41+
"include": ["LICENSE", "README.md", "deno.json", "src/**/*.ts"]
5142
},
5243
"lock": false,
53-
"exclude": [
54-
"./coverage",
55-
"./dist/browser",
56-
"./dist/esm",
57-
"./npm"
58-
]
44+
"exclude": ["./coverage", "./dist/browser", "./dist/esm", "./npm"]
5945
}

src/pools/abstract-pool.ts

Lines changed: 52 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,8 @@ export abstract class AbstractPool<
300300

301301
/** @inheritDoc */
302302
public get info(): PoolInfo {
303+
const taskStatisticsRequirements = this.workerChoiceStrategiesContext
304+
?.getTaskStatisticsRequirements()
303305
return {
304306
version,
305307
type: this.type,
@@ -310,13 +312,10 @@ export abstract class AbstractPool<
310312
strategyRetries: this.workerChoiceStrategiesContext?.retriesCount ?? 0,
311313
minSize: this.minimumNumberOfWorkers,
312314
maxSize: this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers,
313-
...(this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
314-
.runTime.aggregate === true &&
315-
this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
316-
.waitTime.aggregate &&
317-
{
318-
utilization: round(this.utilization),
319-
}),
315+
...(taskStatisticsRequirements?.runTime.aggregate === true &&
316+
taskStatisticsRequirements.waitTime.aggregate && {
317+
utilization: round(this.utilization),
318+
}),
320319
workerNodes: this.workerNodes.length,
321320
...(this.type === PoolTypes.dynamic && {
322321
dynamicWorkerNodes: this.workerNodes.reduce(
@@ -382,8 +381,7 @@ export abstract class AbstractPool<
382381
0,
383382
),
384383
}),
385-
...(this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
386-
.runTime.aggregate === true && {
384+
...(taskStatisticsRequirements?.runTime.aggregate === true && {
387385
runTime: {
388386
minimum: round(
389387
min(
@@ -401,8 +399,7 @@ export abstract class AbstractPool<
401399
),
402400
),
403401
),
404-
...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
405-
.runTime.average && {
402+
...(taskStatisticsRequirements?.runTime.average && {
406403
average: round(
407404
average(
408405
this.workerNodes.reduce<number[]>(
@@ -415,8 +412,7 @@ export abstract class AbstractPool<
415412
),
416413
),
417414
}),
418-
...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
419-
.runTime.median && {
415+
...(taskStatisticsRequirements?.runTime.median && {
420416
median: round(
421417
median(
422418
this.workerNodes.reduce<number[]>(
@@ -431,8 +427,7 @@ export abstract class AbstractPool<
431427
}),
432428
},
433429
}),
434-
...(this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
435-
.waitTime.aggregate === true && {
430+
...(taskStatisticsRequirements?.waitTime.aggregate === true && {
436431
waitTime: {
437432
minimum: round(
438433
min(
@@ -450,8 +445,7 @@ export abstract class AbstractPool<
450445
),
451446
),
452447
),
453-
...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
454-
.waitTime.average && {
448+
...(taskStatisticsRequirements?.waitTime.average && {
455449
average: round(
456450
average(
457451
this.workerNodes.reduce<number[]>(
@@ -464,8 +458,7 @@ export abstract class AbstractPool<
464458
),
465459
),
466460
}),
467-
...(this.workerChoiceStrategiesContext.getTaskStatisticsRequirements()
468-
.waitTime.median && {
461+
...(taskStatisticsRequirements?.waitTime.median && {
469462
median: round(
470463
median(
471464
this.workerNodes.reduce<number[]>(
@@ -514,6 +507,9 @@ export abstract class AbstractPool<
514507
}
515508
const poolTimeCapacity = (performance.now() - this.startTimestamp) *
516509
(this.maximumNumberOfWorkers ?? this.minimumNumberOfWorkers)
510+
if (!Number.isFinite(poolTimeCapacity) || poolTimeCapacity <= 0) {
511+
return 0
512+
}
517513
const totalTasksRunTime = this.workerNodes.reduce(
518514
(accumulator, workerNode) =>
519515
accumulator + (workerNode.usage.runTime.aggregate ?? 0),
@@ -747,6 +743,7 @@ export abstract class AbstractPool<
747743
* @returns Worker nodes busyness boolean status.
748744
*/
749745
protected internalBusy(): boolean {
746+
if (this.workerNodes.length === 0) return false
750747
return (
751748
this.workerNodes.reduce(
752749
(accumulator, _, workerNodeKey) =>
@@ -855,12 +852,12 @@ export abstract class AbstractPool<
855852
message: MessageValue<Data>,
856853
): Promise<boolean> {
857854
const targetWorkerNodeKeys = [...this.workerNodes.keys()]
855+
const responsesReceived: MessageValue<Response>[] = []
858856
const taskFunctionOperationsListener = (
859857
message: MessageValue<Response>,
860858
resolve: (value: boolean | PromiseLike<boolean>) => void,
861859
reject: (reason?: unknown) => void,
862860
): void => {
863-
const responsesReceived: MessageValue<Response>[] = []
864861
this.checkMessageWorkerId(message)
865862
if (
866863
message.taskFunctionOperationStatus != null &&
@@ -1119,7 +1116,7 @@ export abstract class AbstractPool<
11191116
data?: Data,
11201117
name?: string,
11211118
abortSignal?: AbortSignal,
1122-
transferList?: Transferable[],
1119+
transferList?: readonly Transferable[],
11231120
): Promise<Response> {
11241121
return await new Promise<Response>((resolve, reject) => {
11251122
const timestamp = performance.now()
@@ -1572,32 +1569,25 @@ export abstract class AbstractPool<
15721569
* @param workerNode - The worker node.
15731570
*/
15741571
private initWorkerNodeUsage(workerNode: IWorkerNode<Worker, Data>): void {
1575-
if (
1576-
this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
1577-
.runTime.aggregate === true
1578-
) {
1572+
const taskStatisticsRequirements = this.workerChoiceStrategiesContext
1573+
?.getTaskStatisticsRequirements()
1574+
if (taskStatisticsRequirements?.runTime.aggregate === true) {
15791575
workerNode.usage.runTime.aggregate = min(
15801576
...this.workerNodes.map(
15811577
(workerNode) =>
15821578
workerNode.usage.runTime.aggregate ?? Number.POSITIVE_INFINITY,
15831579
),
15841580
)
15851581
}
1586-
if (
1587-
this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
1588-
.waitTime.aggregate === true
1589-
) {
1582+
if (taskStatisticsRequirements?.waitTime.aggregate === true) {
15901583
workerNode.usage.waitTime.aggregate = min(
15911584
...this.workerNodes.map(
15921585
(workerNode) =>
15931586
workerNode.usage.waitTime.aggregate ?? Number.POSITIVE_INFINITY,
15941587
),
15951588
)
15961589
}
1597-
if (
1598-
this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements().elu
1599-
.aggregate === true
1600-
) {
1590+
if (taskStatisticsRequirements?.elu.aggregate === true) {
16011591
workerNode.usage.elu.active.aggregate = min(
16021592
...this.workerNodes.map(
16031593
(workerNode) =>
@@ -1822,13 +1812,12 @@ export abstract class AbstractPool<
18221812
* @param workerNodeKey - The worker node key.
18231813
*/
18241814
private sendStatisticsMessageToWorker(workerNodeKey: number): void {
1815+
const taskStatisticsRequirements = this.workerChoiceStrategiesContext
1816+
?.getTaskStatisticsRequirements()
18251817
this.sendToWorker(workerNodeKey, {
18261818
statistics: {
1827-
runTime:
1828-
this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
1829-
.runTime.aggregate ?? false,
1830-
// elu: this.workerChoiceStrategiesContext?.getTaskStatisticsRequirements()
1831-
// .elu.aggregate ?? false,
1819+
runTime: taskStatisticsRequirements?.runTime.aggregate ?? false,
1820+
// elu: taskStatisticsRequirements?.elu.aggregate ?? false,
18321821
},
18331822
})
18341823
}
@@ -1857,15 +1846,22 @@ export abstract class AbstractPool<
18571846
while (this.tasksQueueSize(sourceWorkerNodeKey) > 0) {
18581847
const destinationWorkerNodeKey = this.workerNodes.reduce(
18591848
(minWorkerNodeKey, workerNode, workerNodeKey, workerNodes) => {
1860-
return workerNodeKey !== sourceWorkerNodeKey &&
1861-
workerNode.info.ready &&
1862-
workerNode.usage.tasks.queued <
1863-
workerNodes[minWorkerNodeKey].usage.tasks.queued
1849+
if (workerNodeKey === sourceWorkerNodeKey || !workerNode.info.ready) {
1850+
return minWorkerNodeKey
1851+
}
1852+
if (minWorkerNodeKey === -1) {
1853+
return workerNodeKey
1854+
}
1855+
return workerNode.usage.tasks.queued <
1856+
workerNodes[minWorkerNodeKey].usage.tasks.queued
18641857
? workerNodeKey
18651858
: minWorkerNodeKey
18661859
},
1867-
0,
1860+
-1,
18681861
)
1862+
if (destinationWorkerNodeKey === -1) {
1863+
break
1864+
}
18691865
this.handleTask(
18701866
destinationWorkerNodeKey,
18711867
this.dequeueTask(sourceWorkerNodeKey) as Task<Data>,
@@ -1961,7 +1957,12 @@ export abstract class AbstractPool<
19611957
}
19621958
destinationWorkerNode.info.stealing = true
19631959
sourceWorkerNode.info.stolen = true
1964-
const stolenTask = sourceWorkerNode.dequeueLastPrioritizedTask()!
1960+
const stolenTask = sourceWorkerNode.dequeueLastPrioritizedTask()
1961+
if (stolenTask == null) {
1962+
sourceWorkerNode.info.stolen = false
1963+
destinationWorkerNode.info.stealing = false
1964+
return
1965+
}
19651966
sourceWorkerNode.info.stolen = false
19661967
destinationWorkerNode.info.stealing = false
19671968
this.handleTask(destinationWorkerNodeKey, stolenTask)
@@ -2038,15 +2039,17 @@ export abstract class AbstractPool<
20382039
private readonly workerNodeStealTask = (
20392040
workerNodeKey: number,
20402041
): Task<Data> | undefined => {
2042+
const workerNode = this.workerNodes[workerNodeKey]
2043+
if (workerNode == null) return
20412044
const workerNodes = this.workerNodes
20422045
.slice()
20432046
.sort(
20442047
(workerNodeA, workerNodeB) =>
20452048
workerNodeB.usage.tasks.queued - workerNodeA.usage.tasks.queued,
20462049
)
20472050
const sourceWorkerNode = workerNodes.find(
2048-
(sourceWorkerNode, sourceWorkerNodeKey) =>
2049-
sourceWorkerNodeKey !== workerNodeKey &&
2051+
(sourceWorkerNode) =>
2052+
sourceWorkerNode !== workerNode &&
20502053
sourceWorkerNode.usage.tasks.queued > 0,
20512054
)
20522055
if (sourceWorkerNode != null) {
@@ -2080,7 +2083,7 @@ export abstract class AbstractPool<
20802083
(workerNodeA, workerNodeB) =>
20812084
workerNodeA.usage.tasks.queued - workerNodeB.usage.tasks.queued,
20822085
)
2083-
for (const [workerNodeKey, workerNode] of workerNodes.entries()) {
2086+
for (const workerNode of workerNodes) {
20842087
if (sourceWorkerNode.usage.tasks.queued === 0) {
20852088
break
20862089
}
@@ -2090,6 +2093,7 @@ export abstract class AbstractPool<
20902093
workerNode.usage.tasks.queued <
20912094
this.opts.tasksQueueOptions!.size! - sizeOffset
20922095
) {
2096+
const workerNodeKey = this.workerNodes.indexOf(workerNode)
20932097
workerNode.info.backPressureStealing = true
20942098
this.stealTask(sourceWorkerNode, workerNodeKey)
20952099
workerNode.info.backPressureStealing = false
@@ -2392,6 +2396,7 @@ export abstract class AbstractPool<
23922396
* @returns Worker nodes back pressure boolean status.
23932397
*/
23942398
protected internalBackPressure(): boolean {
2399+
if (this.workerNodes.length === 0) return false
23952400
return (
23962401
this.workerNodes.reduce(
23972402
(accumulator, _, workerNodeKey) =>

src/utility-types.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ export interface Task<Data = unknown> {
124124
/**
125125
* Array of transferable objects.
126126
*/
127-
readonly transferList?: Transferable[]
127+
readonly transferList?: readonly Transferable[]
128128
/**
129129
* Timestamp.
130130
*/

src/utils.ts

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -62,10 +62,10 @@ export const exponentialDelay = (
6262
* @internal
6363
*/
6464
export const average = (dataSet: number[]): number => {
65-
if (Array.isArray(dataSet) && dataSet.length === 0) {
65+
if (!Array.isArray(dataSet) || dataSet.length === 0) {
6666
return 0
6767
}
68-
if (Array.isArray(dataSet) && dataSet.length === 1) {
68+
if (dataSet.length === 1) {
6969
return dataSet[0]
7070
}
7171
return (
@@ -82,10 +82,10 @@ export const average = (dataSet: number[]): number => {
8282
* @internal
8383
*/
8484
export const median = (dataSet: number[]): number => {
85-
if (Array.isArray(dataSet) && dataSet.length === 0) {
85+
if (!Array.isArray(dataSet) || dataSet.length === 0) {
8686
return 0
8787
}
88-
if (Array.isArray(dataSet) && dataSet.length === 1) {
88+
if (dataSet.length === 1) {
8989
return dataSet[0]
9090
}
9191
const sortedDataSet = dataSet.slice().sort((a, b) => a - b)
@@ -98,7 +98,6 @@ export const median = (dataSet: number[]): number => {
9898

9999
/**
100100
* Rounds the given number to the given scale.
101-
* The rounding is done using the "round half away from zero" method.
102101
*
103102
* @param num - The number to round.
104103
* @param scale - The scale to round to.
@@ -107,7 +106,7 @@ export const median = (dataSet: number[]): number => {
107106
*/
108107
export const round = (num: number, scale = 2): number => {
109108
const rounder = 10 ** scale
110-
return Math.round(num * rounder * (1 + Number.EPSILON)) / rounder
109+
return Math.round((num + Math.sign(num) * Number.EPSILON) * rounder) / rounder
111110
}
112111

113112
/**

0 commit comments

Comments
 (0)