@@ -5636,3 +5636,174 @@ def test_flow_reuse_with_cycle():
56365636
56375637 controller .terminate ()
56385638 controller .await_termination ()
5639+
5640+
def test_flow_reuse_resets_closeables():
    """Verify that plain steps hold no closeables on any run of a reused flow.

    A source -> Map -> Reduce pipeline is built once and run three times.
    Right after each ``run()`` call, both the Map and the Reduce step are
    expected to expose an empty ``_closeables`` list (i.e. nothing carried
    over from an earlier run), and every run must reduce to the same total.
    """
    source = SyncEmitSource()
    map_step = Map(lambda x: x + 1)
    reduce_step = Reduce(0, lambda acc, x: acc + x)
    source.to(map_step).to(reduce_step)

    for attempt in range(3):
        flow = source.run()

        # Inspect the steps immediately after start-up, before any event flows.
        assert map_step._closeables == [], f"Run {attempt} : map_step._closeables should be empty"
        assert reduce_step._closeables == [], f"Run {attempt} : reduce_step._closeables should be empty"

        for value in range(5):
            flow.emit(value)
        flow.terminate()

        # 0..4 mapped to 1..5 sums to 15 on every run.
        total = flow.await_termination()
        assert total == 15, f"Run {attempt} : Expected 15 but got {total} "
5668+
5669+
def test_aggregate_by_key_reuse_resets_closeables():
    """Verify that AggregateByKey exposes exactly its table on every run.

    The flow source -> AggregateByKey -> Reduce is run three times. After each
    ``run()`` the aggregation step's ``_closeables`` is expected to hold a
    single entry, and that entry must be the very Table object handed to the
    step, rather than a list that grows with each reuse.
    """
    from storey import AggregateByKey, FieldAggregator
    from storey.dtypes import FixedWindows

    table = Table("test", NoopDriver())
    hourly_sum = FieldAggregator("sum_col1", "col1", ["sum"], FixedWindows(["1h"]))
    aggregator = AggregateByKey([hourly_sum], table, time_field="time")

    source = SyncEmitSource()
    reduce_step = Reduce([], lambda acc, x: acc + [x])
    source.to(aggregator).to(reduce_step)

    # Every emitted event shares the same timestamp.
    event_time = datetime(year=2020, month=7, day=21, hour=12)

    for attempt in range(3):
        flow = source.run()

        closeables = aggregator._closeables
        assert len(closeables) == 1, (
            f"Run {attempt} : AggregateByKey._closeables should have exactly 1 item, "
            f"got {len(closeables)} "
        )
        assert closeables[0] is table, f"Run {attempt} : AggregateByKey._closeables[0] should be the table"

        # Push a few keyed events through so the run does real work.
        for idx in range(3):
            flow.emit({"col1": idx, "time": event_time}, f"key{idx} ")

        flow.terminate()
        flow.await_termination()
5710+
5711+
def test_map_with_state_reuse_resets_closeables():
    """Verify that MapWithState backed by a Table exposes exactly that table per run.

    The flow source -> MapWithState -> Reduce is run three times with a Table
    as the state object. After each ``run()`` the step's ``_closeables`` is
    expected to contain one entry only, identical to the Table passed in.
    """
    table = Table("test_state", NoopDriver())

    def count_per_key(event, state):
        # Bump the per-key counter and copy it onto the outgoing event.
        state["count"] = state.get("count", 0) + 1
        event["count"] = state["count"]
        return event, state

    source = SyncEmitSource()
    map_with_state = MapWithState(table, count_per_key, group_by_key=True)
    reduce_step = Reduce([], lambda acc, x: acc + [x])
    source.to(map_with_state).to(reduce_step)

    for attempt in range(3):
        flow = source.run()

        closeables = map_with_state._closeables
        assert len(closeables) == 1, (
            f"Run {attempt} : MapWithState._closeables should have exactly 1 item, "
            f"got {len(closeables)} "
        )
        assert closeables[0] is table, f"Run {attempt} : MapWithState._closeables[0] should be the table"

        # Feed keyed events so the stateful function is exercised.
        for idx in range(3):
            keyed_event = Event(body={"value": idx}, key=f"key{idx} ")
            flow.emit(keyed_event)

        flow.terminate()
        flow.await_termination()
5747+
5748+
def test_map_with_state_no_closeables_without_close_method():
    """Verify that a plain-dict state never shows up in MapWithState._closeables.

    The flow source -> MapWithState -> Reduce is run three times with an
    ordinary dict as state. After each ``run()`` the step's ``_closeables``
    is expected to be an empty list.
    """
    initial_state = {"count": 0}

    def scale_by_count(event, state):
        # Advance the shared counter, then scale the event value by it.
        state["count"] = state["count"] + 1
        return event["value"] * state["count"], state

    source = SyncEmitSource()
    map_with_state = MapWithState(initial_state, scale_by_count, group_by_key=False)
    reduce_step = Reduce(0, lambda acc, x: acc + x)
    source.to(map_with_state).to(reduce_step)

    for attempt in range(3):
        flow = source.run()

        no_closeables = map_with_state._closeables == []
        assert (
            no_closeables
        ), f"Run {attempt} : MapWithState._closeables should be empty when state has no close method"

        for value in range(1, 4):
            flow.emit({"value": value})

        flow.terminate()
        flow.await_termination()
5779+
5780+
def test_nosql_target_reuse_resets_closeables():
    """Verify that NoSqlTarget exposes exactly its table on every run.

    The flow source -> NoSqlTarget is run three times. After each ``run()``
    the target's ``_closeables`` is expected to hold a single entry that is
    the same Table object the target was constructed with.
    """
    table = Table("test_nosql", NoopDriver())

    source = SyncEmitSource()
    nosql_target = NoSqlTarget(table)
    source.to(nosql_target)

    for attempt in range(3):
        flow = source.run()

        closeables = nosql_target._closeables
        assert len(closeables) == 1, (
            f"Run {attempt} : NoSqlTarget._closeables should have exactly 1 item, "
            f"got {len(closeables)} "
        )
        assert closeables[0] is table, f"Run {attempt} : NoSqlTarget._closeables[0] should be the table"

        # Write a handful of keyed records through the target.
        for idx in range(3):
            keyed_event = Event(body={"col": idx}, key=f"key{idx} ")
            flow.emit(keyed_event)

        flow.terminate()
        flow.await_termination()
0 commit comments