Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/bootnode/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ func main() {
DialTimeout: time.Minute,
Muxer: *muxer,
NoRelay: *noRelay,
IsBootNode: true,
})
if err != nil {
utils.FatalErrMsg(err, "cannot initialize network")
Expand Down
1 change: 1 addition & 0 deletions cmd/harmony/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,7 @@ func createGlobalConfig(hc harmonyconfig.HarmonyConfig) (*nodeconfig.ConfigType,
DialTimeout: hc.P2P.DialTimeout,
Muxer: hc.P2P.Muxer,
NoRelay: hc.P2P.NoRelay,
IsBootNode: false,
})
if err != nil {
return nil, errors.Wrap(err, "cannot create P2P network host")
Expand Down
101 changes: 88 additions & 13 deletions p2p/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package discovery

import (
"context"
"sync"
"time"

"github.com/harmony-one/harmony/internal/utils"
dht "github.com/libp2p/go-libp2p-kad-dht"
libp2p_dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p/core/discovery"
libp2p_host "github.com/libp2p/go-libp2p/core/host"
Expand All @@ -21,14 +21,16 @@ type Discovery interface {
Close() error
Advertise(ctx context.Context, ns string) (time.Duration, error)
FindPeers(ctx context.Context, ns string, peerLimit int) (<-chan libp2p_peer.AddrInfo, error)
GetRawDiscovery() discovery.Discovery
GetRawDiscovery() []discovery.Discovery
// todo(sun): revert in phase 2
// GetRawDiscovery() discovery.Discovery
}

// dhtDiscovery is a wrapper of libp2p dht discovery service. It implements Discovery
// interface.
type dhtDiscovery struct {
dht *libp2p_dht.IpfsDHT
disc discovery.Discovery
dht []*libp2p_dht.IpfsDHT
disc []discovery.Discovery
host libp2p_host.Host

opt DHTConfig
Expand All @@ -38,11 +40,15 @@ type dhtDiscovery struct {
}

// NewDHTDiscovery creates a new dhtDiscovery that implements Discovery interface.
func NewDHTDiscovery(ctx context.Context, cancel context.CancelFunc, host libp2p_host.Host, dht *dht.IpfsDHT, opt DHTConfig) (Discovery, error) {
d := libp2p_dis.NewRoutingDiscovery(dht)
func NewDHTDiscovery(ctx context.Context, cancel context.CancelFunc, host libp2p_host.Host, opt DHTConfig, dhts ...*libp2p_dht.IpfsDHT) (Discovery, error) {
var d []discovery.Discovery
for _, dht := range dhts {
d = append(d, libp2p_dis.NewRoutingDiscovery(dht))
}

logger := utils.Logger().With().Str("module", "discovery").Logger()
return &dhtDiscovery{
dht: dht,
dht: dhts,
disc: d,
host: host,
opt: opt,
Expand All @@ -54,28 +60,97 @@ func NewDHTDiscovery(ctx context.Context, cancel context.CancelFunc, host libp2p

// Start bootstrap the dht discovery service.
func (d *dhtDiscovery) Start() error {
return d.dht.Bootstrap(d.ctx)
for _, dht := range d.dht {
if err := dht.Bootstrap(d.ctx); err != nil {
return err
}
}
return nil

// todo(sun): revert in phase 2
// return d.dht.Bootstrap(d.ctx)
}

// Stop stop the dhtDiscovery service
func (d *dhtDiscovery) Close() error {
d.dht.Close()
for _, dht := range d.dht {
if err := dht.Close(); err != nil {
return err
}
}
d.cancel()
return nil

// todo(sun): revert in phase 2
// d.dht.Close()
// d.cancel()
// return nil
}

// Advertise advertises a service
func (d *dhtDiscovery) Advertise(ctx context.Context, ns string) (time.Duration, error) {
return d.disc.Advertise(ctx, ns)
var lastDur time.Duration
var lastErr error
for _, disc := range d.disc {
lastDur, lastErr = disc.Advertise(ctx, ns)
if lastErr != nil {
break
}
}
return lastDur, lastErr

// todo(sun): revert in phase 2
// return d.disc.Advertise(ctx, ns)
}

// FindPeers discovers peers providing a service
func (d *dhtDiscovery) FindPeers(ctx context.Context, ns string, peerLimit int) (<-chan libp2p_peer.AddrInfo, error) {
opt := discovery.Limit(peerLimit)
return d.disc.FindPeers(ctx, ns, opt)
mergedChan := make(chan libp2p_peer.AddrInfo)
var wg sync.WaitGroup
limitOpt := discovery.Limit(peerLimit)

// loop through each discovery instance (harmony and legacy, in bootnode's case)
for _, disc := range d.disc {
wg.Add(1)

// launch a goroutine for each DHT query
go func(disc discovery.Discovery) {
defer wg.Done()
peerChan, err := disc.FindPeers(ctx, ns, limitOpt)
if err != nil {
d.logger.Error().Err(err).Msg("Discovery failed in one of the DHTs")
return
}

// read peers from the current DHT chan and forward to the merged chan
for peer := range peerChan {
select {
case mergedChan <- peer:
case <-ctx.Done():
return
}
}
}(disc)
}

// close the merged chan onceboth DHT queries are completed
go func() {
wg.Wait()
close(mergedChan)
}()

// immediately return merged chan
return mergedChan, nil

// todo(sun): revert in phase 2
// opt := discovery.Limit(peerLimit)
// return d.disc.FindPeers(ctx, ns, opt)
}

// GetRawDiscovery get the raw discovery to be used for libp2p pubsub options
func (d *dhtDiscovery) GetRawDiscovery() discovery.Discovery {
// todo(sun): libp2p pubsub option only accepts a single discover.Discovery as option
// todo(sun): revert in phase 2
// func (d *dhtDiscovery) GetRawDiscovery() discovery.Discovery {
func (d *dhtDiscovery) GetRawDiscovery() []discovery.Discovery {
return d.disc
}
32 changes: 28 additions & 4 deletions p2p/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ const (
MaxMessageHandlers = SetAsideForConsensus + SetAsideOtherwise
// MaxMessageSize is 2Mb
MaxMessageSize = 1 << 21
// Harmony specific kad-dht protocol
HarmonyKadPrefix = "/harmony/kad/1.0.0"
)

// HostConfig is the config structure to create a new host
Expand All @@ -127,6 +129,7 @@ type HostConfig struct {
DialTimeout time.Duration
Muxer string
NoRelay bool
IsBootNode bool
}

func init() {
Expand All @@ -150,6 +153,7 @@ func NewHost(cfg HostConfig) (Host, error) {
key = cfg.BLSKey
pub = cfg.BLSKey.GetPublic()
dataStorePath = cfg.DataStoreFile
isBootNode = cfg.IsBootNode
)

pubKey := key.GetPublic()
Expand Down Expand Up @@ -344,12 +348,32 @@ func NewHost(cfg HostConfig) (Host, error) {
cancel()
return nil, errors.Wrapf(err, "initialize libp2p raw options failed")
}
idht, errDHT := dht.New(ctx, p2pHost, opts...)
if errDHT != nil {

// todo(sun): for now, bootnodes should support both ipfs and harmony kad protocol
// todo(sun): all other nodes should only support harmony kad protocol
// bootnode supports both ipfs and harmony kad protocol during transition period
dhts := []*dht.IpfsDHT{}
if isBootNode {
utils.Logger().Info().Msg("Bootnode is configured to support the legacy IPFS KAD protocol for transition.")
legacyOpts := append(opts, dht.ProtocolPrefix(protocol.ID("/ipfs/kad/1.0.0")))
legacyDht, err := dht.New(ctx, p2pHost, legacyOpts...)
if err != nil {
cancel()
return nil, errors.Wrapf(err, "cannot initialize legacy libp2p DHT")
}
dhts = append(dhts, legacyDht)
}

// all other nodes should only support harmony kad protocol
harmonyOpts := append(opts, dht.ProtocolPrefix(protocol.ID(HarmonyKadPrefix)))
harmonyDht, err := dht.New(ctx, p2pHost, harmonyOpts...)
if err != nil {
cancel()
return nil, errors.Wrapf(errDHT, "cannot initialize libp2p DHT")
return nil, errors.Wrapf(err, "cannot initialize harmony libp2p DHT")
}
disc, err := discovery.NewDHTDiscovery(ctx, cancel, p2pHost, idht, opt)
dhts = append(dhts, harmonyDht)

disc, err := discovery.NewDHTDiscovery(ctx, cancel, p2pHost, opt, dhts...)
if err != nil {
cancel()
p2pHost.Close()
Expand Down
4 changes: 3 additions & 1 deletion p2p/stream/protocols/sync/protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ func (disc *testDiscovery) FindPeers(ctx context.Context, ns string, peerLimit i
return nil, nil
}

func (disc *testDiscovery) GetRawDiscovery() discovery.Discovery {
// todo(sun): revert in phase 2
// func (disc *testDiscovery) GetRawDiscovery() discovery.Discovery {
func (disc *testDiscovery) GetRawDiscovery() []discovery.Discovery {
return nil
}