Skip to content

Commit d414841

Browse files
committed
bitswap/network,bitswap/client: move content routing responsabilities to an option of the client
Given that the previous commit remove the content advertising from the server, it did not made sense to share these paths on the network. The code has been reworked: - addresses aren't magically added to the peerstore as a side-effect of calling `Network.FindProvidersAsync`. Instead they are passed as hints to ConnectTo which copies libp2p `host.ConnectTo` API. - the providerquerymanager is completely shutdown when not using `WithContentSearch` option, this helps usecase where `routinghelpers.Null` is used for content routing and the consumer exclusively rely on broadcast, like networks where most peoples have all the content (Filecoin, Celestia, ...).
1 parent ac0f6e0 commit d414841

24 files changed

+222
-240
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,14 @@ The following emojis are used to highlight certain changes:
2020
- `blockservice.NewWritethrough` deprecated function has been removed, instead you can do `blockservice.New(..., ..., WriteThrough())` like previously.
2121
- `blockservice` now has `WithContentBlocker` option which allows to filter Add and Get requests by CID.
2222
- `blockservice` now have a `WithProvider` option, this allows to recreate the behavior of advertising added blocks the bitswap server used to do.
23+
- `bitswap` & `bitswap/client` now have a `WithContentSearch` option, this pickup the content routing job from `bitswap/network`.
24+
It used to be a commun pattern for consumers which do not need external content routing to pass a [`routinghelpers.Null`](https://pkg.go.dev/github.com/libp2p/go-libp2p-routing-helpers#Null), now this can be ommited completely which is more efficient.
2325

2426
### Changed
2527

28+
- 🛠 `bitswap/network` no longer manages content routing, related Methods and function Arguments have been removed.
29+
- `Network.ConnectTo` method has been changed from [`peer.ID`](https://pkg.go.dev/github.com/libp2p/go-libp2p/core/peer#ID) to [`peer.AddrInfo`](https://pkg.go.dev/github.com/libp2p/go-libp2p/core/peer#AddrInfo), given adding addresses hints used to be a side effect of the network. Theses now need to be passed in as values.
30+
2631
### Removed
2732

2833
- 🛠 `bitswap` & `bitswap/server` no longer provide to content routers, instead you can use the `provider` package because it uses a datastore queue and batches calls to ProvideMany.

bitswap/benchmarks_test.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020
bsnet "github.com/ipfs/boxo/bitswap/network"
2121
testinstance "github.com/ipfs/boxo/bitswap/testinstance"
2222
tn "github.com/ipfs/boxo/bitswap/testnet"
23-
mockrouting "github.com/ipfs/boxo/routing/mock"
2423
cid "github.com/ipfs/go-cid"
2524
delay "github.com/ipfs/go-ipfs-delay"
2625
)
@@ -142,7 +141,7 @@ func BenchmarkFetchFromOldBitswap(b *testing.B) {
142141
oldSeedCount := bch.oldSeedCount
143142
newSeedCount := bch.nodeCount - (fetcherCount + oldSeedCount)
144143

145-
net := tn.VirtualNetwork(mockrouting.NewServer(), fixedDelay)
144+
net := tn.VirtualNetwork(fixedDelay)
146145

147146
// Simulate an older Bitswap node (old protocol ID) that doesn't
148147
// send DONT_HAVE responses
@@ -294,7 +293,7 @@ func BenchmarkDatacenterMultiLeechMultiSeed(b *testing.B) {
294293
numblks := 1000
295294

296295
for i := 0; i < b.N; i++ {
297-
net := tn.RateLimitedVirtualNetwork(mockrouting.NewServer(), d, rateLimitGenerator)
296+
net := tn.RateLimitedVirtualNetwork(d, rateLimitGenerator)
298297

299298
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
300299
defer ig.Close()
@@ -312,7 +311,7 @@ func BenchmarkDatacenterMultiLeechMultiSeed(b *testing.B) {
312311

313312
func subtestDistributeAndFetch(b *testing.B, numnodes, numblks int, d delay.D, bstoreLatency time.Duration, df distFunc, ff fetchFunc) {
314313
for i := 0; i < b.N; i++ {
315-
net := tn.VirtualNetwork(mockrouting.NewServer(), d)
314+
net := tn.VirtualNetwork(d)
316315

317316
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
318317

@@ -327,7 +326,7 @@ func subtestDistributeAndFetch(b *testing.B, numnodes, numblks int, d delay.D, b
327326

328327
func subtestDistributeAndFetchRateLimited(b *testing.B, numnodes, numblks int, d delay.D, rateLimitGenerator tn.RateLimitGenerator, blockSize int64, bstoreLatency time.Duration, df distFunc, ff fetchFunc) {
329328
for i := 0; i < b.N; i++ {
330-
net := tn.RateLimitedVirtualNetwork(mockrouting.NewServer(), d, rateLimitGenerator)
329+
net := tn.RateLimitedVirtualNetwork(d, rateLimitGenerator)
331330

332331
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
333332
defer ig.Close()

bitswap/bitswap_test.go

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ func addBlock(t *testing.T, ctx context.Context, inst testinstance.Instance, blk
4949
const kNetworkDelay = 0 * time.Millisecond
5050

5151
func TestClose(t *testing.T) {
52-
vnet := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
52+
vnet := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
5353
ig := testinstance.NewTestInstanceGenerator(vnet, nil, nil)
5454
defer ig.Close()
5555
bgen := blocksutil.NewBlockGenerator()
@@ -66,7 +66,7 @@ func TestClose(t *testing.T) {
6666

6767
func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this
6868
rs := mockrouting.NewServer()
69-
net := tn.VirtualNetwork(rs, delay.Fixed(kNetworkDelay))
69+
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
7070
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
7171
defer ig.Close()
7272

@@ -90,7 +90,7 @@ func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this
9090
}
9191

9292
func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
93-
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
93+
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
9494
block := blocks.NewBlock([]byte("block"))
9595
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
9696
defer ig.Close()
@@ -118,7 +118,7 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
118118
}
119119

120120
func TestDoesNotProvideWhenConfiguredNotTo(t *testing.T) {
121-
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
121+
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
122122
block := blocks.NewBlock([]byte("block"))
123123
bsOpts := []bitswap.Option{bitswap.ProviderSearchDelay(50 * time.Millisecond)}
124124
ig := testinstance.NewTestInstanceGenerator(net, nil, bsOpts)
@@ -150,7 +150,7 @@ func TestDoesNotProvideWhenConfiguredNotTo(t *testing.T) {
150150
// Tests that a received block is not stored in the blockstore if the block was
151151
// not requested by the client
152152
func TestUnwantedBlockNotAdded(t *testing.T) {
153-
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
153+
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
154154
block := blocks.NewBlock([]byte("block"))
155155
bsMessage := bsmsg.New(true)
156156
bsMessage.AddBlock(block)
@@ -186,7 +186,7 @@ func TestUnwantedBlockNotAdded(t *testing.T) {
186186
// (because the live request queue is full)
187187
func TestPendingBlockAdded(t *testing.T) {
188188
ctx := context.Background()
189-
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
189+
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
190190
bg := blocksutil.NewBlockGenerator()
191191
sessionBroadcastWantCapacity := 4
192192

@@ -278,7 +278,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
278278
if testing.Short() {
279279
t.SkipNow()
280280
}
281-
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
281+
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
282282
ig := testinstance.NewTestInstanceGenerator(net, nil, []bitswap.Option{
283283
bitswap.TaskWorkerCount(5),
284284
bitswap.EngineTaskWorkerCount(5),
@@ -335,7 +335,7 @@ func TestSendToWantingPeer(t *testing.T) {
335335
t.SkipNow()
336336
}
337337

338-
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
338+
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
339339
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
340340
defer ig.Close()
341341
bg := blocksutil.NewBlockGenerator()
@@ -373,7 +373,7 @@ func TestSendToWantingPeer(t *testing.T) {
373373
}
374374

375375
func TestEmptyKey(t *testing.T) {
376-
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
376+
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
377377
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
378378
defer ig.Close()
379379
bs := ig.Instances(1)[0].Exchange
@@ -406,7 +406,7 @@ func assertStat(t *testing.T, st *bitswap.Stat, sblks, rblks, sdata, rdata uint6
406406
}
407407

408408
func TestBasicBitswap(t *testing.T) {
409-
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
409+
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
410410
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
411411
defer ig.Close()
412412
bg := blocksutil.NewBlockGenerator()
@@ -478,7 +478,7 @@ func TestBasicBitswap(t *testing.T) {
478478
}
479479

480480
func TestDoubleGet(t *testing.T) {
481-
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
481+
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
482482
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
483483
defer ig.Close()
484484
bg := blocksutil.NewBlockGenerator()
@@ -543,7 +543,7 @@ func TestDoubleGet(t *testing.T) {
543543
}
544544

545545
func TestWantlistCleanup(t *testing.T) {
546-
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
546+
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
547547
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
548548
defer ig.Close()
549549
bg := blocksutil.NewBlockGenerator()
@@ -665,7 +665,7 @@ func newReceipt(sent, recv, exchanged uint64) *server.Receipt {
665665
}
666666

667667
func TestBitswapLedgerOneWay(t *testing.T) {
668-
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
668+
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
669669
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
670670
defer ig.Close()
671671
bg := blocksutil.NewBlockGenerator()
@@ -714,7 +714,7 @@ func TestBitswapLedgerOneWay(t *testing.T) {
714714
}
715715

716716
func TestBitswapLedgerTwoWay(t *testing.T) {
717-
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
717+
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
718718
ig := testinstance.NewTestInstanceGenerator(net, nil, nil)
719719
defer ig.Close()
720720
bg := blocksutil.NewBlockGenerator()
@@ -803,7 +803,7 @@ func (tsl *testingScoreLedger) Stop() {
803803
// Tests start and stop of a custom decision logic
804804
func TestWithScoreLedger(t *testing.T) {
805805
tsl := newTestingScoreLedger()
806-
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
806+
net := tn.VirtualNetwork(delay.Fixed(kNetworkDelay))
807807
bsOpts := []bitswap.Option{bitswap.WithScoreLedger(tsl)}
808808
ig := testinstance.NewTestInstanceGenerator(net, nil, bsOpts)
809809
defer ig.Close()

bitswap/client/bitswap_with_sessions_test.go

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/ipfs/boxo/bitswap"
1111
"github.com/ipfs/boxo/bitswap/client/internal/session"
1212
"github.com/ipfs/boxo/bitswap/client/traceability"
13+
bsnet "github.com/ipfs/boxo/bitswap/network"
1314
testinstance "github.com/ipfs/boxo/bitswap/testinstance"
1415
tn "github.com/ipfs/boxo/bitswap/testnet"
1516
mockrouting "github.com/ipfs/boxo/routing/mock"
@@ -18,13 +19,15 @@ import (
1819
blocksutil "github.com/ipfs/go-ipfs-blocksutil"
1920
delay "github.com/ipfs/go-ipfs-delay"
2021
tu "github.com/libp2p/go-libp2p-testing/etc"
22+
tnet "github.com/libp2p/go-libp2p-testing/net"
2123
"github.com/libp2p/go-libp2p/core/peer"
24+
"github.com/stretchr/testify/require"
2225
)
2326

2427
func getVirtualNetwork() tn.Network {
2528
// FIXME: the tests are really sensitive to the network delay. fix them to work
2629
// well under varying conditions
27-
return tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0))
30+
return tn.VirtualNetwork(delay.Fixed(0))
2831
}
2932

3033
func addBlock(t *testing.T, ctx context.Context, inst testinstance.Instance, blk blocks.Block) {
@@ -37,10 +40,6 @@ func addBlock(t *testing.T, ctx context.Context, inst testinstance.Instance, blk
3740
if err != nil {
3841
t.Fatal(err)
3942
}
40-
err = inst.Adapter.Provide(ctx, blk.Cid())
41-
if err != nil {
42-
t.Fatal(err)
43-
}
4443
}
4544

4645
func TestBasicSessions(t *testing.T) {
@@ -114,7 +113,7 @@ func TestSessionBetweenPeers(t *testing.T) {
114113
ctx, cancel := context.WithCancel(context.Background())
115114
defer cancel()
116115

117-
vnet := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(time.Millisecond))
116+
vnet := tn.VirtualNetwork(delay.Fixed(time.Millisecond))
118117
ig := testinstance.NewTestInstanceGenerator(vnet, nil, []bitswap.Option{bitswap.SetSimulateDontHavesOnTimeout(false)})
119118
defer ig.Close()
120119
bgen := blocksutil.NewBlockGenerator()
@@ -219,16 +218,23 @@ func TestFetchNotConnected(t *testing.T) {
219218
defer cancel()
220219

221220
vnet := getVirtualNetwork()
221+
rs := mockrouting.NewServer()
222222
ig := testinstance.NewTestInstanceGenerator(vnet, nil, []bitswap.Option{bitswap.ProviderSearchDelay(10 * time.Millisecond)})
223223
defer ig.Close()
224224
bgen := blocksutil.NewBlockGenerator()
225225

226-
other := ig.Next()
226+
var otherClient mockrouting.Client
227+
other := ig.NextWithExtraOptions(func(id tnet.Identity) ([]bsnet.NetOpt, []bitswap.Option) {
228+
otherClient = rs.Client(id)
229+
return nil, nil // don't add content search, only the client needs it
230+
})
227231

228232
// Provide 10 blocks on Peer A
229233
blks := bgen.Blocks(10)
230234
for _, block := range blks {
231235
addBlock(t, ctx, other, block)
236+
err := otherClient.Provide(ctx, block.Cid(), true)
237+
require.NoError(t, err)
232238
}
233239

234240
var cids []cid.Cid
@@ -239,7 +245,9 @@ func TestFetchNotConnected(t *testing.T) {
239245
// Request blocks with Peer B
240246
// Note: Peer A and Peer B are not initially connected, so this tests
241247
// that Peer B will search for and find Peer A
242-
thisNode := ig.Next()
248+
thisNode := ig.NextWithExtraOptions(func(id tnet.Identity) ([]bsnet.NetOpt, []bitswap.Option) {
249+
return nil, []bitswap.Option{bitswap.WithContentSearch(rs.Client(id))}
250+
})
243251
ses := thisNode.Exchange.NewSession(ctx).(*session.Session)
244252
ses.SetBaseTickDelay(time.Millisecond * 10)
245253

@@ -262,16 +270,19 @@ func TestFetchAfterDisconnect(t *testing.T) {
262270
defer cancel()
263271

264272
vnet := getVirtualNetwork()
273+
rs := mockrouting.NewServer()
265274
ig := testinstance.NewTestInstanceGenerator(vnet, nil, []bitswap.Option{
266275
bitswap.ProviderSearchDelay(10 * time.Millisecond),
267276
bitswap.RebroadcastDelay(delay.Fixed(15 * time.Millisecond)),
268277
})
269278
defer ig.Close()
270279
bgen := blocksutil.NewBlockGenerator()
271280

272-
inst := ig.Instances(2)
273-
peerA := inst[0]
274-
peerB := inst[1]
281+
var aClient mockrouting.Client
282+
peerA := ig.NextWithExtraOptions(func(id tnet.Identity) ([]bsnet.NetOpt, []bitswap.Option) {
283+
aClient = rs.Client(id)
284+
return nil, nil // don't add content search, only the client needs it
285+
})
275286

276287
// Provide 5 blocks on Peer A
277288
blks := bgen.Blocks(10)
@@ -283,9 +294,14 @@ func TestFetchAfterDisconnect(t *testing.T) {
283294
firstBlks := blks[:5]
284295
for _, block := range firstBlks {
285296
addBlock(t, ctx, peerA, block)
297+
err := aClient.Provide(ctx, block.Cid(), true)
298+
require.NoError(t, err)
286299
}
287300

288301
// Request all blocks with Peer B
302+
peerB := ig.NextWithExtraOptions(func(id tnet.Identity) ([]bsnet.NetOpt, []bitswap.Option) {
303+
return nil, []bitswap.Option{bitswap.WithContentSearch(rs.Client(id))}
304+
})
289305
ses := peerB.Exchange.NewSession(ctx).(*session.Session)
290306
ses.SetBaseTickDelay(time.Millisecond * 10)
291307

@@ -317,6 +333,8 @@ func TestFetchAfterDisconnect(t *testing.T) {
317333
lastBlks := blks[5:]
318334
for _, block := range lastBlks {
319335
addBlock(t, ctx, peerA, block)
336+
err := aClient.Provide(ctx, block.Cid(), true)
337+
require.NoError(t, err)
320338
}
321339

322340
// Peer B should call FindProviders() and find Peer A

0 commit comments

Comments
 (0)