Skip to content

Commit a2b9356

Browse files
committed
chore: merge conflicts
1 parent 8fbea44 commit a2b9356

File tree

2 files changed

+62
-10
lines changed

2 files changed

+62
-10
lines changed

consensus/propagation/requester.go

Lines changed: 48 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,10 @@ import (
1717

1818
const (
1919
// concurrentPerPeerRequestLimit the maximum number of requests to send a peer.
20-
concurrentPerPeerRequestLimit = 100
20+
concurrentPerPeerRequestLimit = 40
2121

2222
// maxRequestsPerPart the maximum number of requests per parts.
23-
maxRequestsPerPart = 30
23+
maxRequestsPerPart = 40
2424

2525
// maxNumberOfPendingRequests the maximum number of pending requests.
2626
maxNumberOfPendingRequests = 50_000
@@ -29,7 +29,7 @@ const (
2929
maxRequestRetry = 10
3030

3131
// requestTimeout request timeout after it's pending to be sent
32-
requestTimeout = 60 * time.Second
32+
requestTimeout = 10 * time.Second
3333
)
3434

3535
type request struct {
@@ -46,6 +46,7 @@ type request struct {
4646
type requester struct {
4747
sync.Mutex
4848
pendingRequests []*request
49+
sentRequests []*request
4950
perPeerRequests map[p2p.ID]int
5051
// perPartRequests a map of [height][round][partIndex]numberOfRequests
5152
perPartRequests map[int64]map[int32]map[int]int
@@ -54,13 +55,40 @@ type requester struct {
5455
}
5556

5657
func newRequester(logger log.Logger) *requester {
57-
return &requester{
58+
requester := requester{
5859
perPeerRequests: make(map[p2p.ID]int),
5960
perPartRequests: make(map[int64]map[int32]map[int]int),
6061
pendingRequests: make([]*request, 0),
62+
sentRequests: make([]*request, 0),
6163
logger: logger,
6264
ctx: context.Background(),
6365
}
66+
go func() {
67+
ticker := time.NewTicker(2 * time.Second)
68+
for {
69+
select {
70+
case <-requester.ctx.Done():
71+
return
72+
case <-ticker.C:
73+
requester.Lock()
74+
for i, req := range requester.sentRequests {
75+
if req.timestamp.Add(requestTimeout).Before(time.Now()) {
76+
err := requester.removeSentRequest(i)
77+
if err != nil {
78+
requester.logger.With("err", err).Error("failed to remove sent request")
79+
continue
80+
}
81+
requester.perPeerRequests[req.targetPeer.ID()]--
82+
for index := range req.want.Parts.GetTrueIndices() {
83+
requester.perPartRequests[req.want.Height][req.want.Round][index]--
84+
}
85+
}
86+
}
87+
requester.Unlock()
88+
}
89+
}
90+
}()
91+
return &requester
6492
}
6593

6694
func (r *requester) sendRequest(targetPeer p2p.Peer, want *proptypes.WantParts) (bool, error) {
@@ -120,6 +148,7 @@ func (r *requester) sendRequest(targetPeer p2p.Peer, want *proptypes.WantParts)
120148
return false, err
121149
}
122150
}
151+
r.sentRequests = append(r.sentRequests, req)
123152
r.perPeerRequests[targetPeer.ID()]++
124153
}
125154
if !toPostpone.IsEmpty() {
@@ -215,7 +244,7 @@ func (r *requester) sendNextRequest(from p2p.Peer) {
215244
req := r.pendingRequests[index]
216245
if !req.timestamp.Add(requestTimeout).After(time.Now()) {
217246
// remove expired request
218-
err := r.removeRequest(index)
247+
err := r.removePendingRequest(index)
219248
if err != nil {
220249
r.logger.Error("failed to remove expired request", "peer", from, "index", index, "err", err)
221250
}
@@ -229,7 +258,7 @@ func (r *requester) sendNextRequest(from p2p.Peer) {
229258
}
230259
}()
231260
// remove pending request from pending requests list
232-
err := r.removeRequest(index)
261+
err := r.removePendingRequest(index)
233262
if err != nil {
234263
r.logger.Error("failed to remove expired request", "peer", from, "index", index, "err", err)
235264
}
@@ -239,7 +268,7 @@ func (r *requester) sendNextRequest(from p2p.Peer) {
239268
}
240269
}
241270

242-
func (r *requester) removeRequest(index int) error {
271+
func (r *requester) removePendingRequest(index int) error {
243272
if index >= len(r.pendingRequests) {
244273
return errors.New("request index out of pending requests range")
245274
}
@@ -250,3 +279,15 @@ func (r *requester) removeRequest(index int) error {
250279
r.pendingRequests = append(r.pendingRequests[:index], r.pendingRequests[index+1:]...)
251280
return nil
252281
}
282+
283+
func (r *requester) removeSentRequest(index int) error {
284+
if index >= len(r.sentRequests) {
285+
return errors.New("request index out of pending requests range")
286+
}
287+
if index+1 == len(r.sentRequests) {
288+
r.sentRequests = r.sentRequests[:index]
289+
return nil
290+
}
291+
r.sentRequests = append(r.sentRequests[:index], r.sentRequests[index+1:]...)
292+
return nil
293+
}

consensus/propagation/requester_test.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,11 +38,12 @@ func TestRequester_SendRequest(t *testing.T) {
3838
r.perPeerRequests = make(map[p2p.ID]int)
3939
r.perPartRequests = make(map[int64]map[int32]map[int]int)
4040
r.pendingRequests = []*request{}
41+
r.sentRequests = []*request{}
4142
},
4243
want: &proptypes.WantParts{
4344
Height: 10,
4445
Round: 1,
45-
Parts: bits.NewBitArray(1),
46+
Parts: bits.NewBitArray(10).Not(),
4647
Prove: false,
4748
},
4849
expectedSent: true,
@@ -57,7 +58,7 @@ func TestRequester_SendRequest(t *testing.T) {
5758
want: &proptypes.WantParts{
5859
Height: 10,
5960
Round: 1,
60-
Parts: bits.NewBitArray(1),
61+
Parts: bits.NewBitArray(10).Not(),
6162
Prove: false,
6263
},
6364
expectedSent: true,
@@ -72,7 +73,7 @@ func TestRequester_SendRequest(t *testing.T) {
7273
}
7374
},
7475
want: func() *proptypes.WantParts {
75-
bitArray := bits.NewBitArray(1)
76+
bitArray := bits.NewBitArray(10).Not()
7677
bitArray.SetIndex(0, true)
7778
return &proptypes.WantParts{
7879
Height: 10,
@@ -108,6 +109,16 @@ func TestRequester_SendRequest(t *testing.T) {
108109
sent, err := r.sendRequest(peer, tt.want)
109110

110111
assert.Equal(t, tt.expectedSent, sent)
112+
if tt.expectedSent {
113+
exists := false
114+
for _, req := range r.sentRequests {
115+
if req.want.Height == tt.want.Height && req.want.Round == tt.want.Round {
116+
exists = true
117+
}
118+
}
119+
assert.True(t, exists)
120+
}
121+
111122
if tt.expectedError != nil {
112123
assert.EqualError(t, err, tt.expectedError.Error())
113124
} else {

0 commit comments

Comments
 (0)