Skip to content

Commit 964ee7a

Browse files
committed
feat(worker): wrap comlink with a worker init method
1 parent 90ccee4 commit 964ee7a

8 files changed

Lines changed: 60 additions & 44 deletions

File tree

Executable.ts

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,9 @@
1-
import { type Promisable } from "./deps.ts";
1+
import { comlink, type Promisable } from "./deps.ts";
22

33
/**
44
* The Runner interface.
55
*/
6-
export interface Executable<
7-
TPayload = unknown,
8-
TResult = unknown,
9-
TError extends Error = Error
10-
> {
6+
export interface Executable<TPayload, TResult, TError extends Error = Error> {
117
execute: (payload: TPayload) => Promisable<TResult>;
128

139
onSuccess?: (result: TResult) => Promisable<void>;
@@ -26,3 +22,16 @@ export interface Executable<
2622
*/
2723
dispose?: () => Promisable<void>;
2824
}
25+
26+
export const initializeWorker = <
27+
T extends // deno-lint-ignore no-explicit-any
28+
Executable<any, any>,
29+
>(
30+
callbacks: T,
31+
) => {
32+
if (!(self instanceof WorkerGlobalScope)) {
33+
throw new Error("This module is only intended to be used in a worker.");
34+
}
35+
36+
comlink.expose(callbacks);
37+
};

ExecutableWorker.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,8 @@ import { type Executable } from "./Executable.ts";
1616
export class ExecutableWorker<
1717
TPayload = unknown,
1818
TResult = unknown,
19-
TError extends Error = Error
20-
> implements Executable<TPayload, TResult, TError>
21-
{
19+
TError extends Error = Error,
20+
> implements Executable<TPayload, TResult, TError> {
2221
#worker: Worker;
2322
#linked: Remote<Executable<TPayload, TResult>>;
2423

README.md

Lines changed: 29 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,27 @@
11
# Workerpool
22

3-
An unopinionated small scale worker pool abstraction which serves as a base interface for more advanced worker managers.
3+
An unopinionated small scale worker pool abstraction which serves as a base
4+
interface for more advanced worker managers.
45

56
## Terminology
67

78
1. **Workerpool**
89

9-
A manager that creates workers on the fly, executing tasks up to defined concurrency.
10+
A manager that creates workers on the fly, executing tasks up to defined
11+
concurrency.
1012

1113
2. **Runner**
1214

13-
Runners are internal wrappers for user provided runner classes, they maintain internal states such as active/busy and the retry counter.
15+
Runners are internal wrappers for user provided runner classes, they maintain
16+
internal states such as active/busy and the retry counter.
1417

1518
3. **Executable**
1619

17-
User implementation of task executors, where they all implements the `Executable` interface.
20+
User implementation of task executors, where they all implements the
21+
`Executable` interface.
1822

19-
Also called _workers_ for the sake of naming convension, but the usage kept minimal to avoid confusion with Web Workers.
23+
Also called _workers_ for the sake of naming convension, but the usage kept
24+
minimal to avoid confusion with Web Workers.
2025

2126
4. **Task**
2227

@@ -94,15 +99,26 @@ const pool = new Workerpool<Payload>({
9499

95100
### Web Workers
96101

97-
Deno has built-in support for workers, our `ExecutableWorker` class serves as a simple proxy class via `comlink`.
102+
Deno has built-in support for workers, our `ExecutableWorker` class serves as a
103+
simple proxy class via `comlink`.
98104

99-
You'll need a separated script file for the worker.
105+
```ts
106+
import { ExecutableWorker } from "https://deno.land/x/workerpool/mod.ts";
107+
108+
class MyRunner extends ExecutableWorker<string, void> {
109+
constructor() {
110+
super(new URL("./worker.ts", import.meta.url).href);
111+
}
112+
}
113+
```
114+
115+
You'll also need a separated script file for the worker itself.
100116

101117
```ts
102118
// worker.ts
103-
import { expose } from "https://deno.land/x/comlink/mod.ts";
119+
import { initializeWorker } from "https://deno.land/x/workerpool/mod.ts";
104120

105-
expose({
121+
initializeWorker({
106122
execute: async (payload: string) => {
107123
// Simulate async actions
108124
await new Promise((resolve) => setTimeout(resolve, 1000));
@@ -112,18 +128,10 @@ expose({
112128
});
113129
```
114130

115-
Now register the runners into the workerpool:
131+
Now register the runner into the workerpool:
116132

117133
```ts
118-
import { ExecutableWorker } from "https://deno.land/x/workerpool/mod.ts";
119-
120-
class MyRunner extends ExecutableWorker<string, void> {
121-
constructor() {
122-
super(new URL("./worker.ts", import.meta.url).href);
123-
}
124-
}
125-
126-
const pool = new Workerpool({
134+
const pool = new Workerpool<string, void>({
127135
concurrency: 1,
128136
workers: [MyRunner],
129137
});
@@ -136,4 +144,5 @@ pool
136144

137145
## Sponsorship
138146

139-
If you appreciate my work, or want to see specific features to happen, [a coffee would do](https://www.github.com/sponsors/vicary).
147+
If you appreciate my work, or want to see specific features to happen,
148+
[a coffee would do](https://www.github.com/sponsors/vicary).

Runner.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ export class RunnerExecutionError extends Error {
44
constructor(
55
message: string,
66
readonly name: string,
7-
readonly retryable = false
7+
readonly retryable = false,
88
) {
99
super(message);
1010
}
@@ -22,7 +22,7 @@ export class Runner<TPayload = unknown, TResult = unknown> {
2222

2323
constructor(
2424
readonly runner: Executable<TPayload, TResult>,
25-
readonly name: string
25+
readonly name: string,
2626
) {}
2727

2828
get busy() {
@@ -68,7 +68,7 @@ export class Runner<TPayload = unknown, TResult = unknown> {
6868
throw new RunnerExecutionError(
6969
error.message,
7070
error.name,
71-
retryable ?? true
71+
retryable ?? true,
7272
);
7373
} finally {
7474
this.#executionnCount++;

Workerpool.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import { Executable } from "./Executable.ts";
88
import { ExecutableWorker } from "./ExecutableWorker.ts";
99
import { Task } from "./Task.ts";
1010
import { Workerpool } from "./Workerpool.ts";
11-
import { comlink, type Class, type SetOptional } from "./deps.ts";
11+
import { type Class, comlink, type SetOptional } from "./deps.ts";
1212

1313
export type ArrowFunction = (...args: unknown[]) => unknown;
1414
type MemoryMutexTask<TPayload> = Task<TPayload> & { active?: boolean };

Workerpool.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -57,12 +57,12 @@ export type WorkerpoolOptions<TPayload = JsonValue, TResult = unknown> = {
5757
(
5858
error: Error,
5959
result: null,
60-
context: CallbackContext<TPayload, TResult>
60+
context: CallbackContext<TPayload, TResult>,
6161
): Promisable<void>;
6262
(
6363
error: null,
6464
result: TResult,
65-
context: CallbackContext<TPayload, TResult>
65+
context: CallbackContext<TPayload, TResult>,
6666
): Promisable<void>;
6767
};
6868

@@ -241,7 +241,7 @@ export class Workerpool<TPayload = JsonValue, TResult = unknown> {
241241
} else {
242242
throw error;
243243
}
244-
}
244+
},
245245
)
246246
.finally(() => {
247247
if (runner.executionCount >= this.#maximumTaskPerRunner) {
@@ -272,7 +272,7 @@ export class Workerpool<TPayload = JsonValue, TResult = unknown> {
272272
#getRunner(name: string): Runner<TPayload, TResult> | undefined {
273273
const idleRunners = [...this.#runners].filter((runner) => !runner.busy);
274274
const runner = idleRunners.find(
275-
({ name: runnerName }) => runnerName === name
275+
({ name: runnerName }) => runnerName === name,
276276
);
277277
if (runner) {
278278
return runner;
@@ -286,7 +286,7 @@ export class Workerpool<TPayload = JsonValue, TResult = unknown> {
286286

287287
const runnerInstance = new Runner<TPayload, TResult>(
288288
new executableClass(),
289-
executableClass.name
289+
executableClass.name,
290290
);
291291

292292
this.#runners.add(runnerInstance);
@@ -295,7 +295,7 @@ export class Workerpool<TPayload = JsonValue, TResult = unknown> {
295295
} else {
296296
// Discard idle runners of other types, if available.
297297
const idleRunner = idleRunners.find(
298-
({ name: runnerName }) => runnerName !== name
298+
({ name: runnerName }) => runnerName !== name,
299299
);
300300

301301
if (idleRunner) {

__test__/example-worker.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import { comlink } from "../deps.ts";
55
import { type Executable } from "../Executable.ts";
66
import { type ArrowFunction } from "../Workerpool.test.ts";
77

8-
const exposedObject: Executable<ArrowFunction> = {
8+
const exposedObject: Executable<ArrowFunction, unknown> = {
99
async execute(payload) {
1010
// Mimic async action.
1111
await new Promise((resolve) => setTimeout(resolve, 100));

mod.ts

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
1-
export type { Executable } from "./Executable.ts";
1+
export { type Executable, initializeWorker } from "./Executable.ts";
22
export { ExecutableWorker } from "./ExecutableWorker.ts";
33
export { Runner, RunnerExecutionError } from "./Runner.ts";
4-
export type { Task } from "./Task.ts";
5-
export { Workerpool } from "./Workerpool.ts";
6-
export type { WorkerpoolOptions } from "./Workerpool.ts";
4+
export { type Task } from "./Task.ts";
5+
export { Workerpool, type WorkerpoolOptions } from "./Workerpool.ts";

0 commit comments

Comments
 (0)