Skip to content

Commit 1b6725a

Browse files
authored
fix(io): revise websocket connection (#3728)
Signed-off-by: Song Gao <disxiaofei@163.com>
1 parent abd1f0b commit 1b6725a

File tree

3 files changed

+10
-8
lines changed

3 files changed

+10
-8
lines changed

internal/io/http/httpserver/websocket_server.go

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ import (
1818
"context"
1919
"fmt"
2020
"net/http"
21-
"strings"
2221
"sync"
2322

2423
"github.com/gorilla/websocket"
@@ -92,9 +91,8 @@ func sendProcess(ctx api.StreamContext, topic, sourceID string, c *websocket.Con
9291
case d := <-ch:
9392
data := d.([]byte)
9493
if err := c.WriteMessage(websocket.TextMessage, data); err != nil {
95-
if websocket.IsCloseError(err) || strings.Contains(err.Error(), "close") {
96-
return
97-
}
94+
conf.Log.Errorf("write websocket msg err:%v, topic:%v", err, topic)
95+
return
9896
}
9997
}
10098
}
@@ -114,10 +112,8 @@ func recvProcess(ctx api.StreamContext, topic string, c *websocket.Conn, cancel
114112
}
115113
msgType, data, err := c.ReadMessage()
116114
if err != nil {
117-
if websocket.IsCloseError(err) || strings.Contains(err.Error(), "close") {
118-
return
119-
}
120-
continue
115+
conf.Log.Errorf("read websocket msg err:%v, topic:%v", err, topic)
116+
return
121117
}
122118
switch msgType {
123119
case websocket.TextMessage:

internal/io/websocket/websocket_sink.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,9 @@ func (w *WebsocketSink) Connect(ctx api.StreamContext, sch api.StatusChangeHandl
6060
return err
6161
}
6262
conn, err := w.cw.Wait(ctx)
63+
if err != nil {
64+
return err
65+
}
6366
if conn == nil {
6467
return fmt.Errorf("websocket endpoint not ready: %v", err)
6568
}

internal/io/websocket/websocket_source.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,9 @@ func (w *WebsocketSource) Connect(ctx api.StreamContext, sc api.StatusChangeHand
6464
return err
6565
}
6666
conn, err := cw.Wait(ctx)
67+
if err != nil {
68+
return err
69+
}
6770
if conn == nil {
6871
return fmt.Errorf("websocket endpoint not ready: %v", err)
6972
}

0 commit comments

Comments
 (0)