Skip to content

Commit 40bdbd4

Browse files
committed
Add socket stream
1 parent 0e5c140 commit 40bdbd4

File tree

1 file changed

+115
-0
lines changed

1 file changed

+115
-0
lines changed

packages/polling/src/socketstream.ts

+115
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
// Copyright (c) Jupyter Development Team.
2+
// Distributed under the terms of the Modified BSD License.
3+
4+
import { IDisposable } from '@lumino/disposable';
5+
6+
import { Signal, Stream } from '@lumino/signaling';
7+
8+
import { Poll } from './poll';
9+
10+
/**
11+
* A utility class to wrap and augment a web socket. A socket stream emits web
12+
* socket messages as an async iterable and also as a Lumino signal. It uses
13+
* an internal poll instance to manage reconnection logic automatically.
14+
*
15+
* @typeparam T - The type of the stream owner (i.e., the `sender` of a signal).
16+
*
17+
* @typeparam U - The type of the socket stream's emissions.
18+
*/
19+
export class SocketStream<T, U> extends Stream<T, U> implements IDisposable {
20+
/**
21+
* Construct a new socket stream.
22+
*
23+
* @param sender - The sender which owns the stream.
24+
*
25+
* @param options = The socket stream instantiation options.
26+
*/
27+
constructor(
28+
sender: T,
29+
protected readonly options: SocketStream.IOptions
30+
) {
31+
super(sender);
32+
this.subscription = new Poll({ factory: () => this.subscribe() });
33+
}
34+
35+
/**
36+
* Whether the stream is disposed.
37+
*/
38+
get isDisposed() {
39+
return this.subscription.isDisposed;
40+
}
41+
42+
/**
43+
* Dispose the stream.
44+
*/
45+
dispose() {
46+
super.stop();
47+
this.subscription.dispose();
48+
const { socket } = this;
49+
if (socket) {
50+
this.socket = null;
51+
socket.onclose = () => undefined;
52+
socket.onerror = () => undefined;
53+
socket.onmessage = () => undefined;
54+
socket.onopen = () => undefined;
55+
socket.close();
56+
}
57+
Signal.clearData(this);
58+
}
59+
60+
/**
61+
* Send a message to the underlying web socket.
62+
*
63+
* @param data - The payload of the message sent via the web socket.
64+
*/
65+
send(data: string | ArrayBufferLike | Blob | ArrayBufferView): void {
66+
this.socket?.send(data);
67+
}
68+
69+
/**
70+
* The current active socket. This value is updated by the `subscribe` method.
71+
*/
72+
protected socket: WebSocket | null = null;
73+
74+
/**
75+
* The poll instance that mediates the web socket lifecycle.
76+
*/
77+
protected readonly subscription: Poll;
78+
79+
/**
80+
* Open a web socket and subscribe to its updates.
81+
*
82+
* @returns A promise that rejects when the socket connection is closed.
83+
*/
84+
protected async subscribe(): Promise<void> {
85+
if (this.isDisposed) {
86+
return;
87+
}
88+
return new Promise<void>((_, reject) => {
89+
const Socket = this.options.WebSocket || WebSocket;
90+
const socket = (this.socket = new Socket(this.options.url));
91+
socket.onclose = () => reject(new Error('socket stream: socket closed'));
92+
socket.onmessage = msg => msg.data && this.emit(JSON.parse(msg.data));
93+
});
94+
}
95+
}
96+
97+
/**
98+
* A namespace for `SocketStream` statics.
99+
*/
100+
export namespace SocketStream {
101+
/**
102+
* Instantiation options for a socket stream.
103+
*/
104+
export interface IOptions {
105+
/**
106+
* The web socket URL to open.
107+
*/
108+
url: string;
109+
110+
/**
111+
* An optional web socket constructor.
112+
*/
113+
WebSocket?: typeof WebSocket;
114+
}
115+
}

0 commit comments

Comments
 (0)