Skip to content

Unit test for websocket comm service #870

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 12 commits into
base: main
Choose a base branch
from
71 changes: 8 additions & 63 deletions platform/view/core/endpoint/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ package endpoint
import (
"bytes"
"net"
"reflect"
"strings"
"sync"

Expand All @@ -19,7 +18,6 @@ import (
"github.com/hyperledger-labs/fabric-smart-client/platform/view/driver"
"github.com/hyperledger-labs/fabric-smart-client/platform/view/view"
"github.com/pkg/errors"
"go.uber.org/zap/zapcore"
"golang.org/x/exp/slices"
)

Expand Down Expand Up @@ -69,17 +67,14 @@ type Service struct {
resolversMutex sync.RWMutex
bindingKVS driver2.BindingStore

pkiExtractorsLock sync.RWMutex
publicKeyExtractors []driver.PublicKeyExtractor
publicKeyIDSynthesizer driver.PublicKeyIDSynthesizer
pkiExtractor *PKIExtractor
}

// NewService returns a new instance of the view-sdk endpoint service
func NewService(bindingKVS driver2.BindingStore) (*Service, error) {
er := &Service{
bindingKVS: bindingKVS,
publicKeyExtractors: []driver.PublicKeyExtractor{},
publicKeyIDSynthesizer: DefaultPublicKeyIDSynthesizer{},
bindingKVS: bindingKVS,
pkiExtractor: NewPKIExtractor(),
}
return er, nil
}
Expand All @@ -89,7 +84,7 @@ func (r *Service) Resolve(party view.Identity) (driver.Resolver, []byte, error)
if err != nil {
return nil, nil, err
}
return resolver, r.pkiResolve(resolver), nil
return resolver, r.pkiExtractor.PkiResolve(resolver), nil
}

func (r *Service) GetResolver(party view.Identity) (driver.Resolver, error) {
Expand Down Expand Up @@ -161,7 +156,7 @@ func (r *Service) matchesResolver(endpoint string, pkID []byte, resolver *Resolv
}

return len(pkID) > 0 && (bytes.Equal(pkID, resolver.Id) ||
bytes.Equal(pkID, r.pkiResolve(resolver)))
bytes.Equal(pkID, r.pkiExtractor.PkiResolve(resolver)))
}

