Skip to content

Commit 1495dea

Browse files
committed
use join_all instead of JoinSet for upkeep
1 parent 2aad553 commit 1495dea

File tree

1 file changed

+81
-118
lines changed

1 file changed

+81
-118
lines changed

src/store/inflight_activation.rs

+81-118
Original file line numberDiff line numberDiff line change
@@ -159,43 +159,33 @@ impl InflightActivationStore {
159159
}
160160

161161
pub async fn count_pending_activations(&self) -> Result<usize, Error> {
162-
Ok(self
163-
.shards
164-
.iter()
165-
.cloned()
166-
.map(|shard| async move { shard.count_pending_activations().await })
167-
.collect::<JoinSet<_>>()
168-
.join_all()
169-
.await
170-
.into_iter()
171-
.collect::<Result<Vec<_>, _>>()?
172-
.into_iter()
173-
.sum())
162+
Ok(join_all(
163+
self.shards
164+
.iter()
165+
.map(|shard| shard.count_pending_activations()),
166+
)
167+
.await
168+
.into_iter()
169+
.collect::<Result<Vec<_>, _>>()?
170+
.into_iter()
171+
.sum())
174172
}
175173

176174
pub async fn count_by_status(&self, status: InflightActivationStatus) -> Result<usize, Error> {
177-
Ok(self
178-
.shards
179-
.iter()
180-
.cloned()
181-
.map(|shard| async move { shard.count_by_status(status).await })
182-
.collect::<JoinSet<_>>()
183-
.join_all()
184-
.await
185-
.into_iter()
186-
.collect::<Result<Vec<_>, _>>()?
187-
.into_iter()
188-
.sum())
175+
Ok(join_all(
176+
self.shards
177+
.iter()
178+
.map(|shard| shard.count_by_status(status)),
179+
)
180+
.await
181+
.into_iter()
182+
.collect::<Result<Vec<_>, _>>()?
183+
.into_iter()
184+
.sum())
189185
}
190186

