Skip to content

Commit 4ababb9

Browse files
authored
Merge pull request #28 from kaleido-io/fix-conn-dl
Fix hang broadcasting to broken connection
2 parents f319338 + 16c2b68 commit 4ababb9

File tree

6 files changed

+47
-13
lines changed

6 files changed

+47
-13
lines changed

internal/ws/wsserver.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,10 @@ func (s *webSocketServer) processReplies() {
224224

225225
func (s *webSocketServer) broadcastToConnections(connections []*webSocketConnection, message interface{}) {
226226
for _, c := range connections {
227-
c.broadcast <- message
227+
select {
228+
case c.broadcast <- message:
229+
case <-c.closing:
230+
log.L(s.ctx).Warnf("Connection %s closed while attempting to deliver reply", c.id)
231+
}
228232
}
229233
}

internal/ws/wsserver_test.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -369,3 +369,20 @@ func TestListenTopicClosing(t *testing.T) {
369369
topic: "test",
370370
})
371371
}
372+
373+
func TestBroadcastClosing(t *testing.T) {
374+
375+
w, ts := newTestWebSocketServer()
376+
defer ts.Close()
377+
w.getTopic("test")
378+
379+
c := &webSocketConnection{
380+
server: w,
381+
topics: make(map[string]*webSocketTopic),
382+
closing: make(chan struct{}),
383+
newTopic: make(chan bool),
384+
}
385+
close(c.closing)
386+
// Check this doesn't block
387+
c.server.broadcastToConnections([]*webSocketConnection{c}, "anything")
388+
}

pkg/fftm/api.go

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -83,13 +83,8 @@ func (m *manager) runAPIServer() {
8383
}
8484

8585
func (m *manager) runDebugServer() {
86-
var debugServer *http.Server
8786
debugPort := config.GetInt(tmconfig.DebugPort)
88-
8987
defer func() {
90-
if debugServer != nil {
91-
_ = debugServer.Close()
92-
}
9388
close(m.debugServerDone)
9489
}()
9590

@@ -100,8 +95,8 @@ func (m *manager) runDebugServer() {
10095
r.PathPrefix("/debug/pprof/symbol").HandlerFunc(pprof.Symbol)
10196
r.PathPrefix("/debug/pprof/trace").HandlerFunc(pprof.Trace)
10297
r.PathPrefix("/debug/pprof/").HandlerFunc(pprof.Index)
103-
debugServer = &http.Server{Addr: fmt.Sprintf("localhost:%d", debugPort), Handler: r, ReadHeaderTimeout: 30 * time.Second}
98+
m.debugServer = &http.Server{Addr: fmt.Sprintf("localhost:%d", debugPort), Handler: r, ReadHeaderTimeout: 30 * time.Second}
10499
log.L(m.ctx).Debugf("Debug HTTP endpoint listening on localhost:%d", debugPort)
105-
_ = debugServer.ListenAndServe()
100+
_ = m.debugServer.ListenAndServe()
106101
}
107102
}

pkg/fftm/api_test.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,10 @@ import (
2222
"testing"
2323

2424
"github.com/go-resty/resty/v2"
25+
"github.com/hyperledger/firefly-common/pkg/config"
2526
"github.com/hyperledger/firefly-common/pkg/fftypes"
2627
"github.com/hyperledger/firefly-transaction-manager/internal/confirmations"
28+
"github.com/hyperledger/firefly-transaction-manager/internal/tmconfig"
2729
"github.com/hyperledger/firefly-transaction-manager/mocks/confirmationsmocks"
2830
"github.com/hyperledger/firefly-transaction-manager/mocks/ffcapimocks"
2931
"github.com/hyperledger/firefly-transaction-manager/pkg/apitypes"
@@ -148,7 +150,7 @@ func TestSendTransactionE2E(t *testing.T) {
148150

149151
}
150152

151-
func TestDeployTransactionE2E(t *testing.T) {
153+
func TestDeployTransactionE2EWithDebugServer(t *testing.T) {
152154

153155
txSent := make(chan struct{})
154156

@@ -195,6 +197,7 @@ func TestDeployTransactionE2E(t *testing.T) {
195197
return n.NotificationType == confirmations.NewTransaction
196198
})).Return(nil)
197199

200+
config.Set(tmconfig.DebugPort, 0)
198201
m.Start()
199202

200203
req := strings.NewReader(sampleDeployTX)

pkg/fftm/manager.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package fftm
1818

1919
import (
2020
"context"
21+
"net/http"
2122
"sync"
2223
"time"
2324

@@ -87,6 +88,7 @@ type manager struct {
8788
blockListenerDone chan struct{}
8889
started bool
8990
apiServerDone chan error
91+
debugServer *http.Server
9092
debugServerDone chan struct{}
9193

9294
policyLoopInterval time.Duration
@@ -199,6 +201,9 @@ func (m *manager) Close() {
199201
m.cancelCtx()
200202
if m.started {
201203
m.started = false
204+
if m.debugServer != nil {
205+
m.debugServer.Close()
206+
}
202207
<-m.apiServerDone
203208
<-m.policyLoopDone
204209
<-m.blockListenerDone

pkg/fftm/policyloop.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,26 +37,34 @@ func (m *manager) policyLoop() {
3737
ctx := log.WithLogField(m.ctx, "role", "policyloop")
3838

3939
for {
40+
// Wait to be notified, or timeout to run
4041
timer := time.NewTimer(m.policyLoopInterval)
4142
select {
4243
case <-m.inflightUpdate:
43-
m.policyLoopCycle(ctx, false)
44-
case <-m.inflightStale:
45-
m.policyLoopCycle(ctx, true)
4644
case <-timer.C:
47-
m.policyLoopCycle(ctx, false)
4845
case <-ctx.Done():
4946
log.L(ctx).Infof("Receipt poller exiting")
5047
return
5148
}
49+
// Pop whether we were marked stale
50+
stale := false
51+
select {
52+
case <-m.inflightStale:
53+
stale = true
54+
default:
55+
}
56+
m.policyLoopCycle(ctx, stale)
5257
}
5358
}
5459

5560
func (m *manager) markInflightStale() {
61+
// First mark that we're stale
5662
select {
5763
case m.inflightStale <- true:
5864
default:
5965
}
66+
// Then ensure we queue a loop that picks up the stale marker
67+
m.markInflightUpdate()
6068
}
6169

6270
func (m *manager) markInflightUpdate() {
@@ -237,6 +245,7 @@ func (m *manager) execPolicy(ctx context.Context, pending *pendingState, syncDel
237245
log.L(ctx).Errorf("Policy engine returned error for transaction %s reason=%s: %s", mtx.ID, reason, err)
238246
m.addError(mtx, reason, err)
239247
} else {
248+
log.L(ctx).Debugf("Policy engine executed for tx %s (update=%d,status=%s,hash=%s)", mtx.ID, update, mtx.Status, mtx.TransactionHash)
240249
if mtx.FirstSubmit != nil && pending.trackingTransactionHash != mtx.TransactionHash {
241250
// If now submitted, add to confirmations manager for receipt checking
242251
m.trackSubmittedTransaction(ctx, pending)
@@ -257,6 +266,7 @@ func (m *manager) execPolicy(ctx context.Context, pending *pendingState, syncDel
257266
}
258267
if completed {
259268
pending.remove = true // for the next time round the loop
269+
log.L(ctx).Errorf("Transaction %s marked complete (status=%s): %s", mtx.ID, mtx.Status, err)
260270
m.markInflightStale()
261271
}
262272
case policyengine.UpdateDelete:

0 commit comments

Comments
 (0)