1
- import { EMPTY , fromEvent , merge , Observable , Observer } from 'rxjs' ;
2
- import { take , tap } from 'rxjs/operators' ;
1
+ import { EMPTY , fromEvent , merge , Observable , Subject } from 'rxjs' ;
2
+ import { take , takeUntil , tap } from 'rxjs/operators' ;
3
3
import { WORKER_BLANK_FN } from '../consts/worker-fn-template' ;
4
4
import { TypedMessageEvent } from '../types/typed-message-event' ;
5
5
import { WorkerFunction } from '../types/worker-function' ;
6
6
7
7
export class WebWorker < T = any , R = any > extends Observable < TypedMessageEvent < R > > {
8
8
private readonly worker : Worker | undefined ;
9
9
private readonly url : string ;
10
- private isStopped : boolean ;
11
- private observers : Observer < TypedMessageEvent < R > > [ ] ;
10
+ private readonly destroy$ : Subject < void > ;
12
11
13
12
constructor ( url : string , options ?: WorkerOptions ) {
14
13
let worker : Worker | undefined ;
@@ -25,7 +24,7 @@ export class WebWorker<T = any, R = any> extends Observable<TypedMessageEvent<R>
25
24
26
25
if ( error ) {
27
26
subscriber . error ( error ) ;
28
- } else if ( this . isStopped ) {
27
+ } else if ( this . destroy$ . isStopped ) {
29
28
subscriber . complete ( ) ;
30
29
} else if ( worker ) {
31
30
eventStream$ = merge (
@@ -35,19 +34,15 @@ export class WebWorker<T = any, R = any> extends Observable<TypedMessageEvent<R>
35
34
fromEvent < ErrorEvent > ( worker , 'error' ) . pipe (
36
35
tap ( event => subscriber . error ( event ) ) ,
37
36
) ,
38
- ) ;
39
-
40
- this . observers . push ( subscriber ) ;
37
+ ) . pipe ( takeUntil ( this . destroy$ ) ) ;
41
38
}
42
39
43
- return eventStream$ . subscribe ( ) ;
40
+ eventStream$ . subscribe ( ) . add ( subscriber ) ;
44
41
} ) ;
45
42
46
43
this . worker = worker ;
47
44
this . url = url ;
48
-
49
- this . isStopped = false ;
50
- this . observers = [ ] ;
45
+ this . destroy$ = new Subject < void > ( ) ;
51
46
}
52
47
53
48
static fromFunction < T , R > (
@@ -82,7 +77,7 @@ export class WebWorker<T = any, R = any> extends Observable<TypedMessageEvent<R>
82
77
}
83
78
84
79
terminate ( ) {
85
- if ( this . isStopped ) {
80
+ if ( this . destroy$ . isStopped ) {
86
81
return ;
87
82
}
88
83
@@ -92,13 +87,8 @@ export class WebWorker<T = any, R = any> extends Observable<TypedMessageEvent<R>
92
87
93
88
URL . revokeObjectURL ( this . url ) ;
94
89
95
- this . isStopped = true ;
96
- this . observers . forEach ( observer => {
97
- if ( ! observer . closed ) {
98
- observer . complete ( ) ;
99
- }
100
- } ) ;
101
- this . observers = [ ] ;
90
+ this . destroy$ . next ( ) ;
91
+ this . destroy$ . complete ( ) ;
102
92
}
103
93
104
94
postMessage ( value : T ) {
0 commit comments