@@ -354,6 +354,10 @@ class Peer {
354
354
else this . sendSync ( )
355
355
}
356
356
357
+ _markInflight ( index ) {
358
+ this . missingBlocks . set ( index , false )
359
+ }
360
+
357
361
broadcastRange ( start , length , drop ) {
358
362
if ( drop ) this . _unclearLocalRange ( start , length )
359
363
else this . _clearLocalRange ( start , length )
@@ -638,11 +642,12 @@ class Peer {
638
642
}
639
643
640
644
_cancelRequest ( id ) {
641
- const exists = this . replicator . _inflight . get ( id )
642
- if ( ! exists ) return
645
+ const req = this . replicator . _inflight . get ( id )
646
+ if ( ! req ) return
643
647
644
648
this . inflight --
645
649
this . replicator . _removeInflight ( id )
650
+ if ( isBlockRequest ( req ) ) this . replicator . _unmarkInflight ( req . block . index )
646
651
647
652
this . wireCancel . send ( { request : id } )
648
653
}
@@ -692,6 +697,8 @@ class Peer {
692
697
if ( reorg === true ) return await this . replicator . _onreorgdata ( this , req , data )
693
698
} catch ( err ) {
694
699
safetyCatch ( err )
700
+ if ( isBlockRequest ( req ) ) this . replicator . _unmarkInflight ( req . block . index )
701
+
695
702
this . paused = true
696
703
this . replicator . oninvalid ( err , req , data , this )
697
704
return
@@ -706,6 +713,7 @@ class Peer {
706
713
}
707
714
} catch ( err ) {
708
715
safetyCatch ( err )
716
+ if ( isBlockRequest ( req ) ) this . replicator . _unmarkInflight ( req . block . index )
709
717
710
718
if ( err . code === 'WRITE_FAILED' ) {
711
719
// For example, we don't want to keep pulling data when storage is full
@@ -795,9 +803,13 @@ class Peer {
795
803
}
796
804
}
797
805
806
+ _resetMissingBlock ( index ) {
807
+ this . missingBlocks . set ( index , this . _remoteHasBlock ( index ) && ! this . core . bitfield . get ( index ) )
808
+ }
809
+
798
810
_unclearLocalRange ( start , length ) {
799
811
if ( length === 1 ) {
800
- this . missingBlocks . set ( start , this . _remoteHasBlock ( start ) && ! this . core . bitfield . get ( start ) )
812
+ this . _resetMissingBlock ( start )
801
813
return
802
814
}
803
815
@@ -1010,6 +1022,14 @@ class Peer {
1010
1022
return index < this . _remoteContiguousLength || this . remoteBitfield . get ( index ) === true
1011
1023
}
1012
1024
1025
+ _sendBlockRequest ( req , b ) {
1026
+ req . block = { index : b . index , nodes : 0 }
1027
+ this . replicator . _markInflight ( b . index )
1028
+
1029
+ b . inflight . push ( req )
1030
+ this . _send ( req )
1031
+ }
1032
+
1013
1033
_requestBlock ( b ) {
1014
1034
const { length, fork } = this . core . tree
1015
1035
@@ -1025,10 +1045,7 @@ class Peer {
1025
1045
const req = this . _makeRequest ( b . index >= length , b . priority )
1026
1046
if ( req === null ) return false
1027
1047
1028
- req . block = { index : b . index , nodes : 0 }
1029
-
1030
- b . inflight . push ( req )
1031
- this . _send ( req )
1048
+ this . _sendBlockRequest ( req , b )
1032
1049
1033
1050
return true
1034
1051
}
@@ -1047,10 +1064,7 @@ class Peer {
1047
1064
return false
1048
1065
}
1049
1066
1050
- req . block = { index, nodes : 0 }
1051
-
1052
- b . inflight . push ( req )
1053
- this . _send ( req )
1067
+ this . _sendBlockRequest ( req , b )
1054
1068
1055
1069
// Don't think this will ever happen, as the pending queue is drained before the range queue
1056
1070
// but doesn't hurt to check this explicitly here also.
@@ -1552,11 +1566,14 @@ module.exports = class Replicator {
1552
1566
this . onpeerupdate ( true , peer )
1553
1567
}
1554
1568
1569
+ _markInflight ( index ) {
1570
+ for ( const peer of this . peers ) peer . _markInflight ( index )
1571
+ }
1572
+
1555
1573
_removeInflight ( id ) {
1556
1574
this . _inflight . remove ( id )
1557
- if ( this . isDownloading ( ) === false ) {
1558
- for ( const peer of this . peers ) peer . signalUpgrade ( )
1559
- }
1575
+ if ( this . isDownloading ( ) === true ) return
1576
+ for ( const peer of this . peers ) peer . signalUpgrade ( )
1560
1577
}
1561
1578
1562
1579
_removePeer ( peer ) {
@@ -1759,6 +1776,7 @@ module.exports = class Replicator {
1759
1776
_clearRequest ( peer , req ) {
1760
1777
if ( req . block !== null ) {
1761
1778
this . _clearInflightBlock ( this . _blocks , req )
1779
+ this . _unmarkInflight ( req . block . index )
1762
1780
}
1763
1781
1764
1782
if ( req . hash !== null ) {
@@ -1783,6 +1801,10 @@ module.exports = class Replicator {
1783
1801
this . updateAll ( )
1784
1802
}
1785
1803
1804
+ _unmarkInflight ( index ) {
1805
+ for ( const peer of this . peers ) peer . _resetMissingBlock ( index )
1806
+ }
1807
+
1786
1808
_ondata ( peer , req , data ) {
1787
1809
if ( data . block !== null ) {
1788
1810
this . _resolveBlockRequest ( this . _blocks , data . block . index , data . block . value , req )
@@ -2218,3 +2240,7 @@ function onwireextension (m, c) {
2218
2240
function setDownloadingLater ( repl , downloading , session ) {
2219
2241
repl . setDownloadingNow ( downloading , session )
2220
2242
}
2243
+
2244
+ function isBlockRequest ( req ) {
2245
+ return req !== null && req . block !== null
2246
+ }
0 commit comments