From 9f279d5d6613d2b7943c4229e0ae03c69b9d71f6 Mon Sep 17 00:00:00 2001 From: Marcus Brandenburger Date: Thu, 10 Apr 2025 16:47:58 +0200 Subject: [PATCH] Add concurrent websocket session test Websocket connects may be used by multiple goroutines concurrently. This commit adds a test to stress the session handling, open, send, receive, and eventual close. Signed-off-by: Marcus Brandenburger --- platform/view/services/comm/comm_test.go | 105 ++++++++++++++++++++++- 1 file changed, 104 insertions(+), 1 deletion(-) diff --git a/platform/view/services/comm/comm_test.go b/platform/view/services/comm/comm_test.go index ac151618d..8e8d7f0fe 100644 --- a/platform/view/services/comm/comm_test.go +++ b/platform/view/services/comm/comm_test.go @@ -10,6 +10,7 @@ import ( "context" "sync" "testing" + "time" "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/host" "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/host/rest" @@ -20,6 +21,26 @@ import ( func TestWebsocketSession(t *testing.T) { RegisterTestingT(t) + aliceNode, bobNode := setupWebsocketSession() + + testExchange(aliceNode, bobNode) +} + +// TestWebsocketSessionManySenders tests with multiple sender goroutines; creating a new session for every interaction +// TODO: current this test seems to deadlock, and cause the test to timeout; +// go test -v -count 10 -failfast -timeout 30s -run ^TestWebsocketSessionManySenders$ +func TestWebsocketSessionManySenders(t *testing.T) { + RegisterTestingT(t) + + numWorkers := 1 + numMessages := 1000 + + aliceNode, bobNode := setupWebsocketSession() + testExchangeManySenders(aliceNode, bobNode, numWorkers, numMessages) + shutdown(aliceNode, bobNode) +} + +func setupWebsocketSession() (Node, Node) { aliceConfig, bobConfig := GetConfig("initiator"), GetConfig("responder") router := &routing.StaticIDRouter{ @@ -42,7 +63,16 @@ func TestWebsocketSession(t *testing.T) { pkID: []byte("bob"), } - testExchange(aliceNode, bobNode) + return aliceNode, bobNode +} + +func shutdown(nodes ...Node) { + // TODO: how to check that the comm service is actually stopped? + for _, n := range nodes { + n.commService.Stop() + } + // until we figure out how to check when the comm service has stopped completely we give it a bit of time to ZzzZzz + time.Sleep(1 * time.Second) } func TestLibp2pSession(t *testing.T) { @@ -111,3 +141,76 @@ func testExchange(aliceNode, bobNode Node) { wg.Wait() } + +func testExchangeManySenders(aliceNode, bobNode Node, numWorker, numOfMsgs int) { + var wgBob sync.WaitGroup + + ctx, cancel := context.WithCancel(context.Background()) + + // setup bob as our receiver + bobMasterSession, err := bobNode.commService.MasterSession() + Expect(err).ToNot(HaveOccurred()) + wgBob.Add(1) + go func() { + defer wgBob.Done() + for { + select { + // run until we close via the context + case <-ctx.Done(): + return + case response := <-bobMasterSession.Receive(): + // get our message from master session + Expect(response).ToNot(BeNil()) + Expect(response.Payload).To(Equal([]byte("ping"))) + + // create a response session + bobSession, err := bobNode.commService.NewSessionWithID(response.SessionID, "", response.FromEndpoint, response.FromPKID, nil, nil) + Expect(err).ToNot(HaveOccurred()) + Expect(bobSession.Send([]byte("pong"))).To(Succeed()) + + // close it + bobSession.Close() + Eventually(bobSession.Info().Closed).Should(BeTrue()) + } + } + }() + + // setup alice our sender + var wgAlice sync.WaitGroup + for i := 0; i <= numWorker; i++ { + wgAlice.Add(1) + go func() { + defer wgAlice.Done() + + // we send every message in a fresh session + for j := 0; j <= numOfMsgs; j++ { + // setup + aliceSession, err := aliceNode.commService.NewSession("", "", rest.ConvertAddress(bobNode.address), bobNode.pkID) + Expect(err).ToNot(HaveOccurred()) + Expect(aliceSession.Info().Endpoint).To(Equal(rest.ConvertAddress(bobNode.address))) + Expect(aliceSession.Info().EndpointPKID).To(Equal(bobNode.pkID.Bytes())) + + // send + Eventually(aliceSession.Send([]byte("ping"))).Should(Succeed()) + + // receive + Eventually(func(g Gomega) { + response := <-aliceSession.Receive() + g.Expect(response).ToNot(BeNil()) + g.Expect(response).To(HaveField("Payload", Equal([]byte("pong")))) + }).Should(Succeed()) + + // close + aliceSession.Close() + Eventually(aliceSession.Info().Closed).Should(BeTrue()) + } + }() + } + + wgAlice.Wait() + cancel() + wgBob.Wait() + + bobMasterSession.Close() + Eventually(bobMasterSession.Info().Closed).Should(BeTrue()) +}