@@ -62,6 +62,15 @@ def _is_generator(obj) -> bool:
6262 return inspect .isgenerator (obj ) or inspect .isasyncgen (obj )
6363
6464
65+ def is_batched_event (event ) -> bool :
66+ return (
67+ not isinstance (event , StreamCompletion )
68+ and isinstance (getattr (event , "body" , None ), list )
69+ and event .body
70+ and all (hasattr (sub_event , "body" ) for sub_event in event .body )
71+ )
72+
73+
6574class Flow :
6675 _legal_first_step = False
6776
@@ -381,16 +390,10 @@ async def _do_downstream(self, event, outlets=None, select_outlets: bool = True)
381390 # Deep copy event and create a task per outlet (except the first, which is awaited directly below)
382391 is_stream_completion = isinstance (event , StreamCompletion )
383392 target_obj = event .original_event if is_stream_completion else event
384- is_batched = (
385- not is_stream_completion
386- and isinstance (getattr (event , "body" , None ), list )
387- and event .body
388- and all (hasattr (sub_event , "body" ) for sub_event in event .body )
389- )
390393
391394 for i in range (1 , len (outlets )):
392395 event_copy = self ._deepcopy_event (
393- event , target_obj , is_stream_completion = is_stream_completion , is_batched = is_batched
396+ event , target_obj , is_stream_completion = is_stream_completion , is_batched = is_batched_event ( event )
394397 )
395398 tasks .append (asyncio .get_running_loop ().create_task (outlets [i ]._do_and_recover (event_copy )))
396399 if self .verbose and self .logger :
@@ -2213,12 +2216,7 @@ async def _do(self, event):
22132216
22142217 event = self .preprocess_event (event )
22152218 sub_events_to_modify = []
2216- is_full_event_batched = (
2217- not isinstance (event , StreamCompletion )
2218- and isinstance (getattr (event , "body" , None ), list )
2219- and event .body
2220- and all (hasattr (sub_event , "body" ) for sub_event in event .body )
2221- )
2219+ is_full_event_batched = is_batched_event (event )
22222220 if is_full_event_batched :
22232221 event_bodies = []
22242222 for sub_event in event .body :
0 commit comments