Skip to content

Commit e847419

Browse files
vyzogammazero
authored andcommitted
add default validator to pubsub
1 parent 1ca0ae0 commit e847419

File tree

1 file changed

+56
-7
lines changed

1 file changed

+56
-7
lines changed

core/node/libp2p/pubsub.go

Lines changed: 56 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,75 @@
11
package libp2p
22

33
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
8+
datastore "github.com/ipfs/go-datastore"
49
pubsub "github.com/libp2p/go-libp2p-pubsub"
510
"github.com/libp2p/go-libp2p/core/discovery"
611
"github.com/libp2p/go-libp2p/core/host"
12+
"github.com/libp2p/go-libp2p/core/peer"
713
"go.uber.org/fx"
814

915
"github.com/ipfs/kubo/core/node/helpers"
16+
"github.com/ipfs/kubo/repo"
1017
)
1118

19+
type P2PPubSubIn struct {
20+
fx.In
21+
22+
Repo repo.Repo
23+
Host host.Host
24+
Discovery discovery.Discovery
25+
}
26+
1227
func FloodSub(pubsubOptions ...pubsub.Option) interface{} {
13-
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, disc discovery.Discovery) (service *pubsub.PubSub, err error) {
14-
return pubsub.NewFloodSub(helpers.LifecycleCtx(mctx, lc), host, append(pubsubOptions, pubsub.WithDiscovery(disc))...)
28+
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, params P2PPubSubIn) (service *pubsub.PubSub, err error) {
29+
return pubsub.NewFloodSub(
30+
helpers.LifecycleCtx(mctx, lc),
31+
params.Host,
32+
append(pubsubOptions,
33+
pubsub.WithDiscovery(params.Discovery),
34+
pubsub.WithDefaultValidator(pubsub.NewBasicSeqnoValidator(makePubSubMetadataStore(params.Repo.Datastore()))))...,
35+
)
1536
}
1637
}
1738

1839
func GossipSub(pubsubOptions ...pubsub.Option) interface{} {
19-
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, disc discovery.Discovery) (service *pubsub.PubSub, err error) {
20-
return pubsub.NewGossipSub(helpers.LifecycleCtx(mctx, lc), host, append(
21-
pubsubOptions,
22-
pubsub.WithDiscovery(disc),
23-
pubsub.WithFloodPublish(true))...,
40+
return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, params P2PPubSubIn) (service *pubsub.PubSub, err error) {
41+
return pubsub.NewGossipSub(
42+
helpers.LifecycleCtx(mctx, lc),
43+
params.Host,
44+
append(
45+
pubsubOptions,
46+
pubsub.WithDiscovery(params.Discovery),
47+
pubsub.WithFloodPublish(true),
48+
pubsub.WithDefaultValidator(pubsub.NewBasicSeqnoValidator(makePubSubMetadataStore(params.Repo.Datastore()))))...,
2449
)
2550
}
2651
}
52+
53+
func makePubSubMetadataStore(ds datastore.Datastore) pubsub.PeerMetadataStore {
54+
return &pubsubMetadataStore{ds: ds}
55+
}
56+
57+
type pubsubMetadataStore struct {
58+
ds datastore.Datastore
59+
}
60+
61+
func (m *pubsubMetadataStore) Get(ctx context.Context, p peer.ID) ([]byte, error) {
62+
k := datastore.NewKey(fmt.Sprintf("/pubsub/seqno/%s", p))
63+
64+
v, err := m.ds.Get(ctx, k)
65+
if err != nil && errors.Is(err, datastore.ErrNotFound) {
66+
return nil, nil
67+
}
68+
69+
return v, err
70+
}
71+
72+
func (m *pubsubMetadataStore) Put(ctx context.Context, p peer.ID, v []byte) error {
73+
k := datastore.NewKey(fmt.Sprintf("/pubsub/seqno/%s", p))
74+
return m.ds.Put(ctx, k, v)
75+
}

0 commit comments

Comments
 (0)