Skip to content

Commit 4ff9e23

Browse files
committed
Dedup local code
1 parent 51a1bfb commit 4ff9e23

File tree

1 file changed

+13
-16
lines changed

1 file changed

+13
-16
lines changed

storey/flow.py

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -568,23 +568,20 @@ async def _emit_streaming_chunks(self, event, generator: Union[Generator, AsyncG
568568
"""
569569
self._validate_not_already_streaming(event)
570570

571+
async def gen_to_async_gen(sync_gen):
572+
for item in sync_gen:
573+
yield item
574+
575+
# If needed, wrap sync generator as async to unify iteration
576+
async_gen = gen_to_async_gen(generator) if inspect.isgenerator(generator) else generator
577+
571578
chunk_id = 0
572-
if inspect.isgenerator(generator):
573-
# Sync generator
574-
for chunk_body in generator:
575-
chunk_event = self._user_fn_output_to_event(event, chunk_body)
576-
chunk_event.streaming_step = self.name
577-
chunk_event.chunk_id = chunk_id
578-
await self._do_downstream(chunk_event)
579-
chunk_id += 1
580-
else:
581-
# Async generator
582-
async for chunk_body in generator:
583-
chunk_event = self._user_fn_output_to_event(event, chunk_body)
584-
chunk_event.streaming_step = self.name
585-
chunk_event.chunk_id = chunk_id
586-
await self._do_downstream(chunk_event)
587-
chunk_id += 1
579+
async for chunk_body in async_gen:
580+
chunk_event = self._user_fn_output_to_event(event, chunk_body)
581+
chunk_event.streaming_step = self.name
582+
chunk_event.chunk_id = chunk_id
583+
await self._do_downstream(chunk_event)
584+
chunk_id += 1
588585

589586
# Send completion signal
590587
await self._do_downstream(StreamCompletion(self.name, event))

0 commit comments

Comments
 (0)