Skip to content

Commit 5a11738

Browse files
committed
chore: update
1 parent 4912ca2 commit 5a11738

File tree

5 files changed

+35
-15
lines changed

5 files changed

+35
-15
lines changed

plugins/scheduler/state/mod.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ export namespace $ScheduleState {
3535
}
3636

3737
export const Config: z<Config> = z.object({
38-
mode: z.union(['work-steal', 'semaphore']).default('semaphore'),
38+
mode: z.union(['work-steal', 'semaphore']).default('work-steal'),
3939
cap: z.number().min(1).max(99999),
4040
period: z.number().min(10).default(1000),
4141
})

plugins/scheduler/state/semaphore/mod.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import util from 'node:util'
22
import type { Context } from '@cordisjs/core'
33
import { symbols } from '@cordisjs/core'
4-
import type { Awaitable, Promisify } from 'cosmokit'
4+
import type { Promisify } from 'cosmokit'
55
import z from 'schemastery'
66
import { Tracker } from '../../tracker.ts'
77
import { after } from '../../utils.ts'

plugins/scheduler/state/worksteal/fate.ts

+11-4
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ export class WorkerFate {
88
#decision: Promise<symbol>
99
#resolve: (decision: symbol) => void
1010

11+
#reflection: WorkerFate | null = null
12+
1113
static decided(fate: WorkerFate | symbol) {
1214
if (typeof fate === 'symbol') return new WorkerFate(fate)
1315
return fate
@@ -21,17 +23,22 @@ export class WorkerFate {
2123
}
2224

2325
async found(): Promise<symbol> {
26+
const self = this.#reflection || this
27+
if (self) return await self.#decision
2428
return await this.#decision
2529
}
2630

2731
decide(decision: symbol, next?: Promise<WorkerFate>) {
28-
this.#resolve(decision)
29-
this.#nexted = next ?? Promise.resolve(this)
32+
const self = this.#reflection || this
33+
self.#resolve(decision)
34+
if (next) self.#nexted = next
3035
}
3136

3237
async next(): Promise<WorkerFate> {
33-
await this.#decision
34-
return this.#nexted || Promise.resolve(this)
38+
const self = this.#reflection || this
39+
await self.#decision
40+
if (self.#nexted) this.#reflection = await self.#nexted
41+
return self.#nexted || Promise.resolve(self)
3542
}
3643
}
3744

plugins/scheduler/state/worksteal/queue.ts

+15-2
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,14 @@ export class WorkQueue extends List<WorkTask> {
88
tag: string | undefined,
99
fn: (...args: A[]) => Awaitable<R>,
1010
...args: A[]
11-
)
11+
): WorkTask
1212

1313
put<R, A extends any[]>(
1414
tag: string | undefined,
1515
promise: Promise<unknown>,
1616
fn: (...args: A[]) => Awaitable<R>,
1717
...args: A[]
18-
)
18+
): WorkTask
1919

2020
put(
2121
tag: string | undefined,
@@ -57,6 +57,9 @@ export class WorkTask<R = any, A extends any[] = any[]> extends Item {
5757
args: A[]
5858
promise?: Promise<unknown>
5959

60+
hasError = false
61+
error?: unknown
62+
6063
constructor(
6164
public tag: string | undefined,
6265
fn: (...args: A[]) => Awaitable<R>,
@@ -72,6 +75,16 @@ export class WorkTask<R = any, A extends any[] = any[]> extends Item {
7275
return this
7376
}
7477

78+
markError(reason?: unknown) {
79+
this.hasError = true
80+
this.error = reason
81+
}
82+
83+
throwIfError() {
84+
if (!this.hasError) return
85+
throw this.error
86+
}
87+
7588
call(): Promisify<R> {
7689
const resolved = Tracker.promise(
7790
Promise.try(this.fn, ...this.args),

plugins/scheduler/state/worksteal/worker.ts

+7-7
Original file line numberDiff line numberDiff line change
@@ -12,20 +12,20 @@ export async function worker(
1212
let { value: newTask } = Atomics.waitAsync(shared, TASK_NOTIFY, shared[0])
1313

1414
while (await newTask) {
15-
const task = queue.get()
16-
await Tracker.promise(
17-
task.call(),
18-
'worksteal::worker::[email protected]',
19-
`worker-${id}`,
20-
)
2115
switch (await decideMy(fate)) {
2216
case WorkerFate.KILL:
2317
return
2418
case WorkerFate.CONTINUE:
2519
break
2620
}
21+
const task = queue.get()
22+
await Tracker.promise(
23+
task.call(),
24+
'worksteal::worker::[email protected]',
25+
`worker-${id}`,
26+
).catch(reason => task.markError(reason))
2727
if (queue.has()) continue
2828
;({ value: newTask } = Atomics.waitAsync(shared, TASK_NOTIFY, shared[0]))
2929
}
30-
await fate
30+
await decideMy(fate)
3131
}

0 commit comments

Comments
 (0)