@@ -5642,88 +5642,31 @@ def test_flow_reuse_resets_closeables():
56425642 """Test that _closeables is properly reset when a flow is reused.
56435643
56445644 Baseline test for flow reuse without closeables. This test uses steps (Map, Reduce)
5645- that don't create closeables, so it verifies the baseline behavior. Note: This test
5646- does NOT detect the ML-11518 bug since there are no closeables to accumulate.
5645+ that don't create closeables, so it verifies the baseline behavior.
56475646 """
56485647 source = SyncEmitSource ()
56495648 map_step = Map (lambda x : x + 1 )
56505649 reduce_step = Reduce (0 , lambda acc , x : acc + x )
56515650
56525651 source .to (map_step ).to (reduce_step )
56535652
5654- first_run_closeables_count = None
56555653 for run_num in range (3 ):
56565654 controller = source .run ()
5657-
5658- # Track closeables count - should be consistent across runs
5659- if first_run_closeables_count is None :
5660- first_run_closeables_count = len (source ._closeables )
5661- else :
5662- assert len (source ._closeables ) == first_run_closeables_count , (
5663- f"Run { run_num } : source._closeables count should be { first_run_closeables_count } , "
5664- f"got { len (source ._closeables )} (closeables are accumulating!)"
5665- )
5666-
5667- for i in range (5 ):
5668- controller .emit (i )
5669- controller .terminate ()
5655+ try :
5656+ assert len (source ._closeables ) == 0
5657+ for i in range (5 ):
5658+ controller .emit (i )
5659+ finally :
5660+ controller .terminate ()
56705661 result = controller .await_termination ()
56715662 assert result == 15 , f"Run { run_num } : Expected 15 but got { result } "
56725663
56735664
5674- def test_aggregate_by_key_reuse_resets_closeables ():
5675- """Test that AggregateByKey properly resets _closeables on flow reuse.
5676-
5677- Regression test for ML-11518: Without resetting _closeables in _init(), closeables
5678- accumulate in the source's _closeables list across runs (1→2→3 instead of 1→1→1).
5679- This test DOES detect the bug by checking source._closeables remains consistent.
5680- """
5681- from storey import AggregateByKey , FieldAggregator
5682- from storey .dtypes import FixedWindows
5683-
5684- table = Table ("test" , NoopDriver ())
5685- aggregator = AggregateByKey (
5686- [FieldAggregator ("sum_col1" , "col1" , ["sum" ], FixedWindows (["1h" ]))],
5687- table ,
5688- time_field = "time" ,
5689- )
5690-
5691- source = SyncEmitSource ()
5692- reduce_step = Reduce ([], lambda acc , x : acc + [x ])
5693-
5694- source .to (aggregator ).to (reduce_step )
5695-
5696- base_time = datetime (2020 , 7 , 21 , 12 , 0 , 0 )
5697-
5698- first_run_closeables_count = None
5699- for run_num in range (3 ):
5700- controller = source .run ()
5701-
5702- # Track closeables count - should be consistent across runs
5703- if first_run_closeables_count is None :
5704- first_run_closeables_count = len (source ._closeables )
5705- else :
5706- assert len (source ._closeables ) == first_run_closeables_count , (
5707- f"Run { run_num } : source._closeables count should be { first_run_closeables_count } , "
5708- f"got { len (source ._closeables )} (closeables are accumulating!)"
5709- )
5710-
5711- # Emit some data
5712- for i in range (3 ):
5713- data = {"col1" : i , "time" : base_time }
5714- controller .emit (data , f"key{ i } " )
5715-
5716- controller .terminate ()
5717- controller .await_termination ()
5718-
5719-
57205665def test_map_with_state_reuse_resets_closeables ():
57215666 """Test that MapWithState properly resets _closeables on flow reuse.
57225667
57235668 Regression test for ML-11518: Without resetting _closeables in _init(), closeables
5724- accumulate in the source's _closeables list across runs, causing close() to be called
5725- multiple times per run (1, 2, 3 times instead of 1, 1, 1 times). This test DOES detect
5726- the bug by tracking close() call counts.
5669+ accumulate across runs, causing close() to be called multiple times per run.
57275670 """
57285671
57295672 class CloseCounter :
@@ -5743,9 +5686,10 @@ def close(self):
57435686 for run_num in range (3 ):
57445687 initial_close_count = state .close_count
57455688 controller = source .run ()
5746-
5747- controller .emit (1 )
5748- controller .terminate ()
5689+ try :
5690+ controller .emit (1 )
5691+ finally :
5692+ controller .terminate ()
57495693 controller .await_termination ()
57505694
57515695 # close() should be called exactly once per run
@@ -5760,9 +5704,7 @@ def test_map_with_state_no_closeables_without_close_method():
57605704 """Test that MapWithState doesn't add state to _closeables if it has no close method.
57615705
57625706 Edge case test: When using a plain dict as state (no close method), _closeables
5763- should remain empty. This verifies the hasattr(self._state, "close") check works
5764- correctly. Note: This test does NOT detect the ML-11518 bug since there are no
5765- closeables to accumulate.
5707+ should remain empty. This verifies the hasattr(self._state, "close") check works.
57665708 """
57675709 initial_state = {"count" : 0 }
57685710
@@ -5776,27 +5718,23 @@ def state_fn(event, state):
57765718
57775719 source .to (map_with_state ).to (reduce_step )
57785720
5779- for run_num in range (3 ):
5721+ for _ in range (3 ):
57805722 controller = source .run ()
5781-
5782- # Dict has no close method, so _closeables should be empty
5783- assert (
5784- map_with_state ._closeables == []
5785- ), f"Run { run_num } : MapWithState._closeables should be empty when state has no close method"
5786-
5787- for i in range (3 ):
5788- controller .emit ({"value" : i + 1 })
5789-
5790- controller .terminate ()
5723+ try :
5724+ # Dict has no close method, so _closeables should be empty
5725+ assert map_with_state ._closeables == []
5726+ for i in range (3 ):
5727+ controller .emit ({"value" : i + 1 })
5728+ finally :
5729+ controller .terminate ()
57915730 controller .await_termination ()
57925731
57935732
57945733def test_nosql_target_reuse_resets_closeables ():
57955734 """Test that NoSqlTarget properly resets _closeables on flow reuse.
57965735
57975736 Regression test for ML-11518: Without resetting _closeables in _init(), closeables
5798- accumulate in the source's _closeables list across runs (1→2→3 instead of 1→1→1).
5799- This test DOES detect the bug by checking source._closeables remains consistent.
5737+ accumulate in the source's _closeables list across runs.
58005738 """
58015739 table = Table ("test_nosql" , NoopDriver ())
58025740
@@ -5805,22 +5743,13 @@ def test_nosql_target_reuse_resets_closeables():
58055743
58065744 source .to (nosql_target )
58075745
5808- first_run_closeables_count = None
5809- for run_num in range (3 ):
5746+ for _ in range (3 ):
58105747 controller = source .run ()
5811-
5812- # Track closeables count - should be consistent across runs
5813- if first_run_closeables_count is None :
5814- first_run_closeables_count = len (source ._closeables )
5815- else :
5816- assert len (source ._closeables ) == first_run_closeables_count , (
5817- f"Run { run_num } : source._closeables count should be { first_run_closeables_count } , "
5818- f"got { len (source ._closeables )} (closeables are accumulating!)"
5819- )
5820-
5821- # Emit some data with keys
5822- for i in range (3 ):
5823- controller .emit (Event (body = {"col" : i }, key = f"key{ i } " ))
5824-
5825- controller .terminate ()
5748+ try :
5749+ assert len (source ._closeables ) == 1
5750+ # Emit some data with keys
5751+ for i in range (3 ):
5752+ controller .emit (Event (body = {"col" : i }, key = f"key{ i } " ))
5753+ finally :
5754+ controller .terminate ()
58265755 controller .await_termination ()
0 commit comments