Skip to content

Commit 10fd368

Browse files
Fix flaky session test
The TestSessionLifecycleConcurrent tests if messages can be enqueued on a closed session. As `enqueue` should drop messages when a session is closed, the test checked that the underlying chan is empty. However, this test is flaky as it depends on the consumer goroutine to consume all messages. To simplify this test, `enqueue` now returns true if a message was enqueued in the session, or false if the message is dropped. The test can now check the returned value. Signed-off-by: Marcus Brandenburger <bur@zurich.ibm.com>
1 parent c7ab59f commit 10fd368

File tree

4 files changed

+22
-12
lines changed

4 files changed

+22
-12
lines changed

platform/view/services/comm/master.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,9 @@ func (p *P2PNode) getOrCreateSession(sessionID, endpointAddress, contextID, call
4848

4949
if msg != nil {
5050
logger.Debugf("pushing first message to [%s], [%s]", internalSessionID, msg)
51-
s.enqueue(msg)
51+
if ok := s.enqueue(msg); !ok {
52+
panic("programming error: can not enqueue message in newly created session")
53+
}
5254
} else {
5355
logger.Debugf("no first message to push to [%s]", internalSessionID)
5456
}

platform/view/services/comm/p2p.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -147,8 +147,11 @@ func (p *P2PNode) dispatchMessages(ctx context.Context) {
147147
}
148148
p.dispatchMutex.Unlock()
149149

150-
logger.Debugf("pushing message to [%s], [%s]", internalSessionID, msg.message)
151-
session.enqueue(msg.message)
150+
if ok = session.enqueue(msg.message); ok {
151+
logger.Debugf("pushing message to [%s], [%s]", internalSessionID, msg.message)
152+
} else {
153+
logger.Warnf("dropping message from %s for closed session [%s]", msg.message.Caller, msg.message.SessionID)
154+
}
152155
case <-ctx.Done():
153156
logger.Info("closing p2p comm...")
154157
return

platform/view/services/comm/session.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -86,15 +86,14 @@ func (n *NetworkStreamSession) Receive() <-chan *view.Message {
8686
}
8787

8888
// enqueue enqueues a message into the session's incoming channel.
89-
// If the session is closed, the message will be dropped and a warning is logged.
90-
func (n *NetworkStreamSession) enqueue(msg *view.Message) {
89+
// If the session is closed, the message will be dropped and false returned, otherwise true is returned.
90+
func (n *NetworkStreamSession) enqueue(msg *view.Message) bool {
9191
if msg == nil {
92-
return
92+
return false
9393
}
9494

9595
if n.isClosed() {
96-
logger.Warnf("dropping message from %s for closed session [%s]", msg.Caller, msg.SessionID)
97-
return
96+
return false
9897
}
9998

10099
n.wg.Add(1)
@@ -103,8 +102,9 @@ func (n *NetworkStreamSession) enqueue(msg *view.Message) {
103102
select {
104103
case <-n.ctx.Done():
105104
logger.Warnf("dropping message from %s for closed session [%s]", msg.Caller, msg.SessionID)
106-
return
105+
return false
107106
case n.incoming <- msg:
107+
return true
108108
}
109109
}
110110

platform/view/services/comm/session_test.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"time"
1515

1616
"github.com/hyperledger-labs/fabric-smart-client/pkg/utils/proto"
17+
"github.com/hyperledger-labs/fabric-smart-client/platform/common/services/logging"
1718
host2 "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/host"
1819
"github.com/hyperledger-labs/fabric-smart-client/platform/view/view"
1920
"github.com/stretchr/testify/assert"
@@ -34,6 +35,10 @@ func (m *mockSender) sendTo(ctx context.Context, info host2.StreamInfo, msg prot
3435
}
3536

3637
func setup() *NetworkStreamSession {
38+
logging.Init(logging.Config{
39+
LogSpec: "fsc.view.services.comm=error",
40+
})
41+
3742
sessionID := "someSessionID"
3843
contextID := "someContextID"
3944
endpointAddress := "someEndpointAddress"
@@ -132,16 +137,15 @@ func TestSessionLifecycleConcurrent(t *testing.T) {
132137

133138
// send a few messages
134139
for i := range numMessage {
135-
s.enqueue(&view.Message{Payload: []byte(fmt.Sprintf("msg #%v", i))})
140+
assert.True(t, s.enqueue(&view.Message{Payload: []byte(fmt.Sprintf("msg #%v", i))}))
136141
}
137142

138143
// once we delivered all our messages we close
139144
sess.Close()
140145

141146
// try to send more but the session should not accept them
142147
for i := range numMessage {
143-
s.enqueue(&view.Message{Payload: []byte(fmt.Sprintf("msg #%v", i))})
144-
assert.Empty(t, s.incoming)
148+
assert.False(t, s.enqueue(&view.Message{Payload: []byte(fmt.Sprintf("msg #%v", i))}))
145149
}
146150
}()
147151

@@ -157,6 +161,7 @@ func TestSessionLifecycleConcurrent(t *testing.T) {
157161
}
158162
assert.Equal(t, numMessage, cnt)
159163
assert.True(t, sess.Info().Closed)
164+
assert.Empty(t, s.incoming)
160165
}()
161166

162167
wg.Wait()

0 commit comments

Comments
 (0)