File tree Expand file tree Collapse file tree 2 files changed +34
-2
lines changed
Expand file tree Collapse file tree 2 files changed +34
-2
lines changed Original file line number Diff line number Diff line change @@ -2162,11 +2162,14 @@ async def _do(self, event):
21622162 "Streaming is not supported when multiple runnables are selected. "
21632163 "Streaming runnables must be the only runnable selected for an event."
21642164 )
2165+ # If no runnables were selected, don't emit the event
2166+ if not results :
2167+ return None
2168+
21652169 # Use self.runnables (registered) not runnables (selected) to determine wrapping
21662170 if len (self .runnables ) == 1 :
21672171 result : _ParallelExecutionRunnableResult = results [0 ]
2168- event .body = result .data if results else None
2169-
2172+ event .body = result .data
21702173 metadata = {
21712174 "microsec" : result .runtime ,
21722175 "when" : result .timestamp .isoformat (sep = " " , timespec = "microseconds" ),
Original file line number Diff line number Diff line change @@ -5420,6 +5420,35 @@ def select_runnables(self, event):
54205420 assert result == {"model2" : 1 }
54215421
54225422
5423+ def test_parallel_execution_empty_selection ():
5424+ """When 1 runnable is registered but 0 are selected, event should not be emitted."""
5425+ runnable = RunnableNaiveNoOp ("model1" )
5426+
5427+ class EmptySelectParallelExecution (ParallelExecution ):
5428+ def select_runnables (self , event ):
5429+ # Return empty list - select no runnables
5430+ return []
5431+
5432+ parallel_execution = EmptySelectParallelExecution (
5433+ [runnable ],
5434+ execution_mechanism_by_runnable_name = {"model1" : "naive" },
5435+ )
5436+
5437+ controller = build_flow (
5438+ [
5439+ SyncEmitSource (),
5440+ parallel_execution ,
5441+ Reduce ([], lambda acc , x : acc + [x ]),
5442+ ]
5443+ ).run ()
5444+ controller .emit ({"value" : 42 })
5445+ controller .terminate ()
5446+ termination_result = controller .await_termination ()
5447+
5448+ # When no runnables are selected, event should not be emitted downstream
5449+ assert termination_result == []
5450+
5451+
54235452def test_enrichment ():
54245453 busy_wait_pool = RunnableBusyWait ("busy1" )
54255454 busy_wait_dedicated = RunnableBusyWait ("busy2" )
You can’t perform that action at this time.
0 commit comments