Skip to content

Commit 260f105

Browse files
committed
Fix style
1 parent cf293ab commit 260f105

File tree

2 files changed

+9
-7
lines changed

2 files changed

+9
-7
lines changed

storey/steps/collector.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ async def _do(self, event):
5353
# Handle StreamCompletion sentinel
5454
if isinstance(event, StreamCompletion):
5555
stream_id = event.original_event.id
56-
stream_data = self._collected_streams[stream_id] # Use [] to trigger defaultdict
56+
stream_data = self._collected_streams[stream_id]
5757

5858
stream_data["completions"] += 1
5959

tests/test_streaming.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1008,7 +1008,8 @@ def stream_chunks(x):
10081008

10091009
# Collector should receive chunks from both branches and emit collected list
10101010
# Each branch processes each chunk, so we get 4 items total (2 chunks x 2 branches)
1011-
assert [set(*result)] == [{"a_test_chunk_0", "a_test_chunk_1", "b_test_chunk_0", "b_test_chunk_1"}]
1011+
assert set(result[0]) == {"a_test_chunk_0", "a_test_chunk_1", "b_test_chunk_0", "b_test_chunk_1"}
1012+
assert len(result) == 1
10121013

10131014
def test_async_streaming_graph_split_collector_expected_completions_2(self):
10141015
"""Async version: Test streaming through a split with Collector(expected_completions=2)."""
@@ -1036,7 +1037,8 @@ def stream_chunks(x):
10361037
await controller.terminate()
10371038
result = await controller.await_termination()
10381039

1039-
assert [set(*result)] == [{"a_test_chunk_0", "a_test_chunk_1", "b_test_chunk_0", "b_test_chunk_1"}]
1040+
assert set(result[0]) == {"a_test_chunk_0", "a_test_chunk_1", "b_test_chunk_0", "b_test_chunk_1"}
1041+
assert len(result) == 1
10401042

10411043
asyncio.run(_test())
10421044

@@ -1070,7 +1072,7 @@ def stream_chunks(x):
10701072
chunks = list(result)
10711073

10721074
# Should have 4 chunks total (2 chunks x 2 branches)
1073-
assert [set(chunks)] == [{"a_test_chunk_0", "a_test_chunk_1", "b_test_chunk_0", "b_test_chunk_1"}]
1075+
assert set(chunks) == {"a_test_chunk_0", "a_test_chunk_1", "b_test_chunk_0", "b_test_chunk_1"}
10741076
finally:
10751077
controller.terminate()
10761078
controller.await_termination()
@@ -1102,7 +1104,7 @@ def stream_chunks(x):
11021104
assert inspect.isasyncgen(result)
11031105
chunks = [chunk async for chunk in result]
11041106

1105-
assert [set(chunks)] == [{"a_test_chunk_0", "a_test_chunk_1", "b_test_chunk_0", "b_test_chunk_1"}]
1107+
assert set(chunks) == {"a_test_chunk_0", "a_test_chunk_1", "b_test_chunk_0", "b_test_chunk_1"}
11061108
finally:
11071109
await controller.terminate()
11081110
await controller.await_termination()
@@ -1138,7 +1140,7 @@ def stream_chunks(x):
11381140
chunks = list(result)
11391141

11401142
# Should have 4 chunks total (2 chunks x 2 branches)
1141-
assert [set(chunks)] == [{"a_test_chunk_0", "a_test_chunk_1", "b_test_chunk_0", "b_test_chunk_1"}]
1143+
assert set(chunks) == {"a_test_chunk_0", "a_test_chunk_1", "b_test_chunk_0", "b_test_chunk_1"}
11421144
finally:
11431145
controller.terminate()
11441146
controller.await_termination()
@@ -1169,7 +1171,7 @@ def stream_chunks(x):
11691171
assert inspect.isasyncgen(result)
11701172
chunks = [chunk async for chunk in result]
11711173

1172-
assert [set(chunks)] == [{"a_test_chunk_0", "a_test_chunk_1", "b_test_chunk_0", "b_test_chunk_1"}]
1174+
assert set(chunks) == {"a_test_chunk_0", "a_test_chunk_1", "b_test_chunk_0", "b_test_chunk_1"}
11731175
finally:
11741176
await controller.terminate()
11751177
await controller.await_termination()

0 commit comments

Comments
 (0)