Skip to content

Commit 599b7b1

Browse files
authored
Merge pull request #183 from hyperledger/bg-connect
2 parents 8eb7508 + 8b0ff37 commit 599b7b1

File tree

5 files changed

+77
-7
lines changed

5 files changed

+77
-7
lines changed

pkg/i18n/en_base_config_descriptions.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ var (
6161

6262
ConfigGlobalWsConnectionTimeout = ffc("config.global.ws.connectionTimeout", "The amount of time to wait while establishing a connection (or auto-reconnection)", TimeDurationType)
6363
ConfigGlobalWsHeartbeatInterval = ffc("config.global.ws.heartbeatInterval", "The amount of time to wait between heartbeat signals on the WebSocket connection", TimeDurationType)
64+
ConfigGlobalWsBackgroundConnect = ffc("config.global.ws.backgroundConnect", "When true the connection is established in the background with infinite reconnect (makes initialConnectAttempts redundant when set)", BooleanType)
6465
ConfigGlobalWsInitialConnectAttempts = ffc("config.global.ws.initialConnectAttempts", "The number of attempts FireFly will make to connect to the WebSocket when starting up, before failing", IntType)
6566
ConfigGlobalWsPath = ffc("config.global.ws.path", "The WebSocket sever URL to which FireFly should connect", "WebSocket URL "+StringType)
6667
ConfigGlobalWsReadBufferSize = ffc("config.global.ws.readBufferSize", "The size in bytes of the read buffer for the WebSocket connection", ByteSizeType)

pkg/wsclient/wsclient.go

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@ type WSConfig struct {
4646
InitialDelay time.Duration `json:"initialDelay,omitempty"`
4747
MaximumDelay time.Duration `json:"maximumDelay,omitempty"`
4848
DelayFactor float64 `json:"delayFactor,omitempty"`
49-
InitialConnectAttempts int `json:"initialConnectAttempts,omitempty"`
49+
BackgroundConnect bool `json:"backgroundConnect,omitempty"`
50+
InitialConnectAttempts int `json:"initialConnectAttempts,omitempty"` // recommend backgroundConnect instead
5051
DisableReconnect bool `json:"disableReconnect"`
5152
AuthUsername string `json:"authUsername,omitempty"`
5253
AuthPassword string `json:"authPassword,omitempty"`
@@ -104,6 +105,7 @@ type wsClient struct {
104105
ctx context.Context
105106
headers http.Header
106107
url string
108+
backgroundConnect bool
107109
initialRetryAttempts int
108110
wsdialer *websocket.Dialer
109111
wsconn *websocket.Conn
@@ -114,6 +116,8 @@ type wsClient struct {
114116
receiveExt chan *WSPayload
115117
send chan []byte
116118
sendDone chan []byte
119+
bgConnCancelCtx context.CancelFunc
120+
bgConnDone chan struct{}
117121
closing chan struct{}
118122
beforeConnect WSPreConnectHandler
119123
afterConnect WSPostConnectHandler
@@ -153,6 +157,7 @@ func New(ctx context.Context, config *WSConfig, beforeConnect WSPreConnectHandle
153157
MaximumDelay: config.MaximumDelay,
154158
Factor: config.DelayFactor,
155159
},
160+
backgroundConnect: config.BackgroundConnect,
156161
initialRetryAttempts: config.InitialConnectAttempts,
157162
headers: make(http.Header),
158163
send: make(chan []byte),
@@ -225,12 +230,28 @@ func (w *wsClient) setupReceiveChannel() {
225230

226231
func (w *wsClient) Connect() error {
227232

233+
if w.backgroundConnect && w.bgConnDone == nil {
234+
w.bgConnDone = make(chan struct{})
235+
w.ctx, w.bgConnCancelCtx = context.WithCancel(w.ctx)
236+
go func() {
237+
defer close(w.bgConnDone)
238+
err := w.initialConnect()
239+
if err != nil {
240+
// Retry means we only reach here if context closes
241+
log.L(w.ctx).Errorf("Connection to WebSocket %s was never established before shutdown: %s", w.url, err)
242+
}
243+
}()
244+
return nil
245+
}
246+
247+
return w.initialConnect()
248+
}
249+
250+
func (w *wsClient) initialConnect() error {
228251
if err := w.connect(true); err != nil {
229252
return err
230253
}
231-
232254
go w.receiveReconnectLoop()
233-
234255
return nil
235256
}
236257

@@ -242,6 +263,12 @@ func (w *wsClient) Close() {
242263
if c != nil {
243264
_ = c.Close()
244265
}
266+
bgc := w.bgConnDone
267+
if bgc != nil {
268+
w.bgConnCancelCtx()
269+
<-w.bgConnDone
270+
w.bgConnDone = nil
271+
}
245272
}
246273
}
247274

@@ -337,7 +364,7 @@ func (w *wsClient) connect(initial bool) error {
337364
return false, i18n.NewError(w.ctx, i18n.MsgWSClosing)
338365
}
339366

340-
retry = !initial || attempt < w.initialRetryAttempts
367+
retry = w.backgroundConnect || !initial || attempt < w.initialRetryAttempts
341368
if w.beforeConnect != nil {
342369
if err = w.beforeConnect(w.ctx, w); err != nil {
343370
l.Warnf("WS %s connect attempt %d failed in beforeConnect", w.url, attempt)
@@ -346,7 +373,7 @@ func (w *wsClient) connect(initial bool) error {
346373
}
347374

348375
var res *http.Response
349-
w.wsconn, res, err = w.wsdialer.Dial(w.url, w.headers)
376+
w.wsconn, res, err = w.wsdialer.DialContext(w.ctx, w.url, w.headers)
350377
if err != nil {
351378
var b []byte
352379
var status = -1

pkg/wsclient/wsclient_test.go

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ func TestWSClientE2ETLS(t *testing.T) {
130130

131131
}
132132

133-
func TestWSClientE2E(t *testing.T) {
133+
func TestWSClientE2EBG(t *testing.T) {
134134

135135
toServer, fromServer, url, close := NewTestWSServer(func(req *http.Request) {
136136
assert.Equal(t, "/test/updated", req.URL.Path)
@@ -155,7 +155,7 @@ func TestWSClientE2E(t *testing.T) {
155155
wsConfig.HTTPURL = url
156156
wsConfig.WSKeyPath = "/test"
157157
wsConfig.HeartbeatInterval = 50 * time.Millisecond
158-
wsConfig.InitialConnectAttempts = 2
158+
wsConfig.BackgroundConnect = true
159159

160160
wsc, err := New(context.Background(), wsConfig, beforeConnect, afterConnect)
161161
assert.NoError(t, err)
@@ -261,6 +261,21 @@ func TestWSClientE2EReceiveExt(t *testing.T) {
261261

262262
}
263263

264+
func TestWSNeverConnectBG(t *testing.T) {
265+
closedSvr := httptest.NewServer(&http.ServeMux{})
266+
closedSvr.Close()
267+
268+
wsc, err := New(context.Background(), &WSConfig{
269+
HTTPURL: closedSvr.URL,
270+
BackgroundConnect: true,
271+
}, nil, nil)
272+
assert.NoError(t, err)
273+
err = wsc.Connect()
274+
assert.NoError(t, err)
275+
276+
wsc.Close()
277+
}
278+
264279
func TestWSClientBadWSURL(t *testing.T) {
265280
wsConfig := generateConfig()
266281
wsConfig.WebSocketURL = ":::"

pkg/wsclient/wsconfig.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ const (
4242
WSConfigKeyReadBufferSize = "ws.readBufferSize"
4343
// WSConfigKeyInitialConnectAttempts sets how many times the websocket should attempt to connect on startup, before failing (after initial connection, retry is indefinite)
4444
WSConfigKeyInitialConnectAttempts = "ws.initialConnectAttempts"
45+
// WSConfigKeyBackgroundConnect is recommended instead of initialConnectAttempts for new uses of this library, and makes initial connection and reconnection identical in behavior
46+
WSConfigKeyBackgroundConnect = "ws.backgroundConnect"
4547
// WSConfigKeyPath if set will define the path to connect to - allows sharing of the same URL between HTTP and WebSocket connection info
4648
WSConfigKeyPath = "ws.path"
4749
// WSConfigURL if set will be a completely separate URL for WebSockets (must be a ws: or wss: scheme)
@@ -60,7 +62,15 @@ func InitConfig(conf config.Section) {
6062
ffresty.InitConfig(conf)
6163
conf.AddKnownKey(WSConfigKeyWriteBufferSize, defaultBufferSize)
6264
conf.AddKnownKey(WSConfigKeyReadBufferSize, defaultBufferSize)
65+
66+
// Note that conf.SetDefault(WSConfigKeyBackgroundConnect, true) is recommended for implementations
67+
// that embed this library, which will cause continual exponential backoff retry connection
68+
// even on the initial connection.
69+
conf.AddKnownKey(WSConfigKeyBackgroundConnect, false)
70+
71+
// Ignored if WSConfigKeyBackgroundConnect is true
6372
conf.AddKnownKey(WSConfigKeyInitialConnectAttempts, defaultInitialConnectAttempts)
73+
6474
conf.AddKnownKey(WSConfigKeyPath)
6575
conf.AddKnownKey(WSConfigURL)
6676
conf.AddKnownKey(WSConfigKeyHeartbeatInterval, defaultHeartbeatInterval)
@@ -84,6 +94,7 @@ func GenerateConfig(ctx context.Context, conf config.Section) (*WSConfig, error)
8494
MaximumDelay: conf.GetDuration(ffresty.HTTPConfigRetryMaxDelay),
8595
DelayFactor: conf.GetFloat64(WSConfigDelayFactor),
8696
InitialConnectAttempts: conf.GetInt(WSConfigKeyInitialConnectAttempts),
97+
BackgroundConnect: conf.GetBool(WSConfigKeyBackgroundConnect),
8798
HTTPHeaders: conf.GetObject(ffresty.HTTPConfigHeaders),
8899
AuthUsername: conf.GetString(ffresty.HTTPConfigAuthUsername),
89100
AuthPassword: conf.GetString(ffresty.HTTPConfigAuthPassword),

pkg/wsclient/wsconfig_test.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ func TestWSConfigGeneration(t *testing.T) {
3333
utConf.Set(WSConfigKeyWriteBufferSize, 1024)
3434
utConf.Set(WSConfigKeyInitialConnectAttempts, 1)
3535
utConf.Set(WSConfigKeyPath, "/websocket")
36+
utConf.Set(WSConfigKeyBackgroundConnect, true)
37+
utConf.Set(WSConfigKeyHeartbeatInterval, "42ms")
3638

3739
ctx := context.Background()
3840
wsConfig, err := GenerateConfig(ctx, utConf)
@@ -48,6 +50,20 @@ func TestWSConfigGeneration(t *testing.T) {
4850
assert.Equal(t, "custom value", wsConfig.HTTPHeaders.GetString("custom-header"))
4951
assert.Equal(t, 1024, wsConfig.ReadBufferSize)
5052
assert.Equal(t, 1024, wsConfig.WriteBufferSize)
53+
assert.True(t, wsConfig.BackgroundConnect)
54+
assert.Equal(t, 42*time.Millisecond, wsConfig.HeartbeatInterval)
55+
}
56+
57+
func TestWSConfigGenerationDefaults(t *testing.T) {
58+
resetConf()
59+
60+
ctx := context.Background()
61+
wsConfig, err := GenerateConfig(ctx, utConf)
62+
assert.NoError(t, err)
63+
64+
assert.Equal(t, defaultInitialConnectAttempts, wsConfig.InitialConnectAttempts)
65+
assert.False(t, wsConfig.BackgroundConnect)
66+
assert.Equal(t, 30*time.Second, wsConfig.HeartbeatInterval)
5167
}
5268

5369
func TestWSConfigTLSGenerationFail(t *testing.T) {

0 commit comments

Comments
 (0)