File tree Expand file tree Collapse file tree 1 file changed +24
-0
lines changed
Expand file tree Collapse file tree 1 file changed +24
-0
lines changed Original file line number Diff line number Diff line change @@ -458,6 +458,30 @@ def stream_chunks(x):
458458
459459 asyncio .run (_test ())
460460
461+ def test_async_collector_single_chunk_unwrap (self ):
462+ """Async version: Test that a single chunk is unwrapped by Collector."""
463+
464+ async def _test ():
465+ def single_chunk (x ):
466+ yield x * 2
467+
468+ controller = build_flow (
469+ [
470+ AsyncEmitSource (),
471+ Map (single_chunk ),
472+ Collector (),
473+ Reduce ([], lambda acc , x : acc + [x ]),
474+ ]
475+ ).run ()
476+
477+ await controller .emit (5 )
478+ await controller .terminate ()
479+ result = await controller .await_termination ()
480+
481+ assert result == [10 ]
482+
483+ asyncio .run (_test ())
484+
461485 def test_collector_empty_stream (self ):
462486 """Test that Collector emits an empty list for a stream with zero chunks."""
463487
You can’t perform that action at this time.
0 commit comments