Skip to content

feat(share/discovery): persist discovered peers #2210

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 68 additions & 2 deletions share/availability/discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ package discovery

import (
"context"
"encoding/json"
"fmt"
"sync"
"time"

"github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p/core/discovery"
"github.com/libp2p/go-libp2p/core/event"
Expand Down Expand Up @@ -82,7 +85,9 @@ func NewDiscovery(
}
}

func (d *Discovery) Start(context.Context) error {
func (d *Discovery) Start(ctx context.Context) error {
d.loadPeers(ctx)

ctx, cancel := context.WithCancel(context.Background())
d.cancel = cancel

Expand All @@ -102,7 +107,8 @@ func (d *Discovery) Start(context.Context) error {
return nil
}

func (d *Discovery) Stop(context.Context) error {
func (d *Discovery) Stop(ctx context.Context) error {
d.dumpPeers(ctx)
d.cancel()
return nil
}
Expand Down Expand Up @@ -309,6 +315,45 @@ func (d *Discovery) discover(ctx context.Context) bool {
}
}

// loadPeers loads and connects to persisted peers from datastore if registered
func (d *Discovery) loadPeers(ctx context.Context) {
if d.params.datastore == nil {
return
}

peerInfos, err := loadPeers(ctx, d.params.datastore)
if err != nil {
log.Warnw("failed to load persisted peers", "error", err)
return
}

// TODO(@Wondertan): To wait or not to wait?
var wg sync.WaitGroup
wg.Add(len(peerInfos))
defer wg.Wait()
for _, pi := range peerInfos {
go func(pi peer.AddrInfo) {
d.handleDiscoveredPeer(ctx, pi)
wg.Done()
}(pi)
}
}

// dumpPeers dumps discovered peers to datastore if registered
func (d *Discovery) dumpPeers(ctx context.Context) {
if d.params.datastore == nil {
return
}

// TODO(@Wondertan): We should be using persistent peerstore instead
// and store here only peer ID pointers
peerInfos := d.set.peerInfos(d.host.Peerstore())
err := dumpPeers(ctx, d.params.datastore, peerInfos)
if err != nil {
log.Warnw("failed to persist peers", "error", err)
}
}

// handleDiscoveredPeer adds peer to the internal if can connect or is connected.
// Report whether it succeeded.
func (d *Discovery) handleDiscoveredPeer(ctx context.Context, peer peer.AddrInfo) bool {
Expand Down Expand Up @@ -372,3 +417,24 @@ func drainChannel(c <-chan time.Time) {
}
}
}

var dsKey = datastore.RawKey("/discovery/dump")

func dumpPeers(ctx context.Context, ds datastore.Batching, infos []peer.AddrInfo) error {
dump, err := json.Marshal(infos)
if err != nil {
return err
}

return ds.Put(ctx, dsKey, dump)
}

func loadPeers(ctx context.Context, ds datastore.Batching) ([]peer.AddrInfo, error) {
dump, err := ds.Get(ctx, dsKey)
if err != nil {
return nil, err
}

var infos []peer.AddrInfo
return infos, json.Unmarshal(dump, &infos)
}
80 changes: 80 additions & 0 deletions share/availability/discovery/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"testing"
"time"

"github.com/ipfs/go-datastore"
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p/core/discovery"
"github.com/libp2p/go-libp2p/core/host"
Expand Down Expand Up @@ -72,6 +73,58 @@ func TestDiscovery(t *testing.T) {
assert.EqualValues(t, 0, peerA.set.Size())
}

func TestDiscoveryPersistence(t *testing.T) {
const nodes = 2

discoveryRetryTimeout = time.Millisecond * 100 // defined in discovery.go

ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
t.Cleanup(cancel)

tn := newTestnet(ctx, t)

ds := datastore.NewMapDatastore()
peerA := tn.discovery(
WithPeersLimit(nodes),
WithAdvertiseInterval(-1),
WithPersistence(ds),
)

evt := make(chan struct{})
peerA.WithOnPeersUpdate(func(peerID peer.ID, isAdded bool) {
evt <- struct{}{}
})

discs := make([]*Discovery, nodes)
for range discs {
tn.discovery(
WithPeersLimit(nodes),
WithAdvertiseInterval(time.Millisecond*100),
)

select {
case <-evt:
case <-ctx.Done():
t.Fatal("did not discover peer in time")
}
}

err := peerA.Stop(ctx)
require.NoError(t, err)

// make new borker discovery that won't discover anything
// over datastore with persisted peers
peerANew := tn.borkenDiscovery(
WithPeersLimit(nodes),
WithAdvertiseInterval(-1),
WithPersistence(ds),
)

peers, err := peerANew.Peers(ctx)
require.NoError(t, err)
assert.Len(t, peers, nodes)
}

type testnet struct {
ctx context.Context
T *testing.T
Expand Down Expand Up @@ -109,6 +162,18 @@ func (t *testnet) discovery(opts ...Option) *Discovery {
return disc
}

func (t *testnet) borkenDiscovery(opts ...Option) *Discovery {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo brokenDiscovery

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was intentional 😈

hst, _ := t.peer()
disc := NewDiscovery(hst, &dummyDiscovery{}, opts...)
err := disc.Start(t.ctx)
require.NoError(t.T, err)
t.T.Cleanup(func() {
err := disc.Stop(t.ctx)
require.NoError(t.T, err)
})
return disc
}

func (t *testnet) peer() (host.Host, discovery.Discovery) {
swarm := swarmt.GenSwarm(t.T, swarmt.OptDisableTCP)
hst, err := basic.NewHost(swarm, &basic.HostOpts{})
Expand All @@ -131,3 +196,18 @@ func (t *testnet) peer() (host.Host, discovery.Discovery) {

return hst, routing.NewRoutingDiscovery(dht)
}

type dummyDiscovery struct{}

func (d *dummyDiscovery) Advertise(context.Context, string, ...discovery.Option) (time.Duration, error) {
return time.Hour, nil
}

func (d *dummyDiscovery) FindPeers(context.Context, string, ...discovery.Option) (<-chan peer.AddrInfo, error) {
retCh := make(chan peer.AddrInfo)
go func() {
time.Sleep(time.Second)
close(retCh)
}()
return retCh, nil
}
10 changes: 10 additions & 0 deletions share/availability/discovery/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package discovery
import (
"fmt"
"time"

"github.com/ipfs/go-datastore"
)

// Parameters is the set of Parameters that must be configured for the Discovery module
Expand All @@ -14,6 +16,8 @@ type Parameters struct {
// Set -1 to disable.
// NOTE: only full and bridge can advertise themselves.
AdvertiseInterval time.Duration

datastore datastore.Batching
}

// Option is a function that configures Discovery Parameters
Expand Down Expand Up @@ -57,3 +61,9 @@ func WithAdvertiseInterval(advInterval time.Duration) Option {
p.AdvertiseInterval = advInterval
}
}

func WithPersistence(ds datastore.Batching) Option {
return func(p *Parameters) {
p.datastore = ds
}
}
13 changes: 12 additions & 1 deletion share/availability/discovery/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import (
"sync"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
)

// limitedSet is a thread safe set of peers with given limit.
// Inspired by libp2p peer.Set but extended with Remove method.
type limitedSet struct {
lk sync.RWMutex
ps map[peer.ID]struct{}
Expand Down Expand Up @@ -91,3 +91,14 @@ func (ps *limitedSet) Peers(ctx context.Context) ([]peer.ID, error) {
return []peer.ID{p}, nil
}
}

func (ps *limitedSet) peerInfos(pstore peerstore.Peerstore) []peer.AddrInfo {
infos := make([]peer.AddrInfo, 0, len(ps.ps))
for p := range ps.ps {
infos = append(infos, peer.AddrInfo{
ID: p,
Addrs: pstore.Addrs(p),
})
}
return infos
}