Skip to content

Commit 8bb87dd

Browse files
committed
fix: chunk header requests
1 parent 76e3e9e commit 8bb87dd

File tree

1 file changed

+34
-31
lines changed

1 file changed

+34
-31
lines changed

script/src/util.rs

Lines changed: 34 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
#![allow(dead_code)]
22
use crate::types::*;
33
use alloy::primitives::B256;
4-
use futures::{stream, StreamExt};
5-
use log::debug;
4+
use futures::stream::FuturesOrdered;
5+
use futures::StreamExt;
66
use reqwest::Client;
77
use sp1_blobstream_primitives::types::ProofInputs;
88
use std::sync::Arc;
@@ -34,7 +34,10 @@ impl Default for TendermintRPCClient {
3434
const DEFAULT_TENDERMINT_RPC_TIMEOUT_SECS: u64 = 20;
3535

3636
/// The default concurrency for Tendermint RPC requests.
37-
const DEFAULT_TENDERMINT_RPC_CONCURRENCY: usize = 100;
37+
const DEFAULT_TENDERMINT_RPC_CONCURRENCY: usize = 50;
38+
39+
/// The default sleep duration for Tendermint RPC requests in milliseconds.
40+
const DEFAULT_TENDERMINT_RPC_SLEEP_MS: u64 = 1250;
3841

3942
impl TendermintRPCClient {
4043
pub fn new(url: String) -> Self {
@@ -58,6 +61,7 @@ impl TendermintRPCClient {
5861
let (trusted_light_block, target_light_block) = self
5962
.get_light_blocks(trusted_block_height, target_block_height)
6063
.await;
64+
6165
let headers = self
6266
.get_headers_in_range(trusted_block_height + 1, target_block_height - 1)
6367
.await;
@@ -93,28 +97,6 @@ impl TendermintRPCClient {
9397
}
9498
}
9599

96-
/// Fetches all light blocks for the given range of block heights. Inclusive of start and end.
97-
pub async fn fetch_light_blocks_in_range(
98-
&self,
99-
start_height: u64,
100-
end_height: u64,
101-
) -> Vec<LightBlock> {
102-
let peer_id = self.fetch_peer_id().await.unwrap();
103-
debug!(
104-
"Fetching light blocks in range: {} to {}",
105-
start_height, end_height
106-
);
107-
108-
let blocks = stream::iter(start_height..=end_height)
109-
.map(|height| async move { self.fetch_light_block(height, peer_id).await.unwrap() })
110-
.buffered(DEFAULT_TENDERMINT_RPC_CONCURRENCY)
111-
.collect::<Vec<_>>()
112-
.await;
113-
114-
debug!("Finished fetching light blocks!");
115-
blocks
116-
}
117-
118100
/// Retrieves light blocks for the trusted and target block heights.
119101
pub async fn get_light_blocks(
120102
&self,
@@ -142,13 +124,33 @@ impl TendermintRPCClient {
142124

143125
/// Retrieves the headers for the given range of block heights. Inclusive of start and end.
144126
pub async fn get_headers_in_range(&self, start_height: u64, end_height: u64) -> Vec<Header> {
145-
let mut headers = Vec::new();
146-
let headers_stream = stream::iter(start_height..=end_height)
147-
.map(|height| async move { self.get_block(height).await.header })
148-
.buffered(DEFAULT_TENDERMINT_RPC_CONCURRENCY)
149-
.collect::<Vec<_>>()
127+
let mut headers = Vec::with_capacity((end_height - start_height) as usize);
128+
129+
let mut next_batch_start = start_height;
130+
while next_batch_start <= end_height {
131+
// Top of the range is non-inclusive so max out at `end_height + 1`.
132+
let batch_end = std::cmp::min(
133+
next_batch_start + DEFAULT_TENDERMINT_RPC_CONCURRENCY as u64,
134+
end_height + 1,
135+
);
136+
137+
// Chunk the range into batches of DEFAULT_TENDERMINT_RPC_CONCURRENCY.
138+
let batch_headers = (next_batch_start..batch_end)
139+
.map(|height| async move { self.get_block(height).await.header })
140+
.collect::<FuturesOrdered<_>>()
141+
.collect::<Vec<_>>()
142+
.await;
143+
144+
headers.extend(batch_headers);
145+
next_batch_start = batch_end;
146+
147+
// Sleep for 1.25 seconds to avoid rate limiting.
148+
tokio::time::sleep(std::time::Duration::from_millis(
149+
DEFAULT_TENDERMINT_RPC_SLEEP_MS,
150+
))
150151
.await;
151-
headers.extend(headers_stream);
152+
}
153+
152154
headers
153155
}
154156

@@ -254,6 +256,7 @@ impl TendermintRPCClient {
254256
.await?
255257
.json::<CommitResponse>()
256258
.await?;
259+
257260
Ok(response)
258261
}
259262

0 commit comments

Comments
 (0)