|
30 | 30 | cached::{Cached, SizedCache}, |
31 | 31 | ethcontract::BlockNumber, |
32 | 32 | ethrpc::block_stream::CurrentBlockWatcher, |
33 | | - futures::{FutureExt, StreamExt, TryStreamExt}, |
| 33 | + futures::{FutureExt, StreamExt}, |
34 | 34 | itertools::Itertools, |
35 | 35 | prometheus::IntCounterVec, |
36 | 36 | std::{ |
@@ -246,13 +246,15 @@ where |
246 | 246 | } |
247 | 247 |
|
248 | 248 | async fn fetch_inner_many(&self, keys: HashSet<K>, block: Block) -> Result<Vec<V>> { |
249 | | - let futures = keys.into_iter().map(|key| self.fetch_inner(key, block)); |
250 | | - // only process a limited number of requests in parallel to avoid creating |
251 | | - // a ton of tracing spans at the same time since tracing never deallocates |
252 | | - // span memory to reuse it later |
253 | | - let stream = futures::stream::iter(futures).buffered(50); |
254 | | - let fetched_items: Vec<_> = stream.try_collect().await?; |
255 | | - Ok(fetched_items.into_iter().flatten().collect()) |
| 249 | + let fetched = |
| 250 | + futures::future::join_all(keys.iter().map(|key| self.fetch_inner(key.clone(), block))) |
| 251 | + .await; |
| 252 | + let fetched: Vec<_> = fetched |
| 253 | + .into_iter() |
| 254 | + .filter_map(|res| res.ok()) |
| 255 | + .flatten() |
| 256 | + .collect(); |
| 257 | + Ok(fetched) |
256 | 258 | } |
257 | 259 |
|
258 | 260 | // Sometimes nodes requests error when we try to get state from what we think is |
|
0 commit comments