@@ -625,10 +625,8 @@ async def _do_internal(self, element, fn_result):
625625 raise NotImplementedError ()
626626
627627 async def _do (self , event ):
628- if event is _termination_obj :
629- return await self ._do_downstream (_termination_obj )
630- # Forward StreamCompletion without processing
631- if isinstance (event , StreamCompletion ):
628+ # Forward termination object and StreamCompletion without processing
629+ if event is _termination_obj or isinstance (event , StreamCompletion ):
632630 return await self ._do_downstream (event )
633631 element = self ._get_event_or_body (event )
634632 fn_result = await self ._call (element , self ._fn )
@@ -859,10 +857,8 @@ async def _call(self, event):
859857 return res
860858
861859 async def _do (self , event ):
862- if event is _termination_obj :
863- return await self ._do_downstream (_termination_obj )
864- # Forward StreamCompletion without processing
865- if isinstance (event , StreamCompletion ):
860+ # Forward termination object and StreamCompletion without processing
861+ if event is _termination_obj or isinstance (event , StreamCompletion ):
866862 return await self ._do_downstream (event )
867863 element = self ._get_event_or_body (event )
868864 fn_result = await self ._call (element )
@@ -2115,10 +2111,8 @@ def _init(self):
21152111 self .runnable_executor .init_executors ()
21162112
21172113 async def _do (self , event ):
2118- if event is _termination_obj :
2119- return await self ._do_downstream (_termination_obj )
2120- # Forward StreamCompletion without processing
2121- if isinstance (event , StreamCompletion ):
2114+ # Forward termination object and StreamCompletion without processing
2115+ if event is _termination_obj or isinstance (event , StreamCompletion ):
21222116 return await self ._do_downstream (event )
21232117
21242118 event = self .preprocess_event (event )
0 commit comments