3737from storey .flow import _is_generator
3838
3939
40+ class StreamingRunnable (ParallelExecutionRunnable ):
41+ """A streaming runnable that yields 3 chunks for testing."""
42+
43+ def run (self , body , path , origin_name = None ):
44+ for i in range (3 ):
45+ yield f"{ body } _chunk_{ i } "
46+
47+
48+ class AsyncStreamingRunnable (ParallelExecutionRunnable ):
49+ """An async streaming runnable that yields 3 chunks for testing."""
50+
51+ async def run_async (self , body , path , origin_name = None ):
52+ for i in range (3 ):
53+ yield f"{ body } _chunk_{ i } "
54+
55+
4056class TestStreamingPrimitives :
4157 """Tests for streaming primitive classes."""
4258
@@ -1075,12 +1091,6 @@ class TestParallelExecutionStreaming:
10751091
10761092 def test_parallel_execution_single_runnable_streaming (self ):
10771093 """Test streaming with a single runnable."""
1078-
1079- class StreamingRunnable (ParallelExecutionRunnable ):
1080- def run (self , body , path , origin_name = None ):
1081- for i in range (3 ):
1082- yield f"{ body } _chunk_{ i } "
1083-
10841094 runnable = StreamingRunnable (name = "streamer" )
10851095 controller = build_flow (
10861096 [
@@ -1104,12 +1114,6 @@ def run(self, body, path, origin_name=None):
11041114
11051115 def test_parallel_execution_async_runnable_streaming (self ):
11061116 """Test streaming with an async runnable."""
1107-
1108- class AsyncStreamingRunnable (ParallelExecutionRunnable ):
1109- async def run_async (self , body , path , origin_name = None ):
1110- for i in range (3 ):
1111- yield f"{ body } _chunk_{ i } "
1112-
11131117 runnable = AsyncStreamingRunnable (name = "async_streamer" )
11141118 controller = build_flow (
11151119 [
@@ -1135,11 +1139,6 @@ def test_async_parallel_execution_single_runnable_streaming(self):
11351139 """Async version: Test streaming with a single runnable."""
11361140
11371141 async def _test ():
1138- class StreamingRunnable (ParallelExecutionRunnable ):
1139- def run (self , body , path , origin_name = None ):
1140- for i in range (3 ):
1141- yield f"{ body } _chunk_{ i } "
1142-
11431142 runnable = StreamingRunnable (name = "streamer" )
11441143 controller = build_flow (
11451144 [
@@ -1167,11 +1166,6 @@ def test_async_parallel_execution_async_runnable_streaming(self):
11671166 """Async version: Test streaming with an async runnable."""
11681167
11691168 async def _test ():
1170- class AsyncStreamingRunnable (ParallelExecutionRunnable ):
1171- async def run_async (self , body , path , origin_name = None ):
1172- for i in range (3 ):
1173- yield f"{ body } _chunk_{ i } "
1174-
11751169 runnable = AsyncStreamingRunnable (name = "async_streamer" )
11761170 controller = build_flow (
11771171 [
@@ -1197,12 +1191,6 @@ async def run_async(self, body, path, origin_name=None):
11971191
11981192 def test_parallel_execution_streaming_with_thread_pool (self ):
11991193 """Test streaming works with thread_pool execution mechanism."""
1200-
1201- class StreamingRunnable (ParallelExecutionRunnable ):
1202- def run (self , body , path , origin_name = None ):
1203- for i in range (3 ):
1204- yield f"{ body } _chunk_{ i } "
1205-
12061194 runnable = StreamingRunnable (name = "streamer" )
12071195 controller = build_flow (
12081196 [
@@ -1226,12 +1214,6 @@ def run(self, body, path, origin_name=None):
12261214
12271215 def test_parallel_execution_streaming_with_shared_executor_thread_based (self ):
12281216 """Test streaming works with shared_executor when the shared executor uses threads."""
1229-
1230- class StreamingRunnable (ParallelExecutionRunnable ):
1231- def run (self , body , path , origin_name = None ):
1232- for i in range (3 ):
1233- yield f"{ body } _chunk_{ i } "
1234-
12351217 # Create a shared executor with a thread-based runnable
12361218 shared_executor = RunnableExecutor ()
12371219 shared_runnable = StreamingRunnable (name = "shared_streamer" )
@@ -1273,12 +1255,6 @@ def __init__(self, executor):
12731255 )
12741256 def test_parallel_execution_streaming_with_process_based_fails_at_init (self , mechanism ):
12751257 """Test that StreamingError is raised at init time when streaming runnable uses process-based mechanism."""
1276-
1277- class StreamingRunnable (ParallelExecutionRunnable ):
1278- def run (self , body , path , origin_name = None ):
1279- for i in range (3 ):
1280- yield f"{ body } _chunk_{ i } "
1281-
12821258 runnable = StreamingRunnable (name = "streamer" )
12831259
12841260 flow = build_flow (
@@ -1302,12 +1278,6 @@ def run(self, body, path, origin_name=None):
13021278
13031279 def test_parallel_execution_streaming_with_shared_executor_process_based_fails_at_init (self ):
13041280 """Test that StreamingError is raised at init when shared_executor uses process-based mechanism."""
1305-
1306- class StreamingRunnable (ParallelExecutionRunnable ):
1307- def run (self , body , path , origin_name = None ):
1308- for i in range (3 ):
1309- yield f"{ body } _chunk_{ i } "
1310-
13111281 # Create a shared executor with a process-based runnable
13121282 shared_executor = RunnableExecutor ()
13131283 shared_runnable = StreamingRunnable (name = "shared_streamer" )
0 commit comments