Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 15 additions & 26 deletions relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,45 +166,34 @@ func (r *Relay) ConnectWithTLS(ctx context.Context, tlsConfig *tls.Config) error
// ping every 29 seconds
ticker := time.NewTicker(29 * time.Second)

// to be used when the connection is closed
go func() {
<-r.connectionContext.Done()

// stop the ticker
ticker.Stop()

// nil the connection
r.Connection = nil

// close all subscriptions
for _, sub := range r.Subscriptions.Range {
sub.unsub(fmt.Errorf("relay connection closed: %w / %w", context.Cause(r.connectionContext), r.ConnectionError))
}
}()

// queue all write operations here so we don't do mutex spaghetti
go func() {
for {
select {
case <-r.connectionContext.Done():
ticker.Stop()
r.Connection = nil

for _, sub := range r.Subscriptions.Range {
sub.unsub(fmt.Errorf("relay connection closed: %w / %w", context.Cause(r.connectionContext), r.ConnectionError))
}
return

case <-ticker.C:
if r.Connection != nil {
err := r.Connection.Ping(r.connectionContext)
if err != nil && !strings.Contains(err.Error(), "failed to wait for pong") {
InfoLogger.Printf("{%s} error writing ping: %v; closing websocket", r.URL, err)
r.Close() // this should trigger a context cancelation
return
}
err := r.Connection.Ping(r.connectionContext)
if err != nil && !strings.Contains(err.Error(), "failed to wait for pong") {
InfoLogger.Printf("{%s} error writing ping: %v; closing websocket", r.URL, err)
r.Close() // this should trigger a context cancelation
return
}

case writeRequest := <-r.writeQueue:
// all write requests will go through this to prevent races
debugLogf("{%s} sending %v\n", r.URL, string(writeRequest.msg))
if err := r.Connection.WriteMessage(r.connectionContext, writeRequest.msg); err != nil {
writeRequest.answer <- err
}
close(writeRequest.answer)
case <-r.connectionContext.Done():
// stop here
return
}
}
}()
Expand Down
Loading