Commit 1324d3d

Delayed RPC Send Using Tokens (#5923)
closes #5785

The diagram below shows the differences in how the receiver (responder) behaves before and after this PR. The sections below detail the changes.

```mermaid
flowchart TD
    subgraph "*** After ***"
        Start2([START]) --> AA[Receive request]
        AA --> COND1{Is there already an active request <br> with the same protocol?}
        COND1 --> |Yes| CC[Send error response]
        CC --> End2([END])
        %% COND1 --> |No| COND2{Request is too large?}
        %% COND2 --> |Yes| CC
        COND1 --> |No| DD[Process request]
        DD --> EE{Rate limit reached?}
        EE --> |Yes| FF[Wait until tokens are regenerated]
        FF --> EE
        EE --> |No| GG[Send response]
        GG --> End2
    end
    subgraph "*** Before ***"
        Start([START]) --> A[Receive request]
        A --> B{Rate limit reached <br> or <br> request is too large?}
        B -->|Yes| C[Send error response]
        C --> End([END])
        B -->|No| E[Process request]
        E --> F[Send response]
        F --> End
    end
```

### `Is there already an active request with the same protocol?`

This check is not performed in `Before`. It is taken from the consensus-specs PR that proposes updates regarding rate limiting and response timeouts:

https://github.com/ethereum/consensus-specs/pull/3767/files

> The requester MUST NOT make more than two concurrent requests with the same ID.

The spec wording addresses the requester side. In this PR, I introduced the `ActiveRequestsLimiter` on the `responder` side to restrict more than two requests from running simultaneously on the same protocol per peer. If the limiter disallows a request, the responder sends a rate-limited error and penalizes the requester.

### `Rate limit reached?` and `Wait until tokens are regenerated`

UPDATE: I moved the limiter logic to the behaviour side. #5923 (comment)

~~The rate limiter is shared between the behaviour and the handler (`Arc<Mutex<RateLimiter>>`). The handler checks the rate limit and queues the response if the limit is reached. The behaviour handles pruning.~~

~~I considered not sharing the rate limiter between the behaviour and the handler, and performing all of this either within the behaviour or within the handler. However, I decided against this for the following reasons:~~

- ~~Regarding performing everything within the behaviour: the behaviour is unable to recognize the response protocol when `RPC::send_response()` is called, especially when the response is `RPCCodedResponse::Error`. Therefore, the behaviour can't rate limit responses based on the response protocol.~~
- ~~Regarding performing everything within the handler: when multiple connections are established with a peer, there could be multiple handlers interacting with that peer. Thus, we cannot enforce rate limiting per peer solely within the handler. (Any ideas? 🤔)~~
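The two mechanisms in the `After` flow can be sketched as a per-peer, per-protocol active-request counter plus a token bucket that reports a wait time instead of an error when tokens run out. This is a minimal illustrative sketch, not the PR's actual implementation: apart from the name `ActiveRequestsLimiter`, every type, field, and method below (`try_register`, `TokenBucket`, peers as `u64`, protocols as strings, the configurable `limit`) is hypothetical, and the real limiter lives in the behaviour and uses Lighthouse's `RateLimiter`.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Illustrative stand-in for the PR's `ActiveRequestsLimiter`: at most
/// `limit` concurrent requests per (peer, protocol) pair.
struct ActiveRequestsLimiter {
    limit: usize,
    active: HashMap<(u64, &'static str), usize>, // (peer id, protocol) -> count
}

impl ActiveRequestsLimiter {
    fn new(limit: usize) -> Self {
        Self { limit, active: HashMap::new() }
    }

    /// Returns false when the peer already has `limit` active requests on
    /// this protocol; the responder would then send a rate-limited error.
    fn try_register(&mut self, peer: u64, protocol: &'static str) -> bool {
        let count = self.active.entry((peer, protocol)).or_insert(0);
        if *count < self.limit {
            *count += 1;
            true
        } else {
            false
        }
    }

    /// Called when a request finishes, freeing its slot.
    fn deregister(&mut self, peer: u64, protocol: &'static str) {
        if let Some(count) = self.active.get_mut(&(peer, protocol)) {
            *count = count.saturating_sub(1);
        }
    }
}

/// Illustrative token bucket: `max` tokens, regenerating at `rate` tokens/sec.
struct TokenBucket {
    tokens: f64,
    max: f64,
    rate: f64,
    last: Instant,
}

impl TokenBucket {
    fn new(max: f64, rate: f64) -> Self {
        Self { tokens: max, max, rate, last: Instant::now() }
    }

    /// Regenerate tokens in proportion to the time elapsed since last update.
    fn refill(&mut self, now: Instant) {
        let elapsed = now.saturating_duration_since(self.last).as_secs_f64();
        self.tokens = (self.tokens + elapsed * self.rate).min(self.max);
        self.last = now;
    }

    /// Try to consume `cost` tokens. On failure, return how long to wait for
    /// regeneration ("Wait until tokens are regenerated" in the diagram),
    /// rather than sending an error as the `Before` flow did.
    fn try_consume(&mut self, cost: f64, now: Instant) -> Result<(), Duration> {
        self.refill(now);
        if self.tokens >= cost {
            self.tokens -= cost;
            Ok(())
        } else {
            Err(Duration::from_secs_f64((cost - self.tokens) / self.rate))
        }
    }
}
```

Keying the active-request map by (peer, protocol) rather than per connection is what the handler-only design could not achieve, since one peer may hold several connections and thus several handlers.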
1 parent 402a81c commit 1324d3d

File tree

9 files changed (+976, -163 lines)


beacon_node/lighthouse_network/src/metrics.rs

Lines changed: 14 additions & 0 deletions
```diff
@@ -206,6 +206,20 @@ pub static REPORT_PEER_MSGS: LazyLock<Result<IntCounterVec>> = LazyLock::new(||
     )
 });
 
+pub static OUTBOUND_REQUEST_IDLING: LazyLock<Result<Histogram>> = LazyLock::new(|| {
+    try_create_histogram(
+        "outbound_request_idling_seconds",
+        "The time our own request remained idle in the self-limiter",
+    )
+});
+
+pub static RESPONSE_IDLING: LazyLock<Result<Histogram>> = LazyLock::new(|| {
+    try_create_histogram(
+        "response_idling_seconds",
+        "The time our response remained idle in the response limiter",
+    )
+});
+
 pub fn scrape_discovery_metrics() {
     let metrics =
         discv5::metrics::Metrics::from(discv5::Discv5::<discv5::DefaultProtocolId>::raw_metrics());
```
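The two histograms above record how long requests and responses sit queued in the limiters. The measurement pattern can be sketched by stamping each queued item with an `Instant` and observing the elapsed time on dequeue; the `ResponseQueue` type below is hypothetical, and the real code feeds the duration into Lighthouse's metrics helpers (e.g. the `RESPONSE_IDLING` histogram) rather than returning it.

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

/// A queued item stamped with when it entered the limiter, so its idle
/// time can be reported to a histogram such as `response_idling_seconds`.
struct QueuedResponse<T> {
    queued_at: Instant,
    response: T,
}

/// Illustrative FIFO of rate-limited responses awaiting tokens.
struct ResponseQueue<T> {
    items: VecDeque<QueuedResponse<T>>,
}

impl<T> ResponseQueue<T> {
    fn new() -> Self {
        Self { items: VecDeque::new() }
    }

    /// Queue a response, recording when it started idling.
    fn push(&mut self, response: T) {
        self.items.push_back(QueuedResponse { queued_at: Instant::now(), response });
    }

    /// Pop the next response along with how long it idled; the caller would
    /// observe this duration on the idling histogram before sending.
    fn pop(&mut self) -> Option<(T, Duration)> {
        self.items.pop_front().map(|q| (q.response, q.queued_at.elapsed()))
    }
}
```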

beacon_node/lighthouse_network/src/rpc/handler.rs

Lines changed: 3 additions & 1 deletion
```diff
@@ -141,7 +141,7 @@ where
     /// Waker, to be sure the handler gets polled when needed.
     waker: Option<std::task::Waker>,
 
-    /// Timeout that will me used for inbound and outbound responses.
+    /// Timeout that will be used for inbound and outbound responses.
     resp_timeout: Duration,
 }
 
@@ -314,6 +314,7 @@ where
             }
             return;
         };
+
         // If the response we are sending is an error, report back for handling
         if let RpcResponse::Error(ref code, ref reason) = response {
             self.events_out.push(HandlerEvent::Err(HandlerErr::Inbound {
@@ -331,6 +332,7 @@ where
                 "Response not sent. Deactivated handler");
             return;
         }
+
         inbound_info.pending_items.push_back(response);
     }
 }
```

beacon_node/lighthouse_network/src/rpc/methods.rs

Lines changed: 14 additions & 0 deletions
```diff
@@ -606,6 +606,20 @@ pub enum ResponseTermination {
     LightClientUpdatesByRange,
 }
 
+impl ResponseTermination {
+    pub fn as_protocol(&self) -> Protocol {
+        match self {
+            ResponseTermination::BlocksByRange => Protocol::BlocksByRange,
+            ResponseTermination::BlocksByRoot => Protocol::BlocksByRoot,
+            ResponseTermination::BlobsByRange => Protocol::BlobsByRange,
+            ResponseTermination::BlobsByRoot => Protocol::BlobsByRoot,
+            ResponseTermination::DataColumnsByRoot => Protocol::DataColumnsByRoot,
+            ResponseTermination::DataColumnsByRange => Protocol::DataColumnsByRange,
+            ResponseTermination::LightClientUpdatesByRange => Protocol::LightClientUpdatesByRange,
+        }
+    }
+}
+
 /// The structured response containing a result/code indicating success or failure
 /// and the contents of the response
 #[derive(Debug, Clone)]
```
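When a multi-response stream ends, the behaviour sees only a `ResponseTermination`; the new `as_protocol` helper lets it recover which protocol's limit to charge or release, which appears to be what the behaviour-side limiter needs. The sketch below mirrors only two variants of each enum (the real `Protocol` and `ResponseTermination` have more, as the diff shows) to illustrate the mapping.

```rust
// Simplified two-variant mirrors of Lighthouse's `Protocol` and
// `ResponseTermination`, illustrating the `as_protocol` mapping added here.
#[derive(Debug, PartialEq)]
enum Protocol {
    BlocksByRange,
    BlobsByRoot,
}

enum ResponseTermination {
    BlocksByRange,
    BlobsByRoot,
}

impl ResponseTermination {
    /// Map a stream termination back to its protocol, e.g. so a
    /// per-protocol limiter knows which bucket the stream belonged to.
    fn as_protocol(&self) -> Protocol {
        match self {
            ResponseTermination::BlocksByRange => Protocol::BlocksByRange,
            ResponseTermination::BlobsByRoot => Protocol::BlobsByRoot,
        }
    }
}
```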
