Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions storey/flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -2162,11 +2162,14 @@ async def _do(self, event):
"Streaming is not supported when multiple runnables are selected. "
"Streaming runnables must be the only runnable selected for an event."
)
# If no runnables were selected, don't emit the event
if not results:
return None

# Use self.runnables (registered) not runnables (selected) to determine wrapping
if len(self.runnables) == 1:
result: _ParallelExecutionRunnableResult = results[0]
event.body = result.data if results else None

event.body = result.data
metadata = {
"microsec": result.runtime,
"when": result.timestamp.isoformat(sep=" ", timespec="microseconds"),
Expand Down
29 changes: 29 additions & 0 deletions tests/test_flow.py
Original file line number Diff line number Diff line change
Expand Up @@ -5420,6 +5420,35 @@ def select_runnables(self, event):
assert result == {"model2": 1}


def test_parallel_execution_empty_selection():
"""When 1 runnable is registered but 0 are selected, event should not be emitted."""
runnable = RunnableNaiveNoOp("model1")

class EmptySelectParallelExecution(ParallelExecution):
def select_runnables(self, event):
# Return empty list - select no runnables
return []

parallel_execution = EmptySelectParallelExecution(
[runnable],
execution_mechanism_by_runnable_name={"model1": "naive"},
)

controller = build_flow(
[
SyncEmitSource(),
parallel_execution,
Reduce([], lambda acc, x: acc + [x]),
]
).run()
controller.emit({"value": 42})
controller.terminate()
termination_result = controller.await_termination()

# When no runnables are selected, event should not be emitted downstream
assert termination_result == []


def test_enrichment():
busy_wait_pool = RunnableBusyWait("busy1")
busy_wait_dedicated = RunnableBusyWait("busy2")
Expand Down