@@ -235,6 +235,42 @@ def test_scatter_estimate_tracks_skewed_items(tmp_path):
235235 assert writer ._mid_write_flushes > 0 , "expected mid-write flushes for large items"
236236
237237
def test_scatter_estimate_adapts_to_gradual_drift(tmp_path):
    """Write-time EMA bounds peak buffered rows even when item sizes grow gradually."""
    shard_count = 1
    shuffle_path = str(tmp_path / "shard-0000.shuffle")

    # Items grow linearly from ~100 B to ~100 KB across 200 records.
    # If all 200 were buffered at once the real RSS would be ~10 MB.
    total = 200
    records = [{"k": 0, "v": "x" * (100 + idx * 500)} for idx in range(total)]

    # 500 KB budget. With a frozen first-item estimate (~110 B) the budget check
    # would read 200 * 110 = 22 KB < 500 KB and never flush mid-write, letting
    # all items accumulate. With EMA adaptation the estimate tracks the growing
    # sizes and flushes before peak RSS reaches the budget.
    byte_budget = 500_000
    writer = ScatterWriter(
        data_path=shuffle_path,
        key_fn=_key,
        num_output_shards=shard_count,
        buffer_limit_bytes=byte_budget,
    )
    for record in records:
        writer.write(record)
    writer.close()

    # Round-trip: everything written must come back, regardless of flush timing.
    recovered = list(ScatterReader.from_sidecars([shuffle_path], 0))
    assert sorted(recovered, key=lambda row: row["v"]) == sorted(records, key=lambda row: row["v"])

    assert writer._mid_write_flushes > 0, "expected mid-write flushes as item sizes grew"
    assert writer._peak_buffer_rows < total, (
        f"peak_buffer_rows={writer._peak_buffer_rows} should be < {total}; "
        "a frozen estimate lets all items accumulate before close()"
    )
272+
273+
238274def test_scatter_byte_budget_preserves_all_items (tmp_path ):
239275 """Items are not lost or duplicated when byte-budget flushes fire mid-write."""
240276 num_shards = 3
0 commit comments