@@ -11,10 +11,12 @@ use smallvec::SmallVec;
11
11
use tokio_util:: time:: DelayQueue ;
12
12
use types:: EthSpec ;
13
13
14
+ use crate :: service:: BehaviourEvent ;
15
+
14
16
use super :: {
15
17
config:: OutboundRateLimiterConfig ,
16
18
rate_limiter:: { RPCRateLimiter as RateLimiter , RateLimitedErr } ,
17
- BehaviourAction , Protocol , RPCSend , ReqId , RequestType ,
19
+ BehaviourAction , Protocol , RPCMessage , RPCSend , ReqId , RequestType ,
18
20
} ;
19
21
20
22
/// A request that was rate limited or waiting on rate limited requests for the same peer and
@@ -34,7 +36,7 @@ pub(crate) struct SelfRateLimiter<Id: ReqId, E: EthSpec> {
34
36
/// Rate limiter for our own requests.
35
37
limiter : RateLimiter ,
36
38
/// Requests that are ready to be sent.
37
- ready_requests : SmallVec < [ BehaviourAction < Id , E > ; 3 ] > ,
39
+ ready_requests : SmallVec < [ ( PeerId , RPCSend < Id , E > ) ; 3 ] > ,
38
40
/// Slog logger.
39
41
log : Logger ,
40
42
}
@@ -71,7 +73,7 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
71
73
peer_id : PeerId ,
72
74
request_id : Id ,
73
75
req : RequestType < E > ,
74
- ) -> Result < BehaviourAction < Id , E > , Error > {
76
+ ) -> Result < RPCSend < Id , E > , Error > {
75
77
let protocol = req. versioned_protocol ( ) . protocol ( ) ;
76
78
// First check that there are not already other requests waiting to be sent.
77
79
if let Some ( queued_requests) = self . delayed_requests . get_mut ( & ( peer_id, protocol) ) {
@@ -103,13 +105,9 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
103
105
request_id : Id ,
104
106
req : RequestType < E > ,
105
107
log : & Logger ,
106
- ) -> Result < BehaviourAction < Id , E > , ( QueuedRequest < Id , E > , Duration ) > {
108
+ ) -> Result < RPCSend < Id , E > , ( QueuedRequest < Id , E > , Duration ) > {
107
109
match limiter. allows ( & peer_id, & req) {
108
- Ok ( ( ) ) => Ok ( BehaviourAction :: NotifyHandler {
109
- peer_id,
110
- handler : NotifyHandler :: Any ,
111
- event : RPCSend :: Request ( request_id, req) ,
112
- } ) ,
110
+ Ok ( ( ) ) => Ok ( RPCSend :: Request ( request_id, req) ) ,
113
111
Err ( e) => {
114
112
let protocol = req. versioned_protocol ( ) ;
115
113
match e {
@@ -121,11 +119,7 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
121
119
"Self rate limiting error for a batch that will never fit. Sending request anyway. Check configuration parameters." ;
122
120
"protocol" => %req. versioned_protocol( ) . protocol( )
123
121
) ;
124
- Ok ( BehaviourAction :: NotifyHandler {
125
- peer_id,
126
- handler : NotifyHandler :: Any ,
127
- event : RPCSend :: Request ( request_id, req) ,
128
- } )
122
+ Ok ( RPCSend :: Request ( request_id, req) )
129
123
}
130
124
RateLimitedErr :: TooSoon ( wait_time) => {
131
125
debug ! ( log, "Self rate limiting" ; "protocol" => %protocol. protocol( ) , "wait_time_ms" => wait_time. as_millis( ) , "peer_id" => %peer_id) ;
@@ -151,7 +145,7 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
151
145
// If one fails just wait for the next window that allows sending requests.
152
146
return ;
153
147
}
154
- Ok ( event) => self . ready_requests . push ( event) ,
148
+ Ok ( event) => self . ready_requests . push ( ( peer_id , event) ) ,
155
149
}
156
150
}
157
151
if queued_requests. is_empty ( ) {
@@ -198,8 +192,12 @@ impl<Id: ReqId, E: EthSpec> SelfRateLimiter<Id, E> {
198
192
let _ = self . limiter . poll_unpin ( cx) ;
199
193
200
194
// Finally return any queued events.
201
- if !self . ready_requests . is_empty ( ) {
202
- return Poll :: Ready ( self . ready_requests . remove ( 0 ) ) ;
195
+ if let Some ( ( peer_id, event) ) = self . ready_requests . pop ( ) {
196
+ return Poll :: Ready ( BehaviourAction :: NotifyHandler {
197
+ peer_id,
198
+ handler : NotifyHandler :: Any ,
199
+ event,
200
+ } ) ;
203
201
}
204
202
205
203
Poll :: Pending
0 commit comments