-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathbatched.py
More file actions
37 lines (32 loc) · 1.34 KB
/
Copy pathbatched.py
File metadata and controls
37 lines (32 loc) · 1.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
from concurrent.futures import Future
def batched_futures(
lst: list[Future], nested_skip_lst: list[Future[list]], n: int
) -> list[list] | BaseException:
"""
Batch n completed future objects. If the number of completed futures is smaller than n and the end of the batch is
not reached yet, then an empty list is returned. If n future objects are done, which are not included in the skip_set
then they are returned as batch.
Args:
lst (list): list of all future objects
nested_skip_lst (list): nest list of individual results already assigned to previous batches
n (int): batch size
Returns:
list: results of the batched futures
"""
skip_set = {id(item) for f in nested_skip_lst for item in f.result()}
done_lst = []
failed_lst = []
n_expected = min(n, len(lst) - len(skip_set))
for v in lst:
if v.done():
excp = v.exception()
if excp is not None:
failed_lst.append(excp)
elif id(v.result()) not in skip_set:
done_lst.append(v.result())
if len(done_lst) == n_expected:
return done_lst
if len(failed_lst) == len(lst) - len(skip_set) and len(failed_lst) > 0:
return failed_lst[0] # raise the exception only after all futures have failed
else:
return []