File tree Expand file tree Collapse file tree 1 file changed +3
-0
lines changed
Expand file tree Collapse file tree 1 file changed +3
-0
lines changed Original file line number Diff line number Diff line change @@ -2148,6 +2148,7 @@ async def _do(self, event):
21482148 # Forward termination object and StreamCompletion without processing
21492149 if event is _termination_obj or isinstance (event , StreamCompletion ):
21502150 return await self ._do_downstream (event )
2151+
21512152 event = self .preprocess_event (event )
21522153 sub_events_to_modify = []
21532154 is_full_event_batched = (
@@ -2193,6 +2194,7 @@ async def _do(self, event):
21932194 runnables_encountered .add (id (runnable ))
21942195 futures .append (future )
21952196 results = await asyncio .gather (* futures )
2197+
21962198 # Check for streaming response (only when a single runnable is selected)
21972199 if len (runnables ) == 1 and results :
21982200 result = results [0 ]
@@ -2212,6 +2214,7 @@ async def _do(self, event):
22122214 # If no runnables were selected, don't emit the event
22132215 if not results :
22142216 return None
2217+
22152218 # Use self.runnables (registered) not runnables (selected) to determine wrapping
22162219 if len (self .runnables ) == 1 :
22172220 result : _ParallelExecutionRunnableResult = results [0 ]
You can’t perform that action at this time.
0 commit comments