@@ -80,13 +80,13 @@ func (sm *StreamManager) Run() error {
8080 sm .Metrics .setLoginTime ()
8181 case StateDisconnected :
8282 // Reconnect on disconnection
83- return sm .resume (e . SMState )
83+ return sm .resume ()
8484 case StateStreamError :
8585 sm .client .Disconnect ()
8686 // Only try reconnecting if we have not been kicked by another session to avoid connection loop.
8787 // TODO: Make this conflict exception a permanent error
8888 if e .StreamError != "conflict" {
89- return sm .connect ()
89+ return sm .resume ()
9090 }
9191 case StatePermanentError :
9292 // Do not attempt to reconnect
@@ -113,19 +113,32 @@ func (sm *StreamManager) Stop() {
113113}
114114
115115func (sm * StreamManager ) connect () error {
116- var state SMState
117- return sm .resume (state )
116+ if sm .client != nil {
117+ if c , ok := sm .client .(* Client ); ok {
118+ if c .CurrentState .getState () == StateDisconnected {
119+ sm .Metrics = initMetrics ()
120+ err := c .Connect ()
121+ if err != nil {
122+ return err
123+ }
124+ if sm .PostConnect != nil {
125+ sm .PostConnect (sm .client )
126+ }
127+ return nil
128+ }
129+ }
130+ }
131+ return errors .New ("client is not disconnected" )
118132}
119133
120134// resume manages the reconnection loop and apply the define backoff to avoid overloading the server.
121- func (sm * StreamManager ) resume (state SMState ) error {
135+ func (sm * StreamManager ) resume () error {
122136 var backoff backoff // TODO: Group backoff calculation features with connection manager?
123137
124138 for {
125139 var err error
126140 // TODO: Make it possible to define logger to log disconnect and reconnection attempts
127141 sm .Metrics = initMetrics ()
128-
129142 if err = sm .client .Resume (); err != nil {
130143 var actualErr ConnError
131144 if xerrors .As (err , & actualErr ) {
0 commit comments