Skip to content

Commit 0af30f4

Browse files
Fix comm stack resource consumtion
- set reasonable read buffer size - cleanup subcon when they are closed - more efficient sessionID computation - remove delayed subcon close - remove delayed context deletion after view execution Signed-off-by: Marcus Brandenburger <bur@zurich.ibm.com>
1 parent 62469f8 commit 0af30f4

File tree

8 files changed

+49
-30
lines changed

8 files changed

+49
-30
lines changed

platform/view/services/comm/host/rest/host.go

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ package rest
99
import (
1010
"context"
1111
"crypto/tls"
12-
"fmt"
1312
"net/http"
13+
"strings"
1414

1515
"github.com/hyperledger-labs/fabric-smart-client/pkg/utils/errors"
1616
"github.com/hyperledger-labs/fabric-smart-client/platform/common/services/logging"
@@ -86,5 +86,13 @@ func (h *host) Close() error {
8686
func (h *host) Wait() {}
8787

8888
func StreamHash(info host2.StreamInfo) host2.StreamHash {
89-
return fmt.Sprintf("%s.%s.%s.%s", info.RemotePeerID, info.RemotePeerAddress, info.SessionID, info.ContextID)
89+
var sb strings.Builder
90+
sb.WriteString(info.RemotePeerID)
91+
sb.WriteRune('.')
92+
sb.WriteString(info.RemotePeerAddress)
93+
sb.WriteRune('.')
94+
sb.WriteString(info.SessionID)
95+
sb.WriteRune('.')
96+
sb.WriteString(info.ContextID)
97+
return sb.String()
9098
}

platform/view/services/comm/host/rest/websocket/multiplexed_provider.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -275,10 +275,7 @@ func (c *multiplexedServerConn) readIncoming(newStreamCallback func(pStream host
275275
logger.Debugf("server subconn errored: %v", mm.Err)
276276
} else if mm.Err != "" {
277277
logger.Debugf("Server subconn [%s] errored: %v", mm.ID, mm.Err)
278-
go func() {
279-
time.Sleep(1 * time.Second) // TODO: Find the point when the connection must close
280-
_ = sc.Close()
281-
}()
278+
_ = sc.Close()
282279
} else {
283280
sc.receiverChan <- result{value: mm.Msg}
284281
}
@@ -429,5 +426,10 @@ func (c *subConn) Close() error {
429426
// try to send closing handshake but ignore any error (in case connection is already closed)
430427
_ = c.parentConn.write(MultiplexedMessage{ID: c.id, Err: io.EOF.Error()})
431428

429+
// we need to clean up the parents subConns map
430+
c.parentConn.mu.Lock()
431+
delete(c.parentConn.subConns, c.id)
432+
c.parentConn.mu.Unlock()
433+
432434
return nil
433435
}

platform/view/services/comm/host/rest/websocket/stream.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"github.com/hyperledger-labs/fabric-smart-client/platform/common/services/logging"
1515
host2 "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/host"
1616
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/host/rest"
17+
"go.uber.org/zap/zapcore"
1718
)
1819

1920
type connection interface {
@@ -70,7 +71,9 @@ func (s *stream) readMessages(ctx context.Context) {
7071
_ = s.Close()
7172
return
7273
}
73-
logger.Debugf("Read message of length [%d] on [%s]", len(msg), s.Hash())
74+
if logger.IsEnabledFor(zapcore.DebugLevel) {
75+
logger.Debugf("Read message of length [%d] on [%s]", len(msg), s.Hash())
76+
}
7477
s.reads <- result{value: msg, err: err}
7578
}
7679
}

platform/view/services/comm/host/rest/websocket/stream_test.go

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/host"
2222
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/host/rest/websocket"
2323
"github.com/stretchr/testify/assert"
24+
"go.uber.org/goleak"
2425
)
2526

