Skip to content
This repository was archived by the owner on May 29, 2023. It is now read-only.

Commit 4f278c1

Browse files
authored
Merge pull request #3 from ng-web-apis/bugfix/terminate
fix: close all subscriptions, if the worker was terminated
2 parents 1b75cdd + e1e7d0a commit 4f278c1

File tree

3 files changed

+53
-16
lines changed

3 files changed

+53
-16
lines changed

projects/workers/src/worker/classes/web-worker.spec.ts

+21
Original file line numberDiff line numberDiff line change
@@ -89,4 +89,25 @@ describe('WebWorker', () => {
8989
'Uncaught reason',
9090
);
9191
});
92+
93+
it('should close all subscriptions, if the worker was terminated', async () => {
94+
const worker = WebWorker.fromFunction<void, string>(() => 'some data');
95+
96+
const subscriptions = [
97+
worker.subscribe(),
98+
worker.subscribe(),
99+
worker.subscribe(),
100+
];
101+
102+
worker.terminate();
103+
expect(subscriptions.map(s => s.closed)).toEqual([true, true, true]);
104+
});
105+
106+
it("shouldn't throw any errors, if the worker was terminated twice", async () => {
107+
const worker = WebWorker.fromFunction<void, string>(() => 'some data');
108+
109+
worker.terminate();
110+
worker.terminate();
111+
expect(await worker.toPromise()).toBeUndefined();
112+
});
92113
});

projects/workers/src/worker/classes/web-worker.ts

+30-15
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
1-
import {EMPTY, fromEvent, merge, Observable} from 'rxjs';
2-
import {take, tap} from 'rxjs/operators';
1+
import {EMPTY, fromEvent, merge, Observable, Subject} from 'rxjs';
2+
import {take, takeUntil, tap} from 'rxjs/operators';
33
import {WORKER_BLANK_FN} from '../consts/worker-fn-template';
44
import {TypedMessageEvent} from '../types/typed-message-event';
55
import {WorkerFunction} from '../types/worker-function';
66

77
export class WebWorker<T = any, R = any> extends Observable<TypedMessageEvent<R>> {
88
private readonly worker: Worker | undefined;
99
private readonly url: string;
10+
private readonly destroy$: Subject<void>;
1011

1112
constructor(url: string, options?: WorkerOptions) {
1213
let worker: Worker | undefined;
@@ -19,26 +20,29 @@ export class WebWorker<T = any, R = any> extends Observable<TypedMessageEvent<R>
1920
}
2021

2122
super(subscriber => {
23+
let eventStream$: Observable<TypedMessageEvent<R> | ErrorEvent> = EMPTY;
24+
2225
if (error) {
2326
subscriber.error(error);
27+
} else if (this.destroy$.isStopped) {
28+
subscriber.complete();
29+
} else if (worker) {
30+
eventStream$ = merge(
31+
fromEvent<TypedMessageEvent<R>>(worker, 'message').pipe(
32+
tap(event => subscriber.next(event)),
33+
),
34+
fromEvent<ErrorEvent>(worker, 'error').pipe(
35+
tap(event => subscriber.error(event)),
36+
),
37+
).pipe(takeUntil(this.destroy$));
2438
}
2539

26-
const eventStream$ = worker
27-
? merge(
28-
fromEvent<TypedMessageEvent<R>>(worker, 'message').pipe(
29-
tap(event => subscriber.next(event)),
30-
),
31-
fromEvent<ErrorEvent>(worker, 'error').pipe(
32-
tap(event => subscriber.error(event)),
33-
),
34-
)
35-
: EMPTY;
36-
37-
return eventStream$.subscribe();
40+
eventStream$.subscribe().add(subscriber);
3841
});
3942

4043
this.worker = worker;
4144
this.url = url;
45+
this.destroy$ = new Subject<void>();
4246
}
4347

4448
static fromFunction<T, R>(
@@ -57,7 +61,11 @@ export class WebWorker<T = any, R = any> extends Observable<TypedMessageEvent<R>
5761

5862
worker.postMessage(data);
5963

60-
return promise;
64+
return promise.then(result => {
65+
worker.terminate();
66+
67+
return result;
68+
});
6169
}
6270

6371
private static createFnUrl(fn: WorkerFunction): string {
@@ -69,11 +77,18 @@ export class WebWorker<T = any, R = any> extends Observable<TypedMessageEvent<R>
6977
}
7078

7179
terminate() {
80+
if (this.destroy$.isStopped) {
81+
return;
82+
}
83+
7284
if (this.worker) {
7385
this.worker.terminate();
7486
}
7587

7688
URL.revokeObjectURL(this.url);
89+
90+
this.destroy$.next();
91+
this.destroy$.complete();
7792
}
7893

7994
postMessage(value: T) {

projects/workers/src/worker/pipes/worker.pipe.ts

+2-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@ export class WorkerPipe implements PipeTransform {
1212
private observers = new WeakMap<WebWorker, Observable<any>>();
1313

1414
transform<T, R>(value: T, fn: WorkerFunction<T, R>): Observable<R> {
15-
const worker = this.workers.get(fn) || WebWorker.fromFunction(fn);
15+
const worker: WebWorker<T, R> =
16+
this.workers.get(fn) || WebWorker.fromFunction(fn);
1617

1718
this.workers.set(fn, worker);
1819
worker.postMessage(value);

0 commit comments

Comments
 (0)