@@ -339,7 +339,7 @@ def _deepcopy_event_for_outlet(self, event, target_obj, is_stream_completion):
339339 return event_copy
340340
341341 async def _do_downstream (self , event , outlets = None , select_outlets : bool = True ):
342- # StreamCompletion objects don't have a body - skip outlet selection for them
342+ # Termination object and StreamCompletion should propagate to all outlets
343343 if not outlets and event is not _termination_obj and not isinstance (event , StreamCompletion ) and select_outlets :
344344 outlet_names = self .select_outlets (event .body )
345345 outlets = self ._check_outlets_by_names (outlet_names ) if outlet_names else None
@@ -365,10 +365,9 @@ async def _do_downstream(self, event, outlets=None, select_outlets: bool = True)
365365 # If there is more than one outlet, allow concurrent execution.
366366 tasks = []
367367 if len (outlets ) > 1 :
368- # Determine target object for unpicklable attributes
368+ # Deep copy event and create a task per outlet (except the first, which is awaited directly below)
369369 is_stream_completion = isinstance (event , StreamCompletion )
370370 target_obj = event .original_event if is_stream_completion else event
371-
372371 for i in range (1 , len (outlets )):
373372 event_copy = self ._deepcopy_event_for_outlet (event , target_obj , is_stream_completion )
374373 tasks .append (asyncio .get_running_loop ().create_task (outlets [i ]._do_and_recover (event_copy )))
@@ -436,7 +435,7 @@ def _check_step_in_flow(self, type_to_check, visited=None):
436435 return False
437436
438437 def check_and_update_iteration_number (self , event ) -> Optional [Callable ]:
439- # Skip iteration counting for StreamCompletion objects
438+ # Skip iteration counting in case of StreamCompletion
440439 if isinstance (event , StreamCompletion ):
441440 return
442441 if hasattr (event , "_cyclic_counter" ) and self ._max_iterations is not None :
@@ -505,22 +504,18 @@ def _init(self):
505504 self ._passthrough_for_preview = list (self ._name_to_outlet ) == ["dataframe" ] if self ._name_to_outlet else False
506505
507506 async def _do (self , event ):
508- if event is _termination_obj :
509- return await self ._do_downstream (_termination_obj , select_outlets = False )
510- # StreamCompletion should propagate to all outlets (like _termination_obj)
511- # to avoid hangs in cyclic graphs and ensure all Collectors receive completions
512- if isinstance (event , StreamCompletion ):
507+ if event is _termination_obj or isinstance (event , StreamCompletion ):
513508 return await self ._do_downstream (event , select_outlets = False )
509+
510+ event_body = event if self ._full_event else event .body
511+ outlet_names = self .select_outlets (event_body )
512+ outlets = []
513+ if self ._passthrough_for_preview :
514+ outlet = self ._name_to_outlet ["dataframe" ]
515+ outlets .append (outlet )
514516 else :
515- event_body = event if self ._full_event else event .body
516- outlet_names = self .select_outlets (event_body )
517- outlets = []
518- if self ._passthrough_for_preview :
519- outlet = self ._name_to_outlet ["dataframe" ]
520- outlets .append (outlet )
521- else :
522- outlets = self ._check_outlets_by_names (outlet_names )
523- return await self ._do_downstream (event , outlets = outlets , select_outlets = False )
517+ outlets = self ._check_outlets_by_names (outlet_names )
518+ return await self ._do_downstream (event , outlets = outlets , select_outlets = False )
524519
525520
526521class Recover (Flow ):
0 commit comments