-
-
Notifications
You must be signed in to change notification settings - Fork 3.2k
Expand file tree
/
Copy pathbitswap.go
More file actions
146 lines (127 loc) · 4.65 KB
/
bitswap.go
File metadata and controls
146 lines (127 loc) · 4.65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package node
import (
"context"
"time"
"github.com/ipfs/boxo/bitswap"
"github.com/ipfs/boxo/bitswap/client"
bsnet "github.com/ipfs/boxo/bitswap/network/bsnet"
blockstore "github.com/ipfs/boxo/blockstore"
exchange "github.com/ipfs/boxo/exchange"
"github.com/ipfs/boxo/exchange/providing"
provider "github.com/ipfs/boxo/provider"
rpqm "github.com/ipfs/boxo/routing/providerquerymanager"
"github.com/ipfs/kubo/config"
irouting "github.com/ipfs/kubo/routing"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/routing"
"go.uber.org/fx"
"github.com/ipfs/kubo/core/node/helpers"
)
// Docs: https://github.com/ipfs/kubo/blob/master/docs/config.md#internalbitswap
const (
DefaultEngineBlockstoreWorkerCount = 128
DefaultTaskWorkerCount = 8
DefaultEngineTaskWorkerCount = 8
DefaultMaxOutstandingBytesPerPeer = 1 << 20
DefaultProviderSearchDelay = 1000 * time.Millisecond
DefaultMaxProviders = 10 // matching BitswapClientDefaultMaxProviders from https://github.com/ipfs/boxo/blob/v0.29.1/bitswap/internal/defaults/defaults.go#L15
DefaultWantHaveReplaceSize = 1024
)
type bitswapOptionsOut struct {
fx.Out
BitswapOpts []bitswap.Option `group:"bitswap-options,flatten"`
}
// BitswapOptions creates configuration options for Bitswap from the config file
// and whether to provide data.
func BitswapOptions(cfg *config.Config) interface{} {
return func() bitswapOptionsOut {
var internalBsCfg config.InternalBitswap
if cfg.Internal.Bitswap != nil {
internalBsCfg = *cfg.Internal.Bitswap
}
opts := []bitswap.Option{
bitswap.ProviderSearchDelay(internalBsCfg.ProviderSearchDelay.WithDefault(DefaultProviderSearchDelay)), // See https://github.com/ipfs/go-ipfs/issues/8807 for rationale
bitswap.EngineBlockstoreWorkerCount(int(internalBsCfg.EngineBlockstoreWorkerCount.WithDefault(DefaultEngineBlockstoreWorkerCount))),
bitswap.TaskWorkerCount(int(internalBsCfg.TaskWorkerCount.WithDefault(DefaultTaskWorkerCount))),
bitswap.EngineTaskWorkerCount(int(internalBsCfg.EngineTaskWorkerCount.WithDefault(DefaultEngineTaskWorkerCount))),
bitswap.MaxOutstandingBytesPerPeer(int(internalBsCfg.MaxOutstandingBytesPerPeer.WithDefault(DefaultMaxOutstandingBytesPerPeer))),
bitswap.WithWantHaveReplaceSize(int(internalBsCfg.WantHaveReplaceSize.WithDefault(DefaultWantHaveReplaceSize))),
}
return bitswapOptionsOut{BitswapOpts: opts}
}
}
type bitswapIn struct {
fx.In
Mctx helpers.MetricsCtx
Cfg *config.Config
Host host.Host
Rt irouting.ProvideManyRouter
Bs blockstore.GCBlockstore
BitswapOpts []bitswap.Option `group:"bitswap-options"`
}
// Bitswap creates the BitSwap server/client instance.
// Additional options to bitswap.New can be provided via the "bitswap-options"
// group.
func Bitswap(provide bool) interface{} {
return func(in bitswapIn, lc fx.Lifecycle) (*bitswap.Bitswap, error) {
bitswapNetwork := bsnet.NewFromIpfsHost(in.Host)
var provider routing.ContentDiscovery
if provide {
var maxProviders int = DefaultMaxProviders
if in.Cfg.Internal.Bitswap != nil {
maxProviders = int(in.Cfg.Internal.Bitswap.ProviderSearchMaxResults.WithDefault(DefaultMaxProviders))
}
pqm, err := rpqm.New(bitswapNetwork,
in.Rt,
rpqm.WithMaxProviders(maxProviders),
rpqm.WithIgnoreProviders(in.Cfg.Routing.IgnoreProviders...),
)
if err != nil {
return nil, err
}
in.BitswapOpts = append(in.BitswapOpts, bitswap.WithClientOption(client.WithDefaultProviderQueryManager(false)))
provider = pqm
}
bs := bitswap.New(helpers.LifecycleCtx(in.Mctx, lc), bitswapNetwork, provider, in.Bs, in.BitswapOpts...)
lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
return bs.Close()
},
})
return bs, nil
}
}
// OnlineExchange creates new LibP2P backed block exchange.
func OnlineExchange() interface{} {
return func(in *bitswap.Bitswap, lc fx.Lifecycle) exchange.Interface {
lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
return in.Close()
},
})
return in
}
}
type providingExchangeIn struct {
fx.In
BaseExch exchange.Interface
Provider provider.System
}
// ProvidingExchange creates a providing.Exchange with the existing exchange
// and the provider.System.
// We cannot do this in OnlineExchange because it causes cycles so this is for
// a decorator.
func ProvidingExchange(provide bool) interface{} {
return func(in providingExchangeIn, lc fx.Lifecycle) exchange.Interface {
exch := in.BaseExch
if provide {
exch = providing.New(in.BaseExch, in.Provider)
lc.Append(fx.Hook{
OnStop: func(ctx context.Context) error {
return exch.Close()
},
})
}
return exch
}
}