Skip to content

Commit 284e921

Browse files
author
Bernhard B
committed
fixed bug in json-rpc mode
* properly close websocket connection when client disconnects see #198
1 parent b64ee6b commit 284e921

File tree

3 files changed

+65
-36
lines changed

3 files changed

+65
-36
lines changed

src/api/api.go

Lines changed: 54 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"net/http"
77
"strconv"
88
"time"
9+
"errors"
910

1011
"github.com/gabriel-vasile/mimetype"
1112
"github.com/gin-gonic/gin"
@@ -332,32 +333,58 @@ func (a *Api) SendV2(c *gin.Context) {
332333
c.JSON(201, SendMessageResponse{Timestamp: strconv.FormatInt((*timestamps)[0].Timestamp, 10)})
333334
}
334335

335-
func (a *Api) handleSignalReceive(ws *websocket.Conn, number string) {
336+
func (a *Api) handleSignalReceive(ws *websocket.Conn, number string, stop chan struct{}) {
337+
receiveChannel, err := a.signalClient.GetReceiveChannel(number)
338+
if err != nil {
339+
log.Error("Couldn't get receive channel: ", err.Error())
340+
return
341+
}
342+
336343
for {
337-
data, err := a.signalClient.Receive(number, 0)
338-
if err == nil {
339-
err = ws.WriteMessage(websocket.TextMessage, []byte(data))
340-
if err != nil {
341-
log.Error("Couldn't write message: " + err.Error())
342-
return
343-
}
344-
} else {
345-
errorMsg := Error{Msg: err.Error()}
346-
errorMsgBytes, err := json.Marshal(errorMsg)
347-
if err != nil {
348-
log.Error("Couldn't serialize error message: " + err.Error())
349-
return
344+
select {
345+
case <-stop:
346+
ws.Close()
347+
return
348+
case msg := <-receiveChannel:
349+
var data string = string(msg.Params)
350+
var err error = nil
351+
if msg.Err.Code != 0 {
352+
err = errors.New(msg.Err.Message)
350353
}
351-
err = ws.WriteMessage(websocket.TextMessage, errorMsgBytes)
352-
if err != nil {
353-
log.Error("Couldn't write message: " + err.Error())
354-
return
354+
355+
if err == nil {
356+
if data != "" {
357+
err = ws.WriteMessage(websocket.TextMessage, []byte(data))
358+
if err != nil {
359+
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
360+
log.Error("Couldn't write message: " + err.Error())
361+
}
362+
return
363+
}
364+
}
365+
} else {
366+
errorMsg := Error{Msg: err.Error()}
367+
errorMsgBytes, err := json.Marshal(errorMsg)
368+
if err != nil {
369+
log.Error("Couldn't serialize error message: " + err.Error())
370+
return
371+
}
372+
err = ws.WriteMessage(websocket.TextMessage, errorMsgBytes)
373+
if err != nil {
374+
log.Error("Couldn't write message: " + err.Error())
375+
return
376+
}
355377
}
356378
}
357379
}
358380
}
359381

360-
func wsPong(ws *websocket.Conn) {
382+
func wsPong(ws *websocket.Conn, stop chan struct{}) {
383+
defer func() {
384+
close(stop)
385+
ws.Close()
386+
}()
387+
361388
ws.SetReadLimit(512)
362389
ws.SetPongHandler(func(string) error { log.Debug("Received pong"); return nil })
363390
for {
@@ -368,10 +395,13 @@ func wsPong(ws *websocket.Conn) {
368395
}
369396
}
370397

371-
func wsPing(ws *websocket.Conn) {
398+
func wsPing(ws *websocket.Conn, stop chan struct{}) {
372399
pingTicker := time.NewTicker(pingPeriod)
373400
for {
374401
select {
402+
case <-stop:
403+
ws.Close()
404+
return
375405
case <-pingTicker.C:
376406
if err := ws.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
377407
return
@@ -400,9 +430,10 @@ func (a *Api) Receive(c *gin.Context) {
400430
return
401431
}
402432
defer ws.Close()
403-
go a.handleSignalReceive(ws, number)
404-
go wsPing(ws)
405-
wsPong(ws)
433+
var stop = make(chan struct{})
434+
go a.handleSignalReceive(ws, number, stop)
435+
go wsPing(ws, stop)
436+
wsPong(ws, stop)
406437
} else {
407438
timeout := c.DefaultQuery("timeout", "1")
408439
timeoutInt, err := strconv.ParseInt(timeout, 10, 32)

src/client/client.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -591,15 +591,7 @@ func (s *SignalClient) SendV2(number string, message string, recps []string, bas
591591

592592
func (s *SignalClient) Receive(number string, timeout int64) (string, error) {
593593
if s.signalCliMode == JsonRpc {
594-
jsonRpc2Client, err := s.getJsonRpc2Client(number)
595-
if err != nil {
596-
return "", err
597-
}
598-
msg := jsonRpc2Client.ReceiveMessage()
599-
if msg.Err.Code != 0 {
600-
return "", errors.New(msg.Err.Message)
601-
}
602-
return string(msg.Params), nil
594+
return "", errors.New("Not implemented")
603595
} else {
604596
command := []string{"--config", s.signalCliConfig, "--output", "json", "-a", number, "receive", "-t", strconv.FormatInt(timeout, 10)}
605597

@@ -624,6 +616,14 @@ func (s *SignalClient) Receive(number string, timeout int64) (string, error) {
624616
}
625617
}
626618

619+
func (s *SignalClient) GetReceiveChannel(number string) (chan JsonRpc2ReceivedMessage, error) {
620+
jsonRpc2Client, err := s.getJsonRpc2Client(number)
621+
if err != nil {
622+
return nil, err
623+
}
624+
return jsonRpc2Client.GetReceiveChannel(), nil
625+
}
626+
627627
func (s *SignalClient) CreateGroup(number string, name string, members []string, description string, editGroupPermission GroupPermission, addMembersPermission GroupPermission, groupLinkState GroupLinkState) (string, error) {
628628
var internalGroupId string
629629
if s.signalCliMode == JsonRpc {

src/client/jsonrpc2.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -133,8 +133,6 @@ func (r *JsonRpc2Client) ReceiveData(number string) {
133133
}
134134
}
135135

136-
//blocks until message a message is received
137-
func (r *JsonRpc2Client) ReceiveMessage() JsonRpc2ReceivedMessage {
138-
resp := <-r.receivedMessages
139-
return resp
136+
func (r *JsonRpc2Client) GetReceiveChannel() chan JsonRpc2ReceivedMessage {
137+
return r.receivedMessages
140138
}

0 commit comments

Comments
 (0)