2627
func newMockStream(conn *mockConn) host.P2PStream {
@@ -65,6 +66,9 @@ func (c *mockConn) WrittenValues() <-chan []byte {
6566
}
6667

6768
func TestWriter(t *testing.T) {
69+
// let check that at the end of this test all our go routines are stopped
70+
defer goleak.VerifyNone(t, goleak.IgnoreCurrent())
71+
6872
conn := &mockConn{
6973
written: make(chan []byte, 100),
7074
read: make(chan []byte, 100),
@@ -100,6 +104,9 @@ func TestWriter(t *testing.T) {
100104
}
101105

102106
func TestReader(t *testing.T) {
107+
// let check that at the end of this test all our go routines are stopped
108+
defer goleak.VerifyNone(t, goleak.IgnoreCurrent())
109+
103110
conn := &mockConn{
104111
written: make(chan []byte, 100),
105112
read: make(chan []byte, 100),
@@ -117,23 +124,19 @@ func TestReader(t *testing.T) {
117124
assert.NoError(t, conn.ReadValue(message))
118125
}
119126
wg := sync.WaitGroup{}
120-
wg.Add(len(input))
127+
wg.Add(1)
121128

122-
output := make([]*comm.ViewPacket, 0, len(input))
123-
m := sync.RWMutex{}
124129
go func() {
125-
for {
130+
defer wg.Done()
131+
for _, in := range input {
126132
read := &comm.ViewPacket{}
127133
assert.NoError(t, r.ReadMsg(read))
128-
m.Lock()
129-
output = append(output, read)
130-
m.Unlock()
131-
wg.Done()
134+
assert.True(t, proto.Equal(in, read))
132135
}
133136
}()
134137
wg.Wait()
135138

136-
assert.Equal(t, len(input), len(output))
139+
assert.NoError(t, stream.Close())
137140
}
138141

139142
func messageOfSize(size int) proto.Message {

platform/view/services/comm/ids.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,13 @@ SPDX-License-Identifier: Apache-2.0
66

77
package comm
88

9-
import "encoding/base64"
9+
import (
10+
"crypto/sha256"
11+
"encoding/hex"
12+
)
1013

1114
func computeInternalSessionID(topic string, pkid []byte) string {
12-
return topic + "." + base64.StdEncoding.EncodeToString(pkid)
15+
hasher := sha256.New()
16+
hasher.Write(pkid)
17+
return topic + "." + hex.EncodeToString(hasher.Sum(nil))
1318
}

platform/view/services/comm/p2p.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,10 @@ import (
2424
)
2525

2626
const (
27-
masterSession = "master of puppets I'm pulling your strings"
28-
contextIDLabel tracing.LabelName = "context_id"
29-
sessionIDLabel tracing.LabelName = "session_id"
27+
masterSession = "master of puppets I'm pulling your strings"
28+
contextIDLabel tracing.LabelName = "context_id"
29+
sessionIDLabel tracing.LabelName = "session_id"
30+
defaultBufferSize = 4096
3031
)
3132

3233
var errStreamNotFound = errors.New("stream not found")
@@ -209,7 +210,7 @@ func (p *P2PNode) handleIncomingStream(stream host2.P2PStream) {
209210
func (p *P2PNode) handleStream(stream host2.P2PStream) {
210211
sh := &streamHandler{
211212
stream: stream,
212-
reader: io.NewVarintProtoReader(stream, 655360*2),
213+
reader: io.NewVarintProtoReader(stream, defaultBufferSize),
213214
writer: io.NewVarintProtoWriter(stream),
214215
node: p,
215216
}

platform/view/services/comm/session.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212

1313
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/host"
1414
"github.com/hyperledger-labs/fabric-smart-client/platform/view/view"
15+
"go.uber.org/zap/zapcore"
1516
)
1617

1718
// NetworkStreamSession implements view.Session
@@ -82,7 +83,9 @@ func (n *NetworkStreamSession) closeInternal() {
8283
logger.Debugf("closing session [%s] with [%d] streams", n.sessionID, len(n.streams))
8384
toClose := make([]*streamHandler, 0, len(n.streams))
8485
for stream := range n.streams {
85-
logger.Debugf("session [%s], stream [%s], refCtr [%d]", n.sessionID, stream.stream.Hash(), stream.refCtr)
86+
if logger.IsEnabledFor(zapcore.DebugLevel) {
87+
logger.Debugf("session [%s], stream [%s], refCtr [%d]", n.sessionID, stream.stream.Hash(), stream.refCtr)
88+
}
8689
stream.refCtr--
8790
if stream.refCtr == 0 {
8891
toClose = append(toClose, stream)

platform/view/services/view/manager.go

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ import (
1111
"reflect"
1212
"runtime/debug"
1313
"sync"
14-
"time"
1514

1615
"github.com/hyperledger-labs/fabric-smart-client/pkg/utils/errors"
1716
"github.com/hyperledger-labs/fabric-smart-client/platform/common/services/logging"
@@ -277,12 +276,7 @@ func (cm *Manager) respond(responder view.View, id view.Identity, msg *view.Mess
277276
if isNew {
278277
// delete context at the end of the execution
279278
res, err = func(ctx view.Context, responder view.View) (interface{}, error) {
280-
defer func() {
281-
// TODO: this is a workaround
282-
// give some time to flush anything can be in queues
283-
time.Sleep(5 * time.Second)
284-
cm.deleteContext(id, ctx.ID())
285-
}()
279+
defer cm.deleteContext(id, ctx.ID())
286280
return ctx.RunView(responder)
287281
}(ctx, responder)
288282
} else {

0 commit comments

Comments
 (0)