From 2c84cbf9eaf935ca37afb59ab8ee9e154c0d6242 Mon Sep 17 00:00:00 2001 From: Alexandros Filios Date: Thu, 27 Mar 2025 08:36:42 +0100 Subject: [PATCH 01/12] Unit test for websocket comm service Signed-off-by: Alexandros Filios --- platform/view/core/endpoint/endpoint.go | 71 ++-------- platform/view/core/endpoint/endpoint_test.go | 2 +- platform/view/core/endpoint/extractor.go | 91 +++++++++++++ platform/view/services/comm/comm.go | 32 +++-- platform/view/services/comm/comm_test.go | 121 ++++++++++++++++++ .../view/services/comm/host/rest/provider.go | 4 +- 6 files changed, 247 insertions(+), 74 deletions(-) create mode 100644 platform/view/core/endpoint/extractor.go create mode 100644 platform/view/services/comm/comm_test.go diff --git a/platform/view/core/endpoint/endpoint.go b/platform/view/core/endpoint/endpoint.go index b970372bc..c91046d78 100644 --- a/platform/view/core/endpoint/endpoint.go +++ b/platform/view/core/endpoint/endpoint.go @@ -9,7 +9,6 @@ package endpoint import ( "bytes" "net" - "reflect" "strings" "sync" @@ -19,7 +18,6 @@ import ( "github.com/hyperledger-labs/fabric-smart-client/platform/view/driver" "github.com/hyperledger-labs/fabric-smart-client/platform/view/view" "github.com/pkg/errors" - "go.uber.org/zap/zapcore" "golang.org/x/exp/slices" ) @@ -69,17 +67,14 @@ type Service struct { resolversMutex sync.RWMutex bindingKVS driver2.BindingStore - pkiExtractorsLock sync.RWMutex - publicKeyExtractors []driver.PublicKeyExtractor - publicKeyIDSynthesizer driver.PublicKeyIDSynthesizer + pkiExtractor *PKIExtractor } // NewService returns a new instance of the view-sdk endpoint service func NewService(bindingKVS driver2.BindingStore) (*Service, error) { er := &Service{ - bindingKVS: bindingKVS, - publicKeyExtractors: []driver.PublicKeyExtractor{}, - publicKeyIDSynthesizer: DefaultPublicKeyIDSynthesizer{}, + bindingKVS: bindingKVS, + pkiExtractor: NewPKIExtractor(), } return er, nil } @@ -89,7 +84,7 @@ func (r *Service) Resolve(party view.Identity) (driver.Resolver, []byte, error) if err != nil { return nil, nil, err } - return resolver, r.pkiResolve(resolver), nil + return resolver, r.pkiExtractor.PkiResolve(resolver), nil } func (r *Service) GetResolver(party view.Identity) (driver.Resolver, error) { @@ -161,7 +156,7 @@ func (r *Service) matchesResolver(endpoint string, pkID []byte, resolver *Resolv } return len(pkID) > 0 && (bytes.Equal(pkID, resolver.Id) || - bytes.Equal(pkID, r.pkiResolve(resolver))) + bytes.Equal(pkID, r.pkiExtractor.PkiResolve(resolver))) } func (r *Service) AddResolver(name string, domain string, addresses map[string]string, aliases []string, id []byte) (view.Identity, error) { @@ -203,54 +198,15 @@ func (r *Service) AddResolver(name string, domain string, addresses map[string]s } func (r *Service) AddPublicKeyExtractor(publicKeyExtractor driver.PublicKeyExtractor) error { - r.pkiExtractorsLock.Lock() - defer r.pkiExtractorsLock.Unlock() - - if publicKeyExtractor == nil { - return errors.New("pki resolver should not be nil") - } - r.publicKeyExtractors = append(r.publicKeyExtractors, publicKeyExtractor) - return nil + return r.pkiExtractor.AddPublicKeyExtractor(publicKeyExtractor) } func (r *Service) SetPublicKeyIDSynthesizer(publicKeyIDSynthesizer driver.PublicKeyIDSynthesizer) { - r.publicKeyIDSynthesizer = publicKeyIDSynthesizer -} - -func (r *Service) pkiResolve(resolver *Resolver) []byte { - resolver.PKILock.RLock() - if len(resolver.PKI) != 0 { - resolver.PKILock.RUnlock() - return resolver.PKI - } - resolver.PKILock.RUnlock() - - resolver.PKILock.Lock() - defer resolver.PKILock.Unlock() - if len(resolver.PKI) == 0 { - resolver.PKI = r.ExtractPKI(resolver.Id) - } - return resolver.PKI + r.pkiExtractor.SetPublicKeyIDSynthesizer(publicKeyIDSynthesizer) } func (r *Service) ExtractPKI(id []byte) []byte { - r.pkiExtractorsLock.RLock() - defer r.pkiExtractorsLock.RUnlock() - - for _, extractor := range r.publicKeyExtractors { - if pk, err := extractor.ExtractPublicKey(id); pk != nil { - if logger.IsEnabledFor(zapcore.DebugLevel) { - logger.Debugf("pki resolved for [%s]", id) - } - return r.publicKeyIDSynthesizer.PublicKeyID(pk) - } else { - if logger.IsEnabledFor(zapcore.DebugLevel) { - logger.Debugf("pki not resolved by [%s] for [%s]: [%s]", getIdentifier(extractor), id, err) - } - } - } - logger.Warnf("cannot resolve pki for [%s]", id) - return nil + return r.pkiExtractor.ExtractPKI(id) } func (r *Service) rootEndpoint(party view.Identity) (*Resolver, error) { @@ -296,14 +252,3 @@ func LookupIPv4(endpoint string) string { port := s[1] return net.JoinHostPort(addrS, port) } - -func getIdentifier(f any) string { - if f == nil { - return "" - } - t := reflect.TypeOf(f) - for t.Kind() == reflect.Ptr { - t = t.Elem() - } - return t.PkgPath() + "/" + t.Name() -} diff --git a/platform/view/core/endpoint/endpoint_test.go b/platform/view/core/endpoint/endpoint_test.go index fa54c74e2..92dc47e78 100644 --- a/platform/view/core/endpoint/endpoint_test.go +++ b/platform/view/core/endpoint/endpoint_test.go @@ -41,7 +41,7 @@ func TestPKIResolveConcurrency(t *testing.T) { for i := 0; i < 100; i++ { go func() { defer wg.Done() - svc.pkiResolve(resolver) + svc.pkiExtractor.PkiResolve(resolver) }() } wg.Wait() diff --git a/platform/view/core/endpoint/extractor.go b/platform/view/core/endpoint/extractor.go new file mode 100644 index 000000000..f516823b2 --- /dev/null +++ b/platform/view/core/endpoint/extractor.go @@ -0,0 +1,91 @@ +package endpoint + +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +import ( + "reflect" + "sync" + + "github.com/hyperledger-labs/fabric-smart-client/platform/view/driver" + "github.com/pkg/errors" + "go.uber.org/zap/zapcore" +) + +func NewPKIExtractor() *PKIExtractor { + return &PKIExtractor{ + publicKeyExtractors: []driver.PublicKeyExtractor{}, + publicKeyIDSynthesizer: DefaultPublicKeyIDSynthesizer{}, + } +} + +type PKIExtractor struct { + pkiExtractorsLock sync.RWMutex + publicKeyExtractors []driver.PublicKeyExtractor + publicKeyIDSynthesizer driver.PublicKeyIDSynthesizer +} + +func (r *PKIExtractor) AddPublicKeyExtractor(publicKeyExtractor driver.PublicKeyExtractor) error { + r.pkiExtractorsLock.Lock() + defer r.pkiExtractorsLock.Unlock() + + if publicKeyExtractor == nil { + return errors.New("pki resolver should not be nil") + } + r.publicKeyExtractors = append(r.publicKeyExtractors, publicKeyExtractor) + return nil +} + +func (r *PKIExtractor) SetPublicKeyIDSynthesizer(publicKeyIDSynthesizer driver.PublicKeyIDSynthesizer) { + r.publicKeyIDSynthesizer = publicKeyIDSynthesizer +} + +func (r *PKIExtractor) PkiResolve(resolver *Resolver) []byte { + resolver.PKILock.RLock() + if len(resolver.PKI) != 0 { + resolver.PKILock.RUnlock() + return resolver.PKI + } + resolver.PKILock.RUnlock() + + resolver.PKILock.Lock() + defer resolver.PKILock.Unlock() + if len(resolver.PKI) == 0 { + resolver.PKI = r.ExtractPKI(resolver.Id) + } + return resolver.PKI +} + +func (r *PKIExtractor) ExtractPKI(id []byte) []byte { + r.pkiExtractorsLock.RLock() + defer r.pkiExtractorsLock.RUnlock() + + for _, extractor := range r.publicKeyExtractors { + if pk, err := extractor.ExtractPublicKey(id); pk != nil { + if logger.IsEnabledFor(zapcore.DebugLevel) { + logger.Debugf("pki resolved for [%s]", id) + } + return r.publicKeyIDSynthesizer.PublicKeyID(pk) + } else { + if logger.IsEnabledFor(zapcore.DebugLevel) { + logger.Debugf("pki not resolved by [%s] for [%s]: [%s]", getIdentifier(extractor), id, err) + } + } + } + logger.Warnf("cannot resolve pki for [%s]", id) + return nil +} + +func getIdentifier(f any) string { + if f == nil { + return "" + } + t := reflect.TypeOf(f) + for t.Kind() == reflect.Ptr { + t = t.Elem() + } + return t.PkgPath() + "/" + t.Name() +} diff --git a/platform/view/services/comm/comm.go b/platform/view/services/comm/comm.go index f6fffa0b8..db37c2306 100644 --- a/platform/view/services/comm/comm.go +++ b/platform/view/services/comm/comm.go @@ -33,7 +33,7 @@ type ConfigService interface { type Service struct { HostProvider host.GeneratorProvider EndpointService EndpointService - ConfigService ConfigService + config *Config Node *P2PNode NodeSync sync.RWMutex @@ -41,15 +41,31 @@ type Service struct { metricsProvider metrics.Provider } +type Config struct { + ListenAddress string + BootstrapNode string + KeyFile string + CertFile string +} + func NewService(hostProvider host.GeneratorProvider, endpointService EndpointService, configService ConfigService, tracerProvider trace.TracerProvider, metricsProvider metrics.Provider) (*Service, error) { - s := &Service{ + config := &Config{ + ListenAddress: configService.GetString("fsc.p2p.listenAddress"), + BootstrapNode: configService.GetString("fsc.p2p.opts.bootstrapNode"), + KeyFile: configService.GetPath("fsc.identity.key.file"), + CertFile: configService.GetPath("fsc.identity.cert.file"), + } + return newService(hostProvider, endpointService, config, tracerProvider, metricsProvider), nil +} + +func newService(hostProvider host.GeneratorProvider, endpointService EndpointService, config *Config, tracerProvider trace.TracerProvider, metricsProvider metrics.Provider) *Service { + return &Service{ HostProvider: hostProvider, EndpointService: endpointService, - ConfigService: configService, + config: config, tracerProvider: tracerProvider, metricsProvider: metricsProvider, } - return s, nil } func (s *Service) Start(ctx context.Context) { @@ -123,10 +139,10 @@ func (s *Service) init() error { return nil } - p2pListenAddress := s.ConfigService.GetString("fsc.p2p.listenAddress") - p2pBootstrapNode := s.ConfigService.GetString("fsc.p2p.opts.bootstrapNode") - keyFile := s.ConfigService.GetPath("fsc.identity.key.file") - certFile := s.ConfigService.GetPath("fsc.identity.cert.file") + p2pListenAddress := s.config.ListenAddress + p2pBootstrapNode := s.config.BootstrapNode + keyFile := s.config.KeyFile + certFile := s.config.CertFile if len(p2pBootstrapNode) == 0 { // this is a bootstrap node diff --git a/platform/view/services/comm/comm_test.go b/platform/view/services/comm/comm_test.go new file mode 100644 index 000000000..aa2a93a4b --- /dev/null +++ b/platform/view/services/comm/comm_test.go @@ -0,0 +1,121 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package comm + +import ( + "context" + "fmt" + "os" + "path" + "sync" + "testing" + + "github.com/hashicorp/consul/sdk/freeport" + "github.com/hyperledger-labs/fabric-smart-client/platform/common/utils" + "github.com/hyperledger-labs/fabric-smart-client/platform/view" + "github.com/hyperledger-labs/fabric-smart-client/platform/view/core/endpoint" + "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/host" + "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/host/rest" + "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/host/rest/routing" + "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/host/rest/websocket" + "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/metrics/disabled" + view2 "github.com/hyperledger-labs/fabric-smart-client/platform/view/view" + . "github.com/onsi/gomega" + "go.opentelemetry.io/otel/trace/noop" +) + +func TestWebsocketSession(t *testing.T) { + RegisterTestingT(t) + + addresses := map[string]string{ + "alice": freeAddress(), + "bob": freeAddress(), + } + routing := &routing.StaticIDRouter{ + "alice": []host.PeerIPAddress{rest.ConvertAddress(addresses["alice"])}, + "bob": []host.PeerIPAddress{rest.ConvertAddress(addresses["bob"])}, + } + rootPath, ok := os.LookupEnv("GOPATH") + Expect(ok).To(BeFalse(), "GOPATH is not set") + projectPath := path.Join(rootPath, "src", "github.com", "hyperledger-labs", "fabric-smart-client") + testDataPath := path.Join(projectPath, "integration", "fsc", "pingpong", "testdata", "fsc", "crypto", "peerOrganizations", "fsc.example.com", "peers") + aliceConfig := &Config{ + ListenAddress: addresses["alice"], + KeyFile: path.Join(testDataPath, "initiator.fsc.example.com", "msp", "keystore", "priv_sk"), + CertFile: path.Join(testDataPath, "initiator.fsc.example.com", "msp", "signcerts", "initiator.fsc.example.com-cert.pem"), + } + bobConfig := &Config{ + ListenAddress: addresses["bob"], + KeyFile: path.Join(testDataPath, "responder.fsc.example.com", "msp", "keystore", "priv_sk"), + CertFile: path.Join(testDataPath, "responder.fsc.example.com", "msp", "signcerts", "responder.fsc.example.com-cert.pem"), + } + alice := newWebsocketCommService(routing, aliceConfig) + alice.Start(context.Background()) + bob := newWebsocketCommService(routing, bobConfig) + bob.Start(context.Background()) + + wg := sync.WaitGroup{} + wg.Add(2) + + aliceSession, err := alice.NewSession("", "", rest.ConvertAddress(addresses["bob"]), []byte("bob")) + Expect(err).ToNot(HaveOccurred()) + Expect(aliceSession.Info().Endpoint).To(Equal(rest.ConvertAddress(addresses["bob"]))) + Expect(aliceSession.Info().EndpointPKID).To(Equal([]byte("bob"))) + bobMasterSession, err := bob.MasterSession() + Expect(err).ToNot(HaveOccurred()) + go func() { + defer wg.Done() + Expect(aliceSession.Send([]byte("msg1"))).To(Succeed()) + Expect(aliceSession.Send([]byte("msg2"))).To(Succeed()) + response := <-aliceSession.Receive() + Expect(response).ToNot(BeNil()) + Expect(response).To(HaveField("Payload", Equal([]byte("msg3")))) + }() + + go func() { + defer wg.Done() + response := <-bobMasterSession.Receive() + Expect(response).ToNot(BeNil()) + Expect(response.Payload).To(Equal([]byte("msg1"))) + Expect(response.SessionID).To(Equal(aliceSession.Info().ID)) + + response = <-bobMasterSession.Receive() + Expect(response).ToNot(BeNil()) + Expect(response.Payload).To(Equal([]byte("msg2"))) + + bobSession, err := bob.NewSessionWithID(response.SessionID, "", response.FromEndpoint, response.FromPKID, nil, nil) + Expect(err).ToNot(HaveOccurred()) + Expect(bobSession.Send([]byte("msg3"))).To(Succeed()) + }() + + wg.Wait() +} + +func freeAddress() string { + return fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", utils.MustGet(freeport.Take(1))[0]) +} + +func newWebsocketCommService(addresses *routing.StaticIDRouter, config *Config) *Service { + discovery := routing.NewServiceDiscovery(addresses, routing.Random[host.PeerIPAddress]()) + + pkiExtractor := endpoint.NewPKIExtractor() + utils.Must(pkiExtractor.AddPublicKeyExtractor(&PKExtractor{})) + pkiExtractor.SetPublicKeyIDSynthesizer(&rest.PKIDSynthesizer{}) + + hostProvider := rest.NewEndpointBasedProvider(pkiExtractor, discovery, noop.NewTracerProvider(), websocket.NewMultiplexedProvider(noop.NewTracerProvider(), &disabled.Provider{})) + + return newService(hostProvider, nil, config, noop.NewTracerProvider(), &disabled.Provider{}) +} + +type noopEndpointService struct{} + +func (s *noopEndpointService) Resolve(party view2.Identity) (view2.Identity, map[view.PortName]string, []byte, error) { + return nil, nil, nil, nil +} +func (s *noopEndpointService) GetIdentity(label string, pkID []byte) (view2.Identity, error) { + return nil, nil +} diff --git a/platform/view/services/comm/host/rest/provider.go b/platform/view/services/comm/host/rest/provider.go index 1bdad7930..eed10960e 100644 --- a/platform/view/services/comm/host/rest/provider.go +++ b/platform/view/services/comm/host/rest/provider.go @@ -42,14 +42,14 @@ func (p *endpointServiceBasedProvider) NewBootstrapHost(listenAddress host2.Peer return nil, errors.Wrapf(err, "failed to load identity in [%s]", certPath) } nodeID := string(p.pkiExtractor.ExtractPKI(raw)) - return NewHost(nodeID, convertAddress(listenAddress), p.routing, p.tracerProvider, p.streamProvider, privateKeyPath, certPath, nil) + return NewHost(nodeID, ConvertAddress(listenAddress), p.routing, p.tracerProvider, p.streamProvider, privateKeyPath, certPath, nil) } func (p *endpointServiceBasedProvider) NewHost(listenAddress host2.PeerIPAddress, privateKeyPath, certPath string, _ host2.PeerIPAddress) (host2.P2PHost, error) { return p.NewBootstrapHost(listenAddress, privateKeyPath, certPath) } -func convertAddress(addr string) string { +func ConvertAddress(addr string) string { parts := strings.Split(addr, "/") if len(parts) != 5 { panic("unexpected address found: " + addr) From 0594dc7475a995bc9fb8fb6254fc488cd9456ae5 Mon Sep 17 00:00:00 2001 From: Alexandros Filios Date: Thu, 27 Mar 2025 08:53:55 +0100 Subject: [PATCH 02/12] Unit test for websocket comm service Signed-off-by: Alexandros Filios --- platform/view/services/comm/comm_test.go | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/platform/view/services/comm/comm_test.go b/platform/view/services/comm/comm_test.go index aa2a93a4b..1b03061e1 100644 --- a/platform/view/services/comm/comm_test.go +++ b/platform/view/services/comm/comm_test.go @@ -16,14 +16,12 @@ import ( "github.com/hashicorp/consul/sdk/freeport" "github.com/hyperledger-labs/fabric-smart-client/platform/common/utils" - "github.com/hyperledger-labs/fabric-smart-client/platform/view" "github.com/hyperledger-labs/fabric-smart-client/platform/view/core/endpoint" "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/host" "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/host/rest" "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/host/rest/routing" "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/host/rest/websocket" "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/metrics/disabled" - view2 "github.com/hyperledger-labs/fabric-smart-client/platform/view/view" . "github.com/onsi/gomega" "go.opentelemetry.io/otel/trace/noop" ) @@ -110,12 +108,3 @@ func newWebsocketCommService(addresses *routing.StaticIDRouter, config *Config) return newService(hostProvider, nil, config, noop.NewTracerProvider(), &disabled.Provider{}) } - -type noopEndpointService struct{} - -func (s *noopEndpointService) Resolve(party view2.Identity) (view2.Identity, map[view.PortName]string, []byte, error) { - return nil, nil, nil, nil -} -func (s *noopEndpointService) GetIdentity(label string, pkID []byte) (view2.Identity, error) { - return nil, nil -} From 8f71ddd61a8fe02381be70a4572189d9aa11e0ff Mon Sep 17 00:00:00 2001 From: Alexandros Filios Date: Thu, 27 Mar 2025 16:40:39 +0100 Subject: [PATCH 03/12] fixup! Unit test for websocket comm service Signed-off-by: Alexandros Filios --- platform/view/services/comm/comm_test.go | 124 ++++++++++++++++++----- 1 file changed, 98 insertions(+), 26 deletions(-) diff --git a/platform/view/services/comm/comm_test.go b/platform/view/services/comm/comm_test.go index 1b03061e1..469f68fe0 100644 --- a/platform/view/services/comm/comm_test.go +++ b/platform/view/services/comm/comm_test.go @@ -16,54 +16,89 @@ import ( "github.com/hashicorp/consul/sdk/freeport" "github.com/hyperledger-labs/fabric-smart-client/platform/common/utils" + endpoint2 "github.com/hyperledger-labs/fabric-smart-client/platform/fabric/core/generic/endpoint" + "github.com/hyperledger-labs/fabric-smart-client/platform/fabric/core/generic/msp/x509" + "github.com/hyperledger-labs/fabric-smart-client/platform/view" "github.com/hyperledger-labs/fabric-smart-client/platform/view/core/endpoint" "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/host" + "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/host/libp2p" "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/host/rest" "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/host/rest/routing" "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/host/rest/websocket" "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/metrics/disabled" + view2 "github.com/hyperledger-labs/fabric-smart-client/platform/view/view" . "github.com/onsi/gomega" "go.opentelemetry.io/otel/trace/noop" ) +type node struct { + commService *Service + address string + pkID view2.Identity +} + func TestWebsocketSession(t *testing.T) { RegisterTestingT(t) - addresses := map[string]string{ - "alice": freeAddress(), - "bob": freeAddress(), - } - routing := &routing.StaticIDRouter{ - "alice": []host.PeerIPAddress{rest.ConvertAddress(addresses["alice"])}, - "bob": []host.PeerIPAddress{rest.ConvertAddress(addresses["bob"])}, + aliceConfig, bobConfig := getConfig("initiator"), getConfig("responder") + + router := &routing.StaticIDRouter{ + "alice": []host.PeerIPAddress{rest.ConvertAddress(aliceConfig.ListenAddress)}, + "bob": []host.PeerIPAddress{rest.ConvertAddress(bobConfig.ListenAddress)}, } - rootPath, ok := os.LookupEnv("GOPATH") - Expect(ok).To(BeFalse(), "GOPATH is not set") - projectPath := path.Join(rootPath, "src", "github.com", "hyperledger-labs", "fabric-smart-client") - testDataPath := path.Join(projectPath, "integration", "fsc", "pingpong", "testdata", "fsc", "crypto", "peerOrganizations", "fsc.example.com", "peers") - aliceConfig := &Config{ - ListenAddress: addresses["alice"], - KeyFile: path.Join(testDataPath, "initiator.fsc.example.com", "msp", "keystore", "priv_sk"), - CertFile: path.Join(testDataPath, "initiator.fsc.example.com", "msp", "signcerts", "initiator.fsc.example.com-cert.pem"), + alice := newWebsocketCommService(router, aliceConfig) + alice.Start(context.Background()) + bob := newWebsocketCommService(router, bobConfig) + bob.Start(context.Background()) + + aliceNode := node{ + commService: alice, + address: aliceConfig.ListenAddress, + pkID: []byte("alice"), } - bobConfig := &Config{ - ListenAddress: addresses["bob"], - KeyFile: path.Join(testDataPath, "responder.fsc.example.com", "msp", "keystore", "priv_sk"), - CertFile: path.Join(testDataPath, "responder.fsc.example.com", "msp", "signcerts", "responder.fsc.example.com-cert.pem"), + bobNode := node{ + commService: bob, + address: bobConfig.ListenAddress, + pkID: []byte("bob"), } - alice := newWebsocketCommService(routing, aliceConfig) + + testExchange(aliceNode, bobNode) +} + +func TestLibp2pSession(t *testing.T) { + RegisterTestingT(t) + + aliceConfig, bobConfig := getConfig("initiator"), getConfig("responder") + bobConfig.BootstrapNode = "alice" + + alice, alicePkID := newLibP2PCommService(aliceConfig, nil) alice.Start(context.Background()) - bob := newWebsocketCommService(routing, bobConfig) + bob, bobPkID := newLibP2PCommService(bobConfig, &bootstrapNodeResolver{nodeID: alicePkID, nodeAddress: aliceConfig.ListenAddress}) bob.Start(context.Background()) + aliceNode := node{ + commService: alice, + address: aliceConfig.ListenAddress, + pkID: alicePkID, + } + bobNode := node{ + commService: bob, + address: bobConfig.ListenAddress, + pkID: bobPkID, + } + + testExchange(aliceNode, bobNode) +} + +func testExchange(aliceNode, bobNode node) { wg := sync.WaitGroup{} wg.Add(2) - aliceSession, err := alice.NewSession("", "", rest.ConvertAddress(addresses["bob"]), []byte("bob")) + aliceSession, err := aliceNode.commService.NewSession("", "", rest.ConvertAddress(bobNode.address), bobNode.pkID) Expect(err).ToNot(HaveOccurred()) - Expect(aliceSession.Info().Endpoint).To(Equal(rest.ConvertAddress(addresses["bob"]))) - Expect(aliceSession.Info().EndpointPKID).To(Equal([]byte("bob"))) - bobMasterSession, err := bob.MasterSession() + Expect(aliceSession.Info().Endpoint).To(Equal(rest.ConvertAddress(bobNode.address))) + Expect(aliceSession.Info().EndpointPKID).To(Equal(bobNode.pkID.Bytes())) + bobMasterSession, err := bobNode.commService.MasterSession() Expect(err).ToNot(HaveOccurred()) go func() { defer wg.Done() @@ -85,7 +120,7 @@ func TestWebsocketSession(t *testing.T) { Expect(response).ToNot(BeNil()) Expect(response.Payload).To(Equal([]byte("msg2"))) - bobSession, err := bob.NewSessionWithID(response.SessionID, "", response.FromEndpoint, response.FromPKID, nil, nil) + bobSession, err := bobNode.commService.NewSessionWithID(response.SessionID, "", response.FromEndpoint, response.FromPKID, nil, nil) Expect(err).ToNot(HaveOccurred()) Expect(bobSession.Send([]byte("msg3"))).To(Succeed()) }() @@ -108,3 +143,40 @@ func newWebsocketCommService(addresses *routing.StaticIDRouter, config *Config) return newService(hostProvider, nil, config, noop.NewTracerProvider(), &disabled.Provider{}) } + +func newLibP2PCommService(config *Config, resolver EndpointService) (*Service, view2.Identity) { + pkiExtractor := endpoint.NewPKIExtractor() + utils.Must(pkiExtractor.AddPublicKeyExtractor(&PKExtractor{})) + utils.Must(pkiExtractor.AddPublicKeyExtractor(endpoint2.PublicKeyExtractor{})) + pkiExtractor.SetPublicKeyIDSynthesizer(&libp2p.PKIDSynthesizer{}) + + hostProvider := libp2p.NewHostGeneratorProvider(&disabled.Provider{}) + + pkID := pkiExtractor.ExtractPKI(utils.MustGet(x509.Serialize("", config.CertFile))) + + return newService(hostProvider, resolver, config, noop.NewTracerProvider(), &disabled.Provider{}), pkID +} + +type bootstrapNodeResolver struct { + nodeAddress string + nodeID []byte +} + +func (r *bootstrapNodeResolver) Resolve(view2.Identity) (view2.Identity, map[view.PortName]string, []byte, error) { + return nil, map[view.PortName]string{view.P2PPort: rest.ConvertAddress(r.nodeAddress)}, r.nodeID, nil +} + +func (r *bootstrapNodeResolver) GetIdentity(string, []byte) (view2.Identity, error) { + return view2.Identity("bstpnd"), nil +} +func getConfig(name string) *Config { + rootPath, ok := os.LookupEnv("GOPATH") + Expect(ok).To(BeFalse(), "GOPATH is not set") + projectPath := path.Join(rootPath, "src", "github.com", "hyperledger-labs", "fabric-smart-client") + mspDir := path.Join(projectPath, "integration", "fsc", "pingpong", "testdata", "fsc", "crypto", "peerOrganizations", "fsc.example.com", "peers", fmt.Sprintf("%s.fsc.example.com", name), "msp") + return &Config{ + ListenAddress: freeAddress(), + KeyFile: path.Join(mspDir, "keystore", "priv_sk"), + CertFile: path.Join(mspDir, "signcerts", fmt.Sprintf("%s.fsc.example.com-cert.pem", name)), + } +} From 034b2f5887dee2276a5d9b6676a644f3368228e7 Mon Sep 17 00:00:00 2001 From: akram Date: Mon, 7 Apr 2025 17:28:15 +0300 Subject: [PATCH 04/12] Init version --- .../view/services/comm/comm_bench_test.go | 133 ++++++++++++++++++ platform/view/services/comm/comm_test.go | 95 ++----------- platform/view/services/comm/test_utils.go | 87 +++++++++++- 3 files changed, 226 insertions(+), 89 deletions(-) create mode 100644 platform/view/services/comm/comm_bench_test.go diff --git a/platform/view/services/comm/comm_bench_test.go b/platform/view/services/comm/comm_bench_test.go new file mode 100644 index 000000000..f2f580a9e --- /dev/null +++ b/platform/view/services/comm/comm_bench_test.go @@ -0,0 +1,133 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package comm + +import ( + "context" + "fmt" + "strconv" + "sync" + "testing" + + "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/host" + "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/host/rest" + "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/host/rest/routing" + . "github.com/onsi/gomega" +) + +// type ( +const numOfConn int = 1 + +// const numOfMsg int = 1 +// ) + +func BenchmarkTestWebsocketSession(b *testing.B) { + RegisterTestingT(b) + + // senderNodes := []Node{} + // receiverNodes := []Node{} + + for i := range numOfConn { + + senderConfig, receiverConfig := GetConfig("initiator"), GetConfig("responder") + + router := &routing.StaticIDRouter{ + "sender" + strconv.Itoa(i): []host.PeerIPAddress{rest.ConvertAddress(senderConfig.ListenAddress)}, + "receiver" + strconv.Itoa(i): []host.PeerIPAddress{rest.ConvertAddress(receiverConfig.ListenAddress)}, + } + + // Need to move at the begining + sender := NewWebsocketCommService(router, senderConfig) + sender.Start(context.Background()) + receiver := NewWebsocketCommService(router, receiverConfig) + receiver.Start(context.Background()) + + senderNode := Node{ + commService: sender, + address: senderConfig.ListenAddress, + pkID: []byte("sender" + strconv.Itoa(i))} + + receiverNode := Node{ + commService: receiver, + address: receiverConfig.ListenAddress, + pkID: []byte("receiver" + strconv.Itoa(i))} + + benchmarkTestExchange(senderNode, receiverNode) + } +} + +func BenchmarkTestLibp2pSession(b *testing.B) { + RegisterTestingT(b) + + for i := range numOfConn { + senderConfig, receiverConfig := GetConfig("initiator"), GetConfig("responder") + receiverConfig.BootstrapNode = "Sender" + strconv.Itoa(i) + + sender, senderPkID := NewLibP2PCommService(senderConfig, nil) + sender.Start(context.Background()) + receiver, receiverPkID := NewLibP2PCommService(receiverConfig, &BootstrapNodeResolver{nodeID: senderPkID, nodeAddress: senderConfig.ListenAddress}) + receiver.Start(context.Background()) + + senderNode := Node{ + commService: sender, + address: senderConfig.ListenAddress, + pkID: senderPkID, + } + receiverNode := Node{ + commService: receiver, + address: receiverConfig.ListenAddress, + pkID: receiverPkID, + } + + benchmarkTestExchange(senderNode, receiverNode) + } +} + +func benchmarkTestExchange(senderNode, receiverNode Node) { + wg := sync.WaitGroup{} + wg.Add(2) + + //NewSessionWithID + senderSession, err := senderNode.commService.NewSession("", "", rest.ConvertAddress(receiverNode.address), receiverNode.pkID) + Expect(err).ToNot(HaveOccurred()) + Expect(senderSession.Info().Endpoint).To(Equal(rest.ConvertAddress(receiverNode.address))) + Expect(senderSession.Info().EndpointPKID).To(Equal(receiverNode.pkID.Bytes())) + recevierSession, err := receiverNode.commService.MasterSession() + receiverNode, err := receiverNode.commService.NewSessionWithID(response.SessionID, "", response.FromEndpoint, response.FromPKID, nil, nil) + + Expect(err).ToNot(HaveOccurred()) + go func() { + defer wg.Done() + for i := 1; i <= 1; i++ { + fmt.Print("---> Sender: sends request message\n") + Expect(senderSession.Send([]byte("request"))).To(Succeed()) + + fmt.Print("---> Sender: receives response message\n") + response := <-senderSession.Receive() + Expect(response).ToNot(BeNil()) + Expect(response).To(HaveField("Payload", Equal([]byte("response")))) + senderSession.Close() + } + }() + + go func() { + defer wg.Done() + for i := 1; i <= 1; i++ { + fmt.Print("---> Receiver: receive request message\n") + response := <-recevierSession.Receive() + Expect(response).ToNot(BeNil()) + Expect(response.Payload).To(Equal([]byte("request"))) + Expect(response.SessionID).To(Equal(senderSession.Info().ID)) + + fmt.Print("---> Receiver: sends response message\n") + Expect(err).ToNot(HaveOccurred()) + Expect(receiverNode.Send([]byte("response"))).To(Succeed()) + } + }() + + wg.Wait() +} diff --git a/platform/view/services/comm/comm_test.go b/platform/view/services/comm/comm_test.go index 469f68fe0..3bcb12248 100644 --- a/platform/view/services/comm/comm_test.go +++ b/platform/view/services/comm/comm_test.go @@ -8,55 +8,35 @@ package comm import ( "context" - "fmt" - "os" - "path" "sync" "testing" - "github.com/hashicorp/consul/sdk/freeport" - "github.com/hyperledger-labs/fabric-smart-client/platform/common/utils" - endpoint2 "github.com/hyperledger-labs/fabric-smart-client/platform/fabric/core/generic/endpoint" - "github.com/hyperledger-labs/fabric-smart-client/platform/fabric/core/generic/msp/x509" - "github.com/hyperledger-labs/fabric-smart-client/platform/view" - "github.com/hyperledger-labs/fabric-smart-client/platform/view/core/endpoint" "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/host" - "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/host/libp2p" "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/host/rest" "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/host/rest/routing" - "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/host/rest/websocket" - "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/metrics/disabled" - view2 "github.com/hyperledger-labs/fabric-smart-client/platform/view/view" . "github.com/onsi/gomega" - "go.opentelemetry.io/otel/trace/noop" ) -type node struct { - commService *Service - address string - pkID view2.Identity -} - func TestWebsocketSession(t *testing.T) { RegisterTestingT(t) - aliceConfig, bobConfig := getConfig("initiator"), getConfig("responder") + aliceConfig, bobConfig := GetConfig("initiator"), GetConfig("responder") router := &routing.StaticIDRouter{ "alice": []host.PeerIPAddress{rest.ConvertAddress(aliceConfig.ListenAddress)}, "bob": []host.PeerIPAddress{rest.ConvertAddress(bobConfig.ListenAddress)}, } - alice := newWebsocketCommService(router, aliceConfig) + alice := NewWebsocketCommService(router, aliceConfig) alice.Start(context.Background()) - bob := newWebsocketCommService(router, bobConfig) + bob := NewWebsocketCommService(router, bobConfig) bob.Start(context.Background()) - aliceNode := node{ + aliceNode := Node{ commService: alice, address: aliceConfig.ListenAddress, pkID: []byte("alice"), } - bobNode := node{ + bobNode := Node{ commService: bob, address: bobConfig.ListenAddress, pkID: []byte("bob"), @@ -68,20 +48,20 @@ func TestWebsocketSession(t *testing.T) { func TestLibp2pSession(t *testing.T) { RegisterTestingT(t) - aliceConfig, bobConfig := getConfig("initiator"), getConfig("responder") + aliceConfig, bobConfig := GetConfig("initiator"), GetConfig("responder") bobConfig.BootstrapNode = "alice" - alice, alicePkID := newLibP2PCommService(aliceConfig, nil) + alice, alicePkID := NewLibP2PCommService(aliceConfig, nil) alice.Start(context.Background()) - bob, bobPkID := newLibP2PCommService(bobConfig, &bootstrapNodeResolver{nodeID: alicePkID, nodeAddress: aliceConfig.ListenAddress}) + bob, bobPkID := NewLibP2PCommService(bobConfig, &BootstrapNodeResolver{nodeID: alicePkID, nodeAddress: aliceConfig.ListenAddress}) bob.Start(context.Background()) - aliceNode := node{ + aliceNode := Node{ commService: alice, address: aliceConfig.ListenAddress, pkID: alicePkID, } - bobNode := node{ + bobNode := Node{ commService: bob, address: bobConfig.ListenAddress, pkID: bobPkID, @@ -90,7 +70,7 @@ func TestLibp2pSession(t *testing.T) { testExchange(aliceNode, bobNode) } -func testExchange(aliceNode, bobNode node) { +func testExchange(aliceNode, bobNode Node) { wg := sync.WaitGroup{} wg.Add(2) @@ -127,56 +107,3 @@ func testExchange(aliceNode, bobNode node) { wg.Wait() } - -func freeAddress() string { - return fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", utils.MustGet(freeport.Take(1))[0]) -} - -func newWebsocketCommService(addresses *routing.StaticIDRouter, config *Config) *Service { - discovery := routing.NewServiceDiscovery(addresses, routing.Random[host.PeerIPAddress]()) - - pkiExtractor := endpoint.NewPKIExtractor() - utils.Must(pkiExtractor.AddPublicKeyExtractor(&PKExtractor{})) - pkiExtractor.SetPublicKeyIDSynthesizer(&rest.PKIDSynthesizer{}) - - hostProvider := rest.NewEndpointBasedProvider(pkiExtractor, discovery, noop.NewTracerProvider(), websocket.NewMultiplexedProvider(noop.NewTracerProvider(), &disabled.Provider{})) - - return newService(hostProvider, nil, config, noop.NewTracerProvider(), &disabled.Provider{}) -} - -func newLibP2PCommService(config *Config, resolver EndpointService) (*Service, view2.Identity) { - pkiExtractor := endpoint.NewPKIExtractor() - utils.Must(pkiExtractor.AddPublicKeyExtractor(&PKExtractor{})) - utils.Must(pkiExtractor.AddPublicKeyExtractor(endpoint2.PublicKeyExtractor{})) - pkiExtractor.SetPublicKeyIDSynthesizer(&libp2p.PKIDSynthesizer{}) - - hostProvider := libp2p.NewHostGeneratorProvider(&disabled.Provider{}) - - pkID := pkiExtractor.ExtractPKI(utils.MustGet(x509.Serialize("", config.CertFile))) - - return newService(hostProvider, resolver, config, noop.NewTracerProvider(), &disabled.Provider{}), pkID -} - -type bootstrapNodeResolver struct { - nodeAddress string - nodeID []byte -} - -func (r *bootstrapNodeResolver) Resolve(view2.Identity) (view2.Identity, map[view.PortName]string, []byte, error) { - return nil, map[view.PortName]string{view.P2PPort: rest.ConvertAddress(r.nodeAddress)}, r.nodeID, nil -} - -func (r *bootstrapNodeResolver) GetIdentity(string, []byte) (view2.Identity, error) { - return view2.Identity("bstpnd"), nil -} -func getConfig(name string) *Config { - rootPath, ok := os.LookupEnv("GOPATH") - Expect(ok).To(BeFalse(), "GOPATH is not set") - projectPath := path.Join(rootPath, "src", "github.com", "hyperledger-labs", "fabric-smart-client") - mspDir := path.Join(projectPath, "integration", "fsc", "pingpong", "testdata", "fsc", "crypto", "peerOrganizations", "fsc.example.com", "peers", fmt.Sprintf("%s.fsc.example.com", name), "msp") - return &Config{ - ListenAddress: freeAddress(), - KeyFile: path.Join(mspDir, "keystore", "priv_sk"), - CertFile: path.Join(mspDir, "signcerts", fmt.Sprintf("%s.fsc.example.com-cert.pem", name)), - } -} diff --git a/platform/view/services/comm/test_utils.go b/platform/view/services/comm/test_utils.go index 6f7798e31..9d2af37e3 100644 --- a/platform/view/services/comm/test_utils.go +++ b/platform/view/services/comm/test_utils.go @@ -8,17 +8,40 @@ package comm import ( "context" + "fmt" + "os" + "path" "sync" "testing" - host2 "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/host" + "github.com/hashicorp/consul/sdk/freeport" + "github.com/hyperledger-labs/fabric-smart-client/platform/common/utils" + endpoint2 "github.com/hyperledger-labs/fabric-smart-client/platform/fabric/core/generic/endpoint" + "github.com/hyperledger-labs/fabric-smart-client/platform/fabric/core/generic/msp/x509" + view3 "github.com/hyperledger-labs/fabric-smart-client/platform/view" + "github.com/hyperledger-labs/fabric-smart-client/platform/view/core/endpoint" + "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/host" + "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/host/libp2p" + "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/host/rest" + "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/host/rest/routing" + "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/host/rest/websocket" + "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/metrics/disabled" + view2 "github.com/hyperledger-labs/fabric-smart-client/platform/view/view" + . "github.com/onsi/gomega" "github.com/stretchr/testify/assert" + "go.opentelemetry.io/otel/trace/noop" ) type HostNode struct { *P2PNode - ID host2.PeerID - Address host2.PeerIPAddress + ID host.PeerID + Address host.PeerIPAddress +} + +type Node struct { + commService *Service + address string + pkID view2.Identity } func P2PLayerTestRound(t *testing.T, bootstrapNode *HostNode, node *HostNode) { @@ -28,7 +51,7 @@ func P2PLayerTestRound(t *testing.T, bootstrapNode *HostNode, node *HostNode) { defer wg.Done() messages := bootstrapNode.incomingMessages - info := host2.StreamInfo{ + info := host.StreamInfo{ RemotePeerID: node.ID, RemotePeerAddress: node.Address, ContextID: "context", @@ -54,7 +77,7 @@ func P2PLayerTestRound(t *testing.T, bootstrapNode *HostNode, node *HostNode) { assert.NotNil(t, msg) assert.Equal(t, []byte("msg2"), msg.message.Payload) - info := host2.StreamInfo{ + info := host.StreamInfo{ RemotePeerID: bootstrapNode.ID, RemotePeerAddress: bootstrapNode.Address, ContextID: "context", @@ -165,3 +188,57 @@ func SessionsForMPCTestRound(t *testing.T, bootstrapNode *HostNode, node *HostNo bootstrapNode.Stop() node.Stop() } + +func freeAddress() string { + return fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", utils.MustGet(freeport.Take(1))[0]) +} + +func GetConfig(name string) *Config { + rootPath, ok := os.LookupEnv("GOPATH") + Expect(ok).To(BeTrue(), "GOPATH is not set") + projectPath := path.Join(rootPath, "src", "github.com", "hyperledger-labs", "fabric-smart-client") + mspDir := path.Join(projectPath, "integration", "fsc", "pingpong", "testdata", "fsc", "crypto", "peerOrganizations", "fsc.example.com", "peers", fmt.Sprintf("%s.fsc.example.com", name), "msp") + return &Config{ + ListenAddress: freeAddress(), + KeyFile: path.Join(mspDir, "keystore", "priv_sk"), + CertFile: path.Join(mspDir, "signcerts", fmt.Sprintf("%s.fsc.example.com-cert.pem", name)), + } +} + +type BootstrapNodeResolver struct { + nodeAddress string + nodeID []byte +} + +func (r *BootstrapNodeResolver) Resolve(view2.Identity) (view2.Identity, map[view3.PortName]string, []byte, error) { + return nil, map[view3.PortName]string{view3.P2PPort: rest.ConvertAddress(r.nodeAddress)}, r.nodeID, nil +} + +func (r *BootstrapNodeResolver) GetIdentity(string, []byte) (view2.Identity, error) { + return view2.Identity("bstpnd"), nil +} + +func NewWebsocketCommService(addresses *routing.StaticIDRouter, config *Config) *Service { + discovery := routing.NewServiceDiscovery(addresses, routing.Random[host.PeerIPAddress]()) + + pkiExtractor := endpoint.NewPKIExtractor() + utils.Must(pkiExtractor.AddPublicKeyExtractor(&PKExtractor{})) + pkiExtractor.SetPublicKeyIDSynthesizer(&rest.PKIDSynthesizer{}) + + hostProvider := rest.NewEndpointBasedProvider(pkiExtractor, discovery, noop.NewTracerProvider(), websocket.NewMultiplexedProvider(noop.NewTracerProvider(), &disabled.Provider{})) + + return newService(hostProvider, nil, config, noop.NewTracerProvider(), &disabled.Provider{}) +} + +func NewLibP2PCommService(config *Config, resolver EndpointService) (*Service, view2.Identity) { + pkiExtractor := endpoint.NewPKIExtractor() + utils.Must(pkiExtractor.AddPublicKeyExtractor(&PKExtractor{})) + utils.Must(pkiExtractor.AddPublicKeyExtractor(endpoint2.PublicKeyExtractor{})) + pkiExtractor.SetPublicKeyIDSynthesizer(&libp2p.PKIDSynthesizer{}) + + hostProvider := libp2p.NewHostGeneratorProvider(&disabled.Provider{}) + + pkID := pkiExtractor.ExtractPKI(utils.MustGet(x509.Serialize("", config.CertFile))) + + return newService(hostProvider, resolver, config, noop.NewTracerProvider(), &disabled.Provider{}), pkID +} From 0c43b5c91a3c8eedf9b169ac4dcd5d58ec114d28 Mon Sep 17 00:00:00 2001 From: Alexandros Filios Date: Tue, 8 Apr 2025 05:31:15 +0200 Subject: [PATCH 05/12] fixup! Unit test for websocket comm service Signed-off-by: Alexandros Filios --- platform/view/services/comm/comm_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/platform/view/services/comm/comm_test.go b/platform/view/services/comm/comm_test.go index 3bcb12248..ac151618d 100644 --- a/platform/view/services/comm/comm_test.go +++ b/platform/view/services/comm/comm_test.go @@ -87,6 +87,7 @@ func testExchange(aliceNode, bobNode Node) { response := <-aliceSession.Receive() Expect(response).ToNot(BeNil()) Expect(response).To(HaveField("Payload", Equal([]byte("msg3")))) + Expect(aliceSession.Send([]byte("msg4"))).To(Succeed()) }() go func() { @@ -103,6 +104,9 @@ func testExchange(aliceNode, bobNode Node) { bobSession, err := bobNode.commService.NewSessionWithID(response.SessionID, "", response.FromEndpoint, response.FromPKID, nil, nil) Expect(err).ToNot(HaveOccurred()) Expect(bobSession.Send([]byte("msg3"))).To(Succeed()) + response = <-bobSession.Receive() + Expect(response).ToNot(BeNil()) + Expect(response.Payload).To(Equal([]byte("msg4"))) }() wg.Wait() From 0461bd70cf6b25dd27b27dbcc4123196d0349aa0 Mon Sep 17 00:00:00 2001 From: akram Date: Tue, 8 Apr 2025 18:24:16 +0300 Subject: [PATCH 06/12] Add benchmark test --- .../view/services/comm/comm_bench_test.go | 174 ++++++++---------- 1 file changed, 76 insertions(+), 98 deletions(-) diff --git a/platform/view/services/comm/comm_bench_test.go b/platform/view/services/comm/comm_bench_test.go index f2f580a9e..0130823c8 100644 --- a/platform/view/services/comm/comm_bench_test.go +++ b/platform/view/services/comm/comm_bench_test.go @@ -9,7 +9,6 @@ package comm import ( "context" "fmt" - "strconv" "sync" "testing" @@ -19,115 +18,94 @@ import ( . "github.com/onsi/gomega" ) -// type ( -const numOfConn int = 1 +const ( + // numOfNodes int = 2 + numOfSessions int = 1 + numOfMsgs int = 2 +) -// const numOfMsg int = 1 -// ) +// func setUpCommServices() { +// } func BenchmarkTestWebsocketSession(b *testing.B) { RegisterTestingT(b) - // senderNodes := []Node{} - // receiverNodes := []Node{} - - for i := range numOfConn { - - senderConfig, receiverConfig := GetConfig("initiator"), GetConfig("responder") - - router := &routing.StaticIDRouter{ - "sender" + strconv.Itoa(i): []host.PeerIPAddress{rest.ConvertAddress(senderConfig.ListenAddress)}, - "receiver" + strconv.Itoa(i): []host.PeerIPAddress{rest.ConvertAddress(receiverConfig.ListenAddress)}, - } - - // Need to move at the begining - sender := NewWebsocketCommService(router, senderConfig) - sender.Start(context.Background()) - receiver := NewWebsocketCommService(router, receiverConfig) - receiver.Start(context.Background()) + senderConfig, receiverConfig := GetConfig("initiator"), GetConfig("responder") - senderNode := Node{ - commService: sender, - address: senderConfig.ListenAddress, - pkID: []byte("sender" + strconv.Itoa(i))} - - receiverNode := Node{ - commService: receiver, - address: receiverConfig.ListenAddress, - pkID: []byte("receiver" + strconv.Itoa(i))} - - benchmarkTestExchange(senderNode, receiverNode) + router := &routing.StaticIDRouter{ + "sender": []host.PeerIPAddress{rest.ConvertAddress(senderConfig.ListenAddress)}, + "receiver": []host.PeerIPAddress{rest.ConvertAddress(receiverConfig.ListenAddress)}, } -} - -func BenchmarkTestLibp2pSession(b *testing.B) { - RegisterTestingT(b) - - for i := range numOfConn { - senderConfig, receiverConfig := GetConfig("initiator"), GetConfig("responder") - receiverConfig.BootstrapNode = "Sender" + strconv.Itoa(i) - - sender, senderPkID := NewLibP2PCommService(senderConfig, nil) - sender.Start(context.Background()) - receiver, receiverPkID := NewLibP2PCommService(receiverConfig, &BootstrapNodeResolver{nodeID: senderPkID, nodeAddress: senderConfig.ListenAddress}) - receiver.Start(context.Background()) - - senderNode := Node{ - commService: sender, - address: senderConfig.ListenAddress, - pkID: senderPkID, - } - receiverNode := Node{ - commService: receiver, - address: receiverConfig.ListenAddress, - pkID: receiverPkID, - } - - benchmarkTestExchange(senderNode, receiverNode) + sender := NewWebsocketCommService(router, senderConfig) + sender.Start(context.Background()) + receiver := NewWebsocketCommService(router, receiverConfig) + receiver.Start(context.Background()) + + senderNode := Node{ + commService: sender, + address: senderConfig.ListenAddress, + pkID: []byte("sender"), + } + receiverNode := Node{ + commService: receiver, + address: senderConfig.ListenAddress, + pkID: []byte("receiver"), } + benchmarkTestExchange(senderNode, receiverNode) } func benchmarkTestExchange(senderNode, receiverNode Node) { wg := sync.WaitGroup{} - wg.Add(2) - - //NewSessionWithID - senderSession, err := senderNode.commService.NewSession("", "", rest.ConvertAddress(receiverNode.address), receiverNode.pkID) - Expect(err).ToNot(HaveOccurred()) - Expect(senderSession.Info().Endpoint).To(Equal(rest.ConvertAddress(receiverNode.address))) - Expect(senderSession.Info().EndpointPKID).To(Equal(receiverNode.pkID.Bytes())) - recevierSession, err := receiverNode.commService.MasterSession() - receiverNode, err := receiverNode.commService.NewSessionWithID(response.SessionID, "", response.FromEndpoint, response.FromPKID, nil, nil) - - Expect(err).ToNot(HaveOccurred()) - go func() { - defer wg.Done() - for i := 1; i <= 1; i++ { - fmt.Print("---> Sender: sends request message\n") - Expect(senderSession.Send([]byte("request"))).To(Succeed()) - - fmt.Print("---> Sender: receives response message\n") - response := <-senderSession.Receive() - Expect(response).ToNot(BeNil()) - Expect(response).To(HaveField("Payload", Equal([]byte("response")))) - senderSession.Close() - } - }() - - go func() { - defer wg.Done() - for i := 1; i <= 1; i++ { - fmt.Print("---> Receiver: receive request message\n") - response := <-recevierSession.Receive() - Expect(response).ToNot(BeNil()) - Expect(response.Payload).To(Equal([]byte("request"))) - Expect(response.SessionID).To(Equal(senderSession.Info().ID)) - - fmt.Print("---> Receiver: sends response message\n") - Expect(err).ToNot(HaveOccurred()) - Expect(receiverNode.Send([]byte("response"))).To(Succeed()) - } - }() + wg.Add(2 * numOfSessions) + + for j := 1; j <= numOfSessions; j++ { + + fmt.Printf("---> Create new session # %d \n", j) + go func(sessionNum int) { + defer wg.Done() + + senderSession, err := senderNode.commService.NewSession("", "", rest.ConvertAddress(receiverNode.address), receiverNode.pkID) + Expect(err).ToNot(HaveOccurred()) + Expect(senderSession.Info().Endpoint).To(Equal(rest.ConvertAddress(receiverNode.address))) + Expect(senderSession.Info().EndpointPKID).To(Equal(receiverNode.pkID.Bytes())) + //defer senderSession.Close() + + for i := 1; i <= numOfMsgs; i++ { + fmt.Printf("---> Sender: sends request message. Session #: %d, Msg #: %d \n", sessionNum, i) + Expect(senderSession.Send([]byte("request"))).To(Succeed()) + //fmt.Printf("---> Sender: request message was sent successfully. Session #: %d, Msg #: %d \n", sessionNum, i) + + fmt.Printf("---> Sender: wait on receive response message. Session #: %d, Msg #: %d \n", sessionNum, i) + response := <-senderSession.Receive() + Expect(response).ToNot(BeNil()) + Expect(response).To(HaveField("Payload", Equal([]byte("response")))) + //fmt.Printf("---> Sender: receives request message. Session #: %d, Msg #: %d \n", sessionNum, i) + } + }(j) + + go func(sessionNum int) { + defer wg.Done() + + recevierMasterSession, err := receiverNode.commService.MasterSession() + Expect(err).ToNot(HaveOccurred()) + //defer recevierMasterSession.Close() + + for i := 1; i <= numOfMsgs; i++ { + fmt.Printf("---> Receiver: wait on receive request message. Session #: %d, Msg #: %d \n", sessionNum, i) + response := <-recevierMasterSession.Receive() + Expect(response).ToNot(BeNil()) + Expect(response.Payload).To(Equal([]byte("request"))) + //Expect(response.SessionID).To(Equal(senderSession.Info().ID)) + //fmt.Printf("---> Receiver: receives request message. Session #: %d, Msg #: %d \n", sessionNum, i) + + fmt.Printf("---> Receiver: sends response message. Session #: %d, Msg #: %d \n", sessionNum, i) + receiverSession, err := receiverNode.commService.NewSessionWithID(response.SessionID, "", response.FromEndpoint, response.FromPKID, nil, nil) + Expect(err).ToNot(HaveOccurred()) + Expect(receiverSession.Send([]byte("response"))).To(Succeed()) + //fmt.Printf("---> Receiver: response message was sent successfully. Session #: %d, Msg #: %d \n", sessionNum, i) + } + }(j) + } wg.Wait() } From 9d9ebd33de2e30187be368a77d0b1b983603b940 Mon Sep 17 00:00:00 2001 From: Alexandros Filios Date: Tue, 8 Apr 2025 19:38:45 +0200 Subject: [PATCH 07/12] fixup! Unit test for websocket comm service Signed-off-by: Alexandros Filios --- .../view/services/comm/comm_bench_test.go | 134 ++++++++++++------ 1 file changed, 90 insertions(+), 44 deletions(-) diff --git a/platform/view/services/comm/comm_bench_test.go b/platform/view/services/comm/comm_bench_test.go index 0130823c8..511abb3e6 100644 --- a/platform/view/services/comm/comm_bench_test.go +++ b/platform/view/services/comm/comm_bench_test.go @@ -9,24 +9,25 @@ package comm import ( "context" "fmt" + "strconv" + "strings" "sync" "testing" + "github.com/hyperledger-labs/fabric-smart-client/platform/common/utils" "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/host" "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/host/rest" "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/host/rest/routing" + "github.com/hyperledger-labs/fabric-smart-client/platform/view/view" . "github.com/onsi/gomega" ) const ( // numOfNodes int = 2 - numOfSessions int = 1 - numOfMsgs int = 2 + numOfSessions int = 3 + numOfMsgs int = 20 ) -// func setUpCommServices() { - -// } func BenchmarkTestWebsocketSession(b *testing.B) { RegisterTestingT(b) @@ -48,64 +49,109 @@ func BenchmarkTestWebsocketSession(b *testing.B) { } receiverNode := Node{ commService: receiver, - address: senderConfig.ListenAddress, + address: receiverConfig.ListenAddress, pkID: []byte("receiver"), } + benchmarkTestExchange(senderNode, receiverNode) + sender.Stop() + receiver.Stop() } func benchmarkTestExchange(senderNode, receiverNode Node) { wg := sync.WaitGroup{} wg.Add(2 * numOfSessions) - for j := 1; j <= numOfSessions; j++ { - - fmt.Printf("---> Create new session # %d \n", j) + sessions := make([]view.Session, 0, 2*numOfSessions+1) + mu := sync.Mutex{} - go func(sessionNum int) { - defer wg.Done() + for sessId := 0; sessId < numOfSessions; sessId++ { + go func(sessId int) { + logger.Infof("---> Create new session # %d", sessId) senderSession, err := senderNode.commService.NewSession("", "", rest.ConvertAddress(receiverNode.address), receiverNode.pkID) Expect(err).ToNot(HaveOccurred()) + mu.Lock() + sessions = append(sessions, senderSession) + mu.Unlock() Expect(senderSession.Info().Endpoint).To(Equal(rest.ConvertAddress(receiverNode.address))) Expect(senderSession.Info().EndpointPKID).To(Equal(receiverNode.pkID.Bytes())) - //defer senderSession.Close() - - for i := 1; i <= numOfMsgs; i++ { - fmt.Printf("---> Sender: sends request message. Session #: %d, Msg #: %d \n", sessionNum, i) - Expect(senderSession.Send([]byte("request"))).To(Succeed()) - //fmt.Printf("---> Sender: request message was sent successfully. Session #: %d, Msg #: %d \n", sessionNum, i) - - fmt.Printf("---> Sender: wait on receive response message. Session #: %d, Msg #: %d \n", sessionNum, i) - response := <-senderSession.Receive() - Expect(response).ToNot(BeNil()) - Expect(response).To(HaveField("Payload", Equal([]byte("response")))) - //fmt.Printf("---> Sender: receives request message. Session #: %d, Msg #: %d \n", sessionNum, i) - } - }(j) + logger.Infof("---> Created new session # %d [%v]", sessId, senderSession.Info()) - go func(sessionNum int) { - defer wg.Done() + for msgId := 0; msgId < numOfMsgs; msgId++ { + payload := fmt.Sprintf("request-%d-%d", sessId, msgId) + logger.Infof("---> Sender: sends message [%s]", payload) + Expect(senderSession.Send([]byte(payload))).To(Succeed()) + logger.Infof("---> Sender: sent message [%s]", payload) - recevierMasterSession, err := receiverNode.commService.MasterSession() - Expect(err).ToNot(HaveOccurred()) - //defer recevierMasterSession.Close() - - for i := 1; i <= numOfMsgs; i++ { - fmt.Printf("---> Receiver: wait on receive request message. Session #: %d, Msg #: %d \n", sessionNum, i) - response := <-recevierMasterSession.Receive() + response := <-senderSession.Receive() Expect(response).ToNot(BeNil()) - Expect(response.Payload).To(Equal([]byte("request"))) - //Expect(response.SessionID).To(Equal(senderSession.Info().ID)) - //fmt.Printf("---> Receiver: receives request message. Session #: %d, Msg #: %d \n", sessionNum, i) - - fmt.Printf("---> Receiver: sends response message. Session #: %d, Msg #: %d \n", sessionNum, i) - receiverSession, err := receiverNode.commService.NewSessionWithID(response.SessionID, "", response.FromEndpoint, response.FromPKID, nil, nil) - Expect(err).ToNot(HaveOccurred()) - Expect(receiverSession.Send([]byte("response"))).To(Succeed()) - //fmt.Printf("---> Receiver: response message was sent successfully. Session #: %d, Msg #: %d \n", sessionNum, i) + Expect(string(response.Payload)).To(Equal(fmt.Sprintf("response-%d-%d", sessId, msgId))) + logger.Infof("---> Sender: received message: [%s]", string(response.Payload)) } - }(j) + logger.Infof("Send EOF") + Expect(senderSession.Send([]byte("EOF"))).To(Succeed()) + logger.Infof("Sent EOF") + wg.Done() + }(sessId) } + + go func() { + receiverMasterSession, err := receiverNode.commService.MasterSession() + Expect(err).ToNot(HaveOccurred()) + + mu.Lock() + sessions = append(sessions, receiverMasterSession) + mu.Unlock() + sessionMap := map[string]struct{}{} + + logger.Infof("---> Receiver: start receiving on master session") + for response := range receiverMasterSession.Receive() { + Expect(response).ToNot(BeNil()) + logger.Infof("---> Receiver: received message [%s]", string(response.Payload)) + Expect(string(response.Payload)).To(ContainSubstring("request")) + elements := strings.Split(string(response.Payload), "-") + Expect(elements).To(HaveLen(3)) + sessId, msgId := utils.MustGet(strconv.Atoi(elements[1])), utils.MustGet(strconv.Atoi(elements[2])) + Expect(msgId).To(Equal(0)) + + _, ok := sessionMap[response.SessionID] + Expect(ok).To(BeFalse(), "we should not receive a second message on the master session [%s]", string(response.Payload)) + logger.Infof("---> Receiver: open session [%d]", sessId) + sess, err := receiverNode.commService.NewSessionWithID(response.SessionID, "", response.FromEndpoint, response.FromPKID, nil, nil) + Expect(err).To(BeNil()) + logger.Infof("---> Receiver: opened session [%d] [%v]", sessId, sess.Info()) + sessionMap[response.SessionID] = struct{}{} + mu.Lock() + sessions = append(sessions, sess) + mu.Unlock() + go func(sess view.Session, sessId int) { + defer wg.Done() + payload := fmt.Sprintf("response-%d-0", sessId) + logger.Infof("---> Receiver: Send message [%s]", payload) + Expect(sess.Send([]byte(payload))).To(Succeed()) + logger.Infof("---> Receiver: Sent message [%s]", payload) + for response := range sess.Receive() { + if string(response.Payload) == "EOF" { + logger.Infof("---> Receiver: Received EOF on [%d]", sessId) + return + } + elements := strings.Split(string(response.Payload), "-") + Expect(elements).To(HaveLen(3)) + sessId, msgId := utils.MustGet(strconv.Atoi(elements[1])), utils.MustGet(strconv.Atoi(elements[2])) + payload := fmt.Sprintf("response-%d-%d", sessId, msgId) + logger.Infof("---> Receiver: Send message [%s]", payload) + Expect(sess.Send([]byte(payload))).To(Succeed()) + logger.Infof("---> Receiver: Sent message [%s]", payload) + } + }(sess, sessId) + } + }() + logger.Infof("Waiting on execution...") + wg.Wait() + logger.Infof("Execution finished. Closing sessions") + for _, s := range sessions { + s.Close() + } } From 47c0af8f55bca223991c7aafc867d31358f9d56c Mon Sep 17 00:00:00 2001 From: akram Date: Thu, 10 Apr 2025 18:06:28 +0300 Subject: [PATCH 08/12] Add metrics and enehnce the test Signed-off-by: akram --- .../view/services/comm/comm_bench_test.go | 311 ++++++++++++------ 1 file changed, 207 insertions(+), 104 deletions(-) diff --git a/platform/view/services/comm/comm_bench_test.go b/platform/view/services/comm/comm_bench_test.go index 511abb3e6..edcea81b8 100644 --- a/platform/view/services/comm/comm_bench_test.go +++ b/platform/view/services/comm/comm_bench_test.go @@ -13,6 +13,7 @@ import ( "strings" "sync" "testing" + "time" "github.com/hyperledger-labs/fabric-smart-client/platform/common/utils" "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/host" @@ -22,136 +23,238 @@ import ( . "github.com/onsi/gomega" ) +type routerNodes struct { + sender *Service + senderNode Node + receiverNode Node + receiver *Service +} + +type benchmarkMetrics struct { + latency time.Duration +} + const ( - // numOfNodes int = 2 - numOfSessions int = 3 - numOfMsgs int = 20 + numOfRoutes int = 40 + numOfSessions int = 10 + numOfMsgs int = 10 ) -func BenchmarkTestWebsocketSession(b *testing.B) { - RegisterTestingT(b) +func createWebsocketRoutes() []routerNodes { + routersNodes := make([]routerNodes, numOfRoutes) - senderConfig, receiverConfig := GetConfig("initiator"), GetConfig("responder") + for routeNum := 0; routeNum < numOfRoutes; routeNum++ { + senderConfig := GetConfig("initiator") + receiverConfig := GetConfig("responder") - router := &routing.StaticIDRouter{ - "sender": []host.PeerIPAddress{rest.ConvertAddress(senderConfig.ListenAddress)}, - "receiver": []host.PeerIPAddress{rest.ConvertAddress(receiverConfig.ListenAddress)}, + router := &routing.StaticIDRouter{ + "sender": []host.PeerIPAddress{rest.ConvertAddress(senderConfig.ListenAddress)}, + "receiver": []host.PeerIPAddress{rest.ConvertAddress(receiverConfig.ListenAddress)}, + } + + sender := NewWebsocketCommService(router, senderConfig) + sender.Start(context.Background()) + receiver := NewWebsocketCommService(router, receiverConfig) + receiver.Start(context.Background()) + + senderNode := Node{ + commService: sender, + address: senderConfig.ListenAddress, + pkID: []byte("sender"), + } + receiverNode := Node{ + commService: receiver, + address: receiverConfig.ListenAddress, + pkID: []byte("receiver"), + } + routersNodes[routeNum] = routerNodes{sender: sender, senderNode: senderNode, receiver: receiver, receiverNode: receiverNode} } - sender := NewWebsocketCommService(router, senderConfig) - sender.Start(context.Background()) - receiver := NewWebsocketCommService(router, receiverConfig) - receiver.Start(context.Background()) - - senderNode := Node{ - commService: sender, - address: senderConfig.ListenAddress, - pkID: []byte("sender"), + + return routersNodes +} + +func createLibp2pRoutes() []routerNodes { + routersNodes := make([]routerNodes, numOfRoutes) + for routeNum := 0; routeNum < numOfRoutes; routeNum++ { + senderConfig := GetConfig("initiator") + receiverConfig := GetConfig("responder") + receiverConfig.BootstrapNode = "sender" + + sender, senderPkID := NewLibP2PCommService(senderConfig, nil) + sender.Start(context.Background()) + receiver, receiverPkID := NewLibP2PCommService(receiverConfig, &BootstrapNodeResolver{nodeID: senderPkID, nodeAddress: senderConfig.ListenAddress}) + receiver.Start(context.Background()) + + senderNode := Node{ + commService: sender, + address: senderConfig.ListenAddress, + pkID: senderPkID, + } + + receiverNode := Node{ + commService: receiver, + address: receiverConfig.ListenAddress, + pkID: receiverPkID, + } + routersNodes[routeNum] = routerNodes{sender: sender, senderNode: senderNode, receiver: receiver, receiverNode: receiverNode} + } + + return routersNodes +} + +func testNodesExchange(routersNodes []routerNodes, protocol string) { + metricsMap := map[string]benchmarkMetrics{} + + startTime := time.Now() + for routeNum := 0; routeNum < numOfRoutes; routeNum++ { + testExchangeMsgs(routersNodes[routeNum].senderNode, routersNodes[routeNum].receiverNode, metricsMap) } - receiverNode := Node{ - commService: receiver, - address: receiverConfig.ListenAddress, - pkID: []byte("receiver"), + + endTime := time.Now() + + for routeNum := 0; routeNum < numOfRoutes; routeNum++ { + routersNodes[routeNum].sender.Stop() + routersNodes[routeNum].receiver.Stop() } - benchmarkTestExchange(senderNode, receiverNode) - sender.Stop() - receiver.Stop() + displayMetrics(endTime.Sub(startTime), metricsMap, protocol) } -func benchmarkTestExchange(senderNode, receiverNode Node) { +func BenchmarkTestWebsocket(b *testing.B) { + RegisterTestingT(b) + testNodesExchange(createWebsocketRoutes(), "Websocket") +} + +func BenchmarkTestLibp2p(b *testing.B) { + RegisterTestingT(b) + testNodesExchange(createLibp2pRoutes(), "LipP2P") +} + +func testExchangeMsgs(senderNode, receiverNode Node, metricsMap map[string]benchmarkMetrics) { wg := sync.WaitGroup{} wg.Add(2 * numOfSessions) sessions := make([]view.Session, 0, 2*numOfSessions+1) mu := sync.Mutex{} - for sessId := 0; sessId < numOfSessions; sessId++ { + go senderExchangeMsgs(senderNode, receiverNode, &sessions, metricsMap, &mu, &wg) + go receiverExchangeMsgs(receiverNode, &sessions, &mu, &wg) - go func(sessId int) { - logger.Infof("---> Create new session # %d", sessId) - senderSession, err := senderNode.commService.NewSession("", "", rest.ConvertAddress(receiverNode.address), receiverNode.pkID) - Expect(err).ToNot(HaveOccurred()) - mu.Lock() - sessions = append(sessions, senderSession) - mu.Unlock() - Expect(senderSession.Info().Endpoint).To(Equal(rest.ConvertAddress(receiverNode.address))) - Expect(senderSession.Info().EndpointPKID).To(Equal(receiverNode.pkID.Bytes())) - logger.Infof("---> Created new session # %d [%v]", sessId, senderSession.Info()) - - for msgId := 0; msgId < numOfMsgs; msgId++ { - payload := fmt.Sprintf("request-%d-%d", sessId, msgId) - logger.Infof("---> Sender: sends message [%s]", payload) - Expect(senderSession.Send([]byte(payload))).To(Succeed()) - logger.Infof("---> Sender: sent message [%s]", payload) - - response := <-senderSession.Receive() - Expect(response).ToNot(BeNil()) - Expect(string(response.Payload)).To(Equal(fmt.Sprintf("response-%d-%d", sessId, msgId))) - logger.Infof("---> Sender: received message: [%s]", string(response.Payload)) - } - logger.Infof("Send EOF") - Expect(senderSession.Send([]byte("EOF"))).To(Succeed()) - logger.Infof("Sent EOF") - wg.Done() - }(sessId) + logger.Infof("Waiting on execution...") + + wg.Wait() + logger.Infof("Execution finished. Closing sessions") + for _, s := range sessions { + s.Close() } +} - go func() { - receiverMasterSession, err := receiverNode.commService.MasterSession() +func senderExchangeMsgs(senderNode, receiverNode Node, sessions *[]view.Session, metricsMap map[string]benchmarkMetrics, mu *sync.Mutex, wg *sync.WaitGroup) { + for sessId := 0; sessId < numOfSessions; sessId++ { + defer wg.Done() + logger.Infof("---> Create new session # %d", sessId) + senderSession, err := senderNode.commService.NewSession("", "", rest.ConvertAddress(receiverNode.address), receiverNode.pkID) Expect(err).ToNot(HaveOccurred()) - mu.Lock() - sessions = append(sessions, receiverMasterSession) + *sessions = append(*sessions, senderSession) mu.Unlock() - sessionMap := map[string]struct{}{} + Expect(senderSession.Info().Endpoint).To(Equal(rest.ConvertAddress(receiverNode.address))) + Expect(senderSession.Info().EndpointPKID).To(Equal(receiverNode.pkID.Bytes())) + logger.Infof("---> Created new session # %d [%v]", sessId, senderSession.Info()) - logger.Infof("---> Receiver: start receiving on master session") - for response := range receiverMasterSession.Receive() { + for msgId := 0; msgId < numOfMsgs; msgId++ { + payload := fmt.Sprintf("request-%d-%d", sessId, msgId) + logger.Infof("---> Sender: sends message [%s]", payload) + start := time.Now() + Expect(senderSession.Send([]byte(payload))).To(Succeed()) + logger.Infof("---> Sender: sent message [%s]", payload) + response := <-senderSession.Receive() + end := time.Now() + elapsed := end.Sub(start) Expect(response).ToNot(BeNil()) - logger.Infof("---> Receiver: received message [%s]", string(response.Payload)) - Expect(string(response.Payload)).To(ContainSubstring("request")) - elements := strings.Split(string(response.Payload), "-") - Expect(elements).To(HaveLen(3)) - sessId, msgId := utils.MustGet(strconv.Atoi(elements[1])), utils.MustGet(strconv.Atoi(elements[2])) - Expect(msgId).To(Equal(0)) - - _, ok := sessionMap[response.SessionID] - Expect(ok).To(BeFalse(), "we should not receive a second message on the master session [%s]", string(response.Payload)) - logger.Infof("---> Receiver: open session [%d]", sessId) - sess, err := receiverNode.commService.NewSessionWithID(response.SessionID, "", response.FromEndpoint, response.FromPKID, nil, nil) - Expect(err).To(BeNil()) - logger.Infof("---> Receiver: opened session [%d] [%v]", sessId, sess.Info()) - sessionMap[response.SessionID] = struct{}{} - mu.Lock() - sessions = append(sessions, sess) - mu.Unlock() - go func(sess view.Session, sessId int) { - defer wg.Done() - payload := fmt.Sprintf("response-%d-0", sessId) - logger.Infof("---> Receiver: Send message [%s]", payload) - Expect(sess.Send([]byte(payload))).To(Succeed()) - logger.Infof("---> Receiver: Sent message [%s]", payload) - for response := range sess.Receive() { - if string(response.Payload) == "EOF" { - logger.Infof("---> Receiver: Received EOF on [%d]", sessId) - return - } - elements := strings.Split(string(response.Payload), "-") - Expect(elements).To(HaveLen(3)) - sessId, msgId := utils.MustGet(strconv.Atoi(elements[1])), utils.MustGet(strconv.Atoi(elements[2])) - payload := fmt.Sprintf("response-%d-%d", sessId, msgId) - logger.Infof("---> Receiver: Send message [%s]", payload) - Expect(sess.Send([]byte(payload))).To(Succeed()) - logger.Infof("---> Receiver: Sent message [%s]", payload) - } - }(sess, sessId) + Expect(string(response.Payload)).To(Equal(fmt.Sprintf("response-%d-%d", sessId, msgId))) + logger.Infof("---> Sender: received message: [%s]", string(response.Payload)) + + metricsMap[senderSession.Info().ID+strconv.Itoa(sessId)+strconv.Itoa(msgId)] = benchmarkMetrics{latency: elapsed} } - }() - logger.Infof("Waiting on execution...") + logger.Infof("Send EOF") + Expect(senderSession.Send([]byte("EOF"))).To(Succeed()) + logger.Infof("Sent EOF") + } +} - wg.Wait() - logger.Infof("Execution finished. Closing sessions") - for _, s := range sessions { - s.Close() +func receiverExchangeMsgs(receiverNode Node, sessions *[]view.Session, mu *sync.Mutex, wg *sync.WaitGroup) { + receiverMasterSession, err := receiverNode.commService.MasterSession() + Expect(err).ToNot(HaveOccurred()) + + mu.Lock() + *sessions = append(*sessions, receiverMasterSession) + mu.Unlock() + sessionMap := map[string]struct{}{} + + logger.Infof("---> Receiver: start receiving on master session") + for response := range receiverMasterSession.Receive() { + Expect(response).ToNot(BeNil()) + logger.Infof("---> Receiver: received message [%s]", string(response.Payload)) + Expect(string(response.Payload)).To(ContainSubstring("request")) + elements := strings.Split(string(response.Payload), "-") + Expect(elements).To(HaveLen(3)) + sessId, msgId := utils.MustGet(strconv.Atoi(elements[1])), utils.MustGet(strconv.Atoi(elements[2])) + Expect(msgId).To(Equal(0)) + + _, ok := sessionMap[response.SessionID] + Expect(ok).To(BeFalse(), "we should not receive a second message on the master session [%s]", string(response.Payload)) + logger.Infof("---> Receiver: open session [%d]", sessId) + session, err := receiverNode.commService.NewSessionWithID(response.SessionID, "", response.FromEndpoint, response.FromPKID, nil, nil) + Expect(err).To(BeNil()) + logger.Infof("---> Receiver: opened session [%d] [%v]", sessId, session.Info()) + sessionMap[response.SessionID] = struct{}{} + mu.Lock() + *sessions = append(*sessions, session) + mu.Unlock() + + go receiverSessionExchangeMsgs(sessId, session, wg) + } +} + +func receiverSessionExchangeMsgs(sessId int, session view.Session, wg *sync.WaitGroup) { + defer wg.Done() + payload := fmt.Sprintf("response-%d-0", sessId) + logger.Infof("---> Receiver: Send message [%s]", payload) + Expect(session.Send([]byte(payload))).To(Succeed()) + logger.Infof("---> Receiver: Sent message [%s]", payload) + for response := range session.Receive() { + if string(response.Payload) == "EOF" { + logger.Infof("---> Receiver: Received EOF on [%d]", sessId) + return + } + elements := strings.Split(string(response.Payload), "-") + Expect(elements).To(HaveLen(3)) + sessId, msgId := utils.MustGet(strconv.Atoi(elements[1])), utils.MustGet(strconv.Atoi(elements[2])) + payload := fmt.Sprintf("response-%d-%d", sessId, msgId) + logger.Infof("---> Receiver: Send message [%s]", payload) + Expect(session.Send([]byte(payload))).To(Succeed()) + logger.Infof("---> Receiver: Sent message [%s]", payload) + } +} + +func displayMetrics(duration time.Duration, metricsMap map[string]benchmarkMetrics, protocol string) { + var latency int64 = 0 + + for _, metric := range metricsMap { + latency = latency + int64(metric.latency) } + + averageLatency := latency / int64(len(metricsMap)) + averageThrouput := int64(duration) / int64(2*numOfMsgs) + // Expect(senderSession.Info().EndpointPKID).To(Equal(receiverNode.pkID.Bytes())) + + fmt.Printf("============================= %s Metrics =============================\n", protocol) + fmt.Printf("Number of nodes: %d\n", numOfRoutes*2) + fmt.Printf("Number of sessions per node: %d\n", numOfSessions) + fmt.Printf("Number of Msgs per session: %d\n", 2*numOfMsgs) + fmt.Printf("Number of Msgs per session: %d\n", numOfMsgs) + fmt.Printf("Average latency: %d (ms) \n", averageLatency) + fmt.Printf("Average throuput: %d (msg/ms) \n", averageThrouput) + fmt.Printf("Total run time: %d (ms) \n", duration) } From 53218e6614390756a9e529e50904de15575a34b9 Mon Sep 17 00:00:00 2001 From: akram Date: Thu, 10 Apr 2025 18:16:30 +0300 Subject: [PATCH 09/12] Add metrics and enehnce the test Signed-off-by: akram --- platform/view/services/comm/comm_bench_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/platform/view/services/comm/comm_bench_test.go b/platform/view/services/comm/comm_bench_test.go index edcea81b8..b767fdeaf 100644 --- a/platform/view/services/comm/comm_bench_test.go +++ b/platform/view/services/comm/comm_bench_test.go @@ -35,7 +35,7 @@ type benchmarkMetrics struct { } const ( - numOfRoutes int = 40 + numOfRoutes int = 2 numOfSessions int = 10 numOfMsgs int = 10 ) @@ -250,10 +250,10 @@ func displayMetrics(duration time.Duration, metricsMap map[string]benchmarkMetri // Expect(senderSession.Info().EndpointPKID).To(Equal(receiverNode.pkID.Bytes())) fmt.Printf("============================= %s Metrics =============================\n", protocol) + fmt.Printf("Number of routes: %d\n", numOfRoutes) fmt.Printf("Number of nodes: %d\n", numOfRoutes*2) fmt.Printf("Number of sessions per node: %d\n", numOfSessions) fmt.Printf("Number of Msgs per session: %d\n", 2*numOfMsgs) - fmt.Printf("Number of Msgs per session: %d\n", numOfMsgs) fmt.Printf("Average latency: %d (ms) \n", averageLatency) fmt.Printf("Average throuput: %d (msg/ms) \n", averageThrouput) fmt.Printf("Total run time: %d (ms) \n", duration) From a15d2aa6c5285e46aad4998c13ce7d2fda03ff11 Mon Sep 17 00:00:00 2001 From: akram Date: Tue, 15 Apr 2025 11:12:12 +0300 Subject: [PATCH 10/12] test enhencements --- .../view/services/comm/comm_bench_test.go | 231 ++++++++++-------- 1 file changed, 132 insertions(+), 99 deletions(-) diff --git a/platform/view/services/comm/comm_bench_test.go b/platform/view/services/comm/comm_bench_test.go index b767fdeaf..2b1bf6d55 100644 --- a/platform/view/services/comm/comm_bench_test.go +++ b/platform/view/services/comm/comm_bench_test.go @@ -23,7 +23,13 @@ import ( . "github.com/onsi/gomega" ) -type routerNodes struct { +const ( + numOfNodes int = 2 + numOfSessions int = 1 + numOfMsgs int = 1 +) + +type Nodes struct { sender *Service senderNode Node receiverNode Node @@ -34,87 +40,107 @@ type benchmarkMetrics struct { latency time.Duration } -const ( - numOfRoutes int = 2 - numOfSessions int = 10 - numOfMsgs int = 10 -) +func createWebsocketNodes() []Nodes { -func createWebsocketRoutes() []routerNodes { - routersNodes := make([]routerNodes, numOfRoutes) + var nodes []Nodes + + for senderNodeNum := 0; senderNodeNum < numOfNodes; senderNodeNum++ { - for routeNum := 0; routeNum < numOfRoutes; routeNum++ { senderConfig := GetConfig("initiator") - receiverConfig := GetConfig("responder") + + var receiverConfigs []*Config + var receivesIPAddress []host.PeerIPAddress + for receivedNodeNum := 0; receivedNodeNum < numOfNodes; receivedNodeNum++ { + if senderNodeNum == receivedNodeNum { + continue + } + + receiverConfig := GetConfig("responder") + receivesIPAddress = append(receivesIPAddress, rest.ConvertAddress(receiverConfig.ListenAddress)) + receiverConfigs = append(receiverConfigs, receiverConfig) + } router := &routing.StaticIDRouter{ "sender": []host.PeerIPAddress{rest.ConvertAddress(senderConfig.ListenAddress)}, - "receiver": []host.PeerIPAddress{rest.ConvertAddress(receiverConfig.ListenAddress)}, - } + "receiver": receivesIPAddress} sender := NewWebsocketCommService(router, senderConfig) sender.Start(context.Background()) - receiver := NewWebsocketCommService(router, receiverConfig) - receiver.Start(context.Background()) senderNode := Node{ commService: sender, address: senderConfig.ListenAddress, pkID: []byte("sender"), } - receiverNode := Node{ - commService: receiver, - address: receiverConfig.ListenAddress, - pkID: []byte("receiver"), + + for _, receiverConfig := range receiverConfigs { + receiver := NewWebsocketCommService(router, receiverConfig) + receiver.Start(context.Background()) + receiverNode := Node{ + commService: receiver, + address: receiverConfig.ListenAddress, + pkID: []byte("receiver"), + } + nodes = append(nodes, Nodes{sender: sender, senderNode: senderNode, receiver: receiver, receiverNode: receiverNode}) } - routersNodes[routeNum] = routerNodes{sender: sender, senderNode: senderNode, receiver: receiver, receiverNode: receiverNode} } - - return routersNodes + return nodes } -func createLibp2pRoutes() []routerNodes { - routersNodes := make([]routerNodes, numOfRoutes) - for routeNum := 0; routeNum < numOfRoutes; routeNum++ { - senderConfig := GetConfig("initiator") - receiverConfig := GetConfig("responder") - receiverConfig.BootstrapNode = "sender" +func createLibp2pNodes() []Nodes { - sender, senderPkID := NewLibP2PCommService(senderConfig, nil) - sender.Start(context.Background()) - receiver, receiverPkID := NewLibP2PCommService(receiverConfig, &BootstrapNodeResolver{nodeID: senderPkID, nodeAddress: senderConfig.ListenAddress}) - receiver.Start(context.Background()) + var nodes []Nodes - senderNode := Node{ - commService: sender, - address: senderConfig.ListenAddress, - pkID: senderPkID, - } + for senderNodeNum := 0; senderNodeNum < numOfNodes; senderNodeNum++ { - receiverNode := Node{ - commService: receiver, - address: receiverConfig.ListenAddress, - pkID: receiverPkID, + senderConfig := GetConfig("initiator") + + for receivedNodeNum := 0; receivedNodeNum < numOfNodes; receivedNodeNum++ { + if senderNodeNum == receivedNodeNum { + continue + } + + receiverConfig := GetConfig("responder") + receiverConfig.BootstrapNode = "sender" + sender, senderPkID := NewLibP2PCommService(senderConfig, nil) + sender.Start(context.Background()) + receiver, receiverPkID := NewLibP2PCommService(receiverConfig, &BootstrapNodeResolver{nodeID: senderPkID, nodeAddress: senderConfig.ListenAddress}) + receiver.Start(context.Background()) + + senderNode := Node{ + commService: sender, + address: senderConfig.ListenAddress, + pkID: senderPkID, + } + + receiverNode := Node{ + commService: receiver, + address: receiverConfig.ListenAddress, + pkID: receiverPkID, + } + nodes = append(nodes, Nodes{sender: sender, senderNode: senderNode, receiver: receiver, receiverNode: receiverNode}) } - routersNodes[routeNum] = routerNodes{sender: sender, senderNode: senderNode, receiver: receiver, receiverNode: receiverNode} } - return routersNodes + return nodes } -func testNodesExchange(routersNodes []routerNodes, protocol string) { +func testNodesExchange(nodes []Nodes, protocol string) { metricsMap := map[string]benchmarkMetrics{} + mainwg := sync.WaitGroup{} startTime := time.Now() - for routeNum := 0; routeNum < numOfRoutes; routeNum++ { - testExchangeMsgs(routersNodes[routeNum].senderNode, routersNodes[routeNum].receiverNode, metricsMap) - } + for _, exchangeNodes := range nodes { + mainwg.Add(1) + go testExchangeMsgs(exchangeNodes.senderNode, exchangeNodes.receiverNode, metricsMap, &mainwg) + } + mainwg.Wait() endTime := time.Now() - for routeNum := 0; routeNum < numOfRoutes; routeNum++ { - routersNodes[routeNum].sender.Stop() - routersNodes[routeNum].receiver.Stop() + for _, exchangeNodes := range nodes { + exchangeNodes.sender.Stop() + exchangeNodes.receiver.Stop() } displayMetrics(endTime.Sub(startTime), metricsMap, protocol) @@ -122,65 +148,74 @@ func testNodesExchange(routersNodes []routerNodes, protocol string) { func BenchmarkTestWebsocket(b *testing.B) { RegisterTestingT(b) - testNodesExchange(createWebsocketRoutes(), "Websocket") + testNodesExchange(createWebsocketNodes(), "Websocket") } func BenchmarkTestLibp2p(b *testing.B) { RegisterTestingT(b) - testNodesExchange(createLibp2pRoutes(), "LipP2P") + testNodesExchange(createLibp2pNodes(), "LipP2P") } -func testExchangeMsgs(senderNode, receiverNode Node, metricsMap map[string]benchmarkMetrics) { +func testExchangeMsgs(senderNode, receiverNode Node, metricsMap map[string]benchmarkMetrics, mainwg *sync.WaitGroup) { + defer mainwg.Done() + wg := sync.WaitGroup{} wg.Add(2 * numOfSessions) - sessions := make([]view.Session, 0, 2*numOfSessions+1) + //sessions := make([]view.Session, 0, 2*numOfSessions+1) + var sessions []view.Session mu := sync.Mutex{} - go senderExchangeMsgs(senderNode, receiverNode, &sessions, metricsMap, &mu, &wg) + for sessId := 0; sessId < numOfSessions; sessId++ { + go senderExchangeMsgs(senderNode, receiverNode, sessId, &sessions, metricsMap, &mu, &wg) + } go receiverExchangeMsgs(receiverNode, &sessions, &mu, &wg) logger.Infof("Waiting on execution...") wg.Wait() + logger.Infof("Execution finished. Closing sessions") for _, s := range sessions { s.Close() } } -func senderExchangeMsgs(senderNode, receiverNode Node, sessions *[]view.Session, metricsMap map[string]benchmarkMetrics, mu *sync.Mutex, wg *sync.WaitGroup) { - for sessId := 0; sessId < numOfSessions; sessId++ { - defer wg.Done() - logger.Infof("---> Create new session # %d", sessId) - senderSession, err := senderNode.commService.NewSession("", "", rest.ConvertAddress(receiverNode.address), receiverNode.pkID) - Expect(err).ToNot(HaveOccurred()) - mu.Lock() - *sessions = append(*sessions, senderSession) - mu.Unlock() - Expect(senderSession.Info().Endpoint).To(Equal(rest.ConvertAddress(receiverNode.address))) - Expect(senderSession.Info().EndpointPKID).To(Equal(receiverNode.pkID.Bytes())) - logger.Infof("---> Created new session # %d [%v]", sessId, senderSession.Info()) - - for msgId := 0; msgId < numOfMsgs; msgId++ { - payload := fmt.Sprintf("request-%d-%d", sessId, msgId) - logger.Infof("---> Sender: sends message [%s]", payload) - start := time.Now() - Expect(senderSession.Send([]byte(payload))).To(Succeed()) - logger.Infof("---> Sender: sent message [%s]", payload) - response := <-senderSession.Receive() - end := time.Now() - elapsed := end.Sub(start) - Expect(response).ToNot(BeNil()) - Expect(string(response.Payload)).To(Equal(fmt.Sprintf("response-%d-%d", sessId, msgId))) - logger.Infof("---> Sender: received message: [%s]", string(response.Payload)) - - metricsMap[senderSession.Info().ID+strconv.Itoa(sessId)+strconv.Itoa(msgId)] = benchmarkMetrics{latency: elapsed} +func senderExchangeMsgs(senderNode, receiverNode Node, sessId int, sessions *[]view.Session, metricsMap map[string]benchmarkMetrics, mu *sync.Mutex, wg *sync.WaitGroup) { + logger.Infof("---> Create new session # %d", sessId) + senderSession, err := senderNode.commService.NewSession("", "", rest.ConvertAddress(receiverNode.address), receiverNode.pkID) + Expect(err).ToNot(HaveOccurred()) + mu.Lock() + *sessions = append(*sessions, senderSession) + mu.Unlock() + Expect(senderSession.Info().Endpoint).To(Equal(rest.ConvertAddress(receiverNode.address))) + Expect(senderSession.Info().EndpointPKID).To(Equal(receiverNode.pkID.Bytes())) + logger.Infof("---> Created new session # %d [%v]", sessId, senderSession.Info()) + + for msgId := 0; msgId < numOfMsgs; msgId++ { + payload := fmt.Sprintf("request-%d-%d", sessId, msgId) + logger.Infof("---> Sender: sends message [%s]", payload) + start := time.Now() + Expect(senderSession.Send([]byte(payload))).To(Succeed()) + logger.Infof("---> Sender: sent message [%s]", payload) + response := <-senderSession.Receive() + end := time.Now() + elapsed := end.Sub(start) + Expect(response).ToNot(BeNil()) + Expect(string(response.Payload)).To(Equal(fmt.Sprintf("response-%d-%d", sessId, msgId))) + logger.Infof("---> Sender: received message: [%s]", string(response.Payload)) + + _, ok := metricsMap[senderSession.Info().ID+strconv.Itoa(sessId)+strconv.Itoa(msgId)] + if ok { + Expect(ok).To(BeFalse(), "Metrics map keys should be unique") } - logger.Infof("Send EOF") - Expect(senderSession.Send([]byte("EOF"))).To(Succeed()) - logger.Infof("Sent EOF") + + metricsMap[senderSession.Info().ID+strconv.Itoa(sessId)+strconv.Itoa(msgId)] = benchmarkMetrics{latency: elapsed} } + logger.Infof("Send EOF") + Expect(senderSession.Send([]byte("EOF"))).To(Succeed()) + logger.Infof("Sent EOF") + defer wg.Done() } func receiverExchangeMsgs(receiverNode Node, sessions *[]view.Session, mu *sync.Mutex, wg *sync.WaitGroup) { @@ -213,7 +248,7 @@ func receiverExchangeMsgs(receiverNode Node, sessions *[]view.Session, mu *sync. *sessions = append(*sessions, session) mu.Unlock() - go receiverSessionExchangeMsgs(sessId, session, wg) + receiverSessionExchangeMsgs(sessId, session, wg) } } @@ -239,22 +274,20 @@ func receiverSessionExchangeMsgs(sessId int, session view.Session, wg *sync.Wait } func displayMetrics(duration time.Duration, metricsMap map[string]benchmarkMetrics, protocol string) { - var latency int64 = 0 + var latency time.Duration = 0 for _, metric := range metricsMap { - latency = latency + int64(metric.latency) + latency = latency + metric.latency } - averageLatency := latency / int64(len(metricsMap)) - averageThrouput := int64(duration) / int64(2*numOfMsgs) - // Expect(senderSession.Info().EndpointPKID).To(Equal(receiverNode.pkID.Bytes())) - - fmt.Printf("============================= %s Metrics =============================\n", protocol) - fmt.Printf("Number of routes: %d\n", numOfRoutes) - fmt.Printf("Number of nodes: %d\n", numOfRoutes*2) - fmt.Printf("Number of sessions per node: %d\n", numOfSessions) - fmt.Printf("Number of Msgs per session: %d\n", 2*numOfMsgs) - fmt.Printf("Average latency: %d (ms) \n", averageLatency) - fmt.Printf("Average throuput: %d (msg/ms) \n", averageThrouput) - fmt.Printf("Total run time: %d (ms) \n", duration) + averageLatency := latency / time.Duration(len(metricsMap)) + throughput := float64(2*numOfMsgs) / float64(duration.Seconds()) + + logger.Infof("============================= %s Metrics =============================\n", protocol) + logger.Infof("Number of nodes: %d\n", numOfNodes*2) + logger.Infof("Number of sessions per node: %d\n", numOfSessions) + logger.Infof("Number of Msgs per session: %d\n", 2*numOfMsgs) + logger.Infof("Average latency: %v\n", averageLatency) + logger.Infof("Throughput: %f messages per second \n", throughput) + logger.Infof("Total run time: %v\n", duration) } From e06ff9514a3152ca1f7ab8e10e312fdc0f4c8cb4 Mon Sep 17 00:00:00 2001 From: akram Date: Tue, 15 Apr 2025 17:19:07 +0300 Subject: [PATCH 11/12] test enhencements Signed-off-by: akram --- platform/view/services/comm/comm_bench_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/platform/view/services/comm/comm_bench_test.go b/platform/view/services/comm/comm_bench_test.go index 2b1bf6d55..1d9483501 100644 --- a/platform/view/services/comm/comm_bench_test.go +++ b/platform/view/services/comm/comm_bench_test.go @@ -131,8 +131,8 @@ func testNodesExchange(nodes []Nodes, protocol string) { mainwg := sync.WaitGroup{} startTime := time.Now() + mainwg.Add(len(nodes)) for _, exchangeNodes := range nodes { - mainwg.Add(1) go testExchangeMsgs(exchangeNodes.senderNode, exchangeNodes.receiverNode, metricsMap, &mainwg) } mainwg.Wait() @@ -248,7 +248,7 @@ func receiverExchangeMsgs(receiverNode Node, sessions *[]view.Session, mu *sync. *sessions = append(*sessions, session) mu.Unlock() - receiverSessionExchangeMsgs(sessId, session, wg) + go receiverSessionExchangeMsgs(sessId, session, wg) } } From 8cfce6478c4e207878a28df285b67ee1bb9cf045 Mon Sep 17 00:00:00 2001 From: Marcus Brandenburger Date: Wed, 16 Apr 2025 10:59:15 +0200 Subject: [PATCH 12/12] Add concurrent websocket session test (#887) Websocket connects may be used by multiple goroutines concurrently. This commit adds a test to stress the session handling, open, send, receive, and eventual close. Signed-off-by: Marcus Brandenburger --- platform/view/services/comm/comm_test.go | 105 ++++++++++++++++++++++- 1 file changed, 104 insertions(+), 1 deletion(-) diff --git a/platform/view/services/comm/comm_test.go b/platform/view/services/comm/comm_test.go index ac151618d..8e8d7f0fe 100644 --- a/platform/view/services/comm/comm_test.go +++ b/platform/view/services/comm/comm_test.go @@ -10,6 +10,7 @@ import ( "context" "sync" "testing" + "time" "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/host" "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/host/rest" @@ -20,6 +21,26 @@ import ( func TestWebsocketSession(t *testing.T) { RegisterTestingT(t) + aliceNode, bobNode := setupWebsocketSession() + + testExchange(aliceNode, bobNode) +} + +// TestWebsocketSessionManySenders tests with multiple sender goroutines; creating a new session for every interaction +// TODO: current this test seems to deadlock, and cause the test to timeout; +// go test -v -count 10 -failfast -timeout 30s -run ^TestWebsocketSessionManySenders$ +func TestWebsocketSessionManySenders(t *testing.T) { + RegisterTestingT(t) + + numWorkers := 1 + numMessages := 1000 + + aliceNode, bobNode := setupWebsocketSession() + testExchangeManySenders(aliceNode, bobNode, numWorkers, numMessages) + shutdown(aliceNode, bobNode) +} + +func setupWebsocketSession() (Node, Node) { aliceConfig, bobConfig := GetConfig("initiator"), GetConfig("responder") router := &routing.StaticIDRouter{ @@ -42,7 +63,16 @@ func TestWebsocketSession(t *testing.T) { pkID: []byte("bob"), } - testExchange(aliceNode, bobNode) + return aliceNode, bobNode +} + +func shutdown(nodes ...Node) { + // TODO: how to check that the comm service is actually stopped? + for _, n := range nodes { + n.commService.Stop() + } + // until we figure out how to check when the comm service has stopped completely we give it a bit of time to ZzzZzz + time.Sleep(1 * time.Second) } func TestLibp2pSession(t *testing.T) { @@ -111,3 +141,76 @@ func testExchange(aliceNode, bobNode Node) { wg.Wait() } + +func testExchangeManySenders(aliceNode, bobNode Node, numWorker, numOfMsgs int) { + var wgBob sync.WaitGroup + + ctx, cancel := context.WithCancel(context.Background()) + + // setup bob as our receiver + bobMasterSession, err := bobNode.commService.MasterSession() + Expect(err).ToNot(HaveOccurred()) + wgBob.Add(1) + go func() { + defer wgBob.Done() + for { + select { + // run until we close via the context + case <-ctx.Done(): + return + case response := <-bobMasterSession.Receive(): + // get our message from master session + Expect(response).ToNot(BeNil()) + Expect(response.Payload).To(Equal([]byte("ping"))) + + // create a response session + bobSession, err := bobNode.commService.NewSessionWithID(response.SessionID, "", response.FromEndpoint, response.FromPKID, nil, nil) + Expect(err).ToNot(HaveOccurred()) + Expect(bobSession.Send([]byte("pong"))).To(Succeed()) + + // close it + bobSession.Close() + Eventually(bobSession.Info().Closed).Should(BeTrue()) + } + } + }() + + // setup alice our sender + var wgAlice sync.WaitGroup + for i := 0; i <= numWorker; i++ { + wgAlice.Add(1) + go func() { + defer wgAlice.Done() + + // we send every message in a fresh session + for j := 0; j <= numOfMsgs; j++ { + // setup + aliceSession, err := aliceNode.commService.NewSession("", "", rest.ConvertAddress(bobNode.address), bobNode.pkID) + Expect(err).ToNot(HaveOccurred()) + Expect(aliceSession.Info().Endpoint).To(Equal(rest.ConvertAddress(bobNode.address))) + Expect(aliceSession.Info().EndpointPKID).To(Equal(bobNode.pkID.Bytes())) + + // send + Eventually(aliceSession.Send([]byte("ping"))).Should(Succeed()) + + // receive + Eventually(func(g Gomega) { + response := <-aliceSession.Receive() + g.Expect(response).ToNot(BeNil()) + g.Expect(response).To(HaveField("Payload", Equal([]byte("pong")))) + }).Should(Succeed()) + + // close + aliceSession.Close() + Eventually(aliceSession.Info().Closed).Should(BeTrue()) + } + }() + } + + wgAlice.Wait() + cancel() + wgBob.Wait() + + bobMasterSession.Close() + Eventually(bobMasterSession.Info().Closed).Should(BeTrue()) +}