1- import { Api } from "../api" ;
2- import { SessionsMessage } from "../generated-client" ;
3- import getWebSocketUrl from "../utils/url/websocket" ;
1+ import { Api , AUTHORIZATION_PARAMETER } from "../api" ;
2+ import { OutboundWebSocketMessage , SessionsMessage } from "../generated-client" ;
43
54
65/**
@@ -9,49 +8,141 @@ import getWebSocketUrl from "../utils/url/websocket";
98 */
109export class WebSocketService {
1110
11+ /**
12+ * The {@link Api} instance used to construct the websocket URL
13+ */
1214 private api ;
15+
16+ /**
17+ * The active websocket connection, if one exists
18+ */
1319 private socket : WebSocket | undefined
1420
21+ /**
22+ * A map of message type subscriptions to their respective handlers
23+ */
24+ private subscriptions : Map < string , SocketMessageHandler < any > [ ] > = new Map ( ) ;
25+
1526 constructor ( api : Api ) {
1627 this . api = api
1728 }
1829
19- private shouldOpenWebSocket ( accessToken : string ) {
20-
21- const socketClosed = this . socket === undefined || this . socket . readyState === WebSocket . CLOSED ;
30+ /**
31+ * Gets the websocket URL for the given API instance
32+ *
33+ * @param api The authenticated {@link Api} instance
34+ * @returns The websocket URL
35+ */
36+ private getWebSocketUrl ( api : Api ) : URL {
37+ return new URL (
38+ api . getUri ( "socket" , {
39+ [ AUTHORIZATION_PARAMETER ] : api . accessToken
40+ } ) . replace ( / ^ h t t p / , "ws" )
41+ ) ;
42+ }
2243
23- // TODO: Add validation around accessToken
24- return socketClosed && accessToken . length > 0 ;
44+ /**
45+ * Gets the current status of the websocket connection
46+ * @returns The {@link WebSocket.readyState} status
47+ */
48+ getSocketStatus ( ) : WebSocket [ 'readyState' ] | undefined {
49+ return this . socket ?. readyState ;
2550 }
2651
2752 /**
28- * Ensures a websocket connection to the server is active by establishing a new connection
29- * if a connection doesn't already exist or the previous connection is closed.
53+ * Adds message listeners for the provided message types
54+ *
55+ * Adding a listener will establish a websocket connection if one does not already exist
56+ *
57+ * Listeners will be automatically re-added if the connection is lost and re-established
58+ *
59+ * Listeners can be removed by invoking the returned unsubscribe function
3060 *
31- * Depends on the {@link Api.accessToken} being valid and
32- * populated.
61+ * @param messageTypes Any array of {@link OutboundWebSocketMessage} message types to listen for
62+ * @param onMessage The callback to invoke when a message is received
3363 *
34- * @param onOpen An optional callback to run when the socket is opened
35- * @param onClose An optional callback to run when the socket is closed
64+ * @returns A function which can be invoked to remove the added listeners
3665 */
37- ensureWebSocket (
38- onOpen : ( e ?: Event ) => Promise < void > ,
39- onMessage : ( e : MessageEvent < SessionsMessage > ) => Promise < void > ,
40- onClose : ( e ?: CloseEvent ) => Promise < void >
41- ) {
42-
43- if ( this . shouldOpenWebSocket ( this . api . accessToken ) ) {
44-
45- const webSocketUrl = getWebSocketUrl ( this . api ) ;
46-
47- this . socket = new WebSocket ( webSocketUrl ) ;
48- this . socket . onopen = onOpen ;
49- this . socket . onclose = onClose ;
50- this . socket . onmessage = onMessage ;
66+ subscribe < T extends OutboundWebSocketMessage [ 'MessageType' ] > ( messageTypes : T [ ] , onMessage : SocketMessageHandler < T > ) {
67+ if ( ! this . socket || this . socket . readyState !== WebSocket . OPEN ) {
68+ const url = this . getWebSocketUrl ( this . api ) ;
69+ this . socket = new WebSocket ( url . toString ( ) ) ;
70+ }
71+
72+ // Send startMessages and stopMessages to the server for three specific message types
73+ // Sessions
74+ // ActivityLogEntry
75+ // ScheduledTaskSSSSSSSSSSSSSSSSSS🐍Info
76+
77+ this . socket . addEventListener ( 'message' , ( event ) => {
78+ const message = JSON . parse ( event . data ) as OutboundWebSocketMessage ;
79+ if ( messageTypes . includes ( message . MessageType as T ) ) {
80+ onMessage ( message as Extract < OutboundWebSocketMessage , { MessageType : T } > ) ;
81+ }
82+ } ) ;
83+
84+ this . socket . addEventListener ( 'close' , ( ) => {
85+ /**
86+ * If the socket is closed, reopen it if there are subscriptions
87+ * and re-add the listeners
88+ */
89+ if ( this . subscriptions . size > 0 ) {
90+ const url = this . getWebSocketUrl ( this . api ) ;
91+ this . socket = new WebSocket ( url . toString ( ) ) ;
92+
93+ this . socket . addEventListener ( 'open' , ( ) => {
94+ for ( const [ messageType , handlers ] of this . subscriptions . entries ( ) ) {
95+ for ( const handler of handlers ) {
96+ this . socket ?. addEventListener ( 'message' , ( event ) => {
97+ const message = JSON . parse ( event . data ) as OutboundWebSocketMessage ;
98+ if ( message . MessageType === messageType ) {
99+ handler ( message ) ;
100+ }
101+ } ) ;
102+ }
103+ }
104+ } ) ;
105+ }
106+ // Else close and dispose
107+ else {
108+ this . socket ?. close ( ) ;
109+ this . socket = undefined ;
110+ }
111+ } ) ;
112+
113+ // If the last subscription has been removed, close the socket
114+
115+
116+ // Catalog all existing subscriptions for the given message types
117+ for ( const messageType of messageTypes ) {
118+ if ( ! this . subscriptions . has ( messageType ) ) {
119+ this . subscriptions . set ( messageType , [ ] ) ;
120+ }
121+ this . subscriptions . get ( messageType ) ! . push ( onMessage ) ;
122+ }
123+
124+ // Return an unsubscribe function
125+ return ( ) => {
126+ for ( const messageType of messageTypes ) {
127+ const handlers = this . subscriptions . get ( messageType ) ;
128+ if ( handlers ) {
129+ const index = handlers . indexOf ( onMessage ) ;
130+ if ( index !== - 1 ) {
131+ handlers . splice ( index , 1 ) ;
132+ }
133+ if ( handlers . length === 0 ) {
134+ this . subscriptions . delete ( messageType ) ;
135+ }
136+ }
137+ }
138+
139+ // If there are no more subscriptions, close the socket
140+ if ( this . subscriptions . size === 0 ) {
141+ this . socket ?. close ( ) ;
142+ this . socket = undefined ;
143+ }
51144 }
52145 }
146+ }
53147
54- sendMessage ( message : SessionsMessage ) {
55- this . socket ?. send ( JSON . stringify ( message ) ) ;
56- }
57- }
148+ type SocketMessageHandler < T extends OutboundWebSocketMessage [ 'MessageType' ] > = ( message : Extract < OutboundWebSocketMessage , { MessageType : T } > ) => void
0 commit comments