Skip to content

Commit 2fdde02

Browse files
authored
Merge pull request #683 from ipfs/release-v0.24.0
Release v0.24.0
2 parents 41b8882 + eac6c25 commit 2fdde02

File tree

24 files changed

+1267
-155
lines changed

24 files changed

+1267
-155
lines changed

CHANGELOG.md

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,26 @@ The following emojis are used to highlight certain changes:
2424

2525
### Security
2626

27+
## [v0.24.0]
28+
29+
### Added
30+
31+
* `boxo/bitswap/server`:
32+
* A new [`WithWantHaveReplaceSize(n)`](https://pkg.go.dev/github.com/ipfs/boxo/bitswap/server/#WithWantHaveReplaceSize) option can be used with `bitswap.New` to fine-tune cost-vs-performance. It sets the maximum size of a block in bytes up to which the bitswap server will replace a WantHave with a WantBlock response. Setting this to 0 disables this WantHave replacement and means that block sizes are not read when processing WantHave requests. [#672](https://github.com/ipfs/boxo/pull/672)
33+
* `routing/http`:
34+
* added support for address and protocol filtering to the delegated routing server ([IPIP-484](https://github.com/ipfs/specs/pull/484)) [#671](https://github.com/ipfs/boxo/pull/671) [#678](https://github.com/ipfs/boxo/pull/678)
35+
* added support for address and protocol filtering to the delegated routing client ([IPIP-484](https://github.com/ipfs/specs/pull/484)) [#678](https://github.com/ipfs/boxo/pull/678). To add filtering to the client, use the [`WithFilterAddrs`](https://pkg.go.dev/github.com/ipfs/boxo/routing/http/client#WithFilterAddrs) and [`WithFilterProtocols`](https://pkg.go.dev/github.com/ipfs/boxo/routing/http/client#WithFilterProtocols) options when creating the client.Client-side filtering for servers that don't support filtering is enabled by default. To disable it, use the [`disableLocalFiltering`](https://pkg.go.dev/github.com/ipfs/boxo/routing/http/client#disableLocalFiltering) option when creating the client.
36+
37+
### Changed
38+
39+
### Removed
40+
41+
### Fixed
42+
43+
- `unixfs/hamt` Log error instead of panic if both link and shard are nil [#393](https://github.com/ipfs/boxo/pull/393)
44+
45+
### Security
46+
2747
## [v0.23.0]
2848

2949
### Added

bitswap/internal/defaults/defaults.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,4 +37,7 @@ const (
3737
// RebroadcastDelay is the default delay to trigger broadcast of
3838
// random CIDs in the wantlist.
3939
RebroadcastDelay = time.Minute
40+
41+
// DefaultWantHaveReplaceSize controls the implicit behavior of WithWantHaveReplaceSize.
42+
DefaultWantHaveReplaceSize = 1024
4043
)

bitswap/options.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,13 @@ func WithTaskComparator(comparator server.TaskComparator) Option {
7171
return Option{server.WithTaskComparator(comparator)}
7272
}
7373

74+
// WithWantHaveReplaceSize sets the maximum size of a block in bytes up to
75+
// which the bitswap server will replace a WantHave with a WantBlock response.
76+
// See [server.WithWantHaveReplaceSize] for details.
77+
func WithWantHaveReplaceSize(size int) Option {
78+
return Option{server.WithWantHaveReplaceSize(size)}
79+
}
80+
7481
func ProviderSearchDelay(newProvSearchDelay time.Duration) Option {
7582
return Option{client.ProviderSearchDelay(newProvSearchDelay)}
7683
}

bitswap/server/internal/decision/blockstoremanager.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,42 @@ func (bsm *blockstoreManager) getBlockSizes(ctx context.Context, ks []cid.Cid) (
121121
return res, nil
122122
}
123123

124+
func (bsm *blockstoreManager) hasBlocks(ctx context.Context, ks []cid.Cid) (map[cid.Cid]struct{}, error) {
125+
if len(ks) == 0 {
126+
return nil, nil
127+
}
128+
hasBlocks := make([]bool, len(ks))
129+
130+
var count atomic.Int32
131+
err := bsm.jobPerKey(ctx, ks, func(i int, c cid.Cid) {
132+
has, err := bsm.bs.Has(ctx, c)
133+
if err != nil {
134+
// Note: this isn't a fatal error. We shouldn't abort the request
135+
log.Errorf("blockstore.Has(%c) error: %s", c, err)
136+
return
137+
}
138+
if has {
139+
hasBlocks[i] = true
140+
count.Add(1)
141+
}
142+
})
143+
if err != nil {
144+
return nil, err
145+
}
146+
results := count.Load()
147+
if results == 0 {
148+
return nil, nil
149+
}
150+
151+
res := make(map[cid.Cid]struct{}, results)
152+
for i, ok := range hasBlocks {
153+
if ok {
154+
res[ks[i]] = struct{}{}
155+
}
156+
}
157+
return res, nil
158+
}
159+
124160
func (bsm *blockstoreManager) getBlocks(ctx context.Context, ks []cid.Cid) (map[cid.Cid]blocks.Block, error) {
125161
if len(ks) == 0 {
126162
return nil, nil

bitswap/server/internal/decision/blockstoremanager_test.go

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -98,29 +98,22 @@ func TestBlockstoreManager(t *testing.T) {
9898
cids = append(cids, b.Cid())
9999
}
100100

101-
sizes, err := bsm.getBlockSizes(ctx, cids)
101+
hasBlocks, err := bsm.hasBlocks(ctx, cids)
102102
if err != nil {
103103
t.Fatal(err)
104104
}
105-
if len(sizes) != len(blks)-1 {
105+
if len(hasBlocks) != len(blks)-1 {
106106
t.Fatal("Wrong response length")
107107
}
108-
109108
for _, c := range cids {
110-
expSize := len(exp[c].RawData())
111-
size, ok := sizes[c]
112-
113-
// Only the last key should be missing
109+
_, ok := hasBlocks[c]
114110
if c.Equals(cids[len(cids)-1]) {
115111
if ok {
116112
t.Fatal("Non-existent block should not be in sizes map")
117113
}
118114
} else {
119115
if !ok {
120-
t.Fatal("Block should be in sizes map")
121-
}
122-
if size != expSize {
123-
t.Fatal("Block has wrong size")
116+
t.Fatal("Block should be in hasBlocks")
124117
}
125118
}
126119
}

bitswap/server/internal/decision/engine.go

Lines changed: 76 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -77,10 +77,6 @@ const (
7777
// queuedTagWeight is the default weight for peers that have work queued
7878
// on their behalf.
7979
queuedTagWeight = 10
80-
81-
// maxBlockSizeReplaceHasWithBlock is the maximum size of the block in
82-
// bytes up to which we will replace a want-have with a want-block
83-
maxBlockSizeReplaceHasWithBlock = 1024
8480
)
8581

8682
// Envelope contains a message for a Peer.
@@ -202,9 +198,9 @@ type Engine struct {
202198

203199
targetMessageSize int
204200

205-
// maxBlockSizeReplaceHasWithBlock is the maximum size of the block in
206-
// bytes up to which we will replace a want-have with a want-block
207-
maxBlockSizeReplaceHasWithBlock int
201+
// wantHaveReplaceSize is the maximum size of the block in bytes up to
202+
// which to replace a WantHave with a WantBlock.
203+
wantHaveReplaceSize int
208204

209205
sendDontHaves bool
210206

@@ -343,6 +339,14 @@ func WithSetSendDontHave(send bool) Option {
343339
}
344340
}
345341

342+
// WithWantHaveReplaceSize sets the maximum size of a block in bytes up to
343+
// which to replace a WantHave with a WantBlock response.
344+
func WithWantHaveReplaceSize(size int) Option {
345+
return func(e *Engine) {
346+
e.wantHaveReplaceSize = size
347+
}
348+
}
349+
346350
// wrapTaskComparator wraps a TaskComparator so it can be used as a QueueTaskComparator
347351
func wrapTaskComparator(tc TaskComparator) peertask.QueueTaskComparator {
348352
return func(a, b *peertask.QueueTask) bool {
@@ -369,32 +373,14 @@ func wrapTaskComparator(tc TaskComparator) peertask.QueueTaskComparator {
369373
}
370374

371375
// NewEngine creates a new block sending engine for the given block store.
372-
// maxOutstandingBytesPerPeer hints to the peer task queue not to give a peer more tasks if it has some maximum
373-
// work already outstanding.
376+
// maxOutstandingBytesPerPeer hints to the peer task queue not to give a peer
377+
// more tasks if it has some maximum work already outstanding.
374378
func NewEngine(
375379
ctx context.Context,
376380
bs bstore.Blockstore,
377381
peerTagger PeerTagger,
378382
self peer.ID,
379383
opts ...Option,
380-
) *Engine {
381-
return newEngine(
382-
ctx,
383-
bs,
384-
peerTagger,
385-
self,
386-
maxBlockSizeReplaceHasWithBlock,
387-
opts...,
388-
)
389-
}
390-
391-
func newEngine(
392-
ctx context.Context,
393-
bs bstore.Blockstore,
394-
peerTagger PeerTagger,
395-
self peer.ID,
396-
maxReplaceSize int,
397-
opts ...Option,
398384
) *Engine {
399385
e := &Engine{
400386
scoreLedger: NewDefaultScoreLedger(),
@@ -404,7 +390,7 @@ func newEngine(
404390
outbox: make(chan (<-chan *Envelope), outboxChanBuffer),
405391
workSignal: make(chan struct{}, 1),
406392
ticker: time.NewTicker(time.Millisecond * 100),
407-
maxBlockSizeReplaceHasWithBlock: maxReplaceSize,
393+
wantHaveReplaceSize: defaults.DefaultWantHaveReplaceSize,
408394
taskWorkerCount: defaults.BitswapEngineTaskWorkerCount,
409395
sendDontHaves: true,
410396
self: self,
@@ -445,6 +431,12 @@ func newEngine(
445431

446432
e.peerRequestQueue = peertaskqueue.New(peerTaskQueueOpts...)
447433

434+
if e.wantHaveReplaceSize == 0 {
435+
log.Info("Replace WantHave with WantBlock is disabled")
436+
} else {
437+
log.Infow("Replace WantHave with WantBlock is enabled", "maxSize", e.wantHaveReplaceSize)
438+
}
439+
448440
return e
449441
}
450442

@@ -689,16 +681,38 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap
689681
return true
690682
}
691683

684+
noReplace := e.wantHaveReplaceSize == 0
685+
692686
// Get block sizes for unique CIDs.
693-
wantKs := cid.NewSet()
687+
wantKs := make([]cid.Cid, 0, len(wants))
688+
var haveKs []cid.Cid
694689
for _, entry := range wants {
695-
wantKs.Add(entry.Cid)
690+
if noReplace && entry.WantType == pb.Message_Wantlist_Have {
691+
haveKs = append(haveKs, entry.Cid)
692+
} else {
693+
wantKs = append(wantKs, entry.Cid)
694+
}
696695
}
697-
blockSizes, err := e.bsm.getBlockSizes(ctx, wantKs.Keys())
696+
blockSizes, err := e.bsm.getBlockSizes(ctx, wantKs)
698697
if err != nil {
699698
log.Info("aborting message processing", err)
700699
return false
701700
}
701+
if len(haveKs) != 0 {
702+
hasBlocks, err := e.bsm.hasBlocks(ctx, haveKs)
703+
if err != nil {
704+
log.Info("aborting message processing", err)
705+
return false
706+
}
707+
if len(hasBlocks) != 0 {
708+
if blockSizes == nil {
709+
blockSizes = make(map[cid.Cid]int, len(hasBlocks))
710+
}
711+
for blkCid := range hasBlocks {
712+
blockSizes[blkCid] = 0
713+
}
714+
}
715+
}
702716

703717
e.lock.Lock()
704718

@@ -707,20 +721,7 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap
707721
}
708722

709723
var overflow []bsmsg.Entry
710-
if len(wants) != 0 {
711-
filteredWants := wants[:0] // shift inplace
712-
for _, entry := range wants {
713-
if !e.peerLedger.Wants(p, entry.Entry) {
714-
// Cannot add entry because it would exceed size limit.
715-
overflow = append(overflow, entry)
716-
continue
717-
}
718-
filteredWants = append(filteredWants, entry)
719-
}
720-
// Clear truncated entries - early GC.
721-
clear(wants[len(filteredWants):])
722-
wants = filteredWants
723-
}
724+
wants, overflow = e.filterOverflow(p, wants, overflow)
724725

725726
if len(overflow) != 0 {
726727
log.Infow("handling wantlist overflow", "local", e.self, "from", p, "wantlistSize", len(wants), "overflowSize", len(overflow))
@@ -764,7 +765,7 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap
764765
sendDontHave(entry)
765766
}
766767

767-
// For each want-have / want-block
768+
// For each want-block
768769
for _, entry := range wants {
769770
c := entry.Cid
770771
blockSize, found := blockSizes[c]
@@ -776,7 +777,10 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap
776777
continue
777778
}
778779
// The block was found, add it to the queue
779-
isWantBlock := e.sendAsBlock(entry.WantType, blockSize)
780+
781+
// Check if this is a want-block or a have-block that can be converted
782+
// to a want-block.
783+
isWantBlock := blockSize != 0 && e.sendAsBlock(entry.WantType, blockSize)
780784

781785
log.Debugw("Bitswap engine: block found", "local", e.self, "from", p, "cid", c, "isWantBlock", isWantBlock)
782786

@@ -810,6 +814,25 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap
810814
return false
811815
}
812816

817+
func (e *Engine) filterOverflow(p peer.ID, wants, overflow []bsmsg.Entry) ([]bsmsg.Entry, []bsmsg.Entry) {
818+
if len(wants) == 0 {
819+
return wants, overflow
820+
}
821+
822+
filteredWants := wants[:0] // shift inplace
823+
for _, entry := range wants {
824+
if !e.peerLedger.Wants(p, entry.Entry) {
825+
// Cannot add entry because it would exceed size limit.
826+
overflow = append(overflow, entry)
827+
continue
828+
}
829+
filteredWants = append(filteredWants, entry)
830+
}
831+
// Clear truncated entries - early GC.
832+
clear(wants[len(filteredWants):])
833+
return filteredWants, overflow
834+
}
835+
813836
// handleOverflow processes incoming wants that could not be addded to the peer
814837
// ledger without exceeding the peer want limit. These are handled by trying to
815838
// make room by canceling existing wants for which there is no block. If this
@@ -913,17 +936,17 @@ func (e *Engine) splitWantsCancelsDenials(p peer.ID, m bsmsg.BitSwapMessage) ([]
913936
continue
914937
}
915938

939+
if e.peerBlockRequestFilter != nil && !e.peerBlockRequestFilter(p, c) {
940+
denials = append(denials, et)
941+
continue
942+
}
943+
916944
if et.WantType == pb.Message_Wantlist_Have {
917945
log.Debugw("Bitswap engine <- want-have", "local", e.self, "from", p, "cid", c)
918946
} else {
919947
log.Debugw("Bitswap engine <- want-block", "local", e.self, "from", p, "cid", c)
920948
}
921949

922-
if e.peerBlockRequestFilter != nil && !e.peerBlockRequestFilter(p, c) {
923-
denials = append(denials, et)
924-
continue
925-
}
926-
927950
// Do not take more wants that can be handled.
928951
if len(wants) < int(e.maxQueuedWantlistEntriesPerPeer) {
929952
wants = append(wants, et)
@@ -1057,8 +1080,7 @@ func (e *Engine) PeerDisconnected(p peer.ID) {
10571080
// If the want is a want-have, and it's below a certain size, send the full
10581081
// block (instead of sending a HAVE)
10591082
func (e *Engine) sendAsBlock(wantType pb.Message_Wantlist_WantType, blockSize int) bool {
1060-
isWantBlock := wantType == pb.Message_Wantlist_Block
1061-
return isWantBlock || blockSize <= e.maxBlockSizeReplaceHasWithBlock
1083+
return wantType == pb.Message_Wantlist_Block || blockSize <= e.wantHaveReplaceSize
10621084
}
10631085

10641086
func (e *Engine) numBytesSentTo(p peer.ID) uint64 {

bitswap/server/internal/decision/engine_test.go

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -188,17 +188,11 @@ func newEngineForTesting(
188188
bs blockstore.Blockstore,
189189
peerTagger PeerTagger,
190190
self peer.ID,
191-
maxReplaceSize int,
191+
wantHaveReplaceSize int,
192192
opts ...Option,
193193
) *Engine {
194-
return newEngine(
195-
ctx,
196-
bs,
197-
peerTagger,
198-
self,
199-
maxReplaceSize,
200-
opts...,
201-
)
194+
opts = append(opts, WithWantHaveReplaceSize(wantHaveReplaceSize))
195+
return NewEngine(ctx, bs, peerTagger, self, opts...)
202196
}
203197

204198
func TestOutboxClosedWhenEngineClosed(t *testing.T) {

0 commit comments

Comments
 (0)