@@ -362,30 +362,44 @@ func (w *WebSocketCommon) initializePool() {
362362//
363363// @param config The WebSocketConfig containing configuration details.
364364// @param userAgent The user agent string to be used for the connection.
365+ // @param streams A slice of stream names to subscribe to upon connection.
365366// @return An error if the connection fails, otherwise nil.
366- func (w * WebSocketCommon ) Connect (config WebSocketConfig , userAgent string ) error {
367+ func (w * WebSocketCommon ) Connect (config WebSocketConfig , userAgent string , streams [] string ) error {
367368 if err := w .setupProxyDialer (config ); err != nil {
368369 return fmt .Errorf ("proxy setup failed: %v" , err )
369370 }
370371
371- BasePath := w .prepareBasePath (config )
372372 headers := w .prepareHeaders (config , userAgent )
373373 dialer := w .CreateWebSocketDialer (config )
374374
375375 if w .Mode == SINGLE {
376+ BasePath := w .prepareBasePath (config , streams , true )
376377 return w .connectSingleMode (BasePath , headers , dialer , config , userAgent )
377378 }
378- return w .connectPoolMode (BasePath , headers , dialer , config , userAgent )
379+ return w .connectPoolMode (headers , dialer , config , userAgent , streams )
379380}
380381
381382// prepareBasePath constructs the base path for the WebSocket connection.
382383//
383384// @param config The WebSocketConfig containing configuration details.
385+ // @param streams The list of streams to include in the base path.
386+ // @param includeStreams A boolean indicating whether to include streams in the base path.
384387// @return The constructed base path string.
385- func (w * WebSocketCommon ) prepareBasePath (config WebSocketConfig ) string {
388+ func (w * WebSocketCommon ) prepareBasePath (config WebSocketConfig , streams [] string , includeStreams bool ) string {
386389 BasePath := config .GetBasePath ()
390+ if includeStreams && streams != nil && len (streams ) > 0 {
391+ BasePath += "?streams="
392+ for _ , stream := range streams {
393+ BasePath += stream + "/"
394+ }
395+ BasePath = strings .TrimSuffix (BasePath , "/" )
396+ }
387397 if timeUnit := config .GetTimeUnit (); timeUnit != "" {
388- BasePath = BasePath + "?timeUnit=" + string (timeUnit )
398+ if streams != nil && len (streams ) > 0 {
399+ BasePath += "&timeUnit=" + string (timeUnit )
400+ } else {
401+ BasePath += "?timeUnit=" + string (timeUnit )
402+ }
389403 }
390404 return BasePath
391405}
@@ -485,13 +499,13 @@ func (w *WebSocketCommon) connectSingleMode(BasePath string, headers http.Header
485499
486500// connectPoolMode establishes WebSocket connections in pool mode.
487501//
488- // @param BasePath The base URL for the WebSocket connection.
489502// @param headers The HTTP headers to include in the connection request.
490503// @param dialer The WebSocket dialer to use for the connection.
491504// @param config The WebSocketConfig containing configuration details.
492505// @param userAgent The user agent string to use for the connection.
506+ // @param streams A slice of stream names to subscribe to upon connection.
493507// @return An error if the connection fails, otherwise nil.
494- func (w * WebSocketCommon ) connectPoolMode (BasePath string , headers http.Header , dialer websocket.Dialer , config WebSocketConfig , userAgent string ) error {
508+ func (w * WebSocketCommon ) connectPoolMode (headers http.Header , dialer websocket.Dialer , config WebSocketConfig , userAgent string , streams [] string ) error {
495509 var wg sync.WaitGroup
496510 successChan := make (chan bool , len (w .Connections ))
497511 errChan := make (chan error , len (w .Connections ))
@@ -502,6 +516,7 @@ func (w *WebSocketCommon) connectPoolMode(BasePath string, headers http.Header,
502516 go func (num int , conn * WebSocketConnection ) {
503517 defer wg .Done ()
504518
519+ BasePath := w .prepareBasePath (config , streams , num == 0 )
505520 wsConn , _ , err := dialer .Dial (BasePath , headers )
506521 if err != nil {
507522 log .Printf ("WebSocket connection error: %v" , err )
@@ -601,7 +616,7 @@ func (w *WebSocketCommon) KeepAlive(connection *WebSocketConnection, config WebS
601616// @param userAgent The user agent string to use for the connection.
602617// @return An error if the reconnection fails, otherwise nil.
603618func (w * WebSocketCommon ) reconnect (conn * WebSocketConnection , config WebSocketConfig , userAgent string ) error {
604- BasePath := w .prepareBasePath (config )
619+ BasePath := w .prepareBasePath (config , conn . StreamConnectionMap , conn . SessionLogonRequest == nil )
605620 headers := w .prepareHeaders (config , userAgent )
606621 dialer := w .CreateWebSocketDialer (config )
607622
@@ -617,8 +632,6 @@ func (w *WebSocketCommon) reconnect(conn *WebSocketConnection, config WebSocketC
617632 w .restoreSessionIfNeeded (conn )
618633 time .Sleep (1 * time .Second )
619634 w .resubscribeUserDataStreams (conn )
620- } else if len (conn .StreamConnectionMap ) > 0 {
621- w .resubscribeRegularStreams (conn )
622635 }
623636
624637 return nil
@@ -713,34 +726,6 @@ func (w *WebSocketCommon) resubscribeUserDataStreams(connection *WebSocketConnec
713726 }
714727}
715728
716- // resubscribeRegularStreams resubscribes to all regular streams after reconnection.
717- //
718- // @param connection The WebSocketConnection to resubscribe streams on.
719- func (w * WebSocketCommon ) resubscribeRegularStreams (connection * WebSocketConnection ) {
720- for _ , stream := range connection .StreamConnectionMap {
721- subscribePayload := map [string ]interface {}{
722- "method" : "SUBSCRIBE" ,
723- "params" : []string {stream },
724- "id" : GenerateUUID (),
725- }
726-
727- message , err := json .Marshal (subscribePayload )
728- if err != nil {
729- log .Printf ("Error during resubscription to stream %s: %v" , stream , err )
730- continue
731- }
732-
733- connection .mu .Lock ()
734- err = connection .Websocket .WriteMessage (websocket .TextMessage , message )
735- connection .mu .Unlock ()
736- if err != nil {
737- log .Printf ("Error sending resubscription message for stream %s: %v" , stream , err )
738- continue
739- }
740- log .Printf ("Resubscribed to stream %s on reconnection" , stream )
741- }
742- }
743-
744729// isConnectionReady checks if the WebSocket connection is open.
745730//
746731// @param connection The WebSocketConnection to check.
@@ -867,7 +852,7 @@ func NewWebsocketAPI(cfg *ConfigurationWebsocketApi) (*WebsocketAPI, error) {
867852// @param userAgent The user agent string to be used for the connection.
868853// @return An error if the connection fails, otherwise nil.
869854func (w * WebsocketAPI ) Connect (userAgent string ) error {
870- return w .WsCommon .Connect (w .Cfg , userAgent )
855+ return w .WsCommon .Connect (w .Cfg , userAgent , [] string {} )
871856}
872857
873858// SendMessage sends a message over the WebSocket connection and returns channels for the response and error.
@@ -1121,8 +1106,25 @@ func NewWebsocketStreams(cfg *ConfigurationWebsocketStreams) (*WebsocketStreams,
11211106 }, nil
11221107}
11231108
1124- func (w * WebsocketStreams ) Connect (userAgent string ) error {
1125- return w .WsCommon .Connect (w .Cfg , userAgent )
1109+ // Connect establishes the WebSocket connection using the provided user agent and streams.
1110+ //
1111+ // @param userAgent The user agent string to be used for the connection.
1112+ // @param streams A slice of stream names to subscribe to upon connection.
1113+ // @return An error if the connection fails, otherwise nil.
1114+ func (w * WebsocketStreams ) Connect (userAgent string , streams []string ) error {
1115+ err := w .WsCommon .Connect (w .Cfg , userAgent , streams )
1116+ if err != nil {
1117+ fmt .Println ("WebSocket connection error:" , err )
1118+ return err
1119+ }
1120+ if streams != nil && len (streams ) > 0 {
1121+ conn := w .WsCommon .Connections [0 ]
1122+ for _ , stream := range streams {
1123+ w .GlobalStreamConnectionMap [stream ] = append (w .GlobalStreamConnectionMap [stream ], conn )
1124+ conn .StreamConnectionMap = append (conn .StreamConnectionMap , stream )
1125+ }
1126+ }
1127+ return nil
11261128}
11271129
11281130// Subscribe subscribes to the specified streams.
0 commit comments