From 405b3617859c7c014fb814102e981d4470272e5a Mon Sep 17 00:00:00 2001 From: arrivets Date: Tue, 6 Apr 2021 07:56:11 +0100 Subject: [PATCH 1/2] Add support for NKN transport --- cmd/babble/commands/run.go | 3 ++ go.mod | 4 +-- src/babble/babble.go | 24 ++++++++++++- src/config/config.go | 3 ++ src/net/net_transport.go | 1 - src/net/nkn_stream_layer.go | 47 ++++++++++++++++++++++++++ src/net/nkn_transport.go | 67 +++++++++++++++++++++++++++++++++++++ src/net/tcp_transport.go | 13 +++++-- src/net/transport_test.go | 48 +++++++++++++++++++------- 9 files changed, 191 insertions(+), 19 deletions(-) create mode 100644 src/net/nkn_stream_layer.go create mode 100644 src/net/nkn_transport.go diff --git a/cmd/babble/commands/run.go b/cmd/babble/commands/run.go index d058a219..ca475015 100644 --- a/cmd/babble/commands/run.go +++ b/cmd/babble/commands/run.go @@ -86,6 +86,9 @@ func AddRunFlags(cmd *cobra.Command) { cmd.Flags().String("ice-username", _config.Babble.ICEUsername, "Username to authenticate to the ICE server") cmd.Flags().String("ice-password", _config.Babble.ICEPassword, "Password to authenticate to the ICE server") + // NKN + cmd.Flags().Bool("nkn", _config.Babble.NKN, "Use NKN transport") + // Proxy cmd.Flags().StringP("proxy-listen", "p", _config.ProxyAddr, "Listen IP:Port for babble proxy") cmd.Flags().StringP("client-connect", "c", _config.ClientAddr, "IP:Port to connect to client") diff --git a/go.mod b/go.mod index 7ba11e17..1d93b681 100644 --- a/go.mod +++ b/go.mod @@ -11,17 +11,17 @@ require ( github.com/magiconair/properties v1.8.1 // indirect github.com/mattn/go-colorable v0.1.2 // indirect github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b // indirect + github.com/nknorg/nkn-sdk-go v1.3.5 github.com/onsi/ginkgo v1.12.0 // indirect github.com/onsi/gomega v1.9.0 // indirect github.com/pion/datachannel v1.4.14 github.com/pion/webrtc/v2 v2.2.0 github.com/rifflock/lfshook v0.0.0-20180920164130-b9218ef580f5 - github.com/sirupsen/logrus v1.2.0 + github.com/sirupsen/logrus v1.4.2 github.com/spf13/afero v1.2.2 // indirect github.com/spf13/cobra v0.0.5 github.com/spf13/jwalterweatherman v1.1.0 // indirect github.com/spf13/viper v1.3.2 github.com/ugorji/go/codec v1.1.7 github.com/x-cray/logrus-prefixed-formatter v0.5.2 - golang.org/x/net v0.0.0-20200226121028-0de0cce0169b // indirect ) diff --git a/src/babble/babble.go b/src/babble/babble.go index 6ee22c37..34e7b1af 100644 --- a/src/babble/babble.go +++ b/src/babble/babble.go @@ -13,6 +13,7 @@ import ( "github.com/mosaicnetworks/babble/src/node" "github.com/mosaicnetworks/babble/src/peers" "github.com/mosaicnetworks/babble/src/service" + "github.com/nknorg/nkn-sdk-go" "github.com/sirupsen/logrus" ) @@ -124,7 +125,6 @@ func (b *Babble) validateConfig() error { logFields["babble.SignalSkipVerify"] = b.Config.SignalSkipVerify logFields["babble.ICEAddress"] = b.Config.ICEAddress logFields["babble.ICEUsername"] = b.Config.ICEUsername - } else { logFields["babble.BindAddr"] = b.Config.BindAddr logFields["babble.AdvertiseAddr"] = b.Config.AdvertiseAddr @@ -197,6 +197,28 @@ func (b *Babble) initTransport() error { } b.Transport = webRTCTransport + } else if b.Config.NKN { + nknAccount, err := nkn.NewAccount(keys.DumpPrivateKey(b.Config.Key)) + if err != nil { + return err + } + + nknTransport, err := net.NewNKNTransport( + nknAccount, + "", + 3, + nil, + 10*time.Second, + b.Config.MaxPool, + b.Config.TCPTimeout, + b.Config.JoinTimeout, + b.Config.Logger().WithField("component", "nkn-transport"), + ) + if err != nil { + return err + } + + b.Transport = nknTransport } else { tcpTransport, err := net.NewTCPTransport( b.Config.BindAddr, diff --git a/src/config/config.go b/src/config/config.go index 320fd71d..de4dcf00 100644 --- a/src/config/config.go +++ b/src/config/config.go @@ -186,6 +186,9 @@ type Config struct { // ICE server defined in ICEAddress. ICEPassword string `mapstructure:"ice-password"` + // XXX + NKN bool `mapstructure:"nkn"` + // Proxy is the application proxy that enables Babble to communicate with // the application. Proxy proxy.AppProxy diff --git a/src/net/net_transport.go b/src/net/net_transport.go index f77bb144..b1970863 100644 --- a/src/net/net_transport.go +++ b/src/net/net_transport.go @@ -120,7 +120,6 @@ func (n *NetworkTransport) Close() error { if !n.shutdown { close(n.shutdownCh) n.stream.Close() - n.shutdown = true } return nil diff --git a/src/net/nkn_stream_layer.go b/src/net/nkn_stream_layer.go new file mode 100644 index 00000000..082ad84d --- /dev/null +++ b/src/net/nkn_stream_layer.go @@ -0,0 +1,47 @@ +package net + +import ( + "net" + "time" + + "github.com/nknorg/nkn-sdk-go" +) + +// nknStreamLayer implements the StreamLayer interface over the nkn network. +type nknStreamLayer struct { + multiclient *nkn.MultiClient +} + +// Accept implements the net.Listener interface. +func (n *nknStreamLayer) Accept() (c net.Conn, err error) { + return n.multiclient.Accept() +} + +// implements the net.Listener interface. +func (n *nknStreamLayer) Close() (err error) { + return n.multiclient.Close() +} + +// Addr implements the net.Listener interface. +func (n *nknStreamLayer) Addr() net.Addr { + return n.multiclient.Addr() +} + +// Dial implements the StreamLayer interface. +func (n *nknStreamLayer) Dial(address string, timeout time.Duration) (net.Conn, error) { + dialConfig := &nkn.DialConfig{ + DialTimeout: int32(timeout.Milliseconds()), + } + + session, err := n.multiclient.DialWithConfig(address, dialConfig) + if err != nil { + return nil, err + } + + return session, nil +} + +// AdvertiseAddr implements the StreamLayer interface. +func (n *nknStreamLayer) AdvertiseAddr() string { + return n.multiclient.Address() +} diff --git a/src/net/nkn_transport.go b/src/net/nkn_transport.go new file mode 100644 index 00000000..12b787dd --- /dev/null +++ b/src/net/nkn_transport.go @@ -0,0 +1,67 @@ +package net + +import ( + "fmt" + "time" + + "github.com/nknorg/nkn-sdk-go" + "github.com/sirupsen/logrus" +) + +// NewNKNTransport implements a NetworkTransport that is built on top of a NKN +// StreamLayer. +func NewNKNTransport( + nknAccount *nkn.Account, + nknBaseIdentifier string, + nknNumSubClients int, + nknConfig *nkn.ClientConfig, + nknConnectTimeout time.Duration, + maxPool int, + timeout time.Duration, + joinTimeout time.Duration, + logger *logrus.Entry, +) (*NetworkTransport, error) { + return newNKNTransport( + nknAccount, + nknBaseIdentifier, + nknNumSubClients, + nknConfig, + nknConnectTimeout, + func(stream StreamLayer) *NetworkTransport { + return NewNetworkTransport(stream, maxPool, timeout, joinTimeout, logger) + }, + ) +} + +func newNKNTransport( + account *nkn.Account, + baseIdentifier string, + numSubClients int, + config *nkn.ClientConfig, + connectTimeout time.Duration, + transportCreator func(stream StreamLayer) *NetworkTransport, +) (*NetworkTransport, error) { + + multiclient, err := nkn.NewMultiClient(account, baseIdentifier, numSubClients, false, config) + if err != nil { + return nil, err + } + + select { + case <-time.After(connectTimeout): + return nil, fmt.Errorf("timeout waiting to connect to nkn") + case <-multiclient.OnConnect.C: + break + } + + err = multiclient.Listen(nil) + if err != nil { + return nil, err + } + + stream := &nknStreamLayer{ + multiclient: multiclient, + } + + return transportCreator(stream), nil +} diff --git a/src/net/tcp_transport.go b/src/net/tcp_transport.go index 31540bc8..e1bdb803 100644 --- a/src/net/tcp_transport.go +++ b/src/net/tcp_transport.go @@ -23,9 +23,16 @@ func NewTCPTransport( joinTimeout time.Duration, logger *logrus.Entry, ) (*NetworkTransport, error) { - return newTCPTransport(bindAddr, advertise, maxPool, timeout, joinTimeout, func(stream StreamLayer) *NetworkTransport { - return NewNetworkTransport(stream, maxPool, timeout, joinTimeout, logger) - }) + return newTCPTransport( + bindAddr, + advertise, + maxPool, + timeout, + joinTimeout, + func(stream StreamLayer) *NetworkTransport { + return NewNetworkTransport(stream, maxPool, timeout, joinTimeout, logger) + }, + ) } func newTCPTransport(bindAddr string, diff --git a/src/net/transport_test.go b/src/net/transport_test.go index 63a96916..393bf402 100644 --- a/src/net/transport_test.go +++ b/src/net/transport_test.go @@ -11,21 +11,25 @@ import ( "github.com/mosaicnetworks/babble/src/hashgraph" "github.com/mosaicnetworks/babble/src/net/signal/wamp" "github.com/mosaicnetworks/babble/src/peers" + "github.com/nknorg/nkn-sdk-go" ) const ( - INMEM = iota - TCP - WEBRTC + NKN = iota numTestTransports // NOTE: must be last + INMEM // NOTE: move up to include in tests + TCP // NOTE: move up to include in tests + WEBRTC // NOTE: move up to include in tests ) var ( - realm = config.DefaultSignalRealm - wampPort = 8443 - certFile = "signal/wamp/test_data/cert.pem" - keyFile = "signal/wamp/test_data/key.pem" - signalTimeout = 5 * time.Second + realm = config.DefaultSignalRealm + wampPort = 8443 + certFile = "signal/wamp/test_data/cert.pem" + keyFile = "signal/wamp/test_data/key.pem" + signalTimeout = 5 * time.Second + nknConnectTimeout = 10 * time.Second + listenTimeout = 10 * time.Second ) func NewTestTransport(ttype int, addr string, wampserver string, t *testing.T) Transport { @@ -65,6 +69,26 @@ func NewTestTransport(ttype int, addr string, wampserver string, t *testing.T) T } go wt.Listen() return wt + case NKN: + account, err := nkn.NewAccount(nil) + if err != nil { + t.Fatal(err) + } + nt, err := NewNKNTransport( + account, + "babble", + 3, + nil, + nknConnectTimeout, + 1, + 10*time.Second, + 10*time.Second, + common.NewTestEntry(t, common.TestLogLevel)) + if err != nil { + t.Fatal(err) + } + go nt.Listen() + return nt default: panic("Unknown transport type") } @@ -167,7 +191,7 @@ func TestTransport_Sync(t *testing.T) { rpc.Respond(&resp, nil) case <-stopCh: return - case <-time.After(signalTimeout): + case <-time.After(listenTimeout): t.Logf("consumer timeout") } }() @@ -248,7 +272,7 @@ func TestTransport_EagerSync(t *testing.T) { rpc.Respond(&resp, nil) case <-stopCh: return - case <-time.After(signalTimeout): + case <-time.After(listenTimeout): t.Logf("consumer timeout") } }() @@ -393,7 +417,7 @@ func TestTransport_FastForward(t *testing.T) { rpc.Respond(&resp, nil) case <-stopCh: return - case <-time.After(signalTimeout): + case <-time.After(listenTimeout): t.Logf("consumer timeout") } }() @@ -489,7 +513,7 @@ func TestTransport_Join(t *testing.T) { rpc.Respond(&resp, nil) case <-stopCh: return - case <-time.After(signalTimeout): + case <-time.After(listenTimeout): t.Logf("consumer timeout") } }() From 5a0a8144dbf46b4e856c53050e28903d97fb8a18 Mon Sep 17 00:00:00 2001 From: arrivets Date: Sun, 18 Apr 2021 19:37:12 +0100 Subject: [PATCH 2/2] Add printf statements for debugging --- src/babble/babble.go | 2 +- src/net/net_transport.go | 17 ++++++++++++----- src/net/transport_test.go | 17 ++++++++++++----- 3 files changed, 25 insertions(+), 11 deletions(-) diff --git a/src/babble/babble.go b/src/babble/babble.go index 34e7b1af..5e2a95dd 100644 --- a/src/babble/babble.go +++ b/src/babble/babble.go @@ -206,7 +206,7 @@ func (b *Babble) initTransport() error { nknTransport, err := net.NewNKNTransport( nknAccount, "", - 3, + 10, nil, 10*time.Second, b.Config.MaxPool, diff --git a/src/net/net_transport.go b/src/net/net_transport.go index b1970863..5721df1a 100644 --- a/src/net/net_transport.go +++ b/src/net/net_transport.go @@ -325,6 +325,9 @@ func (n *NetworkTransport) Listen() { "from": conn.RemoteAddr(), }).Debug("accepted connection") + // XXX + // conn.SetDeadline(time.Now().Add(n.timeout)) + // Handle the connection in dedicated routine go n.handleConn(conn) } @@ -332,6 +335,8 @@ func (n *NetworkTransport) Listen() { // handleConn is used to handle an inbound connection for its lifespan. func (n *NetworkTransport) handleConn(conn net.Conn) { + fmt.Printf("XXX [%s] handleConn\n", n.AdvertiseAddr()) + defer conn.Close() r := bufio.NewReaderSize(conn, bufSize) w := bufio.NewWriterSize(conn, bufSize) @@ -340,18 +345,17 @@ func (n *NetworkTransport) handleConn(conn net.Conn) { for { if err := n.handleCommand(r, dec, enc); err != nil { - if err == ErrTransportShutdown { - n.logger.WithField("error", err).Warn("Failed to decode incoming command") + n.logger.WithError(err).Warn("failed to handle command") } else { if err != io.EOF { - n.logger.WithField("error", err).Error("Failed to decode incoming command") + n.logger.WithError(err).Error("failed to handle command") } } return } if err := w.Flush(); err != nil { - n.logger.WithField("error", err).Error("Failed to flush response") + n.logger.WithField("error", err).Error("failed to flush response") return } } @@ -360,8 +364,10 @@ func (n *NetworkTransport) handleConn(conn net.Conn) { // handleCommand is used to decode and dispatch a single command. func (n *NetworkTransport) handleCommand(r *bufio.Reader, dec *json.Decoder, enc *json.Encoder) error { // Get the rpc type + fmt.Printf("XXX [%s] %v reading byte\n", n.AdvertiseAddr(), time.Now()) rpcType, err := r.ReadByte() if err != nil { + n.logger.WithError(err).Errorf("XXX [%s] failed to read first byte", n.AdvertiseAddr()) return err } @@ -401,6 +407,8 @@ func (n *NetworkTransport) handleCommand(r *bufio.Reader, dec *json.Decoder, enc return fmt.Errorf("unknown rpc type %d", rpcType) } + fmt.Printf("XXX [%s] %v command %d\n", n.AdvertiseAddr(), time.Now(), rpcType) + // Dispatch the RPC select { case n.consumeCh <- rpc: @@ -419,7 +427,6 @@ func (n *NetworkTransport) handleCommand(r *bufio.Reader, dec *json.Decoder, enc if err := enc.Encode(respErr); err != nil { return err } - // Send the response if err := enc.Encode(resp.Response); err != nil { return err diff --git a/src/net/transport_test.go b/src/net/transport_test.go index 393bf402..46f22d5f 100644 --- a/src/net/transport_test.go +++ b/src/net/transport_test.go @@ -29,7 +29,7 @@ var ( keyFile = "signal/wamp/test_data/key.pem" signalTimeout = 5 * time.Second nknConnectTimeout = 10 * time.Second - listenTimeout = 10 * time.Second + listenTimeout = 20 * time.Second ) func NewTestTransport(ttype int, addr string, wampserver string, t *testing.T) Transport { @@ -38,7 +38,14 @@ func NewTestTransport(ttype int, addr string, wampserver string, t *testing.T) T _, it := NewInmemTransport(addr) return it case TCP: - tt, err := NewTCPTransport(addr, "", 2, time.Second, 2*time.Second, common.NewTestEntry(t, common.TestLogLevel)) + tt, err := NewTCPTransport( + addr, + "", + 2, + time.Second, + 2*time.Second, + common.NewTestEntry(t, common.TestLogLevel).WithField("node", addr), + ) if err != nil { t.Fatal(err) } @@ -77,12 +84,12 @@ func NewTestTransport(ttype int, addr string, wampserver string, t *testing.T) T nt, err := NewNKNTransport( account, "babble", - 3, + 10, nil, nknConnectTimeout, 1, - 10*time.Second, - 10*time.Second, + 20*time.Second, + 20*time.Second, common.NewTestEntry(t, common.TestLogLevel)) if err != nil { t.Fatal(err)