Commit 5812b16

P2P comm session no longer panics

- multiplexed websockets do not panic anymore
- safe send and receive on sessions
- add tests

Signed-off-by: Marcus Brandenburger <bur@zurich.ibm.com>

1 parent e8e9d16 commit 5812b16

7 files changed (+368 -54 lines)

platform/view/services/comm/host/rest/websocket/multiplexed_provider.go

Lines changed: 7 additions & 4 deletions
@@ -63,7 +63,7 @@ func (c *MultiplexedProvider) KillAll() error {
     }

     // cleanup clients
-    c.clients = make(map[string]*multiplexedClientConn)
+    clear(c.clients)

     return err
 }

@@ -227,11 +227,14 @@ func (c *multiplexedClientConn) readIncoming() {
         c.mu.RUnlock()

         if !ok && mm.Err == "" {
-            panic("subconn not found")
+            // it might happen that we receive a message from the server after we have already closed the sub-connection
+            // in this case we just ignore the message and drop it
+            logger.Warnf("client sub-connection does not exist mmId=%v, dropping message", mm.ID)
+            logger.Debugf("dropping message: `%s`", string(mm.Msg))
         } else if !ok && mm.Err != "" {
-            logger.Debugf("Client subconnection errored: %v", mm.Err)
+            logger.Debugf("client sub-connection does not exist mmId=%v, errored: %v", mm.ID, mm.Err)
         } else if mm.Err != "" {
-            logger.Debugf("Client subconn errored: %v", mm.Err)
+            logger.Debugf("client sub-connection mmId=%v errored: %v", mm.ID, mm.Err)
         } else {
             sc.receiverChan <- result{value: mm.Msg}
         }
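
Note: the second hunk changes what happens when a multiplexed message arrives for a sub-connection that no longer exists, which can legitimately happen once the local side has already closed it. A minimal, self-contained sketch of that look-up-and-drop pattern follows; the demux and subConn types and their field names are illustrative only, not the provider's actual code.

package main

import (
    "log"
    "sync"
)

type subConn struct {
    receiverChan chan []byte
}

type demux struct {
    mu       sync.RWMutex
    subConns map[string]*subConn
}

// dispatch looks up the target sub-connection under a read lock and, instead of
// panicking, logs and drops messages whose sub-connection is already gone.
func (d *demux) dispatch(id string, payload []byte) {
    d.mu.RLock()
    sc, ok := d.subConns[id]
    d.mu.RUnlock()

    if !ok {
        // the remote peer may still send after this side closed the sub-connection
        log.Printf("sub-connection %q not found, dropping %d bytes", id, len(payload))
        return
    }
    sc.receiverChan <- payload
}

func main() {
    d := &demux{subConns: map[string]*subConn{}}
    d.dispatch("already-closed", []byte("late message")) // logged and dropped, no panic
}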

platform/view/services/comm/host/rest/websocket/multiplexed_provider_test.go

Lines changed: 90 additions & 0 deletions
@@ -22,10 +22,12 @@ import (
     "testing"
     "time"

+    "github.com/gorilla/websocket"
     "github.com/hyperledger-labs/fabric-smart-client/platform/common/services/logging"
     "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/host"
     "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/metrics/disabled"
     "github.com/stretchr/testify/assert"
+    "github.com/stretchr/testify/require"
     "go.opentelemetry.io/otel/trace/noop"
     "go.uber.org/goleak"
 )

@@ -147,6 +149,94 @@ func TestConnections(t *testing.T) {
     p.mu.RUnlock()
 }

+func TestSendingOnClosedSubConnections(t *testing.T) {
+    testSetup(t)
+
+    // let check that at the end of this test all our go routines are stopped
+    defer goleak.VerifyNone(t, goleak.IgnoreCurrent())
+
+    p := NewMultiplexedProvider(noop.NewTracerProvider(), &disabled.Provider{})
+    var wg sync.WaitGroup
+
+    wait := make(chan struct{})
+
+    // server
+    srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+        err := p.NewServerStream(w, r, func(s host.P2PStream) {
+            wg.Add(1)
+            go func(srv host.P2PStream) {
+                defer wg.Done()
+                serverLogger.Debugf("[server] new stream established with %v (ID=%v) sessionID=%v", srv.RemotePeerID(), srv.RemotePeerID(), srv.Hash())
+                serverLogger.Debugf("[server] reading ...")
+
+                // server receives first message
+                answer, err := readMsg(srv)
+                require.NoError(t, err)
+                require.EqualValues(t, []byte("ping"), answer)
+
+                // sends back a message
+                err = sendMsg(srv, []byte("pong"))
+                require.NoError(t, err)
+
+                // we wait for next orders
+                <-wait
+
+                // we send a few messages at a high rate; eventually we should receive a channel closed message
+                require.EventuallyWithT(t, func(c *assert.CollectT) {
+                    err = sendMsg(srv, []byte("pong again"))
+                    require.ErrorIs(c, err, websocket.ErrCloseSent)
+                }, 100*time.Millisecond, time.Nanosecond)
+            }(s)
+        })
+        require.NoError(t, err)
+    }))
+
+    srvEndpoint := strings.TrimPrefix(srv.URL, "http://")
+
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+    info := host.StreamInfo{
+        RemotePeerID:      "serverID",
+        RemotePeerAddress: srvEndpoint,
+        ContextID:         "someContextID",
+        SessionID:         "someSessionID",
+    }
+    src := host.PeerID("somePeerID")
+    config := &tls.Config{}
+
+    client, err := p.NewClientStream(info, ctx, src, config)
+    require.NoError(t, err)
+
+    // send ping
+    err = sendMsg(client, []byte("ping"))
+    require.NoError(t, err)
+
+    // expects pong
+    answer, err := readMsg(client)
+    require.NoError(t, err)
+    require.EqualValues(t, []byte("pong"), answer)
+
+    // now the client is actually done and closes the subconn
+    err = client.Close()
+    require.NoError(t, err)
+
+    //time.Sleep(snoozeTime)
+    // now we use our superpowers to let the server send another message
+    wait <- struct{}{}
+
+    wg.Wait()
+
+    // close server
+    srv.Close()
+
+    err = p.KillAll()
+    require.NoError(t, err)
+
+    p.mu.RLock()
+    require.Equal(t, 0, len(p.clients))
+    p.mu.RUnlock()
+}
+
 func testSetup(_ *testing.T) {
     logSpec := cmp.Or(
         os.Getenv("FABRIC_LOGGING_SPEC"),
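
The new test relies on testify's require.EventuallyWithT to keep retrying until the server's send finally fails with websocket.ErrCloseSent. A standalone sketch of that retry-until-expected-error pattern is shown below; the send operation and the sentinel error are invented for illustration and are not part of this package.

package example_test

import (
    "errors"
    "testing"
    "time"

    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"
)

var errPeerClosed = errors.New("peer closed")

// TestEventuallyReturnsExpectedError shows the retry pattern: the condition is
// re-evaluated every tick until all of its assertions pass or waitFor expires.
func TestEventuallyReturnsExpectedError(t *testing.T) {
    attempts := 0
    send := func() error {
        attempts++
        if attempts < 5 {
            return nil // the close has not propagated yet
        }
        return errPeerClosed
    }

    require.EventuallyWithT(t, func(c *assert.CollectT) {
        assert.ErrorIs(c, send(), errPeerClosed)
    }, 100*time.Millisecond, time.Millisecond)
}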

platform/view/services/comm/master.go

Lines changed: 7 additions & 4 deletions
@@ -31,21 +31,24 @@ func (p *P2PNode) getOrCreateSession(sessionID, endpointAddress, contextID, call
         return session, nil
     }

+    ctx, cancel := context.WithCancel(context.Background())
     s := &NetworkStreamSession{
+        node:            p,
         endpointID:      endpointID,
         endpointAddress: endpointAddress,
         contextID:       contextID,
-        callerViewID:    callerViewID,
-        caller:          caller,
         sessionID:       sessionID,
-        node:            p,
+        caller:          caller,
+        callerViewID:    callerViewID,
         incoming:        make(chan *view.Message, 1),
         streams:         make(map[*streamHandler]struct{}),
+        ctx:             ctx,
+        cancel:          cancel,
     }

     if msg != nil {
         logger.Debugf("pushing first message to [%s], [%s]", internalSessionID, msg)
-        s.incoming <- msg
+        s.enqueue(msg)
     } else {
         logger.Debugf("no first message to push to [%s]", internalSessionID)
     }

platform/view/services/comm/p2p.go

Lines changed: 1 addition & 1 deletion
@@ -148,7 +148,7 @@ func (p *P2PNode) dispatchMessages(ctx context.Context) {
             p.dispatchMutex.Unlock()

             logger.Debugf("pushing message to [%s], [%s]", internalSessionID, msg.message)
-            session.incoming <- msg.message
+            session.enqueue(msg.message)
         case <-ctx.Done():
             logger.Info("closing p2p comm...")
             return
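
This one-line change (together with the matching one in master.go) is the core of the fix: a bare send on a channel panics if another goroutine has already closed that channel, which is exactly what could happen when a session was torn down while a message was still being dispatched. A minimal, self-contained reproduction of that failure mode, not taken from the repository:

package main

import "fmt"

func main() {
    incoming := make(chan string, 1)
    close(incoming) // e.g. the session was closed by another goroutine

    defer func() {
        if r := recover(); r != nil {
            fmt.Println("recovered from:", r) // "send on closed channel"
        }
    }()

    incoming <- "late message" // this bare send is what enqueue() now guards against
}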

platform/view/services/comm/session.go

Lines changed: 82 additions & 31 deletions
@@ -9,15 +9,25 @@ package comm
 import (
     "context"
     "sync"
+    "sync/atomic"

+    "github.com/hyperledger-labs/fabric-smart-client/pkg/utils/errors"
+    "github.com/hyperledger-labs/fabric-smart-client/pkg/utils/proto"
     "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/host"
     "github.com/hyperledger-labs/fabric-smart-client/platform/view/view"
     "go.uber.org/zap/zapcore"
 )

+// ErrSessionClosed is returned when a message is sent when the session is closed.
+var ErrSessionClosed = errors.New("session closed")
+
+type sender interface {
+    sendTo(ctx context.Context, info host.StreamInfo, msg proto.Message) error
+}
+
 // NetworkStreamSession implements view.Session
 type NetworkStreamSession struct {
-    node            *P2PNode
+    node            sender
     endpointID      []byte
     endpointAddress string
     contextID       string

@@ -26,10 +36,16 @@ type NetworkStreamSession struct {
     callerViewID    string
     incoming        chan *view.Message
     streams         map[*streamHandler]struct{}
-    closed          bool
     mutex           sync.RWMutex
+
+    closed    atomic.Bool
+    closeOnce sync.Once
+    ctx       context.Context
+    cancel    context.CancelFunc
+    wg        sync.WaitGroup
 }

+// Info returns a view.SessionInfo.
 func (n *NetworkStreamSession) Info() view.SessionInfo {
     n.mutex.RLock()
     defer n.mutex.RUnlock()

@@ -39,25 +55,27 @@
         CallerViewID: n.callerViewID,
         Endpoint:     n.endpointAddress,
         EndpointPKID: n.endpointID,
-        Closed:       n.closed,
+        Closed:       n.isClosed(),
     }
     return ret
 }

-// Send sends the payload to the endpoint
+// Send sends the payload to the endpoint.
 func (n *NetworkStreamSession) Send(payload []byte) error {
     return n.SendWithContext(context.TODO(), payload)
 }

+// SendWithContext sends the payload to the endpoint with the passed context.Context.
 func (n *NetworkStreamSession) SendWithContext(ctx context.Context, payload []byte) error {
     return n.sendWithStatus(ctx, payload, view.OK)
 }

-// SendError sends an error to the endpoint with the passed payload
+// SendError sends an error to the endpoint with the passed payload.
 func (n *NetworkStreamSession) SendError(payload []byte) error {
     return n.SendErrorWithContext(context.TODO(), payload)
 }

+// SendErrorWithContext sends an error to the endpoint with the passed context.Context and payload.
 func (n *NetworkStreamSession) SendErrorWithContext(ctx context.Context, payload []byte) error {
     return n.sendWithStatus(ctx, payload, view.ERROR)
 }

@@ -67,45 +85,77 @@ func (n *NetworkStreamSession) Receive() <-chan *view.Message {
     return n.incoming
 }

+// enqueue enqueues a message into the session's incoming channel.
+// If the session is closed, the message will be dropped and a warning is logged.
+func (n *NetworkStreamSession) enqueue(msg *view.Message) {
+    if msg == nil {
+        return
+    }
+
+    if n.isClosed() {
+        logger.Warnf("dropping message from %s for closed session [%s]", msg.Caller, msg.SessionID)
+        return
+    }
+
+    n.wg.Add(1)
+    defer n.wg.Done()
+
+    select {
+    case <-n.ctx.Done():
+        logger.Warnf("dropping message from %s for closed session [%s]", msg.Caller, msg.SessionID)
+        return
+    case n.incoming <- msg:
+    }
+}
+
 // Close releases all the resources allocated by this session
 func (n *NetworkStreamSession) Close() {
-    n.node.sessionsMutex.Lock()
-    defer n.node.sessionsMutex.Unlock()
-
     n.closeInternal()
 }

 func (n *NetworkStreamSession) closeInternal() {
-    if n.closed {
-        return
-    }
-
-    logger.Debugf("closing session [%s] with [%d] streams", n.sessionID, len(n.streams))
-    toClose := make([]*streamHandler, 0, len(n.streams))
-    for stream := range n.streams {
-        if logger.IsEnabledFor(zapcore.DebugLevel) {
-            logger.Debugf("session [%s], stream [%s], refCtr [%d]", n.sessionID, stream.stream.Hash(), stream.refCtr)
+    n.closeOnce.Do(func() {
+        n.mutex.Lock()
+        defer n.mutex.Unlock()
+
+        logger.Debugf("closing session [%s] with [%d] streams", n.sessionID, len(n.streams))
+        toClose := make([]*streamHandler, 0, len(n.streams))
+        for stream := range n.streams {
+            if logger.IsEnabledFor(zapcore.DebugLevel) {
+                logger.Debugf("session [%s], stream [%s], refCtr [%d]", n.sessionID, stream.stream.Hash(), stream.refCtr)
+            }
+            stream.refCtr--
+            if stream.refCtr == 0 {
+                toClose = append(toClose, stream)
+            }
         }
-        stream.refCtr--
-        if stream.refCtr == 0 {
-            toClose = append(toClose, stream)
+
+        logger.Debugf("closing session [%s]'s streams [%d]", n.sessionID, len(toClose))
+        for _, stream := range toClose {
+            stream.close(context.TODO())
         }
-    }
+        logger.Debugf("closing session [%s]'s streams [%d] done", n.sessionID, len(toClose))

-    logger.Debugf("closing session [%s]'s streams [%d]", n.sessionID, len(toClose))
-    for _, stream := range toClose {
-        stream.close(context.TODO())
-    }
+        // next we are closing the incoming and the closing signal channel to drain the receivers;
+        n.closed.Store(true)
+        n.cancel()
+        n.wg.Wait()
+        close(n.incoming)
+        n.streams = make(map[*streamHandler]struct{})

-    logger.Debugf("closing session [%s]'s streams [%d] done", n.sessionID, len(toClose))
-    close(n.incoming)
-    n.closed = true
-    n.streams = make(map[*streamHandler]struct{})
+        logger.Debugf("closing session [%s] done", n.sessionID)
+    })
+}

-    logger.Debugf("closing session [%s] done", n.sessionID)
+func (n *NetworkStreamSession) isClosed() bool {
+    return n.closed.Load()
 }

 func (n *NetworkStreamSession) sendWithStatus(ctx context.Context, payload []byte, status int32) error {
+    if n.isClosed() {
+        return ErrSessionClosed
+    }
+
     n.mutex.RLock()
     info := host.StreamInfo{
         RemotePeerID: string(n.endpointID),

@@ -123,11 +173,12 @@ func (n *NetworkStreamSession) sendWithStatus(ctx context.Context, payload []byt
     n.mutex.RUnlock()

     err := n.node.sendTo(ctx, info, packet)
-    logger.Debugf("sent message [len:%d] to [%s:%s] from [%s] with err [%v]",
+    logger.Debugf("sent message [len:%d] to [%s:%s] from [%s] [status:%v] with err [%v]",
         len(payload),
         info.RemotePeerID,
         info.RemotePeerAddress,
         packet.Caller,
+        status,
         err,
     )
     return err
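
For reference, the close/enqueue coordination introduced above can be summarized in a small, self-contained sketch: an atomic closed flag turns late senders away, a cancellable context unblocks senders already waiting on a full channel, and a WaitGroup lets the closer wait for in-flight senders before finally closing the channel. The safeQueue type below is hypothetical and only mirrors the pattern; it is not the session's actual API.

package main

import (
    "context"
    "fmt"
    "sync"
    "sync/atomic"
)

type safeQueue struct {
    ch        chan string
    closed    atomic.Bool
    closeOnce sync.Once
    ctx       context.Context
    cancel    context.CancelFunc
    wg        sync.WaitGroup
}

func newSafeQueue() *safeQueue {
    ctx, cancel := context.WithCancel(context.Background())
    return &safeQueue{ch: make(chan string, 1), ctx: ctx, cancel: cancel}
}

// enqueue mirrors the session's enqueue: drop when closed, otherwise race the
// cancel signal so a queue that closes mid-send never blocks or panics the sender.
func (q *safeQueue) enqueue(msg string) {
    if q.closed.Load() {
        return // already closed: drop the message
    }
    q.wg.Add(1)
    defer q.wg.Done()
    select {
    case <-q.ctx.Done():
        // closed while we were waiting: drop the message
    case q.ch <- msg:
    }
}

// close mirrors closeInternal: flag, cancel, wait for in-flight senders, then close.
func (q *safeQueue) close() {
    q.closeOnce.Do(func() {
        q.closed.Store(true)
        q.cancel()
        q.wg.Wait()
        close(q.ch)
    })
}

func main() {
    q := newSafeQueue()
    q.enqueue("hello")
    q.close()
    q.enqueue("late") // dropped instead of panicking

    for msg := range q.ch {
        fmt.Println(msg) // prints "hello"
    }
}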
