@@ -126,28 +126,48 @@ export class WebsocketClient extends BaseWebsocketClient<
126126 public subscribe (
127127 wsEvents : WsChannelSubUnSubRequestArg [ ] | WsChannelSubUnSubRequestArg ,
128128 isPrivateTopic ?: boolean ,
129- ) {
129+ ) : Promise < unknown > [ ] {
130130 const wsEventArgs = Array . isArray ( wsEvents ) ? wsEvents : [ wsEvents ] ;
131- const topicRequests = wsEventArgs . map ( ( wsEvent ) => {
131+
132+ const topicRequestsByWsKey : Record <
133+ WsKey | string ,
134+ WsTopicRequest < string , object > [ ]
135+ > = { } ;
136+
137+ // Format and batch topic requests by WsKey (resolved dynamically)
138+ wsEventArgs . forEach ( ( wsEvent ) => {
132139 const { channel, ...payload } = wsEvent ;
140+
133141 const normalisedEvent : WsTopicRequest < string , object > = {
134142 topic : channel ,
135143 payload,
136144 } ;
137145
138- return normalisedEvent ;
139- } ) ;
146+ const wsKey = getWsKeyForTopicChannel (
147+ this . options . market ,
148+ channel ,
149+ isPrivateTopic ,
150+ ) ;
140151
141- const normalisedTopicRequests = getNormalisedTopicRequests ( topicRequests ) ;
152+ // Arrange into per-wsKey sorted lists
153+ if ( ! topicRequestsByWsKey [ wsKey ] ) {
154+ topicRequestsByWsKey [ wsKey ] = [ ] ;
155+ }
142156
143- // TODO: sort topic requests into wsKeys, and then batch sub for the diff wsKeys
144- const wsKey = getWsKeyForTopicChannel (
145- this . options . market ,
146- wsEventArg . channel ,
147- isPrivateTopic ,
148- ) ;
157+ topicRequestsByWsKey [ wsKey ] . push ( normalisedEvent ) ;
158+ } ) ;
149159
150- return this . subscribeTopicsForWsKey ( normalisedTopicRequests , wsKey ) ;
160+ const subscribeRequestPromises : Promise < unknown > [ ] = [ ] ;
161+ for ( const wsKeyUntyped in topicRequestsByWsKey ) {
162+ subscribeRequestPromises . push (
163+ this . subscribeTopicsForWsKey (
164+ topicRequestsByWsKey [ wsKeyUntyped ] ,
165+ wsKeyUntyped as WsKey ,
166+ ) ,
167+ ) ;
168+ }
169+
170+ return subscribeRequestPromises ;
151171 }
152172
153173 /**
@@ -156,30 +176,50 @@ export class WebsocketClient extends BaseWebsocketClient<
156176 * @param isPrivateTopic optional - the library will try to detect private topics, you can use this to mark a topic as private (if the topic isn't recognised yet)
157177 */
158178 public unsubscribe (
159- wsTopics : WsChannelSubUnSubRequestArg [ ] | WsChannelSubUnSubRequestArg ,
179+ wsEvents : WsChannelSubUnSubRequestArg [ ] | WsChannelSubUnSubRequestArg ,
160180 isPrivateTopic ?: boolean ,
161181 ) {
162- const wsEventArgs = Array . isArray ( wsTopics ) ? wsTopics : [ wsTopics ] ;
163- wsEventArgs . forEach ( ( wsEventArg ) => {
182+ const wsEventArgs = Array . isArray ( wsEvents ) ? wsEvents : [ wsEvents ] ;
183+
184+ const topicRequestsByWsKey : Record <
185+ WsKey | string ,
186+ WsTopicRequest < string , object > [ ]
187+ > = { } ;
188+
189+ // Format and batch topic requests by WsKey (resolved dynamically)
190+ wsEventArgs . forEach ( ( wsEvent ) => {
191+ const { channel, ...payload } = wsEvent ;
192+
193+ const normalisedEvent : WsTopicRequest < string , object > = {
194+ topic : channel ,
195+ payload,
196+ } ;
197+
164198 const wsKey = getWsKeyForTopicChannel (
165199 this . options . market ,
166- wsEventArg . channel ,
200+ channel ,
167201 isPrivateTopic ,
168202 ) ;
169203
170- // Remove topic from persistence for reconnects
171- this . getWsStore ( ) . deleteTopic ( wsKey , wsEventArg ) ;
172-
173- // unsubscribe request only necessary if active connection exists
174- if (
175- this . getWsStore ( ) . isConnectionState (
176- wsKey ,
177- WsConnectionStateEnum . CONNECTED ,
178- )
179- ) {
180- this . requestUnsubscribeTopics ( wsKey , [ wsEventArg ] ) ;
204+ // Arrange into per-wsKey sorted lists
205+ if ( ! topicRequestsByWsKey [ wsKey ] ) {
206+ topicRequestsByWsKey [ wsKey ] = [ ] ;
181207 }
208+
209+ topicRequestsByWsKey [ wsKey ] . push ( normalisedEvent ) ;
182210 } ) ;
211+
212+ const unsubscribeRequestPromises : Promise < unknown > [ ] = [ ] ;
213+ for ( const wsKeyUntyped in topicRequestsByWsKey ) {
214+ unsubscribeRequestPromises . push (
215+ this . unsubscribeTopicsForWsKey (
216+ topicRequestsByWsKey [ wsKeyUntyped ] ,
217+ wsKeyUntyped as WsKey ,
218+ ) ,
219+ ) ;
220+ }
221+
222+ return unsubscribeRequestPromises ;
183223 }
184224
185225 /**
@@ -613,86 +653,6 @@ export class WebsocketClient extends BaseWebsocketClient<
613653 // }
614654 // }
615655
616- // /**
617- // * @private Use the `subscribe(topics)` method to subscribe to topics. Send WS message to subscribe to topics.
618- // */
619- // private requestSubscribeTopics(
620- // wsKey: WsKey,
621- // topics: WsChannelSubUnSubRequestArg[],
622- // ) {
623- // if (!topics.length) {
624- // return;
625- // }
626-
627- // const maxTopicsPerEvent = getMaxTopicsPerSubscribeEvent(
628- // this.options.market,
629- // );
630- // if (maxTopicsPerEvent && topics.length > maxTopicsPerEvent) {
631- // this.logger.trace(
632- // `Subscribing to topics in batches of ${maxTopicsPerEvent}`,
633- // );
634- // for (let i = 0; i < topics.length; i += maxTopicsPerEvent) {
635- // const batch = topics.slice(i, i + maxTopicsPerEvent);
636- // this.logger.trace(`Subscribing to batch of ${batch.length}`);
637- // this.requestSubscribeTopics(wsKey, batch);
638- // }
639- // this.logger.trace(
640- // `Finished batch subscribing to ${topics.length} topics`,
641- // );
642- // return;
643- // }
644-
645- // const request: WsSubRequest = {
646- // id: `${this.getNewRequestId()}`,
647- // op: 'subscribe',
648- // args: topics,
649- // };
650-
651- // const wsMessage = JSON.stringify(request);
652-
653- // this.tryWsSend(wsKey, wsMessage);
654- // }
655-
656- // /**
657- // * @private Use the `unsubscribe(topics)` method to unsubscribe from topics. Send WS message to unsubscribe from topics.
658- // */
659- // private requestUnsubscribeTopics(
660- // wsKey: WsKey,
661- // topics: WsChannelSubUnSubRequestArg[],
662- // ) {
663- // if (!topics.length) {
664- // return;
665- // }
666-
667- // const maxTopicsPerEvent = getMaxTopicsPerSubscribeEvent(
668- // this.options.market,
669- // );
670- // if (maxTopicsPerEvent && topics.length > maxTopicsPerEvent) {
671- // this.logger.trace(
672- // `Unsubscribing to topics in batches of ${maxTopicsPerEvent}`,
673- // );
674- // for (let i = 0; i < topics.length; i += maxTopicsPerEvent) {
675- // const batch = topics.slice(i, i + maxTopicsPerEvent);
676- // this.logger.trace(`Unsubscribing to batch of ${batch.length}`);
677- // this.requestUnsubscribeTopics(wsKey, batch);
678- // }
679- // this.logger.trace(
680- // `Finished batch unsubscribing to ${topics.length} topics`,
681- // );
682- // return;
683- // }
684-
685- // const request: WsUnsubRequest = {
686- // id: `${this.getNewRequestId()}`,
687- // op: 'unsubscribe',
688- // args: topics,
689- // };
690-
691- // const wsMessage = JSON.stringify(request);
692-
693- // this.tryWsSend(wsKey, wsMessage);
694- // }
695-
696656 // private onWsMessageLegacy(event: any, wsKey: WsKey, ws: WebSocket) {
697657 // const logContext = { ...WS_LOGGER_CATEGORY, wsKey, method: 'onWsMessage' };
698658
0 commit comments