From 1c7ee9c064d1b7ff7a4fb5c47b4e38c429594d9a Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Thu, 11 Jun 2026 08:02:11 +0200 Subject: [PATCH 01/14] [Test] Add duplicated test for batched --- tests/unit/standalone/test_batched.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/tests/unit/standalone/test_batched.py b/tests/unit/standalone/test_batched.py index 31e3d578..5eddcdab 100644 --- a/tests/unit/standalone/test_batched.py +++ b/tests/unit/standalone/test_batched.py @@ -6,7 +6,7 @@ class TestBatched(TestCase): def test_batched_futures(self): lst = [] - for i in list(range(10)): + for i in range(10): f = Future() f.set_result(i) lst.append(f) @@ -19,6 +19,21 @@ def test_batched_futures(self): self.assertEqual(batched_futures(lst=lst, nested_skip_lst=batched_lst[:2], n=3), [6, 7, 8]) self.assertEqual(batched_futures(lst=lst, nested_skip_lst=batched_lst, n=3), [9]) + def test_batched_futures_duplicated(self): + lst = [] + for i in range(1,4): + for _ in range(3): + f = Future() + f.set_result(i) + lst.append(f) + batched_lst = [Future(), Future(), Future()] + batched_lst[0].set_result([1, 1, 1]) + batched_lst[1].set_result([2, 2, 2]) + batched_lst[2].set_result([3, 3, 3]) + self.assertEqual(batched_futures(lst=lst, n=3, nested_skip_lst=set()), [1, 1, 1]) + self.assertEqual(batched_futures(lst=lst, nested_skip_lst=batched_lst[:1], n=3), [2, 2, 2]) + self.assertEqual(batched_futures(lst=lst, nested_skip_lst=batched_lst[:2], n=3), [3, 3, 3]) + def test_batched_futures_not_finished(self): lst = [] for _ in list(range(10)): From dbb62f8d999a3cb98af25ec1853bcb69a9155c3f Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Thu, 11 Jun 2026 08:51:39 +0200 Subject: [PATCH 02/14] Handling exceptions --- src/executorlib/standalone/batched.py | 29 ------------- .../task_scheduler/interactive/dependency.py | 34 +++++++++++++++ tests/unit/standalone/test_batched.py | 42 ------------------- .../interactive/test_dependency.py | 40 ++++++++++++++++++ 4 files changed, 74 insertions(+), 71 deletions(-) delete mode 100644 src/executorlib/standalone/batched.py delete mode 100644 tests/unit/standalone/test_batched.py diff --git a/src/executorlib/standalone/batched.py b/src/executorlib/standalone/batched.py deleted file mode 100644 index 3c44c0c7..00000000 --- a/src/executorlib/standalone/batched.py +++ /dev/null @@ -1,29 +0,0 @@ -from concurrent.futures import Future - - -def batched_futures( - lst: list[Future], nested_skip_lst: list[Future[list]], n: int -) -> list[list]: - """ - Batch n completed future objects. If the number of completed futures is smaller than n and the end of the batch is - not reached yet, then an empty list is returned. If n future objects are done, which are not included in the skip_set - then they are returned as batch. - - Args: - lst (list): list of all future objects - nested_skip_lst (list): nest list of individual results already assigned to previous batches - n (int): batch size - - Returns: - list: results of the batched futures - """ - skip_set = {id(item) for f in nested_skip_lst for item in f.result()} - - done_lst = [] - n_expected = min(n, len(lst) - len(skip_set)) - for v in lst: - if v.done() and id(v.result()) not in skip_set: - done_lst.append(v.result()) - if len(done_lst) == n_expected: - return done_lst - return [] diff --git a/src/executorlib/task_scheduler/interactive/dependency.py b/src/executorlib/task_scheduler/interactive/dependency.py index 349b3f1c..216535e2 100644 --- a/src/executorlib/task_scheduler/interactive/dependency.py +++ b/src/executorlib/task_scheduler/interactive/dependency.py @@ -357,3 +357,37 @@ def _update_waiting_task( if len(wait_lst) == len(wait_tmp_lst): sleep(refresh_rate) return wait_tmp_lst + + +def batched_futures( + lst: list[Future], nested_skip_lst: list[Future[list]], n: int +) -> list[list]: + """ + Batch n completed future objects. If the number of completed futures is smaller than n and the end of the batch is + not reached yet, then an empty list is returned. If n future objects are done, which are not included in the skip_set + then they are returned as batch. + + Args: + lst (list): list of all future objects + nested_skip_lst (list): nest list of individual results already assigned to previous batches + n (int): batch size + + Returns: + list: results of the batched futures + """ + skip_set = {id(item) for f in nested_skip_lst for item in f.result()} + + done_lst = [] + failed_lst = [] + n_expected = min(n, len(lst) - len(skip_set)) + for v in lst: + if v.done(): + if check_exception_was_raised(future_obj=v): + failed_lst.append(v) + elif id(v.result()) not in skip_set: + done_lst.append(v.result()) + if len(done_lst) == n_expected: + return done_lst + if len(failed_lst) == n_expected: + return failed_lst + return [] diff --git a/tests/unit/standalone/test_batched.py b/tests/unit/standalone/test_batched.py deleted file mode 100644 index 5eddcdab..00000000 --- a/tests/unit/standalone/test_batched.py +++ /dev/null @@ -1,42 +0,0 @@ -from unittest import TestCase -from concurrent.futures import Future -from executorlib.standalone.batched import batched_futures - - -class TestBatched(TestCase): - def test_batched_futures(self): - lst = [] - for i in range(10): - f = Future() - f.set_result(i) - lst.append(f) - batched_lst = [Future(), Future(), Future()] - batched_lst[0].set_result([0, 1, 2]) - batched_lst[1].set_result([3, 4, 5]) - batched_lst[2].set_result([6, 7, 8]) - self.assertEqual(batched_futures(lst=lst, n=3, nested_skip_lst=set()), [0, 1, 2]) - self.assertEqual(batched_futures(lst=lst, nested_skip_lst=batched_lst[:1], n=3), [3, 4, 5]) - self.assertEqual(batched_futures(lst=lst, nested_skip_lst=batched_lst[:2], n=3), [6, 7, 8]) - self.assertEqual(batched_futures(lst=lst, nested_skip_lst=batched_lst, n=3), [9]) - - def test_batched_futures_duplicated(self): - lst = [] - for i in range(1,4): - for _ in range(3): - f = Future() - f.set_result(i) - lst.append(f) - batched_lst = [Future(), Future(), Future()] - batched_lst[0].set_result([1, 1, 1]) - batched_lst[1].set_result([2, 2, 2]) - batched_lst[2].set_result([3, 3, 3]) - self.assertEqual(batched_futures(lst=lst, n=3, nested_skip_lst=set()), [1, 1, 1]) - self.assertEqual(batched_futures(lst=lst, nested_skip_lst=batched_lst[:1], n=3), [2, 2, 2]) - self.assertEqual(batched_futures(lst=lst, nested_skip_lst=batched_lst[:2], n=3), [3, 3, 3]) - - def test_batched_futures_not_finished(self): - lst = [] - for _ in list(range(10)): - f = Future() - lst.append(f) - self.assertEqual(batched_futures(lst=lst, n=3, nested_skip_lst=set()), []) diff --git a/tests/unit/task_scheduler/interactive/test_dependency.py b/tests/unit/task_scheduler/interactive/test_dependency.py index 74d48d2b..a4cda4cc 100644 --- a/tests/unit/task_scheduler/interactive/test_dependency.py +++ b/tests/unit/task_scheduler/interactive/test_dependency.py @@ -6,6 +6,7 @@ import numpy as np from executorlib.task_scheduler.interactive.blockallocation import BlockAllocationTaskScheduler +from executorlib.task_scheduler.interactive.dependency import batched_futures from executorlib.standalone.interactive.spawner import MpiExecSpawner @@ -141,3 +142,42 @@ def finished(self, future): foo.running, msg="After task completion, we expect the callback to modify the class", ) + + +class TestBatched(unittest.TestCase): + def test_batched_futures(self): + lst = [] + for i in range(10): + f = Future() + f.set_result(i) + lst.append(f) + batched_lst = [Future(), Future(), Future()] + batched_lst[0].set_result([0, 1, 2]) + batched_lst[1].set_result([3, 4, 5]) + batched_lst[2].set_result([6, 7, 8]) + self.assertEqual(batched_futures(lst=lst, n=3, nested_skip_lst=set()), [0, 1, 2]) + self.assertEqual(batched_futures(lst=lst, nested_skip_lst=batched_lst[:1], n=3), [3, 4, 5]) + self.assertEqual(batched_futures(lst=lst, nested_skip_lst=batched_lst[:2], n=3), [6, 7, 8]) + self.assertEqual(batched_futures(lst=lst, nested_skip_lst=batched_lst, n=3), [9]) + + def test_batched_futures_duplicated(self): + lst = [] + for i in range(1,4): + for _ in range(3): + f = Future() + f.set_result(i) + lst.append(f) + batched_lst = [Future(), Future(), Future()] + batched_lst[0].set_result([1, 1, 1]) + batched_lst[1].set_result([2, 2, 2]) + batched_lst[2].set_result([3, 3, 3]) + self.assertEqual(batched_futures(lst=lst, n=3, nested_skip_lst=set()), [1, 1, 1]) + self.assertEqual(batched_futures(lst=lst, nested_skip_lst=batched_lst[:1], n=3), [2, 2, 2]) + self.assertEqual(batched_futures(lst=lst, nested_skip_lst=batched_lst[:2], n=3), [3, 3, 3]) + + def test_batched_futures_not_finished(self): + lst = [] + for _ in list(range(10)): + f = Future() + lst.append(f) + self.assertEqual(batched_futures(lst=lst, n=3, nested_skip_lst=set()), []) From bdf197ee2898c5d1ba29f9b7b0683e0538ffb146 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Thu, 11 Jun 2026 09:11:17 +0200 Subject: [PATCH 03/14] revert refactor --- src/executorlib/standalone/batched.py | 37 ++++++++++++++++ .../task_scheduler/interactive/dependency.py | 40 ++--------------- tests/unit/standalone/test_batched.py | 43 +++++++++++++++++++ .../interactive/test_dependency.py | 40 ----------------- 4 files changed, 84 insertions(+), 76 deletions(-) create mode 100644 src/executorlib/standalone/batched.py create mode 100644 tests/unit/standalone/test_batched.py diff --git a/src/executorlib/standalone/batched.py b/src/executorlib/standalone/batched.py new file mode 100644 index 00000000..7a4bee25 --- /dev/null +++ b/src/executorlib/standalone/batched.py @@ -0,0 +1,37 @@ +from concurrent.futures import Future + + +def batched_futures( + lst: list[Future], nested_skip_lst: list[Future[list]], n: int +) -> list[list]: + """ + Batch n completed future objects. If the number of completed futures is smaller than n and the end of the batch is + not reached yet, then an empty list is returned. If n future objects are done, which are not included in the skip_set + then they are returned as batch. + + Args: + lst (list): list of all future objects + nested_skip_lst (list): nest list of individual results already assigned to previous batches + n (int): batch size + + Returns: + list: results of the batched futures + """ + skip_set = {id(item) for f in nested_skip_lst for item in f.result()} + + done_lst = [] + failed_lst = [] + n_expected = min(n, len(lst) - len(skip_set)) + for v in lst: + if v.done(): + excp = v.exception() + if excp is not None: + failed_lst.append(excp) + elif id(v.result()) not in skip_set: + done_lst.append(v.result()) + if len(done_lst) == n_expected: + return done_lst + if len(failed_lst) == len(lst) and len(failed_lst) > 0: + return failed_lst[0] # raise the exception only after all futures have failed + else: + return [] diff --git a/src/executorlib/task_scheduler/interactive/dependency.py b/src/executorlib/task_scheduler/interactive/dependency.py index 216535e2..d84606a9 100644 --- a/src/executorlib/task_scheduler/interactive/dependency.py +++ b/src/executorlib/task_scheduler/interactive/dependency.py @@ -348,46 +348,14 @@ def _update_waiting_task( n=task_wait_dict["kwargs"]["n"], nested_skip_lst=task_wait_dict["kwargs"]["skip_lst"], ) - if len(done_lst) == 0: + if isinstance(done_lst, list) and len(done_lst) == 0: wait_tmp_lst.append(task_wait_dict) - else: + elif not isinstance(done_lst, list): task_wait_dict["future"].set_result(done_lst) + else: + task_wait_dict["future"].set_exception(done_lst) else: wait_tmp_lst.append(task_wait_dict) if len(wait_lst) == len(wait_tmp_lst): sleep(refresh_rate) return wait_tmp_lst - - -def batched_futures( - lst: list[Future], nested_skip_lst: list[Future[list]], n: int -) -> list[list]: - """ - Batch n completed future objects. If the number of completed futures is smaller than n and the end of the batch is - not reached yet, then an empty list is returned. If n future objects are done, which are not included in the skip_set - then they are returned as batch. - - Args: - lst (list): list of all future objects - nested_skip_lst (list): nest list of individual results already assigned to previous batches - n (int): batch size - - Returns: - list: results of the batched futures - """ - skip_set = {id(item) for f in nested_skip_lst for item in f.result()} - - done_lst = [] - failed_lst = [] - n_expected = min(n, len(lst) - len(skip_set)) - for v in lst: - if v.done(): - if check_exception_was_raised(future_obj=v): - failed_lst.append(v) - elif id(v.result()) not in skip_set: - done_lst.append(v.result()) - if len(done_lst) == n_expected: - return done_lst - if len(failed_lst) == n_expected: - return failed_lst - return [] diff --git a/tests/unit/standalone/test_batched.py b/tests/unit/standalone/test_batched.py new file mode 100644 index 00000000..2fcec00a --- /dev/null +++ b/tests/unit/standalone/test_batched.py @@ -0,0 +1,43 @@ +import unittest +from concurrent.futures import Future + +from executorlib.task_scheduler.interactive.dependency import batched_futures + + +class TestBatched(unittest.TestCase): + def test_batched_futures(self): + lst = [] + for i in range(10): + f = Future() + f.set_result(i) + lst.append(f) + batched_lst = [Future(), Future(), Future()] + batched_lst[0].set_result([0, 1, 2]) + batched_lst[1].set_result([3, 4, 5]) + batched_lst[2].set_result([6, 7, 8]) + self.assertEqual(batched_futures(lst=lst, n=3, nested_skip_lst=set()), [0, 1, 2]) + self.assertEqual(batched_futures(lst=lst, nested_skip_lst=batched_lst[:1], n=3), [3, 4, 5]) + self.assertEqual(batched_futures(lst=lst, nested_skip_lst=batched_lst[:2], n=3), [6, 7, 8]) + self.assertEqual(batched_futures(lst=lst, nested_skip_lst=batched_lst, n=3), [9]) + + def test_batched_futures_duplicated(self): + lst = [] + for i in range(1,4): + for _ in range(3): + f = Future() + f.set_result(i) + lst.append(f) + batched_lst = [Future(), Future(), Future()] + batched_lst[0].set_result([1, 1, 1]) + batched_lst[1].set_result([2, 2, 2]) + batched_lst[2].set_result([3, 3, 3]) + self.assertEqual(batched_futures(lst=lst, n=3, nested_skip_lst=set()), [1, 1, 1]) + self.assertEqual(batched_futures(lst=lst, nested_skip_lst=batched_lst[:1], n=3), [2, 2, 2]) + self.assertEqual(batched_futures(lst=lst, nested_skip_lst=batched_lst[:2], n=3), [3, 3, 3]) + + def test_batched_futures_not_finished(self): + lst = [] + for _ in list(range(10)): + f = Future() + lst.append(f) + self.assertEqual(batched_futures(lst=lst, n=3, nested_skip_lst=set()), []) diff --git a/tests/unit/task_scheduler/interactive/test_dependency.py b/tests/unit/task_scheduler/interactive/test_dependency.py index a4cda4cc..74d48d2b 100644 --- a/tests/unit/task_scheduler/interactive/test_dependency.py +++ b/tests/unit/task_scheduler/interactive/test_dependency.py @@ -6,7 +6,6 @@ import numpy as np from executorlib.task_scheduler.interactive.blockallocation import BlockAllocationTaskScheduler -from executorlib.task_scheduler.interactive.dependency import batched_futures from executorlib.standalone.interactive.spawner import MpiExecSpawner @@ -142,42 +141,3 @@ def finished(self, future): foo.running, msg="After task completion, we expect the callback to modify the class", ) - - -class TestBatched(unittest.TestCase): - def test_batched_futures(self): - lst = [] - for i in range(10): - f = Future() - f.set_result(i) - lst.append(f) - batched_lst = [Future(), Future(), Future()] - batched_lst[0].set_result([0, 1, 2]) - batched_lst[1].set_result([3, 4, 5]) - batched_lst[2].set_result([6, 7, 8]) - self.assertEqual(batched_futures(lst=lst, n=3, nested_skip_lst=set()), [0, 1, 2]) - self.assertEqual(batched_futures(lst=lst, nested_skip_lst=batched_lst[:1], n=3), [3, 4, 5]) - self.assertEqual(batched_futures(lst=lst, nested_skip_lst=batched_lst[:2], n=3), [6, 7, 8]) - self.assertEqual(batched_futures(lst=lst, nested_skip_lst=batched_lst, n=3), [9]) - - def test_batched_futures_duplicated(self): - lst = [] - for i in range(1,4): - for _ in range(3): - f = Future() - f.set_result(i) - lst.append(f) - batched_lst = [Future(), Future(), Future()] - batched_lst[0].set_result([1, 1, 1]) - batched_lst[1].set_result([2, 2, 2]) - batched_lst[2].set_result([3, 3, 3]) - self.assertEqual(batched_futures(lst=lst, n=3, nested_skip_lst=set()), [1, 1, 1]) - self.assertEqual(batched_futures(lst=lst, nested_skip_lst=batched_lst[:1], n=3), [2, 2, 2]) - self.assertEqual(batched_futures(lst=lst, nested_skip_lst=batched_lst[:2], n=3), [3, 3, 3]) - - def test_batched_futures_not_finished(self): - lst = [] - for _ in list(range(10)): - f = Future() - lst.append(f) - self.assertEqual(batched_futures(lst=lst, n=3, nested_skip_lst=set()), []) From 87d38caf66294ac2279467421820682e2b23a032 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Thu, 11 Jun 2026 09:13:10 +0200 Subject: [PATCH 04/14] Raise an exception when futures failed --- src/executorlib/standalone/batched.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/executorlib/standalone/batched.py b/src/executorlib/standalone/batched.py index 7a4bee25..08dc90f7 100644 --- a/src/executorlib/standalone/batched.py +++ b/src/executorlib/standalone/batched.py @@ -3,7 +3,7 @@ def batched_futures( lst: list[Future], nested_skip_lst: list[Future[list]], n: int -) -> list[list]: +) -> list[list] | BaseException: """ Batch n completed future objects. If the number of completed futures is smaller than n and the end of the batch is not reached yet, then an empty list is returned. If n future objects are done, which are not included in the skip_set From e06eb5552fdd0605b9d12f5bd16874bb8a660fdf Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Thu, 11 Jun 2026 09:36:22 +0200 Subject: [PATCH 05/14] fix --- src/executorlib/task_scheduler/interactive/dependency.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/executorlib/task_scheduler/interactive/dependency.py b/src/executorlib/task_scheduler/interactive/dependency.py index d84606a9..ef3d8118 100644 --- a/src/executorlib/task_scheduler/interactive/dependency.py +++ b/src/executorlib/task_scheduler/interactive/dependency.py @@ -350,7 +350,7 @@ def _update_waiting_task( ) if isinstance(done_lst, list) and len(done_lst) == 0: wait_tmp_lst.append(task_wait_dict) - elif not isinstance(done_lst, list): + elif isinstance(done_lst, list) and len(done_lst) > 0: task_wait_dict["future"].set_result(done_lst) else: task_wait_dict["future"].set_exception(done_lst) From 2225400b851521019e8e7d2a0b8a1a6ddf0cf6f2 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Thu, 11 Jun 2026 09:41:29 +0200 Subject: [PATCH 06/14] add unit test for failing futures --- tests/unit/standalone/test_batched.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/tests/unit/standalone/test_batched.py b/tests/unit/standalone/test_batched.py index 2fcec00a..e2872224 100644 --- a/tests/unit/standalone/test_batched.py +++ b/tests/unit/standalone/test_batched.py @@ -35,6 +35,23 @@ def test_batched_futures_duplicated(self): self.assertEqual(batched_futures(lst=lst, nested_skip_lst=batched_lst[:1], n=3), [2, 2, 2]) self.assertEqual(batched_futures(lst=lst, nested_skip_lst=batched_lst[:2], n=3), [3, 3, 3]) + def test_batched_futures(self): + lst = [] + for i in range(10): + f = Future() + if i % 3 == 0: + f.set_exception(ValueError(f"Error for {i}")) + else: + f.set_result(i) + lst.append(f) + batched_lst = [Future(), Future()] + batched_lst[0].set_result([1, 2, 4]) + batched_lst[1].set_result([5, 7, 8]) + self.assertEqual(batched_futures(lst=lst, n=3, nested_skip_lst=set()), [1, 2, 4]) + self.assertEqual(batched_futures(lst=lst, nested_skip_lst=batched_lst[:1], n=3), [5, 7, 8]) + with self.assertRaises(ValueError): + batched_futures(lst=lst, nested_skip_lst=batched_lst, n=3) + def test_batched_futures_not_finished(self): lst = [] for _ in list(range(10)): From d9604072bc3cfa9c4b1f0b435ff276e07a790402 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Thu, 11 Jun 2026 09:44:41 +0200 Subject: [PATCH 07/14] compare to valueerror --- tests/unit/standalone/test_batched.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/unit/standalone/test_batched.py b/tests/unit/standalone/test_batched.py index e2872224..9b9f11b1 100644 --- a/tests/unit/standalone/test_batched.py +++ b/tests/unit/standalone/test_batched.py @@ -49,8 +49,7 @@ def test_batched_futures(self): batched_lst[1].set_result([5, 7, 8]) self.assertEqual(batched_futures(lst=lst, n=3, nested_skip_lst=set()), [1, 2, 4]) self.assertEqual(batched_futures(lst=lst, nested_skip_lst=batched_lst[:1], n=3), [5, 7, 8]) - with self.assertRaises(ValueError): - batched_futures(lst=lst, nested_skip_lst=batched_lst, n=3) + self.assertEqual(batched_futures(lst=lst, nested_skip_lst=batched_lst, n=3), ValueError("Error for 0")) def test_batched_futures_not_finished(self): lst = [] From a1cd6ceb244aab6baeb57f42e3bc90ed7a415884 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Thu, 11 Jun 2026 09:51:49 +0200 Subject: [PATCH 08/14] fixes --- src/executorlib/standalone/batched.py | 2 +- tests/unit/standalone/test_batched.py | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/executorlib/standalone/batched.py b/src/executorlib/standalone/batched.py index 08dc90f7..575ccc5c 100644 --- a/src/executorlib/standalone/batched.py +++ b/src/executorlib/standalone/batched.py @@ -31,7 +31,7 @@ def batched_futures( done_lst.append(v.result()) if len(done_lst) == n_expected: return done_lst - if len(failed_lst) == len(lst) and len(failed_lst) > 0: + if len(failed_lst) == len(lst) - len(skip_set) and len(failed_lst) > 0: return failed_lst[0] # raise the exception only after all futures have failed else: return [] diff --git a/tests/unit/standalone/test_batched.py b/tests/unit/standalone/test_batched.py index 9b9f11b1..fb0297f4 100644 --- a/tests/unit/standalone/test_batched.py +++ b/tests/unit/standalone/test_batched.py @@ -49,7 +49,8 @@ def test_batched_futures(self): batched_lst[1].set_result([5, 7, 8]) self.assertEqual(batched_futures(lst=lst, n=3, nested_skip_lst=set()), [1, 2, 4]) self.assertEqual(batched_futures(lst=lst, nested_skip_lst=batched_lst[:1], n=3), [5, 7, 8]) - self.assertEqual(batched_futures(lst=lst, nested_skip_lst=batched_lst, n=3), ValueError("Error for 0")) + with self.assertRaises(ValueError): + raise batched_futures(lst=lst, nested_skip_lst=batched_lst, n=3) def test_batched_futures_not_finished(self): lst = [] From 43407f6effcffcda69c6afbc147cd90e547e14ee Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Thu, 11 Jun 2026 10:02:53 +0200 Subject: [PATCH 09/14] two futures worked two failed --- .../unit/executor/test_single_dependencies.py | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/tests/unit/executor/test_single_dependencies.py b/tests/unit/executor/test_single_dependencies.py index 98e1b14c..07b20e74 100644 --- a/tests/unit/executor/test_single_dependencies.py +++ b/tests/unit/executor/test_single_dependencies.py @@ -82,6 +82,31 @@ def test_batched(self): self.assertEqual(len(result_lst), 4) self.assertTrue(t3-t2 > t2-t1) + def test_batched_error_future(self): + with SingleNodeExecutor() as exe: + t1 = time() + future_first_lst = [] + for i in range(10): + if i % 3 == 0: + future_first_lst.append(exe.submit(raise_error, parameter=0)) + else: + future_first_lst.append(exe.submit(return_input_dict, i)) + future_second_lst = exe.batched(future_first_lst, n=3) + + future_third_lst = [] + for f in future_second_lst: + future_third_lst.append(exe.submit(sum, f)) + + t2 = time() + self.assertEqual(future_third_lst[0].result(), 7) + self.assertEqual(future_third_lst[1].result(), 20) + with self.assertRaises(RuntimeError): + future_third_lst[2].result() + with self.assertRaises(RuntimeError): + future_third_lst[3].result() + t3 = time() + self.assertTrue(t3-t2 > t2-t1) + def test_batched_error(self): with self.assertRaises(TypeError): with SingleNodeExecutor() as exe: From ea56fbf563b9944904a6243056e596bb52a4d2a4 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Thu, 11 Jun 2026 10:05:59 +0200 Subject: [PATCH 10/14] order is not guranteed --- tests/unit/executor/test_single_dependencies.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/unit/executor/test_single_dependencies.py b/tests/unit/executor/test_single_dependencies.py index 07b20e74..83de011a 100644 --- a/tests/unit/executor/test_single_dependencies.py +++ b/tests/unit/executor/test_single_dependencies.py @@ -98,8 +98,7 @@ def test_batched_error_future(self): future_third_lst.append(exe.submit(sum, f)) t2 = time() - self.assertEqual(future_third_lst[0].result(), 7) - self.assertEqual(future_third_lst[1].result(), 20) + self.assertEqual(future_third_lst[0].result() + future_third_lst[1].result(), 27) with self.assertRaises(RuntimeError): future_third_lst[2].result() with self.assertRaises(RuntimeError): From 2b254c9e8091146bedf9b6f20c6435bf50e81339 Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Thu, 11 Jun 2026 12:02:54 +0200 Subject: [PATCH 11/14] Keep track of the futures --- src/executorlib/standalone/batched.py | 25 ++++---- .../task_scheduler/interactive/dependency.py | 18 +++--- tests/unit/standalone/test_batched.py | 60 +++++++++++++------ 3 files changed, 64 insertions(+), 39 deletions(-) diff --git a/src/executorlib/standalone/batched.py b/src/executorlib/standalone/batched.py index 575ccc5c..8b21b599 100644 --- a/src/executorlib/standalone/batched.py +++ b/src/executorlib/standalone/batched.py @@ -3,7 +3,7 @@ def batched_futures( lst: list[Future], nested_skip_lst: list[Future[list]], n: int -) -> list[list] | BaseException: +) -> tuple[bool, list[Future]]: """ Batch n completed future objects. If the number of completed futures is smaller than n and the end of the batch is not reached yet, then an empty list is returned. If n future objects are done, which are not included in the skip_set @@ -11,27 +11,26 @@ def batched_futures( Args: lst (list): list of all future objects - nested_skip_lst (list): nest list of individual results already assigned to previous batches + nested_skip_lst (list): list of future objects, which contain the list of future objects ids which should be skipped for the batch n (int): batch size Returns: list: results of the batched futures """ - skip_set = {id(item) for f in nested_skip_lst for item in f.result()} + skip_set = {fid for f in nested_skip_lst for fid in f.result()} done_lst = [] failed_lst = [] n_expected = min(n, len(lst) - len(skip_set)) for v in lst: - if v.done(): - excp = v.exception() - if excp is not None: - failed_lst.append(excp) - elif id(v.result()) not in skip_set: - done_lst.append(v.result()) + if id(v) not in skip_set and v.done(): + if v.exception() is not None: + failed_lst.append(v) + elif id(v) not in skip_set and v.done(): + done_lst.append(v) if len(done_lst) == n_expected: - return done_lst - if len(failed_lst) == len(lst) - len(skip_set) and len(failed_lst) > 0: - return failed_lst[0] # raise the exception only after all futures have failed + return True, done_lst + if (len(lst) - len(skip_set)) == len(failed_lst): + return False, failed_lst[:n_expected] # raise the exception only after all futures have failed else: - return [] + return True, [] diff --git a/src/executorlib/task_scheduler/interactive/dependency.py b/src/executorlib/task_scheduler/interactive/dependency.py index ef3d8118..afaa3810 100644 --- a/src/executorlib/task_scheduler/interactive/dependency.py +++ b/src/executorlib/task_scheduler/interactive/dependency.py @@ -177,6 +177,7 @@ def batched( future_lst: list[Future] = [] for _ in range(len(iterable) // n + (1 if len(iterable) % n > 0 else 0)): f: Future = Future() + f_skip = Future() if self._future_queue is not None: self._future_queue.put( { @@ -184,10 +185,11 @@ def batched( "args": (), "kwargs": {"lst": iterable, "n": n, "skip_lst": skip_lst}, "future": f, + "future_skip": f_skip, "resource_dict": {}, } ) - skip_lst = skip_lst.copy() + [f] # be careful + skip_lst = skip_lst.copy() + [f_skip] # be careful future_lst.append(f) return future_lst @@ -330,7 +332,7 @@ def _update_waiting_task( wait_tmp_lst = [] for task_wait_dict in wait_lst: exception_lst = get_exception_lst(future_lst=task_wait_dict["future_lst"]) - if len(exception_lst) > 0: + if len(exception_lst) > 0 and task_wait_dict["fn"] != "batched": task_wait_dict["future"].set_exception(exception_lst[0]) elif task_wait_dict["fn"] != "batched" and all( future.done() for future in task_wait_dict["future_lst"] @@ -343,17 +345,19 @@ def _update_waiting_task( elif task_wait_dict["fn"] == "batched" and all( future.done() for future in task_wait_dict["kwargs"]["skip_lst"] ): - done_lst = batched_futures( + success, done_lst = batched_futures( lst=task_wait_dict["kwargs"]["lst"], n=task_wait_dict["kwargs"]["n"], nested_skip_lst=task_wait_dict["kwargs"]["skip_lst"], ) - if isinstance(done_lst, list) and len(done_lst) == 0: + if success and len(done_lst) == 0: wait_tmp_lst.append(task_wait_dict) - elif isinstance(done_lst, list) and len(done_lst) > 0: - task_wait_dict["future"].set_result(done_lst) + elif success and len(done_lst) > 0: + task_wait_dict["future"].set_result([f.result() for f in done_lst]) + task_wait_dict["future_skip"].set_result([id(f) for f in done_lst]) else: - task_wait_dict["future"].set_exception(done_lst) + task_wait_dict["future"].set_exception(done_lst[0].exception()) + task_wait_dict["future_skip"].set_result([id(f) for f in done_lst]) else: wait_tmp_lst.append(task_wait_dict) if len(wait_lst) == len(wait_tmp_lst): diff --git a/tests/unit/standalone/test_batched.py b/tests/unit/standalone/test_batched.py index fb0297f4..58388746 100644 --- a/tests/unit/standalone/test_batched.py +++ b/tests/unit/standalone/test_batched.py @@ -12,13 +12,21 @@ def test_batched_futures(self): f.set_result(i) lst.append(f) batched_lst = [Future(), Future(), Future()] - batched_lst[0].set_result([0, 1, 2]) - batched_lst[1].set_result([3, 4, 5]) - batched_lst[2].set_result([6, 7, 8]) - self.assertEqual(batched_futures(lst=lst, n=3, nested_skip_lst=set()), [0, 1, 2]) - self.assertEqual(batched_futures(lst=lst, nested_skip_lst=batched_lst[:1], n=3), [3, 4, 5]) - self.assertEqual(batched_futures(lst=lst, nested_skip_lst=batched_lst[:2], n=3), [6, 7, 8]) - self.assertEqual(batched_futures(lst=lst, nested_skip_lst=batched_lst, n=3), [9]) + batched_lst[0].set_result([id(lst[0]), id(lst[1]), id(lst[2])]) + batched_lst[1].set_result([id(lst[3]), id(lst[4]), id(lst[5])]) + batched_lst[2].set_result([id(lst[6]), id(lst[7]), id(lst[8])]) + success, done_lst = batched_futures(lst=lst, n=3, nested_skip_lst=set()) + self.assertTrue(success) + self.assertEqual([f.result() for f in done_lst], [0, 1, 2]) + success, done_lst = batched_futures(lst=lst, nested_skip_lst=batched_lst[:1], n=3) + self.assertTrue(success) + self.assertEqual([f.result() for f in done_lst], [3, 4, 5]) + success, done_lst = batched_futures(lst=lst, nested_skip_lst=batched_lst[:2], n=3) + self.assertTrue(success) + self.assertEqual([f.result() for f in done_lst], [6, 7, 8]) + success, done_lst = batched_futures(lst=lst, nested_skip_lst=batched_lst, n=3) + self.assertTrue(success) + self.assertEqual([f.result() for f in done_lst], [9]) def test_batched_futures_duplicated(self): lst = [] @@ -28,12 +36,18 @@ def test_batched_futures_duplicated(self): f.set_result(i) lst.append(f) batched_lst = [Future(), Future(), Future()] - batched_lst[0].set_result([1, 1, 1]) - batched_lst[1].set_result([2, 2, 2]) - batched_lst[2].set_result([3, 3, 3]) - self.assertEqual(batched_futures(lst=lst, n=3, nested_skip_lst=set()), [1, 1, 1]) - self.assertEqual(batched_futures(lst=lst, nested_skip_lst=batched_lst[:1], n=3), [2, 2, 2]) - self.assertEqual(batched_futures(lst=lst, nested_skip_lst=batched_lst[:2], n=3), [3, 3, 3]) + batched_lst[0].set_result([id(lst[0]), id(lst[1]), id(lst[2])]) + batched_lst[1].set_result([id(lst[3]), id(lst[4]), id(lst[5])]) + batched_lst[2].set_result([id(lst[6]), id(lst[7]), id(lst[8])]) + success, done_lst = batched_futures(lst=lst, n=3, nested_skip_lst=set()) + self.assertTrue(success) + self.assertEqual([f.result() for f in done_lst], [1, 1, 1]) + success, done_lst = batched_futures(lst=lst, nested_skip_lst=batched_lst[:1], n=3) + self.assertTrue(success) + self.assertEqual([f.result() for f in done_lst], [2, 2, 2]) + success, done_lst = batched_futures(lst=lst, nested_skip_lst=batched_lst[:2], n=3) + self.assertTrue(success) + self.assertEqual([f.result() for f in done_lst], [3, 3, 3]) def test_batched_futures(self): lst = [] @@ -45,16 +59,24 @@ def test_batched_futures(self): f.set_result(i) lst.append(f) batched_lst = [Future(), Future()] - batched_lst[0].set_result([1, 2, 4]) - batched_lst[1].set_result([5, 7, 8]) - self.assertEqual(batched_futures(lst=lst, n=3, nested_skip_lst=set()), [1, 2, 4]) - self.assertEqual(batched_futures(lst=lst, nested_skip_lst=batched_lst[:1], n=3), [5, 7, 8]) + batched_lst[0].set_result([id(lst[1]), id(lst[2]), id(lst[4])]) + batched_lst[1].set_result([id(lst[5]), id(lst[7]), id(lst[8])]) + success, done_lst = batched_futures(lst=lst, n=3, nested_skip_lst=set()) + self.assertTrue(success) + self.assertEqual([f.result() for f in done_lst], [1, 2, 4]) + success, done_lst = batched_futures(lst=lst, nested_skip_lst=batched_lst[:1], n=3) + self.assertTrue(success) + self.assertEqual([f.result() for f in done_lst], [5, 7, 8]) + succss, done_lst = batched_futures(lst=lst, nested_skip_lst=batched_lst, n=3) + self.assertFalse(succss) with self.assertRaises(ValueError): - raise batched_futures(lst=lst, nested_skip_lst=batched_lst, n=3) + raise done_lst[0].exception() def test_batched_futures_not_finished(self): lst = [] for _ in list(range(10)): f = Future() lst.append(f) - self.assertEqual(batched_futures(lst=lst, n=3, nested_skip_lst=set()), []) + success, done_lst = batched_futures(lst=lst, n=3, nested_skip_lst=set()) + self.assertTrue(success) + self.assertEqual(done_lst, []) From 5b9db129e6e02be87f31cba8112daf47acb5df7e Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Thu, 11 Jun 2026 10:03:04 +0000 Subject: [PATCH 12/14] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/executorlib/standalone/batched.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/executorlib/standalone/batched.py b/src/executorlib/standalone/batched.py index 8b21b599..aa2cd488 100644 --- a/src/executorlib/standalone/batched.py +++ b/src/executorlib/standalone/batched.py @@ -31,6 +31,8 @@ def batched_futures( if len(done_lst) == n_expected: return True, done_lst if (len(lst) - len(skip_set)) == len(failed_lst): - return False, failed_lst[:n_expected] # raise the exception only after all futures have failed + return False, failed_lst[ + :n_expected + ] # raise the exception only after all futures have failed else: return True, [] From 0829e03cfc2fdf21ab40739aa927de31f050ee52 Mon Sep 17 00:00:00 2001 From: pyiron-runner Date: Thu, 11 Jun 2026 10:03:31 +0000 Subject: [PATCH 13/14] Format black --- src/executorlib/standalone/batched.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/executorlib/standalone/batched.py b/src/executorlib/standalone/batched.py index aa2cd488..8c4ab149 100644 --- a/src/executorlib/standalone/batched.py +++ b/src/executorlib/standalone/batched.py @@ -31,8 +31,9 @@ def batched_futures( if len(done_lst) == n_expected: return True, done_lst if (len(lst) - len(skip_set)) == len(failed_lst): - return False, failed_lst[ - :n_expected - ] # raise the exception only after all futures have failed + return ( + False, + failed_lst[:n_expected], + ) # raise the exception only after all futures have failed else: return True, [] From 8d6b0fb579265842ada50db3fc141f0642fc14ad Mon Sep 17 00:00:00 2001 From: Jan Janssen Date: Thu, 11 Jun 2026 12:05:18 +0200 Subject: [PATCH 14/14] fix type annotation --- src/executorlib/task_scheduler/interactive/dependency.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/executorlib/task_scheduler/interactive/dependency.py b/src/executorlib/task_scheduler/interactive/dependency.py index afaa3810..9f2a69dd 100644 --- a/src/executorlib/task_scheduler/interactive/dependency.py +++ b/src/executorlib/task_scheduler/interactive/dependency.py @@ -177,7 +177,7 @@ def batched( future_lst: list[Future] = [] for _ in range(len(iterable) // n + (1 if len(iterable) % n > 0 else 0)): f: Future = Future() - f_skip = Future() + f_skip: Future = Future() if self._future_queue is not None: self._future_queue.put( {