Skip to content

Commit 4a605f8

Browse files
committed
⚡ perf: replace FuturesUnordered to JoinSet (#630)
1 parent 71751e5 commit 4a605f8

File tree

1 file changed

+5
-7
lines changed

1 file changed

+5
-7
lines changed

src/cache/redis.rs

+5-7
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@ use super::{Cacher, error::CacheError};
55
use crate::models::aggregation::SearchResults;
66
use crate::parser::Config;
77
use error_stack::Report;
8-
use futures::stream::FuturesUnordered;
98
use rayon::iter::{IntoParallelRefIterator, ParallelIterator};
109
use redis::{AsyncCommands, Client, ExistenceCheck, SetExpiry, SetOptions, aio::ConnectionManager};
10+
use tokio::task::JoinSet;
1111

1212
/// A constant holding the redis pipeline size.
1313
const REDIS_PIPELINE_SIZE: usize = 3;
@@ -45,18 +45,16 @@ impl RedisCache {
4545
cache_ttl: u16,
4646
) -> Result<Self, Box<dyn std::error::Error>> {
4747
let client = Client::open(redis_connection_url)?;
48-
let tasks: FuturesUnordered<_> = FuturesUnordered::new();
48+
let mut tasks: JoinSet<_> = JoinSet::new();
4949

5050
for _ in 0..pool_size {
5151
let client_partially_cloned = client.clone();
52-
tasks.push(tokio::spawn(async move {
53-
client_partially_cloned.get_connection_manager().await
54-
}));
52+
tasks.spawn(async move { client_partially_cloned.get_connection_manager().await });
5553
}
5654

5755
let mut outputs = Vec::with_capacity(tasks.len());
58-
for task in tasks {
59-
outputs.push(task.await??);
56+
while let Some(task) = tasks.join_next().await {
57+
outputs.push(task??);
6058
}
6159

6260
let redis_cache = RedisCache {

0 commit comments

Comments
 (0)