191187
pub async fn count(&self) -> Result<usize, Error> {
192-
Ok(self
193-
.shards
194-
.iter()
195-
.cloned()
196-
.map(|shard| async move { shard.count().await })
197-
.collect::<JoinSet<_>>()
198-
.join_all()
188+
Ok(join_all(self.shards.iter().map(|shard| shard.count()))
199189
.await
200190
.into_iter()
201191
.collect::<Result<Vec<_>, _>>()?
@@ -227,28 +217,21 @@ impl InflightActivationStore {
227217
}
228218

229219
pub async fn get_retry_activations(&self) -> Result<Vec<InflightActivation>, Error> {
230-
Ok(self
231-
.shards
232-
.iter()
233-
.cloned()
234-
.map(|shard| async move { shard.get_retry_activations().await })
235-
.collect::<JoinSet<_>>()
236-
.join_all()
237-
.await
238-
.into_iter()
239-
.collect::<Result<Vec<_>, _>>()?
240-
.into_iter()
241-
.flatten()
242-
.collect())
220+
Ok(join_all(
221+
self.shards
222+
.iter()
223+
.map(|shard| shard.get_retry_activations()),
224+
)
225+
.await
226+
.into_iter()
227+
.collect::<Result<Vec<_>, _>>()?
228+
.into_iter()
229+
.flatten()
230+
.collect())
243231
}
244232

245233
pub async fn clear(&self) -> Result<(), Error> {
246-
self.shards
247-
.iter()
248-
.cloned()
249-
.map(|shard| async move { shard.clear().await })
250-
.collect::<JoinSet<_>>()
251-
.join_all()
234+
join_all(self.shards.iter().map(|shard| shard.clear()))
252235
.await
253236
.into_iter()
254237
.collect::<Result<Vec<_>, _>>()?;
@@ -259,35 +242,31 @@ impl InflightActivationStore {
259242
/// Exceeding a processing deadline does not consume a retry as we don't know
260243
/// if a worker took the task and was killed, or failed.
261244
pub async fn handle_processing_deadline(&self) -> Result<u64, Error> {
262-
Ok(self
263-
.shards
264-
.iter()
265-
.cloned()
266-
.map(|shard| async move { shard.handle_processing_deadline().await })
267-
.collect::<JoinSet<_>>()
268-
.join_all()
269-
.await
270-
.into_iter()
271-
.collect::<Result<Vec<_>, _>>()?
272-
.into_iter()
273-
.sum())
245+
Ok(join_all(
246+
self.shards
247+
.iter()
248+
.map(|shard| shard.handle_processing_deadline()),
249+
)
250+
.await
251+
.into_iter()
252+
.collect::<Result<Vec<_>, _>>()?
253+
.into_iter()
254+
.sum())
274255
}
275256

276257
/// Update tasks that have exceeded their max processing attempts.
277258
/// These tasks are set to status=failure and will be handled by handle_failed_tasks accordingly.
278259
pub async fn handle_processing_attempts(&self) -> Result<u64, Error> {
279-
Ok(self
280-
.shards
281-
.iter()
282-
.cloned()
283-
.map(|shard| async move { shard.handle_processing_attempts().await })
284-
.collect::<JoinSet<_>>()
285-
.join_all()
286-
.await
287-
.into_iter()
288-
.collect::<Result<Vec<_>, _>>()?
289-
.into_iter()
290-
.sum())
260+
Ok(join_all(
261+
self.shards
262+
.iter()
263+
.map(|shard| shard.handle_processing_attempts()),
264+
)
265+
.await
266+
.into_iter()
267+
.collect::<Result<Vec<_>, _>>()?
268+
.into_iter()
269+
.sum())
291270
}
292271

293272
/// Perform upkeep work for tasks that are past expires_at deadlines
@@ -297,18 +276,14 @@ impl InflightActivationStore {
297276
///
298277
/// The number of impacted records is returned in a Result.
299278
pub async fn handle_expires_at(&self) -> Result<u64, Error> {
300-
Ok(self
301-
.shards
302-
.iter()
303-
.cloned()
304-
.map(|shard| async move { shard.handle_expires_at().await })
305-
.collect::<JoinSet<_>>()
306-
.join_all()
307-
.await
308-
.into_iter()
309-
.collect::<Result<Vec<_>, _>>()?
310-
.into_iter()
311-
.sum())
279+
Ok(
280+
join_all(self.shards.iter().map(|shard| shard.handle_expires_at()))
281+
.await
282+
.into_iter()
283+
.collect::<Result<Vec<_>, _>>()?
284+
.into_iter()
285+
.sum(),
286+
)
312287
}
313288

314289
/// Perform upkeep work related to status=failure
@@ -318,13 +293,7 @@ impl InflightActivationStore {
318293
/// Once dead-lettered tasks have been added to Kafka those tasks can have their status set to
319294
/// complete.
320295
pub async fn handle_failed_tasks(&self) -> Result<FailedTasksForwarder, Error> {
321-
let results = self
322-
.shards
323-
.iter()
324-
.cloned()
325-
.map(|shard| async move { shard.handle_failed_tasks().await })
326-
.collect::<JoinSet<_>>()
327-
.join_all()
296+
let results = join_all(self.shards.iter().map(|shard| shard.handle_failed_tasks()))
328297
.await
329298
.into_iter()
330299
.collect::<Result<Vec<_>, _>>()?;
@@ -350,36 +319,30 @@ impl InflightActivationStore {
350319
ids.into_iter()
351320
.for_each(|id| routed[self.route(&id)].push(id));
352321

353-
Ok(self
354-
.shards
355-
.iter()
356-
.cloned()
357-
.zip(routed.into_iter())
358-
.map(|(shard, ids)| async move { shard.mark_completed(ids).await })
359-
.collect::<JoinSet<_>>()
360-
.join_all()
361-
.await
362-
.into_iter()
363-
.collect::<Result<Vec<_>, _>>()?
364-
.into_iter()
365-
.sum())
322+
Ok(join_all(
323+
self.shards
324+
.iter()
325+
.zip(routed.into_iter())
326+
.map(|(shard, ids)| shard.mark_completed(ids)),
327+
)
328+
.await
329+
.into_iter()
330+
.collect::<Result<Vec<_>, _>>()?
331+
.into_iter()
332+
.sum())
366333
}
367334

368335
/// Remove completed tasks.
369336
/// This method is a garbage collector for the inflight task store.
370337
pub async fn remove_completed(&self) -> Result<u64, Error> {
371-
Ok(self
372-
.shards
373-
.iter()
374-
.cloned()
375-
.map(|shard| async move { shard.remove_completed().await })
376-
.collect::<JoinSet<_>>()
377-
.join_all()
378-
.await
379-
.into_iter()
380-
.collect::<Result<Vec<_>, _>>()?
381-
.into_iter()
382-
.sum())
338+
Ok(
339+
join_all(self.shards.iter().map(|shard| shard.remove_completed()))
340+
.await
341+
.into_iter()
342+
.collect::<Result<Vec<_>, _>>()?
343+
.into_iter()
344+
.sum(),
345+
)
383346
}
384347
}
385348

0 commit comments

Comments
 (0)