Skip to content

Commit 58339eb

Browse files
authored
Merge pull request #24 from libsv/feature/issue_15
Fix #15 : Implement closing method for websocket
2 parents ff8b05d + 4624600 commit 58339eb

File tree

19 files changed

+310
-112
lines changed

19 files changed

+310
-112
lines changed

.github/workflows/run-tests.yml

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,7 @@ jobs:
4444
- name: Unit Tests
4545
run: make test-unit
4646
- name: Integration Tests
47-
run: go test -v -tags=integration ./...
48-
- name: Update code coverage
49-
run: bash <(curl -s https://codecov.io/bash)
47+
run: go test -race -v -tags=integration ./...
5048
- name: Build Rest Example
5149
run: go build ./example/rest
5250
- name: Build Websocket Example

.make/go.mk

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ install-go: ## Install the application (Using Native Go)
4242

4343
lint: ## Run the golangci-lint application (install if not found)
4444
@echo "downloading golangci-lint..."
45-
@curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- v1.33.0
45+
@curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- v1.42.0
4646
@echo "running golangci-lint..."
4747
@GOGC=20 ./bin/golangci-lint run
4848

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,5 +58,5 @@ go clean -testcache && go test -v ./...
5858

5959
To run integration tests, make sure you have `docker-compose up -d` on your local machine, then run
6060
```
61-
go clean -testcache && go test -v -tags=integration ./...
61+
go clean -testcache && go test -race -v -tags=integration ./...
6262
```

Client.go renamed to client.go

Lines changed: 101 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,50 @@
11
package spvchannels
22

33
import (
4+
"context"
45
"crypto/tls"
56
"encoding/json"
67
"errors"
78
"fmt"
89
"net/http"
910
"net/url"
11+
"sync"
1012
"time"
1113

1214
ws "github.com/gorilla/websocket"
1315
)
1416

15-
// WSCallBack is a callback to process websocket messages
17+
// NotificationHandlerFunc is a callback to process websocket messages
18+
// ctx : the handling context
1619
// t : message type
1720
// msg : message content
1821
// err : message error
19-
type WSCallBack = func(t int, msg []byte, err error) error
22+
type NotificationHandlerFunc = func(ctx context.Context, t int, msg []byte, err error) error
23+
24+
// ErrWSClose can be returned by a NotificationHandlerFunc to instruct the
25+
// socket client that we are finished processing messages and to close.
26+
//
27+
// This could be emitted as a result of a server sending a message with a payload
28+
// of 'close stream' or equivalent.
29+
type ErrWSClose struct {
30+
error
31+
}
32+
33+
// ErrorHandlerFunc is a callback to handle the error after processing the message
34+
// err : the error to handle
35+
type ErrorHandlerFunc func(err error)
2036

2137
// spvConfig hold configuration for rest api connection
2238
type spvConfig struct {
23-
insecure bool // equivalent curl -k
24-
baseURL string
25-
version string
26-
user string
27-
passwd string
28-
token string
29-
channelID string
30-
procces WSCallBack
31-
maxNotified uint64
39+
insecure bool // equivalent curl -k
40+
baseURL string
41+
version string
42+
user string
43+
passwd string
44+
token string
45+
channelID string
46+
procces NotificationHandlerFunc
47+
errHandler ErrorHandlerFunc
3248
}
3349

3450
// SPVConfigFunc set the rest api configuration
@@ -84,17 +100,19 @@ func WithChannelID(id string) SPVConfigFunc {
84100
}
85101

86102
// WithWebsocketCallBack provide the callback function to process notification messages
87-
func WithWebsocketCallBack(f WSCallBack) SPVConfigFunc {
103+
func WithWebsocketCallBack(f NotificationHandlerFunc) SPVConfigFunc {
88104
return func(c *spvConfig) {
89105
c.procces = f
90106
}
91107
}
92108

93-
// WithMaxNotified define the max number of notifications that websocket process.
94-
// After receiving enough messages, the websocket will automatically close
95-
func WithMaxNotified(m uint64) SPVConfigFunc {
109+
// WithErrorHandler can be provided with a function used to handle
110+
// errors when processing Socket message callbacks.
111+
//
112+
// Here you could log the errors, send to another system to drop them etc.
113+
func WithErrorHandler(e ErrorHandlerFunc) SPVConfigFunc {
96114
return func(c *spvConfig) {
97-
c.maxNotified = m
115+
c.errHandler = e
98116
}
99117
}
100118

@@ -108,7 +126,12 @@ func defaultSPVConfig() *spvConfig {
108126
passwd: "dev",
109127
token: "",
110128
channelID: "",
111-
procces: nil,
129+
procces: func(ctx context.Context, t int, msg []byte, err error) error {
130+
return err
131+
},
132+
errHandler: func(err error) {
133+
fmt.Printf("received err: %s\n", err)
134+
},
112135
}
113136
return cfg
114137
}
@@ -119,7 +142,7 @@ type Client struct {
119142
HTTPClient HTTPClient
120143
}
121144

122-
// NewClient create a new rest api client by providing fuctional config settings
145+
// NewClient create a new rest api client by providing functional config settings
123146
//
124147
// Example of usage :
125148
//
@@ -251,12 +274,15 @@ func (c *Client) sendRequest(req *http.Request, out interface{}) error {
251274
// - websocket connection
252275
// - number of received notifications
253276
type WSClient struct {
254-
cfg *spvConfig
255-
ws *ws.Conn
256-
nbNotified uint64
277+
mu sync.Mutex
278+
cfg *spvConfig
279+
ws *ws.Conn
280+
close chan bool
281+
started bool
257282
}
258283

259-
// NewWSClient create a new websocket client by providing fuctional config settings.
284+
// NewWSClient create a new connected websocket client by providing fuctional config settings.
285+
// After being created (connected), the websocket client is ready to listen to new messages
260286
//
261287
// Example of usage :
262288
//
@@ -268,7 +294,6 @@ type WSClient struct {
268294
// spv.WithToken(tok),
269295
// spv.WithInsecure(),
270296
// spv.WithWebsocketCallBack(PullUnreadMessages),
271-
// spv.WithMaxNotified(10),
272297
// )
273298
//
274299
// The full list of functional settings for a websocket client are :
@@ -296,48 +321,39 @@ type WSClient struct {
296321
// To specify a callback function to process the notification
297322
//
298323
// WithWebsocketCallBack(p PullUnreadMessages)
299-
//
300-
// To set the max number of notifications that user want to receive ( used in test only)
301-
//
302-
// After receiving enough notifications, the socket stop and close
303-
//
304-
// WithMaxNotified(n uint64)
305-
func NewWSClient(opts ...SPVConfigFunc) *WSClient {
324+
func NewWSClient(opts ...SPVConfigFunc) (*WSClient, error) {
306325
// Start with the defaults then overwrite config with any set by user
307326
cfg := defaultSPVConfig()
308327
for _, opt := range opts {
309328
opt(cfg)
310329
}
311330

312-
if cfg.procces == nil {
313-
cfg.procces = func(t int, msg []byte, err error) error {
314-
return nil
315-
}
331+
ws := &WSClient{
332+
cfg: cfg,
333+
ws: nil,
334+
close: make(chan bool),
316335
}
317336

318-
return &WSClient{
319-
cfg: cfg,
320-
ws: nil,
321-
nbNotified: 0,
337+
err := ws.connectServer()
338+
339+
if err != nil {
340+
return nil, err
322341
}
342+
343+
return ws, nil
323344
}
324345

325346
// urlPath return the path part of the connection URL
326347
func (c *WSClient) urlPath() string {
327348
return fmt.Sprintf("/api/%s/channel/%s/notify", c.cfg.version, c.cfg.channelID)
328349
}
329350

330-
// NbNotified return the number of processed messages
331-
func (c *WSClient) NbNotified() uint64 {
332-
return c.nbNotified
333-
}
334-
335-
// Run establishes the connection and start listening the notification stream
336-
// process the notification if a callback is provided
337-
func (c *WSClient) Run() error {
351+
// connectServer establish the connection to the server
352+
// Return error if any
353+
func (c *WSClient) connectServer() error {
338354
u := url.URL{
339355
Scheme: "wss",
340-
Host: "localhost:5010",
356+
Host: c.cfg.baseURL,
341357
Path: c.urlPath(),
342358
}
343359

@@ -360,26 +376,51 @@ func (c *WSClient) Run() error {
360376
return err
361377
}
362378
defer func() {
363-
_ = conn.Close()
364379
_ = httpRESP.Body.Close()
365380
}()
366381

367382
c.ws = conn
368383

369-
for c.nbNotified < c.cfg.maxNotified {
370-
t, msg, err := c.ws.ReadMessage()
371-
if err != nil {
372-
return fmt.Errorf("%w. Total processed %d messages", err, c.nbNotified)
373-
}
384+
return nil
385+
}
386+
387+
// Close stops reading any notification and closes the websocket
388+
// Usually it is called from a separate goroutine
389+
func (c *WSClient) Close() {
390+
if c.close == nil {
391+
return
392+
}
393+
c.close <- true
394+
close(c.close)
395+
c.close = nil
396+
_ = c.ws.Close()
374397

375-
c.nbNotified++
376-
if c.cfg.procces != nil {
377-
err = c.cfg.procces(t, msg, err)
378-
if err != nil {
379-
return fmt.Errorf("%w. Total processed %d messages", err, c.nbNotified)
398+
}
399+
400+
// Run establishes the connection and start listening the notification stream
401+
// process the notification if a callback is provided
402+
func (c *WSClient) Run() {
403+
404+
go func() {
405+
defer func() {
406+
_ = recover()
407+
c.Close()
408+
}()
409+
c.mu.Lock()
410+
c.started = true
411+
c.mu.Unlock()
412+
for {
413+
t, msg, err := c.ws.ReadMessage()
414+
if c.cfg.procces != nil {
415+
if err2 := c.cfg.procces(context.Background(), t, msg, err); err2 != nil {
416+
if errors.Is(err2, ErrWSClose{}) {
417+
return
418+
}
419+
c.cfg.errHandler(err2)
420+
}
380421
}
381422
}
382-
}
423+
}()
383424

384-
return nil
425+
<-c.close
385426
}

0 commit comments

Comments
 (0)