func (r *Service) AddResolver(name string, domain string, addresses map[string]string, aliases []string, id []byte) (view.Identity, error) {
Expand Down Expand Up @@ -203,54 +198,15 @@ func (r *Service) AddResolver(name string, domain string, addresses map[string]s
}

func (r *Service) AddPublicKeyExtractor(publicKeyExtractor driver.PublicKeyExtractor) error {
r.pkiExtractorsLock.Lock()
defer r.pkiExtractorsLock.Unlock()

if publicKeyExtractor == nil {
return errors.New("pki resolver should not be nil")
}
r.publicKeyExtractors = append(r.publicKeyExtractors, publicKeyExtractor)
return nil
return r.pkiExtractor.AddPublicKeyExtractor(publicKeyExtractor)
}

func (r *Service) SetPublicKeyIDSynthesizer(publicKeyIDSynthesizer driver.PublicKeyIDSynthesizer) {
r.publicKeyIDSynthesizer = publicKeyIDSynthesizer
}

func (r *Service) pkiResolve(resolver *Resolver) []byte {
resolver.PKILock.RLock()
if len(resolver.PKI) != 0 {
resolver.PKILock.RUnlock()
return resolver.PKI
}
resolver.PKILock.RUnlock()

resolver.PKILock.Lock()
defer resolver.PKILock.Unlock()
if len(resolver.PKI) == 0 {
resolver.PKI = r.ExtractPKI(resolver.Id)
}
return resolver.PKI
r.pkiExtractor.SetPublicKeyIDSynthesizer(publicKeyIDSynthesizer)
}

func (r *Service) ExtractPKI(id []byte) []byte {
r.pkiExtractorsLock.RLock()
defer r.pkiExtractorsLock.RUnlock()

for _, extractor := range r.publicKeyExtractors {
if pk, err := extractor.ExtractPublicKey(id); pk != nil {
if logger.IsEnabledFor(zapcore.DebugLevel) {
logger.Debugf("pki resolved for [%s]", id)
}
return r.publicKeyIDSynthesizer.PublicKeyID(pk)
} else {
if logger.IsEnabledFor(zapcore.DebugLevel) {
logger.Debugf("pki not resolved by [%s] for [%s]: [%s]", getIdentifier(extractor), id, err)
}
}
}
logger.Warnf("cannot resolve pki for [%s]", id)
return nil
return r.pkiExtractor.ExtractPKI(id)
}

func (r *Service) rootEndpoint(party view.Identity) (*Resolver, error) {
Expand Down Expand Up @@ -296,14 +252,3 @@ func LookupIPv4(endpoint string) string {
port := s[1]
return net.JoinHostPort(addrS, port)
}

func getIdentifier(f any) string {
if f == nil {
return "<nil view>"
}
t := reflect.TypeOf(f)
for t.Kind() == reflect.Ptr {
t = t.Elem()
}
return t.PkgPath() + "/" + t.Name()
}
2 changes: 1 addition & 1 deletion platform/view/core/endpoint/endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestPKIResolveConcurrency(t *testing.T) {
for i := 0; i < 100; i++ {
go func() {
defer wg.Done()
svc.pkiResolve(resolver)
svc.pkiExtractor.PkiResolve(resolver)
}()
}
wg.Wait()
Expand Down
91 changes: 91 additions & 0 deletions platform/view/core/endpoint/extractor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package endpoint

/*
Copyright IBM Corp. All Rights Reserved.

SPDX-License-Identifier: Apache-2.0
*/

import (
"reflect"
"sync"

"github.com/hyperledger-labs/fabric-smart-client/platform/view/driver"
"github.com/pkg/errors"
"go.uber.org/zap/zapcore"
)

func NewPKIExtractor() *PKIExtractor {
return &PKIExtractor{
publicKeyExtractors: []driver.PublicKeyExtractor{},
publicKeyIDSynthesizer: DefaultPublicKeyIDSynthesizer{},
}
}

type PKIExtractor struct {
pkiExtractorsLock sync.RWMutex
publicKeyExtractors []driver.PublicKeyExtractor
publicKeyIDSynthesizer driver.PublicKeyIDSynthesizer
}

func (r *PKIExtractor) AddPublicKeyExtractor(publicKeyExtractor driver.PublicKeyExtractor) error {
r.pkiExtractorsLock.Lock()
defer r.pkiExtractorsLock.Unlock()

if publicKeyExtractor == nil {
return errors.New("pki resolver should not be nil")
}
r.publicKeyExtractors = append(r.publicKeyExtractors, publicKeyExtractor)
return nil
}

func (r *PKIExtractor) SetPublicKeyIDSynthesizer(publicKeyIDSynthesizer driver.PublicKeyIDSynthesizer) {
r.publicKeyIDSynthesizer = publicKeyIDSynthesizer
}

func (r *PKIExtractor) PkiResolve(resolver *Resolver) []byte {
resolver.PKILock.RLock()
if len(resolver.PKI) != 0 {
resolver.PKILock.RUnlock()
return resolver.PKI
}
resolver.PKILock.RUnlock()

resolver.PKILock.Lock()
defer resolver.PKILock.Unlock()
if len(resolver.PKI) == 0 {
resolver.PKI = r.ExtractPKI(resolver.Id)
}
return resolver.PKI
}

func (r *PKIExtractor) ExtractPKI(id []byte) []byte {
r.pkiExtractorsLock.RLock()
defer r.pkiExtractorsLock.RUnlock()

for _, extractor := range r.publicKeyExtractors {
if pk, err := extractor.ExtractPublicKey(id); pk != nil {
if logger.IsEnabledFor(zapcore.DebugLevel) {
logger.Debugf("pki resolved for [%s]", id)
}
return r.publicKeyIDSynthesizer.PublicKeyID(pk)
} else {
if logger.IsEnabledFor(zapcore.DebugLevel) {
logger.Debugf("pki not resolved by [%s] for [%s]: [%s]", getIdentifier(extractor), id, err)
}
}
}
logger.Warnf("cannot resolve pki for [%s]", id)
return nil
}

func getIdentifier(f any) string {
if f == nil {
return "<nil view>"
}
t := reflect.TypeOf(f)
for t.Kind() == reflect.Ptr {
t = t.Elem()
}
return t.PkgPath() + "/" + t.Name()
}
32 changes: 24 additions & 8 deletions platform/view/services/comm/comm.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,23 +33,39 @@ type ConfigService interface {
type Service struct {
HostProvider host.GeneratorProvider
EndpointService EndpointService
ConfigService ConfigService
config *Config

Node *P2PNode
NodeSync sync.RWMutex
tracerProvider trace.TracerProvider
metricsProvider metrics.Provider
}

type Config struct {
ListenAddress string
BootstrapNode string
KeyFile string
CertFile string
}

func NewService(hostProvider host.GeneratorProvider, endpointService EndpointService, configService ConfigService, tracerProvider trace.TracerProvider, metricsProvider metrics.Provider) (*Service, error) {
s := &Service{
config := &Config{
ListenAddress: configService.GetString("fsc.p2p.listenAddress"),
BootstrapNode: configService.GetString("fsc.p2p.opts.bootstrapNode"),
KeyFile: configService.GetPath("fsc.identity.key.file"),
CertFile: configService.GetPath("fsc.identity.cert.file"),
}
return newService(hostProvider, endpointService, config, tracerProvider, metricsProvider), nil
}

func newService(hostProvider host.GeneratorProvider, endpointService EndpointService, config *Config, tracerProvider trace.TracerProvider, metricsProvider metrics.Provider) *Service {
return &Service{
HostProvider: hostProvider,
EndpointService: endpointService,
ConfigService: configService,
config: config,
tracerProvider: tracerProvider,
metricsProvider: metricsProvider,
}
return s, nil
}

func (s *Service) Start(ctx context.Context) {
Expand Down Expand Up @@ -123,10 +139,10 @@ func (s *Service) init() error {
return nil
}

p2pListenAddress := s.ConfigService.GetString("fsc.p2p.listenAddress")
p2pBootstrapNode := s.ConfigService.GetString("fsc.p2p.opts.bootstrapNode")
keyFile := s.ConfigService.GetPath("fsc.identity.key.file")
certFile := s.ConfigService.GetPath("fsc.identity.cert.file")
p2pListenAddress := s.config.ListenAddress
p2pBootstrapNode := s.config.BootstrapNode
keyFile := s.config.KeyFile
certFile := s.config.CertFile

if len(p2pBootstrapNode) == 0 {
// this is a bootstrap node
Expand Down
Loading
Loading