Skip to content

Add concurrent websocket session test #887

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 104 additions & 1 deletion platform/view/services/comm/comm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"context"
"sync"
"testing"
"time"

"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/host"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/host/rest"
Expand All @@ -20,6 +21,26 @@ import (
func TestWebsocketSession(t *testing.T) {
RegisterTestingT(t)

aliceNode, bobNode := setupWebsocketSession()

testExchange(aliceNode, bobNode)
}

// TestWebsocketSessionManySenders tests with multiple sender goroutines; creating a new session for every interaction
// TODO: current this test seems to deadlock, and cause the test to timeout;
// go test -v -count 10 -failfast -timeout 30s -run ^TestWebsocketSessionManySenders$
func TestWebsocketSessionManySenders(t *testing.T) {
RegisterTestingT(t)

numWorkers := 1
numMessages := 1000

aliceNode, bobNode := setupWebsocketSession()
testExchangeManySenders(aliceNode, bobNode, numWorkers, numMessages)
shutdown(aliceNode, bobNode)
}

func setupWebsocketSession() (Node, Node) {
aliceConfig, bobConfig := GetConfig("initiator"), GetConfig("responder")

router := &routing.StaticIDRouter{
Expand All @@ -42,7 +63,16 @@ func TestWebsocketSession(t *testing.T) {
pkID: []byte("bob"),
}

testExchange(aliceNode, bobNode)
return aliceNode, bobNode
}

func shutdown(nodes ...Node) {
// TODO: how to check that the comm service is actually stopped?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stop normally waits. What is it that's failing? Maybe the connections that are not done sending?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See bug description in #884

for _, n := range nodes {
n.commService.Stop()
}
// until we figure out how to check when the comm service has stopped completely we give it a bit of time to ZzzZzz
time.Sleep(1 * time.Second)
}

func TestLibp2pSession(t *testing.T) {
Expand Down Expand Up @@ -111,3 +141,76 @@ func testExchange(aliceNode, bobNode Node) {

wg.Wait()
}

func testExchangeManySenders(aliceNode, bobNode Node, numWorker, numOfMsgs int) {
var wgBob sync.WaitGroup

ctx, cancel := context.WithCancel(context.Background())

// setup bob as our receiver
bobMasterSession, err := bobNode.commService.MasterSession()
Expect(err).ToNot(HaveOccurred())
wgBob.Add(1)
go func() {
defer wgBob.Done()
for {
select {
// run until we close via the context
case <-ctx.Done():
return
case response := <-bobMasterSession.Receive():
// get our message from master session
Expect(response).ToNot(BeNil())
Expect(response.Payload).To(Equal([]byte("ping")))

// create a response session
bobSession, err := bobNode.commService.NewSessionWithID(response.SessionID, "", response.FromEndpoint, response.FromPKID, nil, nil)
Expect(err).ToNot(HaveOccurred())
Expect(bobSession.Send([]byte("pong"))).To(Succeed())

// close it
bobSession.Close()
Eventually(bobSession.Info().Closed).Should(BeTrue())
}
}
}()

// setup alice our sender
var wgAlice sync.WaitGroup
for i := 0; i <= numWorker; i++ {
wgAlice.Add(1)
go func() {
defer wgAlice.Done()

// we send every message in a fresh session
for j := 0; j <= numOfMsgs; j++ {
// setup
aliceSession, err := aliceNode.commService.NewSession("", "", rest.ConvertAddress(bobNode.address), bobNode.pkID)
Expect(err).ToNot(HaveOccurred())
Expect(aliceSession.Info().Endpoint).To(Equal(rest.ConvertAddress(bobNode.address)))
Expect(aliceSession.Info().EndpointPKID).To(Equal(bobNode.pkID.Bytes()))

// send
Eventually(aliceSession.Send([]byte("ping"))).Should(Succeed())

// receive
Eventually(func(g Gomega) {
response := <-aliceSession.Receive()
g.Expect(response).ToNot(BeNil())
g.Expect(response).To(HaveField("Payload", Equal([]byte("pong"))))
}).Should(Succeed())

// close
aliceSession.Close()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While writing this test, did you notice the logs were giving confusing messages? Or that we didn't shut down gracefully?

Maybe we should think of more test cases. For instance if we close the channel instead of sending back pong, what do we expect to receive on the other end?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While writing this test, did you notice the logs were giving confusing messages? Or that we didn't shut down gracefully?

Oh the test log is wilde and outputs many warnings. Example:

2025-04-11 13:03:03.957 CEST 9c66 WARN [view-sdk.services.comm.rest.host] readMessages -> Context for stream [5E1V9rQHbsYVziNe7bHPJktJjyQBQTjYYhucotHcmEr.127.0.0.1:57126.BnsK7rcN3qKRV8y7LYUyWE0jEbfTsauQ.] closed by us. Error: %!w(*errors.errorString=&{context canceled})

Eventually(aliceSession.Info().Closed).Should(BeTrue())
}
}()
}

wgAlice.Wait()
cancel()
wgBob.Wait()

bobMasterSession.Close()
Eventually(bobMasterSession.Info().Closed).Should(BeTrue())
}