@@ -24,11 +24,9 @@ export class SocketStream<T, U> extends Stream<T, U> implements IDisposable {
24
24
*
25
25
* @param options = The socket stream instantiation options.
26
26
*/
27
- constructor (
28
- sender : T ,
29
- protected readonly options : SocketStream . IOptions
30
- ) {
27
+ constructor ( sender : T , options : SocketStream . IOptions ) {
31
28
super ( sender ) ;
29
+ this . factory = ( ) => new ( options . WebSocket || WebSocket ) ( options . url ) ;
32
30
this . subscription = new Poll ( { factory : ( ) => this . subscribe ( ) } ) ;
33
31
}
34
32
@@ -66,6 +64,11 @@ export class SocketStream<T, U> extends Stream<T, U> implements IDisposable {
66
64
this . socket ?. send ( data ) ;
67
65
}
68
66
67
+ /**
68
+ * A factory that generates a new web socket instance for subscription.
69
+ */
70
+ protected readonly factory : ( ) => WebSocket ;
71
+
69
72
/**
70
73
* The current active socket. This value is updated by the `subscribe` method.
71
74
*/
@@ -86,10 +89,9 @@ export class SocketStream<T, U> extends Stream<T, U> implements IDisposable {
86
89
return ;
87
90
}
88
91
return new Promise < void > ( ( _ , reject ) => {
89
- const Socket = this . options . WebSocket || WebSocket ;
90
- const socket = ( this . socket = new Socket ( this . options . url ) ) ;
91
- socket . onclose = ( ) => reject ( new Error ( 'socket stream: socket closed' ) ) ;
92
- socket . onmessage = msg => msg . data && this . emit ( JSON . parse ( msg . data ) ) ;
92
+ this . socket = this . factory ( ) ;
93
+ this . socket . onclose = ( ) => reject ( new Error ( 'socket stream has closed' ) ) ;
94
+ this . socket . onmessage = ( { data } ) => data && this . emit ( JSON . parse ( data ) ) ;
93
95
} ) ;
94
96
}
95
97
}
0 commit comments