diff --git a/platform/view/core/endpoint/endpoint.go b/platform/view/core/endpoint/endpoint.go index b970372bc..c91046d78 100644 --- a/platform/view/core/endpoint/endpoint.go +++ b/platform/view/core/endpoint/endpoint.go @@ -9,7 +9,6 @@ package endpoint import ( "bytes" "net" - "reflect" "strings" "sync" @@ -19,7 +18,6 @@ import ( "github.com/hyperledger-labs/fabric-smart-client/platform/view/driver" "github.com/hyperledger-labs/fabric-smart-client/platform/view/view" "github.com/pkg/errors" - "go.uber.org/zap/zapcore" "golang.org/x/exp/slices" ) @@ -69,17 +67,14 @@ type Service struct { resolversMutex sync.RWMutex bindingKVS driver2.BindingStore - pkiExtractorsLock sync.RWMutex - publicKeyExtractors []driver.PublicKeyExtractor - publicKeyIDSynthesizer driver.PublicKeyIDSynthesizer + pkiExtractor *PKIExtractor } // NewService returns a new instance of the view-sdk endpoint service func NewService(bindingKVS driver2.BindingStore) (*Service, error) { er := &Service{ - bindingKVS: bindingKVS, - publicKeyExtractors: []driver.PublicKeyExtractor{}, - publicKeyIDSynthesizer: DefaultPublicKeyIDSynthesizer{}, + bindingKVS: bindingKVS, + pkiExtractor: NewPKIExtractor(), } return er, nil } @@ -89,7 +84,7 @@ func (r *Service) Resolve(party view.Identity) (driver.Resolver, []byte, error) if err != nil { return nil, nil, err } - return resolver, r.pkiResolve(resolver), nil + return resolver, r.pkiExtractor.PkiResolve(resolver), nil } func (r *Service) GetResolver(party view.Identity) (driver.Resolver, error) { @@ -161,7 +156,7 @@ func (r *Service) matchesResolver(endpoint string, pkID []byte, resolver *Resolv } return len(pkID) > 0 && (bytes.Equal(pkID, resolver.Id) || - bytes.Equal(pkID, r.pkiResolve(resolver))) + bytes.Equal(pkID, r.pkiExtractor.PkiResolve(resolver))) } func (r *Service) AddResolver(name string, domain string, addresses map[string]string, aliases []string, id []byte) (view.Identity, error) { @@ -203,54 +198,15 @@ func (r *Service) AddResolver(name string, domain string, addresses map[string]s } func (r *Service) AddPublicKeyExtractor(publicKeyExtractor driver.PublicKeyExtractor) error { - r.pkiExtractorsLock.Lock() - defer r.pkiExtractorsLock.Unlock() - - if publicKeyExtractor == nil { - return errors.New("pki resolver should not be nil") - } - r.publicKeyExtractors = append(r.publicKeyExtractors, publicKeyExtractor) - return nil + return r.pkiExtractor.AddPublicKeyExtractor(publicKeyExtractor) } func (r *Service) SetPublicKeyIDSynthesizer(publicKeyIDSynthesizer driver.PublicKeyIDSynthesizer) { - r.publicKeyIDSynthesizer = publicKeyIDSynthesizer -} - -func (r *Service) pkiResolve(resolver *Resolver) []byte { - resolver.PKILock.RLock() - if len(resolver.PKI) != 0 { - resolver.PKILock.RUnlock() - return resolver.PKI - } - resolver.PKILock.RUnlock() - - resolver.PKILock.Lock() - defer resolver.PKILock.Unlock() - if len(resolver.PKI) == 0 { - resolver.PKI = r.ExtractPKI(resolver.Id) - } - return resolver.PKI + r.pkiExtractor.SetPublicKeyIDSynthesizer(publicKeyIDSynthesizer) } func (r *Service) ExtractPKI(id []byte) []byte { - r.pkiExtractorsLock.RLock() - defer r.pkiExtractorsLock.RUnlock() - - for _, extractor := range r.publicKeyExtractors { - if pk, err := extractor.ExtractPublicKey(id); pk != nil { - if logger.IsEnabledFor(zapcore.DebugLevel) { - logger.Debugf("pki resolved for [%s]", id) - } - return r.publicKeyIDSynthesizer.PublicKeyID(pk) - } else { - if logger.IsEnabledFor(zapcore.DebugLevel) { - logger.Debugf("pki not resolved by [%s] for [%s]: [%s]", getIdentifier(extractor), id, err) - } - } - } - logger.Warnf("cannot resolve pki for [%s]", id) - return nil + return r.pkiExtractor.ExtractPKI(id) } func (r *Service) rootEndpoint(party view.Identity) (*Resolver, error) { @@ -296,14 +252,3 @@ func LookupIPv4(endpoint string) string { port := s[1] return net.JoinHostPort(addrS, port) } - -func getIdentifier(f any) string { - if f == nil { - return "" - } - t := reflect.TypeOf(f) - for t.Kind() == reflect.Ptr { - t = t.Elem() - } - return t.PkgPath() + "/" + t.Name() -} diff --git a/platform/view/core/endpoint/endpoint_test.go b/platform/view/core/endpoint/endpoint_test.go index fa54c74e2..92dc47e78 100644 --- a/platform/view/core/endpoint/endpoint_test.go +++ b/platform/view/core/endpoint/endpoint_test.go @@ -41,7 +41,7 @@ func TestPKIResolveConcurrency(t *testing.T) { for i := 0; i < 100; i++ { go func() { defer wg.Done() - svc.pkiResolve(resolver) + svc.pkiExtractor.PkiResolve(resolver) }() } wg.Wait() diff --git a/platform/view/core/endpoint/extractor.go b/platform/view/core/endpoint/extractor.go new file mode 100644 index 000000000..f516823b2 --- /dev/null +++ b/platform/view/core/endpoint/extractor.go @@ -0,0 +1,91 @@ +package endpoint + +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +import ( + "reflect" + "sync" + + "github.com/hyperledger-labs/fabric-smart-client/platform/view/driver" + "github.com/pkg/errors" + "go.uber.org/zap/zapcore" +) + +func NewPKIExtractor() *PKIExtractor { + return &PKIExtractor{ + publicKeyExtractors: []driver.PublicKeyExtractor{}, + publicKeyIDSynthesizer: DefaultPublicKeyIDSynthesizer{}, + } +} + +type PKIExtractor struct { + pkiExtractorsLock sync.RWMutex + publicKeyExtractors []driver.PublicKeyExtractor + publicKeyIDSynthesizer driver.PublicKeyIDSynthesizer +} + +func (r *PKIExtractor) AddPublicKeyExtractor(publicKeyExtractor driver.PublicKeyExtractor) error { + r.pkiExtractorsLock.Lock() + defer r.pkiExtractorsLock.Unlock() + + if publicKeyExtractor == nil { + return errors.New("pki resolver should not be nil") + } + r.publicKeyExtractors = append(r.publicKeyExtractors, publicKeyExtractor) + return nil +} + +func (r *PKIExtractor) SetPublicKeyIDSynthesizer(publicKeyIDSynthesizer driver.PublicKeyIDSynthesizer) { + r.publicKeyIDSynthesizer = publicKeyIDSynthesizer +} + +func (r *PKIExtractor) PkiResolve(resolver *Resolver) []byte { + resolver.PKILock.RLock() + if len(resolver.PKI) != 0 { + resolver.PKILock.RUnlock() + return resolver.PKI + } + resolver.PKILock.RUnlock() + + resolver.PKILock.Lock() + defer resolver.PKILock.Unlock() + if len(resolver.PKI) == 0 { + resolver.PKI = r.ExtractPKI(resolver.Id) + } + return resolver.PKI +} + +func (r *PKIExtractor) ExtractPKI(id []byte) []byte { + r.pkiExtractorsLock.RLock() + defer r.pkiExtractorsLock.RUnlock() + + for _, extractor := range r.publicKeyExtractors { + if pk, err := extractor.ExtractPublicKey(id); pk != nil { + if logger.IsEnabledFor(zapcore.DebugLevel) { + logger.Debugf("pki resolved for [%s]", id) + } + return r.publicKeyIDSynthesizer.PublicKeyID(pk) + } else { + if logger.IsEnabledFor(zapcore.DebugLevel) { + logger.Debugf("pki not resolved by [%s] for [%s]: [%s]", getIdentifier(extractor), id, err) + } + } + } + logger.Warnf("cannot resolve pki for [%s]", id) + return nil +} + +func getIdentifier(f any) string { + if f == nil { + return "" + } + t := reflect.TypeOf(f) + for t.Kind() == reflect.Ptr { + t = t.Elem() + } + return t.PkgPath() + "/" + t.Name() +} diff --git a/platform/view/services/comm/comm.go b/platform/view/services/comm/comm.go index f6fffa0b8..db37c2306 100644 --- a/platform/view/services/comm/comm.go +++ b/platform/view/services/comm/comm.go @@ -33,7 +33,7 @@ type ConfigService interface { type Service struct { HostProvider host.GeneratorProvider EndpointService EndpointService - ConfigService ConfigService + config *Config Node *P2PNode NodeSync sync.RWMutex @@ -41,15 +41,31 @@ type Service struct { metricsProvider metrics.Provider } +type Config struct { + ListenAddress string + BootstrapNode string + KeyFile string + CertFile string +} + func NewService(hostProvider host.GeneratorProvider, endpointService EndpointService, configService ConfigService, tracerProvider trace.TracerProvider, metricsProvider metrics.Provider) (*Service, error) { - s := &Service{ + config := &Config{ + ListenAddress: configService.GetString("fsc.p2p.listenAddress"), + BootstrapNode: configService.GetString("fsc.p2p.opts.bootstrapNode"), + KeyFile: configService.GetPath("fsc.identity.key.file"), + CertFile: configService.GetPath("fsc.identity.cert.file"), + } + return newService(hostProvider, endpointService, config, tracerProvider, metricsProvider), nil +} + +func newService(hostProvider host.GeneratorProvider, endpointService EndpointService, config *Config, tracerProvider trace.TracerProvider, metricsProvider metrics.Provider) *Service { + return &Service{ HostProvider: hostProvider, EndpointService: endpointService, - ConfigService: configService, + config: config, tracerProvider: tracerProvider, metricsProvider: metricsProvider, } - return s, nil } func (s *Service) Start(ctx context.Context) { @@ -123,10 +139,10 @@ func (s *Service) init() error { return nil } - p2pListenAddress := s.ConfigService.GetString("fsc.p2p.listenAddress") - p2pBootstrapNode := s.ConfigService.GetString("fsc.p2p.opts.bootstrapNode") - keyFile := s.ConfigService.GetPath("fsc.identity.key.file") - certFile := s.ConfigService.GetPath("fsc.identity.cert.file") + p2pListenAddress := s.config.ListenAddress + p2pBootstrapNode := s.config.BootstrapNode + keyFile := s.config.KeyFile + certFile := s.config.CertFile if len(p2pBootstrapNode) == 0 { // this is a bootstrap node diff --git a/platform/view/services/comm/comm_bench_test.go b/platform/view/services/comm/comm_bench_test.go new file mode 100644 index 000000000..1d9483501 --- /dev/null +++ b/platform/view/services/comm/comm_bench_test.go @@ -0,0 +1,293 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package comm + +import ( + "context" + "fmt" + "strconv" + "strings" + "sync" + "testing" + "time" + + "github.com/hyperledger-labs/fabric-smart-client/platform/common/utils" + "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/host" + "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/host/rest" + "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/host/rest/routing" + "github.com/hyperledger-labs/fabric-smart-client/platform/view/view" + . "github.com/onsi/gomega" +) + +const ( + numOfNodes int = 2 + numOfSessions int = 1 + numOfMsgs int = 1 +) + +type Nodes struct { + sender *Service + senderNode Node + receiverNode Node + receiver *Service +} + +type benchmarkMetrics struct { + latency time.Duration +} + +func createWebsocketNodes() []Nodes { + + var nodes []Nodes + + for senderNodeNum := 0; senderNodeNum < numOfNodes; senderNodeNum++ { + + senderConfig := GetConfig("initiator") + + var receiverConfigs []*Config + var receivesIPAddress []host.PeerIPAddress + for receivedNodeNum := 0; receivedNodeNum < numOfNodes; receivedNodeNum++ { + if senderNodeNum == receivedNodeNum { + continue + } + + receiverConfig := GetConfig("responder") + receivesIPAddress = append(receivesIPAddress, rest.ConvertAddress(receiverConfig.ListenAddress)) + receiverConfigs = append(receiverConfigs, receiverConfig) + } + + router := &routing.StaticIDRouter{ + "sender": []host.PeerIPAddress{rest.ConvertAddress(senderConfig.ListenAddress)}, + "receiver": receivesIPAddress} + + sender := NewWebsocketCommService(router, senderConfig) + sender.Start(context.Background()) + + senderNode := Node{ + commService: sender, + address: senderConfig.ListenAddress, + pkID: []byte("sender"), + } + + for _, receiverConfig := range receiverConfigs { + receiver := NewWebsocketCommService(router, receiverConfig) + receiver.Start(context.Background()) + receiverNode := Node{ + commService: receiver, + address: receiverConfig.ListenAddress, + pkID: []byte("receiver"), + } + nodes = append(nodes, Nodes{sender: sender, senderNode: senderNode, receiver: receiver, receiverNode: receiverNode}) + } + } + return nodes +} + +func createLibp2pNodes() []Nodes { + + var nodes []Nodes + + for senderNodeNum := 0; senderNodeNum < numOfNodes; senderNodeNum++ { + + senderConfig := GetConfig("initiator") + + for receivedNodeNum := 0; receivedNodeNum < numOfNodes; receivedNodeNum++ { + if senderNodeNum == receivedNodeNum { + continue + } + + receiverConfig := GetConfig("responder") + receiverConfig.BootstrapNode = "sender" + sender, senderPkID := NewLibP2PCommService(senderConfig, nil) + sender.Start(context.Background()) + receiver, receiverPkID := NewLibP2PCommService(receiverConfig, &BootstrapNodeResolver{nodeID: senderPkID, nodeAddress: senderConfig.ListenAddress}) + receiver.Start(context.Background()) + + senderNode := Node{ + commService: sender, + address: senderConfig.ListenAddress, + pkID: senderPkID, + } + + receiverNode := Node{ + commService: receiver, + address: receiverConfig.ListenAddress, + pkID: receiverPkID, + } + nodes = append(nodes, Nodes{sender: sender, senderNode: senderNode, receiver: receiver, receiverNode: receiverNode}) + } + } + + return nodes +} + +func testNodesExchange(nodes []Nodes, protocol string) { + metricsMap := map[string]benchmarkMetrics{} + + mainwg := sync.WaitGroup{} + startTime := time.Now() + + mainwg.Add(len(nodes)) + for _, exchangeNodes := range nodes { + go testExchangeMsgs(exchangeNodes.senderNode, exchangeNodes.receiverNode, metricsMap, &mainwg) + } + mainwg.Wait() + endTime := time.Now() + + for _, exchangeNodes := range nodes { + exchangeNodes.sender.Stop() + exchangeNodes.receiver.Stop() + } + + displayMetrics(endTime.Sub(startTime), metricsMap, protocol) +} + +func BenchmarkTestWebsocket(b *testing.B) { + RegisterTestingT(b) + testNodesExchange(createWebsocketNodes(), "Websocket") +} + +func BenchmarkTestLibp2p(b *testing.B) { + RegisterTestingT(b) + testNodesExchange(createLibp2pNodes(), "LipP2P") +} + +func testExchangeMsgs(senderNode, receiverNode Node, metricsMap map[string]benchmarkMetrics, mainwg *sync.WaitGroup) { + defer mainwg.Done() + + wg := sync.WaitGroup{} + wg.Add(2 * numOfSessions) + + //sessions := make([]view.Session, 0, 2*numOfSessions+1) + var sessions []view.Session + mu := sync.Mutex{} + + for sessId := 0; sessId < numOfSessions; sessId++ { + go senderExchangeMsgs(senderNode, receiverNode, sessId, &sessions, metricsMap, &mu, &wg) + } + go receiverExchangeMsgs(receiverNode, &sessions, &mu, &wg) + + logger.Infof("Waiting on execution...") + + wg.Wait() + + logger.Infof("Execution finished. Closing sessions") + for _, s := range sessions { + s.Close() + } +} + +func senderExchangeMsgs(senderNode, receiverNode Node, sessId int, sessions *[]view.Session, metricsMap map[string]benchmarkMetrics, mu *sync.Mutex, wg *sync.WaitGroup) { + logger.Infof("---> Create new session # %d", sessId) + senderSession, err := senderNode.commService.NewSession("", "", rest.ConvertAddress(receiverNode.address), receiverNode.pkID) + Expect(err).ToNot(HaveOccurred()) + mu.Lock() + *sessions = append(*sessions, senderSession) + mu.Unlock() + Expect(senderSession.Info().Endpoint).To(Equal(rest.ConvertAddress(receiverNode.address))) + Expect(senderSession.Info().EndpointPKID).To(Equal(receiverNode.pkID.Bytes())) + logger.Infof("---> Created new session # %d [%v]", sessId, senderSession.Info()) + + for msgId := 0; msgId < numOfMsgs; msgId++ { + payload := fmt.Sprintf("request-%d-%d", sessId, msgId) + logger.Infof("---> Sender: sends message [%s]", payload) + start := time.Now() + Expect(senderSession.Send([]byte(payload))).To(Succeed()) + logger.Infof("---> Sender: sent message [%s]", payload) + response := <-senderSession.Receive() + end := time.Now() + elapsed := end.Sub(start) + Expect(response).ToNot(BeNil()) + Expect(string(response.Payload)).To(Equal(fmt.Sprintf("response-%d-%d", sessId, msgId))) + logger.Infof("---> Sender: received message: [%s]", string(response.Payload)) + + _, ok := metricsMap[senderSession.Info().ID+strconv.Itoa(sessId)+strconv.Itoa(msgId)] + if ok { + Expect(ok).To(BeFalse(), "Metrics map keys should be unique") + } + + metricsMap[senderSession.Info().ID+strconv.Itoa(sessId)+strconv.Itoa(msgId)] = benchmarkMetrics{latency: elapsed} + } + logger.Infof("Send EOF") + Expect(senderSession.Send([]byte("EOF"))).To(Succeed()) + logger.Infof("Sent EOF") + defer wg.Done() +} + +func receiverExchangeMsgs(receiverNode Node, sessions *[]view.Session, mu *sync.Mutex, wg *sync.WaitGroup) { + receiverMasterSession, err := receiverNode.commService.MasterSession() + Expect(err).ToNot(HaveOccurred()) + + mu.Lock() + *sessions = append(*sessions, receiverMasterSession) + mu.Unlock() + sessionMap := map[string]struct{}{} + + logger.Infof("---> Receiver: start receiving on master session") + for response := range receiverMasterSession.Receive() { + Expect(response).ToNot(BeNil()) + logger.Infof("---> Receiver: received message [%s]", string(response.Payload)) + Expect(string(response.Payload)).To(ContainSubstring("request")) + elements := strings.Split(string(response.Payload), "-") + Expect(elements).To(HaveLen(3)) + sessId, msgId := utils.MustGet(strconv.Atoi(elements[1])), utils.MustGet(strconv.Atoi(elements[2])) + Expect(msgId).To(Equal(0)) + + _, ok := sessionMap[response.SessionID] + Expect(ok).To(BeFalse(), "we should not receive a second message on the master session [%s]", string(response.Payload)) + logger.Infof("---> Receiver: open session [%d]", sessId) + session, err := receiverNode.commService.NewSessionWithID(response.SessionID, "", response.FromEndpoint, response.FromPKID, nil, nil) + Expect(err).To(BeNil()) + logger.Infof("---> Receiver: opened session [%d] [%v]", sessId, session.Info()) + sessionMap[response.SessionID] = struct{}{} + mu.Lock() + *sessions = append(*sessions, session) + mu.Unlock() + + go receiverSessionExchangeMsgs(sessId, session, wg) + } +} + +func receiverSessionExchangeMsgs(sessId int, session view.Session, wg *sync.WaitGroup) { + defer wg.Done() + payload := fmt.Sprintf("response-%d-0", sessId) + logger.Infof("---> Receiver: Send message [%s]", payload) + Expect(session.Send([]byte(payload))).To(Succeed()) + logger.Infof("---> Receiver: Sent message [%s]", payload) + for response := range session.Receive() { + if string(response.Payload) == "EOF" { + logger.Infof("---> Receiver: Received EOF on [%d]", sessId) + return + } + elements := strings.Split(string(response.Payload), "-") + Expect(elements).To(HaveLen(3)) + sessId, msgId := utils.MustGet(strconv.Atoi(elements[1])), utils.MustGet(strconv.Atoi(elements[2])) + payload := fmt.Sprintf("response-%d-%d", sessId, msgId) + logger.Infof("---> Receiver: Send message [%s]", payload) + Expect(session.Send([]byte(payload))).To(Succeed()) + logger.Infof("---> Receiver: Sent message [%s]", payload) + } +} + +func displayMetrics(duration time.Duration, metricsMap map[string]benchmarkMetrics, protocol string) { + var latency time.Duration = 0 + + for _, metric := range metricsMap { + latency = latency + metric.latency + } + + averageLatency := latency / time.Duration(len(metricsMap)) + throughput := float64(2*numOfMsgs) / float64(duration.Seconds()) + + logger.Infof("============================= %s Metrics =============================\n", protocol) + logger.Infof("Number of nodes: %d\n", numOfNodes*2) + logger.Infof("Number of sessions per node: %d\n", numOfSessions) + logger.Infof("Number of Msgs per session: %d\n", 2*numOfMsgs) + logger.Infof("Average latency: %v\n", averageLatency) + logger.Infof("Throughput: %f messages per second \n", throughput) + logger.Infof("Total run time: %v\n", duration) +} diff --git a/platform/view/services/comm/comm_test.go b/platform/view/services/comm/comm_test.go new file mode 100644 index 000000000..8e8d7f0fe --- /dev/null +++ b/platform/view/services/comm/comm_test.go @@ -0,0 +1,216 @@ +/* +Copyright IBM Corp. All Rights Reserved. + +SPDX-License-Identifier: Apache-2.0 +*/ + +package comm + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/host" + "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/host/rest" + "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/host/rest/routing" + . "github.com/onsi/gomega" +) + +func TestWebsocketSession(t *testing.T) { + RegisterTestingT(t) + + aliceNode, bobNode := setupWebsocketSession() + + testExchange(aliceNode, bobNode) +} + +// TestWebsocketSessionManySenders tests with multiple sender goroutines; creating a new session for every interaction +// TODO: current this test seems to deadlock, and cause the test to timeout; +// go test -v -count 10 -failfast -timeout 30s -run ^TestWebsocketSessionManySenders$ +func TestWebsocketSessionManySenders(t *testing.T) { + RegisterTestingT(t) + + numWorkers := 1 + numMessages := 1000 + + aliceNode, bobNode := setupWebsocketSession() + testExchangeManySenders(aliceNode, bobNode, numWorkers, numMessages) + shutdown(aliceNode, bobNode) +} + +func setupWebsocketSession() (Node, Node) { + aliceConfig, bobConfig := GetConfig("initiator"), GetConfig("responder") + + router := &routing.StaticIDRouter{ + "alice": []host.PeerIPAddress{rest.ConvertAddress(aliceConfig.ListenAddress)}, + "bob": []host.PeerIPAddress{rest.ConvertAddress(bobConfig.ListenAddress)}, + } + alice := NewWebsocketCommService(router, aliceConfig) + alice.Start(context.Background()) + bob := NewWebsocketCommService(router, bobConfig) + bob.Start(context.Background()) + + aliceNode := Node{ + commService: alice, + address: aliceConfig.ListenAddress, + pkID: []byte("alice"), + } + bobNode := Node{ + commService: bob, + address: bobConfig.ListenAddress, + pkID: []byte("bob"), + } + + return aliceNode, bobNode +} + +func shutdown(nodes ...Node) { + // TODO: how to check that the comm service is actually stopped? + for _, n := range nodes { + n.commService.Stop() + } + // until we figure out how to check when the comm service has stopped completely we give it a bit of time to ZzzZzz + time.Sleep(1 * time.Second) +} + +func TestLibp2pSession(t *testing.T) { + RegisterTestingT(t) + + aliceConfig, bobConfig := GetConfig("initiator"), GetConfig("responder") + bobConfig.BootstrapNode = "alice" + + alice, alicePkID := NewLibP2PCommService(aliceConfig, nil) + alice.Start(context.Background()) + bob, bobPkID := NewLibP2PCommService(bobConfig, &BootstrapNodeResolver{nodeID: alicePkID, nodeAddress: aliceConfig.ListenAddress}) + bob.Start(context.Background()) + + aliceNode := Node{ + commService: alice, + address: aliceConfig.ListenAddress, + pkID: alicePkID, + } + bobNode := Node{ + commService: bob, + address: bobConfig.ListenAddress, + pkID: bobPkID, + } + + testExchange(aliceNode, bobNode) +} + +func testExchange(aliceNode, bobNode Node) { + wg := sync.WaitGroup{} + wg.Add(2) + + aliceSession, err := aliceNode.commService.NewSession("", "", rest.ConvertAddress(bobNode.address), bobNode.pkID) + Expect(err).ToNot(HaveOccurred()) + Expect(aliceSession.Info().Endpoint).To(Equal(rest.ConvertAddress(bobNode.address))) + Expect(aliceSession.Info().EndpointPKID).To(Equal(bobNode.pkID.Bytes())) + bobMasterSession, err := bobNode.commService.MasterSession() + Expect(err).ToNot(HaveOccurred()) + go func() { + defer wg.Done() + Expect(aliceSession.Send([]byte("msg1"))).To(Succeed()) + Expect(aliceSession.Send([]byte("msg2"))).To(Succeed()) + response := <-aliceSession.Receive() + Expect(response).ToNot(BeNil()) + Expect(response).To(HaveField("Payload", Equal([]byte("msg3")))) + Expect(aliceSession.Send([]byte("msg4"))).To(Succeed()) + }() + + go func() { + defer wg.Done() + response := <-bobMasterSession.Receive() + Expect(response).ToNot(BeNil()) + Expect(response.Payload).To(Equal([]byte("msg1"))) + Expect(response.SessionID).To(Equal(aliceSession.Info().ID)) + + response = <-bobMasterSession.Receive() + Expect(response).ToNot(BeNil()) + Expect(response.Payload).To(Equal([]byte("msg2"))) + + bobSession, err := bobNode.commService.NewSessionWithID(response.SessionID, "", response.FromEndpoint, response.FromPKID, nil, nil) + Expect(err).ToNot(HaveOccurred()) + Expect(bobSession.Send([]byte("msg3"))).To(Succeed()) + response = <-bobSession.Receive() + Expect(response).ToNot(BeNil()) + Expect(response.Payload).To(Equal([]byte("msg4"))) + }() + + wg.Wait() +} + +func testExchangeManySenders(aliceNode, bobNode Node, numWorker, numOfMsgs int) { + var wgBob sync.WaitGroup + + ctx, cancel := context.WithCancel(context.Background()) + + // setup bob as our receiver + bobMasterSession, err := bobNode.commService.MasterSession() + Expect(err).ToNot(HaveOccurred()) + wgBob.Add(1) + go func() { + defer wgBob.Done() + for { + select { + // run until we close via the context + case <-ctx.Done(): + return + case response := <-bobMasterSession.Receive(): + // get our message from master session + Expect(response).ToNot(BeNil()) + Expect(response.Payload).To(Equal([]byte("ping"))) + + // create a response session + bobSession, err := bobNode.commService.NewSessionWithID(response.SessionID, "", response.FromEndpoint, response.FromPKID, nil, nil) + Expect(err).ToNot(HaveOccurred()) + Expect(bobSession.Send([]byte("pong"))).To(Succeed()) + + // close it + bobSession.Close() + Eventually(bobSession.Info().Closed).Should(BeTrue()) + } + } + }() + + // setup alice our sender + var wgAlice sync.WaitGroup + for i := 0; i <= numWorker; i++ { + wgAlice.Add(1) + go func() { + defer wgAlice.Done() + + // we send every message in a fresh session + for j := 0; j <= numOfMsgs; j++ { + // setup + aliceSession, err := aliceNode.commService.NewSession("", "", rest.ConvertAddress(bobNode.address), bobNode.pkID) + Expect(err).ToNot(HaveOccurred()) + Expect(aliceSession.Info().Endpoint).To(Equal(rest.ConvertAddress(bobNode.address))) + Expect(aliceSession.Info().EndpointPKID).To(Equal(bobNode.pkID.Bytes())) + + // send + Eventually(aliceSession.Send([]byte("ping"))).Should(Succeed()) + + // receive + Eventually(func(g Gomega) { + response := <-aliceSession.Receive() + g.Expect(response).ToNot(BeNil()) + g.Expect(response).To(HaveField("Payload", Equal([]byte("pong")))) + }).Should(Succeed()) + + // close + aliceSession.Close() + Eventually(aliceSession.Info().Closed).Should(BeTrue()) + } + }() + } + + wgAlice.Wait() + cancel() + wgBob.Wait() + + bobMasterSession.Close() + Eventually(bobMasterSession.Info().Closed).Should(BeTrue()) +} diff --git a/platform/view/services/comm/host/rest/provider.go b/platform/view/services/comm/host/rest/provider.go index 1bdad7930..eed10960e 100644 --- a/platform/view/services/comm/host/rest/provider.go +++ b/platform/view/services/comm/host/rest/provider.go @@ -42,14 +42,14 @@ func (p *endpointServiceBasedProvider) NewBootstrapHost(listenAddress host2.Peer return nil, errors.Wrapf(err, "failed to load identity in [%s]", certPath) } nodeID := string(p.pkiExtractor.ExtractPKI(raw)) - return NewHost(nodeID, convertAddress(listenAddress), p.routing, p.tracerProvider, p.streamProvider, privateKeyPath, certPath, nil) + return NewHost(nodeID, ConvertAddress(listenAddress), p.routing, p.tracerProvider, p.streamProvider, privateKeyPath, certPath, nil) } func (p *endpointServiceBasedProvider) NewHost(listenAddress host2.PeerIPAddress, privateKeyPath, certPath string, _ host2.PeerIPAddress) (host2.P2PHost, error) { return p.NewBootstrapHost(listenAddress, privateKeyPath, certPath) } -func convertAddress(addr string) string { +func ConvertAddress(addr string) string { parts := strings.Split(addr, "/") if len(parts) != 5 { panic("unexpected address found: " + addr) diff --git a/platform/view/services/comm/test_utils.go b/platform/view/services/comm/test_utils.go index 6f7798e31..9d2af37e3 100644 --- a/platform/view/services/comm/test_utils.go +++ b/platform/view/services/comm/test_utils.go @@ -8,17 +8,40 @@ package comm import ( "context" + "fmt" + "os" + "path" "sync" "testing" - host2 "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/host" + "github.com/hashicorp/consul/sdk/freeport" + "github.com/hyperledger-labs/fabric-smart-client/platform/common/utils" + endpoint2 "github.com/hyperledger-labs/fabric-smart-client/platform/fabric/core/generic/endpoint" + "github.com/hyperledger-labs/fabric-smart-client/platform/fabric/core/generic/msp/x509" + view3 "github.com/hyperledger-labs/fabric-smart-client/platform/view" + "github.com/hyperledger-labs/fabric-smart-client/platform/view/core/endpoint" + "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/host" + "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/host/libp2p" + "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/host/rest" + "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/host/rest/routing" + "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/comm/host/rest/websocket" + "github.com/hyperledger-labs/fabric-smart-client/platform/view/services/metrics/disabled" + view2 "github.com/hyperledger-labs/fabric-smart-client/platform/view/view" + . "github.com/onsi/gomega" "github.com/stretchr/testify/assert" + "go.opentelemetry.io/otel/trace/noop" ) type HostNode struct { *P2PNode - ID host2.PeerID - Address host2.PeerIPAddress + ID host.PeerID + Address host.PeerIPAddress +} + +type Node struct { + commService *Service + address string + pkID view2.Identity } func P2PLayerTestRound(t *testing.T, bootstrapNode *HostNode, node *HostNode) { @@ -28,7 +51,7 @@ func P2PLayerTestRound(t *testing.T, bootstrapNode *HostNode, node *HostNode) { defer wg.Done() messages := bootstrapNode.incomingMessages - info := host2.StreamInfo{ + info := host.StreamInfo{ RemotePeerID: node.ID, RemotePeerAddress: node.Address, ContextID: "context", @@ -54,7 +77,7 @@ func P2PLayerTestRound(t *testing.T, bootstrapNode *HostNode, node *HostNode) { assert.NotNil(t, msg) assert.Equal(t, []byte("msg2"), msg.message.Payload) - info := host2.StreamInfo{ + info := host.StreamInfo{ RemotePeerID: bootstrapNode.ID, RemotePeerAddress: bootstrapNode.Address, ContextID: "context", @@ -165,3 +188,57 @@ func SessionsForMPCTestRound(t *testing.T, bootstrapNode *HostNode, node *HostNo bootstrapNode.Stop() node.Stop() } + +func freeAddress() string { + return fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", utils.MustGet(freeport.Take(1))[0]) +} + +func GetConfig(name string) *Config { + rootPath, ok := os.LookupEnv("GOPATH") + Expect(ok).To(BeTrue(), "GOPATH is not set") + projectPath := path.Join(rootPath, "src", "github.com", "hyperledger-labs", "fabric-smart-client") + mspDir := path.Join(projectPath, "integration", "fsc", "pingpong", "testdata", "fsc", "crypto", "peerOrganizations", "fsc.example.com", "peers", fmt.Sprintf("%s.fsc.example.com", name), "msp") + return &Config{ + ListenAddress: freeAddress(), + KeyFile: path.Join(mspDir, "keystore", "priv_sk"), + CertFile: path.Join(mspDir, "signcerts", fmt.Sprintf("%s.fsc.example.com-cert.pem", name)), + } +} + +type BootstrapNodeResolver struct { + nodeAddress string + nodeID []byte +} + +func (r *BootstrapNodeResolver) Resolve(view2.Identity) (view2.Identity, map[view3.PortName]string, []byte, error) { + return nil, map[view3.PortName]string{view3.P2PPort: rest.ConvertAddress(r.nodeAddress)}, r.nodeID, nil +} + +func (r *BootstrapNodeResolver) GetIdentity(string, []byte) (view2.Identity, error) { + return view2.Identity("bstpnd"), nil +} + +func NewWebsocketCommService(addresses *routing.StaticIDRouter, config *Config) *Service { + discovery := routing.NewServiceDiscovery(addresses, routing.Random[host.PeerIPAddress]()) + + pkiExtractor := endpoint.NewPKIExtractor() + utils.Must(pkiExtractor.AddPublicKeyExtractor(&PKExtractor{})) + pkiExtractor.SetPublicKeyIDSynthesizer(&rest.PKIDSynthesizer{}) + + hostProvider := rest.NewEndpointBasedProvider(pkiExtractor, discovery, noop.NewTracerProvider(), websocket.NewMultiplexedProvider(noop.NewTracerProvider(), &disabled.Provider{})) + + return newService(hostProvider, nil, config, noop.NewTracerProvider(), &disabled.Provider{}) +} + +func NewLibP2PCommService(config *Config, resolver EndpointService) (*Service, view2.Identity) { + pkiExtractor := endpoint.NewPKIExtractor() + utils.Must(pkiExtractor.AddPublicKeyExtractor(&PKExtractor{})) + utils.Must(pkiExtractor.AddPublicKeyExtractor(endpoint2.PublicKeyExtractor{})) + pkiExtractor.SetPublicKeyIDSynthesizer(&libp2p.PKIDSynthesizer{}) + + hostProvider := libp2p.NewHostGeneratorProvider(&disabled.Provider{}) + + pkID := pkiExtractor.ExtractPKI(utils.MustGet(x509.Serialize("", config.CertFile))) + + return newService(hostProvider, resolver, config, noop.NewTracerProvider(), &disabled.Provider{}), pkID +}