Skip to content

Commit dc16139

Browse files
committed
feat: combine callbacks to simplify the API
1 parent 3e4f387 commit dc16139

3 files changed

Lines changed: 28 additions & 33 deletions

File tree

README.md

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -49,18 +49,17 @@ As a proof of concept, this is the most basic implementation of an in-memory que
4949

5050
```ts
5151
type Payload = any;
52-
5352
type MemoryMutexTask = Task<Payload> & { active?: boolean };
5453

55-
const tasks = new Set() < MemoryMutexTask > [];
54+
const tasks = new Set<MemoryMutexTask>();
5655
const pool = new Workerpool<Payload>({
5756
concurrency: 1,
5857
runners: [runnerA, runnerB],
59-
enqueue: (task: MemoryMutexTask) => {
58+
enqueue(task: MemoryMutexTask) {
6059
task.active = false;
6160
tasks.add(task);
6261
},
63-
dequeue: () => {
62+
dequeue() {
6463
// Uncomment the following line for FIFO queues
6564
// if ([...tasks].find(({ active }) => active)) return;
6665

@@ -70,19 +69,15 @@ const pool = new Workerpool<Payload>({
7069
return task;
7170
}
7271
},
73-
success: (result, { task }) => {
74-
console.log("Result:", result);
72+
onTaskFinish(error, result, { task }) {
73+
tasks.delete(task);
7574

76-
const index = tasks.indexOf(task);
77-
if (index > -1) {
78-
tasks.splice(index, 1);
75+
if (error) {
76+
console.error(error);
77+
} else {
78+
console.log(result);
7979
}
8080
},
81-
failure: (error, { task }) => {
82-
console.error(error);
83-
84-
const index = tasks.indexOf(task);
85-
},
8681
});
8782
```
8883

Workerpool.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ describe("Workerpool", () => {
4545
return task;
4646
}
4747
},
48-
success: (_, { task }) => {
48+
onTaskFinished: (_error, _result, { task }) => {
4949
const index = queue.indexOf(task);
5050
if (index > -1) {
5151
queue.splice(index, 1);

Workerpool.ts

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -51,21 +51,20 @@ export type WorkerpoolOptions<TPayload = JsonValue, TResult = unknown> = {
5151
dequeue: () => Promisable<Task<TPayload> | undefined>;
5252

5353
/**
54-
* Called when a dequeued task is successful, use this function to remove
55-
* finished tasks (mutex).
54+
* Callback style task handler.
5655
*/
57-
success?: (
58-
result: TResult,
59-
context: CallbackContext<TPayload, TResult>
60-
) => Promisable<void>;
61-
62-
/**
63-
* Called when a failing task has exceeded maximum retries.
64-
*/
65-
failure?: (
66-
error: Error,
67-
context: CallbackContext<TPayload, TResult>
68-
) => Promisable<void>;
56+
onTaskFinished?: {
57+
(
58+
error: Error,
59+
result: null,
60+
context: CallbackContext<TPayload, TResult>
61+
): Promisable<void>;
62+
(
63+
error: null,
64+
result: TResult,
65+
context: CallbackContext<TPayload, TResult>
66+
): Promisable<void>;
67+
};
6968

7069
/**
7170
* Called when the state of the pool is changed.
@@ -127,7 +126,7 @@ export class Workerpool<TPayload = JsonValue, TResult = unknown> {
127126

128127
start() {
129128
if (this.#active) {
130-
return;
129+
return this;
131130
}
132131

133132
this.#active = true;
@@ -221,16 +220,17 @@ export class Workerpool<TPayload = JsonValue, TResult = unknown> {
221220
runner
222221
.execute(task.payload)
223222
.then(
224-
(result) => this.options.success?.(result, { task, runner }),
223+
(result) =>
224+
this.options.onTaskFinished?.(null, result, { task, runner }),
225225
(error) => {
226226
if (
227227
error instanceof RunnerExecutionError &&
228228
error.retryable &&
229229
task.executionCount < this.#maximumRetries
230230
) {
231231
this.enqueue(task);
232-
} else if (this.options.failure) {
233-
return this.options.failure(error, { task, runner });
232+
} else if (this.options.onTaskFinished) {
233+
return this.options.onTaskFinished(error, null, { task, runner });
234234
} else {
235235
throw error;
236236
}

0 commit comments

Comments
 (0)