Skip to content

Commit c19ef6f

Browse files
committed
chore: update boxo for bitswap providing refactor
Keeps in sync with ipfs/boxo#528
1 parent 1649469 commit c19ef6f

File tree

20 files changed

+108
-69
lines changed

20 files changed

+108
-69
lines changed

core/commands/bitswap.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,6 @@ var bitswapStatCmd = &cmds.Command{
134134
human, _ := req.Options[bitswapHumanOptionName].(bool)
135135

136136
fmt.Fprintln(w, "bitswap status")
137-
fmt.Fprintf(w, "\tprovides buffer: %d / %d\n", s.ProvideBufLen, bitswap.HasBlockBufferSize)
138137
fmt.Fprintf(w, "\tblocks received: %d\n", s.BlocksReceived)
139138
fmt.Fprintf(w, "\tblocks sent: %d\n", s.BlocksSent)
140139
if human {

core/commands/files.go

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,6 @@ import (
1414
"github.com/ipfs/kubo/core"
1515
"github.com/ipfs/kubo/core/commands/cmdenv"
1616

17-
bservice "github.com/ipfs/boxo/blockservice"
18-
offline "github.com/ipfs/boxo/exchange/offline"
1917
dag "github.com/ipfs/boxo/ipld/merkledag"
2018
ft "github.com/ipfs/boxo/ipld/unixfs"
2119
mfs "github.com/ipfs/boxo/mfs"
@@ -162,11 +160,7 @@ var filesStatCmd = &cmds.Command{
162160

163161
var dagserv ipld.DAGService
164162
if withLocal {
165-
// an offline DAGService will not fetch from the network
166-
dagserv = dag.NewDAGService(bservice.New(
167-
node.Blockstore,
168-
offline.Exchange(node.Blockstore),
169-
))
163+
dagserv = node.OfflineDAG
170164
} else {
171165
dagserv = node.DAG
172166
}

core/commands/pin/pin.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@ import (
88
"os"
99
"time"
1010

11-
bserv "github.com/ipfs/boxo/blockservice"
12-
offline "github.com/ipfs/boxo/exchange/offline"
1311
dag "github.com/ipfs/boxo/ipld/merkledag"
1412
verifcid "github.com/ipfs/boxo/verifcid"
1513
cid "github.com/ipfs/go-cid"
@@ -702,8 +700,7 @@ type pinVerifyOpts struct {
702700
func pinVerify(ctx context.Context, n *core.IpfsNode, opts pinVerifyOpts, enc cidenc.Encoder) (<-chan any, error) {
703701
visited := make(map[cid.Cid]PinStatus)
704702

705-
bs := n.Blocks.Blockstore()
706-
DAG := dag.NewDAGService(bserv.New(bs, offline.Exchange(bs)))
703+
DAG := n.OfflineDAG
707704
getLinks := dag.GetLinksWithDAG(DAG)
708705

709706
var checkPin func(root cid.Cid) PinStatus

core/core.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,9 @@ type IpfsNode struct {
8181
BaseBlocks node.BaseBlocks // the raw blockstore, no filestore wrapping
8282
GCLocker bstore.GCLocker // the locker used to protect the blockstore during gc
8383
Blocks bserv.BlockService // the block service, get/add blocks.
84+
OfflineBlocks bserv.BlockService `name:"offlineBlockService"` // blockservice which doesn't try to fetch from the network
8485
DAG ipld.DAGService // the merkle dag service, get/add objects.
86+
OfflineDAG ipld.DAGService `name:"offlineDagService"` // merkle dag service which doesn't try to fetch from the network
8587
IPLDFetcherFactory fetcher.Factory `name:"ipldFetcher"` // fetcher that paths over the IPLD data model
8688
UnixFSFetcherFactory fetcher.Factory `name:"unixfsFetcher"` // fetcher that interprets UnixFS data
8789
OfflineIPLDFetcherFactory fetcher.Factory `name:"offlineIpldFetcher"` // fetcher that paths over the IPLD data model without fetching new blocks

core/coreapi/coreapi.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,7 @@ func (api *CoreAPI) WithOptions(opts ...options.ApiOption) (coreiface.CoreAPI, e
244244

245245
if settings.Offline || !settings.FetchBlocks {
246246
subAPI.exchange = offlinexch.Exchange(subAPI.blockstore)
247-
subAPI.blocks = bserv.New(subAPI.blockstore, subAPI.exchange)
247+
subAPI.blocks = bserv.New(subAPI.blockstore, nil, bserv.WithProvider(subAPI.provider))
248248
subAPI.dag = dag.NewDAGService(subAPI.blocks)
249249
}
250250

core/coreapi/dht.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@ import (
66

77
blockservice "github.com/ipfs/boxo/blockservice"
88
blockstore "github.com/ipfs/boxo/blockstore"
9-
offline "github.com/ipfs/boxo/exchange/offline"
109
dag "github.com/ipfs/boxo/ipld/merkledag"
1110
"github.com/ipfs/boxo/path"
11+
"github.com/ipfs/boxo/provider"
1212
cid "github.com/ipfs/go-cid"
1313
cidutil "github.com/ipfs/go-cidutil"
1414
coreiface "github.com/ipfs/kubo/core/coreiface"
@@ -99,7 +99,7 @@ func (api *DhtAPI) Provide(ctx context.Context, path path.Path, opts ...caopts.D
9999
}
100100

101101
if settings.Recursive {
102-
err = provideKeysRec(ctx, api.routing, api.blockstore, []cid.Cid{c})
102+
err = provideKeysRec(ctx, api.routing, api.blockstore, api.provider, []cid.Cid{c})
103103
} else {
104104
err = provideKeys(ctx, api.routing, []cid.Cid{c})
105105
}
@@ -120,12 +120,13 @@ func provideKeys(ctx context.Context, r routing.Routing, cids []cid.Cid) error {
120120
return nil
121121
}
122122

123-
func provideKeysRec(ctx context.Context, r routing.Routing, bs blockstore.Blockstore, cids []cid.Cid) error {
123+
func provideKeysRec(ctx context.Context, r routing.Routing, bs blockstore.Blockstore, prov provider.Provider, cids []cid.Cid) error {
124124
provided := cidutil.NewStreamingSet()
125125

126126
errCh := make(chan error)
127127
go func() {
128-
dserv := dag.NewDAGService(blockservice.New(bs, offline.Exchange(bs)))
128+
// FIXME: we are recreating a dag and blockservice, maybe offline varients should be shared ?
129+
dserv := dag.NewDAGService(blockservice.New(bs, nil, blockservice.WithProvider(prov)))
129130
for _, c := range cids {
130131
err := dag.Walk(ctx, dag.GetLinksDirect(dserv), c, provided.Visitor(ctx))
131132
if err != nil {

core/coreapi/pin.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"fmt"
66

77
bserv "github.com/ipfs/boxo/blockservice"
8-
offline "github.com/ipfs/boxo/exchange/offline"
98
"github.com/ipfs/boxo/ipld/merkledag"
109
"github.com/ipfs/boxo/path"
1110
pin "github.com/ipfs/boxo/pinning/pinner"
@@ -195,7 +194,8 @@ func (api *PinAPI) Verify(ctx context.Context) (<-chan coreiface.PinStatus, erro
195194

196195
visited := make(map[cid.Cid]*pinStatus)
197196
bs := api.blockstore
198-
DAG := merkledag.NewDAGService(bserv.New(bs, offline.Exchange(bs)))
197+
// FIXME: we are recreating a dag and blockservice, maybe offline varients should be shared ?
198+
DAG := merkledag.NewDAGService(bserv.New(bs, nil, bserv.WithProvider(api.provider)))
199199
getLinks := merkledag.GetLinksWithDAG(DAG)
200200

201201
var checkPin func(root cid.Cid) *pinStatus

core/coreapi/unixfs.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ func (api *UnixfsAPI) Add(ctx context.Context, files files.Node, opts ...options
106106
}
107107
exch := api.exchange
108108
pinning := api.pinning
109+
prov := api.provider
109110

110111
if settings.OnlyHash {
111112
node, err := getOrCreateNilNode()
@@ -115,9 +116,10 @@ func (api *UnixfsAPI) Add(ctx context.Context, files files.Node, opts ...options
115116
addblockstore = node.Blockstore
116117
exch = node.Exchange
117118
pinning = node.Pinning
119+
prov = nil
118120
}
119121

120-
bserv := blockservice.New(addblockstore, exch) // hash security 001
122+
bserv := blockservice.New(addblockstore, exch, blockservice.WithProvider(prov)) // hash security 001
121123
dserv := merkledag.NewDAGService(bserv)
122124

123125
// add a sync call to the DagService

core/corehttp/gateway.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@ import (
99
"net/http"
1010
"time"
1111

12-
"github.com/ipfs/boxo/blockservice"
13-
"github.com/ipfs/boxo/exchange/offline"
1412
"github.com/ipfs/boxo/files"
1513
"github.com/ipfs/boxo/gateway"
1614
"github.com/ipfs/boxo/namesys"
@@ -79,7 +77,7 @@ func VersionOption() ServeOption {
7977

8078
func Libp2pGatewayOption() ServeOption {
8179
return func(n *core.IpfsNode, _ net.Listener, mux *http.ServeMux) (*http.ServeMux, error) {
82-
bserv := blockservice.New(n.Blocks.Blockstore(), offline.Exchange(n.Blocks.Blockstore()))
80+
bserv := n.OfflineBlocks
8381

8482
backend, err := gateway.NewBlocksBackend(bserv,
8583
// GatewayOverLibp2p only returns things that are in local blockstore
@@ -118,7 +116,7 @@ func newGatewayBackend(n *core.IpfsNode) (gateway.IPFSBackend, error) {
118116
pathResolver := n.UnixFSPathResolver
119117

120118
if cfg.Gateway.NoFetch {
121-
bserv = blockservice.New(bserv.Blockstore(), offline.Exchange(bserv.Blockstore()))
119+
bserv = n.OfflineBlocks
122120

123121
cs := cfg.Ipns.ResolveCacheSize
124122
if cs == 0 {

core/node/bitswap.go

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -31,17 +31,16 @@ type bitswapOptionsOut struct {
3131
BitswapOpts []bitswap.Option `group:"bitswap-options,flatten"`
3232
}
3333

34-
// BitswapOptions creates configuration options for Bitswap from the config file
35-
// and whether to provide data.
36-
func BitswapOptions(cfg *config.Config, provide bool) interface{} {
37-
return func() bitswapOptionsOut {
34+
// BitswapOptions creates configuration options for Bitswap from the config file.
35+
func BitswapOptions(cfg *config.Config) fx.Option {
36+
return fx.Provide(func(routing irouting.ProvideManyRouter) bitswapOptionsOut {
3837
var internalBsCfg config.InternalBitswap
3938
if cfg.Internal.Bitswap != nil {
4039
internalBsCfg = *cfg.Internal.Bitswap
4140
}
4241

4342
opts := []bitswap.Option{
44-
bitswap.ProvideEnabled(provide),
43+
bitswap.WithContentSearch(routing),
4544
bitswap.ProviderSearchDelay(internalBsCfg.ProviderSearchDelay.WithDefault(DefaultProviderSearchDelay)), // See https://github.com/ipfs/go-ipfs/issues/8807 for rationale
4645
bitswap.EngineBlockstoreWorkerCount(int(internalBsCfg.EngineBlockstoreWorkerCount.WithDefault(DefaultEngineBlockstoreWorkerCount))),
4746
bitswap.TaskWorkerCount(int(internalBsCfg.TaskWorkerCount.WithDefault(DefaultTaskWorkerCount))),
@@ -50,25 +49,24 @@ func BitswapOptions(cfg *config.Config, provide bool) interface{} {
5049
}
5150

5251
return bitswapOptionsOut{BitswapOpts: opts}
53-
}
52+
})
5453
}
5554

5655
type onlineExchangeIn struct {
5756
fx.In
5857

5958
Mctx helpers.MetricsCtx
6059
Host host.Host
61-
Rt irouting.ProvideManyRouter
6260
Bs blockstore.GCBlockstore
6361
BitswapOpts []bitswap.Option `group:"bitswap-options"`
6462
}
6563

6664
// OnlineExchange creates new LibP2P backed block exchange (BitSwap).
6765
// Additional options to bitswap.New can be provided via the "bitswap-options"
6866
// group.
69-
func OnlineExchange() interface{} {
70-
return func(in onlineExchangeIn, lc fx.Lifecycle) exchange.Interface {
71-
bitswapNetwork := network.NewFromIpfsHost(in.Host, in.Rt)
67+
func OnlineExchange() fx.Option {
68+
return fx.Provide(func(in onlineExchangeIn, lc fx.Lifecycle) exchange.Interface {
69+
bitswapNetwork := network.NewFromIpfsHost(in.Host)
7270

7371
exch := bitswap.New(helpers.LifecycleCtx(in.Mctx, lc), bitswapNetwork, in.Bs, in.BitswapOpts...)
7472
lc.Append(fx.Hook{
@@ -77,5 +75,5 @@ func OnlineExchange() interface{} {
7775
},
7876
})
7977
return exch
80-
}
78+
})
8179
}

core/node/core.go

Lines changed: 65 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"github.com/ipfs/boxo/blockservice"
88
blockstore "github.com/ipfs/boxo/blockstore"
99
exchange "github.com/ipfs/boxo/exchange"
10-
offline "github.com/ipfs/boxo/exchange/offline"
1110
"github.com/ipfs/boxo/fetcher"
1211
bsfetcher "github.com/ipfs/boxo/fetcher/impl/blockservice"
1312
"github.com/ipfs/boxo/filestore"
@@ -17,6 +16,7 @@ import (
1716
pathresolver "github.com/ipfs/boxo/path/resolver"
1817
pin "github.com/ipfs/boxo/pinning/pinner"
1918
"github.com/ipfs/boxo/pinning/pinner/dspinner"
19+
"github.com/ipfs/boxo/provider"
2020
"github.com/ipfs/go-cid"
2121
"github.com/ipfs/go-datastore"
2222
format "github.com/ipfs/go-ipld-format"
@@ -29,8 +29,8 @@ import (
2929
)
3030

3131
// BlockService creates new blockservice which provides an interface to fetch content-addressable blocks
32-
func BlockService(lc fx.Lifecycle, bs blockstore.Blockstore, rem exchange.Interface) blockservice.BlockService {
33-
bsvc := blockservice.New(bs, rem)
32+
func BlockService(lc fx.Lifecycle, bs blockstore.Blockstore, rem exchange.Interface, prov provider.System) blockservice.BlockService {
33+
bsvc := blockservice.New(bs, rem, blockservice.WithProvider(prov))
3434

3535
lc.Append(fx.Hook{
3636
OnStop: func(ctx context.Context) error {
@@ -41,6 +41,32 @@ func BlockService(lc fx.Lifecycle, bs blockstore.Blockstore, rem exchange.Interf
4141
return bsvc
4242
}
4343

44+
type offlineIn struct {
45+
fx.In
46+
47+
Bs blockstore.Blockstore
48+
Prov provider.System `optional:"true"`
49+
}
50+
51+
type offlineOut struct {
52+
fx.Out
53+
54+
Bs blockservice.BlockService `name:"offlineBlockService"`
55+
}
56+
57+
// OfflineBlockservice is like [BlockService] but it makes an offline version.
58+
func OfflineBlockservice(lc fx.Lifecycle, in offlineIn) offlineOut {
59+
bsvc := blockservice.New(in.Bs, nil, blockservice.WithProvider(in.Prov))
60+
61+
lc.Append(fx.Hook{
62+
OnStop: func(ctx context.Context) error {
63+
return bsvc.Close()
64+
},
65+
})
66+
67+
return offlineOut{Bs: bsvc}
68+
}
69+
4470
// Pinning creates new pinner which tells GC which blocks should be kept
4571
func Pinning(bstore blockstore.Blockstore, ds format.DAGService, repo repo.Repo) (pin.Pinner, error) {
4672
rootDS := repo.Datastore()
@@ -82,38 +108,34 @@ func (s *syncDagService) Session(ctx context.Context) format.NodeGetter {
82108
return merkledag.NewSession(ctx, s.DAGService)
83109
}
84110

85-
// FetchersOut allows injection of fetchers.
86-
type FetchersOut struct {
111+
// fetchersOut allows injection of fetchers.
112+
type fetchersOut struct {
87113
fx.Out
88114
IPLDFetcher fetcher.Factory `name:"ipldFetcher"`
89115
UnixfsFetcher fetcher.Factory `name:"unixfsFetcher"`
90116
OfflineIPLDFetcher fetcher.Factory `name:"offlineIpldFetcher"`
91117
OfflineUnixfsFetcher fetcher.Factory `name:"offlineUnixfsFetcher"`
92118
}
93119

94-
// FetchersIn allows using fetchers for other dependencies.
95-
type FetchersIn struct {
120+
type fetcherIn struct {
96121
fx.In
97-
IPLDFetcher fetcher.Factory `name:"ipldFetcher"`
98-
UnixfsFetcher fetcher.Factory `name:"unixfsFetcher"`
99-
OfflineIPLDFetcher fetcher.Factory `name:"offlineIpldFetcher"`
100-
OfflineUnixfsFetcher fetcher.Factory `name:"offlineUnixfsFetcher"`
122+
Online blockservice.BlockService
123+
Offline blockservice.BlockService `name:"offlineBlockService"`
101124
}
102125

103126
// FetcherConfig returns a fetcher config that can build new fetcher instances
104-
func FetcherConfig(bs blockservice.BlockService) FetchersOut {
105-
ipldFetcher := bsfetcher.NewFetcherConfig(bs)
127+
func FetcherConfig(in fetcherIn) fetchersOut {
128+
ipldFetcher := bsfetcher.NewFetcherConfig(in.Online)
106129
ipldFetcher.PrototypeChooser = dagpb.AddSupportToChooser(bsfetcher.DefaultPrototypeChooser)
107130
unixFSFetcher := ipldFetcher.WithReifier(unixfsnode.Reify)
108131

109132
// Construct offline versions which we can safely use in contexts where
110133
// path resolution should not fetch new blocks via exchange.
111-
offlineBs := blockservice.New(bs.Blockstore(), offline.Exchange(bs.Blockstore()))
112-
offlineIpldFetcher := bsfetcher.NewFetcherConfig(offlineBs)
134+
offlineIpldFetcher := bsfetcher.NewFetcherConfig(in.Offline)
113135
offlineIpldFetcher.PrototypeChooser = dagpb.AddSupportToChooser(bsfetcher.DefaultPrototypeChooser)
114136
offlineUnixFSFetcher := offlineIpldFetcher.WithReifier(unixfsnode.Reify)
115137

116-
return FetchersOut{
138+
return fetchersOut{
117139
IPLDFetcher: ipldFetcher,
118140
UnixfsFetcher: unixFSFetcher,
119141
OfflineIPLDFetcher: offlineIpldFetcher,
@@ -130,8 +152,17 @@ type PathResolversOut struct {
130152
OfflineUnixFSPathResolver pathresolver.Resolver `name:"offlineUnixFSPathResolver"`
131153
}
132154

155+
// PathResolverIn allows using fetchers for other dependencies.
156+
type PathResolverIn struct {
157+
fx.In
158+
IPLDFetcher fetcher.Factory `name:"ipldFetcher"`
159+
UnixfsFetcher fetcher.Factory `name:"unixfsFetcher"`
160+
OfflineIPLDFetcher fetcher.Factory `name:"offlineIpldFetcher"`
161+
OfflineUnixfsFetcher fetcher.Factory `name:"offlineUnixfsFetcher"`
162+
}
163+
133164
// PathResolverConfig creates path resolvers with the given fetchers.
134-
func PathResolverConfig(fetchers FetchersIn) PathResolversOut {
165+
func PathResolverConfig(fetchers PathResolverIn) PathResolversOut {
135166
return PathResolversOut{
136167
IPLDPathResolver: pathresolver.NewBasicResolver(fetchers.IPLDFetcher),
137168
UnixFSPathResolver: pathresolver.NewBasicResolver(fetchers.UnixfsFetcher),
@@ -145,6 +176,23 @@ func Dag(bs blockservice.BlockService) format.DAGService {
145176
return merkledag.NewDAGService(bs)
146177
}
147178

179+
type offlineDagIn struct {
180+
fx.In
181+
182+
Bs blockservice.BlockService `name:"offlineBlockService"`
183+
}
184+
185+
type offlineDagOut struct {
186+
fx.Out
187+
188+
DAG format.DAGService `name:"offlineDagService"`
189+
}
190+
191+
// OfflineDag is like [Dag] but it makes an offline version.
192+
func OfflineDag(lc fx.Lifecycle, in offlineDagIn) offlineDagOut {
193+
return offlineDagOut{DAG: merkledag.NewDAGService(in.Bs)}
194+
}
195+
148196
// Files loads persisted MFS root
149197
func Files(mctx helpers.MetricsCtx, lc fx.Lifecycle, repo repo.Repo, dag format.DAGService) (*mfs.Root, error) {
150198
dsk := datastore.NewKey("/local/filesroot")

0 commit comments

Comments
 (0)