@@ -4,6 +4,7 @@ package decision
44import (
55 "context"
66 "fmt"
7+ "math/bits"
78 "sync"
89 "time"
910
@@ -147,9 +148,6 @@ type Engine struct {
147148
148149 lock sync.RWMutex // protects the fields immediately below
149150
150- // ledgerMap lists block-related Ledgers by their Partner key.
151- ledgerMap map [peer.ID ]* ledger
152-
153151 // peerLedger saves which peers are waiting for a Cid
154152 peerLedger * peerLedger
155153
@@ -187,6 +185,8 @@ type Engine struct {
187185
188186 bstoreWorkerCount int
189187 maxOutstandingBytesPerPeer int
188+
189+ maxQueuedWantlistEntriesPerPeer uint
190190}
191191
192192// TaskInfo represents the details of a request from a peer.
@@ -270,6 +270,15 @@ func WithMaxOutstandingBytesPerPeer(count int) Option {
270270 }
271271}
272272
273+ // WithMaxQueuedWantlistEntriesPerPeer limits how much individual entries each peer is allowed to send.
274+ // If a peer send us more than this we will truncate newest entries.
275+ // It defaults to DefaultMaxQueuedWantlistEntiresPerPeer.
276+ func WithMaxQueuedWantlistEntriesPerPeer (count uint ) Option {
277+ return func (e * Engine ) {
278+ e .maxQueuedWantlistEntriesPerPeer = count
279+ }
280+ }
281+
273282func WithSetSendDontHave (send bool ) Option {
274283 return func (e * Engine ) {
275284 e .sendDontHaves = send
@@ -330,7 +339,6 @@ func newEngine(
330339 opts ... Option ,
331340) * Engine {
332341 e := & Engine {
333- ledgerMap : make (map [peer.ID ]* ledger ),
334342 scoreLedger : NewDefaultScoreLedger (),
335343 bstoreWorkerCount : defaults .BitswapEngineBlockstoreWorkerCount ,
336344 maxOutstandingBytesPerPeer : defaults .BitswapMaxOutstandingBytesPerPeer ,
@@ -348,6 +356,7 @@ func newEngine(
348356 targetMessageSize : defaultTargetMessageSize ,
349357 tagQueued : fmt .Sprintf (tagFormat , "queued" , uuid .New ().String ()),
350358 tagUseful : fmt .Sprintf (tagFormat , "useful" , uuid .New ().String ()),
359+ maxQueuedWantlistEntriesPerPeer : defaults .MaxQueuedWantlistEntiresPerPeer ,
351360 }
352361
353362 for _ , opt := range opts {
@@ -450,13 +459,10 @@ func (e *Engine) onPeerRemoved(p peer.ID) {
450459
451460// WantlistForPeer returns the list of keys that the given peer has asked for
452461func (e * Engine ) WantlistForPeer (p peer.ID ) []wl.Entry {
453- partner := e .findOrCreate (p )
454-
455- partner .lk .Lock ()
456- entries := partner .wantList .Entries ()
457- partner .lk .Unlock ()
462+ e .lock .RLock ()
463+ defer e .lock .RUnlock ()
458464
459- return entries
465+ return e . peerLedger . WantlistForPeer ( p )
460466}
461467
462468// LedgerForPeer returns aggregated data communication with a given peer.
@@ -605,12 +611,7 @@ func (e *Engine) Peers() []peer.ID {
605611 e .lock .RLock ()
606612 defer e .lock .RUnlock ()
607613
608- response := make ([]peer.ID , 0 , len (e .ledgerMap ))
609-
610- for _ , ledger := range e .ledgerMap {
611- response = append (response , ledger .Partner )
612- }
613- return response
614+ return e .peerLedger .CollectPeerIDs ()
614615}
615616
616617// MessageReceived is called when a message is received from a remote peer.
@@ -659,33 +660,34 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap
659660 }
660661
661662 e .lock .Lock ()
662- for _ , entry := range wants {
663- e .peerLedger .Wants (p , entry .Cid )
664- }
665- for _ , entry := range cancels {
666- e .peerLedger .CancelWant (p , entry .Cid )
667- }
668- e .lock .Unlock ()
669663
670- // Get the ledger for the peer
671- l := e .findOrCreate (p )
672- l .lk .Lock ()
673- defer l .lk .Unlock ()
674-
675- // If the peer sent a full wantlist, replace the ledger's wantlist
676664 if m .Full () {
677- l . wantList = wl . New ( )
665+ e . peerLedger . ClearPeerWantlist ( p )
678666 }
679667
680- var activeEntries []peertask.Task
668+ s := uint (e .peerLedger .WantlistSizeForPeer (p ))
669+ if wouldBe := s + uint (len (wants )); wouldBe > e .maxQueuedWantlistEntriesPerPeer {
670+ log .Debugw ("wantlist overflow" , "local" , e .self , "remote" , p , "would be" , wouldBe )
671+ // truncate wantlist to avoid overflow
672+ available , o := bits .Sub (e .maxQueuedWantlistEntriesPerPeer , s , 0 )
673+ if o != 0 {
674+ available = 0
675+ }
676+ wants = wants [:available ]
677+ }
681678
682- // Remove cancelled blocks from the queue
679+ for _ , entry := range wants {
680+ e .peerLedger .Wants (p , entry .Entry )
681+ }
683682 for _ , entry := range cancels {
684683 log .Debugw ("Bitswap engine <- cancel" , "local" , e .self , "from" , p , "cid" , entry .Cid )
685- if l . CancelWant (entry .Cid ) {
684+ if e . peerLedger . CancelWant (p , entry .Cid ) {
686685 e .peerRequestQueue .Remove (entry .Cid , p )
687686 }
688687 }
688+ e .lock .Unlock ()
689+
690+ var activeEntries []peertask.Task
689691
690692 // Cancel a block operation
691693 sendDontHave := func (entry bsmsg.Entry ) {
@@ -724,9 +726,6 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap
724726 c := entry .Cid
725727 blockSize , found := blockSizes [entry .Cid ]
726728
727- // Add each want-have / want-block to the ledger
728- l .Wants (c , entry .Priority , entry .WantType )
729-
730729 // If the block was not found
731730 if ! found {
732731 log .Debugw ("Bitswap engine: block not found" , "local" , e .self , "from" , p , "cid" , entry .Cid , "sendDontHave" , entry .SendDontHave )
@@ -763,7 +762,7 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap
763762
764763 // Push entries onto the request queue
765764 if len (activeEntries ) > 0 {
766- e .peerRequestQueue .PushTasks ( p , activeEntries ... )
765+ e .peerRequestQueue .PushTasksTruncated ( e . maxQueuedWantlistEntriesPerPeer , p , activeEntries ... )
767766 e .updateMetrics ()
768767 }
769768}
@@ -809,14 +808,10 @@ func (e *Engine) ReceivedBlocks(from peer.ID, blks []blocks.Block) {
809808 return
810809 }
811810
812- l := e .findOrCreate (from )
813-
814811 // Record how many bytes were received in the ledger
815- l .lk .Lock ()
816- defer l .lk .Unlock ()
817812 for _ , blk := range blks {
818813 log .Debugw ("Bitswap engine <- block" , "local" , e .self , "from" , from , "cid" , blk .Cid (), "size" , len (blk .RawData ()))
819- e .scoreLedger .AddToReceivedBytes (l . Partner , len (blk .RawData ()))
814+ e .scoreLedger .AddToReceivedBytes (from , len (blk .RawData ()))
820815 }
821816}
822817
@@ -835,34 +830,14 @@ func (e *Engine) NotifyNewBlocks(blks []blocks.Block) {
835830
836831 // Check each peer to see if it wants one of the blocks we received
837832 var work bool
838- missingWants := make (map [peer.ID ][]cid.Cid )
839833 for _ , b := range blks {
840834 k := b .Cid ()
841835
842836 e .lock .RLock ()
843837 peers := e .peerLedger .Peers (k )
844838 e .lock .RUnlock ()
845839
846- for _ , p := range peers {
847- e .lock .RLock ()
848- ledger , ok := e .ledgerMap [p ]
849- e .lock .RUnlock ()
850-
851- if ! ok {
852- // This can happen if the peer has disconnected while we're processing this list.
853- log .Debugw ("failed to find peer in ledger" , "peer" , p )
854- missingWants [p ] = append (missingWants [p ], k )
855- continue
856- }
857- ledger .lk .RLock ()
858- entry , ok := ledger .WantListContains (k )
859- ledger .lk .RUnlock ()
860- if ! ok {
861- // This can happen if the peer has canceled their want while we're processing this message.
862- log .Debugw ("wantlist index doesn't match peer's wantlist" , "peer" , p )
863- missingWants [p ] = append (missingWants [p ], k )
864- continue
865- }
840+ for _ , entry := range peers {
866841 work = true
867842
868843 blockSize := blockSizes [k ]
@@ -873,8 +848,8 @@ func (e *Engine) NotifyNewBlocks(blks []blocks.Block) {
873848 entrySize = bsmsg .BlockPresenceSize (k )
874849 }
875850
876- e .peerRequestQueue .PushTasks ( p , peertask.Task {
877- Topic : entry . Cid ,
851+ e .peerRequestQueue .PushTasksTruncated ( e . maxQueuedWantlistEntriesPerPeer , entry . Peer , peertask.Task {
852+ Topic : k ,
878853 Priority : int (entry .Priority ),
879854 Work : entrySize ,
880855 Data : & taskData {
@@ -888,30 +863,6 @@ func (e *Engine) NotifyNewBlocks(blks []blocks.Block) {
888863 }
889864 }
890865
891- // If we found missing wants (e.g., because the peer disconnected, we have some races here)
892- // remove them from the list. Unfortunately, we still have to re-check because the user
893- // could have re-connected in the meantime.
894- if len (missingWants ) > 0 {
895- e .lock .Lock ()
896- for p , wl := range missingWants {
897- if ledger , ok := e .ledgerMap [p ]; ok {
898- ledger .lk .RLock ()
899- for _ , k := range wl {
900- if _ , has := ledger .WantListContains (k ); has {
901- continue
902- }
903- e .peerLedger .CancelWant (p , k )
904- }
905- ledger .lk .RUnlock ()
906- } else {
907- for _ , k := range wl {
908- e .peerLedger .CancelWant (p , k )
909- }
910- }
911- }
912- e .lock .Unlock ()
913- }
914-
915866 if work {
916867 e .signalNewWork ()
917868 }
@@ -926,21 +877,20 @@ func (e *Engine) NotifyNewBlocks(blks []blocks.Block) {
926877// MessageSent is called when a message has successfully been sent out, to record
927878// changes.
928879func (e * Engine ) MessageSent (p peer.ID , m bsmsg.BitSwapMessage ) {
929- l := e .findOrCreate (p )
930- l .lk .Lock ()
931- defer l .lk .Unlock ()
880+ e .lock .Lock ()
881+ defer e .lock .Unlock ()
932882
933883 // Remove sent blocks from the want list for the peer
934884 for _ , block := range m .Blocks () {
935- e .scoreLedger .AddToSentBytes (l . Partner , len (block .RawData ()))
936- l . wantList . RemoveType ( block .Cid (), pb .Message_Wantlist_Block )
885+ e .scoreLedger .AddToSentBytes (p , len (block .RawData ()))
886+ e . peerLedger . CancelWantWithType ( p , block .Cid (), pb .Message_Wantlist_Block )
937887 }
938888
939889 // Remove sent block presences from the want list for the peer
940890 for _ , bp := range m .BlockPresences () {
941891 // Don't record sent data. We reserve that for data blocks.
942892 if bp .Type == pb .Message_Have {
943- l . wantList . RemoveType ( bp .Cid , pb .Message_Wantlist_Have )
893+ e . peerLedger . CancelWantWithType ( p , bp .Cid , pb .Message_Wantlist_Have )
944894 }
945895 }
946896}
@@ -951,31 +901,17 @@ func (e *Engine) PeerConnected(p peer.ID) {
951901 e .lock .Lock ()
952902 defer e .lock .Unlock ()
953903
954- _ , ok := e .ledgerMap [p ]
955- if ! ok {
956- e .ledgerMap [p ] = newLedger (p )
957- }
958-
959904 e .scoreLedger .PeerConnected (p )
960905}
961906
962907// PeerDisconnected is called when a peer disconnects.
963908func (e * Engine ) PeerDisconnected (p peer.ID ) {
909+ e .peerRequestQueue .Clear (p )
910+
964911 e .lock .Lock ()
965912 defer e .lock .Unlock ()
966913
967- ledger , ok := e .ledgerMap [p ]
968- if ok {
969- ledger .lk .RLock ()
970- entries := ledger .Entries ()
971- ledger .lk .RUnlock ()
972-
973- for _ , entry := range entries {
974- e .peerLedger .CancelWant (p , entry .Cid )
975- }
976- }
977- delete (e .ledgerMap , p )
978-
914+ e .peerLedger .PeerDisconnected (p )
979915 e .scoreLedger .PeerDisconnected (p )
980916}
981917
@@ -994,29 +930,6 @@ func (e *Engine) numBytesReceivedFrom(p peer.ID) uint64 {
994930 return e .LedgerForPeer (p ).Recv
995931}
996932
997- // ledger lazily instantiates a ledger
998- func (e * Engine ) findOrCreate (p peer.ID ) * ledger {
999- // Take a read lock (as it's less expensive) to check if we have a ledger
1000- // for the peer
1001- e .lock .RLock ()
1002- l , ok := e .ledgerMap [p ]
1003- e .lock .RUnlock ()
1004- if ok {
1005- return l
1006- }
1007-
1008- // There's no ledger, so take a write lock, then check again and create the
1009- // ledger if necessary
1010- e .lock .Lock ()
1011- defer e .lock .Unlock ()
1012- l , ok = e .ledgerMap [p ]
1013- if ! ok {
1014- l = newLedger (p )
1015- e .ledgerMap [p ] = l
1016- }
1017- return l
1018- }
1019-
1020933func (e * Engine ) signalNewWork () {
1021934 // Signal task generation to restart (if stopped!)
1022935 select {
0 commit comments