@@ -35,7 +35,7 @@ pub(crate) struct SelfRateLimiter<Id: ReqId, E: EthSpec> {
35
35
/// Rate limiter for our own requests.
36
36
limiter : RateLimiter ,
37
37
/// Requests that are ready to be sent.
38
- ready_requests : SmallVec < [ BehaviourAction < Id , E > ; 3 ] > ,
38
+ ready_requests : SmallVec < [ ( PeerId , RPCSend < Id , E > ) ; 3 ] > ,
39
39
/// Slog logger.
40
40
log : Logger ,
41
41
}
@@ -76,7 +76,7 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
76
76
peer_id : PeerId ,
77
77
request_id : Id ,
78
78
req : RequestType < E > ,
79
- ) -> Result < BehaviourAction < Id , E > , Error > {
79
+ ) -> Result < RPCSend < Id , E > , Error > {
80
80
let protocol = req. versioned_protocol ( ) . protocol ( ) ;
81
81
// First check that there are not already other requests waiting to be sent.
82
82
if let Some ( queued_requests) = self . delayed_requests . get_mut ( & ( peer_id, protocol) ) {
@@ -108,13 +108,9 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
108
108
request_id : Id ,
109
109
req : RequestType < E > ,
110
110
log : & Logger ,
111
- ) -> Result < BehaviourAction < Id , E > , ( QueuedRequest < Id , E > , Duration ) > {
111
+ ) -> Result < RPCSend < Id , E > , ( QueuedRequest < Id , E > , Duration ) > {
112
112
match limiter. allows ( & peer_id, & req) {
113
- Ok ( ( ) ) => Ok ( BehaviourAction :: NotifyHandler {
114
- peer_id,
115
- handler : NotifyHandler :: Any ,
116
- event : RPCSend :: Request ( request_id, req) ,
117
- } ) ,
113
+ Ok ( ( ) ) => Ok ( RPCSend :: Request ( request_id, req) ) ,
118
114
Err ( e) => {
119
115
let protocol = req. versioned_protocol ( ) ;
120
116
match e {
@@ -126,11 +122,7 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
126
122
"Self rate limiting error for a batch that will never fit. Sending request anyway. Check configuration parameters." ;
127
123
"protocol" => %req. versioned_protocol( ) . protocol( )
128
124
) ;
129
- Ok ( BehaviourAction :: NotifyHandler {
130
- peer_id,
131
- handler : NotifyHandler :: Any ,
132
- event : RPCSend :: Request ( request_id, req) ,
133
- } )
125
+ Ok ( RPCSend :: Request ( request_id, req) )
134
126
}
135
127
RateLimitedErr :: TooSoon ( wait_time) => {
136
128
debug ! ( log, "Self rate limiting" ; "protocol" => %protocol. protocol( ) , "wait_time_ms" => wait_time. as_millis( ) , "peer_id" => %peer_id) ;
@@ -156,7 +148,7 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
156
148
// If one fails just wait for the next window that allows sending requests.
157
149
return ;
158
150
}
159
- Ok ( event) => self . ready_requests . push ( event) ,
151
+ Ok ( event) => self . ready_requests . push ( ( peer_id , event) ) ,
160
152
}
161
153
}
162
154
if queued_requests. is_empty ( ) {
@@ -203,8 +195,12 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
203
195
let _ = self . limiter . poll_unpin ( cx) ;
204
196
205
197
// Finally return any queued events.
206
- if !self . ready_requests . is_empty ( ) {
207
- return Poll :: Ready ( self . ready_requests . remove ( 0 ) ) ;
198
+ if let Some ( ( peer_id, event) ) = self . ready_requests . pop ( ) {
199
+ return Poll :: Ready ( BehaviourAction :: NotifyHandler {
200
+ peer_id,
201
+ handler : NotifyHandler :: Any ,
202
+ event,
203
+ } ) ;
208
204
}
209
205
210
206
Poll :: Pending
0 commit comments