@@ -293,10 +293,7 @@ async def _do_downstream(self, event, outlets=None):
293293 outlet_names = self ._selected_outlets
294294 self ._selected_outlets = None
295295 else :
296- if asyncio .iscoroutinefunction (self .select_outlets ):
297- outlet_names = await self .select_outlets (event .body )
298- else :
299- outlet_names = self .select_outlets (event .body )
296+ outlet_names = self .select_outlets (event .body )
300297 outlets = self ._check_outlets_by_names (outlet_names ) if outlet_names else self ._outlets
301298
302299 if not outlets :
@@ -402,10 +399,7 @@ def check_and_update_iteration_number(self, event) -> Optional[Callable]:
402399 event ._cyclic_counter = {self .name : 1 }
403400
404401 def get_iteration_counter (self , event ):
405- if not hasattr (event , "_cyclic_counter" ):
406- return 0
407- else :
408- return event ._cyclic_counter .get (self .name , 0 )
402+ return getattr (event , "_cyclic_counter" , {}).get (self .name , 0 )
409403
410404 def select_outlets (self , event ) -> Optional [Collection [str ]]:
411405 """
@@ -511,6 +505,9 @@ def __init__(
511505 if fn_select_outlets and not callable (fn_select_outlets ):
512506 raise TypeError (f"Expected fn_select_outlets to be callable, got { type (fn )} " )
513507 self ._outlets_selector = fn_select_outlets
508+ self ._create_name_to_outlet = self ._outlets_selector is not None or self ._method_is_overridden (
509+ "select_outlets" , _UnaryFunctionFlow
510+ )
514511
515512 async def _call (self , element , fn , pass_kwargs = True ):
516513 if self ._long_running :
@@ -538,18 +535,12 @@ async def _do(self, event):
538535 fn_result = await self ._call (element , self ._fn )
539536 await self ._do_internal (event , fn_result )
540537
541- async def select_outlets (self , event_body ) -> Optional [Collection [str ]]:
538+ def select_outlets (self , event_body ) -> Optional [Collection [str ]]:
542539 if self ._outlets_selector :
543540 return self ._outlets_selector (event_body )
544541 else :
545542 return super ().select_outlets (event_body )
546543
547- def _init (self ):
548- self ._create_name_to_outlet = self ._outlets_selector is not None or self ._method_is_overridden (
549- "select_outlets" , _UnaryFunctionFlow
550- )
551- super ()._init ()
552-
553544
554545class DropColumns (Flow ):
555546 def __init__ (self , columns , ** kwargs ):
0 commit comments