From ac448e88f109e89124218258d87434d0ab2e8f59 Mon Sep 17 00:00:00 2001 From: Gal Topper Date: Tue, 27 Jan 2026 14:37:04 +0700 Subject: [PATCH] Fix `IndexError in `ParallelExecution` Fix `IndexError: list index out of range` when `select_outlets` returns an empty list. Instead, refrain from emitting the event. An alternative approach could be to raise an exception. --- storey/flow.py | 7 +++++-- tests/test_flow.py | 29 +++++++++++++++++++++++++++++ 2 files changed, 34 insertions(+), 2 deletions(-) diff --git a/storey/flow.py b/storey/flow.py index 7d707739..9431261c 100644 --- a/storey/flow.py +++ b/storey/flow.py @@ -2162,11 +2162,14 @@ async def _do(self, event): "Streaming is not supported when multiple runnables are selected. " "Streaming runnables must be the only runnable selected for an event." ) + # If no runnables were selected, don't emit the event + if not results: + return None + # Use self.runnables (registered) not runnables (selected) to determine wrapping if len(self.runnables) == 1: result: _ParallelExecutionRunnableResult = results[0] - event.body = result.data if results else None - + event.body = result.data metadata = { "microsec": result.runtime, "when": result.timestamp.isoformat(sep=" ", timespec="microseconds"), diff --git a/tests/test_flow.py b/tests/test_flow.py index 3bb91f23..5a11650d 100644 --- a/tests/test_flow.py +++ b/tests/test_flow.py @@ -5420,6 +5420,35 @@ def select_runnables(self, event): assert result == {"model2": 1} +def test_parallel_execution_empty_selection(): + """When 1 runnable is registered but 0 are selected, event should not be emitted.""" + runnable = RunnableNaiveNoOp("model1") + + class EmptySelectParallelExecution(ParallelExecution): + def select_runnables(self, event): + # Return empty list - select no runnables + return [] + + parallel_execution = EmptySelectParallelExecution( + [runnable], + execution_mechanism_by_runnable_name={"model1": "naive"}, + ) + + controller = build_flow( + [ + SyncEmitSource(), + parallel_execution, + Reduce([], lambda acc, x: acc + [x]), + ] + ).run() + controller.emit({"value": 42}) + controller.terminate() + termination_result = controller.await_termination() + + # When no runnables are selected, event should not be emitted downstream + assert termination_result == [] + + def test_enrichment(): busy_wait_pool = RunnableBusyWait("busy1") busy_wait_dedicated = RunnableBusyWait("busy2")